Data pre-processing for Azure Data Explorer with Apache Spark
We often see customer scenarios where historical data has to be migrated to Azure Data Explorer (ADX). Although ADX has very powerful data-transformation capabilities via update policies, sometimes complex data engineering tasks must be done upfront. This is the case when the original data structure is too complex, or when single data elements are too big and hit the Data Explorer limits of 1 MB for dynamic columns or 1 GB maximum file size for uncompressed ingest data (see also Comparing ingestion methods and tools).
Let's think about an Industrial Internet of Things (IIoT) use case where you receive data from several production lines. On each production line, several devices read humidity, pressure, and so on. The following example shows a scenario where a one-to-many relationship is implemented within an array. With this design you can get very large columns (with millions of device readings per production line) that might exceed the 1 MB limit for dynamic columns in Azure Data Explorer. In this case you need to do some pre-processing.
Data pre-processing with pyspark
There are many ways of dealing with this; in this blog we focus on pyspark and the notebook experience in Azure Synapse Analytics. The sample code is available in our GitHub repo.
Let's assume the data has already been uploaded to Azure Storage. You start by reading the JSON data into a dataframe:
reading the source data
We will see that the dataframe has some complex datatypes. Examining the schema shows that the measurement column is an array of structs holding the measurement data per deviceId:
the data structure of the raw data
Displaying the dataframe will show you the data:
the data content
The only thing we want to change here is to get rid of the array, so that the resulting dataset has a row for every entry in the measurement array.
How can we achieve this?
pyspark-sql has some very powerful functions for transformations of complex datatypes. We will make use of the explode function. In this case explode("measurement") gives us a resulting dataframe with a single row per array element. Finally, we only have to drop the original measurement column (it holds the original nested structure):
With this we have already done the necessary data transformation with only one line of code. Let's do some final "prettifying". As we are already preprocessing the data and want to get rid of the complex datatypes, we select the elements of the structs to get a simplified table:
selecting the elements of the structs
We set the extentsCreationTime to the notebook parameter IngestDate (this is one option for backfill scenarios; other options are discussed in the next section). The Spark ingestion properties are documented here:
set ingestion parameters
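Building the ingestion properties from the notebook parameter can be sketched as follows. The Kusto Spark connector accepts ingestion properties as a JSON string; treat the exact property name (`creationTime`) and the parameter value as assumptions to verify against the connector documentation:

```python
import json
from datetime import datetime, timezone

# Notebook parameter (illustrative value); in Synapse this would be
# supplied via a parameter cell when the notebook is called per day.
IngestDate = "2021-06-01"

# Backdate the created extents to the ingest date so that the extents
# are partitioned correctly for the backfill.
creation_time = datetime.strptime(IngestDate, "%Y-%m-%d").replace(tzinfo=timezone.utc)
ingestion_properties = json.dumps({"creationTime": creation_time.isoformat()})
print(ingestion_properties)
```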
Finally, we write the resulting dataframe back to Azure Data Explorer. The prerequisite for doing this in Synapse Analytics is a linked service (detailed setup steps can be found in the documentation):
writing the data to ADX
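The write step might look like the sketch below. The format string and option names follow the Kusto connector shipped with Synapse, but they are assumptions to verify against the documentation; the linked service name, database, and table are placeholders, and the call is wrapped in a function because it needs a configured linked service to run:

```python
def write_to_adx(df, linked_service, database, table, ingestion_properties_json):
    # The linked service carries the cluster URL and credentials, so no
    # secrets appear in the notebook. Option keys are assumptions taken
    # from the Synapse Kusto connector docs.
    (df.write
       .format("com.microsoft.kusto.spark.synapse.datasource")
       .option("spark.synapse.linkedService", linked_service)
       .option("kustoDatabase", database)
       .option("kustoTable", table)
       .option("sparkIngestionPropertiesJson", ingestion_properties_json)
       .mode("Append")
       .save())

# Example call (not executed here, as it requires a configured linked service):
# write_to_adx(flat, "AzureDataExplorer1", "iotdb", "Measurements",
#              '{"creationTime": "2021-06-01T00:00:00+00:00"}')
```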
Data in Azure Data Explorer is ordered and partitioned into extents according to a specific datetime column. For loading historical data (backfill scenarios), make sure you take this into account in one of two ways:
- The source data is already partitioned by date (if not, it must be structured accordingly), so you add the date information in your ingestion process before ingesting the data into ADX. This requires a separate step in your data pipeline, as the creation date is part of the Spark ingestion properties (you have to set extentsCreationTime in the SparkIngestionProperties; see here, search for "writing with advanced options" — the configuration is also used in the referenced notebooks). This can be achieved, for example, with a parametrized notebook that takes the date as a parameter and is called for every day. This approach saves an additional partitioning step in Azure Data Explorer, so it should be preferred if the data is already partitioned by date.
- Define a data partitioning policy based on a timestamp column in the data that serves as a uniform range datetime partition key (see the blog post Data partitioning for more details).
The data is first written to a temporary table in ADX, and the extents of this table are then moved to the target table. This is important to know, because changing the IngestionBatching policy on the target table would have no effect. If you want to apply a change (e.g. changing the default batching time span), you have to do it at the database level.
As mentioned above, there is a limit on the (uncompressed) file size you can ingest. With Spark, the number of output files equals the number of partitions of the dataframe. This means you can control the final file sizes in several ways, depending on the context:
- For datasets with no wide dependencies, you can control the input partitioning using reader-specific parameters.
- For datasets with wide dependencies, you can control the number of partitions with the spark.sql.shuffle.partitions parameter.
With a simple read, output files have the size of the Spark default partition, so we get files of approximately 128 MB. You might consider this for other ingestion scenarios too, e.g. via Azure Event Grid.
For historical data you want to load into Azure Data Explorer, consider the powerful capabilities pyspark offers for complex data transformations. As you have seen, complex data structures can be simplified with a few lines of code. Get started with the available notebooks for Azure Synapse Analytics and Azure Databricks.