Cookbook · Postgres · 2026
The DB recipes you reach for in every project: timestamp mixins, UUID keys, the two kinds of pagination, upsert, soft delete, killing N+1, fast counts, transactions, full-text search, bulk insert, pool config, and indexes that actually get used. Every recipe is SQLAlchemy 2.0 (Mapped / mapped_column / select()) with the raw SQL shown where it earns its place.
session.query(...) here. Where raw SQL makes the intent clearer (upsert, keyset, full-text), it sits next to its SQLAlchemy equivalent so you can drop either into DocChat or any FastAPI app.
When: almost always. The moment a table holds data you'll want to know when a row was made and last touched — for audit, sorting, and "what changed". Define it once, mix it into every model.
mixins.py
from datetime import datetime from sqlalchemy import func from sqlalchemy.orm import Mapped, mapped_column class TimestampMixin: """Add created_at / updated_at to any model. Postgres fills both.""" created_at: Mapped[datetime] = mapped_column( server_default=func.now() ) updated_at: Mapped[datetime] = mapped_column( server_default=func.now(), onupdate=func.now(), # bumped on every UPDATE )
models.py — reuse it
class Document(TimestampMixin, Base): __tablename__ = "documents" id: Mapped[int] = mapped_column(primary_key=True) title: Mapped[str]
server_default=func.now() means Postgres sets the timestamp, not Python — so a raw INSERT or a different service writing the same table still gets a correct value. onupdate=func.now() is emitted by SQLAlchemy on every ORM UPDATE, keeping updated_at honest without you touching it.
onupdate= fires only on ORM-tracked updates. A bulk update() statement or a raw UPDATE straight to Postgres will not bump updated_at unless you set it in the statement or add a DB trigger. Don't assume the column self-maintains under bulk writes.
When: reach for UUID when IDs are exposed in URLs/APIs, generated client-side, or must merge across shards/services without collision. Stick with bigint identity for internal, high-write, append-only tables where index locality matters.
models.py
import uuid from sqlalchemy.orm import Mapped, mapped_column # UUID PK — opaque, unguessable, safe to expose class ApiKey(Base): __tablename__ = "api_keys" id: Mapped[uuid.UUID] = mapped_column( primary_key=True, default=uuid.uuid4 ) # bigint identity — compact, sequential, great index locality class Event(Base): __tablename__ = "events" id: Mapped[int] = mapped_column(primary_key=True) # GENERATED … AS IDENTITY
SQLAlchemy maps Mapped[uuid.UUID] to Postgres' native uuid type (16 bytes, not a 36-char string). UUIDs hide row counts and growth rate from anyone reading a URL, and let two services mint IDs without coordinating. Bigint stays smaller and keeps inserts at the "right" end of the B-tree.
Random uuid4 keys scatter inserts across the index, hurting cache locality and bloating it on very high-write tables. If you want UUID benefits and ordered inserts, use a time-ordered UUIDv7 (default=uuid.uuid7 on Python 3.14+, or a helper) rather than uuid4.
When: a paged list UI where users jump to arbitrary page numbers and the table is small-to-medium (tens of thousands of rows). The default, boring, correct choice for most admin lists.
raw SQL
SELECT * FROM documents ORDER BY created_at DESC LIMIT 20 OFFSET 40; -- page 3, size 20
SQLAlchemy 2.0
from sqlalchemy import select, func def list_documents(session, page: int, size: int = 20): stmt = ( select(Document) .order_by(Document.created_at.desc()) .limit(size) .offset((page - 1) * size) ) items = session.scalars(stmt).all() # total for the page count / "X of Y" header total = session.scalar( select(func.count()).select_from(Document) ) return {"items": items, "total": total, "page": page}
Always include a deterministic ORDER BY — without one, OFFSET returns arbitrary rows and pages overlap or drop records. Run the count as a separate cheap query (see the exists / count recipe) rather than materialising every row.
OFFSET still scans and discards every skipped row. OFFSET 1000000 reads a million rows before returning twenty — fine for page 3, brutal for deep pages on a big table. For that, use keyset pagination instead.
When: "load more" / infinite scroll, or any large table where users page deep. It stays fast at page 1 and page 100,000 alike because it never counts past rows.
raw SQL — page after the last seen (created_at, id)
SELECT * FROM documents WHERE (created_at, id) < (:last_created, :last_id) ORDER BY created_at DESC, id DESC LIMIT 20;
SQLAlchemy 2.0 — tuple_() comparison
from sqlalchemy import select, tuple_ def page_after(session, last_created=None, last_id=None, size=20): stmt = select(Document).order_by( Document.created_at.desc(), Document.id.desc() ).limit(size) if last_created is not None: # first page has no cursor stmt = stmt.where( tuple_(Document.created_at, Document.id) < tuple_(last_created, last_id) ) return session.scalars(stmt).all()
The composite (created_at, id) comparison gives a total order even when timestamps tie, so you never skip or repeat a row. With an index on (created_at DESC, id DESC) Postgres seeks straight to the cursor — constant time regardless of depth. Pass the last row's created_at/id back as the "next cursor".
Keyset can't jump to "page 47" — it only goes forward/back from a cursor. If the UI needs numbered page links, you're stuck with offset (or fake it). Also: the ORDER BY columns must exactly match the index column order, or Postgres falls back to a sort.
When: you load a list of parents and then touch a relationship on each (authors → their books, orders → their lines). The default lazy load fires one query per parent — the N+1 trap.
queries.py
from sqlalchemy import select from sqlalchemy.orm import selectinload, joinedload # Collections (one-to-many / many-to-many) → selectinload # one extra query: ... WHERE author_id IN (...) authors = session.scalars( select(Author).options(selectinload(Author.books)) ).all() for a in authors: print(a.name, len(a.books)) # no extra query — already loaded # Many-to-one (the single parent) → joinedload (one JOIN) books = session.scalars( select(Book).options(joinedload(Book.author)) ).all()
Collections want selectinload — it emits one extra WHERE id IN (...) query and never multiplies rows. Many-to-one wants joinedload — a single JOIN pulls the parent in the same round trip. Both collapse N+1 into a constant number of queries.
Don't joinedload a collection: the join repeats the parent row once per child, bloating the result set and confusing LIMIT. And eager options don't chain magically across two hops — nest them: selectinload(Author.books).selectinload(Book.reviews).
When: you need to know "does any row match?" or "how many?" — for a guard clause, a badge count, or a pagination total. Don't load rows just to count them.
queries.py
from sqlalchemy import select, exists, func # EXISTS — stops at the first match, returns a bool has_admin = session.scalar( select(exists().where(User.role == "admin")) ) # COUNT — done in Postgres, returns an int active = session.scalar( select(func.count()).select_from(User).where(User.active) )
raw SQL equivalents
SELECT EXISTS(SELECT 1 FROM users WHERE role = 'admin'); SELECT count(*) FROM users WHERE active;
session.scalar(...) returns the single value directly — a bool for exists(), an int for count(). EXISTS short-circuits at the first matching row, so it's far cheaper than counting when you only need yes/no.
len(session.scalars(stmt).all()) drags every matching row across the wire and builds full ORM objects just to throw them away. On a big table that's the difference between a 2 ms count and loading a million rows into memory.
When: "insert it, or update it if it already exists" — syncing external data, idempotent webhooks, counters, settings keyed by a unique column. One atomic statement instead of a read-then-write race.
SQLAlchemy 2.0 — postgresql dialect insert
from sqlalchemy.dialects.postgresql import insert stmt = insert(User).values( email="sam@uae.dev", name="Sam" ) stmt = stmt.on_conflict_do_update( index_elements=[User.email], # the unique constraint set_={"name": stmt.excluded.name}, # EXCLUDED = the row we tried to insert ).returning(User.id) new_id = session.scalar(stmt) session.commit()
raw SQL
INSERT INTO users (email, name) VALUES ('sam@uae.dev', 'Sam') ON CONFLICT (email) DO UPDATE SET name = EXCLUDED.name RETURNING id;
It's atomic — no "check then insert" gap where two requests both think the row is missing. EXCLUDED (stmt.excluded) refers to the values you tried to insert, so you can selectively copy them in. RETURNING hands back the id (or whole row) in the same trip, so you don't need a follow-up SELECT.
index_elements must name a real unique constraint or unique index — conflict on a non-unique column is undefined and Postgres rejects it. Import insert from sqlalchemy.dialects.postgresql, not the generic sqlalchemy.insert; only the dialect one has on_conflict_*.
When: the business needs an undo, an audit trail, or "show deleted" — invoices, user accounts, anything legal or recoverable. Skip it for high-churn or privacy-sensitive data where a real delete is simpler and safer.
models.py
from datetime import datetime from sqlalchemy.orm import Mapped, mapped_column class Invoice(Base): __tablename__ = "invoices" id: Mapped[int] = mapped_column(primary_key=True) deleted_at: Mapped[datetime | None] = mapped_column(default=None)
queries.py — soft-delete + always filter
from datetime import datetime, UTC from sqlalchemy import select # "delete" = set the timestamp inv.deleted_at = datetime.now(UTC) session.commit() # every read must exclude soft-deleted rows live = session.scalars( select(Invoice).where(Invoice.deleted_at.is_(None)) ).all()
A NULL deleted_at means "live", a timestamp means "deleted and when". You keep history, support undo (just null it back out), and never lose referential context. Add a partial index WHERE deleted_at IS NULL so the common "live rows" query stays fast.
The discipline is remembering the filter every single time — one forgotten WHERE deleted_at IS NULL and deleted rows reappear in a report. Unique constraints also still see the dead rows, so a "deleted" email blocks re-registration. For consistency, centralise the filter (a base query helper or SQLAlchemy event) rather than hand-writing it everywhere.
When: two or more writes must succeed or fail together — debit one account and credit another, create an order and its line items, write a record and bump a counter. Wrap them so a crash can't leave half-done state.
service.py
# session.begin() opens a transaction; the block commits on # success and rolls back on any exception — automatically. with session.begin(): session.add(Order(user_id=1, total=99)) session.add(LineItem(sku="A1", qty=2)) bump_inventory(session, sku="A1", by=-2) # commit happened here — or everything rolled back together
explicit form (when you need finer control)
try: session.add(order) session.add(line_item) session.commit() except Exception: session.rollback() # leave the DB exactly as it was raise
Grouping related writes in one transaction gives you atomicity: Postgres either applies all of them or none. with session.begin(): is the clean form — no stray commit(), and any exception triggers a rollback for you. One unit of work = one transaction is the mental model.
Don't commit() after every single write inside a multi-step operation — a failure on step 3 then leaves steps 1–2 permanently applied. And keep transactions short: holding one open across a slow HTTP call locks rows and starves the connection pool.
When: importing a CSV, seeding fixtures, writing a batch of embeddings/log rows. Looping session.add() over 10,000 objects is dramatically slower than one bulk statement.
import_rows.py
from sqlalchemy import insert rows = [ {"title": "Doc A", "body": "..."}, {"title": "Doc B", "body": "..."}, # ... thousands more, plain dicts ] # one INSERT, all rows bound as parameters session.execute(insert(Document), rows) session.commit()
Passing a list of dicts as the second arg to session.execute(insert(Model), rows) uses SQLAlchemy's fast executemany path — it skips per-object ORM bookkeeping (identity map, flush ordering, autoflush) and sends the rows in tightly batched statements. Easily 10–50× faster than add() in a loop.
Bulk insert bypasses the ORM: no default= Python-side defaults fire, no relationship cascades run, and you don't get the inserted objects back unless you add .returning(...). Use it for dumb, flat row data — not when you need ORM events or in-Python defaults.
When: "search the documents" on a few hundred thousand rows. Postgres' built-in full-text search is free, transactional, and good enough long before you need a separate search cluster.
migration — generated column + GIN index
-- a tsvector column Postgres keeps in sync automatically ALTER TABLE documents ADD COLUMN search tsvector GENERATED ALWAYS AS (to_tsvector('english', title || ' ' || body)) STORED; -- GIN index makes the @@ match fast CREATE INDEX documents_search_idx ON documents USING GIN (search);
SQLAlchemy 2.0 — query it
from sqlalchemy import select, func, text q = "quarterly & report" # to_tsquery operators: & | ! stmt = select(Document).where( Document.search.op("@@")(func.to_tsquery("english", q)) ).order_by( func.ts_rank(Document.search, func.to_tsquery("english", q)).desc() ) hits = session.scalars(stmt).all()
A GENERATED ALWAYS AS … STORED column means Postgres rebuilds the tsvector on every write — no triggers, no app code to keep it fresh. The GIN index makes the @@ match query fast, and ts_rank gives you relevance ordering. This handles stemming and stop-words out of the box.
Use plainto_tsquery (or websearch_to_tsquery) for raw user input — to_tsquery expects operator syntax and throws on a bare phrase like "quarterly report". And full-text isn't substring/fuzzy: it matches whole lexemes, so for typo-tolerance you want pg_trgm instead.
When: any real deployment, and mandatory on Neon / Supabase / any managed Postgres that drops idle connections. Set the pool once when you create the engine.
db.py
import os from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker engine = create_engine( os.environ["DATABASE_URL"], pool_size=5, # persistent connections kept open max_overflow=10, # extra burst connections under load pool_pre_ping=True, # test a connection before lending it pool_recycle=1800, # recycle conns older than 30 min ) SessionLocal = sessionmaker(bind=engine)
pool_pre_ping=True sends a tiny test query before handing out a connection and transparently reconnects if it's dead — which is exactly what saves you when Neon/Supabase has silently closed an idle socket. pool_size is steady state; max_overflow is the burst headroom before requests queue.
Sizing the pool too big starves Postgres: total connections across all app instances must stay under the server's max_connections. On serverless/edge where many short-lived instances each open a pool, point them at a pooler (PgBouncer / Supabase pooler) and use NullPool in the app, or you'll exhaust connections fast.
When: a query is slow and you suspect a missing or wrong index. The fix is rarely "add an index" — it's the right index, with the right column order, confirmed by EXPLAIN.
migration / DDL
-- composite index: most-selective / equality column FIRST, -- range/sort column LAST. Order matters. CREATE INDEX ix_docs_owner_created ON documents (owner_id, created_at DESC); -- partial index: only the rows you actually query CREATE INDEX ix_docs_active ON documents (created_at) WHERE deleted_at IS NULL;
declare it on the model
from sqlalchemy import Index class Document(Base): __tablename__ = "documents" __table_args__ = ( Index("ix_docs_owner_created", "owner_id", "created_at"), )
confirm it's used
EXPLAIN ANALYZE SELECT * FROM documents WHERE owner_id = 1 ORDER BY created_at DESC LIMIT 20; -- look for "Index Scan", NOT "Seq Scan"
Composite index column order follows the equality-then-range rule: columns you filter with = go first, the column you range/sort on goes last — that's why (owner_id, created_at) serves "this owner's docs, newest first" in one scan. A partial index (WHERE deleted_at IS NULL) is smaller and faster because it only indexes the rows you actually query.
An index Postgres never uses is pure write overhead. Wrapping a column in a function (WHERE lower(email) = …) disables a plain index unless you build a matching expression index. Always confirm with EXPLAIN ANALYZE on realistic data — a Seq Scan on a big table means your index isn't being chosen.
Mapped / select() patterns and the connection pooling reference.