Comment¶
union relies on column order rather than column names, the same as UNION ALL in SQL. For columns whose types do not match, the common super type is used, which is dangerous if you are not careful. It is suggested that you use a name-based union to handle this: PySpark provides DataFrame.unionByName (since Spark 2.3), or you can define such a helper yourself: def unionByName(df1, df2): ...
A way to avoid the ordering issue is to select columns to make sure that columns of the 2 DataFrames have the same ordering.
Union two PySpark DataFrames. Notice that pyspark.sql.DataFrame.union does not deduplicate rows; since Spark 2.0, union is equivalent to SQL UNION ALL.
Union multiple PySpark DataFrames at once using functools.reduce. Note that the number of partitions of the final DataFrame equals the sum of the numbers of partitions of the unioned DataFrames.
import pandas as pd
from functools import reduce
import findspark
findspark.init("/opt/spark-3.0.1-bin-hadoop3.2")
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col
spark = (
    SparkSession.builder.appName("PySpark_Notebook").enableHiveSupport().getOrCreate()
)
df1 = spark.createDataFrame(
    pd.DataFrame(
        data=(
            (1, "a", "foo", 3.0),
            (2, "b", "bar", 4.0),
            (3, "c", "foo", 5.0),
            (4, "d", "bar", 7.0),
        ),
        columns=("col1", "col2", "col3", "col4"),
    )
)
df1.show()
df1.rdd.getNumPartitions()
df2 = df1.filter(col("col1") <= 2)
df2.show()
df2.rdd.getNumPartitions()
df1.union(df2).show()
df1.union(df2).rdd.getNumPartitions()
df3 = df1.filter(col("col1") > 2)
df3.show()
reduce(DataFrame.union, [df1, df2, df3]).show()
reduce(DataFrame.union, [df1, df2, df3]).rdd.getNumPartitions()
As mentioned above, a way to avoid the ordering issue is to select columns explicitly so that the two DataFrames have the same column order.
df4 = df2.select("col4", "col3", "col2", "col1")
df4.show()
df1.select(*df4.columns).show()
df1.select(*df4.columns).union(df4).show()
An exception is raised if the numbers of columns of the two DataFrames do not match.
df5 = spark.createDataFrame(
    pd.DataFrame(data=((1, "hello"), (2, "world")), columns=("freq", "word"))
)
df5.show()
df6 = spark.createDataFrame(
    pd.DataFrame(
        data=(("how", 1000, 0), ("are", 300, 0), ("you", 100, 0)),
        columns=("word", "freq", "group"),
    )
)
df6.show()
df5.union(df6)