Spark Adaptive Query Execution (AQE) - all the details you need to know

By Niraj Zade | 2024-02-20 | Tags: theory performance


Adaptive Query Execution, aka AQE, was introduced in spark 3.0.0, and has been a major step up in making spark easier to work with.

We enable AQE by setting the configuration variable spark.sql.adaptive.enabled to true. From spark 3.2.0 onward, it is enabled by default.
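As a minimal sketch, assuming PySpark is installed, the flag can be passed while building the session. The `AQE_CONF` dict and the `build_session` helper are illustrative names of my own, not part of spark.

```python
# Illustrative settings dict; only the key and value come from spark itself.
AQE_CONF = {"spark.sql.adaptive.enabled": "true"}


def build_session(app_name="aqe-demo", conf=AQE_CONF):
    """Create (or reuse) a SparkSession with the given settings applied."""
    # Imported lazily so the dict above can be inspected without PySpark.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Usage (requires PySpark):
#   spark = build_session()
#   spark.conf.set("spark.sql.adaptive.enabled", "true")  # toggle at runtime
```

The same key can also be changed on a running session with `spark.conf.set`, which is handy when you want to compare a query with and without AQE.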

Spark AQE follows the tried and tested way of bringing automation magic into systems:

  1. Create a system with a lot of configurations that lets users fine tune their operations
  2. Create a "meta management layer" that automatically sets and tweaks these parameters during runtime, to make the operation run better without needing user interaction

AQE has ended up automating many critical settings that we used to set by hand (most importantly - spark.sql.shuffle.partitions), and lets those settings change dynamically between stages. This makes the system more hands-off and removes much of the guesswork. This is a major game changer.

Understanding the context - problems in spark

When working with spark, there are 2 major problems that lead to out of memory errors (OOM) - improper partitioning and data skew.

You notice these problems in spark very clearly when you work with warehousing engines that don't make you think about these errors (like Snowflake, BigQuery etc).

Now, we cannot directly compare spark framework with warehousing engines like Snowflake and BigQuery. But I want you to understand the larger point - spark makes you think too much about partitions and the execution of your program, and fails too often during execution.

The partition size problem

The main problem is managing partition sizes - in storage, and during execution. If partition sizes grow too large, your executors will throw OOM errors. If partition sizes grow too small, performance degrades.

Managing partition sizes while storing and reading from storage is pretty much solved, and it doesn't take much experience to get right.

However, managing partition sizes during execution is a whole other thing. It simply takes a lot of experience to control them properly. Look back at most of the OOM errors and execution problems you have had while working with spark. You'll see that spark.sql.shuffle.partitions has been the cause of, and the solution to, the majority of them.

The skew problem

The real world is skewed. Data is a reflection of the real world. So your data too will be skewed. It is inevitable.

Some examples are:

  • There will always be some products that record disproportionately more sales than other products
  • There will be some days (eg - black friday, big billion day) that lead to disproportionately more sales than other days
  • Social media accounts of famous people will generate disproportionately more engagement data than other normal accounts

This skew ends up in your data's partitions. Most partitions get processed quickly and put minimal load on their executors, but the huge skewed partitions will come in and kill their executors with OOM errors.

Major warehousing engines in the market don't have these problems

If you have worked with warehouses like Snowflake, BigQuery etc, you know how pain-free the experience is. You just put in data, partition it on some key, and execute queries. You don't have to worry about the size of partitions, types of joins etc. Large partitions and skews will reduce performance, but they won't outright kill your queries.

These warehouses and their processing engines simply hide away the executors from us. Their query planners and execution engines take care of partitions and skews for us. Working with them is a pleasant & brainless experience. The query might run slow, but it won't fail.

With spark, we simply have to care too much about execution. This causes headaches as well as a high barrier to entry.

Databricks is the largest contributor to spark. With Databricks' new offering for business intelligence through Databricks SQL, this high barrier to entry and requirement of execution knowledge has become an even larger problem. Analysts simply want to run queries and get answers. They don't want to deal with this execution OOM mumbo-jumbo.

If you have been in the field long enough, you'll quickly realize that the majority of people don't care about performance. This entire field is famously littered with huge un-optimized SQL queries. Most people just want their queries to run, and their reports to come out on time. That's all.

Spark has been taking steps towards reaching the developer comfort of these engines, and making the developers worry less about these execution problems. AQE is a part of these steps.

Currently, AQE targets 3 particular problems:

  1. Partition size across stages problem
  2. Data skew problem
  3. Slow join problem

Partition size across stages problem

During storage, you create partitions and buckets carefully such that they don't overload the executors when read.

But what about during execution? What about the partition sizes that grow during execution when you perform joins and other operations on the data?

During execution, if the partition size grows too large, you'll again get OOM errors. So now, you tune the partition size by explicitly telling spark to cut up partitions. You do this using the spark.sql.shuffle.partitions setting.

You look at the stage with the most data and set spark.sql.shuffle.partitions to a number large enough that even that largest stage will run without OOM errors. But this makes other stages with smaller volumes of data suffer from the "too small partitions" problem. This kills performance.

What you actually wanted was the option to set partition size on a stage by stage basis. Not set the same number of partitions across all stages.
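To make the trade-off concrete, here is a small arithmetic sketch. The stage names and sizes are made-up numbers for illustration, and it assumes data is spread perfectly evenly across partitions (which real data rarely is).

```python
GB = 1024 ** 3
MB = 1024 ** 2

# Hypothetical shuffle sizes for three stages of one job.
stage_shuffle_bytes = {
    "big_join": 400 * GB,
    "aggregate": 20 * GB,
    "final_lookup": 1 * GB,
}

# One value, applied to every stage (what spark.sql.shuffle.partitions does
# without AQE). Sized here so the biggest stage stays comfortable.
shuffle_partitions = 4000


def avg_partition_mb(total_bytes, num_partitions):
    """Average partition size in MB, assuming a perfectly even spread."""
    return total_bytes / num_partitions / MB


sizes = {name: avg_partition_mb(size, shuffle_partitions)
         for name, size in stage_shuffle_bytes.items()}

for name, mb in sizes.items():
    print(f"{name}: {mb:.2f} MB per partition")
```

The large stage lands at roughly 100 MB per partition, while the smallest stage ends up with thousands of partitions that are a fraction of a megabyte each. That is the "too small partitions" problem in numbers.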

This is where Coalescing Post Shuffle Partitions helps. With this setting, spark.sql.shuffle.partitions goes from being a number that is strictly applied to all stages, to simply becoming an upper bound on the number of partitions.

Suppose spark.sql.shuffle.partitions was set to 1000. But AQE looks at the stats and sees that after the shuffle, the partitions have become too small, and the stage would actually work best with 100 partitions. In this case, after the shuffle, AQE will coalesce the partitions to form larger partitions.

In this way, AQE avoids the "many tiny partitions" problem.
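The idea of coalescing can be sketched with a toy model. This is a simplified greedy merge of adjacent partitions written for illustration only. It is not spark's actual implementation, and the partition sizes are invented.

```python
def coalesce_partitions(sizes_mb, target_mb):
    """Greedily merge adjacent partitions without exceeding target_mb.

    A single partition that is already larger than target_mb is kept as is.
    """
    merged = []
    current = 0
    for size in sizes_mb:
        # Close the current output partition if adding this one would overshoot.
        if current > 0 and current + size > target_mb:
            merged.append(current)
            current = 0
        current += size
    if current > 0:
        merged.append(current)
    return merged


# Hypothetical post-shuffle sizes (MB) for 10 small partitions.
post_shuffle = [10, 5, 20, 30, 8, 2, 40, 25, 15, 5]
coalesced = coalesce_partitions(post_shuffle, target_mb=64)
print(len(post_shuffle), "->", len(coalesced), "partitions:", coalesced)
```

Ten small partitions collapse into four larger ones, and no data is lost or moved between non-adjacent partitions. Only neighbours are merged, which is why this step is cheap compared to a full repartition.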

Coalescing Post Shuffle Partitions

Spark AQE avoids the small partition problem by coalescing partitions after the shuffle to form larger partitions. You can practically stop worrying about spark.sql.shuffle.partitions.

This is enabled when both spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled are set to true.

Just set a large enough initial number of shuffle partitions in spark.sql.adaptive.coalescePartitions.initialPartitionNum, and spark will pick a suitable partition number at runtime. There is no need to set a specific shuffle partition number through spark.sql.shuffle.partitions to fit your dataset. This simplifies the tuning of shuffle partition numbers when running queries.

When enabled, AQE will coalesce shuffle partitions to the target size set in spark.sql.adaptive.advisoryPartitionSizeInBytes.


NOTE:

However, by default spark AQE will not try to achieve partition sizes equal to spark.sql.adaptive.advisoryPartitionSizeInBytes. Instead, it will try to maximize parallelism and avoid performance regression when enabling adaptive query execution.

This default behaviour is because spark.sql.adaptive.coalescePartitions.parallelismFirst is set to true by default. It is recommended to set it to false and turn off this behaviour.

When spark.sql.adaptive.coalescePartitions.parallelismFirst is true, spark ignores the target size in spark.sql.adaptive.advisoryPartitionSizeInBytes while coalescing, and only respects the minimum partition size specified by spark.sql.adaptive.coalescePartitions.minPartitionSize.

Why this behaviour? Coalescing reduces the number of partitions, which reduces the number of parallel tasks, which can slow the job down. The spark devs don't want your job execution speed to suffer when you turn on AQE, so the default behaviour maintains job parallelism and execution speed. You then come in and deliberately turn this behaviour off, letting AQE coalesce partitions and grow them toward the size specified in spark.sql.adaptive.advisoryPartitionSizeInBytes.

Another configuration variable you need to know is spark.sql.adaptive.coalescePartitions.initialPartitionNum. This variable sets the initial number of shuffle partitions before coalescing. If it is not set, spark uses the value in spark.sql.shuffle.partitions.
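Put together, the coalescing settings discussed above might look like the sketch below. The `COALESCE_CONF` dict and `apply_conf` helper are illustrative names, and the initial partition number of 2000 is just an example value, not a recommendation.

```python
MB = 1024 ** 2

COALESCE_CONF = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Let AQE aim for the advisory size instead of maximizing parallelism.
    "spark.sql.adaptive.coalescePartitions.parallelismFirst": "false",
    # Deliberately generous starting point; the value is only an example.
    "spark.sql.adaptive.coalescePartitions.initialPartitionNum": "2000",
    # Target partition size in bytes; 64 MB is spark's default.
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": str(64 * MB),
}


def apply_conf(spark, conf):
    """Apply settings to an existing SparkSession at runtime."""
    for key, value in conf.items():
        spark.conf.set(key, value)


# Usage (requires a running SparkSession named `spark`):
#   apply_conf(spark, COALESCE_CONF)
```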

Details of configuration variables:

  • spark.sql.adaptive.coalescePartitions.enabled
    • Default value = true
    • Available from spark 3.0.0 onward (All currently supported Databricks runtimes)
  • spark.sql.adaptive.coalescePartitions.parallelismFirst
    • Default value = true (setting it to false is recommended)
    • Available from spark 3.2.0 onward (Databricks runtime 10.4 LTS onward)
  • spark.sql.adaptive.coalescePartitions.minPartitionSize
    • Default value = 1MB
    • Available from spark 3.2.0 onward (Databricks runtime 10.4 LTS onward)
    • Sets the minimum size of shuffle partitions after coalescing. Can be at most 20% of spark.sql.adaptive.advisoryPartitionSizeInBytes. Used when the target size is ignored because spark.sql.adaptive.coalescePartitions.parallelismFirst is set to true
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum
    • Default value = none (spark will use the value set in spark.sql.shuffle.partitions, whose default value is 200)
    • Available from spark 3.0.0 onward (All currently supported Databricks runtimes)
  • spark.sql.adaptive.advisoryPartitionSizeInBytes
    • Default value = 64MB
    • Available from spark 3.0.0 onward (All currently supported Databricks runtimes)

Data skew problem

Data skew is a major unavoidable problem. The few skewed partitions will cause OOM in their executors.

Until AQE, there were 2 solutions for this problem:

Solution 1. Increase executor sizes

This isn't a feasible solution. Suppose all your partitions need 1gb of ram to execute, but one skewed partition needs 4gb. With spark, you cannot create a single larger 4gb executor and tell spark to use it for the skewed partition. You have to increase the ram of ALL executors to 4gb. Now all partitions, including the skewed one, will be processed successfully. But the normal sized partitions only needed 1gb of ram, so 3gb of ram is wasted in every executor that isn't processing the skewed partition.

Hardware is expensive. So this is not a solution.

Solution 2. Manually split partitions with salting

(If you don't know salting, there are many articles and videos explaining it on the internet.)

This is the solution used by everyone. If partitioning on the current column is causing skew, we create a synthetic column, and make spark re-partition the data by shuffling on the combination of the older partition key column and the synthetic column. In this way, we break the skewed partition into smaller and more evenly distributed chunks.
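As a rough illustration of why salting works, here is a pure-Python toy model (no spark involved). The `partition_of` function is a made-up stand-in for a hash partitioner, the row counts are invented, and salts are assigned round-robin instead of randomly so that the result is reproducible.

```python
from collections import Counter

NUM_PARTITIONS = 8
SALT_BUCKETS = 8

# One "hot" key (0) with 1000 rows, and ten normal keys with 10 rows each.
rows = [0] * 1000 + [key for key in range(1, 11) for _ in range(10)]


def partition_of(*columns):
    """Toy stand-in for hash partitioning on one or more columns."""
    return sum(columns) % NUM_PARTITIONS


# Without salting: every row of the hot key lands in the same partition.
unsalted = Counter(partition_of(key) for key in rows)

# With salting: partition on (key, salt), which spreads the hot key out.
seen = Counter()
salted = Counter()
for key in rows:
    salt = seen[key] % SALT_BUCKETS  # round-robin; real salting is usually random
    seen[key] += 1
    salted[partition_of(key, salt)] += 1

print("largest partition without salt:", max(unsalted.values()))
print("largest partition with salt:   ", max(salted.values()))
```

The total number of rows is unchanged, but the largest partition shrinks dramatically once the salt is part of the partitioning key. Keep in mind that when salting is used for a join, the other side of the join has to be replicated once per salt value so that matching rows still meet.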


With AQE, we simply don't have to worry about this anymore.

When enabled, AQE will automatically split skewed partitions when they cross the thresholds you set in configuration. No need to increase hardware resources. No need to figure out salting.

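For partitions produced by a rebalance (for example, through the REBALANCE hint), this is enabled by setting spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled to true. When partition splitting is enabled, the partitions will be split to achieve the target size specified in spark.sql.adaptive.advisoryPartitionSizeInBytes. Skewed partitions in sort merge joins are handled by a separate setting, spark.sql.adaptive.skewJoin.enabled, which is also true by default.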

A partition will be merged during splitting if its size is smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes multiplied by spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor.

Example - the default value of spark.sql.adaptive.advisoryPartitionSizeInBytes is 64MB, and the default value of spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor is 0.2. So, if a partition's size is less than 12.8MB (64MB x 0.2), it will be merged to form a larger partition.
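The merge threshold arithmetic above can be checked with a few lines of Python. The `REBALANCE_CONF` dict and the helper function are illustrative names of my own. The values are the defaults quoted in this article.

```python
MB = 1024 ** 2

REBALANCE_CONF = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled": "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": str(64 * MB),
    "spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor": "0.2",
}


def small_partition_threshold_bytes(conf):
    """Size below which a partition gets merged: advisory size x factor."""
    advisory = int(conf["spark.sql.adaptive.advisoryPartitionSizeInBytes"])
    factor = float(conf["spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor"])
    return advisory * factor


threshold = small_partition_threshold_bytes(REBALANCE_CONF)
print(f"merge partitions smaller than {threshold / MB:.1f} MB")
```

If you raise the advisory size, the merge threshold moves with it, so the two settings are best tuned together.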

Details of configuration variables:

  • spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled
    • Default value = true
    • Available from spark 3.2.0 onward (Databricks runtime 10.4 LTS onward)
  • spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor
    • Default value = 0.2
    • Available from spark 3.3.0 onward (Databricks runtime 11.3 LTS onward)

Slow join problem

AQE will try to upgrade slow join strategies to faster join strategies.

AQE particularly tries to avoid the Shuffle Sort Merge join strategy by converting it into one of the Hash join strategies - Broadcast Hash join or Shuffle Hash join.

Why? Because shuffle sort merge join sorts both sides before the merge operation. This is computationally very expensive.

Converting Shuffle Sort Merge join to Broadcast Hash join

If any join side is smaller than the size specified in spark.sql.adaptive.autoBroadcastJoinThreshold, spark will convert the join into a Broadcast Hash join.

There is also an option to read shuffle files locally within the node itself, to save network traffic. This is done using the spark.sql.adaptive.localShuffleReader.enabled variable.

More details of configuration variables:

  • spark.sql.adaptive.autoBroadcastJoinThreshold
    • Default value = none (AQE will use the value in spark.sql.autoBroadcastJoinThreshold)
    • Available from spark 3.2.0 onward (Databricks runtime 10.4 LTS onward)
    • Sets the maximum size in bytes of a join side that can be broadcast
    • Broadcasting can be disabled by setting value to -1.
  • spark.sql.adaptive.localShuffleReader.enabled
    • Default value = true
    • Available from spark 3.0.0 onward (All currently supported Databricks runtimes)
    • When spark.sql.adaptive.enabled is true and spark.sql.adaptive.localShuffleReader.enabled is true, spark will use the local shuffle reader when shuffle partitioning is not needed.
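The broadcast decision can be sketched as a small function. This is a simplified model for illustration: it ignores the join type (which in real spark restricts which side may be broadcast), it treats the threshold as an inclusive maximum (an assumption of this sketch), and `pick_broadcast_side` is a hypothetical helper, not a spark API.

```python
MB = 1024 ** 2


def pick_broadcast_side(left_bytes, right_bytes, threshold_bytes):
    """Return "left", "right", or None for which side could be broadcast.

    A side qualifies when its runtime size is at most the threshold.
    A negative threshold (-1) disables broadcasting altogether.
    """
    if threshold_bytes < 0:
        return None
    sides = {"left": left_bytes, "right": right_bytes}
    eligible = {side: size for side, size in sides.items()
                if size <= threshold_bytes}
    if not eligible:
        return None
    # Prefer broadcasting the smaller of the eligible sides.
    return min(eligible, key=eligible.get)


threshold = 10 * MB  # example value
print(pick_broadcast_side(5000 * MB, 8 * MB, threshold))    # small right side
print(pick_broadcast_side(5000 * MB, 800 * MB, threshold))  # both sides too big
print(pick_broadcast_side(5000 * MB, 8 * MB, -1))           # broadcasting disabled
```

The important difference with AQE is where the sizes come from. They are measured from the shuffle output at runtime, so a join side that only becomes small after filtering can still be picked up for broadcasting.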

Converting Shuffle Sort Merge join to Shuffle Hash Join

If all post shuffle partitions are smaller than a threshold (spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold), AQE will convert the Shuffle Sort Merge join to Shuffle Hash join.

Spark AQE will prefer Shuffle Hash Join over Shuffle Sort Merge join, irrespective of the value of spark.sql.join.preferSortMergeJoin, when:

  1. spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold is not smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes
  2. all partition sizes are no larger than spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold

More details of configuration variables:

  • spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
    • Default value = 0
    • Available from spark 3.2.0 onward (Databricks runtime 10.4 LTS onward)
    • Sets the maximum partition size in bytes that is allowed to build local hash map (for the join algorithm)
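The rule for this conversion can be written down as a small predicate. The sketch follows the wording of spark's configuration documentation for this setting (the threshold must not be smaller than the advisory partition size, and no partition may be larger than the threshold). The function name and the sample sizes are hypothetical.

```python
MB = 1024 ** 2


def prefers_shuffled_hash_join(partition_sizes, max_local_map_threshold,
                               advisory_partition_size):
    """Return True when the join would be planned as a shuffled hash join."""
    # Condition 1: the threshold must be at least the advisory partition size.
    if max_local_map_threshold < advisory_partition_size:
        return False
    # Condition 2: every partition must be small enough for a local hash map.
    return all(size <= max_local_map_threshold for size in partition_sizes)


advisory = 64 * MB
small_partitions = [10 * MB, 50 * MB, 32 * MB]
one_big_partition = [10 * MB, 80 * MB]

print(prefers_shuffled_hash_join(small_partitions, 64 * MB, advisory))
print(prefers_shuffled_hash_join(one_big_partition, 64 * MB, advisory))
# With the default threshold of 0, the first condition can never hold.
print(prefers_shuffled_hash_join(small_partitions, 0, advisory))
```

Because the default threshold is 0, this conversion is effectively switched off until you set the threshold yourself.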

Some interesting history behind sort merge joins and shuffle hash join

According to Spark Issue 11675, the shuffle hash join was removed from spark 1.6. Reason cited was -

... I think we should just standardize on sort merge join for large joins for now, and create better implementations of hash joins if needed in the future

Then in spark 2.0, shuffle hash join was added back into spark, according to Spark issue 13977. The reason cited was:

ShuffledHashJoin is still useful when:

1) any partition of the build side could fit in memory

2) the build side is much smaller than stream side, the building hash table on smaller side should be faster than sorting the bigger side.

Source - Stackoverflow

Resources