Introduction
Updated: 12 Oct 2020
Machine Learning is widely used these days for various data driven tasks including detection of security threats, monitoring IoT devices for predictive maintenance, recommendation systems, financial analysis and many other domains. Most ML models are built and deployed in two steps:
- Offline training
- Real time scoring
ML Training is done by researchers/data scientists. They fetch the training data, clean it, engineer features, try different models and tune parameters, repeating this cycle to improve the model’s quality and accuracy. This process is usually done using data science tools such as Jupyter, PyCharm, VS Code, Matlab etc. Once the model meets the required quality it is serialized and saved for scoring.
ML Scoring is the process of applying the model on new data to get insights and predictions. This is actually the business goal for building the model. Scoring usually needs to be done at scale with minimal latency, processing large sets of new records. For ADX users, the best solution for scoring is directly in ADX. ADX scoring is done on its compute nodes, in distributed manner near the data, thus achieving the best performance with minimal latency.
There are many types of models such as Bayesian models, decision trees and forests, regressions, deep neural networks and many more. These models can be built by various frameworks and/or packages like Scikit-learn, Tensorflow, CNTK, Keras, Caffe2, PyTorch etc. (here is a nice overview of ML algorithms, tools and frameworks). On one hand this variety is very good – you can find the most convenient algorithm and framework for your scenario, but on the other hand it creates an interoperability issue, as usually the ML scoring is done on infrastructure which is different from the one used for the training.
To resolve it, Microsoft and Facebook introduced in 2017 ONNX, Open Neural Network Exchange, that was adopted by many companies including AWS, IBM, Intel, Baidu, Mathworks, NVIDIA and many more. ONNX is a system for representation and serialization of ML models to a common file format. This format enables smooth switching among ML frameworks as well as allowing hardware vendors and others to improve the performance of deep neural networks for multiple frameworks at once by targeting the ONNX representation.
In this blog we explain how ADX can consume ONNX models, that were built and trained externally, for near real time scoring of new samples that are ingested into ADX.
How to use ADX for scoring ONNX models
ADX supports running Python code embedded in Kusto Query Language (KQL) using the python() plugin. The Python code is run in multiple sandboxes on ADX existing compute nodes. The Python image is based on Anaconda distribution and contains the most common ML frameworks including Scikit-learn, TensorFlow, Keras and PyTorch. To score ONNX models in ADX follow these steps:
- Develop your ML model using your favorite framework and tools
- Convert the final trained model to ONNX format
- Export the ONNX model to a table on ADX or to an Azure blob
- Score new data in ADX using the inline python() plugin
Example
We build a model to predict room occupancy based on Occupancy Detection data, a public dataset from UCI Repository. This model is a binary classifier to predict occupied/empty room based on Temperature, Humidity, Light and CO2 sensors measurements. The complete process can be found in this Jupyter notebook. Here we embed few snips just to present the main concepts
Prerequisite
- Enable Python plugin on your ADX cluster (see the Onboarding section of the python() plugin doc)
- Whitelist a blob container to be accessible by ADX Python sandbox (see the Appendix section of that doc)
- Create a Python environment (conda or virtual env) that reflects the Python sandbox image
- Install in that environment ONNX packages: onnxruntime and skl2onnx packages
- Install in that environment Azure Blob Storage package: azure-storage-blob
- Install KqlMagic to easily connect and query ADX cluster from Jupyter notebooks
Retrieve and explore the data using KqlMagic
reload_ext Kqlmagic
%config Kqlmagic.auto_dataframe = True
%kql kusto://code;cluster='demo11.westus';database='ML' -try_azcli_login
%kql df << OccupancyDetection
df[-4:]
Timestamp |
Temperature |
Humidity |
Light |
CO2 |
HumidityRatio |
Occupancy |
Test |
|
20556 |
2015-02-18 09:16:00.0000000 |
20.865 |
27.7450 |
423.50 |
1514.5 |
0.004230 |
True |
True |
20557 |
2015-02-18 09:16:00.0000000 |
20.890 |
27.7450 |
423.50 |
1521.5 |
0.004237 |
True |
True |
20558 |
2015-02-18 09:17:00.0000000 |
20.890 |
28.0225 |
418.75 |
1632.0 |
0.004279 |
True |
True |
20559 |
2015-02-18 09:19:00.0000000 |
21.000 |
28.1000 |
409.00 |
1864.0 |
0.004321 |
True |
True |
Train your model
Split the data to features (x), labels (y) and for training/testing:
train_x = df[df['Test'] == False][['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio']]
train_y = df[df['Test'] == False]['Occupancy']
test_x = df[df['Test'] == True][['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio']]
test_y = df[df['Test'] == True]['Occupancy']
print(train_x.shape, train_y.shape, test_x.shape, test_y.shape)
(8143, 5) (8143,) (12417, 5) (12417,)
Train few classic models from Scikit-learn:
from sklearn import tree
from sklearn import neighbors
from sklearn import naive_bayes
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
#four classifier types
clf1 = tree.DecisionTreeClassifier()
clf2 = LogisticRegression(solver='liblinear')
clf3 = neighbors.KNeighborsClassifier()
clf4 = naive_bayes.GaussianNB()
clf1 = clf1.fit(train_x, train_y)
clf2 = clf2.fit(train_x, train_y)
clf3 = clf3.fit(train_x, train_y)
clf4 = clf4.fit(train_x, train_y)
# Accuracy on Testing set
for clf, label in zip([clf1, clf2, clf3, clf4], ['Decision Tree', 'Logistic Regression', 'K Nearest Neighbour', 'Naive Bayes']):
scores = cross_val_score(clf, test_x, test_y, cv=5, scoring='accuracy')
print("Accuracy: %0.4f (+/- %0.4f) [%s]" % (scores.mean(), scores.std(), label))
Accuracy: 0.8605 (+/- 0.1130) [Decision Tree]
Accuracy: 0.9887 (+/- 0.0071) [Logistic Regression]
Accuracy: 0.9656 (+/- 0.0224) [K Nearest Neighbour]
Accuracy: 0.8893 (+/- 0.1265) [Naive Bayes]
The logistic regression model is the best one
Convert the model to ONNX
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
# We define the input type (5 sensors readings), convert the scikit-learn model to ONNX and serialize it
initial_type = [('float_input', FloatTensorType([None, 5]))]
onnx_model = convert_sklearn(clf2, initial_types=initial_type)
bmodel = onnx_model.SerializeToString()
Test ONNX Model
Predict using ONNX runtime
import numpy as np
import onnxruntime as rt
sess = rt.InferenceSession(bmodel)
input_name = sess.get_inputs()[0].name
label_name = sess.get_outputs()[0].name
pred_onnx = sess.run([label_name], {input_name: test_x.values.astype(np.float32)})[0]
# Verify ONNX and Scikit-learn predictions are same
pred_clf2 = clf2.predict(test_x)
diff_num = (pred_onnx != pred_clf2).sum()
if diff_num:
print(f'Predictions difference between sklearn and onnxruntime, total {diff_num} elements differ')
else:
print('Same prediction using sklearn and onnxruntime')
Same prediction using sklearn and onnxruntime
Scoring in ADX
There are 2 options for retrieving the model for scoring:
- serialize the model to a string to be stored in a standard table in ADX
- copy the model to a blob container (that was previously whitelisted for access by ADX Python sandbox)
Scoring from a serialized model which is stored in ADX table
Serializing the model and store it in ADX models table using KqlMagic
import pandas as pd
import datetime
models_tbl = 'ML_Models'
model_name = 'ONNX-Occupancy'
smodel = bmodel.hex()
now = datetime.datetime.now()
dfm = pd.DataFrame({'name':[model_name], 'timestamp':[now], 'model':[smodel]})
dfm
name |
timestamp |
model |
|
0 |
ONNX-Occupancy |
2020-07-28 17:07:20.280040 |
08031208736b6c326f6e6e781a05312e362e3122076169... |
set_query = '''
.set-or-append {0} <|
let tbl = dfm;
tbl
'''.format(models_tbl)
print(set_query)
.set-or-append ML_Models <|
let tbl = dfm;
tbl
%kql -query set_query
ExtentId |
OriginalSize |
ExtentSize |
CompressedSize |
IndexSize |
RowCount |
|
0 |
bfc9acc2-3d79-4e64-9a79-d2681547e43d |
1430.0 |
1490.0 |
1040.0 |
450.0 |
1 |
Scoring from serialized model which is stored in ADX table
# NOTE:
#
# We run ADX scoring query here using KqlMagic by embedding the query from Kusto Explorer
# with r'''Kusto Explorer query''':
scoring_from_table_query = r'''
let predict_onnx_fl=(samples:(*), models_tbl:(name:string, timestamp:datetime, model:string), model_name:string, features_cols:dynamic, pred_col:string)
{
let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
let kwargs = pack('smodel', model_str, 'features_cols', features_cols, 'pred_col', pred_col);
let code =
'\n'
'import pickle\n'
'import binascii\n'
'\n'
'smodel = kargs["smodel"]\n'
'features_cols = kargs["features_cols"]\n'
'pred_col = kargs["pred_col"]\n'
'bmodel = binascii.unhexlify(smodel)\n'
'\n'
'features_cols = kargs["features_cols"]\n'
'pred_col = kargs["pred_col"]\n'
'\n'
'import onnxruntime as rt\n'
'sess = rt.InferenceSession(bmodel)\n'
'input_name = sess.get_inputs()[0].name\n'
'label_name = sess.get_outputs()[0].name\n'
'df1 = df[features_cols]\n'
'predictions = sess.run([label_name], {input_name: df1.values.astype(np.float32)})[0]\n'
'\n'
'result = df\n'
'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])'
'\n'
;
samples | evaluate python(typeof(*), code, kwargs)
};
OccupancyDetection
| where Test == 1
| extend pred_Occupancy=bool(0)
| invoke predict_onnx_fl(ML_Models, 'ONNX-Occupancy', pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
'''
%kql pred_df << -query scoring_from_table_query
pred_df[-4:]
Timestamp |
Temperature |
Humidity |
Light |
CO2 |
HumidityRatio |
Occupancy |
Test |
pred_Occupancy |
|
12413 |
2015-02-18 09:16:00+00:00 |
20.865 |
27.7450 |
423.50 |
1514.5 |
0.004230 |
True |
True |
True |
12414 |
2015-02-18 09:16:00+00:00 |
20.890 |
27.7450 |
423.50 |
1521.5 |
0.004237 |
True |
True |
True |
12415 |
2015-02-18 09:17:00+00:00 |
20.890 |
28.0225 |
418.75 |
1632.0 |
0.004279 |
True |
True |
True |
12416 |
2015-02-18 09:19:00+00:00 |
21.000 |
28.1000 |
409.00 |
1864.0 |
0.004321 |
True |
True |
True |
print('Confusion Matrix')
pred_df.groupby(['Occupancy', 'pred_Occupancy']).size()
Confusion Matrix
Occupancy pred_Occupancy
False False 9284
True 112
True False 15
True 3006
For further details have a look at predict_onnx_fl() in ADX functions library
Scoring from model which is stored in blob storage
Copy the model to blob
Note again that the blob container should be whitelisted to be accessible by ADX Python sandbox
from azure.storage.blob import BlobClient
conn_str = "BlobEndpoint=https://artifcatswestus.blob.core.windows.net/kusto;SharedAccessSignature=?**** YOUR SAS KEY ****"
blob_client = BlobClient.from_connection_string(conn_str, container_name="ONNX", blob_name="room_occupancy.onnx")
res = blob_client.upload_blob(bmodel, overwrite=True)
# Notes:
#
# Replace the string "**** YOUR SAS KEY ****" below with the real SAS
# We run ADX scoring query here using KqlMagic by embedding the query from Kusto Explorer
# with r'''Kusto Explorer query''':
scoring_from_blob_query = r'''
let predict_onnx_blob_fl=(samples:(*), model_sas:string, features_cols:dynamic, pred_col:string)
{
let kwargs = pack('features_cols', features_cols, 'pred_col', pred_col);
let code =
'\n'
'features_cols = kargs["features_cols"]\n'
'pred_col = kargs["pred_col"]\n'
'\n'
'import onnxruntime as rt\n'
'sess = rt.InferenceSession(r"C:\\Temp\\model.onnx")\n'
'input_name = sess.get_inputs()[0].name\n'
'label_name = sess.get_outputs()[0].name\n'
'df1 = df[features_cols]\n'
'predictions = sess.run([label_name], {input_name: df1.values.astype(np.float32)})[0]\n'
'\n'
'result = df\n'
'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])'
'\n'
;
samples | evaluate python(typeof(*), code, kwargs,
external_artifacts=pack('model.onnx', model_sas))
};
OccupancyDetection
| where Test == 1
| extend pred_Occupancy=bool(0)
| invoke predict_onnx_blob_fl('https://artifcatswestus.blob.core.windows.net/kusto/ONNX/room_occupancy.onnx?**** YOUR SAS KEY ****',
pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
'''
%kql pred_df << -query scoring_from_blob_query
pred_df[-4:]
Timestamp |
Temperature |
Humidity |
Light |
CO2 |
HumidityRatio |
Occupancy |
Test |
pred_Occupancy |
|
12413 |
2015-02-18 09:16:00+00:00 |
20.865 |
27.7450 |
423.50 |
1514.5 |
0.004230 |
True |
True |
True |
12414 |
2015-02-18 09:16:00+00:00 |
20.890 |
27.7450 |
423.50 |
1521.5 |
0.004237 |
True |
True |
True |
12415 |
2015-02-18 09:17:00+00:00 |
20.890 |
28.0225 |
418.75 |
1632.0 |
0.004279 |
True |
True |
True |
12416 |
2015-02-18 09:19:00+00:00 |
21.000 |
28.1000 |
409.00 |
1864.0 |
0.004321 |
True |
True |
True |
print('Confusion Matrix')
pred_df.groupby(['Occupancy', 'pred_Occupancy']).size()
Confusion Matrix
Occupancy pred_Occupancy
False False 9284
True 112
True False 15
True 3006
Summary
In this tutorial we showed how to train a model in Scikit-learn, convert it to ONNX format and export it to ADX for scoring. This workflow is convenient as
- Training can be done on any hardware platform, using any framework supporting ONNX
- Scoring is done in ADX near the data, on the existing compute nodes, enabling near real time processing of big amounts of new data. There is no need to export the data to external scoring service and import back the results. Consequently, scoring architecture is simpler and performance is much faster and scalable
Updated Oct 12, 2020
Version 15.0adieldar
Microsoft
Joined March 26, 2019
Azure Data Explorer Blog
Follow this blog board to get notified when there's new activity