Turn AML compute cluster into Spark
Published Feb 01 2022 05:48 PM 5,828 Views
Microsoft

Following the blog post on Turning AML compute into Ray and Dask , we've added a new exciting capability to run Spark within AML compute where Spark shares the same context with your ML code. The Spark version is 3.2.1 with support for Delta Lake and Synapse SQL read/write. This enables users of AML to perform powerful data transformation and even Spark ML within AML interactive notebook or in a job run. 

Traditionally, Azure ML integrates with Spark Synapse or external compute services via a pipeline step or better via magic command like %synapse, but the computing context is separate from your AML logic so you still need to run Spark in a separate step and persist the output to some storage and load it in your AML script.

With this approach, Spark is available right within your AML code whether it's AML notebook, python script or pipeline step. It shares the common computing context and most of the cases you can just directly convert the Spark Dataframe to Pandas and Dask Dataframe without persisting first to an intermediary storage.

Here are examples of how Spark inside AML can benefit users:


1. Easy Spark setting up with credentials to access Azure Storage

 

 

 

from ray_on_aml.core import Ray_On_AML
ray_on_aml =Ray_On_AML(ws=ws, compute_cluster ="d12-v2-ssh", maxnode=5)
ray = ray_on_aml.getRay()
storage_account_name ="adlsdatalakegen6"
storage_account_key=ws.get_default_keyvault().get_secret("adlsdatalakegen6")
additional_spark_configs ={f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net":f"{storage_account_key}"}
spark = ray_on_aml.getSpark(executor_cores =3,num_executors =3 ,executor_memory='10GB', additional_spark_configs=additional_spark_configs)
#Number of nodes (including head node) can be set as number of executor. 

 

 

 


2. Fast loading high volume data from delta lake in ADLS or Synapse SQL Pool

 

 

 

#Access delta lake data
adls_data = spark.read.format("delta").load("abfss://mltraining@adlsdatalakegen6.dfs.core.windows.net/ISDWeatherDelta")

 

 

 

 

 

 

server_name = "jdbc:sqlserver://sy2qwhqqkv7eacsws1.sql.azuresynapse.net:1433"
database_name = "sy2qwhqqkv7eacsws1p1"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "ISDWeatherDelta"
username = "azureuser"
password = "" 
jdbcDF = spark.read \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .option("user", username) \
        .option("password", password).load()

jdbcDF.show()

 

 

 


3. Using Koalas (pyspark Pandas) API for EDA and data transformation on big dataset.
Data scientists loves pandas but not everyone is comfortable with Pyspark so Koalas (Pandas on pyspark) is the answer. You can easily load dataset that is way bigger than the memory of a machine and use Pandas like APIs to manipulate and visualize big dataset.

 

 

 

#Reading 73M rows of data into a Pandas like dataframe
from pyspark.pandas import read_delta
koalas_df= read_delta("abfss://mltraining@adlsdatalakegen6.dfs.core.windows.net/ISDWeatherDelta")
koalas_df.describe()

 

 

 

JamesN_2-1643807917330.pngand plotting with big dataset with plotly backend

 

 

koalas_df["snowDepth"].plot.area()

 

 

JamesN_0-1643858344144.png

 

4. Loading big dataset, transform with Spark then train XGBoost within a single ML step.
(credit: The below example is taken from raydp project)

 

 

 

data = spark.read.format("csv").option("header", "true") \
        .option("inferSchema", "true") \
        .load(NYC_TRAIN_CSV)
# Set spark timezone for processing datetime
spark.conf.set("spark.sql.session.timeZone", "UTC")
# Transform the dataset
data = nyc_taxi_preprocess(data)
# Split data into train_dataset and test_dataset
train_df, test_df = random_split(data, [0.9, 0.1], 0)
# Convert spark dataframe into ray dataset
train_dataset = ray.data.from_spark(train_df)
test_dataset = ray.data.from_spark(test_df)
# Then convert them into DMatrix used by xgboost
dtrain = RayDMatrix(train_dataset, label='fare_amount')
dtest = RayDMatrix(test_dataset, label='fare_amount')
# Configure the XGBoost model
config = {
    "tree_method": "hist",
    "eval_metric": ["logloss", "error"],
}
evals_result = {}
# Train the model
bst = train(
        config,
        dtrain,
        evals=[(dtest, "eval")],
        evals_result=evals_result,
        ray_params=RayParams(max_actor_restarts=2, num_actors=2, cpus_per_actor=2),
        num_boost_round=10)

 

 

 

 

5. Spark mllib

(Collaborative filtering example from https://spark.apache.org/docs/3.2.1/ml-collaborative-filtering.html)

 

 

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

lines = spark.read.text("abfss://mltraining@adlsdatalakegen6.dfs.core.windows.net/mllib/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

 

 


6. Use in a remote AML job (CLI v2 is supported)
You can easily turn your remote AML cluster into Spark in the same way you would do with ray in ray-on-aml.
In your python script, do something like this

 

 

 

if __name__ == "__main__":
    run = Run.get_context()
    ws = run.experiment.workspace
    ray_on_aml =Ray_On_AML()
    ray = ray_on_aml.getRay()
    if ray: #in the headnode
        spark = ray_on_aml.getSpark(executor_cores =3,num_executors =3 ,executor_memory='10GB')

 

 

 

 

This capability is based on the work from raydp which develops a framework to run Spark on ray

For installation, this capability is part of ray-on-aml library, so you just need to upgrade the library to the latest version to use the Spark capability.

Checkout Spark example notebook from ray-on-aml repo to learn more!

Co-Authors
Version history
Last update:
‎Apr 27 2022 06:45 PM
Updated by: