analytics
BYOPI - Design your own custom private AI Search indexer with no code ADF (SQLServer on private VM)
Executive Summary

Building a fully private search indexing solution that uses Azure Data Factory (ADF) to sync SQL Server data from a private VM to Azure AI Search is achievable, but it comes with notable complexities and limitations. This blog shares my journey, discoveries, and honest assessment of the BYOPI (Build Your Own Private Indexer) architecture.

Architectural flow:

Table of Contents
1. Overall Setup
2. How ADF works in this approach with Azure AI Search
3. Challenges - discovered
4. Pros and Cons: An Honest Assessment
5. Conclusion and Recommendations

1. Overall Setup:

Phase 1: Resource Group & Network Setup: create a resource group and a VNet (virtual network) in any region of your choice.

Phase 2: Deploy SQL Server VM:

Phase 3: Create Azure Services: ADF (Azure Data Factory), Azure AI Search and AKV (Azure Key Vault), from the portal or with your deployment method of choice.

Phase 4: Create Private Endpoints for all the services in their dedicated subnets:

Phase 5: Configure SQL Server on VM: connect to the VM via Bastion and set up the database, tables and stored procedure. Sample metadata used as below:

```
CREATE DATABASE BYOPI_DB;
GO
USE BYOPI_DB;
GO

CREATE TABLE Products (
    ProductId    INT IDENTITY(1,1) PRIMARY KEY,
    ProductName  NVARCHAR(200) NOT NULL,
    Description  NVARCHAR(MAX),
    Category     NVARCHAR(100),
    Price        DECIMAL(10,2),
    InStock      BIT DEFAULT 1,
    Tags         NVARCHAR(500),
    IsDeleted    BIT DEFAULT 0,
    CreatedDate  DATETIME DEFAULT GETDATE(),
    ModifiedDate DATETIME DEFAULT GETDATE()
);

CREATE TABLE WatermarkTable (
    TableName      NVARCHAR(100) PRIMARY KEY,
    WatermarkValue DATETIME
);

INSERT INTO WatermarkTable VALUES ('Products', '2024-01-01');
GO

-- CREATE PROCEDURE must be the first statement in its batch, hence the GO above.
CREATE PROCEDURE sp_update_watermark
    @TableName    NVARCHAR(100),
    @NewWatermark DATETIME
AS
BEGIN
    UPDATE WatermarkTable
    SET WatermarkValue = @NewWatermark
    WHERE TableName = @TableName;
END;
GO
-- The GO above ends the procedure body; without it the INSERT below would become part of the procedure.

INSERT INTO Products (ProductName, Description, Category, Price, Tags)
VALUES
    ('Laptop Pro', 'High-end laptop', 'Electronics', 1299.99, 'laptop,computer'),
    ('Office Desk', 'Adjustable desk', 'Furniture', 599.99, 'desk,office'),
    ('Wireless Mouse', 'Bluetooth mouse', 'Electronics', 29.99, 'mouse,wireless');
```

Phase 6: Install Self-Hosted Integration Runtime

Create SHIR in ADF:
- Go to the ADF resource in Azure Portal
- Click "Open Azure Data Factory Studio". Note: you need to access it from a VM in the same VNet or via VPN, since ADF is private
- In ADF Studio, click Manage (toolbox icon)
- Select Integration runtimes → "+ New"
- Select "Azure, Self-Hosted" → "Self-Hosted"
- Name: SHIR-BYOPI or a name of your choice
- Click "Create"
- Copy Key1 (save it)

Install SHIR on VM
In the VM (via Bastion):
- Open a browser and go to: https://www.microsoft.com/download/details.aspx?id=39717
- Download and install Integration Runtime
- During setup:
  - Launch Configuration Manager
  - Paste the Key1 you copied when creating the SHIR
  - Click "Register"
  - Wait for "Connected" status

Phase 7: Create the Search Index with the PowerShell script below, saved as search_index.ps1

```
$searchService = "search-byopi"
$apiKey = "YOUR-ADMIN-KEY"

$headers = @{
    'api-key'      = $apiKey
    'Content-Type' = 'application/json'
}

$index = @{
    name   = "products-index"
    fields = @(
        @{name="id"; type="Edm.String"; key=$true}
        @{name="productName"; type="Edm.String"; searchable=$true}
        @{name="description"; type="Edm.String"; searchable=$true}
        @{name="category"; type="Edm.String"; filterable=$true; facetable=$true}
        @{name="price"; type="Edm.Double"; filterable=$true}
        @{name="inStock"; type="Edm.Boolean"; filterable=$true}
        @{name="tags"; type="Collection(Edm.String)"; searchable=$true}
    )
} | ConvertTo-Json -Depth 10

Invoke-RestMethod `
    -Uri "https://$searchService.search.windows.net/indexes/products-index?api-version=2020-06-30" `
    -Method PUT `
    -Headers $headers `
    -Body $index
```

Phase 8: Configure AKV & ADF Components - Link AKV and ADF for secrets

Create Key Vault Secrets
- Navigate to kv-byopi (the AKV resource created earlier) in Portal
- Go to "Access policies"
- Click "+ Create"
- Select permissions: Get, List for secrets
- Select principal: adf-byopi-private
- Create
- Go to "Secrets" → "+ Generate/Import":
  - Name: sql-password, Value: <>
  - Name: search-api-key, Value: Your search key

Create Linked Services in ADF
Access ADF Studio from the VM (since it's private):

Key Vault Linked Service:
- Manage → Linked services → "+ New"
- Search "Azure Key Vault"
- Configure:
  - Name: LS_KeyVault
  - Azure Key Vault: kv-byopi
  - Integration runtime: AutoResolveIntegrationRuntime
- Test connection → Create

SQL Server Linked Service:
- "+ New" → "SQL Server"
- Configure:
  - Name: LS_SqlServer
  - Connect via: SHIR-BYOPI
  - Server name: localhost
  - Database: BYOPI_DB
  - Authentication: SQL Authentication
  - User: sqladmin
  - Password: Select from Key Vault → LS_KeyVault → sql-password
- Test → Create

Azure Search Linked Service:
- "+ New" → "Azure Search"
- Configure:
  - Name: LS_AzureSearch
  - URL: https://search-byopi.search.windows.net
  - Connect via: SHIR-BYOPI (important: use the SHIR)
  - API Key: From Key Vault → LS_KeyVault → search-api-key
- Test → Create

Phase 9: Create ADF Datasets and Pipeline

Create Datasets

SQL Products Dataset:
- Author → Datasets → "+" → "New dataset"
- Select "SQL Server" → Continue
- Select "Table" → Continue
- Properties:
  - Name: DS_SQL_Products
  - Linked service: LS_SqlServer
  - Table: Select Products
- Click OK

Watermark Dataset: repeat with:
- Name: DS_SQL_Watermark
- Table: WatermarkTable

Search Dataset:
- "+" → "Azure Search"
- Properties:
  - Name: DS_Search_Index
  - Linked service: LS_AzureSearch
  - Index name: products-index

Create Pipeline
- Author → Pipelines → "+" → "Pipeline"
- Name: PL_BYOPI_Private
- From Activities → General, drag a "Lookup" activity

Configure Lookup 1:
- Name: LookupOldWatermark
- Settings:
  - Source dataset: DS_SQL_Watermark
  - Query: the SQL below
  - First row only: ✓

```
SELECT WatermarkValue FROM WatermarkTable WHERE TableName='Products'
```

Add another Lookup:
- Name: LookupNewWatermark
- Query: the SQL below

```
SELECT MAX(ModifiedDate) as NewWatermark FROM Products
```

Add Copy Data activity:
- Name: CopyToSearchIndex
- Source:
  - Dataset: DS_SQL_Products
  - Query: the SQL below
- Sink:
  - Dataset: DS_Search_Index
  - Write behavior: Merge
  - Batch size: 1000

```
SELECT
    CAST(ProductId AS NVARCHAR(50)) as id,
    ProductName as productName,
    Description as description,
    Category as category,
    Price as price,
    InStock as inStock,
    Tags as tags,
    CASE WHEN IsDeleted = 1 THEN 'delete' ELSE 'upload' END as [@search.action]
FROM Products
WHERE ModifiedDate > '@{activity('LookupOldWatermark').output.firstRow.WatermarkValue}'
  AND ModifiedDate <= '@{activity('LookupNewWatermark').output.firstRow.NewWatermark}'
```

Add Stored Procedure activity:
- Name: UpdateWatermark
- SQL Account: LS_SqlServer
- Stored procedure: sp_update_watermark
- Parameters:
  - TableName: Products
  - NewWatermark: @{activity('LookupNewWatermark').output.firstRow.NewWatermark}

Connect the activities with success conditions.

Phase 10: Test and Schedule

Test Pipeline
- Click "Debug" in the pipeline
- Monitor in the Output panel
- Check for green checkmarks

Create Trigger
- In the pipeline, click "Add trigger" → "New/Edit"
- Click "+ New"
- Configure:
  - Name: TR_Hourly
  - Type: Schedule
  - Recurrence: Every 1 Hour
- OK → Publish All

Monitor
- Go to the Monitor tab
- View Pipeline runs
- Check Trigger runs

Your pipeline should look like this:

Phase 11: Validation & Testing

Verify Private Connectivity
From the VM, run PowerShell:

```
# Test DNS resolution (should return private IPs)
nslookup adf-byopi-private.datafactory.azure.net   # Should show private IP like: 10.0.2.x
nslookup search-byopi.search.windows.net           # Should show private IP like: 10.0.2.x
nslookup kv-byopi.vault.azure.net                  # Should show private IP like: 10.0.2.x

# Test Search
$headers = @{ 'api-key' = 'YOUR-KEY' }
Invoke-RestMethod -Uri "https://search-byopi.search.windows.net/indexes/products-index/docs?`$count=true&api-version=2020-06-30" -Headers $headers
```

Test Data Sync (add a few records) and verify in the search index:

```
-- Add test record
INSERT INTO Products (ProductName, Description, Category, Price, Tags)
VALUES ('Test Product Private', 'Testing private pipeline', 'Test', 199.99, 'test,private');

-- Trigger pipeline manually or wait for schedule
-- Then verify in Search index
```

2. 
How ADF works in this approach with Azure AI Search:

Azure AI Search exposes a REST API for indexing (also called uploading) documents. When the ADF sink uploads data to AI Search, it is actually making HTTP POST requests, for example:

```
POST https://search-byopi.search.windows.net/indexes/products-index/docs/index?api-version=2020-06-30
Content-Type: application/json
api-key: YOUR-ADMIN-KEY

{
  "value": [
    { "@search.action": "upload", "id": "1", "productName": "Laptop", "price": 999.99 },
    { "@search.action": "delete", "id": "2" }
  ]
}
```

The delete action here is driven by a soft delete (the IsDeleted flag in the source table), not a hard delete.

Pipeline query:

```
SELECT
    CAST(ProductId AS NVARCHAR(50)) as id,   -- Renamed to match index field
    ProductName as productName,              -- Renamed to match index field
    Description as description,
    Category as category,
    Price as price,
    InStock as inStock,
    Tags as tags,
    CASE WHEN IsDeleted = 1 THEN 'delete' ELSE 'upload' END as [@search.action]   -- Special field with @ prefix
FROM Products
WHERE ModifiedDate > '2024-01-01'
```

Returns this resultset:

```
id | productName  | description     | category    | price | inStock | tags            | @search.action
---|--------------|-----------------|-------------|-------|---------|-----------------|---------------
1  | Laptop Pro   | High-end laptop | Electronics | 1299  | 1       | laptop,computer | upload
2  | Office Chair | Ergonomic chair | Furniture   | 399   | 1       | chair,office    | upload
3  | Deleted Item | Old product     | Archive     | 0     | 0       | old             | delete
```

The @search.action Field - The Magic Control

This special field tells Azure AI Search what to do with each document:

| @search.action | What It Does | When to Use | What Happens If Document... |
|---|---|---|---|
| upload | Insert OR Update | Most common - upsert operation | Exists: Updates it (replaces the whole document)<br>Doesn't exist: Creates it |
| merge | Update only | When you know it exists | Exists: Updates specified fields<br>Doesn't exist: ERROR |
| mergeOrUpload | Update OR Insert | Safe update | Exists: Updates fields<br>Doesn't exist: Creates it |
| delete | Remove from index | To remove documents | Exists: Deletes it<br>Doesn't exist: Ignores (no error) |

ADF automatically converts SQL results to the JSON format required by Azure Search:

```
{
  "value": [
    {
      "@search.action": "upload",
      "id": "1",
      "productName": "Laptop Pro",
      "description": "High-end laptop",
      "category": "Electronics",
      "price": 1299.00,
      "inStock": true,
      "tags": "laptop,computer"
    },
    {
      "@search.action": "upload",
      "id": "2",
      "productName": "Office Chair",
      "description": "Ergonomic chair",
      "category": "Furniture",
      "price": 399.00,
      "inStock": true,
      "tags": "chair,office"
    },
    {
      "@search.action": "delete",
      "id": "3"
    }
  ]
}
```

For a delete, only the ID is needed.

ADF doesn't send all records at once. It batches them based on writeBatchSize, and each batch is a separate HTTP POST to Azure Search.

How ADF will detect new changes and run batches: the watermark is updated after each successful ADF run, so the next run picks up only new changes, as below:

Handling different scenarios:

Scenario 1: No Changes Between Runs

Run at 10:00 AM:
- Old Watermark: 09:45:00
- New Watermark: 10:00:00
- Query: WHERE ModifiedDate > '09:45' AND <= '10:00'
- Result: 0 rows
- Action: Still update watermark to 10:00
- Why: Prevents reprocessing if changes come later

Scenario 2: Bulk Insert Happens

Someone inserts 5000 records at 10:05 AM.

Run at 10:15 AM:
- Old Watermark: 10:00:00
- New Watermark: 10:15:00
- Query: WHERE ModifiedDate > '10:00' AND <= '10:15'
- Result: 5000 rows
- Action: Process all 5000, update watermark to 10:15

Scenario 3: Pipeline Fails

Run at 10:30 AM:
- Old Watermark: 10:15:00 (unchanged from last success)
- Pipeline fails during Copy activity
- Watermark NOT updated (still 10:15:00)

Next Run at 10:45 AM:
- Old Watermark: 10:15:00 (still the last successful)
- New Watermark: 10:45:00
- Query: WHERE ModifiedDate > '10:15' AND <= '10:45'
- Result: Gets ALL changes from 10:15 to 10:45 (30 minutes of data)
- No data loss!

Note: There is still room for improvement by refining this logic to handle more advanced scenarios. However, I have not examined the logic in depth, as the goal here is to review how the overall setup functions, identify its limitations, and compare it with the indexing solutions available in AI Search.

3. Challenges - discovered:

When I set out to build a private search indexer for SQL Server data residing on an Azure VM with no public IP, the solution seemed straightforward: use Azure Data Factory to orchestrate the data movement to Azure AI Search. The materials made it sound simple. The reality? It's possible, but the devil is in the details.

What We Needed:
- ✅ SQL Server on private VM (no public IP)
- ✅ Azure AI Search with private endpoint
- ✅ No data over public internet
- ✅ Support for full CRUD operations
- ✅ Near real-time synchronization
- ✅ No-code/low-code solution

Reality Check:
- ⚠️ DELETE operations not natively supported in ADF sink
- ⚠️ Complex networking requirements
- ⚠️ Higher costs than expected
- ⚠️ Significant setup complexity
- ✅ But it IS possible with workarounds

Components Required
- Azure VM: ~$150/month (D4s_v3)
- Self-Hosted Integration Runtime: Free (runs on VM)
- Private Endpoints: ~$30/month (approx 3 endpoints)
- Azure Data Factory: ~$15-60/month (depends on frequency)
- Azure AI Search: ~$75/month (Basic tier)
- Total: ~$270-315/month

The DELETE Challenge: Although the Azure AI Search REST API fully supports delete operations via @search.action, ADF's native Azure Search sink does NOT support delete operations.
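To make the REST side of this concrete, here is a minimal Python sketch (not part of the original pipeline) of how rows carrying a soft-delete flag could be turned into batched request bodies for the `docs/index` endpoint. The helper names `to_action` and `build_batches` are hypothetical, the column names follow the sample `Products` table, and the 1000-action cap reflects the per-request limit of the indexing API. It only builds the JSON; sending it with an `api-key` header from inside the VNet is left out.

```python
import json
from typing import Iterable, Iterator

MAX_BATCH = 1000  # assumed cap: Azure AI Search accepts up to 1000 actions per indexing request


def to_action(row: dict) -> dict:
    """Map one SQL row to an index action; a delete carries only the key field."""
    doc_id = str(row["ProductId"])
    if row.get("IsDeleted"):
        return {"@search.action": "delete", "id": doc_id}
    return {
        "@search.action": "upload",
        "id": doc_id,
        "productName": row["ProductName"],
        "price": float(row["Price"]),
    }


def build_batches(rows: Iterable[dict], batch_size: int = MAX_BATCH) -> Iterator[str]:
    """Yield JSON bodies for POST .../docs/index, each holding at most batch_size actions."""
    if not 1 <= batch_size <= MAX_BATCH:
        raise ValueError(f"batch_size must be between 1 and {MAX_BATCH}")
    batch = []
    for row in rows:
        batch.append(to_action(row))
        if len(batch) == batch_size:
            yield json.dumps({"value": batch})
            batch = []
    if batch:  # flush the final, partially filled batch
        yield json.dumps({"value": batch})


sample_rows = [
    {"ProductId": 1, "ProductName": "Laptop Pro", "Price": "1299.99", "IsDeleted": 0},
    {"ProductId": 2, "ProductName": "Office Desk", "Price": "599.99", "IsDeleted": 1},
    {"ProductId": 3, "ProductName": "Wireless Mouse", "Price": "29.99", "IsDeleted": 0},
]
bodies = list(build_batches(sample_rows, batch_size=2))
```

With `batch_size=2` the three sample rows produce two request bodies, which mirrors how a sink with a batch size splits one result set into several POSTs.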
```
-- This SQL query with delete action
SELECT
    ProductId as id,
    CASE WHEN IsDeleted = 1 THEN 'delete' ELSE 'upload' END as [@search.action]
FROM Products
-- Will NOT delete documents in Azure Search when using Copy activity
-- The @search.action = 'delete' is ignored by ADF sink!
```

Nevertheless, there is a workaround: use a Web Activity to call the Azure AI Search REST API from ADF and perform the delete operation.

```
{
  "name": "DeleteViaREST",
  "type": "Web",
  "typeProperties": {
    "url": "https://search.windows.net/indexes/index/docs/index",
    "method": "POST",
    "body": {
      "value": [
        {"@search.action": "delete", "id": "123"}
      ]
    }
  }
}
```

Development Challenges

No Direct Portal Access: With ADF private, you need:
- Jump box in the same VNet
- VPN connection
- Bastion for access

Testing Complexity:
- Can't use Postman from a local machine
- Need to test from within the VNet
- Debugging requires multiple tools

4. Pros and Cons: An Honest Assessment:

Pros:
- Security: Complete network isolation
- Compliance: Meets strict requirements
- No-Code: Mostly configuration-based
- Scalability: Can handle large datasets
- Monitoring: Built-in ADF monitoring
- Managed Service: Microsoft handles updates

Cons:
- DELETE Complexity: Not natively supported
- Cost: Higher than expected
- Setup Complexity: Many moving parts
- Debugging: Difficult with private endpoints

Hidden Gotchas:
- SHIR requires Windows VM (Linux in preview)
- Private endpoint DNS propagation delays
- ADF Studio timeout with private endpoints
- SHIR auto-update can break pipelines

5. Conclusion and Recommendations:

When to Use BYOPI:

✅ Good Fit:
- Strict security requirements
- Need to index from an unsupported scenario, for example SQL Server residing on a private VM
- Budget > $500/month
- Team familiar with Azure networking
- Read-heavy workloads

❌ Poor Fit:
- Simple search requirements
- Budget conscious
- Need real-time updates
- Heavy DELETE operations
- Small team without Azure expertise

BYOPI works, but it's more complex and expensive than initially expected.
The lack of native DELETE support in the ADF sink is a significant limitation that requires workarounds.

Key Takeaways
- It works but requires significant effort
- DELETE (hard) operations need workarounds
- Costs will be higher than expected
- Complexity is substantial for a "no-code" solution
- Alternative solutions might be better for many scenarios

Disclaimer: The sample scripts provided in this article are provided AS IS without warranty of any kind. The author is not responsible for any issues, damages, or problems that may arise from using these scripts. Users should thoroughly test any implementation in their environment before deploying to production. Azure services and APIs may change over time, which could affect the functionality of the provided scripts. Always refer to the latest Azure documentation for the most up-to-date information.

Thanks for reading this blog! I hope you've found this approach of creating your own private indexing solution for Azure AI Search (BYOPI) useful 😀

From Doubt to Victory: How I Passed Microsoft SC-200
Hey everyone! I wanted to share my journey of how I went from doubting my chances to successfully passing the Microsoft SC-200 exam. At first, the idea of taking the SC-200 seemed overwhelming. With so many topics to cover, especially with the integration of Microsoft security technologies, I wasn’t sure if I could pull it off. But after months of studying and staying consistent, I finally passed! 🎉

Here’s what worked for me:
- Study Plan: I created a structured study schedule and stuck to it. I broke down each section of the exam objectives and allocated time for each part.
- Authentic Exam Questions: I used it-examstest for practice exams. Their realistic test format helped me get a good grasp of the exam pattern. Plus, the explanations for the answers were super helpful in understanding the concepts.
- Practice Exams: I did multiple mock tests. Honestly, they helped me more than I expected! They boosted my confidence, and I could pinpoint areas where I needed to improve.
- SC-200 Study Materials: I relied on a combination of online courses, books, and video resources. Watching the study videos and taking notes helped me retain the information better.
- Don’t Cram: I didn’t leave things to the last minute. It took me about 2-3 months of consistent study to get comfortable with the material. I made sure to take breaks and not burn myself out.

Passing this exam felt amazing! If you're in the same boat and feeling uncertain, just stick with it! It’s a challenging exam, but with the right tools and preparation, you can do it. Keep pushing forward, and good luck to everyone! 💪 Would be happy to answer any questions if anyone has them!

Scaling Smart with Azure: Architecture That Works
Hi Tech Community! I’m Zainab, currently based in Abu Dhabi and serving as Vice President of Finance & HR at Hoddz Trends LLC, a global tech solutions company headquartered in Arkansas, USA. While I lead on strategy, people, and financials, I also roll up my sleeves when it comes to tech innovation. In this discussion, I want to explore the real-world challenges of scaling systems with Microsoft Azure. From choosing the right architecture to optimizing performance and cost, I’ll be sharing insights drawn from experience and I’d love to hear yours too. Whether you're building from scratch, migrating legacy systems, or refining deployments, let’s talk about what actually works.

Resource Graph Explorer
I’m looking to retrieve a list of Azure resources that were created within the last 24 hours. However, it appears that Azure does not consistently expose the timeCreated property across all resource types, which makes direct filtering challenging.

Request for Clarification/Support:
- Could you please confirm if there’s a reliable way to filter resources based on their creation time, for example resources created in the last N days or within the last 6 hours?
- If timeCreated is not uniformly available, what’s the recommended approach (e.g., using Resource Graph, Activity Logs, or any other reliable method) to achieve this?

Azure Form Recognizer Redaction Issue with Scanned PDFs and Page Size Variations
Hi all, I’m working on a PDF redaction process using Azure Form Recognizer and Azure Functions. The flow works well in most cases: I extract the text and bounding box coordinates and apply redaction based on that. However, I’m facing an issue with scanned PDFs or PDFs with slightly different page sizes. In these cases, the redaction boxes don’t align properly: they either miss the text or appear slightly off (above or below the intended area). It seems like the coordinate mapping doesn't match accurately when the document isn't a standard A4 size or has DPI inconsistencies.

Has anyone else encountered this? Any suggestions on:
- Adjusting for page size or DPI dynamically?
- Mapping normalized coordinates correctly for scanned PDFs?

Appreciate any help or suggestions!

Error code 11408: The operation has timed out. Activity ID
Hello, I am starting with Azure Synapse. When I try to ingest data with a Copy activity and configure the connection to the data source (in this case, HTTP with a URL), I get this error, and I don't know why. I have configured the storage account with the IPs that have permissions, and I have also configured the IPs that have access in my Synapse resource. Additionally, I have enabled the managed virtual network with data exfiltration protection. I believe the error is related to that, but I don't know what extra configuration I need to allow this type of connection and others. I haven't found information regarding this error code, so I would greatly appreciate any help.

How to integrate Azure IoT Hub with Azure Synapse in real time
Hello, I'm researching how to connect Azure IoT Hub with Azure Synapse. I've already used IoT Hub a bit, but I don't have any knowledge of Synapse. The data also needs to arrive in real time, so if someone has already done something similar or knows where I can find answers, I would appreciate it. Have a good day.

Network Monitoring
Hi, I recently applied Network Security Groups (NSGs) on Virtual Networks. Now my question is: is it possible to monitor / record the network traffic? For example, I've configured many rules on the NSG, and now an application on a server won't work; my first guess is that the NSG is blocking the communication. How do I see which port the application is using so I can add a new rule to the NSG? I know that when you already know the port you can check it in Network Watcher "IP flow verify and NSG diagnostics" as a what-if state. Traffic Analytics isn't the right answer either, or am I seeing it wrong? VNet flow logs should be the right thing. I configured them with Traffic Analytics and a storage account, and applied them to a NIC for testing, but I don't see anything practical for my use. The only thing I wish for is to see, live or logged, whether the NSG blocked any traffic, so that I can troubleshoot.

Unable to process AAS model connecting to Azure SQL with Service Account
Hello, I have built a demo SSAS model that I am hosting on an Azure Analysis Services server. The model connects to an Azure SQL database in my tenant (the database is the default AdventureWorks provided by Azure when creating your first DB). To connect to Azure SQL, I have created an App (service principal) and granted it reader access to my Azure SQL DB. If I log in to the Azure SQL DB from SSMS with this account, using Microsoft Entra Service Principal Authentication and providing ClientId@TenantID as the username and SecretValue as the password, I am able to log in and SELECT from the tables. However, when I try to process the SSAS model, I get an error.

For reference, below is the TMSL script that sets the DataSource part of the SSAS model after deployment via YAML pipelines (variables are replaced when running). I think the issue lies in the "AuthenticationKind" value I have provided in the credential, but I can't figure out what to use. When I create the data source like this and process, I get the error: Failed to save modifications to the server. Error returned: '<ccon>Windows authentication has been disabled in the current context.</ccon>. I don't understand why, since I am not using the Windows authentication kind. Every other keyword I used in the "AuthenticationKind" part returns the error AuthenticationKind not supported. Any help on how to change this script would be useful.

```
{
  "createOrReplace": {
    "object": {
      "database": "$(AAS_DATABASE)",
      "dataSource": "$(AZSQLDataSourceName)"
    },
    "dataSource": {
      "type": "structured",
      "name": "$(AZSQLDataSourceName)",
      "connectionDetails": {
        "protocol": "tds",
        "address": {
          "server": "$(AZSQLServer)"
        },
        "initialCatalog": "$(AZSQLDatabase)"
      },
      "credential": {
        "AuthenticationKind": "ServiceAccount",
        "username": "$(AZSQL_CLIENT_ID)@$(AZSQL_TENANT_ID)",
        "password": "$(AZSQL_CLIENT_SECRET)"
      }
    }
  }
}
```

How to Create an AI Model for Streaming Data
A Practical Guide with Microsoft Fabric, Kafka and MLFlow

Intro

In today’s digital landscape, the ability to detect and respond to threats in real time isn’t just a luxury; it’s a necessity. Imagine building a system that can analyze thousands of user interactions per second, identifying potential phishing attempts before they impact your users. While this may sound complex, Microsoft Fabric makes it possible, even with streaming data. Let’s explore how.

In this hands-on guide, I’ll walk you through creating an end-to-end AI solution that processes streaming data from Kafka and employs machine learning for real-time threat detection. We’ll leverage Microsoft Fabric’s comprehensive suite of tools to build, train, and deploy an AI model that works seamlessly with streaming data.

Why This Matters

Before we dive into the technical details, let’s explore the key advantages of this approach: real-time detection, proactive protection, and the ability to adapt to emerging threats.
- Real-Time Processing: Traditional batch processing isn’t enough in today’s fast-paced threat landscape. We need immediate insights.
- Scalability: With Microsoft Fabric’s distributed computing capabilities, our solution can handle enterprise-scale data volumes.
- Integration: By combining streaming data processing with AI, we create a system that’s both intelligent and responsive.

What We’ll Build

I’ve created a practical demonstration that showcases how to:
- Ingest streaming data from Kafka using Microsoft Fabric’s Eventhouse
- Clean and prepare data in real time using PySpark
- Train and evaluate an AI model for phishing detection
- Deploy the model for real-time predictions
- Store and analyze results for continuous improvement

The best part? Everything stays within the Microsoft Fabric ecosystem, making deployment and maintenance straightforward.

Azure Event Hub

Start by creating an Event Hub namespace and a new Event Hub. Azure Event Hubs have Kafka endpoints ready to start receiving Streaming Data.
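Because the Kafka endpoint of an Event Hubs namespace is derived from the namespace name and authenticated with a connection string, the client settings can be sketched as a small helper. This is a minimal sketch, not an official API: `eventhub_kafka_config` is a hypothetical function name, `my-namespace` is a placeholder, and the returned keys are the standard librdkafka-style settings that a `confluent_kafka.Producer` accepts.

```python
def eventhub_kafka_config(namespace: str, connection_string: str) -> dict:
    """Build Kafka client settings for the Kafka endpoint of an Event Hubs namespace.

    namespace: Event Hubs namespace name (placeholder, supply your own)
    connection_string: connection string of a shared access policy
    """
    if not connection_string.startswith("Endpoint=sb://"):
        raise ValueError("expected an Event Hubs connection string starting with 'Endpoint=sb://'")
    return {
        # The Kafka endpoint listens on port 9093 of the namespace host
        "bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        # The username is the literal string "$ConnectionString"
        "sasl.username": "$ConnectionString",
        # The password is the full connection string
        "sasl.password": connection_string,
    }


config = eventhub_kafka_config(
    "my-namespace",
    "Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxx",
)
```

Keeping the settings in one function makes it easier to load the connection string from an environment variable or a secret store instead of hard-coding it in the script.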
Create a new Shared Access Signature and use the Python script I have created. You may adapt the payload generator to your own idea.

```
import json
import uuid
import random
import time
from confluent_kafka import Producer

# Kafka configuration for Azure Event Hub
config = {
    'bootstrap.servers': 'streamiot-dev1.servicebus.windows.net:9093',
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': '$ConnectionString',
    'sasl.password': 'Endpoint=sb://<replacewithyourendpoint>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxx',
}

# Create a Kafka producer
producer = Producer(config)

# Shadow traffic generation
def generate_shadow_payload():
    """Generates a shadow traffic payload."""
    subscriber_id = str(uuid.uuid4())
    # Weighted choice for subscriberData: 5 in 6 payloads are names, 1 in 6 is a URL
    if random.choices([True, False], weights=[5, 1])[0]:
        # Note the comma after 'Silia': without it Python joins 'Silia' 'Jane' into 'SiliaJane'
        first = random.choice(['John', 'Mark', 'Alex', 'Gordon', 'Silia', 'Jane', 'Alice', 'Bob'])
        last = random.choice(['Doe', 'White', 'Blue', 'Green', 'Beck', 'Rogers', 'Fergs', 'Coolio', 'Hanks', 'Oliver', 'Smith', 'Brown'])
        subscriber_data = f"{first} {last}"
    else:
        subscriber_data = f"https://{random.choice(['example.com', 'examplez.com', 'testz.com', 'samplez.com', 'testsite.com', 'mysite.org'])}"
    return {
        "subscriberId": subscriber_id,
        "subscriberData": subscriber_data,
    }

# Delivery report callback
def delivery_report(err, msg):
    """Callback for delivery reports."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")

# Topic configuration
topic = 'streamio-events1'

# Simulate shadow traffic generation and sending to Kafka
try:
    print("Starting shadow traffic simulation. Press Ctrl+C to stop.")
    while True:
        # Generate payload
        payload = generate_shadow_payload()
        # Send payload to Kafka as JSON (str(payload) would send a Python dict repr, not valid JSON)
        producer.produce(
            topic=topic,
            key=str(payload["subscriberId"]),
            value=json.dumps(payload),
            callback=delivery_report
        )
        producer.flush()  # Ensure messages are sent before throttling
        # Throttle messages (1500ms)
        time.sleep(1.5)
except KeyboardInterrupt:
    print("\nSimulation stopped.")
finally:
    producer.flush()
```

You can run this from your workstation, an Azure Function, or whatever fits your case.

Architecture Deep Dive: The Three-Layer Approach

When building AI-powered streaming solutions, thinking in layers helps manage complexity. Let’s break down our architecture into three distinct layers:

Bronze Layer: Raw Streaming Data Ingestion

At the foundation of our solution lies the raw data ingestion layer. Here’s where our streaming story begins:
- A web service generates JSON payloads containing subscriber data
- These events flow through Kafka endpoints
- Data arrives as structured JSON with key fields like subscriberId, subscriberData, and timestamps
- Microsoft Fabric’s Eventstream captures this raw streaming data, providing a reliable foundation for our ML pipeline, and stores the payloads in Eventhouse

Silver Layer: The Intelligence Hub

This is where the magic happens. 
Our silver layer transforms raw data into actionable insights:
- The EventHouse KQL database stores and manages our streaming data
- Our ML model, trained using PySpark’s RandomForest classifier, processes the data
- SynapseML’s Predict API enables seamless model deployment
- A dedicated pipeline applies our ML model to detect potential phishing attempts
- Results are stored in Lakehouse Delta Tables for immediate access

Gold Layer: Business Value Delivery

The final layer focuses on making our insights accessible and actionable:
- Lakehouse tables store cleaned, processed data
- Semantic models transform our predictions into business-friendly formats
- Power BI dashboards provide real-time visibility into phishing detection
- Real-time dashboards enable immediate response to potential threats

The Power of Real-Time ML for Streaming Data

What makes this architecture particularly powerful is its ability to:
- Process data in real time as it streams in
- Apply sophisticated ML models without batch processing delays
- Provide immediate visibility into potential threats
- Scale automatically as data volumes grow

Implementing the Machine Learning Pipeline

Let’s dive into how we built and deployed our phishing detection model using Microsoft Fabric’s ML capabilities. What makes this implementation particularly interesting is how it combines traditional ML with streaming data processing.
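The per-event logic can be illustrated without a Spark cluster. The following is a minimal plain-Python sketch, assuming each event is a JSON object with `subscriberId` and `subscriberData` fields as described in the bronze layer, and using this demo's simple labelling rule that a payload which is a URL is treated as phishing. The function name `featurize` is hypothetical and stands in for the Spark transformations used in the notebooks.

```python
import json
from urllib.parse import urlparse


def featurize(raw_event: str) -> dict:
    """Parse one JSON event and derive the simple features used for labelling."""
    event = json.loads(raw_event)
    data = event.get("subscriberData") or ""  # tolerate a missing or null field
    is_url = data.startswith("http")
    return {
        "subscriberId": event.get("subscriberId"),
        "is_url": is_url,
        # The domain is only meaningful when the payload is a URL
        "domain": urlparse(data).netloc if is_url else None,
        # Demo labelling rule (assumption): URL payloads are labelled as phishing
        "is_phishing": 1.0 if is_url else 0.0,
    }


url_event = featurize('{"subscriberId": "a1", "subscriberData": "https://testz.com"}')
name_event = featurize('{"subscriberId": "b2", "subscriberData": "Jane Doe"}')
```

A label derived this directly from one feature is easy for any classifier to learn, so it is best read as a stand-in for a real labelled dataset rather than as a detection rule.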
Building the ML Foundation
First, let’s look at how we structured the training phase of our machine learning pipeline using PySpark:

Training Notebook

Connect to Eventhouse and load the data:

    from pyspark.sql import SparkSession

    # Initialize Spark session (already set up in Fabric Notebooks)
    spark = SparkSession.builder.getOrCreate()

    # Define connection details
    kustoQuery = """
    SampleData
    | project subscriberId, subscriberData, ingestion_time()
    """  # Replace with your desired KQL query
    kustoUri = "https://<eventhousedbUri>.z9.kusto.fabric.microsoft.com"  # Replace with your Kusto cluster URI
    database = "Eventhouse"  # Replace with your Kusto database name

    # Fetch the access token for authentication
    # (mssparkutils is available by default in Fabric notebooks)
    accessToken = mssparkutils.credentials.getToken(kustoUri)

    # Read data from Kusto using Spark
    df = spark.read \
        .format("com.microsoft.kusto.spark.synapse.datasource") \
        .option("accessToken", accessToken) \
        .option("kustoCluster", kustoUri) \
        .option("kustoDatabase", database) \
        .option("kustoQuery", kustoQuery) \
        .load()

    # Show the loaded data
    print("Loaded data:")
    df.show()

Separate and flag the phishing payload with Spark:

    from pyspark.sql.functions import col, udf
    from urllib.parse import urlparse

    # Define a UDF (User Defined Function) to extract the domain
    def extract_domain(url):
        # Guard against null payloads before calling string methods
        if url and url.startswith('http'):
            return urlparse(url).netloc
        return None

    # Register the UDF with Spark
    extract_domain_udf = udf(extract_domain)

    # Feature engineering with Spark
    # Note: in this demo the label is simply "the payload is a URL"
    df = df.withColumn("is_url", col("subscriberData").startswith("http")) \
        .withColumn("domain", extract_domain_udf(col("subscriberData"))) \
        .withColumn("is_phishing", col("is_url"))

    # Show the transformed data
    df.show()

Use Spark MLlib to train and evaluate the model:

    from pyspark.sql.functions import col
    from pyspark.ml.feature import Tokenizer, HashingTF, IDF
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml import Pipeline
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator

    # Ensure the label column is of type double
    df = df.withColumn("is_phishing", col("is_phishing").cast("double"))

    # Tokenizer to break text into words
    tokenizer = Tokenizer(inputCol="subscriberData", outputCol="words")

    # Convert words to raw features using hashing
    hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=100)

    # Compute the term frequency-inverse document frequency (TF-IDF)
    idf = IDF(inputCol="rawFeatures", outputCol="features")

    # Random Forest Classifier
    rf = RandomForestClassifier(labelCol="is_phishing", featuresCol="features", numTrees=10)

    # Build the ML pipeline
    pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, rf])

    # Split the dataset into training and testing sets
    train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

    # Train the model
    model = pipeline.fit(train_data)

    # Make predictions on the test data
    predictions = model.transform(test_data)

    # Evaluate the model's accuracy
    evaluator = MulticlassClassificationEvaluator(
        labelCol="is_phishing", predictionCol="prediction", metricName="accuracy"
    )
    accuracy = evaluator.evaluate(predictions)

    # Output the accuracy
    print(f"Model Accuracy: {accuracy}")

Add a signature to the model:

    from mlflow.models.signature import infer_signature

    # Select a sample for inferring the signature
    sample_data = train_data.limit(10).toPandas()

    # Create Pandas DataFrames for schema inference
    input_sample = sample_data[["subscriberData"]]  # Input column(s)
    output_sample = model.transform(train_data.limit(10)).select("prediction").toPandas()

    # Infer the signature
    signature = infer_signature(input_sample, output_sample)

Run: publish the model and log the accuracy metric:

    # Import the submodule this way so the name `spark` keeps pointing at the SparkSession
    import mlflow
    import mlflow.spark

    # Start an MLflow run
    with mlflow.start_run() as run:
        # Log the Spark MLlib model with the signature
        mlflow.spark.log_model(
            spark_model=model,
            artifact_path="phishing_detector",
            registered_model_name="PhishingDetector",
            signature=signature  # Add the inferred signature
        )
        # Log metrics like accuracy
        mlflow.log_metric("accuracy", accuracy)
        print(f"Model logged successfully under run ID: {run.info.run_id}")

Results and Impact
Our implementation achieved:
- 81.8% accuracy in phishing detection
- Sub-second prediction times for streaming data
- Scalable processing of thousands of events per second

From Model to Production: Automating the ML Pipeline
After training our model, the next crucial step is operationalizing it for real-time use. We’ve implemented one pipeline with two activities that process our streaming data every 5 minutes:

All Streaming Data Notebook

    # Main prediction snippet
    from synapse.ml.predict import MLFlowTransformer

    # Apply ML model for phishing detection
    model = MLFlowTransformer(
        inputCols=["subscriberData"],
        outputCol="predictions",
        modelName="PhishingDetector",
        modelVersion=3
    )

    # Transform and save all predictions
    df_with_predictions = model.transform(df)
    df_with_predictions.write.format('delta').mode("append").save("Tables/phishing_predictions")

Clean Streaming Data Notebook

    from pyspark.sql.functions import col

    # df_with_predictions is the scored DataFrame produced by the prediction snippet above
    # Filter for non-phishing data only
    non_phishing_df = df_with_predictions.filter(col("predictions") == 0)

    # Save clean data for business analysis
    non_phishing_df.write.format("delta").mode("append").save("Tables/clean_data")

Creating Business Value
What makes this architecture particularly powerful is the seamless transition from ML predictions to business insights:

Delta Lake Integration:
- All predictions are stored in Delta format, ensuring ACID compliance
- Enables time travel and data versioning
- Perfect for creating semantic models

Real-Time Processing:
- 5-minute refresh cycle ensures near real-time threat detection
- Automatic segregation of clean vs. suspicious data
- Immediate visibility into potential threats

Business Intelligence Ready:
- Delta tables are directly compatible with semantic modeling
- Power BI can connect to these tables for live reporting
- Enables both historical analysis and real-time monitoring

The Power of Semantic Models
With our data now organized in Delta tables, we’re ready for:
- Creating dimensional models for better analysis
- Building real-time dashboards
- Generating automated reports
- Setting up alerts for security teams

Real-Time Visualization Capabilities
While Microsoft Fabric offers extensive visualization capabilities through Power BI, it’s worth highlighting one particularly powerful feature: direct KQL querying for real-time monitoring. Here’s a glimpse of how simple yet powerful this can be:

    SampleData
    | where EventProcessedUtcTime > ago(1m) // Fetch rows processed in the last 1 minute
    | project subscriberId, subscriberData, EventProcessedUtcTime

This simple KQL query, when integrated into a dashboard, provides near real-time visibility into your streaming data with sub-minute latency. The visualization possibilities are extensive, but that’s a topic for another day.

Conclusion: Bringing It All Together
What we’ve built here is more than just a machine learning model: it’s a complete, production-ready system that:
- Ingests and processes streaming data in real-time
- Applies sophisticated ML algorithms for threat detection
- Automatically segregates clean from suspicious data
- Provides immediate visibility into potential threats

The real power of Microsoft Fabric lies in how it seamlessly integrates these different components. From data ingestion through Eventhouse and Lakehouse, to ML model training and deployment, to real-time monitoring, everything works together in a unified platform.

What’s Next?
While we’ve focused on phishing detection, this architecture can be adapted for various use cases:
- Fraud detection in financial transactions
- Quality control in manufacturing
- Customer behavior analysis
- Anomaly detection in IoT devices

The possibilities are limited only by our imagination and creativity! Stay tuned for the Git repo where all the code will be shared!
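The one-minute window used in the KQL monitoring query earlier in this post can also be mirrored in plain Python, which may be handy for unit-testing alert logic away from Eventhouse. This is a sketch under assumptions: the `recent_events` helper and the dictionary row shape are hypothetical, and only the `EventProcessedUtcTime`, `subscriberId`, and `subscriberData` field names come from the query.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List


def recent_events(
    rows: List[Dict],
    now: datetime,
    window: timedelta = timedelta(minutes=1),
) -> List[Dict]:
    """Keep rows strictly newer than now - window, mirroring `> ago(1m)`."""
    cutoff = now - window
    return [row for row in rows if row["EventProcessedUtcTime"] > cutoff]


# Fixed reference time so the example is deterministic
now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
rows = [
    {"subscriberId": "s1", "subscriberData": "hello",
     "EventProcessedUtcTime": now - timedelta(seconds=10)},
    # Exactly on the cutoff: excluded because the comparison is strict
    {"subscriberId": "s2", "subscriberData": "http://example.test",
     "EventProcessedUtcTime": now - timedelta(seconds=60)},
    {"subscriberId": "s3", "subscriberData": "old event",
     "EventProcessedUtcTime": now - timedelta(minutes=5)},
]

fresh = recent_events(rows, now)
print([row["subscriberId"] for row in fresh])  # prints: ['s1']
```

Passing `now` in explicitly, rather than reading the clock inside the helper, keeps the window logic easy to test.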