
Window Functions in Spark

Window with orderBy

It is tricky!!!

If you provide an ORDER BY clause, then the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:

https://stackoverflow.com/questions/52273186/pyspark-spark-window-function-first-last-issue

  1. Avoid using last; use first with a descending order by instead. This gives fewer surprises.

  2. Do NOT use order by if it is not necessary. It introduces unnecessary sorting, and (as the examples below show) it silently changes the default window frame.

In [2]:
import findspark

findspark.init("/opt/spark")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("PySpark").enableHiveSupport().getOrCreate()
In [6]:
from pyspark.sql import Window
In [7]:
import pandas as pd
In [14]:
cust_p = pd.DataFrame(
    data=[
        ("Alice", "2016-05-01", 50.00, 1),
        ("Alice", "2016-05-01", 45.00, 2),
        ("Alice", "2016-05-02", 55.00, 3),
        ("Alice", "2016-05-02", 100.00, 4),
        ("Bob", "2016-05-01", 25.00, 5),
        ("Bob", "2016-05-01", 29.00, 6),
        ("Bob", "2016-05-02", 27.00, 7),
        ("Bob", "2016-05-02", 30.00, 8),
    ],
    columns=("name", "date", "amount", "id"),
)
cust_p
Out[14]:
    name        date  amount  id
0  Alice  2016-05-01    50.0   1
1  Alice  2016-05-01    45.0   2
2  Alice  2016-05-02    55.0   3
3  Alice  2016-05-02   100.0   4
4    Bob  2016-05-01    25.0   5
5    Bob  2016-05-01    29.0   6
6    Bob  2016-05-02    27.0   7
7    Bob  2016-05-02    30.0   8
In [16]:
cust = spark.createDataFrame(cust_p)
cust.show()
+-----+----------+------+---+
| name|      date|amount| id|
+-----+----------+------+---+
|Alice|2016-05-01|  50.0|  1|
|Alice|2016-05-01|  45.0|  2|
|Alice|2016-05-02|  55.0|  3|
|Alice|2016-05-02| 100.0|  4|
|  Bob|2016-05-01|  25.0|  5|
|  Bob|2016-05-01|  29.0|  6|
|  Bob|2016-05-02|  27.0|  7|
|  Bob|2016-05-02|  30.0|  8|
+-----+----------+------+---+

In [18]:
cust.orderBy("name", "date").show()
+-----+----------+------+---+
| name|      date|amount| id|
+-----+----------+------+---+
|Alice|2016-05-01|  50.0|  1|
|Alice|2016-05-01|  45.0|  2|
|Alice|2016-05-02|  55.0|  3|
|Alice|2016-05-02| 100.0|  4|
|  Bob|2016-05-01|  25.0|  5|
|  Bob|2016-05-01|  29.0|  6|
|  Bob|2016-05-02|  27.0|  7|
|  Bob|2016-05-02|  30.0|  8|
+-----+----------+------+---+

Create a temp view for testing Spark SQL (to compare its results with those of the PySpark DataFrame API).

In [19]:
cust.createOrReplaceTempView("customers")

max

max works as expected with over (partition by ...) when order by is not used.

In [22]:
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    col("id"),
    max(col("amount")).over(Window.partitionBy("name", "date")).alias("max_amount"),
).orderBy("name", "date").show()
+-----+----------+------+---+----------+
| name|      date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01|  50.0|  1|      50.0|
|Alice|2016-05-01|  45.0|  2|      50.0|
|Alice|2016-05-02|  55.0|  3|     100.0|
|Alice|2016-05-02| 100.0|  4|     100.0|
|  Bob|2016-05-01|  25.0|  5|      29.0|
|  Bob|2016-05-01|  29.0|  6|      29.0|
|  Bob|2016-05-02|  27.0|  7|      30.0|
|  Bob|2016-05-02|  30.0|  8|      30.0|
+-----+----------+------+---+----------+

In [23]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        max(amount) over (partition by name, date) as max_amount
    from
        customers
    """
).orderBy("name", "date").show()
+-----+----------+------+---+----------+
| name|      date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01|  50.0|  1|      50.0|
|Alice|2016-05-01|  45.0|  2|      50.0|
|Alice|2016-05-02|  55.0|  3|     100.0|
|Alice|2016-05-02| 100.0|  4|     100.0|
|  Bob|2016-05-01|  25.0|  5|      29.0|
|  Bob|2016-05-01|  29.0|  6|      29.0|
|  Bob|2016-05-02|  27.0|  7|      30.0|
|  Bob|2016-05-02|  30.0|  8|      30.0|
+-----+----------+------+---+----------+

The following results might surprise people. There is nothing wrong with the code. It is just that when order by is used, the default frame for window functions (max in this case) is from unbounded preceding to the current row, so each row only sees the maximum of the rows up to and including itself.

In [24]:
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    col("id"),
    max(col("amount"))
    .over(Window.partitionBy("name", "date").orderBy("id"))
    .alias("max_amount"),
).orderBy("name", "date").show()
+-----+----------+------+---+----------+
| name|      date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01|  50.0|  1|      50.0|
|Alice|2016-05-01|  45.0|  2|      50.0|
|Alice|2016-05-02|  55.0|  3|      55.0|
|Alice|2016-05-02| 100.0|  4|     100.0|
|  Bob|2016-05-01|  25.0|  5|      25.0|
|  Bob|2016-05-01|  29.0|  6|      29.0|
|  Bob|2016-05-02|  27.0|  7|      27.0|
|  Bob|2016-05-02|  30.0|  8|      30.0|
+-----+----------+------+---+----------+

In [25]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        max(amount) over (partition by name, date order by id) as max_amount
    from
        customers
    """
).orderBy("name", "date").show()
+-----+----------+------+---+----------+
| name|      date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01|  50.0|  1|      50.0|
|Alice|2016-05-01|  45.0|  2|      50.0|
|Alice|2016-05-02|  55.0|  3|      55.0|
|Alice|2016-05-02| 100.0|  4|     100.0|
|  Bob|2016-05-01|  25.0|  5|      25.0|
|  Bob|2016-05-01|  29.0|  6|      29.0|
|  Bob|2016-05-02|  27.0|  7|      27.0|
|  Bob|2016-05-02|  30.0|  8|      30.0|
+-----+----------+------+---+----------+
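
If you need the ordering but still want the aggregate computed over the whole partition, widen the frame explicitly. A minimal PySpark sketch using the cust DataFrame from above (the bounds are the standard Window.unboundedPreceding / Window.unboundedFollowing constants):

win_full = (
    Window.partitionBy("name", "date")
    .orderBy("id")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    col("id"),
    # With the frame widened, max sees the whole partition again.
    max(col("amount")).over(win_full).alias("max_amount"),
).orderBy("name", "date").show()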

rank

The window function rank requires an order by in the window specification.

In [27]:
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    col("id"),
    rank().over(Window.partitionBy("name", "date").orderBy("amount")).alias("rank"),
).orderBy("name", "date").show()
+-----+----------+------+---+----+
| name|      date|amount| id|rank|
+-----+----------+------+---+----+
|Alice|2016-05-01|  45.0|  2|   1|
|Alice|2016-05-01|  50.0|  1|   2|
|Alice|2016-05-02|  55.0|  3|   1|
|Alice|2016-05-02| 100.0|  4|   2|
|  Bob|2016-05-01|  25.0|  5|   1|
|  Bob|2016-05-01|  29.0|  6|   2|
|  Bob|2016-05-02|  27.0|  7|   1|
|  Bob|2016-05-02|  30.0|  8|   2|
+-----+----------+------+---+----+

In [29]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        rank() over (partition by name, date ORDER BY amount DESC) as rank
    from
        customers
    """
).orderBy("name", "date").show()
+-----+----------+------+---+----+
| name|      date|amount| id|rank|
+-----+----------+------+---+----+
|Alice|2016-05-01|  50.0|  1|   1|
|Alice|2016-05-01|  45.0|  2|   2|
|Alice|2016-05-02| 100.0|  4|   1|
|Alice|2016-05-02|  55.0|  3|   2|
|  Bob|2016-05-01|  29.0|  6|   1|
|  Bob|2016-05-01|  25.0|  5|   2|
|  Bob|2016-05-02|  30.0|  8|   1|
|  Bob|2016-05-02|  27.0|  7|   2|
+-----+----------+------+---+----+

In [30]:
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    col("id"),
    rank().over(Window.partitionBy("name").orderBy("date")).alias("rank"),
).orderBy("name").show()
+-----+----------+------+---+----+
| name|      date|amount| id|rank|
+-----+----------+------+---+----+
|Alice|2016-05-01|  50.0|  1|   1|
|Alice|2016-05-01|  45.0|  2|   1|
|Alice|2016-05-02|  55.0|  3|   3|
|Alice|2016-05-02| 100.0|  4|   3|
|  Bob|2016-05-01|  25.0|  5|   1|
|  Bob|2016-05-02|  30.0|  8|   3|
|  Bob|2016-05-01|  29.0|  6|   1|
|  Bob|2016-05-02|  27.0|  7|   3|
+-----+----------+------+---+----+

In [31]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        rank() over (partition by name ORDER BY date DESC) as rank
    from
        customers
    """
).orderBy("name").show()
+-----+----------+------+---+----+
| name|      date|amount| id|rank|
+-----+----------+------+---+----+
|Alice|2016-05-02|  55.0|  3|   1|
|Alice|2016-05-02| 100.0|  4|   1|
|Alice|2016-05-01|  50.0|  1|   3|
|Alice|2016-05-01|  45.0|  2|   3|
|  Bob|2016-05-02|  27.0|  7|   1|
|  Bob|2016-05-02|  30.0|  8|   1|
|  Bob|2016-05-01|  25.0|  5|   3|
|  Bob|2016-05-01|  29.0|  6|   3|
+-----+----------+------+---+----+

dense_rank

In [32]:
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    col("id"),
    dense_rank().over(Window.partitionBy("name").orderBy("date")).alias("dense_rank"),
).orderBy("name").show()
+-----+----------+------+---+----------+
| name|      date|amount| id|dense_rank|
+-----+----------+------+---+----------+
|Alice|2016-05-01|  50.0|  1|         1|
|Alice|2016-05-01|  45.0|  2|         1|
|Alice|2016-05-02|  55.0|  3|         2|
|Alice|2016-05-02| 100.0|  4|         2|
|  Bob|2016-05-01|  25.0|  5|         1|
|  Bob|2016-05-01|  29.0|  6|         1|
|  Bob|2016-05-02|  27.0|  7|         2|
|  Bob|2016-05-02|  30.0|  8|         2|
+-----+----------+------+---+----------+

In [33]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        dense_rank() over (partition by name ORDER BY date DESC) as rank
    from
        customers
    """
).orderBy("name").show()
+-----+----------+------+---+----+
| name|      date|amount| id|rank|
+-----+----------+------+---+----+
|Alice|2016-05-02|  55.0|  3|   1|
|Alice|2016-05-02| 100.0|  4|   1|
|Alice|2016-05-01|  50.0|  1|   2|
|Alice|2016-05-01|  45.0|  2|   2|
|  Bob|2016-05-02|  27.0|  7|   1|
|  Bob|2016-05-02|  30.0|  8|   1|
|  Bob|2016-05-01|  25.0|  5|   2|
|  Bob|2016-05-01|  29.0|  6|   2|
+-----+----------+------+---+----+
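
rank, dense_rank and row_number differ only in how they treat ties; computing all three over the same window makes the difference obvious. A minimal PySpark sketch using the cust DataFrame from above:

w = Window.partitionBy("name").orderBy("date")
cust.select(
    col("name"),
    col("date"),
    col("id"),
    rank().over(w).alias("rank"),              # ties share a rank, gaps follow
    dense_rank().over(w).alias("dense_rank"),  # ties share a rank, no gaps
    row_number().over(w).alias("row_number"),  # always unique, tie order is arbitrary
).orderBy("name", "date").show()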

first and last

In [48]:
customers.select(
    $"name",
    $"date",
    $"amount",
    $"id",
    first($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("first_amount")
).orderBy("name", "date").show
+-----+----------+------+---+------------+
| name|      date|amount| id|first_amount|
+-----+----------+------+---+------------+
|Alice|2016-05-01|  50.0|  1|        50.0|
|Alice|2016-05-01|  45.0|  2|        50.0|
|Alice|2016-05-02|  55.0|  3|        55.0|
|Alice|2016-05-02| 100.0|  4|        55.0|
|  Bob|2016-05-01|  25.0|  5|        25.0|
|  Bob|2016-05-01|  29.0|  6|        25.0|
|  Bob|2016-05-02|  27.0|  7|        27.0|
|  Bob|2016-05-02|  30.0|  8|        27.0|
+-----+----------+------+---+------------+

In [49]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        first(amount) over (partition by name, date order by id) as first_amount
    from
        customers
    """
).orderBy("name", "date").show
+-----+----------+------+---+------------+
| name|      date|amount| id|first_amount|
+-----+----------+------+---+------------+
|Alice|2016-05-01|  50.0|  1|        50.0|
|Alice|2016-05-01|  45.0|  2|        50.0|
|Alice|2016-05-02|  55.0|  3|        55.0|
|Alice|2016-05-02| 100.0|  4|        55.0|
|  Bob|2016-05-01|  25.0|  5|        25.0|
|  Bob|2016-05-01|  29.0|  6|        25.0|
|  Bob|2016-05-02|  27.0|  7|        27.0|
|  Bob|2016-05-02|  30.0|  8|        27.0|
+-----+----------+------+---+------------+

In [64]:
customers.select(
    $"name",
    $"date",
    $"amount",
    $"id",
    last($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("last_amount")
).orderBy("name", "date").show
+-----+----------+------+---+-----------+
| name|      date|amount| id|last_amount|
+-----+----------+------+---+-----------+
|Alice|2016-05-01|  50.0|  1|       50.0|
|Alice|2016-05-01|  45.0|  2|       45.0|
|Alice|2016-05-02| 100.0|  4|      100.0|
|Alice|2016-05-02|  55.0|  3|       55.0|
|  Bob|2016-05-01|  25.0|  5|       25.0|
|  Bob|2016-05-01|  29.0|  6|       29.0|
|  Bob|2016-05-02|  27.0|  7|       27.0|
|  Bob|2016-05-02|  30.0|  8|       30.0|
+-----+----------+------+---+-----------+

In [54]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        last(amount) over (partition by name, date order by id) as last_amount
    from
        customers
    """
).orderBy("name", "date").show
+-----+----------+------+---+-----------+
| name|      date|amount| id|last_amount|
+-----+----------+------+---+-----------+
|Alice|2016-05-01|  50.0|  1|       50.0|
|Alice|2016-05-01|  45.0|  2|       45.0|
|Alice|2016-05-02|  55.0|  3|       55.0|
|Alice|2016-05-02| 100.0|  4|      100.0|
|  Bob|2016-05-01|  25.0|  5|       25.0|
|  Bob|2016-05-01|  29.0|  6|       29.0|
|  Bob|2016-05-02|  27.0|  7|       27.0|
|  Bob|2016-05-02|  30.0|  8|       30.0|
+-----+----------+------+---+-----------+

In [58]:
customers.select(
    $"name",
    $"date",
    $"amount",
    $"id",
    first($"amount").over(Window.partitionBy("name", "date").orderBy($"id".desc)).alias("last_amount")
).orderBy("name", "date").show
+-----+----------+------+---+-----------+
| name|      date|amount| id|last_amount|
+-----+----------+------+---+-----------+
|Alice|2016-05-01|  45.0|  2|       45.0|
|Alice|2016-05-01|  50.0|  1|       45.0|
|Alice|2016-05-02| 100.0|  4|      100.0|
|Alice|2016-05-02|  55.0|  3|      100.0|
|  Bob|2016-05-01|  29.0|  6|       29.0|
|  Bob|2016-05-01|  25.0|  5|       29.0|
|  Bob|2016-05-02|  30.0|  8|       30.0|
|  Bob|2016-05-02|  27.0|  7|       30.0|
+-----+----------+------+---+-----------+

In [67]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        first(amount) over (partition by name, date order by id desc) as last_amount
    from
        customers
    """
).orderBy("name", "date").show
+-----+----------+------+---+-----------+
| name|      date|amount| id|last_amount|
+-----+----------+------+---+-----------+
|Alice|2016-05-01|  45.0|  2|       45.0|
|Alice|2016-05-01|  50.0|  1|       45.0|
|Alice|2016-05-02| 100.0|  4|      100.0|
|Alice|2016-05-02|  55.0|  3|      100.0|
|  Bob|2016-05-01|  29.0|  6|       29.0|
|  Bob|2016-05-01|  25.0|  5|       29.0|
|  Bob|2016-05-02|  30.0|  8|       30.0|
|  Bob|2016-05-02|  27.0|  7|       30.0|
+-----+----------+------+---+-----------+
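
If you do want last itself, it also behaves as expected once the frame is widened to the whole partition. A minimal PySpark sketch (the cells above use the Scala API; this assumes the cust DataFrame created in the PySpark cells earlier):

win_full = (
    Window.partitionBy("name", "date")
    .orderBy("id")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    col("id"),
    # last now returns the amount of the largest id in each group.
    last(col("amount")).over(win_full).alias("last_amount"),
).orderBy("name", "date").show()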

partition by with group by

Avoid doing so!!! The queries below fail to analyze because columns referenced by the window function (amount and id) are not in the group by.

In [7]:
spark.sql(
    """
    select
        name,
        date,
        first(amount) over (partition by name, date order by id desc) as last_amount
    from
        customers
    group by
        name, date
    """
).orderBy("name", "date").show
org.apache.spark.sql.AnalysisException: expression 'customers.`amount`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Project [name#9, date#10, last_amount#30]
+- Project [name#9, date#10, amount#11, id#12, last_amount#30, last_amount#30]
   +- Window [first(amount#11, false) windowspecdefinition(name#9, date#10, id#12 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_amount#30], [name#9, date#10], [id#12 DESC NULLS LAST]
      +- Aggregate [name#9, date#10], [name#9, date#10, amount#11, id#12]
         +- SubqueryAlias `customers`
            +- Project [_1#4 AS name#9, _2#5 AS date#10, _3#6 AS amount#11, _4#7 AS id#12]
               +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

In [9]:
spark.sql(
    """
    select
        name,
        date,
        first(max(amount)) over (partition by name, date order by id desc) as last_amount
    from
        customers
    group by
        name, date
    """
).orderBy("name", "date").show
org.apache.spark.sql.AnalysisException: expression 'customers.`id`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Project [name#9, date#10, last_amount#36]
+- Project [name#9, date#10, _w0#39, id#12, last_amount#36, last_amount#36]
   +- Window [first(_w0#39, false) windowspecdefinition(name#9, date#10, id#12 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_amount#36], [name#9, date#10], [id#12 DESC NULLS LAST]
      +- Aggregate [name#9, date#10], [name#9, date#10, max(amount#11) AS _w0#39, id#12]
         +- SubqueryAlias `customers`
            +- Project [_1#4 AS name#9, _2#5 AS date#10, _3#6 AS amount#11, _4#7 AS id#12]
               +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

In [10]:
customers.orderBy("name", "date").show
+-----+----------+------+---+
| name|      date|amount| id|
+-----+----------+------+---+
|Alice|2016-05-01|  50.0|  1|
|Alice|2016-05-01|  45.0|  2|
|Alice|2016-05-02|  55.0|  3|
|Alice|2016-05-02| 100.0|  4|
|  Bob|2016-05-01|  29.0|  6|
|  Bob|2016-05-01|  25.0|  5|
|  Bob|2016-05-02|  27.0|  7|
|  Bob|2016-05-02|  30.0|  8|
+-----+----------+------+---+

In [12]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        row_number() over (partition by name, date order by id desc) as rownum
    from
        customers
    """
).orderBy("name", "date").show
+-----+----------+------+---+------+
| name|      date|amount| id|rownum|
+-----+----------+------+---+------+
|Alice|2016-05-01|  45.0|  2|     1|
|Alice|2016-05-01|  50.0|  1|     2|
|Alice|2016-05-02| 100.0|  4|     1|
|Alice|2016-05-02|  55.0|  3|     2|
|  Bob|2016-05-01|  29.0|  6|     1|
|  Bob|2016-05-01|  25.0|  5|     2|
|  Bob|2016-05-02|  30.0|  8|     1|
|  Bob|2016-05-02|  27.0|  7|     2|
+-----+----------+------+---+------+

In [13]:
spark.sql(
    """
    select 
        *
    from (
        select
            name,
            date,
            amount,
            id,
            row_number() over (partition by name, date order by id desc) as rownum
        from
            customers
        ) A
    where 
        rownum = 1
    """
).orderBy("name", "date").show
+-----+----------+------+---+------+
| name|      date|amount| id|rownum|
+-----+----------+------+---+------+
|Alice|2016-05-01|  45.0|  2|     1|
|Alice|2016-05-02| 100.0|  4|     1|
|  Bob|2016-05-01|  29.0|  6|     1|
|  Bob|2016-05-02|  30.0|  8|     1|
+-----+----------+------+---+------+
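
The same keep-the-latest-row-per-group pattern with the DataFrame API: compute row_number over the window and filter on it. A minimal PySpark sketch using the cust DataFrame from above:

w_latest = Window.partitionBy("name", "date").orderBy(col("id").desc())
cust.withColumn("rownum", row_number().over(w_latest)).where(
    col("rownum") == 1
).orderBy("name", "date").show()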

Define a window partition specification. It does not have to be associated with a table.

In [13]:
Window.partitionBy("col1", "col2")
Out[13]:
org.apache.spark.sql.expressions.WindowSpec@4f336bdc
In [12]:
val customers = Seq(
    ("Alice", "2016-05-01", 50.00, 1),
    ("Alice", "2016-05-01", 45.00, 2),
    ("Alice", "2016-05-02", 55.00, 3),
    ("Alice", "2016-05-02", 100.00, 4),
    ("Bob", "2016-05-01", 25.00, 5),
    ("Bob", "2016-05-01", 29.00, 6),
    ("Bob", "2016-05-02", 27.00,7 ),
    ("Bob", "2016-05-02", 30.00, 8)
).toDF("name", "date", "amount", "id")
customers.show
+-----+----------+------+---+
| name|      date|amount| id|
+-----+----------+------+---+
|Alice|2016-05-01|  50.0|  1|
|Alice|2016-05-01|  45.0|  2|
|Alice|2016-05-02|  55.0|  3|
|Alice|2016-05-02| 100.0|  4|
|  Bob|2016-05-01|  25.0|  5|
|  Bob|2016-05-01|  29.0|  6|
|  Bob|2016-05-02|  27.0|  7|
|  Bob|2016-05-02|  30.0|  8|
+-----+----------+------+---+

In [16]:
val winSpec = Window.partitionBy("name", "date")
customers.select(
    $"name",
    $"date",
    $"amount",
    $"id",
    avg($"amount").over(winSpec).alias("avg_amt"),
    max($"id").over(winSpec).alias("max_id")
).show
+-----+----------+------+---+-------+------+
| name|      date|amount| id|avg_amt|max_id|
+-----+----------+------+---+-------+------+
|  Bob|2016-05-01|  25.0|  5|   27.0|     6|
|  Bob|2016-05-01|  29.0|  6|   27.0|     6|
|  Bob|2016-05-02|  27.0|  7|   28.5|     8|
|  Bob|2016-05-02|  30.0|  8|   28.5|     8|
|Alice|2016-05-02|  55.0|  3|   77.5|     4|
|Alice|2016-05-02| 100.0|  4|   77.5|     4|
|Alice|2016-05-01|  50.0|  1|   47.5|     2|
|Alice|2016-05-01|  45.0|  2|   47.5|     2|
+-----+----------+------+---+-------+------+

In [17]:
val winSpec = Window.partitionBy("name", "date")
customers.select(
    $"name",
    $"date",
    $"amount",
    $"id",
    (avg($"amount").over(winSpec) + max($"id").over(winSpec) * 100).alias("new_column")
).show
+-----+----------+------+---+----------+
| name|      date|amount| id|new_column|
+-----+----------+------+---+----------+
|  Bob|2016-05-01|  25.0|  5|     627.0|
|  Bob|2016-05-01|  29.0|  6|     627.0|
|  Bob|2016-05-02|  27.0|  7|     828.5|
|  Bob|2016-05-02|  30.0|  8|     828.5|
|Alice|2016-05-02|  55.0|  3|     477.5|
|Alice|2016-05-02| 100.0|  4|     477.5|
|Alice|2016-05-01|  50.0|  1|     247.5|
|Alice|2016-05-01|  45.0|  2|     247.5|
+-----+----------+------+---+----------+

In [9]:
customers.withColumn("avg", 
    avg($"amount").over(Window.partitionBy("name", "date"))
).show()
+-----+----------+------+----+
| name|      date|amount| avg|
+-----+----------+------+----+
|  Bob|2016-05-01|  25.0|27.0|
|  Bob|2016-05-01|  29.0|27.0|
|  Bob|2016-05-02|  27.0|28.5|
|  Bob|2016-05-02|  30.0|28.5|
|Alice|2016-05-02|  55.0|77.5|
|Alice|2016-05-02| 100.0|77.5|
|Alice|2016-05-01|  50.0|47.5|
|Alice|2016-05-01|  45.0|47.5|
+-----+----------+------+----+

In [3]:
val customers = Seq(
    ("Alice", "2016-05-01", 50.00),
    ("Alice", "2016-05-03", 45.00),
    ("Alice", "2016-05-04", 55.00),
    ("Bob", "2016-05-01", 25.00),
    ("Bob", "2016-05-04", 29.00),
    ("Bob", "2016-05-06", 27.00)
).toDF("name", "date", "amountSpent")
customers.show
+-----+----------+-----------+
| name|      date|amountSpent|
+-----+----------+-----------+
|Alice|2016-05-01|       50.0|
|Alice|2016-05-03|       45.0|
|Alice|2016-05-04|       55.0|
|  Bob|2016-05-01|       25.0|
|  Bob|2016-05-04|       29.0|
|  Bob|2016-05-06|       27.0|
+-----+----------+-----------+


Moving Average

In [8]:
// Frame covers the previous row, the current row and the next row.
val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
In [10]:
customers.withColumn("movingAvg", avg(customers("amountSpent")).over(wSpec1)).show()
+-----+----------+-----------+---------+
| name|      date|amountSpent|movingAvg|
+-----+----------+-----------+---------+
|  Bob|2016-05-01|       25.0|     27.0|
|  Bob|2016-05-04|       29.0|     27.0|
|  Bob|2016-05-06|       27.0|     28.0|
|Alice|2016-05-01|       50.0|     47.5|
|Alice|2016-05-03|       45.0|     50.0|
|Alice|2016-05-04|       55.0|     50.0|
+-----+----------+-----------+---------+

Cumulative Sum

In [11]:
// Frame runs from the start of the partition to the current row;
// Window.unboundedPreceding is the more idiomatic lower bound.
val wSpec2 = Window.partitionBy("name").orderBy("date").rowsBetween(Long.MinValue, 0)
In [12]:
customers.withColumn("cumSum", sum(customers("amountSpent")).over(wSpec2)).show()
+-----+----------+-----------+------+
| name|      date|amountSpent|cumSum|
+-----+----------+-----------+------+
|  Bob|2016-05-01|       25.0|  25.0|
|  Bob|2016-05-04|       29.0|  54.0|
|  Bob|2016-05-06|       27.0|  81.0|
|Alice|2016-05-01|       50.0|  50.0|
|Alice|2016-05-03|       45.0|  95.0|
|Alice|2016-05-04|       55.0| 150.0|
+-----+----------+-----------+------+

Data from previous row

In [13]:
val wSpec3 = Window.partitionBy("name").orderBy("date")
In [14]:
customers.withColumn(
    "prevAmountSpent", lag(customers("amountSpent"), 1).over(wSpec3)
).show()
+-----+----------+-----------+---------------+
| name|      date|amountSpent|prevAmountSpent|
+-----+----------+-----------+---------------+
|  Bob|2016-05-01|       25.0|           null|
|  Bob|2016-05-04|       29.0|           25.0|
|  Bob|2016-05-06|       27.0|           29.0|
|Alice|2016-05-01|       50.0|           null|
|Alice|2016-05-03|       45.0|           50.0|
|Alice|2016-05-04|       55.0|           45.0|
+-----+----------+-----------+---------------+

row_number

In [24]:
val wSpec3 = Window.partitionBy("name").orderBy("date")
customers.withColumn("row_num", 
    row_number().over(wSpec3) 
).show()
+-----+----------+-----------+-------+
| name|      date|amountSpent|row_num|
+-----+----------+-----------+-------+
|  Bob|2016-05-01|       25.0|      1|
|  Bob|2016-05-04|       29.0|      2|
|  Bob|2016-05-06|       27.0|      3|
|Alice|2016-05-01|       50.0|      1|
|Alice|2016-05-03|       45.0|      2|
|Alice|2016-05-04|       55.0|      3|
+-----+----------+-----------+-------+

percentRank

In [ ]:
 
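A minimal PySpark sketch for percent_rank, which returns (rank - 1) / (number of rows in the partition - 1); it assumes the cust DataFrame created in the PySpark cells above:

w = Window.partitionBy("name").orderBy("amount")
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    percent_rank().over(w).alias("percent_rank"),
).orderBy("name", "amount").show()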

ntile

In [ ]:
 
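A minimal PySpark sketch for ntile, which splits each partition into n roughly equal buckets (here n = 2), again assuming the cust DataFrame from above:

w = Window.partitionBy("name").orderBy("amount")
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    ntile(2).over(w).alias("bucket"),  # 1 = lower half, 2 = upper half
).orderBy("name", "amount").show()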

first

In [ ]:
 

last

In [ ]:
 

lag

In [ ]:
 

lead

In [ ]:
 
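A minimal PySpark sketch for lead, the mirror image of lag: it returns the value from the following row within the partition and null when there is no such row (assuming the cust DataFrame from above):

w = Window.partitionBy("name").orderBy("id")
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    col("id"),
    lead(col("amount"), 1).over(w).alias("next_amount"),
).orderBy("name", "id").show()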

cume_dist

In [ ]:
 
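A minimal PySpark sketch for cume_dist, which returns the fraction of rows in the partition whose ordering value is less than or equal to the current row's (assuming the cust DataFrame from above):

w = Window.partitionBy("name").orderBy("amount")
cust.select(
    col("name"),
    col("date"),
    col("amount"),
    cume_dist().over(w).alias("cume_dist"),
).orderBy("name", "amount").show()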
In [16]:
val customers = Seq(
    ("Alice", "2016-05-01", 50.00, 1),
    ("Alice", "2016-05-01", 45.00, 2),
    ("Alice", "2016-05-02", 55.00, 3),
    ("Alice", "2016-05-02", 100.00, 4),
    ("Bob", "2016-05-01", 25.00, 5),
    ("Bob", "2016-05-01", 29.00, 6),
    ("Bob", "2016-05-02", 27.00,7 ),
    ("Bob", "2016-05-02", 30.00, 8)
).toDF("name", "date", "amount", "id")
customers.orderBy("name", "date").show
+-----+----------+------+---+
| name|      date|amount| id|
+-----+----------+------+---+
|Alice|2016-05-01|  50.0|  1|
|Alice|2016-05-01|  45.0|  2|
|Alice|2016-05-02|  55.0|  3|
|Alice|2016-05-02| 100.0|  4|
|  Bob|2016-05-01|  29.0|  6|
|  Bob|2016-05-01|  25.0|  5|
|  Bob|2016-05-02|  27.0|  7|
|  Bob|2016-05-02|  30.0|  8|
+-----+----------+------+---+

In [27]:
customers.createOrReplaceTempView("customers")

Comment

Do NOT use orderBy if the order does not matter!!!

In [17]:
val wSpec = Window.partitionBy("name", "date").orderBy("id")
Out[17]:
org.apache.spark.sql.expressions.WindowSpec@4f418d5a
In [19]:
customers.select(
    $"name",
    $"date",
    $"amount",
    $"id",
    max($"amount").over(Window.partitionBy("name", "date")).alias("max_amount")
).orderBy("name", "date").show
+-----+----------+------+---+----------+
| name|      date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01|  50.0|  1|      50.0|
|Alice|2016-05-01|  45.0|  2|      50.0|
|Alice|2016-05-02|  55.0|  3|     100.0|
|Alice|2016-05-02| 100.0|  4|     100.0|
|  Bob|2016-05-01|  25.0|  5|      29.0|
|  Bob|2016-05-01|  29.0|  6|      29.0|
|  Bob|2016-05-02|  27.0|  7|      30.0|
|  Bob|2016-05-02|  30.0|  8|      30.0|
+-----+----------+------+---+----------+

In [30]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        max(amount) over (partition by name, date) as max_amount
    from
        customers
    """
).orderBy("name", "date").show
+-----+----------+------+---+----------+
| name|      date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01|  50.0|  1|      50.0|
|Alice|2016-05-01|  45.0|  2|      50.0|
|Alice|2016-05-02|  55.0|  3|     100.0|
|Alice|2016-05-02| 100.0|  4|     100.0|
|  Bob|2016-05-01|  25.0|  5|      29.0|
|  Bob|2016-05-01|  29.0|  6|      29.0|
|  Bob|2016-05-02|  27.0|  7|      30.0|
|  Bob|2016-05-02|  30.0|  8|      30.0|
+-----+----------+------+---+----------+

In [31]:
customers.select(
    $"name",
    $"date",
    $"amount",
    $"id",
    max($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("max_amount")
).orderBy("name", "date").show
+-----+----------+------+---+----------+
| name|      date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01|  50.0|  1|      50.0|
|Alice|2016-05-01|  45.0|  2|      50.0|
|Alice|2016-05-02|  55.0|  3|      55.0|
|Alice|2016-05-02| 100.0|  4|     100.0|
|  Bob|2016-05-01|  25.0|  5|      25.0|
|  Bob|2016-05-01|  29.0|  6|      29.0|
|  Bob|2016-05-02|  27.0|  7|      27.0|
|  Bob|2016-05-02|  30.0|  8|      30.0|
+-----+----------+------+---+----------+

In [32]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        max(amount) over (partition by name, date order by id) as max_amount
    from
        customers
    """
).orderBy("name", "date").show
+-----+----------+------+---+----------+
| name|      date|amount| id|max_amount|
+-----+----------+------+---+----------+
|Alice|2016-05-01|  50.0|  1|      50.0|
|Alice|2016-05-01|  45.0|  2|      50.0|
|Alice|2016-05-02|  55.0|  3|      55.0|
|Alice|2016-05-02| 100.0|  4|     100.0|
|  Bob|2016-05-01|  25.0|  5|      25.0|
|  Bob|2016-05-01|  29.0|  6|      29.0|
|  Bob|2016-05-02|  27.0|  7|      27.0|
|  Bob|2016-05-02|  30.0|  8|      30.0|
+-----+----------+------+---+----------+

In [33]:
customers.select(
    $"name",
    $"date",
    $"amount",
    $"id",
    first($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("first_amount")
).orderBy("name", "date").show
+-----+----------+------+---+------------+
| name|      date|amount| id|first_amount|
+-----+----------+------+---+------------+
|Alice|2016-05-01|  50.0|  1|        50.0|
|Alice|2016-05-01|  45.0|  2|        50.0|
|Alice|2016-05-02|  55.0|  3|        55.0|
|Alice|2016-05-02| 100.0|  4|        55.0|
|  Bob|2016-05-01|  25.0|  5|        25.0|
|  Bob|2016-05-01|  29.0|  6|        25.0|
|  Bob|2016-05-02|  27.0|  7|        27.0|
|  Bob|2016-05-02|  30.0|  8|        27.0|
+-----+----------+------+---+------------+

In [34]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        first(amount) over (partition by name, date order by id) as first_amount
    from
        customers
    """
).orderBy("name", "date").show
+-----+----------+------+---+------------+
| name|      date|amount| id|first_amount|
+-----+----------+------+---+------------+
|Alice|2016-05-01|  50.0|  1|        50.0|
|Alice|2016-05-01|  45.0|  2|        50.0|
|Alice|2016-05-02|  55.0|  3|        55.0|
|Alice|2016-05-02| 100.0|  4|        55.0|
|  Bob|2016-05-01|  25.0|  5|        25.0|
|  Bob|2016-05-01|  29.0|  6|        25.0|
|  Bob|2016-05-02|  27.0|  7|        27.0|
|  Bob|2016-05-02|  30.0|  8|        27.0|
+-----+----------+------+---+------------+

In [35]:
customers.select(
    $"name",
    $"date",
    $"amount",
    $"id",
    last($"amount").over(Window.partitionBy("name", "date").orderBy("id")).alias("last_amount")
).orderBy("name", "date").show
+-----+----------+------+---+-----------+
| name|      date|amount| id|last_amount|
+-----+----------+------+---+-----------+
|Alice|2016-05-01|  50.0|  1|       50.0|
|Alice|2016-05-01|  45.0|  2|       45.0|
|Alice|2016-05-02|  55.0|  3|       55.0|
|Alice|2016-05-02| 100.0|  4|      100.0|
|  Bob|2016-05-01|  25.0|  5|       25.0|
|  Bob|2016-05-01|  29.0|  6|       29.0|
|  Bob|2016-05-02|  27.0|  7|       27.0|
|  Bob|2016-05-02|  30.0|  8|       30.0|
+-----+----------+------+---+-----------+

In [36]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        last(amount) over (partition by name, date order by id) as last_amount
    from
        customers
    """
).orderBy("name", "date").show
+-----+----------+------+---+-----------+
| name|      date|amount| id|last_amount|
+-----+----------+------+---+-----------+
|Alice|2016-05-01|  50.0|  1|       50.0|
|Alice|2016-05-01|  45.0|  2|       45.0|
|Alice|2016-05-02|  55.0|  3|       55.0|
|Alice|2016-05-02| 100.0|  4|      100.0|
|  Bob|2016-05-01|  25.0|  5|       25.0|
|  Bob|2016-05-01|  29.0|  6|       29.0|
|  Bob|2016-05-02|  27.0|  7|       27.0|
|  Bob|2016-05-02|  30.0|  8|       30.0|
+-----+----------+------+---+-----------+

In [37]:
spark.sql(
    """
    select
        name,
        date,
        amount,
        id,
        first(amount) over (partition by name, date order by id desc) as first_amount
    from
        customers
    """
).orderBy("name", "date").show
+-----+----------+------+---+------------+
| name|      date|amount| id|first_amount|
+-----+----------+------+---+------------+
|Alice|2016-05-01|  45.0|  2|        45.0|
|Alice|2016-05-01|  50.0|  1|        45.0|
|Alice|2016-05-02| 100.0|  4|       100.0|
|Alice|2016-05-02|  55.0|  3|       100.0|
|  Bob|2016-05-01|  29.0|  6|        29.0|
|  Bob|2016-05-01|  25.0|  5|        29.0|
|  Bob|2016-05-02|  30.0|  8|        30.0|
|  Bob|2016-05-02|  27.0|  7|        30.0|
+-----+----------+------+---+------------+

In [3]:
val customers = Seq(
    ("Alice", "1", "2016-05-01", 50.00),
    ("Alice", "1", "2016-05-03", 45.00),
    ("Alice", "2", "2016-05-04", 55.00),
    ("Bob", "2", "2016-05-01", 25.00),
    ("Bob", "2", "2016-05-04", 29.00),
    ("Bob", "2", "2016-05-06", 27.00)
).toDF("name", "group", "date", "amountSpent")
customers.show
+-----+-----+----------+-----------+
| name|group|      date|amountSpent|
+-----+-----+----------+-----------+
|Alice|    1|2016-05-01|       50.0|
|Alice|    1|2016-05-03|       45.0|
|Alice|    2|2016-05-04|       55.0|
|  Bob|    2|2016-05-01|       25.0|
|  Bob|    2|2016-05-04|       29.0|
|  Bob|    2|2016-05-06|       27.0|
+-----+-----+----------+-----------+

In [2]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
In [4]:
val ws1 = Window.partitionBy("name").orderBy("date")
val ws2 = Window.partitionBy("group").orderBy("date")
In [6]:
customers.
    withColumn("i", row_number().over(ws1)).
    withColumn("j", row_number().over(ws2)).
    show()
+-----+-----+----------+-----------+---+---+
| name|group|      date|amountSpent|  i|  j|
+-----+-----+----------+-----------+---+---+
|Alice|    1|2016-05-01|       50.0|  1|  1|
|Alice|    1|2016-05-03|       45.0|  2|  2|
|  Bob|    2|2016-05-01|       25.0|  1|  1|
|  Bob|    2|2016-05-04|       29.0|  2|  2|
|Alice|    2|2016-05-04|       55.0|  3|  3|
|  Bob|    2|2016-05-06|       27.0|  3|  4|
+-----+-----+----------+-----------+---+---+
