
Python Data Modeling in Practice: Dataclasses, Pydantic, and Type Hints


Maintaining Model Evolution: Versioning and Backward Compatibility

Chapter 13

Estimated reading time: 13 minutes


Why model evolution needs explicit design

Once a model is used outside the module that defines it, it becomes a contract. That contract may be consumed by other services, stored in databases, emitted as events, written to files, or cached. When you change the model, you are changing the contract. “Model evolution” is the discipline of changing models while keeping the system operable across time: old data must still load, old messages must still be understood, and new producers must not break old consumers.

Backward compatibility means newer code can accept older representations. Forward compatibility means older code can tolerate newer representations (often by ignoring unknown fields). In practice, you usually aim for backward compatibility first and add forward compatibility where you can (especially for APIs and events).

Versioning is how you make evolution explicit. It can be a field in the payload, a schema identifier, a topic name, an API path, or a database migration version. The key is that versioning is not only about numbering; it is about defining rules for how versions relate and how code handles them.

Where versioning shows up in Python data models

In Python data modeling, versioning concerns typically appear at boundaries where data crosses time or process boundaries:

  • Persisted records (database rows, JSON blobs, pickled data, files). Old records must remain readable.


  • APIs (request/response payloads). Clients and servers may upgrade independently.

  • Events and messages (Kafka, SQS, RabbitMQ). Producers and consumers are decoupled and may run different versions concurrently.

  • Caches (Redis). Cached payloads can outlive deployments.

Within a single running process, you can refactor freely. The moment data is stored or exchanged, you need a compatibility strategy.

Compatibility rules you can enforce

Additive changes (usually safe)

  • Add a new optional field with a default.

  • Add a new enum value, provided consumers treat unknown values safely (often they do not).

  • Add a new event type while keeping old ones.

Breaking changes (need a plan)

  • Rename a field (breaks deserialization unless you support aliases).

  • Change a field type (e.g., int to str).

  • Change meaning/units (e.g., dollars to cents) without a new field.

  • Make a previously optional field required.

  • Split/merge fields (e.g., full_name into first_name/last_name).

A useful mindset: if old data cannot be interpreted unambiguously by new code, you must either keep the old interpretation around or introduce a new versioned representation.
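
As a small illustration of an additive change, here is a minimal sketch (with a hypothetical Order model) in which a later revision adds an optional currency field with a default, so payloads written before the field existed still parse:

from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class Order:
    order_id: str
    total_cents: int
    # Added in a later revision; the default keeps older payloads loadable.
    currency: str = "USD"


def parse_order(payload: Mapping[str, Any]) -> Order:
    return Order(
        order_id=str(payload["order_id"]),
        total_cents=int(payload["total_cents"]),
        currency=str(payload.get("currency", "USD")),  # absent in old data
    )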

Strategy 1: In-payload schema versioning

The simplest approach is to include a schema_version (or similar) field in the serialized form. Your Python code then routes decoding based on that version.

Step-by-step: evolve a JSON record from v1 to v2

Scenario: you stored user profiles as JSON. Version 1 had name as a single string. Version 2 splits it into given_name and family_name, but you must still load old records.

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Mapping, TypedDict, Literal


class UserProfileV1(TypedDict):
    schema_version: Literal[1]
    user_id: str
    name: str


class UserProfileV2(TypedDict):
    schema_version: Literal[2]
    user_id: str
    given_name: str
    family_name: str


@dataclass(frozen=True)
class UserProfile:
    user_id: str
    given_name: str
    family_name: str


def _parse_v1(payload: UserProfileV1) -> UserProfile:
    parts = payload["name"].strip().split(" ", 1)
    given = parts[0]
    family = parts[1] if len(parts) == 2 else ""
    return UserProfile(user_id=payload["user_id"], given_name=given, family_name=family)


def _parse_v2(payload: UserProfileV2) -> UserProfile:
    return UserProfile(
        user_id=payload["user_id"],
        given_name=payload["given_name"],
        family_name=payload["family_name"],
    )


def parse_user_profile(payload: Mapping[str, Any]) -> UserProfile:
    version = payload.get("schema_version", 1)  # default for legacy data
    if version == 1:
        return _parse_v1(payload)  # type: ignore[arg-type]
    if version == 2:
        return _parse_v2(payload)  # type: ignore[arg-type]
    raise ValueError(f"Unsupported schema_version: {version}")


def dump_user_profile(profile: UserProfile) -> UserProfileV2:
    return {
        "schema_version": 2,
        "user_id": profile.user_id,
        "given_name": profile.given_name,
        "family_name": profile.family_name,
    }

Key points:

  • Decoding supports multiple versions; encoding emits only the latest version.

  • Legacy defaulting (schema_version missing) is a common real-world need.

  • Conversion logic is explicit and testable.
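
A quick usage sketch (the payloads below are made up for illustration) exercises both decode paths and the single encode path:

legacy = {"user_id": "u-1", "name": "Ada Lovelace"}  # no schema_version: treated as v1
current = {
    "schema_version": 2,
    "user_id": "u-2",
    "given_name": "Grace",
    "family_name": "Hopper",
}

assert parse_user_profile(legacy) == UserProfile("u-1", "Ada", "Lovelace")
assert parse_user_profile(current) == UserProfile("u-2", "Grace", "Hopper")
# Re-encoding always emits the latest version.
assert dump_user_profile(parse_user_profile(legacy))["schema_version"] == 2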

When to prefer this approach

  • Files and document storage where each record carries its own version.

  • Events where consumers need to know how to interpret a payload.

  • Systems where you cannot guarantee coordinated deployments.

Strategy 2: Field aliasing and dual-write for renames

Renaming is one of the most common sources of breakage. A robust pattern is: read both names, write the new name, and keep the old name for a deprecation window.

Step-by-step: rename phone to phone_number

Assume old payloads use phone. New payloads should use phone_number. During migration, accept both.

from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class Contact:
    user_id: str
    phone_number: str | None


def parse_contact(payload: Mapping[str, Any]) -> Contact:
    phone_number = payload.get("phone_number")
    if phone_number is None:
        phone_number = payload.get("phone")  # legacy alias

    return Contact(
        user_id=str(payload["user_id"]),
        phone_number=str(phone_number) if phone_number is not None else None,
    )


def dump_contact(contact: Contact) -> dict[str, Any]:
    # write only the new field
    return {
        "user_id": contact.user_id,
        "phone_number": contact.phone_number,
    }

Operationally, this pattern is often paired with a “dual-write” phase if you have downstream consumers that still expect the old field. In that case, you might temporarily emit both fields:

def dump_contact_dual_write(contact: Contact) -> dict[str, Any]:
    return {
        "user_id": contact.user_id,
        "phone_number": contact.phone_number,
        "phone": contact.phone_number,  # temporary compatibility
    }

Dual-write should be time-boxed and tracked, because it increases payload size and prolongs ambiguity.
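
One lightweight way to time-box dual-write is to gate the legacy field behind a configuration flag; the EMIT_LEGACY_PHONE environment variable below is a hypothetical example, chosen so that removing the legacy field later is a one-line change:

import os
from typing import Any

# Hypothetical flag; set to "0" (or delete this code path) when the window ends.
EMIT_LEGACY_PHONE = os.environ.get("EMIT_LEGACY_PHONE", "1") == "1"


def dump_contact_flagged(contact: Contact) -> dict[str, Any]:
    payload: dict[str, Any] = {
        "user_id": contact.user_id,
        "phone_number": contact.phone_number,
    }
    if EMIT_LEGACY_PHONE:
        payload["phone"] = contact.phone_number  # temporary compatibility
    return payload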

Strategy 3: Upcasters (normalize old data at the boundary)

An upcaster is a function that transforms older representations into the newest representation before the rest of your code sees it. This keeps the internal model stable and pushes compatibility logic to the edge.

Upcasters are especially useful when you have many versions. Instead of writing N parsers, you can chain transformations: v1 → v2 → v3, etc. Each step is small and focused.

Step-by-step: chain upcasters

from typing import Any, Mapping


def upcast_to_v2(payload: dict[str, Any]) -> dict[str, Any]:
    if payload.get("schema_version", 1) != 1:
        return payload

    name = payload.pop("name")
    parts = name.strip().split(" ", 1)
    payload["given_name"] = parts[0]
    payload["family_name"] = parts[1] if len(parts) == 2 else ""
    payload["schema_version"] = 2
    return payload


def normalize_user_payload(payload: Mapping[str, Any]) -> dict[str, Any]:
    data = dict(payload)
    data = upcast_to_v2(data)
    # future: data = upcast_to_v3(data)
    return data

Then your main parser can assume v2 fields exist. The benefit is that your domain-facing constructor stays clean, and you can remove old upcasters once data has been migrated and old producers retired.
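
With that in place, the public parser can normalize first and then assume the latest shape; a minimal sketch, reusing normalize_user_payload, _parse_v2, and UserProfile from the earlier examples:

def parse_user_profile_normalized(payload: Mapping[str, Any]) -> UserProfile:
    data = normalize_user_payload(payload)
    return _parse_v2(data)  # type: ignore[arg-type]  # data is v2-shaped after upcasting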

Strategy 4: Downcasters (support old consumers)

Sometimes you must produce older versions for compatibility (e.g., an external client cannot upgrade quickly). A downcaster transforms your current model into an older schema. This is common in APIs with explicit versioned endpoints.

Downcasting is riskier than upcasting because information may be lost. If v2 has more fields than v1, you must decide defaults or omit data. Make that loss explicit and documented in code.

from dataclasses import asdict
from typing import Any


def dump_user_v1(profile: UserProfile) -> dict[str, Any]:
    # v1 only had a single name field
    name = (profile.given_name + " " + profile.family_name).strip()
    return {
        "schema_version": 1,
        "user_id": profile.user_id,
        "name": name,
    }


def dump_user_v2(profile: UserProfile) -> dict[str, Any]:
    d = asdict(profile)
    return {"schema_version": 2, **d}

Strategy 5: Database migrations plus tolerant readers

For relational databases, schema migrations are the primary tool, but they rarely eliminate the need for tolerant reading. During a rolling deployment, some application instances may still write the old shape while others read the new shape.

A practical approach is the “expand and contract” pattern:

  • Expand: add new columns/tables while keeping old ones; write both if needed.

  • Migrate: backfill data from old to new.

  • Switch reads: read from the new columns, still tolerate old ones.

  • Contract: remove old columns after all writers are updated.

Step-by-step: split name into two columns

At the model boundary (row → object), read new columns if present, otherwise derive from old:

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class UserRowModel:
    user_id: str
    given_name: str
    family_name: str


def user_from_row(row: dict[str, Any]) -> UserRowModel:
    given = row.get("given_name")
    family = row.get("family_name")

    if given is None and family is None:
        # legacy column
        name = (row.get("name") or "").strip()
        parts = name.split(" ", 1)
        given = parts[0] if parts else ""
        family = parts[1] if len(parts) == 2 else ""

    return UserRowModel(
        user_id=str(row["user_id"]),
        given_name=str(given or ""),
        family_name=str(family or ""),
    )

This keeps the rest of your code insulated from the migration timeline.
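
During the expand phase, the write side can mirror this tolerance by populating both the legacy and the new columns until contraction; a sketch along the lines of the example above:

def user_to_row(user: UserRowModel) -> dict[str, Any]:
    # Expand phase: keep the legacy "name" column filled until the contract step.
    full_name = (user.given_name + " " + user.family_name).strip()
    return {
        "user_id": user.user_id,
        "given_name": user.given_name,
        "family_name": user.family_name,
        "name": full_name,  # drop this key once the old column is removed
    }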

Strategy 6: Event schema evolution (producers, consumers, and time)

Events are often the hardest place to evolve models because old events remain in logs, and consumers may replay them. A useful rule: treat event payloads as immutable historical facts. Instead of changing an event in place, introduce a new event version or a new event type.

Versioned event envelope

Wrap event data in an envelope that includes a type and version. Consumers route based on both.

from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class EventEnvelope:
    event_type: str
    schema_version: int
    data: dict[str, Any]


def parse_envelope(payload: Mapping[str, Any]) -> EventEnvelope:
    return EventEnvelope(
        event_type=str(payload["event_type"]),
        schema_version=int(payload.get("schema_version", 1)),
        data=dict(payload.get("data", {})),
    )

Then implement per-event upcasters. For example, UserRegistered v1 had name, v2 has split names.

def upcast_user_registered(envelope: EventEnvelope) -> EventEnvelope:
    if envelope.event_type != "UserRegistered":
        return envelope

    if envelope.schema_version == 1:
        data = dict(envelope.data)
        name = (data.pop("name") or "").strip()
        parts = name.split(" ", 1)
        data["given_name"] = parts[0] if parts else ""
        data["family_name"] = parts[1] if len(parts) == 2 else ""
        return EventEnvelope(event_type=envelope.event_type, schema_version=2, data=data)

    return envelope

This approach keeps replay safe: you can always interpret old events by upcasting them to the latest internal representation.

Deprecation windows and compatibility budgets

Backward compatibility is not free. Every supported version adds code paths, tests, and operational complexity. A practical way to manage this is to define:

  • Deprecation windows: how long you will accept old versions (e.g., 90 days for API clients, 6 months for file formats).

  • Compatibility budgets: how many versions you support simultaneously (e.g., current and previous only).

  • Removal criteria: metrics or dates that trigger deletion of old code (e.g., no v1 traffic for 30 days).

In code, you can make deprecations visible by logging when legacy paths are used. That gives you evidence to safely remove them later.

import logging
from typing import Any, Mapping

logger = logging.getLogger(__name__)


def parse_with_legacy_notice(payload: Mapping[str, Any]) -> UserProfile:
    version = payload.get("schema_version", 1)
    if version == 1:
        logger.info("Parsing legacy UserProfile schema_version=1")
    return parse_user_profile(payload)

Semantic versioning for schemas (and what it really means)

Semantic versioning (MAJOR.MINOR.PATCH) can be applied to schemas, but only if you define what “breaking” means in your context. For many payload schemas:

  • PATCH: clarifications, documentation-only changes, or relaxing constraints that do not change the shape.

  • MINOR: additive changes (new optional fields), new event types, new enum values only if consumers are tolerant.

  • MAJOR: removals, renames without aliasing, type changes, meaning changes, making optional required.

Be careful with enums and unions: adding a new variant is “additive” structurally, but can be behaviorally breaking if consumers assume an exhaustive set. If you cannot guarantee tolerant handling, treat new variants as MAJOR or introduce a new field that carries the new concept without changing existing ones.
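
One way to keep a new enum variant from breaking consumers is to parse unknown values into an explicit fallback member; a minimal sketch, assuming a hypothetical OrderStatus enum:

from enum import Enum


class OrderStatus(Enum):
    PENDING = "pending"
    SHIPPED = "shipped"
    UNKNOWN = "unknown"  # explicit fallback for variants this code predates


def parse_status(raw: str) -> OrderStatus:
    try:
        return OrderStatus(raw)
    except ValueError:
        return OrderStatus.UNKNOWN  # tolerate values added by newer producers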

Designing models for forward compatibility

Forward compatibility is about letting older code survive newer payloads. Common techniques:

  • Ignore unknown fields when parsing external payloads.

  • Use feature detection instead of version checks when possible (e.g., “if field exists, use it”).

  • Prefer additive changes over in-place modifications.

  • Keep stable identifiers for fields and event types; avoid reusing names with new meanings.

In Python, a simple forward-compatible parser pattern is to extract only what you need and ignore the rest:

from typing import Any, Mapping


def parse_minimal(payload: Mapping[str, Any]) -> dict[str, Any]:
    # Only pick known keys; ignore unknown keys for forward compatibility
    return {
        "user_id": str(payload["user_id"]),
        "given_name": str(payload.get("given_name", "")),
        "family_name": str(payload.get("family_name", "")),
    }

This pattern is especially important for services that must keep running while other teams add fields.

Handling type changes safely

Type changes are common when early versions used “convenient” types (like strings) and later versions need structure (like objects), or when numeric fields change units. A safe migration typically uses a new field rather than changing the existing one.

Example: migrate amount from float dollars to integer cents

Instead of changing amount in place, introduce amount_cents and keep amount as legacy. Read either, write the new one.

from dataclasses import dataclass
from decimal import Decimal, ROUND_HALF_UP
from typing import Any, Mapping


@dataclass(frozen=True)
class Payment:
    payment_id: str
    amount_cents: int


def parse_payment(payload: Mapping[str, Any]) -> Payment:
    if "amount_cents" in payload:
        cents = int(payload["amount_cents"])
    else:
        # legacy float dollars
        dollars = Decimal(str(payload.get("amount", "0")))
        cents = int((dollars * 100).quantize(Decimal("1"), rounding=ROUND_HALF_UP))

    return Payment(payment_id=str(payload["payment_id"]), amount_cents=cents)


def dump_payment(payment: Payment) -> dict[str, Any]:
    return {"payment_id": payment.payment_id, "amount_cents": payment.amount_cents}

This avoids ambiguity and makes unit changes explicit.

Organizing versioned code without chaos

As versions accumulate, code organization matters. A few maintainable structures:

  • Versioned modules: schemas/v1.py, schemas/v2.py, plus schemas/normalize.py for upcasters.

  • Single public parser that returns the current internal model, hiding legacy details.

  • Explicit conversion functions named from_v1, to_v1, upcast_v1_to_v2.

  • Central registry for event type/version handlers to avoid long if chains.

Example: registry-based routing for event upcasters

from typing import Callable

Upcaster = Callable[[EventEnvelope], EventEnvelope]


_UPCASTERS: dict[tuple[str, int], Upcaster] = {}


def register(event_type: str, from_version: int) -> Callable[[Upcaster], Upcaster]:
    def decorator(fn: Upcaster) -> Upcaster:
        _UPCASTERS[(event_type, from_version)] = fn
        return fn
    return decorator


@register("UserRegistered", 1)
def _upcast_user_registered_v1(envelope: EventEnvelope) -> EventEnvelope:
    return upcast_user_registered(envelope)


def upcast_event(envelope: EventEnvelope) -> EventEnvelope:
    current = envelope
    while True:
        fn = _UPCASTERS.get((current.event_type, current.schema_version))
        if fn is None:
            return current
        current = fn(current)

This pattern makes it easy to add a new upcaster without editing a large conditional block.
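
For example, a later (hypothetical) v2 → v3 step for the same event registers independently, and upcast_event chains both hops when it sees a v1 envelope:

@register("UserRegistered", 2)
def _upcast_user_registered_v2(envelope: EventEnvelope) -> EventEnvelope:
    data = dict(envelope.data)
    data.setdefault("marketing_opt_in", False)  # hypothetical field introduced in v3
    return EventEnvelope(event_type=envelope.event_type, schema_version=3, data=data)


# A v1 envelope is upcast twice: v1 -> v2 -> v3.
latest = upcast_event(EventEnvelope("UserRegistered", 1, {"name": "Ada Lovelace"}))
assert latest.schema_version == 3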

Operational checklist for safe model evolution

Before you change a model

  • Identify where the model is persisted or exchanged (DB, events, caches, files, APIs).

  • Decide whether the change is additive or breaking.

  • Choose a strategy: aliasing, upcasting, new version field, new event type, expand/contract migration.

  • Define the deprecation window and how you will measure remaining legacy usage.

Implementing the change

  • Update readers first (tolerant parsing, support old versions/fields).

  • Deploy readers.

  • Update writers to emit the new format (optionally dual-write temporarily).

  • Backfill/migrate stored data if needed.

  • Add logging/metrics for legacy paths.

After the rollout

  • Monitor legacy usage and errors.

  • Remove dual-write once consumers are updated.

  • Remove legacy parsing paths when the deprecation window ends and metrics confirm safety.

Now answer the exercise about the content:

Which approach best keeps your internal Python model stable by transforming older payloads into the newest representation before the rest of the code processes them?


Upcasters convert older representations to the latest schema at the boundary, keeping the internal model consistent and pushing compatibility logic to the edge.

Next chapter

Integrating Domain Models with Application and Persistence Layers
