analytics
Securing Azure HDInsight: ESM Support with Ubuntu 18.04, Cluster Updates, and Best Practices
Azure HDInsight, Microsoft's cloud-based big data analytics platform, continues to advance its features to provide users with a secure and efficient environment. In this article, we explore the latest enhancements, focusing on Expanded Security Maintenance (ESM) support, the importance of regular cluster updates, and the best practices Microsoft recommends to fortify HDInsight deployments.

The foundation of a secure Azure HDInsight environment lies in its ability to address critical vulnerabilities promptly. Microsoft ensures this by shipping the latest HDInsight images with Expanded Security Maintenance (ESM) support, which provides ongoing support and stability with minimal changes, specifically targeting critical, high, and selected medium-severity fixes. As a result, HDInsight users benefit from a continuously updated and secure environment.

ESM Support in Latest Images: Azure HDInsight 5.0 and 5.1 use the Ubuntu 18.04 Pro image. Ubuntu Pro includes security patching for all Ubuntu packages through Expanded Security Maintenance (ESM) for Infrastructure and Applications. Ubuntu Pro 18.04 LTS will remain fully supported until April 2028. For more information on what's new in the latest HDInsight images with ESM support, refer to the official release notes in the Azure HDInsight Release Notes Archive.

Periodic Cluster Updates: Maintaining a secure HDInsight environment requires diligence in keeping clusters up to date. Microsoft facilitates this through the HDInsight OS patching mechanism. Periodically updating clusters using the procedures outlined in the official documentation ensures that users benefit from the latest features, performance improvements, and crucial security patches. Learn more in the Azure HDInsight OS Patching documentation.

ESM and HDI Release Integration: Expanded Security Maintenance is integrated into HDInsight releases: critical fixes provided by ESM are bundled into each HDInsight release, so users receive the latest security enhancements with every new release.

Customer Recommendation: Use the Latest Image: To maximize the benefits of the latest features and security updates, customers are strongly recommended to use the most recent HDInsight image number. By doing so, organizations ensure that their HDInsight clusters are fortified against the latest threats and vulnerabilities (a small sketch for auditing cluster versions follows at the end of this article).

Accessing Fixed CVE Details: For users seeking detailed information about the fixed Common Vulnerabilities and Exposures (CVEs), the Ubuntu CVE site is a valuable resource. There, users can access comprehensive insights into the specific vulnerabilities addressed in each release, empowering them to make informed decisions about their security posture.
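To act on the recommendation above, it can help to audit which version each of your HDInsight clusters is currently running. The following is a minimal sketch of my own, not part of the original article; it assumes the azure-identity and azure-mgmt-hdinsight Python packages and a subscription ID supplied via an environment variable, and the attribute names should be verified against the SDK version you install.

import os
from azure.identity import DefaultAzureCredential
from azure.mgmt.hdinsight import HDInsightManagementClient

# Assumption: subscription ID provided through an environment variable
subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]

client = HDInsightManagementClient(DefaultAzureCredential(), subscription_id)

# List every HDInsight cluster in the subscription and print its version and OS,
# so outdated images can be spotted and patched or recreated.
for cluster in client.clusters.list():
    props = cluster.properties
    print(cluster.name, props.cluster_version, props.os_type)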
From Doubt to Victory: How I Passed Microsoft SC-200

Hey everyone! I wanted to share my journey of how I went from doubting my chances to successfully passing the Microsoft SC-200 exam. At first, the idea of taking the SC-200 seemed overwhelming. With so many topics to cover, especially with the integration of Microsoft security technologies, I wasn't sure if I could pull it off. But after months of studying and staying consistent, I finally passed! 🎉

Here's what worked for me:

Study Plan: I created a structured study schedule and stuck to it. I broke down each section of the exam objectives and allocated time for each part.
Authentic Exam Questions: I used it-examstest for practice exams. Their realistic test format helped me get a good grasp of the exam pattern. Plus, the explanations for the answers were super helpful in understanding the concepts.
Practice Exams: I did multiple mock tests. Honestly, they helped me more than I expected! They boosted my confidence, and I could pinpoint areas where I needed to improve.
SC-200 Study Materials: I relied on a combination of online courses, books, and video resources. Watching the study videos and taking notes helped me retain the information better.
Don't Cram: I didn't leave things to the last minute. It took me about 2-3 months of consistent study to get comfortable with the material. I made sure to take breaks and not burn myself out.

Passing this exam felt amazing! If you're in the same boat and feeling uncertain, just stick with it! It's a challenging exam, but with the right tools and preparation, you can do it. Keep pushing forward, and good luck to everyone! 💪 Would be happy to answer any questions if anyone has them!

Efficient Log Management with Microsoft Fabric
Introduction

In the era of digital transformation, managing and analyzing log files in real time is essential for maintaining application health, security, and performance. Many third-party solutions exist in this area, covering the collection, processing, storage, and analysis of this data source and acting upon it. But as your systems scale, those solutions can become very costly: their cost models grow with the amount of ingested data rather than with actual resource utilization or customer value.

This blog post explores a robust architecture built on the Microsoft Fabric SaaS platform, focused on its Real-Time Intelligence capabilities, for efficient log file collection, processing, and analysis. The use cases range from simple application error troubleshooting to more advanced scenarios such as application trend detection (for example, spotting slowly degrading performance, like the average user session for specific activities in the app lasting longer than expected) and more proactive monitoring, where log-based KPIs are defined and monitored to generate alerts.

Regarding cost, since Fabric provides a complete separation between compute and storage, you can grow your data without necessarily growing your compute costs, and you still pay only for the resources that are used, in a pay-as-you-go model.

Architecture Overview

The proposed architecture integrates Microsoft Fabric's Real-Time Intelligence (Real-Time Hub) with your source log files to create a seamless, near real-time log collection solution. It is based on Microsoft Fabric: a SaaS solution that unifies several best-of-breed Microsoft analytical experiences. Fabric is a modern data/AI platform built on unified, open data formats (Parquet/Delta), allowing both classical data lake experiences (traditional Lakehouse/warehouse SQL analytics) and real-time intelligence on semi-structured data, all in one lake-centric SaaS platform. Fabric's open foundation with built-in governance enables you to connect to various clouds and tools while maintaining data trust.

This is a high-level overview of Real-Time Intelligence inside Fabric.

Log Events - Fabric-Based Architecture

Looking in more detail at a solution for log collection, processing, storage, and analysis, we propose the following architecture. Now let's discuss it in more detail.

General notes: since Fabric is a SaaS solution, all the components can be used without deploying any infrastructure in advance; with a click of a button and very simple configuration you can customize the relevant components for this solution. The main components used in this solution are Data Pipeline, OneLake, and Eventhouse.

Our data source for this example is taken from this public git repo: https://github.com/logpai/loghub/tree/master/Spark. The files were stored inside an S3 bucket to demonstrate how easily the data pipeline integrates with external data sources. A typical log file looks like this:

16/07/26 12:00:30 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 59219.
16/07/26 12:00:30 INFO spark.SparkEnv: Registering MapOutputTracker
16/07/26 12:00:30 INFO spark.SparkEnv: Registering BlockManagerMaster
16/07/26 12:00:30 INFO storage.DiskBlockManager: Created local directory at /opt/hdfs/nodemanager/usercache/curi/appcache/application_1460011102909_0176/blockmgr-5ea750cb-dd00-4593-8b55-4fec98723714
16/07/26 12:00:30 INFO storage.MemoryStore: MemoryStore started with capacity 2.4 GB

Components

Data Pipeline

The first challenge to solve is how to bring the log files from your system into Fabric; this is the log collection phase. Many solutions exist for this phase, each with its pros and cons. In Fabric, the standard approach to bring data in is the Copy Activity from ADF, whose Fabric SaaS version is now called Data Pipeline. Data Pipeline is a low-code/no-code tool for managing and automating the process of moving and transforming data within Microsoft Fabric: a serverless ETL tool with more than 100 connectors enabling integration with a wide variety of data sources, including databases, cloud services, file systems, and more. In addition, it supports an on-premises agent called the self-hosted integration runtime; this agent, which you install on a VM, acts as a bridge that lets you run your pipeline on a local VM and secures the connection from your on-premises network to the cloud.

Let's describe our solution's data pipeline in more detail. Bear in mind that ADF is very flexible and supports reading at scale from a wide range of data sources and files, with integrations to all major cloud vendors for blob storage retrieval: S3, GCS, Oracle Cloud, file systems, FTP/SFTP, and so on. Even if your files are generated externally to Azure, this is not an issue at all.

Visualization of Fabric Data Pipeline

Log Collection

ADF Copy Activity: Inside Data Pipeline we create an activity called Copy Activity with the following basic configuration:

Source: mapped to your data sources. It can be Azure Blob Storage with a container holding the log files, or other cloud object storage such as S3 or GCS. Log files are generally retrieved from a specific container/folder and fetched based on a prefix/suffix in the file name. To support an incremental load process, we can configure the activity to delete the source files it reads, so that once the files are successfully transferred to their target they are automatically deleted from their source. On the next iteration the pipeline will not have to process the same files again.

Sink: OneLake/Lakehouse folder. We create ahead of time a Lakehouse, which is an abstract data container that can hold and manage your data at scale, either structured or unstructured; we then select it from the list of connectors (look for OneLake/Lakehouse).

Log Shippers: This is an optional component. Sometimes the ETL is not allowed to access your on-premises VNet; in this case tools like Fluentd, Filebeat, or the OpenTelemetry Collector are used to forward your application's collected logs to the main entry point of the system: Azure Blob Storage.

AzCopy CLI: If you don't wish to invest in expensive tools and all you need is to copy your data in a scalable and secure manner to Azure Storage, you might consider creating your own log shipping solution based on the free AzCopy tool together with some basic scripting around it for scheduling. AzCopy is a command-line utility designed for high-performance uploading, downloading, and copying of data to and from Microsoft Azure Blob and File storage.
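If a scripted shipper is preferred over AzCopy, a small Python uploader can play the same role. The following is a minimal sketch, not from the original article; it assumes the azure-storage-blob package, a connection string exposed through an environment variable, and a container named logs-landing, all of which are illustrative names.

import os
import pathlib
from azure.storage.blob import BlobServiceClient

# Assumptions: connection string in an env var, a pre-created container,
# and local Spark log files under ./logs
service = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION_STRING"])
container = service.get_container_client("logs-landing")

for path in pathlib.Path("logs").glob("*.log"):
    with open(path, "rb") as f:
        # Keep the original file name so the Copy Activity can filter on prefix/suffix
        container.upload_blob(name=path.name, data=f, overwrite=True)
        print(f"uploaded {path.name}")

Scheduling a script like this (cron, an Azure Functions timer, etc.) gives a lightweight alternative to the shippers listed above.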
Fabric first activity: Copy from source bucket to Lakehouse.

Log Preparation

Upon log files landing in Azure Blob Storage, Eventstream can be used to trigger the Data Pipeline that handles the data preparation and loading phase. So what is the data preparation phase's main purpose? After the log files land in storage and before they are loaded into the real-time logs database (the KQL database), it might be necessary to transform the data with some basic manipulations. The reasons vary; a few examples:

Bad data formats: sometimes log files contain problematic characters, like new lines inside a row (a stack-trace error message with new lines as part of the message field of the record).
Metadata enrichment: sometimes the log file names carry meaningful data, for example the file name describes the originating process or server name, and this metadata would be lost once the file content is loaded into the database.
Regulation restrictions: sometimes logs contain private data (PII) such as names, credit card numbers, or social security numbers that must be removed, hashed, or encrypted before loading into the database.

In our case we run a PySpark notebook that reads the files from a OneLake folder, fixes the new-lines-inside-a-row issue, and creates new files in another OneLake folder. We call this notebook with a base parameter called log_path that defines the log file location on OneLake to read from (a minimal sketch of such a notebook is shown further below).

Fabric second activity: Running the notebook.

Log Loading

Inside the Data Pipeline, in the last step after the transformation phase, we call the Copy data activity again, but this time source and sink are different:

Source: Lakehouse folder (the previous notebook's output)
Sink: a specific Eventhouse table (created ahead of time): it is basically an empty table (logsraw)

Visualization of the Fabric last activity: loading to Eventhouse.

In summary, for the log collection and preparation stage we broke the work into three data pipeline activities:

Copy Activity: read the log files from the source. This is the first step of the log ingestion pipeline, running inside our orchestrator, the Data Pipeline.
Run Notebook Activity: transform the log files. This is the execution of a single notebook or a chain of notebooks.
Copy Activity: load the log files into the destination database: KQL inside Eventhouse, specifically the logs table called logsraw, created ahead of time inside the Eventhouse database.

Inside the Eventhouse

We needed to create a KQL database with a table to hold the raw ingested log records. A KQL database is a scalable and efficient storage solution for log files, optimized for high-volume data ingestion and retrieval. Eventhouses and KQL databases operate on a fully managed Kusto engine. With an Eventhouse or KQL database, you can expect compute to be available for your analytics within 5 to 10 seconds, and the compute resources grow with your data analytics needs.

Log Ingestion to KQL Database with Update Policy

We can separate the ETL transformation logic into what happens to the data before it reaches the Eventhouse KQL database and what happens after.
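On the "before" side, here is a minimal sketch of how the new-line-merge notebook described above could be implemented. This is an illustrative assumption rather than the article's actual notebook: it assumes log records start with a yy/MM/dd HH:mm:ss timestamp (as in the Spark sample logs) and that the input and output paths are bound to notebook parameters such as log_path.

import re
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Notebook parameters (assumed names): where to read raw logs and where to write fixed ones
log_path = "Files/raw_logs"       # e.g. bound to the pipeline's log_path base parameter
output_path = "Files/fixed_logs"

# A record starts with a timestamp like "16/07/26 12:00:30"
record_start = re.compile(r"^\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}")

def merge_continuation_lines(content):
    records = []
    for line in content.splitlines():
        if record_start.match(line) or not records:
            records.append(line)
        else:
            # Continuation line (e.g. a stack trace): fold it into the previous record
            records[-1] += " | " + line.strip()
    return records

# wholeTextFiles keeps each file intact so multi-line records can be stitched back together
raw = spark.sparkContext.wholeTextFiles(log_path)
fixed = raw.flatMap(lambda kv: merge_continuation_lines(kv[1]))

# Write one clean record per line for the next Copy activity to pick up
fixed.map(lambda r: (r,)).toDF(["value"]).write.mode("overwrite").text(output_path)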
Before it reaches the database, the only transformation we did was calling a notebook during the data pipeline to handle the new-line merge logic. This cannot easily be done as part of the database ingestion logic, simply because when we try to load files with new lines inside a field of a record, the convention is broken and the ingestion process creates separate table records for each new line of the exception stack trace. On the other hand, we might need to define basic transformation rules, such as date formatting, type conversion (string to numbers), parsing and extracting an interesting value from a string based on a regular expression, or creating JSON (dynamic type) from a hierarchical string (an XML/JSON string, etc.). For all these transformations we can work with what is called an update policy: a simple ETL logic defined inside the KQL database, as explained here. During this step we create from the logsraw staging table a new table called logsparsed, which will be our final destination table for the log queries.

These are the KQL tables defined to hold the log files:

.create table logsraw (
    timestamp:string,
    log_level:string,
    module:string,
    message:string)

.create table logsparsed (
    formattedDatetime:datetime,
    log_level:string,
    log_module:string,
    message:string)

This is the update policy that automatically converts data from the staging table logsraw to the destination table logsparsed:

.create-or-alter function parse_lograw() {
    logsraw
    | project
        formattedDatetime = todatetime(strcat("20", substring(timestamp, 0, 2), "-", substring(timestamp, 3, 2), "-", substring(timestamp, 6, 2), "T", substring(timestamp, 9, 8))),
        log_level,
        log_module = module,
        message
}

.alter table logsparsed policy update
@'[{ "IsEnabled": true, "Source": "logsraw", "Query": "parse_lograw()", "IsTransactional": true, "PropagateIngestionProperties": true}]'

Since we don't need to retain the data in the staging table (logsraw), we can define a retention policy of 0 TTL like this:

.alter-merge table logsraw policy retention softdelete = 0sec recoverability = disabled

Query Log Files

After the data is ingested and transformed, it lands in a basic, schematized logs table: logsparsed. In general we have some common fields mapped to their own columns, such as log level (INFO/ERROR/DEBUG), log category, log timestamp (a datetime-typed column), and log message. The message can be either a simple error string or a complex JSON-formatted string; in the latter case it is usually preferable to convert it to the dynamic type, which brings additional benefits like simplified query logic and reduced data processing (avoiding expensive joins).

Examples of Typical Log Queries
Category: Troubleshooting
Purpose: Looking for an error in a specific datetime range
KQL Query:
logsparsed
| where message contains "Exception"
    and formattedDatetime between (datetime(2016-07-26T12:10:00) .. datetime(2016-07-26T12:20:00))

Category: Statistics
Purpose: Basic statistics, e.g. min/max timestamp of log events
KQL Query:
logsparsed
| summarize minTimestamp=min(formattedDatetime), maxTimestamp=max(formattedDatetime)

Category: Exceptions Stats
Purpose: Check exception distribution
KQL Query:
logsparsed
| extend exceptionType = case(
    message contains "java.io.IOException", "IOException",
    message contains "java.lang.IllegalStateException", "IllegalStateException",
    message contains "org.apache.spark.rpc.RpcTimeoutException", "RpcTimeoutException",
    message contains "org.apache.spark.SparkException", "SparkException",
    message contains "Exception", "Other Exceptions",
    "No Exception")
| where exceptionType != "No Exception"
| summarize count() by exceptionType

Category: Log Module Stats
Purpose: Check module distribution
KQL Query:
logsparsed
| summarize count() by log_module
| order by count_ desc
| take 10

Realtime Dashboards

After querying the logs, it is possible to visualize the query results in real-time dashboards. All that is required is to select the query and click on Pin to Dashboard. After adding the queries to tiles inside the dashboard, this is a typical dashboard we can easily build.

Real-time dashboards can be configured to refresh in real time, as illustrated here; the user can easily configure how often to refresh the queries and visualizations, and at the extreme it can be as low as Continuous. There are many more capabilities implemented in the Real-Time Dashboard, like data exploration, alerting using Data Activator, and conditional formatting (changing item colors based on KPI thresholds), and this framework and its capabilities are heavily invested in and keep growing.

What about AI Integration?

Machine Learning Models: Kusto supports time series analysis out of the box, allowing for example anomaly detection (https://learn.microsoft.com/en-us/fabric/real-time-intelligence/dashboard-explore-data) and clustering. If that is not enough for you, you can always mirror the data of your KQL tables into OneLake Delta Parquet format by selecting OneLake availability. This configuration creates another copy of your data in the open Delta Parquet format, making it available for any Spark/Python/SparkML/SQL analytics, for whatever machine learning exploration, training, and serving you wish to do (a small sketch of reading the mirrored table from Spark follows below). This is illustrated here. Bear in mind, there is no additional storage cost to turn on OneLake availability.
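As a rough illustration of that last point, here is a minimal sketch, not from the original article, of picking up the mirrored table from a Fabric notebook once OneLake availability is enabled. The table path is an assumption; in practice you would use the OneLake path or Lakehouse shortcut that Fabric exposes for your Eventhouse database.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Assumption: the mirrored KQL table is reachable as a Delta table via a
# Lakehouse shortcut named "logsparsed"
logs = spark.read.format("delta").load("Tables/logsparsed")

# Example exploration: hourly error counts, a starting point for ML feature engineering
(logs
    .where(F.col("log_level") == "ERROR")
    .groupBy(F.window("formattedDatetime", "1 hour"))
    .count()
    .orderBy("window")
    .show(truncate=False))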
Conclusion

A well-designed real-time intelligence solution for log file management using Microsoft Fabric and Eventhouse can significantly enhance an organization's ability to monitor, analyze, and respond to log events. By leveraging modern technologies and best practices, organizations can gain valuable insights and maintain robust system performance and security.

Decision Guide for Selecting an Analytical Data Store in Microsoft Fabric

Learn how to select an analytical data store in Microsoft Fabric based on your workload's data volumes, data type requirements, compute engine preferences, data ingestion patterns, data transformation needs, query patterns, and other factors.

ARM template for deploying a workbook template to Microsoft Sentinel
Hello, I am attempting to deploy an ARM template (executed using PowerShell) for an analytic rule to a Microsoft Sentinel instance. I have been following this link: https://learn.microsoft.com/en-us/azure/azure-monitor/visualize/workbooks-automate#next-steps. I am struggling with ensuring the workbook is deployed to the Microsoft Sentinel workbook gallery and NOT the Azure Monitor one. The link includes a sample ARM template where you can add <templateData> (JSON code), which represents the workbook you wish to deploy. I can get it to deploy to the Azure Monitor workbook gallery, but not to show up in the Microsoft Sentinel one.

Jason

Extracting Device and Participation Details in Microsoft Teams
Hi everyone, I am working on a project for my organization to extract detailed information about Teams calls. Specifically, I need to gather data on the devices used by participants, as well as their join and leave times for each Teams meeting. This includes both participants and organizers.

My current approach involves:
1. Using Microsoft Graph API to access call and participant data.
2. Fetching relevant details such as device type, join time, and leave time.

- What are the best practices for extracting and managing this data?
- Are there any specific API endpoints or techniques you recommend for efficiently retrieving this information?
- How can I ensure the accuracy and completeness of the data collected?
- Are there any tools or libraries that can simplify this process?

Looking forward to your insights and experiences! Thanks in advance for your help!
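For context on the Graph approach mentioned above, the call records API (communications/callRecords) is the usual place where per-participant session details such as join/leave times and client device information live. The sketch below is an illustrative assumption rather than a verified answer: it uses the msal and requests packages, assumes an app registration with the CallRecords.Read.All application permission, and the field names should be double-checked against the current callRecord schema.

import msal
import requests

# Assumptions: tenant/app details and a call record id obtained elsewhere
# (for example from a callRecord change notification).
TENANT_ID, CLIENT_ID, CLIENT_SECRET = "<tenant>", "<app-id>", "<secret>"
CALL_ID = "<call-record-id>"

app = msal.ConfidentialClientApplication(
    CLIENT_ID,
    authority=f"https://login.microsoftonline.com/{TENANT_ID}",
    client_credential=CLIENT_SECRET,
)
token = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])

url = (f"https://graph.microsoft.com/v1.0/communications/callRecords/{CALL_ID}"
       "?$expand=sessions($expand=segments)")
record = requests.get(url, headers={"Authorization": f"Bearer {token['access_token']}"}).json()

# Each session roughly maps to one participant leg: start/end give join/leave times,
# and the endpoint's userAgent usually carries device/client details (assumed fields).
for session in record.get("sessions", []):
    caller = session.get("caller", {})
    print(
        caller.get("identity", {}).get("user", {}).get("displayName"),
        session.get("startDateTime"),
        session.get("endDateTime"),
        caller.get("userAgent", {}).get("headerValue"),
    )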
How to Create an AI Model for Streaming Data

A Practical Guide with Microsoft Fabric, Kafka and MLflow

Intro

In today's digital landscape, the ability to detect and respond to threats in real time isn't just a luxury; it's a necessity. Imagine building a system that can analyze thousands of user interactions per second, identifying potential phishing attempts before they impact your users. While this may sound complex, Microsoft Fabric makes it possible, even with streaming data. Let's explore how.

In this hands-on guide, I'll walk you through creating an end-to-end AI solution that processes streaming data from Kafka and employs machine learning for real-time threat detection. We'll leverage Microsoft Fabric's comprehensive suite of tools to build, train, and deploy an AI model that works seamlessly with streaming data.

Why This Matters

Before we dive into the technical details, let's explore the key advantages of this approach: real-time detection, proactive protection, and the ability to adapt to emerging threats.

Real-Time Processing: Traditional batch processing isn't enough in today's fast-paced threat landscape. We need immediate insights.
Scalability: With Microsoft Fabric's distributed computing capabilities, our solution can handle enterprise-scale data volumes.
Integration: By combining streaming data processing with AI, we create a system that's both intelligent and responsive.

What We'll Build

I've created a practical demonstration that showcases how to:

Ingest streaming data from Kafka using Microsoft Fabric's Eventhouse
Clean and prepare data in real time using PySpark
Train and evaluate an AI model for phishing detection
Deploy the model for real-time predictions
Store and analyze results for continuous improvement

The best part? Everything stays within the Microsoft Fabric ecosystem, making deployment and maintenance straightforward.

Azure Event Hub

Start by creating an Event Hubs namespace and a new Event Hub. Azure Event Hubs has Kafka endpoints ready to start receiving streaming data. Create a new shared access signature and use it with the Python script I have created below. You may adapt the payload constructor to your own idea.
import uuid
import random
import time
from confluent_kafka import Producer

# Kafka configuration for Azure Event Hub
config = {
    'bootstrap.servers': 'streamiot-dev1.servicebus.windows.net:9093',
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': '$ConnectionString',
    'sasl.password': 'Endpoint=sb://<replacewithyourendpoint>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxx',
}

# Create a Kafka producer
producer = Producer(config)

# Shadow traffic generation
def generate_shadow_payload():
    """Generates a shadow traffic payload."""
    subscriber_id = str(uuid.uuid4())
    # Weighted choice for subscriberData
    if random.choices([True, False], weights=[5, 1])[0]:
        subscriber_data = f"{random.choice(['John', 'Mark', 'Alex', 'Gordon', 'Silia', 'Jane', 'Alice', 'Bob'])} {random.choice(['Doe', 'White', 'Blue', 'Green', 'Beck', 'Rogers', 'Fergs', 'Coolio', 'Hanks', 'Oliver', 'Smith', 'Brown'])}"
    else:
        subscriber_data = f"https://{random.choice(['example.com', 'examplez.com', 'testz.com', 'samplez.com', 'testsite.com', 'mysite.org'])}"
    return {
        "subscriberId": subscriber_id,
        "subscriberData": subscriber_data,
    }

# Delivery report callback
def delivery_report(err, msg):
    """Callback for delivery reports."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")

# Topic configuration
topic = 'streamio-events1'

# Simulate shadow traffic generation and sending to Kafka
try:
    print("Starting shadow traffic simulation. Press Ctrl+C to stop.")
    while True:
        # Generate payload
        payload = generate_shadow_payload()

        # Send payload to Kafka
        producer.produce(
            topic=topic,
            key=str(payload["subscriberId"]),
            value=str(payload),
            callback=delivery_report
        )

        # Throttle messages (1500ms)
        producer.flush()  # Ensure messages are sent before throttling
        time.sleep(1.5)
except KeyboardInterrupt:
    print("\nSimulation stopped.")
finally:
    producer.flush()

You can run this from your workstation, an Azure Function, or whatever fits your case.

Architecture Deep Dive: The Three-Layer Approach

When building AI-powered streaming solutions, thinking in layers helps manage complexity. Let's break down our architecture into three distinct layers.

Bronze Layer: Raw Streaming Data Ingestion

At the foundation of our solution lies the raw data ingestion layer. Here's where our streaming story begins:

A web service generates JSON payloads containing subscriber data
These events flow through Kafka endpoints
Data arrives as structured JSON with key fields like subscriberId, subscriberData, and timestamps
Microsoft Fabric's Eventstream captures this raw streaming data, providing a reliable foundation for our ML pipeline, and stores the payloads in Eventhouse
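Before moving on to the silver layer, it can be reassuring to confirm that events are actually arriving on the Event Hub's Kafka endpoint. The following is a minimal consumer sketch of my own, not part of the original post; it reuses the same connection settings as the producer above, and the consumer group name is an arbitrary assumption.

from confluent_kafka import Consumer

# Same Event Hub Kafka endpoint and SAS credentials as the producer above
consumer_config = {
    'bootstrap.servers': 'streamiot-dev1.servicebus.windows.net:9093',
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': '$ConnectionString',
    'sasl.password': 'Endpoint=sb://<replacewithyourendpoint>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxx',
    'group.id': 'payload-smoke-test',   # assumed consumer group name
    'auto.offset.reset': 'earliest',
}

consumer = Consumer(consumer_config)
consumer.subscribe(['streamio-events1'])

try:
    # Read a handful of messages just to confirm the payload shape
    for _ in range(10):
        msg = consumer.poll(5.0)
        if msg is None or msg.error():
            continue
        print(msg.key(), msg.value())
finally:
    consumer.close()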
Silver Layer: The Intelligence Hub

This is where the magic happens. Our silver layer transforms raw data into actionable insights:

The Eventhouse KQL database stores and manages our streaming data
Our ML model, trained using PySpark's RandomForest classifier, processes the data
SynapseML's Predict API enables seamless model deployment
A dedicated pipeline applies our ML model to detect potential phishing attempts
Results are stored in Lakehouse Delta Tables for immediate access

Gold Layer: Business Value Delivery

The final layer focuses on making our insights accessible and actionable:

Lakehouse tables store cleaned, processed data
Semantic models transform our predictions into business-friendly formats
Power BI dashboards provide real-time visibility into phishing detection
Real-time dashboards enable immediate response to potential threats

The Power of Real-Time ML for Streaming Data

What makes this architecture particularly powerful is its ability to:

Process data in real time as it streams in
Apply sophisticated ML models without batch processing delays
Provide immediate visibility into potential threats
Scale automatically as data volumes grow

Implementing the Machine Learning Pipeline

Let's dive into how we built and deployed our phishing detection model using Microsoft Fabric's ML capabilities. What makes this implementation particularly interesting is how it combines traditional ML with streaming data processing.

Building the ML Foundation

First, let's look at how we structured the training phase of our machine learning pipeline using PySpark.

Training Notebook

Connect to Eventhouse and load the data:

from pyspark.sql import SparkSession

# Initialize Spark session (already set up in Fabric Notebooks)
spark = SparkSession.builder.getOrCreate()

# Define connection details
kustoQuery = """
SampleData
| project subscriberId, subscriberData, ingestion_time()
"""  # Replace with your desired KQL query
kustoUri = "https://<eventhousedbUri>.z9.kusto.fabric.microsoft.com"  # Replace with your Kusto cluster URI
database = "Eventhouse"  # Replace with your Kusto database name

# Fetch the access token for authentication
accessToken = mssparkutils.credentials.getToken(kustoUri)

# Read data from Kusto using Spark
df = spark.read \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("accessToken", accessToken) \
    .option("kustoCluster", kustoUri) \
    .option("kustoDatabase", database) \
    .option("kustoQuery", kustoQuery) \
    .load()

# Show the loaded data
print("Loaded data:")
df.show()

Separate and flag the phishing payload and load it with Spark:

from pyspark.sql.functions import col, expr, when, udf
from urllib.parse import urlparse

# Define a UDF (User Defined Function) to extract the domain
def extract_domain(url):
    if url.startswith('http'):
        return urlparse(url).netloc
    return None

# Register the UDF with Spark
extract_domain_udf = udf(extract_domain)

# Feature engineering with Spark
df = df.withColumn("is_url", col("subscriberData").startswith("http")) \
       .withColumn("domain", extract_domain_udf(col("subscriberData"))) \
       .withColumn("is_phishing", col("is_url"))

# Show the transformed data
df.show()

Use Spark MLlib to train the model and evaluate it:

from pyspark.sql.functions import col
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Ensure the label column is of type double
df = df.withColumn("is_phishing", col("is_phishing").cast("double"))
# Tokenizer to break text into words
tokenizer = Tokenizer(inputCol="subscriberData", outputCol="words")

# Convert words to raw features using hashing
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=100)

# Compute the term frequency-inverse document frequency (TF-IDF)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Random Forest Classifier
rf = RandomForestClassifier(labelCol="is_phishing", featuresCol="features", numTrees=10)

# Build the ML pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, rf])

# Split the dataset into training and testing sets
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# Train the model
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model's accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="is_phishing", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)

# Output the accuracy
print(f"Model Accuracy: {accuracy}")

Add a signature to the AI model:

from mlflow.models.signature import infer_signature
from pyspark.sql import Row

# Select a sample for inferring signature
sample_data = train_data.limit(10).toPandas()

# Create a Pandas DataFrame for schema inference
input_sample = sample_data[["subscriberData"]]  # Input column(s)
output_sample = model.transform(train_data.limit(10)).select("prediction").toPandas()

# Infer the signature
signature = infer_signature(input_sample, output_sample)

Run: publish the model and log the accuracy metric:

import mlflow
from mlflow import spark

# Start an MLflow run
with mlflow.start_run() as run:
    # Log the Spark MLlib model with the signature
    mlflow.spark.log_model(
        spark_model=model,
        artifact_path="phishing_detector",
        registered_model_name="PhishingDetector",
        signature=signature  # Add the inferred signature
    )

    # Log metrics like accuracy
    mlflow.log_metric("accuracy", accuracy)

    print(f"Model logged successfully under run ID: {run.info.run_id}")

Results and Impact

Our implementation achieved:

81.8% accuracy in phishing detection
Sub-second prediction times for streaming data
Scalable processing of thousands of events per second

Yes, that's a good start!
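Before wiring the model into a pipeline, it can also be handy to load the registered model back and score a couple of ad-hoc rows as a sanity check. This is a minimal sketch of my own, not part of the original walkthrough; it assumes the registered model name and version match the training run above.

import mlflow
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Load the registered Spark pipeline model by name/version (assumed: PhishingDetector, version 3)
loaded_model = mlflow.spark.load_model("models:/PhishingDetector/3")

# Score a tiny ad-hoc batch: one URL-like payload and one plain name
sample = spark.createDataFrame(
    [("https://testz.com",), ("John Doe",)],
    ["subscriberData"],
)
loaded_model.transform(sample).select("subscriberData", "prediction").show()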
Now let's continue our post by explaining the deployment and operation phase of our ML solution.

From Model to Production: Automating the ML Pipeline

After training our model, the next crucial step is operationalizing it for real-time use. We've implemented one pipeline with two activities that process our streaming data every 5 minutes.

All Streaming Data Notebook

# Main prediction snippet
from synapse.ml.predict import MLFlowTransformer

# Apply ML model for phishing detection
model = MLFlowTransformer(
    inputCols=["subscriberData"],
    outputCol="predictions",
    modelName="PhishingDetector",
    modelVersion=3
)

# Transform and save all predictions
df_with_predictions = model.transform(df)
df_with_predictions.write.format('delta').mode("append").save("Tables/phishing_predictions")

Clean Streaming Data Notebook

# Filter for non-phishing data only
non_phishing_df = df_with_predictions.filter(col("predictions") == 0)

# Save clean data for business analysis
non_phishing_df.write.format("delta").mode("append").save("Tables/clean_data")

Creating Business Value

What makes this architecture particularly powerful is the seamless transition from ML predictions to business insights:

Delta Lake Integration: All predictions are stored in Delta format, ensuring ACID compliance, enabling time travel and data versioning, and making the tables perfect for creating semantic models.
Real-Time Processing: The 5-minute refresh cycle ensures near real-time threat detection, automatic segregation of clean vs. suspicious data, and immediate visibility into potential threats.
Business Intelligence Ready: Delta tables are directly compatible with semantic modeling; Power BI can connect to these tables for live reporting, enabling both historical analysis and real-time monitoring.

The Power of Semantic Models

With our data now organized in Delta tables, we're ready for:

Creating dimensional models for better analysis
Building real-time dashboards
Generating automated reports
Setting up alerts for security teams

Real-Time Visualization Capabilities

While Microsoft Fabric offers extensive visualization capabilities through Power BI, it's worth highlighting one particularly powerful feature: direct KQL querying for real-time monitoring. Here's a glimpse of how simple yet powerful this can be:

SampleData
| where EventProcessedUtcTime > ago(1m) // Fetch rows processed in the last 1 minute
| project subscriberId, subscriberData, EventProcessedUtcTime

This simple KQL query, when integrated into a dashboard, provides near real-time visibility into your streaming data with sub-minute latency. The visualization possibilities are extensive, but that's a topic for another day.

Conclusion: Bringing It All Together

What we've built here is more than just a machine learning model; it's a complete, production-ready system that:

Ingests and processes streaming data in real time
Applies sophisticated ML algorithms for threat detection
Automatically segregates clean from suspicious data
Provides immediate visibility into potential threats

The real power of Microsoft Fabric lies in how it seamlessly integrates these different components. From data ingestion through Eventhouse and Lakehouse, to ML model training and deployment, to real-time monitoring, everything works together in a unified platform.

What's Next?

While we've focused on phishing detection, this architecture can be adapted for various use cases:

Fraud detection in financial transactions
Quality control in manufacturing
Customer behavior analysis
Anomaly detection in IoT devices

The possibilities are endless with our imagination and creativity! Stay tuned for the Git repo where all the code will be shared!
References

Get Started with Microsoft Fabric
Delta Lake in Fabric
Overview of Eventhouse
CloudBlogger: A guide to innovative Apps with MS Fabric