Comments¶
- It is suggested that you always pass a list of column names to the parameter on, even if there is only one column for joining.
- None in a pandas DataFrame is converted to NaN instead of null!
- Spark allows the following join types:
  - inner (default)
  - cross
  - outer, full, fullouter, full_outer
  - left, leftouter, left_outer
  - right, rightouter, right_outer
  - semi, leftsemi, left_semi
  - anti, leftanti, left_anti
import pandas as pd
import findspark
findspark.init("/opt/spark-3.1.1-bin-hadoop3.2/")
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType
spark = SparkSession.builder.appName("Join").enableHiveSupport().getOrCreate()
employees = spark.createDataFrame(
pd.DataFrame(
[
("Rafferty", 31),
("Jones", 33),
("Heisenberg", 33),
("Robinson", 34),
("Smith", 34),
("Ben", 50),
("Williams", None),
],
columns=["last_name", "depart_id"],
)
)
employees.show()
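As noted in the comments above, the None for Williams in the pandas DataFrame becomes NaN (not null) after the conversion to Spark. Below is a minimal check of that behavior (a sketch, assuming the conversion produces NaN as described).
from pyspark.sql.functions import col, isnan
# If None was converted to NaN rather than null, the null filter should return
# no rows, while the NaN filter should return the Williams row.
employees.filter(col("depart_id").isNull()).show()
employees.filter(isnan("depart_id")).show()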
departments = spark.createDataFrame(
pd.DataFrame(
[(31, "Sales"), (33, "Engineering"), (34, "Clerical"), (35, "Marketing")],
columns=["depart_id", "depart_name"],
)
)
departments.show()
Inner Join¶
employees.join(departments, ["depart_id"]).show()
employees.join(departments, employees["depart_id"] == departments["depart_id"]).select(
employees["depart_id"],
employees["last_name"],
departments["depart_name"],
).show()
employees.alias("l").join(
departments.alias("r"), employees["depart_id"] == departments["depart_id"]
).select(
"l.last_name",
"l.depart_id",
"r.depart_name",
).show()
employees.alias("l").join(
departments.alias("r"),
(employees["depart_id"] == departments["depart_id"])
& (departments["depart_name"] == "Sales"),
).select(
"l.last_name",
"l.depart_id",
"r.depart_name",
).show()
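The join types listed in the comments above also include semi joins, which are closely related to the inner join: a left semi join keeps only rows from the left table that have a match in the right table, returns each such row at most once, and keeps none of the right table's columns. A minimal sketch (not part of the original examples):
# Employees whose depart_id has a match in departments;
# no columns from departments appear in the result.
employees.join(departments, ["depart_id"], "left_semi").show()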
Left Outer Join¶
- If you specify a (list of) column name(s) as the joining condition, there will be no duplicated column names in the joining result. The joining column from the left table is used by default. However, you can still refer to the joining columns of the right table by specifying their full column names.
Column Name(s) vs Column Expression(s) as the Joining Condition¶
Similar to the inner join, there are no duplicated/identical column names when you use a (list of) column name(s) for joining. However, duplicated columns do appear if the joining columns in the 2 tables have the same name and a column expression is used as the joining condition.
employees.join(departments, "depart_id", "left_outer").show()
employees.join(departments, ["depart_id"], "left_outer").show()
employees.join(
departments, employees["depart_id"] == departments["depart_id"], "left_outer"
).show()
If you specify a (list of) column name(s) as the joining condition, there will be no duplicated column names in the joining result. The joining column from the left table is used by default. However, you can still refer to the joining column of the right table (by specifying its full column name) before an action happens. Taking the following left join as an illustration, DataFrame.filter is a transformation (not an action), so you can still refer to the joining column in the right table (departments["depart_id"]).
employees.join(departments, ["depart_id"], "left_outer").filter(
departments["depart_id"].isNull()
).show()
However, the same trick does not work with DataFrame.withColumn: when the new column is resolved against the joined DataFrame, the joining column from the right table (departments["depart_id"]) is already gone, so you cannot access it any more.
employees.join(departments, ["depart_id"], "left_outer").withColumn(
"has_depart", departments["depart_id"].isNotNull()
).show()
employees.join(departments, ["depart_id"], "left_outer").select(
departments["depart_id"].isNotNull().alias("has_depart")
).show()
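Another way to keep a usable reference to the right table's joining column (a sketch, not from the original examples) is to rename it before the join so that it survives in the joined result under a distinct name. The name r_depart_id below is arbitrary.
# Rename the joining column of the right table before joining so that it is kept
# in the result, then derive has_depart from it.
employees.join(
    departments.withColumnRenamed("depart_id", "r_depart_id"),
    col("depart_id") == col("r_depart_id"),
    "left_outer",
).withColumn("has_depart", col("r_depart_id").isNotNull()).show()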
You can of course use a column expression as the joining condition, which will keep the joining column(s) from both tables.
employees.join(
departments, employees["depart_id"] == departments["depart_id"], "left_outer"
).withColumn("has_depart", departments["depart_id"].isNotNull()).show()
Below is an example of a left outer join with a complicated joining condition.
employees.alias("l").join(
departments.alias("r"),
(employees["depart_id"] == departments["depart_id"])
& (departments["depart_name"] == "Sales"),
"left_outer",
).select(
"l.last_name",
"l.depart_id",
"r.depart_name",
).show()
employees.alias("l").join(
departments.alias("r"),
(col("l.depart_id") == col("r.depart_id")) & (col("r.depart_name") == "Sales"),
"left_outer",
).select(
"l.last_name",
"l.depart_id",
"r.depart_name",
).show()
employees.alias("l").join(
departments.alias("r"),
(col("l.depart_id") == col("r.depart_id")) & (col("r.depart_name") == "Sales"),
"left_outer",
).drop(
col("r.depart_id"),
).show()
Right Outer Join¶
Symmetric to the left outer join. Please refer to the left outer join section above.
employees.join(departments, ["depart_id"], "right_outer").show()
Full Outer Join¶
employees.join(departments, ["depart_id"], "full_outer").show()
A - B¶
employees.show()
departments.show()
employees.join(departments, ["depart_id"], "left_outer").filter(
departments["depart_id"].isNull()
).show()
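The same "employees without a matching department" result can also be obtained with a left anti join (one of the join types listed in the comments above), which keeps only the columns of the left table. A minimal sketch:
# Rows of employees whose depart_id has no match in departments.
employees.join(departments, ["depart_id"], "left_anti").show()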
A $\triangle$ B (Symmetric Difference)¶
employees.join(departments, ["depart_id"], "full_outer").filter(
employees["depart_id"].isNull() | departments["depart_id"].isNull()
).show()
Cartesian Join¶
Notice that you have to have "spark.sql.crossJoin.enabled" set to true in order to perform a cartesian join on 2 DataFrames.
employees.join(departments).show()
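If the option is not enabled in your environment (it defaults to true in Spark 3), you can set it at runtime, or make the intent explicit with DataFrame.crossJoin. A sketch under those assumptions:
# Allow cartesian products for joins without a joining condition.
spark.conf.set("spark.sql.crossJoin.enabled", "true")
# Equivalent and more explicit: DataFrame.crossJoin.
employees.crossJoin(departments).show()
The joining condition does not have to be an equality test either. The following example joins orders to products on the product name and a date range.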
products = spark.createDataFrame(
pd.DataFrame(
data=(
("steak", "1990-01-01", "2000-01-01", 150),
("steak", "2000-01-02", "2020-01-01", 180),
("fish", "1990-01-01", "2020-01-01", 100),
),
columns=("name", "startDate", "endDate", "price"),
)
)
products.show()
orders = spark.createDataFrame(
pd.DataFrame(
data=(("1995-01-01", "steak"), ("2000-01-01", "fish"), ("2005-01-01", "steak")),
columns=("date", "product"),
)
)
orders.show()
orders.join(
products,
(orders["product"] == products["name"])
& orders["date"].between(products["startDate"], products["endDate"]),
).show()
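Note that Column.between is inclusive on both ends, so an order placed exactly on a period's startDate or endDate matches that period.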