Ben Chuanlong Du's Blog

It is never too late to learn.

New Features in Spark 3

AQE (Adaptive Query Execution)

To enable AQE, you have to set spark.sql.adaptive.enabled to true (using --conf spark.sql.adaptive.enabled=true with spark-submit, or spark.conf.set("spark.sql.adaptive.enabled", "true") in Spark/PySpark code).
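For example, a submit-time sketch (the application name my_app.py is a placeholder; the two extra flags are separate Spark 3 switches for AQE's partition-coalescing and skew-join sub-features):

```shell
# Enable AQE and two of its sub-features at submit time.
# All spark.sql.adaptive.* keys are standard Spark 3 SQL configs.
spark-submit \
    --conf spark.sql.adaptive.enabled=true \
    --conf spark.sql.adaptive.coalescePartitions.enabled=true \
    --conf spark.sql.adaptive.skewJoin.enabled=true \
    my_app.py
```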

Pandas UDFs

Pandas UDFs are user-defined functions executed by Spark that use Apache Arrow to transfer data and pandas to operate on it, which enables vectorized operations. A Pandas UDF is defined using pandas_udf as a decorator or to wrap the function, and no additional configuration is required. In general, a Pandas UDF behaves like a regular PySpark function API.

Pandas UDFs were introduced in Spark 2.3. However, they have been greatly simplified and made more Pythonic in Spark 3.0. Before Spark 3.0, Pandas UDFs were defined with PandasUDFType. From Spark 3.0 with Python 3.6+, you can also use Python type hints. Using Python type hints is preferred, and PandasUDFType will be deprecated in a future release.
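A convenient consequence of the new style is that the body of a Pandas UDF is an ordinary function of pandas.Series, so it can be unit-tested without a Spark session. A minimal sketch (multiply_func is an illustrative name; the commented-out registration shows how it would be wrapped in Spark 3):

```python
import pandas as pd

# In Spark 3, this function would be registered as a Pandas UDF with
# Python type hints, e.g.:
#     from pyspark.sql.functions import pandas_udf
#     multiply = pandas_udf(multiply_func, returnType="long")
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    # Element-wise product, computed vectorized by pandas.
    return a * b


x = pd.Series([1, 2, 3])
print(multiply_func(x, x).tolist())  # [1, 4, 9]
```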

Note that the type hints should use pandas.Series in all cases, with one exception: pandas.DataFrame should be used as the input or output type hint when the corresponding input or output column is of StructType. The following example shows a Pandas UDF that takes a long column, a string column, and a struct column, and outputs a struct column. It requires the function to specify the type hints as pandas.Series and pandas.DataFrame as below:

In [ ]:
import pandas as pd

from pyspark.sql.functions import pandas_udf


@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3["col2"] = s1 + s2.str.len()
    return s3


# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>",
)

df.printSchema()
# root
# |-- long_col: long (nullable = true)
# |-- string_col: string (nullable = true)
# |-- struct_col: struct (nullable = true)
# |    |-- col1: string (nullable = true)

df.select(func("long_col", "string_col", "struct_col")).printSchema()
# root
# |-- func(long_col, string_col, struct_col): struct (nullable = true)
# |    |-- col1: string (nullable = true)
# |    |-- col2: long (nullable = true)
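Because the struct column arrives in the UDF as a pandas.DataFrame, the function body above is plain pandas and can be exercised without a Spark session. A quick check with hand-built pandas inputs (the function is redefined here so the snippet is self-contained):

```python
import pandas as pd


def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    # Add col2 = long value + length of the string, as in the UDF above.
    s3["col2"] = s1 + s2.str.len()
    return s3


out = func(
    pd.Series([1]),
    pd.Series(["a string"]),  # len("a string") == 8, so col2 == 1 + 8 == 9
    pd.DataFrame({"col1": ["a nested string"]}),
)
print(out.to_dict("records"))  # [{'col1': 'a nested string', 'col2': 9}]
```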