Ben Chuanlong Du's Blog

And let it direct your passion with reason.

Inner Join of Spark DataFrames

Tips and Traps

  1. Select only needed columns before joining.

  2. If the join columns have different names, rename them to be identical before joining.

In [1]:
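import pandas as pd
import findspark

# Point findspark at a local Spark installation (adjust the path to yours).
findspark.init("/opt/spark-3.1.1-bin-hadoop3.2/")

from pyspark.sql import SparkSession, DataFrame
# Import the functions module under an alias rather than with `import *`,
# which would shadow Python built-ins such as sum, max and round.
import pyspark.sql.functions as F
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("Join").enableHiveSupport().getOrCreate()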

Same Names in Both Tables

In [3]:
left = spark.createDataFrame(
    pd.DataFrame(
        data=(("bob", "2015-01-13", 4), ("alice", "2015-04-23", 10)),
        columns=("name", "date", "duration"),
    )
)
left.show()
+-----+----------+--------+
| name|      date|duration|
+-----+----------+--------+
|  bob|2015-01-13|       4|
|alice|2015-04-23|      10|
+-----+----------+--------+

In [5]:
right = spark.createDataFrame(
    pd.DataFrame(data=(("alice", 100), ("bob", 23)), columns=("name", "upload"))
)
right.show()
+-----+------+
| name|upload|
+-----+------+
|alice|   100|
|  bob|    23|
+-----+------+

Duplicate columns appear if you use an expression as the join condition: the join column shows up once from each DataFrame.

In [13]:
left.join(right, left["name"] == right["name"]).show()
+-----+----------+--------+-----+------+
| name|      date|duration| name|upload|
+-----+----------+--------+-----+------+
|alice|2015-04-23|      10|alice|   100|
|  bob|2015-01-13|       4|  bob|    23|
+-----+----------+--------+-----+------+

In [14]:
left.join(right, left.name == right.name).show()
+-----+----------+--------+-----+------+
| name|      date|duration| name|upload|
+-----+----------+--------+-----+------+
|alice|2015-04-23|      10|alice|   100|
|  bob|2015-01-13|       4|  bob|    23|
+-----+----------+--------+-----+------+

Joining on a column name (or a list of column names) avoids the duplicate join column.

In [16]:
left.join(right, ["name"]).show()
+-----+----------+--------+------+
| name|      date|duration|upload|
+-----+----------+--------+------+
|alice|2015-04-23|      10|   100|
|  bob|2015-01-13|       4|    23|
+-----+----------+--------+------+

In [17]:
left.join(right, "name").show()
+-----+----------+--------+------+
| name|      date|duration|upload|
+-----+----------+--------+------+
|alice|2015-04-23|      10|   100|
|  bob|2015-01-13|       4|    23|
+-----+----------+--------+------+

Same Non-Join Column in Both Tables

In [19]:
left = spark.createDataFrame(
    pd.DataFrame(
        data=(("bob", "2015-01-13", 4), ("alice", "2015-04-23", 10)),
        columns=("name", "date", "duration"),
    )
)
left.show()
+-----+----------+--------+
| name|      date|duration|
+-----+----------+--------+
|  bob|2015-01-13|       4|
|alice|2015-04-23|      10|
+-----+----------+--------+

In [20]:
right = spark.createDataFrame(
    pd.DataFrame(
        data=(("alice", 100, 1), ("bob", 23, 2)), columns=("name", "upload", "duration")
    )
)
right.show()
+-----+------+--------+
| name|upload|duration|
+-----+------+--------+
|alice|   100|       1|
|  bob|    23|       2|
+-----+------+--------+

Join the two DataFrames on the name column. The result has duplicate columns because duration exists in both DataFrames.

In [21]:
left.join(right, "name").show()
+-----+----------+--------+------+--------+
| name|      date|duration|upload|duration|
+-----+----------+--------+------+--------+
|alice|2015-04-23|      10|   100|       1|
|  bob|2015-01-13|       4|    23|       2|
+-----+----------+--------+------+--------+

Selecting by string name works for columns that are not duplicated. Selecting a duplicated column (such as duration) by its string name throws an AnalysisException because the reference is ambiguous.

In [22]:
left.alias("l").join(right.alias("r"), "name").select("name", "date").show()
+-----+----------+
| name|      date|
+-----+----------+
|alice|2015-04-23|
|  bob|2015-01-13|
+-----+----------+

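Alternatively, select using Column objects from the original DataFrames, which tells Spark exactly which side each column comes from.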

In [23]:
left.join(right, "name").select(left["name"], left["date"], left["duration"]).show()
+-----+----------+--------+
| name|      date|duration|
+-----+----------+--------+
|alice|2015-04-23|      10|
|  bob|2015-01-13|       4|
+-----+----------+--------+

Using table aliases is probably the most convenient way syntactically. As in SQL, you only need to qualify a column with its table alias when the name is ambiguous.

In [24]:
left.alias("l").join(right.alias("r"), "name").select(
    "name", "date", "l.duration", "upload"
).show()
+-----+----------+--------+------+
| name|      date|duration|upload|
+-----+----------+--------+------+
|alice|2015-04-23|      10|   100|
|  bob|2015-01-13|       4|    23|
+-----+----------+--------+------+

Star in Select

Notice that * combined with a table alias selects all columns from that table.

In [25]:
left.alias("l").join(right.alias("r"), "name").select("l.*").show()
+-----+----------+--------+
| name|      date|duration|
+-----+----------+--------+
|alice|2015-04-23|      10|
|  bob|2015-01-13|       4|
+-----+----------+--------+

Different Names for Joining

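If you only need an inner join, it is recommended to rename the join columns so that they have the same name in both DataFrames. Joining on that shared name then gives

  1. the minimal number of columns, and
  2. no duplicate join column (other columns that share a name, such as duration below, are still duplicated).
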
In [27]:
left = spark.createDataFrame(
    pd.DataFrame(
        data=(("bob", "2015-01-13", 4), ("alice", "2015-04-23", 10)),
        columns=("name", "date", "duration"),
    )
)
left.show()
+-----+----------+--------+
| name|      date|duration|
+-----+----------+--------+
|  bob|2015-01-13|       4|
|alice|2015-04-23|      10|
+-----+----------+--------+

In [28]:
right = spark.createDataFrame(
    pd.DataFrame(
        data=(("alice", 100, 1), ("bob", 23, 2)), columns=("nm", "upload", "duration")
    )
)
right.show()
+-----+------+--------+
|   nm|upload|duration|
+-----+------+--------+
|alice|   100|       1|
|  bob|    23|       2|
+-----+------+--------+

In [29]:
left.join(right, left["name"] == right["nm"]).show()
+-----+----------+--------+-----+------+--------+
| name|      date|duration|   nm|upload|duration|
+-----+----------+--------+-----+------+--------+
|alice|2015-04-23|      10|alice|   100|       1|
|  bob|2015-01-13|       4|  bob|    23|       2|
+-----+----------+--------+-----+------+--------+

In [30]:
left.join(right.withColumnRenamed("nm", "name"), ["name"]).show()
+-----+----------+--------+------+--------+
| name|      date|duration|upload|duration|
+-----+----------+--------+------+--------+
|alice|2015-04-23|      10|   100|       1|
|  bob|2015-01-13|       4|    23|       2|
+-----+----------+--------+------+--------+
