Tips and Traps¶
- An HDFS table might contain invalid data (I'm not clear about the reasons at this time) with respect to the column types (e.g., Date and Timestamp). This will cause issues when Spark tries to load the data. For more discussions, please refer to Unrecognized column type:TIMESTAMP_TYP.
- datetime.datetime or datetime.date objects CANNOT be used in date functions in PySpark (e.g., datediff) directly. You have to wrap them in the function lit, which converts datetime.datetime and datetime.date objects to Columns of TimestampType and DateType in PySpark DataFrames respectively. As a matter of fact, it is suggested that you always use the lit function to explicitly convert scalar values to Columns, no matter whether implicit conversion happens or not.
- Most date functions work on a string of the format yyyy-MM-dd, which is automatically cast to a date object. Notice that other date formats (e.g., yyyy/MM/dd) are not supported and will cause null values to be returned. Note that the function to_date also uses yyyy-MM-dd as the default format when a format string is not specified.
- The functions second, minute, day/dayofmonth, weekofyear, month, quarter and year extract the corresponding part from a date object/string. date_add, date_sub, datediff and add_months perform arithmetic operations on dates. to_date, to_timestamp, to_utc_timestamp, to_unix_timestamp and timestamp cast date objects/strings.
- The method Column.between casts its parameters to the same type as the Column and then performs comparisons. For example, if Column.between is invoked on a string column, it automatically casts its parameters to the string type; if Column.between is invoked on a date column, it automatically casts its parameters to the date type. If the cast fails for a row, a null value is generated. In particular, Column.between does NOT try to convert a string to a Column automatically, no matter whether it is invoked on a date Column or not (even though most date-related Column functions automatically convert strings to Columns). To avoid confusion and tricky bugs, it is suggested that you avoid relying on the automatic conversion of strings to Columns when calling date-related functions, and instead always explicitly convert a string to a Column by calling col("col_name").
- java.sql.Date can be used with Spark but does not support arithmetic computation. java.time supports arithmetic computation but cannot be used as a Spark column directly. You have to rely on java.time for arithmetic computation and then convert dates to java.sql.Date so that they can be used in Spark DataFrames.
from pathlib import Path
import datetime
import pandas as pd
import findspark
findspark.init(str(next(Path("/opt").glob("spark-3*"))))
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType
spark = (
SparkSession.builder.appName("PySpark_Str_Func").enableHiveSupport().getOrCreate()
)
df = spark.createDataFrame(
pd.DataFrame(
data=(("2017-01-01", "2017-01-07"), ("2020-02-01", "2019-02-10")),
columns=["d1_s", "d2_s"],
)
)
df.show()
The Python type datetime.date is mapped to DateType in PySpark.
df.withColumn("d3_d", lit(datetime.date.today())).schema
The Python type datetime.datetime is mapped to TimestampType in PySpark.
df.withColumn("d3_ts", lit(datetime.datetime.today())).schema
Comparing a string column with a str works as expected.
df.filter(col("d1_s") >= "2017-01-15").show()
A datetime.date object is formatted as YYYY-mm-dd by default when converted to a string.
This is the behavior in most programming languages.
d = datetime.date.today()
d
str(d)
Comparing a string column of the format YYYY-mm-dd with a datetime.date object works as expected.
df.filter(col("d1_s") >= datetime.date(2017, 1, 15)).show()
Similarly, the between method of a string Column works when 2 datetime.date objects are passed to it.
df.filter(
col("d1_s").between(datetime.date(2017, 1, 15), datetime.date(2021, 1, 15))
).show()
The method between of a string Column works even when mixed types of parameters are passed to it.
df.filter(col("d1_s").between("2017-01-15", datetime.date(2021, 1, 15))).show()
The between method of a date Column converts the arguments passed to it to dates before doing comparisons.
Invalid arguments get converted to null.
df.filter(col("d3_d").between("2017-01-15", "abc")).show()
df.filter(col("d3_d").astype(StringType()).between("2017-01-15", "abc")).show()
Notice that the between method of a string Column does not automatically convert a column name to a Column.
As a matter of fact,
it is suggested that you avoid relying on automatic conversion from column names to Columns
as it can cause tricky issues if you are not careful.
df.filter(col("d2_s").between("d1_s", "d3_d")).show()
df.filter(col("d2_s").between(col("d1_s"), col("d3_d"))).show()
The function datediff does NOT work with a string Column and a datetime.date object directly.
However,
it works if you convert a datetime.date object to a Column using lit.
df.select(datediff("d1_s", lit(datetime.date(2017, 1, 15)))).show()
+/- Operators¶
The +/- operators are supported on date/time columns in Spark 3.
spark.sql(
"""
select
current_date as today
"""
).show()
spark.sql(
"""
select
current_date + 10 as ten_days_later
"""
).show()
spark.sql(
"""
select
current_date - 1 as yesterday
"""
).show()
add_months¶
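add_months adds a given number of months to a date. Below is a quick SQL illustration (the date and offset are arbitrary examples).
spark.sql(
    """
    select add_months('2017-01-31', 1) as d
    """
).show()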
date_add¶
df = spark.createDataFrame(
pd.DataFrame(
data=(("2017-01-01", "2017-01-07"), ("2017-02-01", "2019-02-10")),
columns=["d1", "d2"],
)
)
df.show()
df1 = (
df.withColumn("d3", date_sub("d1", 30))
.withColumn("d4", date_add("d1", 30))
.withColumn("check", col("d2").between("d3", "d4"))
)
df1.show()
df1.schema
date_sub¶
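date_sub subtracts a given number of days from a date; it is also demonstrated together with date_add above. A standalone SQL illustration (arbitrary values):
spark.sql(
    """
    select date_sub('2017-01-31', 30) as d
    """
).show()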
datediff¶
df2 = df.withColumn("diff", datediff("d2", "d1"))
df2.show()
df2.schema
df2 = df.withColumn("diff", datediff("d2", datetime.date.today()))
df2.show()
df2.schema
df2 = df.withColumn("diff", datediff("d2", lit(datetime.date.today())))
df2.show()
df2.schema
spark.sql(
"""
select datediff('2020-12-04', '2020-12-03')
"""
).show()
spark.sql(
"""
select datediff('2020/12/04', '2020/12/03')
"""
).show()
current_date¶
df3 = df.withColumn("current", current_date())
df3.show()
df3.schema
current_timestamp / now¶
Both current_timestamp and now return the current timestamp.
The difference is that now must be called with parentheses,
while current_timestamp can be called without parentheses.
If you are not sure,
always call functions with parentheses.
spark.sql(
"""
select
current_timestamp
"""
).show(n=1, truncate=False)
spark.sql(
"""
select
current_timestamp()
"""
).show(n=1, truncate=False)
spark.sql(
"""
select
now()
"""
).show(n=1, truncate=False)
dayofmonth / day¶
Returns the day of the month from a given date or timestamp.
dayofmonth is the same as the day function.
df4 = df.withColumn("day_of_d2", dayofmonth("d2"))
df4.show()
df4.schema
spark.sql(
"""
select
dayofmonth("2017-01-07")
"""
).show()
dayofyear¶
df5 = df.withColumn("day_of_year_d1", dayofyear("d1")).withColumn(
    "day_of_year_d2", dayofyear("d2")
)
df5.show()
df5.schema
date_format¶
df6 = df.withColumn("format_d1", date_format("d1", "dd/MM/yyyy"))
df6.show()
df6.schema
date_trunc¶
Returns the timestamp ts truncated to the unit specified by the format fmt, which can be one of "YEAR", "YYYY", "YY", "MON", "MONTH", "MM", "DAY", "DD", "HOUR", "MINUTE", "SECOND", "WEEK" or "QUARTER".
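A quick SQL illustration truncating an (arbitrary) timestamp to the beginning of its month:
spark.sql(
    """
    select date_trunc('MM', '2017-01-15 10:11:12') as ts
    """
).show()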
minute¶
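minute extracts the minute part of a timestamp. A quick SQL illustration (arbitrary timestamp):
spark.sql(
    """
    select minute('2017-01-15 10:11:12') as m
    """
).show()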
month¶
spark.sql(
"""
select
month("2018-01-01") as month
"""
).show()
now¶
Returns the current timestamp. See the current_timestamp / now section above for examples.
next_day¶
Returns the day after the start_date specified by day_of_week. Day of week can be specified as ‘MON’, ‘TUE’, ‘WED’, ‘THU’, ‘FRI’, ‘SAT’, ‘SUN’ or as ‘MO’, ‘TU’, ‘WE’, ‘TH’, ‘FR’, ‘SA’, ‘SU’.
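A quick SQL illustration (arbitrary date):
spark.sql(
    """
    select next_day('2017-01-01', 'MON') as d
    """
).show()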
quarter¶
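quarter extracts the quarter of the year from a date. A quick SQL illustration (arbitrary date):
spark.sql(
    """
    select quarter('2017-05-01') as q
    """
).show()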
second¶
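second extracts the second part of a timestamp. A quick SQL illustration (arbitrary timestamp):
spark.sql(
    """
    select second('2017-01-15 10:11:12') as s
    """
).show()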
timestamp¶
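In Spark SQL, timestamp casts a value to the timestamp type. A quick SQL illustration (arbitrary value; verify the behavior on your Spark version):
spark.sql(
    """
    select timestamp('2017-01-15 10:11:12') as ts
    """
).show()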
to_date¶
spark.sql(
"""
select to_date('2020-12-04')
"""
).show()
spark.sql(
"""
select to_date('2020/12/04')
"""
).show()
spark.sql(
"""
select to_date('2020/12/04', 'yyyy/MM/dd')
"""
).show()
to_utc_timestamp¶
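to_utc_timestamp interprets a timestamp as a time in the given time zone and converts it to UTC. A quick SQL illustration (arbitrary timestamp and time zone):
spark.sql(
    """
    select to_utc_timestamp('2017-01-15 10:11:12', 'Asia/Shanghai') as ts
    """
).show()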
to_unix_timestamp¶
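to_unix_timestamp converts a date/timestamp string to the number of seconds since the Unix epoch. A quick SQL illustration (arbitrary date):
spark.sql(
    """
    select to_unix_timestamp('2017-01-15', 'yyyy-MM-dd') as ts
    """
).show()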
to_timestamp¶
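to_timestamp parses a string into a timestamp; a format string can be specified for non-default formats. Quick SQL illustrations (arbitrary values):
spark.sql(
    """
    select to_timestamp('2017-01-15 10:11:12') as ts
    """
).show()
spark.sql(
    """
    select to_timestamp('2017/01/15 10:11:12', 'yyyy/MM/dd HH:mm:ss') as ts
    """
).show()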
unix_timestamp¶
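unix_timestamp is similar to to_unix_timestamp and returns the number of seconds since the Unix epoch; called without arguments it returns the current Unix timestamp. A quick SQL illustration (arbitrary timestamp):
spark.sql(
    """
    select unix_timestamp('2017-01-15 10:11:12') as ts
    """
).show()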
weekofyear¶
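weekofyear extracts the week number of the year from a date. A quick SQL illustration (arbitrary date):
spark.sql(
    """
    select weekofyear('2017-01-15') as week
    """
).show()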
year¶
spark.sql(
"""
select
year("2018-01-01") as year
"""
).show()
Aggregation w.r.t. a Date Column¶
df1 = spark.createDataFrame(
pd.DataFrame(
data=(
(1, "2017-01-01", 1, 2),
(1, "2017-01-02", 2, 3),
(1, "2017-02-01", 10, 11),
(1, "2017-02-02", 20, 21),
(2, "2017-01-03", 3, 4),
(2, "2017-01-04", 4, 5),
(2, "2017-02-07", 11, 12),
(2, "2017-02-08", 22, 23),
),
columns=["user", "date", "x", "y"],
)
)
df1.show()
df2 = spark.createDataFrame(
pd.DataFrame(
data=(
(11, "2017-01-01", 10),
(11, "2017-01-02", 20),
(11, "2017-02-01", 100),
(11, "2017-02-02", 200),
(22, "2017-01-03", 30),
(22, "2017-01-04", 40),
(22, "2017-02-07", 110),
(22, "2017-02-08", 220),
),
columns=["user", "date", "x"],
)
)
df2.show()
def sum_col_date(
    col_agg: str,
    col_date: str,
    date: datetime.date,
    days_before_1: int,
    days_before_2: int,
):
    """Sum the column col_agg over rows whose date in col_date falls
    between (date - days_before_1 days) and (date - days_before_2 days).
    """
    date1 = date - datetime.timedelta(days=days_before_1)
    date2 = date - datetime.timedelta(days=days_before_2)
    return sum(
        when(col(col_date).between(date1, date2), col(col_agg)).otherwise(0)
    ).alias(f"sum_{col_agg}_{days_before_1}_{days_before_2}")
def agg_col(col_agg, col_date, date, days_before_1, days_before_2):
    return [
        avg(col_agg).alias(f"avg_{col_agg}"),
        sum_col_date(col_agg, col_date, date, days_before_1, days_before_2),
    ]
df1.groupBy("user").agg(
sum_col_date("x", "date", datetime.date(2017, 1, 5), 7, 1)
).show()
df1.select(*["user", "date"], col("x")).show()
df2.groupBy("user").agg(
sum_col_date("x", "date", datetime.date(2017, 1, 5), 7, 1)
).show()
df1.groupBy("user").agg(
*agg_col("x", "date", datetime.date(2017, 1, 5), 7, 1),
*agg_col("y", "date", datetime.date(2017, 1, 5), 7, 1)
).show()