Azure AI services Blog

Announcing Multivariate Anomaly Detector in SynapseML

Louise_Han
Feb 06, 2022

Today, we are excited to announce a collaboration between Multivariate Anomaly Detector and SynapseML, which together provide a solution for developers and customers to do multivariate anomaly detection in Synapse. This new capability allows you to detect anomalies quickly and easily in very large datasets and databases, perfectly lighting up scenarios like equipment predictive maintenance. For those who are not familiar with predictive maintenance, it is a technique that uses data analysis tools to detect anomalies in operations and possible defects in equipment and processes, so customers can fix them before they result in failure. This new capability will therefore benefit customers who have large volumes of sensor data from hundreds of pieces of equipment and want to do equipment monitoring, anomaly detection, and even root cause analysis.

Introduction to Multivariate Anomaly Detector and SynapseML

Multivariate Anomaly Detector (MVAD) is an AI service in Cognitive Services that provides APIs enabling developers to easily integrate advanced AI for detecting anomalies from groups of sensor data, without the need for machine learning knowledge or labeled data. This service helps you proactively protect complex systems such as software applications, servers, factory machines, spacecraft, or even your business, from failures.

SynapseML v0.9.5 (previously MMLSpark) is an open-source library that aims to simplify the creation of massively scalable machine learning pipelines. SynapseML unifies several existing ML frameworks and new Microsoft algorithms in a single, scalable API that’s usable across Python, R, Scala, and Java.

Where to use this new capability?


SynapseML is open source and can be installed and used on any Spark 3 infrastructure, including your local machine, Databricks, Synapse Analytics, and others. You therefore have multiple ways to try out this capability.
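
For example, on your local machine the only setup needed is to point a new Spark session at the SynapseML package. Below is a minimal sketch of such a session, assuming PySpark is installed locally; the app name is arbitrary, and the package coordinates match the version used in this post:

from pyspark.sql import SparkSession

# Start a local Spark session that pulls SynapseML in from its Maven repository.
spark = (SparkSession.builder
    .appName("synapseml-local")  # any name works; this one is just for the sketch
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.9.5")
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    .getOrCreate())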

Can’t wait? Check out the detailed recipe for this capability: [Demo] Multivariate Anomaly Detector with SynapseML. Or read on below for a quicker overview.

Getting started is simple!

Let’s take Synapse Analytics as an example and dive into the following steps!

Create resources in Azure Portal

  1. Create an Anomaly Detector resource to get access to the Multivariate Anomaly Detector capability.
  2. Create an Azure Synapse Analytics resource to use Synapse Studio.
  3. Create a Storage account resource to upload your data for model training and anomaly detection.
  4. Create a Key Vault to hold the Anomaly Detector key and the Storage connection string.
    • Go to Key Vault > Access policies, and grant the Azure Synapse workspace MSI permission to read secrets from Azure Key Vault.
    • Create a secret in Key Vault to hold the Anomaly Detector key.
    • Create a secret in Key Vault to hold the connection string of the Storage account.

Log in to Azure Synapse Analytics and create a notebook

  1. Log in to Azure Synapse Analytics and create a new notebook for coding.
  2. Select ‘Manage pools’ to create a new Apache Spark pool if you don’t have one.


Start coding

1. Install the latest version of SynapseML

You can also install SynapseML via Spark Packages, Databricks, Docker, etc.; please refer to the SynapseML homepage. For a Spark 3.2 pool, use the following configuration:

%%configure -f
{
  "name": "synapseml",
  "conf": {
      "spark.jars.packages": " com.microsoft.azure:synapseml_2.12:0.9.5 ",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,io.netty:netty-tcnative-boringssl-static",
      "spark.yarn.user.classpath.first": "true"
  }
}


2. Load your data

Load your data for training, which is a CSV file stored in a blob or elsewhere. You could use our sample data (download here).

Compose your data in the following format in a CSV file, and upload it to cloud storage such as Blob Storage. The timestamp column should be in ISO 8601 format, and the feature columns should be integers or decimals with any number of decimal places.
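
For reference, a few illustrative rows of such a CSV could look like this (the column names match the sample data used throughout this post; the values are made up):

timestamp,feature0,feature1,feature2
2021-01-01T00:00:00Z,1.02,0.33,5.71
2021-01-01T00:01:00Z,0.98,0.41,5.68

The training and inference code below assume the data has been loaded into a Spark dataframe named df. Here is a minimal loading sketch; the storage path is a placeholder, so substitute your own account, container, and file name:

# `spark` is the session that Synapse notebooks predefine.
dataPath = "wasbs://[container_name]@[storage_account].blob.core.windows.net/sample_data.csv"

df = (spark.read.format("csv")
      .option("header", "true")       # first row holds the column names
      .option("inferSchema", "true")  # parse timestamps and numeric features
      .load(dataPath))

df.show(5)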


3. Train the model

Use the FitMultivariateAnomaly function to train an MVAD model.

from notebookutils import mssparkutils
from synapse.ml.cognitive import *

# Retrieve the Anomaly Detector key and the Storage connection string from Key Vault.
anomalyKey = mssparkutils.credentials.getSecret("[key_vault_name]", "[Secret_name]")
connectionString = mssparkutils.credentials.getSecret("[key_vault_name]", "[Connectionstring_name]")

# Specify information about your data.
startTime = "2021-01-01T00:00:00Z"
endTime = "2021-01-02T09:18:00Z"
timestampColumn = "timestamp"
inputColumns = ["feature0", "feature1", "feature2"]
# Specify the container you created in your Storage account. You can also use a new
# name here, and Synapse will create that container for you automatically.
containerName = "[container_name]"
# Set a folder name in the Storage account to store the intermediate data.
intermediateSaveDir = "[intermediate_folder]"

simpleMultiAnomalyEstimator = (FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    # In .setLocation, use a lowercase region name, e.g. southcentralus.
    .setLocation("[anomaly_detector_resource_region]")
    .setOutputCol("[anomaly_result_column]")
    .setStartTime(startTime)
    .setEndTime(endTime)
    .setContainerName(containerName)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
    .setConnectionString(connectionString))

model = simpleMultiAnomalyEstimator.fit(df)
type(model)


4. Inference

Use the code block below to run inference on the remaining part of the data.

startInferenceTime = "2021-01-02T09:19:00Z"
endInferenceTime = "2021-01-03T01:59:00Z"
result = (model
      .setStartTime(startInferenceTime)
      .setEndTime(endInferenceTime)
      .setOutputCol("result")
      .setTimestampCol(timestampColumn)
      .setInputCols(inputColumns)
      .transform(df))

display(result)


To show the results only for the inferred data, let's select the columns we need. We can then order the rows in the dataframe by ascending timestamp, and filter the result to only show the rows that are in the range of the inference window. In our case endInferenceTime is the same as the last row in the dataframe, so we can ignore that.

Finally, to be able to better plot the results, let's convert the Spark dataframe to a Pandas dataframe.


from pyspark.sql.functions import col, lit

rdf = (result.select("timestamp", *inputColumns, "results.contributors", "results.isAnomaly", "results.severity")
    .orderBy('timestamp', ascending=True)
    .filter(col('timestamp') >= lit(startInferenceTime))
    .toPandas())
rdf


Let's now format the contributors column, which stores the contribution score from each sensor to the detected anomalies. The next cell parses this data and splits the contribution score of each sensor into its own column.

import pandas as pd

def parse(x):
    # Each contributor entry is a (score, series) pair; flip it so that the
    # series name becomes the key and its contribution score the value.
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {'series_0': 0, 'series_1': 0, 'series_2': 0}

rdf['contributors'] = rdf['contributors'].apply(parse)
# Drop the raw column and expand the parsed scores into one column per series.
rdf = pd.concat([rdf.drop(['contributors'], axis=1), pd.json_normalize(rdf['contributors'])], axis=1)
rdf


After formatting the contributors column, you get a dataframe like this:
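
With each series' contribution score in its own column, plotting the detection results is straightforward. The following is a minimal sketch, assuming the column names from the sample data above, that draws one feature over time and highlights the timestamps flagged as anomalous:

import matplotlib.pyplot as plt

# Rows the service flagged as anomalous.
anomalies = rdf[rdf['isAnomaly']]

plt.figure(figsize=(12, 4))
plt.plot(rdf['timestamp'], rdf['feature0'], label='feature0')
plt.scatter(anomalies['timestamp'], anomalies['feature0'], color='red', label='anomaly')
plt.legend()
plt.title('Detected anomalies on feature0')
plt.show()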


Resources

About Anomaly Detector

About Synapse

  • Hi yogiguru, could you try out this code? We fixed some bugs in the previous version. Feel free to email me directly with any further questions: jingruhan@microsoft.com

    %%configure -f
    {
      "name": "synapseml",
      "conf": {
          "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.5-13-d1b51517-SNAPSHOT",
          "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
          "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12",
          "spark.yarn.user.classpath.first": "true"
      }
    }


  • yogiguru
    Copper Contributor

    Nice article and very useful. I am following the instructions but facing an issue while creating the Spark pool: in Azure we can only create 2.4 and 3.1 pools, and 3.2 is not allowed.

    I created a pool for 3.1 and am using the SynapseML code below, but I'm getting the error "NameError: name 'FitMultivariateAnomaly' is not defined".

    Please guide me.

    Thanks for your support!


    %%configure -f
    {
      "name": "synapseml",
      "conf": {
          "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.4",
          "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
          "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12",
          "spark.yarn.user.classpath.first": "true"
      }
    }


  • Hi yogiguru, if you're using a Spark 3.1 pool, please use the following code. Let me know if that works, thanks!

    %%configure -f
    {
      "name": "synapseml",
      "conf": {
          "spark.jars.packages": " com.microsoft.azure:synapseml_2.12:0.9.5-10-47f0c8fc-SNAPSHOT",
          "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
          "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,io.netty:netty-tcnative-boringssl-static",
          "spark.yarn.user.classpath.first": "true"
      }
    }
    


  • yogiguru
    Copper Contributor

    Thanks Louise_Han! It works, and I have gotten as far as step 4 (Inference). Thanks for sharing your email, and I will connect.

  • itzhakhaf
    Copper Contributor

    Hello, I am trying to follow this guide and am getting an error in the training section:


    IllegalArgumentException                  Traceback (most recent call last)
    <ipython-input-41-30e01202> in <module>
         28 )
         29
    ---> 30 model = simpleMultiAnomalyEstimator.fit(df)
         31 type(model)

    /opt/spark/python/lib/pyspark.zip/pyspark/ml/base.py in fit(self, dataset, params)
        159                 return self.copy(params)._fit(dataset)
        160             else:
    --> 161                 return self._fit(dataset)
        162         else:
        163             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

    /mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1675178823889_0001/container_1675178823889_0001_01_000001/com.microsoft.azure_synapseml-cognitive_2.12-0.9.5-13-d1b51517-SNAPSHOT.jar/synapse/ml/cognitive/FitMultivariateAnomaly.py in _fit(self, dataset)
        653     def _fit(self, dataset):
    --> 654         java_model = self._fit_java(dataset)
        655         return self._create_model(java_model)

    /opt/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py in _fit_java(self, dataset)
        331         self._transfer_params_to_java()
    --> 332         return self._java_obj.fit(dataset._jdf)

    ~/cluster-env/env/lib/python3.8/site-packages/py4j/java_gateway.py in __call__(self, *args)
       1303         answer = self.gateway_client.send_command(command)
    -> 1304         return_value = get_return_value(
       1305             answer, self.gateway_client, self.target_id, self.name)

    /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
        115             # Hide where the exception came from that shows a non-Pythonic
        116             # JVM exception message.
    --> 117             raise converted from None

    IllegalArgumentException: Invalid connection string.

    Basically, at the line
    model = simpleMultiAnomalyEstimator.fit(df)
    I get "Invalid connection string".

    Any thoughts?