Latest Discussions
How to Create an AI Model for Streaming Data
A Practical Guide with Microsoft Fabric, Kafka and MLflow

Intro

In today's digital landscape, the ability to detect and respond to threats in real time isn't just a luxury; it's a necessity. Imagine building a system that can analyze thousands of user interactions per second, identifying potential phishing attempts before they impact your users. While this may sound complex, Microsoft Fabric makes it possible, even with streaming data. Let's explore how.

In this hands-on guide, I'll walk you through creating an end-to-end AI solution that processes streaming data from Kafka and employs machine learning for real-time threat detection. We'll leverage Microsoft Fabric's comprehensive suite of tools to build, train, and deploy an AI model that works seamlessly with streaming data.

Why This Matters

Before we dive into the technical details, let's explore the key advantages of this approach: real-time detection, proactive protection, and the ability to adapt to emerging threats.

- Real-Time Processing: Traditional batch processing isn't enough in today's fast-paced threat landscape. We need immediate insights.
- Scalability: With Microsoft Fabric's distributed computing capabilities, our solution can handle enterprise-scale data volumes.
- Integration: By combining streaming data processing with AI, we create a system that's both intelligent and responsive.

What We'll Build

I've created a practical demonstration that showcases how to:

- Ingest streaming data from Kafka using Microsoft Fabric's Eventhouse
- Clean and prepare data in real time using PySpark
- Train and evaluate an AI model for phishing detection
- Deploy the model for real-time predictions
- Store and analyze results for continuous improvement

The best part? Everything stays within the Microsoft Fabric ecosystem, making deployment and maintenance straightforward.

Azure Event Hub

Start by creating an Event Hubs namespace and a new Event Hub. Azure Event Hubs exposes a Kafka endpoint, so it is ready to receive streaming data out of the box. Create a new Shared Access Signature and use it in the Python producer script I have created (below). You may adapt the payload generator to your own scenario.
import uuid
import random
import time
import json
from confluent_kafka import Producer

# Kafka configuration for Azure Event Hub
config = {
    'bootstrap.servers': 'streamiot-dev1.servicebus.windows.net:9093',
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': '$ConnectionString',
    'sasl.password': 'Endpoint=sb://<replacewithyourendpoint>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxx',
}

# Create a Kafka producer
producer = Producer(config)

# Shadow traffic generation
def generate_shadow_payload():
    """Generates a shadow traffic payload."""
    subscriber_id = str(uuid.uuid4())
    # Weighted choice for subscriberData: names are ~5x more likely than URLs
    if random.choices([True, False], weights=[5, 1])[0]:
        subscriber_data = f"{random.choice(['John', 'Mark', 'Alex', 'Gordon', 'Silia', 'Jane', 'Alice', 'Bob'])} {random.choice(['Doe', 'White', 'Blue', 'Green', 'Beck', 'Rogers', 'Fergs', 'Coolio', 'Hanks', 'Oliver', 'Smith', 'Brown'])}"
    else:
        subscriber_data = f"https://{random.choice(['example.com', 'examplez.com', 'testz.com', 'samplez.com', 'testsite.com', 'mysite.org'])}"
    return {
        "subscriberId": subscriber_id,
        "subscriberData": subscriber_data,
    }

# Delivery report callback
def delivery_report(err, msg):
    """Callback for delivery reports."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")

# Topic configuration
topic = 'streamio-events1'

# Simulate shadow traffic generation and sending to Kafka
try:
    print("Starting shadow traffic simulation. Press Ctrl+C to stop.")
    while True:
        # Generate payload
        payload = generate_shadow_payload()

        # Send payload to Kafka as JSON
        producer.produce(
            topic=topic,
            key=str(payload["subscriberId"]),
            value=json.dumps(payload),
            callback=delivery_report
        )

        # Ensure messages are sent, then throttle (1.5 s between messages)
        producer.flush()
        time.sleep(1.5)
except KeyboardInterrupt:
    print("\nSimulation stopped.")
finally:
    producer.flush()

You can run this from your workstation, an Azure Function, or whatever fits your case.
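Before wiring up Eventstream on the Fabric side, it can help to confirm that events are actually landing on the Event Hubs Kafka endpoint. Here is a minimal consumer sketch for that sanity check; it assumes the same namespace, connection string placeholder, and topic as the producer above, and the consumer group name is purely illustrative.

import json
from confluent_kafka import Consumer

# Same Event Hubs Kafka endpoint and credentials as the producer above
consumer_config = {
    'bootstrap.servers': 'streamiot-dev1.servicebus.windows.net:9093',
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': '$ConnectionString',
    'sasl.password': 'Endpoint=sb://<replacewithyourendpoint>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxx',
    'group.id': 'sanity-check',          # illustrative consumer group name
    'auto.offset.reset': 'earliest',
}

consumer = Consumer(consumer_config)
consumer.subscribe(['streamio-events1'])

try:
    # Read a handful of events, print them, then stop
    received = 0
    while received < 5:
        msg = consumer.poll(timeout=5.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        event = json.loads(msg.value())
        print(f"{event['subscriberId']} -> {event['subscriberData']}")
        received += 1
finally:
    consumer.close()

If the events print as expected, the Kafka endpoint is healthy and the next step is pointing Eventstream at it.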
Architecture Deep Dive: The Three-Layer Approach

When building AI-powered streaming solutions, thinking in layers helps manage complexity. Let's break down our architecture into three distinct layers.

Bronze Layer: Raw Streaming Data Ingestion

At the foundation of our solution lies the raw data ingestion layer. Here's where our streaming story begins:

- A web service generates JSON payloads containing subscriber data
- These events flow through Kafka endpoints
- Data arrives as structured JSON with key fields like subscriberId, subscriberData, and timestamps
- Microsoft Fabric's Eventstream captures this raw streaming data and stores the payloads in Eventhouse, providing a reliable foundation for our ML pipeline

Silver Layer: The Intelligence Hub

This is where the magic happens. Our silver layer transforms raw data into actionable insights:

- The Eventhouse KQL database stores and manages our streaming data
- Our ML model, trained using PySpark's RandomForest classifier, processes the data
- SynapseML's Predict API enables seamless model deployment
- A dedicated pipeline applies our ML model to detect potential phishing attempts
- Results are stored in Lakehouse Delta tables for immediate access

Gold Layer: Business Value Delivery

The final layer focuses on making our insights accessible and actionable:

- Lakehouse tables store cleaned, processed data
- Semantic models transform our predictions into business-friendly formats
- Power BI dashboards provide real-time visibility into phishing detection
- Real-time dashboards enable immediate response to potential threats

The Power of Real-Time ML for Streaming Data

What makes this architecture particularly powerful is its ability to:

- Process data in real time as it streams in
- Apply sophisticated ML models without batch-processing delays
- Provide immediate visibility into potential threats
- Scale automatically as data volumes grow

Implementing the Machine Learning Pipeline

Let's dive into how we built and deployed our phishing detection model using Microsoft Fabric's ML capabilities. What makes this implementation particularly interesting is how it combines traditional ML with streaming data processing.

Building the ML Foundation

First, let's look at how we structured the training phase of our machine learning pipeline using PySpark.

Training Notebook

Connect to the Eventhouse and load the data:

from pyspark.sql import SparkSession

# Initialize Spark session (already set up in Fabric notebooks)
spark = SparkSession.builder.getOrCreate()

# Define connection details
kustoQuery = """
SampleData
| project subscriberId, subscriberData, ingestion_time()
"""  # Replace with your desired KQL query
kustoUri = "https://<eventhousedbUri>.z9.kusto.fabric.microsoft.com"  # Replace with your Kusto cluster URI
database = "Eventhouse"  # Replace with your Kusto database name

# Fetch the access token for authentication
accessToken = mssparkutils.credentials.getToken(kustoUri)

# Read data from Kusto using Spark
df = spark.read \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("accessToken", accessToken) \
    .option("kustoCluster", kustoUri) \
    .option("kustoDatabase", database) \
    .option("kustoQuery", kustoQuery) \
    .load()

# Show the loaded data
print("Loaded data:")
df.show()

Separate and flag the phishing payload:

from pyspark.sql.functions import col, expr, when, udf
from urllib.parse import urlparse

# Define a UDF (User Defined Function) to extract the domain
def extract_domain(url):
    if url.startswith('http'):
        return urlparse(url).netloc
    return None

# Register the UDF with Spark
extract_domain_udf = udf(extract_domain)

# Feature engineering with Spark: flag URL payloads and label them as phishing
df = df.withColumn("is_url", col("subscriberData").startswith("http")) \
       .withColumn("domain", extract_domain_udf(col("subscriberData"))) \
       .withColumn("is_phishing", col("is_url"))

# Show the transformed data
df.show()
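Because the label here is derived directly from whether the payload looks like a URL, and the generator above emits names roughly five times more often than URLs, the two classes can be quite imbalanced. A quick check of the label distribution before training is cheap and worth doing; this is a minimal sketch that uses only the columns defined above.

# Inspect the class balance of the derived label before training
df.groupBy("is_phishing").count().show()

# Optional: rough share of phishing-labelled rows
total = df.count()
phishing = df.filter(col("is_phishing")).count()
print(f"Phishing share: {phishing / total:.2%}")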
Use Spark MLlib to train and evaluate the model:

from pyspark.sql.functions import col
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Ensure the label column is of type double
df = df.withColumn("is_phishing", col("is_phishing").cast("double"))

# Tokenizer to break text into words
tokenizer = Tokenizer(inputCol="subscriberData", outputCol="words")

# Convert words to raw features using hashing
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=100)

# Compute the term frequency-inverse document frequency (TF-IDF)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Random Forest classifier
rf = RandomForestClassifier(labelCol="is_phishing", featuresCol="features", numTrees=10)

# Build the ML pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, rf])

# Split the dataset into training and testing sets
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# Train the model
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model's accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="is_phishing", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)

# Output the accuracy
print(f"Model Accuracy: {accuracy}")

Add a signature to the AI model:

from mlflow.models.signature import infer_signature

# Select a sample for inferring the signature
sample_data = train_data.limit(10).toPandas()  # Pandas DataFrame for schema inference
input_sample = sample_data[["subscriberData"]]  # Input column(s)
output_sample = model.transform(train_data.limit(10)).select("prediction").toPandas()

# Infer the signature
signature = infer_signature(input_sample, output_sample)

Run: publish the model and log the accuracy metric:

import mlflow
import mlflow.spark

# Start an MLflow run
with mlflow.start_run() as run:
    # Log the Spark MLlib model with the signature
    mlflow.spark.log_model(
        spark_model=model,
        artifact_path="phishing_detector",
        registered_model_name="PhishingDetector",
        signature=signature  # Add the inferred signature
    )

    # Log metrics like accuracy
    mlflow.log_metric("accuracy", accuracy)

    print(f"Model logged successfully under run ID: {run.info.run_id}")

Results and Impact

Our implementation achieved:

- 81.8% accuracy in phishing detection
- Sub-second prediction times for streaming data
- Scalable processing of thousands of events per second

From Model to Production: Automating the ML Pipeline

After training our model, the next crucial step is operationalizing it for real-time use.
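Before wiring the registered model into a scheduled pipeline, it can be reassuring to load it back from the model registry and score a couple of hand-written payloads. The sketch below is illustrative rather than part of the original walkthrough: it assumes the registered name from the run above, and the version number (1 here) should be replaced with whatever version the registry actually assigned; smoke_model and smoke_df are hypothetical helper names.

import mlflow.spark

# Load the registered model back from the MLflow registry
# (version number is illustrative; check the registry for the actual one)
smoke_model = mlflow.spark.load_model("models:/PhishingDetector/1")

# A couple of hand-written payloads: one name-like, one URL-like
smoke_df = spark.createDataFrame(
    [("John Doe",), ("https://examplez.com",)],
    ["subscriberData"],
)

# The loaded pipeline expects the same input column used at training time
smoke_model.transform(smoke_df).select("subscriberData", "prediction").show(truncate=False)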
We've implemented one pipeline with two activities that process our streaming data every 5 minutes.

All Streaming Data Notebook

# Main prediction snippet
from synapse.ml.predict import MLFlowTransformer

# Apply the ML model for phishing detection
model = MLFlowTransformer(
    inputCols=["subscriberData"],
    outputCol="predictions",
    modelName="PhishingDetector",
    modelVersion=3
)

# Transform and save all predictions
df_with_predictions = model.transform(df)
df_with_predictions.write.format('delta').mode("append").save("Tables/phishing_predictions")

Clean Streaming Data Notebook

from pyspark.sql.functions import col

# Filter for non-phishing data only
non_phishing_df = df_with_predictions.filter(col("predictions") == 0)

# Save clean data for business analysis
non_phishing_df.write.format("delta").mode("append").save("Tables/clean_data")
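Since the pipeline runs every 5 minutes, one way to keep each run incremental is to push a time filter into the KQL query that feeds the prediction notebook. This is a sketch under that assumption; it reuses the table, columns, and Kusto connector options shown in the training notebook, and it presumes the scheduled notebook defines kustoUri, database, and accessToken the same way.

# Scope each scheduled run to events ingested in the last 5 minutes
kustoQuery = """
SampleData
| where ingestion_time() > ago(5m)
| project subscriberId, subscriberData, ingestion_time()
"""

df = spark.read \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("accessToken", accessToken) \
    .option("kustoCluster", kustoUri) \
    .option("kustoDatabase", database) \
    .option("kustoQuery", kustoQuery) \
    .load()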
Creating Business Value

What makes this architecture particularly powerful is the seamless transition from ML predictions to business insights.

Delta Lake Integration:
- All predictions are stored in Delta format, ensuring ACID compliance
- Enables time travel and data versioning
- Perfect for creating semantic models

Real-Time Processing:
- The 5-minute refresh cycle ensures near real-time threat detection
- Automatic segregation of clean vs. suspicious data
- Immediate visibility into potential threats

Business Intelligence Ready:
- Delta tables are directly compatible with semantic modeling
- Power BI can connect to these tables for live reporting
- Enables both historical analysis and real-time monitoring

The Power of Semantic Models

With our data now organized in Delta tables, we're ready for:
- Creating dimensional models for better analysis
- Building real-time dashboards
- Generating automated reports
- Setting up alerts for security teams

Real-Time Visualization Capabilities

While Microsoft Fabric offers extensive visualization capabilities through Power BI, it's worth highlighting one particularly powerful feature: direct KQL querying for real-time monitoring. Here's a glimpse of how simple yet powerful this can be:

SampleData
| where EventProcessedUtcTime > ago(1m)  // Fetch rows processed in the last 1 minute
| project subscriberId, subscriberData, EventProcessedUtcTime

This simple KQL query, when integrated into a dashboard, provides near real-time visibility into your streaming data with sub-minute latency. The visualization possibilities are extensive, but that's a topic for another day.

Conclusion: Bringing It All Together

What we've built here is more than just a machine learning model; it's a complete, production-ready system that:

- Ingests and processes streaming data in real time
- Applies sophisticated ML algorithms for threat detection
- Automatically segregates clean from suspicious data
- Provides immediate visibility into potential threats

The real power of Microsoft Fabric lies in how it seamlessly integrates these different components. From data ingestion through Eventhouse and Lakehouse, to ML model training and deployment, to real-time monitoring, everything works together in a unified platform.

What's Next?

While we've focused on phishing detection, this architecture can be adapted for various use cases:

- Fraud detection in financial transactions
- Quality control in manufacturing
- Customer behavior analysis
- Anomaly detection in IoT devices

The possibilities are endless with our imagination and creativity! Stay tuned for the Git repo where all the code will be shared!

References
- Get Started with Microsoft Fabric
- Delta Lake in Fabric
- Overview of Eventhouse
- CloudBlogger: A guide to innovative Apps with MS Fabric

Why Microsoft Azure Certification Matters for Cloud Careers
In today's rapidly growing IT landscape, cloud computing is a critical skill for most organizations. Microsoft Azure certifications are designed to equip professionals with the knowledge and expertise required to thrive in this domain. By pursuing Microsoft Azure certifications, you gain hands-on skills with various cloud-based services and demonstrate your abilities to potential employers, making you stand out in the competitive job market. Whether you are a beginner exploring the fundamentals or an experienced professional seeking career growth, Azure certifications can open doors to better job opportunities and help you advance in your current role. These certifications are recognized worldwide and are a solid step toward staying relevant in the ever-evolving cloud computing industry.

breewick | Dec 20, 2024

Azure AI for an API Platform
We are managing an API platform in our company. We collect metrics in ELK (deployed on premises), and we have a Confluence wiki page with our public documentation. We would like a chatbot that can be trained on the structured metrics (JSON, CSV; Prometheus) stored in ELK and on the unstructured content of our wiki page.

The chatbot should be able to answer questions like:

(Source: wiki page)
- How can I publish an API with mTLS enabled?
- How can I authorize a client certificate for my API?
- How can I specify the RPS for my API?
- and so on and so forth

(Source: ELK)
- Plot the requests of the API "XX" for the last month
- Can you predict the API call trend for the next month for the API "XXX"?
- Give me a list of the client IPs that accessed the API "YYY" yesterday
- and so on and so forth

In the future we may want the chatbot to perform some basic automatic actions (contacting our self-service API), such as:
- Allow this client certificate to access this API
- Change the RPS for the API ZZZ to 10 req/s
- and so on and so forth

My questions:
- Which Azure AI services would you recommend we start looking into? Which AI models? Which Azure resources?
- How can we train and feed the model from ELK? Do we have to export the data from ELK daily and store it in an Azure Storage account, or can we instruct the specific Azure AI service to connect to our ELK endpoint (or a proxy API) to fetch the data?

Thank you

fabry_00 | Dec 20, 2024

Implementing Azure AD Connect in a live environment
I have been tasked with implementing Azure AD Connect for my company. We currently have two locally virtualized domain controllers and are already using Office 365 for mail. What would be the easiest way to implement AD Connect while having the least amount of downtime and user interruption?

PVIAnthony | Dec 19, 2024

Troubleshooting Azure Function App Proxy with Private Blob Container Access for Static Web App
Recently, I shared a problem I'm facing in my testing environment with a friend. I've decided to bring this issue to an open forum discussion to gather additional insights. I hope you can help me figure out what might be missing in my configuration.

**Context:** I'm trying to replicate a solution in my test environment but encountering difficulties in a specific scenario.

**Scenario:** I have a Function App acting as a proxy for a Static Web App hosted in a Blob Container. This Blob Container is set to private access, meaning public access is disabled.

**The Problem:** The goal is for my Function App to authorize users and direct them correctly to the Static Web App. However, it's not working as expected.

**What I've tried so far:**
1. Configured a Managed Identity for the Function App and granted the necessary permissions to the Blob Container.
2. Properly set up authentication and created the App Registration, which works flawlessly.
3. Verified that the proxy functions correctly when the Blob Container's public access is enabled.

**Current behavior:**
- When public access to the Blob Container is enabled, everything works fine.
- When public access is disabled, even with the proxy configured, access fails and a "resource not found" error is returned.

**My questions are:**
1. Do I need to configure something additional in the proxy definition file?
2. Is there a specific setting, like a private endpoint or something similar, that I should implement to resolve this issue?

**Additional considerations:** I haven't configured a private endpoint yet, but I'm considering whether this would be the most appropriate solution for my case. My initial expectation was that granting the necessary permissions to the Function App via Managed Identity would solve the issue, but it hasn't. I appreciate any guidance or suggestions you can provide!

AugustoJaba | Dec 19, 2024

Azure PowerShell: find LastOwnershipUpdateTime on a disk
Hello, I'm wondering if it's possible to find LastOwnershipUpdateTime on a disk via PowerShell. I can see this info in the portal, but I cannot figure out how to find it via script (PowerShell). It looks like Microsoft released it recently, but even after updating my Az.Compute module to the latest version (9.0.0) I still do not see it. Any help would be really appreciated. Thank you!

(Solved)

Oleg_A | Dec 18, 2024

Azure support team not responding to support request
I am posting here because I have not received a response to my support request despite my plan stating that I should hear back within 8 hours. It has now gone a day beyond that limit, and I am still waiting for assistance with this urgent matter. This issue is critical for my operations, and the delay is unacceptable. The ticket/reference number for my original support request was 2410100040000309, and I have created a brand new service request with ID 2412160040010160. I need this addressed immediately.

_Adrian_ | Dec 17, 2024

Accidentally changed my admin user to a guest user ... and cannot access Azure anymore
Hi, I did something stupid and now cannot access my Azure anymore:

1. I wanted to integrate Microsoft Teams into my Google account.
2. To do this, I was told on different platforms that I had to create an account via Azure.
3. But since it's the same email address (***@gmail.com), I couldn't invite myself as an internal user (I already was one!), so I turned myself into a guest user and wanted to invite myself again.

Obviously, this did not work, and now I cannot access Azure anymore. Can someone help me restore my account? Here are the error messages I got:

{
  "sessionId": "448959718eec490b89d366bb329cecec",
  "errors": [
    {
      "errorMessage": "interaction_required: AADSTS16000: User account '{EUII Hidden}' from identity provider 'live.com' does not exist in tenant 'Microsoft Services' and cannot access the application 'b677c290-cf4b-4a8e-a60e-91ba650a4abe'(AzurePortal Console App) in that tenant. The account needs to be added as an external user in the tenant first. Sign out and sign in again with a different Azure Active Directory user account. Trace ID: c5672f20-06a6-421d-91d3-33ec930b3e00 Correlation ID: 304caf2a-0555-4935-bdb2-7724571ffebe Timestamp: 2024-10-15 10:35:32Z",
      "clientId": "b677c290-cf4b-4a8e-a60e-91ba650a4abe",
      "scopes": [ "https://management.core.windows.net//.default" ]
    },
    {
      "errorMessage": "interaction_required: AADSTS16000: User account '{EUII Hidden}' from identity provider 'live.com' does not exist in tenant 'Microsoft Services' and cannot access the application 'b677c290-cf4b-4a8e-a60e-91ba650a4abe'(AzurePortal Console App) in that tenant. The account needs to be added as an external user in the tenant first. Sign out and sign in again with a different Azure Active Directory user account. Trace ID: a889d113-15ce-4c6d-9b36-f62228521f00 Correlation ID: 0625b216-085c-40df-9547-e233841dac35 Timestamp: 2024-10-15 10:35:32Z",
      "clientId": "b677c290-cf4b-4a8e-a60e-91ba650a4abe",
      "scopes": [ "https://management.core.windows.net//.default" ]
    }
  ]
}

denizseebacher | Dec 17, 2024

Azure Boards: what is the max page size limit for the get project-level iteration API?
I am using the classificationnodes iterations API. What happens if the number of project-level iterations exceeds this limit? How can we retrieve the next set of iterations using pagination? Can we use a filter to search iterations by name? Can we use top and skip for this API? Please explain the use of these parameters, with an example if possible.

surendra-p | Dec 17, 2024