
Read Multiple Files into a DataFrame in Spark
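Spark's DataFrameReader resolves its path argument with Hadoop's glob machinery, so a single path string can match many files, and all of them are loaded into one DataFrame. The cells below try out the patterns that work (character ranges, wildcards, brace alternation at various depths) and the ones that fail (comma-separated paths packed into a single string).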

In [1]:
%%classpath add mvn
org.apache.spark spark-core_2.11 2.3.1
org.apache.spark spark-sql_2.11 2.3.1
In [2]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
    .master("local[2]")
    .appName("Read Multiple Files into a DataFrame")
    .getOrCreate()

import spark.implicits._
Out[2]:
org.apache.spark.sql.SparkSession$implicits$@107f8243
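To start, a single file read by its full path. The file:// scheme prefix is optional for local paths; the later cells omit it.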
In [21]:
spark.read.parquet(
    "file:///workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet"
)
Out[21]:
[item_id: bigint, site_id: bigint]
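A character range [1-2] in the file name matches both abc_item_1.parquet and abc_item_2.parquet, so the two files are read into one DataFrame.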
In [11]:
spark.read.parquet(
    "/workdir/archives/projects/rilb/src/test/resources/abc_item_[1-2].parquet"
)
Out[11]:
[item_id: bigint, site_id: bigint]
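The wildcard * works the same way, matching every file with the abc_item_ prefix.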
In [10]:
spark.read.parquet(
    "/workdir/archives/projects/rilb/src/test/resources/abc_item_*.parquet"
)
Out[10]:
[item_id: bigint, site_id: bigint]
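Brace alternation {1,2} selects from an explicit list of alternatives.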
In [6]:
spark.read.parquet(
    "/workdir/archives/projects/rilb/src/test/resources/abc_item_{1,2}.parquet"
)
Out[6]:
[item_id: bigint, site_id: bigint]
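The alternatives can be entire file names, not just fragments.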
In [13]:
spark.read.parquet(
    "/workdir/archives/projects/rilb/src/test/resources/{abc_item_1.parquet,abc_item_2.parquet}"
)
Out[13]:
[item_id: bigint, site_id: bigint]
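The braces can also sit higher up in the path; the next two cells lift them to the directory level and all the way to the root.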
In [14]:
spark.read.parquet(
    "/workdir/{archives/projects/rilb/src/test/resources/abc_item_1.parquet,archives/projects/rilb/src/test/resources/abc_item_2.parquet}"
)
Out[14]:
[item_id: bigint, site_id: bigint]
In [15]:
spark.read.parquet(
    "/{workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet,workdir/archives/projects/rilb/src/test/resources/abc_item_2.parquet}"
)
Out[15]:
[item_id: bigint, site_id: bigint]
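When a pattern matches many files, it can be useful to record which file each row came from. The input_file_name function (from org.apache.spark.sql.functions, imported above) does that. A minimal sketch, not run in this notebook:

val df = spark.read
    .parquet("/workdir/archives/projects/rilb/src/test/resources/abc_item_*.parquet")
    .withColumn("source_file", input_file_name())

Now for what does not work. Packing several comma-separated paths into a single string fails: Spark treats the entire string, comma and all, as one path, as the error message below shows.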
In [5]:
spark.read.parquet(
    "/workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet,/workdir/archives/projects/rilb/src/test/resources/abc_item_2.parquet"
)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet,/workdir/archives/projects/rilb/src/test/resources/abc_item_2.parquet;
  at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:715)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:355)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:388)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:622)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:606)
  ... 48 elided
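Adding file:// prefixes to the comma-separated paths does not help; the string is still treated as a single path.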
In [24]:
spark.read.parquet(
    "file:///workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet,file:///workdir/archives/projects/rilb/src/test/resources/abc_item_2.parquet"
)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet,file:/workdir/archives/projects/rilb/src/test/resources/abc_item_2.parquet;
  at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:715)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:355)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:388)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:622)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:606)
  ... 48 elided
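Placing the braces after the scheme fails too: file://{...} is not a valid file-system URI, so Hadoop rejects it with a Wrong FS error.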
In [20]:
spark.read.parquet(
    "file://{/workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet,/workdir/archives/projects/rilb/src/test/resources/abc_item_2.parquet}"
)
java.lang.IllegalArgumentException: Wrong FS: file://{/workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet,/workdir/archives/projects/rilb/src/test/resources/abc_item_2.parquet}, expected: file:///
  at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
  at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)
  at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534)
  at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
  at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
  at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
  at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
  at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
  at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1625)
  at org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:244)
  at org.apache.spark.deploy.SparkHadoopUtil.globPathIfNecessary(SparkHadoopUtil.scala:254)
  at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:707)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:355)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:388)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:622)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:606)
  ... 48 elided
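Neither does wrapping the complete URIs in braces: the leading { is parsed as the first character of a URI scheme, where it is illegal.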
In [9]:
spark.read.parquet(
    "{file:///workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet,file:///workdir/archives/projects/rilb/src/test/resources/abc_item_2.parquet}"
)
java.lang.IllegalArgumentException: java.net.URISyntaxException: Illegal character in scheme name at index 0: {file:///workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet,file://workdir/archives/projects/rilb/src/test/resources/abc_item_2.parquet%7D
  at org.apache.hadoop.fs.Path.initialize(Path.java:206)
  at org.apache.hadoop.fs.Path.<init>(Path.java:172)
  at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:704)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:355)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:388)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:622)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:606)
  ... 48 elided
Caused by: java.net.URISyntaxException: Illegal character in scheme name at index 0: {file:///workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet,file://workdir/archives/projects/rilb/src/test/resources/abc_item_2.parquet%7D
  at java.net.URI$Parser.fail(URI.java:2848)
  at java.net.URI$Parser.checkChars(URI.java:3021)
  at java.net.URI$Parser.checkChar(URI.java:3031)
  at java.net.URI$Parser.parse(URI.java:3047)
  at java.net.URI.<init>(URI.java:746)
  at org.apache.hadoop.fs.Path.initialize(Path.java:203)
  ... 62 more
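The reliable way to read several explicitly named files is to pass each path as a separate argument: DataFrameReader.parquet is a varargs method (parquet(paths: String*)). You can also collect the paths into a Seq and splat it in. A sketch, not run in this notebook:

// Each path is its own argument.
val df1 = spark.read.parquet(
    "/workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet",
    "/workdir/archives/projects/rilb/src/test/resources/abc_item_2.parquet"
)

// Or splat a Seq of paths into the varargs parameter.
val paths = Seq(
    "/workdir/archives/projects/rilb/src/test/resources/abc_item_1.parquet",
    "/workdir/archives/projects/rilb/src/test/resources/abc_item_2.parquet"
)
val df2 = spark.read.parquet(paths: _*)

The same varargs pattern works for the other built-in readers (csv, json, orc, text).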