Blog Post

FastTrack for Azure
5 MIN READ

Step-by-Step Guide: Building and Integrating Custom Package in ADF Workflow Orchestration Manager

Sally_Dabbah
Microsoft
May 17, 2024

Introduction

 

The Workflow Orchestration Manager in Azure Data Factory streamlines setting up and managing Apache Airflow environments, enhancing your ability to execute scalable data pipelines efficiently. Apache Airflow, a robust open-source platform, allows for the programming, scheduling, and monitoring of intricate workflows by organizing tasks into data pipelines. This capability is highly valued in data engineering and data science for its adaptability and user-friendliness.

 

In this guide, I will walk you through a demonstration where we extract insights from GitHub data using the GitHub public API, and run custom operators in a private package within the Workflow Orchestration Manager in Azure Data Factory.

 

 


Prerequisites
- Tools and Technologies Needed:

  • Azure Data Factory account
  • Knowledge of Apache Airflow
  • Knowledge of Python

- Initial Setup

  • ADF: create a Workflow Orchestration Manager instance
  • Airflow (Optional): In this blog, I'm primarily focusing on running custom operators in Airflow. However, if you want to trigger Azure Data Factory (ADF) pipelines directly from Airflow, you'll need to establish a connection within the Airflow UI. This setup enables the triggering of ADF pipelines from Airflow; see the provider documentation linked at the end of this post, and the sketch below, for more details.
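  If you do set up that connection, a minimal sketch of a task that triggers an ADF pipeline might look like the following. This assumes the apache-airflow-providers-microsoft-azure package is installed; the connection ID, pipeline name, resource group, and factory names are placeholders.

    # Inside a DAG definition; all names below are illustrative.
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator

    run_adf_pipeline = AzureDataFactoryRunPipelineOperator(
        task_id="run_adf_pipeline",
        azure_data_factory_conn_id="azure_data_factory_conn",  # Connection created in the Airflow UI
        pipeline_name="my_adf_pipeline",
        resource_group_name="my-resource-group",
        factory_name="my-data-factory",
    )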


Table of Contents:

 

 

  • Step 1: Designing Your Custom Package
    In this tutorial, I am utilizing the GitHub API and have written two Python operators: `GitHubAPIReaderOperator` and `CountLanguagesOperator`. These operators are designed to fetch data from GitHub repositories and count the programming languages used, respectively.

     

    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults
    import requests
    import logging
    import re
    
    # Pages through a GitHub REST API endpoint and returns the combined JSON results.
    class GitHubAPIReaderOperator(BaseOperator):
        @apply_defaults
        def __init__(self, api_url, max_pages=20, token=None, *args, **kwargs):
            super(GitHubAPIReaderOperator, self).__init__(*args, **kwargs)
            self.api_url = api_url
            self.max_pages = max_pages
            self.token = token
    
        def execute(self, context):
            headers = {"Accept": "application/vnd.github.v3+json"}
            if self.token:
                headers["Authorization"] = f"Bearer {self.token}"
    
            session = requests.Session()
            session.headers.update(headers)
    
            next_url = self.api_url
            all_data = []
            page_count = 0
    
            while next_url and page_count < self.max_pages:
                response = session.get(next_url)
                response.raise_for_status()
                data = response.json()
                all_data.extend(data)
    
                next_url = self.get_next_link(response.headers.get('Link'))
                page_count += 1
    
            return all_data
    
        def get_next_link(self, link_header):
            # Parse the 'Link' response header to find the URL of the next page, if any.
            if link_header:
                links = link_header.split(',')
                next_link = [link for link in links if 'rel="next"' in link]
                if next_link:
                    match = re.search(r'<(.*)>', next_link[0])
                    if match:
                        return match.group(1)
            return None
    
    # Pulls the repository list from XCom and counts how many repositories use each programming language.
    class CountLanguagesOperator(BaseOperator):
        @apply_defaults
        def __init__(self, api_url, token=None, *args, **kwargs):
            super(CountLanguagesOperator, self).__init__(*args, **kwargs)
            self.api_url = api_url
            self.token = token
    
        def execute(self, context):
            repos = context['task_instance'].xcom_pull(task_ids='fetch_github_data')
            headers = {"Accept": "application/vnd.github.v3+json"}
            if self.token:
                headers["Authorization"] = f"Bearer {self.token}"
            
            session = requests.Session()
            session.headers.update(headers)
    
            language_counts = {}
    
            for repo in repos:
                languages_url = repo.get('languages_url')
                if not languages_url:
                    continue  # Skip repos without a languages URL
    
                try:
                    response = session.get(languages_url)
                    response.raise_for_status()
                    languages_data = response.json()
    
                    for language in languages_data.keys():
                        if language in language_counts:
                            language_counts[language] += 1
                        else:
                            language_counts[language] = 1
    
                except requests.exceptions.HTTPError as error:
                    if error.response.status_code == 403:
                        logging.warning(f"Skipping repository due to HTTP 403 Forbidden: {languages_url}")
                        continue
                    else:
                        raise
    
            # Output the results
            for lang, count in language_counts.items():
                logging.info(f"{lang} repositories count: {count}")
    
            return language_counts
    

     

    Please check the GitHub API documentation and its rate limits.

     

     

     

    Step 2: Create the Custom Package
     Follow the steps below to create a wheel package in Python:

    • Start with the following folder hierarchy (an example layout is shown below):
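      For example, an src-style layout consistent with the setup file below might look like this (file names are illustrative, apart from github_operators.py, which the DAG imports from):

      custom_operators/
      ├── setup.py
      └── src/
          └── custom_operators/
              ├── __init__.py
              └── github_operators.py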

       

       

    • In the setup file, add the package folder name like so:
      from setuptools import setup, find_packages
      
      setup(
          name="custom_operators",
          version="0.1.0",
          package_dir={"": "src"},
          packages=find_packages(where="src"),
          install_requires=[
              # List your dependencies here, e.g., 'numpy', 'pandas'
          ],
          classifiers=[
              "Programming Language :: Python :: 3",
              "License :: OSI Approved :: MIT License",
              "Operating System :: OS Independent",
          ],
          python_requires='>=3.6',
      )
    • In CMD, run the following commands to build the wheel package:
      pip install setuptools wheel
      python setup.py sdist bdist_wheel
      

      These commands will create a source distribution and a wheel for your package. The wheel file (.whl) will be stored in a newly created dist/ directory under the custom_operators folder.
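      Optionally, you can sanity-check the wheel locally before uploading it. This assumes Apache Airflow is installed in the same Python environment, since the operators import from airflow:

      pip install dist/custom_operators-0.1.0-py3-none-any.whl
      python -c "from custom_operators.github_operators import GitHubAPIReaderOperator, CountLanguagesOperator"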

     

     

    Step 3: Building the Airflow DAG
    Now that we have built our custom operators and created the wheel package, we need to create a DAG that triggers them.
    For that, I created two tasks, fetch_github_data and count_languages, each of which calls one of the operators above.

     

    from airflow import DAG
    from datetime import datetime, timedelta
    from custom_operators.github_operators import GitHubAPIReaderOperator, CountLanguagesOperator
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2023, 1, 1),
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        'github_language_analysis',
        default_args=default_args,
        description='Analyze GitHub repos for language usage',
        schedule_interval=timedelta(days=1),
    )
    
    fetch_github_data = GitHubAPIReaderOperator(
        task_id='fetch_github_data',
        api_url='https://api.github.com/repositories',
        max_pages=10,  
        token='<YOUR_GITHUB_TOKEN>',  # Replace with your GitHub personal access token
        dag=dag
    )
    
    count_languages = CountLanguagesOperator(
        task_id='count_languages',
        api_url='https://api.github.com',
        token='<YOUR_GITHUB_TOKEN>',  # Replace with your GitHub personal access token
        dag=dag
    )
    
    fetch_github_data >> count_languages
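
    If you have a local Airflow environment with the custom package installed, you can optionally exercise the DAG once, without a scheduler, before uploading it (the date below is just an example logical date):

    airflow dags test github_language_analysis 2024-01-01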

     

     

     

    Step 4: Run the DAG in the ADF Workflow Orchestration Manager

    Now that we have built our DAG and our custom package, follow these steps to run it in ADF:

    1. Create a Workflow Orchestration Manager (managed Airflow) instance in ADF by following the Microsoft docs.

    2. In your ADLS storage account, create the folder hierarchy as follows (an illustrative layout is sketched below):
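
    The layout might look like the following; the container name and the DAG file name are placeholders, and the exact placement of requirements.txt may vary (see the "Install a Private package" link at the end of this post):

    <your container>/
    └── airflow/
        └── dags/
            ├── github_language_analysis.py
            ├── custom_operators-0.1.0-py3-none-any.whl
            └── requirements.txt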


    In the requirements file, include the path to the custom package stored in your ADLS storage account as follows:

    /opt/airflow/dags/custom_operators-0.1.0-py3-none-any.whl


    3. In the ADF workspace, click on "Import files." Navigate to your ADLS storage account, locate the "Airflow" folder, and check the "Import requirements" checkbox.

     


    It will take a few minutes for the Workflow Orchestration Manager to pick up the updated code and the custom package.

     

     


    Step 5: Logs and Monitoring 

    After importing the files, click the "Monitor" button in the Workflow Orchestration Manager to view task execution and export Airflow logs. This will open the Airflow UI.

     

    DAG view in the Airflow UI:

     

     

    Logs from the count_languages task:

     

     

    P.S.: For a more dynamic workflow, you can save the language counts as a JSON file and store them in your storage account; a minimal sketch is shown below.
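
    A sketch of that idea, assuming the azure-storage-blob package is available in the environment and that a connection string is supplied via an environment variable (the container and blob names are placeholders):

    import json
    import os

    from azure.storage.blob import BlobServiceClient

    def upload_language_counts(language_counts: dict) -> None:
        # Serialize the counts and upload them as a single JSON blob.
        service = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION_STRING"])
        blob = service.get_blob_client(container="airflow-output", blob="language_counts.json")
        blob.upload_blob(json.dumps(language_counts), overwrite=True)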

     

     

     

    Links:

    Install a Private package - Azure Data Factory | Microsoft Learn
    How does Workflow Orchestration Manager work? - Azure Data Factory | Microsoft Learn

    airflow.operators.python — Airflow Documentation (apache.org) 
    airflow.providers.microsoft.azure — apache-airflow-providers-microsoft-azure Documentation 

     

     

    Call to Action:
    - Make sure to establish all connections before starting to work with managed Airflow.
    - Check the Microsoft documentation on Workflow Orchestration Manager.
    - Please help us improve by sharing your valuable Workflow Orchestration Manager preview feedback by emailing us at ManagedAirflow@microsoft.com
    - Follow me on LinkedIn: Sally Dabbah | LinkedIn

     

Updated May 17, 2024
Version 1.0