
User-defined Function (UDF) in PySpark

Tips and Traps

  1. The easiest way to define a UDF in PySpark is to use the @udf tag, and similarly the easiest way to define a pandas UDF in PySpark is to use the @pandas_udf tag. Pandas UDFs are preferred to UDFs for several reasons. First, pandas UDFs are typically much faster than UDFs. Second, pandas UDFs are more flexible than UDFs on parameter passing. Both UDFs and pandas UDFs can take multiple columns as parameters. In addition, pandas UDFs can take a DataFrame as parameter (when passed to the apply function after groupBy is called).

  2. You need to specify a value for the parameter returnType (the type of elements in the PySpark DataFrame Column) when creating a (pandas) UDF. Both type objects (e.g., StringType()) and names of types (e.g., "string") are accepted. Specifying names of types is simpler (as you do not have to import the corresponding types, and the names are shorter to type), but at the cost of losing the ability to do static type checking (e.g., using pylint) on the return types used.

  3. When invoking a (pandas) UDF, you can either pass column expressions (e.g., col("name")) or names of columns (e.g., "name") to it. It is suggested that you always use the explicit way (col("name")) as it avoids confusion in certain situations.

  4. UDFs created using the tags @udf and @pandas_udf can only be used in DataFrame APIs but not in Spark SQL. To use a UDF or pandas UDF in Spark SQL, you have to register it using spark.udf.register. Notice that spark.udf.register can not only register UDFs and pandas UDFs but also a regular Python function (in which case you have to specify the return type).

  5. BinaryType has been supported since versions earlier than Spark 2.4. However, conversion between a Spark DataFrame containing BinaryType columns and a pandas DataFrame (via pyarrow) is not supported until Spark 2.4.

  6. A pandas UDF leveraging PyArrow (>=0.15) causes java.lang.IllegalArgumentException in PySpark 2.4 (PySpark 3 has fixed the issue completely). Listed below are 3 ways to fix this issue. For more discussions, please refer to Apache Arrow in PySpark, PySpark pandas_udfs java.lang.IllegalArgumentException error and pandas udf not working with latest pyarrow release (0.15.0).

    1. Downgrade PyArrow to 0.14.1 (if you have to stick to PySpark 2.4).
    2. Set the environment variable ARROW_PRE_0_15_IPC_FORMAT to 1 (if you have to stick to PySpark 2.4). You can do this using spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT=1 and spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1 (see the configuration sketch after this list).
    3. Use PySpark 3.
  7. If your (pandas) UDF needs a non-Column parameter, there are 3 ways to achieve it.

    • Use a global variable in your pandas UDF.
    • Use a curried function which takes non-Column parameter(s) and return a (pandas) UDF (which then takes Columns as parameters).

        from pyspark.sql.functions import col, udf
        from pyspark.sql.types import BooleanType

        def comparator_udf(n):
            return udf(lambda c: c == n, BooleanType())

        df.where(comparator_udf("Bonsanto")(col("name")))
    • Simply treat a non-Column parameter as a Column parameter and wrap the parameter in lit when invoking the (pandas) UDF.

      The 1st way is error-prone and inflexible. Both the 2nd and the 3rd approaches are solid; however, the last way is preferred as it is universal and also more flexible (if you want to use Column parameters to replace non-Column parameters later).
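Regarding the 2nd fix in tip 6 above, below is a minimal sketch of setting the two conf keys when constructing the SparkSession (the app name is a placeholder). This is only needed on PySpark 2.4 with PyArrow >= 0.15.

    from pyspark.sql import SparkSession

    # Pin the pre-0.15 Arrow IPC format on both the YARN application master
    # (driver) and the executors.
    spark = (
        SparkSession.builder.appName("udf-arrow-workaround")
        .config("spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
        .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
        .getOrCreate()
    )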

In [1]:
import pandas as pd
from pathlib import Path
import findspark

findspark.init(str(next(Path("/opt/").glob("spark-*"))))
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType, StructType

spark = SparkSession.builder.appName("PySpark UDF").enableHiveSupport().getOrCreate()
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/10 19:55:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
In [63]:
df_p = pd.DataFrame(
    data=[
        ["Ben", 1, 30],
        ["Dan", 1, 25],
        ["Will", 2, 26],
        ["Peter", 2, 37],
    ],
    columns=["name", "gid", "age"],
)
df = spark.createDataFrame(df_p)
df.show()
+-----+---+---+
| name|gid|age|
+-----+---+---+
|  Ben|  1| 30|
|  Dan|  1| 25|
| Will|  2| 26|
|Peter|  2| 37|
+-----+---+---+

In [64]:
df.createOrReplaceTempView("table_df")

Pandas UDF


Take One or Multiple Columns and Return One Column

In this case, a pandas UDF takes in one or multiple pandas Series and returns a pandas Series. The only restriction here is that the UDF must return a pandas Series with the same length as the input Series.

PySpark 2

In [80]:
@pandas_udf(returnType="integer", functionType=PandasUDFType.SCALAR)
def age_plus_one(age):
    return pd.Series(v + 1 for v in age)
In [81]:
df.withColumn("age1", age_plus_one("age")).show()
+-----+---+---+----+
| name|gid|age|age1|
+-----+---+---+----+
|  Ben|  1| 30|  31|
|  Dan|  1| 25|  26|
| Will|  2| 26|  27|
|Peter|  2| 37|  38|
+-----+---+---+----+

In [82]:
@pandas_udf(returnType="string", functionType=PandasUDFType.SCALAR)
def concat(name: pd.Series, age: pd.Series) -> pd.Series:
    return name + " is " + age.astype(str) + " age years old."
In [68]:
df.withColumn("desc", concat("name", "age")).show()
+-----+---+---+--------------------+
| name|gid|age|                desc|
+-----+---+---+--------------------+
|  Ben|  1| 30|Ben is 30 age yea...|
|  Dan|  1| 25|Dan is 25 age yea...|
| Will|  2| 26|Will is 26 age ye...|
|Peter|  2| 37|Peter is 37 age y...|
+-----+---+---+--------------------+

PySpark 3

In [83]:
@pandas_udf(returnType="integer")
def age_plus_one(age: pd.Series) -> pd.Series:
    return pd.Series(v + 1 for v in age)
In [77]:
df.withColumn("age1", age_plus_one("age")).show()
+-----+---+---+----+
| name|gid|age|age1|
+-----+---+---+----+
|  Ben|  1| 30|  31|
|  Dan|  1| 25|  26|
| Will|  2| 26|  27|
|Peter|  2| 37|  38|
+-----+---+---+----+

In [78]:
@pandas_udf(returnType="string")
def concat(name: pd.Series, age: pd.Series) -> pd.Series:
    return name + " is " + age.astype(str) + " age years old."
In [79]:
df.withColumn("desc", concat("name", "age")).show()
+-----+---+---+--------------------+
| name|gid|age|                desc|
+-----+---+---+--------------------+
|  Ben|  1| 30|Ben is 30 age yea...|
|  Dan|  1| 25|Dan is 25 age yea...|
| Will|  2| 26|Will is 26 age ye...|
|Peter|  2| 37|Peter is 37 age y...|
+-----+---+---+--------------------+

Take a pandas DataFrame and Return a pandas DataFrame

PySpark 2

In [84]:
# PySpark 2
@pandas_udf(
    returnType="name string, gid int, age int, desc string",
    functionType=PandasUDFType.GROUPED_MAP,
)
def desc(df):
    df["desc"] = df.name + " is " + df.age.astype(str) + " years old."
    return df
In [85]:
df.groupBy(spark_partition_id()).apply(desc).show()
+-----+---+---+--------------------+
| name|gid|age|                desc|
+-----+---+---+--------------------+
|  Ben|  1| 30|Ben is 30 years old.|
|  Dan|  1| 25|Dan is 25 years old.|
| Will|  2| 26|Will is 26 years ...|
|Peter|  2| 37|Peter is 37 years...|
+-----+---+---+--------------------+

In terms of machine learning model prediction, it can look like the following, assuming

  • df is a DataFrame containing the columns name, gid, age and some feature columns.
  • features is a list containing the names of the feature columns used by the model.
  • model is a (XGBoost, LightGBM, PyTorch, etc.) model loaded into Python.
In [27]:
@pandas_udf(
    returnType="name string, gid int, age int, prob string",
    functionType=PandasUDFType.GROUPED_MAP,
)
def predict(df):
    df["prob"] = model.predict_proba(df[features])[:, 1]
    # keep only needed columns
    return df[["name", "gid", "age", "prob"]]
In [ ]:
df.groupBy(spark_partition_id()).apply(predict)

Note that it is perfectly OK to group by a column of the DataFrame instead of spark_partition_id() in the above 2 examples. Grouping in Spark always shuffles data, which means that grouping by spark_partition_id() doesn't give you any performance benefit. As a matter of fact, the above way of doing prediction is discouraged due to data shuffling. A pandas UDF taking multiple columns and returning one column is preferred (see the sketch below). In PySpark 3, DataFrame.mapInPandas is preferred.
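For instance, below is a minimal sketch of such a scalar pandas UDF, reusing the model and features assumptions above (predict_prob is a hypothetical name).

    # A scalar pandas UDF: each invocation receives the feature columns as
    # pandas Series, reassembles them into a pandas DataFrame and scores it,
    # without requiring any group-by (and hence no shuffle).
    @pandas_udf(returnType="double", functionType=PandasUDFType.SCALAR)
    def predict_prob(*cols):
        X = pd.concat(cols, axis=1)
        X.columns = features
        return pd.Series(model.predict_proba(X)[:, 1])

    df.withColumn("prob", predict_prob(*features))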

PySpark 3

In [104]:
def desc(itdf):
    # mapInPandas passes an iterator of pandas DataFrames and expects an
    # iterator of pandas DataFrames back
    for df in itdf:
        df["desc"] = df["name"] + " is " + df["age"].astype(str) + " years old."
        yield df
In [106]:
df.mapInPandas(desc, schema="name string, gid int, age int, desc string").show()
+-----+---+---+--------------------+
| name|gid|age|                desc|
+-----+---+---+--------------------+
|  Ben|  1| 30|Ben is 30 years old.|
|  Dan|  1| 25|Dan is 25 years old.|
| Will|  2| 26|Will is 26 years ...|
|Peter|  2| 37|Peter is 37 years...|
+-----+---+---+--------------------+

In [107]:
def desc2(df):
    df["desc"] = df["name"] + " is " + df["age"].astype(str) + " years old."
    return df
In [113]:
df.groupBy(spark_partition_id()).applyInPandas(
    desc2, schema="name string, gid int, age int, desc string"
).show()
+-----+---+---+--------------------+
| name|gid|age|                desc|
+-----+---+---+--------------------+
|  Ben|  1| 30|Ben is 30 years old.|
|  Dan|  1| 25|Dan is 25 years old.|
| Will|  2| 26|Will is 26 years ...|
|Peter|  2| 37|Peter is 37 years...|
+-----+---+---+--------------------+

Even though the above 2 examples yield the same result, DataFrame.mapInPandas is preferred as applyInPandas requires a group-by, which causes data shuffling.

Aggregation or Windows Function

Series to scalar pandas UDFs in PySpark 3+ (corresponding to PandasUDFType.GROUPED_AGG in PySpark 2) are similar to Spark aggregate functions. A Series to scalar pandas UDF defines an aggregation from one or more pandas Series to a scalar value, where each pandas Series represents a Spark column. You use a Series to scalar pandas UDF with APIs such as select, withColumn, groupBy.agg, and pyspark.sql.Window.

You express the type hint as pandas.Series, ... -> Any. The return type should be a primitive data type, and the returned scalar can be either a Python primitive type (for example, int or float) or a NumPy data type such as numpy.int64 or numpy.float64. Any should ideally be a specific scalar type.

This type of UDF does not support partial aggregation and all data for each group is loaded into memory.

PySpark 2

In [114]:
# PySpark 2
@pandas_udf(returnType="double", functionType=PandasUDFType.GROUPED_AGG)
def avg_age(age):
    return age.mean()
In [115]:
df.groupBy("gid").agg(avg_age("age").alias("avg_age")).show()
+---+-------+
|gid|avg_age|
+---+-------+
|  1|   27.5|
|  2|   31.5|
+---+-------+

PySpark 3

In [117]:
# PySpark 3
@pandas_udf(returnType="double")
def avg_age(age: pd.Series) -> float:
    return age.mean()
In [118]:
df.groupBy("gid").agg(avg_age("age").alias("avg_age")).show()
+---+-------+
|gid|avg_age|
+---+-------+
|  1|   27.5|
|  2|   31.5|
+---+-------+

pyspark.sql.GroupedData.applyInPandas can be used to achieve the same thing. Generally speaking, applyInPandas is more flexible.
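As mentioned earlier, a Series to scalar pandas UDF can also be used as a window function via pyspark.sql.Window. Below is a minimal sketch reusing the avg_age UDF defined above, with an unbounded window per gid.

    from pyspark.sql import Window

    # an unbounded window within each gid group
    win = Window.partitionBy("gid").rowsBetween(
        Window.unboundedPreceding, Window.unboundedFollowing
    )
    df.withColumn("avg_age", avg_age("age").over(win)).show()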

Column Type of Struct

In [57]:
@pandas_udf(returnType="col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3["col2"] = s1 + s2.str.len()
    return s3
In [58]:
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>",
)
df.show()
+--------+----------+-----------------+
|long_col|string_col|       struct_col|
+--------+----------+-----------------+
|       1|  a string|{a nested string}|
+--------+----------+-----------------+

In [9]:
df.select(func("long_col", "string_col", "struct_col").alias("struct_col2")).show()
+--------------------+
|         struct_col2|
+--------------------+
|{a nested string, 9}|
+--------------------+

Pandas UDFs in Spark SQL

Pandas UDFs created using @pandas_udf can only be used in DataFrame APIs but not in Spark SQL. To use a pandas UDF in Spark SQL, you have to register it using spark.udf.register. The same holds for UDFs. Notice that spark.udf.register can not only register pandas UDFs and UDFs but also a regular Python function (in which case you have to specify the return type).

In [13]:
spark.sql("select *, age_plus_one(age) as age1 from table_df").show()
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-13-501267730fde> in <module>
----> 1 spark.sql("select *, age_plus_one(age) as age1 from table_df").show()

/opt/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
    721         [Row(f1=1, f2='row1'), Row(f1=2, f2='row2'), Row(f1=3, f2='row3')]
    722         """
--> 723         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    724 
    725     def table(self, tableName):

/opt/spark-3.1.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/opt/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Undefined function: 'age_plus_one'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 10
In [18]:
spark.udf.register("age_plus_one", age_plus_one)
Out[18]:
<function __main__.age_plus_one(age: pandas.core.series.Series) -> pandas.core.series.Series>
In [19]:
spark.sql("select *, age_plus_one(age) as age1 from table_df").show()
+----+---+---+----+
|name| id|age|age1|
+----+---+---+----+
| Ben|  2| 30|  31|
| Dan|  4| 25|  26|
|Will|  1| 26|  27|
+----+---+---+----+

In [20]:
df.withColumn("age1", age_plus_one("age")).show()
+----+---+---+----+
|name| id|age|age1|
+----+---+---+----+
| Ben|  2| 30|  31|
| Dan|  4| 25|  26|
|Will|  1| 26|  27|
+----+---+---+----+

Non-Column Parameters

If your (pandas) UDF needs a non-Column parameter, simply treat it as a Column parameter and wrap the parameter in lit when invoking the (pandas) UDF. Another possibility is to define a regular Python function which takes non-Column parameters and returns a (pandas) UDF (sketched at the end of this section). The first approach is simpler, universal and also more flexible if later you want to use Column parameters to replace the non-Column parameters.

In [7]:
@pandas_udf(returnType="integer")
def age_plus_one(age: pd.Series) -> pd.Series:
    return age + 1
In [9]:
df.withColumn("age0", age_plus_one(lit(0))).show()
+----+---+---+----+
|name| id|age|age0|
+----+---+---+----+
| Ben|  2| 30|   1|
| Dan|  4| 25|   1|
|Will|  1| 26|   1|
+----+---+---+----+
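For completeness, below is a minimal sketch of the curried alternative mentioned above (age_plus_n is a hypothetical helper): a regular Python function takes the non-Column parameter and returns a pandas UDF.

    def age_plus_n(n: int):
        # the returned pandas UDF closes over the non-Column parameter n
        @pandas_udf(returnType="integer")
        def _add(age: pd.Series) -> pd.Series:
            return age + n

        return _add

    df.withColumn("age5", age_plus_n(5)("age")).show()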

UDF Taking One Column as Parameter

In [19]:
@udf(returnType=StringType())
def say_hello(name: str) -> str:
    return f"Hello {name}"
In [21]:
df.withColumn("greetings", say_hello(col("name"))).show()
+----+---+---+----------+
|name| id|age| greetings|
+----+---+---+----------+
| Ben|  2| 30| Hello Ben|
| Dan|  4| 25| Hello Dan|
|Will|  1| 26|Hello Will|
+----+---+---+----------+

In [21]:
def say_hello_2(name: str) -> str:
    return f"Hello {name}"
In [25]:
spark.udf.register("say_hello_udf", say_hello_2, StringType())
Out[25]:
<function __main__.say_hello_2(name: str) -> str>
In [26]:
spark.sql("select *, say_hello_2(name) as hello from table_df").show()
+----+---+---+----------+
|name| id|age|     hello|
+----+---+---+----------+
| Ben|  2| 30| Hello Ben|
| Dan|  4| 25| Hello Dan|
|Will|  1| 26|Hello Will|
+----+---+---+----------+

In [27]:
df.withColumn(say_hello_udf("name")).show()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-27-83cdbd56ac4a> in <module>
----> 1 df.withColumn(say_hello_udf("name")).show()

NameError: name 'say_hello_udf' is not defined
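The NameError above is expected: spark.udf.register makes the name say_hello_udf available to Spark SQL only; it does not create a Python variable of that name. One way to use the registered function in DataFrame APIs as well is to capture the UDF returned by spark.udf.register, as in the minimal sketch below (note that withColumn also needs a column name as its first argument).

    # spark.udf.register returns a UDF that can be used in DataFrame APIs
    say_hello_udf = spark.udf.register("say_hello_udf", say_hello_2, StringType())
    df.withColumn("hello", say_hello_udf(col("name"))).show()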

UDF Taking Two Columns as Parameters

In [31]:
@udf(returnType="string")
def concat(name: str, age: int) -> str:
    return f"{name} is {age} years old."
In [32]:
df.withColumn("greetings", concat(col("name"), col("age"))).show()
+----+---+---+--------------------+
|name| id|age|           greetings|
+----+---+---+--------------------+
| Ben|  2| 30|Ben is 30 years old.|
| Dan|  4| 25|Dan is 25 years old.|
|Will|  1| 26|Will is 26 years ...|
+----+---+---+--------------------+

In [24]:
df.withColumn("greetings", concat("name", "age")).show()
+----+---+---+--------------------+
|name| id|age|           greetings|
+----+---+---+--------------------+
| Ben|  2| 30|Ben is 30 years old.|
| Dan|  4| 25|Dan is 25 years old.|
|Will|  1| 26|Will is 26 years ...|
+----+---+---+--------------------+
