datarekha

Delta Lake — ACID on top of Parquet

The storage layer that turned data lakes into something you can actually trust. Time travel, schema enforcement, and the MERGE that finally made upsert easy.

10 min read · Intermediate · PySpark · Lesson 20 of 22

What you'll learn

  • How the _delta_log directory gives Parquet ACID semantics
  • Time travel, schema enforcement, and schema evolution — when to use each
  • The MERGE operation and the CDC/SCD-Type-2 pattern
  • VACUUM, OPTIMIZE, Z-ORDER — the maintenance trio

Before you start

Plain Parquet on S3 is a good storage format and a bad database. Two writers can clobber each other. A failed write leaves orphan files. A reader can pick up half a commit. You can’t “go back to yesterday.”

Delta Lake fixes all of this by adding a transaction log (an append-only directory of JSON commit files that records every change to the table) next to your Parquet files. The data is still Parquet — anyone with a Parquet reader can read the underlying files. But everything that touches the table goes through the log, which gives you ACID (Atomicity, Consistency, Isolation, Durability — database-style guarantees that each write either fully succeeds or leaves nothing behind), versioning, and the features built on top.

Common misconception: Delta Lake is not a new file format. It is Parquet files plus a transaction log. The Parquet files are readable by any Parquet tool; the log is what adds the database semantics.

If you’re on Databricks, Delta is the default table format. Knowing how the log works is what separates someone who uses Delta from someone who can debug it.

The mechanism — a log of commits

A Delta table is a directory:

/data/customers/
  _delta_log/
    00000000000000000000.json
    00000000000000000001.json
    00000000000000000002.json
    ...
  part-00000-abc.snappy.parquet
  part-00001-def.snappy.parquet
  part-00002-ghi.snappy.parquet
  ...

Each .json file in _delta_log/ is one commit. A commit lists which Parquet files were added (add) and which were removed (remove) in that transaction. The state of the table at version N is the result of replaying commits 0 through N in order: every file that was added and not later removed is part of the table.

That’s the whole core idea. Everything else — time travel, ACID, schema enforcement — falls out of this one principle: the table is defined by its log, not by which files exist on disk.

When you write to a Delta table, the writer:

  1. Reads the current version (highest numbered log file)
  2. Writes new Parquet files (not yet visible)
  3. Atomically appends N+1.json with the add/remove actions
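  4. On conflict (another writer already committed N+1), the writer checks whether that commit touched the same data. If not, it retries as N+2; if it did, the write fails with a concurrent-modification error instead of silently overwriting.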

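The atomicity comes from a put-if-absent guarantee on the log: writing the file for version N+1 succeeds only if no file with that name exists yet. The log file appearing IS the commit happening. If two writers race to write the same version file, only one wins and the loser retries. How the guarantee is provided depends on the storage. On S3 it can be a conditional PUT with If-None-Match (fail if the object already exists); older open-source setups on S3, which predate that feature, need an external coordinator such as a DynamoDB-backed log store for safe multi-cluster writes. ADLS and GCS offer their own conditional-create or atomic-rename primitives.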
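To make the commit protocol concrete, here is a local-filesystem sketch, assuming a hard link (os.link, which refuses to overwrite an existing name) can stand in for an object store’s conditional write. latest_version, try_commit and commit are hypothetical helpers, and conflict detection is deliberately left out: every concurrent commit is treated as a non-conflicting blind append.

```python
import json
import os
import tempfile


def latest_version(log_dir: str) -> int:
    """Highest committed version in the log, or -1 for an empty table."""
    versions = [int(name.split(".")[0])
                for name in os.listdir(log_dir) if name.endswith(".json")]
    return max(versions, default=-1)


def try_commit(log_dir: str, version: int, actions: list) -> bool:
    """Put-if-absent: publish <version>.json only if nobody else has."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    # Write the whole commit to a private temp file first, so the log file
    # never appears half-written...
    fd, tmp = tempfile.mkstemp(dir=log_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        for action in actions:
            f.write(json.dumps(action) + "\n")
    try:
        # ...then publish it with a hard link, which fails if the name exists.
        os.link(tmp, path)
        return True
    except FileExistsError:
        return False  # another writer won the race for this version
    finally:
        os.remove(tmp)


def commit(log_dir: str, actions: list, max_attempts: int = 5) -> int:
    """Optimistic commit: read latest version N, try N+1, retry on a lost race."""
    for _ in range(max_attempts):
        version = latest_version(log_dir) + 1
        if try_commit(log_dir, version, actions):
            return version
        # Real Delta would now check whether the winning commit conflicts with
        # ours and abort if it does; this toy treats every write as a blind append.
    raise RuntimeError("gave up after too many concurrent commits")
```

Writing the temp file first and linking it into place means the commit file never appears half-written, which is the same property an atomic conditional PUT gives you on an object store.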

Reading and writing — same as Parquet, mostly

The PySpark API barely changes:

# Write
(df.write
   .format("delta")
   .mode("overwrite")
   .save("/data/customers"))

# Or as a Unity Catalog (UC) managed table
df.write.format("delta").saveAsTable("main.sales.customers")

# Read
df = spark.read.format("delta").load("/data/customers")
df = spark.read.table("main.sales.customers")

Both forms work: the path form spark.read.format("delta").load(...) and the table form spark.read.table(...). On Databricks with Unity Catalog, prefer the table form; it gives you permissions, lineage, and discovery.

Time travel

Because the log is append-only, every prior version of the table can be reconstructed, as long as its log entries and data files haven’t been cleaned up yet (see VACUUM below):

# By version number
df_old = (spark.read
    .format("delta")
    .option("versionAsOf", 5)
    .load("/data/customers"))

# Or by timestamp
df_yesterday = (spark.read
    .format("delta")
    .option("timestampAsOf", "2026-05-27 09:00:00")
    .load("/data/customers"))

# SQL equivalent
spark.sql("SELECT * FROM main.sales.customers VERSION AS OF 5")
spark.sql("SELECT * FROM main.sales.customers TIMESTAMP AS OF '2026-05-27'")

Time travel is what makes Delta a real database, not a fancy folder. You can roll back a bad write (RESTORE TABLE main.sales.customers TO VERSION AS OF 5), reproduce a model’s training data exactly, and diff yesterday’s table against today’s.
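A timestamp is just another way to name a version: Delta picks the latest commit made at or before the timestamp you ask for. The sketch below models that lookup, assuming the commit timestamps are already available as a list sorted by version; version_as_of_timestamp is a hypothetical helper, and real Delta reads commit times from the log and has its own rules for edge cases such as timestamps after the last commit.

```python
import bisect
from datetime import datetime


def version_as_of_timestamp(commit_times: list[datetime], ts: datetime) -> int:
    """Latest version whose commit time is at or before ts.

    commit_times[v] is the commit timestamp of version v; versions commit in
    order, so the list is sorted ascending.
    """
    # bisect_right counts the commits made at or before ts; the last one wins.
    idx = bisect.bisect_right(commit_times, ts) - 1
    if idx < 0:
        raise ValueError(f"{ts} is before the table's first commit")
    return idx


times = [datetime(2026, 5, 26, 8, 0),   # version 0
         datetime(2026, 5, 27, 8, 30),  # version 1
         datetime(2026, 5, 27, 10, 0)]  # version 2
print(version_as_of_timestamp(times, datetime(2026, 5, 27, 9, 0)))  # → 1
```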

Schema enforcement vs schema evolution

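By default, Delta enforces the schema. A write that brings columns the table doesn’t have, or whose types don’t match, fails loudly at write time instead of silently corrupting what you read later (columns the write leaves out are simply filled with NULL):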

# Table has columns: customer_id (long), name (string), age (int)
bad = spark.createDataFrame(
    [(1, "Aarav", "thirty")],   # age is string, not int!
    ["customer_id", "name", "age"]
)
bad.write.format("delta").mode("append").saveAsTable("main.sales.customers")
# Fails with an AnalysisException reporting a schema mismatch on 'age'
# (the exact message varies by Delta/Spark version)

This is the feature that prevents entire categories of “we don’t know how this column got NULLs” incidents.

When you actually do want to add a column, opt in explicitly with mergeSchema:

# Adding a new column 'country' — opt in to schema evolution
(new_df.write
   .format("delta")
   .mode("append")
   .option("mergeSchema", "true")
   .saveAsTable("main.sales.customers"))

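Renaming or dropping a column goes through ALTER TABLE, and the table needs column mapping enabled first. With column mapping, a drop is a metadata-only change: the column’s data stays in the existing Parquet files but disappears from the table schema, so new queries no longer see it. Changing a column’s type is different: it generally means rewriting the table, for example an overwrite with the overwriteSchema option set to true.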
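To see enforcement and evolution side by side, here is a toy validator, assuming schemas are plain {column: type} dicts. validate_append is a hypothetical helper that encodes the append rules in simplified form: a type mismatch is rejected, a new column is rejected unless merge_schema is on, and a column the write leaves out is allowed (Delta fills it with NULL).

```python
def validate_append(table_schema: dict[str, str],
                    write_schema: dict[str, str],
                    merge_schema: bool = False) -> dict[str, str]:
    """Return the table schema after the append, or raise on a mismatch.

    This toy rejects every type change, with or without merge_schema.
    """
    new_schema = dict(table_schema)
    for col, typ in write_schema.items():
        if col in table_schema:
            if table_schema[col] != typ:
                raise TypeError(
                    f"column {col!r}: table has {table_schema[col]}, write has {typ}")
        elif merge_schema:
            new_schema[col] = typ  # schema evolution: add the new column
        else:
            raise ValueError(f"column {col!r} is not in the table schema")
    return new_schema


table = {"customer_id": "long", "name": "string", "age": "int"}
print(validate_append(table, {"customer_id": "long", "country": "string"},
                      merge_schema=True))
```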

MERGE — the killer feature

The single most important Delta operation. MERGE lets you do an atomic upsert: insert new rows, update existing ones, optionally delete — all in one transaction. This is the pattern for CDC streams, slowly-changing dimensions, and any “I have a batch of changes, apply them to the current snapshot” job.

from delta.tables import DeltaTable

# Existing target table
target = DeltaTable.forPath(spark, "/data/customers")

# Incoming changes (from Kafka, a daily extract, whatever)
updates = spark.read.parquet("/staging/customer_updates/")

(target.alias("t")
   .merge(updates.alias("s"), "t.customer_id = s.customer_id")
   .whenMatchedUpdateAll()
   .whenNotMatchedInsertAll()
   .execute())

That five-line block replaces what used to be a multi-stage pipeline of “read both, anti-join to find new, join to find changed, write to staging, swap tables.” Atomic. One commit in the log.
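A toy model of what that MERGE computes, assuming the target fits in a dict keyed by customer_id; merge_upsert is a hypothetical helper. One real rule is worth baking in: Delta raises an error when more than one source row matches the same target row, so a change batch with duplicate keys should be deduplicated before the MERGE.

```python
from collections import Counter


def merge_upsert(target: dict, source: list, key: str) -> dict:
    """Toy whenMatchedUpdateAll + whenNotMatchedInsertAll.

    target maps key -> row dict; source is a list of row dicts. A new dict is
    returned and the input is left alone, the way a MERGE commit produces a
    new snapshot while the old version stays readable.
    """
    counts = Counter(row[key] for row in source)
    dupes = sorted(k for k, n in counts.items() if n > 1)
    if dupes:
        # Stricter than Delta, which errors when several source rows match
        # the same target row; either way, deduplicate the batch first.
        raise ValueError(f"duplicate keys in source batch: {dupes}")
    result = dict(target)
    for row in source:
        # Matched: replace every column (UpdateAll). Not matched: insert (InsertAll).
        result[row[key]] = dict(row)
    return result
```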

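You can be more precise about what to update, insert, or delete. When several WHEN MATCHED clauses are given, each matched row takes the first clause (in the order declared) whose condition is true, so the delete has to come before the update:

(target.alias("t")
   .merge(updates.alias("s"), "t.customer_id = s.customer_id")
   # Checked first: a delete event removes the row even if it is also newer
   .whenMatchedDelete(condition="s.op = 'DELETE'")
   .whenMatchedUpdate(
       condition="s.updated_at > t.updated_at",   # only newer rows
       set={
           "name":       "s.name",
           "email":      "s.email",
           "updated_at": "s.updated_at",
       }
   )
   # Don't insert a row for a delete event on a key the table never had
   .whenNotMatchedInsert(
       condition="s.op != 'DELETE'",
       values={
           "customer_id":  "s.customer_id",
           "name":         "s.name",
           "email":        "s.email",
           "updated_at":   "s.updated_at",
       }
   )
   .execute())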
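Clause order matters once there are several WHEN MATCHED clauses: Delta evaluates them in the order you declare them, and each matched row takes the first one whose condition is true. The toy below (a hypothetical apply_changes helper, with integer updated_at values for brevity) puts the delete first, so a delete event that also carries a newer timestamp is deleted rather than updated. It also skips inserting delete events for keys the table never had.

```python
def apply_changes(target: dict, changes: list, key: str = "customer_id") -> dict:
    """Toy CDC MERGE with ordered WHEN MATCHED clauses.

    Each change row carries an 'op' column. Matched rows take the first
    clause whose condition holds:
      1. delete   when op == 'DELETE'
      2. update   when the change is newer than the stored row
    Unmatched rows are inserted unless they are deletes. Keys in `changes`
    are assumed unique, and matching is against the original target only.
    """
    result = dict(target)
    for change in changes:
        k = change[key]
        row = {col: val for col, val in change.items() if col != "op"}
        if k in target:
            if change["op"] == "DELETE":
                del result[k]
            elif change["updated_at"] > target[k]["updated_at"]:
                result[k] = row
            # No clause fired: the target row is left unchanged.
        elif change["op"] != "DELETE":
            result[k] = row
    return result
```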

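For SCD Type 2 (preserve history as separate rows with effective date ranges), each change has to do two things: close out the current row by setting valid_to = now(), and insert the new version with valid_from = now(). The canonical recipe does both in a single MERGE over a staged source. Every changed row appears once keyed on its business key (it matches the current row, which gets closed out) and once more with a NULL merge key (it never matches, so it is inserted as the new version). One MERGE means one atomic commit; two separate MERGEs would leave a window where readers see the old row closed but the new one not yet inserted. The Databricks docs have the full template; bookmark it.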
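The close-out-and-insert logic can be modeled in plain Python. This sketch follows the staged-source approach: each changed row is staged twice, once keyed on customer_id (it matches and closes out the current row) and once with a None key (it never matches, so it is inserted as the new version), which lets one pass do both steps. scd2_merge and the column names (address, effective) are hypothetical, and the sketch assumes at most one update per customer.

```python
def scd2_merge(dim: list, updates: list) -> list:
    """Toy SCD Type 2 MERGE over a staged source.

    dim rows:     {"customer_id", "address", "valid_from", "valid_to"},
                  with valid_to = None for the current version.
    updates rows: {"customer_id", "address", "effective"}, one per customer.
    Returns a new list; the input rows are not modified.
    """
    current = {r["customer_id"]: r for r in dim if r["valid_to"] is None}

    # Stage the source: every update keyed on customer_id, plus a second copy
    # with merge_key=None for existing customers whose address changed.
    staged = [(u["customer_id"], u) for u in updates]
    staged += [(None, u) for u in updates
               if u["customer_id"] in current
               and current[u["customer_id"]]["address"] != u["address"]]

    result = [dict(r) for r in dim]
    known_keys = {r["customer_id"] for r in result}
    current_rows = {r["customer_id"]: r for r in result if r["valid_to"] is None}

    for merge_key, u in staged:
        if merge_key in known_keys:
            # WHEN MATCHED AND address changed: close out the current version.
            row = current_rows.get(merge_key)
            if row is not None and row["address"] != u["address"]:
                row["valid_to"] = u["effective"]
        else:
            # WHEN NOT MATCHED (brand-new customer, or the None-keyed copy):
            # insert the new current version.
            result.append({"customer_id": u["customer_id"], "address": u["address"],
                           "valid_from": u["effective"], "valid_to": None})
    return result
```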

OPTIMIZE and Z-ORDER

Delta accumulates small files over time. Each write produces new Parquet files; each MERGE often rewrites a few. OPTIMIZE compacts small files into target-sized ones (default 1GB):

OPTIMIZE main.sales.customers;

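OPTIMIZE rewrites the underlying Parquet files and records the swap in the Delta log as a single commit: remove actions for the small files, add actions for the compacted ones. Readers see the same rows before and after; only the file layout changes.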
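A minimal sketch of the compaction plan, assuming a simple greedy bin-packing (smallest files first) and hypothetical output file names; Delta’s real OPTIMIZE has its own grouping rules (for example, it works within partitions), so treat this as the shape of the commit rather than the algorithm. plan_optimize is a hypothetical helper, and dataChange is the flag Delta uses on add/remove actions to mark a rewrite that doesn’t change the table’s data.

```python
def plan_optimize(files: dict, target_size: int = 1024 ** 3,
                  small_threshold: int = 1024 ** 3) -> list:
    """Greedy bin-packing sketch; returns the actions of one OPTIMIZE commit.

    files maps path -> size in bytes. Files smaller than small_threshold are
    packed, smallest first, into groups of at most target_size bytes (1 GB by
    default); each group of two or more becomes one new file.
    """
    small = sorted((p for p, size in files.items() if size < small_threshold),
                   key=lambda p: files[p])
    groups, current, current_size = [], [], 0
    for path in small:
        if current and current_size + files[path] > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(path)
        current_size += files[path]
    if current:
        groups.append(current)

    actions = []
    # A group of one file gains nothing from a rewrite, so it is skipped.
    for i, group in enumerate(g for g in groups if len(g) > 1):
        for path in group:
            actions.append({"remove": {"path": path, "dataChange": False}})
        actions.append({"add": {"path": f"compacted-{i:05d}.parquet",  # hypothetical name
                                "size": sum(files[p] for p in group),
                                "dataChange": False}})
    return actions
```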

Z-ORDER is OPTIMIZE plus clustering: it co-locates rows with similar values in the chosen columns, so the per-file min/max statistics Delta keeps in the log are tighter and data skipping can skip more files for a given filter:

OPTIMIZE main.sales.customers ZORDER BY (country, signup_date);

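Pick Z-ORDER columns from the predicates your queries filter on most, ideally high-cardinality ones (low-cardinality columns are usually better served by partitioning). Keep the list short, one to four columns: each additional column weakens the clustering on the others, so the benefit drops quickly.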
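A toy illustration of why Z-ordering helps: the sketch interleaves the bits of two integer columns into a Morton (Z-order) code, sorts by it, cuts the rows into equal-sized “files”, and counts how many files a min/max check cannot skip. It assumes small non-negative integer columns; x and y are hypothetical stand-ins for columns like country and signup_date. Delta’s real ZORDER handles arbitrary types and skewed values, so treat this as the idea, not the implementation.

```python
from itertools import product


def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Morton (Z-order) code for non-negative ints: x bits at even
    positions, y bits at odd positions."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def files_to_read(rows: list, sort_key, rows_per_file: int, column: str, value: int) -> int:
    """Lay rows out in sort_key order, cut them into equal files, and count
    the files a min/max check cannot skip for `column == value`."""
    ordered = sorted(rows, key=sort_key)
    files = [ordered[i:i + rows_per_file] for i in range(0, len(ordered), rows_per_file)]
    return sum(1 for f in files
               if min(r[column] for r in f) <= value <= max(r[column] for r in f))


def by_x(r):  # plain sort: x first, then y
    return (r["x"], r["y"])


def by_z(r):  # Z-order curve over both columns
    return interleave_bits(r["x"], r["y"])


# An 8x8 grid of (x, y) values, laid out 8 rows per file.
rows = [{"x": x, "y": y} for x, y in product(range(8), repeat=2)]
```

On this grid, sorting by x alone gives perfect skipping for an x filter (1 file of 8) and none for a y filter (8 of 8). The Z-order layout reads 4 files for an x filter and 2 for a y filter, so both columns get some skipping.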

VACUUM — the cleanup

Time travel keeps old files around until you explicitly remove them. VACUUM deletes files no longer referenced by the current table state, older than a retention threshold:

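-- Default retention is 7 days (168 hours)
VACUUM main.sales.customers;

-- Or keep history longer, e.g. 30 days. Going shorter than the table's
-- configured retention is rejected by a safety check unless
-- spark.databricks.delta.retentionDurationCheck.enabled is set to false.
VACUUM main.sales.customers RETAIN 720 HOURS;

-- Preview which files would be deleted, without deleting anything
VACUUM main.sales.customers DRY RUN;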

After VACUUM, time travel to versions older than the retention window will fail — the files are gone. The default 7 days is a balance between “able to undo a bad week” and storage cost.
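A toy model of the retention rule, assuming the removal time of each file (its tombstone) is known from the log. vacuum_candidates and can_time_travel are hypothetical helpers. Real VACUUM also deletes files in the table directory that were never committed (for example, leftovers from failed writes), which this sketch ignores; it does mimic the default refusal to use a retention shorter than 7 days.

```python
from datetime import datetime, timedelta


def vacuum_candidates(tombstones: dict, live: set, now: datetime,
                      retain_hours: int = 168) -> set:
    """Data files a VACUUM would delete.

    tombstones maps path -> when a commit removed it from the table. A file
    is deleted only if it is no longer live AND was removed longer ago than
    the retention window.
    """
    if retain_hours < 168:
        # Mirrors Delta's default safety check (the real check compares against
        # the table's configured retention, 7 days unless changed).
        raise ValueError("retention below 7 days is blocked by default")
    cutoff = now - timedelta(hours=retain_hours)
    return {path for path, removed_at in tombstones.items()
            if path not in live and removed_at < cutoff}


def can_time_travel(version_files: set, deleted: set) -> bool:
    """A version stays readable only if none of its data files are gone."""
    return not (version_files & deleted)
```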

Simulating the Delta log

You don’t need a real Delta engine to understand the model. The shape of _delta_log is small enough to fit in 30 lines:
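A minimal sketch, assuming a simplified log where each commit file holds only add and remove actions carrying nothing but a path (real actions also carry sizes, partition values, and statistics, and commits include metaData, protocol, and commitInfo entries). write_commit and snapshot are hypothetical helpers; the replay logic is the part that mirrors Delta.

```python
import json
import os
import tempfile
from typing import Optional


def write_commit(log_dir: str, version: int, actions: list) -> None:
    """Write one commit file: newline-delimited JSON, one action per line."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    with open(path, "x") as f:  # "x" = create only; fails if the version exists
        for action in actions:
            f.write(json.dumps(action) + "\n")


def snapshot(log_dir: str, version: Optional[int] = None) -> set:
    """Replay commits 0..version (default: all) and return the live files."""
    live = set()
    commits = sorted(n for n in os.listdir(log_dir) if n.endswith(".json"))
    for name in commits:  # zero-padded names sort in version order
        if version is not None and int(name.split(".")[0]) > version:
            break
        with open(os.path.join(log_dir, name)) as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    live.add(action["add"]["path"])
                elif "remove" in action:
                    live.discard(action["remove"]["path"])
    return live


# Three commits: write two files, append one, then replace part-00000.
log_dir = tempfile.mkdtemp()
write_commit(log_dir, 0, [{"add": {"path": "part-00000.parquet"}},
                          {"add": {"path": "part-00001.parquet"}}])
write_commit(log_dir, 1, [{"add": {"path": "part-00002.parquet"}}])
write_commit(log_dir, 2, [{"remove": {"path": "part-00000.parquet"}},
                          {"add": {"path": "part-00003.parquet"}}])

print(sorted(snapshot(log_dir)))     # ['part-00001.parquet', 'part-00002.parquet', 'part-00003.parquet']
print(sorted(snapshot(log_dir, 0)))  # ['part-00000.parquet', 'part-00001.parquet']
```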

That’s the core trick. Real Delta has thousands of details on top — checkpoints every 10 commits, action types for protocol versioning, statistics for data skipping — but the snapshot-from-log model is the heart of it.

Quick check

Q1. What does the `_delta_log/` directory contain?
Q2. Why is MERGE such a foundational Delta operation?
Q3. You overwrite a table by accident with bad data. The table is on Delta with default 7-day retention. What's your fastest recovery?

Next

Delta gives you reliable storage. Now you need a way to schedule the PySpark code that writes to it — on a cadence, with retries, with dependencies. That’s the Databricks Jobs / Workflows lesson.

Practice this in an interview

What are the differences between a data warehouse, a data lake, and a data lakehouse?

A data warehouse stores structured, schema-on-write data optimized for SQL analytics but is expensive for raw or unstructured data. A data lake stores any format cheaply on object storage but lacks ACID transactions and query performance. A lakehouse layers open table formats (Delta Lake, Iceberg, Hudi) on object storage to deliver warehouse-grade performance and ACID semantics at data lake costs — it is the dominant architecture in 2026.

What do the ACID properties mean, and how does each one protect your data?

Atomicity ensures a transaction either commits fully or rolls back entirely — no partial updates. Consistency ensures every committed transaction leaves the database in a valid state that satisfies all defined constraints. Isolation ensures concurrent transactions do not see each other's intermediate state. Durability ensures a committed transaction survives crashes because its changes are flushed to non-volatile storage.

How does caching and persist work in Spark, and when should you use each storage level?

cache() stores a DataFrame in executor memory using the default MEMORY_AND_DISK storage level. persist() lets you choose the storage level — memory-only, disk-only, serialized, or replicated. Use caching when a DataFrame is reused multiple times in the same application; without it, Spark recomputes the entire lineage from scratch on each action.

What is the difference between an RDD, a DataFrame, and a Dataset in Spark?

RDD is the low-level, type-safe distributed collection with no schema knowledge. DataFrame adds a named-column schema on top, enabling the Catalyst optimizer and codegen — but loses compile-time type safety. Dataset merges both worlds: it carries a schema and passes through Catalyst while remaining statically typed in Scala/Java.
