Things on this page are fragmentary and immature notes/thoughts of the author. Please read with your own judgement!
DataFrameReader APIs
DataFrameWriter APIs
https://
%%classpath add mvn
org.apache.spark spark-core_2.11 2.3.1
org.apache.spark spark-sql_2.11 2.3.1
org.apache.spark spark-hive_2.11 2.3.1
Load Data
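.load is a general method for reading data in different formats. You specify the format with .format (if it is omitted, Spark falls back to spark.sql.sources.default, which is parquet unless configured otherwise). .csv (for both CSV and TSV, the latter via option("sep", "\t")), .json and .parquet are specializations of .load, so .format is optional when you use one of these specific loading functions.
For CSV, the header option is false by default, for both reading and writing: set option("header", "true") if the first line holds the column names.
Use .coalesce(1) or .repartition(1) if you want to write to only 1 file. Spark still writes a directory, which then contains a single part file; coalesce(1) avoids a full shuffle, whereas repartition(1) performs one.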
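A minimal sketch of the reading points above, assuming a local SparkSession and a small tab-separated file written to a temporary directory (the file name people.tsv and its contents are made up for illustration). It compares .format("csv").load with .csv and shows what happens without the header option.

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("IO").getOrCreate()

// Hypothetical sample data: a tiny TSV file whose first line is a header
val tsvPath = Files.createTempDirectory("io-demo").resolve("people.tsv").toString
Files.write(Paths.get(tsvPath), "name\tage\nAndy\t30\nJustin\t19\n".getBytes("UTF-8"))

// General form: .format(...) followed by .load(...)
val viaLoad = spark.read.format("csv").option("sep", "\t").option("header", "true").load(tsvPath)

// Specialized form: .csv(...) implies the format, so .format is not needed
val viaCsv = spark.read.option("sep", "\t").option("header", "true").csv(tsvPath)

// Without the header option, the header line is read as an ordinary data row
// and the columns get default names _c0, _c1, ...
val noHeader = spark.read.option("sep", "\t").csv(tsvPath)

viaLoad.show()
noHeader.show()
```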
Load Data in Parquet Format
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder().master("local")
.appName("IO")
.getOrCreate()
spark
org.apache.spark.sql.SparkSession@5d88dad5
val df = spark.read.parquet("f2.parquet")
df.show
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour|min|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|2014| 1| 1| 914| 14| 1238| 13| 0| AA| N338AA| 1| JFK| LAX| 359| 2475| 9| 14|
|2014| 1| 1| 1157| -3| 1523| 13| 0| AA| N335AA| 3| JFK| LAX| 363| 2475| 11| 57|
|2014| 1| 1| 1902| 2| 2224| 9| 0| AA| N327AA| 21| JFK| LAX| 351| 2475| 19| 2|
|2014| 1| 1| 722| -8| 1014| -26| 0| AA| N3EHAA| 29| LGA| PBI| 157| 1035| 7| 22|
|2014| 1| 1| 1347| 2| 1706| 1| 0| AA| N319AA| 117| JFK| LAX| 350| 2475| 13| 47|
|2014| 1| 1| 1824| 4| 2145| 0| 0| AA| N3DEAA| 119| EWR| LAX| 339| 2454| 18| 24|
|2014| 1| 1| 2133| -2| 37| -18| 0| AA| N323AA| 185| JFK| LAX| 338| 2475| 21| 33|
|2014| 1| 1| 1542| -3| 1906| -14| 0| AA| N328AA| 133| JFK| LAX| 356| 2475| 15| 42|
|2014| 1| 1| 1509| -1| 1828| -17| 0| AA| N5FJAA| 145| JFK| MIA| 161| 1089| 15| 9|
|2014| 1| 1| 1848| -2| 2206| -14| 0| AA| N3HYAA| 235| JFK| SEA| 349| 2422| 18| 48|
|2014| 1| 1| 1655| -5| 2003| -17| 0| AA| N5CFAA| 172| EWR| MIA| 161| 1085| 16| 55|
|2014| 1| 1| 1752| 7| 2120| -5| 0| AA| N332AA| 177| JFK| SFO| 365| 2586| 17| 52|
|2014| 1| 1| 1253| 3| 1351| 1| 0| AA| N3JWAA| 178| JFK| BOS| 39| 187| 12| 53|
|2014| 1| 1| 1907| 142| 2223| 133| 0| AA| N336AA| 181| JFK| LAX| 345| 2475| 19| 7|
|2014| 1| 1| 1720| -5| 1819| -26| 0| AA| N3BCAA| 256| JFK| BOS| 35| 187| 17| 20|
|2014| 1| 1| 1733| 18| 2024| 69| 0| AA| N3HPAA| 199| JFK| ORD| 155| 740| 17| 33|
|2014| 1| 1| 1640| 25| 2001| 36| 0| AA| N3HFAA| 211| JFK| IAH| 234| 1417| 16| 40|
|2014| 1| 1| 1714| -1| 2036| 1| 0| AA| N3DVAA| 291| JFK| AUS| 232| 1521| 17| 14|
|2014| 1| 1| 1611| 191| 1910| 185| 0| AA| N471AA| 300| EWR| DFW| 214| 1372| 16| 11|
|2014| 1| 1| 553| -7| 739| -6| 0| AA| N3KHAA| 301| LGA| ORD| 142| 733| 5| 53|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
only showing top 20 rows
df.count
253316
df.select(input_file_name()).show
+--------------------+
| input_file_name()|
+--------------------+
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
+--------------------+
only showing top 20 rows
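Since input_file_name() tags each row with the file it was read from, grouping on it shows how the rows are spread across files. A sketch under stated assumptions: the data is a made-up 30-row DataFrame written to a temporary directory as three Parquet part files, not the f2.parquet used above.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local").appName("IO").getOrCreate()
import spark.implicits._

// Hypothetical data: 30 rows split round-robin into 3 partitions, hence 3 part files
val out = Files.createTempDirectory("io-demo").resolve("nums.parquet").toString
(1 to 30).toDF("n").repartition(3).write.parquet(out)

val df = spark.read.parquet(out)

// One output row per source file, with the number of rows read from it
val rowsPerFile = df.groupBy(input_file_name().as("file")).count()
rowsPerFile.show(false)
```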
val df = spark.read.load("namesAndAges.parquet")
df.show
val df = spark.sql("SELECT * FROM parquet.`namesAndAges.parquet`")
df.show
+-------+----+
| name| age|
+-------+----+
|Michael|null|
| Andy| 30|
| Justin| 19|
+-------+----+
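The three ways of reading Parquet used above (spark.read.parquet, spark.read.load with the default format, and SQL directly on a file path) should return the same data. A sketch, assuming a small made-up DataFrame in a temporary directory in place of the original namesAndAges.parquet, and a POSIX-style path for the SQL form.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("IO").getOrCreate()
import spark.implicits._

// Hypothetical data resembling namesAndAges.parquet
val path = Files.createTempDirectory("io-demo").resolve("namesAndAges.parquet").toString
Seq(("Andy", 30), ("Justin", 19)).toDF("name", "age").write.parquet(path)

// 1. Specialized reader
val a = spark.read.parquet(path)
// 2. Generic reader: no .format, so spark.sql.sources.default (parquet by default) is used
val b = spark.read.load(path)
// 3. SQL directly on the file: format name, a dot, then the path in backticks
val c = spark.sql(s"SELECT * FROM parquet.`$path`")

Seq(a, b, c).foreach(_.show())
```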
import java.io.File
new File(".").listFiles.filter(_.getPath.endsWith(".csv"))
Array(./flights14.csv, ./f2.csv)
Write DataFrame to Parquet
val flights = spark.read.
format("csv").
option("header", "true").
option("mode", "DROPMALFORMED").
csv("flights14.csv")
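flights.write.parquet("f2.parquet")
// peopleDF is assumed to be an existing DataFrame with name and age columns (not defined on this page)
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")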
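To get a single output file, coalesce before writing. A sketch, assuming a made-up 100-row DataFrame and a temporary output path; SaveMode.Overwrite is added here only so the cell can be rerun without failing on an existing directory.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local").appName("IO").getOrCreate()
import spark.implicits._

// Hypothetical data spread over 4 partitions (would otherwise give up to 4 part files)
val df = (1 to 100).toDF("n").repartition(4)

val out = Files.createTempDirectory("io-demo").resolve("single.parquet").toString

// coalesce(1) merges everything into one partition, so a single task writes a single part file.
// The output is still a directory; Overwrite replaces it if it already exists.
df.coalesce(1).write.mode(SaveMode.Overwrite).parquet(out)

// List the data files inside the output directory (skips _SUCCESS and hidden .crc files)
val partFiles = new File(out).listFiles.map(_.getName)
  .filter(n => n.startsWith("part-") && n.endsWith(".parquet"))
println(partFiles.mkString(", "))
```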