How does the Spark Catalyst optimizer work, and what does Adaptive Query Execution add?
Catalyst is a rule-based and cost-based query optimizer that transforms a logical plan through four phases — analysis, logical optimization, physical planning, and code generation — before any data is touched. Adaptive Query Execution (AQE), introduced in Spark 3, extends this by re-optimizing the physical plan at runtime using actual shuffle statistics rather than stale estimates.
How to think about it
Catalyst is the reason DataFrame/SQL code often outperforms hand-written RDD code: it sees the whole query before running it and applies rewrites that a developer would rarely bother to make by hand.
The four Catalyst phases
1. Analysis: resolves column names and types against the catalog. A missing column fails fast with an AnalysisException at plan time, not at runtime.
2. Logical optimization: applies rule-based transformations to the logical plan:
- Predicate push-down: moves filters as early as possible, ideally into the data source scan.
- Constant folding: evaluates constant expressions such as 1 + 1 at plan time.
- Column pruning: drops unreferenced columns before they are read.
- Projection collapsing: merges multiple sequential select calls.
3. Physical planning: converts the optimized logical plan into one or more candidate physical plans and uses a cost model (based on table statistics) to pick one. This is where join strategy selection happens (broadcast hash vs sort-merge vs shuffled hash).
4. Whole-stage code generation: fuses the operators of each stage into generated Java source, which Janino compiles to bytecode as a single tight loop, eliminating virtual function dispatch and interpreter overhead.
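# Assumes an active SparkSession named `spark` (as in a notebook or the pyspark shell).
# Catalyst sees this whole query at once: it pushes the filter into the Parquet scan,
# prunes the scan to the columns the query actually uses, and fuses the filter and
# projection into one codegen stage.
spark.read.parquet("users/") \
    .filter("age > 30") \
    .select("name") \
    .count()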
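The rule-based rewriting in phase 2 can be illustrated with a toy model in plain Python. This is only a sketch of the general idea (rules applied to an expression tree until nothing changes), not Catalyst's actual code; the classes Lit, Col, Add and the helpers transform_up, fold_constants, and optimize are hypothetical names invented for this example.

```python
from dataclasses import dataclass
from typing import Callable, List, Union


# Toy expression tree. These classes are illustrative, not Spark's.
@dataclass(frozen=True)
class Lit:
    value: int


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Add:
    left: "Expr"
    right: "Expr"


Expr = Union[Lit, Col, Add]
Rule = Callable[[Expr], Expr]


def transform_up(expr: Expr, rule: Rule) -> Expr:
    """Apply `rule` to every node, children first (bottom-up)."""
    if isinstance(expr, Add):
        expr = Add(transform_up(expr.left, rule), transform_up(expr.right, rule))
    return rule(expr)


def fold_constants(expr: Expr) -> Expr:
    """Rule: replace Add(Lit, Lit) with the literal it evaluates to."""
    if isinstance(expr, Add) and isinstance(expr.left, Lit) and isinstance(expr.right, Lit):
        return Lit(expr.left.value + expr.right.value)
    return expr


def optimize(expr: Expr, rules: List[Rule]) -> Expr:
    """Run all rules repeatedly until the tree stops changing (a fixed point)."""
    while True:
        new_expr = expr
        for rule in rules:
            new_expr = transform_up(new_expr, rule)
        if new_expr == expr:
            return expr
        expr = new_expr


# age + (1 + 1) is rewritten to age + 2 before any row is read.
plan = Add(Col("age"), Add(Lit(1), Lit(1)))
optimized = optimize(plan, [fold_constants])
print(optimized)
```

Adding another optimization in this model means writing one more small function and appending it to the rule list, which is roughly why a rule-based design scales to many rewrites.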
Adaptive Query Execution (Spark 3+)
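The core insight: plan-time cost estimates are often wrong for skewed or heavily filtered data. AQE treats shuffle boundaries as re-optimization points: once a stage finishes, AQE uses that stage's actual shuffle statistics to re-plan the stages that have not run yet.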
Three concrete AQE features:
| Feature | Config | What it does |
|---|---|---|
| Coalescing shuffle partitions | spark.sql.adaptive.coalescePartitions.enabled | Merges tiny post-shuffle partitions into fewer larger ones |
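| Converting joins | spark.sql.adaptive.autoBroadcastJoinThreshold (with spark.sql.adaptive.localShuffleReader.enabled) | Switches a sort-merge join to a broadcast hash join when one side's runtime size is below the threshold; the local shuffle reader then avoids fetching already-shuffled data over the network |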
| Skew join handling | spark.sql.adaptive.skewJoin.enabled | Splits oversized partitions into sub-partitions before the join |
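To make partition coalescing concrete, here is a minimal sketch in plain Python of one way to merge adjacent small partitions up to a target size. It is a simplified greedy model, not Spark's implementation; the function name coalesce_partitions and the sample sizes are made up for illustration. In Spark the target is governed by spark.sql.adaptive.advisoryPartitionSizeInBytes.

```python
from typing import List


def coalesce_partitions(sizes: List[int], target_size: int) -> List[List[int]]:
    """Greedily group adjacent partition indices.

    A group is closed as soon as adding the next partition would push it
    past target_size. A single partition larger than the target stays alone.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for index, size in enumerate(sizes):
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical post-shuffle partition sizes in MB, with a 64 MB target.
sizes_mb = [10, 5, 20, 70, 3, 2, 1]
groups = coalesce_partitions(sizes_mb, target_size=64)
print(groups)  # [[0, 1, 2], [3], [4, 5, 6]]
```

Seven shuffle partitions become three tasks here. Only adjacent partitions are merged, so each task still reads a contiguous range of shuffle output.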
# Enable AQE (on by default in Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
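The skew rule can also be sketched in plain Python. Spark's documentation describes a partition as skewed when it is larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor times the median partition size and also larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes. The function names, sample sizes, default values, and the ceiling-based split count below are assumptions for illustration, not Spark's code.

```python
import math
from statistics import median
from typing import List


def find_skewed(sizes: List[int], factor: float = 5.0, threshold: int = 256) -> List[int]:
    """Return indices of partitions exceeding both factor * median and threshold."""
    med = median(sizes)
    return [i for i, size in enumerate(sizes) if size > factor * med and size > threshold]


def split_count(size: int, target_size: int) -> int:
    """Number of roughly target-sized sub-partitions needed to cover `size`."""
    return math.ceil(size / target_size)


# Hypothetical partition sizes in MB; the last one holds a hot key.
sizes_mb = [40, 50, 45, 60, 1200]
skewed = find_skewed(sizes_mb)
print(skewed)                                    # [4]
print(split_count(sizes_mb[4], target_size=64))  # 19
```

Using the median rather than the mean keeps one huge partition from inflating the baseline it is compared against.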