Modernization Best Practices and Reusable Assets Blog
Building Bronze Layer of Medallion Architecture in Fabric Lakehouse using WAL2JSON

KapilSamant
Sep 10, 2024

To make the Fabric Lakehouse efficient, data should pass through several layers: Bronze, Silver, and Gold. Each layer focuses on progressively enhancing data cleanliness and quality. In this article, we will specifically explore how to build the Bronze layer using real-time data streaming from existing PostgreSQL databases.

Introduction

If you work in data engineering, you may have encountered the term "Medallion Architecture." This design pattern organizes data within a Lakehouse into distinct layers to facilitate efficient processing and analysis. Read more about it in the links under References. This is also a recommended design approach for Microsoft Fabric.

During migration and modernization, one of the core goals is to make data more accessible and therefore more valuable to end users. When migrating or ingesting data from on-premises databases (Oracle, Db2, etc.), the data must pass through several layers to make the Lakehouse efficient: Bronze, Silver, and Gold. Each layer focuses on progressively enhancing data cleanliness and quality.

In this article, we will specifically explore how to build the Bronze layer using real-time data streaming from existing Azure PostgreSQL databases. This approach enables real-time analytics and supports AI applications by providing real-time, raw, unprocessed data.

Note: Similar Fabric-based implementations are also possible with other databases, such as Azure SQL or Azure Cosmos DB.

 

 

Figure: Medallion architecture diagram (image source: https://www.databricks.com/glossary/medallion-architecture)

 

What is the Bronze Layer?

This layer is often referred to as the Raw Zone, where data is stored in its original format and structure. According to the common definition, the data in this layer is typically append-only and immutable, but this can be misunderstood. While the intention is to preserve the original data as it was ingested, this does not mean that there will be no deletions or updates. Instead, if deletions or updates occur, the original values are preserved as older versions. This approach ensures that historical data remains accessible and unaltered. Delta Lake is commonly used to manage this data, as it supports versioning and maintains a complete history of changes.
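
To make the "append-only, but versioned" idea concrete, here is a minimal sketch in plain Python. It is not Delta Lake itself: the class name VersionedStore, its methods, and the sample values are hypothetical and exist only for illustration. Every write appends a new version, so earlier states stay readable after an update or delete.

```python
from typing import Any, Dict, List, Optional


class VersionedStore:
    """Hypothetical in-memory stand-in for a versioned, append-only table."""

    def __init__(self) -> None:
        # Each entry is a full snapshot {key: row}; the list index is the version number.
        self._versions: List[Dict[Any, dict]] = [{}]

    def _commit(self, snapshot: Dict[Any, dict]) -> int:
        # Never overwrite an existing version; always append a new one.
        self._versions.append(snapshot)
        return len(self._versions) - 1

    def upsert(self, key: Any, row: dict) -> int:
        snapshot = dict(self._versions[-1])
        snapshot[key] = row
        return self._commit(snapshot)

    def delete(self, key: Any) -> int:
        snapshot = dict(self._versions[-1])
        snapshot.pop(key, None)
        return self._commit(snapshot)

    def read(self, version: Optional[int] = None) -> Dict[Any, dict]:
        """Return the latest snapshot, or an older one if a version is given."""
        index = len(self._versions) - 1 if version is None else version
        return dict(self._versions[index])


store = VersionedStore()
v1 = store.upsert(100, {"FIRST_NAME": "Steven"})      # made-up sample value
v2 = store.upsert(100, {"FIRST_NAME": "Third1111"})   # update creates a new version
v3 = store.delete(100)                                # delete creates another version
```

Reading the latest version shows the row as deleted, while reading version v1 or v2 still returns the earlier values. Delta Lake offers the same kind of access through its table history, although its storage layout is very different from this toy.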

 

Azure PostgreSQL as the source for Bronze Layer

Imagine you have multiple Azure PostgreSQL databases running different applications and you want to integrate their data into a Delta Lake. You have a couple of options to achieve this. The first approach involves creating a Copy activity that extracts data from individual tables and stores it in Delta tables. However, this method requires a watermark column to track changes or necessitates full data reloads each time, which can be inefficient.
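
The watermark idea behind the first approach can be sketched in plain Python. This is an illustration only: the function incremental_extract, the last_modified column, and the sample rows are assumptions, not part of the Copy activity. Each run copies only rows changed since the previous run and then advances the watermark.

```python
from datetime import datetime
from typing import List, Tuple


def incremental_extract(rows: List[dict], last_watermark: datetime) -> Tuple[List[dict], datetime]:
    """Return rows modified after last_watermark and the new watermark value."""
    changed = [r for r in rows if r["last_modified"] > last_watermark]
    new_watermark = max((r["last_modified"] for r in changed), default=last_watermark)
    return changed, new_watermark


# Made-up sample data: the source table after some activity.
source_rows = [
    {"id": 1, "name": "a", "last_modified": datetime(2024, 8, 1, 10, 0)},
    {"id": 2, "name": "b", "last_modified": datetime(2024, 8, 1, 12, 0)},
]

batch, watermark = incremental_extract(source_rows, datetime(2024, 8, 1, 11, 0))

# A row deleted at the source simply disappears, so the next run finds nothing to copy.
source_rows = [r for r in source_rows if r["id"] != 1]
next_batch, next_watermark = incremental_extract(source_rows, watermark)
```

The second run returns an empty batch even though a row was deleted, which illustrates one limitation of watermark-based loads: hard deletes leave no row behind to carry a newer timestamp.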

The second approach involves setting up Change Data Capture in Azure PostgreSQL to capture and stream data changes continuously. This method allows for real-time data synchronization and efficient updates to OneLake. In this blog, we will explore a proof of concept for implementing this CDC-based approach.

 

 

How to utilize PostgreSQL logical decoding, Wal2Json and Fabric Delta Lake to create a continuously replicating bronze layer?

We will use PostgreSQL logical replication, the wal2json plugin, and PySpark to capture the changes and apply them to Delta Lake. In PostgreSQL, logical replication is a method used to replicate data changes from one PostgreSQL instance to another or to a different system. wal2json is a PostgreSQL output plugin for logical decoding that converts Write-Ahead Log (WAL) changes into JSON format.

 

Note: Mirroring in Fabric, which can continuously replicate your existing data estate directly into Fabric's OneLake, is available for several Azure databases (some in preview as of December 2024):

Mirroring Azure SQL Database

Mirroring Azure SQL Managed Instance

Mirroring Azure Cosmos DB

Setup on Azure PostgreSQL

Change the following server parameters by logging in to the Azure portal and navigating to "Server parameters" for the PostgreSQL service.

Parameter Name          Value
wal_level               logical
max_replication_slots   > 0 (e.g. 4 or 8)
max_wal_senders         > 0 (e.g. 4 or 8)
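
As a quick sanity check, the required values can be expressed as a small validation function. This is a sketch under assumptions: the function name check_replication_settings is hypothetical, and the input dictionary stands in for values you would read from the server yourself (for example with SHOW wal_level), which return strings.

```python
from typing import Dict, List


def check_replication_settings(settings: Dict[str, str]) -> List[str]:
    """Return a list of problems found in the given server parameter values."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    for name in ("max_replication_slots", "max_wal_senders"):
        value = settings.get(name, "0")
        if not value.isdigit() or int(value) <= 0:
            problems.append(f"{name} must be greater than 0")
    return problems


# Sample inputs only; real values come from your own server.
ok = check_replication_settings(
    {"wal_level": "logical", "max_replication_slots": "4", "max_wal_senders": "4"}
)
bad = check_replication_settings(
    {"wal_level": "replica", "max_replication_slots": "0", "max_wal_senders": "8"}
)
```

An empty list means the three parameters match the table above; otherwise each entry names the parameter that still needs to be changed.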

 

  1. Create a publication for all the tables. A publication is a feature of logical replication that lets you define which tables' changes should be streamed to subscribers.
    CREATE PUBLICATION cdc_publication FOR ALL TABLES;
  2. Create a replication slot with wal2json as the plugin name. A slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database. Note: the wal2json plugin is pre-installed in Azure PostgreSQL.
    SELECT * FROM pg_create_logical_replication_slot('cdc_slot', 'wal2json');
  3. You can test whether replication is working by updating some test data and running the following command. Note that pg_logical_slot_get_changes consumes the changes it returns; pg_logical_slot_peek_changes returns them without advancing the slot.
    SELECT * FROM pg_logical_slot_get_changes('cdc_slot', NULL, NULL, 'include-xids', 'true', 'include-timestamp', 'true');
  4. Now that you have tested the replication, let's look at the output format. The key components of the wal2json output are listed below, followed by an example for each operation.
    Attribute     Value
    xid           Transaction ID.
    timestamp     Timestamp when the transaction was committed.
    kind          Type of operation (insert, update, delete).
    schema        The schema of the table.
    table         The name of the table where the change occurred.
    columnnames   An array of column names affected by the change.
    columntypes   An array of column data types corresponding to columnnames.
    columnvalues  An array of new values for the columns (present for insert and update operations).
    oldkeys       An object containing the primary key or unique key values before the change (present for update and delete operations).
    
    For INSERT statement
    
    {
      "xid": 8362757,
      "timestamp": "2024-08-01 15:09:34.086064+05:30",
      "change": [
        {
          "kind": "insert",
          "schema": "public",
          "table": "employees_synapse_test",
          "columnnames": [
            "EMPLOYEE_ID",
            "FIRST_NAME",
            "LAST_NAME",
            "EMAIL",
            "PHONE_NUMBER",
            "HIRE_DATE",
            "JOB_ID",
            "SALARY",
            "COMMISSION_PCT",
            "MANAGER_ID",
            "DEPARTMENT_ID"
          ],
          "columntypes": [
            "numeric(10,0)",
            "text",
            "text",
            "text",
            "text",
            "timestamp without time zone",
            "text",
            "numeric(8,2)",
            "numeric(2,2)",
            "numeric(6,0)",
            "numeric(4,0)"
          ],
          "columnvalues": [
            327,
            "3275FIRST NAME111",
            "3275LAST NAME",
            "3275EMAIL3275EMAIL",
            "3275",
            "2024-07-31 00:00:00",
            "IT_PROG",
            32750,
            0,
            100,
            60
          ]
        }
      ]
    }
    
    For UPDATE statement
    
    {
      "xid": 8362759,
      "timestamp": "2024-08-01 15:09:37.228446+05:30",
      "change": [
        {
          "kind": "update",
          "schema": "public",
          "table": "employees_synapse_test",
          "columnnames": [
            "EMPLOYEE_ID",
            "FIRST_NAME",
            "LAST_NAME",
            "EMAIL",
            "PHONE_NUMBER",
            "HIRE_DATE",
            "JOB_ID",
            "SALARY",
            "COMMISSION_PCT",
            "MANAGER_ID",
            "DEPARTMENT_ID"
          ],
          "columntypes": [
            "numeric(10,0)",
            "text",
            "text",
            "text",
            "text",
            "timestamp without time zone",
            "text",
            "numeric(8,2)",
            "numeric(2,2)",
            "numeric(6,0)",
            "numeric(4,0)"
          ],
          "columnvalues": [
            100,
            "Third1111",
            "BLOB",
            "SKING",
            "515.123.4567",
            "2024-08-01 00:00:00",
            "AD_PRES",
            24000,
            null,
            null,
            90
          ],
          "oldkeys": {
            "keynames": [
              "EMPLOYEE_ID"
            ],
            "keytypes": [
              "numeric(10,0)"
            ],
            "keyvalues": [
              100
            ]
          }
        }
      ]
    }
    
    For DELETE statement
    
    {
      "xid": 8362756,
      "timestamp": "2024-08-01 15:09:29.552539+05:30",
      "change": [
        {
          "kind": "delete",
          "schema": "public",
          "table": "employees_synapse_test",
          "oldkeys": {
            "keynames": [
              "EMPLOYEE_ID"
            ],
            "keytypes": [
              "numeric(10,0)"
            ],
            "keyvalues": [
              327
            ]
          }
        }
      ]
    }
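
The three example documents above share one shape: a transaction envelope (xid, timestamp) wrapping a list of changes. The following sketch shows one way to flatten such a document into per-change records in plain Python. The function name flatten_transaction and the output field names row and old_key are hypothetical; only the input keys come from the wal2json output shown above.

```python
import json
from typing import Any, Dict, List


def flatten_transaction(payload: str) -> List[Dict[str, Any]]:
    """Turn one wal2json transaction document into a list of change records."""
    doc = json.loads(payload)
    records = []
    for change in doc.get("change", []):
        old_keys = change.get("oldkeys", {})
        records.append({
            "xid": doc.get("xid"),
            "timestamp": doc.get("timestamp"),
            "kind": change["kind"],
            "schema": change["schema"],
            "table": change["table"],
            # New row values (insert/update); empty for delete.
            "row": dict(zip(change.get("columnnames", []), change.get("columnvalues", []))),
            # Key values before the change (update/delete); empty for insert.
            "old_key": dict(zip(old_keys.get("keynames", []), old_keys.get("keyvalues", []))),
        })
    return records


# The DELETE example from above, rebuilt as a JSON string.
sample = json.dumps({
    "xid": 8362756,
    "timestamp": "2024-08-01 15:09:29.552539+05:30",
    "change": [{
        "kind": "delete",
        "schema": "public",
        "table": "employees_synapse_test",
        "oldkeys": {"keynames": ["EMPLOYEE_ID"], "keytypes": ["numeric(10,0)"], "keyvalues": [327]},
    }],
})
records = flatten_transaction(sample)
```

Iterating over the whole change list, rather than reading only its first element, matters because a single transaction can contain several changes, or none at all.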
    
  5. Create a Lakehouse in Fabric (OneLake). For detailed instructions, see the Fabric Lakehouse documentation listed under References.
  6. Create a Delta table with an initial load of the data using Spark.

    # PostgreSQL connection details
    jdbc_url = "jdbc:postgresql://your_postgres_db.postgres.database.azure.com:5432/postgres"
    jdbc_properties = {
        "user": "postgres",
        # Supply credentials (for example a "password" entry) as required by your server
        "driver": "org.postgresql.Driver"
    }
    
    # Read data from PostgreSQL employees table
    employee_df = spark.read.jdbc(url=jdbc_url, table="employees", properties=jdbc_properties)
    
    # Define the path for the Delta table in ADLS
    delta_table_path = "abfss://your_container@your_storage_account.dfs.core.windows.net/delta/employees"
    
    # Write DataFrame to Delta table
    employee_df.write.format("delta").mode("overwrite").save(delta_table_path)
    
    delta_df = spark.read.format("delta").load(delta_table_path)
    delta_df.show()
    

     

  7. Running the following code continuously keeps the data in the Delta table in sync with the primary PostgreSQL database.
    import json
    import time

    import pandas as pd
    from delta.tables import DeltaTable

    # PostgreSQL connection details
    jdbc_url = "jdbc:postgresql://your_postgres_db.postgres.database.azure.com:5432/postgres"
    jdbc_properties = {
        "user": "postgres",
        # Supply credentials (for example a "password" entry) as required by your server
        "driver": "org.postgresql.Driver"
    }

    # Delta table details
    delta_table_path = "abfss://your_container@your_storage_account.dfs.core.windows.net/delta/employees"
    delta_table = DeltaTable.forPath(spark, delta_table_path)

    # Source table whose changes are applied to this Delta table
    source_table = "employees"

    # Subquery that reads (and consumes) the pending changes from the replication slot
    cdc_query = (
        "(SELECT data FROM pg_logical_slot_get_changes('cdc_slot', NULL, NULL, "
        "'include-xids', 'true', 'include-timestamp', 'true')) as cdc"
    )

    # Pause between polls; the value is an arbitrary choice, adjust as needed
    poll_interval_seconds = 10

    while True:
        cdc_df = spark.read.jdbc(url=jdbc_url, table=cdc_query, properties=jdbc_properties)

        # Each row holds the JSON document for one transaction
        for row in cdc_df.collect():
            # A transaction can contain several changes (or none), so process all of them
            for changed_data in json.loads(row['data'])['change']:
                print(changed_data)

                dml_type = changed_data['kind']

                # The slot streams changes for every table; skip the ones for other tables
                if changed_data['table'] != source_table:
                    continue

                if dml_type in ("insert", "update"):
                    column_names = changed_data['columnnames']
                    column_values = changed_data['columnvalues']
                    source_data = {col: [val] for col, val in zip(column_names, column_values)}
                    # Caveat: type inference from a single row may fail or mismatch when
                    # values are null; cast to the Delta table's schema if needed
                    change_df = spark.createDataFrame(pd.DataFrame(source_data))

                if dml_type == "insert":
                    change_df.write.format("delta").mode("append").save(delta_table_path)

                elif dml_type == "update":
                    # Match target rows on the key columns reported in oldkeys
                    old_keys = changed_data['oldkeys']
                    condition = " AND ".join(
                        f"target.{key} = source.{key}" for key in old_keys['keynames']
                    )
                    print(condition)

                    delta_table.alias("target").merge(
                        change_df.alias("source"),
                        condition
                    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

                elif dml_type == "delete":
                    # Delete the row identified by the old key values
                    old_keys = changed_data['oldkeys']
                    condition = " AND ".join(
                        f"{key} = '{value}'"
                        for key, value in zip(old_keys['keynames'], old_keys['keyvalues'])
                    )
                    delta_table.delete(condition)

        time.sleep(poll_interval_seconds)
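
The insert, update, and delete handling in the loop above can be checked without Spark by replaying changes onto an in-memory table. The sketch below is a simplification under assumptions: apply_changes and the key_columns parameter are hypothetical, the sample salary values are made up, and the key values in oldkeys are assumed to be listed in the same order as key_columns.

```python
from typing import Any, Dict, List, Tuple

Key = Tuple[Any, ...]


def apply_changes(table: Dict[Key, dict], changes: List[dict], key_columns: List[str]) -> Dict[Key, dict]:
    """Apply wal2json-style change entries, in order, to a table keyed by primary key."""
    for change in changes:
        kind = change["kind"]
        if kind in ("update", "delete"):
            # oldkeys identifies the row as it was before the change.
            table.pop(tuple(change["oldkeys"]["keyvalues"]), None)
        if kind in ("insert", "update"):
            row = dict(zip(change["columnnames"], change["columnvalues"]))
            table[tuple(row[c] for c in key_columns)] = row
    return table


columns = ["EMPLOYEE_ID", "SALARY"]
old_327 = {"keynames": ["EMPLOYEE_ID"], "keyvalues": [327]}
changes = [
    {"kind": "insert", "columnnames": columns, "columnvalues": [327, 32750]},
    {"kind": "update", "columnnames": columns, "columnvalues": [327, 40000], "oldkeys": old_327},
    {"kind": "insert", "columnnames": columns, "columnvalues": [328, 5000]},
    {"kind": "delete", "oldkeys": old_327},
]
result = apply_changes({}, changes, key_columns=["EMPLOYEE_ID"])
```

Order matters here: applying the same changes in a different sequence could resurrect the deleted row, which is why the slot's guarantee of replaying changes in commit order is important for the loop above.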

Conclusion

In conclusion, building the Bronze layer of the Medallion Architecture with wal2json, using PostgreSQL as the source and Fabric OneLake as the target, provides a robust and scalable approach to raw data ingestion. This setup leverages PostgreSQL's logical replication capabilities to capture and stream changes in real time, ensuring that the data lake remains up to date with the latest transactional data.

Implementing this architecture ensures that the foundational layer is well structured and provides a solid base for the subsequent layers, while also supporting real-time analytics, advanced data processing, and AI applications.

By adopting this strategy, organizations can achieve greater data consistency, reduce latency in data processing, and enhance the overall efficiency of their data pipelines.

 

References
https://learn.microsoft.com/en-us/fabric/onelake/onelake-medallion-lakehouse-architecture
https://learn.microsoft.com/en-us/azure/databricks/lakehouse/medallion
https://blog.fabric.microsoft.com/en-us/blog/eventhouse-onelake-availability-is-now-generally-available?ft=All
https://learn.microsoft.com/en-us/fabric/data-engineering/lakehouse-and-delta-tables


Feedback and suggestions

If you have feedback or suggestions for improving this data migration asset, please send an email to the Database Platform Engineering Team.

 

Updated Dec 19, 2024
Version 3.0