
Things on this page are fragmentary and immature notes/thoughts of the author. Please read with your own judgement!

Aggregation Without Grouping

  1. You can aggregate over all values of a DataFrame's columns without grouping: just call aggregation functions inside select without a preceding groupBy, which closely mirrors SQL syntax.

  2. The aggregation functions all and any are available since Spark 3.0. In earlier versions, they can be emulated with other aggregation functions such as sum and count.

  3. You can pass both column expressions and column names to aggregation functions.
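For item 2 above, the equivalence can be sketched in plain Python (in Spark the same idea would use sum over the boolean column cast to int, compared against 0 or the count; this is an illustration of the arithmetic, not Spark code):

```python
# Emulating the boolean aggregates any/all with sum/count, as in Spark < 3.0.
# x plays the role of a boolean column.
x = [True, False, True]

any_x = sum(int(b) for b in x) > 0        # any(x) == sum(cast(x as int)) > 0
all_x = sum(int(b) for b in x) == len(x)  # all(x) == sum(cast(x as int)) == count(x)
```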

!/opt/pyenv/versions/3.7.9/bin/python -m pip install pandas pyarrow findspark
Defaulting to user installation because normal site-packages is not writeable
Collecting pandas
  Downloading pandas-1.2.0-cp37-cp37m-manylinux1_x86_64.whl (9.9 MB)
Collecting pyarrow
  Downloading pyarrow-2.0.0-cp37-cp37m-manylinux2014_x86_64.whl (17.7 MB)
Collecting findspark
  Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Collecting pytz>=2017.3
  Downloading pytz-2020.5-py2.py3-none-any.whl (510 kB)
Collecting numpy>=1.16.5
  Downloading numpy-1.19.5-cp37-cp37m-manylinux2010_x86_64.whl (14.8 MB)
Requirement already satisfied: python-dateutil>=2.7.3 in /opt/pyenv/versions/3.7.9/lib/python3.7/site-packages (from pandas) (2.8.1)
Requirement already satisfied: six>=1.5 in /opt/pyenv/versions/3.7.9/lib/python3.7/site-packages (from python-dateutil>=2.7.3->pandas) (1.15.0)
Installing collected packages: pytz, numpy, pandas, pyarrow, findspark
  WARNING: The scripts f2py, f2py3 and f2py3.7 are installed in '/home/dclong/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
  WARNING: The script plasma_store is installed in '/home/dclong/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
Successfully installed findspark-1.4.2 numpy-1.19.5 pandas-1.2.0 pyarrow-2.0.0 pytz-2020.5
import pandas as pd
import findspark

findspark.init("/opt/spark-3.0.1-bin-hadoop3.2/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("Case/When").enableHiveSupport().getOrCreate()
df = spark.createDataFrame(
    pd.DataFrame(
        data=(
            ("Ben", "Du", 0, True, 1),
            ("Ben", "Du", 0, True, 1),
            ("Ben", "Tu", 1, False, 0),
            ("Ben", "Tu", 3, False, 0),
            ("Ken", "Xu", 6, False, 0),
            ("Ken", "Xu", 9, False, 0),
        ),
        columns=("fname", "lname", "score", "x", "y"),
    )
)
df.show()
+-----+-----+-----+-----+---+
|fname|lname|score|    x|  y|
+-----+-----+-----+-----+---+
|  Ben|   Du|    0| true|  1|
|  Ben|   Du|    0| true|  1|
|  Ben|   Tu|    1|false|  0|
|  Ben|   Tu|    3|false|  0|
|  Ken|   Xu|    6|false|  0|
|  Ken|   Xu|    9|false|  0|
+-----+-----+-----+-----+---+

Casting a numeric column to boolean and back to int maps every non-zero value to 1:

df.select(col("score").cast("boolean").cast("int")).show()
+-----+
|score|
+-----+
|    0|
|    0|
|    1|
|    1|
|    1|
|    1|
+-----+

None is correctly recognized as null when used in when(expr, val).otherwise(None).

df.select(when(col("score") >= 3, 1)).show()
+---------------------------------+
|CASE WHEN (score >= 3) THEN 1 END|
+---------------------------------+
|                             null|
|                             null|
|                                1|
|                                1|
|                             null|
|                                1|
+---------------------------------+

df.select(when(col("score") >= 3, 1).otherwise(None)).show()
+-------------------------------------------+
|CASE WHEN (score >= 3) THEN 1 ELSE NULL END|
+-------------------------------------------+
|                                       null|
|                                       null|
|                                          1|
|                                          1|
|                                       null|
|                                          1|
+-------------------------------------------+

any

  1. Available since Spark 3.0, and only as a SQL function (it is not exposed in the DataFrame API, as the ImportError below shows).

  2. Works on boolean columns only.

from pyspark.sql.functions import any
---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-9-280aa8841125> in <module>
----> 1 from pyspark.sql.functions import any

ImportError: cannot import name 'any' from 'pyspark.sql.functions' (/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/functions.py)
df.createOrReplaceTempView("df")
spark.sql("select any(x) from df").show()
+------+
|any(x)|
+------+
|  true|
+------+

spark.sql("select any(x) from df where score > 2").show()
+------+
|any(x)|
+------+
| false|
+------+

spark.sql("select any(y) from df").show()
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-7-1b9dcb466bcc> in <module>
----> 1 spark.sql("select any(y) from df").show()

/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
    647         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    648         """
--> 649         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    650 
    651     @since(2.0)

/opt/spark-3.0.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise

/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/utils.py in raise_from(e)

AnalysisException: cannot resolve 'any(df.`y`)' due to data type mismatch: Input to function 'any' should have been boolean, but it's [bigint].; line 1 pos 7;
'Aggregate [unresolvedalias(any(y#4L), None)]
+- SubqueryAlias df
   +- LogicalRDD [fname#0, lname#1, score#2L, x#3, y#4L], false
df.select(
    count("fname").alias("num_first_name"),
    count("lname").alias("num_last_name"),
    sum("score").alias("sum_score"),
).show()
+--------------+-------------+---------+
|num_first_name|num_last_name|sum_score|
+--------------+-------------+---------+
|             6|            6|       20|
+--------------+-------------+---------+

Aggregation Using groupBy

Note that Spark SQL supports positional aliases in GROUP BY (e.g., group by 1 groups by the first selected column)!

df.show()
+-----+-----+-----+-----+---+
|fname|lname|score|    x|  y|
+-----+-----+-----+-----+---+
|  Ben|   Du|    0| true|  1|
|  Ben|   Du|    0| true|  1|
|  Ben|   Tu|    1|false|  0|
|  Ben|   Tu|    3|false|  0|
|  Ken|   Xu|    6|false|  0|
|  Ken|   Xu|    9|false|  0|
+-----+-----+-----+-----+---+

df.groupBy("lname").agg(sum("y")).show()
+-----+------+
|lname|sum(y)|
+-----+------+
|   Xu|     0|
|   Du|     2|
|   Tu|     0|
+-----+------+

df.createOrReplaceTempView("people")
spark.sql("select fname, count(*) as n from people group by 1").show()
+-----+---+
|fname|  n|
+-----+---+
|  Ben|  4|
|  Ken|  2|
+-----+---+

sum

  1. sum ignores null values.

  2. When all values are null, sum returns null.
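These two rules can be mimicked in plain Python for intuition (a sketch of the semantics, not Spark's implementation):

```python
def null_ignoring_sum(values):
    """Mimic Spark SQL's sum: skip nulls; return null (None) only when every input is null."""
    non_null = [v for v in values if v is not None]
    return sum(non_null) if non_null else None

null_ignoring_sum([1, 10, None])  # 11 -- the null is skipped
null_ignoring_sum([None, None])   # None -- all inputs are null
```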

sum ignores null.

val df = Seq(
    ("2017-01-01", 1L),
    ("2017-01-01", 10L),
    ("2017-02-01", 2L),
    ("2017-02-01", 22L)
).toDF("date", "value").
withColumn("value", when($"value" > 20, null).otherwise($"value"))
df.show
+----------+-----+
|      date|value|
+----------+-----+
|2017-01-01|    1|
|2017-01-01|   10|
|2017-02-01|    2|
|2017-02-01| null|
+----------+-----+

df.groupBy("date").agg(sum($"value").alias("s")).show
+----------+---+
|      date|  s|
+----------+---+
|2017-01-01| 11|
|2017-02-01|  2|
+----------+---+


When all values are null, sum returns null.

import org.apache.spark.sql.functions._
val df = spark.read.json("../data/people.json").
    withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
+----+-------+-------+
| age|   name|is_null|
+----+-------+-------+
|null|Michael|      1|
|  30|   Andy|      0|
|  19| Justin|      0|
+----+-------+-------+


Specify an alias for the column after aggregation.

df.groupBy("is_null").agg(sum("age").alias("sage")).show
+-------+----+
|is_null|sage|
+-------+----+
|      1|null|
|      0|  49|
+-------+----+

Group By Multiple Columns

df.groupBy("fname", "lname").sum().show
+-----+-----+----------+
|fname|lname|sum(score)|
+-----+-----+----------+
|  Ben|   Du|         3|
|  Ken|   Xu|        10|
|  Ben|   Tu|         7|
+-----+-----+----------+

val df = spark.read.json("../../data/people.json")
df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


agg

import org.apache.spark.sql.functions._
val df = spark.read.json("../../data/people.json").withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
+----+-------+-------+
| age|   name|is_null|
+----+-------+-------+
|null|Michael|      1|
|  30|   Andy|      0|
|  19| Justin|      0|
+----+-------+-------+

df.groupBy("is_null").agg(sum("age").alias("sage")).show
+-------+----+
|is_null|sage|
+-------+----+
|      1|null|
|      0|  49|
+-------+----+

df.groupBy("is_null").agg(sum("age").alias("sage"), count("*").alias("cnt")).show
+-------+----+---+
|is_null|sage|cnt|
+-------+----+---+
|      1|null|  1|
|      0|  49|  2|
+-------+----+---+


Collection

collect_list

import org.apache.spark.sql.functions._

val df = Seq(
    ("Ben", 1),
    ("Ben", 2),
    ("Ben", 3),
    ("Ken", 1),
    ("Ken", 9)
).toDF("name", "score")
df.show
+----+-----+
|name|score|
+----+-----+
| Ben|    1|
| Ben|    2|
| Ben|    3|
| Ken|    1|
| Ken|    9|
+----+-----+

val df2 = df.groupBy("name").agg(
    collect_list("score").alias("scores")
)
df2.show
+----+---------+
|name|   scores|
+----+---------+
| Ben|[1, 2, 3]|
| Ken|   [1, 9]|
+----+---------+

df2.printSchema
root
 |-- name: string (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: integer (containsNull = true)


collect_set

val df_copy = Seq(
    ("Ben", 1),
    ("Ben", 1),
    ("Ben", 2),
    ("Ben", 3),
    ("Ken", 1),
    ("Ken", 9)
).toDF("name", "score")
df_copy.show

val df3 = df_copy.groupBy("name").agg(collect_list("score").alias("scores"))
df3.show()

val df4 = df_copy.groupBy("name").agg(collect_set("score").alias("scores"))
df4.show
+----+-----+
|name|score|
+----+-----+
| Ben|    1|
| Ben|    1|
| Ben|    2|
| Ben|    3|
| Ken|    1|
| Ken|    9|
+----+-----+

+----+------------+
|name|      scores|
+----+------------+
| Ben|[1, 1, 2, 3]|
| Ken|      [1, 9]|
+----+------------+

+----+---------+
|name|   scores|
+----+---------+
| Ben|[1, 2, 3]|
| Ken|   [9, 1]|
+----+---------+
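As the outputs above show, the difference between collect_list and collect_set is that the latter drops duplicates (and, like any set, gives no order guarantee). A plain-Python sketch of the distinction:

```python
scores = [1, 1, 2, 3]

as_list = list(scores)                # collect_list keeps duplicates
as_set = list(dict.fromkeys(scores))  # collect_set deduplicates (Spark guarantees no order)
```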


First/Last

first

last

Grouping

grouping

grouping_id

val df = spark.read.json("../data/people.json")
df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Count

count

import org.apache.spark.sql.functions._
val df = spark.read.json("../../../data/people.json").withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
+----+-------+-------+
| age|   name|is_null|
+----+-------+-------+
|null|Michael|      1|
|  30|   Andy|      0|
|  19| Justin|      0|
+----+-------+-------+

df.groupBy("is_null").count().show()
+-------+-----+
|is_null|count|
+-------+-----+
|      1|    1|
|      0|    2|
+-------+-----+

df.groupBy("is_null").agg(count("*").as("total")).show
+-------+-----+
|is_null|total|
+-------+-----+
|      1|    1|
|      0|    2|
+-------+-----+

df.groupBy("is_null").agg(count(when($"name" === "Andy", 1).otherwise(null))).show
+-------+---------------------------------------------------+
|is_null|count(CASE WHEN (name = Andy) THEN 1 ELSE NULL END)|
+-------+---------------------------------------------------+
|      1|                                                  0|
|      0|                                                  1|
+-------+---------------------------------------------------+

df.groupBy("is_null").agg(sum(when($"name" === "Andy", 1).otherwise(0))).show
+-------+----------------------------------------------+
|is_null|sum(CASE WHEN (name = Andy) THEN 1 ELSE 0 END)|
+-------+----------------------------------------------+
|      1|                                             0|
|      0|                                             1|
+-------+----------------------------------------------+
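The two cells above give the same result because when(cond, 1) is null wherever the condition fails and count skips nulls, while sum(when(cond, 1).otherwise(0)) adds up 0/1 flags. The equivalence in plain Python (an illustration, not Spark code):

```python
names = ["Michael", "Andy", "Justin"]

# count(when(cond, 1)): rows failing the condition become null and are not counted
count_when = sum(1 for n in names if n == "Andy")
# sum(when(cond, 1).otherwise(0)): sum over 0/1 flags gives the same count
sum_when = sum(1 if n == "Andy" else 0 for n in names)
```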

df.groupBy("is_null").agg(count("*").alias("total")).show
+-------+-----+
|is_null|total|
+-------+-----+
|      1|    1|
|      0|    2|
+-------+-----+

countDistinct

df.groupBy("is_null").agg(countDistinct("is_null").alias("total")).show
+-------+-----+
|is_null|total|
+-------+-----+
|      1|    1|
|      0|    1|
+-------+-----+

approx_count_distinct

Sum

sum

sumDistinct

Extreme Values

max

min

Mean/Average

avg

How does average behave on null values? Like sum, avg ignores nulls: the mean is computed over the non-null values only (the denominator counts only non-null rows), and it returns null when every value is null.
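A plain-Python sketch of avg's null handling (nulls are skipped, and the result is null when all inputs are null; an illustration, not Spark code):

```python
def null_ignoring_avg(values):
    """Mimic Spark SQL's avg: the denominator counts only non-null values."""
    non_null = [v for v in values if v is not None]
    return sum(non_null) / len(non_null) if non_null else None

null_ignoring_avg([30, 19, None])  # 24.5 -- 49 / 2, not 49 / 3
null_ignoring_avg([None, None])    # None
```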

mean

Standard Deviation

stddev

stddev_pop

stddev_samp

Variance

var_pop

var_samp

variance

Correlation & Covariance

corr

covar_pop

covar_samp

Skewness & Kurtosis

skewness

kurtosis