Ben Chuanlong Du's Blog

It is never too late to learn.

Read/Write Parquet Files in Spark

In [1]:
%%classpath add mvn
org.apache.spark spark-core_2.11 2.3.1
org.apache.spark spark-sql_2.11 2.3.1
org.apache.spark spark-hive_2.11 2.3.1

Load Data

  1. .load is the general method for reading data in any supported format; you specify the format with .format. .csv (for both CSV and TSV), .json and .parquet are specializations of .load, so .format is unnecessary when you call one of them. If you call .load without .format, Spark uses its default data source, which is Parquet unless spark.sql.sources.default has been changed.

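  2. When reading CSV, the first line is not treated as a header by default. Set option("header", "true") to use it for column names.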

  3. Call .coalesce(1) or .repartition(1) on the DataFrame before writing if you want the output to be a single file.
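The three points above can be sketched in one cell. This is a minimal illustration meant for a notebook cell or spark-shell, not part of the original notebook: the tiny people DataFrame, the temporary directory and all variable names are made up for the example.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("LoadSketch").getOrCreate()
import spark.implicits._

// Hypothetical scratch directory and toy data, used only for this sketch.
val dir = Files.createTempDirectory("spark-io-sketch").toString
val people = Seq(("Andy", 30), ("Justin", 19)).toDF("name", "age")

// Point 3: coalesce(1) merges the data into one partition, so one part file is written.
people.coalesce(1).write.parquet(s"$dir/people.parquet")
people.coalesce(1).write.option("header", "true").csv(s"$dir/people.csv")
val partFiles = new File(s"$dir/people.parquet").listFiles.count(_.getName.startsWith("part-"))

// Point 1: the generic .format(...).load(...) and the .parquet(...) shortcut read the same data.
val viaLoad = spark.read.format("parquet").load(s"$dir/people.parquet")
val viaParquet = spark.read.parquet(s"$dir/people.parquet")

// Point 2: without the header option, the header line is read as an ordinary row
// and the columns get the generated names _c0, _c1.
val withHeader = spark.read.option("header", "true").csv(s"$dir/people.csv")
val noHeader = spark.read.csv(s"$dir/people.csv")
```

Note that the "file" written by Spark is really a directory containing the part file plus bookkeeping files such as _SUCCESS.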

Load Data in Parquet Format

In [7]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local")
    .appName("IO")
    .getOrCreate()
spark
Out[7]:
org.apache.spark.sql.SparkSession@5d88dad5
In [9]:
val df = spark.read.parquet("f2.parquet")
df.show
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour|min|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|2014|    1|  1|     914|       14|    1238|       13|        0|     AA| N338AA|     1|   JFK| LAX|     359|    2475|   9| 14|
|2014|    1|  1|    1157|       -3|    1523|       13|        0|     AA| N335AA|     3|   JFK| LAX|     363|    2475|  11| 57|
|2014|    1|  1|    1902|        2|    2224|        9|        0|     AA| N327AA|    21|   JFK| LAX|     351|    2475|  19|  2|
|2014|    1|  1|     722|       -8|    1014|      -26|        0|     AA| N3EHAA|    29|   LGA| PBI|     157|    1035|   7| 22|
|2014|    1|  1|    1347|        2|    1706|        1|        0|     AA| N319AA|   117|   JFK| LAX|     350|    2475|  13| 47|
|2014|    1|  1|    1824|        4|    2145|        0|        0|     AA| N3DEAA|   119|   EWR| LAX|     339|    2454|  18| 24|
|2014|    1|  1|    2133|       -2|      37|      -18|        0|     AA| N323AA|   185|   JFK| LAX|     338|    2475|  21| 33|
|2014|    1|  1|    1542|       -3|    1906|      -14|        0|     AA| N328AA|   133|   JFK| LAX|     356|    2475|  15| 42|
|2014|    1|  1|    1509|       -1|    1828|      -17|        0|     AA| N5FJAA|   145|   JFK| MIA|     161|    1089|  15|  9|
|2014|    1|  1|    1848|       -2|    2206|      -14|        0|     AA| N3HYAA|   235|   JFK| SEA|     349|    2422|  18| 48|
|2014|    1|  1|    1655|       -5|    2003|      -17|        0|     AA| N5CFAA|   172|   EWR| MIA|     161|    1085|  16| 55|
|2014|    1|  1|    1752|        7|    2120|       -5|        0|     AA| N332AA|   177|   JFK| SFO|     365|    2586|  17| 52|
|2014|    1|  1|    1253|        3|    1351|        1|        0|     AA| N3JWAA|   178|   JFK| BOS|      39|     187|  12| 53|
|2014|    1|  1|    1907|      142|    2223|      133|        0|     AA| N336AA|   181|   JFK| LAX|     345|    2475|  19|  7|
|2014|    1|  1|    1720|       -5|    1819|      -26|        0|     AA| N3BCAA|   256|   JFK| BOS|      35|     187|  17| 20|
|2014|    1|  1|    1733|       18|    2024|       69|        0|     AA| N3HPAA|   199|   JFK| ORD|     155|     740|  17| 33|
|2014|    1|  1|    1640|       25|    2001|       36|        0|     AA| N3HFAA|   211|   JFK| IAH|     234|    1417|  16| 40|
|2014|    1|  1|    1714|       -1|    2036|        1|        0|     AA| N3DVAA|   291|   JFK| AUS|     232|    1521|  17| 14|
|2014|    1|  1|    1611|      191|    1910|      185|        0|     AA| N471AA|   300|   EWR| DFW|     214|    1372|  16| 11|
|2014|    1|  1|     553|       -7|     739|       -6|        0|     AA| N3KHAA|   301|   LGA| ORD|     142|     733|   5| 53|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
only showing top 20 rows

Out[9]:
null
In [10]:
df.count
Out[10]:
253316
In [11]:
df.select(input_file_name()).show
+--------------------+
|   input_file_name()|
+--------------------+
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
|file:///workdir/l...|
+--------------------+
only showing top 20 rows

In [1]:
val df = spark.read.load("namesAndAges.parquet")
df.show
<console>:89: error: not found: value spark
       val df = spark.read.load("namesAndAges.parquet")
                ^
In [9]:
val df = spark.sql("SELECT * FROM parquet.`namesAndAges.parquet`")
df.show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+

In [20]:
import java.io.File

new File(".").listFiles.filter(_.getPath.endsWith(".csv"))
Out[20]:
Array(./flights14.csv, ./f2.csv)

Write DataFrame to Parquet

In [32]:
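// Read the CSV file, using its first line as column names and dropping malformed rows.
// Calling .csv already selects the CSV format, so .format("csv") is not needed.
val flights = spark.read.
    option("header", "true").
    option("mode", "DROPMALFORMED").
    csv("flights14.csv")
// Write the DataFrame in Parquet format; f2.parquet is created as a directory of part files.
flights.write.parquet("f2.parquet")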
In [3]:
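// peopleDF is assumed to be an existing DataFrame with name and age columns; it is not defined in this notebook.
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")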
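Re-running a write cell like the ones above fails once the target path exists, because the default save mode is to raise an error. The following sketch, written for a notebook cell or spark-shell, shows the Overwrite and Append save modes as alternatives. The toy peopleDF and the temporary output path are assumptions made for the example, not the data used in this post.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[1]").appName("WriteSketch").getOrCreate()
import spark.implicits._

// Hypothetical toy data and output path, used only for this sketch.
val peopleDF = Seq[(String, Option[Int])](
    ("Michael", None), ("Andy", Some(30)), ("Justin", Some(19))
).toDF("name", "age")
val out = Files.createTempDirectory("spark-write-sketch").toString + "/people.parquet"

// The first write succeeds because the path does not exist yet.
peopleDF.write.parquet(out)

// A second write with the default mode (ErrorIfExists) throws, so wrap it in Try.
val secondWrite = Try(peopleDF.write.parquet(out))

// Overwrite replaces the existing data; Append adds new part files next to it.
peopleDF.write.mode(SaveMode.Overwrite).parquet(out)
peopleDF.write.mode(SaveMode.Append).parquet(out)

val rows = spark.read.parquet(out).count
```

After the overwrite and the append, the path holds two copies of the three rows.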
