Things on this page are fragmentary and immature notes/thoughts of the author. Please read with your own judgement!
Comments¶
A Spark DataFrame is an alias for Dataset[Row].
Even though a Spark DataFrame is stored as Rows in a Dataset,
the built-in operations/functions (in org.apache.spark.sql.functions) for Spark DataFrames are Column-based.
Sometimes there are transformations on a DataFrame that are hard to express as Column expressions
but very convenient to express as Row expressions.
The traditional way to resolve this issue is to wrap the row-based function into a UDF
(see the sketch after the example DataFrame below).
It is worth knowing that Spark DataFrame also supports map/flatMap APIs
which work on Rows.
However, they are still experimental as of Spark 2.4.3,
so it is suggested that you stick to Column-based operations/functions until the Row-based methods mature.
%%classpath add mvn
org.apache.spark spark-core_2.11 2.3.1
org.apache.spark spark-sql_2.11 2.3.1

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
val spark = SparkSession.builder()
.master("local[2]")
.appName("Spark Example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
import spark.implicits._

val df = Seq(
(1L, "a", "foo", 3.0),
(2L, "b", "bar", 4.0),
(3L, "c", "foo", 5.0),
(4L, "d", "bar", 7.0)
).toDF("col1", "col2", "col3", "col4")
df.show
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| a| foo| 3.0|
| 2| b| bar| 4.0|
| 3| c| foo| 5.0|
| 4| d| bar| 7.0|
+----+----+----+----+
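As mentioned in the comments above, the traditional workaround for row-wise logic is to wrap it into a UDF. Below is a minimal sketch on the DataFrame just defined (the function sumCols and the output column name are illustrative, not part of the original notebook).
import org.apache.spark.sql.functions.udf

// Wrap a plain Scala function as a UDF so it can be used in Column expressions.
val sumCols = udf { (a: Long, b: Double) => a + b }
df.withColumn("col1_plus_col4", sumCols($"col1", $"col4")).show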
Filter¶
df.filter{
row => row.getLong(0) < 3
}.show
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| a| foo| 3.0|
| 2| b| bar| 4.0|
+----+----+----+----+
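For comparison, the same filter expressed as a Column expression (equivalent result):
df.filter($"col1" < 3).show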
df.filter{
row => row.getAs[String]("col3") == "foo"
}.show
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| a| foo| 3.0|
| 3| c| foo| 5.0|
+----+----+----+----+
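Again, the Column-based equivalent:
df.filter($"col3" === "foo").show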
Map¶
df.map {
row => Row(row.getLong(0) + row.getDouble(3))
}.show
<console>:112: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
df.map {
^
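The map above fails to compile because no implicit Encoder[Row] is in scope. Two possible workarounds, shown as a sketch assuming Spark 2.x (where RowEncoder lives in org.apache.spark.sql.catalyst.encoders): return a type that spark.implicits._ can encode, or pass a RowEncoder built from an explicit schema.
// Workaround 1: return an encodable primitive (Double) instead of Row.
df.map(row => row.getLong(0) + row.getDouble(3)).show

// Workaround 2: keep returning Row but supply the encoder explicitly.
import org.apache.spark.sql.types.{StructType, StructField, DoubleType}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val schema = StructType(Seq(StructField("sum", DoubleType, nullable = false)))
df.map(row => Row(row.getLong(0) + row.getDouble(3)))(RowEncoder(schema)).show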
Map vs UDF¶
val df = spark.read.json("../data/people.json")
df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
df.map(s => s.getString(1)).show
+-------+
| value|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
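An equivalent, name-based variant (a sketch; getAs avoids hard-coding the column position):
df.map(_.getAs[String]("name")).show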
df.filter($"age".isNotNull).map(r => r.getLong(0)).show
+-----+
|value|
+-----+
| 30|
| 19|
+-----+
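To make the Map vs UDF comparison concrete, here is a hedged sketch performing the same task (upper-casing names) in both styles; the UDF stays in the Column world while map drops down to Rows.
import org.apache.spark.sql.functions.udf

// UDF style: the expression remains Column-based.
val toUpper = udf { (s: String) => s.toUpperCase }
df.select(toUpper($"name").alias("name")).show

// map style: works because spark.implicits._ provides a String encoder.
df.map(_.getString(1).toUpperCase).show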
Apply Operation to All Columns¶
import org.apache.spark.sql.functions.{col, upper}
val df = Seq(
("a", "B", "c"),
("D", "e", "F")
).toDF("x", "y", "z")
df.show
+---+---+---+
| x| y| z|
+---+---+---+
| a| B| c|
| D| e| F|
+---+---+---+
df.select(df.columns.map(c => upper(col(c)).alias(c)): _*).show
+---+---+---+
| x| y| z|
+---+---+---+
| A| B| C|
| D| E| F|
+---+---+---+
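An alternative sketch using foldLeft with withColumn, which is convenient when only a subset of the columns needs the transformation (column names follow the example above):
// Apply upper to every column, one withColumn at a time.
val dfUpper = df.columns.foldLeft(df) {
(acc, c) => acc.withColumn(c, upper(col(c)))
}
dfUpper.show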