Things on this page are fragmentary and immature notes/thoughts of the author. Please read with your own judgement!
Aggregation Without Grouping¶
You can aggregate all values in columns of a DataFrame without grouping. Just use aggregation functions in `select` without `groupBy`, which is very similar to the SQL syntax. The aggregation functions `all` and `any` are available since Spark 3.0; in earlier versions they can be emulated using other aggregation functions such as `sum` and `count` (see the sketch below). You can use both column expressions and column names in aggregation functions.
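For earlier Spark versions, below is a minimal sketch of emulating `any` and `all` with `sum` and `count`, using the DataFrame `df` that is constructed later in this post (the aliases `any_x`/`all_x` are illustrative names):

# any(x) is true iff at least one value of x is true,
# i.e., iff the sum of x cast to int is positive.
# all(x) is true iff every value of x is true,
# i.e., iff the number of true values equals the number of non-null rows.
df.select(
    (sum(col("x").cast("int")) > 0).alias("any_x"),
    (sum(col("x").cast("int")) == count("x")).alias("all_x"),
).show()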
!/opt/pyenv/versions/3.7.9/bin/python -m pip install pandas pyarrow findspark
Defaulting to user installation because normal site-packages is not writeable
Collecting pandas
Downloading pandas-1.2.0-cp37-cp37m-manylinux1_x86_64.whl (9.9 MB)
Collecting pyarrow
Downloading pyarrow-2.0.0-cp37-cp37m-manylinux2014_x86_64.whl (17.7 MB)
Collecting findspark
Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Collecting pytz>=2017.3
Downloading pytz-2020.5-py2.py3-none-any.whl (510 kB)
Collecting numpy>=1.16.5
Downloading numpy-1.19.5-cp37-cp37m-manylinux2010_x86_64.whl (14.8 MB)
Requirement already satisfied: python-dateutil>=2.7.3 in /opt/pyenv/versions/3.7.9/lib/python3.7/site-packages (from pandas) (2.8.1)
Requirement already satisfied: six>=1.5 in /opt/pyenv/versions/3.7.9/lib/python3.7/site-packages (from python-dateutil>=2.7.3->pandas) (1.15.0)
Installing collected packages: pytz, numpy, pandas, pyarrow, findspark
Successfully installed findspark-1.4.2 numpy-1.19.5 pandas-1.2.0 pyarrow-2.0.0 pytz-2020.5
import pandas as pd
import findspark

findspark.init("/opt/spark-3.0.1-bin-hadoop3.2/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("Case/When").enableHiveSupport().getOrCreate()

df = spark.createDataFrame(
    pd.DataFrame(
        data=(
            ("Ben", "Du", 0, True, 1),
            ("Ben", "Du", 0, True, 1),
            ("Ben", "Tu", 1, False, 0),
            ("Ben", "Tu", 3, False, 0),
            ("Ken", "Xu", 6, False, 0),
            ("Ken", "Xu", 9, False, 0),
        ),
        columns=("fname", "lname", "score", "x", "y"),
    )
)
df.show()
+-----+-----+-----+-----+---+
|fname|lname|score| x| y|
+-----+-----+-----+-----+---+
| Ben| Du| 0| true| 1|
| Ben| Du| 0| true| 1|
| Ben| Tu| 1|false| 0|
| Ben| Tu| 3|false| 0|
| Ken| Xu| 6|false| 0|
| Ken| Xu| 9|false| 0|
+-----+-----+-----+-----+---+
df.select(col("score").cast("boolean").cast("int")).show()
+-----+
|score|
+-----+
| 0|
| 0|
| 1|
| 1|
| 1|
| 1|
+-----+
`None` is correctly recognized as `null` when used in `when(expr, val).otherwise(None)`.
df.select(when(col("score") >= 3, 1)).show()
+---------------------------------+
|CASE WHEN (score >= 3) THEN 1 END|
+---------------------------------+
| null|
| null|
| 1|
| 1|
| null|
| 1|
+---------------------------------+
df.select(when(col("score") >= 3, 1).otherwise(None)).show()
+-------------------------------------------+
|CASE WHEN (score >= 3) THEN 1 ELSE NULL END|
+-------------------------------------------+
| null|
| null|
| 1|
| 1|
| null|
| 1|
+-------------------------------------------+
any¶
`any` is available only as a SQL function (not as a DataFrame API function) since Spark 3.0, and it works on boolean columns only.
from pyspark.sql.functions import any
---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-9-280aa8841125> in <module>
----> 1 from pyspark.sql.functions import any

ImportError: cannot import name 'any' from 'pyspark.sql.functions' (/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/functions.py)
df.createOrReplaceTempView("df")
spark.sql("select any(x) from df").show()
+------+
|any(x)|
+------+
| true|
+------+
spark.sql("select any(x) from df where score > 2").show()+------+
|any(x)|
+------+
| false|
+------+
spark.sql("select any(y) from df").show()---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-7-1b9dcb466bcc> in <module>
----> 1 spark.sql("select any(y) from df").show()
/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
647 [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
648 """
--> 649 return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
650
651 @since(2.0)
/opt/spark-3.0.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
132 # Hide where the exception came from that shows a non-Pythonic
133 # JVM exception message.
--> 134 raise_from(converted)
135 else:
136 raise
/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/utils.py in raise_from(e)
AnalysisException: cannot resolve 'any(df.`y`)' due to data type mismatch: Input to function 'any' should have been boolean, but it's [bigint].; line 1 pos 7;
'Aggregate [unresolvedalias(any(y#4L), None)]
+- SubqueryAlias df
+- LogicalRDD [fname#0, lname#1, score#2L, x#3, y#4L], false
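Since `any` requires a boolean input, a numeric column such as `y` has to be cast to boolean first. A minimal sketch of the fix (in Spark SQL, a nonzero numeric value casts to true):

spark.sql("select any(cast(y as boolean)) from df").show()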
df.select(
    count("fname").alias("num_first_name"),
    count("lname").alias("num_last_name"),
    sum("score").alias("sum_score"),
).show()
+--------------+-------------+---------+
|num_first_name|num_last_name|sum_score|
+--------------+-------------+---------+
| 6| 6| 20|
+--------------+-------------+---------+
Aggregation Using groupBy¶
You can use a positional alias in GROUP BY in Spark SQL!
df.show()
+-----+-----+-----+-----+---+
|fname|lname|score| x| y|
+-----+-----+-----+-----+---+
| Ben| Du| 0| true| 1|
| Ben| Du| 0| true| 1|
| Ben| Tu| 1|false| 0|
| Ben| Tu| 3|false| 0|
| Ken| Xu| 6|false| 0|
| Ken| Xu| 9|false| 0|
+-----+-----+-----+-----+---+
df.groupBy("lname").agg(sum("y")).show()+-----+------+
|lname|sum(y)|
+-----+------+
| Xu| 0|
| Du| 2|
| Tu| 0|
+-----+------+
df.createOrReplaceTempView("people")
spark.sql("select fname, count(*) as n from people group by 1").show()
+-----+---+
|fname| n|
+-----+---+
| Ben| 4|
| Ken| 2|
+-----+---+
sum¶
`sum` ignores `null` values. When all values are `null`, `sum` returns `null`.
val df = Seq(
("2017-01-01", 1L),
("2017-01-01", 10L),
("2017-02-01", 2L),
("2017-02-01", 22L)
).toDF("date", "value").
withColumn("value", when($"value" > 20, null).otherwise($"value"))
df.show
+----------+-----+
| date|value|
+----------+-----+
|2017-01-01| 1|
|2017-01-01| 10|
|2017-02-01| 2|
|2017-02-01| null|
+----------+-----+
df = [date: string, value: bigint]

df.groupBy("date").agg(sum($"value").alias("s")).show
+----------+---+
| date| s|
+----------+---+
|2017-01-01| 11|
|2017-02-01| 2|
+----------+---+
When all values are null, sum returns null.
import org.apache.spark.sql.functions._
val df = spark.read.json("../data/people.json").
withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
+----+-------+-------+
| age| name|is_null|
+----+-------+-------+
|null|Michael| 1|
| 30| Andy| 0|
| 19| Justin| 0|
+----+-------+-------+
df = [age: bigint, name: string ... 1 more field]

Specify an alias for the aggregated column.

df.groupBy("is_null").agg(sum("age").alias("sage")).show
+-------+----+
|is_null|sage|
+-------+----+
| 1|null|
| 0| 49|
+-------+----+
Group By Multiple Columns¶
df.groupBy("fname", "lname").sum().show+-----+-----+----------+
|fname|lname|sum(score)|
+-----+-----+----------+
| Ben| Du| 3|
| Ken| Xu| 10|
| Ben| Tu| 7|
+-----+-----+----------+
agg¶
import org.apache.spark.sql.functions._
val df = spark.read.json("../../data/people.json").withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
+----+-------+-------+
| age| name|is_null|
+----+-------+-------+
|null|Michael| 1|
| 30| Andy| 0|
| 19| Justin| 0|
+----+-------+-------+
df.groupBy("is_null").agg(sum("age").alias("sage")).show
+-------+----+
|is_null|sage|
+-------+----+
| 1|null|
| 0| 49|
+-------+----+
df.groupBy("is_null").agg(sum("age").alias("sage"), count("*").alias("cnt")).show
+-------+----+---+
|is_null|sage|cnt|
+-------+----+---+
| 1|null| 1|
| 0| 49| 2|
+-------+----+---+
Collection¶
collect_list¶
import org.apache.spark.sql.functions._
val df = Seq(
("Ben", 1),
("Ben" ,2),
("Ben", 3),
("Ken", 1),
("Ken", 9)
).toDF("name", "score")
df.show
+----+-----+
|name|score|
+----+-----+
| Ben| 1|
| Ben| 2|
| Ben| 3|
| Ken| 1|
| Ken| 9|
+----+-----+
val df2 = df.groupBy("name").agg(
collect_list("score").alias("scores")
)
df2.show
+----+---------+
|name| scores|
+----+---------+
| Ben|[1, 2, 3]|
| Ken| [1, 9]|
+----+---------+
df2.printSchema
root
|-- name: string (nullable = true)
|-- scores: array (nullable = true)
| |-- element: integer (containsNull = true)
collect_set¶
val df_copy = Seq(
("Ben", 1),
("Ben", 1),
("Ben" ,2),
("Ben", 3),
("Ken", 1),
("Ken", 9)
).toDF("name", "score")
df_copy.show
val df3 = df_copy.groupBy("name").agg(collect_list("score").alias("scores"))
df3.show()
val df4 = df_copy.groupBy("name").agg(collect_set("score").alias("scores"))
df4.show
+----+-----+
|name|score|
+----+-----+
| Ben| 1|
| Ben| 1|
| Ben| 2|
| Ben| 3|
| Ken| 1|
| Ken| 9|
+----+-----+
+----+------------+
|name| scores|
+----+------------+
| Ben|[1, 1, 2, 3]|
| Ken| [1, 9]|
+----+------------+
+----+---------+
|name| scores|
+----+---------+
| Ben|[1, 2, 3]|
| Ken| [9, 1]|
+----+---------+
First/Last¶
first¶
last¶
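A minimal sketch of `first` and `last`, assuming the `fname`/`lname`/`score` DataFrame created earlier; note that without an explicit ordering, which row is "first" or "last" within a group is not deterministic:

# first/last take the first/last value of the column within each group
df.groupBy("lname").agg(
    first("score").alias("first_score"),
    last("score").alias("last_score"),
).show()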
Grouping¶
grouping¶
grouping_id¶
val df = spark.read.json("../data/people.json")
df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
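A minimal sketch of `grouping` and `grouping_id`, assuming the `fname`/`lname`/`score` DataFrame created earlier; they only make sense together with `cube` or `rollup`, where `grouping(col)` indicates whether `col` is aggregated away in a row and `grouping_id()` packs those indicators into an integer:

df.cube("fname", "lname").agg(
    grouping("fname").alias("g_fname"),
    grouping("lname").alias("g_lname"),
    grouping_id().alias("gid"),
    sum("score").alias("s"),
).orderBy("gid").show()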
Count¶
count¶
import org.apache.spark.sql.functions._
val df = spark.read.json("../../../data/people.json").withColumn("is_null", when($"age".isNull, 1).otherwise(0))
df.show
+----+-------+-------+
| age| name|is_null|
+----+-------+-------+
|null|Michael| 1|
| 30| Andy| 0|
| 19| Justin| 0|
+----+-------+-------+
df = [age: bigint, name: string ... 1 more field]

df.groupBy("is_null").count().show()
+-------+-----+
|is_null|count|
+-------+-----+
| 1| 1|
| 0| 2|
+-------+-----+
df.groupBy("is_null").agg(count("*").as("total")).show+-------+-----+
|is_null|total|
+-------+-----+
| 1| 1|
| 0| 2|
+-------+-----+
df.groupBy("is_null").agg(count(when($"name" === "Andy", 1).otherwise(null))).show+-------+---------------------------------------------------+
|is_null|count(CASE WHEN (name = Andy) THEN 1 ELSE NULL END)|
+-------+---------------------------------------------------+
| 1| 0|
| 0| 1|
+-------+---------------------------------------------------+
df.groupBy("is_null").agg(sum(when($"name" === "Andy", 1).otherwise(0))).show[Stage 41:================================> (44 + 8) / 75]+-------+----------------------------------------------+
|is_null|sum(CASE WHEN (name = Andy) THEN 1 ELSE 0 END)|
+-------+----------------------------------------------+
| 1| 0|
| 0| 1|
+-------+----------------------------------------------+
df.groupBy("is_null").agg(count("*").alias("total")).show+-------+-----+
|is_null|total|
+-------+-----+
| 1| 1|
| 0| 2|
+-------+-----+
countDistinct¶
df.groupBy("is_null").agg(countDistinct("is_null").alias("total")).show+-------+-----+
|is_null|total|
+-------+-----+
| 1| 1|
| 0| 1|
+-------+-----+
approx_count_distinct¶
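A minimal sketch, assuming the DataFrame created earlier; `approx_count_distinct` uses HyperLogLog++, and the optional `rsd` parameter bounds the relative standard deviation of the estimate:

df.select(approx_count_distinct("score", rsd=0.05).alias("approx_n")).show()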
Sum¶
sum¶
sumDistinct¶
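A minimal sketch contrasting `sum` with `sumDistinct`, using the `y` column of the DataFrame created earlier:

df.select(
    sum("y").alias("sum_y"),                   # sums every row: 2
    sumDistinct("y").alias("sum_distinct_y"),  # sums the distinct values 0 and 1: 1
).show()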
Extreme Values¶
max¶
min¶
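A minimal sketch of `max` and `min`, assuming the DataFrame created earlier:

df.select(max("score").alias("max_score"), min("score").alias("min_score")).show()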
Mean/Average¶
avg¶
How does average behave on null values?
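Like `sum`, `avg` ignores `null` values (they are excluded from both the numerator and the denominator) and returns `null` when all values are null. A minimal sketch (the DataFrame and names here are illustrative):

df_nulls = spark.createDataFrame([(1.0,), (3.0,), (None,)], ["v"])
df_nulls.select(avg("v")).show()  # 2.0, not 4/3: the null row is ignored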
mean¶
Standard Deviation¶
stddev¶
stddev_pop¶
stddev_samp¶
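`stddev` is an alias of `stddev_samp` (sample standard deviation), while `stddev_pop` computes the population standard deviation. A minimal sketch, assuming the DataFrame created earlier:

df.select(
    stddev("score").alias("sd"),
    stddev_samp("score").alias("sd_samp"),  # same as stddev
    stddev_pop("score").alias("sd_pop"),
).show()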
Variance¶
var_pop¶
var_samp¶
variance¶
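Similarly, `variance` is an alias of `var_samp` (sample variance), while `var_pop` computes the population variance. A minimal sketch:

df.select(
    variance("score").alias("var"),
    var_samp("score").alias("v_samp"),  # same as variance
    var_pop("score").alias("v_pop"),
).show()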
Correlation & Covariance¶
corr¶
covar_pop¶
covar_samp¶
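A minimal sketch, assuming the DataFrame created earlier; `corr` computes the Pearson correlation coefficient of two columns, and `covar_samp`/`covar_pop` the sample/population covariance:

df.select(
    corr("score", "y").alias("corr_score_y"),
    covar_samp("score", "y").alias("cov_samp"),
    covar_pop("score", "y").alias("cov_pop"),
).show()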
Skewness & Kurtosis¶
skewness¶
kurtosis¶
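A minimal sketch of `skewness` and `kurtosis` (Spark returns excess kurtosis, i.e., 0 for a normal distribution), assuming the DataFrame created earlier:

df.select(skewness("score").alias("skew"), kurtosis("score").alias("kurt")).show()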