Azure Data Explorer Blog
Advanced Time Series Anomaly Detector in Fabric

adieldar, Microsoft
Aug 22, 2024

Introduction

Anomaly Detector, one of the Azure AI services, enables you to monitor and detect anomalies in your time series data. The service is based on advanced algorithms, SR-CNN for univariate analysis and MTAD-GAT for multivariate analysis, and is being retired by October 2026. In this blog post we lay out a migration strategy to Microsoft Fabric that lets you detect the same anomalies, using the same algorithms as the old service, and more. Here are a few benefits of this strategy:

  • Easier management of the trained models’ lifecycle using Fabric ML.
  • No need to upload your data to an external storage account; just stream your data to a Fabric Eventhouse with OneLake availability and use it for training and scoring.
  • You can use your data with any Fabric engine (KQL DB, Fabric ML Notebook, PBI and more).
  • The algorithms are open source and published in the new time-series-anomaly-detector package on PyPI, so you can review and tweak them as needed.

Time Series Anomaly Detection in Fabric RTI

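There are a few options for time series anomaly detection in Fabric RTI (Real-Time Intelligence): a native KQL function for univariate analysis at scale, standard multivariate analysis techniques, and the algorithms implemented in the time-series-anomaly-detector package.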

Using time-series-anomaly-detector in Fabric

In the following example we shall:

  • Upload a stock price change table to Fabric
  • Train the multivariate anomaly detection model in a Python notebook using the Spark engine
  • Predict anomalies by applying the trained model to new data using the Eventhouse (Kusto) engine

Note that for the univariate model there is no need to train the model in a separate step (as the training is fast and done internally) and we can just predict.

Below we briefly present the steps; see Multivariate anomaly detection - Microsoft Fabric | Microsoft Learn for the detailed tutorial.

 

Creating the environments

  1. Create a Workspace
  2. Create Eventhouse – to store the incoming streaming data
    • Enable OneLake availability – so that older data ingested into the Eventhouse can be seamlessly accessed by the Spark notebook for training the anomaly detection model
    • Enable the KQL Python plugin – to be used for real-time prediction of anomalies on the new streaming data. Select the 3.11.7 DL image, which contains the time-series-anomaly-detector package
  3. Create a Spark environment that includes the time-series-anomaly-detector package

Training & storing the Anomaly Detection model

  1. Upload the stocks data to the Eventhouse
  2. Create a notebook to train the model
  • Load the data from the Eventhouse using the OneLake path:

 

 

onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI 
abfss_uri = convert_onelake_to_abfss(onelake_uri)
df = spark.read.format('delta').load(abfss_uri)
df = df.toPandas().set_index('Date')
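
The loading snippet above calls convert_onelake_to_abfss(), which is not defined in this post. A minimal sketch of such a helper follows, assuming the OneLake table URI has the form https://<host>/<workspace>/<item path>. The example URI and its segments are hypothetical placeholders, and the detailed tutorial may define the helper differently.

```python
def convert_onelake_to_abfss(onelake_uri: str) -> str:
    """Rewrite https://<host>/<workspace>/<path> as abfss://<workspace>@<host>/<path>.

    Assumption: the first path segment names the workspace and the rest is the
    path to the table inside the item (for example .../Tables/<table>).
    """
    scheme = "https://"
    if not onelake_uri.startswith(scheme):
        raise ValueError("OneLake URI must start with 'https://'")

    parts = onelake_uri[len(scheme):].split("/")
    # Need at least host, workspace and one path segment, none of them empty.
    if len(parts) < 3 or not all(parts[:3]):
        raise ValueError("Expected https://<host>/<workspace>/<path>")

    host, workspace = parts[0], parts[1]
    path = "/".join(parts[2:])
    return f"abfss://{workspace}@{host}/{path}"


# Hypothetical URI, for illustration only.
example_uri = "https://onelake.dfs.fabric.microsoft.com/my-workspace/my-item/Tables/demo_stocks_change"
print(convert_onelake_to_abfss(example_uri))
```

The helper only rearranges the string; it does not check that the workspace or table exists, so a wrong URI surfaces later, when Spark tries to load the Delta table.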

 

 

 

  • View the data:

 

 

import plotly.graph_objects as go

fig = go.Figure()
fig.add_trace(go.Scatter(x=df.index, y=df['AAPL'], mode='lines', name='AAPL'))
fig.add_trace(go.Scatter(x=df.index, y=df['AMZN'], mode='lines', name='AMZN'))
fig.add_trace(go.Scatter(x=df.index, y=df['GOOG'], mode='lines', name='GOOG'))
fig.add_trace(go.Scatter(x=df.index, y=df['MSFT'], mode='lines', name='MSFT'))
fig.add_trace(go.Scatter(x=df.index, y=df['SPY'], mode='lines', name='SPY'))
fig.update_layout(
    title='Stock Prices change',
    xaxis_title='Date',
    yaxis_title='Change %',
    legend_title='Tickers'
)

fig.show()

 

 

 

  • Prepare the data for training:

 

 

import pandas as pd

features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
cutoff_date = pd.to_datetime('2023-01-01')
# 'Date' became the index when the data was loaded, so filter on the index
train_df = df[df.index < cutoff_date]

 

 

  • Train the model:

 

 

import mlflow
from anomaly_detector import MultivariateAnomalyDetector
model = MultivariateAnomalyDetector()
sliding_window = 200
params = {"sliding_window": sliding_window}
model.fit(train_df, params=params)

 

 

  • Save the model in Fabric ML model registry

 

 

with mlflow.start_run():
    mlflow.log_params(params)
    mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")

    model_info = mlflow.pyfunc.log_model(
        python_model=model,
        artifact_path="mvad_artifacts",
        registered_model_name="mvad_5_stocks_model",
    )

 

 

 

  • Extract the model path (to be used by the Eventhouse for the prediction):

 

 

mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
model_abfss = mi.latest_versions[0].source
print(model_abfss)

 

 

  3. Create a Queryset and attach the Eventhouse to it
    • Run the ‘.create-or-alter function’ query to define the predict_fabric_mvad_fl() stored function:

 

 

.create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
{
    let s = artifacts_uri;
    let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
                             'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
                             'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
    let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
    let code = ```if 1:
        import os
        import shutil
        import mlflow
        model_dir = 'C:/Temp/mvad_model'
        model_data_dir = model_dir + '/data'
        os.mkdir(model_dir)
        shutil.move('C:/Temp/MLmodel', model_dir)
        shutil.move('C:/Temp/conda.yaml', model_dir)
        shutil.move('C:/Temp/requirements.txt', model_dir)
        shutil.move('C:/Temp/python_env.yaml', model_dir)
        shutil.move('C:/Temp/python_model.pkl', model_dir)
        features_cols = kargs["features_cols"]
        trim_result = kargs["trim_result"]
        test_data = df[features_cols]
        model = mlflow.pyfunc.load_model(model_dir)
        predictions = model.predict(test_data)
        predict_result = pd.DataFrame(predictions)
        samples_offset = len(df) - len(predict_result)        # this model doesn't output predictions for the first sliding_window-1 samples
        if trim_result:                                       # trim the prefix samples
            result = df[samples_offset:]
            result.iloc[:,-4:] = predict_result.iloc[:, 1:]   # no need to copy 1st column which is the timestamp index
        else:
            result = df                                       # output all samples
            result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
        ```;
    samples
    | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
}

 

 

  • Run the prediction query, which detects multivariate anomalies on the 5 stocks based on the trained model and renders the result as an anomalychart. Note that the anomalous points are rendered on the first stock (AAPL), though they represent multivariate anomalies, i.e. anomalies of the vector of the 5 stocks on that date.

 

 

let cutoff_date=datetime(2023-01-01);
let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count);   //  number of latest points to predict
let sliding_window=200;                                                                 //  should match the window that was set for model training
let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
let num_samples = prefix_score_len + num_predictions;
demo_stocks_change
| top num_samples by Date desc 
| order by Date asc
| extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
| invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
            // NOTE: Update artifacts_uri to model path
            artifacts_uri='enter your model URI here',
            trim_result=true)
| summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
| render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changes in % with Anomalies')
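
The sample-count arithmetic in the query above can be checked with a small sketch. It mirrors the KQL expression for prefix_score_len in Python, assuming that KQL divides two long values as integers. The value num_predictions = 50 is made up for illustration; in the query it is counted from the table.

```python
def prefix_score_len(sliding_window: int) -> int:
    """Python mirror of: sliding_window/2 + min_of(sliding_window/2, 200) - 1.

    Assumption: KQL performs integer division on long values, hence '//'.
    """
    half = sliding_window // 2
    return half + min(half, 200) - 1


def num_samples(sliding_window: int, num_predictions: int) -> int:
    """Rows to feed the model: the scoring prefix plus the rows to predict."""
    return prefix_score_len(sliding_window) + num_predictions


sliding_window = 200      # same window as in training
num_predictions = 50      # hypothetical number of rows on or after the cutoff date

prefix = prefix_score_len(sliding_window)
total = num_samples(sliding_window, num_predictions)
print(f"prefix rows: {prefix}, rows passed to the model: {total}")
```

For the window of 200 used here the prefix is 199 rows, which equals sliding_window - 1, the number of leading samples for which the model outputs no predictions according to the comment in predict_fabric_mvad_fl(). With trim_result=true one would therefore expect the query to return num_predictions rows.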

 

 

 

 

Summary

The addition of the time-series-anomaly-detector package to Fabric makes it a strong platform for univariate and multivariate time series anomaly detection. Choose the anomaly detection method that best fits your scenario: from the native KQL function for univariate analysis at scale, through standard multivariate analysis techniques, up to the best-of-breed time series anomaly detection algorithms implemented in the time-series-anomaly-detector package. For more information, see the overview and tutorial.

Updated Aug 29, 2024
Version 3.0