
Column Functions and Operators in Spark

In [1]:
from typing import List, Tuple
import pandas as pd
In [2]:
from pathlib import Path
import findspark

findspark.init(str(next(Path("/opt").glob("spark-3*"))))
# findspark.init("/opt/spark-2.3.0-bin-hadoop2.7")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructType,
    StructField,
    ArrayType,
)

spark = (
    SparkSession.builder.appName("PySpark_Str_Func").enableHiveSupport().getOrCreate()
)
In [3]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=[([1, 2], "how", 1), ([2, 3], "are", 2), ([3, 4], "you", 3)],
        columns=["col1", "col2", "col3"],
    )
)
df.show()
+------+----+----+
|  col1|col2|col3|
+------+----+----+
|[1, 2]| how|   1|
|[2, 3]| are|   2|
|[3, 4]| you|   3|
+------+----+----+

between

In [7]:
df.filter(col("col2").between("hoa", "hox")).show()
+------+----+----+
|  col1|col2|col3|
+------+----+----+
|[1, 2]| how|   1|
+------+----+----+

In [8]:
df.filter(col("col3").between(2, 3)).show()
+------+----+----+
|  col1|col2|col3|
+------+----+----+
|[2, 3]| are|   2|
|[3, 4]| you|   3|
+------+----+----+
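
Column.between is inclusive on both bounds. As a minimal sketch, the filter above can be written equivalently with two chained comparisons:

df.filter((col("col3") >= 2) & (col("col3") <= 3)).show()

This returns the same two rows as In [8].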

cast

In [12]:
# astype is an alias of Column.cast
df2 = df.select(col("col1"), col("col2"), col("col3").astype(StringType()))
df2.show()
+------+----+----+
|  col1|col2|col3|
+------+----+----+
|[1, 2]| how|   1|
|[2, 3]| are|   2|
|[3, 4]| you|   3|
+------+----+----+

In [13]:
df2.schema
Out[13]:
StructType(List(StructField(col1,ArrayType(LongType,true),true),StructField(col2,StringType,true),StructField(col3,StringType,true)))
In [15]:
df3 = df2.select(col("col1"), col("col2"), col("col3").cast(IntegerType()))
df3.show()
+------+----+----+
|  col1|col2|col3|
+------+----+----+
|[1, 2]| how|   1|
|[2, 3]| are|   2|
|[3, 4]| you|   3|
+------+----+----+

In [16]:
df3.schema
Out[16]:
StructType(List(StructField(col1,ArrayType(LongType,true),true),StructField(col2,StringType,true),StructField(col3,IntegerType,true)))
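
Column.cast also accepts a type name as a string instead of a DataType object, which is often more concise. A minimal sketch:

df2.select(col("col3").cast("int")).schema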

lit

In [4]:
x = lit(1)
In [5]:
type(x)
Out[5]:
pyspark.sql.column.Column
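
lit wraps a Python literal into a Column so that it can be used wherever a Column is expected, e.g., to add a constant column. A minimal sketch:

df.withColumn("col4", lit(1)).show()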

hash

In [7]:
df.withColumn("hash_code", hash("col2")).show()
+------+----+----+-----------+
|  col1|col2|col3|  hash_code|
+------+----+----+-----------+
|[1, 2]| how|   1|-1205091763|
|[2, 3]| are|   2| -422146862|
|[3, 4]| you|   3| -315368575|
+------+----+----+-----------+
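
hash computes a Murmur3 hash of the given columns and returns it as an int column. It accepts multiple columns, in which case the hash is computed over their combination. A minimal sketch:

df.withColumn("hash_code", hash("col2", "col3")).show()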

when

  1. A null in a when condition is treated as false.

The when examples below use the Spark Scala API; the equivalent when and otherwise functions are also available in pyspark.sql.functions (see the PySpark sketch at the end of this section).
In [1]:
import org.apache.spark.sql.functions._

val df = spark.read.json("../data/people.json")
df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

df = [age: bigint, name: string]
Out[1]:
[age: bigint, name: string]

As the next two examples show, rows where age is null never satisfy a comparison, so they fall through to the otherwise branch.

In [3]:
df.select(when($"age" > 20, 1).otherwise(0).alias("gt20")).show
+----+
|gt20|
+----+
|   0|
|   1|
|   0|
+----+

In [5]:
df.select(when($"age" <= 20, 1).otherwise(0).alias("le20")).show
+----+
|le20|
+----+
|   0|
|   0|
|   1|
+----+

In [6]:
df.select(when($"age".isNull, 0).when($"age" > 20 , 100).otherwise(10).alias("age")).show
+---+
|age|
+---+
|  0|
|100|
| 10|
+---+

In [7]:
df.select(when($"age".isNull, 0).alias("age")).show
+----+
| age|
+----+
|   0|
|null|
|null|
+----+
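
For reference, a minimal PySpark sketch of the In [6] expression; people is assumed to be the same people.json data loaded through the Python API:

from pyspark.sql.functions import col, when

# people is assumed, e.g.: people = spark.read.json("../data/people.json")
people.select(
    when(col("age").isNull, 0).when(col("age") > 20, 100).otherwise(10).alias("age")
).show()

Note that In [7] above has no otherwise clause, so rows matching no when branch are left as null.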
