Things on this page are fragmentary and immature notes/thoughts of the author. Please read with your own judgement!
Tips and Traps¶
If you use PySpark instead of Spark/Scala, pandas UDFs are a great alternative to all those (complicated) collection functions discussed here. With pandas UDFs, each partition of a Spark DataFrame can be converted to a pandas DataFrame without copying the underlying data; you can then transform the pandas DataFrames, which are converted back into partitions of the Spark DataFrame.
When converting a pandas DataFrame to a Spark DataFrame,

- a column of list is converted to a column of ArrayType,
- a column of tuple is converted to a column of StructType,
- a column of dict is converted to a column of MapType.
Comments¶
There are multiple ways (vanilla strings, JSON strings, StructType and ArrayType) to represent complex data types in Spark DataFrames. Notice that a tuple is converted to a StructType and an array/list is converted to an ArrayType in Spark DataFrames. Starting from Spark 2.4, a rich set of ArrayType functions is available, which makes ArrayType the more convenient choice when the elements share the same type.
Vanilla String¶
string, substring, regexp_extract, locate, left, concat_ws
JSON String¶
json_tuple
get_json_object
from_json
StructType¶
ArrayType¶
array
element_at
array_min, array_max, array_join, array_intersect, array_except, array_distinct, array_contains, array, array_position, array_remove, array_repeat, array_sort, array_union, arrays_overlap, arrays_zip
from typing import List, Tuple
import pandas as pd
from pathlib import Path
import findspark
findspark.init(str(next(Path("/opt").glob("spark-3*"))))
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import (
IntegerType,
StringType,
StructType,
StructField,
ArrayType,
)
spark = (
SparkSession.builder.appName("PySpark_Collection_Func")
.enableHiveSupport()
.getOrCreate()
)
Python Types to DataType in PySpark¶
A column of list is converted to a Column of ArrayType.
df = spark.createDataFrame(
pd.DataFrame(
data=[([1, 2], "how"), ([2, 3], "are"), ([3, 4], "you")],
columns=("col1", "col2"),
)
)
df.show()
+------+----+
| col1|col2|
+------+----+
|[1, 2]| how|
|[2, 3]| are|
|[3, 4]| you|
+------+----+
df.schema
StructType(List(StructField(col1,ArrayType(LongType,true),true),StructField(col2,StringType,true)))
A column of tuple is converted to a Column of StructType.
df = spark.createDataFrame(
pd.DataFrame(
data=[((1, 2), "how"), ((2, 3), "are"), ((3, 4), "you")],
columns=("col1", "col2"),
)
)
df.show()
+------+----+
| col1|col2|
+------+----+
|{1, 2}| how|
|{2, 3}| are|
|{3, 4}| you|
+------+----+
df.schema
StructType(List(StructField(col1,StructType(List(StructField(_1,LongType,true),StructField(_2,LongType,true))),true),StructField(col2,StringType,true)))
A column of dict is converted to a column of MapType.
df = spark.createDataFrame(
pd.DataFrame(
data=[
({"x": 1, "y": 2}, "how"),
({"x": 2, "y": 3}, "are"),
({"x": 3, "y": 4}, "you"),
],
columns=("col1", "col2"),
)
)
df.show()
+----------------+----+
| col1|col2|
+----------------+----+
|{x -> 1, y -> 2}| how|
|{x -> 2, y -> 3}| are|
|{x -> 3, y -> 4}| you|
+----------------+----+
df.schema
StructType(List(StructField(col1,MapType(StringType,LongType,true),true),StructField(col2,StringType,true)))
df = spark.createDataFrame(
pd.DataFrame(
data=[(1, 2, "how"), (2, 3, "are"), (3, 4, "you")],
columns=("col1", "col2", "col3"),
)
)
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 2| how|
| 2| 3| are|
| 3| 4| you|
+----+----+----+
df.select(create_map("col3", "col1").alias("map")).show()
+----------+
| map|
+----------+
|{how -> 1}|
|{are -> 2}|
|{you -> 3}|
+----------+
df.select(create_map([df.col3, df.col1]).alias("map")).show()
+----------+
| map|
+----------+
|{how -> 1}|
|{are -> 2}|
|{you -> 3}|
+----------+
explode¶
spark.sql(
"""
select
split("how are you", " ") as words
"""
).show()
+---------------+
| words|
+---------------+
|[how, are, you]|
+---------------+
spark.sql(
"""
select
explode(split("how are you", " ")) as words
"""
).show()
+-----+
|words|
+-----+
| how|
| are|
| you|
+-----+
split¶
collect¶
df = spark.createDataFrame(
pd.DataFrame(
data=[(1, 2, "how"), (2, 3, "are"), (3, 4, "you")],
columns=("col1", "col2", "col3"),
)
)
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 2| how|
| 2| 3| are|
| 3| 4| you|
+----+----+----+
df.select(struct("col1", "col2").alias("struct")).show()
+------+
|struct|
+------+
|{1, 2}|
|{2, 3}|
|{3, 4}|
+------+
df.select(struct([df.col1, df.col2]).alias("struct")).show()
+------+
|struct|
+------+
|{1, 2}|
|{2, 3}|
|{3, 4}|
+------+
df.select(struct("col1", "col2").alias("struct")).schema
StructType(List(StructField(struct,StructType(List(StructField(col1,LongType,true),StructField(col2,LongType,true))),false)))
Work with StructType¶
Notice that a Tuple is converted to StructType in Spark DataFrames.
df = spark.createDataFrame(
pd.DataFrame(
data=[((1, 2), "how"), ((2, 3), "are"), ((3, 4), "you")],
columns=("col1", "col2"),
)
)
df.show()
+------+----+
| col1|col2|
+------+----+
|{1, 2}| how|
|{2, 3}| are|
|{3, 4}| you|
+------+----+
Split all elements of a StructType into different columns.
df.select("col1.*").show
+---+---+
| _1| _2|
+---+---+
| 1| 2|
| 2| 3|
| 3| 4|
+---+---+
Extract elements from StructTypes by position and rename the columns.
df.select(
$"col1._1".alias("v1"),
$"col1._2".alias("v2")
).show
+---+---+
| v1| v2|
+---+---+
| 1| 2|
| 2| 3|
| 3| 4|
+---+---+
Work with ArrayType¶
Notice that an Array is converted to an ArrayType in Spark DataFrames. Note: most of the ArrayType functions used here require Spark 2.4.0+.
val df = Seq(
(Array(1, 2), "how"),
(Array(2, 3), "are"),
(Array(3, 4), "you")
).toDF("col1", "col2")
df.show
+------+----+
| col1|col2|
+------+----+
|[1, 2]| how|
|[2, 3]| are|
|[3, 4]| you|
+------+----+
df.select(
element_at($"col1", 1).alias("v1"),
element_at($"col1", 2).alias("v2")
).show
+---+---+
| v1| v2|
+---+---+
| 1| 2|
| 2| 3|
| 3| 4|
+---+---+
ArrayType¶
df = spark.createDataFrame(
pd.DataFrame(
data=[([1, 2], "how", 1), ([2, 3], "are", 2), ([3, 4], "you", 3)],
columns=["col1", "col2", "col3"],
)
)
df.show()
+------+----+----+
| col1|col2|col3|
+------+----+----+
|[1, 2]| how| 1|
|[2, 3]| are| 2|
|[3, 4]| you| 3|
+------+----+----+
df.select(element_at(col("col1"), 1).alias("word")).show()
+----+
|word|
+----+
| 1|
| 2|
| 3|
+----+
df.select(element_at("col1", 1).alias("word")).show()
+----+
|word|
+----+
| 1|
| 2|
| 3|
+----+
@udf(ArrayType(IntegerType()))
def my_udf(x: int) -> List[int]:
    return [x, 1]

df1 = df.select(my_udf("col3").alias("f1"))
df1.show()
+------+
| f1|
+------+
|[1, 1]|
|[2, 1]|
|[3, 1]|
+------+
df1.schema
StructType(List(StructField(f1,ArrayType(IntegerType,true),true)))
df1.select(element_at("f1", 1).alias("v1"), element_at("f1", 2).alias("v2")).show()
+---+---+
| v1| v2|
+---+---+
| 1| 1|
| 2| 1|
| 3| 1|
+---+---+
StructType¶
df2 = spark.createDataFrame(
pd.DataFrame(
data=[((1, 2), "how", 1), ((2, 3), "are", 2), ((3, 4), "you", 3)],
columns=["col1", "col2", "col3"],
)
)
df2.show()
+------+----+----+
| col1|col2|col3|
+------+----+----+
|[1, 2]| how| 1|
|[2, 3]| are| 2|
|[3, 4]| you| 3|
+------+----+----+
df2.schema
StructType(List(StructField(col1,StructType(List(StructField(_1,LongType,true),StructField(_2,LongType,true))),true),StructField(col2,StringType,true),StructField(col3,LongType,true)))
df2.select("col1.*").show()
+---+---+
| _1| _2|
+---+---+
| 1| 2|
| 2| 3|
| 3| 4|
+---+---+
@udf(
StructType(
[
StructField("_1", IntegerType(), nullable=True),
StructField("_2", IntegerType(), nullable=True),
]
)
)
def my_udf2(x: int) -> Tuple[int, int]:
    return (x, 1)

df3 = df.select(my_udf2("col3").alias("f1"))
df3.show()
+------+
| f1|
+------+
|[1, 1]|
|[2, 1]|
|[3, 1]|
+------+
df3.schema
StructType(List(StructField(f1,StructType(List(StructField(_1,IntegerType,true),StructField(_2,IntegerType,true))),true)))
df3.select("f1.*").show()
+---+---+
| _1| _2|
+---+---+
| 1| 1|
| 2| 1|
| 3| 1|
+---+---+
df3.select(col("f1._1").alias("v1"), col("f1._2").alias("v2")).show()
+---+---+
| v1| v2|
+---+---+
| 1| 1|
| 2| 1|
| 3| 1|
+---+---+