- PySpark 2.4 and older do not support Python 3.8. You have to use Python 3.7 with PySpark 2.4 or older.
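  As a quick sanity check, you can verify the interpreter version at runtime before building a SparkSession. This is a minimal sketch (not from the original post); the version guard is a rough heuristic.

  ```python
  import sys

  import pyspark

  # PySpark 2.x does not work with Python 3.8+; fail early with a clear message.
  if int(pyspark.__version__.split(".")[0]) < 3 and sys.version_info >= (3, 8):
      raise RuntimeError(
          f"PySpark {pyspark.__version__} does not support Python "
          f"{sys.version_info.major}.{sys.version_info.minor}; use Python 3.7 or older."
      )
  ```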
- It can be extremely helpful to run a PySpark application locally to detect possible issues before submitting it to the Spark cluster. Below is an example shell script for running a PySpark application locally using `spark-submit`.

  ```bash
  #!/usr/bin/env bash

  PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin \
      /path/to/spark-2.3.1-bin-hadoop2.7/bin/spark-submit \
      --conf spark.yarn.maxAppAttempts=1 \
      --conf spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT=1 \
      --conf spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1 \
      --conf spark.sql.execution.arrow.enabled=True \
      --conf spark.pyspark.driver.python=.venv/bin/python3 \
      --conf spark.pyspark.python=.venv/bin/python3 \
      script_to_run.py --arg1 v1 --arg2 v2
  ```
- You can run PySpark interactively using the `pyspark` command and submit a PySpark job to the cluster using the `spark-submit` command. For more details, please refer to Launching Applications with spark-submit. Below is an example shell script for submitting a PySpark job using `spark-submit`.

  ```bash
  #!/bin/bash

  /apache/spark2.3/bin/spark-submit \
      --files "file:///apache/hive/conf/hive-site.xml,file:///apache/hadoop/etc/hadoop/ssl-client.xml,file:///apache/hadoop/etc/hadoop/hdfs-site.xml,file:///apache/hadoop/etc/hadoop/core-site.xml,file:///apache/hadoop/etc/hadoop/federation-mapping.xml" \
      --master yarn \
      --deploy-mode cluster \
      --queue YOUR_QUEUE \
      --num-executors 200 \
      --executor-memory 10G \
      --driver-memory 15G \
      --executor-cores 4 \
      --conf spark.yarn.maxAppAttempts=2 \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.dynamicAllocation.maxExecutors=1000 \
      --conf spark.network.timeout=300s \
      --conf spark.executor.memoryOverhead=2G \
      --conf spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT=1 \
      --conf spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1 \
      --conf spark.sql.execution.arrow.enabled=True \
      --conf spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=8G \
      --conf spark.pyspark.driver.python=/usr/share/anaconda3/bin/python \
      --conf spark.pyspark.python=/usr/share/anaconda3/bin/python \
      /path/to/_pyspark.py
  ```
  If you use a portable Python environment named `env.tar.gz`, you can submit a PySpark application using the following shell script.

  ```bash
  #!/bin/bash

  /apache/spark2.3/bin/spark-submit \
      --files "file:///apache/hive/conf/hive-site.xml,file:///apache/hadoop/etc/hadoop/ssl-client.xml,file:///apache/hadoop/etc/hadoop/hdfs-site.xml,file:///apache/hadoop/etc/hadoop/core-site.xml,file:///apache/hadoop/etc/hadoop/federation-mapping.xml" \
      --master yarn \
      --deploy-mode cluster \
      --queue YOUR_QUEUE \
      --num-executors 200 \
      --executor-memory 10G \
      --driver-memory 15G \
      --executor-cores 4 \
      --conf spark.yarn.maxAppAttempts=2 \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.dynamicAllocation.maxExecutors=1000 \
      --conf spark.network.timeout=300s \
      --conf spark.executor.memoryOverhead=2G \
      --conf spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT=1 \
      --conf spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1 \
      --conf spark.sql.execution.arrow.enabled=True \
      --conf spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=8G \
      --conf spark.pyspark.driver.python=./env/bin/python \
      --conf spark.pyspark.python=./env/bin/python \
      --archives env.tar.gz#env \
      $1
  ```
  And below is a simple example of `_pyspark.py`.

  ```python
  from pyspark.sql import SparkSession
  from pyspark.sql.functions import *

  spark = SparkSession.builder.appName("Test PySpark").enableHiveSupport().getOrCreate()

  sql = """
      SELECT *
      FROM some_table
      TableSample (100000 Rows)
      """
  spark.sql(sql).write.mode("overwrite").parquet("output")
  ```
  Notice that I have prefixed an underscore to the name of the file. This is a simple but useful trick to avoid unintentional module conflicts in Python. For example, if you name your PySpark script `pyspark.py`, your PySpark application will fail to work because your script `pyspark.py` is loaded as a module named `pyspark`, which hides the official `pyspark` module. It is suggested that you adopt this trick of prefixing an underscore to file names when submitting a PySpark job.
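  As a rough illustration (not from the original post), you can inspect `pyspark.__file__` to confirm that the real library, rather than a local script, is being imported.

  ```python
  import pyspark

  # If this prints a path inside site-packages (or your Spark installation),
  # the official pyspark package is in use. If it points at a file in the
  # working directory (e.g., ./pyspark.py), your script is shadowing the library.
  print(pyspark.__file__)
  ```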
- If you have multiple versions of Spark installed, the exported Spark environment variables might interfere with each other and cause some of them to fail. For example, if you have both a cluster version of Spark and a local version of Spark installed, you might fail to submit Spark applications using `spark-submit`. One simple fix for this problem is to manually set the `PATH` environment variable to an appropriate value before you invoke the `spark-submit` command of the local version of Spark.

  ```bash
  PATH=/bin:/sbin:/usr/bin:/usr/sbin /path/to/spark-submit ...
  ```

  As a matter of fact, this is the way to fix most `PATH`-related issues on Linux/Unix.
- PySpark does not support converting `$"col"` to a Column implicitly. However, the function `pyspark.sql.functions.col` works the same as in Spark.
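  For example, where Scala Spark would write `$"age" + 1`, PySpark uses `col`. A minimal sketch (the DataFrame and column names are made up for illustration):

  ```python
  from pyspark.sql import SparkSession
  from pyspark.sql.functions import col

  spark = SparkSession.builder.appName("col_example").getOrCreate()
  df = spark.createDataFrame([(1, "a"), (2, "b")], ["age", "name"])

  # Equivalent to df.select($"age" + 1) in Scala Spark.
  df.select((col("age") + 1).alias("age_plus_one")).show()
  ```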
- Pandas UDFs are preferred to plain Python UDFs. Please refer to User-defined Function (UDF) in PySpark for more discussion.
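  Below is a minimal pandas UDF sketch using the Spark 3 style API with Python type hints (Spark 2.3/2.4 use the older `PandasUDFType`-based decorator instead); the data and column names are made up for illustration.

  ```python
  import pandas as pd
  from pyspark.sql import SparkSession
  from pyspark.sql.functions import pandas_udf

  spark = SparkSession.builder.appName("pandas_udf_example").getOrCreate()
  df = spark.createDataFrame([(1.0,), (2.0,), (3.0,)], ["x"])

  # A scalar pandas UDF operates on pandas Series in batches,
  # which is typically much faster than a row-at-a-time Python UDF.
  @pandas_udf("double")
  def plus_one(x: pd.Series) -> pd.Series:
      return x + 1

  df.withColumn("x_plus_one", plus_one("x")).show()
  ```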
- pyspark-stubs can be leveraged for static type checking in a PySpark project.
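  As a rough sketch of what the stubs enable, you can annotate DataFrame-handling functions and run a type checker such as mypy over them (the function below is a made-up example).

  ```python
  from pyspark.sql import DataFrame
  from pyspark.sql.functions import col


  def filter_adults(df: DataFrame) -> DataFrame:
      """Keep only rows whose age column is at least 18."""
      # With pyspark-stubs installed, mypy can verify that DataFrame methods
      # such as .filter are called with compatible types.
      return df.filter(col("age") >= 18)
  ```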
Use PySpark in Jupyter/Lab Notebooks
- The trick is to use the Python library `findspark` to find and initialize Spark for use in a notebook.

  ```python
  from pathlib import Path

  import findspark

  findspark.init(str(next(Path("/opt/").glob("spark-*"))))

  from pyspark.sql import SparkSession, DataFrame

  spark = SparkSession.builder.appName("PySpark_Notebook") \
      .enableHiveSupport().getOrCreate()
  ```
  Notice that the following lines of code must come before any code involving Spark. The example here might seem obvious; however, it might not be so obvious if you import a module which creates a SparkSession object.

  ```python
  from pathlib import Path

  import findspark

  findspark.init(str(next(Path("/opt/").glob("spark-*"))))
  ```
- When working with relatively large data in a local version of Spark in a Jupyter/Lab notebook, you can easily encounter OOM errors. The trick is to increase the driver memory using the option `.config("spark.driver.memory", "50g")`.

  ```python
  from pathlib import Path

  import findspark

  findspark.init(str(next(Path("/opt/").glob("spark-*"))))

  from pyspark.sql import SparkSession, DataFrame

  spark = SparkSession.builder.appName("PySpark_Notebook") \
      .master("local[*]") \
      .config("spark.driver.memory", "50g") \
      .enableHiveSupport().getOrCreate()
  ```
- The command-line option `--jars` is equivalent to the configuration `spark.jars` when you use `--conf`. This means that you can use `.config("spark.jars", "/path/to/file.jar")` to add JARs to a Spark/PySpark application in a Jupyter/Lab notebook.

  ```python
  from pathlib import Path

  import findspark

  findspark.init(str(next(Path("/opt/").glob("spark-*"))))

  from pyspark.sql import SparkSession, DataFrame

  spark = SparkSession.builder.appName("PySpark_Notebook") \
      .config("spark.jars", "/path/to/file.jar") \
      .config("spark.driver.memory", "50g") \
      .enableHiveSupport().getOrCreate()
  ```
- If you want to leverage a Spark cluster in a Jupyter/Lab notebook, there are a few things (Hadoop queue, driver IP and driver port) you need to configure. Below is an illustration.

  ```python
  import socket

  import findspark

  findspark.init("/apache/spark2.3")

  from pyspark.sql import SparkSession, DataFrame

  spark = SparkSession.builder.master("yarn").appName("PySpark_Cluster") \
      .config("spark.yarn.queue", "your-hadoop-queue") \
      .config("spark.driver.host", socket.gethostbyname(socket.gethostname())) \
      .config("spark.driver.port", "30202") \
      .enableHiveSupport().getOrCreate()
  ```
  Notice that the SparkContext object will expire after it has been inactive for a while. It won't help to run the above code again in the notebook, because a SparkSession (even if it is no longer valid due to expiration of the underlying SparkContext) has already been created. You can of course restart the Python kernel and then create a new SparkSession; however, you will lose all Python objects in the notebook by doing this. A better alternative is to manually stop the SparkSession by calling `spark.stop()` and then run the above code in the notebook again. This way, the Python objects (e.g., pandas DataFrames) you have created will still be alive for you to use.
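  A minimal sketch of that workflow, assuming `spark` is the session created by the earlier snippet (reuse whatever builder configuration you originally used):

  ```python
  from pyspark.sql import SparkSession

  # Stop the stale session whose underlying SparkContext has expired ...
  spark.stop()

  # ... then build a fresh one. getOrCreate() now returns a new, working session,
  # while pandas DataFrames and other Python objects in the notebook are untouched.
  spark = SparkSession.builder.master("yarn").appName("PySpark_Cluster") \
      .enableHiveSupport().getOrCreate()
  ```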
- When you run Spark or PySpark in a Jupyter/Lab notebook, it is recommended that you show ERROR messages only. Otherwise, there might be too much logging information polluting your notebook. You can set the log level of Spark to `ERROR` using the following line of code.

  ```python
  spark.sparkContext.setLogLevel("ERROR")
  ```
  For more details, please refer to Configure Log4J for Spark.
Python Dependencies for PySpark
Listed below are several ways to handle dependencies for PySpark.
- Build a portable Python environment using python-build-standalone
- Build a portable Python environment using conda-pack
- Package Python dependencies using pex
Building a portable Python environment using python-build-standalone is the recommended approach.