
Types of Joins of Spark DataFrames

Comments

  1. It is suggested that you always pass a list of column names to the parameter on (e.g., on=["depart_id"]), even if there is only one joining column.

  2. None in a pandas DataFrame is converted to NaN (not null) when the pandas DataFrame is converted to a Spark DataFrame! See the sketch after this list for one way to convert NaN back to null.

  3. Spark supports the following join types:

    • inner (default)
    • cross
    • outer
    • full, fullouter, full_outer
    • left, leftouter, left_outer
    • right, rightouter, right_outer
    • semi, leftsemi, left_semi
    • anti, leftanti, left_anti
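A minimal sketch (not from the original notebook) of converting such NaN values back to proper nulls; nan_to_null is a helper name made up for illustration:

from pyspark.sql.functions import col, isnan, when

def nan_to_null(df, colname):
    # Replace NaN in a floating-point column with a proper SQL null.
    return df.withColumn(
        colname, when(isnan(col(colname)), None).otherwise(col(colname))
    )

For example, nan_to_null(employees, "depart_id") turns the NaN in the depart_id column of the employees DataFrame below into null.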
In [1]:
import pandas as pd
import findspark

findspark.init("/opt/spark-3.1.1-bin-hadoop3.2/")

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Join").enableHiveSupport().getOrCreate()
In [2]:
employees = spark.createDataFrame(
    pd.DataFrame(
        [
            ("Rafferty", 31),
            ("Jones", 33),
            ("Heisenberg", 33),
            ("Robinson", 34),
            ("Smith", 34),
            ("Ben", 50),
            ("Williams", None),
        ],
        columns=["last_name", "depart_id"],
    )
)
employees.show()
+----------+---------+
| last_name|depart_id|
+----------+---------+
|  Rafferty|     31.0|
|     Jones|     33.0|
|Heisenberg|     33.0|
|  Robinson|     34.0|
|     Smith|     34.0|
|       Ben|     50.0|
|  Williams|      NaN|
+----------+---------+

In [3]:
departments = spark.createDataFrame(
    pd.DataFrame(
        [(31, "Sales"), (33, "Engineering"), (34, "Clerical"), (35, "Marketing")],
        columns=["depart_id", "depart_name"],
    )
)
departments.show()
+---------+-----------+
|depart_id|depart_name|
+---------+-----------+
|       31|      Sales|
|       33|Engineering|
|       34|   Clerical|
|       35|  Marketing|
+---------+-----------+

Inner Join

In [6]:
employees.join(departments, ["depart_id"]).show()
+---------+----------+-----------+
|depart_id| last_name|depart_name|
+---------+----------+-----------+
|     34.0|  Robinson|   Clerical|
|     34.0|     Smith|   Clerical|
|     31.0|  Rafferty|      Sales|
|     33.0|     Jones|Engineering|
|     33.0|Heisenberg|Engineering|
+---------+----------+-----------+
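The semi join types listed at the top are closely related to the inner join: a left semi join keeps only the rows of the left table that have a match in the right table, returns only the left table's columns, and does not duplicate rows when there are multiple matches. A minimal sketch:

employees.join(departments, ["depart_id"], "semi").show()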

In [12]:
employees.join(departments, employees["depart_id"] == departments["depart_id"]).select(
    employees["depart_id"],
    employees["last_name"],
    departments["depart_name"],
).show()
+---------+----------+-----------+
|depart_id| last_name|depart_name|
+---------+----------+-----------+
|     34.0|  Robinson|   Clerical|
|     34.0|     Smith|   Clerical|
|     31.0|  Rafferty|      Sales|
|     33.0|     Jones|Engineering|
|     33.0|Heisenberg|Engineering|
+---------+----------+-----------+

In [18]:
employees.alias("l").join(
    departments.alias("r"), employees["depart_id"] == departments["depart_id"]
).select(
    "l.last_name",
    "l.depart_id",
    "r.depart_name",
).show()
+----------+---------+-----------+
| last_name|depart_id|depart_name|
+----------+---------+-----------+
|  Robinson|     34.0|   Clerical|
|     Smith|     34.0|   Clerical|
|  Rafferty|     31.0|      Sales|
|     Jones|     33.0|Engineering|
|Heisenberg|     33.0|Engineering|
+----------+---------+-----------+

In [21]:
employees.alias("l").join(
    departments.alias("r"),
    (employees["depart_id"] == departments["depart_id"])
    & (departments["depart_name"] == "Sales"),
).select(
    "l.last_name",
    "l.depart_id",
    "r.depart_name",
).show()
+---------+---------+-----------+
|last_name|depart_id|depart_name|
+---------+---------+-----------+
| Rafferty|     31.0|      Sales|
+---------+---------+-----------+

Left Outer Join

  1. If you specify a (list of) column name(s) as the joining condition, there will be no duplicate column names in the join result. The joining column from the left table is kept by default. However, you can still refer to the joining column of the right table by specifying its full column name.

Column Name(s) vs Column Expression(s) as the Joining Condition

Similar to the inner join, there are no duplicate/identical column names in the result when you use a (list of) column name(s) for joining. However, duplicate columns do appear if the joining columns in the 2 tables have the same name and a column expression is used as the joining condition.

In [24]:
employees.join(departments, "depart_id", "left_outer").show()
+---------+----------+-----------+
|depart_id| last_name|depart_name|
+---------+----------+-----------+
|      NaN|  Williams|       null|
|     34.0|  Robinson|   Clerical|
|     34.0|     Smith|   Clerical|
|     50.0|       Ben|       null|
|     31.0|  Rafferty|      Sales|
|     33.0|     Jones|Engineering|
|     33.0|Heisenberg|Engineering|
+---------+----------+-----------+

In [25]:
employees.join(departments, ["depart_id"], "left_outer").show()
+---------+----------+-----------+
|depart_id| last_name|depart_name|
+---------+----------+-----------+
|      NaN|  Williams|       null|
|     34.0|  Robinson|   Clerical|
|     34.0|     Smith|   Clerical|
|     50.0|       Ben|       null|
|     31.0|  Rafferty|      Sales|
|     33.0|     Jones|Engineering|
|     33.0|Heisenberg|Engineering|
+---------+----------+-----------+

In [29]:
employees.join(
    departments, employees["depart_id"] == departments["depart_id"], "left_outer"
).show()
+----------+---------+---------+-----------+
| last_name|depart_id|depart_id|depart_name|
+----------+---------+---------+-----------+
|  Williams|      NaN|     null|       null|
|  Robinson|     34.0|       34|   Clerical|
|     Smith|     34.0|       34|   Clerical|
|       Ben|     50.0|     null|       null|
|  Rafferty|     31.0|       31|      Sales|
|     Jones|     33.0|       33|Engineering|
|Heisenberg|     33.0|       33|Engineering|
+----------+---------+---------+-----------+

If you specify a (list of) column name(s) as the joining condition, there are no duplicate column names in the join result; the joining column from the left table is kept by default. Interestingly, you can sometimes still refer to the joining column of the right table by its full column name. In the left join below, DataFrame.filter can still use departments["depart_id"] even though that column was dropped from the join result, because Spark's analyzer is able to resolve such missing attributes for a Filter (and a Sort) by pulling them from the child plan.

In [28]:
employees.join(departments, ["depart_id"], "left_outer").filter(
    departments["depart_id"].isNull()
).show()
+---------+---------+-----------+
|depart_id|last_name|depart_name|
+---------+---------+-----------+
|      NaN| Williams|       null|
|     50.0|      Ben|       null|
+---------+---------+-----------+

However, DataFrame.withColumn (and likewise DataFrame.select) introduces a new projection, and the analyzer cannot resolve a column inside a projection once it has been dropped from the join result. The column departments["depart_id"] is gone at this point, so referring to it raises an AnalysisException.

In [34]:
employees.join(departments, ["depart_id"], "left_outer").withColumn(
    "has_depart", departments["depart_id"].isNotNull()
).show()
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-34-d4acbe0a8957> in <module>
----> 1 employees.join(departments, ["depart_id"], "left_outer").withColumn("has_depart", departments["depart_id"].isNotNull()).show()

/opt/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/dataframe.py in withColumn(self, colName, col)
   2453         """
   2454         assert isinstance(col, Column), "col should be Column"
-> 2455         return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
   2456 
   2457     def withColumnRenamed(self, existing, new):

/opt/spark-3.1.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/opt/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Resolved attribute(s) depart_id#39L missing from depart_id#27,last_name#26,depart_name#40 in operator !Project [depart_id#27, last_name#26, depart_name#40, isnotnull(depart_id#39L) AS has_depart#414]. Attribute(s) with the same name appear in the operation: depart_id. Please check if the right attribute(s) are used.;
!Project [depart_id#27, last_name#26, depart_name#40, isnotnull(depart_id#39L) AS has_depart#414]
+- Project [depart_id#27, last_name#26, depart_name#40]
   +- Join LeftOuter, (depart_id#27 = cast(depart_id#39L as double))
      :- LogicalRDD [last_name#26, depart_id#27], false
      +- LogicalRDD [depart_id#39L, depart_name#40], false
In [40]:
employees.join(departments, ["depart_id"], "left_outer").select(
    departments["depart_id"].isNotNull().alias("has_depart")
).show()
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-40-15381a8f7b89> in <module>
----> 1 employees.join(departments, ["depart_id"], "left_outer").select(
      2     departments["depart_id"].isNotNull().alias("has_depart")
      3 ).show()

/opt/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/dataframe.py in select(self, *cols)
   1667         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   1668         """
-> 1669         jdf = self._jdf.select(self._jcols(*cols))
   1670         return DataFrame(jdf, self.sql_ctx)
   1671 

/opt/spark-3.1.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/opt/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Resolved attribute(s) depart_id#39L missing from depart_id#27,last_name#26,depart_name#40 in operator !Project [isnotnull(depart_id#39L) AS has_depart#517]. Attribute(s) with the same name appear in the operation: depart_id. Please check if the right attribute(s) are used.;
!Project [isnotnull(depart_id#39L) AS has_depart#517]
+- Project [depart_id#27, last_name#26, depart_name#40]
   +- Join LeftOuter, (depart_id#27 = cast(depart_id#39L as double))
      :- LogicalRDD [last_name#26, depart_id#27], false
      +- LogicalRDD [depart_id#39L, depart_name#40], false

You can, of course, use a column expression as the joining condition, which keeps the joining column(s) from both tables.

In [38]:
employees.join(
    departments, employees["depart_id"] == departments["depart_id"], "left_outer"
).withColumn("has_depart", departments["depart_id"].isNotNull()).show()
+----------+---------+---------+-----------+----------+
| last_name|depart_id|depart_id|depart_name|has_depart|
+----------+---------+---------+-----------+----------+
|  Williams|      NaN|     null|       null|     false|
|  Robinson|     34.0|       34|   Clerical|      true|
|     Smith|     34.0|       34|   Clerical|      true|
|       Ben|     50.0|     null|       null|     false|
|  Rafferty|     31.0|       31|      Sales|      true|
|     Jones|     33.0|       33|Engineering|      true|
|Heisenberg|     33.0|       33|Engineering|      true|
+----------+---------+---------+-----------+----------+
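Another workaround (a minimal sketch, not from the original notebook) is to rename the joining column of the right table before the join so that it survives in the result under a distinct name; r_depart_id is a name made up for illustration:

dep = departments.withColumnRenamed("depart_id", "r_depart_id")
employees.join(
    dep, employees["depart_id"] == dep["r_depart_id"], "left_outer"
).withColumn("has_depart", col("r_depart_id").isNotNull()).show()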

Below is an example of a left join with a more complicated joining condition.

In [4]:
employees.alias("l").join(
    departments.alias("r"),
    (employees["depart_id"] == departments["depart_id"])
    & (departments["depart_name"] == "Sales"),
    "left_outer",
).select(
    "l.last_name",
    "l.depart_id",
    "r.depart_name",
).show()
+----------+---------+-----------+
| last_name|depart_id|depart_name|
+----------+---------+-----------+
|  Williams|      NaN|       null|
|  Robinson|     34.0|       null|
|     Smith|     34.0|       null|
|       Ben|     50.0|       null|
|  Rafferty|     31.0|      Sales|
|     Jones|     33.0|       null|
|Heisenberg|     33.0|       null|
+----------+---------+-----------+

In [4]:
employees.alias("l").join(
    departments.alias("r"),
    (col("l.depart_id") == col("r.depart_id")) & (col("r.depart_name") == "Sales"),
    "left_outer",
).select(
    "l.last_name",
    "l.depart_id",
    "r.depart_name",
).show()
+----------+---------+-----------+
| last_name|depart_id|depart_name|
+----------+---------+-----------+
|  Williams|      NaN|       null|
|  Robinson|     34.0|       null|
|     Smith|     34.0|       null|
|       Ben|     50.0|       null|
|  Rafferty|     31.0|      Sales|
|     Jones|     33.0|       null|
|Heisenberg|     33.0|       null|
+----------+---------+-----------+

In [6]:
employees.alias("l").join(
    departments.alias("r"),
    (col("l.depart_id") == col("r.depart_id")) & (col("r.depart_name") == "Sales"),
    "left_outer",
).drop(
    col("r.depart_id"),
).show()
+----------+---------+-----------+
| last_name|depart_id|depart_name|
+----------+---------+-----------+
|  Williams|      NaN|       null|
|  Robinson|     34.0|       null|
|     Smith|     34.0|       null|
|       Ben|     50.0|       null|
|  Rafferty|     31.0|      Sales|
|     Jones|     33.0|       null|
|Heisenberg|     33.0|       null|
+----------+---------+-----------+

Right Outer Join

Symmetric to the left outer join. Please refer to the left outer join above.

In [48]:
employees.join(departments, ["depart_id"], "right_outer").show()
+---------+----------+-----------+
|depart_id| last_name|depart_name|
+---------+----------+-----------+
|       35|      null|  Marketing|
|       34|  Robinson|   Clerical|
|       34|     Smith|   Clerical|
|       31|  Rafferty|      Sales|
|       33|     Jones|Engineering|
|       33|Heisenberg|Engineering|
+---------+----------+-----------+

Full Outer Join

In [62]:
employees.join(departments, ["depart_id"], "full_outer").show()
+---------+----------+-----------+
|depart_id| last_name|depart_name|
+---------+----------+-----------+
|      NaN|  Williams|       null|
|     35.0|      null|  Marketing|
|     34.0|  Robinson|   Clerical|
|     34.0|     Smith|   Clerical|
|     50.0|       Ben|       null|
|     31.0|  Rafferty|      Sales|
|     33.0|     Jones|Engineering|
|     33.0|Heisenberg|Engineering|
+---------+----------+-----------+

A - B (Set Difference)

In [56]:
employees.show()
+----------+---------+
| last_name|depart_id|
+----------+---------+
|  Rafferty|     31.0|
|     Jones|     33.0|
|Heisenberg|     33.0|
|  Robinson|     34.0|
|     Smith|     34.0|
|       Ben|     50.0|
|  Williams|      NaN|
+----------+---------+

In [58]:
departments.show()
+---------+-----------+
|depart_id|depart_name|
+---------+-----------+
|       31|      Sales|
|       33|Engineering|
|       34|   Clerical|
|       35|  Marketing|
+---------+-----------+

In [54]:
employees.join(departments, ["depart_id"], "left_outer").filter(
    departments["depart_id"].isNull()
).show()
+---------+---------+-----------+
|depart_id|last_name|depart_name|
+---------+---------+-----------+
|      NaN| Williams|       null|
|     50.0|      Ben|       null|
+---------+---------+-----------+
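The anti join types listed at the top give a more direct way to compute A - B. Note that a left anti join keeps only the columns of the left table (so depart_name does not appear in the result). A minimal sketch:

employees.join(departments, ["depart_id"], "left_anti").show()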

A $\triangle$ B (Symmetric Difference)

In [64]:
employees.join(departments, ["depart_id"], "full_outer").filter(
    employees["depart_id"].isNull() | departments["depart_id"].isNull()
).show()
+---------+---------+-----------+
|depart_id|last_name|depart_name|
+---------+---------+-----------+
|      NaN| Williams|       null|
|     35.0|     null|  Marketing|
|     50.0|      Ben|       null|
+---------+---------+-----------+
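Alternatively, the symmetric difference of the joining keys can be computed as the union of two anti joins. A minimal sketch; both sides are cast to double (and re-aliased) so that the bigint and double depart_id columns line up in the union:

a_minus_b = employees.join(departments, ["depart_id"], "left_anti").select(
    col("depart_id").cast("double").alias("depart_id")
)
b_minus_a = departments.join(employees, ["depart_id"], "left_anti").select(
    col("depart_id").cast("double").alias("depart_id")
)
a_minus_b.union(b_minus_a).show()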

Cartesian Join

Notice that in Spark 2.x you had to set spark.sql.crossJoin.enabled to true in order to perform a Cartesian join on 2 DataFrames (by calling join without a joining condition); since Spark 3.0 this configuration defaults to true.

In [59]:
employees.join(departments).show()
+----------+---------+---------+-----------+
| last_name|depart_id|depart_id|depart_name|
+----------+---------+---------+-----------+
|  Rafferty|     31.0|       31|      Sales|
|  Rafferty|     31.0|       33|Engineering|
|  Rafferty|     31.0|       34|   Clerical|
|  Rafferty|     31.0|       35|  Marketing|
|     Jones|     33.0|       31|      Sales|
|     Jones|     33.0|       33|Engineering|
|     Jones|     33.0|       34|   Clerical|
|     Jones|     33.0|       35|  Marketing|
|Heisenberg|     33.0|       31|      Sales|
|Heisenberg|     33.0|       33|Engineering|
|Heisenberg|     33.0|       34|   Clerical|
|Heisenberg|     33.0|       35|  Marketing|
|  Robinson|     34.0|       31|      Sales|
|  Robinson|     34.0|       33|Engineering|
|  Robinson|     34.0|       34|   Clerical|
|  Robinson|     34.0|       35|  Marketing|
|     Smith|     34.0|       31|      Sales|
|     Smith|     34.0|       33|Engineering|
|     Smith|     34.0|       34|   Clerical|
|     Smith|     34.0|       35|  Marketing|
+----------+---------+---------+-----------+
only showing top 20 rows
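Spark also has an explicit DataFrame.crossJoin method, which makes the intent clear and does not depend on the spark.sql.crossJoin.enabled setting. A minimal sketch:

employees.crossJoin(departments).show()

Non-Equi Join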

In [65]:
products = spark.createDataFrame(
    pd.DataFrame(
        data=(
            ("steak", "1990-01-01", "2000-01-01", 150),
            ("steak", "2000-01-02", "2020-01-01", 180),
            ("fish", "1990-01-01", "2020-01-01", 100),
        ),
        columns=("name", "startDate", "endDate", "price"),
    )
)
products.show()
+-----+----------+----------+-----+
| name| startDate|   endDate|price|
+-----+----------+----------+-----+
|steak|1990-01-01|2000-01-01|  150|
|steak|2000-01-02|2020-01-01|  180|
| fish|1990-01-01|2020-01-01|  100|
+-----+----------+----------+-----+

In [66]:
orders = spark.createDataFrame(
    pd.DataFrame(
        data=(("1995-01-01", "steak"), ("2000-01-01", "fish"), ("2005-01-01", "steak")),
        columns=("date", "product"),
    )
)
orders.show()
+----------+-------+
|      date|product|
+----------+-------+
|1995-01-01|  steak|
|2000-01-01|   fish|
|2005-01-01|  steak|
+----------+-------+
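The following is an example of a non-equi join: the joining condition combines an equality test with a range condition built with Column.between.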

In [68]:
orders.join(
    products,
    (orders["product"] == products["name"])
    & orders["date"].between(products["startDate"], products["endDate"]),
).show()
+----------+-------+-----+----------+----------+-----+
|      date|product| name| startDate|   endDate|price|
+----------+-------+-----+----------+----------+-----+
|1995-01-01|  steak|steak|1990-01-01|2000-01-01|  150|
|2005-01-01|  steak|steak|2000-01-02|2020-01-01|  180|
|2000-01-01|   fish| fish|1990-01-01|2020-01-01|  100|
+----------+-------+-----+----------+----------+-----+
