
Row-based Mapping and Filtering on DataFrames in Spark


Spark DataFrame is an alias for Dataset[Row]. Even though a Spark DataFrame is stored as Rows in a Dataset, the built-in operations/functions (in org.apache.spark.sql.functions) for Spark DataFrames are Column-based. Sometimes a transformation on a DataFrame is hard to express as Column expressions but very convenient to express as Row expressions. The traditional way to resolve this is to wrap the row-based function into a UDF. It is worth knowing that Spark DataFrame also supports map/flatMap APIs which work on Rows. They are still experimental as of Spark 2.4.3, so it is suggested that you stick to Column-based operations/functions until the Row-based methods mature.

In [1]:
%%classpath add mvn
org.apache.spark spark-core_2.11 2.3.1
org.apache.spark spark-sql_2.11 2.3.1
In [7]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

val spark = SparkSession.builder()
    .master("local[2]")
    .appName("Spark Example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

import spark.implicits._
Out[7]:
org.apache.spark.sql.SparkSession$implicits$@33a3ab71
In [3]:
val df = Seq(
    (1L, "a", "foo", 3.0),
    (2L, "b", "bar", 4.0),
    (3L, "c", "foo", 5.0),
    (4L, "d", "bar", 7.0)
).toDF("col1", "col2", "col3", "col4")
df.show
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   a| foo| 3.0|
|   2|   b| bar| 4.0|
|   3|   c| foo| 5.0|
|   4|   d| bar| 7.0|
+----+----+----+----+
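
As mentioned in the introduction, the traditional workaround is to wrap the row-level logic into a UDF and apply it through Column expressions. A minimal sketch (not part of the original notebook) using the DataFrame above; the UDF name addCols and the output column col5 are made up for illustration:

import org.apache.spark.sql.functions.udf

// Wrap plain Scala logic as a UDF; it receives column values, not Rows.
val addCols = udf { (x: Long, y: Double) => x + y }

df.withColumn("col5", addCols($"col1", $"col4")).show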


Filter

In [4]:
df.filter{
    row => row.getLong(0) < 3 
}.show
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   a| foo| 3.0|
|   2|   b| bar| 4.0|
+----+----+----+----+
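
For comparison, the same filter written as a Column expression gives an equivalent result:

df.filter($"col1" < 3).show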

In [5]:
df.filter{
    row => row.getAs("col3") == "foo" 
}.show
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   a| foo| 3.0|
|   3|   c| foo| 5.0|
+----+----+----+----+
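
When using getAs it is usually clearer to pass the type parameter explicitly, so the comparison is against a String rather than an untyped value. An equivalent variant (not from the original post):

df.filter {
    row => row.getAs[String]("col3") == "foo"
}.show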

Map

In [11]:
df.map {
    row => Row(row.getLong(0) + row.getDouble(3))
}.show
<console>:112: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
       df.map {
              ^
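
The error occurs because spark.implicits._ provides encoders for primitive and Product types but not for Row itself. Two possible workarounds are sketched below (the output column name sum and its schema are made up for illustration): map to a primitive type, or keep returning Row and supply a RowEncoder explicitly.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Workaround 1: map to a primitive type; spark.implicits._ supplies the encoder.
df.map(row => row.getLong(0) + row.getDouble(3)).show

// Workaround 2: keep returning Row and pass an explicit RowEncoder for the output schema.
val schema = StructType(Seq(StructField("sum", DoubleType)))
df.map(row => Row(row.getLong(0) + row.getDouble(3)))(RowEncoder(schema)).show
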
In [30]:
val df = spark.read.json("../data/people.json")
df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

In [27]:
df.map(s => s.getString(1)).show
+-------+
|  value|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

In [37]:
df.filter($"age".isNotNull).map(r => r.getLong(0)).show
+-----+
|value|
+-----+
|   30|
|   19|
+-----+
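
flatMap works analogously on Rows, producing zero or more output records per input Row. A small sketch (not in the original notebook) using the people DataFrame above:

// Emit two records per input Row: the name in lower case and in upper case.
df.flatMap(row => Seq(row.getString(1).toLowerCase, row.getString(1).toUpperCase)).show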

Apply Operation to All Columns

In [7]:
import org.apache.spark.sql.functions.{col, upper}

val df = Seq(
    ("a", "B", "c"), 
    ("D", "e", "F")
).toDF("x", "y", "z")
df.show
+---+---+---+
|  x|  y|  z|
+---+---+---+
|  a|  B|  c|
|  D|  e|  F|
+---+---+---+

In [8]:
df.select(df.columns.map(c => upper(col(c)).alias(c)): _*).show
+---+---+---+
|  x|  y|  z|
+---+---+---+
|  A|  B|  C|
|  D|  E|  F|
+---+---+---+
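
The same result can be obtained with a Row-based map, at the cost of listing the fields explicitly and restoring the column names with toDF. This is a sketch, not from the original post; the Column-based version above is generally preferable because it adapts automatically to the number of columns.

// Upper-case every field of each Row and rebuild the DataFrame.
df.map(row => (
    row.getString(0).toUpperCase,
    row.getString(1).toUpperCase,
    row.getString(2).toUpperCase
)).toDF("x", "y", "z").show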
