Tips and Traps¶
- An HDFS table might contain invalid data (I'm not clear about the reasons at this time) with respect to the column types (e.g., Date and Timestamp). This will cause issues when Spark tries to load the data. For more discussion, please refer to Unrecognized column type: TIMESTAMP_TYP.
- `datetime.datetime` or `datetime.date` objects CANNOT be used in date functions in PySpark (e.g., `datediff`) directly. You have to wrap them in the function `lit`, which converts `datetime.datetime` and `datetime.date` objects to Columns of `TimestampType` and `DateType` in PySpark DataFrames respectively. As a matter of fact, it is suggested that you always use the `lit` function to explicitly convert scalar values to Columns, whether or not implicit conversion would happen.
- Most date functions work on a string of the format `yyyy-MM-dd`, which is automatically cast to a date object. Notice that other date formats (e.g., `yyyy/MM/dd`) are not supported and will cause null values to be returned. Note that the function `to_date` also uses `yyyy-MM-dd` as the default format when a format string is not specified.
- The functions `second`, `minute`, `day`/`dayofmonth`, `weekofyear`, `month`, `quarter` and `year` extract the corresponding part from a date object/string.
- `date_add`, `date_sub`, `datediff` and `add_months` perform arithmetic operations on dates.
- `to_date`, `to_timestamp`, `to_utc_timestamp`, `to_unix_timestamp` and `timestamp` cast date objects/strings.
- The method `Column.between` casts its parameters to the same type as the Column and then performs comparisons. For example, if `Column.between` is invoked on a string column, it automatically casts its parameters to the string type; if `Column.between` is invoked on a date column, it automatically casts its parameters to the date type. If the cast fails for a row, a `null` value is generated. Specifically, `Column.between` does NOT try to convert a string to a Column automatically, whether it is invoked on a date Column or not (even though most date-related Column functions automatically convert strings to Columns where applicable). To avoid confusion and tricky bugs, it is suggested that you avoid relying on the automatic conversion of strings to Columns when calling date-related functions, and instead always explicitly convert a column name to a Column by calling `col("col_name")`.
- `java.sql.Date` can be used with Spark but does not support arithmetic computation. `java.time` supports arithmetic computation but cannot be used as a Spark column directly. You have to rely on `java.time` for arithmetic computation and then convert dates to `java.sql.Date` so that they can be used in Spark DataFrames.
from pathlib import Path
import datetime
import pandas as pd
import findspark
findspark.init(str(next(Path("/opt").glob("spark-3*"))))
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType
spark = (
SparkSession.builder.appName("PySpark_Str_Func").enableHiveSupport().getOrCreate()
)
df = spark.createDataFrame(
pd.DataFrame(
data=(("2017-01-01", "2017-01-07"), ("2020-02-01", "2019-02-10")),
columns=["d1_s", "d2_s"],
)
)
df.show()
The Python type datetime.date
is mapped to DateType
in PySpark.
df.withColumn("d3_d", lit(datetime.date.today())).schema
The Python type datetime.datetime
is mapped to TimestampType
in PySpark.
df.withColumn("d3_ts", lit(datetime.datetime.today())).schema
Comparing a string column with a str works as expected.
df.filter(col("d1_s") >= "2017-01-15").show()
A datetime.date
object is formatted as YYYY-mm-dd by default when converted to a string.
This is the behavior in most programming languages.
d = datetime.date.today()
d
str(d)
Comparing a string column of the format YYYY-mm-dd
with a datetime.date object works as expected.
df.filter(col("d1_s") >= datetime.date(2017, 1, 15)).show()
Similarly, the between
method of a string Column
works when 2 datetime.date
objects are passed to it.
df.filter(
col("d1_s").between(datetime.date(2017, 1, 15), datetime.date(2021, 1, 15))
).show()
The method between
of a string Column works even when mixed types of parameters are passed to it.
df.filter(col("d1_s").between("2017-01-15", datetime.date(2021, 1, 15))).show()
The between
method of a date Column converts its arguments to dates before doing comparisons.
Invalid arguments are converted to null
.
df.filter(col("d3_d").between("2017-01-15", "abc")).show()
df.filter(col("d3_d").astype(StringType()).between("2017-01-15", "abc")).show()
Notice that the between
method of a string Column does not automatically convert a column name to a Column.
As a matter of fact,
it is suggested that you avoid relying on automatic conversion from column names to Columns,
as it can cause tricky issues if you are not careful.
df.filter(col("d2_s").between("d1_s", "d3_d")).show()
df.filter(col("d2_s").between(col("d1_s"), col("d3_d"))).show()
The function datediff
does NOT work with a string Column
and a datetime.date
object directly.
However,
it works if you convert a datetime.date object to a Column
using lit
.
df.select(datediff("d1_s", lit(datetime.date(2017, 1, 15)))).show()
+/- Operators¶
The +/- operators are supported on date/time columns in Spark 3.
spark.sql(
"""
select
current_date as today
"""
).show()
spark.sql(
"""
select
current_date + 10 as ten_days_later
"""
).show()
spark.sql(
"""
select
current_date - 1 as yesterday
"""
).show()
add_months¶
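A minimal sketch of add_months (assuming the Spark session created above; the alias is illustrative), which shifts a date forward by the given number of months:
spark.sql(
    """
    select add_months('2017-01-01', 2) as two_months_later
    """
).show()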
date_add¶
df = spark.createDataFrame(
pd.DataFrame(
data=(("2017-01-01", "2017-01-07"), ("2017-02-01", "2019-02-10")),
columns=["d1", "d2"],
)
)
df.show()
df1 = (
df.withColumn("d3", date_sub("d1", 30))
.withColumn("d4", date_add("d1", 30))
.withColumn("check", col("d2").between("d3", "d4"))
)
df1.show()
df1.schema
date_sub¶
datediff¶
df2 = df.withColumn("diff", datediff("d2", "d1"))
df2.show()
df2.schema
df2 = df.withColumn("diff", datediff("d2", datetime.date.today()))
df2.show()
df2.schema
df2 = df.withColumn("diff", datediff("d2", lit(datetime.date.today())))
df2.show()
df2.schema
spark.sql(
"""
select datediff('2020-12-04', '2020-12-03')
"""
).show()
spark.sql(
"""
select datediff('2020/12/04', '2020/12/03')
"""
).show()
current_date¶
df3 = df.withColumn("current", current_date())
df3.show()
df3.schema
current_timestamp / now¶
Both current_timestamp
and now
return the current timestamp.
The difference is that now
must be called with parentheses,
while current_timestamp
can be called without parentheses.
If you are not sure,
always call functions with parentheses.
spark.sql(
"""
select
current_timestamp
"""
).show(n=1, truncate=False)
spark.sql(
"""
select
current_timestamp()
"""
).show(n=1, truncate=False)
spark.sql(
"""
select
now()
"""
).show(n=1, truncate=False)
dayofmonth / day¶
Returns the day of the month from a given date or timestamp.
dayofmonth
is the same as the day
function.
df4 = df.withColumn("day_of_d2", dayofmonth(col("d2")))
df4.show()
df4.schema
spark.sql(
"""
select
dayofmonth("2017-01-07")
"""
).show()
dayofyear¶
df5 = df.withColumn("day_of_year_d1", dayofyear(col("d1"))).withColumn(
    "day_of_year_d2", dayofyear(col("d2"))
)
df5.show()
df5.schema
date_format¶
df6 = df.withColumn("format_d1", date_format(col("d1"), "dd/MM/yyyy"))
df6.show()
df6.schema
date_trunc¶
Returns the timestamp ts truncated to the unit specified by the format fmt, which is one of “YEAR”, “YYYY”, “YY”, “MON”, “MONTH”, “MM”, “DAY”, “DD”, “HOUR”, “MINUTE”, “SECOND”, “WEEK”, “QUARTER”.
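A minimal sketch (aliases are illustrative) truncating a timestamp string to the month and to the hour:
spark.sql(
    """
    select
        date_trunc('MM', '2017-03-15 10:30:45') as trunc_to_month,
        date_trunc('HOUR', '2017-03-15 10:30:45') as trunc_to_hour
    """
).show(truncate=False)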
minute¶
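A minimal sketch extracting the minute part from a timestamp string (the alias is illustrative):
spark.sql(
    """
    select minute('2017-01-01 12:34:56') as minute_part
    """
).show()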
month¶
spark.sql(
"""
select
month("2018-01-01") as month
"""
).show()
now¶
Returns the current timestamp.
next_day¶
Returns the first date later than start_date that falls on the day of the week specified by day_of_week. The day of week can be specified as ‘MON’, ‘TUE’, ‘WED’, ‘THU’, ‘FRI’, ‘SAT’, ‘SUN’ or as ‘MO’, ‘TU’, ‘WE’, ‘TH’, ‘FR’, ‘SA’, ‘SU’.
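A minimal sketch (the alias is illustrative) returning the first Monday after the given date:
spark.sql(
    """
    select next_day('2017-01-01', 'MON') as next_monday
    """
).show()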
quarter¶
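A minimal sketch extracting the quarter of the year from a date string:
spark.sql(
    """
    select quarter('2017-05-01') as quarter
    """
).show()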
second¶
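A minimal sketch extracting the second part from a timestamp string (the alias is illustrative):
spark.sql(
    """
    select second('2017-01-01 12:34:56') as second_part
    """
).show()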
timestamp¶
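A minimal sketch (assuming the timestamp SQL function is available in your Spark version) casting a string to a timestamp:
spark.sql(
    """
    select timestamp('2017-01-01 12:34:56') as ts
    """
).show()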
to_date¶
spark.sql(
"""
select to_date('2020-12-04')
"""
).show()
spark.sql(
"""
select to_date('2020/12/04')
"""
).show()
spark.sql(
"""
select to_date('2020/12/04', 'yyyy/MM/dd')
"""
).show()
to_utc_timestamp¶
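A minimal sketch interpreting a timestamp string as wall-clock time in the given time zone and converting it to UTC (the time zone and alias are illustrative):
spark.sql(
    """
    select to_utc_timestamp('2017-01-01 00:00:00', 'America/Los_Angeles') as utc_ts
    """
).show()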
to_unix_timestamp¶
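A minimal sketch converting a date string to seconds since the Unix epoch, with an explicit format string (the alias is illustrative):
spark.sql(
    """
    select to_unix_timestamp('2017-01-01', 'yyyy-MM-dd') as seconds
    """
).show()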
to_timestamp¶
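A minimal sketch parsing strings into timestamps; as with to_date, a non-default format requires an explicit format string (aliases are illustrative):
spark.sql(
    """
    select
        to_timestamp('2020-12-04 10:30:00') as ts1,
        to_timestamp('2020/12/04 10:30:00', 'yyyy/MM/dd HH:mm:ss') as ts2
    """
).show()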
unix_timestamp¶
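A minimal sketch (aliases are illustrative): without arguments unix_timestamp returns the current Unix timestamp; with a string argument it parses using the default format yyyy-MM-dd HH:mm:ss:
spark.sql(
    """
    select
        unix_timestamp() as now_seconds,
        unix_timestamp('2020-12-04 10:30:00') as parsed_seconds
    """
).show()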
weekofyear¶
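A minimal sketch extracting the week of the year from a date string (the alias is illustrative):
spark.sql(
    """
    select weekofyear('2017-01-07') as week
    """
).show()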
year¶
spark.sql(
"""
select
year("2018-01-01") as year
"""
).show()
Aggregation w.r.t. a Date Column¶
df1 = spark.createDataFrame(
pd.DataFrame(
data=(
(1, "2017-01-01", 1, 2),
(1, "2017-01-02", 2, 3),
(1, "2017-02-01", 10, 11),
(1, "2017-02-02", 20, 21),
(2, "2017-01-03", 3, 4),
(2, "2017-01-04", 4, 5),
(2, "2017-02-07", 11, 12),
(2, "2017-02-08", 22, 23),
),
columns=["user", "date", "x", "y"],
)
)
df1.show()
df2 = spark.createDataFrame(
pd.DataFrame(
data=(
(11, "2017-01-01", 10),
(11, "2017-01-02", 20),
(11, "2017-02-01", 100),
(11, "2017-02-02", 200),
(22, "2017-01-03", 30),
(22, "2017-01-04", 40),
(22, "2017-02-07", 110),
(22, "2017-02-08", 220),
),
columns=["user", "date", "x"],
)
)
df2.show()
def sum_col_date(
    col_agg: str,
    col_date: str,
    date: datetime.date,
    days_before_1: int,
    days_before_2: int,
):
    """Sum the column col_agg over rows whose date column col_date falls in the
    window [date - days_before_1 days, date - days_before_2 days].
    """
    date1 = date - datetime.timedelta(days=days_before_1)
    date2 = date - datetime.timedelta(days=days_before_2)
    return sum(
        when(col(col_date).between(date1, date2), col(col_agg)).otherwise(0)
    ).alias(f"sum_{col_agg}_{days_before_1}_{days_before_2}")
def agg_col(col_agg, col_date, date, days_before_1, days_before_2):
    return [
        avg(col_agg).alias(f"avg_{col_agg}"),
        sum_col_date(col_agg, col_date, date, days_before_1, days_before_2),
    ]
df1.groupBy("user").agg(
sum_col_date("x", "date", datetime.date(2017, 1, 5), 7, 1)
).show()
df1.select(*["user", "date"], col("x")).show()
df2.groupBy("user").agg(
sum_col_date("x", "date", datetime.date(2017, 1, 5), 7, 1)
).show()
df1.groupBy("user").agg(
*agg_col("x", "date", datetime.date(2017, 1, 5), 7, 1),
*agg_col("y", "date", datetime.date(2017, 1, 5), 7, 1)
).show()