What “incremental and replayable” means for projections
A projection is a derived table (or set of tables) that is maintained from an event stream. “Incremental processing” means you update the projection using only the new events since the last successful run, rather than recomputing from scratch. “Replayable processing” means you can rebuild the same projection deterministically by re-reading events from an earlier point (often from the beginning), producing the same final state for a given cutoff.
These two properties are complementary: incremental processing keeps the system fast and cheap during normal operation, while replayability provides recovery from bugs, schema changes, or logic changes in the projection code. In PostgreSQL, the core design challenge is to make incremental updates safe under concurrency, observable (so you can monitor lag and correctness), and reversible (so you can replay without manual cleanup).
Core building blocks in PostgreSQL
1) A projection table with stable keys
Every projection needs a primary key that represents the entity or aggregation grain you are projecting (for example, account_id, customer_id, product_id, (tenant_id, day)). Incremental updates rely on being able to upsert into this stable key.
2) A checkpoint (cursor) per projection
Incremental processing requires a durable record of “how far” the projection has processed. This is typically a checkpoint table keyed by projection name (and optionally shard/partition). The checkpoint stores the last processed position (for example, an event table surrogate key, a stream offset, or a timestamp + tiebreaker). It also stores metadata such as last updated time, worker identity, and status.
3) A deterministic event selection query
Your incremental job must select “events after checkpoint” in a deterministic order. The ordering column(s) must be stable and monotonic for the stream you are reading. The job should process in batches to bound transaction size and lock duration.
4) A transactional update pattern
To avoid gaps or double-advancing the checkpoint, the projection update and the checkpoint advance should happen in the same database transaction. If the transaction commits, both the projection changes and the checkpoint move forward. If it rolls back, neither does.
Schema patterns: projection state and checkpoints
Checkpoint table
The checkpoint table is small but critical. It should support concurrent workers safely and allow you to reset for replay.
CREATE TABLE projection_checkpoint (
    projection_name text PRIMARY KEY,
    last_event_id bigint NOT NULL DEFAULT 0,
    updated_at timestamptz NOT NULL DEFAULT now(),
    status text NOT NULL DEFAULT 'idle',
    worker_id text,
    notes text
);

This example uses last_event_id as the cursor. If your event stream is partitioned or sharded, extend the primary key to include partition_id or shard_id.
Projection table example
Assume a projection that maintains account balances and last activity time. The projection table might look like:
CREATE TABLE account_balance_projection (
    account_id uuid PRIMARY KEY,
    balance_cents bigint NOT NULL,
    last_activity_at timestamptz NOT NULL,
    updated_at timestamptz NOT NULL DEFAULT now()
);

Incremental processing will upsert into this table as new events arrive.
Incremental maintenance: a step-by-step processing loop
The following pattern is a practical baseline for a projection worker that runs continuously or on a schedule. It processes events in batches, updates the projection, and advances the checkpoint atomically.
Step 1: Acquire the checkpoint row (and optionally a lease)
If you run multiple workers, you must ensure only one worker processes a given projection (or a given shard) at a time. A simple approach is to lock the checkpoint row using SELECT ... FOR UPDATE inside a transaction. Another approach is advisory locks. Row locking is straightforward and keeps the “ownership” visible in the table.
BEGIN;

SELECT last_event_id
FROM projection_checkpoint
WHERE projection_name = 'account_balance'
FOR UPDATE;

At this point, the worker has exclusive access to advance the cursor for this projection until it commits or rolls back.
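The advisory-lock alternative can be sketched as follows. The lock key derivation via hashtext() is an illustrative choice, not a requirement; any stable mapping from projection name to a bigint works. A transaction-scoped advisory lock is released automatically at commit or rollback, so ownership never leaks.

```sql
BEGIN;

-- Transaction-scoped advisory lock; hashtext() is used here only to
-- derive an integer lock key from the projection name.
SELECT pg_advisory_xact_lock(hashtext('projection:account_balance'));

-- Read the cursor; no FOR UPDATE needed, since the advisory lock
-- already serializes workers for this projection.
SELECT last_event_id
FROM projection_checkpoint
WHERE projection_name = 'account_balance';
```

If you want a worker to skip the projection when another worker holds it rather than block, use pg_try_advisory_xact_lock and check the boolean result.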
Step 2: Read the next batch of events
Read events strictly after the checkpoint, ordered by the cursor column. Use a batch size that keeps transactions short (for example 1k–10k events depending on payload and update cost).
WITH next_events AS (
    SELECT id, event_type, account_id, amount_cents, occurred_at
    FROM account_events
    WHERE id > (SELECT last_event_id FROM projection_checkpoint
                WHERE projection_name = 'account_balance')
    ORDER BY id
    LIMIT 5000
)
SELECT * FROM next_events;

In practice you will run this query inside the same transaction as the checkpoint lock. That ensures the cursor value you read is consistent with the lock you hold.
Step 3: Apply events to the projection using set-based SQL
Instead of looping row-by-row in application code, prefer set-based updates. Aggregate the batch by projection key and apply a single upsert per key. This reduces write amplification and improves performance.
Example: if your events include credits and debits, compute net change per account in the batch:
WITH next_events AS (
    SELECT id, account_id, amount_cents, occurred_at
    FROM account_events
    WHERE id > (SELECT last_event_id FROM projection_checkpoint
                WHERE projection_name = 'account_balance')
    ORDER BY id
    LIMIT 5000
), per_account AS (
    SELECT account_id,
           SUM(amount_cents) AS delta_cents,
           MAX(occurred_at) AS max_occurred_at,
           MAX(id) AS max_id
    FROM next_events
    GROUP BY account_id
)
INSERT INTO account_balance_projection(account_id, balance_cents, last_activity_at, updated_at)
SELECT account_id, delta_cents, max_occurred_at, now()
FROM per_account
ON CONFLICT (account_id) DO UPDATE
SET balance_cents = account_balance_projection.balance_cents + EXCLUDED.balance_cents,
    last_activity_at = GREATEST(account_balance_projection.last_activity_at, EXCLUDED.last_activity_at),
    updated_at = now();

This pattern assumes the projection stores a running balance and that the event amounts are additive. For non-additive projections (for example “current status”), you may need to compute the latest event per key in the batch and update accordingly.
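For the non-additive case, one way to take only the latest event per key within the batch is DISTINCT ON. This is a sketch: the account_status_projection table and the status column on account_events are assumptions for illustration.

```sql
WITH next_events AS (
    SELECT id, account_id, status, occurred_at
    FROM account_events
    WHERE id > (SELECT last_event_id FROM projection_checkpoint
                WHERE projection_name = 'account_status')
    ORDER BY id
    LIMIT 5000
), latest_per_account AS (
    -- DISTINCT ON keeps exactly one row per account:
    -- the one with the highest event id in this batch.
    SELECT DISTINCT ON (account_id) account_id, status, occurred_at
    FROM next_events
    ORDER BY account_id, id DESC
)
INSERT INTO account_status_projection(account_id, status, last_activity_at, updated_at)
SELECT account_id, status, occurred_at, now()
FROM latest_per_account
ON CONFLICT (account_id) DO UPDATE
SET status = EXCLUDED.status,
    last_activity_at = EXCLUDED.last_activity_at,
    updated_at = now();
```

Because later batches always overwrite earlier ones in stream order, replaying from any cursor converges to the same final status per account.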
Step 4: Advance the checkpoint to the batch maximum
After applying the batch, advance the checkpoint to the maximum cursor value processed. Do not advance beyond what you actually applied.
WITH next_events AS (
    SELECT id
    FROM account_events
    WHERE id > (SELECT last_event_id FROM projection_checkpoint
                WHERE projection_name = 'account_balance')
    ORDER BY id
    LIMIT 5000
), max_id AS (
    SELECT COALESCE(MAX(id), (SELECT last_event_id FROM projection_checkpoint
                              WHERE projection_name = 'account_balance')) AS new_last_id
    FROM next_events
)
UPDATE projection_checkpoint
SET last_event_id = (SELECT new_last_id FROM max_id),
    updated_at = now(),
    status = 'running'
WHERE projection_name = 'account_balance';

Note that this statement re-selects the batch. Under the default READ COMMITTED isolation level, events committed between steps 3 and 4 could make the two selections differ, which risks advancing the checkpoint past events you never applied. To avoid that, capture the batch maximum during step 3, run the transaction at REPEATABLE READ, or combine the apply and the checkpoint advance into a single statement using data-modifying CTEs.

Then commit:
COMMIT;

If the batch is empty, you can commit without changes and sleep briefly. You can also set status to idle when no work is found.
Handling partial failures and retries without losing replayability
Incremental workers fail: deployments happen, connections drop, and unexpected data appears. The goal is to make failure modes safe and easy to recover from.
Keep projection updates and checkpoint updates in one transaction
This is the main safety property. If the worker crashes after updating the projection but before advancing the checkpoint, the transaction will roll back and the projection will not contain partial results. If it crashes after commit, both projection and checkpoint are consistent.
Use bounded batches
Large batches increase the chance of long locks and deadlocks, and they make reprocessing after a failure more expensive. Smaller batches reduce blast radius and improve responsiveness.
Record errors with enough context to replay
When an event cannot be applied (for example due to a constraint violation), you need a policy: stop the projection, skip the event, or route it to a dead-letter table. If you skip, you must record the skipped event id and reason, otherwise replay will re-hit the same failure.
CREATE TABLE projection_dead_letter (
    projection_name text NOT NULL,
    event_id bigint NOT NULL,
    error text NOT NULL,
    recorded_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (projection_name, event_id)
);

A conservative approach is to stop processing and alert, because skipping changes the semantics of replay. If you do implement dead-lettering, treat it as part of the projection’s deterministic behavior: the same event should be dead-lettered again on replay unless the underlying issue is fixed and you intentionally reprocess it.
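A worker that dead-letters might record the failure like this; the event id and error text are illustrative. ON CONFLICT DO NOTHING makes the insert idempotent, so retries and replays do not fail on the duplicate record.

```sql
INSERT INTO projection_dead_letter(projection_name, event_id, error)
VALUES ('account_balance', 12345, 'constraint violation while applying event')
ON CONFLICT (projection_name, event_id) DO NOTHING;
```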
Replay strategies: full rebuild vs. bounded replay
Replayability is not only “rebuild from scratch.” In practice you will use different replay scopes depending on what changed.
Full rebuild (drop and rebuild projection)
Use when projection logic changed significantly or when you suspect corruption. The simplest method is:
- Stop the projection worker.
- Truncate or recreate the projection table(s).
- Reset the checkpoint to 0 (or stream start).
- Run the worker in “catch-up” mode until it reaches the head.
In SQL:
BEGIN;
TRUNCATE TABLE account_balance_projection;
UPDATE projection_checkpoint
SET last_event_id = 0,
    updated_at = now(),
    status = 'idle',
    notes = 'full rebuild'
WHERE projection_name = 'account_balance';
COMMIT;

Then restart the worker.
Bounded replay (reprocess from a known point)
Use when you fixed a bug that affected only a time window or a subset of events. The challenge is that projections are derived state; if you rewind the checkpoint, you must also rewind the projection state to match, otherwise you will double-apply effects.
There are three common approaches:
- Rebuild-from-scratch anyway: the simplest option, often acceptable if the projection is small or the event volume is manageable.
- Keep periodic snapshots of the projection: restore the latest snapshot before the rewind point, then replay from there.
- Maintain compensating logic: compute and apply inverse changes for the range you are rewinding (harder, error-prone, and projection-specific).
If you choose snapshots, store them as tables keyed by snapshot id and cutoff cursor. For example, nightly snapshots of account_balance_projection with a recorded last_event_id. To perform a bounded replay, restore the snapshot, set the checkpoint to the snapshot cursor, and replay forward.
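A minimal snapshot scheme could look like the following sketch; the snapshot table name and shape are assumptions. Because the checkpoint advances atomically with projection updates, a single transaction sees a consistent (projection, cursor) pair, so capturing both together is safe.

```sql
-- Nightly snapshot: copy the projection and record the cursor it matches.
-- account_balance_snapshot is a hypothetical table created for this purpose.
CREATE TABLE account_balance_snapshot AS
SELECT now() AS snapshot_at,
       (SELECT last_event_id FROM projection_checkpoint
        WHERE projection_name = 'account_balance') AS snapshot_last_event_id,
       p.*
FROM account_balance_projection p;

-- Bounded replay: restore the snapshot and rewind the checkpoint to match,
-- all in one transaction so projection state and cursor stay consistent.
BEGIN;
TRUNCATE TABLE account_balance_projection;
INSERT INTO account_balance_projection(account_id, balance_cents, last_activity_at, updated_at)
SELECT account_id, balance_cents, last_activity_at, updated_at
FROM account_balance_snapshot;
UPDATE projection_checkpoint
SET last_event_id = (SELECT MAX(snapshot_last_event_id) FROM account_balance_snapshot),
    updated_at = now(),
    notes = 'bounded replay from snapshot'
WHERE projection_name = 'account_balance';
COMMIT;
```

Stop the worker before restoring, then restart it; it will replay forward from the snapshot cursor.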
Versioning projection logic without breaking operations
Projection code evolves. A practical operational pattern is to treat each projection as versioned: account_balance_v1, account_balance_v2. You run v2 in parallel, backfill it by replaying events, validate it, then switch readers to v2.
Parallel run pattern
- Create new projection table(s) for v2.
- Create a new checkpoint row for v2 starting at 0.
- Run a worker for v2 to catch up.
- Compare v1 and v2 results for a sample or full dataset.
- Switch application reads to v2.
- Retire v1 later.
This avoids risky in-place rewinds and makes rollbacks easier: if v2 is wrong, keep reading v1.
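Setting up the parallel run is mostly bookkeeping. A sketch, assuming v2 reads the same event stream and the v2 table mirrors v1’s shape:

```sql
-- v2 gets its own table and its own cursor, starting from the stream start.
CREATE TABLE account_balance_projection_v2
    (LIKE account_balance_projection INCLUDING ALL);

INSERT INTO projection_checkpoint(projection_name, last_event_id, status, notes)
VALUES ('account_balance_v2', 0, 'idle', 'parallel backfill for v2');

-- Validation once v2 has caught up: report accounts where v1 and v2 disagree,
-- including rows present in only one of the two projections.
SELECT COALESCE(v1.account_id, v2.account_id) AS account_id,
       v1.balance_cents AS v1_balance,
       v2.balance_cents AS v2_balance
FROM account_balance_projection v1
FULL OUTER JOIN account_balance_projection_v2 v2 USING (account_id)
WHERE v1.balance_cents IS DISTINCT FROM v2.balance_cents;
```

IS DISTINCT FROM treats NULL as a reportable difference, so missing rows on either side show up in the comparison rather than being silently skipped.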
Dealing with late-arriving and corrected events
Some domains produce events that refer to earlier business time (late-arriving) or represent corrections. Incremental processing based on stream order will still apply them when they arrive, but your projection must be designed to incorporate them correctly.
Late-arriving events in time-bucket projections
If you maintain a projection aggregated by day (for example (account_id, day)), a late event for a previous day must update that day’s row. Incremental processing still works because the event arrives “now” in stream order, but it updates an older bucket.
Set-based batch aggregation works well here: group by the bucket key derived from the event’s business time and upsert into the bucket row.
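A sketch of the daily-bucket case; the account_daily_projection table, its columns, and the 'account_daily' checkpoint name are assumptions. The key point is that the bucket is derived from the event’s business time (occurred_at), so a late event naturally updates its older bucket.

```sql
WITH next_events AS (
    SELECT id, account_id, amount_cents, occurred_at
    FROM account_events
    WHERE id > (SELECT last_event_id FROM projection_checkpoint
                WHERE projection_name = 'account_daily')
    ORDER BY id
    LIMIT 5000
), per_bucket AS (
    -- Bucket by the event's business time, not by arrival time.
    SELECT account_id,
           date_trunc('day', occurred_at)::date AS day,
           SUM(amount_cents) AS delta_cents
    FROM next_events
    GROUP BY account_id, day
)
INSERT INTO account_daily_projection(account_id, day, total_cents, updated_at)
SELECT account_id, day, delta_cents, now()
FROM per_bucket
ON CONFLICT (account_id, day) DO UPDATE
SET total_cents = account_daily_projection.total_cents + EXCLUDED.total_cents,
    updated_at = now();
```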
Corrections and reversals
If corrections are represented as separate events (for example “adjustment” or “reversal”), the projection should apply them as normal events. Replayability is preserved because the correction is part of the stream. Operationally, this is preferable to mutating historical events, because mutation complicates replay and auditing.
Performance and contention considerations for incremental workers
Minimize hot-row contention in the projection
If many events target the same projection key (for example a single popular account), upserts will contend on that row. Techniques to mitigate include:
- Process smaller batches to reduce lock duration on hot keys.
- Aggregate within the batch (as shown) so each key is updated once per batch.
- Consider splitting projections by shard key if the workload is naturally partitionable.
Keep the checkpoint lock short
Locking the checkpoint row for the entire batch is fine if batches are quick. If batches are slow, you can still keep correctness by using a lease approach: store a lease_until timestamp and worker_id, and have workers renew the lease. However, leases add complexity; start with row locks and optimize only if needed.
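If you do adopt leases, one shape is a compare-and-set UPDATE. This sketch assumes lease_until and worker_id columns on projection_checkpoint and a 30-second lease duration, both arbitrary choices:

```sql
-- Acquire the lease (or steal an expired one). Zero rows updated means
-- another worker currently holds a live lease.
UPDATE projection_checkpoint
SET worker_id = 'worker-7',
    lease_until = now() + interval '30 seconds',
    updated_at = now()
WHERE projection_name = 'account_balance'
  AND (lease_until IS NULL OR lease_until < now());

-- Renew periodically while processing; only the current holder may renew.
UPDATE projection_checkpoint
SET lease_until = now() + interval '30 seconds'
WHERE projection_name = 'account_balance'
  AND worker_id = 'worker-7';
```

The worker checks the affected-row count after each statement: one row means it owns (or still owns) the projection, zero means it should back off.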
Use staging tables for complex transformations
If applying a batch requires multiple joins and transformations, consider inserting the batch into a temporary or unlogged staging table inside the transaction, then running a few deterministic SQL statements against it. This can simplify debugging and allow you to inspect intermediate results when developing.
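A sketch of the staging pattern; batch_events is a hypothetical temp table, and ON COMMIT DROP ties its lifetime to the batch transaction so nothing leaks between batches.

```sql
BEGIN;

-- Staging table for the current batch only.
CREATE TEMP TABLE batch_events (
    id bigint PRIMARY KEY,
    account_id uuid NOT NULL,
    amount_cents bigint NOT NULL,
    occurred_at timestamptz NOT NULL
) ON COMMIT DROP;

INSERT INTO batch_events
SELECT id, account_id, amount_cents, occurred_at
FROM account_events
WHERE id > (SELECT last_event_id FROM projection_checkpoint
            WHERE projection_name = 'account_balance')
ORDER BY id
LIMIT 5000;

-- ...deterministic transform and apply statements against batch_events,
-- followed by the checkpoint advance, all before COMMIT...

COMMIT;
```

During development you can drop ON COMMIT DROP and inspect the staging table after a rollback to debug a failing transformation.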
Operational observability: lag, throughput, and correctness checks
Lag measurement
Store enough information to compute lag: the checkpoint cursor and the current head cursor. For an id-based stream, lag is head_id - last_event_id. You can expose this via a monitoring query:
SELECT c.projection_name,
       c.last_event_id,
       h.head_event_id,
       (h.head_event_id - c.last_event_id) AS lag_events,
       c.updated_at
FROM projection_checkpoint c
CROSS JOIN LATERAL (
    SELECT MAX(id) AS head_event_id FROM account_events
) h
WHERE c.projection_name = 'account_balance';

Throughput measurement
Track how many events were processed per batch and how long it took. You can store per-batch metrics in a small table:
CREATE TABLE projection_run_log (
    projection_name text NOT NULL,
    started_at timestamptz NOT NULL,
    finished_at timestamptz NOT NULL,
    from_event_id bigint NOT NULL,
    to_event_id bigint NOT NULL,
    events_processed int NOT NULL,
    PRIMARY KEY (projection_name, started_at)
);

Insert into this table at the end of each successful batch commit (or in a separate transaction if you prefer not to risk the main transaction, accepting that logs may be missing on crash).
Correctness checks with reconciliation queries
For some projections you can periodically reconcile by recomputing a small sample directly from events and comparing to the projection. For example, pick random accounts and compute balance from events up to the checkpoint cursor, then compare to the projection row. This helps detect logic bugs and silent data issues.
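A reconciliation sketch for the balance projection: recompute a random sample of accounts from events up to the checkpoint cursor and report mismatches. The 1% sample rate is an arbitrary choice, and this assumes account balances start from zero in the event stream.

```sql
WITH ckpt AS (
    SELECT last_event_id
    FROM projection_checkpoint
    WHERE projection_name = 'account_balance'
), sample AS (
    -- Random ~1% sample of projected accounts.
    SELECT account_id, balance_cents
    FROM account_balance_projection
    TABLESAMPLE BERNOULLI (1)
), recomputed AS (
    -- Recompute each sampled balance from events up to the cursor.
    SELECT e.account_id, SUM(e.amount_cents) AS balance_cents
    FROM account_events e
    JOIN sample s USING (account_id)
    WHERE e.id <= (SELECT last_event_id FROM ckpt)
    GROUP BY e.account_id
)
SELECT s.account_id,
       s.balance_cents AS projected,
       r.balance_cents AS recomputed
FROM sample s
JOIN recomputed r USING (account_id)
WHERE s.balance_cents IS DISTINCT FROM r.balance_cents;
```

Bounding the recomputation at the checkpoint cursor matters: comparing against the live head would flag in-flight events as false mismatches.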
Advanced replay: deterministic rebuilds under schema evolution
When event schemas evolve, replay can fail if the projection expects fields that older events do not have. The replayable approach is to make the projection logic tolerant: provide defaults, branch by event type/version, and avoid assumptions that only hold for “new” events.
A practical technique is to normalize events into a “canonical” shape during processing: in the batch CTE, compute derived columns with COALESCE defaults and explicit casts, then use only those derived columns in the projection update. That way, replay over older events remains deterministic and does not require special-case application code.
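A sketch of that canonicalization step, assuming for illustration that raw event fields live in a jsonb column named payload (unlike the simplified account_events columns used earlier); the field names and defaults shown are assumptions:

```sql
WITH next_events AS (
    SELECT id, account_id, event_type, occurred_at,
           -- Canonical columns: defaults and casts applied once, here.
           COALESCE((payload->>'amount_cents')::bigint, 0) AS amount_cents,
           COALESCE(payload->>'currency', 'USD') AS currency,
           COALESCE((payload->>'schema_version')::int, 1) AS schema_version
    FROM account_events
    WHERE id > (SELECT last_event_id FROM projection_checkpoint
                WHERE projection_name = 'account_balance')
    ORDER BY id
    LIMIT 5000
)
-- Downstream projection logic reads only the canonical columns,
-- so old events missing a field behave the same on every replay.
SELECT * FROM next_events;
```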
Putting it together: a reusable worker transaction template
The following template shows the essential structure for an incremental, replayable projection batch in PostgreSQL. Adapt the event selection and projection update to your domain.
BEGIN;

-- 1) Lock checkpoint
SELECT last_event_id
FROM projection_checkpoint
WHERE projection_name = 'account_balance'
FOR UPDATE;

-- 2) Select batch, 3) apply projection, and 4) advance checkpoint in a
-- single statement: data-modifying CTEs keep all steps in scope and
-- guarantee the same batch feeds both the upsert and the cursor advance.
WITH next_events AS (
    SELECT id, account_id, amount_cents, occurred_at
    FROM account_events
    WHERE id > (SELECT last_event_id FROM projection_checkpoint
                WHERE projection_name = 'account_balance')
    ORDER BY id
    LIMIT 5000
), per_account AS (
    SELECT account_id,
           SUM(amount_cents) AS delta_cents,
           MAX(occurred_at) AS max_occurred_at
    FROM next_events
    GROUP BY account_id
), max_id AS (
    SELECT COALESCE(MAX(id), (SELECT last_event_id FROM projection_checkpoint
                              WHERE projection_name = 'account_balance')) AS new_last_id,
           COUNT(*) AS n
    FROM next_events
), applied AS (
    INSERT INTO account_balance_projection(account_id, balance_cents, last_activity_at, updated_at)
    SELECT account_id, delta_cents, max_occurred_at, now()
    FROM per_account
    ON CONFLICT (account_id) DO UPDATE
    SET balance_cents = account_balance_projection.balance_cents + EXCLUDED.balance_cents,
        last_activity_at = GREATEST(account_balance_projection.last_activity_at, EXCLUDED.last_activity_at),
        updated_at = now()
    RETURNING 1
)
UPDATE projection_checkpoint
SET last_event_id = (SELECT new_last_id FROM max_id),
    updated_at = now(),
    status = CASE WHEN (SELECT n FROM max_id) = 0 THEN 'idle' ELSE 'running' END
WHERE projection_name = 'account_balance';

COMMIT;

Note that the original four steps are combined into one statement deliberately: a CTE such as max_id is only visible within the statement that defines it, so the checkpoint advance must live in the same statement as the batch selection. This template gives you: incremental processing (only events after the cursor), replayability (reset cursor and rebuild deterministically), and operational safety (atomic checkpoint advancement). From here, you can add versioned projections, snapshot-based bounded replay, dead-letter handling, and monitoring tables as your system grows.