Cloud resources and services requirements
Distributed ML training for click-through rate prediction using Dask
Distributed ML training for click-through rate prediction using the Terabyte Click Logs dataset
Create a delegated subnet for Azure NetApp Files
Azure NetApp Files configuration
Azure NetApp Files performance tiers
Dynamically change the service level of a volume
Automate performance tier change
Setting up Azure Kubernetes Service
Peer AKS VNet and Azure NetApp Files VNet
Set up Azure NetApp Files back-end and storage class
Setting up Dask with RAPIDS deployment on AKS using Helm
Click-through rate prediction data processing and model training
Libraries for data processing and model training
Load Criteo Click Logs day 15 in Pandas and train a scikit-learn random forest model
Load Day 15 in Dask and train a Dask cuML random forest model
Monitor Dask using native Task Streams dashboard
Monitor Dask and RAPIDS with Prometheus and Grafana
Dataset and model versioning using NetApp DataOps Toolkit
Jupyter notebooks for reference
Co-authors: Rick Huang, Verron Martina, Muneer Ahmad (NetApp)
The work of a data scientist should be focused on the training and tuning of machine learning (ML) and artificial intelligence (AI) models. However, according to research by Google, data scientists spend approximately 80% of their time figuring out how to make their models work with enterprise applications and run at scale.
To manage end-to-end AI/ML projects, a wider understanding of enterprise components is needed. Although DevOps has taken over the definition, integration, and deployment of these types of components, ML operations (MLOps) target a similar flow for AI/ML projects. To get an idea of what an end-to-end AI/ML pipeline touches in the enterprise, see the following list of required components:
The world of data science touches multiple disciplines in IT and business:
In this article, we describe how Azure NetApp Files, RAPIDS AI, Dask, and Azure help each of these roles bring value to business.
This solution follows the lifecycle of an AI/ML application. We start with the work of data scientists to define the different steps needed to prepare data and train models. By leveraging RAPIDS on Dask, we perform distributed training across the Azure Kubernetes Service (AKS) cluster to drastically reduce the training time when compared to the conventional Python scikit-learn approach. To complete the full cycle, we integrate the pipeline with Azure NetApp Files.
Azure NetApp Files provides various performance tiers. Customers can start with a Standard tier and scale up or out to a high-performance tier non-disruptively without moving any data. This capability enables data scientists to train models at scale without any performance issues and avoids data silos across the cluster, as shown in the figure below.
This reference architecture gives IT organizations the following advantages:
Dask is an open-source, parallel computing tool that scales Python libraries on multiple machines and provides faster processing of large amounts of data. It provides an API similar to single-threaded conventional Python libraries, such as Pandas, NumPy, and scikit-learn. As a result, native Python users are not forced to change much in their existing code to use resources across the cluster.
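As a minimal sketch of that API similarity (the file path and column names below are hypothetical), the same read_csv and groupby calls a Pandas user would write are built lazily and executed in parallel across the Dask workers:
import dask.dataframe as dd

# Hypothetical tab-separated click logs; the call mirrors pandas.read_csv.
header = ['col' + str(i) for i in range(1, 41)]
ddf = dd.read_csv('/data/clicklogs/*.tsv', delimiter='\t', names=header)

# Familiar Pandas-style operations; compute() triggers the parallel execution.
mean_per_label = ddf.groupby('col1')['col2'].mean()
print(mean_per_label.compute())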
NVIDIA RAPIDS is a suite of open-source libraries that makes it possible to run end-to-end ML and data analytics workflows entirely on GPUs. Together with Dask, it enables you to easily scale from GPU workstation (scale up) to multinode, multi-GPU clusters (scale out).
For deploying Dask on a cluster, you can use Kubernetes for resource orchestration. You can also scale the worker nodes up or down according to the processing requirements, which in turn helps optimize cluster resource consumption, as shown in the following figure.
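In this report, Dask is deployed on AKS with Helm (see the section Set up Dask with RAPIDS deployment on AKS using Helm). Purely as an illustration of the scale-up/scale-down idea, the following sketch uses the separate dask-kubernetes package, which can create and resize a Dask worker pool programmatically; the worker-spec.yaml pod specification is a hypothetical file.
# Illustrative alternative to the Helm-based deployment used in this report:
# programmatic worker scaling with the dask-kubernetes package.
from dask_kubernetes import KubeCluster
from dask.distributed import Client

cluster = KubeCluster.from_yaml('worker-spec.yaml')  # hypothetical worker pod spec
cluster.scale(3)                                     # fixed pool of three workers
cluster.adapt(minimum=1, maximum=6)                  # or autoscale between 1 and 6 workers
client = Client(cluster)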
The Azure NetApp Files service is an enterprise-class, high-performance, metered file storage service. Azure NetApp Files supports any workload type and is highly available by default. You can select service and performance levels and set up Snapshot copies through the service. Azure NetApp Files is an Azure first-party service for migrating and running the most demanding enterprise-file workloads in the cloud, including databases, SAP, and high-performance computing applications with no code changes.
The following table lists the cloud resources and services requirements for this solution.

| Resource | Minimum type/version |
| --- | --- |
| Azure Kubernetes Service | 1.18.14 |
| Agent node | 3x Standard_DS2_v2 |
| GPU node | 3x Standard_NC6s_v3 |
| Azure NetApp Files | Standard capacity pool |
| Capacity in TiB | 2 |
The following table lists the software requirements needed for this solution.
| Software | Minimum version |
| --- | --- |
| Azure Kubernetes Service | 1.18.14 |
| RAPIDS and Dask container image | Repository: "rapidsai/rapidsai", tag: 0.17-cuda11.0-runtime-ubuntu18.04 |
| Astra Trident | 21.01.1 |
| Helm | 3.0.0 |
This section provides details on setting up the platform for performing distributed click-through rate prediction training at scale using Dask with RAPIDS on Azure Kubernetes Service. We discuss the installation of all the solution elements and running the distributed training job on that platform.
This use case is based on the publicly available Terabyte Click Logs dataset from Criteo AI Lab. With the recent advances in ML platforms and applications, a lot of attention is now on learning at scale.
The click-through rate (CTR) is defined as the average number of click-throughs per hundred online ad impressions (expressed as a percentage). It is widely adopted as a key metric in various industry verticals and use cases, including digital marketing, retail, e-commerce, and service providers. Examples of using CTR as an important metric for potential customer traffic include the following:
In the context of digital marketing, the Criteo Terabyte Click Logs are now the dataset of reference in assessing the scalability of ML platforms and algorithms. By predicting the click-through rate, an advertiser can select the visitors who are most likely to respond to the ads, analyze their browsing history, and show the most relevant ads based on the interests of the user.
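For reference, CTR itself reduces to a simple ratio; the following minimal sketch (with made-up numbers) shows the calculation:
# CTR = click-throughs per hundred ad impressions, expressed as a percentage.
clicks = 42
impressions = 10_000
ctr = clicks / impressions * 100
print(f'CTR: {ctr:.2f}%')  # CTR: 0.42%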
The solution provided in this technical report highlights the following benefits:
An end-to-end workflow built on RAPIDS AI and Azure NetApp Files demonstrates a two-orders-of-magnitude improvement in random forest model training time. This improvement is significant compared with the conventional Pandas approach when dealing with real-world click logs of 45GB of structured tabular data (on average) per day, which is equivalent to a DataFrame containing roughly 20 billion rows. In this technical report, we demonstrate cluster environment setup, framework and library installation, data loading and processing, conventional versus distributed training, and visualization and monitoring, and we compare critical end-to-end runtime results.
As organizations continue to embrace the scalability and flexibility of cloud-based solutions, Azure NetApp Files (ANF) has emerged as a powerful managed file storage service in Azure. ANF provides enterprise-grade file shares that are highly performant and integrate seamlessly with existing applications and workflows.
In this section, we will delve into two crucial aspects of leveraging the full potential of Azure NetApp Files: the creation of a delegated subnet and the initial configuration tasks. By following these steps, organizations can optimize their ANF deployment, enabling efficient data management and enhanced collaboration.
Firstly, we will explore the process of creating a delegated subnet, which plays a pivotal role in establishing a secure and isolated network environment for ANF. This delegated subnet ensures that ANF resources are efficiently isolated from other network traffic, providing an additional layer of protection and control.
Subsequently, we will discuss the initial configuration tasks necessary to set up Azure NetApp Files effectively. This includes key considerations such as setting up a NetApp account, and provisioning an ANF capacity pool.
By following these steps, administrators can streamline the deployment process and ensure smooth integration with existing infrastructure.
To create a delegated subnet for Azure NetApp Files, complete the following steps:
Configure Azure NetApp Files as described in QuickStart: Set up Azure NetApp Files and create an NFS volume.
You can skip the section “Create NFS volume for Azure NetApp Files” because you are going to create volumes through Astra Trident. Before continuing, complete the following steps:
The Dask cluster and Azure NetApp Files must be in the same Azure Virtual Network (VNet) or a peered VNet (see later in this document).
Azure NetApp Files volumes are allocated to the application cluster and are consumed as persistent volume claims (PVCs) in Kubernetes. In turn, this process provides you the flexibility to map them to different services, such as Jupyter notebooks, serverless functions, and so on.
Users of services can consume storage from the platform in many ways. As this technical report discusses NFS, the main benefits of Azure NetApp Files are:
You can change the service level of an existing volume by moving the volume to another capacity pool that uses the service level you want for the volume. This solution enables customers to start with a small dataset and a small number of GPUs in the Standard tier and scale out or scale up to the Premium tier as the amount of data and the number of GPUs increase. The Premium tier offers four times the throughput per terabyte of the Standard tier, and the scale-up is performed without having to move any data to change the service level of a volume.
To dynamically change the service level of a volume, complete the following steps:
The following options are available to automate performance tier changes:
az netappfiles volume pool-change -g mygroup --account-name myaccname --pool-name mypoolname --name myvolname --new-pool-resource-id mynewresourceid
Set-AzNetAppFilesVolumePool
-ResourceGroupName "MyRG"
-AccountName "MyAnfAccount"
-PoolName "MyAnfPool"
-Name "MyAnfVolume"
-NewPoolResourceId 7d6e4069-6c78-6c61-7bf6-c60968e45fbf
To install and set up the AKS cluster, see the webpage Create an AKS Cluster and then complete the following steps:
az account set --subscription xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxxxx
az aks get-credentials --resource-group resourcegroup --name aksclustername
kubectl get nodes
To peer the AKS VNet to the Azure NetApp Files VNet, complete the following steps:
For more information, see Create, change, or delete a virtual network peering.
Astra Trident is an open-source project that NetApp maintains for application container persistent storage. Astra Trident has been implemented as an external provisioner controller that runs as a pod itself, monitoring volumes and completely automating the provisioning process.
Astra Trident enables smooth integration with K8s by creating and attaching persistent volumes for storing training datasets and trained models. This capability makes it easier for data scientists and data engineers to use K8s without the hassle of manually storing and managing datasets. Trident also eliminates the need for data scientists to learn to manage new data platforms, because it integrates the data management-related tasks through logical API integration.
To install Trident using Helm, complete the following steps:
$cd helm
$helm install trident trident-operator-21.01.1.tgz --namespace trident --create-namespace
$kubectl -n trident get pods
If all the pods are up and running, then Trident is installed and you can move forward.
To set up Azure NetApp Files back-end and storage class, complete the following steps:
$az ad sp create-for-rbac --name ""
The output should look like the following example:
{
"appId": "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"displayName": "netapptrident",
"name": "",
"password": "xxxxxxxxxxxxxxx.xxxxxxxxxxxxxx",
"tenant": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx"
}
Create a file called anf-backend.json with the following back-end definition, using the values from the previous output:
{
    "version": 1,
    "storageDriverName": "azure-netapp-files",
    "subscriptionID": "fakec765-4774-fake-ae98-a721add4fake",
    "tenantID": "fakef836-edc1-fake-bff9-b2d865eefake",
    "clientID": "fake0f63-bf8e-fake-8076-8de91e57fake",
    "clientSecret": "SECRET",
    "location": "westeurope",
    "serviceLevel": "Standard",
    "virtualNetwork": "anf-vnet",
    "subnet": "default",
    "nfsMountOptions": "vers=3,proto=tcp",
    "limitVolumeSize": "500Gi",
    "defaults": {
        "exportRule": "0.0.0.0/0",
        "size": "200Gi"
    }
}
$tridentctl create backend -f anf-backend.json -n trident
Create a storage class definition file called anf-storage-class.yaml with the following content:
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: azurenetappfiles
provisioner: netapp.io/trident
parameters:
  backendType: "azure-netapp-files"
$kubectl create -f anf-storage-class.yaml
kubectl get sc azurenetappfiles
To set up Dask with RAPIDS deployment on AKS using Helm, complete the following steps:
kubectl create namespace rapids-dask
Create a PersistentVolumeClaim definition file with the following content:
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: pvc-criteo-data
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1000Gi
  storageClassName: azurenetappfiles
kubectl -n rapids-dask apply -f <your yaml file>
git clone https://github.com/rapidsai/helm-chart helm-chart
cd helm-chart/rapidsai
Edit the rapidsai chart's values.yaml file to mount the PVC in both the worker and Jupyter sections:
dask:
  …
  worker:
    name: worker
    …
    mounts:
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: pvc-criteo-data
      volumeMounts:
        - name: data
          mountPath: /data
    …
  jupyter:
    name: jupyter
    …
    mounts:
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: pvc-criteo-data
      volumeMounts:
        - name: data
          mountPath: /data
    …
cd ..
helm dep update rapidsai
helm install rapids-dask --namespace rapids-dask rapidsai
The following table lists the libraries and frameworks that were used to build this solution. All these components have been fully integrated with Azure’s role-based access and security controls.
| Libraries/framework | Description |
| --- | --- |
| Dask cuML | For ML to work on GPU, the cuML library provides access to the RAPIDS cuML package with Dask. RAPIDS cuML implements popular ML algorithms, including clustering, dimensionality reduction, and regression approaches, with high-performance GPU-based implementations, offering speed-ups of up to 100x over CPU-based approaches. |
| Dask cuDF | cuDF includes various other functions supporting GPU-accelerated extract, transform, load (ETL), such as data subsetting, transformations, one-hot encoding, and more. The RAPIDS team maintains a dask-cudf library that includes helper methods to use Dask and cuDF. |
| scikit-learn | scikit-learn provides dozens of built-in machine learning algorithms and models, called estimators. Each estimator can be fitted to some data using its fit method. |
We used two notebooks to construct the ML pipelines for comparison; one is the conventional Pandas scikit-learn approach, and the other is distributed training with RAPIDS and Dask. Each notebook can be tested individually to see the performance in terms of time and scale. We cover each notebook individually to demonstrate the benefits of distributed training using RAPIDS and Dask.
This section describes how we used Pandas and Dask DataFrames to load Click Logs data from the Criteo Terabyte dataset. The use case is relevant in digital advertising, where ad exchanges build user profiles by predicting whether ads will be clicked in an automated pipeline, making model accuracy essential.
We loaded day 15 data from the Click Logs dataset, totaling 45GB. Running the following cell in Jupyter notebook CTR-PandasRF-collated.ipynb creates a Pandas DataFrame that contains the first 50 million rows and generates a scikit-learn random forest model.
%%time
import pandas as pd
import numpy as np
header = ['col' + str(i) for i in range(1, 41)]  # per Criteo, the dataset has 40 columns; the first column is the click-through (CT) label
first_row_taken = 50_000_000 # use this in pd.read_csv() if your compute resource is limited.
# total number of rows in day15 is 20B
# take 50M rows
"""
Read data & display the following metrics:
Total number of rows per day
df loading time in the cluster
Train a random forest model
"""
file = '/data/day_15'  # assumed path to the day 15 file on the mounted volume
df = pd.read_csv(file, nrows=first_row_taken, delimiter='\t', names=header)
# take numerical columns
df_sliced = df.iloc[:, 0:14]
# split data into training and Y
Y = df_sliced.pop('col1') # first column is binary (click or not)
# change df_sliced data types & fillna
df_sliced = df_sliced.astype(np.float32).fillna(0)
from sklearn.ensemble import RandomForestClassifier
# Random Forest building parameters
# n_streams = 8 # optimization
max_depth = 10
n_bins = 16
n_trees = 10
rf_model = RandomForestClassifier(max_depth=max_depth, n_estimators=n_trees)
rf_model.fit(df_sliced, Y)
To perform prediction by using the trained random forest model, run the following paragraph in this notebook. We took the last one million rows from day 15 as the test set to avoid any duplication. The cell also calculates the accuracy of prediction, defined as the percentage of occurrences in which the model accurately predicts whether a user clicks an ad or not. To review any unfamiliar components in this notebook, see the official scikit-learn documentation.
# testing data, last 1M rows in day15
test_file = '/data/day_15_test'
with open(test_file) as g:
    print(g.readline())
# dataFrame processing for test data
test_df = pd.read_csv(test_file, delimiter='\t', names=header)
test_df_sliced = test_df.iloc[:, 0:14]
test_Y = test_df_sliced.pop('col1')
test_df_sliced = test_df_sliced.astype(np.float32).fillna(0)
# prediction & calculating error
pred_df = rf_model.predict(test_df_sliced)
from sklearn import metrics
# Model Accuracy
print("Accuracy:",metrics.accuracy_score(test_Y, pred_df))
In a manner similar to the previous section, Load Criteo Click Logs day 15 in Pandas and train a scikit-learn random forest model, in this example we performed DataFrame loading with Dask cuDF and trained a random forest model in Dask cuML. We compare the differences in training time and scale in the section Training time comparison.
This notebook imports NumPy, cuML, cuDF, and the necessary Dask libraries, as shown in the following example:
import cuml
from dask.distributed import Client, progress, wait
import dask_cudf
import numpy as np
import cudf
from cuml.dask.ensemble import RandomForestClassifier as cumlDaskRF
from cuml.dask.common import utils as dask_utils
Instantiate the Dask Client():
client = Client()
If your cluster is configured correctly, you can see the status of worker nodes.
client
workers = client.has_what().keys()
n_workers = len(workers)
n_streams = 8 # Performance optimization
In our AKS cluster, the following status is displayed:
Note that Dask employs a lazy execution paradigm: rather than executing the processing code instantly, Dask builds a directed acyclic graph (DAG) of execution. The DAG contains the set of tasks, and their interactions, that each worker needs to run. This layout means the tasks do not run until the user tells Dask to execute them in one way or another, for example by calling compute() or persist() on a Dask collection (illustrated in the sketch below).
Therefore, unless the user calls one of these actions, the workers sit idle, waiting for the scheduler to initiate the processing. This lazy execution paradigm is common in modern parallel and distributed computing frameworks, such as Apache Spark.
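The notebook's exact loading cell is not reproduced here; the following minimal sketch (the /data/day_15 path is assumed from the PVC mount, and the variable names mirror those used in the training cell below) illustrates both the lazy loading with Dask cuDF and the explicit persist step that triggers execution on the GPU workers.
# Lazy step: only the task graph is built; no data is read yet.
header = ['col' + str(i) for i in range(1, 41)]
gdf = dask_cudf.read_csv('/data/day_15', delimiter='\t', names=header)

# Keep the click label (col1) and the numerical feature columns.
Y = gdf['col1'].astype(np.int32)
gdf_sliced_small = gdf[header[1:14]].astype(np.float32).fillna(0)

# Eager step: persist materializes and distributes the partitions across the
# GPU workers so that the subsequent model fit does not recompute them.
gdf_sliced_small, Y = dask_utils.persist_across_workers(
    client, [gdf_sliced_small, Y], workers=list(workers))
wait([gdf_sliced_small, Y])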
The following paragraph trains a random forest model by using Dask cuML for distributed GPU-accelerated computing and calculates model prediction accuracy.
# Random Forest building parameters
n_streams = 8 # optimization
max_depth = 10
n_bins = 16
n_trees = 10
cuml_model = cumlDaskRF(max_depth=max_depth, n_estimators=n_trees, n_bins=n_bins, n_streams=n_streams, verbose=True, client=client)
cuml_model.fit(gdf_sliced_small, Y)
# Model prediction
pred_df = cuml_model.predict(gdf_test)
# calculate accuracy
cu_score = cuml.metrics.accuracy_score( test_y, pred_df )
The Dask distributed scheduler provides live feedback in two forms: an interactive dashboard containing many plots and tables with live information, and a progress bar suitable for interactive use in consoles or notebooks.
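As a small illustration of the second form, the progress helper (already imported from dask.distributed in the notebook above) renders a live progress bar for submitted work; the collection passed to it here is only a placeholder.
# Placeholder: persist any Dask collection and watch its progress live.
futures = client.persist(gdf_sliced_small)
progress(futures)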
In our case, the following figure shows how you can monitor the task progress, including Bytes Stored, the Task Stream with a detailed breakdown of the number of streams, and Progress by task names with associated functions executed. In our case, because we have three worker nodes, there are three main chunks of stream and the color codes denote different tasks within each stream.
You can also analyze individual tasks and examine their execution time in milliseconds, or identify any bottlenecks. For example, the following figure shows the Task Streams for the random forest model fitting stage. There are considerably more functions being executed, including unique chunk for DataFrame processing, _construct_rf for fitting the random forest, and so on. Most of the time was spent on DataFrame operations due to the large size (45GB) of one day of data from the Criteo Click Logs.
This section compares the model training time using conventional Pandas with that of Dask. For Pandas, we loaded a smaller amount of data because of its slower processing time and to avoid memory overflow. Therefore, we interpolated the results to offer a fair comparison.
The following table shows the raw training time comparison when significantly less data is used for the Pandas random forest model (50 million rows out of the 20 billion in day 15 of the dataset). This sample uses less than 0.25% of all the available data, whereas for Dask cuML we trained the random forest model on all 20 billion available rows. The two approaches yielded comparable training times.
| Approach | Training time |
| --- | --- |
| Scikit-learn: Using only 50M rows in day15 as the training data | 47 minutes and 21 seconds |
| RAPIDS-Dask: Using all 20B rows in day15 as the training data | 1 hour, 12 minutes, and 11 seconds |
If we interpolate the training time results linearly, as shown in the following table, there is a significant advantage to using distributed training with Dask. It would take the conventional Pandas scikit-learn approach 13 days to process and train 45GB of data for a single day of click logs, whereas the RAPIDS-Dask approach processes the same amount of data 262.39 times faster.
| Approach | Training time |
| --- | --- |
| Scikit-learn: Using all 20B rows in day15 as the training data | 13 days, 3 hours, 40 minutes, and 11 seconds |
| RAPIDS-Dask: Using all 20B rows in day15 as the training data | 1 hour, 12 minutes, and 11 seconds |
In the previous table, you can see that by using RAPIDS with Dask to distribute the data processing and model training across multiple GPU instances, the run time is significantly shorter compared to conventional Pandas DataFrame processing with scikit-learn model training. This framework enables scaling up and out in the cloud as well as on-premises in a multinode, multi-GPU cluster.
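As a sanity check on the linear interpolation above, the following short calculation reproduces the extrapolated scikit-learn figure and the resulting speed-up factor:
# Extrapolate the measured 50M-row Pandas/scikit-learn run (47 min 21 s)
# linearly to all 20B rows and compare with the measured RAPIDS-Dask run.
pandas_50m_s = 47 * 60 + 21                   # 2,841 s for 50 million rows
scale_factor = 20_000_000_000 / 50_000_000    # 400x more rows
pandas_20b_s = pandas_50m_s * scale_factor    # 1,136,400 s ≈ 13 days, 3 h 40 m
dask_20b_s = 1 * 3600 + 12 * 60 + 11          # 4,331 s
print(f'Extrapolated Pandas/scikit-learn time: {pandas_20b_s / 86400:.2f} days')
print(f'Speed-up with RAPIDS-Dask: {pandas_20b_s / dask_20b_s:.2f}x')  # ≈ 262.39x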
After everything is deployed, run inferences on new data. The models predict whether a user clicks an ad based on browsing activities. The results of the prediction are stored in a Dask cuDF. You can monitor the results with Prometheus and visualize them in Grafana dashboards.
For more information, see this RAPIDS AI Medium post.
The NetApp DataOps Toolkit for Kubernetes abstracts storage resources and Kubernetes workloads up to the data-science workspace level. These capabilities are packaged in a simple, easy-to-use interface that is designed for data scientists and data engineers. Using the familiar form of a Python program, the Toolkit enables data scientists and engineers to provision and destroy JupyterLab workspaces in just seconds. These workspaces can contain terabytes, or even petabytes, of storage capacity, enabling data scientists to store all their training datasets directly in their project workspaces. Gone are the days of separately managing workspaces and data volumes.
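As an illustration of that Python-driven workflow, the sketch below assumes the Toolkit's Kubernetes Python functions (such as create_jupyter_lab) as documented in its GitHub repository; the workspace name, size, and namespace are placeholders.
# Hypothetical example: provision a JupyterLab workspace backed by the
# Trident/Azure NetApp Files storage class using the NetApp DataOps Toolkit.
from netapp_dataops.k8s import create_jupyter_lab

create_jupyter_lab(
    workspace_name='ctr-experiments',     # placeholder workspace name
    workspace_size='2Ti',                 # capacity carved from the ANF-backed storage class
    storage_class='azurenetappfiles',
    namespace='rapids-dask',
)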
For more information, visit the Toolkit’s GitHub repository.
There are two Jupyter notebooks associated with this technical report:
Azure NetApp Files, RAPIDS, and Dask speed up and simplify the deployment of large-scale ML processing and training by integrating with orchestration tools such as Docker and Kubernetes. By unifying the end-to-end data pipeline, this solution reduces the latency and complexity inherent in many advanced computing workloads, effectively bridging the gap between development and operations. Data scientists can run queries on large datasets and securely share data and algorithmic models with other users during the training phase.
When building your own AI/ML pipelines, configuring the integration, management, security, and accessibility of the components in an architecture is a challenging task. Giving developers access and control of their environment presents another set of challenges.
By building an end-to-end distributed training model and data pipeline in the cloud, we demonstrated two orders of magnitude improvement in total workflow completion time versus a conventional, open-source approach that did not leverage GPU-accelerated data processing and compute frameworks.
The combination of NetApp, Microsoft, open-source orchestration frameworks, and NVIDIA brings the latest technologies together as managed services with great flexibility to accelerate technology adoption and improve the time to market for new AI/ML applications. These advanced services are delivered in a cloud-native environment that can be easily ported to on-premises as well as hybrid deployment architectures.