
MutharasuN
Copper Contributor
Jul 25, 2024

Azure Machine Learning Pipeline Issue

Hello Team,

Currently, we are running a large set of ML recommendation models on an Azure compute cluster, and training takes more than 5 days.

How can we process a large dataset on the Azure compute cluster efficiently, for example around 5 million records?

Here is the sample code:

import os
import pickle
import argparse
import json
import tempfile

import numpy as np
import pandas as pd
from azureml.core import Workspace, Datastore, Run, Model
from azureml.data.dataset_factory import TabularDatasetFactory

# Load environment variables
from dotenv import load_dotenv
load_dotenv()

# Parse arguments
parser = argparse.ArgumentParser("model_training")
parser.add_argument("--model_training", type=str, help="Model training data path")
parser.add_argument("--interaction", type=str, help="Interaction type")
args = parser.parse_args()


# Workspace setup
workspace = Workspace(subscription_id=os.environ.get("SUBSCRIPTION_ID"),
                      resource_group=os.environ.get("RESOURCE_GROUP"),
                      workspace_name=os.environ.get("WORKSPACE_NAME"))

print('Workspace:', workspace)

# Get the datastore from the Azure ML workspace
datastore = Datastore.get(workspace, datastore_name='data_factory')
print('Datastore:', datastore)

# Define the path to your Parquet files in the datastore
datastore_path = [(datastore, 'sampless_silver/')]

# Create a TabularDataset from the Parquet files in the datastore
dataset = TabularDatasetFactory.from_parquet_files(path=datastore_path)
print('Dataset:', dataset)

# Convert the TabularDataset to a Pandas DataFrame
training_dataset = dataset.to_pandas_dataframe()
print('Training Dataset:', training_dataset)

# Keep the first 25 million rows, then shuffle them
training_dataset = training_dataset.head(25000000)
training_dataset = training_dataset.sample(frac=1).reset_index(drop=True)
# Non-numeric 'views' values become NaN
training_dataset["views"] = pd.to_numeric(training_dataset['views'], errors='coerce')

df_selected = training_dataset.rename(columns={'clientId': 'userID', 'offerId': 'itemID'})
df_selected = df_selected[['userID', 'itemID', 'views']]
print('Selected Data:', df_selected)

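# Create and fit model
from lightfm import LightFM
from lightfm import cross_validation
from lightfm.data import Dataset

# Map raw user/item IDs to LightFM's internal indices. A separate name avoids
# overwriting the Azure ML `dataset` created above.
lfm_dataset = Dataset()
lfm_dataset.fit(users=df_selected['userID'], items=df_selected['itemID'])
# Rows are (userID, itemID, views); views becomes the interaction weight
(interactions, weights) = lfm_dataset.build_interactions(df_selected.iloc[:, 0:3].values)

# mapping() returns (user_id_map, user_feature_map, item_id_map, item_feature_map)
user_dict_label = lfm_dataset.mapping()[0]
item_dict_label = lfm_dataset.mapping()[2]

train_interactions, test_interactions = cross_validation.random_train_test_split(
    interactions, test_percentage=0.25, random_state=np.random.RandomState(2016))

# learning_rate is only used by the 'adagrad' schedule (adadelta uses rho/epsilon),
# and k is only used by the 'warp-kos' loss. no_components=1300 makes every update expensive.
model = LightFM(loss='warp', no_components=1300, learning_rate=0.000001,
                random_state=np.random.RandomState(2016), user_alpha=0.000005, max_sampled=100, k=100,
                learning_schedule='adadelta', item_alpha=0.000005)
print('Model:', model)

model.fit(interactions=train_interactions, epochs=2, verbose=True, num_threads=8)

# Cast IDs to str so json.dump can serialize them as object keys
user_dict_label = {str(key): value for key, value in user_dict_label.items()}
item_dict_label = {str(key): value for key, value in item_dict_label.items()}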

# Save and upload model
with tempfile.TemporaryDirectory() as tmpdirname:
    recommendation_model_offer = os.path.join(tmpdirname, "sample_recommendation_model.pkl")
    with open(recommendation_model_offer, 'wb') as f:
        pickle.dump(model, f)

    model_intersection = os.path.join(tmpdirname, "sample_training_intersection.pkl")
    with open(model_intersection, 'wb') as f:
        pickle.dump(interactions, f)

    model_user_dict = os.path.join(tmpdirname, "users_dict_label.json")
    with open(model_user_dict, 'w') as f:
        json.dump(user_dict_label, f)

    model_item_dict = os.path.join(tmpdirname, "items_dict_label.json")
    with open(model_item_dict, 'w') as f:
        json.dump(item_dict_label, f)

    datastore.upload_files(
        files=[recommendation_model_offer, model_intersection, model_user_dict, model_item_dict],
        target_path='SAMPLE_MODEL_TRAINING/',
        overwrite=True
    )
    print('Files uploaded to datastore')

    # Register the model while tmpdirname still exists (it is deleted when the with-block exits)
    register_name = f"{args.interaction}_light_fm_recommendation_model"
    Model.register(workspace=workspace, model_path=tmpdirname, model_name=register_name,
                   tags={'affinity': args.interaction, 'sample': 'recommendation'})
    print('Model registered')

Please share your feedback. Thanks!

  • Please consider the following:

     

    1. Use Data Parallelism

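    Break the data into smaller chunks and process them in parallel across multiple nodes of the Azure compute cluster, using a distributed framework such as Dask or Apache Spark. Note that LightFM itself trains on a single machine (multithreaded via num_threads), so this mainly speeds up data loading and preprocessing, or lets you train independent models (for example, one per interaction type) at the same time: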

    • Convert your dataset into smaller partitions and run multiple training jobs in parallel.
    • Use Azure Databricks, if it is available in your environment, for a managed Spark-based solution.

    Example:

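    import dask.dataframe as dd
    
    # Convert the Pandas DataFrame to a Dask DataFrame with 10 partitions.
    # (To avoid loading everything into one machine's memory first,
    # dd.read_parquet can read the Parquet files directly instead.)
    training_dataset = dd.from_pandas(training_dataset, npartitions=10)
    
    # Apply a function to each partition in parallel; your_processing_function is a
    # placeholder for your own function that takes and returns a pandas DataFrame
    result = training_dataset.map_partitions(your_processing_function).compute()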
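    When you split the data for separate jobs or nodes, it usually helps to keep each user's interactions in one partition. A minimal standard-library sketch, assuming rows of (user_id, item_id, views) as in the question; shard_for and split_by_user are hypothetical helpers, and this only partitions the data, it does not make a single LightFM fit run across nodes:

```python
import zlib
from collections import defaultdict


def shard_for(user_id: str, n_shards: int) -> int:
    """Stable shard index for a user ID (same result on every node and run)."""
    # crc32 is deterministic, unlike Python's built-in hash() for str,
    # which is randomized per process
    return zlib.crc32(user_id.encode("utf-8")) % n_shards


def split_by_user(rows, n_shards):
    """Group (user_id, item_id, views) rows so each user lands in exactly one shard."""
    shards = defaultdict(list)
    for user_id, item_id, views in rows:
        shards[shard_for(str(user_id), n_shards)].append((user_id, item_id, views))
    return dict(shards)


if __name__ == "__main__":
    rows = [("u1", "i1", 3), ("u2", "i2", 1), ("u1", "i3", 2), ("u3", "i1", 5)]
    for shard, part in sorted(split_by_user(rows, n_shards=2).items()):
        print(shard, part)
```

    Each shard can then be written to its own file or handed to its own job.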

     

    2. Optimize LightFM Parameters

    When training a LightFM model:

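    • Start with a much smaller dimensionality (no_components; the sample uses 1300) and fewer epochs to measure runtime, then increase them only if accuracy requires it. The cost of each training update grows roughly linearly with no_components.
    • Use a sampling strategy to train on a subset of your data and validate the impact on the results.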
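    Taking the first N rows or sampling rows at random can leave users with only part of their history. A minimal pandas sketch of sampling by user instead, assuming a DataFrame with a userID column like df_selected; sample_users is a hypothetical helper, not a LightFM API:

```python
import pandas as pd


def sample_users(df: pd.DataFrame, frac: float, seed: int = 2016) -> pd.DataFrame:
    """Return all interactions for a random fraction of the users in df."""
    users = pd.Series(df["userID"].unique())
    # Pick users (not rows) so each kept user retains their full history
    kept = users.sample(frac=frac, random_state=seed)
    return df[df["userID"].isin(kept)].reset_index(drop=True)


if __name__ == "__main__":
    df = pd.DataFrame(
        {
            "userID": ["a", "a", "b", "b", "c", "d"],
            "itemID": [1, 2, 1, 3, 2, 4],
            "views": [1, 2, 1, 1, 3, 1],
        }
    )
    print(sample_users(df, frac=0.5))
```

    Timing a few epochs on such a subset gives a rough runtime estimate before committing to the full dataset.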

    LightFM runs on CPU only. For large-scale problems, you can also explore PyTorch-based collaborative filtering models, which can train on GPUs.

     

    3. Use Azure Machine Learning Pipelines

    Azure Machine Learning (AML) pipelines enable efficient orchestration and management of large datasets:

    • Break your pipeline into stages (e.g., preprocessing, training, evaluation).
    • Use ParallelRunStep to process mini-batches of a dataset in parallel across the cluster's nodes.

    Example:

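    from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep

    # env, compute_cluster, input_dataset and output_dataset are assumed to exist.
    # The script, environment and compute target are set on ParallelRunConfig.
    parallel_run_config = ParallelRunConfig(
        source_directory="./scripts",
        entry_script="process.py",
        environment=env,
        compute_target=compute_cluster,
        node_count=4,
        error_threshold=10,
        output_action="append_row",
    )
    parallel_run_step = ParallelRunStep(
        name="parallel-data-processing",
        parallel_run_config=parallel_run_config,
        inputs=[input_dataset],
        output=output_dataset,
    )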
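    ParallelRunStep calls init() once per worker process and run(mini_batch) for each mini-batch; for a TabularDataset input, mini_batch is a pandas DataFrame, and run() returns a DataFrame or list. A minimal sketch of what the hypothetical ./scripts/process.py could contain, assuming the question's column names; the cleanup logic is illustrative only:

```python
# process.py: sketch of a ParallelRunStep entry script (names are illustrative)
import pandas as pd

COLUMN_MAP = {}


def init():
    """Called once per worker process, before any mini-batch is processed."""
    global COLUMN_MAP
    # Expensive one-time setup (loading a model, lookup tables) belongs here
    COLUMN_MAP = {"clientId": "userID", "offerId": "itemID"}


def run(mini_batch: pd.DataFrame) -> pd.DataFrame:
    """Called once per mini-batch; a TabularDataset input arrives as a DataFrame."""
    out = mini_batch.rename(columns=COLUMN_MAP)
    out["views"] = pd.to_numeric(out["views"], errors="coerce")
    # With output_action="append_row", the returned rows are appended to the step output
    return out.dropna(subset=["views"])[["userID", "itemID", "views"]]


if __name__ == "__main__":
    # Local smoke test outside Azure ML
    init()
    batch = pd.DataFrame({"clientId": ["u1", "u2"], "offerId": ["i1", "i2"], "views": ["4", "n/a"]})
    print(run(batch))
```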

     

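    4. Optimize Compute Cluster Usage
    • Scale Out: Add nodes with sufficient cores and memory. Extra nodes only help when the work is split across them (for example with ParallelRunStep or several independent jobs); a single LightFM fit runs on one node, so pick a VM size with enough cores for num_threads and enough RAM for the full DataFrame.
    • Low-Priority VMs: To save cost, configure the cluster with low-priority (spot) VMs (vm_priority='lowpriority') if interruptions are acceptable.
    • Autoscaling: Set min_nodes and max_nodes so the cluster scales with the workload; with min_nodes=0 it scales down to zero when idle.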

    Example:

    from azureml.core.compute import AmlCompute, ComputeTarget
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D16_V3',
                                                           min_nodes=0,
                                                           max_nodes=20,
                                                           idle_seconds_before_scaledown=1200)
    compute_cluster = ComputeTarget.create(workspace, 'my-compute-cluster', compute_config)
    compute_cluster.wait_for_completion()
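    Low-priority nodes can be preempted, which is costly for a job that runs for days unless it can resume. LightFM's fit_partial() continues training an existing model, so training one epoch at a time and pickling after each epoch lets a rerun of the job pick up where it stopped. A minimal sketch, assuming the checkpoint path is on storage that outlives the node (for example a mounted datastore path); DummyModel is a hypothetical stand-in for LightFM so the example runs on its own:

```python
import os
import pickle
import tempfile


class DummyModel:
    """Stand-in for LightFM so the sketch runs; LightFM also has fit_partial()."""

    def __init__(self):
        self.epochs_done = 0

    def fit_partial(self, interactions, epochs=1, num_threads=1):
        self.epochs_done += epochs
        return self


def train_with_checkpoints(make_model, interactions, total_epochs, ckpt_path):
    """Train one epoch at a time, pickling (model, epochs_completed) after each."""
    if os.path.exists(ckpt_path):
        # Resume: reload the model and the number of completed epochs
        with open(ckpt_path, "rb") as f:
            model, start = pickle.load(f)
    else:
        model, start = make_model(), 0
    for epoch in range(start, total_epochs):
        model.fit_partial(interactions, epochs=1, num_threads=8)
        # Write to a temp file first, then atomically replace the checkpoint
        tmp = ckpt_path + ".tmp"
        with open(tmp, "wb") as f:
            pickle.dump((model, epoch + 1), f)
        os.replace(tmp, ckpt_path)
    return model


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "lightfm_ckpt.pkl")
    m = train_with_checkpoints(DummyModel, None, total_epochs=3, ckpt_path=path)
    print(m.epochs_done)  # 3
```

    Writing to a temporary file and then calling os.replace means a preemption during the write leaves the previous checkpoint intact.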

     

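    5. Efficient Data Handling
    • Keep the data in Parquet (as the sample already does) or another columnar binary format to speed up I/O.
    • TabularDataset reads directly from Azure Blob Storage without a separate download step, but to_pandas_dataframe() still loads the whole dataset into memory; drop unneeded columns first (for example with TabularDataset.keep_columns) or process the data in partitions.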
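    Because the sample calls to_pandas_dataframe() on the full dataset, in-memory size matters as much as I/O speed. A minimal pandas sketch, assuming the question's column names (shrink is a hypothetical helper): keep only the three columns LightFM needs and store the repeated string IDs as category dtype:

```python
import pandas as pd


def shrink(df: pd.DataFrame) -> pd.DataFrame:
    """Keep only the columns LightFM needs and use compact dtypes."""
    out = df[["clientId", "offerId", "views"]].copy()
    # Repeated string IDs compress well as categoricals (integer codes + lookup table)
    out["clientId"] = out["clientId"].astype("category")
    out["offerId"] = out["offerId"].astype("category")
    # float32 uses half the memory of float64 and still holds NaN from coercion
    out["views"] = pd.to_numeric(out["views"], errors="coerce").astype("float32")
    return out


if __name__ == "__main__":
    n = 100_000
    raw = pd.DataFrame(
        {
            "clientId": [f"client-{i % 1000}" for i in range(n)],
            "offerId": [f"offer-{i % 50}" for i in range(n)],
            "views": [str(i % 7) for i in range(n)],
            "extra": ["unused"] * n,
        }
    )
    before = raw.memory_usage(deep=True).sum()
    after = shrink(raw).memory_usage(deep=True).sum()
    print(f"{before / 1e6:.1f} MB -> {after / 1e6:.1f} MB")
```

    The savings depend on how often IDs repeat; with millions of interactions over far fewer users and offers, categoricals are typically much smaller than object strings.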
