Things on this page are fragmentary and immature notes/thoughts of the author. Please read with your own judgement!
Window with orderBy¶
It is tricky!!!
If you provide an ORDER BY clause, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
Avoid using last; use first with a descending orderBy instead. This gives fewer surprises. Do NOT use order by if it is not necessary; it introduces unnecessary sorting.
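For concreteness, here is the same point written out as PySpark window specs. This is only a sketch of the spec construction; it assumes a running SparkSession (as created below), and the column names name and id are placeholders:

from pyspark.sql import Window

# With orderBy, Spark implicitly applies the frame
# RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
w_default = Window.partitionBy("name").orderBy("id")

# The same default, spelled out explicitly:
w_explicit = (
    Window.partitionBy("name")
    .orderBy("id")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

# If you do need the ordering but want aggregates over the whole
# partition, widen the frame yourself:
w_whole = (
    Window.partitionBy("name")
    .orderBy("id")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)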
import findspark
findspark.init("/opt/spark")
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType
spark = SparkSession.builder.appName("PySpark").enableHiveSupport().getOrCreate()
from pyspark.sql import Window
import pandas as pd

cust_p = pd.DataFrame(
data=[
("Alice", "2016-05-01", 50.00, 1),
("Alice", "2016-05-01", 45.00, 2),
("Alice", "2016-05-02", 55.00, 3),
("Alice", "2016-05-02", 100.00, 4),
("Bob", "2016-05-01", 25.00, 5),
("Bob", "2016-05-01", 29.00, 6),
("Bob", "2016-05-02", 27.00, 7),
("Bob", "2016-05-02", 30.00, 8),
],
columns=("name", "date", "amount", "id"),
)
cust = spark.createDataFrame(cust_p)
cust.show()
+-----+----------+------+---+
| name| date|amount| id|
+-----+----------+------+---+
|Alice|2016-05-01| 50.0| 1|
|Alice|2016-05-01| 45.0| 2|
|Alice|2016-05-02| 55.0| 3|
|Alice|2016-05-02| 100.0| 4|
| Bob|2016-05-01| 25.0| 5|
| Bob|2016-05-01| 29.0| 6|
| Bob|2016-05-02| 27.0| 7|
| Bob|2016-05-02| 30.0| 8|
+-----+----------+------+---+
cust.orderBy("name", "date").show()
+-----+----------+------+---+
| name| date|amount| id|
+-----+----------+------+---+
|Alice|2016-05-01| 50.0| 1|
|Alice|2016-05-01| 45.0| 2|
|Alice|2016-05-02| 55.0| 3|
|Alice|2016-05-02| 100.0| 4|
| Bob|2016-05-01| 25.0| 5|
| Bob|2016-05-01| 29.0| 6|
| Bob|2016-05-02| 27.0| 7|
| Bob|2016-05-02| 30.0| 8|
+-----+----------+------+---+
Create a temp view for testing Spark SQL (to compare with the results of the PySpark DataFrame API).
cust.createOrReplaceTempView("customers")
max¶
max works well with over (partition by ...) when order by is not used.
cust.select(
col("name"),
col("date"),
col("amount"),
col("id"),
max(col("amount")).over(Window.partitionBy("name", "date")).alias("max_amount"),
).orderBy("name", "date").show()
+-----+----------+------+---+----------+
| name| date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 50.0|
|Alice|2016-05-02| 55.0| 3| 100.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
| Bob|2016-05-01| 25.0| 5| 29.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-02| 27.0| 7| 30.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
+-----+----------+------+---+----------+
spark.sql(
"""
select
name,
date,
amount,
id,
max(amount) over (partition by name, date) as max_amount
from
customers
"""
).orderBy("name", "date").show()
+-----+----------+------+---+----------+
| name| date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 50.0|
|Alice|2016-05-02| 55.0| 3| 100.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
| Bob|2016-05-01| 25.0| 5| 29.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-02| 27.0| 7| 30.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
+-----+----------+------+---+----------+
The following results might surprise people. There is nothing wrong with the code; it is just that when order by is used, the default frame for window functions (max in this case) is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
cust.select(
col("name"),
col("date"),
col("amount"),
col("id"),
max(col("amount"))
.over(Window.partitionBy("name", "date").orderBy("id"))
.alias("max_amount"),
).orderBy("name", "date").show()
+-----+----------+------+---+----------+
| name| date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 50.0|
|Alice|2016-05-02| 55.0| 3| 55.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
| Bob|2016-05-01| 25.0| 5| 25.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-02| 27.0| 7| 27.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
+-----+----------+------+---+----------+
spark.sql(
"""
select
name,
date,
amount,
id,
max(amount) over (partition by name, date order by id) as max_amount
from
customers
"""
).orderBy("name", "date").show()
+-----+----------+------+---+----------+
| name| date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 50.0|
|Alice|2016-05-02| 55.0| 3| 55.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
| Bob|2016-05-01| 25.0| 5| 25.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-02| 27.0| 7| 27.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
+-----+----------+------+---+----------+
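One way to keep the ordering but still get the whole-partition max is to widen the frame explicitly. A minimal sketch, assuming the cust DataFrame and spark session from above:

from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import max as max_

# Keep the ordering, but extend the frame to the whole partition.
w_whole = (
    Window.partitionBy("name", "date")
    .orderBy("id")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
cust.select(
    "name",
    "date",
    "amount",
    "id",
    max_(col("amount")).over(w_whole).alias("max_amount"),
).orderBy("name", "date").show()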
rank¶
The window function rank requires order by to be used.
cust.select(
col("name"),
col("date"),
col("amount"),
col("id"),
rank().over(Window.partitionBy("name", "date").orderBy("amount")).alias("rank"),
).orderBy("name", "date").show()
+-----+----------+------+---+----+
| name| date|amount| id|rank|
+-----+----------+------+---+----+
|Alice|2016-05-01| 45.0| 2| 1|
|Alice|2016-05-01| 50.0| 1| 2|
|Alice|2016-05-02| 55.0| 3| 1|
|Alice|2016-05-02| 100.0| 4| 2|
| Bob|2016-05-01| 25.0| 5| 1|
| Bob|2016-05-01| 29.0| 6| 2|
| Bob|2016-05-02| 27.0| 7| 1|
| Bob|2016-05-02| 30.0| 8| 2|
+-----+----------+------+---+----+
spark.sql(
"""
select
name,
date,
amount,
id,
rank() over (partition by name, date ORDER BY amount DESC) as rank
from
customers
"""
).orderBy("name", "date").show()
+-----+----------+------+---+----+
| name| date|amount| id|rank|
+-----+----------+------+---+----+
|Alice|2016-05-01| 50.0| 1| 1|
|Alice|2016-05-01| 45.0| 2| 2|
|Alice|2016-05-02| 100.0| 4| 1|
|Alice|2016-05-02| 55.0| 3| 2|
| Bob|2016-05-01| 29.0| 6| 1|
| Bob|2016-05-01| 25.0| 5| 2|
| Bob|2016-05-02| 30.0| 8| 1|
| Bob|2016-05-02| 27.0| 7| 2|
+-----+----------+------+---+----+
cust.select(
col("name"),
col("date"),
col("amount"),
col("id"),
rank().over(Window.partitionBy("name").orderBy("date")).alias("rank"),
).orderBy("name").show()
+-----+----------+------+---+----+
| name| date|amount| id|rank|
+-----+----------+------+---+----+
|Alice|2016-05-01| 50.0| 1| 1|
|Alice|2016-05-01| 45.0| 2| 1|
|Alice|2016-05-02| 55.0| 3| 3|
|Alice|2016-05-02| 100.0| 4| 3|
| Bob|2016-05-01| 25.0| 5| 1|
| Bob|2016-05-02| 30.0| 8| 3|
| Bob|2016-05-01| 29.0| 6| 1|
| Bob|2016-05-02| 27.0| 7| 3|
+-----+----------+------+---+----+
spark.sql(
"""
select
name,
date,
amount,
id,
rank() over (partition by name ORDER BY date DESC) as rank
from
customers
"""
).orderBy("name").show()
+-----+----------+------+---+----+
| name| date|amount| id|rank|
+-----+----------+------+---+----+
|Alice|2016-05-02| 55.0| 3| 1|
|Alice|2016-05-02| 100.0| 4| 1|
|Alice|2016-05-01| 50.0| 1| 3|
|Alice|2016-05-01| 45.0| 2| 3|
| Bob|2016-05-02| 27.0| 7| 1|
| Bob|2016-05-02| 30.0| 8| 1|
| Bob|2016-05-01| 25.0| 5| 3|
| Bob|2016-05-01| 29.0| 6| 3|
+-----+----------+------+---+----+
dense_rank¶
cust.select(
col("name"),
col("date"),
col("amount"),
col("id"),
dense_rank().over(Window.partitionBy("name").orderBy("date")).alias("dense_rank"),
).orderBy("name").show()
+-----+----------+------+---+----------+
| name| date|amount| id|dense_rank|
+-----+----------+------+---+----------+
|Alice|2016-05-01| 50.0| 1| 1|
|Alice|2016-05-01| 45.0| 2| 1|
|Alice|2016-05-02| 55.0| 3| 2|
|Alice|2016-05-02| 100.0| 4| 2|
| Bob|2016-05-01| 25.0| 5| 1|
| Bob|2016-05-01| 29.0| 6| 1|
| Bob|2016-05-02| 27.0| 7| 2|
| Bob|2016-05-02| 30.0| 8| 2|
+-----+----------+------+---+----------+
spark.sql(
"""
select
name,
date,
amount,
id,
dense_rank() over (partition by name ORDER BY date DESC) as rank
from
customers
"""
).orderBy("name").show()
+-----+----------+------+---+----+
| name| date|amount| id|rank|
+-----+----------+------+---+----+
|Alice|2016-05-02| 55.0| 3| 1|
|Alice|2016-05-02| 100.0| 4| 1|
|Alice|2016-05-01| 50.0| 1| 2|
|Alice|2016-05-01| 45.0| 2| 2|
| Bob|2016-05-02| 27.0| 7| 1|
| Bob|2016-05-02| 30.0| 8| 1|
| Bob|2016-05-01| 25.0| 5| 2|
| Bob|2016-05-01| 29.0| 6| 2|
+-----+----------+------+---+----+
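To see how the three ranking functions differ on ties, here is a sketch computing rank, dense_rank, and row_number side by side, assuming the cust DataFrame from the PySpark setup above:

from pyspark.sql import Window
from pyspark.sql.functions import dense_rank, rank, row_number

w = Window.partitionBy("name").orderBy("date")
cust.select(
    "name",
    "date",
    "id",
    rank().over(w).alias("rank"),              # gaps after ties: 1, 1, 3, 3
    dense_rank().over(w).alias("dense_rank"),  # no gaps: 1, 1, 2, 2
    row_number().over(w).alias("row_number"),  # arbitrary tie-break: 1, 2, 3, 4
).orderBy("name", "id").show()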
first¶
customers.select(
$"name",
$"date",
$"amount",
$"id",
first($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("first_amount")
).orderBy("name", "date").show
+-----+----------+------+---+------------+
| name| date|amount| id|first_amount|
+-----+----------+------+---+------------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 50.0|
|Alice|2016-05-02| 55.0| 3| 55.0|
|Alice|2016-05-02| 100.0| 4| 55.0|
| Bob|2016-05-01| 25.0| 5| 25.0|
| Bob|2016-05-01| 29.0| 6| 25.0|
| Bob|2016-05-02| 27.0| 7| 27.0|
| Bob|2016-05-02| 30.0| 8| 27.0|
+-----+----------+------+---+------------+
spark.sql(
"""
select
name,
date,
amount,
id,
first(amount) over (partition by name, date order by id) as first_amount
from
customers
"""
).orderBy("name", "date").show
+-----+----------+------+---+------------+
| name| date|amount| id|first_amount|
+-----+----------+------+---+------------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 50.0|
|Alice|2016-05-02| 55.0| 3| 55.0|
|Alice|2016-05-02| 100.0| 4| 55.0|
| Bob|2016-05-01| 25.0| 5| 25.0|
| Bob|2016-05-01| 29.0| 6| 25.0|
| Bob|2016-05-02| 27.0| 7| 27.0|
| Bob|2016-05-02| 30.0| 8| 27.0|
+-----+----------+------+---+------------+
customers.select(
$"name",
$"date",
$"amount",
$"id",
last($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("last_amount")
).orderBy("name", "date").show
+-----+----------+------+---+-----------+
| name| date|amount| id|last_amount|
+-----+----------+------+---+-----------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 45.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
|Alice|2016-05-02| 55.0| 3| 55.0|
| Bob|2016-05-01| 25.0| 5| 25.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-02| 27.0| 7| 27.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
+-----+----------+------+---+-----------+
spark.sql(
"""
select
name,
date,
amount,
id,
last(amount) over (partition by name, date order by id) as last_amount
from
customers
"""
).orderBy("name", "date").show
+-----+----------+------+---+-----------+
| name| date|amount| id|last_amount|
+-----+----------+------+---+-----------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 45.0|
|Alice|2016-05-02| 55.0| 3| 55.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
| Bob|2016-05-01| 25.0| 5| 25.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-02| 27.0| 7| 27.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
+-----+----------+------+---+-----------+
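If you do want last itself to behave intuitively, widening the frame to the end of the partition also works. A sketch in PySpark, assuming the cust DataFrame from the setup above:

from pyspark.sql import Window
from pyspark.sql.functions import col, last

# With an unbounded frame, last sees the whole partition,
# so it returns the true last value per (name, date).
w = (
    Window.partitionBy("name", "date")
    .orderBy("id")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
cust.select(
    "name",
    "date",
    "amount",
    "id",
    last(col("amount")).over(w).alias("last_amount"),
).orderBy("name", "date").show()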
customers.select(
$"name",
$"date",
$"amount",
$"id",
first($"amount").over(Window.partitionBy("name", "date").orderBy($"id".desc)).alias("last_amount")
).orderBy("name", "date").show
+-----+----------+------+---+-----------+
| name| date|amount| id|last_amount|
+-----+----------+------+---+-----------+
|Alice|2016-05-01| 45.0| 2| 45.0|
|Alice|2016-05-01| 50.0| 1| 45.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
|Alice|2016-05-02| 55.0| 3| 100.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-01| 25.0| 5| 29.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
| Bob|2016-05-02| 27.0| 7| 30.0|
+-----+----------+------+---+-----------+
spark.sql(
"""
select
name,
date,
amount,
id,
first(amount) over (partition by name, date order by id desc) as last_amount
from
customers
"""
).orderBy("name", "date").show
+-----+----------+------+---+-----------+
| name| date|amount| id|last_amount|
+-----+----------+------+---+-----------+
|Alice|2016-05-01| 45.0| 2| 45.0|
|Alice|2016-05-01| 50.0| 1| 45.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
|Alice|2016-05-02| 55.0| 3| 100.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-01| 25.0| 5| 29.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
| Bob|2016-05-02| 27.0| 7| 30.0|
+-----+----------+------+---+-----------+
partition by with group by¶
Avoid doing so!!! Window functions are evaluated after group by, so every column they reference must be a grouping key or wrapped in an aggregate.
spark.sql(
"""
select
name,
date,
first(amount) over (partition by name, date order by id desc) as last_amount
from
customers
group by
name, date
"""
).orderBy("name", "date").show
org.apache.spark.sql.AnalysisException: expression 'customers.`amount`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Project [name#9, date#10, last_amount#30]
+- Project [name#9, date#10, amount#11, id#12, last_amount#30, last_amount#30]
+- Window [first(amount#11, false) windowspecdefinition(name#9, date#10, id#12 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_amount#30], [name#9, date#10], [id#12 DESC NULLS LAST]
+- Aggregate [name#9, date#10], [name#9, date#10, amount#11, id#12]
+- SubqueryAlias `customers`
+- Project [_1#4 AS name#9, _2#5 AS date#10, _3#6 AS amount#11, _4#7 AS id#12]
+- LocalRelation [_1#4, _2#5, _3#6, _4#7]
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:224)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$10.apply(CheckAnalysis.scala:257)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$10.apply(CheckAnalysis.scala:257)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:257)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
... 48 elided
spark.sql(
"""
select
name,
date,
first(max(amount)) over (partition by name, date order by id desc) as last_amount
from
customers
group by
name, date
"""
).orderBy("name", "date").show
org.apache.spark.sql.AnalysisException: expression 'customers.`id`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Project [name#9, date#10, last_amount#36]
+- Project [name#9, date#10, _w0#39, id#12, last_amount#36, last_amount#36]
+- Window [first(_w0#39, false) windowspecdefinition(name#9, date#10, id#12 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_amount#36], [name#9, date#10], [id#12 DESC NULLS LAST]
+- Aggregate [name#9, date#10], [name#9, date#10, max(amount#11) AS _w0#39, id#12]
+- SubqueryAlias `customers`
+- Project [_1#4 AS name#9, _2#5 AS date#10, _3#6 AS amount#11, _4#7 AS id#12]
+- LocalRelation [_1#4, _2#5, _3#6, _4#7]
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:224)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$10.apply(CheckAnalysis.scala:257)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$10.apply(CheckAnalysis.scala:257)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:257)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
... 48 elided
customers.orderBy("name", "date").show
+-----+----------+------+---+
| name| date|amount| id|
+-----+----------+------+---+
|Alice|2016-05-01| 50.0| 1|
|Alice|2016-05-01| 45.0| 2|
|Alice|2016-05-02| 55.0| 3|
|Alice|2016-05-02| 100.0| 4|
| Bob|2016-05-01| 29.0| 6|
| Bob|2016-05-01| 25.0| 5|
| Bob|2016-05-02| 27.0| 7|
| Bob|2016-05-02| 30.0| 8|
+-----+----------+------+---+
spark.sql(
"""
select
name,
date,
amount,
id,
row_number() over (partition by name, date order by id desc) as rownum
from
customers
"""
).orderBy("name", "date").show
+-----+----------+------+---+------+
| name| date|amount| id|rownum|
+-----+----------+------+---+------+
|Alice|2016-05-01| 45.0| 2| 1|
|Alice|2016-05-01| 50.0| 1| 2|
|Alice|2016-05-02| 100.0| 4| 1|
|Alice|2016-05-02| 55.0| 3| 2|
| Bob|2016-05-01| 29.0| 6| 1|
| Bob|2016-05-01| 25.0| 5| 2|
| Bob|2016-05-02| 30.0| 8| 1|
| Bob|2016-05-02| 27.0| 7| 2|
+-----+----------+------+---+------+
spark.sql(
"""
select
*
from (
select
name,
date,
amount,
id,
row_number() over (partition by name, date order by id desc) as rownum
from
customers
) A
where
rownum = 1
"""
).orderBy("name", "date").show
+-----+----------+------+---+------+
| name| date|amount| id|rownum|
+-----+----------+------+---+------+
|Alice|2016-05-01| 45.0| 2| 1|
|Alice|2016-05-02| 100.0| 4| 1|
| Bob|2016-05-01| 29.0| 6| 1|
| Bob|2016-05-02| 30.0| 8| 1|
+-----+----------+------+---+------+
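The same keep-one-row-per-group trick in the DataFrame API; a sketch assuming the cust DataFrame from the PySpark setup above:

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Rank rows within each (name, date) by descending id,
# then keep only the top row of each partition.
w = Window.partitionBy("name", "date").orderBy(col("id").desc())
(
    cust.withColumn("rownum", row_number().over(w))
    .where(col("rownum") == 1)
    .orderBy("name", "date")
    .show()
)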
Define a window partition. It does not have to be associated with a table.
Window.partitionBy("col1", "col2")
org.apache.spark.sql.expressions.WindowSpec@4f336bdc
val customers = Seq(
("Alice", "2016-05-01", 50.00, 1),
("Alice", "2016-05-01", 45.00, 2),
("Alice", "2016-05-02", 55.00, 3),
("Alice", "2016-05-02", 100.00, 4),
("Bob", "2016-05-01", 25.00, 5),
("Bob", "2016-05-01", 29.00, 6),
("Bob", "2016-05-02", 27.00,7 ),
("Bob", "2016-05-02", 30.00, 8)
).toDF("name", "date", "amount", "id")
customers.show
+-----+----------+------+---+
| name| date|amount| id|
+-----+----------+------+---+
|Alice|2016-05-01| 50.0| 1|
|Alice|2016-05-01| 45.0| 2|
|Alice|2016-05-02| 55.0| 3|
|Alice|2016-05-02| 100.0| 4|
| Bob|2016-05-01| 25.0| 5|
| Bob|2016-05-01| 29.0| 6|
| Bob|2016-05-02| 27.0| 7|
| Bob|2016-05-02| 30.0| 8|
+-----+----------+------+---+
val winSpec = Window.partitionBy("name", "date")
customers.select(
$"name",
$"date",
$"amount",
$"id",
avg($"amount").over(winSpec).alias("avg_amt"),
max($"id").over(winSpec).alias("max_id")
).show
+-----+----------+------+---+-------+------+
| name| date|amount| id|avg_amt|max_id|
+-----+----------+------+---+-------+------+
| Bob|2016-05-01| 25.0| 5| 27.0| 6|
| Bob|2016-05-01| 29.0| 6| 27.0| 6|
| Bob|2016-05-02| 27.0| 7| 28.5| 8|
| Bob|2016-05-02| 30.0| 8| 28.5| 8|
|Alice|2016-05-02| 55.0| 3| 77.5| 4|
|Alice|2016-05-02| 100.0| 4| 77.5| 4|
|Alice|2016-05-01| 50.0| 1| 47.5| 2|
|Alice|2016-05-01| 45.0| 2| 47.5| 2|
+-----+----------+------+---+-------+------+
val winSpec = Window.partitionBy("name", "date")
customers.select(
$"name",
$"date",
$"amount",
$"id",
(avg($"amount").over(winSpec) + max($"id").over(winSpec) * 100).alias("new_column")
).show
+-----+----------+------+---+----------+
| name| date|amount| id|new_column|
+-----+----------+------+---+----------+
| Bob|2016-05-01| 25.0| 5| 627.0|
| Bob|2016-05-01| 29.0| 6| 627.0|
| Bob|2016-05-02| 27.0| 7| 828.5|
| Bob|2016-05-02| 30.0| 8| 828.5|
|Alice|2016-05-02| 55.0| 3| 477.5|
|Alice|2016-05-02| 100.0| 4| 477.5|
|Alice|2016-05-01| 50.0| 1| 247.5|
|Alice|2016-05-01| 45.0| 2| 247.5|
+-----+----------+------+---+----------+
customers.withColumn("avg",
avg($"amount").over(Window.partitionBy("name", "date"))
).show()
+-----+----------+------+----+
| name| date|amount| avg|
+-----+----------+------+----+
| Bob|2016-05-01| 25.0|27.0|
| Bob|2016-05-01| 29.0|27.0|
| Bob|2016-05-02| 27.0|28.5|
| Bob|2016-05-02| 30.0|28.5|
|Alice|2016-05-02| 55.0|77.5|
|Alice|2016-05-02| 100.0|77.5|
|Alice|2016-05-01| 50.0|47.5|
|Alice|2016-05-01| 45.0|47.5|
+-----+----------+------+----+
val customers = Seq(
("Alice", "2016-05-01", 50.00),
("Alice", "2016-05-03", 45.00),
("Alice", "2016-05-04", 55.00),
("Bob", "2016-05-01", 25.00),
("Bob", "2016-05-04", 29.00),
("Bob", "2016-05-06", 27.00)
).toDF("name", "date", "amountSpent")
customers.show
+-----+----------+-----------+
| name| date|amountSpent|
+-----+----------+-----------+
|Alice|2016-05-01| 50.0|
|Alice|2016-05-03| 45.0|
|Alice|2016-05-04| 55.0|
| Bob|2016-05-01| 25.0|
| Bob|2016-05-04| 29.0|
| Bob|2016-05-06| 27.0|
+-----+----------+-----------+
Moving Average¶
val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
customers.withColumn("movingAvg", avg(customers("amountSpent")).over(wSpec1)).show()
+-----+----------+-----------+---------+
| name| date|amountSpent|movingAvg|
+-----+----------+-----------+---------+
| Bob|2016-05-01| 25.0| 27.0|
| Bob|2016-05-04| 29.0| 27.0|
| Bob|2016-05-06| 27.0| 28.0|
|Alice|2016-05-01| 50.0| 47.5|
|Alice|2016-05-03| 45.0| 50.0|
|Alice|2016-05-04| 55.0| 50.0|
+-----+----------+-----------+---------+
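For reference, the same centered 3-row moving average in PySpark; this sketch rebuilds the small dataset inline and only assumes the spark session from the setup above:

from pyspark.sql import Window
from pyspark.sql.functions import avg

df = spark.createDataFrame(
    [
        ("Alice", "2016-05-01", 50.00),
        ("Alice", "2016-05-03", 45.00),
        ("Alice", "2016-05-04", 55.00),
        ("Bob", "2016-05-01", 25.00),
        ("Bob", "2016-05-04", 29.00),
        ("Bob", "2016-05-06", 27.00),
    ],
    ["name", "date", "amountSpent"],
)
# rowsBetween(-1, 1): the previous row, the current row, and the next row.
w = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
df.withColumn("movingAvg", avg("amountSpent").over(w)).show()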
Cumulative Sum¶
val wSpec2 = Window.partitionBy("name").orderBy("date").rowsBetween(Long.MinValue, 0)
customers.withColumn("cumSum", sum(customers("amountSpent")).over(wSpec2)).show()
+-----+----------+-----------+------+
| name| date|amountSpent|cumSum|
+-----+----------+-----------+------+
| Bob|2016-05-01| 25.0| 25.0|
| Bob|2016-05-04| 29.0| 54.0|
| Bob|2016-05-06| 27.0| 81.0|
|Alice|2016-05-01| 50.0| 50.0|
|Alice|2016-05-03| 45.0| 95.0|
|Alice|2016-05-04| 55.0| 150.0|
+-----+----------+-----------+------+
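Rather than the raw Long.MinValue above, the named boundaries read better. A PySpark sketch of the same cumulative sum, reusing the df built in the previous snippet:

from pyspark.sql import Window
from pyspark.sql.functions import sum as sum_

# Window.unboundedPreceding / Window.currentRow name the same frame
# as (Long.MinValue, 0) above.
w = (
    Window.partitionBy("name")
    .orderBy("date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.withColumn("cumSum", sum_("amountSpent").over(w)).show()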
Data from previous row¶
val wSpec3 = Window.partitionBy("name").orderBy("date")
customers.withColumn(
"prevAmountSpent", lag(customers("amountSpent"), 1).over(wSpec3)
).show()
+-----+----------+-----------+---------------+
| name| date|amountSpent|prevAmountSpent|
+-----+----------+-----------+---------------+
| Bob|2016-05-01| 25.0| null|
| Bob|2016-05-04| 29.0| 25.0|
| Bob|2016-05-06| 27.0| 29.0|
|Alice|2016-05-01| 50.0| null|
|Alice|2016-05-03| 45.0| 50.0|
|Alice|2016-05-04| 55.0| 45.0|
+-----+----------+-----------+---------------+
row_number¶
val wSpec3 = Window.partitionBy("name").orderBy("date")
customers.withColumn("row_num",
row_number().over(wSpec3)
).show()
+-----+----------+-----------+-------+
| name| date|amountSpent|row_num|
+-----+----------+-----------+-------+
| Bob|2016-05-01| 25.0| 1|
| Bob|2016-05-04| 29.0| 2|
| Bob|2016-05-06| 27.0| 3|
|Alice|2016-05-01| 50.0| 1|
|Alice|2016-05-03| 45.0| 2|
|Alice|2016-05-04| 55.0| 3|
+-----+----------+-----------+-------+
percentRank¶
ntile¶
first¶
last¶
lag¶
lead¶
cume_dist¶
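A minimal combined sketch of these remaining functions over one ordered window, assuming the cust DataFrame from the PySpark setup above (first and last are covered in their own sections):

from pyspark.sql import Window
from pyspark.sql.functions import cume_dist, lag, lead, ntile, percent_rank

w = Window.partitionBy("name").orderBy("id")
cust.select(
    "name",
    "date",
    "amount",
    "id",
    percent_rank().over(w).alias("percent_rank"),
    ntile(2).over(w).alias("ntile"),
    lag("amount", 1).over(w).alias("lag"),    # value from the previous row
    lead("amount", 1).over(w).alias("lead"),  # value from the next row
    cume_dist().over(w).alias("cume_dist"),
).orderBy("name", "id").show()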
val customers = Seq(
("Alice", "2016-05-01", 50.00, 1),
("Alice", "2016-05-01", 45.00, 2),
("Alice", "2016-05-02", 55.00, 3),
("Alice", "2016-05-02", 100.00, 4),
("Bob", "2016-05-01", 25.00, 5),
("Bob", "2016-05-01", 29.00, 6),
("Bob", "2016-05-02", 27.00,7 ),
("Bob", "2016-05-02", 30.00, 8)
).toDF("name", "date", "amount", "id")
customers.orderBy("name", "date").show
+-----+----------+------+---+
| name| date|amount| id|
+-----+----------+------+---+
|Alice|2016-05-01| 50.0| 1|
|Alice|2016-05-01| 45.0| 2|
|Alice|2016-05-02| 55.0| 3|
|Alice|2016-05-02| 100.0| 4|
| Bob|2016-05-01| 29.0| 6|
| Bob|2016-05-01| 25.0| 5|
| Bob|2016-05-02| 27.0| 7|
| Bob|2016-05-02| 30.0| 8|
+-----+----------+------+---+
customers.createOrReplaceTempView("customers")
Comment¶
Do NOT use orderBy if the order does not matter!!! It adds a sort within every partition and silently changes the default frame.
val wSpec = Window.partitionBy("name", "date").orderBy("id")
org.apache.spark.sql.expressions.WindowSpec@4f418d5a
customers.select(
$"name",
$"date",
$"amount",
$"id",
max($"amount").over(Window.partitionBy("name", "date")).alias("max_amount")
).orderBy("name", "date").show
+-----+----------+------+---+----------+
| name| date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 50.0|
|Alice|2016-05-02| 55.0| 3| 100.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
| Bob|2016-05-01| 25.0| 5| 29.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-02| 27.0| 7| 30.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
+-----+----------+------+---+----------+
spark.sql(
"""
select
name,
date,
amount,
id,
max(amount) over (partition by name, date) as max_amount
from
customers
"""
).orderBy("name", "date").show
+-----+----------+------+---+----------+
| name| date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 50.0|
|Alice|2016-05-02| 55.0| 3| 100.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
| Bob|2016-05-01| 25.0| 5| 29.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-02| 27.0| 7| 30.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
+-----+----------+------+---+----------+
customers.select(
$"name",
$"date",
$"amount",
$"id",
max($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("max_amount")
).orderBy("name", "date").show
+-----+----------+------+---+----------+
| name| date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 50.0|
|Alice|2016-05-02| 55.0| 3| 55.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
| Bob|2016-05-01| 25.0| 5| 25.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-02| 27.0| 7| 27.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
+-----+----------+------+---+----------+
spark.sql(
"""
select
name,
date,
amount,
id,
max(amount) over (partition by name, date order by id) as max_amount
from
customers
"""
).orderBy("name", "date").show
+-----+----------+------+---+----------+
| name| date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 50.0|
|Alice|2016-05-02| 55.0| 3| 55.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
| Bob|2016-05-01| 25.0| 5| 25.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-02| 27.0| 7| 27.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
+-----+----------+------+---+----------+
customers.select(
$"name",
$"date",
$"amount",
$"id",
first($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("first_amount")
).orderBy("name", "date").show
+-----+----------+------+---+------------+
| name| date|amount| id|first_amount|
+-----+----------+------+---+------------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 50.0|
|Alice|2016-05-02| 55.0| 3| 55.0|
|Alice|2016-05-02| 100.0| 4| 55.0|
| Bob|2016-05-01| 25.0| 5| 25.0|
| Bob|2016-05-01| 29.0| 6| 25.0|
| Bob|2016-05-02| 27.0| 7| 27.0|
| Bob|2016-05-02| 30.0| 8| 27.0|
+-----+----------+------+---+------------+
spark.sql(
"""
select
name,
date,
amount,
id,
first(amount) over (partition by name, date order by id) as first_amount
from
customers
"""
).orderBy("name", "date").show
+-----+----------+------+---+------------+
| name| date|amount| id|first_amount|
+-----+----------+------+---+------------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 50.0|
|Alice|2016-05-02| 55.0| 3| 55.0|
|Alice|2016-05-02| 100.0| 4| 55.0|
| Bob|2016-05-01| 25.0| 5| 25.0|
| Bob|2016-05-01| 29.0| 6| 25.0|
| Bob|2016-05-02| 27.0| 7| 27.0|
| Bob|2016-05-02| 30.0| 8| 27.0|
+-----+----------+------+---+------------+
customers.select(
$"name",
$"date",
$"amount",
$"id",
last($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("last_amount")
).orderBy("name", "date").show
+-----+----------+------+---+-----------+
| name| date|amount| id|last_amount|
+-----+----------+------+---+-----------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 45.0|
|Alice|2016-05-02| 55.0| 3| 55.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
| Bob|2016-05-01| 25.0| 5| 25.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-02| 27.0| 7| 27.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
+-----+----------+------+---+-----------+
spark.sql(
"""
select
name,
date,
amount,
id,
last(amount) over (partition by name, date order by id) as last_amount
from
customers
"""
).orderBy("name", "date").show
+-----+----------+------+---+-----------+
| name| date|amount| id|last_amount|
+-----+----------+------+---+-----------+
|Alice|2016-05-01| 50.0| 1| 50.0|
|Alice|2016-05-01| 45.0| 2| 45.0|
|Alice|2016-05-02| 55.0| 3| 55.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
| Bob|2016-05-01| 25.0| 5| 25.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-02| 27.0| 7| 27.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
+-----+----------+------+---+-----------+
spark.sql(
"""
select
name,
date,
amount,
id,
first(amount) over (partition by name, date order by id desc) as first_amount
from
customers
"""
).orderBy("name", "date").show
+-----+----------+------+---+------------+
| name| date|amount| id|first_amount|
+-----+----------+------+---+------------+
|Alice|2016-05-01| 45.0| 2| 45.0|
|Alice|2016-05-01| 50.0| 1| 45.0|
|Alice|2016-05-02| 100.0| 4| 100.0|
|Alice|2016-05-02| 55.0| 3| 100.0|
| Bob|2016-05-01| 29.0| 6| 29.0|
| Bob|2016-05-01| 25.0| 5| 29.0|
| Bob|2016-05-02| 30.0| 8| 30.0|
| Bob|2016-05-02| 27.0| 7| 30.0|
+-----+----------+------+---+------------+
val customers = Seq(
("Alice", "1", "2016-05-01", 50.00),
("Alice", "1", "2016-05-03", 45.00),
("Alice", "2", "2016-05-04", 55.00),
("Bob", "2", "2016-05-01", 25.00),
("Bob", "2", "2016-05-04", 29.00),
("Bob", "2", "2016-05-06", 27.00)
).toDF("name", "group", "date", "amountSpent")
customers.show
+-----+-----+----------+-----------+
| name|group| date|amountSpent|
+-----+-----+----------+-----------+
|Alice| 1|2016-05-01| 50.0|
|Alice| 1|2016-05-03| 45.0|
|Alice| 2|2016-05-04| 55.0|
| Bob| 2|2016-05-01| 25.0|
| Bob| 2|2016-05-04| 29.0|
| Bob| 2|2016-05-06| 27.0|
+-----+-----+----------+-----------+
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val ws1 = Window.partitionBy("name").orderBy("date")
val ws2 = Window.partitionBy("group").orderBy("date")
customers.
withColumn("i", row_number().over(ws1)).
withColumn("j", row_number().over(ws2)).
show()
+-----+-----+----------+-----------+---+---+
| name|group| date|amountSpent| i| j|
+-----+-----+----------+-----------+---+---+
|Alice| 1|2016-05-01| 50.0| 1| 1|
|Alice| 1|2016-05-03| 45.0| 2| 2|
| Bob| 2|2016-05-01| 25.0| 1| 1|
| Bob| 2|2016-05-04| 29.0| 2| 2|
|Alice| 2|2016-05-04| 55.0| 3| 3|
| Bob| 2|2016-05-06| 27.0| 3| 4|
+-----+-----+----------+-----------+---+---+