
Date Functions in Spark

Tips and Traps

  1. An HDFS table might contain invalid data (I'm not clear about the reasons at this time) with respect to the column types (e.g., Date and Timestamp). This causes issues when Spark tries to load the data. For more discussions, please refer to Unrecognized column type:TIMESTAMP_TYP.

  2. datetime.datetime and datetime.date objects CANNOT be used in date functions in PySpark (e.g., datediff) directly. You have to wrap them in the function lit, which converts datetime.datetime and datetime.date objects to Columns of TimestampType and DateType in PySpark DataFrames respectively. As a matter of fact, it is suggested that you always use the lit function to explicitly convert scalar values to Columns, whether or not an implicit conversion would happen (see the small sketch right after this list).

  3. Most date functions work on a string of the format yyyy-MM-dd, which is automatically cast to a date object. Notice that other date formats (e.g., yyyy/MM/dd) are not supported and cause null values to be returned. Note that the function to_date also uses yyyy-MM-dd as the default format when a format string is not specified.

  4. The functions second, minute, day/dayofmonth, weekofyear, month, quarter and year extract the corresponding part from a date object/string.

  5. date_add, date_sub, datediff and add_months perform arithmetic operations on dates.

  6. to_date, to_timestamp, to_utc_timestamp, to_unix_timestamp and timestamp cast/convert date and timestamp objects/strings.

  7. The method Column.between casts its parameters to the same type as the Column and then performs comparisons. For example, if Column.between is invoked on a string column, it automatically casts its parameters to the string type; if Column.between is invoked on a date column, it automatically casts its parameters to the date type. If the cast fails for a row, a null value is generated. Notably, Column.between does NOT try to convert a string to a Column automatically, no matter whether it is invoked on a date Column or not (even though most date-related Column functions automatically convert strings to Columns where applicable). To avoid confusion and tricky bugs, it is suggested that you avoid relying on automatic conversion of strings to Columns when calling date-related functions and instead always explicitly convert a string to a Column by calling col("col_name").

  8. java.sql.Date can be used with Spark but does not support arithmetic computation. java.time supports arithmetic computation but cannot be used as a Spark column directly. You have to rely on java.time for arithmetic computation and then convert dates to java.sql.Date so that they can be used in Spark DataFrames.
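
A minimal sketch of tips 2 and 7 (the toy DataFrame and column names here are made up for illustration, and the SparkSession spark is the one created in the next cell; fuller examples follow throughout the post):

import datetime
from pyspark.sql.functions import col, lit, datediff

_df = spark.createDataFrame([("2017-01-01", "2017-01-07")], ["a", "b"])

# A bare datetime.date raises a TypeError in datediff;
# wrap it in lit() to turn it into a DateType Column first.
_df.select(datediff(col("b"), lit(datetime.date(2017, 1, 1)))).show()

# Column.between never treats a string as a column name,
# so pass Columns explicitly when you mean columns.
_df.filter(col("b").between(col("a"), lit(datetime.date(2021, 1, 1)))).show()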

In [2]:
from pathlib import Path
import datetime
import pandas as pd
import findspark

findspark.init(str(next(Path("/opt").glob("spark-3*"))))
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType

spark = (
    SparkSession.builder.appName("PySpark_Str_Func").enableHiveSupport().getOrCreate()
)
In [11]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=(("2017-01-01", "2017-01-07"), ("2020-02-01", "2019-02-10")),
        columns=["d1_s", "d2_s"],
    )
)
df.show()
+----------+----------+
|      d1_s|      d2_s|
+----------+----------+
|2017-01-01|2017-01-07|
|2020-02-01|2019-02-10|
+----------+----------+

The Python type datetime.date is mapped to DateType in PySpark.

In [12]:
df.withColumn("d3_d", lit(datetime.date.today())).schema
Out[12]:
StructType(List(StructField(d1_s,StringType,true),StructField(d2_s,StringType,true),StructField(d3_d,DateType,false)))

The Python type datetime.datetime is mapped to TimestampType in PySpark.

In [13]:
df.withColumn("d3_ts", lit(datetime.datetime.today())).schema
Out[13]:
StructType(List(StructField(d1_s,StringType,true),StructField(d2_s,StringType,true),StructField(d3_ts,TimestampType,false)))

Comparing a string column with a str works as expected.

In [41]:
df.filter(col("d1_s") >= "2017-01-15").show()
+----------+----------+----------+
|      d1_s|      d2_s|      d3_d|
+----------+----------+----------+
|2020-02-01|2019-02-10|2021-03-24|
+----------+----------+----------+

A datetime.date object is formatted as yyyy-MM-dd (ISO 8601) by default when converted to a string, which is the convention in most programming languages and matches the format Spark's date functions expect.

In [42]:
d = datetime.date.today()
d
Out[42]:
datetime.date(2021, 3, 24)
In [43]:
str(d)
Out[43]:
'2021-03-24'

Comparing a string column of the format yyyy-MM-dd with a datetime.date object works as expected.

In [44]:
df.filter(col("d1_s") >= datetime.date(2017, 1, 15)).show()
+----------+----------+----------+
|      d1_s|      d2_s|      d3_d|
+----------+----------+----------+
|2020-02-01|2019-02-10|2021-03-24|
+----------+----------+----------+

Similarly, the between method of a string Column works when two datetime.date objects are passed to it.

In [46]:
df.filter(
    col("d1_s").between(datetime.date(2017, 1, 15), datetime.date(2021, 1, 15))
).show()
+----------+----------+----------+
|      d1_s|      d2_s|      d3_d|
+----------+----------+----------+
|2020-02-01|2019-02-10|2021-03-24|
+----------+----------+----------+

The between method of a string Column works even when parameters of mixed types are passed to it.

In [48]:
df.filter(col("d1_s").between("2017-01-15", datetime.date(2021, 1, 15))).show()
+----------+----------+----------+
|      d1_s|      d2_s|      d3_d|
+----------+----------+----------+
|2020-02-01|2019-02-10|2021-03-24|
+----------+----------+----------+

The between method of a date Column converts the arguments passed to it to dates before doing comparisons. Invalid arguments are converted to null.

In [60]:
df.filter(col("d3_d").between("2017-01-15", "abc")).show()
+----+----+----+
|d1_s|d2_s|d3_d|
+----+----+----+
+----+----+----+

In [65]:
df.filter(col("d3_d").astype(StringType()).between("2017-01-15", "abc")).show()
+----------+----------+----------+
|      d1_s|      d2_s|      d3_d|
+----------+----------+----------+
|2017-01-01|2017-01-07|2021-03-24|
|2020-02-01|2019-02-10|2021-03-24|
+----------+----------+----------+

Notice that the between method of a string Column does not automatically convert a column name to a Column; a string argument is treated as a literal value. As a matter of fact, it is suggested that you avoid relying on automatic conversion from column names to Columns in general, as it can cause tricky issues if you are not careful.

In [53]:
df.filter(col("d2_s").between("d1_s", "d3_d")).show()
+----+----+----+
|d1_s|d2_s|d3_d|
+----+----+----+
+----+----+----+

In [54]:
df.filter(col("d2_s").between(col("d1_s"), col("d3_d"))).show()
+----------+----------+----------+
|      d1_s|      d2_s|      d3_d|
+----------+----------+----------+
|2017-01-01|2017-01-07|2021-03-24|
+----------+----------+----------+

The function datediff does NOT work with a string Column and a datetime.date object directly. However, it works if you convert a datetime.date object to a Column using lit.

In [20]:
df.select(datediff("d1", lit(datetime.date(2017, 1, 15)))).show()
+-------------------------------+
|datediff(d1, DATE '2017-01-15')|
+-------------------------------+
|                            -14|
|                             17|
+-------------------------------+

+/- Operators

The +/- operators are supported on date/time columns in Spark 3.

In [2]:
spark.sql(
    """
    select 
        current_date as today
    """
).show()
+----------+
|     today|
+----------+
|2021-01-04|
+----------+

In [3]:
spark.sql(
    """
    select 
        current_date + 10 as today
    """
).show()
+----------+
|     today|
+----------+
|2021-01-14|
+----------+

In [4]:
spark.sql(
    """
    select 
        current_date - 1 as today
    """
).show()
+----------+
|     today|
+----------+
|2021-01-03|
+----------+

add_months
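
add_months shifts a date by a (possibly negative) number of months. A quick SQL sketch (the dates below are illustrative):

spark.sql(
    """
    select
        add_months('2017-01-31', 1) as plus_1_month,
        add_months('2017-01-31', -2) as minus_2_months
    """
).show()
# add_months clamps to the last day of the resulting month,
# so plus_1_month is 2017-02-28 and minus_2_months is 2016-11-30.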

date_add

In [16]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=(("2017-01-01", "2017-01-07"), ("2017-02-01", "2019-02-10")),
        columns=["d1", "d2"],
    )
)
df.show()
+----------+----------+
|        d1|        d2|
+----------+----------+
|2017-01-01|2017-01-07|
|2017-02-01|2019-02-10|
+----------+----------+

In [21]:
df1 = (
    df.withColumn("d3", date_sub("d1", 30))
    .withColumn("d4", date_add("d1", 30))
    .withColumn("check", col("d2").between("d3", "d4"))
)
df1.show()
df1.schema
+----------+----------+----------+----------+-----+
|        d1|        d2|        d3|        d4|check|
+----------+----------+----------+----------+-----+
|2017-01-01|2017-01-07|2016-12-02|2017-01-31|false|
|2017-02-01|2019-02-10|2017-01-02|2017-03-03|false|
+----------+----------+----------+----------+-----+

Out[21]:
StructType(List(StructField(d1,StringType,true),StructField(d2,StringType,true),StructField(d3,DateType,true),StructField(d4,DateType,true),StructField(check,BooleanType,true)))

date_sub
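
date_sub subtracts a number of days from a date; it was already used together with date_add in the cell above. A minimal SQL sketch:

spark.sql(
    """
    select date_sub('2017-01-07', 7) as one_week_earlier
    """
).show()
# returns 2016-12-31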

datediff

In [6]:
df2 = df.withColumn("diff", datediff("d2", "d1"))
df2.show()
df2.schema
+----------+----------+----+
|        d1|        d2|diff|
+----------+----------+----+
|2017-01-01|2017-01-07|   6|
|2017-02-01|2019-02-10| 739|
+----------+----------+----+

Out[6]:
StructType(List(StructField(d1,StringType,true),StructField(d2,StringType,true),StructField(diff,IntegerType,true)))
In [7]:
df2 = df.withColumn("diff", datediff("d2", datetime.date.today()))
df2.show()
df2.schema
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-7-97e1d1306b89> in <module>
----> 1 df2 = df.withColumn("diff", datediff("d2", datetime.date.today()))
      2 df2.show()
      3 df2.schema

/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/functions.py in datediff(end, start)
   1112     """
   1113     sc = SparkContext._active_spark_context
-> 1114     return Column(sc._jvm.functions.datediff(_to_java_column(end), _to_java_column(start)))
   1115 
   1116 

/opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/column.py in _to_java_column(col)
     50         jcol = _create_column_from_name(col)
     51     else:
---> 52         raise TypeError(
     53             "Invalid argument, not a string or column: "
     54             "{0} of type {1}. "

TypeError: Invalid argument, not a string or column: 2021-01-05 of type <class 'datetime.date'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
In [8]:
df2 = df.withColumn("diff", datediff("d2", lit(datetime.date.today())))
df2.show()
df2.schema
+----------+----------+-----+
|        d1|        d2| diff|
+----------+----------+-----+
|2017-01-01|2017-01-07|-1459|
|2017-02-01|2019-02-10| -695|
+----------+----------+-----+

Out[8]:
StructType(List(StructField(d1,StringType,true),StructField(d2,StringType,true),StructField(diff,IntegerType,true)))
In [32]:
spark.sql(
    """
    select datediff('2020-12-04', '2020-12-03')
    """
).show()
+------------------------------------------------------------+
|datediff(CAST(2020-12-04 AS DATE), CAST(2020-12-03 AS DATE))|
+------------------------------------------------------------+
|                                                           1|
+------------------------------------------------------------+

In [33]:
spark.sql(
    """
    select datediff('2020/12/04', '2020/12/03')
    """
).show()
+------------------------------------------------------------+
|datediff(CAST(2020/12/04 AS DATE), CAST(2020/12/03 AS DATE))|
+------------------------------------------------------------+
|                                                        null|
+------------------------------------------------------------+

current_date

In [17]:
val df3 = df.withColumn("current", current_date())
df3.show
df3.schema
+----------+----------+----------+
|        d1|        d2|   current|
+----------+----------+----------+
|2017-01-01|2017-01-07|2018-05-02|
|2017-02-01|2019-02-10|2018-05-02|
+----------+----------+----------+

Out[17]:
[[StructField(d1,StringType,true), StructField(d2,StringType,true), StructField(current,DateType,false)]]

current_timestamp / now

Both current_timestamp and now return the current timestamp. The difference is that now must be called with parentheses while current_timestamp can be called without parentheses. If you are not sure, always call functions with parentheses.

In [17]:
spark.sql(
    """
    select
        current_timestamp
    """
).show(n=1, truncate=False)
+-----------------------+
|current_timestamp()    |
+-----------------------+
|2020-09-07 10:58:58.381|
+-----------------------+

In [20]:
spark.sql(
    """
    select
        current_timestamp()
    """
).show(n=1, truncate=False)
+----------------------+
|current_timestamp()   |
+----------------------+
|2020-09-07 11:00:53.47|
+----------------------+

In [19]:
spark.sql(
    """
    select
        now()
    """
).show(n=1, truncate=False)
+-----------------------+
|now()                  |
+-----------------------+
|2020-09-07 10:59:37.629|
+-----------------------+

dayofmonth / day

Returns the day of the month from a given date or timestamp. dayofmonth is the same as the day function.

In [21]:
val df4 = df.withColumn("day_of_d2", dayofmonth($"d2"))
df4.show
df4.schema
+----------+----------+---------+
|        d1|        d2|day_of_d2|
+----------+----------+---------+
|2017-01-01|2017-01-07|        7|
|2017-02-01|2019-02-10|       10|
+----------+----------+---------+

Out[21]:
[[StructField(d1,StringType,true), StructField(d2,StringType,true), StructField(day_of_d2,IntegerType,true)]]
In [11]:
spark.sql(
    """
    select 
        dayofmonth("2017-01-07") 
    """
).show()
+------------------------------------+
|dayofmonth(CAST(2017-01-07 AS DATE))|
+------------------------------------+
|                                   7|
+------------------------------------+

dayofyear

In [22]:
val df5 = df.withColumn("day_of_year_d1", dayofyear($"d1")).withColumn("day_of_year_d2", dayofyear($"d2"))
df5.show
df5.schema
+----------+----------+--------------+--------------+
|        d1|        d2|day_of_year_d1|day_of_year_d2|
+----------+----------+--------------+--------------+
|2017-01-01|2017-01-07|             1|             7|
|2017-02-01|2019-02-10|            32|            41|
+----------+----------+--------------+--------------+

Out[22]:
[[StructField(d1,StringType,true), StructField(d2,StringType,true), StructField(day_of_year_d1,IntegerType,true), StructField(day_of_year_d2,IntegerType,true)]]

date_format

In [25]:
val df6 = df.withColumn("format_d1", date_format($"d1", "dd/MM/yyyy"))
df6.show
df6.schema
+----------+----------+----------+
|        d1|        d2| format_d1|
+----------+----------+----------+
|2017-01-01|2017-01-07|01/01/2017|
|2017-02-01|2019-02-10|01/02/2017|
+----------+----------+----------+

Out[25]:
[[StructField(d1,StringType,true), StructField(d2,StringType,true), StructField(format_d1,StringType,true)]]

date_trunc

Returns timestamp ts truncated to the unit specified by the format fmt, which can be one of "YEAR", "YYYY", "YY", "MON", "MONTH", "MM", "DAY", "DD", "HOUR", "MINUTE", "SECOND", "WEEK" or "QUARTER".
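
A quick SQL sketch (the timestamp below is illustrative):

spark.sql(
    """
    select
        date_trunc('MONTH', '2017-01-07 12:34:56') as trunc_to_month,
        date_trunc('HOUR', '2017-01-07 12:34:56') as trunc_to_hour
    """
).show(truncate=False)
# trunc_to_month is 2017-01-01 00:00:00 and trunc_to_hour is 2017-01-07 12:00:00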

minute
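
minute extracts the minute part of a timestamp. A quick sketch:

spark.sql(
    """
    select minute('2017-01-07 12:34:56') as minute
    """
).show()
# returns 34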

month

In [9]:
spark.sql(
    """
    select
        month("2018-01-01") as month
    """
).show()
+-----+
|month|
+-----+
|    1|
+-----+

now

Returns the current timestamp.

next_day

Returns the first date which is later than start_date and falls on the day specified by day_of_week. The day of week can be specified as 'MON', 'TUE', 'WED', 'THU', 'FRI', 'SAT', 'SUN' or abbreviated as 'MO', 'TU', 'WE', 'TH', 'FR', 'SA', 'SU'.
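
A quick SQL sketch (the date below is illustrative):

spark.sql(
    """
    select next_day('2017-01-07', 'SUN') as next_sunday
    """
).show()
# 2017-01-07 is a Saturday, so next_sunday is 2017-01-08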

quarter
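
quarter extracts the quarter (1 to 4) of a date. A quick sketch:

spark.sql(
    """
    select quarter('2017-02-01') as quarter
    """
).show()
# returns 1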

second
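
second extracts the second part of a timestamp. A quick sketch:

spark.sql(
    """
    select second('2017-01-07 12:34:56') as second
    """
).show()
# returns 56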

timestamp

to_date

In [36]:
spark.sql(
    """
    select to_date('2020-12-04') 
    """
).show()
+---------------------+
|to_date('2020-12-04')|
+---------------------+
|           2020-12-04|
+---------------------+

In [37]:
spark.sql(
    """
    select to_date('2020/12/04') 
    """
).show()
+---------------------+
|to_date('2020/12/04')|
+---------------------+
|                 null|
+---------------------+

In [38]:
spark.sql(
    """
    select to_date('2020/12/04', 'yyyy/MM/dd') 
    """
).show()
+-----------------------------------+
|to_date('2020/12/04', 'yyyy/MM/dd')|
+-----------------------------------+
|                         2020-12-04|
+-----------------------------------+

to_utc_timestamp
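
to_utc_timestamp interprets a timestamp as wall-clock time in the given time zone and renders it in UTC. A quick sketch (the time zone is chosen for illustration):

spark.sql(
    """
    select to_utc_timestamp('2017-01-07 12:00:00', 'Asia/Shanghai') as in_utc
    """
).show()
# Asia/Shanghai is UTC+8, so in_utc is 2017-01-07 04:00:00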

to_unix_timestamp
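
to_unix_timestamp (a Spark SQL function) converts a date/timestamp string to seconds since the Unix epoch; the result depends on the session time zone, so no value is shown here. A quick sketch:

spark.sql(
    """
    select to_unix_timestamp('2020-12-04 00:00:00') as seconds_since_epoch
    """
).show()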

to_timestamp
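
to_timestamp parses a string into a timestamp; like to_date, it accepts an optional format string. A quick sketch:

spark.sql(
    """
    select
        to_timestamp('2020-12-04 01:02:03') as ts1,
        to_timestamp('2020/12/04 01:02:03', 'yyyy/MM/dd HH:mm:ss') as ts2
    """
).show(truncate=False)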

unix_timestamp
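
unix_timestamp returns the current Unix timestamp when called without arguments, or parses a string (with an optional format) into seconds since the Unix epoch; the parsed value depends on the session time zone. A quick sketch:

spark.sql(
    """
    select
        unix_timestamp() as now_in_seconds,
        unix_timestamp('2020-12-04', 'yyyy-MM-dd') as seconds_since_epoch
    """
).show()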

weekofyear
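
weekofyear returns the ISO week number of a date. A quick sketch:

spark.sql(
    """
    select weekofyear('2017-01-07') as week
    """
).show()
# 2017-01-07 falls in ISO week 1 of 2017, so this returns 1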

year

In [8]:
spark.sql(
    """
    select
        year("2018-01-01") as year
    """
).show()
+----+
|year|
+----+
|2018|
+----+

Aggregation w.r.t. a Date Column

In [49]:
df1 = spark.createDataFrame(
    pd.DataFrame(
        data=(
            (1, "2017-01-01", 1, 2),
            (1, "2017-01-02", 2, 3),
            (1, "2017-02-01", 10, 11),
            (1, "2017-02-02", 20, 21),
            (2, "2017-01-03", 3, 4),
            (2, "2017-01-04", 4, 5),
            (2, "2017-02-07", 11, 12),
            (2, "2017-02-08", 22, 23),
        ),
        columns=["user", "date", "x", "y"],
    )
)
df1.show()
+----+----------+---+---+
|user|      date|  x|  y|
+----+----------+---+---+
|   1|2017-01-01|  1|  2|
|   1|2017-01-02|  2|  3|
|   1|2017-02-01| 10| 11|
|   1|2017-02-02| 20| 21|
|   2|2017-01-03|  3|  4|
|   2|2017-01-04|  4|  5|
|   2|2017-02-07| 11| 12|
|   2|2017-02-08| 22| 23|
+----+----------+---+---+

In [38]:
df2 = spark.createDataFrame(
    pd.DataFrame(
        data=(
            (11, "2017-01-01", 10),
            (11, "2017-01-02", 20),
            (11, "2017-02-01", 100),
            (11, "2017-02-02", 200),
            (22, "2017-01-03", 30),
            (22, "2017-01-04", 40),
            (22, "2017-02-07", 110),
            (22, "2017-02-08", 220),
        ),
        columns=["user", "date", "x"],
    )
)
df2.show()
+----+----------+---+
|user|      date|  x|
+----+----------+---+
|  11|2017-01-01| 10|
|  11|2017-01-02| 20|
|  11|2017-02-01|100|
|  11|2017-02-02|200|
|  22|2017-01-03| 30|
|  22|2017-01-04| 40|
|  22|2017-02-07|110|
|  22|2017-02-08|220|
+----+----------+---+

In [51]:
def sum_col_date(
    col_agg: str,
    col_date: str,
    date: datetime.date,
    days_before_1: int,
    days_before_2: int,
):
    """Sum the column col_agg over rows whose date column col_date falls in
    the window [date - days_before_1, date - days_before_2] (both inclusive).
    """
    date1 = date - datetime.timedelta(days=days_before_1)
    date2 = date - datetime.timedelta(days=days_before_2)
    return sum(
        when(col(col_date).between(date1, date2), col(col_agg)).otherwise(0)
    ).alias(f"sum_{col_agg}_{days_before_1}_{days_before_2}")


def agg_col(col_agg, col_date, date, days_before_1, days_before_2):
    """Build a list of aggregation expressions for the column col_agg:
    its overall average and its windowed sum as defined by sum_col_date.
    """
    return [
        avg(col_agg).alias(f"avg_{col_agg}"),
        sum_col_date(col_agg, col_date, date, days_before_1, days_before_2),
    ]
In [52]:
df1.groupBy("user").agg(
    sum_col_date("x", "date", datetime.date(2017, 1, 5), 7, 1)
).show()
+----+---------+
|user|sum_x_7_1|
+----+---------+
|   1|        3|
|   2|        7|
+----+---------+

In [57]:
df1.select(*["user", "date"], col("x")).show()
+----+----------+---+
|user|      date|  x|
+----+----------+---+
|   1|2017-01-01|  1|
|   1|2017-01-02|  2|
|   1|2017-02-01| 10|
|   1|2017-02-02| 20|
|   2|2017-01-03|  3|
|   2|2017-01-04|  4|
|   2|2017-02-07| 11|
|   2|2017-02-08| 22|
+----+----------+---+

In [53]:
df2.groupBy("user").agg(
    sum_col_date("x", "date", datetime.date(2017, 1, 5), 7, 1)
).show()
+----+---------+
|user|sum_x_7_1|
+----+---------+
|  22|       70|
|  11|       30|
+----+---------+

In [54]:
df1.groupBy("user").agg(
    *agg_col("x", "date", datetime.date(2017, 1, 5), 7, 1),
    *agg_col("y", "date", datetime.date(2017, 1, 5), 7, 1)
).show()
+----+-----+---------+-----+---------+
|user|avg_x|sum_x_7_1|avg_y|sum_y_7_1|
+----+-----+---------+-----+---------+
|   1| 8.25|        3| 9.25|        5|
|   2| 10.0|        7| 11.0|        9|
+----+-----+---------+-----+---------+
