Spark join strategies

By Niraj Zade | 2024-01-22 | Tags: theory, join, performance


Basics you should know - Types of joins

The type of join is decided by 2 parameters:

  1. The join condition used to decide if the rows should be joined or not
  2. The set operation being performed in the join

Types of joins based on join condition

There are 2 types of join conditions:

  1. Equi join
  2. Non-equi join

    The type of join condition decides whether a join algorithm is applicable to the join operation.

Equi Join

In an equi-join, the join condition is a strict equality condition.

Example: table1.column = table2.column

SELECT *
FROM table1, table2
WHERE table1.column = table2.column;

Non-equi Join

In a non-equi join, the join condition is not a strict equality condition.

Examples:

  • table1.column > table2.column
  • table1.column < table2.column
  • table1.column >= table2.column
  • table1.column <= table2.column
  • table1.column != table2.column
  • table1.column BETWEEN table2.column1 AND table2.column2

SELECT *
FROM table1, table2
WHERE table1.column <= table2.column;

Types of joins based on set operation

These are the standard set operations performed in joins. The set operation also decides whether a join algorithm is applicable to a particular join. (A quick PySpark mapping is sketched after the list.)

  • INNER JOIN
  • LEFT JOIN / RIGHT JOIN
  • CROSS JOIN
  • LEFT SEMI JOIN / RIGHT SEMI JOIN
  • OUTER JOIN
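
As a quick reference, here is a minimal sketch of how these set operations map onto the how= argument of PySpark's DataFrame.join (df1 and df2 are assumed to be dataframes sharing a key column "k"):

# assuming df1 and df2 share a join key column "k"
inner = df1.join(df2, on="k", how="inner")
left = df1.join(df2, on="k", how="left")  # or how="right"
semi = df1.join(df2, on="k", how="left_semi")
outer = df1.join(df2, on="k", how="outer")  # full outer join
cross = df1.crossJoin(df2)  # no join condition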

Overview of available join strategies

The join process

The join process is simple:

  1. Collect the rows needed for the join operation on the relevant executors
  2. Apply a join algorithm to join the rows
  3. Save the joined data into a result dataframe

The first step - collecting the relevant data in one place - can be done in 2 ways:

  1. If the dataframe size is below a threshold, Spark broadcasts the dataframe to all executors
  2. If the dataframe size is above the threshold, Spark performs a shuffle
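
The broadcast threshold is controlled by the spark.sql.autoBroadcastJoinThreshold configuration (10 MB by default). A minimal sketch of adjusting it, assuming an active SparkSession named spark:

# raise the automatic broadcast threshold to 50 MB
# (setting it to -1 disables automatic broadcasting entirely)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)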

After the relevant data is collected within the executors, it is time to join the rows. There are 3 join algorithms available in the Spark engine:

  1. Hash joins
  2. Nested loop joins
  3. Sort merge join

These join algorithms are standard in the data processing world, and are found in almost all relational database engines (e.g. PostgreSQL).

Join strategies

Now, we have 2 ways to collect the relevant data into the executors, and 3 algorithms to perform joins. Combining the two gives the 5 join strategies available in Spark:

  1. Hash joins (2 variants - shuffle or broadcast):
    1. Broadcast Hash Join
    2. Shuffle Hash Join
  2. Nested loop joins (2 variants - shuffle or broadcast):
    1. Broadcast Nested Loop Join
    2. Shuffle and Replicate Nested Loop Join
  3. Sort merge join:
    1. Shuffle Sort Merge Join

Hash Joins

Wikipedia article on the algorithm - Hash Join

The Spark engine builds a hash table over the join column(s) of one of the dataframes (usually the smaller one) and keeps it in memory. It then loops over the rows of the other dataframe, computes the hash of each row's join key, and probes the in-memory hash table. When the hashes (and keys) match, the rows are joined and added to the result dataframe.

Since the hash table of one of the dataframes has to be stored in memory, hash join algorithms always use more memory than nested loop join algorithms.
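
To make the build/probe structure concrete, here is a minimal pure-Python sketch of the hash join algorithm (an illustration of the idea, not Spark code; rows are assumed to be (key, value) tuples):

def hash_join(build_side, probe_side):
    # build phase: hash the (usually smaller) build side into an in-memory table
    table = {}
    for key, value in build_side:
        table.setdefault(key, []).append(value)
    # probe phase: stream the other side and look up matches by key
    result = []
    for key, value in probe_side:
        for match in table.get(key, []):
            result.append((key, match, value))
    return result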

Broadcast Hash join - BROADCAST

The hinted dataframe is broadcast to all worker nodes, and the join is performed locally on each node.

This strategy is used when a small dataframe is joined to a larger one. The small dataframe is broadcast to all executors for joining.

The typical use case is joining a dimension table to a fact table - we broadcast the dimension table to all executors, and then join it to the fact table within each executor.
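
The examples below assume two small illustrative dataframes. A minimal setup sketch (the data is made up to match the column names in the physical plans):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join-strategies").getOrCreate()

fact_sales_df = spark.createDataFrame(
    [(1, "Alice", "IN", 120), (2, "Bob", "US", 80)],
    ["id", "customer_name", "country_id", "sales_amount"],
)
country_dim_df = spark.createDataFrame(
    [("IN", "India"), ("US", "United States")],
    ["country_id", "country_name"],
)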

You can hint Spark to use broadcast hash join using any one of the following hints -

  • BROADCAST - preferred
  • BROADCASTJOIN
  • MAPJOIN

Example:

# hint Broadcast Hash Join
joined_df = fact_sales_df.join(country_dim_df.hint("BROADCAST"), on="country_id", how="inner")

# explain physical plan
joined_df.explain()

Output:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [country_id#231, id#229L, customer_name#230, sales_amount#232L, country_name#255]
   +- BroadcastHashJoin [country_id#231], [country_id#254], Inner, BuildRight, false
      :- Filter isnotnull(country_id#231)
      :  +- Scan ExistingRDD[id#229L,customer_name#230,country_id#231,sales_amount#232L]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=1130]
         +- Filter isnotnull(country_id#254)
            +- Scan ExistingRDD[country_id#254,country_name#255]

Shuffle hash join - SHUFFLE_HASH

Shuffle hash join redistributes both dataframes' data across the partitions using a hash function on the join key, so that matching keys end up in the same partition. The join is then performed locally on each node.

You can hint Spark to use shuffle hash join using the "SHUFFLE_HASH" hint.

Example:

# hint Shuffle Hash Join
joined_df = fact_sales_df.join(country_dim_df.hint("SHUFFLE_HASH"), on="country_id", how="inner")

# explain physical plan
joined_df.explain()

Output:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [country_id#231, id#229L, customer_name#230, sales_amount#232L, country_name#255]
   +- ShuffledHashJoin [country_id#231], [country_id#254], Inner, BuildRight
      :- Exchange hashpartitioning(country_id#231, 200), ENSURE_REQUIREMENTS, [plan_id=1251]
      :  +- Filter isnotnull(country_id#231)
      :     +- Scan ExistingRDD[id#229L,customer_name#230,country_id#231,sales_amount#232L]
      +- Exchange hashpartitioning(country_id#254, 200), ENSURE_REQUIREMENTS, [plan_id=1252]
         +- Filter isnotnull(country_id#254)
            +- Scan ExistingRDD[country_id#254,country_name#255]

Nested Loop joins

Wikipedia article on the algorithm - Nested Loop Join

This is effectively a Cartesian product with a filter, and it is the simplest join algorithm to understand.

To find matching rows to join, the algorithm iterates through the rows of the outer table and compares each one to every row of the inner table. If the join condition is satisfied, the rows are joined and added to the result dataframe.

If the outer table has n rows and the inner table has m rows, the join has O(n*m) time complexity. So, it is a very expensive join.
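
Here is a minimal pure-Python sketch of the algorithm (an illustration, not Spark code). Because the condition is an arbitrary predicate over a pair of rows, this algorithm also handles non-equi joins:

def nested_loop_join(outer, inner, condition):
    result = []
    for o in outer:  # n iterations
        for i in inner:  # m iterations each, so O(n*m) comparisons overall
            if condition(o, i):
                result.append((o, i))
    return result

# usage with a non-equi condition
pairs = nested_loop_join([1, 5, 9], [2, 4, 6], lambda o, i: o < i)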

Broadcast nested loop join

There is no dedicated hint for this join type - it is the fallback strategy Spark uses when no other strategy is applicable. (For non-equi joins, the BROADCAST hint also resolves to a broadcast nested loop join, as the source code comments below describe.)

From comments in the spark source code:

If the join is an equi join:

//   5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
//      other choice.

If the join is a non-equi join:

//   3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
//      other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
//      left side for right join, and broadcasts right side for left join.
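
As a sketch, a non-equi join condition typically triggers this fallback, since no hash- or sort-based strategy applies (min_sales is a hypothetical column, used purely for illustration):

# non-equi condition: Spark falls back to BroadcastNestedLoopJoin
# (min_sales is a hypothetical column on the dimension table)
joined_df = fact_sales_df.join(
    country_dim_df,
    fact_sales_df.sales_amount > country_dim_df.min_sales,
    "inner",
)
joined_df.explain()  # the physical plan shows BroadcastNestedLoopJoin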

Shuffle-and-Replicate Nested Loop Join - SHUFFLE_REPLICATE_NL

You can hint Spark to use shuffle-and-replicate nested loop join using the "SHUFFLE_REPLICATE_NL" hint.

Example:

# hint Shuffle and Replicate Nested Loop Join
joined_df = fact_sales_df.join(country_dim_df.hint("SHUFFLE_REPLICATE_NL"), on="country_id", how="inner")

# explain physical plan
joined_df.explain()

Output:

== Physical Plan ==
*(3) Project [country_id#231, id#229L, customer_name#230, sales_amount#232L, country_name#255]
+- CartesianProduct (country_id#231 = country_id#254)
   :- *(1) Filter isnotnull(country_id#231)
   :  +- *(1) Scan ExistingRDD[id#229L,customer_name#230,country_id#231,sales_amount#232L]
   +- *(2) Filter isnotnull(country_id#254)
      +- *(2) Scan ExistingRDD[country_id#254,country_name#255]

Sort Merge join

In this algorithm, both dataframes are sorted on the join key. The sorted rows are then merged by scanning both sides in order and joining the rows that satisfy the join condition. The merged rows are added to the result dataframe.

This algorithm only works with equi-joins.
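
Here is a minimal pure-Python sketch of the merge phase (an illustration, not Spark code; it assumes unique keys in (key, value) tuples on both sides, while Spark's version also handles duplicate keys and spilling):

def sort_merge_join(left, right):
    left, right = sorted(left), sorted(right)  # sort both sides on the join key
    i = j = 0
    result = []
    while i < len(left) and j < len(right):
        if left[i][0] == right[j][0]:  # keys match - merge the rows
            result.append((left[i][0], left[i][1], right[j][1]))
            i += 1
            j += 1
        elif left[i][0] < right[j][0]:  # advance the side with the smaller key
            i += 1
        else:
            j += 1
    return result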

Shuffle sort merge join - MERGE

This strategy is efficient when both dataframes are large. If the data is already bucketed on the join key, the shuffle phase is avoided entirely - a bucketing sketch follows below.
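
A sketch of pre-bucketing both tables on the join key so that a later sort merge join can skip the Exchange (shuffle) step (table names are illustrative):

# write both sides bucketed and sorted on the join key
fact_sales_df.write.bucketBy(16, "country_id").sortBy("country_id") \
    .mode("overwrite").saveAsTable("fact_sales_bucketed")
country_dim_df.write.bucketBy(16, "country_id").sortBy("country_id") \
    .mode("overwrite").saveAsTable("country_dim_bucketed")

# joining the bucketed tables on the bucket column avoids the shuffle
joined_df = spark.table("fact_sales_bucketed").join(
    spark.table("country_dim_bucketed"), on="country_id", how="inner"
)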

You can hint Spark to use sort merge join using any one of the following hints -

  • MERGE - preferred
  • SHUFFLE_MERGE
  • MERGEJOIN

Example:

# hint Shuffle Sort Merge Join
joined_df = fact_sales_df.join(country_dim_df.hint("MERGE"), on="country_id", how="inner")

# explain physical plan
joined_df.explain()

Output:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [country_id#231, id#229L, customer_name#230, sales_amount#232L, country_name#255]
   +- SortMergeJoin [country_id#231], [country_id#254], Inner
      :- Sort [country_id#231 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(country_id#231, 200), ENSURE_REQUIREMENTS, [plan_id=2187]
      :     +- Filter isnotnull(country_id#231)
      :        +- Scan ExistingRDD[id#229L,customer_name#230,country_id#231,sales_amount#232L]
      +- Sort [country_id#254 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(country_id#254, 200), ENSURE_REQUIREMENTS, [plan_id=2188]
            +- Filter isnotnull(country_id#254)
               +- Scan ExistingRDD[country_id#254,country_name#255]

How does spark choose a join strategy?

Spark has a straightforward, predictable way of choosing a join strategy: it walks through a decision tree and picks the first strategy that fits.

If the join is an equi-join

If join hints are provided - check hints in this order and pick the first hint that matches:

  • Broadcast hash join hint (BROADCAST, BROADCASTJOIN, MAPJOIN) - If both sides have broadcast hints, broadcast the smaller side.
  • Sort merge join hint (MERGE) - Pick if the join keys are sortable.
  • Shuffle hash join hint (SHUFFLE_HASH) - If both sides have shuffle hash hints, choose the smaller side as the build side.
  • Shuffle replicate nested loop join hint (SHUFFLE_REPLICATE_NL) - Pick if the join type is inner-like.

If no hint matches, or no hint was provided by the programmer - check the following conditions in order and pick the join strategy of the first condition that is satisfied:

  1. If one side is small enough to broadcast, and broadcast hash join is supported, pick Broadcast Hash join.
    • If both sides are small, broadcast the smaller side.
  2. If one side is small enough to build a local hashmap, and that side is much smaller than the other side, and spark.sql.join.preferSortMergeJoin is false, pick Shuffle Hash join.
  3. If join keys are sortable, pick Sort Merge join.
  4. If the join type is inner like, pick cartesian product join.

If no condition was satisfied, pick Broadcast Nested Loop join. It may throw an out-of-memory (OOM) error, but there is no other choice.
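
Rule 2 above depends on the spark.sql.join.preferSortMergeJoin configuration, which is true by default. A sketch of disabling it so that shuffle hash join becomes eligible:

# preferSortMergeJoin defaults to true; with false, Spark may pick shuffle
# hash join when one side is small enough to build a local hash map
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")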

If the join is a non-equi join

If join hints are provided - check hints in this order and pick the first hint that matches:

  • Broadcast hint (BROADCAST, BROADCASTJOIN, MAPJOIN) - Picks broadcast nested loop join. If both sides have broadcast hints, broadcast the smaller side for inner and full joins, the left side for a right join, and the right side for a left join.
  • Shuffle replicate nested loop join hint (SHUFFLE_REPLICATE_NL) - Pick if the join type is inner-like.

If no hint matches, or no hint was provided by the programmer - check the following conditions in order and pick the join strategy of the first condition that is satisfied:

  1. If one side is small enough to broadcast, pick Broadcast Nested Loop join.
    1. If only the left side is broadcast-able and it's a left join, or only the right side is broadcast-able and it's a right join, skip this rule.
    2. If both sides are small, broadcast the smaller side for inner and full joins, the left side for a right join, and the right side for a left join.
  2. If the join type is inner-like, pick Cartesian product join.

If no condition was satisfied, pick Broadcast Nested Loop join. It may throw an out-of-memory (OOM) error, but there is no other choice.

Comments from the source code

Comments in the Spark source code describe this process of choosing a join strategy. (Read the comments in the apply() function inside SparkStrategies.scala in the Spark source code.)

Here are the comments:

Equi join

  // If it is an equi-join, we first look at the join hints w.r.t. the following order:
  //   1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
  //      have the broadcast hints, choose the smaller side (based on stats) to broadcast.
  //   2. sort merge hint: pick sort merge join if join keys are sortable.
  //   3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
  //      sides have the shuffle hash hints, choose the smaller side (based on stats) as the
  //      build side.
  //   4. shuffle replicate NL hint: pick cartesian product if join type is inner like.
  //
  // If there is no hint or the hints are not applicable, we follow these rules one by one:
  //   1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
  //      is supported. If both sides are small, choose the smaller side (based on stats)
  //      to broadcast.
  //   2. Pick shuffle hash join if one side is small enough to build local hash map, and is
  //      much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
  //   3. Pick sort merge join if the join keys are sortable.
  //   4. Pick cartesian product if join type is inner like.
  //   5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
  //      other choice.

Non-equi join

// If it is not an equi-join, we first look at the join hints w.r.t. the following order:
//   1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast
//      hints, choose the smaller side (based on stats) to broadcast for inner and full joins,
//      choose the left side for right join, and choose right side for left join.
//   2. shuffle replicate NL hint: pick cartesian product if join type is inner like.
//
// If there is no hint or the hints are not applicable, we follow these rules one by one:
//   1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left
//      side is broadcast-able and it's left join, or only right side is broadcast-able and
//      it's right join, we skip this rule. If both sides are small, broadcasts the smaller
//      side for inner and full joins, broadcasts the left side for right join, and broadcasts
//      right side for left join.
//   2. Pick cartesian product if join type is inner like.
//   3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
//      other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
//      left side for right join, and broadcasts right side for left join.

Notes

Note 1

If your Spark version is older than 3.0.0, only the broadcast hash join hints are supported. Spark 3.0.0 and later support all 4 join hints.

Note 2

The hint() function is not case sensitive, so you can give the join hint in any case - uppercase, lowercase, or mixed case.

For example - all these are valid hints, recognized by spark:

  • hint('mErGe')
  • hint('merge')
  • hint('MERGE')
