Forum Discussion

AjayGopu
Copper Contributor
May 10, 2024

How to Achieve a Couple of Functionalities in Azure Data Factory Dynamically

Hi Team.

 

I have the scenarios below as part of my business requirement. They have to be achieved dynamically using Azure Data Factory Data Flows or Pipelines.

Note: The requirement is not to use Function Apps, Databricks or any other API calls.

I have a blob storage container that holds CSV files with varying headers (the headers and the content inside change all the time). I want to convert these CSV files to Parquet files after performing a couple of validations, as described below.

  1. Need to loop through each file in the source blob folder.
  2. Need to get the row count of each file dynamically (can't use the pipeline Lookup activity, as the files contain millions of rows).
  3. Use the count as conditional logic to decide whether to continue to the next step.
  4. In the next step I need to validate the CSV data to find any invalid rows. For example, I'm using the comma (,) as the column delimiter in my dataset, so any string that is not enclosed in double quotes ("") and contains a comma (,) will create an extra column without a header name. Such rows should be treated as invalid and moved to another blob storage folder as a .CSV file.
  5. For example, the source CSV file may look like this.

TestColumn1,TestColumn2,TestColumn3
BUDGETS,-1431654712,jgdsgfj,sdfds
BUDGETS,-1431654712,
BUDGETS,-1431654712,

Here the first data row has four values for three headers (the unquoted comma creates an extra column), so it should go to the invalid output; the last two rows have exactly three values and are valid.

2 Replies

  • AjayGopu 

    You can achieve this dynamically in Azure Data Factory (ADF) using Pipelines and Mapping Data Flows without Function Apps or Databricks.

    Steps:
    1. Loop through files – Use Get Metadata + ForEach to iterate over CSVs in Blob Storage.
    2. Get row count – Use Aggregate transformation in Data Flow.
    3. Apply conditional logic – Use If Condition to check if row count > 0.
    4. Validate rows – Use a Derived Column + Conditional Split (or Filter) transformation to detect misaligned rows caused by unquoted commas (a sketch of this check follows below).
    5. Move valid/invalid rows –
      Valid Rows → Convert to Parquet
      Invalid Rows → Save as CSV in another folder
    This keeps the pipeline fully dynamic and automated.
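
    For the misaligned-row check, here is a minimal Data Flow Script sketch of the idea. Everything below is an assumption for illustration: the source is assumed to read each line as a single text column (by using a column delimiter that never occurs in the data), that column is assumed to be named _col0_, and $expectedColumns is assumed to be a data flow parameter holding the number of header columns.

      source(allowSchemaDrift: true, validateSchema: false) ~> RawLines
      /* Count commas outside double quotes: strip quoted sections, then count the remaining commas */
      RawLines derive(delimCount = length(regexReplace(_col0_, '"[^"]*"', '')) - length(replace(regexReplace(_col0_, '"[^"]*"', ''), ',', ''))) ~> AddDelimCount
      /* A well-formed row has exactly (expectedColumns - 1) delimiters */
      AddDelimCount split(delimCount == $expectedColumns - 1, disjoint: false) ~> RowCheck@(ValidRows, InvalidRows)

    ValidRows can then be parsed and written as Parquet, and InvalidRows written back out as CSV.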

  • Here is how you could set this up entirely within Azure Data Factory:

    1. Create a Pipeline:
      • Create a new pipeline in Azure Data Factory.
    2. Add Activities to Pipeline:
      • Get Metadata Activity: Use this activity to list all the files in the source blob folder.
      • ForEach Activity: Loop through each file using the output of the Get Metadata activity.
      • Inside the ForEach Activity:
        • Copy Activity: Copy the file content from the source blob to a staging area in Azure Data Lake Storage Gen2.
        • Data Flow Activity: Add a Data Flow activity to process the CSV files and perform validations.
    3. Create a Data Flow:
      • Source Transformation: Add a source transformation to read the CSV file from the staging area.
        • Aggregate Transformation: Add an aggregate transformation on a new branch from the source to compute the total row count (a short sketch follows after this list).
      • Conditional Split Transformation: Add a conditional split transformation to separate valid and invalid rows.
        • Valid Rows: Rows that meet the criteria.
        • Invalid Rows: Rows that contain invalid data.
      • Sink Transformation: Add two sink transformations:
        • Valid Rows Sink: Write the valid rows to a Parquet file in the destination folder.
        • Invalid Rows Sink: Write the invalid rows to a separate CSV file in another blob storage folder.
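
    For the row count, a minimal Data Flow Script sketch of that aggregate branch could look like this (stream and column names are placeholders, not from an existing pipeline):

      /* Total row count without a pipeline Lookup: aggregate the whole stream into one row */
      StagedCsv aggregate(RowCount = count(1)) ~> CountRows
      /* CountRows yields a single row; write it to a small sink (or a cache sink) so the pipeline's If Condition can read it */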

    Example Pipeline and Data Flow:

    Pipeline

    1. Get Metadata Activity:
      • Configure the activity to list all the files in the source blob folder.
      • Output: List of file names.
    2. ForEach Activity:
      • Items: @activity('Get Metadata Activity').output.childItems
      • Inside the ForEach activity, add the following activities:
        • Copy Activity:
          • Source: Source blob storage.
          • Sink: Staging area in Azure Data Lake Storage Gen2.
        • Data Flow Activity:
          • Parameters: Pass the file name to the data flow (see the settings sketch below).
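
    For reference, the key settings could look like this (the activity, dataset, and parameter names below are placeholders):

      ForEach Items:                            @activity('Get Metadata Activity').output.childItems
      Copy Activity source dataset parameter:   @item().name
      Data Flow Activity parameter (file name): @item().name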

    Data Flow

     

    1. Source Transformation:
      • Source: Staging area in Azure Data Lake Storage Gen2.
      • Options: Enable header and specify delimiter as comma.
    2. Aggregate Transformation (on a new branch from the source):
      • Compute the total row count: RowCount = count(1) (a Derived Column runs per row and cannot produce a total count; an Aggregate with count(1) can)
    3. Conditional Split Transformation:
      • Valid Rows Condition: length(toString(TestColumn1)) > 0 && length(toString(TestColumn2)) > 0 && length(toString(TestColumn3)) > 0
      • Invalid Rows Condition: !(length(toString(TestColumn1)) > 0 && length(toString(TestColumn2)) > 0 && length(toString(TestColumn3)) > 0)
    4. Sink Transformation:
      • Valid Rows Sink: Write to Parquet file.
      • Invalid Rows Sink: Write to CSV file in another blob storage folder.
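
    Putting the data flow together, a rough Data Flow Script outline could look like this. It is only a sketch: transformation and sink names are placeholders, the dataset bindings and Parquet/CSV format settings live on the source and sinks and are omitted, and the column names come from the sample file above.

      source(allowSchemaDrift: true, validateSchema: false) ~> StagedCsv
      /* Branch 1: total row count, surfaced through a small (or cache) sink for the pipeline's If Condition */
      StagedCsv aggregate(RowCount = count(1)) ~> CountRows
      /* Branch 2: route rows on the validity condition (the length() checks above, or the delimiter-count check from the first reply) */
      StagedCsv split(length(toString(TestColumn1)) > 0 && length(toString(TestColumn2)) > 0 && length(toString(TestColumn3)) > 0, disjoint: false) ~> RowCheck@(ValidRows, InvalidRows)
      ValidRows sink(allowSchemaDrift: true, validateSchema: false) ~> ValidParquetSink
      InvalidRows sink(allowSchemaDrift: true, validateSchema: false) ~> InvalidCsvSink
      CountRows sink(allowSchemaDrift: true, validateSchema: false) ~> RowCountSink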
