Leverage Copy Data Parallelism with Dynamic Partitions in ADF/Synapse Metadata-driven Pipelines
Published Dec 08 2022

The Copy Data activity in Azure Data Factory/Synapse Analytics allows data to be moved from a source table to a sink destination in parallel, allowing for better performance versus single-threaded operations. Non-file source connectors such as Azure SQL DB, SQL Server, Oracle, and others have an option to pull data in parallel by source data partition, potentially improving performance by extracting your source data with a thread for each source table partition.

 

The Copy Data activity can take advantage of partitions built into your source table. Partitions can also be defined on the fly with Dynamic Partition Ranges. 

 

In this blog post, I will illustrate how to create Dynamic Partition Ranges as part of a metadata-driven pipeline, allowing your Copy Data activity to take advantage of the parallelism features of ADF/Synapse Analytics Pipelines, even when your source table is not physically partitioned.

 

Partition setting overview

Partition settings are shown on the Source settings of the Copy Data activity. If partitions are defined on your source table, you are good to go! Just check the “Physical partitions of table” option, as shown below:

 

[Screenshot: Copy Data activity Source settings with the Physical partitions of table option selected]

 

A thread will be created for each physical partition when the Copy Data activity runs, up to the maximum number of threads specified in the Degree of copy parallelism property on the Copy Data activity Settings tab:

 

[Screenshot: Copy Data activity Settings tab showing the Degree of copy parallelism property]

 

The Degree of copy parallelism default value is 20; the maximum value is 50.
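Behind the UI, these options map to properties in the Copy activity definition. Below is a rough sketch of the relevant portion of the JSON, assuming an Azure SQL DB source (the sink and other properties are omitted):

"typeProperties": {
    "source": {
        "type": "AzureSqlSource",
        "partitionOption": "PhysicalPartitionsOfTable"
    },
    "parallelCopies": 20
}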

 

If you do not have physical partitions in your data, or want to use a different column for partitions, you can still leverage parallel copy by partition with the Dynamic range option:

 

[Screenshot: Copy Data activity Source settings with the Dynamic range partition option selected]
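The same settings can be expressed in the activity JSON. Here is a sketch of a hardcoded Dynamic range source, again assuming an Azure SQL DB source; the column name and bound values are purely illustrative:

"source": {
    "type": "AzureSqlSource",
    "partitionOption": "DynamicRange",
    "partitionSettings": {
        "partitionColumnName": "ModifiedDate",
        "partitionLowerBound": "2020-01-01",
        "partitionUpperBound": "2022-12-31"
    }
}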

 

Note this requires some knowledge of the table, including the column name for partitioning and the range of data. Other than that, what’s not to like about this?

 

Well... my table does not have a good candidate for partitioning – it has a date field, and I really want to perform the parallel copy by year rather than by date. The Partition column name setting only accepts a column name, not an expression.

 

Also hardcoding the partition column name and partition ranges does not fit well into a metadata-driven pipeline – I may have different partition column names for each table, different data types, different column expressions as well as different partition ranges.

 

What can I do? I take advantage of parameter and dynamic content expression capabilities in Azure Data Factory and Synapse Analytics Pipelines!

 

Dynamic range partitions for a metadata-driven pipeline

[Diagram: Dynamic range partitions in a metadata-driven pipeline]

Solution Overview

The Data Factory in my demo environment uses Azure SQL DB as the source. It includes a Linked Service to my Azure SQL DB along with an Azure SQL DB dataset with parameters for the SQL schema name and table name.

 

The Data Factory also includes a pipeline which has pipeline parameters for schema name, table name, and column expression to be used in dynamic content expressions. The pipeline first performs a Lookup to return the upper bound and lower bound partition values for the column expression. Then it executes a Copy Data activity, leveraging the pipeline parameter values and upper bound/lower bound values for the dynamic range settings.

 

Solution Detail

1. Define a dataset with parameters for schema and table names

 

A dataset was created for Azure SQL DB with parameters for SchemaName and TableName:

 

[Screenshot: Azure SQL DB dataset with SchemaName and TableName parameters]

 

The parameters are then used in the Table properties on the Connection settings:

 

[Screenshot: Dataset Connection settings with the Table properties set from the SchemaName and TableName parameters]
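If you prefer to work in code, here is a sketch of roughly what the parameterized dataset JSON might look like; the dataset and linked service names (ParameterizedAzureSqlTable, AzureSqlDbLinkedService) are just placeholders:

{
    "name": "ParameterizedAzureSqlTable",
    "properties": {
        "type": "AzureSqlTable",
        "linkedServiceName": {
            "referenceName": "AzureSqlDbLinkedService",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "SchemaName": { "type": "string" },
            "TableName": { "type": "string" }
        },
        "typeProperties": {
            "schema": { "value": "@dataset().SchemaName", "type": "Expression" },
            "table": { "value": "@dataset().TableName", "type": "Expression" }
        }
    }
}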

 

 

2.  Create a pipeline and define pipeline parameters

 

The pipeline includes parameters for schema name, table name, and column name/expression:

 

[Screenshot: Pipeline parameters for schema name, table name, and dynamic partition column]

The parameters would be populated by an Execute Pipeline activity of an orchestrator pipeline or by a trigger calling this pipeline. The parameters are later used in the Lookup Activity and Copy Data Activity.
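As a sketch, the pipeline's parameters section might look like the following; the default values are only sample values for an illustrative table and column expression, and in practice they would be supplied by the caller:

"parameters": {
    "schemaname": { "type": "string", "defaultValue": "SalesLT" },
    "tablename": { "type": "string", "defaultValue": "SalesOrderHeader" },
    "dynamicpartitioncolumn": { "type": "string", "defaultValue": "YEAR(ModifiedDate)" }
}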

 

3. Add the Lookup activity to the pipeline 

 

The Lookup Activity returns Upper Bound and Lower Bound over my partition column/expression: 

 

[Screenshot: Lookup activity settings with the query returning the Upper Bound and Lower Bound]

 

 Below is the full Query expression specified in the pipeline expression builder:

 

 

select Max(@{pipeline().parameters.dynamicpartitioncolumn}) as UpperBound, Min(@{pipeline().parameters.dynamicpartitioncolumn}) as LowerBound from @{pipeline().parameters.schemaname}.@{pipeline().parameters.tablename}
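For example, with illustrative parameter values of SalesLT for the schema name, SalesOrderHeader for the table name, and YEAR(ModifiedDate) for the dynamic partition column, the expression would resolve to a query like this:

select Max(YEAR(ModifiedDate)) as UpperBound, Min(YEAR(ModifiedDate)) as LowerBound from SalesLT.SalesOrderHeader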

 

 

Below is the Lookup activity's Input from the pipeline run Output, showing the actual query that was executed:

 

 

[Screenshot: Lookup activity Input showing the actual query executed]

 

One row with two columns, UpperBound and LowerBound, is returned by the Lookup activity, containing the maximum and minimum year of ModifiedDate.

 

In the Lookup Output, you can see the UpperBound and LowerBound return values:

 

[Screenshot: Lookup activity Output showing the UpperBound and LowerBound values]

 

4.  Add Copy Data activity and set Source settings

 

The Source settings in the Copy Data activity are where the source table and partition values are specified:

 

[Screenshot: Copy Data activity Source settings configured with the dynamic range partition option]

  1. Source dataset is the parameterized dataset created in Step 1
  2. The Dataset properties require that the SchemaName and TableName parameters be populated for the dataset; these are set from the pipeline parameters for schema name and table name
  3. Table is selected for the Use query property
  4. Dynamic range is selected for the Partition option
  5. I then jumped down to Additional columns and added a new column called DynamicPartitionColumn; I clicked on the drop-down arrow in the Value box and selected Add dynamic content
    1. [Screenshot: Additional columns with DynamicPartitionColumn and the Add dynamic content option]

       

    2. For the dynamic content value, I selected the dynamic partition column pipeline parameter 
  6. For Partition column name in the source settings, I can now select DynamicPartitionColumn
  7. Partition upper bound and partition lower bound reference the output columns from the previous Lookup activity:

 

 

@{activity('Get Column Value Min and Max').output.firstRow.UpperBound}
@{activity('Get Column Value Min and Max').output.firstRow.LowerBound}
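Putting it all together, below is a sketch of roughly how the Copy Data source JSON might look with the additional column, the Dynamic range option, and the bound expressions from the Lookup activity:

"source": {
    "type": "AzureSqlSource",
    "additionalColumns": [
        {
            "name": "DynamicPartitionColumn",
            "value": {
                "value": "@pipeline().parameters.dynamicpartitioncolumn",
                "type": "Expression"
            }
        }
    ],
    "partitionOption": "DynamicRange",
    "partitionSettings": {
        "partitionColumnName": "DynamicPartitionColumn",
        "partitionUpperBound": {
            "value": "@{activity('Get Column Value Min and Max').output.firstRow.UpperBound}",
            "type": "Expression"
        },
        "partitionLowerBound": {
            "value": "@{activity('Get Column Value Min and Max').output.firstRow.LowerBound}",
            "type": "Expression"
        }
    }
}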

 

 

Below is the Input of the Copy Data activity run output, showing the new column that was added to the source query and the upper bound and lower bound ranges of the data:

 

[Screenshot: Copy Data activity Input showing the source query with the added column and the upper and lower bound values]

 

Since I only had one year of data, you can see that the number of parallel copies used was only 1:

 

[Screenshot: Copy Data activity run details showing the number of parallel copies used]

 

That's all there is to it!

 

Summary

Parallelism in Copy Data activities provides the opportunity for data ingestion performance improvements. The pattern demonstrated in this blog shows you how you can achieve parallelism, even when your source data is not partitioned, all within a metadata-driven pipeline!
