Persist vs Checkpoint¶
Spark Internals - 6-CacheAndCheckpoint.md has a good explanation of persist vs checkpoint.
Persist/Cache in Spark is lazy and doesn't truncate the lineage, while checkpoint is eager (by default) and truncates the lineage.
Generally speaking, `DataFrame.persist` has better performance than `DataFrame.checkpoint`. However, `DataFrame.checkpoint` is more robust and is preferred in any of the following situations; a short sketch contrasting the two follows the list.

- When running Spark applications on a noisy cluster
- When a DataFrame is computed using lots of partitions (which increases the chance of node failures)
- When you want to be able to recover from a failed Spark application using checkpoints
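To make the trade-off concrete, below is a minimal sketch (assuming `spark` is an existing SparkSession, `df` is an existing DataFrame, and the checkpoint directory is just an illustrative path) contrasting the two calls.

:::python
# persist is lazy and keeps the full lineage
df_persisted = df.persist()

# checkpoint is eager by default and truncates the lineage;
# a checkpoint directory must be configured first (see the checkpoint section below)
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # illustrative path
df_checkpointed = df.checkpoint()

# df_persisted.explain() still shows the original plan,
# while df_checkpointed.explain() reads from the checkpointed data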
Due to issues/bugs in persist/cache and checkpoint (see the next two sections for details), manually writing a DataFrame to disk and then reading it back can be more efficient than persisting to disk or checkpointing.
Tips & Traps for Persist/Cache¶
The method `DataFrame.cache` is a special case of `DataFrame.persist`. `DataFrame.cache` caches a DataFrame at the default storage level (`MEMORY_AND_DISK`), which is equivalent to calling `DataFrame.persist()` with the default behavior. However, `DataFrame.persist` is more flexible on the storage level and is preferred over `DataFrame.cache`. The definition of the class `pyspark.StorageLevel` is as below.

:::python
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
    ...
And it has the following pre-defined instances.
:::python
DISK_ONLY = StorageLevel(True, False, False, False, 1)
DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
OFF_HEAP = StorageLevel(True, True, True, False, 1)
The method `DataFrame.persist` returns the DataFrame itself, which means that you can chain methods after it. Persist a DataFrame which is used multiple times and is expensive to recompute, and remember to unpersist it when the DataFrame is no longer needed. Even though Spark evicts data from memory using the LRU (least recently used) strategy when the caching layer becomes full, it is still beneficial to unpersist data as soon as it is no longer used in order to reduce memory usage.
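For example, here is a minimal sketch of this lifecycle, assuming `df` is an expensive-to-recompute DataFrame with columns `col1` and `col3` (as in the example at the end of this article).

:::python
from pyspark.sql.functions import col
from pyspark.storagelevel import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)

# the first action materializes the cache; later actions reuse it instead of recomputing df
df.filter(col("col1") > 1).count()
df.groupBy("col3").count().show()

# release the cached data once df is no longer needed
df.unpersist()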
Persisting too many DataFrames into memory can cause memory issues. There are a few ways to address memory issues caused by this.
- Increase memory.
- Persist only the most reused DataFrames into memory.
- Persist other DataFrames to disk (see the sketch after this list). Generally speaking, persisting to disk is slower than persisting to memory but still avoids recomputation.
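For instance, below is a sketch with two hypothetical DataFrames, `df_hot` (heavily reused) and `df_cold` (occasionally reused).

:::python
from pyspark.storagelevel import StorageLevel

# keep the most heavily reused DataFrame in memory (spilling to disk if needed)
df_hot.persist(StorageLevel.MEMORY_AND_DISK)

# push less frequently used DataFrames to disk only to free executor memory
df_cold.persist(StorageLevel.DISK_ONLY)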
The option `spark.history.store.maxDiskUsage` controls the maximum disk usage for the local directory where the cached application history information is stored. The default is 10g. You can set it to a larger value if you need to persist large DataFrames to disk.

`DataFrame.persist` is lazy, which means that Spark does not compute and persist a DataFrame immediately but waits until an RDD action is invoked on the DataFrame. This might have two undesired side effects.

- Other operations/transformations might get optimized together into the execution plan of the DataFrame, which can significantly hurt the performance of Spark. Please refer to the Tips and Traps section of the article Control Number of Partitions of a DataFrame in Spark for a more detailed discussion of such an example (and ways to address the problem).
- Too large an execution plan without eager caching/persisting might make a Spark application fail.
If you'd like to persist/cache a Spark DataFrame eagerly, you can manually call the method `DataFrame.count` (which is an RDD action) after `DataFrame.persist` to trigger it. Do NOT use the method `DataFrame.first` instead of `DataFrame.count` in this case: even though `DataFrame.first` is also an RDD action, it does not trigger persisting/caching of the full DataFrame but only of the partition from which a row was retrieved.

:::python
df.cache().count()
I encountered a tricky issue with persist/cache where it computes the DataFrame to be cached twice (no matter which persist option is specified), sort of like the bug in checkpoint. Personally, I have no idea whether it is a bug in the community version of Spark or a bug in the enterprise version used at my workplace, as I don't see any issue on this raised against the community version of Spark.
Another tricky issue that I encountered with persist/cache is that for a Spark DataFrame with a large execution plan, persist/cache fails to work (no matter which persist option is used and how large the option `spark.history.store.maxDiskUsage` is set to). Triggering an eager persist by calling `DataFrame.count` immediately after `DataFrame.persist` often helps. Replacing `DataFrame.persist` with `DataFrame.checkpoint` also makes the application work.
Tips & Traps for Checkpoint¶
You have to manually specify a checkpoint directory if you'd like to use `DataFrame.checkpoint`.

:::python
spark.sparkContext.setCheckpointDir("/hdfs/path/for/checkpoints")
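`DataFrame.checkpoint` returns a new DataFrame, so keep the returned object instead of relying on the original one. Below is a minimal sketch, assuming `df` is an existing DataFrame.

:::python
# eager by default: the DataFrame is computed and materialized immediately,
# and the returned DataFrame has a truncated lineage
df = df.checkpoint()

# pass eager=False if you prefer the checkpoint to be lazy
# df = df.checkpoint(eager=False)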
Spark does not clean a checkpoint directory by default, so you can reuse a checkpoint directory to recover failed applications or to speed up other applications that share identical computations. You can manually remove a checkpoint directory when it is no longer needed, or you can set the following configuration if you want Spark to automatically clean the checkpoint directory of an application after it completes running.
:::bash
--conf spark.cleaner.referenceTracking.cleanCheckpoints=true
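The same configuration can also be set when building the SparkSession, for example (a sketch with an illustrative app name; the flag has to be in place before the SparkContext is created).

:::python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("checkpoint-cleanup")  # illustrative app name
    .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
    .getOrCreate()
)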
`DataFrame.checkpoint` has a bug which computes the DataFrame twice before Spark 3.3.0. It is more efficient to manually write a DataFrame to disk if you need to store the computed DataFrame on disk and truncate the lineage but don't really care about auto recovery of jobs. If you don't need to truncate the lineage either (which is often NOT the case), you can just persist the DataFrame to disk instead of using checkpoint.
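Below is a minimal sketch of the manual write-then-read approach (the path is just an illustrative HDFS location).

:::python
path = "/hdfs/path/for/intermediate/df.parquet"  # illustrative path

# materialize the computed DataFrame on disk ...
df.write.mode("overwrite").parquet(path)

# ... and read it back; the lineage of the new DataFrame starts from the Parquet files
df = spark.read.parquet(path)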
:::python
import pandas as pd
import findspark

findspark.init("/opt/spark-3.0.2-bin-hadoop3.2/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType, StructType

spark = SparkSession.builder.appName("PySpark UDF").enableHiveSupport().getOrCreate()
:::python
df = spark.createDataFrame(
    pd.DataFrame(
        data=(
            (1, "a", "foo", 3.0),
            (1, "b", "bar", 4.0),
            (3, "c", "foo", 5.0),
            (4, "d", "bar", 7.0),
        ),
        columns=("col1", "col2", "col3", "col4"),
    )
)
df.show()
Persist `df` to memory.
:::python
df.persist(StorageLevel.MEMORY_ONLY)
Verify that `df` has been persisted to memory.
:::python
df.storageLevel
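Once `df` is no longer needed, unpersist it to release the cached data, as recommended earlier.

:::python
df.unpersist()

# the storage level now shows that df is no longer cached
df.storageLevel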
References¶
https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/Dataset.html
https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/functions.html
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html
https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/storage/StorageLevel.html
https://luminousmen.com/post/explaining-the-mechanics-of-spark-caching
https://towardsdatascience.com/best-practices-for-caching-in-spark-sql-b22fb0f02d34