Azure Data Factory's Mapping Data Flows feature enables graphical ETL designs that are generic and parameterized. In this example, I'll show you how to create a reusable SCD Type 1 pattern that could be applied to multiple dimension tables by minimizing the number of common columns required, leveraging parameters and ADF's built-in schema drift capability.
The completed version of this data flow is shown above and is also available as a pipeline template that you can add to your factory by downloading it here and then importing it into your factory. Go to New > "Pipeline from template" and then point to this local file.
Typically, when building ETL for a data warehouse solution, you will need to manage many different dimension tables. With ADF Mapping Data Flows, you can minimize the number of different flows that you need to create by creating generic, reusable data flows. The key is to identify the commonality between each dimension table and then create parameters for the elements that change between executions and tables.
For this demo data flow, I created string parameters for these elements:
1. dimname: can be used to create intermediate files with the name of the dimension
2. keycol: the name of the business key column in the target dimension table. In this case, it is "ProductID"
3. SurrogateKey: the column name for the dimension table's surrogate key (non-business key)
4. tablename: the target dimension table name
In the example's logic, you will see that I am assuming 4 fields will be common to every dimension table in your star schema model:
Name: the member name in the dimension table
_Inserted: timestamp for when the new member was added to the table
_LastUpdated: timestamp for when this member row was last updated
DWhash: a SHA2 hash of the row to use for comparing new vs. old properties in each row
These are all very common attributes used in star schemas. But you can modify these columns and parameters to fit your model. The idea here is to provide a pattern that you can mimic in your business. I explain the steps below, but the best way to get comfortable with this technique is to download this sample and play around with it in your factory.
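To make the DWhash idea concrete, here is a minimal Python sketch of a row hash used for change detection. It runs outside ADF and is only an illustration: the choice of which columns to exclude from the hash, the "|" separator, and the sample product rows are my assumptions, not part of the template.

```python
import hashlib

def row_hash(row, exclude=("_Inserted", "_LastUpdated", "DWhash")):
    """Return a SHA-256 hex digest over the row's attribute values.

    Assumption: audit columns and the hash column itself are left out,
    so only real attribute changes alter the digest.
    """
    # Sort by column name so the digest does not depend on dict ordering.
    parts = [f"{name}={row[name]}" for name in sorted(row) if name not in exclude]
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()

# Hypothetical rows: same business key, different Name.
old = {"ProductID": 101, "Name": "Widget", "_Inserted": "2019-01-01"}
new = {"ProductID": 101, "Name": "Widget v2", "_Inserted": "2019-06-01"}

changed = row_hash(old) != row_hash(new)  # True: the Name attribute differs
```

Comparing one hash per row is cheaper than comparing every attribute column individually, which is why this kind of column is common in dimension tables.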
Here are the high-level steps to make this work for more than just one dimension:
Sources: There are three sources. The first is a file of updated members. In this example, we are using the products dimension, so the first source at the top is a CSV file of members that were changed that day. Because we want this data flow to be reusable, the dataset we use is parameterized. When you execute this data flow from a pipeline, you will set the source filename so that a different file can be processed on every execution. In the source dataset (of type Delimited Text), create a parameter called filename, and set this expression in the dataset's filename property: @dataset().filename.
The second source is a database source that points to the DimProducts table. Instead of hardcoding the table name, we'll parameterize it. In the dataset, create a parameter called tablename and use this formula in the table name property (click edit): @dataset().tablename.
The third source is used just to grab the max value of the existing surrogate key in the target dimension table. We'll use the data flow parameters in this query, which must be built in the Expression Builder so that ADF receives a string value: concat('select max(',$SurrogateKey,') as maxsk from ',$tablename).
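The concat() expression for the max surrogate key query simply assembles a SQL string from two parameters. A rough Python equivalent makes the resulting query text easy to see. The function name and the sample values "ProductKey" and "DimProducts" are placeholders I chose for illustration.

```python
def max_surrogate_key_query(surrogate_key: str, tablename: str) -> str:
    """Mirror of concat('select max(', $SurrogateKey, ') as maxsk from ', $tablename)."""
    return "select max(" + surrogate_key + ") as maxsk from " + tablename

# Hypothetical parameter values for a products dimension.
query = max_surrogate_key_query("ProductKey", "DimProducts")
print(query)
```

Because the query is built by string concatenation, the parameter values should come from trusted pipeline settings rather than from user input.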
Derived Column transformations: Both sources use derived column transformations to define the field names that you want to extract from each and then use later in your data flow. The "PickCols" transformations on the 1st and 2nd streams use pattern matching to create new columns that become the metadata I'll work with. We are using the "schema drift" option in each source, and since there is no schema, there is no projection. So I'm using the derived column transformations to create the projections.
The first example above is used in the DimProduct source stream. To keep this pattern generic, I am looking for any field that has the characters 'ID' in the column name by using a pattern match with instr(). Then I map that field to the name 'surrogatekey2' so that I can use that canonical name throughout my data flow. Then I do the same thing for 'businesskey2'. For this to work, the dimension table must have a column with 'Number' in the column name for the business key.
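The name-based matching can be pictured with a small Python sketch: find the column whose name contains a given substring and expose it under a canonical name. This is only an analogy for the column pattern in the data flow. The helper name and the sample row are hypothetical, and the sketch renames only the first matching column.

```python
def canonicalize(row, pattern, canonical_name):
    """Rename the first column whose name contains `pattern` to `canonical_name`."""
    renamed, found = {}, False
    for name, value in row.items():
        if not found and pattern in name:  # substring test on the column name
            renamed[canonical_name] = value
            found = True
        else:
            renamed[name] = value
    return renamed

# Hypothetical dimension row with an 'ID' column and a 'Number' column.
dim_row = {"ProductID": 7, "ProductNumber": "BK-100", "Name": "Widget"}
step1 = canonicalize(dim_row, "ID", "surrogatekey2")
step2 = canonicalize(step1, "Number", "businesskey2")
```

After both steps, the rest of the flow can refer to surrogatekey2 and businesskey2 without knowing the real column names of the table being processed.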
The 2nd Derived Column uses the byPosition() function and is used by the CSV file source. With this, I can map ordinal positions in the file to my business key from the updated dimensions and the 'name' field, which I'll use to populate in the dimension database later.
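Mapping by ordinal position can be sketched in Python as follows. I am assuming 1-based positions and that the business key and name sit in the first two columns of the file. Check the ordinals against your own files, since those positions are not stated in the text.

```python
def pick_by_position(record, positions):
    """Build named columns from a headerless record.

    `positions` maps a canonical column name to a 1-based ordinal.
    """
    return {name: record[ordinal - 1] for name, ordinal in positions.items()}

# Hypothetical CSV record: business key, member name, an extra column.
record = ["BK-100", "Widget", "9.99"]
picked = pick_by_position(record, {"businesskey1": 1, "name": 2})
```

Positional mapping keeps the flow independent of header names, at the cost of requiring every incoming file to use the same column order.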
On the 2nd stream, you'll see a "MapDrifted" derived column transformation next. That is created automatically by ADF when I click "Map Drifted" in the data preview. With this, I now have a full set of projection columns from a drifted source to work with.
The next step in the flow is to perform a Lookup. This will match incoming business keys with existing business keys so that we can determine what action to take with the products file: update or insert.
Even though this flow is generic and parameterized, because I mapped the key fields from the sources to canonical names, "businesskey1" and "businesskey2", I can use those in the Lookup as businesskey1 == businesskey2.
Next, I use the isMatch() function to check for which rows matched and which ones didn't match, then add a Conditional Split so that I now have a branch for matches and a branch for no matches.
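The Lookup followed by the Conditional Split behaves like a keyed join that routes each incoming row to one of two branches. Here is a minimal Python sketch of that logic, using the canonical key names from above. The function name and the sample data are invented for illustration.

```python
def lookup_and_split(incoming, existing):
    """Split incoming rows into (matched, unmatched) on businesskey1 == businesskey2."""
    existing_by_key = {row["businesskey2"]: row for row in existing}
    matched, unmatched = [], []
    for row in incoming:
        hit = existing_by_key.get(row["businesskey1"])
        if hit is not None:
            # Carry the existing row's columns along with the incoming values.
            matched.append({**hit, **row})
        else:
            unmatched.append(row)
    return matched, unmatched

existing = [{"businesskey2": "BK-100", "surrogatekey2": 7}]
incoming = [
    {"businesskey1": "BK-100", "name": "Widget v2"},  # already in the dimension
    {"businesskey1": "BK-200", "name": "Gadget"},     # new member
]
matched, unmatched = lookup_and_split(incoming, existing)
```

Rows in matched feed the update branch, and rows in unmatched feed the insert branch.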
The matched branch is the top branch in the diagram. Its last steps are a Derived Column to set properties like the updated time and an Alter Row to set the Update policy to true().
Because the top branch is for SCD Type 1 updates, I need to use both the Alter Row transformation and the update method setting on the Sink. My Sink dataset will be the same dataset I'm using for the DimProducts source. There is no schema defined in this dataset because this is a fully generic template pattern. So I will turn "schema drift" on for the sink, set the Key column to my parameter $keycol, and allow updates.
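For readers newer to slowly changing dimensions, the effect of a Type 1 update keyed on $keycol can be sketched in Python: the existing row is overwritten in place and no history is kept. The in-memory "table", the function name, and the timestamp values are stand-ins I made up. The real work is done by the sink.

```python
def apply_type1_updates(table, updates, keycol, now):
    """Overwrite matching rows in place (SCD Type 1: no history is kept)."""
    by_key = {row[keycol]: row for row in table}
    for update in updates:
        target = by_key.get(update[keycol])
        if target is not None:
            target.update(update)          # new attribute values replace old ones
            target["_LastUpdated"] = now   # audit column from the common field list
    return table

# Hypothetical dimension contents and one changed member.
table = [{"ProductID": "BK-100", "Name": "Widget", "_LastUpdated": "2019-01-01"}]
updates = [{"ProductID": "BK-100", "Name": "Widget v2"}]
result = apply_type1_updates(table, updates, "ProductID", "2019-06-01")
```

Passing the key column name as an argument is what lets the same routine serve any dimension table, which is the role $keycol plays in the sink settings.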
To build a proper mapping for the dimension table, I must remove all drifted columns from the file source and land only the appropriate database columns. So I'll use a regular expression in the "name matches" field to tell ADF to land all incoming columns except those that end in an underscore, because those are unnamed drifted columns.
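The filtering rule can be tried out in Python before you put a pattern into the sink. The regular expression below is my own guess at a pattern that keeps every name not ending in an underscore, not necessarily the one used in the template, and the drifted column name "_col2_" is a made-up example.

```python
import re

# Assumed pattern: any name whose last character is not an underscore.
NOT_DRIFTED = re.compile(r".*[^_]")

def columns_to_land(names):
    """Keep only the columns whose names do not end in an underscore."""
    return [name for name in names if NOT_DRIFTED.fullmatch(name)]

# Hypothetical incoming column list with one unnamed drifted column.
landed = columns_to_land(["ProductID", "Name", "_col2_", "DWhash"])
```

A leading underscore is still allowed by this pattern, so audit columns such as _Inserted and _LastUpdated pass through.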
Lastly, on the "no matches" branch, I'm only inserting new rows, so I don't need an Alter Row policy. Instead, I just set my properties with a derived column and then use a Select transformation.
Select transformations allow you to manipulate the flow's metadata columns. I decided to use one here to build my mapping for the sink rather than doing it directly in the sink as I did for the matched branch. The reason is that this mapping is a little more complex, because I have to add both fixed and rule-based mappings. Note that I am able to map the canonical names of my key columns back to the database table's key names by using parameters that I'll set in the data flow.
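A combination of fixed and rule-based mappings might look like this in Python. The canonical name "surrogatekey" for the new key on this branch, the function name, and the sample columns are hypothetical. The rule-based part reuses the idea of skipping names that end in an underscore.

```python
def build_sink_mapping(columns, keycol, surrogate_key):
    """Return {incoming column name: sink column name}."""
    # Fixed mappings: canonical key names go back to the parameterized names.
    fixed = {"businesskey1": keycol, "surrogatekey": surrogate_key}
    mapping = {}
    for name in columns:
        if name in fixed:
            mapping[name] = fixed[name]
        elif not name.endswith("_"):
            # Rule-based mapping: pass the column through unchanged.
            mapping[name] = name
        # Names ending in an underscore are dropped as unnamed drifted columns.
    return mapping

mapping = build_sink_mapping(
    ["businesskey1", "surrogatekey", "Name", "_col3_"],
    keycol="ProductID",
    surrogate_key="ProductKey",
)
```

Because the target names come from parameters, the same mapping logic produces the right column names for whichever dimension table the pipeline passes in.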
The Sink mapping is just "auto map" because I've already used Select here to curate my fields.
Now you can execute this data flow from a pipeline and set all of your parameter values to make this SCD Type 1 pattern work across different dimension tables.