In this blog post, we’ll explore how Azure Data Factory (ADF) or Synapse Pipelines can be used to orchestrate large query ingestions. You will learn how to split one large query ingest into multiple time-based partitions, orchestrated with ADF.
Common use-cases for this are:
- Event tables requiring a certain query complexity, e.g. window functions over serialized datasets, which are currently not supported by materialized views (see the sketch after this list).
- Backfill scenarios, for example for newly created materialized views or derived event tables, based on existing very large datasets. (For materialized views there is built-in backfill functionality that you should check first, see Create materialized view - Azure Data Explorer | Microsoft Learn.)
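As a hypothetical illustration of the first use case, a query like the following relies on a serialized dataset and the window function prev(), which cannot be expressed as a materialized view. The table and column enqueuedTime are taken from the example in this post; deviceId and value are assumed columns, used only for illustration:

measurement
| order by deviceId asc, enqueuedTime asc  // serializes the dataset
| extend previousValue = prev(value)       // window function over the serialized rows
| extend valueDelta = value - previousValue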
There are other ways of dealing with this; in this blog we will focus on orchestrating the query ingest with ADF.
From a conceptual point of view the approach is very simple: in a first step we identify time partitions on the source dataset. With this information we split the query ingest along these partitions, so instead of doing one query ingest for the overall dataset, we do one query ingest per time-based partition.
This can be solved with three Azure Data Factory activities:
- A Lookup activity, reading all source partitions
- A ForEach activity, executing an Azure Data Explorer Command activity
- The Azure Data Explorer Command activity itself, triggering the append command for each partition
The overall data pipeline looks like this:
Step-by-step guide:
To follow this guide you need the following prerequisites:
- An Azure Data Explorer Cluster with a database
- An Azure Data Factory (ADF) or Azure Synapse Workspace deployed
- To process data on Azure Data Explorer you have to create a linked service for your Azure Data Explorer Cluster in ADF. Follow the documentation for the setup (see Copy data from Azure Data Factory to Azure Data Explorer | Microsoft Learn).
- You have to grant your ADF instance database access. The best approach is to grant the workspace's managed identity access to your Azure Data Explorer database (see Copy and transform data in Azure Data Explorer - Azure Data Factory & Azure Synapse | Microsoft Learn). You must grant the managed identity at least the Database Viewer and Database Ingestor roles on the database (see the sketch after this list).
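The role assignments can also be done with management commands on the database. The following is a minimal sketch, assuming you have the application (client) ID of the ADF managed identity and your tenant ID at hand; replace the placeholders and database name with your own values:

.add database <DatabaseName> viewers ('aadapp=<ADF managed identity application ID>;<tenant ID>') 'Azure Data Factory'
.add database <DatabaseName> ingestors ('aadapp=<ADF managed identity application ID>;<tenant ID>') 'Azure Data Factory'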
With this in place, you can start:
(1) Create a new pipeline.
(2) In the pipeline, create a Lookup activity. In the Settings tab, select the ADX dataset. If it does not exist yet, create a new integration dataset of type Azure Data Explorer, select the linked service you have created, and choose the table that will be used as the data source. De-select the First row only checkbox and enter the query manually:
measurement
| summarize by bin(enqueuedTime, 1d)
| order by enqueuedTime asc
The query should return the time partitions for your source dataset. In our example, we query all days from the table measurement.
In the General tab, configure a retry count of at least 3 to make the activity resilient:
(3) Add a ForEach activity and connect the Lookup activity to it (on success). In the Settings tab, under Items, add:
@activity('lkpDayPartition').output.value
This is the value that will be used in the inner activity of the ForEach activity.
The batch count controls how many child activities will be executed in parallel. As the query ingest can be very resource intensive, we recommend starting with a batch count of 1 and testing carefully before increasing it.
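To illustrate what the ForEach activity iterates over: with the partition query from step (2), the expression above resolves to an array of objects, one per day, roughly like the following (the timestamps are just examples):

[
  { "enqueuedTime": "2023-01-01T00:00:00Z" },
  { "enqueuedTime": "2023-01-02T00:00:00Z" },
  ...
]

Each element of this array is available as item() inside the ForEach activity, which is why the command in step (4) can reference item().enqueuedTime.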
(4) Inside the ForEach activity, add an Azure Data Explorer Command activity. In the General tab, make a few changes to render the overall process resilient:
- Configure a retry count (e.g. 5, in case your command gets throttled), and
- slightly increase the retry interval (e.g. to 120 seconds).
Under Connection, select the Azure Data Explorer linked service.
Now, you have to add the command for the query ingest. We will configure two parameters:
- Set the distributed flag to true. In our case we expect the dataset being ingested per day to be larger than 1 GB (see also Kusto query ingestion (set, append, replace) - Azure Data Explorer | Microsoft Learn).
- Set the creationTime: for a backfill scenario this is good practice. As we already retrieve the date partitions in the Lookup activity, this is the most efficient way (alternatively a partitioning policy could be used, see Partitioning policy - Azure Data Explorer | Microsoft Learn). If your overall dataset goes back more than 14 days, make sure that the default merge policy for your table in Azure Data Explorer is adapted for this process (see Extents merge policy - Azure Data Explorer | Microsoft Learn and the sketch after this list).
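As a sketch of such an adaptation, the Lookback window of the extents merge policy on the target table could be extended so that it covers the full backfill range; otherwise, backfilled extents with an older creationTime will not be considered for merging. The command and the period below are an example only and should be validated against the linked documentation:

.alter-merge table measurement_copy policy merge
@'{ "Lookback": { "Kind": "Custom", "CustomPeriod": "365.00:00:00" } }'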
The final command will also have a filter on the datetime column and will look as follows:
.append measurement_copy with (distributed=true, creationTime="@{item().enqueuedTime}") <| measurement
| where enqueuedTime >= todatetime("@{item().enqueuedTime}") and enqueuedTime < todatetime("@{item().enqueuedTime}") + 1d
With this final step you are done, and you can trigger your new data pipeline.
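Once the pipeline has completed, a simple sanity check, assuming the source and target tables from this example, is to compare the row counts per day between measurement and measurement_copy:

measurement
| summarize sourceRows = count() by day = bin(enqueuedTime, 1d)
| join kind=fullouter (
    measurement_copy
    | summarize targetRows = count() by day = bin(enqueuedTime, 1d)
) on day
| order by day asc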
Conclusion
In this blog post you have learned:
- How to orchestrate Azure Data Explorer query ingest with Azure Data Factory.
- How to make the query ingest resilient by simply configuring retries in the Azure Data Factory activities.
- How to pass parameters from Azure Data Factory activities to the ADX ingest command for flexible filtering and further settings.