Why integration is the hard part
In a layered application, the domain model is rarely the only representation of data. The application layer needs request/response shapes, the persistence layer needs storage-friendly records, and external systems impose their own schemas. Integration is the work of moving data across these boundaries without letting infrastructure concerns leak into domain logic.
This chapter focuses on practical integration patterns: mapping between domain objects and persistence records, orchestrating use cases in the application layer, handling transactions, and keeping dependencies pointed inward. The goal is to make integration explicit and testable: domain objects stay focused on business rules, while adapters translate to and from the outside world.
Layer responsibilities and dependency direction
A useful mental model is: the domain layer defines business concepts and rules; the application layer coordinates use cases (commands/queries), transactions, and authorization; the persistence layer stores and retrieves data. Integration succeeds when the domain does not import persistence types, database sessions, ORMs, or web framework models.
- Domain layer: entities/value objects, domain services, domain events, invariants, business rules.
- Application layer: use-case orchestration, repositories as interfaces, unit-of-work boundaries, mapping to DTOs, calling external services via ports.
- Persistence layer: concrete repositories, database models/ORM, SQL, migrations, caching, message brokers, serialization formats.
To keep the dependency direction clean, define ports (interfaces) in the application layer (or domain-adjacent layer) and implement them in the infrastructure/persistence layer. The application layer depends on abstractions; infrastructure depends on the domain and application abstractions.
Integration pattern: Ports, adapters, and mappers
Most integration work boils down to three building blocks:
- Ports: interfaces the application layer expects (e.g., repository, message publisher, clock, payment gateway).
- Adapters: implementations of ports (e.g., SQLAlchemy repository, Redis cache adapter, HTTP client).
- Mappers: pure functions/classes that translate between representations (domain objects <-> persistence records, domain objects <-> API DTOs).
Mappers are especially valuable because they isolate “shape changes” (field names, normalization, denormalization, JSON columns, join tables) from business logic. They also become the natural place to handle backward compatibility at the persistence boundary (e.g., reading old columns and producing the current domain model).
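As a minimal sketch of the backward-compatibility point, assume a hypothetical legacy schema that stored a `price` column in whole currency units and was later replaced by `unit_price_cents`. The names `LineItem` and `line_from_row` are illustrative and not part of the ordering example that follows; the point is only that the mapper absorbs the old shape so the domain never sees it.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Mapping


@dataclass(frozen=True)
class LineItem:
    """Hypothetical domain value object; the current model stores cents."""
    sku: str
    quantity: int
    unit_price_cents: int


def line_from_row(row: Mapping[str, Any]) -> LineItem:
    """Translate a stored row into the current domain shape.

    Assumption: old rows carry `price` (whole currency units, as text),
    new rows carry `unit_price_cents`.
    """
    if row.get("unit_price_cents") is not None:
        cents = int(row["unit_price_cents"])
    else:
        # Legacy shape: convert to cents at the boundary, using Decimal
        # to avoid binary floating-point rounding.
        cents = int(Decimal(str(row["price"])) * 100)
    return LineItem(sku=row["sku"], quantity=int(row["quantity"]), unit_price_cents=cents)


new_row = {"sku": "A-1", "quantity": 2, "unit_price_cents": 1250}
old_row = {"sku": "A-1", "quantity": 2, "price": "12.50"}
```

Both rows map to the same `LineItem`, so code above the mapper does not need to know that two storage shapes exist.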
Step-by-step: integrating a domain model with persistence
Consider a simple ordering domain: an Order entity with line items and a status. The domain object is already designed and validated (covered earlier). Now we integrate it with a relational database.
Step 1: define repository and unit-of-work ports
Define the interfaces the application layer needs. Keep them minimal and use domain types in signatures.
from __future__ import annotations  # optional: allows forward references in annotations

from dataclasses import dataclass
from typing import Iterable, Optional, Protocol

# Domain types assumed to exist
OrderId = str


class OrderRepository(Protocol):
    def get(self, order_id: OrderId) -> Optional["Order"]: ...
    def add(self, order: "Order") -> None: ...
    def update(self, order: "Order") -> None: ...


class UnitOfWork(Protocol):
    orders: OrderRepository

    def commit(self) -> None: ...
    def rollback(self) -> None: ...
    def __enter__(self) -> "UnitOfWork": ...
    def __exit__(self, exc_type, exc, tb) -> None: ...

Two integration benefits appear immediately: (1) the application layer can be tested with in-memory fakes; (2) persistence is free to use any technology as long as it implements the port.
Step 2: define persistence records (tables/ORM models) separate from domain
Even if you use an ORM, treat ORM models as persistence records, not domain entities. They can be mutable, include foreign keys, and reflect normalization constraints.
from dataclasses import dataclass
from typing import List


@dataclass
class OrderRow:
    id: str
    status: str
    customer_id: str


@dataclass
class OrderLineRow:
    order_id: str
    sku: str
    quantity: int
    unit_price_cents: int

These records are intentionally “dumb”: they represent how data is stored, not how the business behaves.
Step 3: implement a mapper (domain <-> persistence)
The mapper translates between Order and OrderRow/OrderLineRow. Keep it pure and deterministic so it’s easy to test.
from dataclasses import dataclass
from typing import Iterable, Tuple

# Domain types assumed to exist:
# - Order
# - OrderLine
# - Money (or int cents)


@dataclass(frozen=True)
class OrderRecord:
    order: OrderRow
    lines: tuple[OrderLineRow, ...]


class OrderMapper:
    @staticmethod
    def to_record(order: "Order") -> OrderRecord:
        order_row = OrderRow(
            id=str(order.id),
            status=order.status.value,  # e.g., an Enum in the domain
            customer_id=str(order.customer_id),
        )
        line_rows = tuple(
            OrderLineRow(
                order_id=str(order.id),
                sku=line.sku,
                quantity=line.quantity,
                unit_price_cents=line.unit_price.cents,
            )
            for line in order.lines
        )
        return OrderRecord(order=order_row, lines=line_rows)

    @staticmethod
    def from_record(record: OrderRecord) -> "Order":
        # Convert persistence primitives into domain objects
        lines = [
            OrderLine(
                sku=row.sku,
                quantity=row.quantity,
                unit_price=Money(cents=row.unit_price_cents),
            )
            for row in record.lines
        ]
        return Order(
            id=OrderId(record.order.id),
            customer_id=CustomerId(record.order.customer_id),
            status=OrderStatus(record.order.status),
            lines=lines,
        )

Notice what the mapper does not do: it does not open transactions, call the database, or apply business rules. It only translates shapes and types.
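Because the mapper is pure, a round-trip test needs no database or fixtures. The sketch below uses simplified stand-ins for the domain types (the real `Order`, `OrderLine`, `Money`, and `OrderStatus` are defined elsewhere) and plain dictionaries instead of `OrderRow`/`OrderLineRow`, so every name in it should be read as an assumption made for illustration.

```python
from dataclasses import dataclass
from enum import Enum


class OrderStatus(Enum):
    DRAFT = "draft"
    PAID = "paid"


@dataclass(frozen=True)
class Money:
    cents: int


@dataclass(frozen=True)
class OrderLine:
    sku: str
    quantity: int
    unit_price: Money


@dataclass(frozen=True)
class Order:
    id: str
    customer_id: str
    status: OrderStatus
    lines: tuple[OrderLine, ...]


def to_record(order: Order) -> dict:
    """Domain -> flat persistence shape (one root dict, one dict per line)."""
    return {
        "order": {
            "id": order.id,
            "status": order.status.value,
            "customer_id": order.customer_id,
        },
        "lines": [
            {
                "order_id": order.id,
                "sku": line.sku,
                "quantity": line.quantity,
                "unit_price_cents": line.unit_price.cents,
            }
            for line in order.lines
        ],
    }


def from_record(record: dict) -> Order:
    """Flat persistence shape -> domain."""
    root = record["order"]
    return Order(
        id=root["id"],
        customer_id=root["customer_id"],
        status=OrderStatus(root["status"]),
        lines=tuple(
            OrderLine(
                sku=row["sku"],
                quantity=row["quantity"],
                unit_price=Money(row["unit_price_cents"]),
            )
            for row in record["lines"]
        ),
    )


def test_round_trip() -> None:
    order = Order("o-1", "c-9", OrderStatus.DRAFT, (OrderLine("A-1", 2, Money(1250)),))
    # Mapping out and back must reproduce an equal domain object.
    assert from_record(to_record(order)) == order


test_round_trip()
```

A round-trip assertion like this tends to catch forgotten fields early: adding an attribute to the domain object without updating both mapper directions makes the test fail.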
Step 4: implement a concrete repository using the mapper
The repository is an adapter: it speaks the domain language to the application layer and speaks the database language internally.
from typing import Optional


class InMemoryOrderRepository:
    def __init__(self) -> None:
        self._orders: dict[str, OrderRow] = {}
        self._lines: list[OrderLineRow] = []

    def get(self, order_id: OrderId) -> Optional["Order"]:
        row = self._orders.get(str(order_id))
        if row is None:
            return None
        lines = tuple(line for line in self._lines if line.order_id == str(order_id))
        return OrderMapper.from_record(OrderRecord(order=row, lines=lines))

    def add(self, order: "Order") -> None:
        record = OrderMapper.to_record(order)
        self._orders[record.order.id] = record.order
        self._lines.extend(record.lines)

    def update(self, order: "Order") -> None:
        record = OrderMapper.to_record(order)
        self._orders[record.order.id] = record.order
        # Replace all stored lines for this order with the current ones
        self._lines = [line for line in self._lines if line.order_id != record.order.id]
        self._lines.extend(record.lines)

This in-memory repository demonstrates the separation. A SQL repository would follow the same shape: load rows, map to domain; map domain to rows, persist.
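To make the "same shape" claim concrete, here is a hedged sketch of a SQL-backed repository using Python's built-in `sqlite3` module. The table layout, the `SqliteOrderRepository` name, and the simplified domain classes (status as a plain string, price as integer cents) are assumptions chosen to keep the example short; a real adapter would delegate the translation to `OrderMapper`.

```python
import sqlite3
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class OrderLine:
    sku: str
    quantity: int
    unit_price_cents: int


@dataclass
class Order:
    id: str
    customer_id: str
    status: str
    lines: list[OrderLine] = field(default_factory=list)


SCHEMA = """
CREATE TABLE orders (
    id TEXT PRIMARY KEY,
    status TEXT NOT NULL,
    customer_id TEXT NOT NULL
);
CREATE TABLE order_lines (
    order_id TEXT NOT NULL REFERENCES orders(id),
    sku TEXT NOT NULL,
    quantity INTEGER NOT NULL,
    unit_price_cents INTEGER NOT NULL
);
"""


class SqliteOrderRepository:
    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn

    def get(self, order_id: str) -> Optional[Order]:
        # Load the root row, then the child rows, then map to the domain.
        root = self._conn.execute(
            "SELECT id, status, customer_id FROM orders WHERE id = ?", (order_id,)
        ).fetchone()
        if root is None:
            return None
        rows = self._conn.execute(
            "SELECT sku, quantity, unit_price_cents FROM order_lines "
            "WHERE order_id = ? ORDER BY rowid",
            (order_id,),
        ).fetchall()
        return Order(
            id=root[0],
            status=root[1],
            customer_id=root[2],
            lines=[OrderLine(*row) for row in rows],
        )

    def add(self, order: Order) -> None:
        self._conn.execute(
            "INSERT INTO orders (id, status, customer_id) VALUES (?, ?, ?)",
            (order.id, order.status, order.customer_id),
        )
        self._insert_lines(order)

    def update(self, order: Order) -> None:
        self._conn.execute(
            "UPDATE orders SET status = ?, customer_id = ? WHERE id = ?",
            (order.status, order.customer_id, order.id),
        )
        # Replace-all strategy for child rows: delete, then re-insert.
        self._conn.execute("DELETE FROM order_lines WHERE order_id = ?", (order.id,))
        self._insert_lines(order)

    def _insert_lines(self, order: Order) -> None:
        self._conn.executemany(
            "INSERT INTO order_lines (order_id, sku, quantity, unit_price_cents) "
            "VALUES (?, ?, ?, ?)",
            [(order.id, ln.sku, ln.quantity, ln.unit_price_cents) for ln in order.lines],
        )


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
repo = SqliteOrderRepository(conn)
repo.add(Order("o-1", "c-9", "draft", [OrderLine("A-1", 2, 1250)]))

loaded = repo.get("o-1")
loaded.lines.append(OrderLine("B-2", 1, 500))
repo.update(loaded)
```

The repository never commits; that decision is left to the unit of work so that several repository calls can share one transaction.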
Step 5: coordinate with a unit of work
For real databases, you typically need a transaction boundary. The unit-of-work adapter manages the session/connection and exposes repositories bound to that session.
class InMemoryUnitOfWork:
    def __init__(self) -> None:
        self.orders = InMemoryOrderRepository()
        self.committed = False

    def __enter__(self) -> "InMemoryUnitOfWork":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Commit on a clean exit, roll back if the block raised
        if exc_type is None:
            self.commit()
        else:
            self.rollback()

    def commit(self) -> None:
        self.committed = True

    def rollback(self) -> None:
        self.committed = False

In a database-backed unit of work, commit and rollback would call the underlying transaction methods.
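A database-backed version might look like the following sketch, again using `sqlite3`. `SqliteUnitOfWork` is a hypothetical name, and for brevity it exposes the connection directly instead of a bound `orders` repository; in a fuller version `__enter__` would construct the repositories with this connection.

```python
import sqlite3


class SqliteUnitOfWork:
    """Sketch: maps commit/rollback onto one sqlite3 connection."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn

    def __enter__(self) -> "SqliteUnitOfWork":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Same policy as the in-memory version: commit on a clean exit,
        # roll back if the block raised. Returning None re-raises the error.
        if exc_type is None:
            self.commit()
        else:
            self.rollback()

    def commit(self) -> None:
        self.conn.commit()

    def rollback(self) -> None:
        self.conn.rollback()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT NOT NULL)")
conn.commit()

# Clean exit: the insert is committed.
with SqliteUnitOfWork(conn) as uow:
    uow.conn.execute("INSERT INTO orders VALUES ('o-1', 'draft')")

# Failing block: the insert is rolled back.
try:
    with SqliteUnitOfWork(conn) as uow:
        uow.conn.execute("INSERT INTO orders VALUES ('o-2', 'draft')")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

ids = [row[0] for row in conn.execute("SELECT id FROM orders ORDER BY id")]
```

After both blocks run, only the first order is stored, which is the behavior the application layer relies on when it wraps a use case in `with uow:`.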
Application layer: use cases that integrate domain + persistence
The application layer should be thin but explicit. It loads domain objects, calls domain methods, persists changes, and returns results. It should not contain SQL, ORM queries, or HTTP request parsing.
Command use case example: add an order line
A command use case changes state. It should be transactional and should treat repositories as the source of truth.
from dataclasses import dataclass


@dataclass(frozen=True)
class AddOrderLine:
    order_id: OrderId
    sku: str
    quantity: int
    unit_price_cents: int


class OrderNotFound(Exception):
    pass


class AddOrderLineHandler:
    def __init__(self, uow: UnitOfWork) -> None:
        self._uow = uow

    def __call__(self, cmd: AddOrderLine) -> None:
        with self._uow:
            order = self._uow.orders.get(cmd.order_id)
            if order is None:
                raise OrderNotFound(cmd.order_id)
            order.add_line(
                sku=cmd.sku,
                quantity=cmd.quantity,
                unit_price=Money(cents=cmd.unit_price_cents),
            )
            self._uow.orders.update(order)
            self._uow.commit()

Integration details to notice:
- The handler depends on UnitOfWork (a port), not a database session.
- The handler calls a domain method (order.add_line) to enforce business rules.
- Persistence is invoked only at the end via repository update and commit.
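Because the handler depends only on ports, it can be exercised with hand-written fakes. The sketch below is self-contained and therefore uses a simplified `Order` and a plain function `add_order_line` in place of the handler class; `FakeOrderRepository` and `FakeUnitOfWork` are hypothetical test doubles, not part of the chapter's code.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Order:
    id: str
    lines: list[tuple[str, int]] = field(default_factory=list)

    def add_line(self, sku: str, quantity: int) -> None:
        if quantity <= 0:
            raise ValueError("quantity must be positive")
        self.lines.append((sku, quantity))


class OrderNotFound(Exception):
    pass


class FakeOrderRepository:
    def __init__(self, orders: list[Order]) -> None:
        self._orders = {order.id: order for order in orders}

    def get(self, order_id: str) -> Optional[Order]:
        return self._orders.get(order_id)

    def update(self, order: Order) -> None:
        self._orders[order.id] = order


class FakeUnitOfWork:
    """Records commits so a test can assert on the transaction boundary."""

    def __init__(self, orders: list[Order]) -> None:
        self.orders = FakeOrderRepository(orders)
        self.commits = 0

    def __enter__(self) -> "FakeUnitOfWork":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        return None  # never swallow exceptions

    def commit(self) -> None:
        self.commits += 1

    def rollback(self) -> None:
        pass


def add_order_line(uow: FakeUnitOfWork, order_id: str, sku: str, quantity: int) -> None:
    # Same flow as the handler: load, call the domain method, persist, commit.
    with uow:
        order = uow.orders.get(order_id)
        if order is None:
            raise OrderNotFound(order_id)
        order.add_line(sku, quantity)
        uow.orders.update(order)
        uow.commit()


uow = FakeUnitOfWork([Order("o-1")])
add_order_line(uow, "o-1", "A-1", 2)

missing_uow = FakeUnitOfWork([])
try:
    add_order_line(missing_uow, "nope", "A-1", 2)
    raised = False
except OrderNotFound:
    raised = True
```

The fake counts commits, so a test can check both the resulting state and that a failed lookup never reaches the commit.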
Query use case example: read model DTOs
Queries often return read-optimized shapes (DTOs) rather than full domain objects. You can still keep the domain clean by placing DTO definitions and query services in the application layer.
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderLineDTO:
    sku: str
    quantity: int
    unit_price_cents: int


@dataclass(frozen=True)
class OrderDTO:
    id: str
    status: str
    customer_id: str
    lines: tuple[OrderLineDTO, ...]


class GetOrderHandler:
    def __init__(self, repo: OrderRepository) -> None:
        self._repo = repo

    def __call__(self, order_id: OrderId) -> OrderDTO:
        order = self._repo.get(order_id)
        if order is None:
            raise OrderNotFound(order_id)
        return OrderDTO(
            id=str(order.id),
            status=order.status.value,
            customer_id=str(order.customer_id),
            lines=tuple(
                OrderLineDTO(
                    sku=line.sku,
                    quantity=line.quantity,
                    unit_price_cents=line.unit_price.cents,
                )
                for line in order.lines
            ),
        )

This pattern avoids leaking persistence records to the outside world and avoids forcing the domain model to serve as an API schema.
Handling identity and database-generated keys
Integration often becomes tricky when the database generates identifiers (auto-increment IDs, UUID defaults) but the domain expects stable identity semantics. Two common approaches:
- Domain-generated IDs: create the ID in the application layer (or domain factory) before persistence. This simplifies mapping because the domain object is complete before saving.
- Database-generated IDs: persist a new record first, then hydrate the domain object with the generated ID. This requires a creation workflow that can return the assigned ID and update the domain instance (or create a new instance with the ID).
When using database-generated IDs, keep the “ID assignment” logic in the repository adapter. The application layer can treat add() as returning the order with its assigned identity.
class OrderRepositoryWithIdentity(Protocol):
    def add(self, order: "Order") -> "Order": ...  # returns order with assigned id

This keeps the domain free of persistence concerns while still acknowledging that identity may be assigned externally.
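A minimal sketch of the database-generated variant, assuming an integer auto-increment key in SQLite and an immutable `Order` whose `id` is `None` until it has been stored. The class and table names are illustrative; the only library behavior relied on is `sqlite3`'s `Cursor.lastrowid` and `dataclasses.replace`.

```python
import sqlite3
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Order:
    id: Optional[int]  # None until the database assigns a key
    customer_id: str
    status: str


class SqliteOrderRepository:
    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn

    def add(self, order: Order) -> Order:
        cur = self._conn.execute(
            "INSERT INTO orders (customer_id, status) VALUES (?, ?)",
            (order.customer_id, order.status),
        )
        # lastrowid is the key generated for the row just inserted;
        # return a new instance instead of mutating the caller's object.
        return replace(order, id=cur.lastrowid)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "customer_id TEXT NOT NULL, "
    "status TEXT NOT NULL)"
)
repo = SqliteOrderRepository(conn)
unsaved = Order(id=None, customer_id="c-9", status="draft")
saved = repo.add(unsaved)
```

Returning a new instance keeps the unsaved object unchanged, which suits immutable domain objects; a mutable design could assign the ID in place instead.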
Transactions, concurrency, and consistency boundaries
Integration must address what happens when multiple requests update the same aggregate concurrently. The domain model enforces invariants within a single instance; the persistence layer must help enforce consistency across concurrent transactions.
Optimistic concurrency with a version field
A practical approach is to store a version number (or updated timestamp) and check it on update. The domain can carry a version attribute, but the concurrency check belongs in the repository adapter.
class ConcurrencyError(Exception):
    pass


@dataclass
class OrderRow:
    id: str
    status: str
    customer_id: str
    version: int


class SqlOrderRepository:
    def update(self, order: "Order") -> None:
        record = OrderMapper.to_record(order)
        # Pseudocode: UPDATE ... WHERE id=? AND version=?
        # If affected_rows == 0: raise ConcurrencyError
        # Else increment version in storage
        raise NotImplementedError

The application layer can catch ConcurrencyError and decide whether to retry, return a conflict response, or reload and reapply changes.
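The version check can be sketched against SQLite as follows. The function name `update_status` and the table layout are assumptions for illustration; the technique itself is the conditional `UPDATE ... WHERE id = ? AND version = ?` followed by a check of the affected row count.

```python
import sqlite3


class ConcurrencyError(Exception):
    pass


def update_status(
    conn: sqlite3.Connection, order_id: str, new_status: str, expected_version: int
) -> int:
    """Update only if the stored version still matches; return the new version."""
    cur = conn.execute(
        "UPDATE orders SET status = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_status, order_id, expected_version),
    )
    if cur.rowcount == 0:
        # No row matched: someone else updated the order first
        # (or the order does not exist at all).
        raise ConcurrencyError(
            f"order {order_id} was modified since version {expected_version}"
        )
    return expected_version + 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT NOT NULL, version INTEGER NOT NULL)"
)
conn.execute("INSERT INTO orders VALUES ('o-1', 'draft', 1)")

# First writer read version 1 and wins.
new_version = update_status(conn, "o-1", "paid", expected_version=1)

# Second writer also read version 1, which is now stale.
try:
    update_status(conn, "o-1", "cancelled", expected_version=1)
    conflict = False
except ConcurrencyError:
    conflict = True

stored = conn.execute("SELECT status, version FROM orders WHERE id = 'o-1'").fetchone()
```

Note that a zero row count cannot distinguish a stale version from a missing row; an adapter that needs to tell them apart would have to check existence separately.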
Unit of work as the consistency boundary
Keep a single use case within a single unit-of-work scope. If a workflow spans multiple aggregates and external calls, decide explicitly where the transaction boundary is. A common pattern is:
- Load and validate domain state.
- Apply domain changes.
- Persist changes in one transaction.
- Trigger side effects (emails, messages) after commit, or via an outbox.
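One way to realize the outbox step is to write the pending message in the same transaction as the state change and let a separate relay deliver it afterwards. The sketch below assumes an `outbox` table and two hypothetical functions, `mark_paid` and `relay`; it uses `sqlite3`'s connection context manager, which commits on success and rolls back on an exception.

```python
import json
import sqlite3
from typing import Callable

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        topic TEXT NOT NULL,
        payload TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0
    );
    INSERT INTO orders VALUES ('o-1', 'draft');
    """
)


def mark_paid(conn: sqlite3.Connection, order_id: str) -> None:
    # State change and outbox row share one transaction:
    # either both are stored or neither is.
    with conn:
        conn.execute("UPDATE orders SET status = 'paid' WHERE id = ?", (order_id,))
        conn.execute(
            "INSERT INTO outbox (topic, payload) VALUES (?, ?)",
            ("order.paid", json.dumps({"order_id": order_id})),
        )


def relay(conn: sqlite3.Connection, send: Callable[[str, dict], None]) -> int:
    """Deliver unpublished messages in order; return how many were sent."""
    rows = conn.execute(
        "SELECT id, topic, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, topic, payload in rows:
        send(topic, json.loads(payload))
        # Mark as published only after the send succeeded.
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)


sent: list[tuple[str, dict]] = []
mark_paid(conn, "o-1")
first = relay(conn, lambda topic, body: sent.append((topic, body)))
second = relay(conn, lambda topic, body: sent.append((topic, body)))
```

Because a message is marked as published only after it was sent, a crash between the two steps leads to a resend. Delivery is therefore at-least-once, and consumers should tolerate duplicates.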
Integrating domain events without leaking infrastructure
Domain events are a clean way to represent “something happened” in the domain. Integration becomes tricky when publishing events requires infrastructure (message broker, queue). The key is to collect events in the domain and publish them in the application layer after persistence succeeds.
Collecting events from aggregates
Assume the domain entity appends events to an internal list when state changes. The application layer can pull and dispatch them.
class EventPublisher(Protocol):
    def publish(self, events: list[object]) -> None: ...


class AddOrderLineHandler:
    def __init__(self, uow: UnitOfWork, publisher: EventPublisher) -> None:
        self._uow = uow
        self._publisher = publisher

    def __call__(self, cmd: AddOrderLine) -> None:
        with self._uow:
            order = self._uow.orders.get(cmd.order_id)
            if order is None:
                raise OrderNotFound(cmd.order_id)
            order.add_line(
                sku=cmd.sku,
                quantity=cmd.quantity,
                unit_price=Money(cents=cmd.unit_price_cents),
            )
            self._uow.orders.update(order)
            self._uow.commit()
        # Publish only after the unit of work has committed and exited cleanly
        self._publisher.publish(order.pull_events())

This keeps the domain independent: it only creates event objects; it does not know how they are delivered.
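The handler above assumes the aggregate exposes `pull_events()`. A minimal sketch of what that could look like on the domain side is shown below; the `OrderLineAdded` event and the simplified `Order` are assumptions for illustration, and the real entity may collect events differently.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class OrderLineAdded:
    """Domain event: a plain, immutable record of something that happened."""
    order_id: str
    sku: str
    quantity: int


@dataclass
class Order:
    id: str
    lines: list[tuple[str, int]] = field(default_factory=list)
    _events: list[object] = field(default_factory=list, repr=False)

    def add_line(self, sku: str, quantity: int) -> None:
        if quantity <= 0:
            raise ValueError("quantity must be positive")
        self.lines.append((sku, quantity))
        # Record the event; delivery is someone else's job.
        self._events.append(OrderLineAdded(self.id, sku, quantity))

    def pull_events(self) -> list[object]:
        """Return pending events and clear the internal list."""
        events, self._events = self._events, []
        return events


order = Order("o-1")
order.add_line("A-1", 2)
first = order.pull_events()
second = order.pull_events()
```

Clearing the list on pull means that publishing twice by accident does not send the same events twice from the same instance.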
Mapping strategies: when to map and what to map
There are multiple valid mapping strategies, and the right choice depends on complexity and performance needs.
Strategy A: domain-first mapping (recommended default)
- Repositories return domain objects.
- Application layer converts domain objects to DTOs for output.
- Persistence layer maps domain objects to records.
This maximizes domain consistency and keeps business rules centralized.
Strategy B: read models for queries (CQRS-style)
- Commands use domain objects and repositories.
- Queries use specialized read repositories returning DTOs directly (often via SQL joins or materialized views).
This avoids heavy domain hydration for read-heavy endpoints and reduces N+1 query risks. The tradeoff is maintaining two representations: domain for writes, read models for reads.
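A read-side query in this style might look like the sketch below: a single SQL statement joins and aggregates, and the result is mapped straight into a DTO without constructing any domain objects. `OrderSummaryDTO`, `list_order_summaries`, and the table layout are assumptions made for the example.

```python
import sqlite3
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderSummaryDTO:
    id: str
    status: str
    line_count: int
    total_cents: int


def list_order_summaries(conn: sqlite3.Connection, customer_id: str) -> list[OrderSummaryDTO]:
    # One round trip: the database does the join and the aggregation.
    rows = conn.execute(
        """
        SELECT o.id,
               o.status,
               COUNT(l.sku) AS line_count,
               COALESCE(SUM(l.quantity * l.unit_price_cents), 0) AS total_cents
        FROM orders AS o
        LEFT JOIN order_lines AS l ON l.order_id = o.id
        WHERE o.customer_id = ?
        GROUP BY o.id, o.status
        ORDER BY o.id
        """,
        (customer_id,),
    ).fetchall()
    return [OrderSummaryDTO(*row) for row in rows]


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT, customer_id TEXT);
    CREATE TABLE order_lines (
        order_id TEXT, sku TEXT, quantity INTEGER, unit_price_cents INTEGER
    );
    INSERT INTO orders VALUES ('o-1', 'draft', 'c-9'), ('o-2', 'paid', 'c-9');
    INSERT INTO order_lines VALUES ('o-1', 'A-1', 2, 1250), ('o-1', 'B-2', 1, 500);
    """
)
summaries = list_order_summaries(conn, "c-9")
```

The `LEFT JOIN` keeps orders that have no lines, and `COALESCE` turns their missing sum into zero so the DTO never carries `None` for a total.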
Strategy C: persistence model as domain model (use cautiously)
Sometimes teams use ORM entities as domain entities. This can be pragmatic for small systems, but it tends to blur boundaries: lazy-loading, session lifetimes, and persistence annotations can leak into business logic. If you choose this approach, be explicit about the tradeoffs and keep domain methods free of session access.
Dealing with nested aggregates and relational schemas
Domain aggregates often contain nested structures (e.g., order lines). Relational schemas often represent these as separate tables. Integration must decide how to load and save them.
Loading an aggregate
Repositories should load the aggregate as a whole, ensuring invariants can be evaluated with complete information. In SQL terms, that often means:
- One query for the root row and one for children, or
- A join query and grouping in memory.
The mapper can accept a composite record (OrderRecord) that includes both root and child rows, as shown earlier.
Saving an aggregate
For child collections, common persistence strategies include:
- Replace-all: delete existing child rows and insert current ones (simple, can be expensive).
- Diff-based: compute added/removed/updated lines and apply minimal changes (more complex, better performance).
Keep the diffing logic in the repository adapter, not in the domain. The domain should express intent (e.g., “remove line”, “change quantity”), while the repository decides how to translate that into SQL operations.
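The diff-based strategy can be sketched as a pure helper inside the repository adapter. The example assumes that a SKU appears at most once per order, so it can serve as the key for matching stored and current rows; `LineRow` and `diff_lines` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class LineRow:
    order_id: str
    sku: str
    quantity: int
    unit_price_cents: int


def diff_lines(
    stored: Iterable[LineRow], current: Iterable[LineRow]
) -> tuple[list[LineRow], list[LineRow], list[LineRow]]:
    """Return (to_insert, to_update, to_delete), matching rows by sku."""
    old = {row.sku: row for row in stored}
    new = {row.sku: row for row in current}
    to_insert = [new[sku] for sku in new if sku not in old]
    to_update = [new[sku] for sku in new if sku in old and new[sku] != old[sku]]
    to_delete = [old[sku] for sku in old if sku not in new]
    return to_insert, to_update, to_delete


stored_rows = [LineRow("o-1", "A-1", 2, 1250), LineRow("o-1", "B-2", 1, 500)]
current_rows = [LineRow("o-1", "A-1", 3, 1250), LineRow("o-1", "C-3", 5, 200)]
to_insert, to_update, to_delete = diff_lines(stored_rows, current_rows)
```

The repository would then issue one INSERT, UPDATE, or DELETE per entry in the three lists, leaving unchanged rows untouched.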
Integrating with external services: anti-corruption mapping
External APIs often have awkward schemas, inconsistent naming, or different concepts. Avoid letting those shapes enter your domain. Instead, create an adapter that maps external DTOs into your own application/domain types.
Example: payment gateway adapter
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class ChargeRequest:
    order_id: str
    amount_cents: int
    currency: str


@dataclass(frozen=True)
class ChargeResult:
    provider_charge_id: str
    authorized: bool


class PaymentGateway(Protocol):
    def charge(self, req: ChargeRequest) -> ChargeResult: ...


class HttpPaymentGateway:
    def __init__(self, base_url: str, api_key: str) -> None:
        self._base_url = base_url
        self._api_key = api_key

    def charge(self, req: ChargeRequest) -> ChargeResult:
        # Pseudocode: translate to provider JSON
        payload = {
            "reference": req.order_id,
            "amount": req.amount_cents,
            "currency": req.currency,
        }
        # send HTTP request here...
        # Stubbed provider response, standing in for the real HTTP call
        provider_response = {"id": "ch_123", "status": "AUTHORIZED"}
        return ChargeResult(
            provider_charge_id=provider_response["id"],
            authorized=(provider_response["status"] == "AUTHORIZED"),
        )

The application layer uses PaymentGateway (port). The adapter translates to provider-specific JSON and back. If the provider changes fields, only the adapter changes.
Practical integration checklist
- Keep domain imports clean: domain code should not import ORM models, DB sessions, HTTP clients, or framework request objects.
- Use mappers as pure translators: mapping code should be deterministic and easy to unit test.
- Define ports where they are used: application layer defines repository/gateway interfaces; infrastructure implements them.
- Make transaction boundaries explicit: one use case, one unit-of-work scope; commit/rollback handled in one place.
- Publish side effects after commit: avoid emitting messages for changes that might roll back; consider an outbox when reliability matters.
- Separate read and write shapes when needed: DTOs/read models can optimize queries without compromising domain integrity.
- Handle concurrency in persistence: use version checks or locking strategies in repositories, not in domain methods.