Things on this page are fragmentary and immature notes/thoughts of the author. Please read with your own judgement!
Tips and Traps
BroadcastHashJoin, i.e., a map-side join, is fast. Use BroadcastHashJoin if possible. Notice that Spark automatically uses BroadcastHashJoin for an inner join when one side of the join is smaller than the configured limit (spark.sql.autoBroadcastJoinThreshold, 10 MB by default).
Notice that BroadcastHashJoin does not support every combination of join type and build side. In particular, it cannot be used for a full outer join, and for a right outer join only the left side can be broadcast. If the broadcast hint is placed on an unsupported side, Spark silently ignores it and falls back to another strategy (e.g., SortMergeJoin), even if you explicitly broadcast the DataFrame.
import pandas as pd
import findspark
findspark.init("/opt/spark-3.0.0-bin-hadoop3.2/")
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import broadcast
from pyspark.sql.types import StructType
spark = SparkSession.builder.appName("PySpark_Union").enableHiveSupport().getOrCreate()
df1 = spark.createDataFrame(
pd.DataFrame(
data=[
["Ben", 2],
["Dan", 4],
["Will", 1],
],
columns=["name", "id"],
)
)
df1.show()
+----+---+
|name| id|
+----+---+
| Ben| 2|
| Dan| 4|
|Will| 1|
+----+---+
df2 = spark.createDataFrame(
pd.DataFrame(
data=[
["Ben", 30],
["Dan", 25],
["Will", 26],
],
columns=["name", "age"],
)
)
df2.show()
+----+---+
|name|age|
+----+---+
| Ben| 30|
| Dan| 25|
|Will| 26|
+----+---+
df1.join(df2, ["name"]).explain()
== Physical Plan ==
*(5) Project [name#25, id#26L, age#39L]
+- *(5) SortMergeJoin [name#25], [name#38], Inner
:- *(2) Sort [name#25 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#25, 200), true, [id=#81]
: +- *(1) Filter isnotnull(name#25)
: +- *(1) Scan ExistingRDD[name#25,id#26L]
+- *(4) Sort [name#38 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#38, 200), true, [id=#87]
+- *(3) Filter isnotnull(name#38)
+- *(3) Scan ExistingRDD[name#38,age#39L]
Notice that BroadcastHashJoin is used in the following execution plan.
df1.join(broadcast(df2), ["name"]).explain()
== Physical Plan ==
*(2) Project [name#25, id#26L, age#39L]
+- *(2) BroadcastHashJoin [name#25], [name#38], Inner, BuildRight
:- *(2) Filter isnotnull(name#25)
: +- *(2) Scan ExistingRDD[name#25,id#26L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])), [id=#122]
+- *(1) Filter isnotnull(name#38)
+- *(1) Scan ExistingRDD[name#38,age#39L]
Notice that BroadcastHashJoin is not used in the right outer join below even though a broadcast hint is given: the right side of a right outer join cannot be broadcast, so Spark ignores the hint and falls back to SortMergeJoin.
df1.join(broadcast(df2), ["name"], "right_outer").explain()
== Physical Plan ==
*(5) Project [name#38, id#26L, age#39L]
+- SortMergeJoin [name#25], [name#38], RightOuter
:- *(2) Sort [name#25 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#25, 200), true, [id=#154]
: +- *(1) Filter isnotnull(name#25)
: +- *(1) Scan ExistingRDD[name#25,id#26L]
+- *(4) Sort [name#38 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#38, 200), true, [id=#159]
+- *(3) Scan ExistingRDD[name#38,age#39L]