
Broadcast Join in Spark

Tips and Traps

  1. BroadcastHashJoin, i.e., a map-side join, is fast, so use it when possible. Notice that Spark automatically uses BroadcastHashJoin when a table in the join is smaller than the configured broadcast threshold, spark.sql.autoBroadcastJoinThreshold (10 MB by default); see the configuration sketch after this list.

  2. Notice that BroadcastHashJoin does not support every join type. In particular, the preserved side of an outer join cannot be broadcast. For example, in a right outer join the right table will not be broadcast even if you explicitly call broadcast on it; Spark falls back to SortMergeJoin, as demonstrated at the end of this post.

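The broadcast threshold mentioned in tip 1 is controlled by spark.sql.autoBroadcastJoinThreshold (10 MB by default). Below is a minimal sketch of inspecting and adjusting it, assuming a SparkSession named spark as created in the next cell.

# Inspect the current threshold (in bytes); the default is 10 MB.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Raise the threshold to 100 MB so that larger tables qualify for automatic broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)

# Setting the threshold to -1 disables automatic broadcast joins entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)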
In [1]:
import pandas as pd
import findspark

findspark.init("/opt/spark-3.0.0-bin-hadoop3.2/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("PySpark_Broadcast_Join").enableHiveSupport().getOrCreate()
In [5]:
df1 = spark.createDataFrame(
    pd.DataFrame(
        data=[
            ["Ben", 2],
            ["Dan", 4],
            ["Will", 1],
        ],
        columns=["name", "id"],
    )
)
df1.show()
+----+---+
|name| id|
+----+---+
| Ben|  2|
| Dan|  4|
|Will|  1|
+----+---+

In [6]:
df2 = spark.createDataFrame(
    pd.DataFrame(
        data=[
            ["Ben", 30],
            ["Dan", 25],
            ["Will", 26],
        ],
        columns=["name", "age"],
    )
)
df2.show()
+----+---+
|name|age|
+----+---+
| Ben| 30|
| Dan| 25|
|Will| 26|
+----+---+

In [8]:
df1.join(df2, ["name"]).explain()
== Physical Plan ==
*(5) Project [name#25, id#26L, age#39L]
+- *(5) SortMergeJoin [name#25], [name#38], Inner
   :- *(2) Sort [name#25 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(name#25, 200), true, [id=#81]
   :     +- *(1) Filter isnotnull(name#25)
   :        +- *(1) Scan ExistingRDD[name#25,id#26L]
   +- *(4) Sort [name#38 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(name#38, 200), true, [id=#87]
         +- *(3) Filter isnotnull(name#38)
            +- *(3) Scan ExistingRDD[name#38,age#39L]


Even though both DataFrames are tiny, Spark uses SortMergeJoin above because DataFrames created from local data (Scan ExistingRDD in the plan) carry no reliable size statistics, so the automatic broadcast never kicks in. Explicitly broadcasting df2 forces a BroadcastHashJoin, as shown in the following execution plan.

In [9]:
df1.join(broadcast(df2), ["name"]).explain()
== Physical Plan ==
*(2) Project [name#25, id#26L, age#39L]
+- *(2) BroadcastHashJoin [name#25], [name#38], Inner, BuildRight
   :- *(2) Filter isnotnull(name#25)
   :  +- *(2) Scan ExistingRDD[name#25,id#26L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])), [id=#122]
      +- *(1) Filter isnotnull(name#38)
         +- *(1) Scan ExistingRDD[name#38,age#39L]

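Wrapping a DataFrame with broadcast is not the only way to request a broadcast join; a join hint produces the same plan. A sketch (not executed in this notebook), using the same df1 and df2:

# Attach a broadcast hint to df2; equivalent to df1.join(broadcast(df2), ["name"]).
df1.join(df2.hint("broadcast"), ["name"]).explain()

# The same hint in SQL, assuming df1 and df2 are registered as the temp views t1 and t2.
df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
spark.sql(
    "SELECT /*+ BROADCAST(t2) */ t1.name, t1.id, t2.age FROM t1 JOIN t2 ON t1.name = t2.name"
).explain()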

Notice that BroadcastHashJoin is not used below: df2 is the preserved side of the right outer join, so it cannot be broadcast and Spark falls back to SortMergeJoin.

In [11]:
df1.join(broadcast(df2), ["name"], "right_outer").explain()
== Physical Plan ==
*(5) Project [name#38, id#26L, age#39L]
+- SortMergeJoin [name#25], [name#38], RightOuter
   :- *(2) Sort [name#25 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(name#25, 200), true, [id=#154]
   :     +- *(1) Filter isnotnull(name#25)
   :        +- *(1) Scan ExistingRDD[name#25,id#26L]
   +- *(4) Sort [name#38 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(name#38, 200), true, [id=#159]
         +- *(3) Scan ExistingRDD[name#38,age#39L]


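The fallback above happens because df2 is the preserved side of the right outer join. Broadcasting the other side is supported; a sketch (not executed here, but expected to produce a BroadcastHashJoin on Spark 3.0) with the same DataFrames:

# Right outer join: every row of df2 must be preserved, so only df1 can be broadcast.
broadcast(df1).join(df2, ["name"], "right_outer").explain()

# Left outer join: the right side, df2, can be broadcast.
df1.join(broadcast(df2), ["name"], "left_outer").explain()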