
Partition and Bucketing in Spark

Tips and Traps

  1. Bucketed columns are only supported in Hive tables at this time.

  2. A Hive table can have both partition and bucket columns.

  3. Suppose t1 and t2 are two bucketed tables with b1 and b2 buckets, respectively. For bucket join optimization to kick in when joining them:

     - The 2 tables must be bucketed on the same keys/columns.
     - The join must be on the bucket keys/columns.
     - `b1` is a multiple of `b2` or `b2` is a multiple of `b1`.
    
    

    When there are many bucketed tables that might join with each other, the number of buckets needs to be carefully designed so that an efficient bucket join can always be leveraged.

  4. Bucketing for optimized filtering is available in Spark 2.4+. For example, if the table persons has a bucketed column id with an integer-compatible type, then the following query in Spark 2.4+ will be optimized to avoid a scan of the whole table. A few things to be aware of here. First, you will still see a number of tasks close to the number of buckets in your Spark application. This is because the optimized job still has to check every bucket of the table to see whether it is the right bucket corresponding to id=123. (If yes, Spark scans all rows in the bucket to filter records. If not, the bucket is skipped to save time.) Second, the type of the value to compare must be compatible in order for Spark SQL to leverage bucket filtering. For example, if the id column in the persons table is of the BigInt type and id = 123 is changed to id = "123" in the following query, Spark will have to do a full table scan (even though it sounds extremely stupid to do so).

     :::sql
     SELECT *
     FROM persons
     WHERE id = 123
  5. When you use multiple bucket columns in a Hive table, the hash that assigns a record to a bucket is calculated on a string concatenating the values of all bucket columns. This means that to leverage bucket joins or bucket filtering, all bucket columns must be used in the join or filtering conditions (see the sketch below).
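
Below is a minimal sketch of writing a table bucketed by two columns; the DataFrame df, the table name events_bucketed, and the columns user_id and site_id are hypothetical. A query that filters or joins on only one of the two columns cannot take advantage of the buckets.

:::python
# A minimal sketch with hypothetical names (df, events_bucketed, user_id, site_id).
# The bucket hash is computed over BOTH columns, so all of them must appear
# in the join/filtering conditions for bucket optimizations to kick in.
(
    df.write
    .bucketBy(500, "user_id", "site_id")
    .sortBy("user_id", "site_id")
    .saveAsTable("events_bucketed")
)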

Benefit of Partition Columns

  1. Spark supports partition pruning, which skips scanning non-needed partition files when filtering on partition columns. However, notice that partition columns do not help much with joins in Spark. For more discussion on this, please refer to Partition-wise joins and Apache Spark SQL. A quick way to confirm partition pruning is to inspect the physical plan, as in the sketch below.
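
The sketch below uses a hypothetical table sales_part partitioned by site. Filtering on the partition column lets Spark skip the other partition directories; the pruned predicate shows up under PartitionFilters in the physical plan.

:::python
# A minimal sketch with a hypothetical partitioned table sales_part.
# Only the directories matching site = 0 are scanned; check PartitionFilters
# in the output of explain() to confirm the pruning.
spark.sql("""
    SELECT *
    FROM sales_part
    WHERE site = 0
    """).explain()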

When to Use Partition Columns

  1. Table size is big (> 50G).
  2. The table has low cardinality columns which are frequently used in filtering conditions.

How to Choose Partition Columns

  1. Choose low cardinality columns as partition columns (since an HDFS directory will be created for each partition value combination). Generally speaking, the total number of partition combinations should be less than 50K.

  2. Choose columns that are frequently used in filtering conditions.

  3. Use at most 2 partition columns as each partition column creates a new layer of directory.

Bucket columns in Hive tables are similar to primary indexes in Teradata.

Benefits of Bucket Columns

  1. Spark supports bucket pruning which skips scanning of non-needed bucket files when filtering on bucket columns.

  2. A bucket join will be leveraged when the two joining tables are both bucketed on the join keys with the same data type and the bucket numbers of the two tables are multiples of each other (e.g., 500 vs 1000). See the sketch after this list.

  3. The number of buckets helps guide the Spark engine on the level of parallel execution.
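
Here is a minimal sketch of a bucket join setup; the DataFrames df_orders and df_users and the join key user_id are hypothetical. Both tables are bucketed and sorted on the join key, and the join is on that key, so the physical plan should show a sort-merge join without an Exchange (shuffle) on either side.

:::python
# A minimal sketch with hypothetical DataFrames (df_orders, df_users) and a
# hypothetical join key user_id.
df_orders.write.bucketBy(500, "user_id").sortBy("user_id").saveAsTable("orders_b")
df_users.write.bucketBy(500, "user_id").sortBy("user_id").saveAsTable("users_b")

# The join is on the bucket column, so the plan should show a SortMergeJoin
# without an Exchange on either side. With differing but multiple bucket
# counts (e.g., 500 vs 1000), newer Spark versions may additionally need
# spark.sql.bucketing.coalesceBucketsInJoin.enabled=true.
spark.sql("""
    SELECT *
    FROM orders_b o
    JOIN users_b u
    ON o.user_id = u.user_id
    """).explain()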

When to Use Bucket Columns

  1. Table size is big (> 200G).

  2. The table has high cardinality columns which are frequently used as filtering and/or joining keys.

  3. For a medium-sized table that is mainly used to join with a huge bucketed table, it is still beneficial to bucket it.

  4. The sort-merge join (without buckets) is slow due to shuffling rather than data skew.

How to Configure Bucket Columns

  1. Choose high cardinality columns as bucket columns.
  2. Try to avoid data skew.
  3. Use at least 500 buckets (as a small number of buckets leads to poor parallel execution).
  4. Sorting buckets is optional but strongly recommended.

Small Files

  1. Each file is an object in the HDFS name node, so lots of small files put lots of pressure on the name node.

    Also, the compute engine has latencies managing jobs. Lots of small files may result in lots of small tasks, which downgrades the performance of the Spark compute engine.

Frequent insert DML introduces many small files into your tables' HDFS directories, which downgrades query performance and can even impact HDFS name node stability.

To avoid too many small files:

  1. Do not use "insert into table ... values ..." in iterations.
  2. Trigger regular compaction in your data processing (see also the generic sketch below), e.g.,

     :::sql
     compact table sales;
     -- or
     compact table sales partition (site=0);
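
If your Spark distribution does not provide a compact table command, a generic alternative is to rewrite a partition into fewer, larger files. The sketch below uses hypothetical paths and an arbitrary target of 8 files.

:::python
# A minimal sketch with hypothetical paths; the target of 8 files is arbitrary.
# Read the partition containing many small files, coalesce it, and write the
# compacted copy to a staging location, then swap it in (or repoint the
# table/partition to the new location).
df = spark.read.parquet("/path/to/sales/site=0")
df.coalesce(8).write.mode("overwrite").parquet("/path/to/sales_compacted/site=0")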

Analyze Tables/Columns

Table- and column-level statistics can help the Spark compute engine accelerate query performance with accurate estimations. It is suggested that you populate them after your regular data processing.

:::sql
analyze table sales compute statistics                      -- table level
analyze table sales compute statistics for columns item_id  -- column level
analyze tables in db_name compute statistics                -- analyze all tables in db_name

Misc

Do not use complex view definitions.

Good Practices

  1. Use partition/bucket columns in filtering conditions.
  2. Spark 3 automatically handles simple cases of data skew; however, it doesn't work for complicated queries, especially when data skew happens in subqueries. It is suggested that you manually optimize your query for data skew issues at this time.
  3. Cast data types to be the same for bucket joins.
  4. Consolidate UDFs: use a subquery to invoke a UDF once.
  5. Use views to manage data access only. Avoid complex logic in a view definition. NEVER nest view logic in your data processing flow!
  6. Add explain optimize before your query to analyze the data and get recommendations on your query.
  7. Put the smaller join table first (but I think Spark 3 handles this automatically now).
  8. Rewrite A like X or A like Y as A like any(X, Y) (see the sketch below).
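
A minimal sketch of the like any rewrite, assuming Spark SQL 3.0+ and reusing the hypothetical persons table from earlier:

:::python
# A minimal sketch with the hypothetical table persons and column name.
# LIKE ANY (Spark SQL 3.0+) replaces a chain of OR'ed LIKE predicates.
spark.sql("""
    SELECT *
    FROM persons
    WHERE name LIKE ANY ('Ben%', '%Du')
    """).show()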

Data Engineer

  1. Use the Parquet format. Use Delta tables if you need to support update/delete; otherwise, use Hive tables.
  2. Partition/bucket large tables and sort the buckets.
  3. Reduce the number of small files.
  4. Avoid complex view definitions.
  5. Cache frequently used data (see the sketch below).
  6. Use Bloom filter indexes.
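
For the data-cache item, a minimal sketch using Spark SQL's table cache with a hypothetical table sales:

:::python
# A minimal sketch with a hypothetical table name.
# CACHE TABLE keeps the table in memory for subsequent queries;
# UNCACHE TABLE releases it when it is no longer needed.
spark.sql("CACHE TABLE sales")
# ... run the queries that reuse sales ...
spark.sql("UNCACHE TABLE sales")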

Tricks and Traps with DataFrame.write.partitionBy and DataFrame.write.bucketBy

Partition is an important concept in Spark which affects Spark performance in many ways. When reading a table into Spark, the number of partitions in memory equals the number of files on disk if each file is smaller than the block size; otherwise, there will be more partitions in memory than files on disk. Generally speaking, there shouldn't be too many small files in a table, as this causes too many partitions (and thus small tasks) in the Spark job. When you write a Spark DataFrame to disk, the number of files on disk usually equals the number of partitions in memory unless you use partitionBy or bucketBy.

Suppose a DataFrame df has p partitions in memory and a column named col with c distinct values $v_1$, ..., $v_c$. When you write df to disk using df.write.partitionBy(col), each of the p in-memory partitions is split by value and written separately into the c directories on disk. This means that the resulting number of files can be up to $c * p$. This is probably not what people want in most situations; instead, people often want exactly $c$ files on disk, one under each of the $c$ partition directories, when they call df.write.partitionBy(col). Given the above explanation of how DataFrame.write.partitionBy works, a simple fix is to have each in-memory partition correspond to a distinct value of the column df.col. That is, repartitioning the DataFrame by the column col resolves the issue.

:::python
# Repartition by the partition column first so that each in-memory partition
# corresponds to one value of the column, then write.
df.repartition("col").write.partitionBy("col").parquet("/path/to/output")

The above issue is not present when you use DataFrame.write.bucketBy, as DataFrame.write.bucketBy works by calculating hash codes. The table will always have exactly the number of buckets you specified when calling DataFrame.write.bucketBy (as shown below, reading the table back yields that many partitions).

In [1]:
import findspark

findspark.init("/opt/spark-3.0.0-bin-hadoop3.2/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("PySpark_Union").enableHiveSupport().getOrCreate()
In [3]:
df = spark.read.option("header", "true").csv("../../home/media/data/daily.csv")
df = df.select(
    year("date").alias("year"), month("date").alias("month"), "date", "x", "y", "z"
).repartition(2)
df.show()
+----+-----+----------+------------------+------------------+------------------+
|year|month|      date|                 x|                 y|                 z|
+----+-----+----------+------------------+------------------+------------------+
|2018|   12|2018-12-12|          15218.66|343419.90721800004|136.56000000000003|
|2018|   12|2018-12-14|12127.650000000005|     252696.129202|125.28000000000002|
|2018|   12|2018-12-05| 35484.22999999998|     442708.934149|            230.76|
|2018|   10|2018-10-28|28418.420000000016|     515499.609327|268.80000000000007|
|2019|    1|2019-01-07|          29843.17|     375139.756514|172.62000000000003|
|2019|    1|2019-01-09|          30132.28|     212952.094433|            128.52|
|2018|   11|2018-11-22| 38395.96999999998|     437842.863362|            237.12|
|2018|   11|2018-11-23|          38317.15|391639.59950300003|212.22000000000003|
|2018|   12|2018-12-30| 7722.129999999999|     210282.286054| 85.80000000000003|
|2018|   10|2018-10-17|11101.180000000006|243019.40156300002|            150.84|
|2018|   12|2018-12-16|          12058.41|     366604.422485|            154.62|
|2018|   12|2018-12-04|35072.609999999986|     420124.638715|            222.78|
|2018|   12|2018-12-20| 5015.930000000001|120790.77259400001| 46.13999999999999|
|2018|   12|2018-12-15|11833.510000000006|276072.08876499993|            141.24|
|2018|   11|2018-11-30|          38306.17|     395650.858456|            243.12|
|2018|   11|2018-11-09|           25519.6|     287836.930741|184.01999999999995|
|2018|   10|2018-10-14|11152.390000000005| 311944.5214740002|198.05999999999992|
|2018|   10|2018-10-16|10974.380000000003|236304.82008200002|159.06000000000003|
|2019|    1|2019-01-03|          30953.24|383834.70136999997|            197.52|
|2019|    1|2019-01-06|          29520.23| 420714.7821390001|            217.98|
+----+-----+----------+------------------+------------------+------------------+
only showing top 20 rows

In [4]:
df.rdd.getNumPartitions()
Out[4]:
2
In [8]:
df.write.mode("overwrite").partitionBy("year").parquet("part_by_year.parquet")
In [11]:
!ls part_by_year.parquet/
_SUCCESS  year=2018 year=2019

Spark supports multiple levels of partitioning.

In [9]:
df.write.mode("overwrite").partitionBy("year", "month").parquet(
    "part_by_year_month.parquet"
)
In [10]:
!ls part_by_year_month.parquet/year=2018/
month=10 month=11 month=12
In [24]:
spark.read.parquet("daily.parquet").rdd.getNumPartitions()
Out[24]:
4
In [17]:
df.repartition("year").write.mode("overwrite").partitionBy("year").parquet(
    "daily.parquet"
)
In [18]:
!ls daily.parquet/year=2018
part-00015-76ce0363-393a-4e1a-a387-488170fdcfbf.c000.snappy.parquet
In [19]:
!ls daily.parquet/year=2019
part-00081-76ce0363-393a-4e1a-a387-488170fdcfbf.c000.snappy.parquet
In [25]:
spark.read.parquet("daily.parquet").rdd.getNumPartitions()
Out[25]:
4
In [26]:
df.write.mode("overwrite").partitionBy("year").saveAsTable("daily_hive")
In [28]:
spark.table("daily_hive").rdd.getNumPartitions()
Out[28]:
4
In [29]:
df.createOrReplaceTempView("df")
In [35]:
spark.sql(
    """
    create table daily_hive_2
    using parquet     
    partitioned by (year) as
    select * from df
    """
)
Out[35]:
DataFrame[]
In [37]:
spark.table("daily_hive_2").rdd.getNumPartitions()
Out[37]:
4

Filtering Optimization Leveraging Bucketed Columns

Spark 3

In [1]:
import findspark

findspark.init("/opt/spark-3.0.0-bin-hadoop3.2/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("PySpark_Union").enableHiveSupport().getOrCreate()
In [2]:
df = spark.read.option("header", "true").csv("../../home/media/data/daily.csv")
df = df.repartition(2)
df.show()
+----------+------------------+------------------+------------------+
|      date|                 x|                 y|                 z|
+----------+------------------+------------------+------------------+
|2018-10-22|           10779.9|234750.19368899995|150.78000000000003|
|2018-12-07|15637.329999999998|281424.52784600004|147.36000000000004|
|2018-12-21|           4797.22|106753.64014699995|             47.46|
|2018-10-17|11101.180000000006|243019.40156300002|            150.84|
|2018-11-09|           25519.6|     287836.930741|184.01999999999995|
|2018-11-28|           39134.8|446640.72524799994|             225.3|
|2018-12-14|12127.650000000005|     252696.129202|125.28000000000002|
|2018-12-09|          14820.05|     407420.724814|167.81999999999996|
|2018-11-27|38929.669999999984|441879.99280600005|244.50000000000009|
|2018-12-18|           7623.48|     189779.703736| 90.05999999999996|
|2018-12-20| 5015.930000000001|120790.77259400001| 46.13999999999999|
|2019-01-02|          29647.83|     379943.385348|             199.2|
|2018-11-14|25252.369999999995|337906.34417199995|174.30000000000007|
|2018-10-31|          27578.91| 352146.2405870001|            216.84|
|2018-12-28| 7838.080000000003|     203588.781784|              84.6|
|2018-12-13|13612.409999999998|321809.42337600014|             137.7|
|2018-10-24| 24706.41000000001|     353630.071363|            249.78|
|2018-11-04|25480.279999999995|446580.81299899996|            256.14|
|2018-10-13|10977.450000000004|237820.89652399998|            143.88|
|2018-11-25|          38150.71|492148.65518500004| 286.7399999999999|
+----------+------------------+------------------+------------------+
only showing top 20 rows

In [7]:
df.rdd.getNumPartitions()
Out[7]:
2
In [4]:
df.write.bucketBy(10, "date").saveAsTable("daily_b2")
In [5]:
spark.table("daily_b2").rdd.getNumPartitions()
Out[5]:
10

Notice that the execution plan does leverage the bucketed column for optimization (SelectedBucketsCount: 1 out of 10).

In [5]:
spark.sql(
    """
    select 
        * 
    from 
        daily_b
    where
        date = "2019-01-11"
    """
).explain()
== Physical Plan ==
*(1) Project [date#53, x#54, y#55, z#56]
+- *(1) Filter (isnotnull(date#53) AND (date#53 = 2019-01-11))
   +- *(1) ColumnarToRow
      +- FileScan parquet default.daily_b[date#53,x#54,y#55,z#56] Batched: true, DataFilters: [isnotnull(date#53), (date#53 = 2019-01-11)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark-3.0.0-bin-hadoop3.2/warehouse/daily_b], PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,2019-01-11)], ReadSchema: struct<date:string,x:string,y:string,z:string>, SelectedBucketsCount: 1 out of 10


Spark 2.3

In [2]:
import findspark

findspark.init("/opt/spark-2.3.4-bin-hadoop2.7/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark23 = (
    SparkSession.builder.appName("PySpark_Union").enableHiveSupport().getOrCreate()
)
In [5]:
df = spark23.read.option("header", "true").csv("../../home/media/data/daily.csv")
df.show()
+----------+------------------+------------------+------------------+
|      date|                 x|                 y|                 z|
+----------+------------------+------------------+------------------+
|2019-01-11|               0.0|               0.0|               0.0|
|2019-01-10| 30436.96000000001|               0.0|               0.0|
|2019-01-09|          30132.28|     212952.094433|            128.52|
|2019-01-08|29883.240000000005|      352014.45016|            192.18|
|2019-01-07|          29843.17|     375139.756514|172.62000000000003|
|2019-01-06|          29520.23| 420714.7821390001|            217.98|
|2019-01-05|          29308.36|376970.94769900007|             183.3|
|2019-01-04|31114.940000000013|339321.70448899985|174.59999999999997|
|2019-01-03|          30953.24|383834.70136999997|            197.52|
|2019-01-02|          29647.83|     379943.385348|             199.2|
|2019-01-01| 9098.830000000004|     221854.328826|             88.26|
|2018-12-31|3522.9299999999994| 76976.74379300003| 30.83999999999999|
|2018-12-30| 7722.129999999999|     210282.286054| 85.80000000000003|
|2018-12-29|           7731.69|     184870.553121|             88.86|
|2018-12-28| 7838.080000000003|     203588.781784|              84.6|
|2018-12-27| 8031.669999999997|245940.99543200003|             90.84|
|2018-12-26| 8819.809999999998|     194513.682991|             77.52|
|2018-12-25| 6549.109999999998|136605.95935000002| 65.75999999999999|
|2018-12-24|           5015.84| 87121.95556000003|             44.52|
|2018-12-23| 5145.909999999998|     137563.979567|             52.02|
+----------+------------------+------------------+------------------+
only showing top 20 rows

In [6]:
df.write.bucketBy(10, "date").saveAsTable("daily_b")
In [8]:
spark23.table("daily_b").rdd.getNumPartitions()
Out[8]:
10

Notice that the execution plan does not leverage the bucketed column for optimization (there is no SelectedBucketsCount in the FileScan).

In [9]:
spark23.sql(
    """
    select 
        * 
    from 
        daily_b
    where
        date = "2019-01-11"
    """
).explain()
== Physical Plan ==
*(1) Project [date#44, x#45, y#46, z#47]
+- *(1) Filter (isnotnull(date#44) && (date#44 = 2019-01-11))
   +- *(1) FileScan parquet default.daily_b[date#44,x#45,y#46,z#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark-2.3.4-bin-hadoop2.7/warehouse/daily_b], PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,2019-01-11)], ReadSchema: struct<date:string,x:string,y:string,z:string>
