Mastering the Art: Orchestrating ADF with the Power of Managed Airflow
Published Jul 21 2023 02:47 AM 2,638 Views
Microsoft

Introduction:

ADF has proven to be a reliable service for orchestrating pipelines, however it does have its limitations. Fortunately, there's no need to worry. The advent of managed airflow brings a promising solution, empowering us to overcome these limitations through the power of coding.
In this blog, I built a small demo explaining how we can orchestrate the orchestrater - how can we dynamically run pipelines in ADF with managed airflow.

we will call the pipelines randomly using random library in python.

Prerequisites:

  1. Basic knowledge in Azure Data Factory and Azure Data Lake.

  2. Workspace in Azure Data Factory and Storage account in Azure Data Lake.

  3. Basic knowledge in Apache Airflow DAGs.

Part 1: Prepare Data for Managed Airflow and for ADF pipelines.
- in this tutorial i used this metadata, saved it into data lake and connected it as a dataset in ADF, what matters the most is the grade attribute for each student because we want to sum it and know its average.

 

 

[
  {
    "id": 1,
    "name": "Leanne Graham",
    "username": "Bret",
    "grade": 100,
    "email": "Sincere@april.biz",
    "address": {
      "street": "Kulas Light",
      "suite": "Apt. 556",
      "city": "Gwenborough",
      "zipcode": "92998-3874",
      "geo": {
        "lat": "-37.3159",
        "lng": "81.1496"
      }
    },
    "phone": "1-770-736-8031 x56442",
    "website": "hildegard.org",
    "company": {
      "name": "Romaguera-Crona",
      "catchPhrase": "Multi-layered client-server neural-net",
      "bs": "harness real-time e-markets"
    }
  },
  {
    "id": 2,
    "name": "Ervin Howell",
    "username": "Antonette",
    "grade": 66,
    "email": "Shanna@melissa.tv",
    "address": {
      "street": "Victor Plains",
      "suite": "Suite 879",
      "city": "Wisokyburgh",
      "zipcode": "90566-7771",
      "geo": {
        "lat": "-43.9509",
        "lng": "-34.4618"
      }
    },
    "phone": "010-692-6593 x09125",
    "website": "anastasia.net",
    "company": {
      "name": "Deckow-Crist",
      "catchPhrase": "Proactive didactic contingency",
      "bs": "synergize scalable supply-chains"
    }
  },
  {
    "id": 3,
    "name": "Clementine Bauch",
    "username": "Samantha",
    "grade": 12,
    "email": "Nathan@yesenia.net",
    "address": {
      "street": "Douglas Extension",
      "suite": "Suite 847",
      "city": "McKenziehaven",
      "zipcode": "59590-4157",
      "geo": {
        "lat": "-68.6102",
        "lng": "-47.0653"
      }
    },
    "phone": "1-463-123-4447",
    "website": "ramiro.info",
    "company": {
      "name": "Romaguera-Jacobson",
      "catchPhrase": "Face to face bifurcated interface",
      "bs": "e-enable strategic applications"
    }
  },
  {
    "id": 4,
    "name": "Patricia Lebsack",
    "username": "Karianne",
    "grade": 77,
    "email": "Julianne.OConner@kory.org",
    "address": {
      "street": "Hoeger Mall",
      "suite": "Apt. 692",
      "city": "South Elvis",
      "zipcode": "53919-4257",
      "geo": {
        "lat": "29.4572",
        "lng": "-164.2990"
      }
    },
    "phone": "493-170-9623 x156",
    "website": "kale.biz",
    "company": {
      "name": "Robel-Corkery",
      "catchPhrase": "Multi-tiered zero tolerance productivity",
      "bs": "transition cutting-edge web services"
    }
  },
  {
    "id": 5,
    "name": "Chelsey Dietrich",
    "username": "Kamren",
    "grade": 88,
    "email": "Lucio_Hettinger@annie.ca",
    "address": {
      "street": "Skiles Walks",
      "suite": "Suite 351",
      "city": "Roscoeview",
      "zipcode": "33263",
      "geo": {
        "lat": "-31.8129",
        "lng": "62.5342"
      }
    },
    "phone": "(254)954-1289",
    "website": "demarco.info",
    "company": {
      "name": "Keebler LLC",
      "catchPhrase": "User-centric fault-tolerant solution",
      "bs": "revolutionize end-to-end systems"
    }
  },
  {
    "id": 6,
    "name": "Mrs. Dennis Schulist",
    "username": "Leopoldo_Corkery",
    "grade": 71,
    "email": "Karley_Dach@jasper.info",
    "address": {
      "street": "Norberto Crossing",
      "suite": "Apt. 950",
      "city": "South Christy",
      "zipcode": "23505-1337",
      "geo": {
        "lat": "-71.4197",
        "lng": "71.7478"
      }
    },
    "phone": "1-477-935-8478 x6430",
    "website": "ola.org",
    "company": {
      "name": "Considine-Lockman",
      "catchPhrase": "Synchronised bottom-line interface",
      "bs": "e-enable innovative applications"
    }
  },
  {
    "id": 7,
    "name": "Kurtis Weissnat",
    "username": "Elwyn.Skiles",
    "grade": 82,
    "email": "Telly.Hoeger@billy.biz",
    "address": {
      "street": "Rex Trail",
      "suite": "Suite 280",
      "city": "Howemouth",
      "zipcode": "58804-1099",
      "geo": {
        "lat": "24.8918",
        "lng": "21.8984"
      }
    },
    "phone": "210.067.6132",
    "website": "elvis.io",
    "company": {
      "name": "Johns Group",
      "catchPhrase": "Configurable multimedia task-force",
      "bs": "generate enterprise e-tailers"
    }
  },
  {
    "id": 8,
    "name": "Nicholas Runolfsdottir V",
    "username": "Maxime_Nienow",
    "grade": 40,
    "email": "Sherwood@rosamond.me",
    "address": {
      "street": "Ellsworth Summit",
      "suite": "Suite 729",
      "city": "Aliyaview",
      "zipcode": "45169",
      "geo": {
        "lat": "-14.3990",
        "lng": "-120.7677"
      }
    },
    "phone": "586.493.6943 x140",
    "website": "jacynthe.com",
    "company": {
      "name": "Abernathy Group",
      "catchPhrase": "Implemented secondary concept",
      "bs": "e-enable extensible e-tailers"
    }
  },
  {
    "id": 9,
    "name": "Glenna Reichert",
    "username": "Delphine",
    "grade": 33,
    "email": "Chaim_McDermott@dana.io",
    "address": {
      "street": "Dayna Park",
      "suite": "Suite 449",
      "city": "Bartholomebury",
      "zipcode": "76495-3109",
      "geo": {
        "lat": "24.6463",
        "lng": "-168.8889"
      }
    },
    "phone": "(775)976-6794 x41206",
    "website": "conrad.com",
    "company": {
      "name": "Yost and Sons",
      "catchPhrase": "Switchable contextually-based project",
      "bs": "aggregate real-time technologies"
    }
  },
  {
    "id": 10,
    "name": "Clementina DuBuque",
    "username": "Moriah.Stanton",
    "grade": 90,
    "email": "Rey.Padberg@karina.biz",
    "address": {
      "street": "Kattie Turnpike",
      "suite": "Suite 198",
      "city": "Lebsackbury",
      "zipcode": "31428-2261",
      "geo": {
        "lat": "-38.2386",
        "lng": "57.2232"
      }
    },
    "phone": "024-648-3804",
    "website": "ambrose.net",
    "company": {
      "name": "Hoeger LLC",
      "catchPhrase": "Centralized empowering task-force",
      "bs": "target end-to-end models"
    }
  }
]

 

 

- data lake:
in order to create managed airflow instance, we need to have in data lake a folder called airflow, in this folder we should have 2 sub folders with names:
plugin and dags like so:

Sally_Dabbah_0-1689914046302.png

check more details here in Microsoft documentation:
How does Managed Airflow work? - Azure Data Factory | Microsoft Learn

 

Part 2: Creating Managed Airflow Instance and establish a connection to ADF.
- create a managed airflow instance following Microsoft documentation:
How does Managed Airflow work? - Azure Data Factory | Microsoft Learn 
in my tutorial i used basic auth when i creating the managed airflow instance
- after creating the managed airflow instance, in ADF UI : click on manage tab -> Apache airflow->in airflow instance click on monitor button, this should open to you the airflow UI.
in airflow UI:
click on Admin tab -> Connection tab ,this should open to you a list of possible connections in airflow. 
edit the connection in Azure Data Factory and start filling your details
PS: in order to work, you need to specify the client id of your ADF workspace and password retrieved from Azure Active Directory 
to get client id and client secret:
in AAD-> click on manage tab on the left side -> app registrations -> search for your ADF app -> click on it -> in overview tab you copy your credentials

 

Sally_Dabbah_1-1689914794963.png

 

After filling your information, click on save and you should get a success message. 

Sally_Dabbah_0-1689914907051.png

 

Part 3: Build pipelines in ADF
In ADF, i created 2 pipelines, we will call them dynamically from airflow. 
First Pipeline:
sum of grades for all students in my metadata. 
pipeline name: pipelineSumGrades
in the pipeline tab -> add dataflow activity

Sally_Dabbah_0-1689915223482.png

 


source activity ->add your dataset mentioned above, make sure to mark in source options -> Json settings -> array of documents so the array will be parsed correctly. 

select activity ->select only grade column from metadata 

aggregate activity -> add in aggregate function a new column naming gradeSum and in expression explorer add sum function like so:

Sally_Dabbah_1-1689915339253.png

 

sink activity -> saved my output as csv file into data lake. 

Second Pipeline:

pipeline name: pipelinAvgGrades

create a pipeline with dataflow activity, in dataflow:

Sally_Dabbah_5-1689915743293.png

 

same steps as previous pipeline, in aggregate activity i added 2 new columns, one with count and with sum like so :

Sally_Dabbah_3-1689915566616.png

 

in derived column activity i calculated the average by dividing gradesum/count

Sally_Dabbah_4-1689915643626.png

 

Part 4: Build DAG in airflow
following Microsoft's tutorial, we need to create adf.py file and save it in data lake under DAG folder, after we do that, we import the DAG into our managed instance in ADF. 

Sally_Dabbah_0-1689915968904.png

 

Dag code:

 

from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.models.dag import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
import json
from datetime import datetime, timedelta

from datetime import datetime, timedelta

from airflow.models import DAG, BaseOperator
from random import random
try:
    from airflow.operators.empty import EmptyOperator
except ModuleNotFoundError:
    from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule
import numpy as np
import random
import logging


def getTaskName():
    task_name = random.choice(["pipelineSumGrades", "pipelinAvgGrades"])
    logging.info("task name to be executed: "+ task_name)
    return task_name

with DAG(
    dag_id="simple_calculator_pipeline",
    start_date=datetime(2022, 5, 14),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=3),
        "azure_data_factory_conn_id": "xxxxx", #This is a connection created on Airflow UI
        "factory_name": "xxxxxx",  # This can also be specified in the ADF connection.
        "resource_group_name": "xxxxxx",  # This can also be specified in the ADF connection.
    },
    default_view="graph",
) as dag:
    begin = EmptyOperator(task_id="begin")
# its important to set trigger rule to all done since the default is all success and only one task will execute because of branching.
    end = EmptyOperator(task_id="end",trigger_rule=TriggerRule.ALL_DONE)
    run_avg_pipeline: BaseOperator = AzureDataFactoryRunPipelineOperator(
        task_id="pipelinAvgGrades",
        pipeline_name="pipelinAvgGrades"
    )
    run_sum_pipeline: BaseOperator = AzureDataFactoryRunPipelineOperator(
        task_id="pipelineSumGrades",
        pipeline_name="pipelineSumGrades"
    )
    branch_task = BranchPythonOperator(
        task_id='choose_task_randomly',
        python_callable=getTaskName,
        trigger_rule=TriggerRule.ALL_DONE)

    begin >> branch_task >> [run_avg_pipeline ,run_sum_pipeline] >> end

 

 

 

in this code, I'm randomly selecting task name, after we import the DAG as mentioned above, click on monitor tab in the managed airflow instance in ADF, this should open the airflow UI like so:

Sally_Dabbah_1-1689916313364.png

in this case pipelineSumGrades was chosen in the random function and pipelineAvgGrades is being skipped.

 

Conclusion:
- Managed Airflow offers several significant benefits when integrated with ADF (Azure Data Factory), enhancing the capabilities of pipeline orchestration
For me as a developer, the most significant advantage of airflow in this case is that it gives us flexibility and customization easily for our pipelines in ADF.

Links:
How does Managed Airflow work? - Azure Data Factory | Microsoft Learn 

airflow.operators.python — Airflow Documentation (apache.org) 
airflow.providers.microsoft.azure — apache-airflow-providers-microsoft-azure Documentation 

Call to Action:
- Make sure to establish all connections before starting to work on managed airflow.
- check MS documentation on managed airflow.
- Please help us improve by sharing your valuable Managed Airflow Preview feedback by emailing us at ManagedAirflow@microsoft.com
- Follow me on LinkedIn: Sally Dabbah | LinkedIn

Co-Authors
Version history
Last update:
‎Jul 21 2023 02:46 AM
Updated by: