Cookbook · Postgres · 2026

Postgres & SQLAlchemy — Everyday Patterns

The DB recipes you reach for in every project: timestamp mixins, UUID keys, the two kinds of pagination, upsert, soft delete, killing N+1, fast counts, transactions, full-text search, bulk insert, pool config, and indexes that actually get used. Every recipe is SQLAlchemy 2.0 (Mapped / mapped_column / select()) with the raw SQL shown where it earns its place.

How to use this shelf These are reusable patterns, not a tutorial — skim once so you know what's here, then Ctrl-F when you hit the problem in real code. Everything is 2.0-only: you will never see legacy session.query(...) here. Where raw SQL makes the intent clearer (upsert, keyset, full-text), it sits next to its SQLAlchemy equivalent so you can drop either into DocChat or any FastAPI app.
On this shelf
  1. Modellingtimestamp mixin · UUID primary keys
  2. Readingoffset pagination · keyset / seek · kill N+1 · exists / count
  3. Writingupsert · soft delete · one transaction per unit of work · bulk insert
  4. Performancefull-text search · connection pool · indexes that get used

Modelling 2 recipes

Timestamp mixin — created_at / updated_at on every model

When: almost always. The moment a table holds data you'll want to know when a row was made and last touched — for audit, sorting, and "what changed". Define it once, mix it into every model.

mixins.py
from datetime import datetime
from sqlalchemy import func
from sqlalchemy.orm import Mapped, mapped_column


class TimestampMixin:
    """Add created_at / updated_at to any model. Postgres fills both."""
    created_at: Mapped[datetime] = mapped_column(
        server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        server_default=func.now(),
        onupdate=func.now(),     # bumped on every UPDATE
    )
models.py — reuse it
class Document(TimestampMixin, Base):
    __tablename__ = "documents"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str]

server_default=func.now() means Postgres sets the timestamp, not Python — so a raw INSERT or a different service writing the same table still gets a correct value. onupdate=func.now() is emitted by SQLAlchemy on every ORM UPDATE, keeping updated_at honest without you touching it.

onupdate= fires only on ORM-tracked updates. A bulk update() statement or a raw UPDATE straight to Postgres will not bump updated_at unless you set it in the statement or add a DB trigger. Don't assume the column self-maintains under bulk writes.

UUID primary keys vs bigint identity

When: reach for UUID when IDs are exposed in URLs/APIs, generated client-side, or must merge across shards/services without collision. Stick with bigint identity for internal, high-write, append-only tables where index locality matters.

models.py
import uuid
from sqlalchemy.orm import Mapped, mapped_column


# UUID PK — opaque, unguessable, safe to expose
class ApiKey(Base):
    __tablename__ = "api_keys"
    id: Mapped[uuid.UUID] = mapped_column(
        primary_key=True, default=uuid.uuid4
    )


# bigint identity — compact, sequential, great index locality
class Event(Base):
    __tablename__ = "events"
    id: Mapped[int] = mapped_column(primary_key=True)   # GENERATED … AS IDENTITY

SQLAlchemy maps Mapped[uuid.UUID] to Postgres' native uuid type (16 bytes, not a 36-char string). UUIDs hide row counts and growth rate from anyone reading a URL, and let two services mint IDs without coordinating. Bigint stays smaller and keeps inserts at the "right" end of the B-tree.

Random uuid4 keys scatter inserts across the index, hurting cache locality and bloating it on very high-write tables. If you want UUID benefits and ordered inserts, use a time-ordered UUIDv7 (default=uuid.uuid7 on Python 3.14+, or a helper) rather than uuid4.

Reading 4 recipes

Offset pagination — the standard list endpoint

When: a paged list UI where users jump to arbitrary page numbers and the table is small-to-medium (tens of thousands of rows). The default, boring, correct choice for most admin lists.

raw SQL
SELECT * FROM documents
ORDER BY created_at DESC
LIMIT 20 OFFSET 40;   -- page 3, size 20
SQLAlchemy 2.0
from sqlalchemy import select, func


def list_documents(session, page: int, size: int = 20):
    stmt = (
        select(Document)
        .order_by(Document.created_at.desc())
        .limit(size)
        .offset((page - 1) * size)
    )
    items = session.scalars(stmt).all()

    # total for the page count / "X of Y" header
    total = session.scalar(
        select(func.count()).select_from(Document)
    )
    return {"items": items, "total": total, "page": page}

Always include a deterministic ORDER BY — without one, OFFSET returns arbitrary rows and pages overlap or drop records. Run the count as a separate cheap query (see the exists / count recipe) rather than materialising every row.

OFFSET still scans and discards every skipped row. OFFSET 1000000 reads a million rows before returning twenty — fine for page 3, brutal for deep pages on a big table. For that, use keyset pagination instead.

Keyset / seek pagination — infinite scroll on big tables

When: "load more" / infinite scroll, or any large table where users page deep. It stays fast at page 1 and page 100,000 alike because it never counts past rows.

raw SQL — page after the last seen (created_at, id)
SELECT * FROM documents
WHERE (created_at, id) < (:last_created, :last_id)
ORDER BY created_at DESC, id DESC
LIMIT 20;
SQLAlchemy 2.0 — tuple_() comparison
from sqlalchemy import select, tuple_


def page_after(session, last_created=None, last_id=None, size=20):
    stmt = select(Document).order_by(
        Document.created_at.desc(), Document.id.desc()
    ).limit(size)

    if last_created is not None:        # first page has no cursor
        stmt = stmt.where(
            tuple_(Document.created_at, Document.id)
            < tuple_(last_created, last_id)
        )
    return session.scalars(stmt).all()

The composite (created_at, id) comparison gives a total order even when timestamps tie, so you never skip or repeat a row. With an index on (created_at DESC, id DESC) Postgres seeks straight to the cursor — constant time regardless of depth. Pass the last row's created_at/id back as the "next cursor".

Keyset can't jump to "page 47" — it only goes forward/back from a cursor. If the UI needs numbered page links, you're stuck with offset (or fake it). Also: the ORDER BY columns must exactly match the index column order, or Postgres falls back to a sort.

Kill N+1 with eager loading

When: you load a list of parents and then touch a relationship on each (authors → their books, orders → their lines). The default lazy load fires one query per parent — the N+1 trap.

queries.py
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload

# Collections (one-to-many / many-to-many) → selectinload
# one extra query: ... WHERE author_id IN (...)
authors = session.scalars(
    select(Author).options(selectinload(Author.books))
).all()
for a in authors:
    print(a.name, len(a.books))   # no extra query — already loaded

# Many-to-one (the single parent) → joinedload (one JOIN)
books = session.scalars(
    select(Book).options(joinedload(Book.author))
).all()

Collections want selectinload — it emits one extra WHERE id IN (...) query and never multiplies rows. Many-to-one wants joinedload — a single JOIN pulls the parent in the same round trip. Both collapse N+1 into a constant number of queries.

Don't joinedload a collection: the join repeats the parent row once per child, bloating the result set and confusing LIMIT. And eager options don't chain magically across two hops — nest them: selectinload(Author.books).selectinload(Book.reviews).

Efficient exists & count — never len(query.all())

When: you need to know "does any row match?" or "how many?" — for a guard clause, a badge count, or a pagination total. Don't load rows just to count them.

queries.py
from sqlalchemy import select, exists, func

# EXISTS — stops at the first match, returns a bool
has_admin = session.scalar(
    select(exists().where(User.role == "admin"))
)

# COUNT — done in Postgres, returns an int
active = session.scalar(
    select(func.count()).select_from(User).where(User.active)
)
raw SQL equivalents
SELECT EXISTS(SELECT 1 FROM users WHERE role = 'admin');
SELECT count(*) FROM users WHERE active;

session.scalar(...) returns the single value directly — a bool for exists(), an int for count(). EXISTS short-circuits at the first matching row, so it's far cheaper than counting when you only need yes/no.

len(session.scalars(stmt).all()) drags every matching row across the wire and builds full ORM objects just to throw them away. On a big table that's the difference between a 2 ms count and loading a million rows into memory.

Writing 4 recipes

Upsert — INSERT … ON CONFLICT DO UPDATE

When: "insert it, or update it if it already exists" — syncing external data, idempotent webhooks, counters, settings keyed by a unique column. One atomic statement instead of a read-then-write race.

SQLAlchemy 2.0 — postgresql dialect insert
from sqlalchemy.dialects.postgresql import insert

stmt = insert(User).values(
    email="sam@uae.dev", name="Sam"
)
stmt = stmt.on_conflict_do_update(
    index_elements=[User.email],          # the unique constraint
    set_={"name": stmt.excluded.name},     # EXCLUDED = the row we tried to insert
).returning(User.id)

new_id = session.scalar(stmt)
session.commit()
raw SQL
INSERT INTO users (email, name)
VALUES ('sam@uae.dev', 'Sam')
ON CONFLICT (email) DO UPDATE
  SET name = EXCLUDED.name
RETURNING id;

It's atomic — no "check then insert" gap where two requests both think the row is missing. EXCLUDED (stmt.excluded) refers to the values you tried to insert, so you can selectively copy them in. RETURNING hands back the id (or whole row) in the same trip, so you don't need a follow-up SELECT.

index_elements must name a real unique constraint or unique index — conflict on a non-unique column is undefined and Postgres rejects it. Import insert from sqlalchemy.dialects.postgresql, not the generic sqlalchemy.insert; only the dialect one has on_conflict_*.

Soft delete — deleted_at instead of DELETE

When: the business needs an undo, an audit trail, or "show deleted" — invoices, user accounts, anything legal or recoverable. Skip it for high-churn or privacy-sensitive data where a real delete is simpler and safer.

models.py
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column


class Invoice(Base):
    __tablename__ = "invoices"
    id: Mapped[int] = mapped_column(primary_key=True)
    deleted_at: Mapped[datetime | None] = mapped_column(default=None)
queries.py — soft-delete + always filter
from datetime import datetime, UTC
from sqlalchemy import select

# "delete" = set the timestamp
inv.deleted_at = datetime.now(UTC)
session.commit()

# every read must exclude soft-deleted rows
live = session.scalars(
    select(Invoice).where(Invoice.deleted_at.is_(None))
).all()

A NULL deleted_at means "live", a timestamp means "deleted and when". You keep history, support undo (just null it back out), and never lose referential context. Add a partial index WHERE deleted_at IS NULL so the common "live rows" query stays fast.

The discipline is remembering the filter every single time — one forgotten WHERE deleted_at IS NULL and deleted rows reappear in a report. Unique constraints also still see the dead rows, so a "deleted" email blocks re-registration. For consistency, centralise the filter (a base query helper or SQLAlchemy event) rather than hand-writing it everywhere.

One transaction per unit of work

When: two or more writes must succeed or fail together — debit one account and credit another, create an order and its line items, write a record and bump a counter. Wrap them so a crash can't leave half-done state.

service.py
# session.begin() opens a transaction; the block commits on
# success and rolls back on any exception — automatically.
with session.begin():
    session.add(Order(user_id=1, total=99))
    session.add(LineItem(sku="A1", qty=2))
    bump_inventory(session, sku="A1", by=-2)
# commit happened here — or everything rolled back together
explicit form (when you need finer control)
try:
    session.add(order)
    session.add(line_item)
    session.commit()
except Exception:
    session.rollback()   # leave the DB exactly as it was
    raise

Grouping related writes in one transaction gives you atomicity: Postgres either applies all of them or none. with session.begin(): is the clean form — no stray commit(), and any exception triggers a rollback for you. One unit of work = one transaction is the mental model.

Don't commit() after every single write inside a multi-step operation — a failure on step 3 then leaves steps 1–2 permanently applied. And keep transactions short: holding one open across a slow HTTP call locks rows and starves the connection pool.

Bulk insert — thousands of rows in one statement

When: importing a CSV, seeding fixtures, writing a batch of embeddings/log rows. Looping session.add() over 10,000 objects is dramatically slower than one bulk statement.

import_rows.py
from sqlalchemy import insert

rows = [
    {"title": "Doc A", "body": "..."},
    {"title": "Doc B", "body": "..."},
    # ... thousands more, plain dicts
]

# one INSERT, all rows bound as parameters
session.execute(insert(Document), rows)
session.commit()

Passing a list of dicts as the second arg to session.execute(insert(Model), rows) uses SQLAlchemy's fast executemany path — it skips per-object ORM bookkeeping (identity map, flush ordering, autoflush) and sends the rows in tightly batched statements. Easily 10–50× faster than add() in a loop.

Bulk insert bypasses the ORM: no default= Python-side defaults fire, no relationship cascades run, and you don't get the inserted objects back unless you add .returning(...). Use it for dumb, flat row data — not when you need ORM events or in-Python defaults.

Performance 3 recipes

Connection pool config — pre_ping for serverless Postgres

When: any real deployment, and mandatory on Neon / Supabase / any managed Postgres that drops idle connections. Set the pool once when you create the engine.

db.py
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine(
    os.environ["DATABASE_URL"],
    pool_size=5,          # persistent connections kept open
    max_overflow=10,      # extra burst connections under load
    pool_pre_ping=True,   # test a connection before lending it
    pool_recycle=1800,    # recycle conns older than 30 min
)

SessionLocal = sessionmaker(bind=engine)

pool_pre_ping=True sends a tiny test query before handing out a connection and transparently reconnects if it's dead — which is exactly what saves you when Neon/Supabase has silently closed an idle socket. pool_size is steady state; max_overflow is the burst headroom before requests queue.

Sizing the pool too big starves Postgres: total connections across all app instances must stay under the server's max_connections. On serverless/edge where many short-lived instances each open a pool, point them at a pooler (PgBouncer / Supabase pooler) and use NullPool in the app, or you'll exhaust connections fast.

Indexes that actually get used

When: a query is slow and you suspect a missing or wrong index. The fix is rarely "add an index" — it's the right index, with the right column order, confirmed by EXPLAIN.

migration / DDL
-- composite index: most-selective / equality column FIRST,
-- range/sort column LAST. Order matters.
CREATE INDEX ix_docs_owner_created
  ON documents (owner_id, created_at DESC);

-- partial index: only the rows you actually query
CREATE INDEX ix_docs_active
  ON documents (created_at) WHERE deleted_at IS NULL;
declare it on the model
from sqlalchemy import Index

class Document(Base):
    __tablename__ = "documents"
    __table_args__ = (
        Index("ix_docs_owner_created", "owner_id", "created_at"),
    )
confirm it's used
EXPLAIN ANALYZE
SELECT * FROM documents
WHERE owner_id = 1 ORDER BY created_at DESC LIMIT 20;
-- look for "Index Scan", NOT "Seq Scan"

Composite index column order follows the equality-then-range rule: columns you filter with = go first, the column you range/sort on goes last — that's why (owner_id, created_at) serves "this owner's docs, newest first" in one scan. A partial index (WHERE deleted_at IS NULL) is smaller and faster because it only indexes the rows you actually query.

An index Postgres never uses is pure write overhead. Wrapping a column in a function (WHERE lower(email) = …) disables a plain index unless you build a matching expression index. Always confirm with EXPLAIN ANALYZE on realistic data — a Seq Scan on a big table means your index isn't being chosen.

Primary sources ⭐ PostgreSQL 17 documentation — the canonical reference for INSERT … ON CONFLICT, full-text search, and indexes. Pair it with the SQLAlchemy 2.0 ORM docs for the Mapped / select() patterns and the connection pooling reference.