Metadata Driven Pipelines for Microsoft Fabric
Published Aug 04 2023 02:56 PM 47.7K Views
Microsoft

Metadata-driven pipelines in Azure Data Factory, Synapse Pipelines, and now, Microsoft Fabric, give you the capability to ingest and transform data with less code, reduced maintenance and greater scalability than writing code or pipelines for every data source that needs to be ingested and transformed. The key lies in identifying the data loading and transformation pattern(s) for your data sources and destinations and then building the framework to support each pattern.

 

In this blog post, I will provide an overview of a Metadata driven pipeline in Microsoft Fabric that follows the medallion architecture (Bronze, Silver, Gold). The intent is not to provide a full tutorial on building metadata driven pipelines or Microsoft Fabric; rather it is show you some new features of Fabric and give you some ideas implementing metadata driven pipelines in Fabric.

 

Update 10/30/2023: Read this blog first, then check out this GitHub Repo to recreate in your own environment!

 

Metadata driven architecture for Fabric Modern Data Warehouse

 

jehayes_0-1691177546670.png

The goal for this solution is to build a Star Schema in a Microsoft Fabric Lakehouse with Delta Tables, a Power BI Direct Lake Dataset and related reports for end user consumption. The solution contains full or incremental loads to the Bronze Lakehouse, leverages SQL Views as the Silver Layer, then performs full or incremental loads to the Gold Lakehouse.

 

Below are more details on each numbered part in the architecture diagram:

 

1 - Define pipeline configuration tables

Tables are defined that contain the configuration for each type of data load, 1 table for loading from the source SQL database to the Bronze Fabric Lakehouse and a 2nd table defined for loading from the Bronze Fabric Lakehouse to the Gold Lakehouse. Each table contains a row for each source/destination combination and includes such fields as source table name, source schema, date key, start date, and load type (full or incremental). The tables also contain fields for pipeline run results, such as the number of rows inserted, updated, status, max table transaction date, which are updated after each table is loaded.

 

The table below shows the configuration table for loading from a Source SQL database to the Bronze Lakehouse:

 

jehayes_0-1691177718511.png

 

 

The table below shows the configuration table for loading from the Bronze Lakehouse to the Gold Lakehouse:

jehayes_1-1691177748680.png

 

2 - Get Configuration details for tables to load from Source to Bronze Lakehouse

Below is what our final Orchestrator pipeline will look like, with the relevant steps from the architecture diagram above indicated:

jehayes_0-1691177819658.png

 

 

The orchestrator pipeline contains a Lookup activity on the Source to Bronze configuration table to get the list of tables to load from source to the Bronze.

jehayes_1-1691177819688.png

 

3 - Call child pipeline to load data from Source to Bronze Lakehouse

For each table defined in the Lookup activity, call a child pipeline to load the data from Source to Bronze Lakehouse, passing in the configuration detail from the lookup.

 

For Each activity:

jehayes_0-1691177900809.png

 

 

Child pipeline to load from Source to Bronze Lakehouse:

jehayes_1-1691177900821.png

 

4 - Copy Data from Source to Bronze Lakehouse

A step to set a variable called datepredicate is part of this pipeline. A selection predicate based upon date is needed for incremental loads from the source or if you want to load just a subset of the data. This simplifies the creation of the SQL source query string in the subsequent Copy Data activity.

jehayes_0-1691178073251.png

 

If the load type setting from the configuration table is a full load, do a Copy Data Activity from the Source to Bronze Lakehouse Delta Lake Table.

 

Full load Copy Data Source settings:

jehayes_1-1691178073286.png

 

Full Load Copy Data Destination Settings:

jehayes_0-1691178377639.png

If the load type setting from the configuration table is an incremental load, do a Copy Data Activity from the Source to Bronze Lakehouse but as set destination as a Parquet file.

 

Incremental load Copy Data source settings:

jehayes_0-1691178410971.png

 

Incremental load Copy Data destination settings:

jehayes_1-1691178423102.png

 

5 - Call Notebook for incremental load merge

For incremental loads only, call a Spark Notebook to merge the incremental data to the Bronze Delta Lake table.

jehayes_0-1691178464374.png

 

Create or Merge to Deltalake Notebook code below:

 

 

 

 

 

 

 

from delta.tables import *
from pyspark.sql.functions import *

lakehousePath = "abfss://yourpathhere"
tableName = "Invoices"
tableKey = "InvoiceID"
tableKey2 = None
dateColumn = "LastEditedWhen"

deltaTablePath = f"{lakehousePath}/Tables/{tableName}" 
parquetFilePath = f"{lakehousePath}/Files/incremental/{tableName}/{tableName}.parquet"

df2 = spark.read.parquet(parquetFilePath)

if tableKey2 is None:
    mergeKeyExpr = f"t.{tableKey} = s.{tableKey}"
else:
    mergeKeyExpr = f"t.{tableKey} = s.{tableKey} AND t.{tableKey2} = s.{tableKey2}"  

#Check if table already exists; if it does, do an upsert and return how many rows were inserted and update; if it does not exist, return how many rows were inserted
if DeltaTable.isDeltaTable(spark,deltaTablePath):
    deltaTable = DeltaTable.forPath(spark,deltaTablePath)
    deltaTable.alias("t").merge(
        df2.alias("s"),
        mergeKeyExpr
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    history = deltaTable.history(1).select("operationMetrics")
    operationMetrics = history.collect()[0]["operationMetrics"]
    numInserted = operationMetrics["numTargetRowsInserted"]
    numUpdated = operationMetrics["numTargetRowsUpdated"]
else:
    df2.write.format("delta").save(deltaTablePath)  
    deltaTable = DeltaTable.forPath(spark,deltaTablePath)
    operationMetrics = history.collect()[0]["operationMetrics"]
    numInserted = operationMetrics["numTargetRowsInserted"]
    numUpdated = 0

#Get the latest date loaded into the table - this will be used for watermarking; return the max date, the number of rows inserted and number updated

deltaTablePath = f"{lakehousePath}/Tables/{tableName}"
df3 = spark.read.format("delta").load(deltaTablePath)
maxdate = df3.agg(max(dateColumn)).collect()[0][0]
# print(maxdate)
maxdate_str = maxdate.strftime("%Y-%m-%d %H:%M:%S")

result = "maxdate="+maxdate_str +  "|numInserted="+str(numInserted)+  "|numUpdated="+str(numUpdated)
# result = {"maxdate": maxdate_str, "numInserted": numInserted, "numUpdated": numUpdated}
mssparkutils.notebook.exit(str(result))

 

 

 

 

 

 

Return the number of rows inserted, updated and max date from the notebook results and store them in pipeline variables.

 

6 - Save pipeline run results to configuration table

For each table loaded, update the configuration table with the load details such as the number of rows read, inserted and updated from the variables or Copy Data output. What is especially critical for the incremental load is to update the start date configuration with the max transaction date loaded., which is returned in the Create or Merge to Deltalake notebook. This will be used to retrieve records from the source on the next subsequent run which are greater or equal to the max datetime of the table data loaded in this run.

jehayes_0-1691178593072.png

 

7 - Leverage SQL Views over Bronze Lakehouse tables for Silver layer

SQL views are defined over the Bronze Lakehouse Delta tables. These views will be the source for loading the Gold Lakehouse tables from the Bronze Lakehouse tables.


While SQL views are supported in the Lakehouse SQL Endpoint, they are created and accessible only via the SQL Endpoint, which means that they are not available to us in a Data Factory Copy Data activity. The Copy Data Activity only leverages the Lakehouse endpoint at this time. However, views are accessible in the Fabric Data Warehouse in a Copy Data Activity. Therefore, views are created in a Fabric Data Warehouse. All Data Warehouse views reference Lakehouse Bronze tables and there is no data movement between Bronze to Silver:

jehayes_0-1691178642500.png

 

8 - Get configuration details to load tables from Silver Views/Bronze Lakehouse to Gold Lakehouse

With the tables loaded in the Bronze Lakehouse and the views defined in the Silver layer, we can transform and load the tables to the Gold Lakehouse. In the orchestrator pipeline, do a Lookup on the configuration table to get the details for each Gold table load:

jehayes_0-1691178692223.png

9 - Call child pipeline to load data from Silver Views/Bronze Lakehouse to Gold Lakehouse

For each table configuration returned from the Lookup activity, call a child pipeline to load the data from Bronze Lakehouse to Gold Lakehouse, passing in the configuration detail from the lookup.

jehayes_0-1691178731611.png

Child pipeline to load from Bronze Lakehouse to Gold Lakehouse:

jehayes_1-1691178740902.png

The pipeline to load from Bronze to Gold has a similar pattern to the pipeline that loads from Source to Bronze, except our source is Fabric Data Warehouse views in the Silver layer that reference tables in the Bronze Lakehouse.

 

If the load type setting from the configuration table is a full load, do a Copy Data Activity from the Silver View to Gold Lakehouse Delta Lake Table.


Full load Copy Data Source settings:

jehayes_0-1691178777478.png

 

Full Load Copy Data Destination Settings:

jehayes_1-1691178789530.png

 

If the load type setting from the configuration table is a full load, set the datepredicate variable to set the selection predicate for the incremental load query.

 

jehayes_0-1691178833879.png

 

 

Then do a Copy Data Activity from the Bronze Lakehouse to the Gold Lakehouse but as a Parquet file.

 

Incremental load Copy Data source settings:

jehayes_1-1691178833902.png

 

Incremental load Copy Data destination settings:

jehayes_2-1691178849749.png

 

10 - Call Notebook for incremental load merge, Gold Lakehouse Delta tables

For incremental loads only, call a Spark Notebook to merge the incremental data to the Gold Delta Lake table. This is the same notebook that was called in the Load Source to Bronze Pipeline for incremental loads in step 5.

jehayes_0-1691178893706.png

 

 

 

Return the number of rows inserted, updated and max date from the notebook results and store them in variables.

11 - Save pipeline run results to configuration table

Like step 6 for the Load Source to Bronze pipeline, update the Gold configuration table with the load details such as the number of rows read, inserted and updated. Again, it is critical to update the start date configuration with the max transaction date loaded for incremental loads.

 

12 - Create Fabric Dataset

Now that you Star Schema is created in your Gold Lakehouse, you can create a Dataset to be consumed by Power BI reports:

jehayes_0-1691178936755.png

 

Note the Blue dashed line on the top edge of each table. Hover over the table and you can see that the Storage Mode is DirectLake, which means that Power BI reports will connect directly to the Delta Lake tables without having to import into a Power BI in-memory dataset or having to use a SQL Endpoint like Direct Query. Here you can create relationships between your fact and dimension tables, create measures, define hierarchies, etc. just like a Power BI dataset created in Power BI Desktop.

 

13 - Create Power BI Reports

Finally, create a Power BI Report on top of your dataset:

jehayes_1-1691178965391.png

 

In this post, I illustrated different features of Microsoft Fabric to build metadata driven pipelines for your data workloads. Microsoft Fabric offers a one stop shop for building a Modern Data Warehouse in a Lakehouse with Delta Lake tables. Power BI Direct Lake connectivity to the Fabric Lakehouse Delta Lake tables offers the performance of Power BI Import storage mode with the accessibility of Direct Query, getting critical data to your end users without the overhead of importing and scheduling data refreshes or with the performance lags of Direct Query.

 

In a couple weeks I will post another pattern on loading data to the Fabric Data Warehouse rather than the Fabric Lakehouse and reasons you may want to do so. Stay tuned!

Update 8/22/2023 - And here it is! Metadata Driven Pipelines for Microsoft Fabric – Part 2, Data Warehouse Style - Microsoft Community ...

 

Additional resources:

https://learn.microsoft.com/en-us/fabric/get-started/

https://learn.microsoft.com/en-us/fabric/data-engineering/lakehouse-overview

https://learn.microsoft.com/en-us/fabric/data-factory/

https://learn.microsoft.com/en-us/power-bi/enterprise/directlake-overview

 

 

19 Comments
Co-Authors
Version history
Last update:
‎Oct 30 2023 10:33 AM
Updated by: