As Industry 5.0, regarded as the next industrial evolution, approaches, the objective is to combine the creativity of human experts with efficient, intelligent, and accurate machines to obtain manufacturing solutions that are more resource-efficient and user-preferred than those of Industry 4.0. Many companies are struggling with the realities of AI implementation, and predictive maintenance helps determine the condition of equipment and predict when maintenance should be performed. Needless to say, implementing ML-based solutions can lead to major cost savings, reduced human-resource costs, increased system availability, and the avoidance of potential catastrophes.
With Azure Anomaly Detector, and especially its Multivariate Anomaly Detector feature, you can detect anomalies on your equipment and systems in real time. The service is easy to use through simple APIs and SDKs, and it provides detailed interpretation at anomalous timestamps to help you better understand what's happening behind the scenes and immediately identify which sensor(s) to fix among streaming data from hundreds of sensors.
In February this year, we announced an integration between Multivariate Anomaly Detector and SynapseML, which provides a way for developers and customers to do multivariate anomaly detection with the SynapseML library; check the announcement blog here. This new capability allows you to detect anomalies in very large datasets quickly and easily, with simpler operations.
In this blog, we will show you how to detect anomalies on your equipment in Azure Databricks, a fast, easy, and collaborative Apache Spark-based big data analytics service designed for data science and data engineering.
Multivariate Anomaly Detector (MVAD) is an AI service in Cognitive Services that provides APIs enabling developers to easily integrate advanced AI for detecting anomalies in groups of sensor data, without machine learning knowledge or labeled data. This service helps you proactively protect complex systems such as software applications, servers, factory machines, spacecraft, or even your business, from failures.
Referring to the workflow, first you should prepare data of good quality and quantity. The training dataset should include as much normal-pattern data as possible, and you should do some preprocessing on the data, such as resampling, splitting out downtime, and rounding timestamps; you can also check the detailed best practices for MVAD. After preparing the data, you can call the MVAD API directly, use the SDK, or use the SynapseML library in Synapse Analytics or Azure Databricks. Once the model is trained, you can either trigger batch inference for validation purposes or trigger streaming inference for real-time monitoring. The MVAD workflow is shown below.
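The preprocessing steps mentioned above (rounding timestamps and resampling onto a fixed interval) can be sketched with pandas. This is a minimal illustration with hypothetical readings, not part of the MVAD service itself; the 1-minute interval is an assumption you should tune to your own data.

```python
import pandas as pd

# Hypothetical raw sensor readings arriving at irregular timestamps.
raw = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(
            ["2021-01-01 00:00:07", "2021-01-01 00:01:02", "2021-01-01 00:02:58"]
        ),
        "sensor_1": [1.0, 1.2, 0.9],
    }
)

# Round timestamps to the nearest minute, then resample onto a fixed
# 1-minute grid so every series shares the same timestamps; short gaps
# created by resampling are filled by linear interpolation.
prepared = (
    raw.set_index(raw["timestamp"].dt.round("1min"))
    .drop(columns="timestamp")
    .resample("1min")
    .mean()
    .interpolate()
    .reset_index()
)
print(prepared)
```

After this step, every sensor column is aligned on the same regularly spaced timestamps, which is the shape MVAD expects.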
Coordinates: com.microsoft.azure:synapseml_2.12:0.9.5-103-4975dda5-SNAPSHOT
Repository: https://mmlspark.azureedge.net/maven
For Spark 3.1, use the following settings to install the library.
Coordinates: com.microsoft.azure:synapseml_2.12:0.9.5-13-d1b51517-SNAPSHOT
Repository: https://mmlspark.azureedge.net/maven
import os
import numpy as np
import pandas as pd
import datetime
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.cognitive import *
import pyspark.sql.functions as F
Load your data for training and inference, which could be a CSV file stored in a blob or elsewhere. You can use our sample data (download here). The timestamp column should be in ISO 8601 format; the feature columns are read as strings from CSV, so cast them to double as in the code below.
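If your timestamps are not yet in ISO 8601 form, a quick pandas conversion gets them into the expected `"2021-01-01T00:00:00Z"` shape. This is a small sketch with made-up values, assuming your source timestamps are in UTC:

```python
import pandas as pd

# Hypothetical timestamps in a local "YYYY-MM-DD HH:MM:SS" format;
# MVAD expects ISO 8601, e.g. "2021-01-01T00:00:00Z".
ts = pd.to_datetime(["2021-01-01 00:00:00", "2021-01-01 00:01:00"])
iso = ts.strftime("%Y-%m-%dT%H:%M:%SZ")
print(list(iso))  # ['2021-01-01T00:00:00Z', '2021-01-01T00:01:00Z']
```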
In this demo, we are loading data from 3 sensors in a piece of equipment.
df = spark.read.format("csv").option("header", True).load("wasbs://mvadcsvdata@sparkdemostorage.blob.core.windows.net/spark-demo-data.csv")
df = df.withColumn("sensor_1", col("sensor_1").cast(DoubleType())) \
.withColumn("sensor_2", col("sensor_2").cast(DoubleType())) \
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
df.show(10)
Use the FitMultivariateAnomaly function to train an MVAD model.
#Specify information about your data.
startTime = "2021-01-01T00:00:00Z"
endTime = "2021-01-02T09:18:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
#Specify the container you created in your Storage account. You can also give a new name here, and SynapseML will create that container automatically.
containerName = "mvadtest"
#Set a folder name in Storage account to store the intermediate data.
intermediateSaveDir = "intermediateData"
estimator = (FitMultivariateAnomaly()
.setSubscriptionKey("Key of your Anomaly Detector")
#In .setLocation, use a lowercase region name, like: eastus.
.setLocation("Region of your Anomaly Detector")
.setStartTime(startTime)
.setEndTime(endTime)
.setContainerName(containerName)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
.setConnectionString("Connection String of your Storage Account"))
model = estimator.fit(df)
type(model)
Use the code block below to run inference on a later slice of the data.
# Specify the time range for inference task.
startInferenceTime = "2021-01-02T09:19:00Z"
endInferenceTime = "2021-01-03T01:59:00Z"
result = (model
.setStartTime(startInferenceTime)
.setEndTime(endInferenceTime)
.setOutputCol("results")
.setErrorCol("errors")
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.transform(df))
rdf = (result.select("timestamp",*inputColumns, "results.contributors", "results.isAnomaly", "results.severity")
.orderBy('timestamp', ascending=True)
.filter(col('timestamp') >= lit(startInferenceTime))
.toPandas())
def parse(x):
    if type(x) is list:
        # Each contributor comes as a (score, series) pair;
        # reverse each pair to build a {series: score} dict.
        return dict([item[::-1] for item in x])
    else:
        return {'series_0': 0, 'series_1': 0, 'series_2': 0}
rdf['contributors'] = rdf['contributors'].apply(parse)
rdf = pd.concat([rdf.drop(['contributors'], axis=1), pd.json_normalize(rdf['contributors'])], axis=1)
rdf
As a result, you get a dataframe containing the detection timestamps and the anomaly detection results. If a timestamp is anomalous, the severity will be a number above 0 and up to 1. The last three columns indicate the contribution score of each sensor; the larger the contribution score, the more anomalous that sensor is.
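A common follow-up is to pull out just the anomalous timestamps and name the sensor that contributed most to each one. The sketch below assumes a dataframe in the same shape as the result above (here built from made-up values so it runs standalone):

```python
import pandas as pd

# Hypothetical inference results in the same shape as the dataframe
# above: severity per timestamp plus per-sensor contribution scores.
rdf = pd.DataFrame(
    {
        "timestamp": ["2021-01-02T09:19:00Z", "2021-01-02T09:20:00Z"],
        "isAnomaly": [False, True],
        "severity": [0.0, 0.42],
        "series_0": [0.0, 0.1],
        "series_1": [0.0, 0.7],
        "series_2": [0.0, 0.2],
    }
)

# Keep only anomalous timestamps and record which sensor contributed
# most to each anomaly.
anomalies = rdf[rdf["isAnomaly"]].copy()
anomalies["top_sensor"] = anomalies[["series_0", "series_1", "series_2"]].idxmax(axis=1)
print(anomalies[["timestamp", "severity", "top_sensor"]])
```

This gives you a short worklist of timestamps and the sensor most likely responsible, which is usually what an operator wants to see first.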
You can plot the anomaly detection results using the code below.
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23,8))
plt.plot(rdf['timestamp'], rdf['sensor_1'], color='tab:orange', linestyle='solid', linewidth=2, label='sensor_1')
plt.plot(rdf['timestamp'], rdf['sensor_2'], color='tab:green', linestyle='solid', linewidth=2, label='sensor_2')
plt.plot(rdf['timestamp'], rdf['sensor_3'], color='tab:blue', linestyle='solid', linewidth=2, label='sensor_3')
plt.grid(axis='y')
plt.tick_params(axis='x', which='both', bottom=False, labelbottom=False)
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
# Highlight the anomalous timestamps with vertical red lines.
plt.vlines(np.where(anoms)[0], ymin=ymin, ymax=ymax, color='r', alpha=0.8)
plt.legend()
plt.title('A plot of the values from the three sensors with the detected anomalies highlighted in red.')
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23,1))
plt.tick_params(axis='x', which='both', bottom=False, labelbottom=False)
plt.plot(rdf['timestamp'], rdf['severity'], color='black', linestyle='solid', linewidth=2, label='Severity score')
plt.plot(rdf['timestamp'], [minSeverity]*len(rdf['severity']), color='red', linestyle='dotted', linewidth=1, label='minSeverity')
plt.grid(axis='y')
plt.legend()
plt.ylim([0,1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23,1))
plt.tick_params(axis='x', which='both', bottom=False, labelbottom=False)
plt.bar(rdf['timestamp'], rdf['series_0'], width=2, color='tab:orange', label='sensor_1')
plt.bar(rdf['timestamp'], rdf['series_1'], width=2, color='tab:green', label='sensor_2', bottom=rdf['series_0'])
plt.bar(rdf['timestamp'], rdf['series_2'], width=2, color='tab:blue', label='sensor_3', bottom=rdf['series_0']+rdf['series_1'])
plt.grid(axis='y')
plt.legend()
plt.ylim([0,1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
For real-time inference, you can split the inference code into another notebook and set a schedule like the one below, which will trigger the inference job automatically at the cadence you set.
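One way to make the scheduled notebook self-updating is to compute its inference window from the current time on every run, instead of hard-coding the start and end timestamps. This is a minimal sketch under assumptions: the 60-minute window length, the `inference_window` helper name, and the run cadence are all hypothetical and should match the schedule you configure.

```python
from datetime import datetime, timedelta, timezone

def inference_window(window_minutes=60, now=None):
    """Return (start, end) ISO 8601 strings covering the most recent
    window of data; window_minutes is a hypothetical default."""
    now = now or datetime.now(timezone.utc)
    # Truncate to whole minutes so the bounds align with the data grid.
    end = now.replace(second=0, microsecond=0)
    start = end - timedelta(minutes=window_minutes)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return start.strftime(fmt), end.strftime(fmt)

startInferenceTime, endInferenceTime = inference_window()
# Pass these to model.setStartTime(...) / model.setEndTime(...) exactly
# as in the batch inference example earlier in this blog.
```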
Resources