Introduction
The Workflow Orchestration Manager in Azure Data Factory streamlines setting up and managing Apache Airflow environments, enhancing your ability to execute scalable data pipelines efficiently. Apache Airflow, a robust open-source platform, allows for the programming, scheduling, and monitoring of intricate workflows by organizing tasks into data pipelines. This capability is highly valued in data engineering and data science for its adaptability and user-friendliness.
In this guide, I will walk you through a demonstration where we extract insights from GitHub data using the GitHub public API, and run custom operators in a private package within the Workflow Orchestration Manager in Azure Data Factory.
Prerequisites
- Tools and Technologies Needed:
- Azure Data Factory account
- Working knowledge of Apache Airflow
- Working knowledge of Python
- Initial Setup:
- ADF: create a Workflow Orchestration Manager instance
- Airflow (Optional): In this blog, I'm primarily focusing on running custom operators in Airflow. However, if you want to trigger Azure Data Factory (ADF) pipelines directly from Airflow, you'll need to establish an ADF connection within the Airflow UI. That connection enables triggering ADF pipelines from Airflow; see the Microsoft Learn links at the end of this post for details, and the sketch below for what such a trigger task might look like.
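If you do set up that connection, a trigger task could look roughly like this. This is an illustrative sketch only: the connection ID, resource group, factory, and pipeline names are placeholders, and the operator comes from the `apache-airflow-providers-microsoft-azure` package linked in the Links section.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.operators.data_factory import (
    AzureDataFactoryRunPipelineOperator,
)

with DAG(
    dag_id="trigger_adf_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    run_adf_pipeline = AzureDataFactoryRunPipelineOperator(
        task_id="run_adf_pipeline",
        azure_data_factory_conn_id="azure_data_factory_default",  # connection created in the Airflow UI
        pipeline_name="my_adf_pipeline",          # placeholder pipeline name
        resource_group_name="my-resource-group",  # placeholder
        factory_name="my-data-factory",           # placeholder
    )
```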
Table of Contents:
- Designing Your Custom Package
- Create Custom Package
- Building Airflow DAG
- Run DAG in ADF Workflow Orchestration Manager
- Logs And Monitoring
- Links
- Call-To-Action
Step 1: Designing Your Custom Package
In this tutorial, I am utilizing the GitHub API and have written two Python operators: `GitHubAPIReaderOperator` and `CountLanguagesOperator`. These operators are designed to fetch data from GitHub repositories and count the programming languages used, respectively.

```python
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import requests
import logging
import re


class GitHubAPIReaderOperator(BaseOperator):
    @apply_defaults
    def __init__(self, api_url, max_pages=20, token=None, *args, **kwargs):
        super(GitHubAPIReaderOperator, self).__init__(*args, **kwargs)
        self.api_url = api_url
        self.max_pages = max_pages
        self.token = token

    def execute(self, context):
        headers = {"Accept": "application/vnd.github.v3+json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        session = requests.Session()
        session.headers.update(headers)

        next_url = self.api_url
        all_data = []
        page_count = 0
        while next_url and page_count < self.max_pages:
            response = session.get(next_url)
            response.raise_for_status()
            data = response.json()
            all_data.extend(data)
            next_url = self.get_next_link(response.headers.get('Link'))
            page_count += 1
        return all_data

    def get_next_link(self, link_header):
        if link_header:
            links = link_header.split(',')
            next_link = [link for link in links if 'rel="next"' in link]
            if next_link:
                match = re.search(r'<(.*)>', next_link[0])
                if match:
                    return match.group(1)
        return None


class CountLanguagesOperator(BaseOperator):
    @apply_defaults
    def __init__(self, api_url, token=None, *args, **kwargs):
        super(CountLanguagesOperator, self).__init__(*args, **kwargs)
        self.api_url = api_url
        self.token = token

    def execute(self, context):
        repos = context['task_instance'].xcom_pull(task_ids='fetch_github_data')
        headers = {"Accept": "application/vnd.github.v3+json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        session = requests.Session()
        session.headers.update(headers)

        language_counts = {}
        for repo in repos:
            languages_url = repo.get('languages_url')
            if not languages_url:
                continue  # Skip repos without a languages URL
            try:
                response = session.get(languages_url)
                response.raise_for_status()
                languages_data = response.json()
                for language in languages_data.keys():
                    if language in language_counts:
                        language_counts[language] += 1
                    else:
                        language_counts[language] = 1
            except requests.exceptions.HTTPError as error:
                if error.response.status_code == 403:
                    logging.warning(f"Skipping repository due to HTTP 403 Forbidden: {languages_url}")
                    continue
                else:
                    raise

        # Output the results
        for lang, count in language_counts.items():
            logging.info(f"{lang} repositories count: {count}")
        return language_counts
```
Please check the GitHub API's documentation and rate limits.
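Unauthenticated GitHub API requests are limited to 60 per hour and authenticated requests to 5,000 per hour, and the `count_languages` task makes one request per repository, so the quota can be consumed quickly. A quick, standalone way to check your remaining quota (this snippet is not part of the operators above):

```python
import requests

# Query GitHub's rate-limit endpoint; add an Authorization header
# with your token to see the authenticated quota instead.
response = requests.get("https://api.github.com/rate_limit")
response.raise_for_status()
core = response.json()["resources"]["core"]
print(f"Remaining: {core['remaining']} of {core['limit']}, resets at {core['reset']}")
```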
Step 2: Create the Custom Package
Follow the steps below to create a wheel package in Python. First, you need a folder hierarchy for the package, as sketched below.
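A minimal sketch of what that hierarchy might look like, inferred from the `setup.py` below and the DAG import `from custom_operators.github_operators import ...` (file names other than `setup.py` are illustrative):

```
custom_operators/
├── setup.py
└── src/
    └── custom_operators/
        ├── __init__.py
        └── github_operators.py   # the two operators from Step 1
```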
- In the `setup.py` file, point setuptools at the package folder like so:
```python
from setuptools import setup, find_packages

setup(
    name="custom_operators",
    version="0.1.0",
    package_dir={"": "src"},
    packages=find_packages(where="src"),
    install_requires=[
        # List your dependencies here, e.g., 'numpy', 'pandas'
    ],
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    python_requires='>=3.6',
)
```
- In CMD, run these commands to create the wheel package:

```
pip install setuptools wheel
python setup.py sdist bdist_wheel
```
These commands create a source distribution and a wheel for your package. The wheel file (`.whl`) will be stored in a newly created `dist/` directory under the `custom_operators` folder.
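Optionally, you can sanity-check the wheel locally before uploading it, for instance by installing it into a clean virtual environment (the file name assumes the package name and version from the `setup.py` above):

```
pip install dist/custom_operators-0.1.0-py3-none-any.whl
```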
Step 3: Building Airflow DAG
Now that we have built our custom operators and created the wheel package, we need to create a DAG that will trigger these operators.
For that, I created two tasks, `fetch_github_data` and `count_languages`. Each calls one of the operators above:

```python
from airflow import DAG
from datetime import datetime, timedelta
from custom_operators.github_operators import GitHubAPIReaderOperator, CountLanguagesOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'github_language_analysis',
    default_args=default_args,
    description='Analyze GitHub repos for language usage',
    schedule_interval=timedelta(days=1),
)

fetch_github_data = GitHubAPIReaderOperator(
    task_id='fetch_github_data',
    api_url='https://api.github.com/repositories',
    max_pages=10,
    token='<YOUR_GITHUB_TOKEN>',  # Replace with your actual token
    dag=dag
)

count_languages = CountLanguagesOperator(
    task_id='count_languages',
    api_url='https://api.github.com',
    token='<YOUR_GITHUB_TOKEN>',  # Replace with your actual token
    dag=dag
)

fetch_github_data >> count_languages
```
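If you have Airflow installed locally, you can optionally parse and test-run the DAG before uploading it, using the standard Airflow CLI (older Airflow 2.x versions require the trailing logical date; newer ones make it optional):

```
airflow dags test github_language_analysis 2023-01-01
```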
Step 4: Run DAG in ADF Workflow Orchestration Manager
Now that we have built our DAG and our custom package, here is how to run it in ADF:
1. Create a managed Airflow (Workflow Orchestration Manager) instance in ADF, following the Microsoft docs.
2. In your ADLS storage account, create the folder hierarchy sketched below.
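As a rough sketch (folder and file names here are assumptions; check the Microsoft docs linked below for the exact layout your instance expects), the storage account might look like:

```
<container>/
└── airflow/
    └── dags/
        ├── github_language_analysis.py              # DAG from Step 3
        ├── custom_operators-0.1.0-py3-none-any.whl  # wheel from Step 2
        └── requirements.txt                         # placement may vary with how you import requirements
```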
In the requirements file, include the path to the custom package stored in your ADLS storage account as follows: `/opt/airflow/dags/custom_operators-0.1.0-py3-none-any.whl`
3. In the ADF workspace, click on "Import files." Navigate to your ADLS storage account, locate the "Airflow" folder, and check the "Import requirements" checkbox.
It will take a few minutes for the Workflow Orchestration Manager to pick up the code and the custom package.
Step 5: Logs and Monitoring
After importing the files, click the "Monitor" button in the Workflow Orchestration Manager to view task execution and export Airflow logs. This will open the Airflow UI.
DAG:
Logs in the count_languages task:
P.S.: For more dynamic workflows, you can save the language counts as a JSON file and store them in your storage account.
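A minimal sketch of that idea, assuming the `azure-storage-blob` package is installed; the connection string, container, and blob names are placeholders to adjust to your own storage layout:

```python
import json

from azure.storage.blob import BlobServiceClient


def upload_language_counts(language_counts, connection_string):
    # Serialize the counts returned by CountLanguagesOperator and upload
    # them as a JSON blob (container/blob names are placeholders).
    service = BlobServiceClient.from_connection_string(connection_string)
    blob = service.get_blob_client(container="airflow-output", blob="language_counts.json")
    blob.upload_blob(json.dumps(language_counts, indent=2), overwrite=True)
```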
Links:
Install a Private package - Azure Data Factory | Microsoft Learn
How does Workflow Orchestration Manager work? - Azure Data Factory | Microsoft Learn
airflow.operators.python — Airflow Documentation (apache.org)
airflow.providers.microsoft.azure — apache-airflow-providers-microsoft-azure Documentation
Call to Action:
- Make sure to establish all connections before starting to work with managed Airflow.
- Check the Microsoft documentation on Workflow Orchestration Manager.
- Please help us improve by sharing your valuable Workflow Orchestration Manager Preview feedback by emailing us at ManagedAirflow@microsoft.com
- Follow me on LinkedIn: Sally Dabbah | LinkedIn