What does idempotency mean for a data pipeline, and how do you make a pipeline idempotent?
An idempotent pipeline produces the same output no matter how many times it runs for the same logical window: rerunning it on an already-processed date partition yields identical results rather than duplicated rows. Achieving idempotency typically means replacing plain INSERT with INSERT OVERWRITE, MERGE, or a transactional delete-then-insert of the target partition, and keying every record with a deterministic ID.
How to think about it
Idempotency is one of the most important reliability properties for batch pipelines, because reruns are routine: Airflow retries tasks on failure, engineers rerun jobs after bugs are fixed, and backfills replay months of history. If a pipeline is not idempotent, every one of those reruns can become a data quality incident.
What breaks idempotency
A plain INSERT INTO appends rows, so running the same job twice for the same date doubles the data. This is one of the most common data quality bugs in analytics engineering.
-- BAD: doubles rows on retry
INSERT INTO orders_daily
SELECT DATE(created_at), SUM(amount) FROM orders
WHERE DATE(created_at) = '2026-06-05'
GROUP BY DATE(created_at);
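To see the failure concretely, here is a minimal sketch that runs the same append-style job twice against an in-memory SQLite database. SQLite stands in for the warehouse only so the example is runnable locally; the function name `run_append_job` and the sample rows are assumptions made for illustration.

```python
import sqlite3


def run_append_job(conn: sqlite3.Connection, ds: str) -> None:
    """Naive daily job: appends one aggregate row for `ds` with a plain INSERT."""
    conn.execute(
        """
        INSERT INTO orders_daily (order_date, revenue)
        SELECT DATE(created_at), SUM(amount)
        FROM orders
        WHERE DATE(created_at) = ?
        GROUP BY DATE(created_at)
        """,
        (ds,),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (created_at TEXT, amount REAL)")
conn.execute("CREATE TABLE orders_daily (order_date TEXT, revenue REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("2026-06-05 09:00:00", 10.0), ("2026-06-05 17:30:00", 15.0)],
)

run_append_job(conn, "2026-06-05")
run_append_job(conn, "2026-06-05")  # simulated retry of the same logical date

# Row count and total revenue recorded for the date after the retry.
rows_after_retry = conn.execute(
    "SELECT COUNT(*), SUM(revenue) FROM orders_daily WHERE order_date = ?",
    ("2026-06-05",),
).fetchone()
print(rows_after_retry)
```

The target table ends up with two rows for the same date, and any downstream SUM over it reports twice the real revenue.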
Pattern 1 — INSERT OVERWRITE a partition
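Many warehouses and lake engines support replacing a partition in a single statement. The old contents of the partition are dropped and the new rows written as one operation, so a rerun replaces the data instead of appending to it. How atomic that replacement is depends on the engine and table format.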
-- Spark / Hive-style
INSERT OVERWRITE TABLE orders_daily
PARTITION (order_date = '2026-06-05')
SELECT SUM(amount) AS revenue FROM orders
WHERE DATE(created_at) = '2026-06-05';
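Snowflake equivalent: Snowflake's INSERT OVERWRITE truncates the whole table rather than a single partition, so instead wrap DELETE FROM orders_daily WHERE order_date = '2026-06-05' followed by the INSERT in one transaction.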
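The delete-then-insert variant can be sketched in Python as follows. This is an illustration under assumptions, not a Snowflake-specific implementation: it uses the standard-library `sqlite3` module so it runs anywhere, and `overwrite_partition` is a hypothetical helper name. The point it shows is that both statements share one transaction, so readers never observe the partition empty and a failure midway rolls back to the previous contents.

```python
import sqlite3


def overwrite_partition(conn: sqlite3.Connection, ds: str) -> None:
    """Replace the rows for `ds` atomically: delete, then insert, in one transaction."""
    # `with conn` commits if the block succeeds and rolls back if it raises.
    with conn:
        conn.execute("DELETE FROM orders_daily WHERE order_date = ?", (ds,))
        conn.execute(
            """
            INSERT INTO orders_daily (order_date, revenue)
            SELECT DATE(created_at), SUM(amount)
            FROM orders
            WHERE DATE(created_at) = ?
            GROUP BY DATE(created_at)
            """,
            (ds,),
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (created_at TEXT, amount REAL)")
conn.execute("CREATE TABLE orders_daily (order_date TEXT, revenue REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("2026-06-05 09:00:00", 10.0), ("2026-06-05 17:30:00", 15.0)],
)
conn.commit()

overwrite_partition(conn, "2026-06-05")
overwrite_partition(conn, "2026-06-05")  # simulated retry

# Row count and revenue for the date after the retry.
partition_state = conn.execute(
    "SELECT COUNT(*), SUM(revenue) FROM orders_daily WHERE order_date = ?",
    ("2026-06-05",),
).fetchone()
print(partition_state)
```

However many times the job reruns, the date holds exactly one row with the same revenue.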
Pattern 2 — MERGE (upsert)
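Match on a natural or synthetic unique key: MERGE updates rows whose key already exists in the target and inserts the rest. The source should contain at most one row per key; with duplicate source keys the statement either fails or gives a nondeterministic result, depending on the engine.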
MERGE INTO orders_daily AS target
USING staging_orders_daily AS src
ON target.order_date = src.order_date
WHEN MATCHED THEN UPDATE SET revenue = src.revenue
WHEN NOT MATCHED THEN INSERT (order_date, revenue) VALUES (src.order_date, src.revenue);
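The same upsert behaviour can be tried locally with a small sketch. SQLite has no MERGE statement, so this example swaps in its `INSERT ... ON CONFLICT DO UPDATE` clause (available in SQLite 3.24 and later) as a stand-in; `upsert_daily` and the sample values are assumptions made for illustration.

```python
import sqlite3


def upsert_daily(conn: sqlite3.Connection, staged_rows: list) -> None:
    """Upsert (order_date, revenue) pairs keyed on order_date."""
    with conn:  # one transaction per batch
        conn.executemany(
            """
            INSERT INTO orders_daily (order_date, revenue)
            VALUES (?, ?)
            ON CONFLICT(order_date) DO UPDATE SET revenue = excluded.revenue
            """,
            staged_rows,
        )


conn = sqlite3.connect(":memory:")
# The unique key is what lets the database detect "already written" rows.
conn.execute("CREATE TABLE orders_daily (order_date TEXT PRIMARY KEY, revenue REAL)")

upsert_daily(conn, [("2026-06-05", 25.0)])
upsert_daily(conn, [("2026-06-05", 25.0)])  # retry: no duplicate row
upsert_daily(conn, [("2026-06-05", 30.0), ("2026-06-06", 12.0)])  # rerun after a fix

final_rows = conn.execute(
    "SELECT order_date, revenue FROM orders_daily ORDER BY order_date"
).fetchall()
print(final_rows)
```

The retry leaves the table unchanged, while the corrected rerun overwrites the old value for the existing key and adds the new date.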
Pattern 3 — deterministic surrogate keys
If the source lacks a primary key, generate a deterministic ID from stable fields using MD5 or SHA-256. The same source row always produces the same key, so MERGE safely deduplicates on retry.
import hashlib
import json
def make_id(row: dict) -> str:
    # Hash only fields that never change between runs; sort_keys fixes the serialization order.
    stable = {k: row[k] for k in ("order_id", "event_type", "created_at")}
    return hashlib.md5(json.dumps(stable, sort_keys=True).encode()).hexdigest()
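A short usage sketch shows why the choice of fields matters. It uses a SHA-256 variant of the same idea; the helper name `make_row_id`, the `ingested_at` column, and the sample rows are all hypothetical. Volatile fields such as a load timestamp are left out of the hash, so a retry that re-reads the same source row still produces the same key.

```python
import hashlib
import json

# Fields assumed to be stable for a given source row (illustrative choice).
STABLE_FIELDS = ("order_id", "event_type", "created_at")


def make_row_id(row: dict) -> str:
    """Derive a deterministic key from the stable fields only."""
    stable = {k: row[k] for k in STABLE_FIELDS}
    # sort_keys gives a canonical serialization; default=str handles dates and similar types.
    payload = json.dumps(stable, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


first_load = {
    "order_id": 1001,
    "event_type": "paid",
    "created_at": "2026-06-05T09:00:00",
    "ingested_at": "2026-06-05T10:00:00",  # volatile: changes on every run
}
retry_load = {
    "ingested_at": "2026-06-06T02:15:00",  # different load time, different key order
    "created_at": "2026-06-05T09:00:00",
    "event_type": "paid",
    "order_id": 1001,
}
other_order = dict(first_load, order_id=1002)

id_first = make_row_id(first_load)
id_retry = make_row_id(retry_load)
id_other = make_row_id(other_order)
print(id_first == id_retry, id_first == id_other)
```

If `ingested_at` were included in the hash, every retry would mint a new key and MERGE would insert a duplicate instead of matching the existing row.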
Airflow: always pass ds as the partition key
Airflow’s {{ ds }} template variable renders the DAG run’s logical date as YYYY-MM-DD, not the wall-clock time, so every retry of a given run sees the same value. Using now() or CURRENT_DATE makes a DAG non-idempotent because a retry on a different wall-clock date writes to a different partition.
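One way to keep task code honest about this is to make the partition a pure function of the date string it is handed. The sketch below is plain Python with no Airflow import; `build_partition_sql` is a hypothetical helper, and the assumption is that the rendered {{ ds }} value is passed in as its `ds` argument by whatever operator runs it.

```python
from datetime import date


def build_partition_sql(ds: str) -> str:
    """Build the overwrite statement for one logical date (hypothetical helper)."""
    # Parse the value so a malformed date fails loudly instead of writing a bad
    # partition; re-serializing it also keeps the interpolated text a plain date.
    day = date.fromisoformat(ds).isoformat()
    return (
        "INSERT OVERWRITE TABLE orders_daily "
        f"PARTITION (order_date = '{day}') "
        "SELECT SUM(amount) AS revenue FROM orders "
        f"WHERE DATE(created_at) = '{day}'"
    )


# The first attempt and a retry a day later receive the same logical date,
# so they target the same partition regardless of the wall clock.
first_attempt = build_partition_sql("2026-06-05")
retry_next_day = build_partition_sql("2026-06-05")
print(first_attempt == retry_next_day)
```

Because nothing inside the function reads the clock, the statement depends only on its input, which is what makes backfills over historical dates safe.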