Tips and Traps
- The easiest way to define a UDF in PySpark is to use the @udf decorator, and similarly the easiest way to define a pandas UDF in PySpark is to use the @pandas_udf decorator. Pandas UDFs are preferred to UDFs for several reasons. First, pandas UDFs are typically much faster than UDFs. Second, pandas UDFs are more flexible in parameter passing: both UDFs and pandas UDFs can take multiple columns as parameters, but pandas UDFs can also take a DataFrame as a parameter (when passed to the apply function after groupBy is called).
- You need to specify a value for the parameter returnType (the type of elements in the PySpark DataFrame Column) when creating a (pandas) UDF. Both type objects (e.g., StringType()) and names of types (e.g., "string") are accepted. Specifying names of types is simpler (you do not have to import the corresponding types, and the names are short to type), but it comes at the cost of losing the ability to do static type checking (e.g., using pylint) on the return types.
- When invoking a (pandas) UDF, you can pass either column expressions (e.g., col("name")) or names of columns (e.g., "name") to it. It is suggested that you always use the explicit form (col("name")), as it avoids confusion in certain situations.
- UDFs created using the decorators @udf and @pandas_udf can only be used in DataFrame APIs, not in Spark SQL. To use a UDF or pandas UDF in Spark SQL, you have to register it using spark.udf.register. Notice that spark.udf.register can register not only UDFs and pandas UDFs but also a regular Python function (in which case you have to specify the return type).
- BinaryType was already supported in versions earlier than Spark 2.4. However, conversion between a Spark DataFrame that contains BinaryType columns and a pandas DataFrame (via pyarrow) was not supported until Spark 2.4.
- Pandas UDFs leveraging PyArrow (>= 0.15) cause java.lang.IllegalArgumentException in PySpark 2.4 (the issue is fixed in PySpark 3). Listed below are 3 ways to fix this issue. For more discussion, please refer to Apache Arrow in PySpark, PySpark pandas_udfs java.lang.IllegalArgumentException error and pandas udf not working with latest pyarrow release (0.15.0).
    1. Downgrade PyArrow to 0.14.1 (if you have to stick to PySpark 2.4).
    2. Set the environment variable ARROW_PRE_0_15_IPC_FORMAT to 1 (if you have to stick to PySpark 2.4). You can do this using spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT=1 and spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1.
    3. Use PySpark 3.
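For the second option, one way to pass the variable to both the YARN application master and the executors is to set the two properties named above when building the session. A minimal sketch, assuming PySpark 2.4 on YARN; the helper with_legacy_arrow_ipc is hypothetical and relies only on the chainable SparkSession.builder.config(key, value):

```python
# The two properties named in the tip above; both set ARROW_PRE_0_15_IPC_FORMAT=1,
# one for the YARN application master and one for every executor.
LEGACY_ARROW_IPC_CONF = {
    "spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT": "1",
    "spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT": "1",
}


def with_legacy_arrow_ipc(builder):
    """Hypothetical helper: apply the legacy Arrow IPC settings to a
    SparkSession.builder (or any object with a chainable config(key, value))."""
    for key, value in LEGACY_ARROW_IPC_CONF.items():
        builder = builder.config(key, value)
    return builder
```

With PySpark 2.4 installed, this would be used as spark = with_legacy_arrow_ipc(SparkSession.builder.appName("PySpark UDF")).getOrCreate(). The same two properties can also be passed as --conf options to spark-submit.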
- If your (pandas) UDF needs a non-Column parameter, there are 3 ways to achieve it.
    1. Use a global variable in your pandas UDF.
    2. Use a curried function which takes non-Column parameter(s) and returns a (pandas) UDF (which then takes Columns as parameters).

           def comparator_udf(n):
               return udf(lambda c: c == n, BooleanType())

           df.where(comparator_udf("Bonsanto")(col("name")))

    3. Simply treat a non-Column parameter as a Column parameter and wrap the parameter into lit when invoking the (pandas) UDF.

The 1st way is error-prone and inflexible. Both the 2nd and the last approaches are solid; however, the last one is preferred as it is universal and also more flexible (if you later want to use Column parameters to replace non-Column parameters).
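The curried approach carries over to pandas UDFs. Below is a pandas-only sketch of the function such a factory would return; make_equals is a hypothetical name, and wrapping the result with pandas_udf(..., "boolean") is assumed to happen on the Spark side (PySpark 3), as the docstring notes:

```python
import pandas as pd


def make_equals(n):
    """Return a Series -> Series function that compares every value with n.

    In PySpark 3 the result could be wrapped as
    pandas_udf(make_equals("Bonsanto"), "boolean") and applied to col("name").
    """

    def equals(c: pd.Series) -> pd.Series:
        # n is captured from the enclosing scope instead of being passed as a Column
        return c == n

    return equals


names = pd.Series(["Ben", "Bonsanto", "Will"])
print(make_equals("Bonsanto")(names).tolist())  # [False, True, False]
```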
from pathlib import Path

import pandas as pd
import findspark

# make the Spark installation found under /opt importable as pyspark
findspark.init(str(next(Path("/opt/").glob("spark-*"))))

from pyspark.sql import SparkSession, DataFrame
# PandasUDFType is only needed by the PySpark 2 style examples
from pyspark.sql.functions import PandasUDFType, col, lit, pandas_udf, spark_partition_id, udf
from pyspark.sql.types import BooleanType, IntegerType, StringType, StructType

spark = SparkSession.builder.appName("PySpark UDF").enableHiveSupport().getOrCreate()
df_p = pd.DataFrame(
data=[
["Ben", 1, 30],
["Dan", 1, 25],
["Will", 2, 26],
["Peter", 2, 37],
],
columns=["name", "gid", "age"],
)
df = spark.createDataFrame(df_p)
df.show()
+-----+---+---+
| name|gid|age|
+-----+---+---+
| Ben| 1| 30|
| Dan| 1| 25|
| Will| 2| 26|
|Peter| 2| 37|
+-----+---+---+
df.createOrReplaceTempView("table_df")
Pandas UDF
Take One or Multiple Columns and Return One Column
In this case, a pandas UDF takes one or more pandas Series and returns a pandas Series. The only restriction is that the UDF must return a pandas Series with the same length as the input Series.
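Spark does not hand the whole column to the function at once: it splits the column into Arrow record batches (at most spark.sql.execution.arrow.maxRecordsPerBatch rows each, 10,000 by default) and calls the function once per batch, which is why each call must return exactly as many rows as it received. The pandas-only sketch below imitates that behaviour with a hypothetical run_in_batches helper; it is an illustration, not how Spark is implemented internally:

```python
import pandas as pd


def age_plus_one(age: pd.Series) -> pd.Series:
    # Vectorized: one output value per input value
    return age + 1


def run_in_batches(func, s: pd.Series, batch_size: int) -> pd.Series:
    """Hypothetical helper that mimics per-batch evaluation of a scalar pandas UDF."""
    outputs = []
    for start in range(0, len(s), batch_size):
        batch = s.iloc[start:start + batch_size].reset_index(drop=True)
        result = func(batch)
        # Spark rejects a batch result whose length differs from the input batch
        if len(result) != len(batch):
            raise ValueError(f"expected {len(batch)} rows, got {len(result)}")
        outputs.append(result)
    return pd.concat(outputs, ignore_index=True)


ages = pd.Series([30, 25, 26, 37])
print(run_in_batches(age_plus_one, ages, batch_size=3).tolist())  # [31, 26, 27, 38]
```

A function that filters rows, such as lambda s: s[s > 26], fails this check, which mirrors the restriction stated above.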
PySpark 2
@pandas_udf(returnType="integer", functionType=PandasUDFType.SCALAR)
def age_plus_one(age):
    return age + 1

df.withColumn("age1", age_plus_one("age")).show()
+-----+---+---+----+
| name|gid|age|age1|
+-----+---+---+----+
| Ben| 1| 30| 31|
| Dan| 1| 25| 26|
| Will| 2| 26| 27|
|Peter| 2| 37| 38|
+-----+---+---+----+
@pandas_udf(returnType="string", functionType=PandasUDFType.SCALAR)
def concat(name: pd.Series, age: pd.Series) -> pd.Series:
    return name + " is " + age.astype(str) + " years old."

df.withColumn("desc", concat("name", "age")).show()
+-----+---+---+--------------------+
| name|gid|age| desc|
+-----+---+---+--------------------+
| Ben| 1| 30|Ben is 30 years old.|
| Dan| 1| 25|Dan is 25 years old.|
| Will| 2| 26|Will is 26 years ...|
|Peter| 2| 37|Peter is 37 years...|
+-----+---+---+--------------------+
PySpark 3
@pandas_udf(returnType="integer")
def age_plus_one(age: pd.Series) -> pd.Series:
    return age + 1

df.withColumn("age1", age_plus_one("age")).show()
+-----+---+---+----+
| name|gid|age|age1|
+-----+---+---+----+
| Ben| 1| 30| 31|
| Dan| 1| 25| 26|
| Will| 2| 26| 27|
|Peter| 2| 37| 38|
+-----+---+---+----+
@pandas_udf(returnType="string")
def concat(name: pd.Series, age: pd.Series) -> pd.Series:
    return name + " is " + age.astype(str) + " years old."

df.withColumn("desc", concat("name", "age")).show()
+-----+---+---+--------------------+
| name|gid|age| desc|
+-----+---+---+--------------------+
| Ben| 1| 30|Ben is 30 years old.|
| Dan| 1| 25|Dan is 25 years old.|
| Will| 2| 26|Will is 26 years ...|
|Peter| 2| 37|Peter is 37 years...|
+-----+---+---+--------------------+
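Because the body of a pandas UDF is an ordinary pandas function, one option is to define it without the decorator, unit-test it on plain pandas Series, and wrap it only where Spark needs it. A sketch of that pattern (describe_age is a hypothetical name); the wrapping is left as a comment since it assumes PySpark 3:

```python
import pandas as pd


def describe_age(name: pd.Series, age: pd.Series) -> pd.Series:
    # Element-wise string concatenation; age is cast to str first
    return name + " is " + age.astype(str) + " years old."


# In PySpark 3 (assumed): describe_age_udf = pandas_udf(describe_age, returnType="string")
# df.withColumn("desc", describe_age_udf(col("name"), col("age")))

result = describe_age(pd.Series(["Ben", "Dan"]), pd.Series([30, 25]))
print(result.tolist())  # ['Ben is 30 years old.', 'Dan is 25 years old.']
```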
Take a pandas DataFrame and Return a pandas DataFrame
PySpark 2
# PySpark 2
@pandas_udf(
    returnType="name string, gid int, age int, desc string",
    functionType=PandasUDFType.GROUPED_MAP,
)
def desc(df):
    df["desc"] = df.name + " is " + df.age.astype(str) + " years old."
    return df

df.groupBy(spark_partition_id()).apply(desc).show()
+-----+---+---+--------------------+
| name|gid|age| desc|
+-----+---+---+--------------------+
| Ben| 1| 30|Ben is 30 years old.|
| Dan| 1| 25|Dan is 25 years old.|
| Will| 2| 26|Will is 26 years ...|
|Peter| 2| 37|Peter is 37 years...|
+-----+---+---+--------------------+
In terms of machine learning model prediction, it can look something like the following, assuming that:
- df is a DataFrame containing the columns name, gid, age and some feature columns;
- features is a list containing the names of the feature columns used by the model;
- model is a (XGBoost, LightGBM, PyTorch, etc.) model loaded into Python.
@pandas_udf(
    returnType="name string, gid int, age int, prob double",
    functionType=PandasUDFType.GROUPED_MAP,
)
def predict(df):
    # predicted probability of the positive class
    df["prob"] = model.predict_proba(df[features])[:, 1]
    # keep only needed columns
    return df[["name", "gid", "age", "prob"]]

df.groupBy(spark_partition_id()).apply(predict)

Note that it is perfectly OK to group by a column of the DataFrame
instead of spark_partition_id() in the above 2 examples.
Grouping-by in Spark always shuffles data,
which means that grouping by spark_partition_id()
doesn't give you any performance benefit.
As a matter of fact,
the above way of doing prediction is discouraged due to data shuffling.
A pandas UDF taking multiple columns and returning one column is preferred.
In PySpark 3,
DataFrame.mapInPandas is preferred.
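A minimal sketch of the column-based alternative, assuming a model object with a scikit-learn style predict_proba method. ConstantModel, the feature names f1 and f2, and predict_prob are hypothetical stand-ins so that the function can be checked with pandas alone:

```python
import numpy as np
import pandas as pd

FEATURES = ["f1", "f2"]  # hypothetical feature column names


class ConstantModel:
    """Hypothetical stand-in for a fitted classifier with predict_proba."""

    def predict_proba(self, X: pd.DataFrame) -> np.ndarray:
        # Two columns: P(class 0) and P(class 1)
        return np.column_stack([np.full(len(X), 0.25), np.full(len(X), 0.75)])


model = ConstantModel()


def predict_prob(f1: pd.Series, f2: pd.Series) -> pd.Series:
    # Rebuild the feature matrix of one batch from the individual columns
    X = pd.concat([f1, f2], axis=1, keys=FEATURES)
    return pd.Series(model.predict_proba(X)[:, 1], index=f1.index)


# In PySpark 3 (assumed): predict_udf = pandas_udf(predict_prob, returnType="double")
# df.withColumn("prob", predict_udf(*[col(c) for c in FEATURES]))

print(predict_prob(pd.Series([1.0, 2.0]), pd.Series([3.0, 4.0])).tolist())  # [0.75, 0.75]
```

Unlike the grouped version, this needs no groupBy and hence no shuffle; the model is captured in the function's closure and shipped to the executors along with it.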
PySpark 3
def desc(itdf):
    for df in itdf:
        df["desc"] = df["name"] + " is " + df["age"].astype(str) + " years old."
        yield df

df.mapInPandas(desc, schema="name string, gid int, age int, desc string").show()
+-----+---+---+--------------------+
| name|gid|age| desc|
+-----+---+---+--------------------+
| Ben| 1| 30|Ben is 30 years old.|
| Dan| 1| 25|Dan is 25 years old.|
| Will| 2| 26|Will is 26 years ...|
|Peter| 2| 37|Peter is 37 years...|
+-----+---+---+--------------------+
def desc2(df):
    df["desc"] = df["name"] + " is " + df["age"].astype(str) + " years old."
    return df

df.groupBy(spark_partition_id()).applyInPandas(
    desc2, schema="name string, gid int, age int, desc string"
).show()
+-----+---+---+--------------------+
| name|gid|age| desc|
+-----+---+---+--------------------+
| Ben| 1| 30|Ben is 30 years old.|
| Dan| 1| 25|Dan is 25 years old.|
| Will| 2| 26|Will is 26 years ...|
|Peter| 2| 37|Peter is 37 years...|
+-----+---+---+--------------------+
Even though the above 2 examples yield the same result,
DataFrame.mapInPandas is preferred
as applyInPandas requires grouping-by which causes data shuffling.
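The function given to mapInPandas receives an iterator of pandas DataFrames (one per Arrow batch of a partition) and may yield any number of DataFrames with any number of rows. One practical consequence is that expensive setup, such as loading a model, can run once per partition before the loop. The sketch below imitates this with pandas alone; load_threshold is a hypothetical placeholder for such setup:

```python
from typing import Iterator

import pandas as pd


def load_threshold() -> int:
    # Hypothetical placeholder for expensive setup (e.g. loading a model)
    return 26


def keep_older(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    threshold = load_threshold()  # runs once per call, i.e. once per partition
    for pdf in batches:
        # The output may have fewer rows than the input batch
        yield pdf[pdf["age"] > threshold]


# In PySpark 3 (assumed): df.mapInPandas(keep_older, schema="name string, gid long, age long")

batches = iter([
    pd.DataFrame({"name": ["Ben", "Dan"], "gid": [1, 1], "age": [30, 25]}),
    pd.DataFrame({"name": ["Will", "Peter"], "gid": [2, 2], "age": [26, 37]}),
])
result = pd.concat(keep_older(batches), ignore_index=True)
print(result["name"].tolist())  # ['Ben', 'Peter']
```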
Aggregation or Window Functions
Series to scalar pandas UDFs in PySpark 3+
(corresponding to PandasUDFType.GROUPED_AGG in PySpark 2)
are similar to Spark aggregate functions.
A Series to scalar pandas UDF defines an aggregation from one or more pandas Series to a scalar value,
where each pandas Series represents a Spark column.
You use a Series to scalar pandas UDF with APIs such as select,
withColumn, groupBy.agg,
and pyspark.sql.Window.
You express the type hint as pandas.Series, ... -> Any.
The return type should be a primitive data type,
and the returned scalar can be either a Python primitive type,
for example, int or float or a NumPy data type such as numpy.int64 or numpy.float64.
Any should ideally be a specific scalar type.
This type of UDF does not support partial aggregation and all data for each group is loaded into memory.
# PySpark 2
@pandas_udf(returnType="double", functionType=PandasUDFType.GROUPED_AGG)
def avg_age(age):
    return age.mean()

df.groupBy("gid").agg(avg_age("age").alias("avg_age")).show()
+---+-------+
|gid|avg_age|
+---+-------+
| 1| 27.5|
| 2| 31.5|
+---+-------+
PySpark 3
# PySpark 3
@pandas_udf(returnType="double")
def avg_age(age: pd.Series) -> float:
    return age.mean()

df.groupBy("gid").agg(avg_age("age").alias("avg_age")).show()
+---+-------+
|gid|avg_age|
+---+-------+
| 1| 27.5|
| 2| 31.5|
+---+-------+
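GroupedData.applyInPandas in PySpark 3 is more flexible, as it can return an arbitrary pandas DataFrame per group rather than a single scalar.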
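Since the body of a Series-to-scalar UDF is plain pandas, a quick way to sanity-check it without Spark is pandas' own groupby(...).agg, which likewise calls the function once per group with that group's values. A sketch using the same data as df_p above:

```python
import pandas as pd


def avg_age(age: pd.Series) -> float:
    return age.mean()


df_p = pd.DataFrame(
    data=[["Ben", 1, 30], ["Dan", 1, 25], ["Will", 2, 26], ["Peter", 2, 37]],
    columns=["name", "gid", "age"],
)
print(df_p.groupby("gid")["age"].agg(avg_age).tolist())  # [27.5, 31.5]
```

The values match the Spark output above. On the Spark side, the decorated avg_age can also be applied over a window, e.g. avg_age("age").over(Window.partitionBy("gid")), as mentioned in the description of Series-to-scalar UDFs.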
Column Type of Struct
@pandas_udf(returnType="col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3["col2"] = s1 + s2.str.len()
    return s3

# use a new name so that df (used in the sections below) is not overwritten
df_struct = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>",
)
df_struct.show()
+--------+----------+-----------------+
|long_col|string_col| struct_col|
+--------+----------+-----------------+
| 1| a string|{a nested string}|
+--------+----------+-----------------+
df_struct.select(func("long_col", "string_col", "struct_col").alias("struct_col2")).show()
+--------------------+
| struct_col2|
+--------------------+
|{a nested string, 9}|
+--------------------+
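The struct argument arrives as a pandas DataFrame (one column per struct field), and returning a DataFrame produces a struct column. The func above writes into s3 in place; a variant that leaves its input untouched might look like the following, checked with pandas alone on the same row (func_no_mutation is a hypothetical name):

```python
import pandas as pd


def func_no_mutation(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    # assign returns a new DataFrame, so the caller's s3 is not modified
    return s3.assign(col2=s1 + s2.str.len())


s3 = pd.DataFrame({"col1": ["a nested string"]})
out = func_no_mutation(pd.Series([1]), pd.Series(["a string"]), s3)
print(out["col2"].tolist())  # [9]
print(out.columns.tolist())  # ['col1', 'col2']
print(s3.columns.tolist())  # ['col1']
```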
Pandas UDFs in Spark SQL
Pandas UDFs created using @pandas_udf can only be used in DataFrame APIs, not in Spark SQL.
To use a pandas UDF in Spark SQL,
you have to register it using spark.udf.register.
The same holds for UDFs.
Notice that spark.udf.register can register not only pandas UDFs and UDFs but also a regular Python function
(in which case you have to specify the return type).
spark.sql("select *, age_plus_one(age) as age1 from table_df").show()
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-13-501267730fde> in <module>
----> 1 spark.sql("select *, age_plus_one(age) as age1 from table_df").show()
/opt/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
721 [Row(f1=1, f2='row1'), Row(f1=2, f2='row2'), Row(f1=3, f2='row3')]
722 """
--> 723 return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
724
725 def table(self, tableName):
/opt/spark-3.1.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/opt/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
115 # Hide where the exception came from that shows a non-Pythonic
116 # JVM exception message.
--> 117 raise converted from None
118 else:
119 raise
AnalysisException: Undefined function: 'age_plus_one'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 10
spark.udf.register("age_plus_one", age_plus_one)
<function __main__.age_plus_one(age: pandas.core.series.Series) -> pandas.core.series.Series>
spark.sql("select *, age_plus_one(age) as age1 from table_df").show()
+----+---+---+----+
|name| id|age|age1|
+----+---+---+----+
| Ben| 2| 30| 31|
| Dan| 4| 25| 26|
|Will| 1| 26| 27|
+----+---+---+----+
df.withColumn("age1", age_plus_one("age")).show()
+----+---+---+----+
|name| id|age|age1|
+----+---+---+----+
| Ben| 2| 30| 31|
| Dan| 4| 25| 26|
|Will| 1| 26| 27|
+----+---+---+----+
Non-Column Parameters
If your (pandas) UDF needs a non-Column parameter,
simply treat it as a Column parameter
and wrap the parameter into lit when invoking the (pandas) UDF.
Another possibility is to define a regular Python function which takes non-Column parameters
and returns a (pandas) UDF.
The first approach is simpler, universal, and also more flexible if you later want to use Column parameters to replace the non-Column parameters.
@pandas_udf(returnType="integer")
def age_plus_one(age: pd.Series) -> pd.Series:
    return age + 1

df.withColumn("age0", age_plus_one(lit(0))).show()
+----+---+---+----+
|name| id|age|age0|
+----+---+---+----+
| Ben| 2| 30| 1|
| Dan| 4| 25| 1|
|Will| 1| 26| 1|
+----+---+---+----+
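With lit, the constant simply arrives as another pandas Series whose values are all equal, so the UDF body stays ordinary vectorized pandas. A sketch of a two-argument version that mixes a real column with a constant, checked with pandas alone (add_n is a hypothetical name; the Spark call in the comment assumes PySpark 3):

```python
import pandas as pd


def add_n(age: pd.Series, n: pd.Series) -> pd.Series:
    # n is a constant column created with lit(...) on the Spark side
    return age + n


# In PySpark 3 (assumed): add_n_udf = pandas_udf(add_n, returnType="integer")
# df.withColumn("age5", add_n_udf(col("age"), lit(5)))

ages = pd.Series([30, 25, 26])
print(add_n(ages, pd.Series([5] * len(ages))).tolist())  # [35, 30, 31]
```

Replacing lit(5) with another column later requires no change to the function itself, which is the flexibility described above.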
UDF Taking One Column as Parameter
@udf(returnType=StringType())
def say_hello(name: str) -> str:
    return f"Hello {name}"

df.withColumn("greetings", say_hello(col("name"))).show()
+----+---+---+----------+
|name| id|age| greetings|
+----+---+---+----------+
| Ben| 2| 30| Hello Ben|
| Dan| 4| 25| Hello Dan|
|Will| 1| 26|Hello Will|
+----+---+---+----------+
def say_hello_2(name: str) -> str:
    return f"Hello {name}"

spark.udf.register("say_hello_udf", say_hello_2, StringType())
<function __main__.say_hello_2(name: str) -> str>
spark.sql("select *, say_hello_udf(name) as hello from table_df").show()
+----+---+---+----------+
|name| id|age| hello|
+----+---+---+----------+
| Ben| 2| 30| Hello Ben|
| Dan| 4| 25| Hello Dan|
|Will| 1| 26|Hello Will|
+----+---+---+----------+
df.withColumn(say_hello_udf("name")).show()
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
<ipython-input-27-83cdbd56ac4a> in <module>
----> 1 df.withColumn(say_hello_udf("name")).show()
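NameError: name 'say_hello_udf' is not defined
The name passed to spark.udf.register is only visible to Spark SQL; it does not create a Python variable. To use the registered function in the DataFrame API as well, keep the object returned by spark.udf.register, e.g., say_hello_udf = spark.udf.register("say_hello_udf", say_hello_2, StringType()).
UDF Taking Two Columns as Parameters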
@udf(returnType="string")
def concat(name: str, age: int) -> str:
    return f"{name} is {age} years old."

df.withColumn("greetings", concat(col("name"), col("age"))).show()
+----+---+---+--------------------+
|name| id|age| greetings|
+----+---+---+--------------------+
| Ben| 2| 30|Ben is 30 years old.|
| Dan| 4| 25|Dan is 25 years old.|
|Will| 1| 26|Will is 26 years ...|
+----+---+---+--------------------+
df.withColumn("greetings", concat("name", "age")).show()
+----+---+---+--------------------+
|name| id|age| greetings|
+----+---+---+--------------------+
| Ben| 2| 30|Ben is 30 years old.|
| Dan| 4| 25|Dan is 25 years old.|
|Will| 1| 26|Will is 26 years ...|
+----+---+---+--------------------+