Building a Custom Continuous Export Pipeline for Azure Application Insights
1. Introduction

MonitorLiftApp is a modular Python application designed to export telemetry data from Azure Application Insights to custom sinks such as Azure Data Lake Storage Gen2 (ADLS). This solution is ideal when Event Hub integration is not feasible, providing a flexible, maintainable, and scalable alternative for organizations that need to move telemetry data for analytics, compliance, or integration scenarios.

2. Problem Statement

Exporting telemetry from Azure Application Insights is commonly achieved via Event Hub streaming. However, there are scenarios where Event Hub integration is not possible due to cost, complexity, or architectural constraints. In such cases, organizations need a reliable, incremental, and customizable way to extract telemetry data and move it to a destination of their choice, such as ADLS, SQL, or REST APIs.

3. Investigation

Why Not Event Hub?

- Event Hub integration may not be available in all environments.
- Some organizations have security or cost concerns.
- Custom sinks (like ADLS) may be required for downstream analytics or compliance.

Alternative Approach

- Use the Azure Monitor Query SDK to periodically export data.
- Build a modular, pluggable Python app for maintainability and extensibility.

4. Solution

4.1 Architecture Overview

MonitorLiftApp is structured into four main components:

- Configuration: Centralizes all settings and credentials.
- Main Application: Orchestrates the export process.
- Query Execution: Runs KQL queries and serializes results.
- Sink Abstraction: Allows easy swapping of data targets (e.g., ADLS, SQL).

4.2 Configuration (app_config.py)

All configuration is centralized in app_config.py, making it easy to adapt the app to different environments.

```python
CONFIG = {
    "APPINSIGHTS_APP_ID": "<your-app-id>",
    "APPINSIGHTS_WORKSPACE_ID": "<your-workspace-id>",
    "STORAGE_ACCOUNT_URL": "<your-adls-url>",
    "CONTAINER_NAME": "<your-container>",
    "Dependencies_KQL": "dependencies \n limit 10000",
    "Exceptions_KQL": "exceptions \n limit 10000",
    "Pages_KQL": "pageViews \n limit 10000",
    "Requests_KQL": "requests \n limit 10000",
    "Traces_KQL": "traces \n limit 10000",
    "START_STOP_MINUTES": 5,
    "TIMER_MINUTES": 5,
    "CLIENT_ID": "<your-client-id>",
    "CLIENT_SECRET": "<your-client-secret>",
    "TENANT_ID": "<your-tenant-id>"
}
```

Explanation: This configuration file contains all the necessary parameters for connecting to Azure resources, defining KQL queries, and scheduling the export job. By centralizing these settings, the app becomes easy to maintain and adapt.
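The values in CONFIG are placeholders. If you would rather not keep CLIENT_SECRET in a source file, one option (not part of the original app; the environment variable names below are assumptions for this sketch) is to overlay the dictionary with environment variables at startup:

```python
# Hedged sketch: override selected CONFIG entries from environment variables so
# secrets never need to be committed. Unset variables fall back to app_config.py.
import os

from app_config import CONFIG


def load_config():
    cfg = dict(CONFIG)
    for key in ("CLIENT_ID", "CLIENT_SECRET", "TENANT_ID"):
        cfg[key] = os.environ.get(f"MONITORLIFT_{key}", cfg[key])
    return cfg
```

The rest of the app could then consume the returned dictionary in place of CONFIG.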
4.3 Main Application (main.py)

The main application is the entry point and can be run as a Python console app. It loads the configuration, sets up credentials, and runs the export job on a schedule.

```python
from app_config import CONFIG
from azure.identity import ClientSecretCredential
from monitorlift.query_runner import run_all_queries
from monitorlift.target_repository import ADLSTargetRepository


def build_env():
    env = {}
    keys = [
        "APPINSIGHTS_WORKSPACE_ID",
        "APPINSIGHTS_APP_ID",
        "STORAGE_ACCOUNT_URL",
        "CONTAINER_NAME"
    ]
    for k in keys:
        env[k] = CONFIG[k]
    for k, v in CONFIG.items():
        if k.endswith("KQL"):
            env[k] = v
    return env


class MonitorLiftApp:
    def __init__(self, client_id, client_secret, tenant_id):
        self.env = build_env()
        self.credential = ClientSecretCredential(tenant_id, client_id, client_secret)
        self.target_repo = ADLSTargetRepository(
            account_url=self.env["STORAGE_ACCOUNT_URL"],
            container_name=self.env["CONTAINER_NAME"],
            cred=self.credential
        )

    def run(self):
        run_all_queries(self.target_repo, self.credential, self.env)


if __name__ == "__main__":
    import time

    client_id = CONFIG["CLIENT_ID"]
    client_secret = CONFIG["CLIENT_SECRET"]
    tenant_id = CONFIG["TENANT_ID"]
    app = MonitorLiftApp(client_id, client_secret, tenant_id)
    timer_interval = app.env.get("TIMER_MINUTES", 5)
    print(f"Starting continuous export job. Interval: {timer_interval} minutes.")
    while True:
        print("\n[INFO] Running export job at", time.strftime('%Y-%m-%d %H:%M:%S'))
        try:
            app.run()
            print("[INFO] Export complete.")
        except Exception as e:
            print(f"[ERROR] Export failed: {e}")
        print(f"[INFO] Sleeping for {timer_interval} minutes...")
        time.sleep(timer_interval * 60)
```

Explanation: The app can be run from any machine with Python and the required libraries installed, whether locally or in the cloud (VM, container, etc.). No compilation is needed; just run it as a Python script. Optionally, you can package it as an executable using tools like PyInstaller. The main loop schedules the export job at regular intervals.
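The sample authenticates with a client secret. If you host the app on an Azure VM, container, or App Service that has a managed identity, one possible variation (not part of the original project) is to fall back to DefaultAzureCredential so no secret has to be distributed. A minimal sketch, assuming the same app_config.py layout:

```python
# Sketch only: prefer an explicit client secret when one is configured; otherwise
# let DefaultAzureCredential resolve a managed identity, Azure CLI login, or
# environment credentials automatically.
from azure.identity import ClientSecretCredential, DefaultAzureCredential

from app_config import CONFIG


def build_credential():
    secret = CONFIG.get("CLIENT_SECRET", "")
    if secret and not secret.startswith("<"):
        return ClientSecretCredential(CONFIG["TENANT_ID"], CONFIG["CLIENT_ID"], secret)
    return DefaultAzureCredential()
```

Both credential types work with LogsQueryClient and BlobServiceClient, so MonitorLiftApp could accept a ready-made credential instead of raw secret values.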
4.4 Query Execution (query_runner.py)

This module orchestrates KQL queries, runs them in parallel, and serializes results.

```python
import datetime
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

from azure.monitor.query import LogsQueryClient


def run_query_for_kql_var(kql_var, target_repo, credential, env):
    query_name = kql_var[:-4]
    print(f"[START] run_query_for_kql_var: {kql_var}")
    query_template = env[kql_var]
    app_id = env["APPINSIGHTS_APP_ID"]
    workspace_id = env["APPINSIGHTS_WORKSPACE_ID"]
    try:
        latest_ts = target_repo.get_latest_timestamp(query_name)
        print(f"Latest timestamp for {query_name}: {latest_ts}")
    except Exception as e:
        print(f"Error getting latest timestamp for {query_name}: {e}")
        return
    start = latest_ts
    time_window = env.get("START_STOP_MINUTES", 5)
    end = start + datetime.timedelta(minutes=time_window)
    query = f"app('{app_id}')." + query_template
    logs_client = LogsQueryClient(credential)
    try:
        response = logs_client.query_workspace(workspace_id, query, timespan=(start, end))
        if response.tables and len(response.tables[0].rows) > 0:
            print(f"Query for {query_name} returned {len(response.tables[0].rows)} rows.")
            table = response.tables[0]
            rows = [
                [v.isoformat() if isinstance(v, datetime.datetime) else v for v in row]
                for row in table.rows
            ]
            result_json = json.dumps({"columns": table.columns, "rows": rows})
            target_repo.save_results(query_name, result_json, start, end)
            print(f"Saved results for {query_name}")
    except Exception as e:
        print(f"Error running query or saving results for {query_name}: {e}")


def run_all_queries(target_repo, credential, env):
    print("[INFO] run_all_queries triggered.")
    kql_vars = [k for k in env if k.endswith('KQL') and not k.startswith('APPSETTING_')]
    print(f"Number of KQL queries to run: {len(kql_vars)}. KQL vars: {kql_vars}")
    with ThreadPoolExecutor(max_workers=len(kql_vars)) as executor:
        futures = {
            executor.submit(run_query_for_kql_var, kql_var, target_repo, credential, env): kql_var
            for kql_var in kql_vars
        }
        for future in as_completed(futures):
            kql_var = futures[future]
            try:
                future.result()
            except Exception as exc:
                print(f"[ERROR] Exception in query {kql_var}: {exc}")
```

Explanation: Queries are executed in parallel for efficiency. Results are serialized and saved to the configured sink. Incremental export is achieved by tracking the latest timestamp for each query.

4.5 Sink Abstraction (target_repository.py)

This module abstracts the sink implementation, allowing you to swap out ADLS for SQL, a REST API, or other targets.

```python
from abc import ABC, abstractmethod
import datetime

from azure.storage.blob import BlobServiceClient


class TargetRepository(ABC):
    @abstractmethod
    def get_latest_timestamp(self, query_name):
        pass

    @abstractmethod
    def save_results(self, query_name, data, start, end):
        pass


class ADLSTargetRepository(TargetRepository):
    def __init__(self, account_url, container_name, cred):
        self.account_url = account_url
        self.container_name = container_name
        self.credential = cred
        self.blob_service_client = BlobServiceClient(account_url=account_url, credential=cred)

    def get_latest_timestamp(self, query_name, fallback_hours=3):
        blob_client = self.blob_service_client.get_blob_client(
            self.container_name, f"{query_name}/latest_timestamp.txt")
        try:
            timestamp_str = blob_client.download_blob().readall().decode()
            return datetime.datetime.fromisoformat(timestamp_str)
        except Exception as e:
            if hasattr(e, 'error_code') and e.error_code == 'BlobNotFound':
                print(f"[INFO] No timestamp blob for {query_name}, starting from {fallback_hours} hours ago.")
            else:
                print(f"[WARNING] Could not get latest timestamp for {query_name}: {type(e).__name__}: {e}")
            return datetime.datetime.utcnow() - datetime.timedelta(hours=fallback_hours)

    def save_results(self, query_name, data, start, end):
        filename = f"{query_name}/{start:%Y%m%d%H%M}_{end:%Y%m%d%H%M}.json"
        blob_client = self.blob_service_client.get_blob_client(self.container_name, filename)
        try:
            blob_client.upload_blob(data, overwrite=True)
            print(f"[SUCCESS] Saved results to blob for {query_name} from {start} to {end}")
        except Exception as e:
            print(f"[ERROR] Failed to save results to blob for {query_name}: {type(e).__name__}: {e}")
        ts_blob_client = self.blob_service_client.get_blob_client(
            self.container_name, f"{query_name}/latest_timestamp.txt")
        try:
            ts_blob_client.upload_blob(end.isoformat(), overwrite=True)
        except Exception as e:
            print(f"[ERROR] Failed to update latest timestamp for {query_name}: {type(e).__name__}: {e}")
```

Explanation: The sink abstraction allows you to easily switch between different storage backends. The ADLS implementation saves both the results and the latest timestamp for incremental exports.
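To show how the abstraction can be swapped, here is a minimal sketch of an alternative sink that writes to the local filesystem. It is not part of the original project; the class name and file layout are assumptions for illustration only.

```python
# Hypothetical alternative sink: stores each result set and the latest-timestamp
# marker as local files instead of ADLS blobs.
import datetime
import pathlib

from monitorlift.target_repository import TargetRepository


class LocalFileTargetRepository(TargetRepository):
    def __init__(self, root_dir, fallback_hours=3):
        self.root = pathlib.Path(root_dir)
        self.fallback_hours = fallback_hours

    def get_latest_timestamp(self, query_name):
        marker = self.root / query_name / "latest_timestamp.txt"
        if marker.exists():
            return datetime.datetime.fromisoformat(marker.read_text())
        return datetime.datetime.utcnow() - datetime.timedelta(hours=self.fallback_hours)

    def save_results(self, query_name, data, start, end):
        folder = self.root / query_name
        folder.mkdir(parents=True, exist_ok=True)
        (folder / f"{start:%Y%m%d%H%M}_{end:%Y%m%d%H%M}.json").write_text(data)
        (folder / "latest_timestamp.txt").write_text(end.isoformat())
```

Because run_all_queries depends only on the two abstract methods, a sink like this could be passed to MonitorLiftApp in place of ADLSTargetRepository without touching the query code.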
5. End-to-End Setup Guide

Prerequisites

- Python 3.8+
- Azure SDKs: azure-identity, azure-monitor-query, azure-storage-blob
- Access to Azure Application Insights and ADLS
- Service principal credentials with appropriate permissions

Steps

1. Clone or download the repository: place all code files in a working directory.
2. Configure app_config.py: fill in your Azure resource IDs, credentials, and KQL queries.
3. Install dependencies: `pip install azure-identity azure-monitor-query azure-storage-blob`
4. Run the application: locally with `python monitorliftapp/main.py`, or deploy to a VM, Azure Container Instance, or as a scheduled job.
5. (Optional) Package as an executable: use PyInstaller or similar tools if you want a standalone executable.
6. Verify data movement: check your ADLS container for the exported JSON files.

6. Screenshots

App Insights Logs: (screenshot)
Exported Data in ADLS: (screenshot)

7. Final Thoughts

MonitorLiftApp provides a robust, modular, and extensible solution for exporting telemetry from Azure Monitor to custom sinks. Its design supports parallel query execution, pluggable storage backends, and incremental data movement, making it suitable for a wide range of enterprise scenarios.
🚀 Export D365 CE Dataverse Org Data to Cosmos DB via the Office 365 Management Activity API

📘 Preface

This post demonstrates one method of exporting Dynamics 365 Customer Engagement (CE) Dataverse organization data using the Office 365 Management Activity API and Azure Functions. Customers can build a custom lake-house architecture on top of this feed, enabling advanced analytics, archiving, or ML/AI scenarios.

https://learn.microsoft.com/en-us/office/office-365-management-api/office-365-management-activity-api-reference

🧭 When to Use This Custom Integration

While Microsoft offers powerful native integrations such as Dataverse Synapse Link and Microsoft Fabric, this custom solution is commonly implemented and remains relevant in the following scenarios:

- Third-party observability and security tools already use this approach. Solutions such as Splunk and other enterprise-grade platforms commonly implement integrations based on the Office 365 Management Activity API to ingest tenant-wide audit data. This makes it easier for customers to align with existing observability pipelines or security frameworks.
- Customers opt out of Synapse Link or Fabric. Whether due to architectural preferences, licensing constraints, or specific compliance requirements, some customers choose not to adopt Microsoft's native integrations. The Office 365 Management Activity API offers a viable alternative for building custom data export and monitoring solutions tailored to their needs.

🎯 Why Use the Office 365 Management Activity API?

- Tenant-wide data capture: Captures audit logs and activity data across all Dataverse orgs in a tenant.
- Integration flexibility: Enables export to Cosmos DB, cold storage, or other platforms for analytics, compliance, or ML/AI.
- Third-party compatibility: Many enterprise tools use similar mechanisms to ingest and archive activity data.

🏗️ Architecture Overview

- Azure Function App (.NET Isolated): Built as a webhook; processes notifications, fetches audit content, and stores filtered events in Cosmos DB.
- Cosmos DB: Stores audit events for further analysis or archiving.
- Application Insights: Captures logs and diagnostics for troubleshooting.

🛠️ Step-by-Step Implementation

https://learn.microsoft.com/en-us/office/office-365-management-api/get-started-with-office-365-management-apis#build-your-app

1. Prerequisites

- Azure subscription
- Dynamics 365 CE environment (Dataverse)
- Azure Cosmos DB account (SQL API)
- Office 365 tenant admin rights
- Auditing enabled in the Dataverse org

2. Register an Azure AD App

- Go to Azure Portal > Azure Active Directory > App registrations > New registration
- Note the Application (client) ID and Directory (tenant) ID
- Create a client secret
- Grant API permissions: ActivityFeed.Read, ActivityFeed.ReadDlp, ServiceHealth.Read
- Grant admin consent

3. Set Up Cosmos DB

- Create a Cosmos DB account (SQL API)
- Create a database named officewebhook and a container named dynamicsevents with partition key /tenantId
- Note the endpoint URI and primary key

4. Create the Azure Function App

- Use Visual Studio or VS Code
- Create a new Azure Functions project (.NET 8 Isolated Worker)
- Add NuGet packages: Microsoft.Azure.Functions.Worker, Microsoft.Azure.Cosmos, Newtonsoft.Json
- Function logic: webhook validation, notification processing, audit content fetching, event filtering, and storage in Cosmos DB
5. Configure Environment Variables

```json
{
  "OfficeApiTenantId": "<your-tenant-id>",
  "OfficeApiClientId": "<your-client-id>",
  "OfficeApiClientSecret": "<your-client-secret>",
  "CrmOrganizationUniqueName": "<your-org-name>",
  "CosmosDbEndpoint": "<your-cosmos-endpoint>",
  "CosmosDbKey": "<your-cosmos-key>",
  "CosmosDbDatabaseId": "officewebhook",
  "CosmosDbContainerId": "dynamicsevents",
  "EntityOperationsFilter": { "incident": ["create", "update"], "account": ["create"] }
}
```

6. Deploy the Function App

- Build and publish using Azure Functions Core Tools or Visual Studio
- Restart the Function App from the Azure Portal
- Monitor logs via Application Insights

🔔 How to Subscribe to the Office 365 Management Activity API for Audit Notifications

To receive audit notifications, you must first subscribe to the Office 365 Management Activity API. This is a two-step process:

https://learn.microsoft.com/en-us/office/office-365-management-api/office-365-management-activity-api-reference#start-a-subscription

1. Fetch an OAuth2 Token

Authenticate using your Azure AD app credentials to get a bearer token:

https://learn.microsoft.com/en-us/office/office-365-management-api/get-started-with-office-365-management-apis

```powershell
# Define your Azure AD app credentials
$tenantId = "<your-tenant-id>"
$clientId = "<your-client-id>"
$clientSecret = "<your-client-secret>"

# Prepare the request body for the token fetch
$body = @{
    grant_type    = "client_credentials"
    client_id     = $clientId
    client_secret = $clientSecret
    scope         = "https://manage.office.com/.default"
}

# Fetch the OAuth2 token
$tokenResponse = Invoke-RestMethod -Method Post -Uri "https://login.microsoftonline.com/$tenantId/oauth2/v2.0/token" -Body $body
$token = $tokenResponse.access_token
```

2. Subscribe to the Content Type

Use the token to subscribe to the desired content type (e.g., Audit.General):

https://learn.microsoft.com/en-us/office/office-365-management-api/office-365-management-activity-api-reference#working-with-the-office-365-management-activity-api

```powershell
$contentType = "Audit.General"
$headers = @{
    Authorization  = "Bearer $token"
    "Content-Type" = "application/json"
}
$uri = "https://manage.office.com/api/v1.0/$tenantId/activity/feed/subscriptions/start?contentType=$contentType"
$response = Invoke-RestMethod -Method Post -Uri $uri -Headers $headers
$response
```

⚙️ How the Azure Function Works

🔸 Trigger

The Azure Function is triggered by notifications from the Office 365 Management Activity API. These notifications include audit events across your entire Azure tenant, not just Dynamics 365.

🔸 Filtering Logic

Each notification is evaluated against your business rules:

- Organization match
- Entity type (e.g., incident, account)
- Operation type (e.g., create, update)

These filters are defined in the EntityOperationsFilter environment variable:

```json
{ "incident": ["create", "update"], "account": ["create"] }
```

🔸 Processing

If the event matches your filters, the function fetches the full audit data and stores it in Cosmos DB. If not, the event is ignored.

🔍 Code Explanation: The Run Method

1. Webhook Validation

https://learn.microsoft.com/en-us/office/office-365-management-api/office-365-management-activity-api-reference#webhook-validation

```csharp
string validationToken = query["validationToken"];
if (!string.IsNullOrEmpty(validationToken))
{
    await response.WriteStringAsync(validationToken);
    response.StatusCode = HttpStatusCode.OK;
    return response;
}
```
2. Notification Handling

https://learn.microsoft.com/en-us/office/office-365-management-api/office-365-management-activity-api-reference#receiving-notifications

```csharp
var notifications = JsonConvert.DeserializeObject<dynamic[]>(requestBody);
foreach (var notification in notifications)
{
    if (notification.contentType == "Audit.General" && notification.contentUri != null)
    {
        // Process each notification
    }
}
```

3. Bearer Token Fetch

```csharp
string bearerToken = await GetBearerTokenAsync(log);
if (string.IsNullOrEmpty(bearerToken)) continue;
```

4. Fetch Audit Content

https://learn.microsoft.com/en-us/office/office-365-management-api/office-365-management-activity-api-reference#retrieve-content

```csharp
var requestMsg = new HttpRequestMessage(HttpMethod.Get, contentUri);
requestMsg.Headers.Authorization = new AuthenticationHeaderValue("Bearer", bearerToken);
var result = await httpClient.SendAsync(requestMsg);
if (!result.IsSuccessStatusCode) continue;
var auditContentJson = await result.Content.ReadAsStringAsync();
```

5. Deserialize and Filter Audit Records

https://learn.microsoft.com/en-us/office/office-365-management-api/office-365-management-activity-api-schema#dynamics-365-schema

```csharp
var auditRecords = JsonConvert.DeserializeObject<dynamic[]>(auditContentJson);
foreach (var eventData in auditRecords)
{
    string orgName = eventData.CrmOrganizationUniqueName ?? "";
    string workload = eventData.Workload ?? "";
    string entityName = eventData.EntityName ?? "";
    string operation = eventData.Message ?? "";

    if (workload != "Dynamics 365" && workload != "CRM" && workload != "Power Platform") continue;
    if (!entityOpsFilter.ContainsKey(entityName)) continue;
    if (!entityOpsFilter[entityName].Contains(operation)) continue;

    // Store in Cosmos DB
}
```

6. Store in Cosmos DB

```csharp
var cosmosDoc = new
{
    id = Guid.NewGuid().ToString(),
    tenantId = notification.tenantId,
    raw = eventData
};
var partitionKey = (string)notification.tenantId;
var resp = await cosmosContainer.CreateItemAsync(cosmosDoc, new PartitionKey(partitionKey));
```

7. Logging and Error Handling

https://learn.microsoft.com/en-us/office/office-365-management-api/office-365-management-activity-api-reference#errors

```csharp
log.LogInformation($"Stored notification in Cosmos DB for contentUri: {notification.contentUri}, DocumentId: {cosmosDoc.id}");

// The matching catch block for the Cosmos DB write shown above:
catch (Exception dbEx)
{
    log.LogError($"Error storing notification in Cosmos DB: {dbEx.Message}");
}
```

🧠 Conclusion

This solution provides a robust, extensible pattern for exporting Dynamics 365 CE Dataverse org data to Cosmos DB using the Office 365 Management Activity API. Solution architects can use it as a reference for building or evaluating similar integrations, especially when working with third-party archiving or analytics solutions.
🚀 Scaling Dynamics 365 CRM Integrations in Azure: The Right Way to Use the SDK ServiceClient

This blog explores common pitfalls and presents a scalable pattern using the .Clone() method to ensure thread safety, avoid redundant authentication, and prevent SNAT port exhaustion.

🗺️ Connection Factory with Optimized Configuration

The first step to building a scalable integration is to configure your ServiceClient properly. Here's how to set up a connection factory that includes all the necessary performance optimizations:

```csharp
public static class CrmClientFactory
{
    private static readonly ServiceClient _baseClient;

    static CrmClientFactory()
    {
        ThreadPool.SetMinThreads(100, 100);                 // Faster thread ramp-up
        ServicePointManager.DefaultConnectionLimit = 65000; // Avoid connection bottlenecks
        ServicePointManager.Expect100Continue = false;      // Reduce HTTP latency
        ServicePointManager.UseNagleAlgorithm = false;      // Improve responsiveness

        _baseClient = new ServiceClient(connectionString);
        _baseClient.EnableAffinityCookie = false;           // Distribute load across Dataverse web servers
    }

    public static ServiceClient GetClient() => _baseClient.Clone();
}
```

❌ Anti-Pattern: One Static Client for All Operations

A common anti-pattern is to create a single static instance of ServiceClient and reuse it across all operations:

```csharp
public static class CrmClientFactory
{
    private static readonly ServiceClient _client = new ServiceClient(connectionString);
    public static ServiceClient GetClient() => _client;
}
```

This approach struggles under load due to thread contention, throttling, and unpredictable behavior.

⚠️ Misleading Fix: New Client Per Request

To avoid thread contention, some developers create a new ServiceClient per request. However, the code below does not truly create a separate connection unless the RequireNewInstance=True connection string parameter or the useUniqueInstance: true constructor parameter is used. These details are often missed, so the same connection ends up shared across threads with high lock times, compounding the overall slowness.

```csharp
public async Task Run(HttpRequest req)
{
    var client = new ServiceClient(connectionString);
    // Use client here
}
```

Even with those flags, repeated OAuth authentication on every ServiceClient constructed this way risks auth failures and SNAT exhaustion in high-throughput integration scenarios.

✅ Best Practice: Clone Once, Reuse Per Request

The best practice is to create a single authenticated ServiceClient and use its .Clone() method to generate lightweight, thread-safe copies for each request:

```csharp
public static class CrmClientFactory
{
    private static readonly ServiceClient _baseClient = new ServiceClient(connectionString);
    public static ServiceClient GetClient() => _baseClient.Clone();
}
```

Then, in your Azure Function or App Service operation:

❗ Avoid calling the factory again inside helper methods. Clone once and pass the client down the call stack.

```csharp
public async Task HandleRequest()
{
    var client = CrmClientFactory.GetClient(); // Clone once per request
    await DoSomething1(client);
    await DoSomething2(client);
}

public async Task DoSomething1(ServiceClient client)
{
    // Use the passed-down client as is; do not clone or create another one here
    await client.CreateAsync(new Entity("account"));
}
```

🧵 Parallel Processing with Batching

When working with large datasets, combining parallelism with batching using ExecuteMultiple can significantly improve throughput, if done correctly.

🔄 Common Mistake: Dynamic Batching Inside Parallel Loops

Many implementations dynamically batch records inside Parallel.ForEach, assuming consistent batch sizes.
But in practice, this leads to:

- Inconsistent batch sizes (1 to 100+)
- Unpredictable performance
- Difficult-to-analyze telemetry

✅ Fix: Chunk Before You Batch

```csharp
public static List<List<Entity>> ChunkRecords(List<Entity> records, int chunkSize)
{
    return records
        .Select((record, index) => new { record, index })
        .GroupBy(x => x.index / chunkSize)
        .Select(g => g.Select(x => x.record).ToList())
        .ToList();
}

public static void ProcessBatches(List<Entity> records, ServiceClient serviceClient, int batchSize = 100, int maxParallelism = 5)
{
    var batches = ChunkRecords(records, batchSize);

    Parallel.ForEach(batches, new ParallelOptions { MaxDegreeOfParallelism = maxParallelism }, batch =>
    {
        using var service = serviceClient.Clone(); // Clone once per thread

        var executeMultiple = new ExecuteMultipleRequest
        {
            Requests = new OrganizationRequestCollection(),
            Settings = new ExecuteMultipleSettings
            {
                ContinueOnError = true,
                ReturnResponses = false
            }
        };

        foreach (var record in batch)
        {
            executeMultiple.Requests.Add(new CreateRequest { Target = record });
        }

        service.Execute(executeMultiple);
    });
}
```

🚫 Avoiding Throttling: Plan, Don't Just Retry

While it's possible to implement retry logic for HTTP 429 responses using the Retry-After header, the best approach is to avoid throttling altogether.

✅ Best Practices

- Control the degree of parallelism and batch size: Keep them conservative and telemetry-driven.
- Use alternate app registrations: Distribute load across identities, but do not overload the Dataverse org.
- Avoid triggering sync plugins or real-time workflows: These amplify load.
- Address long-running queries: Optimize these operations, with help from Microsoft support if needed, before scaling.
- Relax time constraints: There's no need to finish a job in 1 hour if it can be done safely in 3.

🌐 When to Consider Horizontal Scaling

Even with all the right optimizations, your integration may still hit limits in the HTTP stack, such as:

- WCF binding timeouts
- SNAT port exhaustion
- Slowness not explained by Dataverse telemetry

In these cases, horizontal scaling becomes essential.

- App Services: Easily scale out using autoscale rules.
- Function Apps (service model): Scale well with HTTP or Service Bus triggers.
- Scheduled Functions: Require deduplication logic to avoid duplicate processing.
- On-premises VMs: D365 SDK-based integrations hosted on VM infrastructure scale horizontally by adding servers.

🧠 Final Thoughts

Scaling CRM integrations in Azure is about resilience, observability, and control. Follow these patterns:

- Clone once per thread
- Pre-chunk batches
- Tune with telemetry evidence
- Avoid overload when you can
- Scale horizontally when needed, but wisely

Build integrations that are fast, reliable, and future-proof.