Things on this page are fragmentary and immature notes/thoughts of the author. Please read with your own judgement!
import findspark

# Point findspark at the Spark installation under /opt/spark. This call sits
# ahead of the pyspark imports below, as in the original cell order.
findspark.init("/opt/spark")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType
spark = SparkSession.builder.appName("PySpark").enableHiveSupport().getOrCreate()
file_flight = "../../home/media/data/flights14.csv"
Load Data in CSV Format¶
.load is a general method for reading data in different formats. You have to specify the format of the data via the method .format, of course. .csv (both for CSV and TSV), .json and .parquet are specializations of .load. .format is optional if you use a specific loading function (csv, json, etc.). By default, the first line is not treated as a header (the header option is false by default).
Use .coalesce(1) or .repartition(1) if you want to write to only one file.
Using load¶
# Read the flights CSV through the generic reader: the first line supplies the
# column names, and the parse mode is set to DROPMALFORMED.
csv_reader = spark.read.format("csv").options(header="true", mode="DROPMALFORMED")
flights = csv_reader.load(file_flight)
flights.show(5)
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour|min|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|2014| 1| 1| 914| 14| 1238| 13| 0| AA| N338AA| 1| JFK| LAX| 359| 2475| 9| 14|
|2014| 1| 1| 1157| -3| 1523| 13| 0| AA| N335AA| 3| JFK| LAX| 363| 2475| 11| 57|
|2014| 1| 1| 1902| 2| 2224| 9| 0| AA| N327AA| 21| JFK| LAX| 351| 2475| 19| 2|
|2014| 1| 1| 722| -8| 1014| -26| 0| AA| N3EHAA| 29| LGA| PBI| 157| 1035| 7| 22|
|2014| 1| 1| 1347| 2| 1706| 1| 0| AA| N319AA| 117| JFK| LAX| 350| 2475| 13| 47|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
only showing top 5 rows
// Generic loader with only the format specified: no header option is set, so the
// header row comes back as a data row and the columns are auto-named _c0, _c1, ...
val flights = spark.read.format("csv").load(fileFlight)
flights.show(5)
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
| _c0| _c1|_c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9| _c10| _c11|_c12| _c13| _c14|_c15|_c16|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour| min|
|2014| 1| 1| 914| 14| 1238| 13| 0| AA| N338AA| 1| JFK| LAX| 359| 2475| 9| 14|
|2014| 1| 1| 1157| -3| 1523| 13| 0| AA| N335AA| 3| JFK| LAX| 363| 2475| 11| 57|
|2014| 1| 1| 1902| 2| 2224| 9| 0| AA| N327AA| 21| JFK| LAX| 351| 2475| 19| 2|
|2014| 1| 1| 722| -8| 1014| -26| 0| AA| N3EHAA| 29| LGA| PBI| 157| 1035| 7| 22|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
only showing top 5 rows
null
Using csv¶
// Read through the CSV-specific reader. .csv(...) already selects the CSV
// format, so a separate format("csv") call is not needed.
val flights = spark.read.
  option("header", "true"). // false by default
  option("mode", "DROPMALFORMED").
  csv(fileFlight)
flights.show(5)
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour|min|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|2014| 1| 1| 914| 14| 1238| 13| 0| AA| N338AA| 1| JFK| LAX| 359| 2475| 9| 14|
|2014| 1| 1| 1157| -3| 1523| 13| 0| AA| N335AA| 3| JFK| LAX| 363| 2475| 11| 57|
|2014| 1| 1| 1902| 2| 2224| 9| 0| AA| N327AA| 21| JFK| LAX| 351| 2475| 19| 2|
|2014| 1| 1| 722| -8| 1014| -26| 0| AA| N3EHAA| 29| LGA| PBI| 157| 1035| 7| 22|
|2014| 1| 1| 1347| 2| 1706| 1| 0| AA| N319AA| 117| JFK| LAX| 350| 2475| 13| 47|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
only showing top 5 rows
null
val flights = spark.read.option("header", "true").csv(fileFlight)
flights.show(5)
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour|min|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
|2014| 1| 1| 914| 14| 1238| 13| 0| AA| N338AA| 1| JFK| LAX| 359| 2475| 9| 14|
|2014| 1| 1| 1157| -3| 1523| 13| 0| AA| N335AA| 3| JFK| LAX| 363| 2475| 11| 57|
|2014| 1| 1| 1902| 2| 2224| 9| 0| AA| N327AA| 21| JFK| LAX| 351| 2475| 19| 2|
|2014| 1| 1| 722| -8| 1014| -26| 0| AA| N3EHAA| 29| LGA| PBI| 157| 1035| 7| 22|
|2014| 1| 1| 1347| 2| 1706| 1| 0| AA| N319AA| 117| JFK| LAX| 350| 2475| 13| 47|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+---+
only showing top 5 rows
null
val flights = spark.read.csv(fileFlight)
flights.show(5)
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
| _c0| _c1|_c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9| _c10| _c11|_c12| _c13| _c14|_c15|_c16|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour| min|
|2014| 1| 1| 914| 14| 1238| 13| 0| AA| N338AA| 1| JFK| LAX| 359| 2475| 9| 14|
|2014| 1| 1| 1157| -3| 1523| 13| 0| AA| N335AA| 3| JFK| LAX| 363| 2475| 11| 57|
|2014| 1| 1| 1902| 2| 2224| 9| 0| AA| N327AA| 21| JFK| LAX| 351| 2475| 19| 2|
|2014| 1| 1| 722| -8| 1014| -26| 0| AA| N3EHAA| 29| LGA| PBI| 157| 1035| 7| 22|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
only showing top 5 rows
null
val flights = spark.sql("SELECT * FROM csv.`../../home/media/data/flights14.csv`")
flights.show
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
| _c0| _c1|_c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9| _c10| _c11|_c12| _c13| _c14|_c15|_c16|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|cancelled|carrier|tailnum|flight|origin|dest|air_time|distance|hour| min|
|2014| 1| 1| 914| 14| 1238| 13| 0| AA| N338AA| 1| JFK| LAX| 359| 2475| 9| 14|
|2014| 1| 1| 1157| -3| 1523| 13| 0| AA| N335AA| 3| JFK| LAX| 363| 2475| 11| 57|
|2014| 1| 1| 1902| 2| 2224| 9| 0| AA| N327AA| 21| JFK| LAX| 351| 2475| 19| 2|
|2014| 1| 1| 722| -8| 1014| -26| 0| AA| N3EHAA| 29| LGA| PBI| 157| 1035| 7| 22|
|2014| 1| 1| 1347| 2| 1706| 1| 0| AA| N319AA| 117| JFK| LAX| 350| 2475| 13| 47|
|2014| 1| 1| 1824| 4| 2145| 0| 0| AA| N3DEAA| 119| EWR| LAX| 339| 2454| 18| 24|
|2014| 1| 1| 2133| -2| 37| -18| 0| AA| N323AA| 185| JFK| LAX| 338| 2475| 21| 33|
|2014| 1| 1| 1542| -3| 1906| -14| 0| AA| N328AA| 133| JFK| LAX| 356| 2475| 15| 42|
|2014| 1| 1| 1509| -1| 1828| -17| 0| AA| N5FJAA| 145| JFK| MIA| 161| 1089| 15| 9|
|2014| 1| 1| 1848| -2| 2206| -14| 0| AA| N3HYAA| 235| JFK| SEA| 349| 2422| 18| 48|
|2014| 1| 1| 1655| -5| 2003| -17| 0| AA| N5CFAA| 172| EWR| MIA| 161| 1085| 16| 55|
|2014| 1| 1| 1752| 7| 2120| -5| 0| AA| N332AA| 177| JFK| SFO| 365| 2586| 17| 52|
|2014| 1| 1| 1253| 3| 1351| 1| 0| AA| N3JWAA| 178| JFK| BOS| 39| 187| 12| 53|
|2014| 1| 1| 1907| 142| 2223| 133| 0| AA| N336AA| 181| JFK| LAX| 345| 2475| 19| 7|
|2014| 1| 1| 1720| -5| 1819| -26| 0| AA| N3BCAA| 256| JFK| BOS| 35| 187| 17| 20|
|2014| 1| 1| 1733| 18| 2024| 69| 0| AA| N3HPAA| 199| JFK| ORD| 155| 740| 17| 33|
|2014| 1| 1| 1640| 25| 2001| 36| 0| AA| N3HFAA| 211| JFK| IAH| 234| 1417| 16| 40|
|2014| 1| 1| 1714| -1| 2036| 1| 0| AA| N3DVAA| 291| JFK| AUS| 232| 1521| 17| 14|
|2014| 1| 1| 1611| 191| 1910| 185| 0| AA| N471AA| 300| EWR| DFW| 214| 1372| 16| 11|
+----+-----+---+--------+---------+--------+---------+---------+-------+-------+------+------+----+--------+--------+----+----+
only showing top 20 rows
null
Schema¶
https://
import org.apache.spark.sql.types._
// Explicit schema with four nullable columns, built field by field.
val customSchema = new StructType().
  add("project", StringType, nullable = true).
  add("article", StringType, nullable = true).
  add("requests", IntegerType, nullable = true).
  add("bytes_served", DoubleType, nullable = true)
// Read the space-delimited pagecounts sample using the explicit customSchema.
// The quote character is set to the empty string. Uses the `spark` session,
// like every other read on this page, and trailing-dot chaining so the
// statement also pastes cleanly into a REPL.
val pagecount = spark.read.format("csv").
  option("delimiter", " ").
  option("quote", "").
  option("header", "true").
  schema(customSchema).
  load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
Write DataFrame to CSV¶
// Load the flights CSV with its header row as column names and the parse mode
// set to DROPMALFORMED. .csv(...) already selects the CSV format, so no
// separate format("csv") call is needed.
val flights = spark.read.
  option("header", "true").
  option("mode", "DROPMALFORMED").
  csv(fileFlight)
flights.write.option("header", "true").csv("f2.csv")
null
Misc Configurations¶
// Write the same DataFrame as Parquet once per compression setting, in the
// listed order, using "overwrite" save mode for each target path.
Seq(
  "none" -> "/tmp/file_no_compression_parq",
  "gzip" -> "/tmp/file_with_gzip_parq",
  "snappy" -> "/tmp/file_with_snappy_parq"
).foreach { case (codec, path) =>
  df.write.mode("overwrite").format("parquet").option("compression", codec).save(path)
}
// Write the same DataFrame as ORC with each compression setting. The save mode
// is set once per chain; a second mode("overwrite") call adds nothing.
df.write.mode("overwrite").format("orc").option("compression", "none").save("/tmp/file_no_compression_orc")
df.write.mode("overwrite").format("orc").option("compression", "snappy").save("/tmp/file_with_snappy_orc")
df.write.mode("overwrite").format("orc").option("compression", "zlib").save("/tmp/file_with_zlib_orc")
Output with a Single Header¶
https://
// Two ways to get the CSV output into a single part file with a single header:
// reduce the DataFrame to one partition before writing. Both target the same
// `out` path, so presumably they are meant as alternatives; run one or the other.
// The built-in "csv" format name is used, matching the rest of this page.

// Variant 1: coalesce(1).
dataFrame.coalesce(1).write.format("csv").option("header", "true").save(out)

// Variant 2: repartition(1).
dataFrame.repartition(1).write.format("csv").option("header", "true").save(out)