Ben Chuanlong Du's Blog

It is never too late to learn.

Read/Write CSV in Spark

In [1]:
import findspark

findspark.init("/opt/spark")
In [2]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("PySpark").enableHiveSupport().getOrCreate()
In [3]:
file_flight = "../../home/media/data/flights14.csv"

Load Data in CSV Format

  1. .load is a general method for reading data in different format. You have to specify the format of the data via the method .format of course. .csv (both for CSV and TSV), .json and .parquet are specializations of .load. .format is optional if you use a specific loading function (csv, json, etc.).

  2. No header by default.

  3. .coalesece(1) or repartition(1) if you want to write to only 1 file.

Using load

In [6]:
flights = (
    spark.read.format("csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load(file_flight)
)
flights.show(5)
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour|min|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|2014|    1|  1|     914|       14|    1238|       13|        0|     AA| N338AA|     1|   JFK| LAX|     359|    2475|   9| 14|
|2014|    1|  1|    1157|       -3|    1523|       13|        0|     AA| N335AA|     3|   JFK| LAX|     363|    2475|  11| 57|
|2014|    1|  1|    1902|        2|    2224|        9|        0|     AA| N327AA|    21|   JFK| LAX|     351|    2475|  19|  2|
|2014|    1|  1|     722|       -8|    1014|      -26|        0|     AA| N3EHAA|    29|   LGA| PBI|     157|    1035|   7| 22|
|2014|    1|  1|    1347|        2|    1706|        1|        0|     AA| N319AA|   117|   JFK| LAX|     350|    2475|  13| 47|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
only showing top 5 rows

In [9]:
val flights = spark.read.
    format("csv").
    load(fileFlight)
flights.show(5)
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
| _c0|  _c1|_c2|     _c3|      _c4|     _c5|      _c6|      _c7|    _c8|    _c9|  _c10|  _c11|_c12|    _c13|    _c14|_c15|_c16|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour| min|
|2014|    1|  1|     914|       14|    1238|       13|        0|     AA| N338AA|     1|   JFK| LAX|     359|    2475|   9|  14|
|2014|    1|  1|    1157|       -3|    1523|       13|        0|     AA| N335AA|     3|   JFK| LAX|     363|    2475|  11|  57|
|2014|    1|  1|    1902|        2|    2224|        9|        0|     AA| N327AA|    21|   JFK| LAX|     351|    2475|  19|   2|
|2014|    1|  1|     722|       -8|    1014|      -26|        0|     AA| N3EHAA|    29|   LGA| PBI|     157|    1035|   7|  22|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
only showing top 5 rows

Out[9]:
null

Using csv

In [10]:
val flights = spark.read.
    format("csv").
    option("header", "true"). // false by default
    option("mode", "DROPMALFORMED").
    csv(fileFlight)
flights.show(5)
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour|min|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|2014|    1|  1|     914|       14|    1238|       13|        0|     AA| N338AA|     1|   JFK| LAX|     359|    2475|   9| 14|
|2014|    1|  1|    1157|       -3|    1523|       13|        0|     AA| N335AA|     3|   JFK| LAX|     363|    2475|  11| 57|
|2014|    1|  1|    1902|        2|    2224|        9|        0|     AA| N327AA|    21|   JFK| LAX|     351|    2475|  19|  2|
|2014|    1|  1|     722|       -8|    1014|      -26|        0|     AA| N3EHAA|    29|   LGA| PBI|     157|    1035|   7| 22|
|2014|    1|  1|    1347|        2|    1706|        1|        0|     AA| N319AA|   117|   JFK| LAX|     350|    2475|  13| 47|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
only showing top 5 rows

Out[10]:
null
In [11]:
val flights = spark.read.option("header", "true").csv(fileFlight)
flights.show(5)
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour|min|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|2014|    1|  1|     914|       14|    1238|       13|        0|     AA| N338AA|     1|   JFK| LAX|     359|    2475|   9| 14|
|2014|    1|  1|    1157|       -3|    1523|       13|        0|     AA| N335AA|     3|   JFK| LAX|     363|    2475|  11| 57|
|2014|    1|  1|    1902|        2|    2224|        9|        0|     AA| N327AA|    21|   JFK| LAX|     351|    2475|  19|  2|
|2014|    1|  1|     722|       -8|    1014|      -26|        0|     AA| N3EHAA|    29|   LGA| PBI|     157|    1035|   7| 22|
|2014|    1|  1|    1347|        2|    1706|        1|        0|     AA| N319AA|   117|   JFK| LAX|     350|    2475|  13| 47|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
only showing top 5 rows

Out[11]:
null
In [12]:
val flights = spark.read.csv(fileFlight)
flights.show(5)
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
| _c0|  _c1|_c2|     _c3|      _c4|     _c5|      _c6|      _c7|    _c8|    _c9|  _c10|  _c11|_c12|    _c13|    _c14|_c15|_c16|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour| min|
|2014|    1|  1|     914|       14|    1238|       13|        0|     AA| N338AA|     1|   JFK| LAX|     359|    2475|   9|  14|
|2014|    1|  1|    1157|       -3|    1523|       13|        0|     AA| N335AA|     3|   JFK| LAX|     363|    2475|  11|  57|
|2014|    1|  1|    1902|        2|    2224|        9|        0|     AA| N327AA|    21|   JFK| LAX|     351|    2475|  19|   2|
|2014|    1|  1|     722|       -8|    1014|      -26|        0|     AA| N3EHAA|    29|   LGA| PBI|     157|    1035|   7|  22|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
only showing top 5 rows

Out[12]:
null
In [14]:
val flights = spark.sql("SELECT * FROM csv.`../../home/media/data/flights14.csv`")
flights.show
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
| _c0|  _c1|_c2|     _c3|      _c4|     _c5|      _c6|      _c7|    _c8|    _c9|  _c10|  _c11|_c12|    _c13|    _c14|_c15|_c16|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour| min|
|2014|    1|  1|     914|       14|    1238|       13|        0|     AA| N338AA|     1|   JFK| LAX|     359|    2475|   9|  14|
|2014|    1|  1|    1157|       -3|    1523|       13|        0|     AA| N335AA|     3|   JFK| LAX|     363|    2475|  11|  57|
|2014|    1|  1|    1902|        2|    2224|        9|        0|     AA| N327AA|    21|   JFK| LAX|     351|    2475|  19|   2|
|2014|    1|  1|     722|       -8|    1014|      -26|        0|     AA| N3EHAA|    29|   LGA| PBI|     157|    1035|   7|  22|
|2014|    1|  1|    1347|        2|    1706|        1|        0|     AA| N319AA|   117|   JFK| LAX|     350|    2475|  13|  47|
|2014|    1|  1|    1824|        4|    2145|        0|        0|     AA| N3DEAA|   119|   EWR| LAX|     339|    2454|  18|  24|
|2014|    1|  1|    2133|       -2|      37|      -18|        0|     AA| N323AA|   185|   JFK| LAX|     338|    2475|  21|  33|
|2014|    1|  1|    1542|       -3|    1906|      -14|        0|     AA| N328AA|   133|   JFK| LAX|     356|    2475|  15|  42|
|2014|    1|  1|    1509|       -1|    1828|      -17|        0|     AA| N5FJAA|   145|   JFK| MIA|     161|    1089|  15|   9|
|2014|    1|  1|    1848|       -2|    2206|      -14|        0|     AA| N3HYAA|   235|   JFK| SEA|     349|    2422|  18|  48|
|2014|    1|  1|    1655|       -5|    2003|      -17|        0|     AA| N5CFAA|   172|   EWR| MIA|     161|    1085|  16|  55|
|2014|    1|  1|    1752|        7|    2120|       -5|        0|     AA| N332AA|   177|   JFK| SFO|     365|    2586|  17|  52|
|2014|    1|  1|    1253|        3|    1351|        1|        0|     AA| N3JWAA|   178|   JFK| BOS|      39|     187|  12|  53|
|2014|    1|  1|    1907|      142|    2223|      133|        0|     AA| N336AA|   181|   JFK| LAX|     345|    2475|  19|   7|
|2014|    1|  1|    1720|       -5|    1819|      -26|        0|     AA| N3BCAA|   256|   JFK| BOS|      35|     187|  17|  20|
|2014|    1|  1|    1733|       18|    2024|       69|        0|     AA| N3HPAA|   199|   JFK| ORD|     155|     740|  17|  33|
|2014|    1|  1|    1640|       25|    2001|       36|        0|     AA| N3HFAA|   211|   JFK| IAH|     234|    1417|  16|  40|
|2014|    1|  1|    1714|       -1|    2036|        1|        0|     AA| N3DVAA|   291|   JFK| AUS|     232|    1521|  17|  14|
|2014|    1|  1|    1611|      191|    1910|      185|        0|     AA| N471AA|   300|   EWR| DFW|     214|    1372|  16|  11|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
only showing top 20 rows

Out[14]:
null
In [ ]:
import org.apache.spark.sql.types._

val customSchema = StructType(Array(
  StructField("project", StringType, true),
  StructField("article", StringType, true),
  StructField("requests", IntegerType, true),
  StructField("bytes_served", DoubleType, true))
)

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .schema(customSchema)
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

Write DataFrame to CSV

In [15]:
val flights = spark.read.
    format("csv").
    option("header", "true").
    option("mode", "DROPMALFORMED").
    csv(fileFlight)

flights.write.option("header", "true").csv("f2.csv")
Out[15]:
null

Misc Configurations

In [ ]:
df.write.
    mode("overwrite").
    format("parquet").
    option("compression", "none").
    save("/tmp/file_no_compression_parq")
df.write.
    mode("overwrite").
    format("parquet").
    option("compression", "gzip").
    save("/tmp/file_with_gzip_parq")
df.write.
    mode("overwrite").
    format("parquet").
    option("compression", "snappy").
    save("/tmp/file_with_snappy_parq")

df.write.mode("overwrite").format("orc").option("compression", "none").mode("overwrite").save("/tmp/file_no_compression_orc")
df.write.mode("overwrite").format("orc").option("compression", "snappy").mode("overwrite").save("/tmp/file_with_snappy_orc")
df.write.mode("overwrite").format("orc").option("compression", "zlib").mode("overwrite").save("/tmp/file_with_zlib_orc")
In [ ]:
dataFrame
      .coalesce(1)
      .write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save(out)
In [ ]:
dataFrame
      .repartition(1)
      .write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save(out)

Comments