Author(s): Arun Sethia is a Program Manager in Azure HDInsight Customer Success Engineering (CSE) team.
This blog is part of Spark 3.x series of blogs; the blog is dedicated towards looking into two different aspects of AQE (Adaptive Query Execution), Dynamic Switching Join Strategies and Dynamic Optimizing Skew Join using Apache Spark in Azure HDInsight.
There are multiple business scenarios where we must join multiple datasets to generate business insight for end-user consumption. Spark applies different join strategies based on the nature of dataset or query basic to choose the join operations. Few of the join strategies are:
We will not get into the detail of these join strategies in this blog; these join strategies are explained here. The Broadcast Hash Join is the most performant when any join side fits well in memory. Spark plans a Broadcast Hash Join if the estimated size of a join relation is less than the configured spark.sql.autoBroadcastJoinThreshold value. The smaller DataFrame will be broadcasted to all executors to perform Broadcast Hash Join.
The example code used for this blog has two datasets:
The business would like to get sales by date for a given state (filter customer data for a given state); which can be done by joining the customer data set with the sales data set.
%%sql
SELECT tx_date, sum(tx_value) AS total_sales
FROM sale
JOIN customer ON customer.customer_id = sale.customer_id
WHERE address_state="IL"
GROUP BY tx_date
The filter by address state is not known in static planning, so the initial plan opts for sort merge join. The customer table after filtering is small (~10% of original), so the query can do a broadcast hash join instead.
The broadcast join is a very high-performance join and a better option for the smaller data set where a smaller data/table is sent to every executor to execute a map-side join. We can use broadcast Spark join hint to enforce specific join must be used during the join operation. The broadcast joins hint suggests that Spark use broadcast join regardless of the configuration property autoBroadcastJoinThreshold.
Without AQE, the estimated size of join relations comes from the statistics of the original table. Unfortunately, it can go wrong in most real-world cases. Developers can only determine the table/dataset size after applying a filter and using a hint without knowing the dataset's size may result in an OOM exception.
AQE converts sort-merge join to broadcast hash join when the runtime statistics of any join side are smaller than the broadcast hash join threshold. It is not as efficient as planning a broadcast hash join in the first place, but it's better than keep doing the sort-merge join, as we can save the sorting of both the join sides and read shuffle files locally to save network traffic(if spark.sql.adaptive.localShuffleReader.enabled is true)
The AQE can be enabled using Spark Configuration (from Spark 3.2.0 onwards, It is enabled by default). Let's run the earlier code, enabled with AQE with HDInsight 5.0 using Spark 3.1.1. The source code example is available GitHub.
// Enable AQE
sql("SET spark.sql.adaptive.enabled=true")
sql("set spark.sql.adaptive.localShuffleReader.enabled=true")
As we can see, Spark optimizes the initial plan; It can replan the join strategy dynamically from sort-merge join to broadcast hash join at runtime if the size fits spark.sql.autoBroadcastJoinThreshold.
We can see local shuffle reader (CustomShuffleReader) is used to avoid shuffle when AQE optimizes the sort-merge join to the broadcast join.
Data skewness happens when data are unevenly distributed for a given key column; that means few column values have more rows than others; for example, the number of orders for day/month/year is more than others, uneven number of orders by selective customers, an uneven number of claims from a geo-location, number of page hits are uneven during the hours of the day, etc.
Spark join operation on two datasets would require moving data with the same join key(s) to the same executor. If your business data are skewed among different partition key column values, one or more partitions will have more data than other partitions. Such data skewness can cause Spark jobs to have one or more trailing tasks (larger partitions), severely downgrade queries' overall elapsed time, and waste resources on the cluster because idle waiting for these trailing tasks to complete.
Such skewed partitions do not fit in the memory of the executors. In that case, such tasks can result in garbage-collection problems or more slowness because data may spill to the disk, or in the worst case, this can result in Out of Memory exceptions and cause jobs to fail.
The example code used for this example has two datasets:
In the code example the sales data has been modified to generate data skewness for item id “18”. The source code example is available GitHub.
The business would like to get sales by date; we need to join sales with the item to get the item's price.
%%sql
SELECT tx_date, sum(soldQty * unit_price) AS total_sales
FROM sale
JOIN item ON item.item_id = sale.item_id
GROUP BY tx_date
It is difficult to detect data skewness from the Spark query execution plan. It provides the steps performed to execute a job but does not provide data distribution after each task. For this, we can use Spark UI from the history server. If we examine the job detail by accessing Spark UI, we get the following:
Stage 4 has taken longest time 7 mins, and if we drill down this stage to task level, we can see summary statistics of all 201 tasks of this stage. There is significant difference between max and 75th percentile or median. It is a strong indicator of data skewness.
The number of records processed by one task is significantly higher than other tasks, and we can also see the skewness problem from that task that it cannot fit data in memory, and data are spilled onto the disk.
We can manage data skew problems in multiple ways, such as using derived columns to divide large partitions, Broadcast Hash join if the dataset is not too large, etc. But we still may see room for performance improvements. The Adaptive Query Execution from Spark 3.x can rescue you with minimal code change; this feature dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks. It takes effect when both spark.sql.adaptive.enabled and spark.sql.adaptive.skewJoin.enabled configurations are enabled. There are two additional parameters to tune skewJoin in AQE:
We will enable both AQE and skewjoin to our last example execution.
sql("SET spark.sql.adaptive.enabled=true")
sql("SET spark.sql.adaptive.skewJoin.enabled=true")
By looking execution plan and SQL query plan, we can find Spark optimizes the initial plan, the time taken by the query has reduced from 7 mins to 1.5 mins, and the number of tasks has reduced from 200 to 70.
If we drill down stage#6 to the task level, we can see summary statistics of all 70 tasks of this stage. There is not much difference between the max and 75th percentile or median. This is because the AQE has optimized uneven partitions.
While looking to Spark SQL plan from Spark UI, we can find:
By default, AQE Coalesce Partitions (spark.sql.adaptive.coalescePartitions.enabled) is enabled; if not, you can enable the same so that AQE will optimize smaller partitions into larger partitions based on statistics of data and processing resources that will reduce shuffle time and data transfer.
AQE in Spark optimizes joins, especially where to join involves a filter; it optimizes the initial plan from a sort-merge join to a broadcast join based on runtime statistics. AQE can replan the join strategy dynamically from sort-merge join to broadcast hash join at runtime.
Similarly, AQE can help significantly in managing data skew, especially long-running jobs that should be analyzed for such opportunities to allow developers to mitigate data-skew problems early to utilize resources and better overall performance.
AQE is a splendid feature added since Spark 3.x; migrating your HDInsight clusters to Spark 3.x can improve your business ROI and better performance. Contact the HDInsight team if you need help to migrate your workload from HDInsight 4.0 to HDInsight 5.x to take maximum benefits.
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.