datarekha
Data Engineering · Hard · Asked at Databricks, Amazon, Google, LinkedIn, Netflix

How does the Spark Catalyst optimizer work, and what does Adaptive Query Execution add?

The short answer

Catalyst is a rule-based and cost-based query optimizer. Before any data is read, it transforms a logical plan through four phases: analysis, logical optimization, physical planning, and code generation. Adaptive Query Execution (AQE), introduced in Spark 3.0, extends this by re-optimizing the plan at runtime. At stage boundaries it uses actual shuffle statistics instead of compile-time estimates.

How to think about it

Catalyst is a major reason DataFrame/SQL code often outperforms hand-written RDD code. Because the engine sees the whole query, it can apply optimizations a developer would rarely write by hand, such as pushing filters into the file scan or skipping unused columns.

The four Catalyst phases

1. Analysis: resolves column names and types against the catalog. A reference to a missing column fails fast with an AnalysisException at plan time, before any task reads data.

2. Logical optimization: applies batches of rule-based transformations to the logical plan, repeating each batch until the plan stops changing:

  • Predicate push-down: moves filters as early as possible, ideally into the data source scan.
  • Constant folding: evaluates 1 + 1 at plan time.
  • Column pruning: drops unreferenced columns before they are read.
  • Projection collapsing: merges multiple sequential select calls.

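3. Physical planning: converts the optimized logical plan into one or more physical plans and picks one. Statistics such as estimated table sizes mainly drive join strategy selection here. For example, Spark chooses a broadcast hash join when one side's estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default); otherwise it uses a sort-merge join or a shuffled hash join.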

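4. Whole-stage code generation: Spark fuses chains of physical operators into codegen stages and generates Java source code for each stage. Janino then compiles that source to JVM bytecode. Each stage runs as a single tight loop, eliminating the per-row virtual function calls and interpreter overhead of the iterator model.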
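The logical-optimization phase is essentially a loop that applies rewrite rules until the plan reaches a fixed point. The sketch below mimics that idea in plain Python with two rules: constant folding, and pushing a filter below a projection. The tuple-based plan representation and every function name here are hypothetical illustrations, not Catalyst's real classes. Catalyst itself expresses rules as Scala pattern matches over plan trees.

```python
# Hypothetical toy nodes (not Spark's API):
#   expressions: ("lit", value) | ("col", name) | ("add", l, r) | ("gt", l, r)
#   plans:       ("scan", table) | ("filter", predicate, child) | ("project", columns, child)

def fold_constants(expr):
    """Constant-folding rule: evaluate operators whose inputs are all literals."""
    if expr[0] in ("add", "gt"):
        left, right = fold_constants(expr[1]), fold_constants(expr[2])
        if left[0] == "lit" and right[0] == "lit":
            if expr[0] == "add":
                return ("lit", left[1] + right[1])
            return ("lit", left[1] > right[1])
        return (expr[0], left, right)
    return expr


def referenced_columns(expr):
    """Columns an expression reads."""
    if expr[0] == "col":
        return {expr[1]}
    if expr[0] in ("add", "gt"):
        return referenced_columns(expr[1]) | referenced_columns(expr[2])
    return set()


def optimize_once(plan):
    """One bottom-up pass applying both rules."""
    if plan[0] == "scan":
        return plan
    child = optimize_once(plan[2])
    if plan[0] == "filter":
        predicate = fold_constants(plan[1])
        # Push-down: a filter over a projection can move below it when the
        # predicate only uses columns the projection passes through.
        if child[0] == "project" and referenced_columns(predicate) <= set(child[1]):
            return ("project", child[1], ("filter", predicate, child[2]))
        return ("filter", predicate, child)
    return (plan[0], plan[1], child)  # project: nothing to rewrite in this toy


def optimize(plan, max_iterations=100):
    """Re-run the rules until the plan stops changing (a fixed point)."""
    for _ in range(max_iterations):
        new_plan = optimize_once(plan)
        if new_plan == plan:
            return new_plan
        plan = new_plan
    return plan


plan = ("filter",
        ("gt", ("col", "age"), ("add", ("lit", 29), ("lit", 1))),
        ("project", ["name", "age"], ("scan", "users")))
print(optimize(plan))
# ('project', ['name', 'age'], ('filter', ('gt', ('col', 'age'), ('lit', 30)), ('scan', 'users')))
```

The first pass folds `29 + 1` into `30` and moves the filter beneath the projection. The second pass changes nothing, so the loop stops.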

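```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Catalyst pushes the filter into the Parquet scan, reads only the 'age' column
# ('name' is pruned because count() never uses it), and fuses the filter,
# projection and partial count into one whole-stage codegen stage.
spark.read.parquet("users/") \
    .filter("age > 30") \
    .select("name") \
    .count()
```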
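You can trace the column pruning in the example above by hand. Walk the plan from the top and ask which columns each operator actually consumes. This toy walk uses a hypothetical (operator, argument, child) tuple representation and an assumed file schema of id, name, age and email. It only models the four operators in that query.

```python
def scan_columns(plan, needed=frozenset()):
    """Return the columns the scan must read, walking a toy plan top-down.

    `needed` holds the columns the parent operator consumes.
    """
    op, arg, child = plan
    if op == "count":
        # count() only counts rows, so it needs no column values at all.
        return scan_columns(child, frozenset())
    if op == "project":
        # A projection only has to produce the columns its parent uses.
        return scan_columns(child, needed & frozenset(arg))
    if op == "filter":
        # A filter needs its parent's columns plus the ones its predicate reads.
        return scan_columns(child, needed | frozenset(arg))
    if op == "scan":
        # Read only the needed columns that exist in the file schema.
        return needed & frozenset(arg)
    raise ValueError(f"unknown operator: {op}")


# Toy plan for .filter("age > 30").select("name").count();
# the file schema ["id", "name", "age", "email"] is an assumption.
query = ("count", None,
         ("project", ["name"],
          ("filter", ["age"],
           ("scan", ["id", "name", "age", "email"], None))))
print(sorted(scan_columns(query)))  # ['age']
```

Without the final `count()`, the parent would need `name`, and the scan would read both `name` and `age`.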

Adaptive Query Execution (Spark 3+)

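The core insight: compile-time cost estimates can be badly wrong, for example after a selective filter, on skewed join keys, or when table statistics are missing. AQE treats each shuffle or broadcast exchange as a query-stage boundary. It waits for the stages before that boundary to finish, then re-optimizes the rest of the plan using the sizes those stages actually produced.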
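Here is a simplified sketch of why runtime statistics matter for join selection. The same size-based rule picks a different strategy once it sees the real size of a filtered side instead of the planner's estimate. The 10 MB constant mirrors the default of spark.sql.autoBroadcastJoinThreshold. The function name and the byte counts are hypothetical. The real planner also weighs join type and hints, and under AQE it consults spark.sql.adaptive.autoBroadcastJoinThreshold.

```python
# Mirrors the default of spark.sql.autoBroadcastJoinThreshold (10 MB).
BROADCAST_THRESHOLD = 10 * 1024 * 1024


def choose_join(left_bytes: int, right_bytes: int,
                threshold: int = BROADCAST_THRESHOLD) -> str:
    """Hypothetical, simplified join rule: broadcast the smaller side if it fits."""
    if min(left_bytes, right_bytes) <= threshold:
        return "broadcast hash join"
    return "sort-merge join"


GB, MB = 1024 ** 3, 1024 ** 2

# Hypothetical sizes: at planning time the filtered side is estimated at 2 GB...
print(choose_join(500 * GB, 2 * GB))   # sort-merge join
# ...but the completed shuffle stage reports that only 4 MB survived the filter.
print(choose_join(500 * GB, 4 * MB))   # broadcast hash join
```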

Three concrete AQE features:

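| Feature | Config | What it does |
|---|---|---|
| Coalescing shuffle partitions | spark.sql.adaptive.coalescePartitions.enabled | Merges small adjacent post-shuffle partitions into fewer, larger ones, aiming at spark.sql.adaptive.advisoryPartitionSizeInBytes |
| Converting sort-merge joins to broadcast joins | spark.sql.adaptive.autoBroadcastJoinThreshold (Spark 3.2+; defaults to the value of spark.sql.autoBroadcastJoinThreshold) | Switches a sort-merge join to a broadcast hash join when one side's runtime size is below the threshold; spark.sql.adaptive.localShuffleReader.enabled then lets the converted join read shuffle files locally instead of over the network |
| Skew join handling | spark.sql.adaptive.skewJoin.enabled | Splits oversized partitions into sub-partitions and replicates the matching partition on the other side, so no single task processes the whole skewed partition |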
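Partition coalescing works like greedy packing. Walk the post-shuffle partitions in order and keep merging neighbours until the next one would push the group past a target size. This is a simplified sketch. The 64 MB target mirrors the default of spark.sql.adaptive.advisoryPartitionSizeInBytes, and Spark's real implementation adds rules this version omits, such as a minimum partition size.

```python
MB = 1024 * 1024


def coalesce_partitions(sizes, target_bytes=64 * MB):
    """Greedily merge adjacent post-shuffle partitions into groups of ~target_bytes.

    Returns a list of groups; each group is a list of original partition indices.
    """
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes):
        # Close the current group if this partition would push it past the target.
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


sizes = [8 * MB, 12 * MB, 30 * MB, 5 * MB, 70 * MB, 3 * MB, 40 * MB]
print(coalesce_partitions(sizes))  # [[0, 1, 2, 3], [4], [5, 6]]: 7 tasks become 3
```

Only neighbours are merged, so each coalesced partition still covers a contiguous range of reducer indices. Coalescing never splits a partition: the 70 MB one stays on its own.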
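
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Enable AQE and skew-join handling (both on by default in Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```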
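With the skewJoin flag enabled, AQE flags a partition as skewed when it is larger than both a multiple of the median partition size and an absolute threshold. It then splits that partition. The sketch below follows this rule. The defaults of 5 and 256 MB reflect spark.sql.adaptive.skewJoin.skewedPartitionFactor and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes in recent Spark documentation; check your version. Several simplifications are assumptions:

- Pieces are equal byte ranges, whereas Spark splits at map-output boundaries.
- Only the left side is checked for skew.
- The function names are hypothetical.

```python
import statistics

MB = 1024 * 1024


def find_skewed(sizes, factor=5.0, threshold_bytes=256 * MB):
    """Indices of partitions bigger than factor * median AND the absolute threshold."""
    median = statistics.median(sizes)
    return [i for i, size in enumerate(sizes)
            if size > factor * median and size > threshold_bytes]


def split_sizes(size, target_bytes=64 * MB):
    """Split one partition into near-equal pieces of at most target_bytes each."""
    pieces = max(1, -(-size // target_bytes))  # ceiling division
    base, extra = divmod(size, pieces)
    return [base + (1 if i < extra else 0) for i in range(pieces)]


def plan_join_tasks(left_sizes, right_sizes):
    """One (partition index, left bytes, right bytes) tuple per join task."""
    skewed = set(find_skewed(left_sizes))
    tasks = []
    for i, size in enumerate(left_sizes):
        pieces = split_sizes(size) if i in skewed else [size]
        for piece in pieces:
            # Each piece of a split partition is joined with the whole
            # matching right-side partition, which is read once per piece.
            tasks.append((i, piece, right_sizes[i]))
    return tasks


left = [40 * MB, 50 * MB, 45 * MB, 900 * MB, 55 * MB]
right = [10 * MB] * 5
print(find_skewed(left))                  # [3]
print(len(plan_join_tasks(left, right)))  # 19: four normal tasks + 15 pieces of partition 3
```

Requiring both conditions keeps small tables from being split just because one partition is a few times larger than its tiny neighbours.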
