Endpoint Management
Building a Custom Continuous Export Pipeline for Azure Application Insights
1. Introduction

MonitorLiftApp is a modular Python application that exports telemetry data from Azure Application Insights to custom sinks such as Azure Data Lake Storage Gen2 (ADLS). It is designed for situations where Event Hub integration is not feasible, offering a flexible, maintainable, and scalable alternative for organizations that need to move telemetry data for analytics, compliance, or integration scenarios.

2. Problem Statement

Exporting telemetry from Azure Application Insights is commonly achieved via Event Hub streaming. However, Event Hub integration is not always possible due to cost, complexity, or architectural constraints. In such cases, organizations need a reliable, incremental, and customizable way to extract telemetry data and move it to a destination of their choice, such as ADLS, SQL, or REST APIs.

3. Investigation

Why not Event Hub?

- Event Hub integration may not be available in all environments.
- Some organizations have security or cost concerns.
- Custom sinks (like ADLS) may be required for downstream analytics or compliance.

Alternative approach

- Use the Azure Monitor Query SDK to export data periodically.
- Build a modular, pluggable Python app for maintainability and extensibility.

4. Solution

4.1 Architecture Overview

MonitorLiftApp is structured into four main components:

- Configuration: centralizes all settings and credentials.
- Main application: orchestrates the export process.
- Query execution: runs KQL queries and serializes results.
- Sink abstraction: allows easy swapping of data targets (e.g., ADLS, SQL).

4.2 Configuration (app_config.py)

All configuration is centralized in app_config.py, making it easy to adapt the app to different environments.

```python
CONFIG = {
    "APPINSIGHTS_APP_ID": "<your-app-id>",
    "APPINSIGHTS_WORKSPACE_ID": "<your-workspace-id>",
    "STORAGE_ACCOUNT_URL": "<your-adls-url>",
    "CONTAINER_NAME": "<your-container>",
    "Dependencies_KQL": "dependencies | limit 10000",
    "Exceptions_KQL": "exceptions | limit 10000",
    "Pages_KQL": "pageViews | limit 10000",
    "Requests_KQL": "requests | limit 10000",
    "Traces_KQL": "traces | limit 10000",
    "START_STOP_MINUTES": 5,
    "TIMER_MINUTES": 5,
    "CLIENT_ID": "<your-client-id>",
    "CLIENT_SECRET": "<your-client-secret>",
    "TENANT_ID": "<your-tenant-id>"
}
```

Explanation: This configuration file contains all the parameters needed to connect to Azure resources, define the KQL queries, and schedule the export job. Centralizing these settings keeps the app easy to maintain and adapt.
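Because CLIENT_SECRET and the other credentials sit in this file, one way to adapt the same CONFIG to different environments without editing source is to let environment variables override it. The helper below is a minimal sketch, not part of MonitorLiftApp; the environment variable names are an assumption (they simply mirror the CONFIG keys).

```python
# Hypothetical override helper: environment variables win over values in app_config.py.
# The variable names mirror the CONFIG keys; that naming is an assumption, not part of the app.
import os

from app_config import CONFIG


def load_config() -> dict:
    """Return a copy of CONFIG with any values overridden from the environment."""
    merged = dict(CONFIG)
    for key, value in merged.items():
        if key in os.environ:
            # Cast to the original type so numeric settings (e.g. TIMER_MINUTES) stay numeric.
            merged[key] = type(value)(os.environ[key])
    return merged
```

main.py could then call load_config() instead of importing CONFIG directly, which keeps secrets out of source control.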
4.3 Main Application (main.py)

The main application is the entry point and can be run as a Python console app. It loads the configuration, sets up credentials, and runs the export job on a schedule.

```python
import time

from azure.identity import ClientSecretCredential

from app_config import CONFIG
from monitorlift.query_runner import run_all_queries
from monitorlift.target_repository import ADLSTargetRepository


def build_env():
    env = {}
    keys = [
        "APPINSIGHTS_WORKSPACE_ID",
        "APPINSIGHTS_APP_ID",
        "STORAGE_ACCOUNT_URL",
        "CONTAINER_NAME",
        # Copy the scheduling values too, so START_STOP_MINUTES and TIMER_MINUTES from
        # app_config.py actually take effect instead of the hard-coded defaults.
        "START_STOP_MINUTES",
        "TIMER_MINUTES",
    ]
    for k in keys:
        env[k] = CONFIG[k]
    # Copy every KQL query definition (keys ending in "KQL").
    for k, v in CONFIG.items():
        if k.endswith("KQL"):
            env[k] = v
    return env


class MonitorLiftApp:
    def __init__(self, client_id, client_secret, tenant_id):
        self.env = build_env()
        self.credential = ClientSecretCredential(tenant_id, client_id, client_secret)
        self.target_repo = ADLSTargetRepository(
            account_url=self.env["STORAGE_ACCOUNT_URL"],
            container_name=self.env["CONTAINER_NAME"],
            cred=self.credential,
        )

    def run(self):
        run_all_queries(self.target_repo, self.credential, self.env)


if __name__ == "__main__":
    client_id = CONFIG["CLIENT_ID"]
    client_secret = CONFIG["CLIENT_SECRET"]
    tenant_id = CONFIG["TENANT_ID"]
    app = MonitorLiftApp(client_id, client_secret, tenant_id)
    timer_interval = app.env.get("TIMER_MINUTES", 5)
    print(f"Starting continuous export job. Interval: {timer_interval} minutes.")
    while True:
        print("\n[INFO] Running export job at", time.strftime('%Y-%m-%d %H:%M:%S'))
        try:
            app.run()
            print("[INFO] Export complete.")
        except Exception as e:
            print(f"[ERROR] Export failed: {e}")
        print(f"[INFO] Sleeping for {timer_interval} minutes...")
        time.sleep(timer_interval * 60)
```

Explanation: The app can be run from any machine with Python and the required libraries installed, whether locally or in the cloud (VM, container, etc.). No compilation is needed; it runs as a plain Python script, and you can optionally package it as an executable with a tool such as PyInstaller. The main loop schedules the export job at regular intervals.
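Before leaving the scheduler loop running, it can help to trigger a single export cycle and check the output. The snippet below is a minimal smoke-test sketch that reuses the classes above; it assumes it is run from the same directory as main.py so that the module imports resolve.

```python
# Hypothetical smoke test: perform exactly one export cycle, then exit.
# Importing main does not start the scheduler loop (that only runs under __main__).
from app_config import CONFIG
from main import MonitorLiftApp

app = MonitorLiftApp(CONFIG["CLIENT_ID"], CONFIG["CLIENT_SECRET"], CONFIG["TENANT_ID"])
app.run()  # runs every configured *_KQL query once and writes results to the sink
```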
4.4 Query Execution (query_runner.py)

This module orchestrates the KQL queries, runs them in parallel, and serializes the results.

```python
import datetime
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

from azure.monitor.query import LogsQueryClient


def run_query_for_kql_var(kql_var, target_repo, credential, env):
    # "Requests_KQL" -> "Requests"; the query name doubles as the folder name in the sink.
    query_name = kql_var[:-4]
    print(f"[START] run_query_for_kql_var: {kql_var}")
    query_template = env[kql_var]
    app_id = env["APPINSIGHTS_APP_ID"]
    workspace_id = env["APPINSIGHTS_WORKSPACE_ID"]

    try:
        latest_ts = target_repo.get_latest_timestamp(query_name)
        print(f"Latest timestamp for {query_name}: {latest_ts}")
    except Exception as e:
        print(f"Error getting latest timestamp for {query_name}: {e}")
        return

    # Incremental window: pick up where the last export stopped.
    start = latest_ts
    time_window = env.get("START_STOP_MINUTES", 5)
    end = start + datetime.timedelta(minutes=time_window)

    query = f"app('{app_id}')." + query_template
    logs_client = LogsQueryClient(credential)
    try:
        response = logs_client.query_workspace(workspace_id, query, timespan=(start, end))
        if response.tables and len(response.tables[0].rows) > 0:
            print(f"Query for {query_name} returned {len(response.tables[0].rows)} rows.")
            table = response.tables[0]
            rows = [
                [v.isoformat() if isinstance(v, datetime.datetime) else v for v in row]
                for row in table.rows
            ]
            result_json = json.dumps({"columns": table.columns, "rows": rows})
            target_repo.save_results(query_name, result_json, start, end)
            print(f"Saved results for {query_name}")
    except Exception as e:
        print(f"Error running query or saving results for {query_name}: {e}")


def run_all_queries(target_repo, credential, env):
    print("[INFO] run_all_queries triggered.")
    kql_vars = [k for k in env if k.endswith('KQL') and not k.startswith('APPSETTING_')]
    print(f"Number of KQL queries to run: {len(kql_vars)}. KQL vars: {kql_vars}")
    if not kql_vars:
        return  # nothing to do (and ThreadPoolExecutor rejects max_workers=0)
    with ThreadPoolExecutor(max_workers=len(kql_vars)) as executor:
        futures = {
            executor.submit(run_query_for_kql_var, kql_var, target_repo, credential, env): kql_var
            for kql_var in kql_vars
        }
        for future in as_completed(futures):
            kql_var = futures[future]
            try:
                future.result()
            except Exception as exc:
                print(f"[ERROR] Exception in query {kql_var}: {exc}")
```

Explanation: Queries are executed in parallel for efficiency, results are serialized and saved to the configured sink, and incremental export is achieved by tracking the latest timestamp for each query. Note that if a window returns no rows, nothing is saved and the timestamp does not advance.

4.5 Sink Abstraction (target_repository.py)

This module abstracts the sink implementation, allowing you to swap ADLS for SQL, a REST API, or another target.

```python
import datetime
from abc import ABC, abstractmethod

from azure.storage.blob import BlobServiceClient


class TargetRepository(ABC):
    @abstractmethod
    def get_latest_timestamp(self, query_name):
        pass

    @abstractmethod
    def save_results(self, query_name, data, start, end):
        pass


class ADLSTargetRepository(TargetRepository):
    def __init__(self, account_url, container_name, cred):
        self.account_url = account_url
        self.container_name = container_name
        self.credential = cred
        self.blob_service_client = BlobServiceClient(account_url=account_url, credential=cred)

    def get_latest_timestamp(self, query_name, fallback_hours=3):
        blob_client = self.blob_service_client.get_blob_client(
            self.container_name, f"{query_name}/latest_timestamp.txt"
        )
        try:
            timestamp_str = blob_client.download_blob().readall().decode()
            return datetime.datetime.fromisoformat(timestamp_str)
        except Exception as e:
            if hasattr(e, 'error_code') and e.error_code == 'BlobNotFound':
                print(f"[INFO] No timestamp blob for {query_name}, starting from {fallback_hours} hours ago.")
            else:
                print(f"[WARNING] Could not get latest timestamp for {query_name}: {type(e).__name__}: {e}")
            return datetime.datetime.utcnow() - datetime.timedelta(hours=fallback_hours)

    def save_results(self, query_name, data, start, end):
        filename = f"{query_name}/{start:%Y%m%d%H%M}_{end:%Y%m%d%H%M}.json"
        blob_client = self.blob_service_client.get_blob_client(self.container_name, filename)
        try:
            blob_client.upload_blob(data, overwrite=True)
            print(f"[SUCCESS] Saved results to blob for {query_name} from {start} to {end}")
        except Exception as e:
            print(f"[ERROR] Failed to save results to blob for {query_name}: {type(e).__name__}: {e}")
        # Record the end of the exported window so the next run starts where this one stopped.
        ts_blob_client = self.blob_service_client.get_blob_client(
            self.container_name, f"{query_name}/latest_timestamp.txt"
        )
        try:
            ts_blob_client.upload_blob(end.isoformat(), overwrite=True)
        except Exception as e:
            print(f"[ERROR] Failed to update latest timestamp for {query_name}: {type(e).__name__}: {e}")
```

Explanation: The sink abstraction lets you switch between storage backends without touching the query logic. The ADLS implementation saves both the results and the latest timestamp, which is what makes the incremental export work.
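The sink interface above makes it straightforward to target something other than ADLS. As an illustration (not part of MonitorLiftApp), here is a hedged sketch of a local-filesystem sink; the class name, constructor arguments, and file layout are assumptions, but it implements the same TargetRepository interface, so MonitorLiftApp.__init__ could construct it instead of ADLSTargetRepository.

```python
# Hypothetical local-filesystem sink, useful for development or unit tests.
# Mirrors the blob layout used by ADLSTargetRepository: <query_name>/<start>_<end>.json
# plus <query_name>/latest_timestamp.txt.
import datetime
from pathlib import Path

from monitorlift.target_repository import TargetRepository


class LocalFileTargetRepository(TargetRepository):
    def __init__(self, root_dir, fallback_hours=3):
        self.root = Path(root_dir)
        self.fallback_hours = fallback_hours

    def get_latest_timestamp(self, query_name):
        ts_file = self.root / query_name / "latest_timestamp.txt"
        if ts_file.exists():
            return datetime.datetime.fromisoformat(ts_file.read_text())
        # Same fallback behaviour as the ADLS sink: start a few hours back.
        return datetime.datetime.utcnow() - datetime.timedelta(hours=self.fallback_hours)

    def save_results(self, query_name, data, start, end):
        folder = self.root / query_name
        folder.mkdir(parents=True, exist_ok=True)
        (folder / f"{start:%Y%m%d%H%M}_{end:%Y%m%d%H%M}.json").write_text(data)
        (folder / "latest_timestamp.txt").write_text(end.isoformat())
```

Because run_all_queries only talks to the abstract interface, swapping sinks is a one-line change in main.py.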
5. End-to-End Setup Guide

Prerequisites

- Python 3.8+
- Azure SDKs: azure-identity, azure-monitor-query, azure-storage-blob
- Access to Azure Application Insights and ADLS
- Service principal credentials with appropriate permissions

Steps

1. Clone or download the repository and place all code files in a working directory.
2. Configure app_config.py: fill in your Azure resource IDs, credentials, and KQL queries.
3. Install dependencies: `pip install azure-identity azure-monitor-query azure-storage-blob`
4. Run the application locally with `python monitorliftapp/main.py`, or deploy it to a VM, an Azure Container Instance, or as a scheduled job.
5. (Optional) Package it as a standalone executable with PyInstaller or a similar tool.
6. Verify data movement: check your ADLS container for the exported JSON files (a small verification sketch follows at the end of this post).

6. Screenshots

App Insights logs (screenshot); exported data in ADLS (screenshot).

7. Final Thoughts

MonitorLiftApp provides a robust, modular, and extensible solution for exporting telemetry from Azure Monitor to custom sinks. Its design supports parallel query execution, pluggable storage backends, and incremental data movement, making it suitable for a wide range of enterprise scenarios.
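To support the "Verify data movement" step, here is a small, hedged sketch that lists what the exporter has written. It reuses the azure-identity and azure-storage-blob packages and the CONFIG values the app already requires; the "Requests/" prefix is just an example matching the <query_name>/ folder layout used by ADLSTargetRepository.

```python
# Hypothetical verification helper: list exported JSON blobs for one query name.
from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient

from app_config import CONFIG

credential = ClientSecretCredential(CONFIG["TENANT_ID"], CONFIG["CLIENT_ID"], CONFIG["CLIENT_SECRET"])
service = BlobServiceClient(account_url=CONFIG["STORAGE_ACCOUNT_URL"], credential=credential)
container = service.get_container_client(CONFIG["CONTAINER_NAME"])

# Each query writes under its own folder, e.g. "Requests/<start>_<end>.json".
for blob in container.list_blobs(name_starts_with="Requests/"):
    print(blob.name, blob.size)
```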
M365 Architect // Job Posting

Who are we? Technology is our passion, but that's not all! We support social, environmental, and active-lifestyle initiatives. We have won prestigious awards, hold the Great Place to Work certificate, and work every day with global IT leaders: VMware, Fortinet, IBM, HPE, Dell, Hitachi, Microsoft, AWS. Our team is made up of talented engineers and experienced IT architects. Join us and become part of the #ITSFteam!

Who are we looking for? An M365 Architect who will join our team and be responsible for designing, implementing, and managing solutions based on Microsoft 365. The ideal candidate has experience architecting Microsoft cloud solutions and can comprehensively design and optimize processes across M365 applications and services such as Teams, SharePoint, Exchange Online, OneDrive, Power Platform, and Microsoft 365 Copilot. It's worth noting up front that this is a hybrid role (4/1 model) based in Warsaw.

What do we offer?

- Long-term cooperation directly with us (5+ years)
- Opportunities to grow while working for the largest enterprise clients across the country
- Medicover medical package
- Multisport card
- PPK programme
- English lessons
- An extra day off for your birthday
- Around 8 company events a year :)

If this sounds interesting and you would like more details about the requirements or the scope of responsibilities, apply via the link below: https://itsf.traffit.com/public/an/0ed08bcedcd522af2936290b48d33a9e48697565
Windows 11 Upgrade with Intune

I used an Intune feature update policy to upgrade from Windows 10 to Windows 11. For some devices, the update completed within 12 hours. However, other devices took 48 hours or longer to update to Windows 11. In the meantime, I was able to carry out software installations (via Intune) on the same devices within an hour. How can I force the feature update, especially for new devices?

Thank you for your support,
Stefan
Azure AD Join (Entra Join) vs Hybrid Azure AD Join vs Azure AD Registration (Workplace Join)

I still find it hard to understand the differences between Azure AD Join (Entra Join), Hybrid Azure AD Join, and Azure AD Registration (Workplace Join). I know Azure AD Registration (Workplace Join) is supposed to be best for personal devices (BYOD), but if security is an important part of your business, why would you want to allow this? You could end up with a billion random machines in your Entra tenant. What's the benefit of this?

Also, if I have a hybrid environment with both cloud and on-prem apps that authenticate via both on-prem AD (for on-prem apps linked to AD) and Entra (for cloud apps), do my devices need to be Hybrid Azure AD Joined to support both on-prem and cloud? Or will a person working from an Azure AD Joined machine still be able to access on-prem resources like file servers and any app that uses AD groups for auth, access provisioning, etc.?
Compliance licenses at tenant level

Hi, we are a small organization of about 200 employees, and we have the following requirements:

- DLP policy configuration for Exchange, OneDrive, and SharePoint
- BYOD security
- Users should not be able to send files outside the org
- And so on, as we evaluate further

We already have M365 Business Premium. However, after researching, we figured out that M365 Business Premium alone will not meet our requirements; maybe a compliance license will. We want to apply security policies at the tenant level in our organization, but we definitely do not want every user to get a license, as this would be expensive for us and there is no requirement for it for our users. The question is: is there a way to solve the above scenario?
'$skiptoken' limit error for Microsoft Exchange online Reporting web service API

I was working on integrating the MessageTrace report API as part of my SIEM integration: https://reports.office365.com/ecp/reportingwebservice/reporting.svc/MessageTrace[?ODATA%20options]

I have noticed that whenever my $skiptoken reaches the limit 999999, it throws the following error with a 500 status code:

```json
{
  "odata.error": {
    "code": "UnknownError",
    "message": {
      "lang": "",
      "value": "An error has occurred on the server."
    }
  }
}
```

It was working fine for the value 999998, but not for the $skiptoken value 999999. Is there any limitation on the $skiptoken value in the API itself?

Also, if a $skiptoken value of 999999 exists, for example:

"odata.nextLink": "../../reportingwebservice/reporting.svc/MessageTrace?$filter=StartDate%20eq%20DateTime'2024-12-02T00%3A00%3A00Z'%20and%20EndDate%20eq%20DateTime'2024-12-02T23%3A59%3A59Z'&$skiptoken=999999"

then how can we request the data for the next set of events? Can someone let me know whether there is a maximum limit on the Microsoft API side for the $skiptoken?
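For context, the snippet below sketches the nextLink-following pattern described in the post. It is purely illustrative: the authentication object is a placeholder, the JSON shape is assumed to be OData JSON light (a "value" array plus "odata.nextLink"), and it does not answer the question about the $skiptoken cap itself.

```python
# Illustrative only: follow odata.nextLink pages from the MessageTrace report endpoint.
# `auth` is a placeholder; supply whatever credentials you already use for this service.
from urllib.parse import urljoin

import requests

url = (
    "https://reports.office365.com/ecp/reportingwebservice/reporting.svc/MessageTrace"
    "?$filter=StartDate eq DateTime'2024-12-02T00:00:00Z' "
    "and EndDate eq DateTime'2024-12-02T23:59:59Z'"
)
auth = None  # placeholder for your existing authentication

while url:
    payload = requests.get(url, auth=auth, headers={"Accept": "application/json"}, timeout=60).json()
    for row in payload.get("value", []):
        pass  # process each MessageTrace event here
    next_link = payload.get("odata.nextLink")
    # The nextLink shown in the post is relative, so resolve it against the current URL.
    url = urljoin(url, next_link) if next_link else None
```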
MDE Platform stuck on Version 4.18.24080.9

We currently have Microsoft Defender for Endpoint on our Windows 11 devices. When checking the devices in the security portal, most of them show the platform as "NOT UP TO DATE". We tried the following to update MDE on the clients:

- Get-WindowsUpdate -Install -KBArticleID KB4052623, then restart
- Update-MpSignature, then restart
- Manual update via the Virus & Threat Protection settings, then restart

But we only see updates to the security intelligence. The MDE platform stays stuck on version 4.18.24080.9. What are we missing?
Edge, Rewrite with Copilot, Work Profiles

I was enjoying the Rewrite with Copilot (Alt+I) feature in Edge when using my online database for communication notes. With the "improvement" to Microsoft 365 and Edge, they locked it down with enterprise data protection. I get it and understand the need for it. But I need to disable this. I am my own global admin for my Microsoft 365 premium subscription. I have 3 users/employees (one is my spouse). I have spent the last several days going through my Entra settings and the Edge/Copilot settings in the admin panel trying to figure out how to turn this feature back on in our Edge work profiles.

Could someone please explain to me, like I am 5 years old, the process to enable this Rewrite with Copilot feature again? I understand I need to override the data protection settings, but I cannot figure out how to get it to work. Some of the TechNet articles are beyond me with all these policies and profiles. Does it need to be so difficult?