In many use cases, Machine Learning (ML) models are built and applied over data that is stored and managed by Azure Data Explorer (ADX). Typical tasks include fraud detection, identifying malicious attacks, predicting device failures, forecasting capacity usage, shopping or entertainment recommendations, medical diagnosis, and many more. Most ML models are built and deployed in two steps:
ML Training is a long, iterative process. Commonly, development of a model starts with researchers and data scientists: they fetch the training data, clean it, engineer features, try different models, and tune parameters, repeating this cycle until the model meets the required accuracy and robustness. Once this phase is done, software engineers take the ML algorithm, implement it in production code, and deploy it. The Azure Machine Learning (AML) service is a great solution for managing and authoring the end-to-end process of ML model development, deployment, and monitoring, also known as MLOps.
ML Scoring is the process of applying the model to new data to gain insights and drive decisions. Scoring usually needs to be done at scale, with minimal latency, processing large sets of new records. For ADX users, the best solution is to score the data directly in ADX. ADX scoring runs on its compute nodes, in a distributed manner near the data, thus achieving the best performance with minimal latency.
ADX supports running Python code embedded in Kusto Query Language (KQL) using the python() plugin. The Python code runs in multiple sandboxes on ADX's existing compute nodes. The Python image is based on the Anaconda distribution and contains the most common ML frameworks, including Scikit-learn, TensorFlow, Keras, and PyTorch. To score AML models in ADX, follow the steps below.
We build a model to predict room occupancy based on Occupancy Detection data, a public dataset from the UCI Repository. This model is a binary classifier predicting occupied/empty rooms based on Temperature, Humidity, Light, and CO2 sensor measurements. The complete process can be found in this Jupyter notebook; here we embed just a few snippets to present the main concepts.
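Before diving in, it helps to know the python() plugin's contract: the script receives its tabular input as a pandas DataFrame named `df` and its parameters as a dict named `kargs`, and must write its output table to a DataFrame named `result`. The following is a minimal local simulation of that contract (the `simulate_python_plugin` helper is hypothetical, for illustration only; ADX itself runs the script in a sandbox, not via `exec`):

```python
import pandas as pd

# Local simulation of the python() plugin contract:
# input table -> `df`, parameters -> `kargs`, output table <- `result`.
def simulate_python_plugin(df: pd.DataFrame, code: str, kargs: dict) -> pd.DataFrame:
    scope = {"df": df, "kargs": kargs, "pd": pd}
    exec(code, scope)          # ADX runs the script in a sandbox instead
    return scope["result"]

inline_script = "result = df\nresult['doubled'] = df[kargs['col']] * 2"
out = simulate_python_plugin(pd.DataFrame({"x": [1, 2, 3]}), inline_script, {"col": "x"})
print(out["doubled"].tolist())  # [2, 4, 6]
```

The scoring query at the end of this post follows exactly this pattern: the KQL table is piped in as `df`, the model location and column names are passed via `kargs`, and the predictions come back in `result`.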
from azureml.core import Workspace, Experiment
from azureml.core.compute import AmlCompute, ComputeTarget

ws = Workspace.create("AML-ADX", "your subscription", "your RG", location="westus", exist_ok=True)
exp = Experiment(ws, name="Prediction-Occupancy")
compute_name = "cpu-cluster"
vm_sku = "STANDARD_D2_V2"
if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print("found compute target: " + compute_name)
else:
    print("creating new compute target...")
    provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_sku, min_nodes=1, max_nodes=2)
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=10)
found compute target: cpu-cluster
import pandas as pd
from azure.storage.blob import BlockBlobService  # azure-storage-blob v2.1

block_blob_service = BlockBlobService(account_name=aml_storage_account, sas_token=aml_sas_token)
block_blob_service.get_blob_to_path(aml_container_name, data_blob_name, 'data.csv')
df = pd.read_csv('data.csv')
df[-4:]
| | Timestamp | Temperature | Humidity | Light | CO2 | HumidityRatio | Occupancy | Test |
|---|---|---|---|---|---|---|---|---|
| 20556 | 2015-02-18 09:16:00.0000000 | 20.865 | 27.7450 | 423.50 | 1514.5 | 0.004230 | True | True |
| 20557 | 2015-02-18 09:16:00.0000000 | 20.890 | 27.7450 | 423.50 | 1521.5 | 0.004237 | True | True |
| 20558 | 2015-02-18 09:17:00.0000000 | 20.890 | 28.0225 | 418.75 | 1632.0 | 0.004279 | True | True |
| 20559 | 2015-02-18 09:19:00.0000000 | 21.000 | 28.1000 | 409.00 | 1864.0 | 0.004321 | True | True |
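Note the boolean Test column in the rows above, which marks the hold-out records that will later be scored in ADX. A minimal sketch of splitting on that flag, using a tiny synthetic frame in place of the downloaded data.csv:

```python
import pandas as pd

# Synthetic stand-in for the occupancy data; the real frame has the
# same Test column marking hold-out rows.
df = pd.DataFrame({
    "Temperature": [20.8, 21.0, 22.1, 19.5],
    "Occupancy":   [True, True, False, False],
    "Test":        [False, False, True, True],
})
train_df = df[~df["Test"]]   # rows used for training
test_df = df[df["Test"]]     # rows reserved for scoring
print(len(train_df), len(test_df))  # 2 2
```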
run = exp.submit(config=est)
run
| Experiment | Id | Type | Status |
|---|---|---|---|
| Prediction-Occupancy | Prediction-Occupancy_1587550546_0dd38412 | azureml.scriptrun | Starting |
run.wait_for_completion(show_output=True)
RunId: Prediction-Occupancy_1587550546_0dd38412
Streaming azureml-logs/70_driver_log.txt
========================================
Trimmed...
Accuracy: 0.8571 (+/- 0.1219) [Decision Tree]
Accuracy: 0.9887 (+/- 0.0071) [Logistic Regression]
Accuracy: 0.9656 (+/- 0.0224) [K Nearest Neighbour]
Accuracy: 0.8893 (+/- 0.1265) [Naive Bayes]
The experiment completed successfully. Finalizing run...
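The full training script lives in the notebook; one common way to produce "Accuracy: mean (+/- 2*std)" lines like the ones above is k-fold cross-validation with scikit-learn. A sketch under that assumption, with synthetic data standing in for the occupancy features:

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

# Synthetic 5-feature data standing in for Temperature, Humidity,
# Light, CO2 and HumidityRatio.
X, y = make_classification(n_samples=500, n_features=5, random_state=0)
scores = cross_val_score(LogisticRegression(max_iter=1000), X, y, cv=5)
print(f"Accuracy: {scores.mean():.4f} (+/- {scores.std() * 2:.4f}) [Logistic Regression]")
```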
model_path = model.download(exist_ok=True)
adx_storage_account = "artifcatswestus"
adx_container_name = "kusto/AML"
model_blob_name = model_name + '.pkl'
adx_sas_token = "?********"
block_blob_service = BlockBlobService(account_name=adx_storage_account, sas_token=adx_sas_token)
block_blob_service.create_blob_from_path(adx_container_name, model_blob_name, model_path)
uri = f'https://{adx_storage_account}.blob.core.windows.net/{adx_container_name}/{model_blob_name}{adx_sas_token}'
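The blob that was just copied is simply a pickled scikit-learn estimator; at scoring time the sandbox reads those bytes back and reconstructs the model. A local sketch of that round trip, with a trivially trained `DecisionTreeClassifier` standing in for the occupancy model:

```python
import pickle
from sklearn.tree import DecisionTreeClassifier

# Train a stand-in model, serialize it (what gets uploaded as model.pkl),
# then deserialize it (what the ADX sandbox does with the blob).
model = DecisionTreeClassifier().fit([[0], [1]], [0, 1])
blob_bytes = pickle.dumps(model)
clf = pickle.loads(blob_bytes)
print(clf.predict([[0], [1]]).tolist())  # [0, 1]
```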
Score in ADX from a Jupyter notebook using KqlMagic
scoring_from_blob_query = r'''
let classify_sf=(samples:(*), model_sas:string, features_cols:dynamic, pred_col:string)
{
let kwargs = pack('model_sas', model_sas, 'features_cols', features_cols, 'pred_col', pred_col);
let code =
'import pickle\n'
'model_sas = kargs["model_sas"]\n'
'features_cols = kargs["features_cols"]\n'
'pred_col = kargs["pred_col"]\n'
'with open("/Temp/model.pkl", "rb") as f:\n'
' bmodel = f.read()\n'
'clf1 = pickle.loads(bmodel)\n'
'df1 = df[features_cols]\n'
'predictions = clf1.predict(df1)\n'
'result = df\n'
'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])';
samples | evaluate python(typeof(*), code, kwargs, external_artifacts=pack('model.pkl', model_sas))
};
OccupancyDetection
| where Test == 1
| extend pred_Occupancy=bool(0)
| invoke classify_sf('$model_uri$',
pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
| summarize n=count() by Occupancy, pred_Occupancy // confusion matrix
'''
scoring_from_blob_query = scoring_from_blob_query.replace('$model_uri$', uri)
%kql res << -query scoring_from_blob_query
df = res.to_dataframe()
print('Confusion Matrix')
| | Occupancy | pred_Occupancy | n |
|---|---|---|---|
| 0 | True | True | 3006 |
| 1 | False | True | 112 |
| 2 | True | False | 15 |
| 3 | False | False | 9284 |
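The final `summarize n=count() by Occupancy, pred_Occupancy` in the KQL query is the same aggregation as a pandas groupby. A sketch on made-up predictions, to show how the confusion matrix above is produced:

```python
import pandas as pd

# Made-up scored rows: actual label vs. predicted label.
scored = pd.DataFrame({
    "Occupancy":      [True, True, False, False, True],
    "pred_Occupancy": [True, False, False, False, True],
})
# Equivalent of KQL: summarize n=count() by Occupancy, pred_Occupancy
cm = scored.groupby(["Occupancy", "pred_Occupancy"]).size().reset_index(name="n")
print(cm)
```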