Ben Chuanlong Du's Blog

It is never too late to learn.

Collection Functions in Spark

Tips and Traps

  1. If you use PySpark instead of Spark/Scala, pandas UDFs are a great alternative to all those (complicated) collection functions discussed here. With pandas UDFs, each partition of a Spark DataFrame is converted to a pandas DataFrame (using Apache Arrow for efficient data transfer); you can then transform the pandas DataFrames, which are converted back to partitions of a Spark DataFrame.

  2. When converting a pandas DataFrame to a Spark DataFrame,

    • a column of list is converted to a column of ArrayType
    • a column of tuple is converted to a column of StructType
    • a column of dict is converted to a column of MapType



There are multiple ways (vanilla strings, JSON strings, StructType and ArrayType) to represent complex data types in Spark DataFrames. Notice that a Tuple is converted to a StructType in Spark DataFrames and an Array is converted to an ArrayType in Spark DataFrames. Starting from Spark 2.4, a rich set of built-in array functions makes ArrayType the more convenient choice when all elements have the same type.

Vanilla String

  • string, substring, regexp_extract, locate, left, concat_ws

JSON String

  • json_tuple
  • get_json_object
  • from_json

StructType

ArrayType

  • array
  • element_at
  • array_min, array_max, array_join, array_intersect, array_except, array_distinct, array_contains, array_position, array_remove, array_repeat, array_sort, array_union, arrays_overlap, arrays_zip
In [3]:
from typing import List, Tuple
import pandas as pd
In [5]:
from pathlib import Path
import findspark

findspark.init(str(next(Path("/opt").glob("spark-3*"))))

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructType,
    StructField,
    ArrayType,
)

spark = (
    SparkSession.builder.appName("PySpark_Collection_Func")
    .enableHiveSupport()
    .getOrCreate()
)

Python Types to DataType in PySpark

A column of list is converted to a Column of ArrayType.

In [15]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=[([1, 2], "how"), ([2, 3], "are"), ([3, 4], "you")],
        columns=("col1", "col2"),
    )
)
df.show()
+------+----+
|  col1|col2|
+------+----+
|[1, 2]| how|
|[2, 3]| are|
|[3, 4]| you|
+------+----+

In [16]:
df.schema
Out[16]:
StructType(List(StructField(col1,ArrayType(LongType,true),true),StructField(col2,StringType,true)))

A column of tuple is converted to a Column of StructType.

In [ ]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=[((1, 2), "how"), ((2, 3), "are"), ((3, 4), "you")],
        columns=("col1", "col2"),
    )
)
df.show()
+------+----+
|  col1|col2|
+------+----+
|{1, 2}| how|
|{2, 3}| are|
|{3, 4}| you|
+------+----+

In [ ]:
df.schema
Out[ ]:
StructType(List(StructField(col1,StructType(List(StructField(_1,LongType,true),StructField(_2,LongType,true))),true),StructField(col2,StringType,true)))

A column of dict is converted to a column of MapType.

In [13]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=[
            ({"x": 1, "y": 2}, "how"),
            ({"x": 2, "y": 3}, "are"),
            ({"x": 3, "y": 4}, "you"),
        ],
        columns=("col1", "col2"),
    )
)
df.show()
+----------------+----+
|            col1|col2|
+----------------+----+
|{x -> 1, y -> 2}| how|
|{x -> 2, y -> 3}| are|
|{x -> 3, y -> 4}| you|
+----------------+----+

In [14]:
df.schema
Out[14]:
StructType(List(StructField(col1,MapType(StringType,LongType,true),true),StructField(col2,StringType,true)))
In [24]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=[(1, 2, "how"), (2, 3, "are"), (3, 4, "you")],
        columns=("col1", "col2", "col3"),
    )
)
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2| how|
|   2|   3| are|
|   3|   4| you|
+----+----+----+

In [25]:
df.select(create_map("col3", "col1").alias("map")).show()
+----------+
|       map|
+----------+
|{how -> 1}|
|{are -> 2}|
|{you -> 3}|
+----------+

In [27]:
df.select(create_map([df.col3, df.col1]).alias("map")).show()
+----------+
|       map|
+----------+
|{how -> 1}|
|{are -> 2}|
|{you -> 3}|
+----------+

explode

In [4]:
spark.sql(
    """
    select
        split("how are you", " ") as words
    """
).show()
+---------------+
|          words|
+---------------+
|[how, are, you]|
+---------------+

In [3]:
spark.sql(
    """
    select
        explode(split("how are you", " ")) as words
    """
).show()
+-----+
|words|
+-----+
|  how|
|  are|
|  you|
+-----+

split

collect

In [17]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=[(1, 2, "how"), (2, 3, "are"), (3, 4, "you")],
        columns=("col1", "col2", "col3"),
    )
)
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2| how|
|   2|   3| are|
|   3|   4| you|
+----+----+----+

In [20]:
df.select(struct("col1", "col2").alias("struct")).show()
+------+
|struct|
+------+
|{1, 2}|
|{2, 3}|
|{3, 4}|
+------+

In [23]:
df.select(struct([df.col1, df.col2]).alias("struct")).show()
+------+
|struct|
+------+
|{1, 2}|
|{2, 3}|
|{3, 4}|
+------+

In [21]:
df.select(struct("col1", "col2").alias("struct")).schema
Out[21]:
StructType(List(StructField(struct,StructType(List(StructField(col1,LongType,true),StructField(col2,LongType,true))),false)))

Work with StructType

Notice that a Tuple is converted to StructType in Spark DataFrames.

In [ ]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=[((1, 2), "how"), ((2, 3), "are"), ((3, 4), "you")],
        columns=("col1", "col2"),
    )
)
df.show()
+------+----+
|  col1|col2|
+------+----+
|{1, 2}| how|
|{2, 3}| are|
|{3, 4}| you|
+------+----+

Split all elements of a StructType into different columns.

In [15]:
df.select("col1.*").show()
+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  2|  3|
|  3|  4|
+---+---+

Extract elements from StructTypes by position and rename the columns.

In [16]:
df.select(
    col("col1._1").alias("v1"),
    col("col1._2").alias("v2"),
).show()
+---+---+
| v1| v2|
+---+---+
|  1|  2|
|  2|  3|
|  3|  4|
+---+---+

Work with ArrayType

Notice that an Array is converted to an ArrayType in Spark DataFrames. Note: most of the built-in array functions (array_sort, array_intersect, etc.) require Spark 2.4.0+.

In [17]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=[([1, 2], "how"), ([2, 3], "are"), ([3, 4], "you")],
        columns=("col1", "col2"),
    )
)
df.show()
+------+----+
|  col1|col2|
+------+----+
|[1, 2]| how|
|[2, 3]| are|
|[3, 4]| you|
+------+----+

In [22]:
df.select(
    element_at("col1", 1).alias("v1"),
    element_at("col1", 2).alias("v2"),
).show()
+---+---+
| v1| v2|
+---+---+
|  1|  2|
|  2|  3|
|  3|  4|
+---+---+

ArrayType

In [3]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=[([1, 2], "how", 1), ([2, 3], "are", 2), ([3, 4], "you", 3)],
        columns=["col1", "col2", "col3"],
    )
)
df.show()
+------+----+----+
|  col1|col2|col3|
+------+----+----+
|[1, 2]| how|   1|
|[2, 3]| are|   2|
|[3, 4]| you|   3|
+------+----+----+

In [4]:
df.select(element_at(col("col1"), 1).alias("word")).show()
+----+
|word|
+----+
|   1|
|   2|
|   3|
+----+

In [5]:
df.select(element_at("col1", 1).alias("word")).show()
+----+
|word|
+----+
|   1|
|   2|
|   3|
+----+

In [10]:
@udf(ArrayType(IntegerType()))
def my_udf(x: int) -> List[int]:
    return [x, 1]
In [11]:
df1 = df.select(my_udf("col3").alias("f1"))
df1.show()
+------+
|    f1|
+------+
|[1, 1]|
|[2, 1]|
|[3, 1]|
+------+

In [12]:
df1.schema
Out[12]:
StructType(List(StructField(f1,ArrayType(IntegerType,true),true)))
In [13]:
df1.select(element_at("f1", 1).alias("v1"), element_at("f1", 2).alias("v2")).show()
+---+---+
| v1| v2|
+---+---+
|  1|  1|
|  2|  1|
|  3|  1|
+---+---+

StructType

In [41]:
df2 = spark.createDataFrame(
    pd.DataFrame(
        data=[((1, 2), "how", 1), ((2, 3), "are", 2), ((3, 4), "you", 3)],
        columns=["col1", "col2", "col3"],
    )
)
df2.show()
+------+----+----+
|  col1|col2|col3|
+------+----+----+
|{1, 2}| how|   1|
|{2, 3}| are|   2|
|{3, 4}| you|   3|
+------+----+----+

In [42]:
df2.schema
Out[42]:
StructType(List(StructField(col1,StructType(List(StructField(_1,LongType,true),StructField(_2,LongType,true))),true),StructField(col2,StringType,true),StructField(col3,LongType,true)))
In [43]:
df2.select("col1.*").show()
+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  2|  3|
|  3|  4|
+---+---+

In [44]:
@udf(
    StructType(
        [
            StructField("_1", IntegerType(), nullable=True),
            StructField("_2", IntegerType(), nullable=True),
        ]
    )
)
def my_udf2(x: int) -> Tuple[int, int]:
    return (x, 1)
In [45]:
df3 = df.select(my_udf2("col3").alias("f1"))
df3.show()
+------+
|    f1|
+------+
|{1, 1}|
|{2, 1}|
|{3, 1}|
+------+

In [46]:
df3.schema
Out[46]:
StructType(List(StructField(f1,StructType(List(StructField(_1,IntegerType,true),StructField(_2,IntegerType,true))),true)))
In [47]:
df3.select("f1.*").show()
+---+---+
| _1| _2|
+---+---+
|  1|  1|
|  2|  1|
|  3|  1|
+---+---+

In [48]:
df3.select(col("f1._1").alias("v1"), col("f1._2").alias("v2")).show()
+---+---+
| v1| v2|
+---+---+
|  1|  1|
|  2|  1|
|  3|  1|
+---+---+

