SQLAlchemy integration for a small Flask service
This chapter focuses on using SQLAlchemy in a way that stays maintainable as your service grows: define models with clear constraints, use sessions with explicit transaction boundaries, and keep database access logic out of routes by using repositories/services and query helpers.
Choose an integration style
You can integrate SQLAlchemy in two common ways:
- Flask-SQLAlchemy: convenient defaults and integration with Flask context. Good for small services.
- Plain SQLAlchemy: explicit engine/session management. Good when you want maximum control or share code outside Flask.
The examples below use plain SQLAlchemy to keep the session lifecycle and transaction boundaries explicit, but the same patterns apply if you use Flask-SQLAlchemy (the main difference is where the session comes from).
Step-by-step: engine, session factory, and scoped session
1) Create the engine and session factory
Put this in a dedicated module (e.g., db.py). The goal: a single place that defines how connections and sessions are created.
```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Example PostgreSQL URL: postgresql+psycopg://user:pass@host:5432/dbname

def make_engine(database_url: str):
    return create_engine(
        database_url,
        pool_pre_ping=True,  # helps with stale connections
        future=True,
    )

def make_session_factory(engine):
    return sessionmaker(
        bind=engine,
        autoflush=False,
        autocommit=False,
        expire_on_commit=False,
        future=True,
    )
```

`expire_on_commit=False` is often helpful for APIs: after commit, objects keep their loaded attributes, which avoids surprising lazy loads after the transaction is closed.
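A condensed wiring sketch of these factories (repeated here so the snippet is self-contained; the in-memory SQLite URL is illustrative, and the `future` flag is omitted since it is the default in SQLAlchemy 2.0):

```python
import sqlalchemy as sa
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def make_engine(database_url: str):
    return create_engine(database_url, pool_pre_ping=True)

def make_session_factory(engine):
    return sessionmaker(
        bind=engine,
        autoflush=False,
        expire_on_commit=False,
    )

# Wire everything up against an in-memory SQLite database (illustrative URL).
engine = make_engine("sqlite:///:memory:")
SessionFactory = make_session_factory(engine)

with SessionFactory() as session:
    # A trivial round-trip to confirm the session can talk to the database.
    value = session.execute(sa.text("SELECT 1")).scalar_one()
    print(value)
```

In a real service the URL would come from configuration, and the engine and factory would be created once at startup, not per request.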
2) Base class and metadata
Define a declarative base for models. Keep it in models/base.py so all models share the same metadata.
```python
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass
```

3) Create tables (for development)
In production you typically use migrations (e.g., Alembic). For local development or tests, you can create tables from metadata:
```python
from .models.base import Base

def create_all(engine):
    Base.metadata.create_all(engine)
```

Defining models: constraints, indexes, and relationships
Models should encode invariants: uniqueness, non-nullability, foreign keys, and indexes. This reduces application-side complexity and prevents data corruption.
Example domain: users and API keys
We will model:
- `User` with a unique email
- `ApiKey` belonging to a user, with a unique token
```python
from datetime import datetime

import sqlalchemy as sa
from sqlalchemy.orm import Mapped, mapped_column, relationship

from .base import Base

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(sa.Integer, primary_key=True)
    email: Mapped[str] = mapped_column(sa.String(255), nullable=False, unique=True)
    name: Mapped[str | None] = mapped_column(sa.String(120), nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        sa.DateTime(timezone=True),
        server_default=sa.func.now(),
        nullable=False,
    )

    api_keys: Mapped[list["ApiKey"]] = relationship(
        back_populates="user",
        cascade="all, delete-orphan",
        lazy="selectin",  # helps avoid N+1 for common access patterns
    )

    __table_args__ = (
        sa.Index("ix_users_created_at", "created_at"),
    )

class ApiKey(Base):
    __tablename__ = "api_keys"

    id: Mapped[int] = mapped_column(sa.Integer, primary_key=True)
    token: Mapped[str] = mapped_column(sa.String(64), nullable=False, unique=True)
    user_id: Mapped[int] = mapped_column(sa.ForeignKey("users.id"), nullable=False)
    revoked_at: Mapped[datetime | None] = mapped_column(
        sa.DateTime(timezone=True), nullable=True
    )

    user: Mapped[User] = relationship(back_populates="api_keys")

    __table_args__ = (
        sa.Index("ix_api_keys_user_id", "user_id"),
        sa.CheckConstraint("length(token) >= 32", name="ck_api_keys_token_len"),
    )
```

Notes on constraints and indexes
- Unique constraints (e.g., `email`, `token`) enforce business rules at the database level. Your code must handle conflicts gracefully.
- Indexes should match query patterns. If you frequently list users by `created_at`, index it. If you filter API keys by `user_id`, index it.
- Relationship loading: `lazy="selectin"` is a good default for one-to-many in APIs; it reduces N+1 queries when loading collections for multiple parents.
Session lifecycle patterns and transaction boundaries
A SQLAlchemy Session is a unit-of-work: it tracks changes and coordinates database interaction. The key design decision is where the session is created and where it is committed/rolled back.
Recommended pattern: one session per request, commit at the boundary
In a web service, a common pattern is:
- Create a session at the start of the request
- Run repository/service operations using that session
- Commit once if everything succeeded
- Rollback on any exception
- Close the session at the end
This keeps transaction boundaries clear and avoids partial commits.
Context manager for reliable commit/rollback
Even if you integrate session creation with Flask request hooks, it is useful to have a reusable transaction helper for scripts, background jobs, and tests.
```python
from contextlib import contextmanager

@contextmanager
def session_scope(SessionFactory):
    session = SessionFactory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
```

Important detail: roll back on any exception, not only database exceptions. A Python error after a write should still roll back.
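The behavior of `session_scope` can be checked end to end. The sketch below (repeating the helper so it is self-contained, with an in-memory SQLite table for illustration) shows the success path committing and the failure path rolling back, even when the exception is not a database error:

```python
from contextlib import contextmanager

import sqlalchemy as sa
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

@contextmanager
def session_scope(SessionFactory):
    session = SessionFactory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

engine = create_engine("sqlite:///:memory:")
SessionFactory = sessionmaker(bind=engine)

# A minimal table created with Core, just to keep the sketch short.
metadata = sa.MetaData()
items = sa.Table("items", metadata, sa.Column("id", sa.Integer, primary_key=True))
metadata.create_all(engine)

# Success path: the insert is committed.
with session_scope(SessionFactory) as session:
    session.execute(sa.insert(items))

# Failure path: a plain Python error after the write still rolls back.
try:
    with session_scope(SessionFactory) as session:
        session.execute(sa.insert(items))
        raise RuntimeError("something unrelated failed")
except RuntimeError:
    pass

with SessionFactory() as session:
    count = session.execute(sa.select(sa.func.count()).select_from(items)).scalar_one()
print(count)  # only the committed row survived
```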
When to use flush()
flush() sends pending changes to the database without committing. Use it when you need generated values (like primary keys) before the transaction ends.
```python
user = User(email=email)
session.add(user)
session.flush()  # user.id is now available (within the transaction)
```

Nested operations: avoid committing inside repositories
Repositories should not call commit() by default. If they commit, you lose the ability to group multiple operations into one transaction (and you can end up with partial updates). Instead:
- Repositories: `add`, `get`, `list`, `delete`, `update` (no commit)
- Service layer: orchestrates multiple repository calls and decides transaction boundaries
Maintainable structure: models, repositories/services, query helpers
Suggested module layout
| Module | Responsibility |
|---|---|
| `models/` | ORM classes only (plus small model-level helpers) |
| `repositories/` | Database access functions: queries and persistence |
| `services/` | Business workflows that combine multiple repositories |
| `queries/` | Reusable query builders (filters, pagination, eager loading options) |
This separation makes it easier to test business logic without HTTP and to reuse query patterns consistently.
CRUD examples with careful error handling
Below are practical repository/service examples. They assume you already have consistent API error responses elsewhere; here we focus on mapping database problems to domain-level exceptions.
Define domain exceptions
```python
class NotFoundError(Exception):
    pass

class ConflictError(Exception):
    pass
```

Create: handle unique constraint conflicts
Uniqueness is enforced by the database. Your code should catch integrity errors and translate them into a conflict.
```python
from sqlalchemy.exc import IntegrityError

def create_user(session, *, email: str, name: str | None = None):
    user = User(email=email, name=name)
    session.add(user)
    try:
        session.flush()  # detect constraint violations early
    except IntegrityError as e:
        session.rollback()  # session is now clean for further use
        # In a real service, inspect e.orig / the constraint name if you need precision
        raise ConflictError("email already exists") from e
    return user
```

Why `flush()`? It forces the INSERT to happen now, so you can detect conflicts before doing more work in the same transaction.
Read: missing records
```python
import sqlalchemy as sa

def get_user_by_id(session, user_id: int) -> User:
    user = session.get(User, user_id)
    if user is None:
        raise NotFoundError("user not found")
    return user

def get_user_by_email(session, email: str) -> User:
    stmt = sa.select(User).where(User.email == email)
    user = session.execute(stmt).scalar_one_or_none()
    if user is None:
        raise NotFoundError("user not found")
    return user
```

Update: optimistic approach with flush
```python
from sqlalchemy.exc import IntegrityError

def update_user_email(session, user_id: int, *, new_email: str) -> User:
    user = get_user_by_id(session, user_id)
    user.email = new_email
    try:
        session.flush()
    except IntegrityError as e:
        session.rollback()
        raise ConflictError("email already exists") from e
    return user
```

Note: after `rollback()`, ORM instances may be expired/detached depending on configuration. A simple strategy is to stop using the instance after rollback and re-load it if needed.
Delete: report missing record
```python
def delete_user(session, user_id: int) -> None:
    user = session.get(User, user_id)
    if user is None:
        raise NotFoundError("user not found")
    session.delete(user)
    session.flush()
```

Service function: orchestrate multiple operations in one transaction
Example: create a user and an API key together. If key creation fails, the user creation should rollback too.
```python
import secrets

from sqlalchemy.exc import IntegrityError

def provision_user_with_key(session, *, email: str, name: str | None = None) -> tuple[User, ApiKey]:
    user = create_user(session, email=email, name=name)
    token = secrets.token_hex(32)
    key = ApiKey(token=token, user=user)
    session.add(key)
    try:
        session.flush()
    except IntegrityError as e:
        session.rollback()
        # a token conflict is unlikely but possible; an email conflict is handled earlier
        raise ConflictError("could not provision api key") from e
    return user, key
```

Query helpers: pagination and reusable filters
Offset/limit pagination (simple and common)
Offset/limit is easy but can become slow for large offsets. It is still fine for many small services.
```python
import sqlalchemy as sa

def paginate(stmt, *, limit: int, offset: int):
    limit = max(1, min(limit, 100))
    offset = max(0, offset)
    return stmt.limit(limit).offset(offset)

def list_users(session, *, limit: int = 50, offset: int = 0) -> list[User]:
    stmt = sa.select(User).order_by(User.created_at.desc())
    stmt = paginate(stmt, limit=limit, offset=offset)
    return list(session.execute(stmt).scalars().all())
```

Keyset pagination (better for large datasets)
Keyset pagination avoids large offsets by using a stable cursor (e.g., created_at + id). This requires an index that matches the ordering.
```python
import sqlalchemy as sa

def list_users_after(session, *, after_created_at=None, after_id=None, limit: int = 50):
    stmt = sa.select(User).order_by(User.created_at.desc(), User.id.desc())
    if after_created_at is not None and after_id is not None:
        stmt = stmt.where(
            sa.tuple_(User.created_at, User.id) < sa.tuple_(after_created_at, after_id)
        )
    stmt = stmt.limit(max(1, min(limit, 100)))
    return list(session.execute(stmt).scalars().all())
```

Performance basics: avoiding N+1 queries and controlling eager loading
Recognize the N+1 problem
N+1 happens when you load a list of parent rows, then for each parent you lazily load children in a separate query. Example: list users, then access user.api_keys for each user.
Use selectinload for collections
selectinload performs one query for parents and one additional query for all related children using an IN clause.
```python
import sqlalchemy as sa
from sqlalchemy.orm import selectinload

def list_users_with_keys(session, *, limit: int = 50, offset: int = 0):
    stmt = (
        sa.select(User)
        .options(selectinload(User.api_keys))
        .order_by(User.created_at.desc())
        .limit(limit)
        .offset(offset)
    )
    return list(session.execute(stmt).scalars().all())
```

Use joinedload carefully
joinedload can be efficient for many-to-one (e.g., loading ApiKey.user), but for one-to-many it can multiply rows and require unique() handling. Prefer selectinload for collections unless you have a measured reason.
```python
import sqlalchemy as sa
from sqlalchemy.orm import joinedload

def get_api_key_with_user(session, token: str) -> ApiKey:
    stmt = sa.select(ApiKey).options(joinedload(ApiKey.user)).where(ApiKey.token == token)
    key = session.execute(stmt).scalar_one_or_none()
    if key is None:
        raise NotFoundError("api key not found")
    return key
```

Count queries for pagination metadata
If you need total counts, do it explicitly. Avoid counting with joins that inflate counts.
```python
import sqlalchemy as sa

def count_users(session) -> int:
    stmt = sa.select(sa.func.count()).select_from(User)
    return session.execute(stmt).scalar_one()
```

Reliable transaction handling in request-driven code
Pattern: commit/rollback in one place
Whether you implement this as middleware, hooks, or a dependency provider, the key is: routes call services; services call repositories; only the request boundary commits.
A simplified example of a request-scoped transaction wrapper (framework-agnostic) looks like:
```python
def handle_request(SessionFactory, handler_func):
    session = SessionFactory()
    try:
        result = handler_func(session)
        session.commit()
        return result
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
```

Don’t keep sessions around
Sessions should be short-lived. Avoid storing a session on global objects or caching ORM instances across requests. If you need caching, cache serialized data or identifiers, not live ORM objects.
Be explicit about transaction boundaries for read-only operations
Read-only endpoints can still use a session and close it without committing. If you use session_scope for reads, it will commit (which is usually harmless but unnecessary). For clarity, you can provide a read-only scope:
```python
from contextlib import contextmanager

@contextmanager
def read_session_scope(SessionFactory):
    session = SessionFactory()
    try:
        yield session
    finally:
        session.close()
```