What is a shuffle in Spark and why is it expensive?
A shuffle redistributes data across executors so that rows with the same key end up in the same partition. It involves writing intermediate data to disk, transferring it over the network, and reading it back, which typically makes it one of the most expensive operations in a Spark job in terms of latency and I/O.
How to think about it
A shuffle is triggered whenever Spark needs to co-locate data that currently lives on different partitions — most commonly during groupBy, join, distinct, and repartition.
What happens during a shuffle
- Map phase: each map task assigns every row to a target partition (by hashing its key under the default hash partitioning) and writes the result as shuffle files to the executor's local disk.
- Transfer phase: downstream tasks fetch the shuffle blocks they need over the network from the executors that wrote them.
- Reduce phase: each downstream task merges the fetched data, aggregating or sorting it as the operation requires, to produce its output partition.
# This groupBy requires a shuffle: all rows for a region must land in one partition
df.groupBy("region").agg({"revenue": "sum"})
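The three phases above can be imitated in plain Python to make the data movement visible. This is a minimal sketch, not Spark's implementation: the region data, the `partition_for` hash, and the partition count of 3 are all made up for illustration.

```python
from collections import defaultdict

NUM_REDUCE_PARTITIONS = 3  # hypothetical; Spark SQL reads spark.sql.shuffle.partitions


def partition_for(key, num_partitions):
    # Deterministic stand-in for a hash partitioner (illustrative only).
    return sum(key.encode("utf-8")) % num_partitions


def map_phase(rows, num_partitions):
    """Bucket one map task's rows by target reduce partition (the 'shuffle files')."""
    buckets = defaultdict(list)
    for key, value in rows:
        buckets[partition_for(key, num_partitions)].append((key, value))
    return buckets


def reduce_phase(fetched_rows):
    """Sum the values per key for one reduce partition."""
    totals = defaultdict(int)
    for key, value in fetched_rows:
        totals[key] += value
    return dict(totals)


# Two input partitions, standing in for two map tasks on different executors.
input_partitions = [
    [("emea", 10), ("apac", 5), ("emea", 7)],
    [("apac", 1), ("amer", 4), ("emea", 3)],
]

# Map phase: every task writes one bucket per reduce partition.
map_outputs = [map_phase(rows, NUM_REDUCE_PARTITIONS) for rows in input_partitions]

# Transfer phase: reduce task p fetches bucket p from every map output.
# Reduce phase: it then aggregates what it fetched.
result = {}
for p in range(NUM_REDUCE_PARTITIONS):
    fetched = [row for output in map_outputs for row in output.get(p, [])]
    result.update(reduce_phase(fetched))

print(result)
```

Because every row with a given key is routed to the same reduce partition, each key's total is computed in exactly one place, which is the guarantee a shuffle exists to provide.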
Why it is slow
- Disk I/O: map output is written to local disk and read back when it is fetched, which is far slower than operating on data already in memory.
- Network I/O: every executor may send data to every other executor (an all-to-all pattern).
- Sort: Spark's sort-based shuffle orders each map task's records by target partition before writing them, and additionally by key when the operation requires it.
- Serialization: every row is serialized for disk and network and deserialized on receipt.
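On a 100-node cluster holding 100 GB per node (10 TB in total), a shuffle that moves the whole dataset with no map-side reduction sends close to 10 TB over the network, because with evenly spread keys about 99% of each node's rows belong on another node.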
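The estimate can be checked with a few lines of arithmetic. This is a back-of-the-envelope sketch that assumes evenly distributed keys, one executor per node, no map-side aggregation, and no compression; real jobs will differ on all four.

```python
nodes = 100
gb_per_node = 100

total_gb = nodes * gb_per_node                 # 10,000 GB held across the cluster
# With uniform keys, only 1/nodes of each node's rows already sit on the right node.
network_gb = total_gb * (nodes - 1) / nodes    # 9,900 GB must cross the network

print(f"{network_gb / 1000:.1f} TB cross the network")  # prints "9.9 TB cross the network"
```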
Reducing shuffle cost
# Bad: groupByKey ships every value across the network and holds all values for a key in memory in one task
rdd.groupByKey().mapValues(sum)
# Better: reduceByKey partially aggregates on the map side first
rdd.reduceByKey(lambda a, b: a + b)
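The difference is easiest to see by counting the records that would cross the shuffle boundary. The following is a plain-Python sketch of the idea rather than Spark code; the two partitions and the helper names are hypothetical.

```python
# Two map-side partitions of (key, value) pairs; the data is made up for illustration.
partitions = [
    [("a", 1), ("b", 2), ("a", 3), ("a", 4)],
    [("b", 5), ("a", 6), ("b", 7)],
]


def shuffled_without_combine(parts):
    """groupByKey-style: every record is sent to the reduce side unchanged."""
    return [record for part in parts for record in part]


def shuffled_with_combine(parts, func):
    """reduceByKey-style: each partition pre-aggregates per key before sending."""
    out = []
    for part in parts:
        combined = {}
        for key, value in part:
            combined[key] = func(combined[key], value) if key in combined else value
        out.extend(combined.items())
    return out


def reduce_side(records, func):
    """Merge shuffled records into one value per key."""
    merged = {}
    for key, value in records:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


def add(a, b):
    return a + b


plain = shuffled_without_combine(partitions)
combined = shuffled_with_combine(partitions, add)

print(len(plain), len(combined))                             # records shuffled in each case
print(reduce_side(plain, add) == reduce_side(combined, add))  # same final answer
```

Here the combined path shuffles 4 records instead of 7 and yields the same sums. The saving grows with the number of repeated keys per partition, and it only applies when the function is associative and commutative, as addition is.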
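Other strategies: broadcast joins (which avoid shuffling the large table by copying the small one to every executor), bucketing tables on the join key (optionally sorted within each bucket) so that joins on that key can skip the shuffle, and tuning spark.sql.shuffle.partitions. Raising the partition count does not reduce the volume of data shuffled, but it spreads the reduce work across more, smaller tasks.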
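To show why a broadcast join needs no shuffle of the large side, here is a small plain-Python sketch with hypothetical data: the small table is copied to each partition as a lookup dictionary and every partition joins locally. In PySpark the equivalent hint is pyspark.sql.functions.broadcast(small_df), and Spark also broadcasts automatically when a table is below spark.sql.autoBroadcastJoinThreshold.

```python
# Small dimension table: cheap to copy to every task (hypothetical data).
region_names = {"emea": "Europe, Middle East, Africa", "apac": "Asia-Pacific"}

# Large fact table, already split into partitions that stay where they are.
sales_partitions = [
    [("emea", 10), ("apac", 5)],
    [("apac", 1), ("amer", 4)],
]


def broadcast_join(partition, lookup):
    """Inner-join one partition against the broadcast lookup table, locally."""
    return [(key, lookup[key], amount) for key, amount in partition if key in lookup]


# Each partition is joined independently; no fact rows move between partitions.
joined_partitions = [broadcast_join(part, region_names) for part in sales_partitions]
print(joined_partitions)
```

The trade-off is memory: the lookup table must fit comfortably on every executor, which is why this approach suits small dimension tables only.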
spark.sql.shuffle.partitions
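The default is 200, and it applies to DataFrame and SQL shuffles (RDD operations use spark.default.parallelism or an explicit partition count instead). For small datasets 200 is often too many (many tiny partitions, high task-scheduling overhead). For large datasets it can be too few (large partitions, spills to disk, and GC pressure). Adaptive Query Execution (AQE), available since Spark 3.0 and enabled by default since 3.2, coalesces small shuffle partitions at runtime, so the configured value acts as a starting count that AQE can reduce.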
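One common way to pick a starting value is to divide the expected shuffle size by a target partition size. The helper below is a hypothetical sketch of that rule of thumb; the 128 MiB target is an assumption to tune per workload, not a Spark default.

```python
import math

MIB = 1024 * 1024


def suggest_shuffle_partitions(shuffle_bytes, target_partition_bytes=128 * MIB, minimum=1):
    """Hypothetical helper: partitions needed so each holds about the target size."""
    return max(minimum, math.ceil(shuffle_bytes / target_partition_bytes))


small = suggest_shuffle_partitions(500 * MIB)           # 500 MiB of shuffle data
large = suggest_shuffle_partitions(1024 * 1024 * MIB)   # 1 TiB of shuffle data
print(small, large)

# With a live SparkSession named `spark` (assumed, not created here):
# spark.conf.set("spark.sql.shuffle.partitions", str(large))
# spark.conf.set("spark.sql.adaptive.enabled", "true")
# spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
```

With AQE enabled, erring on the high side is usually the safer choice, since small partitions can be merged at runtime while an undersized count is not raised in the same way.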