Endpoint Management
1 TopicBuilding a Custom Continuous Export Pipeline for Azure Application Insights
1. Introduction MonitorLiftApp is a modular Python application designed to export telemetry data from Azure Application Insights to custom sinks such as Azure Data Lake Storage Gen2 (ADLS). This solution is ideal when Event Hub integration is not feasible, providing a flexible, maintainable, and scalable alternative for organizations needing to move telemetry data for analytics, compliance, or integration scenarios. 2. Problem Statement Exporting telemetry from Azure Application Insights is commonly achieved via Event Hub streaming. However, there are scenarios where Event Hub integration is not possible due to cost, complexity, or architectural constraints. In such cases, organizations need a reliable, incremental, and customizable way to extract telemetry data and move it to a destination of their choice, such as ADLS, SQL, or REST APIs. 3. Investigation Why Not Event Hub? Event Hub integration may not be available in all environments. Some organizations have security or cost concerns. Custom sinks (like ADLS) may be required for downstream analytics or compliance. Alternative Approach Use the Azure Monitor Query SDK to periodically export data. Build a modular, pluggable Python app for maintainability and extensibility. 4. Solution 4.1 Architecture Overview MonitorLiftApp is structured into four main components: Configuration: Centralizes all settings and credentials. Main Application: Orchestrates the export process. Query Execution: Runs KQL queries and serializes results. Sink Abstraction: Allows easy swapping of data targets (e.g., ADLS, SQL). 4.2 Configuration (app_config.py) All configuration is centralized in app_config.py, making it easy to adapt the app to different environments. CONFIG = { "APPINSIGHTS_APP_ID": "<your-app-id>", "APPINSIGHTS_WORKSPACE_ID": "<your-workspace-id>", "STORAGE_ACCOUNT_URL": "<your-adls-url>", "CONTAINER_NAME": "<your-container>", "Dependencies_KQL": "dependencies \n limit 10000", "Exceptions_KQL": "exceptions \n limit 10000", "Pages_KQL": "pageViews \n limit 10000", "Requests_KQL": "requests \n limit 10000", "Traces_KQL": "traces \n limit 10000", "START_STOP_MINUTES": 5, "TIMER_MINUTES": 5, "CLIENT_ID": "<your-client-id>", "CLIENT_SECRET": "<your-client-secret>", "TENANT_ID": "<your-tenant-id>" } Explanation: This configuration file contains all the necessary parameters for connecting to Azure resources, defining KQL queries, and scheduling the export job. By centralizing these settings, the app becomes easy to maintain and adapt. 4.3 Main Application (main.py) The main application is the entry point and can be run as a Python console app. It loads the configuration, sets up credentials, and runs the export job on a schedule. from app_config import CONFIG from azure.identity import ClientSecretCredential from monitorlift.query_runner import run_all_queries from monitorlift.target_repository import ADLSTargetRepository def build_env(): env = {} keys = [ "APPINSIGHTS_WORKSPACE_ID", "APPINSIGHTS_APP_ID", "STORAGE_ACCOUNT_URL", "CONTAINER_NAME" ] for k in keys: env[k] = CONFIG[k] for k, v in CONFIG.items(): if k.endswith("KQL"): env[k] = v return env class MonitorLiftApp: def __init__(self, client_id, client_secret, tenant_id): self.env = build_env() self.credential = ClientSecretCredential(tenant_id, client_id, client_secret) self.target_repo = ADLSTargetRepository( account_url=self.env["STORAGE_ACCOUNT_URL"], container_name=self.env["CONTAINER_NAME"], cred=self.credential ) def run(self): run_all_queries(self.target_repo, self.credential, self.env) if __name__ == "__main__": import time client_id = CONFIG["CLIENT_ID"] client_secret = CONFIG["CLIENT_SECRET"] tenant_id = CONFIG["TENANT_ID"] app = MonitorLiftApp(client_id, client_secret, tenant_id) timer_interval = app.env.get("TIMER_MINUTES", 5) print(f"Starting continuous export job. Interval: {timer_interval} minutes.") while True: print("\n[INFO] Running export job at", time.strftime('%Y-%m-%d %H:%M:%S')) try: app.run() print("[INFO] Export complete.") except Exception as e: print(f"[ERROR] Export failed: {e}") print(f"[INFO] Sleeping for {timer_interval} minutes...") time.sleep(timer_interval * 60) Explanation: The app can be run from any machine with Python and the required libraries installed—locally or in the cloud (VM, container, etc.). No compilation is needed; just run as a Python script. Optionally, you can package it as an executable using tools like PyInstaller. The main loop schedules the export job at regular intervals. 4.4 Query Execution (query_runner.py) This module orchestrates KQL queries, runs them in parallel, and serializes results. import datetime import json from concurrent.futures import ThreadPoolExecutor, as_completed from azure.monitor.query import LogsQueryClient def run_query_for_kql_var(kql_var, target_repo, credential, env): query_name = kql_var[:-4] print(f"[START] run_query_for_kql_var: {kql_var}") query_template = env[kql_var] app_id = env["APPINSIGHTS_APP_ID"] workspace_id = env["APPINSIGHTS_WORKSPACE_ID"] try: latest_ts = target_repo.get_latest_timestamp(query_name) print(f"Latest timestamp for {query_name}: {latest_ts}") except Exception as e: print(f"Error getting latest timestamp for {query_name}: {e}") return start = latest_ts time_window = env.get("START_STOP_MINUTES", 5) end = start + datetime.timedelta(minutes=time_window) query = f"app('{app_id}')." + query_template logs_client = LogsQueryClient(credential) try: response = logs_client.query_workspace(workspace_id, query, timespan=(start, end)) if response.tables and len(response.tables[0].rows) > 0: print(f"Query for {query_name} returned {len(response.tables[0].rows)} rows.") table = response.tables[0] rows = [ [v.isoformat() if isinstance(v, datetime.datetime) else v for v in row] for row in table.rows ] result_json = json.dumps({"columns": table.columns, "rows": rows}) target_repo.save_results(query_name, result_json, start, end) print(f"Saved results for {query_name}") except Exception as e: print(f"Error running query or saving results for {query_name}: {e}") def run_all_queries(target_repo, credential, env): print("[INFO] run_all_queries triggered.") kql_vars = [k for k in env if k.endswith('KQL') and not k.startswith('APPSETTING_')] print(f"Number of KQL queries to run: {len(kql_vars)}. KQL vars: {kql_vars}") with ThreadPoolExecutor(max_workers=len(kql_vars)) as executor: futures = { executor.submit(run_query_for_kql_var, kql_var, target_repo, credential, env): kql_var for kql_var in kql_vars } for future in as_completed(futures): kql_var = futures[future] try: future.result() except Exception as exc: print(f"[ERROR] Exception in query {kql_var}: {exc}") Explanation: Queries are executed in parallel for efficiency. Results are serialized and saved to the configured sink. Incremental export is achieved by tracking the latest timestamp for each query. 4.5 Sink Abstraction (target_repository.py) This module abstracts the sink implementation, allowing you to swap out ADLS for SQL, REST API, or other targets. from abc import ABC, abstractmethod import datetime from azure.storage.blob import BlobServiceClient class TargetRepository(ABC): @abstractmethod def get_latest_timestamp(self, query_name): pass @abstractmethod def save_results(self, query_name, data, start, end): pass class ADLSTargetRepository(TargetRepository): def __init__(self, account_url, container_name, cred): self.account_url = account_url self.container_name = container_name self.credential = cred self.blob_service_client = BlobServiceClient(account_url=account_url, credential=cred) def get_latest_timestamp(self, query_name, fallback_hours=3): blob_client = self.blob_service_client.get_blob_client(self.container_name, f"{query_name}/latest_timestamp.txt") try: timestamp_str = blob_client.download_blob().readall().decode() return datetime.datetime.fromisoformat(timestamp_str) except Exception as e: if hasattr(e, 'error_code') and e.error_code == 'BlobNotFound': print(f"[INFO] No timestamp blob for {query_name}, starting from {fallback_hours} hours ago.") else: print(f"[WARNING] Could not get latest timestamp for {query_name}: {type(e).__name__}: {e}") return datetime.datetime.utcnow() - datetime.timedelta(hours=fallback_hours) def save_results(self, query_name, data, start, end): filename = f"{query_name}/{start:%Y%m%d%H%M}_{end:%Y%m%d%H%M}.json" blob_client = self.blob_service_client.get_blob_client(self.container_name, filename) try: blob_client.upload_blob(data, overwrite=True) print(f"[SUCCESS] Saved results to blob for {query_name} from {start} to {end}") except Exception as e: print(f"[ERROR] Failed to save results to blob for {query_name}: {type(e).__name__}: {e}") ts_blob_client = self.blob_service_client.get_blob_client(self.container_name, f"{query_name}/latest_timestamp.txt") try: ts_blob_client.upload_blob(end.isoformat(), overwrite=True) except Exception as e: print(f"[ERROR] Failed to update latest timestamp for {query_name}: {type(e).__name__}: {e}") Explanation: The sink abstraction allows you to easily switch between different storage backends. The ADLS implementation saves both the results and the latest timestamp for incremental exports. End-to-End Setup Guide Prerequisites Python 3.8+ Azure SDKs: azure-identity, azure-monitor-query, azure-storage-blob Access to Azure Application Insights and ADLS Service principal credentials with appropriate permissions Steps Clone or Download the Repository Place all code files in a working directory. 2. **Configure **app_config.py Fill in your Azure resource IDs, credentials, and KQL queries. 3. Install Dependencies pip install azure-identity azure-monitor-query azure-storage-blob 4. Run the Application Locally: python monitorliftapp/main.py Deploy to a VM, Azure Container Instance, or as a scheduled job. (Optional) Package as Executable Use PyInstaller or similar tools if you want a standalone executable. 2. Verify Data Movement Check your ADLS container for exported JSON files. 6. Screenshots App Insights Logs:\ Exported Data in ADLS: \ 7. Final Thoughts MonitorLiftApp provides a robust, modular, and extensible solution for exporting telemetry from Azure Monitor to custom sinks. Its design supports parallel query execution, pluggable storage backends, and incremental data movement, making it suitable for a wide range of enterprise scenarios.55Views0likes0Comments