Updated: 12 Oct 2020
Machine learning is widely used these days for various data-driven tasks, including detection of security threats, monitoring IoT devices for predictive maintenance, recommendation systems, financial analysis and many other domains. Most ML models are built and deployed in two steps:
ML Training is done by researchers and data scientists. They fetch the training data, clean it, engineer features, try different models and tune parameters, repeating this cycle to improve the model's quality and accuracy. This process is usually done using data science tools such as Jupyter, PyCharm, VS Code, MATLAB etc. Once the model meets the required quality, it is serialized and saved for scoring.
ML Scoring is the process of applying the model to new data to get insights and predictions; this is the actual business goal of building the model. Scoring usually needs to be done at scale with minimal latency, processing large sets of new records. For Azure Data Explorer (ADX) users, the best solution is to score directly in ADX: scoring runs on ADX's compute nodes, in a distributed manner near the data, which achieves the best performance with minimal latency.
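As a conceptual sketch only (it does not describe how ADX schedules queries internally), scoring distributes well because each record is scored independently: every node can apply the same model to its own partition of the data, and the partial results are simply concatenated. Here threads stand in for compute nodes, and `score_row` is a hypothetical threshold rule, not a trained model.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a trained model: flags a room as occupied
# when the light reading exceeds a fixed threshold (illustration only).
def score_row(row):
    return row["Light"] > 300.0

def score_partition(rows):
    # Each "node" applies the same model to its own slice of the data.
    return [score_row(r) for r in rows]

def partitioned_score(rows, n_partitions=4):
    # Split the records into contiguous partitions (ceiling division for the size).
    size = max(1, -(-len(rows) // n_partitions))
    parts = [rows[i:i + size] for i in range(0, len(rows), size)]
    with ThreadPoolExecutor(max_workers=n_partitions) as pool:
        results = pool.map(score_partition, parts)  # map() preserves partition order
    # Concatenate the per-partition predictions back into one list.
    return [p for part in results for p in part]

data = [{"Light": v} for v in (0.0, 423.5, 418.75, 12.0, 409.0)]
print(partitioned_score(data, n_partitions=2))  # [False, True, True, False, True]
```

Because no partition depends on another, adding nodes (or threads, in this sketch) changes only how the work is split, not the predictions.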
There are many types of models, such as Bayesian models, decision trees and forests, regressions, deep neural networks and many more. These models can be built with various frameworks and packages like Scikit-learn, TensorFlow, CNTK, Keras, Caffe2, PyTorch etc. (here is a nice overview of ML algorithms, tools and frameworks). On one hand this variety is very good – you can find the most convenient algorithm and framework for your scenario – but on the other hand it creates an interoperability issue, as ML scoring is usually done on infrastructure that differs from the one used for training.
To resolve this, in 2017 Microsoft and Facebook introduced ONNX (Open Neural Network Exchange), which was adopted by many companies including AWS, IBM, Intel, Baidu, MathWorks, NVIDIA and many more. ONNX is a system for representing and serializing ML models in a common file format. This format enables smooth switching among ML frameworks, and allows hardware vendors and others to improve the performance of deep neural networks for multiple frameworks at once by targeting the ONNX representation.
In this blog post we explain how ADX can consume ONNX models that were built and trained externally, for near real-time scoring of new samples ingested into ADX.
How to use ADX for scoring ONNX models
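ADX supports running Python code embedded in Kusto Query Language (KQL) using the python() plugin. The Python code runs in multiple sandboxes on ADX's existing compute nodes. The Python image is based on the Anaconda distribution and contains the most common ML frameworks, including Scikit-learn, TensorFlow, Keras and PyTorch. To score ONNX models in ADX, follow these steps: train the model with your preferred framework, convert it to ONNX, store the serialized model in an ADX table or in Azure blob storage, and score new data with the python() plugin.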
As an example, we build a model to predict room occupancy based on the Occupancy Detection data, a public dataset from the UCI Machine Learning Repository. This model is a binary classifier that predicts whether a room is occupied or empty based on temperature, humidity, light and CO2 sensor measurements. The complete process can be found in this Jupyter notebook; here we embed a few snippets to present the main concepts.
%reload_ext Kqlmagic
%config Kqlmagic.auto_dataframe = True
%kql kusto://code;cluster='demo11.westus';database='ML' -try_azcli_login
%kql df << OccupancyDetection
df[-4:]
      | Timestamp                   | Temperature | Humidity | Light  | CO2    | HumidityRatio | Occupancy | Test
20556 | 2015-02-18 09:16:00.0000000 | 20.865      | 27.7450  | 423.50 | 1514.5 | 0.004230      | True      | True
20557 | 2015-02-18 09:16:00.0000000 | 20.890      | 27.7450  | 423.50 | 1521.5 | 0.004237      | True      | True
20558 | 2015-02-18 09:17:00.0000000 | 20.890      | 28.0225  | 418.75 | 1632.0 | 0.004279      | True      | True
20559 | 2015-02-18 09:19:00.0000000 | 21.000      | 28.1000  | 409.00 | 1864.0 | 0.004321      | True      | True
Split the data into features (x) and labels (y), and into training and testing sets:
train_x = df[df['Test'] == False][['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio']]
train_y = df[df['Test'] == False]['Occupancy']
test_x = df[df['Test'] == True][['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio']]
test_y = df[df['Test'] == True]['Occupancy']
print(train_x.shape, train_y.shape, test_x.shape, test_y.shape)
(8143, 5) (8143,) (12417, 5) (12417,)
Train a few classic Scikit-learn models:
from sklearn import tree
from sklearn import neighbors
from sklearn import naive_bayes
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
#four classifier types
clf1 = tree.DecisionTreeClassifier()
clf2 = LogisticRegression(solver='liblinear')
clf3 = neighbors.KNeighborsClassifier()
clf4 = naive_bayes.GaussianNB()
clf1 = clf1.fit(train_x, train_y)
clf2 = clf2.fit(train_x, train_y)
clf3 = clf3.fit(train_x, train_y)
clf4 = clf4.fit(train_x, train_y)
# 5-fold cross-validated accuracy on the testing set
# (note: cross_val_score refits a clone of each classifier on folds of test_x)
for clf, label in zip([clf1, clf2, clf3, clf4], ['Decision Tree', 'Logistic Regression', 'K Nearest Neighbour', 'Naive Bayes']):
    scores = cross_val_score(clf, test_x, test_y, cv=5, scoring='accuracy')
    print("Accuracy: %0.4f (+/- %0.4f) [%s]" % (scores.mean(), scores.std(), label))
Accuracy: 0.8605 (+/- 0.1130) [Decision Tree]
Accuracy: 0.9887 (+/- 0.0071) [Logistic Regression]
Accuracy: 0.9656 (+/- 0.0224) [K Nearest Neighbour]
Accuracy: 0.8893 (+/- 0.1265) [Naive Bayes]
The logistic regression model performs best, so we convert it to ONNX.
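For intuition about what gets exported: a fitted binary logistic regression predicts the positive class when the linear score w·x + b is greater than 0, which is the same as a sigmoid probability above 0.5. The sketch below uses hypothetical coefficients (not the ones learned from the occupancy data) for the same five features, in the same order.

```python
import math

# Hypothetical coefficients for (Temperature, Humidity, Light, CO2, HumidityRatio);
# NOT the values learned in the notebook.
WEIGHTS = [-0.5, 0.02, 0.02, 0.005, 1.0]
BIAS = -3.0

def decision_function(x):
    # Linear score w.x + b
    return sum(w * xi for w, xi in zip(WEIGHTS, x)) + BIAS

def predict_proba_true(x):
    # Sigmoid of the linear score = estimated P(Occupancy == True)
    return 1.0 / (1.0 + math.exp(-decision_function(x)))

def predict(x):
    # Positive class exactly when the score is > 0, i.e. probability > 0.5
    return decision_function(x) > 0

x_on = [20.865, 27.745, 423.5, 1514.5, 0.00423]
x_off = [20.0, 30.0, 0.0, 450.0, 0.004]
print(round(decision_function(x_on), 5))  # 3.16913
print(predict(x_on), predict(x_off))      # True False
```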
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
# We define the input type (5 sensors readings), convert the scikit-learn model to ONNX and serialize it
initial_type = [('float_input', FloatTensorType([None, 5]))]
onnx_model = convert_sklearn(clf2, initial_types=initial_type)
bmodel = onnx_model.SerializeToString()
Predict using ONNX Runtime and verify that the predictions match Scikit-learn's:
import numpy as np
import onnxruntime as rt
sess = rt.InferenceSession(bmodel)
input_name = sess.get_inputs()[0].name
label_name = sess.get_outputs()[0].name
pred_onnx = sess.run([label_name], {input_name: test_x.values.astype(np.float32)})[0]
# Verify ONNX and Scikit-learn predictions are same
pred_clf2 = clf2.predict(test_x)
diff_num = (pred_onnx != pred_clf2).sum()
if diff_num:
    print(f'Predictions difference between sklearn and onnxruntime, total {diff_num} elements differ')
else:
    print('Same prediction using sklearn and onnxruntime')
Same prediction using sklearn and onnxruntime
There are two options for storing the model so that ADX can retrieve it for scoring: an ADX table or an Azure blob.
Serialize the model and store it in an ADX models table using KqlMagic
import pandas as pd
import datetime
models_tbl = 'ML_Models'
model_name = 'ONNX-Occupancy'
smodel = bmodel.hex()
now = datetime.datetime.now()
dfm = pd.DataFrame({'name':[model_name], 'timestamp':[now], 'model':[smodel]})
dfm
  | name           | timestamp                  | model
0 | ONNX-Occupancy | 2020-07-28 17:07:20.280040 | 08031208736b6c326f6e6e781a05312e362e3122076169...
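The model is stored as a hex string (`bmodel.hex()`) so that it fits in a `string` column, and the scoring code turns it back into bytes with `binascii.unhexlify`. A minimal round-trip sketch; the 12 bytes used here are a stand-in that happens to match the first 24 hex characters of the `model` value shown above (they include the ASCII text `skl2onnx`).

```python
import binascii

# Stand-in for the bytes returned by onnx_model.SerializeToString()
bmodel = b"\x08\x03\x12\x08skl2onnx"

# Notebook side: bytes -> hex string, safe to store in a string column
smodel = bmodel.hex()

# ADX sandbox side, as in the scoring query: hex string -> original bytes
restored = binascii.unhexlify(smodel)

print(smodel)                          # 08031208736b6c326f6e6e78
print(restored == bmodel)              # True
print(len(smodel) == 2 * len(bmodel))  # True: hex encoding doubles the size
```

The doubling in size is the cost of this simple encoding; for larger models, storing the file in a blob (the second option) avoids it.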
set_query = '''
.set-or-append {0} <|
let tbl = dfm;
tbl
'''.format(models_tbl)
print(set_query)
.set-or-append ML_Models <|
let tbl = dfm;
tbl
%kql -query set_query
  | ExtentId                             | OriginalSize | ExtentSize | CompressedSize | IndexSize | RowCount
0 | bfc9acc2-3d79-4e64-9a79-d2681547e43d | 1430.0       | 1490.0     | 1040.0         | 450.0     | 1
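Because `.set-or-append` appends rows, the table can hold several versions of a model under the same name; the scoring function in the next step picks one with `where name == model_name | top 1 by timestamp desc | project model`, so the newest row wins. The same selection logic, sketched in plain Python over hypothetical in-memory rows:

```python
from datetime import datetime

# Hypothetical rows of the ML_Models table: (name, timestamp, model)
models_tbl = [
    ("ONNX-Occupancy", datetime(2020, 7, 1, 9, 0), "aa"),
    ("ONNX-Occupancy", datetime(2020, 7, 28, 17, 7, 20), "bb"),
    ("Other-Model", datetime(2020, 8, 1), "cc"),
]

def latest_model(rows, model_name):
    # where name == model_name
    candidates = [r for r in rows if r[0] == model_name]
    if not candidates:
        return None  # no matching model; None stands in for an empty result
    # top 1 by timestamp desc | project model
    return max(candidates, key=lambda r: r[1])[2]

print(latest_model(models_tbl, "ONNX-Occupancy"))  # bb
```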
Score using the serialized model stored in the ADX table
# NOTE:
#
# We run ADX scoring query here using KqlMagic by embedding the query from Kusto Explorer
# with r'''Kusto Explorer query''':
scoring_from_table_query = r'''
let predict_onnx_fl=(samples:(*), models_tbl:(name:string, timestamp:datetime, model:string), model_name:string, features_cols:dynamic, pred_col:string)
{
let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
let kwargs = pack('smodel', model_str, 'features_cols', features_cols, 'pred_col', pred_col);
let code =
'\n'
'import binascii\n'
'\n'
'smodel = kargs["smodel"]\n'
'features_cols = kargs["features_cols"]\n'
'pred_col = kargs["pred_col"]\n'
'bmodel = binascii.unhexlify(smodel)\n'
'\n'
'import onnxruntime as rt\n'
'sess = rt.InferenceSession(bmodel)\n'
'input_name = sess.get_inputs()[0].name\n'
'label_name = sess.get_outputs()[0].name\n'
'df1 = df[features_cols]\n'
'predictions = sess.run([label_name], {input_name: df1.values.astype(np.float32)})[0]\n'
'\n'
'result = df\n'
'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])'
'\n'
;
samples | evaluate python(typeof(*), code, kwargs)
};
OccupancyDetection
| where Test == 1
| extend pred_Occupancy=bool(0)
| invoke predict_onnx_fl(ML_Models, 'ONNX-Occupancy', pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
'''
%kql pred_df << -query scoring_from_table_query
pred_df[-4:]
      | Timestamp                 | Temperature | Humidity | Light  | CO2    | HumidityRatio | Occupancy | Test | pred_Occupancy
12413 | 2015-02-18 09:16:00+00:00 | 20.865      | 27.7450  | 423.50 | 1514.5 | 0.004230      | True      | True | True
12414 | 2015-02-18 09:16:00+00:00 | 20.890      | 27.7450  | 423.50 | 1521.5 | 0.004237      | True      | True | True
12415 | 2015-02-18 09:17:00+00:00 | 20.890      | 28.0225  | 418.75 | 1632.0 | 0.004279      | True      | True | True
12416 | 2015-02-18 09:19:00+00:00 | 21.000      | 28.1000  | 409.00 | 1864.0 | 0.004321      | True      | True | True
print('Confusion Matrix')
pred_df.groupby(['Occupancy', 'pred_Occupancy']).size()
Confusion Matrix
Occupancy  pred_Occupancy
False      False             9284
           True               112
True       False               15
           True              3006
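The confusion matrix can be summarized with standard metrics. The arithmetic below uses the counts shown above and treats Occupancy == True as the positive class.

```python
# Counts from the confusion matrix above (actual Occupancy vs. pred_Occupancy)
tn, fp = 9284, 112   # actual False: predicted False / predicted True
fn, tp = 15, 3006    # actual True:  predicted False / predicted True

total = tn + fp + fn + tp
accuracy = (tp + tn) / total
precision = tp / (tp + fp)   # of rooms predicted occupied, fraction truly occupied
recall = tp / (tp + fn)      # of occupied rooms, fraction detected

print(total)  # 12417, the size of the testing set
print(f"accuracy={accuracy:.4f} precision={precision:.4f} recall={recall:.4f}")
# accuracy=0.9898 precision=0.9641 recall=0.9950
```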
For further details, see predict_onnx_fl() in the ADX functions library.
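The Python code inside predict_onnx_fl() follows the python() plugin's contract: the input table arrives as `df`, the parameters as the dictionary `kargs`, and the output must be assigned to `result`. A hypothetical local harness such as `run_like_python_plugin` (not part of ADX) can exercise a code string with the same variable names before it is embedded in KQL. To keep the sketch dependency-free, `df` here is a list of dicts rather than a pandas DataFrame, and the threshold rule is illustrative only.

```python
# Hypothetical local harness mimicking the python() plugin's contract:
# the code sees `df` (input rows) and `kargs` (parameters) and must set `result`.
def run_like_python_plugin(df, code, kargs):
    namespace = {"df": df, "kargs": kargs}
    exec(code, namespace)  # run the snippet with the same variable names ADX exposes
    if "result" not in namespace:
        raise ValueError("the code did not assign `result`")
    return namespace["result"]

# Toy code string in the same style as the KQL snippet: read parameters
# from kargs, add a prediction column, assign result.
code = (
    'threshold = kargs["threshold"]\n'
    'pred_col = kargs["pred_col"]\n'
    'result = [dict(row, **{pred_col: row["Light"] > threshold}) for row in df]\n'
)

rows = [{"Light": 423.5}, {"Light": 0.0}]
out = run_like_python_plugin(rows, code, {"threshold": 300.0, "pred_col": "pred_Occupancy"})
print(out)
# [{'Light': 423.5, 'pred_Occupancy': True}, {'Light': 0.0, 'pred_Occupancy': False}]
```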
Copy the model to a blob
Note that the blob container must be allowlisted so that the ADX Python sandbox can access it.
from azure.storage.blob import BlobClient
conn_str = "BlobEndpoint=https://artifcatswestus.blob.core.windows.net/kusto;SharedAccessSignature=?**** YOUR SAS KEY ****"
blob_client = BlobClient.from_connection_string(conn_str, container_name="ONNX", blob_name="room_occupancy.onnx")
res = blob_client.upload_blob(bmodel, overwrite=True)
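The `model_sas` argument passed to predict_onnx_blob_fl() in the next query is the blob URL with the SAS token appended as the query string. The helper `blob_sas_url` below is hypothetical; the account, container and blob path are taken from the URL used in that query, and `YOUR_SAS_TOKEN` is a placeholder.

```python
from urllib.parse import quote

def blob_sas_url(account, container, blob_path, sas_token):
    """Hypothetical helper: https://<account>.blob.core.windows.net/<container>/<blob>?<SAS>."""
    sas = sas_token.lstrip("?")  # tolerate tokens copied with a leading '?'
    path = quote(f"{container}/{blob_path}")  # '/' stays a separator; spaces etc. are percent-encoded
    return f"https://{account}.blob.core.windows.net/{path}?{sas}"

url = blob_sas_url("artifcatswestus", "kusto", "ONNX/room_occupancy.onnx", "?YOUR_SAS_TOKEN")
print(url)  # https://artifcatswestus.blob.core.windows.net/kusto/ONNX/room_occupancy.onnx?YOUR_SAS_TOKEN
```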
# Notes:
#
# Replace the string "**** YOUR SAS KEY ****" below with the real SAS
# We run ADX scoring query here using KqlMagic by embedding the query from Kusto Explorer
# with r'''Kusto Explorer query''':
scoring_from_blob_query = r'''
let predict_onnx_blob_fl=(samples:(*), model_sas:string, features_cols:dynamic, pred_col:string)
{
let kwargs = pack('features_cols', features_cols, 'pred_col', pred_col);
let code =
'\n'
'features_cols = kargs["features_cols"]\n'
'pred_col = kargs["pred_col"]\n'
'\n'
'import onnxruntime as rt\n'
'sess = rt.InferenceSession(r"C:\\Temp\\model.onnx")\n'
'input_name = sess.get_inputs()[0].name\n'
'label_name = sess.get_outputs()[0].name\n'
'df1 = df[features_cols]\n'
'predictions = sess.run([label_name], {input_name: df1.values.astype(np.float32)})[0]\n'
'\n'
'result = df\n'
'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])'
'\n'
;
samples | evaluate python(typeof(*), code, kwargs,
external_artifacts=pack('model.onnx', model_sas))
};
OccupancyDetection
| where Test == 1
| extend pred_Occupancy=bool(0)
| invoke predict_onnx_blob_fl('https://artifcatswestus.blob.core.windows.net/kusto/ONNX/room_occupancy.onnx?**** YOUR SAS KEY ****',
pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
'''
%kql pred_df << -query scoring_from_blob_query
pred_df[-4:]
      | Timestamp                 | Temperature | Humidity | Light  | CO2    | HumidityRatio | Occupancy | Test | pred_Occupancy
12413 | 2015-02-18 09:16:00+00:00 | 20.865      | 27.7450  | 423.50 | 1514.5 | 0.004230      | True      | True | True
12414 | 2015-02-18 09:16:00+00:00 | 20.890      | 27.7450  | 423.50 | 1521.5 | 0.004237      | True      | True | True
12415 | 2015-02-18 09:17:00+00:00 | 20.890      | 28.0225  | 418.75 | 1632.0 | 0.004279      | True      | True | True
12416 | 2015-02-18 09:19:00+00:00 | 21.000      | 28.1000  | 409.00 | 1864.0 | 0.004321      | True      | True | True
print('Confusion Matrix')
pred_df.groupby(['Occupancy', 'pred_Occupancy']).size()
Confusion Matrix
Occupancy  pred_Occupancy
False      False             9284
           True               112
True       False               15
           True              3006
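Both scoring functions embed their Python code in KQL as adjacent single-quoted string literals, one per line, each ending in `\n`, with backslashes doubled (as in `'C:\\Temp\\model.onnx'`). When porting a longer, locally tested snippet, a small hypothetical helper such as `to_kql_code_literal` (not part of ADX or Kqlmagic) can produce that form. It assumes KQL's backslash escapes `\\` and `\'` inside single-quoted strings, which is the convention the queries above rely on.

```python
def to_kql_code_literal(python_code):
    """Hypothetical helper: render Python source as a KQL `let code = ...;` statement
    made of adjacent single-quoted string literals, one per source line."""
    literals = []
    for line in python_code.splitlines():
        # Double backslashes first, then escape single quotes, so KQL decodes them back.
        escaped = line.replace("\\", "\\\\").replace("'", "\\'")
        # Each literal ends with an escaped newline, as in the queries above.
        literals.append("'" + escaped + "\\n'")
    return "let code =\n" + "\n".join(literals) + "\n;"

snippet = 'import onnxruntime as rt\nsess = rt.InferenceSession(r"C:\\Temp\\model.onnx")\n'
print(to_kql_code_literal(snippet))
```

For this snippet, the second literal it prints is identical to the `InferenceSession` line in the blob-scoring query.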
In this tutorial we showed how to train a model in Scikit-learn, convert it to ONNX format and export it to ADX for scoring. This workflow is convenient as