Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

Column Functions and Operators in Spark

from typing import List, Tuple
import pandas as pd
from pathlib import Path
import findspark

# Locate the first Spark 3.x install under /opt and add pyspark to sys.path.
# NOTE(review): assumes exactly one spark-3* directory exists — next() raises
# StopIteration otherwise.
findspark.init(str(next(Path("/opt").glob("spark-3*"))))
# findspark.init("/opt/spark-2.3.0-bin-hadoop2.7")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructType,
    StructField,
    ArrayType,
)

# Start (or reuse) a local Spark session with Hive support for the examples.
builder = SparkSession.builder.appName("PySpark_Str_Func").enableHiveSupport()
spark = builder.getOrCreate()

# Sample frame: an array column, a string column, and an integer column.
sample = pd.DataFrame(
    [([1, 2], "how", 1), ([2, 3], "are", 2), ([3, 4], "you", 3)],
    columns=["col1", "col2", "col3"],
)
df = spark.createDataFrame(sample)
df.show()
+------+----+----+
|  col1|col2|col3|
+------+----+----+
|[1, 2]| how|   1|
|[2, 3]| are|   2|
|[3, 4]| you|   3|
+------+----+----+

between

# between is inclusive on both ends; strings compare lexicographically,
# so only "how" falls in ["hoa", "hox"].
in_range = col("col2").between("hoa", "hox")
df.filter(in_range).show()
+------+----+----+
|  col1|col2|col3|
+------+----+----+
|[1, 2]| how|   1|
+------+----+----+

# Numeric between is also inclusive: keeps rows where 2 <= col3 <= 3.
in_range = col("col3").between(2, 3)
df.filter(in_range).show()
+------+----+----+
|  col1|col2|col3|
+------+----+----+
|[2, 3]| are|   2|
|[3, 4]| you|   3|
+------+----+----+

cast

# Convert col3 to a string column. Use the canonical Column.cast here for
# consistency with the section title and the cast(IntegerType()) call below;
# astype is just an alias for cast in PySpark.
df2 = df.select(col("col1"), col("col2"), col("col3").cast(StringType()))
df2.show()
+------+----+----+
|  col1|col2|col3|
+------+----+----+
|[1, 2]| how|   1|
|[2, 3]| are|   2|
|[3, 4]| you|   3|
+------+----+----+

# Inspect the schema: col3 is now StringType after the cast to string above.
df2.schema
StructType(List(StructField(col1,ArrayType(LongType,true),true),StructField(col2,StringType,true),StructField(col3,StringType,true)))
# Cast the string column col3 back to a 32-bit integer.
col3_as_int = col("col3").cast(IntegerType())
df3 = df2.select(col("col1"), col("col2"), col3_as_int)
df3.show()
+------+----+----+
|  col1|col2|col3|
+------+----+----+
|[1, 2]| how|   1|
|[2, 3]| are|   2|
|[3, 4]| you|   3|
+------+----+----+

# Verify the round trip: col3 is IntegerType again.
df3.schema
StructType(List(StructField(col1,ArrayType(LongType,true),true),StructField(col2,StringType,true),StructField(col3,IntegerType,true)))

lit

# lit wraps a Python literal in a Column expression.
x = lit(1)
type(x)
pyspark.sql.column.Column

hash

# NOTE: `hash` here is pyspark.sql.functions.hash — the star import above
# shadows the Python builtin. It computes Spark's Murmur3-based column hash.
hashed = df.withColumn("hash_code", hash("col2"))
hashed.show()
+------+----+----+-----------+
|  col1|col2|col3|  hash_code|
+------+----+----+-----------+
|[1, 2]| how|   1|-1205091763|
|[2, 3]| are|   2| -422146862|
|[3, 4]| you|   3| -315368575|
+------+----+----+-----------+

when

  1. A null result in a `when` condition is treated as false, so the `otherwise` branch is taken.

import org.apache.spark.sql.functions._

// Read the sample people data; `age` is nullable (Michael's age is null).
val df = spark.read.json("../data/people.json")
df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

df = [age: bigint, name: string]
[age: bigint, name: string]

A null comparison result in a `when` condition is treated as false, so rows with a null age fall through to `otherwise`.

// A null age makes ($"age" > 20) evaluate to null, which `when` treats as
// false, so the null row falls through to otherwise(0).
val gt20 = when($"age" > 20, 1).otherwise(0)
df.select(gt20.alias("gt20")).show
+----+
|gt20|
+----+
|   0|
|   1|
|   0|
+----+

// Same effect for the complementary predicate: the null row again gets 0,
// since a null comparison never satisfies the `when` condition.
val le20 = when($"age" <= 20, 1).otherwise(0)
df.select(le20.alias("le20")).show
+----+
|le20|
+----+
|   0|
|   0|
|   1|
+----+

// Branches evaluate in order: null age -> 0, age > 20 -> 100, otherwise 10.
val bucketed = when($"age".isNull, 0).when($"age" > 20, 100).otherwise(10)
df.select(bucketed.alias("age")).show
+---+
|age|
+---+
|  0|
|100|
| 10|
+---+

// Without an `otherwise`, rows that match no `when` branch yield null.
val nullFlag = when($"age".isNull, 0)
df.select(nullFlag.alias("age")).show
+----+
| age|
+----+
|   0|
|null|
|null|
+----+