The purpose of this blog post is to share an architectural pattern I worked on with a customer team to process Azure Event Hubs data using Azure Data Factory Mapping Data Flows. It's more common to process Azure Event Hubs streams with one of the stream-processing services such as Azure Stream Analytics, Azure Functions, or Apache Spark on Azure Databricks, but using Azure Data Factory in more of a batch fashion is a perfectly valid pattern for certain use cases, so I thought it would be a good idea to document it for the benefit of the broader community.
I will share my thoughts on what use cases would be a good fit for this architectural pattern as well as point out a few technical details (which are often overlooked) for implementing this pattern correctly.
Note: I am using the name Azure Data Factory in this blog post but it is applicable to Synapse Pipelines as well.
The following diagram shows the reference architecture; there are two main things to highlight in it:
The basic premise is that it might make sense for producers to write to Azure Event Hubs but not every consumer needs to consume that data in near-real time. Following are some of the reasons you might choose to adopt this pattern over others:
Many different factors can influence the scheduled frequency for Azure Data Factory, and it can run every few minutes. My usual guidance, however, is to use this pattern when you can afford to run every hour or every few hours, perhaps even every 15 minutes, but not more frequently than that.
The following diagram shows the sample Mapping Data Flow I have implemented, where the source is ADLS Gen2 with data from Azure Event Hubs Capture, and there are two sinks for processed data: a Synapse SQL Dedicated Pool and Parquet files in ADLS Gen2. The reason for having two sinks is to demonstrate some important concepts that are often overlooked but essential for a correct implementation.
Summary of the steps executed in the Data Flow:
With the overview of the Data Flow implementation out of the way, let's get into some details.
Identifying what is new since last execution of Pipeline and needs to be processed
When batch processes run on a schedule, there needs to be a way to identify what is new since the last execution of the processing pipeline and therefore needs to be processed. Mapping Data Flows makes this really easy for the ADLS Gen2 storage connector; there are two methods for this in the Source Settings:
1. After Completion setting - If you prefer to move the processed files to an archive folder, you can select Move for this setting (deletion is also an option). In this method, the Azure Event Hubs landing folder in storage serves as a temporary staging area from which the ADF pipeline picks up new files for processing.
2. Change Data Capture - If you don't need the processed files to be deleted or moved, you can enable the Change Data Capture setting; in this case, the pointer to what's new for processing is automatically maintained by the service.
In my sample implementation, method (1) above was used, and the relevant settings for the Source are shown in the screenshots below.
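To make the move-after-processing semantics concrete, here is a minimal local simulation of method (1): the landing folder acts as a staging area, each run processes whatever files are present and moves them to an archive folder, so the next run only sees newly arrived files. The folder layout and file names are hypothetical; in ADF the move is performed by the service, not by your code.

```python
# Local sketch of ADF's "After completion: Move" source setting.
# Each run drains the landing folder and moves processed files to an
# archive folder, so a subsequent run picks up only new arrivals.
from pathlib import Path
import shutil

def process_new_files(landing: Path, archive: Path) -> list:
    """Process every file currently in the landing folder, then move it."""
    archive.mkdir(parents=True, exist_ok=True)
    processed = []
    for f in sorted(landing.glob("*.avro")):  # Event Hubs Capture writes Avro files
        # ... real processing of the capture file would happen here ...
        shutil.move(str(f), str(archive / f.name))
        processed.append(f.name)
    return processed
```

Running this twice against the same landing folder shows why the pattern works: the second run finds nothing to process because the first run already archived everything.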
Duplicate and Upsert Handling
Azure Event Hubs' at-least-once delivery characteristic is very often overlooked. In simpler words, this means Azure Event Hubs Capture might persist the same event (or message) to ADLS Gen2 multiple times, leading to duplicates in the final sink if the processing logic doesn't handle the scenario appropriately. I have run into situations in the past where such issues were bubbled up as bugs in Azure Event Hubs, but that's not the case at all: it is clearly documented and expected behavior for the ingestion service. It is the responsibility of the processing engine to have an idempotent implementation that handles duplicates appropriately. The sample Data Flow used for reference in this blog post has two sinks, one for Synapse SQL and the other for ADLS Gen2 (Parquet files). The additional transformations on the Synapse SQL sink path ensure that there are no duplicates in that sink, while the ADLS Gen2 sink has duplicates. You can read more on idempotency and at-least-once delivery in the following article - Resilient design guidance for Event Hubs and Functions - Azure Architecture Center | Microsoft Learn (it refers to Azure Functions but the same concepts carry over).
Mapping Data Flows makes it pretty easy to remove duplicates using the Aggregate transformation, provided there is a key column in the data that can be used as an identifier for records.
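Conceptually, the Aggregate transformation's deduplication boils down to grouping rows by the key column and keeping one row per key. The sketch below illustrates this with plain Python; the `event_id` key column is an assumption about the payload schema, not part of the sample Data Flow.

```python
# Sketch of the deduplication the Aggregate transformation performs:
# group events by a key column and keep one row per key (here, the first
# occurrence). "event_id" is a hypothetical key column name.
def dedupe_by_key(rows: list, key: str = "event_id") -> list:
    seen = {}
    for row in rows:
        seen.setdefault(row[key], row)  # first occurrence wins
    return list(seen.values())
```

If Event Hubs Capture wrote the same event twice, both copies carry the same key, so only one survives, which is exactly the idempotent behavior the at-least-once delivery model requires from the processing engine.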
Lastly, it would also be a good idea to implement Upsert for the loads to the Azure Synapse SQL Pool, in case the same record is processed again or update records are also pushed through Azure Event Hubs. The AlterRow transformation can be used with the Synapse SQL Pool to enable Upserts, provided a key column is available in the data to identify rows. The implementation requires that the AlterRow transformation be added before the Synapse SQL sink. The following screenshots show the settings for both AlterRow and the Synapse SQL sink.
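The upsert semantics that AlterRow plus the Synapse SQL sink provide can be sketched in a few lines: if an incoming row's key already exists in the target, the row is updated in place; otherwise it is inserted. The target table is modeled here as a dict keyed by a hypothetical `event_id` column; this is an illustration of the semantics, not of how the service executes the load.

```python
# Minimal sketch of upsert semantics: update the row when the key already
# exists in the target, insert it when it does not. The target table is
# modeled as a dict keyed by "event_id" (a hypothetical key column).
def upsert(target: dict, incoming: list, key: str = "event_id") -> dict:
    for row in incoming:
        target[row[key]] = row  # existing key -> update; new key -> insert
    return target
```

Note that replaying the same batch leaves the target unchanged, which is what makes the load safe to re-run after a pipeline retry.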
Note: Enabling Staging is another recommended setting for optimizing the performance of loads to the Synapse SQL Pool - Sink performance and best practices in mapping data flow - Azure Data Factory & Azure Synapse | Micr...