
Union DataFrames in Spark


  1. union relies on column order rather than column names, the same as UNION in SQL. For columns whose types do not match, the common super type is used (see the sketch after this list). This is dangerous because columns can be silently combined in the wrong order, even if you are careful. It is suggested that you define a function, unionByName, to handle this:

     def unionByName(df1, df2):
         # Align df2's columns to df1's ordering, then union by position.
         return df1.union(df2.select(*df1.columns))

    Note that since Spark 2.3, PySpark also ships a built-in pyspark.sql.DataFrame.unionByName that resolves columns by name. Another way to avoid the ordering issue is to select columns so that the two DataFrames share the same column ordering (demonstrated below).

  2. Union 2 PySpark DataFrames. Notice that pyspark.sql.DataFrame.union does not deduplicate (since Spark 2.0 it behaves like SQL's UNION ALL); call distinct() on the result if you need UNION semantics, as sketched after the union demonstration below.

  3. Union multiple PySpark DataFrames at once using functools.reduce.

  4. The number of partitions of the final DataFrame equals the sum of the numbers of partitions of the unioned DataFrames.
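
As a quick illustration of item 1's positional matching and type coercion, below is a minimal sketch (assuming the SparkSession spark created later in this notebook): unioning a bigint column with a double column widens the result to double.

    # Minimal sketch: union matches columns by position and widens
    # mismatched types to their common super type.
    df_int = spark.createDataFrame([(1,)], ["x"])    # x: bigint
    df_dbl = spark.createDataFrame([(2.5,)], ["x"])  # x: double
    df_int.union(df_dbl).printSchema()               # x is widened to double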

In [20]:
import pandas as pd
from functools import reduce
In [9]:
import findspark

findspark.init("/opt/spark-3.0.1-bin-hadoop3.2")
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col

spark = (
    SparkSession.builder.appName("PySpark_Notebook").enableHiveSupport().getOrCreate()
)
In [10]:
df1 = spark.createDataFrame(
    pd.DataFrame(
        data=(
            (1, "a", "foo", 3.0),
            (2, "b", "bar", 4.0),
            (3, "c", "foo", 5.0),
            (4, "d", "bar", 7.0),
        ),
        columns=("col1", "col2", "col3", "col4"),
    )
)
df1.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   a| foo| 3.0|
|   2|   b| bar| 4.0|
|   3|   c| foo| 5.0|
|   4|   d| bar| 7.0|
+----+----+----+----+

In [11]:
df1.rdd.getNumPartitions()
Out[11]:
4
In [13]:
df2 = df1.filter(col("col1") <= 2)
df2.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   a| foo| 3.0|
|   2|   b| bar| 4.0|
+----+----+----+----+

In [14]:
df2.rdd.getNumPartitions()
Out[14]:
4
In [15]:
df1.union(df2).show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   a| foo| 3.0|
|   2|   b| bar| 4.0|
|   3|   c| foo| 5.0|
|   4|   d| bar| 7.0|
|   1|   a| foo| 3.0|
|   2|   b| bar| 4.0|
+----+----+----+----+
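
Since union keeps duplicates (UNION ALL semantics), chain distinct() after the union if you want SQL UNION semantics. A minimal sketch using the DataFrames above:

    # Deduplicate after the union to emulate SQL's UNION (vs UNION ALL).
    df1.union(df2).distinct().show()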

In [16]:
df1.union(df2).rdd.getNumPartitions()
Out[16]:
8
In [18]:
df3 = df1.filter(col("col1") > 2)
df3.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   3|   c| foo| 5.0|
|   4|   d| bar| 7.0|
+----+----+----+----+

In [21]:
reduce(DataFrame.union, [df1, df2, df3]).show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   a| foo| 3.0|
|   2|   b| bar| 4.0|
|   3|   c| foo| 5.0|
|   4|   d| bar| 7.0|
|   1|   a| foo| 3.0|
|   2|   b| bar| 4.0|
|   3|   c| foo| 5.0|
|   4|   d| bar| 7.0|
+----+----+----+----+

In [22]:
reduce(DataFrame.union, [df1, df2, df3]).rdd.getNumPartitions()
Out[22]:
12
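
Because partition counts add up, chaining many unions can leave the result heavily over-partitioned. A common remedy is to coalesce afterwards; a sketch, where the target of 4 partitions is just an illustrative choice:

    # Coalesce the over-partitioned result of repeated unions.
    reduce(DataFrame.union, [df1, df2, df3]).coalesce(4).rdd.getNumPartitions()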

A way to avoid the ordering issue is to select columns so that the two DataFrames have the same column ordering.

In [31]:
df4 = df2.select("col4", "col3", "col2", "col1")
df4.show()
+----+----+----+----+
|col4|col3|col2|col1|
+----+----+----+----+
| 3.0| foo|   a|   1|
| 4.0| bar|   b|   2|
+----+----+----+----+

In [36]:
df1.select(*df4.columns).show()
+----+----+----+----+
|col4|col3|col2|col1|
+----+----+----+----+
| 3.0| foo|   a|   1|
| 4.0| bar|   b|   2|
| 5.0| foo|   c|   3|
| 7.0| bar|   d|   4|
+----+----+----+----+

In [34]:
df1.select(*df4.columns).union(df4).show()
+----+----+----+----+
|col4|col3|col2|col1|
+----+----+----+----+
| 3.0| foo|   a|   1|
| 4.0| bar|   b|   2|
| 5.0| foo|   c|   3|
| 7.0| bar|   d|   4|
| 3.0| foo|   a|   1|
| 4.0| bar|   b|   2|
+----+----+----+----+
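
Alternatively, the built-in DataFrame.unionByName (available since Spark 2.3) resolves columns by name, making the manual select unnecessary. A sketch with the DataFrames above:

    # unionByName matches columns by name rather than position,
    # so df4's reordered columns are aligned with df1's automatically.
    df1.unionByName(df4).show()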

An exception is raised if the two DataFrames do not have the same number of columns.

In [39]:
df5 = spark.createDataFrame(
    pd.DataFrame(data=((1, "hello"), (2, "world")), columns=("freq", "word"))
)
df5.show()
+----+-----+
|freq| word|
+----+-----+
|   1|hello|
|   2|world|
+----+-----+

In [41]:
df6 = spark.createDataFrame(
    pd.DataFrame(
        data=(("how", 1000, 0), ("are", 300, 0), ("you", 100, 0)),
        columns=("word", "freq", "group"),
    )
)
df6.show()
+----+----+-----+
|word|freq|group|
+----+----+-----+
| how|1000|    0|
| are| 300|    0|
| you| 100|    0|
+----+----+-----+

In [42]:
df5.union(df6)
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-42-533c1d47b5b6> in <module>
----> 1 df5.union(df6)

/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/dataframe.py in union(self, other)
   1556         Also as standard in SQL, this function resolves columns by position (not by name).
   1557         """
-> 1558         return DataFrame(self._jdf.union(other._jdf), self.sql_ctx)
   1559 
   1560     @since(1.3)

/opt/spark-3.0.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise

/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 2 columns and the second table has 3 columns;;
'Union
:- LogicalRDD [freq#370L, word#371], false
+- LogicalRDD [word#383, freq#384L, group#385L], false
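
In Spark 3.1 and later (not the Spark 3.0.1 used in this notebook), unionByName accepts an allowMissingColumns flag that fills columns absent from one side with nulls instead of raising. A sketch:

    # Spark 3.1+ only: df5 lacks `group`, which is filled with nulls.
    df5.unionByName(df6, allowMissingColumns=True).show()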
