Tips and Traps¶
The easiest way to define a UDF in PySpark is to use the @udf decorator, and similarly the easiest way to define a pandas UDF is to use the @pandas_udf decorator. Pandas UDFs are preferred to UDFs for several reasons. First, pandas UDFs are typically much faster than UDFs. Second, pandas UDFs are more flexible than UDFs in parameter passing. Both UDFs and pandas UDFs can take multiple columns as parameters. In addition, a pandas UDF can take a DataFrame as parameter (when passed to the apply function after groupBy is called).

You need to specify a value for the parameter returnType (the type of elements in the resulting PySpark DataFrame Column) when creating a (pandas) UDF. Both type objects (e.g., StringType()) and names of types (e.g., "string") are accepted. Specifying names of types is simpler (you do not have to import the corresponding types, and names are shorter to type), but at the cost of losing the ability to do static type checking (e.g., using pylint) on the return types.

When invoking a (pandas) UDF, you can pass either column expressions (e.g., col("name")) or names of columns (e.g., "name") to it. It is suggested that you always use the explicit way (col("name")) as it avoids confusion in certain situations. A short sketch illustrating these last two tips follows below.

UDFs created using the decorators @udf and @pandas_udf can only be used in DataFrame APIs but not in Spark SQL. To use a UDF or pandas UDF in Spark SQL, you have to register it using spark.udf.register. Notice that spark.udf.register can register not only UDFs and pandas UDFs but also regular Python functions (in which case you have to specify the return type).

BinaryType has been supported since before Spark 2.4. However, conversion between a Spark DataFrame which contains BinaryType columns and a pandas DataFrame (via pyarrow) is not supported until Spark 2.4.
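As a quick illustration of the returnType and column-passing tips above, the return type can be given either as a type object or as a type name, and a column can be passed either as a column expression or as a name. A minimal sketch (shout and shout2 are hypothetical UDFs applied to the toy DataFrame df created later in this post):

@udf(returnType=StringType())  # type object: importable and checkable by linters
def shout(name: str) -> str:
    return name.upper()


@udf(returnType="string")  # type name: shorter, no import needed
def shout2(name: str) -> str:
    return name.upper()


df.withColumn("name_upper", shout(col("name")))  # explicit column expression (preferred)
df.withColumn("name_upper", shout2("name"))  # column name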
A pandas UDF leveraging PyArrow (>=0.15) causes java.lang.IllegalArgumentException in PySpark 2.4 (PySpark 3 has fixed the issue completely). Listed below are 3 ways to fix this issue. For more discussion, please refer to Apache Arrow in PySpark, PySpark pandas_udfs java.lang.IllegalArgumentException error and pandas udf not working with latest pyarrow release (0.15.0).

- Downgrade PyArrow to 0.14.1 (if you have to stick to PySpark 2.4).
- Set the environment variable ARROW_PRE_0_15_IPC_FORMAT to 1 (if you have to stick to PySpark 2.4). You can do this using spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT=1 and spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1 (see the sketch after this list).
- Use PySpark 3.
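On YARN, for example, the two environment variables can be set as Spark configuration properties when the session is built. A minimal sketch (assuming PySpark 2.4 on YARN; the app name and other settings are placeholders):

spark = (
    SparkSession.builder.appName("pandas_udf_arrow_workaround")
    # make both the application master (driver) and the executors
    # fall back to the legacy Arrow IPC format
    .config("spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .getOrCreate()
)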
If your (pandas) UDF needs a non-Column parameter, there are 3 ways to achieve it.

- Use a global variable in your pandas UDF.
- Use a curried function which takes the non-Column parameter(s) and returns a (pandas) UDF (which then takes Columns as parameters).

def comparator_udf(n):
    return udf(lambda c: c == n, BooleanType())


df.where(comparator_udf("Bonsanto")(col("name")))

- Simply treat a non-Column parameter as a Column parameter and wrap the parameter into lit when invoking the (pandas) UDF.

The 1st way is error-prone and inflexible. Both the 2nd and the 3rd approaches are solid; however, the 3rd way is preferred as it is universal and also more flexible (if you later want to use Column parameters to replace the non-Column parameters). A sketch of the 3rd approach follows.
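For example, the comparison above can be rewritten using the 3rd approach. A minimal sketch (comparator is a hypothetical UDF; udf, col and lit come from pyspark.sql.functions):

@udf(returnType="boolean")
def comparator(c: str, n: str) -> bool:
    return c == n


df.where(comparator(col("name"), lit("Bonsanto")))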
from pathlib import Path

import pandas as pd
import findspark

findspark.init(str(next(Path("/opt/").glob("spark-*"))))

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType, StructType

spark = SparkSession.builder.appName("PySpark UDF").enableHiveSupport().getOrCreate()

df_p = pd.DataFrame(
    data=[
        ["Ben", 1, 30],
        ["Dan", 1, 25],
        ["Will", 2, 26],
        ["Peter", 2, 37],
    ],
    columns=["name", "gid", "age"],
)
df = spark.createDataFrame(df_p)
df.show()
df.createOrReplaceTempView("table_df")
Pandas UDF¶
- BinaryType has been supported since before Spark 2.4. However, conversion between a Spark DataFrame which contains BinaryType columns and a pandas DataFrame (via pyarrow) is not supported until Spark 2.4.
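As an illustration, a pandas UDF producing a BinaryType column might look like the following. A minimal sketch using the PySpark 3 syntax introduced below (encode_name is a hypothetical UDF; it assumes Spark >= 2.4 so that pyarrow can convert the binary column):

@pandas_udf(returnType="binary")
def encode_name(name: pd.Series) -> pd.Series:
    # encode each name to bytes, which become a BinaryType column
    return name.str.encode("utf-8")


df.withColumn("name_bytes", encode_name("name")).show()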
Take One or Multiple Columns and Return One Column¶
In this case, a pandas UDF takes in one or multiple pandas Series and returns a pandas Series. The only restriction here is that the UDF must return a pandas Series with the same length as the input Series.
PySpark 2¶
@pandas_udf(returnType="integer", functionType=PandasUDFType.SCALAR)
def age_plus_one(age):
    return pd.Series(v + 1 for v in age)


df.withColumn("age1", age_plus_one("age")).show()


@pandas_udf(returnType="string", functionType=PandasUDFType.SCALAR)
def concat(name: pd.Series, age: pd.Series) -> pd.Series:
    return name + " is " + age.astype(str) + " years old."


df.withColumn("desc", concat("name", "age")).show()
PySpark 3¶
@pandas_udf(returnType="integer")
def age_plus_one(age: pd.Series) -> pd.Series:
    return pd.Series(v + 1 for v in age)


df.withColumn("age1", age_plus_one("age")).show()


@pandas_udf(returnType="string")
def concat(name: pd.Series, age: pd.Series) -> pd.Series:
    return name + " is " + age.astype(str) + " years old."


df.withColumn("desc", concat("name", "age")).show()
Take a pandas DataFrame and Return a pandas DataFrame¶
PySpark 2¶
# PySpark 2
@pandas_udf(
    returnType="name string, gid int, age int, desc string",
    functionType=PandasUDFType.GROUPED_MAP,
)
def desc(df):
    df["desc"] = df.name + " is " + df.age.astype(str) + " years old."
    return df


df.groupBy(spark_partition_id()).apply(desc).show()
In terms of machine learning model prediction, it can look something like the following, assuming that

- df is a DataFrame containing the columns name, gid, age and some feature columns;
- features is a list containing the names of the feature columns used by the model;
- model is a (XGBoost, LightGBM, PyTorch, etc.) model loaded into Python.
@pandas_udf(
    returnType="name string, gid int, age int, prob double",
    functionType=PandasUDFType.GROUPED_MAP,
)
def predict(df):
    df["prob"] = model.predict_proba(df[features])[:, 1]
    # keep only needed columns
    return df[["name", "gid", "age", "prob"]]


df.groupBy(spark_partition_id()).apply(predict)
Note that it is perfectly OK to group by a column of the DataFrame instead of spark_partition_id() in the above 2 examples. Grouping-by in Spark always shuffles data, which means that grouping by spark_partition_id() doesn't give you any performance benefit. As a matter of fact, the above way of doing prediction is discouraged due to data shuffling. A pandas UDF taking multiple columns and returning one column is preferred (see the sketch below). In PySpark 3, DataFrame.mapInPandas is preferred (see the next subsection).
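A minimal sketch of the preferred approach, assuming the same model and features as above and, for illustration, exactly two feature columns (predict_prob is a hypothetical UDF name):

@pandas_udf(returnType="double")
def predict_prob(f1: pd.Series, f2: pd.Series) -> pd.Series:
    X = pd.concat([f1, f2], axis=1)
    X.columns = features  # restore the feature names the model expects
    return pd.Series(model.predict_proba(X)[:, 1])


df.withColumn("prob", predict_prob(*[col(f) for f in features]))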
PySpark 3¶
def desc(itdf):
    for df in itdf:
        df["desc"] = df["name"] + " is " + df["age"].astype(str) + " years old."
        yield df


df.mapInPandas(desc, schema="name string, gid int, age int, desc string").show()


def desc2(df):
    df["desc"] = df["name"] + " is " + df["age"].astype(str) + " years old."
    return df


df.groupBy(spark_partition_id()).applyInPandas(
    desc2, schema="name string, gid int, age int, desc string"
).show()
Even though the above 2 examples yield the same result, DataFrame.mapInPandas is preferred as applyInPandas requires grouping-by, which causes data shuffling.
Aggregation or Window Functions¶
Series to scalar pandas UDFs in PySpark 3+ (corresponding to PandasUDFType.GROUPED_AGG in PySpark 2) are similar to Spark aggregate functions. A Series to scalar pandas UDF defines an aggregation from one or more pandas Series to a scalar value, where each pandas Series represents a Spark column. You use a Series to scalar pandas UDF with APIs such as select, withColumn, groupBy.agg, and pyspark.sql.Window. You express the type hint as pandas.Series, ... -> Any. The return type should be a primitive data type, and the returned scalar can be either a Python primitive type, for example, int or float, or a NumPy data type such as numpy.int64 or numpy.float64. Any should ideally be a specific scalar type. This type of UDF does not support partial aggregation, and all data for each group is loaded into memory.
PySpark 2¶
# PySpark 2
@pandas_udf(returnType="double", functionType=PandasUDFType.GROUPED_AGG)
def avg_age(age):
    return age.mean()


df.groupBy("gid").agg(avg_age("age").alias("avg_age")).show()
PySpark 3¶
# PySpark 3
@pandas_udf(returnType="double")
def avg_age(age: pd.Series) -> float:
    return age.mean()


df.groupBy("gid").agg(avg_age("age").alias("avg_age")).show()
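The same Series-to-scalar pandas UDF can also be used as a window function, e.g., to attach the group average to every row. A minimal sketch (the window covers the whole gid partition):

from pyspark.sql.window import Window

df.withColumn("avg_age_by_gid", avg_age("age").over(Window.partitionBy("gid"))).show()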
pyspark.sql.GroupedData.applyInPandas can be used to achieve the same thing. Generally speaking, applyInPandas is more flexible; a rough sketch follows.
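For instance, a rough equivalent of the aggregation above written with applyInPandas might look like this (a sketch; agg_avg_age is a hypothetical function name):

def agg_avg_age(pdf: pd.DataFrame) -> pd.DataFrame:
    # one pandas DataFrame per gid group; return one row per group
    return pd.DataFrame({"gid": [pdf["gid"].iloc[0]], "avg_age": [pdf["age"].mean()]})


df.groupBy("gid").applyInPandas(agg_avg_age, schema="gid long, avg_age double").show()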
Column Type of Struct¶
@pandas_udf(returnType="col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    # the struct column is passed in as a pandas DataFrame;
    # the returned pandas DataFrame becomes a struct column
    s3["col2"] = s1 + s2.str.len()
    return s3


# use a new variable name so that the original df (name, gid, age) is not overwritten
df_struct = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>",
)
df_struct.show()
df_struct.select(func("long_col", "string_col", "struct_col").alias("struct_col2")).show()
Pandas UDFs in Spark SQL¶
Pandas UDFs created using @pandas_udf can only be used in DataFrame APIs but not in Spark SQL. To use a pandas UDF in Spark SQL, you have to register it using spark.udf.register. The same holds for UDFs. Notice that spark.udf.register can register not only pandas UDFs and UDFs but also regular Python functions (in which case you have to specify the return type).
# this first query fails because age_plus_one has not been registered for use in Spark SQL yet
spark.sql("select *, age_plus_one(age) as age1 from table_df").show()

# register the pandas UDF so that it can be used in Spark SQL
spark.udf.register("age_plus_one", age_plus_one)
spark.sql("select *, age_plus_one(age) as age1 from table_df").show()

# the decorated pandas UDF can be used in the DataFrame API without registration
df.withColumn("age1", age_plus_one("age")).show()
Non-Column Parameters¶
If your (pandas) UDF needs a non-Column parameter, simply treat it as a Column parameter and wrap the parameter into lit when invoking the (pandas) UDF. Another possibility is to define a regular Python function which takes the non-Column parameters and returns a (pandas) UDF. The first approach is simpler, universal and also more flexible if you later want to use Column parameters to replace the non-Column parameters.
@pandas_udf(returnType="integer")
def age_plus_one(age: pd.Series) -> pd.Series:
    return age + 1


df.withColumn("age0", age_plus_one(lit(0))).show()
UDF Taking One Column as Parameter¶
@udf(returnType=StringType())
def say_hello(name: str) -> str:
    return f"Hello {name}"


df.withColumn("greetings", say_hello(col("name"))).show()


def say_hello_2(name: str) -> str:
    return f"Hello {name}"


# registering a regular Python function returns a UDF that can also be used in the DataFrame API
say_hello_udf = spark.udf.register("say_hello_udf", say_hello_2, StringType())
spark.sql("select *, say_hello_udf(name) as hello from table_df").show()
df.withColumn("hello", say_hello_udf("name")).show()
UDF Taking Two Columns as Parameters¶
@udf(returnType="string")
def concat(name: str, age: int) -> str:
    return f"{name} is {age} years old."


df.withColumn("greetings", concat(col("name"), col("age"))).show()
df.withColumn("greetings", concat("name", "age")).show()
References¶
pyspark.sql.functions.pandas_udf
https://docs.databricks.com/spark/latest/spark-sql/udf-python.html
https://changhsinlee.com/pyspark-udf/
https://medium.com/@ayplam/developing-pyspark-udfs-d179db0ccc87
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions