Forum Discussion

MutharasuN
Copper Contributor
Jul 25, 2024

Azure Machine Learning Pipeline Issue

Hello Team,

Currently, we are training a large set of ML recommendation models on an Azure compute cluster, and a single run takes more than 5 days.

How can we efficiently train on a large dataset (around 5 million records) in the Azure compute cluster?

Please find the sample code:

import os
import pickle
import argparse
import numpy as np
import pandas as pd
import json
from azureml.core import Workspace, Datastore, Run, Model
from azureml.data.dataset_factory import TabularDatasetFactory
import tempfile

# Load environment variables
from dotenv import load_dotenv
load_dotenv()

# Parse arguments
parser = argparse.ArgumentParser("model_training")
parser.add_argument("--model_training", type=str, help="Model training data path")
parser.add_argument("--interaction", type=str, help="Interaction type")
args = parser.parse_args()


# Workspace setup
workspace = Workspace(subscription_id=os.environ.get("SUBSCRIPTION_ID"),
                      resource_group=os.environ.get("RESOURCE_GROUP"),
                      workspace_name=os.environ.get("WORKSPACE_NAME"))

print('Workspace:', workspace)

# Get the datastore from the Azure ML workspace
datastore = Datastore.get(workspace, datastore_name='data_factory')
print('Datastore:', datastore)

# Define the path to your Parquet files in the datastore
datastore_path = [(datastore, 'sampless_silver/')]

# Create a TabularDataset from the Parquet files in the datastore
dataset = TabularDatasetFactory.from_parquet_files(path=datastore_path)
print('Dataset:', dataset)

# Convert the TabularDataset to a Pandas DataFrame
training_dataset = dataset.to_pandas_dataframe()
print('Training Dataset:', training_dataset)

# Sample data
training_dataset = training_dataset.head(25000000)
training_dataset = training_dataset.sample(frac=1).reset_index(drop=True)
training_dataset["views"] = pd.to_numeric(training_dataset['views'], errors='coerce')

df_selected = training_dataset.rename(columns={'clientId': 'userID', 'offerId': 'itemID', 'views': 'views'})
df_selected = df_selected[['userID', 'itemID', 'views']]
print('Selected Data:', df_selected)

# Create and fit model
from lightfm import LightFM
from lightfm import cross_validation
from lightfm.data import Dataset

# Note: this rebinds `dataset` (previously the TabularDataset) to a LightFM Dataset
dataset = Dataset()
dataset.fit(users=df_selected['userID'], items=df_selected['itemID'])
(interactions, weights) = dataset.build_interactions(df_selected.iloc[:, 0:3].values)

user_dict_label = dataset.mapping()[0]
item_dict_label = dataset.mapping()[2]

train_interactions, test_interactions = cross_validation.random_train_test_split(
    interactions, test_percentage=0.25, random_state=np.random.RandomState(2016))

model = LightFM(loss='warp', no_components=1300, learning_rate=0.000001,
                random_state=np.random.RandomState(2016), user_alpha=0.000005,
                max_sampled=100, k=100, learning_schedule='adadelta',
                item_alpha=0.000005)
print('Model:', model)

model.fit(interactions=train_interactions, epochs=2, verbose=True, num_threads=8)
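# (Sketch only, not part of the original run: LightFM's fit_partial() trains in
# increments, which would make it easier to log progress on a multi-day job.)
# for epoch in range(2):
#     model.fit_partial(train_interactions, epochs=1, num_threads=8)
#     print(f'Finished epoch {epoch + 1}')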

user_dict_label = {str(key): value for key, value in user_dict_label.items()}
item_dict_label = {str(key): value for key, value in item_dict_label.items()}

# Save and upload model
with tempfile.TemporaryDirectory() as tmpdirname:
    recommendation_model_offer = os.path.join(tmpdirname, "sample_recommendation_model.pkl")
    with open(recommendation_model_offer, 'wb') as f:
        pickle.dump(model, f)

    model_intersection = os.path.join(tmpdirname, "sample_training_intersection.pkl")
    with open(model_intersection, 'wb') as f:
        pickle.dump(interactions, f)

    model_user_dict = os.path.join(tmpdirname, "users_dict_label.json")
    with open(model_user_dict, 'w') as f:
        json.dump(user_dict_label, f)

    model_item_dict = os.path.join(tmpdirname, "items_dict_label.json")
    with open(model_item_dict, 'w') as f:
        json.dump(item_dict_label, f)

    datastore.upload_files(
        files=[recommendation_model_offer, model_intersection, model_user_dict, model_item_dict],
        target_path='SAMPLE_MODEL_TRAINING/',
        overwrite=True
    )
    print('Files uploaded to datastore')

    # Register the model (must run inside the with block, while tmpdirname still exists)
    register_name = f"{args.interaction}_light_fm_recommendation_model"
    Model.register(workspace=workspace, model_path=tmpdirname, model_name=register_name,
                   tags={'affinity': args.interaction, 'sample': 'recommendation'})
    print('Model registered')
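For reference, a minimal sketch of a column-pruning variant we could try (assuming clientId, offerId and views are the only columns used downstream; `slim` is a placeholder name): trimming the TabularDataset before the pandas conversion, so the compute node materialises less data. Would this kind of change help at this scale?

# Sketch only: keep just the needed columns at the dataset level
# before to_pandas_dataframe(), instead of loading every column.
slim = TabularDatasetFactory.from_parquet_files(path=datastore_path)
slim = slim.keep_columns(['clientId', 'offerId', 'views'])
training_dataset = slim.to_pandas_dataframe()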

Please share your feedback. Thanks!

