Ben Chuanlong Du's Blog

And let it direct your passion with reason.

Persist and Checkpoint DataFrames in Spark

Persist vs Checkpoint

Spark Internals - 6-CacheAndCheckpoint.md has a good explanation of persist vs checkpoint.

  1. Persist/Cache in Spark is lazy and doesn't truncate the lineage, while checkpoint is eager (by default) and truncates the lineage.
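
    A minimal sketch illustrating the difference (assuming a checkpoint directory has already been configured, as shown later in this post):

     :::python
     df2 = df.persist()     # lazy: nothing is computed yet, and df2 keeps the full lineage
     df3 = df.checkpoint()  # eager: df is computed and saved, and df3's lineage is truncated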

  2. Generally speaking, DataFrame.persist has better performance than DataFrame.checkpoint. However, DataFrame.checkpoint is more robust and is preferred in any of the following situations.

    • When running Spark applications on a noisy cluster
    • When a DataFrame is computed using lots of partitions (which increases the chance of node failures)
    • When you want to be able to recover from a failed Spark application using checkpoints
  3. Due to issues/bugs in persist/cache and checkpoint (see the next two sections for details), manually writing a DataFrame to disk and then reading it back can be more efficient than persisting it to disk or checkpointing it, as sketched below.
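
    A minimal sketch of the manual write-then-read pattern (the parquet path below is just a placeholder):

     :::python
     # write the computed DataFrame to disk and read it back;
     # this truncates the lineage like a checkpoint does
     # while avoiding the double-computation issues discussed below
     df.write.mode("overwrite").parquet("/hdfs/path/to/df.parquet")
     df = spark.read.parquet("/hdfs/path/to/df.parquet")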

Tips & Traps for Persist/Cache

  1. The method DataFrame.cache is a special case of DataFrame.persist. DataFrame.cache caches a DataFrame using the default storage level (MEMORY_AND_DISK), which is equivalent to calling DataFrame.persist() with no arguments. However, DataFrame.persist is more flexible on the storage level and is preferred over DataFrame.cache.
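
    For example (a minimal sketch):

     :::python
     df.cache()    # always uses the default storage level (MEMORY_AND_DISK)
     df.persist()  # equivalent to df.cache()
     # persist also accepts other storage levels, e.g.,
     # df.persist(StorageLevel.DISK_ONLY)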

  2. The definition of the class pyspark.StorageLevel is as below.

     :::python
     class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)
         ...
    
    

    And it has the following pre-defined instances. Note that since data is always serialized on the Python side, the _SER variants are identical to their non-_SER counterparts in PySpark.

    • DISK_ONLY = StorageLevel(True, False, False, False, 1)

    • DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)

    • MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)

    • MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)

    • MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)

    • MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)

    • MEMORY_ONLY = StorageLevel(False, True, False, False, 1)

    • MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)

    • MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)

    • MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)

    • OFF_HEAP = StorageLevel(True, True, True, False, 1)

  3. The method DataFrame.persist returns the DataFrame itself, which means that you can chain methods after it.
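
    For example (a minimal sketch):

     :::python
     # persist returns the DataFrame itself, so an action can be chained directly
     num_rows = df.persist(StorageLevel.MEMORY_AND_DISK).count()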

  4. Persist a DataFrame which is used multiple times and is expensive to recompute. Remember to unpersist it when the DataFrame is no longer needed. Even though Spark evicts data from memory using the LRU (least recently used) strategy when the caching layer becomes full, it is still beneficial to unpersist data as soon as it is no longer used, to reduce memory usage.
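
    A minimal sketch of the persist/unpersist lifecycle:

     :::python
     df.persist(StorageLevel.MEMORY_AND_DISK)
     df.groupBy("col3").count().show()    # df is computed and persisted here
     df.filter(df["col1"] > 1).show()     # reuses the persisted data
     df.unpersist()                       # release the cached data once df is no longer needed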

  5. Persisting too many DataFrames into memory can cause memory issues. There are a few ways to address this.

    • Increase memory.
    • Persist only the most reused DataFrames into memory.
    • Persist other DataFrames to disk. Generally speaking, persisting to disk is slower than persisting to memory, but it avoids memory pressure while still saving recomputation.
  6. The option spark.history.store.maxDiskUsage controls the maximum disk usage for the local directory where cached application history information is stored. The default is 10g. You can set it to a larger value if you need to persist large DataFrames to disk.
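
    For example (the value below is just an illustration):

     :::bash
     --conf spark.history.store.maxDiskUsage=20g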

  7. DataFrame.persist is lazy, which means that Spark does not compute and persist a DataFrame immediately but waits until an RDD action is invoked on the DataFrame. This might have 2 undesired side effects.

    • Other operations/transformations might get optimized together into the execution plan of the DataFrame, which can significantly hurt the performance of the Spark application. Please refer to the Tips and Traps section of the article Control Number of Partitions of a DataFrame in Spark for a more detailed discussion of such an example (and ways to address the problem).
    • An overly large execution plan without eager caching/persisting might make a Spark application fail.

      If you'd like to persist/cache a Spark DataFrame eagerly, you can manually call the method DataFrame.count (which is an RDD action) after DataFrame.persist to trigger it. Do NOT use DataFrame.first instead of DataFrame.count in this case: even though DataFrame.first is also an RDD action, it does not trigger a full persist/caching of the DataFrame but only of the partition from which a row was retrieved.

      :::python
      df.cache().count()

  8. I encountered a tricky issue with persist/cache: it computes the DataFrame to be cached twice (no matter which persist option is specified), sort of like the bug in checkpoint. Personally speaking, I have no idea whether it is a bug in the community version of Spark or a bug in the enterprise version used at my workplace, as I don't see any issue on this raised against the community version of Spark.

  9. Another tricky issue that I encountered with persist/cache is that for a Spark DataFrame with a large execution plan, persist/cache fails to work (no matter which persist option is used and how large the option spark.history.store.maxDiskUsage is set to). Triggering an eager persist by calling DataFrame.count immediately after DataFrame.persist often helps. Replacing DataFrame.persist with DataFrame.checkpoint also makes the application work. Both workarounds are sketched below.
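
    A minimal sketch of the two workarounds (the second one assumes a checkpoint directory has been configured, as shown in the next section):

     :::python
     # workaround 1: trigger an eager persist with an immediate count
     df.persist(StorageLevel.DISK_ONLY)
     df.count()

     # workaround 2: replace persist with checkpoint (eager by default)
     df = df.checkpoint()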

Tips & Traps for Checkpoint

  1. You have to manually specify a checkpoint directory if you'd like to use DataFrame.checkpoint.

     :::python
     spark.sparkContext.setCheckpointDir("/hdfs/path/for/checkpoints")
    
    

    Spark does not clean a checkpoint directory by default, so you can reuse a checkpoint directory to recover failed applications or to speed up other applications sharing identical computations. You can manually remove a checkpoint directory when it is no longer needed, or you can set the following configuration if you want Spark to auto clean the checkpoint directory of an application after it completes running.

     :::bash
     --conf spark.cleaner.referenceTracking.cleanCheckpoints=true
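
    With the checkpoint directory configured, checkpointing a DataFrame is straightforward (a minimal sketch):

     :::python
     df = df.checkpoint()             # eager by default: computes df, saves it, and truncates the lineage
     df = df.checkpoint(eager=False)  # lazy alternative, materialized by the next RDD action
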
  2. DataFrame.checkpoint has a bug which computes the DataFrame twice before Spark 3.3.0. It is more efficient to manually write a DataFrame to disk (as sketched in the first section) if you need to store the computed DataFrame on disk and truncate the lineage, but don't really care about auto recovery of jobs. If you don't need to truncate the lineage either (which is often NOT the case), you can just persist the DataFrame to disk instead of using checkpoint.

In [6]:
import pandas as pd
import findspark

findspark.init("/opt/spark-3.0.2-bin-hadoop3.2/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType, StructType

spark = SparkSession.builder.appName("PySpark UDF").enableHiveSupport().getOrCreate()
In [5]:
df = spark.createDataFrame(
    pd.DataFrame(
        data=(
            (1, "a", "foo", 3.0),
            (1, "b", "bar", 4.0),
            (3, "c", "foo", 5.0),
            (4, "d", "bar", 7.0),
        ),
        columns=("col1", "col2", "col3", "col4"),
    )
)
df.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   a| foo| 3.0|
|   1|   b| bar| 4.0|
|   3|   c| foo| 5.0|
|   4|   d| bar| 7.0|
+----+----+----+----+

Persist df to memory.

In [7]:
df.persist(StorageLevel.MEMORY_ONLY)
Out[7]:
DataFrame[col1: bigint, col2: string, col3: string, col4: double]

Verify that df has been persisted to memory.

In [8]:
df.storageLevel
Out[8]:
StorageLevel(False, True, False, False, 1)
