This is a follow-up to the blog post about delta datasets. If you haven’t read it yet, take a look at MGDC for SharePoint FAQ: How can I use Delta State Datasets?
Our team got some follow-up questions on this, so I thought it would make sense to write a little more and make things clear.
First of all, based on some conversations with Copilot, the basic SQL code for merging a delta would look something like this:
-- Start a transaction
BEGIN TRANSACTION;
-- Assuming the Users table has a primary key constraint on user_id
-- and the UserChanges table identifies each changed user by the same user_id
-- First, delete the users that have operation = 'Deleted' in UserChanges
DELETE FROM Users
WHERE user_id IN
(SELECT user_id
FROM UserChanges
WHERE operation = 'Deleted');
-- Next, update the users that have operation = 'Updated' in UserChanges
UPDATE U
SET user_name = UC.user_name,
    user_age = UC.user_age
FROM Users U
JOIN UserChanges UC ON U.user_id = UC.user_id
WHERE UC.operation = 'Updated';
-- Finally, insert the users that have operation = 'Created' in UserChanges
INSERT INTO Users (user_id, user_name, user_age)
SELECT user_id, user_name, user_age
FROM UserChanges
WHERE operation = 'Created';
-- Commit the transaction
COMMIT TRANSACTION;
Note that the column names used (shown here as user_id, user_name and user_age) need to be updated for each dataset, but the structure will be the same.
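Since only the table and column names change between datasets, the statements can also be generated. The following is a minimal sketch and not part of the original code: `build_merge_sql` and its parameters are hypothetical names, it assumes a single-column key and an `operation` column with the values used above, and it replaces the `UPDATE ... FROM` join with correlated subqueries so that the same text also runs on engines without that syntax.

```python
def build_merge_sql(table, changes, key, columns):
    """Return the delete, update and insert statements for one dataset.

    Hypothetical helper: table, changes, key and columns are plain
    identifiers supplied by the caller. They are not escaped here.
    """
    # One "col = (subquery)" assignment per non-key column.
    assignments = ",\n    ".join(
        f"{col} = (SELECT c.{col} FROM {changes} c "
        f"WHERE c.{key} = {table}.{key} AND c.operation = 'Updated')"
        for col in columns
    )
    all_columns = ", ".join([key, *columns])
    delete_sql = (
        f"DELETE FROM {table}\n"
        f"WHERE {key} IN (SELECT {key} FROM {changes} WHERE operation = 'Deleted')"
    )
    update_sql = (
        f"UPDATE {table}\nSET {assignments}\n"
        f"WHERE {key} IN (SELECT {key} FROM {changes} WHERE operation = 'Updated')"
    )
    insert_sql = (
        f"INSERT INTO {table} ({all_columns})\n"
        f"SELECT {all_columns} FROM {changes} WHERE operation = 'Created'"
    )
    # Same order as the hand-written version: delete, update, insert.
    return [delete_sql, update_sql, insert_sql]


statements = build_merge_sql(
    "Users", "UserChanges", "user_id", ["user_name", "user_age"]
)
```

The three statements would still be run inside a single transaction, as in the hand-written version.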
I also asked Copilot to translate this SQL code to PySpark, and it suggested the code below (with a few minor manual touches):
# Import SparkSession and functions
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create SparkSession
spark = SparkSession.builder.appName("Delta dataset").getOrCreate()
# Assuming the Users and UserChanges tables are already loaded as DataFrames
users = spark.table("Users")
user_changes = spark.table("UserChanges")
# First, delete the users that have operation = 'Deleted' in UserChanges
users = users.join(user_changes.filter(user_changes.operation == "Deleted"), "user_id", "left_anti")
# Next, update the users that have operation = 'Updated' in UserChanges
users = users.join(user_changes.filter(user_changes.operation == "Updated"), "user_id", "left_outer") \
    .select(F.coalesce(user_changes.user_name, users.user_name).alias("user_name"),
            F.coalesce(user_changes.user_age, users.user_age).alias("user_age"),
            users.user_id)
# Finally, insert the users that have operation = 'Created' in UserChanges
users = users.union(user_changes.filter(user_changes.operation == "Created")
                    .select("user_name", "user_age", "user_id"))
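To sanity-check the output of a job like this on a small sample, the same three steps can be written in plain Python. This is a minimal sketch with hypothetical names (`apply_delta`, rows held as dictionaries). It assumes that `user_id` is the key and that each key appears at most once in the changes, and it is not meant for full-size datasets.

```python
def without_operation(change):
    """Return the change row without its 'operation' marker."""
    return {name: value for name, value in change.items() if name != "operation"}


def apply_delta(rows, changes, key="user_id"):
    """Apply Deleted, Updated and Created changes to a list of row dicts."""
    current = {row[key]: dict(row) for row in rows}
    # Step 1: drop the rows marked as Deleted.
    for change in changes:
        if change["operation"] == "Deleted":
            current.pop(change[key], None)
    # Step 2: overwrite the rows marked as Updated, if they exist.
    for change in changes:
        if change["operation"] == "Updated" and change[key] in current:
            current[change[key]].update(without_operation(change))
    # Step 3: add the rows marked as Created.
    for change in changes:
        if change["operation"] == "Created":
            current[change[key]] = without_operation(change)
    return list(current.values())


# Made-up sample data, for illustration only.
users = [
    {"user_id": 1, "user_name": "Ann", "user_age": 30},
    {"user_id": 2, "user_name": "Bob", "user_age": 40},
    {"user_id": 3, "user_name": "Cy", "user_age": 50},
]
user_changes = [
    {"user_id": 1, "user_name": "Ann", "user_age": 31, "operation": "Updated"},
    {"user_id": 2, "operation": "Deleted"},
    {"user_id": 4, "user_name": "Dee", "user_age": 20, "operation": "Created"},
]
result = apply_delta(users, user_changes)
```

Unlike the `coalesce` calls in the PySpark version, this sketch overwrites every column present in the change row, which is what the SQL version does.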
After that, there’s the question of how to run this in Azure Data Factory or Azure Synapse.
I would suggest going with Azure Synapse. You could get some inspiration from the template that we published at https://go.microsoft.com/fwlink/?linkid=2207816. It includes examples of how to get the data and run a notebook to produce a new dataset.
Another good resource is this guide on “How to transform data by running a Synapse Notebook”. The link is at https://learn.microsoft.com/en-us/azure/data-factory/transform-data-synapse-notebook.
The most notable part missing from the code above is how to read the data from ADLS Gen2. For that, there is a Stack Overflow article on how to bring data in and out of ADLS Gen2 using linked services, and a Microsoft Learn article specifically on that topic at https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/tutorial-spark-pool-filesystem-spec.
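As a rough illustration, a Spark notebook usually addresses files in the storage account through an `abfss://` URI. The sketch below only builds that URI. The helper name `abfss_uri` and the account, container and folder names are made-up placeholders, and the commented-out read assumes that the dataset was delivered as JSON files and that the Spark pool already has access to the storage account.

```python
def abfss_uri(account, container, path=""):
    """Build an abfss:// URI for a file or folder in a storage account."""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


# Placeholder names, for illustration only.
users_uri = abfss_uri("contosodatalake", "mgdc", "/users/current")
changes_uri = abfss_uri("contosodatalake", "mgdc", "users/delta")

# In a Synapse notebook, where `spark` is already defined, the DataFrames
# could then be loaded along these lines (assuming JSON output files):
# users = spark.read.json(users_uri)
# user_changes = spark.read.json(changes_uri)
```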
The code above is only an example. When you apply deltas to the actual SharePoint dataset, in order to find rows to update or delete, you will need to know the primary key that uniquely identifies those rows. Check the list of datasets and primary keys in this section of the previous blog post on delta datasets.
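Before relying on a key, it can be worth confirming that it really is unique in the data you have. The following minimal sketch uses hypothetical names (`find_duplicate_keys`, `site_id`, `item_id`) and plain Python rows. The real key columns are the ones listed for each dataset in the previous post.

```python
from collections import Counter


def find_duplicate_keys(rows, key_columns):
    """Return the key tuples that appear more than once in rows."""
    counts = Counter(tuple(row[col] for col in key_columns) for row in rows)
    return [key for key, count in counts.items() if count > 1]


# Made-up sample with a two-column key, for illustration only.
sample = [
    {"site_id": "a", "item_id": 1},
    {"site_id": "a", "item_id": 2},
    {"site_id": "b", "item_id": 1},
    {"site_id": "a", "item_id": 2},
]
duplicates = find_duplicate_keys(sample, ["site_id", "item_id"])
```

An empty result means that the chosen columns identify each row in the sample. Any tuple returned points to rows that an update or delete would match more than once.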
That's it! For more general information about MGDC for SharePoint, visit the main blog at Links about SharePoint on MGDC.
Updated Sep 11, 2024
Version 4.0
Jose_Barreto
Microsoft