Drasi is Fluent in GQL: Integrating the New Graph Query Standard
Drasi, the open-source Rust data change processing platform, simplifies the creation of change-driven systems through continuous queries, reactions, and clearly defined change semantics. Continuous queries enable developers to specify precisely what data changes matter, track those changes in real time, and react immediately as they occur. Unlike traditional database queries, which provide static snapshots of data, continuous queries constantly maintain an up-to-date view of query results, automatically notifying reactions of precise additions, updates, and deletions to the result set as they happen. To date, Drasi has supported only openCypher, a powerful declarative graph query language, for writing continuous queries. Recently, Drasi added support for Graph Query Language (GQL), the new international ISO standard for querying property graphs. In this article, we describe what GQL means for writing continuous queries and how we implemented GQL support.

A Standardized Future for Graph Queries

GQL is the first officially standardized database language since SQL in 1987. Published by ISO/IEC in April 2024, it defines a global specification for querying property graphs. Unlike the relational model, which structures data into tables, the property graph model structures data inside the database as a graph. With GQL support, Drasi enables users to benefit from a query language that we expect to be widely adopted across the database industry, ensuring compatibility with future standards in graph querying. Drasi continues to support openCypher, allowing users to select the query language that best fits their requirements and existing knowledge. With the introduction of GQL, Drasi users can now write continuous queries using the new international standard.

Example GQL Continuous Query: Counting Unique Messages

Event-driven architectures traditionally involve overhead for parsing event payloads, filtering irrelevant data, and managing contextual state to identify precise data transitions. Drasi eliminates much of this complexity through continuous queries, which maintain accurate real-time views of data and generate change notifications.

Imagine a simple database with a message table containing the text of each message. Suppose you want to know, in real time, how many times the same message has been sent. Traditionally, addressing these types of scenarios involves polling databases at set intervals, using middleware to detect state changes, and developing custom logic to handle reactions. It could also mean setting up change data capture (CDC) to feed a message broker and processing events through a stream processing system. These methods can quickly become complex and difficult to maintain, especially when handling numerous or more sophisticated scenarios.

Drasi simplifies this process by employing a change-driven architecture. Rather than relying on polling or other methods, Drasi uses continuous queries that actively monitor data for specific conditions. The moment a specified condition is met or changes, Drasi proactively sends notifications, ensuring real-time responsiveness. The following example shows the continuous query in GQL that counts the frequency of each unique message:

```
MATCH (m:Message)
LET Message = m.Message
RETURN Message, count(Message) AS Frequency
```

When a second copy of an existing message arrives, for example, the result row for that message is updated and Drasi notifies reactions that its Frequency changed from 1 to 2. You can explore this example in the Drasi Getting Started tutorial.
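For readers more familiar with openCypher, the same continuous query can be expressed using openCypher's implicit grouping in RETURN. This equivalent is our own sketch, written in the style of the side-by-side comparisons in the next section rather than taken from the tutorial:

```
MATCH (m:Message)
RETURN m.Message AS Message, count(m) AS Frequency
```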
Key Features of the GQL Language

OpenCypher had a significant influence on GQL, and the two languages have much in common; however, there are also some important differences.

A new statement introduced in GQL is NEXT, which enables linear composition of multiple statements. It forms a pipeline in which each subsequent statement receives the working table produced by the previous statement. One application of NEXT is the ability to filter results after an aggregation. For example, to find colors associated with more than five vehicles, the following query can be used:

```
MATCH (v:Vehicle)
RETURN v.color AS color, count(v) AS vehicle_count
NEXT
FILTER vehicle_count > 5
RETURN color, vehicle_count
```

Equivalent openCypher:

```
MATCH (v:Vehicle)
WITH v.color AS color, count(v) AS vehicle_count
WHERE vehicle_count > 5
RETURN color, vehicle_count
```

GQL introduces additional clauses and statements: LET, YIELD, and FILTER.

The LET statement allows users to define new variables or computed fields for every row in the current working table. Each LET expression can reference existing columns in scope, and the resulting variables are added as new columns. Example:

```
MATCH (v:Vehicle)
LET makeAndModel = v.make + ' ' + v.model
RETURN makeAndModel, v.year
```

Equivalent openCypher:

```
MATCH (v:Vehicle)
WITH v, v.make + ' ' + v.model AS makeAndModel
RETURN makeAndModel, v.year
```

The YIELD clause projects and optionally renames columns from the working table, limiting the set of columns available in scope. Only the specified columns remain in scope after YIELD. Example:

```
MATCH (v:Vehicle)-[e:LOCATED_IN]->(z:Zone)
YIELD v.color AS vehicleColor, z.type AS location
RETURN vehicleColor, location
```

FILTER is a standalone statement that removes rows from the current working table based on a specified condition. While GQL still supports a WHERE clause for filtering during the MATCH phase, the FILTER statement provides additional flexibility by allowing results to be filtered after previous steps. It does not create a new table; instead, it updates the working table. Unlike openCypher's WHERE clause, which is tied to a MATCH or WITH, GQL's FILTER can be applied independently at various points in the query pipeline. Example:

```
MATCH (n:Person)
FILTER n.age > 30
RETURN n.name, n.age
```

GQL also provides control over how aggregations are grouped. The GROUP BY clause can be used to explicitly define the grouping keys, ensuring results are aggregated exactly as intended:

```
MATCH (v:Vehicle)-[:LOCATED_IN]->(z:Zone)
RETURN z.type AS zone_type, v.color AS vehicle_color, count(v) AS vehicle_count
GROUP BY zone_type, vehicle_color
```

If the GROUP BY clause is omitted, GQL defaults to implicit grouping, with all non-aggregated columns in the RETURN clause automatically used as the grouping keys.

While many of the core concepts, such as pattern matching, projections, and filtering, will feel familiar to openCypher users, GQL's statements are distinct in their usage. Supporting these differences in Drasi required design changes, described in the following section, that allow multiple query languages to coexist within the platform.
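To show how these statements compose, here is a sketch that chains LET, FILTER, GROUP BY, and NEXT over the same hypothetical Vehicle/Zone graph used above. It is assembled from the constructs described in this section rather than taken from the Drasi documentation, and the year threshold is purely illustrative:

```
MATCH (v:Vehicle)-[:LOCATED_IN]->(z:Zone)
LET location = z.type
FILTER v.year >= 2015
RETURN location, count(v) AS vehicle_count
GROUP BY location
NEXT
FILTER vehicle_count > 5
RETURN location, vehicle_count
```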
Refactoring Drasi for Multi-Language Query Support

Instead of migrating Drasi from openCypher to GQL, we saw this as an opportunity to address multi-language support in the system. Drasi's initial architecture was designed exclusively for openCypher. In this model, the query parser generated an Abstract Syntax Tree (AST) for openCypher. The execution engine was designed to process this AST format, executing the query it represented to produce the resulting dataset. Built-in functions (such as toUpper() for string case conversion) followed openCypher naming and were implemented within the same module as the engine. This created an architectural challenge for supporting additional query languages, such as GQL.

To enable multi-language support, the system was refactored to separate parsing, execution, and function management. A key insight was that the existing AST structure, originally created for openCypher, was flexible enough to be used for GQL. Although GQL and openCypher are different languages, their core operations (matching patterns, filtering data, and projecting results) could be represented by this AST.

The dependencies within the new architecture highlight the separation and interaction between the components. The language-specific function modules for openCypher and GQL provide the functions to the execution engine. The language-specific parsers for openCypher and GQL produce an AST, and the execution engine operates on this AST. The engine only needs to understand this AST format, making it language-agnostic.

The AST structure is based on a sequence of `QueryPart` objects. Each `QueryPart` represents a distinct stage of the query, containing clauses for matching, filtering, and returning data. The execution engine processes these `QueryPart`s sequentially.

```rust
pub struct QueryPart {
    pub match_clauses: Vec<MatchClause>,
    pub where_clauses: Vec<Expression>,
    pub return_clause: ProjectionClause,
}
```

The process begins when a query is submitted in either GQL or openCypher. The query is first directed to its corresponding language-specific parser, which handles the lexical analysis and transforms the raw query string into the standardized AST. When data changes occur in the graph, the execution engine uses the MATCH clauses from the first QueryPart to find affected graph patterns and captures the matched data. This matched data then flows through each QueryPart in sequence. The WHERE portion of the AST filters out data that does not meet the specified conditions. The RETURN portion transforms the data by selecting specific fields, computing new values, or performing aggregations. Each QueryPart's output becomes the next one's input, creating a pipeline that incrementally produces query results as the underlying graph changes.

To support functions from multiple languages in this AST, we introduced a function registry that abstracts a function's name from its implementation. Function names can differ (e.g., toUpper() in openCypher versus Upper() in GQL). For any given query, language-specific modules populate this registry, mapping each function name to its corresponding behavior. Functions with shared logic can be implemented once in the engine and registered under multiple names in language-specific function crates, preventing code duplication. Meanwhile, language-exclusive functions can be registered and implemented separately within their respective modules. When processing an AST, the engine uses the registry attached to that query to resolve and execute the correct function. The separate function modules also allow developers to introduce their own function registry, supporting custom implementations or names.
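The registry itself is a small idea: a per-language mapping from function names to shared implementations. The following is a minimal sketch of that idea, written in Python purely for illustration; Drasi's actual registry is implemented in Rust, and its types, names, and registration mechanics differ.

```python
# Illustrative sketch of a function registry; not Drasi's actual API.

def _upper(value: str) -> str:
    # Shared implementation used by both language modules.
    return value.upper()


class FunctionRegistry:
    def __init__(self):
        self._functions = {}

    def register(self, name, func):
        # Each language module registers the names its queries use.
        self._functions[name] = func

    def call(self, name, *args):
        # The engine resolves a name from the AST to its implementation.
        return self._functions[name](*args)


# The openCypher module registers its name for the shared implementation,
# and the GQL module registers a different name for the same behavior.
cypher_functions = FunctionRegistry()
cypher_functions.register("toUpper", _upper)

gql_functions = FunctionRegistry()
gql_functions.register("Upper", _upper)

print(cypher_functions.call("toUpper", "drasi"))  # DRASI
print(gql_functions.call("Upper", "drasi"))       # DRASI
```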
Conclusion

By adding support for GQL, Drasi now offers developers a choice between openCypher and the new GQL standard. This capability ensures that teams can use the syntax that best fits their skills and project requirements. In addition, the architectural changes set the foundation for additional query languages. You can check out the code on our GitHub organization, dig into the technical details on our documentation site, and join our developer community on Discord.

Sentinel Data Connector: Google Workspace (G Suite) (using Azure Functions)
I'm encountering a problem when attempting to run the GWorkspace_Report workbook in Azure Sentinel. The query is throwing this error related to the union operator:

'union' operator: Failed to resolve table expression named 'GWorkspace_ReportsAPI_gcp_CL'

I've double-checked, and the GoogleWorkspaceReports connector is installed and updated to version 3.0.2. Has anyone seen this, or does anyone know what might be causing the table GWorkspace_ReportsAPI_gcp_CL to be unresolved? Thanks!

Building a Custom Continuous Export Pipeline for Azure Application Insights
1. Introduction

MonitorLiftApp is a modular Python application designed to export telemetry data from Azure Application Insights to custom sinks such as Azure Data Lake Storage Gen2 (ADLS). This solution is ideal when Event Hub integration is not feasible, providing a flexible, maintainable, and scalable alternative for organizations that need to move telemetry data for analytics, compliance, or integration scenarios.

2. Problem Statement

Exporting telemetry from Azure Application Insights is commonly achieved via Event Hub streaming. However, there are scenarios where Event Hub integration is not possible due to cost, complexity, or architectural constraints. In such cases, organizations need a reliable, incremental, and customizable way to extract telemetry data and move it to a destination of their choice, such as ADLS, SQL, or REST APIs.

3. Investigation

Why Not Event Hub?

- Event Hub integration may not be available in all environments.
- Some organizations have security or cost concerns.
- Custom sinks (like ADLS) may be required for downstream analytics or compliance.

Alternative Approach

- Use the Azure Monitor Query SDK to periodically export data.
- Build a modular, pluggable Python app for maintainability and extensibility.

4. Solution

4.1 Architecture Overview

MonitorLiftApp is structured into four main components:

- Configuration: Centralizes all settings and credentials.
- Main Application: Orchestrates the export process.
- Query Execution: Runs KQL queries and serializes results.
- Sink Abstraction: Allows easy swapping of data targets (e.g., ADLS, SQL).

4.2 Configuration (app_config.py)

All configuration is centralized in app_config.py, making it easy to adapt the app to different environments.

```python
CONFIG = {
    "APPINSIGHTS_APP_ID": "<your-app-id>",
    "APPINSIGHTS_WORKSPACE_ID": "<your-workspace-id>",
    "STORAGE_ACCOUNT_URL": "<your-adls-url>",
    "CONTAINER_NAME": "<your-container>",
    "Dependencies_KQL": "dependencies \n| limit 10000",
    "Exceptions_KQL": "exceptions \n| limit 10000",
    "Pages_KQL": "pageViews \n| limit 10000",
    "Requests_KQL": "requests \n| limit 10000",
    "Traces_KQL": "traces \n| limit 10000",
    "START_STOP_MINUTES": 5,
    "TIMER_MINUTES": 5,
    "CLIENT_ID": "<your-client-id>",
    "CLIENT_SECRET": "<your-client-secret>",
    "TENANT_ID": "<your-tenant-id>"
}
```

Explanation: This configuration file contains all the necessary parameters for connecting to Azure resources, defining KQL queries, and scheduling the export job. By centralizing these settings, the app becomes easy to maintain and adapt.

4.3 Main Application (main.py)

The main application is the entry point and can be run as a Python console app. It loads the configuration, sets up credentials, and runs the export job on a schedule.
```python
from app_config import CONFIG
from azure.identity import ClientSecretCredential
from monitorlift.query_runner import run_all_queries
from monitorlift.target_repository import ADLSTargetRepository


def build_env():
    env = {}
    keys = [
        "APPINSIGHTS_WORKSPACE_ID",
        "APPINSIGHTS_APP_ID",
        "STORAGE_ACCOUNT_URL",
        "CONTAINER_NAME",
    ]
    for k in keys:
        env[k] = CONFIG[k]
    # Copy every KQL query template into the environment as well.
    for k, v in CONFIG.items():
        if k.endswith("KQL"):
            env[k] = v
    return env


class MonitorLiftApp:
    def __init__(self, client_id, client_secret, tenant_id):
        self.env = build_env()
        self.credential = ClientSecretCredential(tenant_id, client_id, client_secret)
        self.target_repo = ADLSTargetRepository(
            account_url=self.env["STORAGE_ACCOUNT_URL"],
            container_name=self.env["CONTAINER_NAME"],
            cred=self.credential,
        )

    def run(self):
        run_all_queries(self.target_repo, self.credential, self.env)


if __name__ == "__main__":
    import time

    client_id = CONFIG["CLIENT_ID"]
    client_secret = CONFIG["CLIENT_SECRET"]
    tenant_id = CONFIG["TENANT_ID"]
    app = MonitorLiftApp(client_id, client_secret, tenant_id)
    timer_interval = app.env.get("TIMER_MINUTES", 5)
    print(f"Starting continuous export job. Interval: {timer_interval} minutes.")
    while True:
        print("\n[INFO] Running export job at", time.strftime('%Y-%m-%d %H:%M:%S'))
        try:
            app.run()
            print("[INFO] Export complete.")
        except Exception as e:
            print(f"[ERROR] Export failed: {e}")
        print(f"[INFO] Sleeping for {timer_interval} minutes...")
        time.sleep(timer_interval * 60)
```

Explanation: The app can be run from any machine with Python and the required libraries installed, locally or in the cloud (VM, container, etc.). No compilation is needed; just run it as a Python script. Optionally, you can package it as an executable using tools like PyInstaller. The main loop schedules the export job at regular intervals.

4.4 Query Execution (query_runner.py)

This module orchestrates the KQL queries, runs them in parallel, and serializes the results.

```python
import datetime
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

from azure.monitor.query import LogsQueryClient


def run_query_for_kql_var(kql_var, target_repo, credential, env):
    query_name = kql_var[:-4]
    print(f"[START] run_query_for_kql_var: {kql_var}")
    query_template = env[kql_var]
    app_id = env["APPINSIGHTS_APP_ID"]
    workspace_id = env["APPINSIGHTS_WORKSPACE_ID"]
    try:
        latest_ts = target_repo.get_latest_timestamp(query_name)
        print(f"Latest timestamp for {query_name}: {latest_ts}")
    except Exception as e:
        print(f"Error getting latest timestamp for {query_name}: {e}")
        return
    # Query a bounded time window starting from the last exported timestamp.
    start = latest_ts
    time_window = env.get("START_STOP_MINUTES", 5)
    end = start + datetime.timedelta(minutes=time_window)
    query = f"app('{app_id}')." + query_template
    logs_client = LogsQueryClient(credential)
    try:
        response = logs_client.query_workspace(workspace_id, query, timespan=(start, end))
        if response.tables and len(response.tables[0].rows) > 0:
            print(f"Query for {query_name} returned {len(response.tables[0].rows)} rows.")
            table = response.tables[0]
            # Make datetime values JSON-serializable before saving.
            rows = [
                [v.isoformat() if isinstance(v, datetime.datetime) else v for v in row]
                for row in table.rows
            ]
            result_json = json.dumps({"columns": table.columns, "rows": rows})
            target_repo.save_results(query_name, result_json, start, end)
            print(f"Saved results for {query_name}")
    except Exception as e:
        print(f"Error running query or saving results for {query_name}: {e}")


def run_all_queries(target_repo, credential, env):
    print("[INFO] run_all_queries triggered.")
    kql_vars = [k for k in env if k.endswith('KQL') and not k.startswith('APPSETTING_')]
    print(f"Number of KQL queries to run: {len(kql_vars)}. KQL vars: {kql_vars}")
    # Run all exports in parallel, one worker per query.
    with ThreadPoolExecutor(max_workers=len(kql_vars)) as executor:
        futures = {
            executor.submit(run_query_for_kql_var, kql_var, target_repo, credential, env): kql_var
            for kql_var in kql_vars
        }
        for future in as_completed(futures):
            kql_var = futures[future]
            try:
                future.result()
            except Exception as exc:
                print(f"[ERROR] Exception in query {kql_var}: {exc}")
```

Explanation: Queries are executed in parallel for efficiency. Results are serialized and saved to the configured sink. Incremental export is achieved by tracking the latest timestamp for each query.

4.5 Sink Abstraction (target_repository.py)

This module abstracts the sink implementation, allowing you to swap out ADLS for SQL, a REST API, or other targets.

```python
from abc import ABC, abstractmethod
import datetime

from azure.storage.blob import BlobServiceClient


class TargetRepository(ABC):
    @abstractmethod
    def get_latest_timestamp(self, query_name):
        pass

    @abstractmethod
    def save_results(self, query_name, data, start, end):
        pass


class ADLSTargetRepository(TargetRepository):
    def __init__(self, account_url, container_name, cred):
        self.account_url = account_url
        self.container_name = container_name
        self.credential = cred
        self.blob_service_client = BlobServiceClient(account_url=account_url, credential=cred)

    def get_latest_timestamp(self, query_name, fallback_hours=3):
        blob_client = self.blob_service_client.get_blob_client(
            self.container_name, f"{query_name}/latest_timestamp.txt"
        )
        try:
            timestamp_str = blob_client.download_blob().readall().decode()
            return datetime.datetime.fromisoformat(timestamp_str)
        except Exception as e:
            if hasattr(e, 'error_code') and e.error_code == 'BlobNotFound':
                print(f"[INFO] No timestamp blob for {query_name}, starting from {fallback_hours} hours ago.")
            else:
                print(f"[WARNING] Could not get latest timestamp for {query_name}: {type(e).__name__}: {e}")
            return datetime.datetime.utcnow() - datetime.timedelta(hours=fallback_hours)

    def save_results(self, query_name, data, start, end):
        filename = f"{query_name}/{start:%Y%m%d%H%M}_{end:%Y%m%d%H%M}.json"
        blob_client = self.blob_service_client.get_blob_client(self.container_name, filename)
        try:
            blob_client.upload_blob(data, overwrite=True)
            print(f"[SUCCESS] Saved results to blob for {query_name} from {start} to {end}")
        except Exception as e:
            print(f"[ERROR] Failed to save results to blob for {query_name}: {type(e).__name__}: {e}")
        # Record the end of the exported window so the next run resumes from here.
        ts_blob_client = self.blob_service_client.get_blob_client(
            self.container_name, f"{query_name}/latest_timestamp.txt"
        )
        try:
            ts_blob_client.upload_blob(end.isoformat(), overwrite=True)
        except Exception as e:
            print(f"[ERROR] Failed to update latest timestamp for {query_name}: {type(e).__name__}: {e}")
```
Explanation: The sink abstraction allows you to easily switch between different storage backends. The ADLS implementation saves both the results and the latest timestamp for incremental exports.

5. End-to-End Setup Guide

Prerequisites

- Python 3.8+
- Azure SDKs: azure-identity, azure-monitor-query, azure-storage-blob
- Access to Azure Application Insights and ADLS
- Service principal credentials with appropriate permissions

Steps

1. Clone or download the repository: place all code files in a working directory.
2. Configure app_config.py: fill in your Azure resource IDs, credentials, and KQL queries.
3. Install dependencies: pip install azure-identity azure-monitor-query azure-storage-blob
4. Run the application locally with python monitorliftapp/main.py, or deploy it to a VM, Azure Container Instance, or as a scheduled job.
5. (Optional) Package as an executable using PyInstaller or similar tools if you want a standalone binary.
6. Verify data movement: check your ADLS container for exported JSON files.

6. Screenshots: App Insights logs; exported data in ADLS.

7. Final Thoughts

MonitorLiftApp provides a robust, modular, and extensible solution for exporting telemetry from Azure Monitor to custom sinks. Its design supports parallel query execution, pluggable storage backends, and incremental data movement, making it suitable for a wide range of enterprise scenarios.
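To illustrate how the sink abstraction can be extended, here is a minimal sketch of an alternative sink that writes results to the local filesystem instead of ADLS. It implements the TargetRepository interface shown above; the class name and file layout are hypothetical and have not been tested against the article's repository.

```python
import datetime
import os

from monitorlift.target_repository import TargetRepository  # the ABC defined above


class LocalFileTargetRepository(TargetRepository):
    """Hypothetical sink mirroring ADLSTargetRepository's behavior on local disk."""

    def __init__(self, root_dir, fallback_hours=3):
        self.root_dir = root_dir
        self.fallback_hours = fallback_hours

    def _ts_path(self, query_name):
        return os.path.join(self.root_dir, query_name, "latest_timestamp.txt")

    def get_latest_timestamp(self, query_name):
        # Fall back to a recent window if no checkpoint exists yet.
        try:
            with open(self._ts_path(query_name)) as f:
                return datetime.datetime.fromisoformat(f.read().strip())
        except FileNotFoundError:
            return datetime.datetime.utcnow() - datetime.timedelta(hours=self.fallback_hours)

    def save_results(self, query_name, data, start, end):
        out_dir = os.path.join(self.root_dir, query_name)
        os.makedirs(out_dir, exist_ok=True)
        filename = os.path.join(out_dir, f"{start:%Y%m%d%H%M}_{end:%Y%m%d%H%M}.json")
        with open(filename, "w") as f:
            f.write(data)
        # Advance the checkpoint so the next run resumes from the end of this window.
        with open(self._ts_path(query_name), "w") as f:
            f.write(end.isoformat())
```

Swapping this in would only require constructing LocalFileTargetRepository instead of ADLSTargetRepository in MonitorLiftApp.__init__.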
Data Connectors Storage Account and Function App

Several data connectors downloaded via the Content Hub have ARM deployment templates, which is the default out-of-the-box experience. We could customize them if needed, but I wanted to ask the community: how do you go about addressing the infrastructure issues where these connectors deploy Storage Accounts and Function Apps with insecure configurations around things like the infrastructure encryption requirement, VNet integration, CMK, Front Door, etc.? It appears the default configuration basically provisions all the required services to get the streams going, but the posture configuration seems to dismiss security standards around hardening these services.

Microsoft 365 defender alerts not capturing fields (entities) in azure sentinel
We got an alert from Microsoft 365 Defender in Azure Sentinel ("A potentially malicious URL click was detected"). To investigate this alert, we have to check the Microsoft 365 Defender portal. We noticed that the entities (user, host, IP) are not being captured. How can we resolve this issue? Note: this is not a custom rule.

Single Rule for No logs receiving (Global + Per-device Thresholds)
Hi everyone, I currently maintain one Analytics rule per table to detect when logs stop coming in. Some tables receive data from multiple sources, each with a different expected interval (for example, some sources send every 10 minutes, others every 30 minutes). In other SIEM platforms there's usually:

- A global threshold (e.g., 60 minutes) for all sources.
- Optional per-device/per-table thresholds that override the global value.

Is there a recommended way to implement one global rule that uses a default threshold but allows per-source overrides when a particular device or log table has a different expected frequency? Also, if there are other approaches you use to manage "logs not received" detection, I'd love to hear your suggestions as well. This is a sample of my current rule:

```
let threshold = 1h;
AzureActivity
| summarize LastHeartBeat = max(TimeGenerated)
| where LastHeartBeat < ago(threshold)
```

Mysterious Nightly CPU Spikes on App Service Plans (22:00-10:00) Despite Low Traffic
For several months now, all of our Azure App Service Plans have been experiencing consistent CPU spikes during off-peak hours, specifically from approximately 22:00 to 10:00. This pattern is particularly puzzling because:

- This timeframe corresponds to our lowest traffic and activity periods.
- We've conducted thorough investigations but haven't identified the root cause.
- No scheduled timer functions or planned jobs are running during these hours that could explain the spikes.

What we've already checked:

- Application logs and metrics
- Scheduled functions and background jobs
- Traffic patterns and user activity

Has anyone encountered similar behavior? What could be causing these nightly CPU spikes on otherwise idle App Service Plans?

Log Ingestion Delay in all Data connectors
Hi, I have integrated multiple log sources in Sentinel, and all of them are ingesting logs only between 7:00 PM and 2:00 AM. I want the log ingestion to happen in real time. I have integrated Azure WAF, Syslog, Fortinet, and Windows servers. For evidence, I am attaching screenshots. I am totally clueless; if anyone can help, I will be very thankful!