Capstone goals and scope
This capstone ties together an event-sourced domain implemented in PostgreSQL with two non-negotiable properties: (1) replayable history that can rebuild state deterministically, and (2) performant read models that serve product queries without scanning raw events. The focus is on the end-to-end implementation details you would actually ship: domain boundaries, command handling, event persistence, replay orchestration, projection correctness checks, and operational practices (backfills, replays, and performance tuning) that keep the system reliable over time.
The chapter uses a concrete domain: an e-commerce ordering workflow. You will implement an event-sourced Order aggregate and two read models: an order_summary view for customer support and an order_item_sales_daily rollup for operations. You will also implement a replay mechanism that can rebuild projections from scratch, and a strategy for evolving projections without downtime.
Domain slice: Order aggregate and invariants
Pick a slice that has clear invariants and a lifecycle. For orders, typical invariants include: an order must be created before it can be paid; it cannot be shipped before payment; it cannot be paid twice; it cannot be shipped twice; cancellation rules depend on state. In event sourcing, invariants are enforced at command time by loading current state from events (or a snapshot) and deciding whether to emit new events.
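As a sketch of command-time invariant enforcement (the function and field names here are illustrative, not from a specific library), a handler folds events into state and then decides whether the command is allowed:

```python
def decide(command, state):
    """Validate a command against current aggregate state; return new events or raise.

    Illustrative sketch: state is a dict with at least a 'status' field,
    commands and events are dicts with a 'type' field.
    """
    status = state["status"]
    if command["type"] == "AuthorizePayment":
        if status == "PAID":
            raise ValueError("order cannot be paid twice")
        if status != "SUBMITTED":
            raise ValueError("order must be submitted before payment")
        return [{"type": "PaymentAuthorized", "amount": state["total"]}]
    if command["type"] == "ShipOrder":
        if status != "PAID":  # also rejects double-shipping (status is then SHIPPED)
            raise ValueError("order cannot be shipped before payment")
        return [{"type": "OrderShipped"}]
    raise ValueError(f"unsupported command: {command['type']}")
```

The handler never mutates state directly; it only emits events, which keeps the decision logic easy to test against replayed histories.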
Define the aggregate’s event types (names are illustrative):
- OrderCreated(order_id, customer_id, currency)
- ItemAdded(order_id, sku, qty, unit_price)
- ItemRemoved(order_id, sku, qty, unit_price)
- OrderSubmitted(order_id)
- PaymentAuthorized(order_id, payment_id, amount)
- OrderCancelled(order_id, reason)
- OrderShipped(order_id, shipment_id, carrier)
Keep the capstone centered on implementation: how these events are stored, how state is reconstructed, and how read models are built and replayed.
Step-by-step: PostgreSQL schema for aggregate streams and projections
1) Aggregate event stream table
Assume you already have a robust append-only event table pattern. For the capstone, add the minimum additional structure needed to support aggregate-centric reads and replay orchestration: a stream identifier, a per-stream version, and metadata for tracing.
```sql
CREATE TABLE order_events (
    event_id       uuid PRIMARY KEY,
    order_id       uuid NOT NULL,
    stream_version bigint NOT NULL,
    event_type     text NOT NULL,
    occurred_at    timestamptz NOT NULL,
    actor_id       uuid,
    correlation_id uuid,
    causation_id   uuid,
    payload        jsonb NOT NULL,
    UNIQUE (order_id, stream_version)
);
```

The uniqueness of (order_id, stream_version) enforces a single linear history per aggregate. Your command handler will insert with the next expected version to detect concurrent updates.
2) Snapshot table (optional but practical)
Replaying from the beginning of time for every command is rarely acceptable. A snapshot is a cached fold of events up to a version. You can keep it simple: store the latest snapshot per order, and rebuild it during replay or opportunistically during normal processing.
```sql
CREATE TABLE order_snapshots (
    order_id         uuid PRIMARY KEY,
    snapshot_version bigint NOT NULL,
    snapshot_at      timestamptz NOT NULL,
    state            jsonb NOT NULL
);
```

The snapshot state is an internal representation of the aggregate (e.g., items, totals, status). It does not need to match any external API schema.
3) Projection tables (read models)
Define read models that answer specific queries. Keep them denormalized and indexed for access patterns.
```sql
CREATE TABLE order_summary (
    order_id     uuid PRIMARY KEY,
    customer_id  uuid NOT NULL,
    status       text NOT NULL,
    currency     text NOT NULL,
    total_amount numeric(12,2) NOT NULL,
    item_count   int NOT NULL,
    submitted_at timestamptz,
    paid_at      timestamptz,
    shipped_at   timestamptz,
    updated_at   timestamptz NOT NULL
);

CREATE TABLE order_item_sales_daily (
    day          date NOT NULL,
    sku          text NOT NULL,
    currency     text NOT NULL,
    qty_sold     bigint NOT NULL,
    gross_amount numeric(14,2) NOT NULL,
    PRIMARY KEY (day, sku, currency)
);
```

These two projections illustrate different shapes: an entity summary and a time-bucketed rollup.
4) Projection progress tracking
To make projections replayable and observable, track progress explicitly. The simplest robust approach is to track a high-water mark per projection (e.g., last processed event timestamp plus a tie-breaker). If your event table has a monotonically increasing sequence, track that. Here we’ll track (occurred_at, event_id) as a stable cursor.
```sql
CREATE TABLE projection_checkpoints (
    projection_name  text PRIMARY KEY,
    last_occurred_at timestamptz NOT NULL,
    last_event_id    uuid NOT NULL,
    updated_at       timestamptz NOT NULL
);
```

During replay you will reset checkpoints; during incremental processing you will advance them.
Step-by-step: Command handling with optimistic concurrency
The command handler is responsible for: loading current aggregate state, validating invariants, producing new events, and appending them atomically. The key is to ensure you never write events based on stale state.
1) Load aggregate state efficiently
Load the latest snapshot (if any), then load events after the snapshot version and fold them into state. Folding is application code, but the query pattern matters.
```sql
-- Load snapshot
SELECT snapshot_version, state
FROM order_snapshots
WHERE order_id = $1;

-- Load remaining events
SELECT stream_version, event_type, occurred_at, payload
FROM order_events
WHERE order_id = $1
  AND stream_version > $2
ORDER BY stream_version ASC;
```

Maintain a deterministic fold function: given prior state and an event, produce the next state. Determinism is what makes replay trustworthy.
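A minimal fold in application code might look like the following pure-Python sketch. The state shape (a dict with status, items, and a running total) is an assumption; the event and payload field names mirror the event types defined earlier.

```python
def initial_state():
    """Empty aggregate state before any events."""
    return {"status": None, "items": {}, "total": 0.0}

def apply(state, event):
    """Deterministic fold step: prior state + event -> next state (no I/O, no clocks)."""
    t, p = event["event_type"], event["payload"]
    if t == "OrderCreated":
        state = {"status": "CREATED", "items": {}, "total": 0.0,
                 "customer_id": p["customer_id"], "currency": p["currency"]}
    elif t == "ItemAdded":
        state["items"][p["sku"]] = state["items"].get(p["sku"], 0) + p["qty"]
        state["total"] += p["qty"] * p["unit_price"]
    elif t == "ItemRemoved":
        state["items"][p["sku"]] -= p["qty"]
        state["total"] -= p["qty"] * p["unit_price"]
    elif t == "OrderSubmitted":
        state["status"] = "SUBMITTED"
    elif t == "PaymentAuthorized":
        state["status"] = "PAID"
    elif t == "OrderShipped":
        state["status"] = "SHIPPED"
    elif t == "OrderCancelled":
        state["status"] = "CANCELLED"
    return state

def load_state(snapshot, events):
    """Start from the snapshot state (or empty state) and fold the remaining events."""
    state = snapshot if snapshot is not None else initial_state()
    for e in events:
        state = apply(state, e)
    return state
```

Because apply depends only on its inputs, folding the same event sequence always yields the same state, which is exactly the property replay relies on.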
2) Decide and append events in a single transaction
When appending, enforce the expected next version. If two writers race, one will fail on the unique constraint (order_id, stream_version). Your application catches that and retries by reloading state.
```sql
BEGIN;

-- Determine expected_version from loaded state
-- Insert one or more events with consecutive stream_version values
INSERT INTO order_events
    (event_id, order_id, stream_version, event_type, occurred_at,
     actor_id, correlation_id, causation_id, payload)
VALUES
    ($1, $2, $3,     'PaymentAuthorized', now(), $4, $5, $6, $7::jsonb),
    ($8, $2, $3 + 1, 'OrderSubmitted',    now(), $4, $5, $1, $9::jsonb);

COMMIT;
```

In practice, you may emit one event per command, or multiple events if the domain requires it. The important part is that stream_version values are contiguous per stream, with colliding writers rejected by the unique constraint.
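The retry behavior around the append can be sketched as follows. This in-memory store is a stand-in for the order_events table: its append raises a conflict the way the (order_id, stream_version) unique constraint would, and in a real handler the caught exception would be the database driver's unique-violation error. All names here are illustrative.

```python
class VersionConflict(Exception):
    """Stands in for the database's unique-constraint violation."""

class InMemoryStore:
    """Stand-in for the order_events table: one linear version sequence per order."""
    def __init__(self):
        self.events = {}  # order_id -> list of events

    def load(self, order_id):
        return list(self.events.get(order_id, []))

    def append(self, order_id, expected_version, new_events):
        stream = self.events.setdefault(order_id, [])
        if len(stream) != expected_version:  # mimics UNIQUE (order_id, stream_version)
            raise VersionConflict(order_id)
        stream.extend(new_events)

def handle(store, order_id, command, decide, fold, retries=3):
    """Load -> decide -> append; on a version conflict, reload fresh state and retry."""
    for _ in range(retries):
        history = store.load(order_id)
        state = fold(history)
        new_events = decide(command, state)
        try:
            store.append(order_id, expected_version=len(history), new_events=new_events)
            return new_events
        except VersionConflict:
            continue  # another writer appended first; decisions on stale state are discarded
    raise RuntimeError("gave up after repeated version conflicts")
```

The key property is that no events are ever written based on stale state: a losing writer's decision is thrown away and re-made against the fresh history.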
3) Update snapshots opportunistically
After appending, you can update the snapshot if the event count since last snapshot exceeds a threshold. Do it in the same transaction if you want strict coupling, or asynchronously if you want lower write latency. A simple synchronous approach:
```sql
INSERT INTO order_snapshots (order_id, snapshot_version, snapshot_at, state)
VALUES ($1, $2, now(), $3::jsonb)
ON CONFLICT (order_id) DO UPDATE
SET snapshot_version = EXCLUDED.snapshot_version,
    snapshot_at      = EXCLUDED.snapshot_at,
    state            = EXCLUDED.state
WHERE order_snapshots.snapshot_version < EXCLUDED.snapshot_version;
```

This ensures snapshots only move forward.
Step-by-step: Building replayable projections
A replayable projection has three traits: (1) it can be rebuilt from events without external dependencies, (2) it can be reset safely, and (3) it can be validated against invariants or reconciliation queries. The capstone implements projections as database tables updated by a projection worker (application code) that reads events in order and applies them.
1) Projection apply functions (conceptual)
Implement one apply function per projection. Each function takes an event and performs the minimal SQL needed to update the projection table. Keep these functions idempotent with respect to the projection cursor (i.e., only apply events once in the correct order). The checkpoint table is what prevents duplicates during normal processing.
For order_summary, you can update per event type:
- OrderCreated: insert row with status = 'CREATED'
- ItemAdded/ItemRemoved: update totals and item_count
- OrderSubmitted: set status and submitted_at
- PaymentAuthorized: set status and paid_at
- OrderShipped: set status and shipped_at
- OrderCancelled: set status
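The per-event-type updates above can be sketched as one apply function over an in-memory row per order; in the real worker each branch becomes the corresponding INSERT or UPDATE against order_summary. The row fields follow the schema defined earlier; the function name and dict-based representation are illustrative.

```python
def apply_order_summary(rows, event):
    """Apply one event to the order_summary read model (rows: order_id -> row dict)."""
    t, p, at = event["event_type"], event["payload"], event["occurred_at"]
    oid = event["order_id"]
    if t == "OrderCreated":
        rows[oid] = {"customer_id": p["customer_id"], "status": "CREATED",
                     "currency": p["currency"], "total_amount": 0.0,
                     "item_count": 0, "submitted_at": None, "paid_at": None,
                     "shipped_at": None, "updated_at": at}
        return
    row = rows[oid]
    if t == "ItemAdded":
        row["total_amount"] += p["qty"] * p["unit_price"]
        row["item_count"] += p["qty"]
    elif t == "ItemRemoved":
        row["total_amount"] -= p["qty"] * p["unit_price"]
        row["item_count"] -= p["qty"]
    elif t == "OrderSubmitted":
        row["status"], row["submitted_at"] = "SUBMITTED", at
    elif t == "PaymentAuthorized":
        row["status"], row["paid_at"] = "PAID", at
    elif t == "OrderShipped":
        row["status"], row["shipped_at"] = "SHIPPED", at
    elif t == "OrderCancelled":
        row["status"] = "CANCELLED"
    row["updated_at"] = at
```

Note that the function itself carries no duplicate protection; applying each event exactly once, in order, is the checkpoint mechanism's job.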
For order_item_sales_daily, you typically update on shipment (or submission), depending on your business definition of “sold”. Here we count sales at shipment to avoid counting cancelled orders.
2) Event reading loop with checkpoint advancement
The worker reads events after the last checkpoint, in a stable order, applies them, then advances the checkpoint in the same transaction. This gives you atomic “apply + advance” semantics.
```sql
-- Pseudocode-ish SQL flow inside a transaction

-- 1) Read checkpoint
SELECT last_occurred_at, last_event_id
FROM projection_checkpoints
WHERE projection_name = 'order_summary'
FOR UPDATE;

-- 2) Read next batch
SELECT event_id, order_id, stream_version, event_type, occurred_at, payload
FROM order_events
WHERE (occurred_at, event_id) > ($1, $2)
ORDER BY occurred_at ASC, event_id ASC
LIMIT 1000;

-- 3) For each event: apply updates to projection table(s)

-- 4) Advance checkpoint to last event in batch
UPDATE projection_checkpoints
SET last_occurred_at = $new_occurred_at,
    last_event_id    = $new_event_id,
    updated_at       = now()
WHERE projection_name = 'order_summary';
```

Using FOR UPDATE on the checkpoint row ensures only one worker instance processes a given projection at a time (per database). If you need parallelism, partition by stream (e.g., by order_id hash) and maintain multiple checkpoints, but keep the capstone single-worker for clarity.
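In application code, one worker iteration might look like this in-memory sketch. The checkpoint here is just an index into a globally ordered event list, standing in for the (occurred_at, event_id) cursor; a real worker would wrap each call in a database transaction using the SQL above.

```python
def run_projection_batch(checkpoints, name, events, apply, batch_size=1000):
    """One worker iteration: read after the checkpoint, apply, advance the checkpoint.

    checkpoints: dict mapping projection name -> count of processed events
                 (stands in for the (occurred_at, event_id) cursor row).
    events:      globally ordered event list (stands in for order_events).
    apply:       callable applying one event to the projection table(s).
    Returns the number of events processed (0 means the worker is caught up).
    """
    cursor = checkpoints.get(name, 0)
    batch = events[cursor:cursor + batch_size]
    if not batch:
        return 0
    for e in batch:
        apply(e)                                 # update projection table(s)
    checkpoints[name] = cursor + len(batch)      # advance atomically with the applies
    return len(batch)
```

Because the apply and the checkpoint advance happen together (one transaction in the real system), a crash mid-batch leaves the checkpoint untouched and the batch is simply reprocessed from a consistent starting point.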
3) Reset and replay procedure
To replay a projection from scratch, you need a deterministic reset. The reset must remove derived state and reset the checkpoint to the beginning.
```sql
BEGIN;

TRUNCATE TABLE order_summary;

UPDATE projection_checkpoints
SET last_occurred_at = '-infinity',
    last_event_id    = '00000000-0000-0000-0000-000000000000',
    updated_at       = now()
WHERE projection_name = 'order_summary';

COMMIT;
```

Then run the worker in “replay mode” (same logic as incremental mode, just starting from the beginning). For large datasets, you may prefer to rebuild into a new table and swap (see below).
Performant read models: patterns that keep queries fast
Event sourcing pushes complexity into writes and projections so reads can be simple. The capstone read models should be designed around the exact queries you need to serve.
Order summary query patterns
Typical queries: fetch by order_id, list recent orders for a customer, filter by status, and sort by updated_at.
```sql
CREATE INDEX order_summary_customer_updated_idx
    ON order_summary (customer_id, updated_at DESC);

CREATE INDEX order_summary_status_updated_idx
    ON order_summary (status, updated_at DESC);
```

Keep the row narrow enough to fit frequently accessed fields. If support needs a detailed item list, store it separately (e.g., order_items_read) rather than bloating the summary row.
Daily sales rollup query patterns
Typical queries: sales by SKU for a date range, top SKUs per day, totals per day.
```sql
CREATE INDEX order_item_sales_daily_day_idx
    ON order_item_sales_daily (day);

CREATE INDEX order_item_sales_daily_sku_day_idx
    ON order_item_sales_daily (sku, day);
```

Because the primary key already includes (day, sku, currency), the additional indexes depend on your most common filters and sorts.
Replay without downtime: rebuild-and-swap strategy
Replaying a projection by truncating and rebuilding can cause downtime or inconsistent reads if the table is queried while empty. A safer approach is rebuild-and-swap:
- Create a new table order_summary_v2 with the same schema (or an evolved schema).
- Replay events into order_summary_v2 while the old table continues serving reads.
- Once caught up, atomically swap names in a transaction.
In PostgreSQL, swapping can be done with ALTER TABLE ... RENAME operations inside a transaction. Example:
```sql
BEGIN;
ALTER TABLE order_summary RENAME TO order_summary_old;
ALTER TABLE order_summary_v2 RENAME TO order_summary;
COMMIT;
```

After the swap, you can drop the old table later. Your application should reference the logical name (order_summary) so it automatically uses the new table after the swap.
To keep the new table up to date during the rebuild, you have two options:
- Dual-write projection worker: apply events to both old and new tables after a certain point.
- Catch-up phase: rebuild to a checkpoint, then run a fast incremental catch-up to the present before swapping.
The catch-up phase is usually simpler: record the “replay start cursor”, rebuild up to that cursor, then process from that cursor to current head, then swap.
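The catch-up phases can be sketched over an in-memory event list, where the cursor is simply an index; in PostgreSQL it would be the (occurred_at, event_id) pair, and the swap would be the ALTER TABLE renames. All names here are illustrative stand-ins for the real operations.

```python
def rebuild_with_catch_up(event_log, apply, swap_tables):
    """Rebuild-and-swap with a catch-up phase.

    event_log:   append-only event list (may grow while we rebuild)
    apply(table, event): applies one event to the NEW table (a dict here)
    swap_tables(new_table): promotes the new table to serve reads
    """
    new_table = {}
    start_cursor = len(event_log)             # 1) record the replay start cursor
    for e in event_log[:start_cursor]:        # 2) bulk rebuild up to that cursor
        apply(new_table, e)
    while len(event_log) > start_cursor:      # 3) fast incremental catch-up to head
        tail = event_log[start_cursor:]
        for e in tail:
            apply(new_table, e)
        start_cursor += len(tail)
    swap_tables(new_table)                    # 4) swap (ALTER TABLE ... RENAME)
    return new_table
```

Because the rebuild and the catch-up run the same apply logic, the swapped-in table is byte-for-byte what a from-scratch replay at the swap instant would have produced.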
Correctness controls: reconciliation queries and invariants
Replayability is only valuable if you can trust the result. Add reconciliation checks that compare projection state to event-derived calculations. These checks should be runnable in staging and periodically in production (off-peak).
1) Reconcile order totals
Compute totals from events and compare to order_summary.total_amount. The exact SQL depends on your event payload structure, but the pattern is: aggregate item add/remove events per order and compare.
```sql
WITH item_deltas AS (
    SELECT order_id,
           SUM(CASE WHEN event_type = 'ItemAdded'
                    THEN (payload->>'qty')::int ELSE 0 END)
         - SUM(CASE WHEN event_type = 'ItemRemoved'
                    THEN (payload->>'qty')::int ELSE 0 END) AS qty_net,
           SUM(CASE WHEN event_type = 'ItemAdded'
                    THEN ((payload->>'qty')::numeric * (payload->>'unit_price')::numeric)
                    ELSE 0 END)
         - SUM(CASE WHEN event_type = 'ItemRemoved'
                    THEN ((payload->>'qty')::numeric * (payload->>'unit_price')::numeric)
                    ELSE 0 END) AS amount_net
    FROM order_events
    WHERE event_type IN ('ItemAdded', 'ItemRemoved')
    GROUP BY order_id
)
SELECT s.order_id, s.total_amount, d.amount_net
FROM order_summary s
JOIN item_deltas d USING (order_id)
WHERE s.total_amount <> d.amount_net;
```

This query should return zero rows. If it doesn’t, you have either a projection bug or unexpected event data that your projection logic didn’t handle.
2) Reconcile lifecycle states
Ensure impossible states do not appear in the read model (e.g., shipped without paid). You can express these as constraints or periodic checks.
```sql
SELECT order_id
FROM order_summary
WHERE status = 'SHIPPED'
  AND paid_at IS NULL;
```

If you can encode invariants as CHECK constraints on projections, do so carefully; constraints can protect you from projection code regressions, but they can also cause the projection worker to fail hard if historical data violates the constraint. A common compromise is to keep constraints on non-controversial invariants and use reconciliation queries for the rest.
Operational playbook: backfills, partial replays, and performance
Partial replay by time window or by aggregate
Not every fix requires a full replay. If a bug affected only a subset (e.g., a single customer segment, a time window, or a specific event type), implement targeted rebuilds:
- By order_id: delete and rebuild the order_summary row for one order by folding its events and writing the row.
- By time window: rebuild rollups for affected days by recomputing from events in that window.
For example, to rebuild a single order summary, you can delete the row and then re-apply events for that order in stream order. This is often implemented as an admin command in the projection service.
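Such an admin command is just the fold applied to one stream. In this sketch the table is a dict and the write stands in for a DELETE-then-INSERT against order_summary; the fold covers only a subset of the summary fields for brevity.

```python
def fold_summary(events):
    """Fold a single order's events into a summary row (subset of fields)."""
    row = {"status": None, "total_amount": 0.0, "item_count": 0}
    for e in events:
        t, p = e["event_type"], e["payload"]
        if t == "OrderCreated":
            row["status"] = "CREATED"
        elif t == "ItemAdded":
            row["total_amount"] += p["qty"] * p["unit_price"]
            row["item_count"] += p["qty"]
        elif t == "ItemRemoved":
            row["total_amount"] -= p["qty"] * p["unit_price"]
            row["item_count"] -= p["qty"]
        elif t == "OrderSubmitted":
            row["status"] = "SUBMITTED"
    return row

def rebuild_one_order(order_id, events, table):
    """Targeted rebuild: overwrite one summary row from that order's event stream.

    events must be the order's events in stream_version order; the assignment
    stands in for DELETE + INSERT (or an upsert) on order_summary.
    """
    table[order_id] = fold_summary(events)
```

Because the fold is deterministic, a targeted rebuild of one order produces exactly the row a full replay would, without touching any other rows.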
Batch sizing and transaction boundaries
Projection workers should process events in batches. Too small: overhead dominates. Too large: long transactions increase lock contention and bloat. Start with 500–5000 events per batch and measure. Keep each batch in a single transaction so the checkpoint advances atomically with the applied changes.
Hot rows and contention
Rollups like order_item_sales_daily can create hot rows (many events updating the same (day, sku)). If you see contention:
- Update on less frequent events (e.g., shipment rather than item add).
- Accumulate in-memory per batch and write aggregated deltas per key (one UPDATE per key per batch).
- Sharded rollups: write into order_item_sales_daily_shard_n tables by hash and periodically merge (useful at very high throughput).
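The second option, accumulating per-key deltas within a batch, can be sketched with a Counter; the worker then issues a single INSERT ... ON CONFLICT ... DO UPDATE per key instead of one per event. The function name and input shape are illustrative, and the flush-to-SQL step is omitted.

```python
from collections import Counter

def batch_rollup_deltas(shipped_order_items):
    """Collapse a batch of shipped line items into one delta per (day, sku, currency).

    shipped_order_items: iterable of dicts with day, sku, currency, qty, unit_price.
    Returns {(day, sku, currency): (qty_delta, amount_delta)}; the worker applies
    each entry as one upsert against order_item_sales_daily.
    """
    qty = Counter()
    amount = Counter()
    for item in shipped_order_items:
        key = (item["day"], item["sku"], item["currency"])
        qty[key] += item["qty"]
        amount[key] += item["qty"] * item["unit_price"]
    return {k: (qty[k], amount[k]) for k in qty}
```

With a 1000-event batch touching a handful of hot (day, sku) keys, this turns a thousand contended row updates into a few, which is usually enough to eliminate the hot-row bottleneck.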
Projection code versioning
When projection logic changes (bug fix or new derived fields), treat it as a versioned artifact. A practical approach:
- Name projections with a version suffix internally (e.g., order_summary_v2).
- Keep a metadata table that records projection version, build time, and the event cursor it was built to.
- Use rebuild-and-swap to deploy new versions safely.
This avoids trying to “patch” derived data in-place without a clear lineage of how it was computed.
Putting it together: an end-to-end implementation checklist
1) Implement aggregate fold and command decisions
- Define an in-memory state structure for Order (status, items, totals, timestamps).
- Implement apply(event) to fold events into state deterministically.
- Implement decide(command, state) to validate invariants and return new events.
2) Implement event append with expected version
- Load snapshot + remaining events.
- Compute expected next stream_version.
- Insert events with consecutive stream_version values in one transaction.
- On conflict, retry by reloading state.
3) Implement projection workers
- One worker per projection (or one worker handling multiple projections with separate checkpoints).
- Read events after checkpoint in stable order.
- Apply event updates to projection tables.
- Advance checkpoint in the same transaction.
4) Implement replay tooling
- Reset procedure (truncate + checkpoint reset) for development and staging.
- Rebuild-and-swap procedure for production-safe replays.
- Partial replay tooling for targeted fixes.
5) Implement correctness checks
- Reconciliation queries for totals and lifecycle invariants.
- Monitoring: lag between event head and projection checkpoint; error rates; batch processing time.
- Alerting on projection stalls (checkpoint not advancing).