
Aggregate DataFrames in Spark

Aggregation Without Grouping

  1. You can aggregate over all rows of a DataFrame without any grouping: just call aggregation functions in select without groupBy, which closely mirrors SQL syntax.

  2. The aggregation functions any and every (with the aliases bool_or and bool_and) are available since Spark 3.0. In earlier versions you can achieve the same results with other aggregation functions such as sum and count; see the sketch after this list.

  3. Aggregation functions accept both column expressions and column names.
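
For item 2, here is a minimal sketch of emulating any and every with sum and count on Spark versions before 3.0. It assumes a SparkSession and the example DataFrame df (with the boolean column x) created later in this post, and it also illustrates item 3: both column names and column expressions are accepted.

from pyspark.sql.functions import col, count, sum

# any(x): at least one value of x is true; every(x): all values of x are true.
# Both can be emulated by summing the boolean column cast to int.
df.select(
    (sum(col("x").cast("int")) > 0).alias("any_x"),              # column expression
    (sum(col("x").cast("int")) == count("x")).alias("every_x"),
    count("fname").alias("n"),                                   # column name
).show()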

In [2]:
!/opt/pyenv/versions/3.7.9/bin/python -m pip install pandas pyarrow findspark
Defaulting to user installation because normal site-packages is not writeable
Collecting pandas
  Downloading pandas-1.2.0-cp37-cp37m-manylinux1_x86_64.whl (9.9 MB)
     |████████████████████████████████| 9.9 MB 1.3 MB/s eta 0:00:01
Collecting pyarrow
  Downloading pyarrow-2.0.0-cp37-cp37m-manylinux2014_x86_64.whl (17.7 MB)
     |████████████████████████████████| 17.7 MB 7.2 MB/s
Collecting findspark
  Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Collecting pytz>=2017.3
  Downloading pytz-2020.5-py2.py3-none-any.whl (510 kB)
     |████████████████████████████████| 510 kB 3.8 MB/s eta 0:00:01
Collecting numpy>=1.16.5
  Downloading numpy-1.19.5-cp37-cp37m-manylinux2010_x86_64.whl (14.8 MB)
     |████████████████████████████████| 14.8 MB 2.9 MB/s
Requirement already satisfied: python-dateutil>=2.7.3 in /opt/pyenv/versions/3.7.9/lib/python3.7/site-packages (from pandas) (2.8.1)
Requirement already satisfied: six>=1.5 in /opt/pyenv/versions/3.7.9/lib/python3.7/site-packages (from python-dateutil>=2.7.3->pandas) (1.15.0)
Installing collected packages: pytz, numpy, pandas, pyarrow, findspark
  WARNING: The scripts f2py, f2py3 and f2py3.7 are installed in '/home/dclong/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
  WARNING: The script plasma_store is installed in '/home/dclong/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
Successfully installed findspark-1.4.2 numpy-1.19.5 pandas-1.2.0 pyarrow-2.0.0 pytz-2020.5
WARNING: You are using pip version 20.1.1; however, version 20.3.3 is available.
You should consider upgrading via the '/opt/pyenv/versions/3.7.9/bin/python -m pip install --upgrade pip' command.
In [1]:
import pandas as pd
import findspark

findspark.init("/opt/spark-2.3.1-bin-hadoop2.7/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("Case/When").enableHiveSupport().getOrCreate()
In [2]:
import pandas as pd
import findspark

findspark.init("/opt/spark-3.0.1-bin-hadoop3.2/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("Case/When").enableHiveSupport().getOrCreate()
In [3]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=(
            ("Ben", "Du", 0, True, 1),
            ("Ben", "Du", 0, True, 1),
            ("Ben", "Tu", 1, False, 0),
            ("Ben", "Tu", 3, False, 0),
            ("Ken", "Xu", 6, False, 0),
            ("Ken", "Xu", 9, False, 0),
        ),
        columns=("fname", "lname", "score", "x", "y"),
    )
)
df.show()
+-----+-----+-----+-----+---+
|fname|lname|score|    x|  y|
+-----+-----+-----+-----+---+
|  Ben|   Du|    0| true|  1|
|  Ben|   Du|    0| true|  1|
|  Ben|   Tu|    1|false|  0|
|  Ben|   Tu|    3|false|  0|
|  Ken|   Xu|    6|false|  0|
|  Ken|   Xu|    9|false|  0|
+-----+-----+-----+-----+---+

In [10]:
df.select(col("score").cast("boolean").cast("int")).show()
+-----+
|score|
+-----+
|    0|
|    0|
|    1|
|    1|
|    1|
|    1|
+-----+

None is correctly recognized as null when used in when(expr, val).otherwise(None).

In [4]:
df.select(when(col("score") >= 3, 1)).show()
+---------------------------------+
|CASE WHEN (score >= 3) THEN 1 END|
+---------------------------------+
|                             null|
|                             null|
|                             null|
|                                1|
|                                1|
|                                1|
+---------------------------------+

In [3]:
df.select(when(col("score") >= 3, 1).otherwise(None)).show()
+-------------------------------------------+
|CASE WHEN (score >= 3) THEN 1 ELSE NULL END|
+-------------------------------------------+
|                                       null|
|                                       null|
|                                       null|
|                                          1|
|                                          1|
|                                          1|
+-------------------------------------------+

any

  1. any is available only as a SQL function as of Spark 3.0; there is no corresponding function in pyspark.sql.functions or the DataFrame API.

  2. It works on boolean columns only.

In [9]:
from pyspark.sql.functions import any
---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-9-280aa8841125> in <module>
----> 1 from pyspark.sql.functions import any

ImportError: cannot import name 'any' from 'pyspark.sql.functions' (/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/functions.py)
In [4]:
df.createOrReplaceTempView("df")
In [6]:
spark.sql("select any(x) from df").show()
+------+
|any(x)|
+------+
|  true|
+------+

In [8]:
spark.sql("select any(x) from df where score > 2").show()
+------+
|any(x)|
+------+
| false|
+------+

In [7]:
spark.sql("select any(y) from df").show()
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-7-1b9dcb466bcc> in <module>
----> 1 spark.sql("select any(y) from df").show()

/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
    647         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    648         """
--> 649         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    650 
    651     @since(2.0)

/opt/spark-3.0.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise

/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/utils.py in raise_from(e)

AnalysisException: cannot resolve 'any(df.`y`)' due to data type mismatch: Input to function 'any' should have been boolean, but it's [bigint].; line 1 pos 7;
'Aggregate [unresolvedalias(any(y#4L), None)]
+- SubqueryAlias df
   +- LogicalRDD [fname#0, lname#1, score#2L, x#3, y#4L], false
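
One way to work around the boolean-only restriction is to turn the column into a boolean first, either by casting it or by comparing it against a value; a sketch against the temporary view df registered above:

spark.sql("select any(cast(y as boolean)) as any_y from df").show()
spark.sql("select any(y > 0) as any_y from df").show()
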
In [3]:
df.select(
    count("fname").alias("num_first_name"),
    count("lname").alias("num_last_name"),
    sum("score").alias("sum_score"),
).show()
+--------------+-------------+---------+
|num_first_name|num_last_name|sum_score|
+--------------+-------------+---------+
|             6|            6|       19|
+--------------+-------------+---------+

Aggregation Using groupBy

You can use a positional alias in GROUP BY in Spark SQL!

In [6]:
df.show()
+-----+-----+-----+-----+---+
|fname|lname|score|    x|  y|
+-----+-----+-----+-----+---+
|  Ben|   Du|    0| true|  1|
|  Ben|   Du|    0| true|  1|
|  Ben|   Tu|    1|false|  0|
|  Ben|   Tu|    3|false|  0|
|  Ken|   Xu|    6|false|  0|
|  Ken|   Xu|    9|false|  0|
+-----+-----+-----+-----+---+

In [11]:
df.groupBy("lname").agg(sum("y")).show()
+-----+------+
|lname|sum(y)|
+-----+------+
|   Xu|     0|
|   Du|     2|
|   Tu|     0|
+-----+------+

In [4]:
df.createOrReplaceTempView("people")
In [7]:
spark.sql("select fname, count(*) as n from people group by 1").show()
+-----+---+
|fname|  n|
+-----+---+
|  Ben|  4|
|  Ken|  2|
+-----+---+

sum

  1. sum ignores null values.

  2. When all values are null, sum returns null.

sum ignores null values.

In [1]:
import org.apache.spark.sql.functions._

val df = Seq(
    ("2017-01-01", 1L),
    ("2017-01-01", 10L),
    ("2017-02-01", 2L),
    ("2017-02-01", 22L)
).toDF("date", "value").
    withColumn("value", when($"value" > 20, null).otherwise($"value"))
df.show
+----------+-----+
|      date|value|
+----------+-----+
|2017-01-01|    1|
|2017-01-01|   10|
|2017-02-01|    2|
|2017-02-01| null|
+----------+-----+

df = [date: string, value: bigint]
Out[1]:
[date: string, value: bigint]
In [16]:
df.groupBy("date").agg(sum($"value").alias("s")).show
+----------+---+
|      date|  s|
+----------+---+
|2017-01-01| 11|
|2017-02-01|  2|
+----------+---+


When all values are null, sum returns null.

In [1]:
import org.apache.spark.sql.functions._
val df = spark.read.json("../data/people.json").
    withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
+----+-------+-------+
| age|   name|is_null|
+----+-------+-------+
|null|Michael|      1|
|  30|   Andy|      0|
|  19| Justin|      0|
+----+-------+-------+

df = [age: bigint, name: string ... 1 more field]
Out[1]:
[age: bigint, name: string ... 1 more field]

Specify an alias for the column after aggregation.

In [26]:
df.groupBy("is_null").agg(sum("age").alias("sage")).show
+-------+----+
|is_null|sage|
+-------+----+
|      1|null|
|      0|  49|
+-------+----+

Group By Multiple Columns

In [7]:
df.groupBy("fname", "lname").sum().show
+-----+-----+----------+
|fname|lname|sum(score)|
+-----+-----+----------+
|  Ben|   Du|         3|
|  Ken|   Xu|        10|
|  Ben|   Tu|         7|
+-----+-----+----------+

In [3]:
val df = spark.read.json("../../data/people.json")
df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


agg

In [4]:
import org.apache.spark.sql.functions._
val df = spark.read.json("../../data/people.json").withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
+----+-------+-------+
| age|   name|is_null|
+----+-------+-------+
|null|Michael|      1|
|  30|   Andy|      0|
|  19| Justin|      0|
+----+-------+-------+

In [5]:
df.groupBy("is_null").agg(sum("age").alias("sage")).show
+-------+----+
|is_null|sage|
+-------+----+
|      1|null|
|      0|  49|
+-------+----+

In [6]:
df.groupBy("is_null").agg(sum("age").alias("sage"), count("*").alias("cnt")).show
+-------+----+---+
|is_null|sage|cnt|
+-------+----+---+
|      1|null|  1|
|      0|  49|  2|
+-------+----+---+


Collection

collect_list

In [7]:
import org.apache.spark.sql.functions._

val df = Seq(
    ("Ben", 1),
    ("Ben" ,2),
    ("Ben", 3),
    ("Ken", 1),
    ("Ken", 9)
).toDF("name", "score")
df.show
+----+-----+
|name|score|
+----+-----+
| Ben|    1|
| Ben|    2|
| Ben|    3|
| Ken|    1|
| Ken|    9|
+----+-----+

In [13]:
val df2 = df.groupBy("name").agg(
    collect_list("score").alias("scores")
)
df2.show
+----+---------+
|name|   scores|
+----+---------+
| Ben|[1, 2, 3]|
| Ken|   [1, 9]|
+----+---------+

In [14]:
df2.printSchema
root
 |-- name: string (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: integer (containsNull = true)


collect_set

In [11]:
val df_copy = Seq(
    ("Ben", 1),
    ("Ben", 1),
    ("Ben" ,2),
    ("Ben", 3),
    ("Ken", 1),
    ("Ken", 9)
).toDF("name", "score")
df_copy.show

val df3 = df_copy.groupBy("name").agg(collect_list("score").alias("scores"))
df3.show()

val df4 = df_copy.groupBy("name").agg(collect_set("score").alias("scores"))
df4.show
+----+-----+
|name|score|
+----+-----+
| Ben|    1|
| Ben|    1|
| Ben|    2|
| Ben|    3|
| Ken|    1|
| Ken|    9|
+----+-----+

+----+------------+
|name|      scores|
+----+------------+
| Ben|[1, 1, 2, 3]|
| Ken|      [1, 9]|
+----+------------+

+----+---------+
|name|   scores|
+----+---------+
| Ben|[1, 2, 3]|
| Ken|   [9, 1]|
+----+---------+


First/Last

first

last
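
Both first and last take an optional ignorenulls argument, and neither is deterministic unless the rows within each group have a well-defined order. A PySpark sketch, assuming the fname/lname/score DataFrame created at the top of this post:

from pyspark.sql.functions import first, last

df.groupBy("fname").agg(
    first("score").alias("first_score"),
    last("score", ignorenulls=True).alias("last_non_null_score"),
).show()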

Grouping

grouping

grouping_id

In [3]:
val df = spark.read.json("../data/people.json")
df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
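
grouping and grouping_id are meant to be used with cube or rollup: they indicate which grouping columns were aggregated away in each output row. A PySpark sketch, assuming the fname/lname/score DataFrame from the beginning of this post:

from pyspark.sql.functions import grouping, grouping_id, sum

df.cube("fname", "lname").agg(
    grouping("lname").alias("g_lname"),  # 1 if lname is aggregated away in this row
    grouping_id().alias("gid"),          # bit vector over all grouping columns
    sum("score").alias("sum_score"),
).show()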

Count

count

In [5]:
import org.apache.spark.sql.functions._
val df = spark.read.json("../../../data/people.json").withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
+----+-------+-------+
| age|   name|is_null|
+----+-------+-------+
|null|Michael|      1|
|  30|   Andy|      0|
|  19| Justin|      0|
+----+-------+-------+

df = [age: bigint, name: string ... 1 more field]
Out[5]:
[age: bigint, name: string ... 1 more field]
In [6]:
df.groupBy("is_null").count().show()
+-------+-----+
|is_null|count|
+-------+-----+
|      1|    1|
|      0|    2|
+-------+-----+

In [7]:
df.groupBy("is_null").agg(count("*").as("total")).show
+-------+-----+
|is_null|total|
+-------+-----+
|      1|    1|
|      0|    2|
+-------+-----+

In [14]:
df.groupBy("is_null").agg(count(when($"name" === "Andy", 1).otherwise(null))).show
+-------+---------------------------------------------------+
|is_null|count(CASE WHEN (name = Andy) THEN 1 ELSE NULL END)|
+-------+---------------------------------------------------+
|      1|                                                  0|
|      0|                                                  1|
+-------+---------------------------------------------------+

In [13]:
df.groupBy("is_null").agg(sum(when($"name" === "Andy", 1).otherwise(0))).show
+-------+----------------------------------------------+
|is_null|sum(CASE WHEN (name = Andy) THEN 1 ELSE 0 END)|
+-------+----------------------------------------------+
|      1|                                             0|
|      0|                                             1|
+-------+----------------------------------------------+

In [8]:
df.groupBy("is_null").agg(count("*").alias("total")).show
+-------+-----+
|is_null|total|
+-------+-----+
|      1|    1|
|      0|    2|
+-------+-----+

countDistinct

In [16]:
df.groupBy("is_null").agg(countDistinct("is_null").alias("total")).show
+-------+-----+
|is_null|total|
+-------+-----+
|      1|    1|
|      0|    1|
+-------+-----+

approx_count_distinct
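
approx_count_distinct estimates the number of distinct values with HyperLogLog++, which is much cheaper than an exact countDistinct on large data; the optional rsd parameter bounds the relative standard deviation of the estimate. A sketch, assuming the fname/score DataFrame from the top of this post:

from pyspark.sql.functions import approx_count_distinct

df.groupBy("fname").agg(
    approx_count_distinct("score", rsd=0.05).alias("approx_distinct_scores")
).show()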

Sum

sum

sumDistinct
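
sumDistinct sums each distinct value only once (it was renamed to sum_distinct in Spark 3.2). A sketch, assuming the same fname/score DataFrame:

from pyspark.sql.functions import sum, sumDistinct

df.groupBy("fname").agg(
    sum("score").alias("sum_score"),
    sumDistinct("score").alias("sum_distinct_score"),
).show()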

Extreme Values

max

min
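
max and min also ignore null values and return null only when every value in the group is null. A sketch:

from pyspark.sql.functions import max, min

df.groupBy("fname").agg(
    max("score").alias("max_score"),
    min("score").alias("min_score"),
).show()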

Mean/Average

avg

How does average behave on null values?

mean
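
Like sum, avg ignores null values: the denominator counts only the non-null rows, and avg returns null when all values in the group are null. mean is simply an alias of avg. A quick self-contained check in PySpark (df_age is a hypothetical toy DataFrame):

from pyspark.sql.functions import avg, mean

df_age = spark.createDataFrame(
    [("Michael", None), ("Andy", 30), ("Justin", 19)], ["name", "age"]
)
# (30 + 19) / 2 = 24.5 -- the null age is not counted in the denominator
df_age.select(avg("age"), mean("age")).show()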

Standard Deviation

stddev

stddev_pop

stddev_samp
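
stddev is an alias of stddev_samp (sample standard deviation, denominator n - 1), while stddev_pop uses the population formula (denominator n). A sketch, assuming the fname/score DataFrame from the top of this post:

from pyspark.sql.functions import stddev, stddev_pop, stddev_samp

df.groupBy("fname").agg(
    stddev("score").alias("sd"),            # same as stddev_samp
    stddev_samp("score").alias("sd_samp"),
    stddev_pop("score").alias("sd_pop"),
).show()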

Variance

var_pop

var_samp

variance
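
Likewise, variance is an alias of var_samp, and var_pop is the population variance. A sketch:

from pyspark.sql.functions import variance, var_samp, var_pop

df.groupBy("fname").agg(
    variance("score").alias("var"),      # same as var_samp
    var_samp("score").alias("var_samp"),
    var_pop("score").alias("var_pop"),
).show()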

Correlation & Covariance

corr

covar_pop

covar_samp
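
corr computes the Pearson correlation coefficient of two columns, and covar_samp/covar_pop compute the sample/population covariance. A sketch using the score and y columns of the DataFrame from the top of this post:

from pyspark.sql.functions import corr, covar_pop, covar_samp

df.select(
    corr("score", "y").alias("corr_score_y"),
    covar_samp("score", "y").alias("covar_samp_score_y"),
    covar_pop("score", "y").alias("covar_pop_score_y"),
).show()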

Skewness & Kurtosis

skewness

kurtosis
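
skewness and kurtosis measure the asymmetry and the tailedness of a column's distribution, respectively. A sketch:

from pyspark.sql.functions import skewness, kurtosis

df.select(
    skewness("score").alias("skew_score"),
    kurtosis("score").alias("kurt_score"),
).show()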
