Ben Chuanlong Du's Blog

It is never too late to learn.

Read/Write Files/Tables in Spark

References

DataFrameReader APIs

DataFrameWriter APIs

https://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources

Comments

  1. It is suggested that you specify a schema when reading text files. If you let Spark infer the schema instead, it is good practice to double-check the inferred column types before relying on them.
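
     For example, a minimal PySpark sketch of reading a CSV file with an explicit schema (the path and column names are made up for illustration):

     from pyspark.sql.types import StructType, StructField, IntegerType, StringType

     # hypothetical schema matching the (made-up) file /path/to/data.csv
     schema = StructType([
         StructField("id", IntegerType(), nullable=True),
         StructField("name", StringType(), nullable=True),
     ])
     df = spark.read.schema(schema).option("header", True).csv("/path/to/data.csv")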

  2. Do NOT read data from and write data to the same path in Spark! Due to Spark's lazy evaluation, the output path will likely be cleared before the data is actually read, which throws IO exceptions. And the worst part is that your data on HDFS is removed and is NOT recoverable.

  3. When you want to keep headers in the data, it is suggested that you use the Parquet format for multiple reasons. It is fast, takes less space, and you do not have to worry about the duplicated headers encountered when merging CSV files with headers. There is no need to merge Parquet files at all: you can directly read a folder of Parquet files in all kinds of programming languages, which is more convenient than dealing with a folder of CSV files.
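
     A minimal sketch, assuming an existing SparkSession spark, a DataFrame df, and a made-up output path:

     # write a DataFrame as a folder of Parquet files and read the whole folder back
     df.write.mode("overwrite").parquet("/tmp/example_parquet")
     df2 = spark.read.parquet("/tmp/example_parquet")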

  4. Writing to existing files using RDD.saveAsTextFile throws a "file already exists" exception. This is because the Hadoop filesystem does not overwrite existing files by default, and RDD.saveAsTextFile does not provide an option to overwrite them. To avoid the issue, you have to manually remove the existing files before writing to the path. Of course, it is no longer suggested to use RDD directly in Spark; you should use DataFrame as much as possible. DataFrame.write allows you to overwrite existing HDFS files via DataFrame.write.mode("overwrite"), as discussed in the next tip.

  5. DataFrame.write does not overwrite existing files by default; it simply throws an exception. You can use .mode("overwrite") to force overwriting existing files. It is suggested that you always use the overwrite mode when writing to files/tables in Spark.

     DataFrame.write.mode("overwrite").parquet("/path/to/write/files")
  6. It takes lots of time for hadoop fs -getmerge to extract and merge a large number of compressed text files. So it is suggested that you turn off compression when saving results as text files, especially when there is a huge number of partitions.
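
     With the DataFrame API, a sketch might look like the following (the path is made up and df is assumed to exist):

     # write uncompressed CSV files so that `hadoop fs -getmerge` stays fast
     df.write.mode("overwrite").option("compression", "none").csv("/path/to/text/output")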

  7. spark.table is known for reading Hive tables. However, it can be used to read any file-based table on HDFS too. For example, it can be used to read a Parquet table.

     spark.table("parquet.`/path/to/table`")
    
    

    It is not recommended to use spark.table to read text-format HDFS tables, though, as you will not be able to specify options (such as the delimiter or header) for loading the table.
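
    For text-format data, use spark.read instead so that loading options can be specified. A PySpark sketch (the path, delimiter and header settings are illustrative):

     df = spark.read.option("header", True).option("sep", "\t").csv("/path/to/table")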

Readiness of Data on HDFS

  1. Data in a Hive table guarantees completeness, which means that if you see data of a certain date in the table, the complete data for that date is there. However, if you work with other formats (Parquet, Avro, Sequence, etc.), you had better check for data readiness before you use it. A simple way to do this is to check whether the _SUCCESS file exists in the same directory.
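
     One way to do the check from PySpark is to go through the JVM gateway to the Hadoop FileSystem API. A sketch (the path is made up; spark._jvm and spark._jsc are internal accessors):

     # check whether /path/to/data/_SUCCESS exists on HDFS
     hadoop = spark._jvm.org.apache.hadoop.fs
     fs = hadoop.FileSystem.get(spark._jsc.hadoopConfiguration())
     is_ready = fs.exists(hadoop.Path("/path/to/data/_SUCCESS"))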

Parquet

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery

  1. You might have issues saving a DataFrame read from Parquet to a CSV file. The reason is that Parquet supports more complex data structures (e.g., arrays), which are not supported in CSV.
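
     A common workaround is to serialize complex columns to strings before writing to CSV. A sketch (the array column tags and the paths are made up):

     from pyspark.sql.functions import to_json

     # convert the array column to a JSON string so the DataFrame can be written as CSV
     df.withColumn("tags", to_json("tags")).write.mode("overwrite").option("header", True).csv("/tmp/example_csv")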

  2. Shell-like syntax (curly braces, wildcards, etc.) for matching multiple files is supported in both the DataFrame API and the SQL API when querying files directly, as long as it does NOT result in a conflicting directory structure.

    files = 's3a://dev/2017/01/{02,03}/data.parquet'
    df = session.read.parquet(files)

    spark.read.parquet("some_path/2019-02-10_05-38-11/SITE_NAME=*")

    select
        count(*) as n
    from
        parquet.`/some_path/2019-02-10_05-38-11/SITE_NAME=*`

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

https://docs.databricks.com/spark/latest/data-sources/read-parquet.html

input_file_name

The function input_file_name in the package org.apache.spark.sql.functions creates a string column containing the name of the file being read by the current Spark task.
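
For example, in PySpark (assuming an existing SparkSession spark; the path is made up):

    from pyspark.sql.functions import input_file_name

    # record which file each row was read from
    df = spark.read.parquet("/path/to/table").withColumn("source_file", input_file_name())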

In [ ]:
df.write.option("compression", "none").mode("overwrite").save("testoutput.parquet")
In [ ]:
df.write.
    mode("overwrite").
    format("parquet").
    option("compression", "none").
    save("/tmp/file_no_compression_parq")
df.write.
    mode("overwrite").
    format("parquet").
    option("compression", "gzip").
    save("/tmp/file_with_gzip_parq")
df.write.
    mode("overwrite").
    format("parquet").
    option("compression", "snappy").
    save("/tmp/file_with_snappy_parq")

df.write.mode("overwrite").format("orc").option("compression", "none").mode("overwrite").save("/tmp/file_no_compression_orc")
df.write.mode("overwrite").format("orc").option("compression", "snappy").mode("overwrite").save("/tmp/file_with_snappy_orc")
df.write.mode("overwrite").format("orc").option("compression", "zlib").mode("overwrite").save("/tmp/file_with_zlib_orc")
In [ ]:
mode("overwrite")

option("compression", "none")
option("compression", "gzip")
option("compression", "snappy")
option("compression", "zlib")

option("inferSchema", true)
option("nullValue", "NA")
option("quote", "")

Below are some old-fashioned RDD-related tips. Skip them if you do not have to use RDDs.

RDD.saveAsTextFile

  1. RDD.saveAsTextFile does not provide options to control the format of output files. You have to manually convert the RDD to one containing strings in the format that you want. For example, if rdd contains tuples and you want to output it as TSV files, you can format it first using the following code.

     rdd.map { x => x.productIterator.mkString("\t") }
  2. You can use the following statement to turn off compression when saving results to files (suggested when writing to text files).

     sc.hadoopConfiguration.set("mapred.output.compress", "false")
  3. Writing to existing files using RDD.saveAsTextFile throws a "file already exists" exception. This is because the Hadoop filesystem does not overwrite existing files by default, and RDD.saveAsTextFile does not provide an option to overwrite them. To avoid the issue, you have to manually remove the existing files before writing to the path, e.g., using the sketch below. Of course, it is no longer suggested to use RDD directly in Spark; you should use DataFrame as much as possible.
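
     A PySpark sketch of removing the output path before calling saveAsTextFile (the path is made up; spark._jvm and spark._jsc are internal accessors to the Hadoop FileSystem API):

     # delete the existing output directory (recursively), then write the RDD
     hadoop = spark._jvm.org.apache.hadoop.fs
     fs = hadoop.FileSystem.get(spark._jsc.hadoopConfiguration())
     out = hadoop.Path("/path/to/output")
     if fs.exists(out):
         fs.delete(out, True)
     rdd.saveAsTextFile("/path/to/output")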
