Microsoft Mission Critical Blog

Building a Custom Continuous Export Pipeline for Azure Application Insights

PravinT
Sep 30, 2025

From Logs to Lake: A Python-Powered Bridge for Azure Monitor Data

1. Introduction

MonitorLiftApp is a modular Python application designed to export telemetry data from Azure Application Insights to custom sinks such as Azure Data Lake Storage Gen2 (ADLS). This solution is ideal when Event Hub integration is not feasible, providing a flexible, maintainable, and scalable alternative for organizations needing to move telemetry data for analytics, compliance, or integration scenarios. 

2. Problem Statement

Exporting telemetry from Azure Application Insights is commonly achieved via Event Hub streaming. However, there are scenarios where Event Hub integration is not possible due to cost, complexity, or architectural constraints. In such cases, organizations need a reliable, incremental, and customizable way to extract telemetry data and move it to a destination of their choice, such as ADLS, SQL, or REST APIs. 

3. Investigation

Why Not Event Hub? 
  • Event Hub integration may not be available in all environments. 
  • Some organizations have security or cost concerns. 
  • Custom sinks (like ADLS) may be required for downstream analytics or compliance. 
Alternative Approach 
  • Use the Azure Monitor Query SDK to periodically export data. 
  • Build a modular, pluggable Python app for maintainability and extensibility. 

 

4. Solution

4.1 Architecture Overview 

MonitorLiftApp is structured into four main components, which correspond to the module layout sketched below: 

  • Configuration: Centralizes all settings and credentials. 
  • Main Application: Orchestrates the export process. 
  • Query Execution: Runs KQL queries and serializes results. 
  • Sink Abstraction: Allows easy swapping of data targets (e.g., ADLS, SQL). 
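
Taken together with the imports and run command used later in this post, the components map onto a package layout roughly like the following. The exact directory and package names are an assumption inferred from the code, not a prescribed structure: 

monitorliftapp/
    app_config.py                # configuration (section 4.2)
    main.py                      # entry point and scheduling loop (section 4.3)
    monitorlift/
        __init__.py
        query_runner.py          # KQL execution and serialization (section 4.4)
        target_repository.py     # sink abstraction and ADLS implementation (section 4.5)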

 

4.2 Configuration (app_config.py) 

All configuration is centralized in app_config.py, making it easy to adapt the app to different environments. 

CONFIG = { 
    "APPINSIGHTS_APP_ID": "<your-app-id>", 
    "APPINSIGHTS_WORKSPACE_ID": "<your-workspace-id>", 
    "STORAGE_ACCOUNT_URL": "<your-adls-url>", 
    "CONTAINER_NAME": "<your-container>", 
    "Dependencies_KQL": "dependencies \n limit 10000", 
    "Exceptions_KQL": "exceptions \n limit 10000", 
    "Pages_KQL": "pageViews \n limit 10000", 
    "Requests_KQL": "requests \n limit 10000", 
    "Traces_KQL": "traces \n limit 10000", 
    "START_STOP_MINUTES": 5, 
    "TIMER_MINUTES": 5, 
    "CLIENT_ID": "<your-client-id>", 
    "CLIENT_SECRET": "<your-client-secret>", 
    "TENANT_ID": "<your-tenant-id>" 
}

Explanation: This configuration file contains all the necessary parameters for connecting to Azure resources, defining KQL queries, and scheduling the export job. By centralizing these settings, the app becomes easy to maintain and adapt. 
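
Because app_config.py as shown keeps the client secret in plain text, you may prefer to inject the sensitive values from environment variables at startup. A minimal sketch, assuming environment variable names of your own choosing (the MONITORLIFT_ prefix below is illustrative, not part of the app): 

import os 

from app_config import CONFIG 

# Optional hardening: override secrets from environment variables so they
# never need to be committed with the source. Variable names are illustrative.
for key in ("CLIENT_ID", "CLIENT_SECRET", "TENANT_ID"): 
    CONFIG[key] = os.environ.get(f"MONITORLIFT_{key}", CONFIG[key]) 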

 

4.3 Main Application (main.py) 

The main application is the entry point and can be run as a Python console app. It loads the configuration, sets up credentials, and runs the export job on a schedule. 

from app_config import CONFIG 
from azure.identity import ClientSecretCredential 
from monitorlift.query_runner import run_all_queries 
from monitorlift.target_repository import ADLSTargetRepository 

def build_env(): 
    env = {} 
    keys = [ 
        "APPINSIGHTS_WORKSPACE_ID", 
        "APPINSIGHTS_APP_ID", 
        "STORAGE_ACCOUNT_URL", 
        "CONTAINER_NAME", 
        # Copy the scheduling settings too, so env.get("START_STOP_MINUTES") and
        # env.get("TIMER_MINUTES") pick up configured values instead of always
        # falling back to their defaults.
        "START_STOP_MINUTES", 
        "TIMER_MINUTES" 
    ] 
    for k in keys: 
        env[k] = CONFIG[k] 
    for k, v in CONFIG.items(): 
        if k.endswith("KQL"): 
            env[k] = v 
    return env 

class MonitorLiftApp: 
    def __init__(self, client_id, client_secret, tenant_id): 
        self.env = build_env() 
        self.credential = ClientSecretCredential(tenant_id, client_id, client_secret) 
        self.target_repo = ADLSTargetRepository( 
            account_url=self.env["STORAGE_ACCOUNT_URL"], 
            container_name=self.env["CONTAINER_NAME"], 
            cred=self.credential 
        ) 

    def run(self): 
        run_all_queries(self.target_repo, self.credential, self.env) 

if __name__ == "__main__": 
    import time 
    client_id = CONFIG["CLIENT_ID"] 
    client_secret = CONFIG["CLIENT_SECRET"] 
    tenant_id = CONFIG["TENANT_ID"] 
    app = MonitorLiftApp(client_id, client_secret, tenant_id) 
    timer_interval = app.env.get("TIMER_MINUTES", 5) 
    print(f"Starting continuous export job. Interval: {timer_interval} minutes.") 
    while True: 
        print("\n[INFO] Running export job at", time.strftime('%Y-%m-%d %H:%M:%S')) 
        try: 
            app.run() 
            print("[INFO] Export complete.") 
        except Exception as e: 
            print(f"[ERROR] Export failed: {e}") 
        print(f"[INFO] Sleeping for {timer_interval} minutes...") 
        time.sleep(timer_interval * 60)

 

Explanation: 

  • The app can be run from any machine with Python and the required libraries installed—locally or in the cloud (VM, container, etc.). 
  • No compilation is needed; just run as a Python script. Optionally, you can package it as an executable using tools like PyInstaller. 
  • The main loop schedules the export job at regular intervals. 
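
If you want to test the pipeline without waiting on the scheduling loop, the same class can be driven for a single pass. A minimal sketch, assuming it is run from the directory that contains main.py and app_config.py: 

from app_config import CONFIG 
from main import MonitorLiftApp 

# Run every configured KQL export exactly once (no scheduling loop).
app = MonitorLiftApp(CONFIG["CLIENT_ID"], CONFIG["CLIENT_SECRET"], CONFIG["TENANT_ID"]) 
app.run() 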

 

4.4 Query Execution (query_runner.py) 

This module orchestrates KQL queries, runs them in parallel, and serializes results.

 

import datetime 
import json 
from concurrent.futures import ThreadPoolExecutor, as_completed 
from azure.monitor.query import LogsQueryClient 

def run_query_for_kql_var(kql_var, target_repo, credential, env): 
    query_name = kql_var[:-4] 
    print(f"[START] run_query_for_kql_var: {kql_var}") 
    query_template = env[kql_var] 
    app_id = env["APPINSIGHTS_APP_ID"] 
    workspace_id = env["APPINSIGHTS_WORKSPACE_ID"] 

    try: 
        latest_ts = target_repo.get_latest_timestamp(query_name) 
        print(f"Latest timestamp for {query_name}: {latest_ts}") 
    except Exception as e: 
        print(f"Error getting latest timestamp for {query_name}: {e}") 
        return 

    start = latest_ts 
    time_window = env.get("START_STOP_MINUTES", 5) 
    end = start + datetime.timedelta(minutes=time_window) 
    query = f"app('{app_id}')." + query_template 
    logs_client = LogsQueryClient(credential) 

    try: 
        response = logs_client.query_workspace(workspace_id, query, timespan=(start, end)) 
        if response.tables and len(response.tables[0].rows) > 0: 
            print(f"Query for {query_name} returned {len(response.tables[0].rows)} rows.") 
            table = response.tables[0] 
            rows = [ 
                [v.isoformat() if isinstance(v, datetime.datetime) else v for v in row] 
                for row in table.rows 
            ] 
            result_json = json.dumps({"columns": table.columns, "rows": rows}) 
            target_repo.save_results(query_name, result_json, start, end) 
            print(f"Saved results for {query_name}") 
    except Exception as e: 
        print(f"Error running query or saving results for {query_name}: {e}") 

def run_all_queries(target_repo, credential, env): 
    print("[INFO] run_all_queries triggered.") 
    kql_vars = [k for k in env if k.endswith('KQL') and not k.startswith('APPSETTING_')] 
    print(f"Number of KQL queries to run: {len(kql_vars)}. KQL vars: {kql_vars}") 
    with ThreadPoolExecutor(max_workers=len(kql_vars)) as executor: 
        futures = { 
            executor.submit(run_query_for_kql_var, kql_var, target_repo, credential, env): kql_var 
            for kql_var in kql_vars 
        } 
        for future in as_completed(futures): 
            kql_var = futures[future] 
            try: 
                future.result() 
            except Exception as exc: 
                print(f"[ERROR] Exception in query {kql_var}: {exc}")

Explanation: 

  • Queries are executed in parallel for efficiency. 
  • Results are serialized and saved to the configured sink. 
  • Incremental export is achieved by tracking the latest timestamp for each query. 
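
The incremental behaviour comes entirely from how the time window is derived. The sketch below restates that logic in isolation, with made-up values, to show how successive runs chain together: 

import datetime 

# Illustrative values only: latest_ts is normally read from the
# <query_name>/latest_timestamp.txt blob in the sink.
latest_ts = datetime.datetime(2025, 9, 30, 12, 0, 0) 
window = datetime.timedelta(minutes=5)          # START_STOP_MINUTES 

start, end = latest_ts, latest_ts + window 
# The query runs over timespan=(start, end); after a successful save,
# end is written back as the new latest timestamp, so the next run
# resumes exactly where this one stopped.
print(start.isoformat(), "->", end.isoformat()) 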

 

4.5 Sink Abstraction (target_repository.py) 

This module abstracts the sink implementation, allowing you to swap out ADLS for SQL, REST API, or other targets. 

 

from abc import ABC, abstractmethod 
import datetime 
from azure.storage.blob import BlobServiceClient 

class TargetRepository(ABC): 
    @abstractmethod 
    def get_latest_timestamp(self, query_name): 
        pass 

    @abstractmethod 
    def save_results(self, query_name, data, start, end): 
        pass 

class ADLSTargetRepository(TargetRepository): 
    def __init__(self, account_url, container_name, cred): 
        self.account_url = account_url 
        self.container_name = container_name 
        self.credential = cred 
        self.blob_service_client = BlobServiceClient(account_url=account_url, credential=cred) 

    def get_latest_timestamp(self, query_name, fallback_hours=3): 
        blob_client = self.blob_service_client.get_blob_client(self.container_name, f"{query_name}/latest_timestamp.txt") 
        try: 
            timestamp_str = blob_client.download_blob().readall().decode() 
            return datetime.datetime.fromisoformat(timestamp_str) 
        except Exception as e: 
            if hasattr(e, 'error_code') and e.error_code == 'BlobNotFound': 
                print(f"[INFO] No timestamp blob for {query_name}, starting from {fallback_hours} hours ago.") 
            else: 
                print(f"[WARNING] Could not get latest timestamp for {query_name}: {type(e).__name__}: {e}") 
            return datetime.datetime.utcnow() - datetime.timedelta(hours=fallback_hours) 

    def save_results(self, query_name, data, start, end): 
        filename = f"{query_name}/{start:%Y%m%d%H%M}_{end:%Y%m%d%H%M}.json" 
        blob_client = self.blob_service_client.get_blob_client(self.container_name, filename) 
        try: 
            blob_client.upload_blob(data, overwrite=True) 
            print(f"[SUCCESS] Saved results to blob for {query_name} from {start} to {end}") 
        except Exception as e: 
            print(f"[ERROR] Failed to save results to blob for {query_name}: {type(e).__name__}: {e}") 
        ts_blob_client = self.blob_service_client.get_blob_client(self.container_name, f"{query_name}/latest_timestamp.txt") 
        try: 
            ts_blob_client.upload_blob(end.isoformat(), overwrite=True) 
        except Exception as e: 
            print(f"[ERROR] Failed to update latest timestamp for {query_name}: {type(e).__name__}: {e}")

 

Explanation: 

  • The sink abstraction allows you to easily switch between different storage backends. 
  • The ADLS implementation saves both the results and the latest timestamp for incremental exports.
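
To illustrate the pluggability, here is a hypothetical local-filesystem sink that satisfies the same interface. It is a sketch for experimentation, not part of MonitorLiftApp; swapping it in would only require constructing it instead of ADLSTargetRepository in MonitorLiftApp.__init__: 

import datetime 
import pathlib 

from monitorlift.target_repository import TargetRepository 

class LocalFileTargetRepository(TargetRepository): 
    """Hypothetical sink that writes results to a local directory.""" 

    def __init__(self, root_dir): 
        self.root = pathlib.Path(root_dir) 

    def get_latest_timestamp(self, query_name, fallback_hours=3): 
        ts_file = self.root / query_name / "latest_timestamp.txt" 
        if ts_file.exists(): 
            return datetime.datetime.fromisoformat(ts_file.read_text()) 
        return datetime.datetime.utcnow() - datetime.timedelta(hours=fallback_hours) 

    def save_results(self, query_name, data, start, end): 
        out_dir = self.root / query_name 
        out_dir.mkdir(parents=True, exist_ok=True) 
        (out_dir / f"{start:%Y%m%d%H%M}_{end:%Y%m%d%H%M}.json").write_text(data) 
        (out_dir / "latest_timestamp.txt").write_text(end.isoformat()) 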

 

5. End-to-End Setup Guide

Prerequisites 

  • Python 3.8+ 
  • Azure SDKs: azure-identity, azure-monitor-query, azure-storage-blob 
  • Access to Azure Application Insights and ADLS 
  • Service principal credentials with appropriate permissions 

Steps 

  1. Clone or Download the Repository 
    • Place all code files in a working directory. 

  2. Configure app_config.py 
    • Fill in your Azure resource IDs, credentials, and KQL queries. 

  3. Install Dependencies 

pip install azure-identity azure-monitor-query azure-storage-blob 

  4. Run the Application 
    • Locally: 
      python monitorliftapp/main.py 
    • Or deploy to a VM, Azure Container Instance, or as a scheduled job. 

  5. (Optional) Package as Executable 
    • Use PyInstaller or similar tools if you want a standalone executable. 

  6. Verify Data Movement 
    • Check your ADLS container for exported JSON files (a short verification sketch follows). 
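
If you prefer to verify from code rather than the Azure portal, a short listing sketch reusing the same service principal credentials could look like this: 

from azure.identity import ClientSecretCredential 
from azure.storage.blob import ContainerClient 

from app_config import CONFIG 

# List every exported blob (JSON result files plus latest_timestamp.txt markers).
cred = ClientSecretCredential(CONFIG["TENANT_ID"], CONFIG["CLIENT_ID"], CONFIG["CLIENT_SECRET"]) 
container = ContainerClient(CONFIG["STORAGE_ACCOUNT_URL"], CONFIG["CONTAINER_NAME"], credential=cred) 
for blob in container.list_blobs(): 
    print(blob.name) 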

 

6. Screenshots 

  • App Insights Logs (screenshot) 

  • Exported Data in ADLS (screenshot) 

 7. Final Thoughts

MonitorLiftApp provides a robust, modular, and extensible solution for exporting telemetry from Azure Monitor to custom sinks. Its design supports parallel query execution, pluggable storage backends, and incremental data movement, making it suitable for a wide range of enterprise scenarios. 

Updated Sep 30, 2025
Version 2.0