Introduction:
ADF has proven to be a reliable service for orchestrating pipelines, however it does have its limitations. Fortunately, there's no need to worry. The advent of managed airflow brings a promising solution, empowering us to overcome these limitations through the power of coding.
In this blog, I built a small demo explaining how we can orchestrate the orchestrater - how can we dynamically run pipelines in ADF with managed airflow.
we will call the pipelines randomly using random library in python.
Prerequisites:
Basic knowledge in Azure Data Factory and Azure Data Lake.
Workspace in Azure Data Factory and Storage account in Azure Data Lake.
Part 1: Prepare Data for Managed Airflow and for ADF pipelines.
- in this tutorial i used this metadata, saved it into data lake and connected it as a dataset in ADF, what matters the most is the grade attribute for each student because we want to sum it and know its average.
[
{
"id": 1,
"name": "Leanne Graham",
"username": "Bret",
"grade": 100,
"email": "Sincere@april.biz",
"address": {
"street": "Kulas Light",
"suite": "Apt. 556",
"city": "Gwenborough",
"zipcode": "92998-3874",
"geo": {
"lat": "-37.3159",
"lng": "81.1496"
}
},
"phone": "1-770-736-8031 x56442",
"website": "hildegard.org",
"company": {
"name": "Romaguera-Crona",
"catchPhrase": "Multi-layered client-server neural-net",
"bs": "harness real-time e-markets"
}
},
{
"id": 2,
"name": "Ervin Howell",
"username": "Antonette",
"grade": 66,
"email": "Shanna@melissa.tv",
"address": {
"street": "Victor Plains",
"suite": "Suite 879",
"city": "Wisokyburgh",
"zipcode": "90566-7771",
"geo": {
"lat": "-43.9509",
"lng": "-34.4618"
}
},
"phone": "010-692-6593 x09125",
"website": "anastasia.net",
"company": {
"name": "Deckow-Crist",
"catchPhrase": "Proactive didactic contingency",
"bs": "synergize scalable supply-chains"
}
},
{
"id": 3,
"name": "Clementine Bauch",
"username": "Samantha",
"grade": 12,
"email": "Nathan@yesenia.net",
"address": {
"street": "Douglas Extension",
"suite": "Suite 847",
"city": "McKenziehaven",
"zipcode": "59590-4157",
"geo": {
"lat": "-68.6102",
"lng": "-47.0653"
}
},
"phone": "1-463-123-4447",
"website": "ramiro.info",
"company": {
"name": "Romaguera-Jacobson",
"catchPhrase": "Face to face bifurcated interface",
"bs": "e-enable strategic applications"
}
},
{
"id": 4,
"name": "Patricia Lebsack",
"username": "Karianne",
"grade": 77,
"email": "Julianne.OConner@kory.org",
"address": {
"street": "Hoeger Mall",
"suite": "Apt. 692",
"city": "South Elvis",
"zipcode": "53919-4257",
"geo": {
"lat": "29.4572",
"lng": "-164.2990"
}
},
"phone": "493-170-9623 x156",
"website": "kale.biz",
"company": {
"name": "Robel-Corkery",
"catchPhrase": "Multi-tiered zero tolerance productivity",
"bs": "transition cutting-edge web services"
}
},
{
"id": 5,
"name": "Chelsey Dietrich",
"username": "Kamren",
"grade": 88,
"email": "Lucio_Hettinger@annie.ca",
"address": {
"street": "Skiles Walks",
"suite": "Suite 351",
"city": "Roscoeview",
"zipcode": "33263",
"geo": {
"lat": "-31.8129",
"lng": "62.5342"
}
},
"phone": "(254)954-1289",
"website": "demarco.info",
"company": {
"name": "Keebler LLC",
"catchPhrase": "User-centric fault-tolerant solution",
"bs": "revolutionize end-to-end systems"
}
},
{
"id": 6,
"name": "Mrs. Dennis Schulist",
"username": "Leopoldo_Corkery",
"grade": 71,
"email": "Karley_Dach@jasper.info",
"address": {
"street": "Norberto Crossing",
"suite": "Apt. 950",
"city": "South Christy",
"zipcode": "23505-1337",
"geo": {
"lat": "-71.4197",
"lng": "71.7478"
}
},
"phone": "1-477-935-8478 x6430",
"website": "ola.org",
"company": {
"name": "Considine-Lockman",
"catchPhrase": "Synchronised bottom-line interface",
"bs": "e-enable innovative applications"
}
},
{
"id": 7,
"name": "Kurtis Weissnat",
"username": "Elwyn.Skiles",
"grade": 82,
"email": "Telly.Hoeger@billy.biz",
"address": {
"street": "Rex Trail",
"suite": "Suite 280",
"city": "Howemouth",
"zipcode": "58804-1099",
"geo": {
"lat": "24.8918",
"lng": "21.8984"
}
},
"phone": "210.067.6132",
"website": "elvis.io",
"company": {
"name": "Johns Group",
"catchPhrase": "Configurable multimedia task-force",
"bs": "generate enterprise e-tailers"
}
},
{
"id": 8,
"name": "Nicholas Runolfsdottir V",
"username": "Maxime_Nienow",
"grade": 40,
"email": "Sherwood@rosamond.me",
"address": {
"street": "Ellsworth Summit",
"suite": "Suite 729",
"city": "Aliyaview",
"zipcode": "45169",
"geo": {
"lat": "-14.3990",
"lng": "-120.7677"
}
},
"phone": "586.493.6943 x140",
"website": "jacynthe.com",
"company": {
"name": "Abernathy Group",
"catchPhrase": "Implemented secondary concept",
"bs": "e-enable extensible e-tailers"
}
},
{
"id": 9,
"name": "Glenna Reichert",
"username": "Delphine",
"grade": 33,
"email": "Chaim_McDermott@dana.io",
"address": {
"street": "Dayna Park",
"suite": "Suite 449",
"city": "Bartholomebury",
"zipcode": "76495-3109",
"geo": {
"lat": "24.6463",
"lng": "-168.8889"
}
},
"phone": "(775)976-6794 x41206",
"website": "conrad.com",
"company": {
"name": "Yost and Sons",
"catchPhrase": "Switchable contextually-based project",
"bs": "aggregate real-time technologies"
}
},
{
"id": 10,
"name": "Clementina DuBuque",
"username": "Moriah.Stanton",
"grade": 90,
"email": "Rey.Padberg@karina.biz",
"address": {
"street": "Kattie Turnpike",
"suite": "Suite 198",
"city": "Lebsackbury",
"zipcode": "31428-2261",
"geo": {
"lat": "-38.2386",
"lng": "57.2232"
}
},
"phone": "024-648-3804",
"website": "ambrose.net",
"company": {
"name": "Hoeger LLC",
"catchPhrase": "Centralized empowering task-force",
"bs": "target end-to-end models"
}
}
]
- data lake:
in order to create managed airflow instance, we need to have in data lake a folder called airflow, in this folder we should have 2 sub folders with names:
plugin and dags like so:
check more details here in Microsoft documentation:
How does Managed Airflow work? - Azure Data Factory | Microsoft Learn
Part 2: Creating Managed Airflow Instance and establish a connection to ADF.
- create a managed airflow instance following Microsoft documentation:
How does Managed Airflow work? - Azure Data Factory | Microsoft Learn
in my tutorial i used basic auth when i creating the managed airflow instance
- after creating the managed airflow instance, in ADF UI : click on manage tab -> Apache airflow->in airflow instance click on monitor button, this should open to you the airflow UI.
in airflow UI:
click on Admin tab -> Connection tab ,this should open to you a list of possible connections in airflow.
edit the connection in Azure Data Factory and start filling your details
PS: in order to work, you need to specify the client id of your ADF workspace and password retrieved from Azure Active Directory
to get client id and client secret:
in AAD-> click on manage tab on the left side -> app registrations -> search for your ADF app -> click on it -> in overview tab you copy your credentials
After filling your information, click on save and you should get a success message.
Part 3: Build pipelines in ADF
In ADF, i created 2 pipelines, we will call them dynamically from airflow.
First Pipeline:
sum of grades for all students in my metadata.
pipeline name: pipelineSumGrades
in the pipeline tab -> add dataflow activity
source activity ->add your dataset mentioned above, make sure to mark in source options -> Json settings -> array of documents so the array will be parsed correctly.
select activity ->select only grade column from metadata
aggregate activity -> add in aggregate function a new column naming gradeSum and in expression explorer add sum function like so:
sink activity -> saved my output as csv file into data lake.
Second Pipeline:
pipeline name: pipelinAvgGrades
create a pipeline with dataflow activity, in dataflow:
same steps as previous pipeline, in aggregate activity i added 2 new columns, one with count and with sum like so :
in derived column activity i calculated the average by dividing gradesum/count
Part 4: Build DAG in airflow
following Microsoft's tutorial, we need to create adf.py file and save it in data lake under DAG folder, after we do that, we import the DAG into our managed instance in ADF.
Dag code:
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.models.dag import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
import json
from datetime import datetime, timedelta
from datetime import datetime, timedelta
from airflow.models import DAG, BaseOperator
from random import random
try:
from airflow.operators.empty import EmptyOperator
except ModuleNotFoundError:
from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule
import numpy as np
import random
import logging
def getTaskName():
task_name = random.choice(["pipelineSumGrades", "pipelinAvgGrades"])
logging.info("task name to be executed: "+ task_name)
return task_name
with DAG(
dag_id="simple_calculator_pipeline",
start_date=datetime(2022, 5, 14),
schedule_interval="@daily",
catchup=False,
default_args={
"retries": 1,
"retry_delay": timedelta(minutes=3),
"azure_data_factory_conn_id": "xxxxx", #This is a connection created on Airflow UI
"factory_name": "xxxxxx", # This can also be specified in the ADF connection.
"resource_group_name": "xxxxxx", # This can also be specified in the ADF connection.
},
default_view="graph",
) as dag:
begin = EmptyOperator(task_id="begin")
# its important to set trigger rule to all done since the default is all success and only one task will execute because of branching.
end = EmptyOperator(task_id="end",trigger_rule=TriggerRule.ALL_DONE)
run_avg_pipeline: BaseOperator = AzureDataFactoryRunPipelineOperator(
task_id="pipelinAvgGrades",
pipeline_name="pipelinAvgGrades"
)
run_sum_pipeline: BaseOperator = AzureDataFactoryRunPipelineOperator(
task_id="pipelineSumGrades",
pipeline_name="pipelineSumGrades"
)
branch_task = BranchPythonOperator(
task_id='choose_task_randomly',
python_callable=getTaskName,
trigger_rule=TriggerRule.ALL_DONE)
begin >> branch_task >> [run_avg_pipeline ,run_sum_pipeline] >> end
in this code, I'm randomly selecting task name, after we import the DAG as mentioned above, click on monitor tab in the managed airflow instance in ADF, this should open the airflow UI like so:
in this case pipelineSumGrades was chosen in the random function and pipelineAvgGrades is being skipped.
Conclusion:
- Managed Airflow offers several significant benefits when integrated with ADF (Azure Data Factory), enhancing the capabilities of pipeline orchestration
For me as a developer, the most significant advantage of airflow in this case is that it gives us flexibility and customization easily for our pipelines in ADF.
Links:
How does Managed Airflow work? - Azure Data Factory | Microsoft Learn
airflow.operators.python — Airflow Documentation (apache.org)
airflow.providers.microsoft.azure — apache-airflow-providers-microsoft-azure Documentation
Call to Action:
- Make sure to establish all connections before starting to work on managed airflow.
- check MS documentation on managed airflow.
- Please help us improve by sharing your valuable Managed Airflow Preview feedback by emailing us at ManagedAirflow@microsoft.com
- Follow me on LinkedIn: Sally Dabbah | LinkedIn
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.