Comment¶
union relies on column order rather than column names, the same as in SQL. For columns whose types do not match, the common super type is used. This is dangerous if you are not careful. It is suggested that you use unionByName (available since Spark 2.3) or define an equivalent helper (def unionByName(df1, df2): ...) to handle this.
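For Spark versions before 2.3, a minimal sketch of such a helper, assuming every frame has exactly the same set of column names (only the order differs), could look like this:

```python
from functools import reduce


def union_by_name(dfs):
    """Union a list of DataFrames, aligning each frame's columns to the
    column order of the first frame before unioning by position."""
    first_cols = dfs[0].columns
    # select(*first_cols) reorders each frame's columns so that the
    # position-based union lines up the right columns.
    return reduce(lambda acc, df: acc.union(df.select(*first_cols)), dfs)
```

Spark's built-in DataFrame.unionByName is preferable when available (and since Spark 3.1 it also accepts allowMissingColumns=True).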
Union 2 PySpark DataFrames. Notice that pyspark.sql.DataFrame.union does not dedup by default (since Spark 2.0).
Union multiple PySpark DataFrames at once using functools.reduce. The number of partitions of the final DataFrame equals the sum of the numbers of partitions of the unioned DataFrames.
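functools.reduce folds a binary function over a sequence from the left, so reduce(DataFrame.union, [df1, df2, df3]) is equivalent to df1.union(df2).union(df3). The same pattern with plain Python lists:

```python
from functools import reduce
import operator

# reduce(f, [a, b, c]) computes f(f(a, b), c) -- a left fold.
combined = reduce(operator.add, [[1, 2], [3], [4, 5]])
print(combined)  # [1, 2, 3, 4, 5]
```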
import pandas as pd
from functools import reduce

import findspark
findspark.init("/opt/spark-3.0.1-bin-hadoop3.2")
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col
spark = (
SparkSession.builder.appName("PySpark_Notebook").enableHiveSupport().getOrCreate()
)
df1 = spark.createDataFrame(
pd.DataFrame(
data=(
(1, "a", "foo", 3.0),
(2, "b", "bar", 4.0),
(3, "c", "foo", 5.0),
(4, "d", "bar", 7.0),
),
columns=("col1", "col2", "col3", "col4"),
)
)
df1.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| a| foo| 3.0|
| 2| b| bar| 4.0|
| 3| c| foo| 5.0|
| 4| d| bar| 7.0|
+----+----+----+----+
df1.rdd.getNumPartitions()
4
df2 = df1.filter(col("col1") <= 2)
df2.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| a| foo| 3.0|
| 2| b| bar| 4.0|
+----+----+----+----+
df2.rdd.getNumPartitions()
4
df1.union(df2).show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| a| foo| 3.0|
| 2| b| bar| 4.0|
| 3| c| foo| 5.0|
| 4| d| bar| 7.0|
| 1| a| foo| 3.0|
| 2| b| bar| 4.0|
+----+----+----+----+
df1.union(df2).rdd.getNumPartitions()
8
df3 = df1.filter(col("col1") > 2)
df3.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 3| c| foo| 5.0|
| 4| d| bar| 7.0|
+----+----+----+----+
reduce(DataFrame.union, [df1, df2, df3]).show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| a| foo| 3.0|
| 2| b| bar| 4.0|
| 3| c| foo| 5.0|
| 4| d| bar| 7.0|
| 1| a| foo| 3.0|
| 2| b| bar| 4.0|
| 3| c| foo| 5.0|
| 4| d| bar| 7.0|
+----+----+----+----+
reduce(DataFrame.union, [df1, df2, df3]).rdd.getNumPartitions()
12
A way to avoid the ordering issue is to select columns to make sure that the columns of the 2 DataFrames have the same ordering.
df4 = df2.select("col4", "col3", "col2", "col1")
df4.show()
+----+----+----+----+
|col4|col3|col2|col1|
+----+----+----+----+
| 3.0| foo| a| 1|
| 4.0| bar| b| 2|
+----+----+----+----+
df1.select(*df4.columns).show()
+----+----+----+----+
|col4|col3|col2|col1|
+----+----+----+----+
| 3.0| foo| a| 1|
| 4.0| bar| b| 2|
| 5.0| foo| c| 3|
| 7.0| bar| d| 4|
+----+----+----+----+
df1.select(*df4.columns).union(df4).show()
+----+----+----+----+
|col4|col3|col2|col1|
+----+----+----+----+
| 3.0| foo| a| 1|
| 4.0| bar| b| 2|
| 5.0| foo| c| 3|
| 7.0| bar| d| 4|
| 3.0| foo| a| 1|
| 4.0| bar| b| 2|
+----+----+----+----+
An exception is raised if the numbers of columns of the 2 DataFrames do not match.
df5 = spark.createDataFrame(
pd.DataFrame(data=((1, "hello"), (2, "world")), columns=("freq", "word"))
)
df5.show()
+----+-----+
|freq| word|
+----+-----+
| 1|hello|
| 2|world|
+----+-----+
df6 = spark.createDataFrame(
pd.DataFrame(
data=(("how", 1000, 0), ("are", 300, 0), ("you", 100, 0)),
columns=("word", "freq", "group"),
)
)
df6.show()
+----+----+-----+
|word|freq|group|
+----+----+-----+
| how|1000|    0|
| are| 300|    0|
| you| 100|    0|
+----+----+-----+
df5.union(df6)
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-42-533c1d47b5b6> in <module>
----> 1 df5.union(df6)
/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/dataframe.py in union(self, other)
1556 Also as standard in SQL, this function resolves columns by position (not by name).
1557 """
-> 1558 return DataFrame(self._jdf.union(other._jdf), self.sql_ctx)
1559
1560 @since(1.3)
/opt/spark-3.0.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
132 # Hide where the exception came from that shows a non-Pythonic
133 # JVM exception message.
--> 134 raise_from(converted)
135 else:
136 raise
/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/utils.py in raise_from(e)
AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 2 columns and the second table has 3 columns;;
'Union
:- LogicalRDD [freq#370L, word#371], false
+- LogicalRDD [word#383, freq#384L, group#385L], false
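One way to fail earlier with a clearer message than the AnalysisException above is to compare column names yourself before unioning. A sketch of such a check (the helper name safe_union is made up for illustration):

```python
def safe_union(df1, df2):
    """Union two DataFrames after verifying they share the same column names,
    reordering the second frame's columns so union-by-position is safe."""
    if set(df1.columns) != set(df2.columns):
        raise ValueError(
            f"Column mismatch: {sorted(df1.columns)} vs {sorted(df2.columns)}"
        )
    # Align df2's column order with df1's before the position-based union.
    return df1.union(df2.select(*df1.columns))
```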