Ray and Dask are two of the most popular frameworks for parallelizing and scaling Python computation. They help speed up computing for data processing, hyperparameter tuning, reinforcement learning, model serving and many other scenarios.
On an Azure ML compute instance, we can easily install Ray and Dask to take advantage of parallel computing across all cores within the node. However, there is not yet an easy way in Azure Machine Learning to extend this to a multi-node cluster when the computing and ML problems require the power of more than one node. One would need to set up a separate environment using VMs or Kubernetes outside Azure ML to run multi-node Ray/Dask, which would mean losing all the capabilities of Azure ML.
To address this gap, we have developed a library that can easily turn an Azure ML compute instance and compute cluster into a Ray and Dask cluster. The library does all the complex wiring and setup of a Ray cluster with Dask behind the scenes while exposing a simple Ray context object for users to perform parallel Python computing tasks. In addition, it ships with high-performance PyArrow APIs to access Azure storage and a simple interface to install additional libraries.
The library also supports both interactive mode and job mode. Data scientists can do fast interactive work with the cluster during the exploratory phase, then easily switch the code to job mode with minimal changes.
Check out the library repo at https://github.com/microsoft/ray-on-aml for details.
In this post, we'll walk through the steps to set up and use the library.
Setting up and using the library
1. Prepare the compute environment
For interactive use from your compute instance, create a compute cluster in the same virtual network (vnet) as your compute instance.
Checklist
[ ] Azure Machine Learning Workspace
[ ] Virtual network/Subnet
[ ] Create Compute Instance in the Virtual Network
[ ] Create Compute Cluster in the same Virtual Network
Use the azureml_py38 conda environment from a (Jupyter) Notebook in Azure Machine Learning Studio.
Again, don't forget to create the virtual network for the Compute Instance and Compute Cluster. Without it, they cannot communicate with each other.
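If you prefer to script this setup, below is a minimal sketch of provisioning such a compute cluster in the same virtual network with the Azure ML Python SDK (v1). The vnet, subnet, resource group and cluster names here are placeholders of my own, not values from this post; replace them with yours.

from azureml.core import Workspace
from azureml.core.compute import AmlCompute, ComputeTarget

ws = Workspace.from_config()

# Hypothetical names; use your own vnet/subnet/resource group and cluster name
compute_config = AmlCompute.provisioning_configuration(
    vm_size="STANDARD_DS3_V2",
    min_nodes=0,
    max_nodes=4,
    vnet_resourcegroup_name="my-resource-group",
    vnet_name="my-vnet",
    subnet_name="default")
cluster = ComputeTarget.create(ws, "ray-worker-cluster", compute_config)
cluster.wait_for_completion(show_output=True)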
2. Install the library
pip install --upgrade ray-on-aml
Installing this library will also install ray[default]==1.9.1, pyarrow>=5.0.0, dask[complete]==2021.12.0, adlfs==2021.10.0 and fsspec==2021.10.1.
3. Use the cluster in interactive mode
Run the following in interactive mode from the compute instance's notebook. Notice the option ci_is_head, which makes your current compute instance (CI) the head node.
from ray_on_aml.core import Ray_On_AML
from azureml.core import Workspace

ws = Workspace.from_config()
ray_on_aml = Ray_On_AML(ws=ws, compute_cluster=NAME_OF_COMPUTE_CLUSTER, additional_pip_packages=['torch==1.10.0', 'torchvision', 'sklearn'], maxnode=4)
ray = ray_on_aml.getRay()

# By default, ci_is_head=True: the compute instance acts as the head node and all nodes
# in the remote compute cluster act as workers.
# If instead you want one of the nodes in the remote AML compute cluster to be the head node
# and the remaining nodes to be workers, simply call ray = ray_on_aml.getRay(ci_is_head=False).
# To install additional libraries, use the additional_pip_packages and additional_conda_packages parameters.
At this point, you have the ray object, which you can use to perform various parallel computing tasks with the Ray API.
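For example, a quick sanity check using the standard Ray API (nothing specific to this library) might look like the following:

import time

@ray.remote
def square(x):
    time.sleep(1)           # simulate some work so tasks spread across workers
    return x * x

futures = [square.remote(i) for i in range(10)]
print(ray.get(futures))            # [0, 1, 4, ..., 81]
print(ray.cluster_resources())     # total CPUs/memory across head and worker nodes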
* Advanced usage: There are two arguments to Ray_On_AML() initialization that specify the base configuration of the library, with the default values shown below. Although it's possible, you should not change the default values of base_conda_dep and base_pip_dep as doing so may break the package. Only do so when you need to customize the cluster's default configuration, such as the Ray version.
Ray_On_AML(ws=ws, compute_cluster="Name_of_Compute_Cluster", base_conda_dep=['adlfs==2021.10.0', 'pip'], base_pip_dep=['ray[tune]==1.9.1', 'xgboost_ray==0.1.5', 'dask==2021.12.0', 'pyarrow>=5.0.0', 'fsspec==2021.10.1'])
4. Use the cluster in job mode
For use in an AML job, simply include ray_on_aml as a pip dependency, then inside your script do the following to get the ray object:
from ray_on_aml.core import Ray_On_AML

ray_on_aml = Ray_On_AML()
ray = ray_on_aml.getRay()

if ray:  # in the head node
    # logic to use Ray for distributed ML training, tuning or distributed data transformation with Dask
    pass
else:
    print("in worker node")
Example scenarios
1. Perform big data analysis with Dask on Ray
from functools import reduce
from adlfs import AzureBlobFileSystem

# Access the public Azure Open Datasets weather data through adlfs
abfs = AzureBlobFileSystem(account_name="azureopendatastorage", container_name="isdweatherdatacontainer")

# Read several years of parquet data as Ray datasets and union them into one dataset
years = [2009, 2010, 2011, 2012, 2013, 2014, 2015]
datasets = [ray.data.read_parquet([f"az://isdweatherdatacontainer/ISDWeather/year={year}/"], filesystem=abfs)
            for year in years]
all_data = reduce(lambda left, right: left.union(right), datasets)
print(all_data.count())

# Convert to a Dask DataFrame and compute summary statistics on the cluster
all_data_dask = all_data.to_dask().describe().compute()
print(all_data_dask)
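If you prefer to stay in the Dask API end to end, you can also run native Dask code on the Ray cluster with the Dask-on-Ray scheduler that ships with Ray. Below is a minimal sketch; the temperature column name is my assumption about this public dataset's schema.

import dask.dataframe as dd
from ray.util.dask import ray_dask_get

# Read one year of the same public dataset with Dask (adlfs handles the az:// protocol)
ddf = dd.read_parquet(
    "az://isdweatherdatacontainer/ISDWeather/year=2012/",
    storage_options={"account_name": "azureopendatastorage"})

# Execute the Dask graph on the Ray cluster instead of the local scheduler
print(ddf["temperature"].mean().compute(scheduler=ray_dask_get))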
2. Distributed hyperparameter tuning with Ray Tune
import sklearn.datasets
import sklearn.metrics
from sklearn.model_selection import train_test_split
import xgboost as xgb
from ray import tune

def train_breast_cancer(config):
    # Load dataset
    data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
    # Split into train and test set
    train_x, test_x, train_y, test_y = train_test_split(
        data, labels, test_size=0.25)
    # Build input matrices for XGBoost
    train_set = xgb.DMatrix(train_x, label=train_y)
    test_set = xgb.DMatrix(test_x, label=test_y)
    # Train the classifier
    results = {}
    xgb.train(
        config,
        train_set,
        evals=[(test_set, "eval")],
        evals_result=results,
        verbose_eval=False)
    # Return prediction accuracy
    accuracy = 1. - results["eval"]["error"][-1]
    tune.report(mean_accuracy=accuracy, done=True)

config = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "max_depth": tune.randint(1, 9),
    "min_child_weight": tune.choice([1, 2, 3]),
    "subsample": tune.uniform(0.5, 1.0),
    "eta": tune.loguniform(1e-4, 1e-1)
}

analysis = tune.run(
    train_breast_cancer,
    resources_per_trial={"cpu": 1},
    config=config,
    num_samples=10)
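After the run finishes, you can look up the best hyperparameters found across the trials with the standard Ray Tune API (the metric and mode below are my own choice, matching the reported mean_accuracy):

best_config = analysis.get_best_config(metric="mean_accuracy", mode="max")
print(best_config)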
3. Distributed XGBoost
from xgboost_ray import RayXGBClassifier, RayParams
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

seed = 42
X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, train_size=0.25, random_state=42)

clf = RayXGBClassifier(
    n_jobs=4,  # In XGBoost-Ray, n_jobs sets the number of actors
    random_state=seed)

# The scikit-learn API will automatically convert the data
# to RayDMatrix format as needed.
# You can also pass X as a RayDMatrix, in which case
# y will be ignored.
clf.fit(X_train, y_train)
pred_ray = clf.predict(X_test)
print(pred_ray.shape)
pred_proba_ray = clf.predict_proba(X_test)
print(pred_proba_ray.shape)

# It is also possible to pass a RayParams object
# to fit/predict/predict_proba methods - it will override
# n_jobs set during initialization
clf.fit(X_train, y_train, ray_params=RayParams(num_actors=4))
pred_ray = clf.predict(X_test, ray_params=RayParams(num_actors=4))
print(pred_ray.shape)
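As a small follow-up, you can score the predictions against the held-out labels with plain scikit-learn (nothing Ray-specific here):

from sklearn.metrics import accuracy_score
print("accuracy:", accuracy_score(y_test, pred_ray))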
4. Use in job mode with an AML job
from azureml.core import Workspace, Environment, Experiment, ScriptRunConfig
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import PyTorchConfiguration

ws = Workspace.from_config()
compute_cluster = 'worker-cpu-v3'
maxnode = 5
vm_size = 'STANDARD_DS3_V2'
vnet = 'rayvnet'
subnet = 'default'
exp = 'ray_on_aml_job'
ws_detail = ws.get_details()
ws_rg = ws_detail['id'].split("/")[4]
vnet_rg = None

try:
    ray_cluster = ComputeTarget(workspace=ws, name=compute_cluster)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    if vnet_rg is None:
        vnet_rg = ws_rg
    compute_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
                                                           min_nodes=0, max_nodes=maxnode,
                                                           vnet_resourcegroup_name=vnet_rg,
                                                           vnet_name=vnet,
                                                           subnet_name=subnet)
    ray_cluster = ComputeTarget.create(ws, compute_cluster, compute_config)
    ray_cluster.wait_for_completion(show_output=True)

rayEnv = Environment.from_conda_specification(name="rayEnv",
                                              file_path="../examples/conda_env.yml")
# rayEnv = Environment.get(ws, "rayEnv", version=19)

src = ScriptRunConfig(source_directory='../examples/job',
                      script='aml_job.py',
                      environment=rayEnv,
                      compute_target=ray_cluster,
                      distributed_job_config=PyTorchConfiguration(node_count=maxnode),
                      # arguments = ["--master_ip", master_ip]
                      )

run = Experiment(ws, exp).submit(src)
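You can then stream the job's output while it runs (standard Azure ML SDK call):

run.wait_for_completion(show_output=True)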
This is the code inside aml_job.py, with details omitted for brevity:
from azureml.core import Run
from ray_on_aml.core import Ray_On_AML
# torchvision's datasets, ray.tune, train_mnist, search_space and get_data_count
# are imported/defined elsewhere in the script and omitted here for brevity

if __name__ == "__main__":
    run = Run.get_context()
    ws = run.experiment.workspace
    account_key = ws.get_default_keyvault().get_secret("adls7-account-key")
    ray_on_aml = Ray_On_AML()
    ray = ray_on_aml.getRay()
    if ray:  # in the head node
        print("head node detected")
        datasets.MNIST("~/data", train=True, download=True)
        analysis = tune.run(train_mnist, config=search_space)
        print(ray.cluster_resources())
        print("data count result", get_data_count(account_key))
    else:
        print("in worker node")
5. View the Ray dashboard
The easiest way to view the Ray dashboard is to use the connection from VS Code for Azure ML. Open VS Code connected to your compute instance, open a terminal window, type http://127.0.0.1:8265/ and then Ctrl+click it to open the Ray dashboard on your local computer.
This trick tells VS Code to forward the port to your local machine through its remote extension on the CI, without having to set up SSH port forwarding yourself.
See more examples in the quick start examples in the repo.
In partnership with Hyun Suk Shin.