
Control Number of Partitions of a DataFrame in Spark

Tips and Traps

  1. If you pass one or more columns (instead of just a number of partitions) to DataFrame.repartition, the DataFrame is repartitioned by the hash code of those columns. In some situations there are lots of hash collisions even when the total number of rows is small (e.g., a few thousand), which means the generated partitions can be skewed and cause a few long-running tasks. If this happens, it is suggested that you manually add a column to help the hashing. Notice that an existing integer column with distinct values is not necessarily a good column to repartition by, especially when those integers are big (e.g., u64), as their hash codes can easily collide. It is best to add a column of random numbers or a column of manually curated partition indexes and ask Spark to repartition based on that column (see the first sketch at the end of this post).

  2. When loading files into a DataFrame, Spark controls the size of each partition of the DataFrame through the parameter spark.sql.files.maxPartitionBytes (128M by default). If a file is larger than spark.sql.files.maxPartitionBytes, it is split evenly into multiple smaller blocks (each no larger than 128M) and each block is loaded into one partition of the DataFrame. If file sizes are small, Spark loads as many files as possible into one partition, keeping the total size at or below 128M. Generally speaking, you want to keep the default value of spark.sql.files.maxPartitionBytes, as it yields good performance for data-intensive Spark applications (the typical situation). However, if your Spark application is CPU intensive, it makes more sense to set a much smaller value for spark.sql.files.maxPartitionBytes so that more partitions are generated, yielding a higher level of parallelism (see the second sketch at the end of this post). You can, of course, repartition a DataFrame manually, but doing so is more expensive and requires access to the source code. If you do tune spark.sql.files.maxPartitionBytes to increase the number of partitions of a loaded DataFrame, be aware that the final output DataFrame (after computation) might also have a large number of partitions. It is not a good idea to write lots of small files into the Hadoop filesystem, as it not only hurts the performance of the Hadoop filesystem but might also exceed the namespace quota limitation. In this case, you want to reduce the number of partitions of the output DataFrame before writing it to disk. The rule of thumb is to make each partition file on disk 64M-128M in size. There are a few ways to achieve this.

    • Manually repartition the output DataFrame to reduce the number of partitions.
    • Manually coalesce the output DataFrame to reduce the number of partitions.

      Manually repartitioning the output DataFrame is easy to carry out, but it causes a full data shuffle, which might be expensive. Manually coalescing the output DataFrame (to reduce the number of partitions) is less expensive, but it has a pitfall: Spark optimizes the physical plan and might apply the reduced number of partitions before the DataFrame is computed. This is undesirable if you want a large number of partitions to increase parallelism while computing the DataFrame and a small number of partitions when writing it out. There are a few ways to solve this problem (code sketches for them are given at the end of this post).

    • Checkpoint the DataFrame before coalescing.

    • Cache the DataFrame and trigger an RDD count action (unlike checkpoint, caching itself does not trigger an RDD action) before coalescing.

      Generally speaking, caching + triggering an RDD action performs better than checkpoint, but checkpoint is more robust (to a noisy Spark cluster). You can also manually write the DataFrame (before coalescing), read it back, and then coalesce (to reduce the number of partitions) and write it out. Theoretically speaking, this is equivalent to caching to disk and then triggering an RDD action. However, in practice I've encountered performance issues with both checkpoint (due to the bug) and cache + triggering an RDD action (first, it computes twice, similar to the checkpoint bug; second, it is unstable and fails often even when persisting to disk). By contrast, manually writing the DataFrame, reading it back, and then coalescing and writing it out works well. Please refer to repart_hdfs for a reference implementation (a rough sketch of this approach is also given at the end of this post). For more discussion on cache/persist vs checkpoint, please refer to Persist and Checkpoint DataFrames in Spark.
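
Below is a minimal PySpark sketch of the trick in tip 1: adding a column of random partition indexes and repartitioning by it. The column name part_id, the partition count of 200, and the toy DataFrame are arbitrary choices for illustration.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("repartition_by_random_index").getOrCreate()

# A toy DataFrame standing in for a table whose integer key hashes badly.
df = spark.range(0, 5_000).withColumnRenamed("id", "user_id")

# Add a column of random partition indexes (part_id is an arbitrary name)
# and repartition by it, so rows spread evenly across partitions
# regardless of how user_id itself hashes.
num_parts = 200
df_even = (
    df.withColumn("part_id", (F.rand() * num_parts).cast("int"))
    .repartition(num_parts, "part_id")
)
print(df_even.rdd.getNumPartitions())  # 200
```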
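As a sketch of tip 2, the snippet below lowers spark.sql.files.maxPartitionBytes before reading files so that a CPU-intensive job gets more partitions. The 32M value and the input path are placeholders, not recommendations.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("tune_max_partition_bytes").getOrCreate()

# Lower the max bytes per file-based partition from the default 128M to 32M
# (an arbitrary example value) so more partitions are created when reading.
spark.conf.set("spark.sql.files.maxPartitionBytes", 32 * 1024 * 1024)

# "/path/to/input" is a placeholder for your own Parquet data.
df = spark.read.parquet("/path/to/input")
print(df.rdd.getNumPartitions())
```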
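The two workarounds for the coalesce pitfall (checkpoint, or cache + count) might look roughly like the sketch below. The checkpoint directory, partition counts, output paths, and the stand-in computation are made up for illustration.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("coalesce_after_materialization").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")  # required by checkpoint()

# Stand-in for an expensive computation that should run at full parallelism.
df = spark.range(0, 10_000_000, numPartitions=400).withColumn("y", F.col("id") * 2)

# Option 1: checkpoint (eager by default) materializes the DataFrame and
# truncates its lineage, so the subsequent coalesce no longer changes how
# the computation itself runs.
df_ckpt = df.checkpoint()
df_ckpt.coalesce(16).write.mode("overwrite").parquet("/tmp/output_checkpoint")

# Option 2: cache + count. Caching alone is lazy, so an action (count) is
# needed to materialize the DataFrame at 400 partitions before coalescing.
df.persist()
df.count()
df.coalesce(16).write.mode("overwrite").parquet("/tmp/output_cached")
```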
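Finally, a rough sketch of the write / read back / coalesce approach, in the spirit of the repart_hdfs implementation referenced above (this is not its actual code). The paths and partition count are placeholders.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("write_read_back_coalesce").getOrCreate()

# Stand-in for an expensive computation with many partitions.
df = spark.range(0, 10_000_000, numPartitions=400).withColumn("y", F.col("id") * 2)

# 1. Write the result at full parallelism to a temporary location.
tmp_path = "/tmp/output_tmp"  # placeholder path
df.write.mode("overwrite").parquet(tmp_path)

# 2. Read the materialized data back, coalesce, and write the final output.
#    Coalescing here only merges already-computed file partitions, so it is
#    cheap and does not affect the parallelism of the original computation.
final_path = "/tmp/output_final"  # placeholder path
spark.read.parquet(tmp_path).coalesce(16).write.mode("overwrite").parquet(final_path)
```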
