The Copy Data activity in Azure Data Factory/Synapse Analytics can move data from a source table to a sink destination in parallel, giving better performance than single-threaded operations. Non-file source connectors such as Azure SQL DB, SQL Server, Oracle, and others have an option to pull data in parallel by source data partition, potentially improving performance by extracting your source data with a thread for each source table partition.
The Copy Data activity can take advantage of partitions built into your source table. Partitions can also be defined on the fly with Dynamic Partition Ranges.
In this blog post, I will illustrate how to create Dynamic Partition Ranges as part of a metadata-driven pipeline, allowing your Copy Data activity to take advantage of the parallelism features of ADF/Synapse Analytics Pipelines, even when your source table is not physically partitioned.
Partition settings are shown on the Source settings of the Copy Data activity. If partitions are defined on your source table, you are good to go! Just check the “Physical partitions of table” option, as shown below:
When the Copy Data activity runs, a thread is created for each physical partition, up to the maximum number of threads specified in the Degree of copy parallelism property on the Copy Data activity Settings tab:
The Degree of copy parallelism default value is 20; the maximum value is 50.
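For reference, these two settings map to the partitionOption and parallelCopies properties in the Copy Data activity JSON. A minimal sketch (sink omitted, and 20 is simply the default noted above) looks something like this:

"typeProperties": {
    "source": {
        "type": "AzureSqlSource",
        "partitionOption": "PhysicalPartitionsOfTable"
    },
    "parallelCopies": 20
}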
If you do not have physical partitions in your data, or want to use a different column for partitions, you can still leverage parallel copy by partition with the Dynamic range option:
Note that this requires some knowledge of the table, including the column name for partitioning and the range of the data. Other than that, what’s not to like about this?
Well... my table does not have a good candidate for partitioning – it has a date field and I really want to perform the parallel copy by year rather than date. The partition column name only allows a column name, not an expression.
Also hardcoding the partition column name and partition ranges does not fit well into a metadata-driven pipeline – I may have different partition column names for each table, different data types, different column expressions as well as different partition ranges.
What can I do? I take advantage of parameter and dynamic content expression capabilities in Azure Data Factory and Synapse Analytics Pipelines!
Solution Overview
The Data Factory in my demo environment uses Azure SQL DB as the source. It includes a Linked Service to my Azure SQL DB along with an Azure SQL DB dataset with parameters for the SQL schema name and table name.
The Data Factory also includes a pipeline which has pipeline parameters for schema name, table name, and column expression to be used in dynamic content expressions. The pipeline first performs a Lookup to return the upper bound and lower bound partition values for the column expression. Then it executes a Copy Data activity, leveraging the pipeline parameter values and upper bound/lower bound values for the dynamic range settings.
Solution Detail
1. Define a dataset with parameters for schema and table names
A dataset was created for Azure SQL DB with parameters for SchemaName and TableName:
The parameters are then used in the Table properties on the Connection settings:
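As a rough sketch, the resulting dataset JSON looks something like the following; the dataset and linked service names here are hypothetical:

{
    "name": "AzureSqlTable_Parameterized",
    "properties": {
        "type": "AzureSqlTable",
        "linkedServiceName": {
            "referenceName": "AzureSqlDb_LinkedService",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "SchemaName": { "type": "string" },
            "TableName": { "type": "string" }
        },
        "typeProperties": {
            "schema": { "value": "@dataset().SchemaName", "type": "Expression" },
            "table": { "value": "@dataset().TableName", "type": "Expression" }
        }
    }
}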
2. Create a pipeline and define pipeline parameters
The pipeline includes parameters for schema name, table name, and column name/expression:
The parameters would be populated by an Execute Pipeline activity in an orchestrator pipeline or by a trigger calling this pipeline. The parameters are later used in the Lookup activity and Copy Data activity.
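In pipeline JSON terms, the parameter definitions look something like the sketch below. The default values are purely illustrative: Year(ModifiedDate) reflects the copy-by-year scenario described earlier, and the schema and table names are hypothetical.

"parameters": {
    "schemaname": { "type": "string", "defaultValue": "SalesLT" },
    "tablename": { "type": "string", "defaultValue": "SalesOrderHeader" },
    "dynamicpartitioncolumn": { "type": "string", "defaultValue": "Year(ModifiedDate)" }
}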
3. Add the Lookup activity to the pipeline
The Lookup activity returns the Upper Bound and Lower Bound values for my partition column/expression:
Below is the full Query expression specified in the pipeline expression builder:
select Max(@{pipeline().parameters.dynamicpartitioncolumn}) as UpperBound, Min(@{pipeline().parameters.dynamicpartitioncolumn}) as LowerBound from @{pipeline().parameters.schemaname}.@{pipeline().parameters.tablename}
Below is the Input of the Lookup activity from the pipeline run Output, showing the actual query that was executed:
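For illustration, with the hypothetical parameter values shown earlier, the resolved query would look something like this:

select Max(Year(ModifiedDate)) as UpperBound, Min(Year(ModifiedDate)) as LowerBound
from SalesLT.SalesOrderHeader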
One row with two columns, UpperBound and LowerBound, is returned by the Lookup activity, containing the Max Year and Min Year of ModifiedDate.
In the Lookup Output, you can see the UpperBound and LowerBound return values:
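As an illustration, the firstRow portion of the Lookup output would look something like the following; the year value here is made up, and because there was only a single year of data in my table, both bounds come back the same:

{
    "firstRow": {
        "UpperBound": 2008,
        "LowerBound": 2008
    }
}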
4. Add the Copy Data activity and configure the Source settings
The Source settings in the Copy Data activity are where the source table and partition values are specified. The Upper bound and Lower bound values are pulled from the Lookup activity output with the following dynamic content expressions:
@{activity('Get Column Value Min and Max').output.firstRow.UpperBound}
@{activity('Get Column Value Min and Max').output.firstRow.LowerBound}
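Putting it together, the Copy Data source settings serialize to JSON roughly like the sketch below, with the partition column coming from the pipeline parameter and the bounds coming from the Lookup output:

"source": {
    "type": "AzureSqlSource",
    "partitionOption": "DynamicRange",
    "partitionSettings": {
        "partitionColumnName": "@{pipeline().parameters.dynamicpartitioncolumn}",
        "partitionUpperBound": "@{activity('Get Column Value Min and Max').output.firstRow.UpperBound}",
        "partitionLowerBound": "@{activity('Get Column Value Min and Max').output.firstRow.LowerBound}"
    }
}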
Below is the Input from the Copy Data activity run output, showing the new column that was added to the source query along with the upper bound and lower bound ranges of the data:
Since I only had one year of data, you can see that the number of parallel copies used was only 1:
That's all there is to it!
Summary
Parallelism in Copy Data activities provides the opportunity for data ingestion performance improvements. The pattern demonstrated in this blog shows you how you can achieve parallelism, even when your source data is not partitioned, all within a metadata-driven pipeline!