What causes out-of-memory errors in Spark and how do you diagnose and fix them?
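Spark OOM errors fall into two categories: driver OOM (usually from collect() or from broadcasting a large table, which is first assembled on the driver) and executor OOM (from insufficient heap for task execution, shuffle buffers, or cached data, or from a container exceeding its non-heap memoryOverhead). Diagnosis starts in the Spark UI or event log: identify which stage and task failed, then read the driver or executor logs to decide whether the pressure is in storage, execution, or user memory, or outside the heap entirely.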
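The first triage step, deciding which pool ran out, can often be read from the error text before digging through the UI. The sketch below is a hypothetical helper (`classify_oom` is not a Spark API); the message fragments it matches are typical of Spark on YARN and Kubernetes, but exact wording varies by Spark version and cluster manager, so treat the patterns as assumptions.

```python
import re

# Hypothetical triage rules: typical message fragments, not an exhaustive
# or version-stable list. Order matters: the first match wins.
_RULES = [
    (r"spark\.driver\.maxResultSize", "driver-result-size"),
    (r"exceeding memory limits|OOMKilled", "container-overhead"),
    (r"OutOfMemoryError: (Java heap space|GC overhead limit exceeded)", "jvm-heap"),
]


def classify_oom(message: str) -> str:
    """Return a coarse OOM category for one log line, or 'unknown'."""
    for pattern, category in _RULES:
        if re.search(pattern, message):
            return category
    return "unknown"


print(classify_oom("java.lang.OutOfMemoryError: Java heap space"))
```

A `jvm-heap` result still needs one more question answered: did it appear in the driver log or in an executor log? A `container-overhead` result points at memoryOverhead (native memory, Python workers) rather than `spark.executor.memory`.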
How to think about it
OOM in Spark is not one problem — it is several different problems that look identical in the stack trace.
Executor memory layout
Spark executor heap is divided into three regions:
- Reserved memory (300 MB, hardcoded): set aside by Spark for its own use before the heap is split; not user-configurable.
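- Unified memory (spark.memory.fraction, default 0.6 of usable heap, where usable heap = heap minus 300 MB): shared between execution (shuffles, sorts, aggregations, hash joins) and storage (cached partitions, broadcast data). Each side can borrow the other's free space. When execution needs more, it can evict cached blocks, but only down to spark.memory.storageFraction (default 0.5 of unified memory); storage can never evict execution memory.
- User memory (the remaining 0.4 of usable heap): user data structures, such as objects built inside UDFs or mapPartitions, plus Spark's internal metadata.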
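The split can be worked through numerically. This is a sketch of the arithmetic only (`memory_regions` is a hypothetical helper), assuming the default fractions and treating `heap_mb` as the configured `spark.executor.memory`:

```python
RESERVED_MB = 300  # fixed reserved memory


def memory_regions(heap_mb: float, memory_fraction: float = 0.6,
                   storage_fraction: float = 0.5) -> dict:
    """Approximate on-heap region sizes (MB) for one executor."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction
    return {
        "usable": usable,
        "unified": unified,                                # execution + storage
        "storage_protected": unified * storage_fraction,   # cache execution cannot evict
        "user": usable - unified,
    }


regions = memory_regions(8 * 1024)  # spark.executor.memory = 8g
for name, mb in regions.items():
    print(f"{name:>18}: {mb:8.1f} MB")
```

For an 8g executor this gives about 4,735 MB of unified memory (about 2,368 MB of it protected for cached data) and about 3,157 MB of user memory. Real values come out somewhat lower, because Spark sizes the regions from the heap the JVM reports, which is usually a little below `-Xmx`.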
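Executor sizes are fixed when executors launch. Calling spark.conf.set on them in a running session does not resize anything (Spark 3.x rejects the call), so pass them with spark-submit --conf or on the session builder before the session is created:
```python
from pyspark.sql import SparkSession

# spark.executor.memory: JVM heap per executor
# spark.executor.memoryOverhead: non-heap memory (JVM internals, native libs, Python workers);
#   default max(10% of executor memory, 384 MB)
# Container request = executor.memory + memoryOverhead (+ offHeap.size, pyspark.memory if set)
spark = (SparkSession.builder
         .config("spark.executor.memory", "8g")
         .config("spark.executor.memoryOverhead", "2g")  # raise for native libs, Python UDFs
         .getOrCreate())
```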
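The container request can be computed directly. A minimal sketch (`container_request_mb` is a hypothetical helper), assuming the default overhead factor of 0.10 with a 384 MB floor, and assuming that, as in Spark 3.x on YARN, `spark.memory.offHeap.size` and `spark.executor.pyspark.memory` are added only when they are set. The cluster manager may round the final request up further.

```python
from typing import Optional

MIN_OVERHEAD_MB = 384
OVERHEAD_FACTOR = 0.10  # assumed default


def container_request_mb(executor_memory_mb: int,
                         overhead_mb: Optional[int] = None,
                         offheap_mb: int = 0,
                         pyspark_mb: int = 0) -> int:
    """Memory (MB) requested from the cluster manager for one executor."""
    if overhead_mb is None:
        # Default overhead: 10% of the heap, truncated to whole MB, at least 384 MB
        overhead_mb = max(int(OVERHEAD_FACTOR * executor_memory_mb), MIN_OVERHEAD_MB)
    return executor_memory_mb + overhead_mb + offheap_mb + pyspark_mb


print(container_request_mb(8192))                    # default overhead
print(container_request_mb(8192, overhead_mb=2048))  # explicit 2g overhead
```

With an 8g heap the default overhead is 819 MB, for a 9,011 MB request; with an explicit 2g overhead the request is 10,240 MB. Small executors hit the 384 MB floor, which is why overhead is proportionally larger for them.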
Common OOM causes and fixes
1. collect() on a large dataset (driver OOM)
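```python
# Bad: pulls every row (e.g. 1 billion) into driver memory
data = df.collect()

# Fix: write results to storage instead...
df.write.parquet("output/")
# ...or bring back only a bounded number of rows (the first rows, not a random sample)
preview = df.limit(1000).collect()
```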
2. Shuffle partition too large (executor OOM during reduce)
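Each reduce task processes one shuffle partition, and operators such as sorts, aggregations, and hash joins buffer much of it in execution memory. If spark.sql.shuffle.partitions (default 200) is too low for the data volume, each partition is huge and those buffers outgrow what the task can get.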
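```python
# Raise shuffle partitions to reduce per-task data volume (default is 200)
spark.conf.set("spark.sql.shuffle.partitions", "800")

# Or let AQE (on by default since Spark 3.2) coalesce small partitions at runtime;
# it only merges partitions, so the starting count still has to be high enough
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
```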
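Choosing a partition count is arithmetic on the stage's shuffle size (shown as "Shuffle Write" on the map stage in the Spark UI). The 128 MB target below is an assumed rule of thumb, not a Spark default; for comparison, AQE's advisory target `spark.sql.adaptive.advisoryPartitionSizeInBytes` defaults to 64 MB. UI shuffle sizes are serialized (usually compressed) bytes, so the in-memory footprint per task is larger. `shuffle_partitions` is a hypothetical helper.

```python
import math

MiB = 1024 ** 2
GiB = 1024 ** 3


def shuffle_partitions(shuffle_bytes: int, target_partition_bytes: int = 128 * MiB) -> int:
    """Partitions needed so each reduce task reads about target_partition_bytes."""
    return max(1, math.ceil(shuffle_bytes / target_partition_bytes))


print(shuffle_partitions(100 * GiB))  # 800
print(shuffle_partitions(25 * GiB))   # 200
```

A 100 GiB shuffle at 128 MiB per partition needs 800 partitions, while the default of 200 would leave each reduce task with about 512 MiB of serialized input.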
3. Skewed partition (one task processes all data)
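One task gets a massive partition because a single hot key (for example, a null or default value) accounts for a large share of the rows, and every row with that key hashes to the same partition. Fix with salting or AQE skew join (see the skew/salting question).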
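A toy, pure-Python simulation shows why salting helps. It uses `zlib.crc32` as a deterministic stand-in for Spark's partitioner (Spark itself uses Murmur3 hashing), and the key names, row counts, and bucket counts are invented for illustration.

```python
import zlib
from collections import Counter


def partition_of(key: str, num_partitions: int) -> int:
    # Deterministic stand-in for hash partitioning (not Spark's Murmur3 hash)
    return zlib.crc32(key.encode()) % num_partitions


def max_partition_rows(keys, num_partitions: int) -> int:
    """Row count of the largest partition after hash partitioning."""
    counts = Counter(partition_of(k, num_partitions) for k in keys)
    return max(counts.values())


# Skewed data: one hot key holds 90,000 of 100,000 rows
keys = ["hot"] * 90_000 + [f"k{i}" for i in range(10_000)]

NUM_PARTITIONS = 50
SALT_BUCKETS = 20
# Salting: append a bucket number so the hot key spreads over many partitions
salted = [f"{k}#{i % SALT_BUCKETS}" for i, k in enumerate(keys)]

print("largest partition, unsalted:", max_partition_rows(keys, NUM_PARTITIONS))
print("largest partition, salted:  ", max_partition_rows(salted, NUM_PARTITIONS))
```

Unsalted, one task receives at least 90,000 rows; salted, the hot key is split into 20 buckets of 4,500 rows. In a real salted join the other side must be replicated once per salt value so matching rows still meet; for aggregations, aggregate on the salted key first, then strip the salt and aggregate again.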
4. Caching too much data
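Cached partitions consume storage memory. Execution can evict cached blocks only down to the protected storage region (spark.memory.storageFraction of unified memory); cached data below that line stays, so execution must spill to disk or, for structures that cannot spill, fail with OOM.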
```python
# Unpersist DataFrames you no longer need
df.unpersist()
```
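The eviction rule can be written as a simplified model. `max_execution_mb` is a hypothetical helper that mirrors the cap Spark's unified memory manager applies (unified memory minus whatever cached data sits inside the protected region); it ignores off-heap memory and per-task limits.

```python
def max_execution_mb(unified_mb: float, storage_used_mb: float,
                     storage_fraction: float = 0.5) -> float:
    """Upper bound on execution memory given how much data is cached.

    Execution may evict cached blocks, but only those above the protected
    region (unified_mb * storage_fraction).
    """
    protected = unified_mb * storage_fraction
    return unified_mb - min(storage_used_mb, protected)


unified = 4735.2  # MB, e.g. an 8g executor with default settings
print(max_execution_mb(unified, storage_used_mb=4000))  # cache above the protected half is evictable
print(max_execution_mb(unified, storage_used_mb=1000))  # small cache, most of the pool is available
```

Caching more than the protected half does not shrink execution further, but any cache up to that line does. The resulting pool is also shared by the tasks running concurrently on the executor: with N active tasks, each is guaranteed at least 1/(2N) of it and capped at 1/N.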
5. Python UDF / PySpark overhead (memoryOverhead OOM)
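Python worker processes run outside the JVM, so their memory counts against memoryOverhead (unless spark.executor.pyspark.memory is set), not against the executor heap. Row-at-a-time UDFs pickle every row across the JVM/Python boundary, and large objects passed that way inflate overhead usage. Prefer native Spark SQL functions, which never leave the JVM; when Python is unavoidable, use Pandas UDFs (vectorized), which exchange Arrow batches whose size is bounded by spark.sql.execution.arrow.maxRecordsPerBatch (default 10,000 rows).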
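```python
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, udf

# Bad: row-at-a-time Python UDF, every row is pickled to and from Python
my_udf = udf(lambda x: x * 2, "double")

# Better: pandas UDF, processes Arrow batches as pandas Series
@pandas_udf("double")
def my_fast_udf(s: pd.Series) -> pd.Series:
    return s * 2

# Best when possible: a native expression, no Python worker involved
doubled = F.col("x") * 2  # "x" is a placeholder column name
```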