fretz
Microsoft
Feb 28, 2019

Azure Data Factory - Complex Java based ETL to Codeless Pipeline

Azure Data Factory (ADF) is a fully managed data integration tool that helps you build, manage and orchestrate complex jobs. The new UX in ADF V2 makes creating pipelines, activities and other constructs intuitive and nearly effortless. Anyone can pick up the tool and be fully productive in a few days. From a feature perspective, it has built-in connectors to 65+ data stores, works very well in hybrid environments and has control flow elements for designing even the most complex ETL workflows. More information about ADF can be found in the link.

 

 

I wanted to call out the journey one customer went through in adopting ADF, how it modernized their ETL workflow and, best of all, how they did it without writing a single line of code!

Their legacy data integration job was coded in Java, spanned more than 1,000 lines and was scheduled as a batch job on an ETL virtual machine. The Java code had many modules: REST API calls, storing data to a database, looping/lookups, a retry mechanism and some basic logging.

The team wanted faster deployment with minimal changes to their codebase. Even so, I was confident that ADF would prove itself once they tried it. With almost no prior experience, the customer was able to use the Copy Data wizard to call a REST API, transform the data and store it in a database. This was equivalent to hundreds of lines of Java code. After that, the customer was really impressed with the rich user experience ADF provides and its powerful integration features.

In the end, the entire Java codebase was translated into a single ADF pipeline with zero coding; it triggers on a pre-defined schedule and on new blob creation events. As an added advantage, the ETL server hosting the Java code was decommissioned, helping the customer reduce cost. Special credit goes to Abhishek Narain, Program Manager for Azure Data Factory, who pitched in whenever we had small issues with ADF’s activities (there is no better place than Microsoft when it comes to collaborative effort). We picked up some good learnings and tricks along the way that might be helpful for other folks, such as:

 

1) Mapping Event Trigger variables to Copy activity:

For those who do not know, event-driven architecture (EDA) is a common data integration pattern that involves the production, detection and consumption of events, and the reaction to them. I had a little difficulty connecting all the pieces when mapping event trigger variables for blob creation. Abhishek explained what needed to be done, and we thought it would be helpful for others to know these steps too.

Below is a snippet of an ADF pipeline that gets triggered whenever a blob is uploaded to the container watched by the ADF event trigger.

 

 

First, let us create a few parameters for the pipeline itself. As a simple analogy, I would compare these to “global variables” in your programming code, which have global scope, i.e. they are available to all functions inside the program. The functions, in turn, correspond to ADF’s activities.

 

 

Among the above parameters, the most important ones are “pipeline_SourceFolderName” and “pipeline_SourceFileName”, which will be populated by ADF’s event trigger (explained below).

Now let's define the input dataset. In my use case, the JSON file uploaded to a container is defined as the input dataset. The trigger watches that container for new blob creation and then triggers the ADF pipeline.

Under your input dataset, you need to define two more parameters. These are scoped to this dataset only and are not visible outside it. They are not the same as the pipeline parameters defined above (I have named them differently to avoid confusion).
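For readers who prefer the Code view, the pipeline parameters described above might look roughly like the fragment below. This is only a sketch: the pipeline name `BlobToSqlDwhPipeline` is a hypothetical placeholder, `receiver` is taken from the expressions later in this post, and the activity list is left empty for brevity.

```json
{
  "name": "BlobToSqlDwhPipeline",
  "properties": {
    "parameters": {
      "pipeline_SourceFolderName": { "type": "String" },
      "pipeline_SourceFileName": { "type": "String" },
      "receiver": { "type": "String" }
    },
    "activities": []
  }
}
```

Any activity in the pipeline can then read these values with `@pipeline().parameters.<name>`.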

 

 

 

These two dataset parameters are then used to specify the file path in the Connection tab of the dataset, as shown below. The expressions can be picked using the dynamic content pop-up.
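In JSON terms, a parameterized blob dataset could be sketched as follows. The dataset name `InputJsonBlob` and the parameter names `dataset_SourceFolderName` and `dataset_SourceFileName` are assumptions made for illustration (the actual names are not given in the text); the linked service name `AzureBlobStorage` is borrowed from the example later in this post.

```json
{
  "name": "InputJsonBlob",
  "properties": {
    "type": "AzureBlob",
    "linkedServiceName": {
      "referenceName": "AzureBlobStorage",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "dataset_SourceFolderName": { "type": "String" },
      "dataset_SourceFileName": { "type": "String" }
    },
    "typeProperties": {
      "folderPath": {
        "value": "@dataset().dataset_SourceFolderName",
        "type": "Expression"
      },
      "fileName": {
        "value": "@dataset().dataset_SourceFileName",
        "type": "Expression"
      },
      "format": { "type": "JsonFormat" }
    }
  }
}
```

Note that `@dataset()` only resolves parameters declared on the dataset itself, which is why the pipeline parameters cannot be referenced here directly.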

 

 

 

Now let's go to the pipeline canvas and drag in a Copy activity. In this example I have named it “CopyDataToSQLDWH” since the blobs (JSON files) are parsed and ingested into SQL DWH. Configuration of source and sink is covered well in the Copy Wizard tutorial, so I am not repeating it. However, the key piece here is to link the input dataset’s parameters with the pipeline’s parameters under the Source tab, as shown below.
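The equivalent mapping in the activity's JSON would look something like the sketch below. Only the activity name `CopyDataToSQLDWH` and the pipeline parameter names come from this post; the dataset names `InputJsonBlob` and `SqlDwhTable` and the dataset parameter names are hypothetical, and the source and sink settings are reduced to their type.

```json
{
  "name": "CopyDataToSQLDWH",
  "type": "Copy",
  "inputs": [
    {
      "referenceName": "InputJsonBlob",
      "type": "DatasetReference",
      "parameters": {
        "dataset_SourceFolderName": "@pipeline().parameters.pipeline_SourceFolderName",
        "dataset_SourceFileName": "@pipeline().parameters.pipeline_SourceFileName"
      }
    }
  ],
  "outputs": [
    {
      "referenceName": "SqlDwhTable",
      "type": "DatasetReference"
    }
  ],
  "typeProperties": {
    "source": { "type": "BlobSource" },
    "sink": { "type": "SqlDWSink" }
  }
}
```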

 

 

Just to recap, what has been mapped:

  • Pipeline parameter for Source Folder Name and File Name (basically the container name & blob name)
  • Defined Input Dataset’s parameter and mapped file path in its connection tab
  • In Pipeline Canvas, in Copy Activity’s Source tab, mapped input dataset’s parameters with pipeline’s parameters

At this stage, you have mapped parameters between the pipeline and its activities. As the final stage, we have to map the pipeline parameters to the trigger variables.

Click the ‘Trigger’ button to create an event trigger, and specify the blob name and folder path to watch for new blob creation.

 

 

 

In the next window, the final step is to map the pipeline’s parameters to the dynamic values generated by the trigger, which are:

@triggerBody().folderPath: returns the container path that the trigger is watching for new files

@triggerBody().fileName: returns the file name of the blob that is picked up for processing
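Put together, an event trigger definition with this mapping might look like the sketch below. The trigger name, the pipeline name and every value in angle brackets are placeholders you would replace with your own; the `.json` suffix matches the file ending used in this example.

```json
{
  "name": "NewJsonBlobTrigger",
  "properties": {
    "type": "BlobEventsTrigger",
    "typeProperties": {
      "blobPathBeginsWith": "/<container>/blobs/",
      "blobPathEndsWith": ".json",
      "scope": "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Storage/storageAccounts/<storage-account>",
      "events": [ "Microsoft.Storage.BlobCreated" ]
    },
    "pipelines": [
      {
        "pipelineReference": {
          "referenceName": "BlobToSqlDwhPipeline",
          "type": "PipelineReference"
        },
        "parameters": {
          "pipeline_SourceFolderName": "@triggerBody().folderPath",
          "pipeline_SourceFileName": "@triggerBody().fileName"
        }
      }
    ]
  }
}
```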

 

 

And that’s it. After the ADF is successfully published, the pipeline is triggered automatically whenever a .json file is uploaded (I specified .json as the blob path ending). It copies the JSON file to SQL DWH; if the copy activity succeeds, it sends an email by posting to a REST API, and otherwise it sends an error message.
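The success and failure branches can be expressed with two Web activities that depend on the copy activity. The fragment below is a sketch of the pipeline's `activities` entries for those two branches: the activity names and the URL are hypothetical (the actual email endpoint is not given here), and the request bodies are trimmed to a single field.

```json
[
  {
    "name": "SendSuccessEmail",
    "type": "WebActivity",
    "dependsOn": [
      { "activity": "CopyDataToSQLDWH", "dependencyConditions": [ "Succeeded" ] }
    ],
    "typeProperties": {
      "url": "https://<your-email-endpoint>",
      "method": "POST",
      "body": {
        "message": "@{activity('CopyDataToSQLDWH').output.dataWritten}"
      }
    }
  },
  {
    "name": "SendFailureEmail",
    "type": "WebActivity",
    "dependsOn": [
      { "activity": "CopyDataToSQLDWH", "dependencyConditions": [ "Failed" ] }
    ],
    "typeProperties": {
      "url": "https://<your-email-endpoint>",
      "method": "POST",
      "body": {
        "message": "@{activity('CopyDataToSQLDWH').error.message}"
      }
    }
  }
]
```

On the canvas, this corresponds to dragging the green (success) and red (failure) connectors from the copy activity to each Web activity.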

 

2) Power of Parameter & Expression:

Expressions can appear anywhere in a JSON string value and always result in another JSON value. If a JSON value is an expression, the body of the expression is extracted by removing the at-sign (@). This gives you a lot of freedom to modularize an ADF pipeline: you can build dynamic content, minimize repetition, keep changes between deployments minimal, run activities conditionally, and the list goes on. I have put some examples below:

 

Creating dynamic message content with runtime details

{
  "message": "@{activity('CopyDataToSQLDWH').output.dataWritten}",
  "dataFactoryName": "@{concat(pipeline().DataFactory,'- Success')}",
  "pipelineName": "@{pipeline().Pipeline}",
  "pipelineRunId": "@{pipeline().RunId}",
  "receiver": "@pipeline().parameters.receiver"
}

{
  "message": "@{activity('CopyDataToSQLDWH').error.message}",
  "dataFactoryName": "@{concat(pipeline().DataFactory,'- Error')}",
  "pipelineName": "@{pipeline().Pipeline}",
  "pipelineRunId": "@{pipeline().RunId}",
  "receiver": "@pipeline().parameters.receiver"
}

 

Parameterizing Key Vault to a Blob LinkedService

{
  "name": "ArchiveBlob",
  "properties": {
    "linkedServiceName": {
      "referenceName": "AzureBlobStorage",
      "type": "LinkedServiceReference",
      "parameters": {
        "key_vault_for_archive_blob": {
          "value": "@dataset().key_vault_for_archive_blob",
          "type": "Expression"
        }
      }
    }
  }
}

 

Deriving a new blob file name by appending the pipeline run ID to the blob name

@concat(pipeline().parameters.pipeline_SourceFileName,'-',pipeline().RunId)
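The same value can also be built with string interpolation instead of `concat`. The fragment below shows both forms side by side as a sketch; the property names `withConcat` and `withInterpolation` are made up purely for comparison and are not ADF properties.

```json
{
  "withConcat": "@concat(pipeline().parameters.pipeline_SourceFileName,'-',pipeline().RunId)",
  "withInterpolation": "@{pipeline().parameters.pipeline_SourceFileName}-@{pipeline().RunId}"
}
```

Both forms should yield the file name followed by a hyphen and the run ID; which one to use is mostly a matter of readability.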

 

 
