Module 3 · Postgres & Data · Drills
Auth is the one area where "I read it" isn't enough — fumble it live and you leak data. Type every drill yourself, run it, and confirm the failure cases (bad token, expired token) actually 401 before you reveal the solution.
You need passlib[bcrypt] and python-jose[cryptography] installed, and a SECRET_KEY in your environment.
Drill 1 hashing
Hash the password "hunter2pls" with passlib's bcrypt context, then verify it twice: once with the right password (expect True) and once with a wrong one (expect False).
    from passlib.context import CryptContext

    pwd = CryptContext(schemes=["bcrypt"], deprecated="auto")
    hashed = pwd.hash("hunter2pls")
    print(pwd.verify("hunter2pls", hashed))  # True
    print(pwd.verify("wrong", hashed))       # False
Hash on register, verify on login. You never compare hashes directly — verify re-hashes the candidate and checks in constant time.
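To see why hashes are never compared directly, here is a minimal stdlib-only sketch. It swaps bcrypt for PBKDF2 from hashlib purely so it runs without passlib; the helper names hash_password and verify_password and the iteration count are assumptions for illustration, not passlib's API. The point is that a fresh salt makes two hashes of the same password differ, so verification has to re-hash the candidate with the stored salt and compare in constant time.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor, not a recommendation


def hash_password(password: str) -> str:
    """Return 'salt_hex$digest_hex' using a fresh random salt."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(candidate: str, stored: str) -> bool:
    """Re-hash the candidate with the stored salt, then compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", candidate.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


first = hash_password("hunter2pls")
second = hash_password("hunter2pls")
print(first == second)                       # False: each hash has its own salt
print(verify_password("hunter2pls", first))  # True
print(verify_password("wrong", first))       # False
```

In the drills themselves, pwd.hash and pwd.verify play these two roles, and bcrypt stores the salt inside the hash string for you.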
Drill 2 jwt
Encode a JWT whose payload is {"sub": "sam@uae.dev"} plus an exp 30 minutes from now. Then decode it back and print the sub.
    import os
    from datetime import datetime, timedelta, timezone

    from jose import jwt

    SECRET = os.environ["SECRET_KEY"]
    exp = datetime.now(timezone.utc) + timedelta(minutes=30)
    token = jwt.encode({"sub": "sam@uae.dev", "exp": exp}, SECRET, algorithm="HS256")
    data = jwt.decode(token, SECRET, algorithms=["HS256"])
    print(data["sub"])  # sam@uae.dev
decode verifies the signature and the exp in one call. Paste the token into jwt.io and see the payload is readable — that's why no secrets go in it.
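The "readable payload" point can also be checked without a browser. The sketch below builds an HS256 token by hand with the standard library (the helpers b64url, b64url_decode and sign_hs256 and the secret "demo-secret" are hypothetical, for illustration only) and then reads the payload back without ever touching the secret. In real code, python-jose does the signing and verification.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(segment: str) -> bytes:
    """Restore the stripped padding, then decode."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Hand-rolled HS256 token, for illustration only."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    sig = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(sig)}"


token = sign_hs256({"sub": "sam@uae.dev"}, "demo-secret")  # hypothetical secret
header_b64, payload_b64, signature_b64 = token.split(".")

# No secret is needed to read the payload: it is only encoded, not encrypted.
print(json.loads(b64url_decode(payload_b64)))  # {'sub': 'sam@uae.dev'}
```

The signature protects the payload from being changed, not from being read.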
Drill 3 expiry
Create a token that's already expired (set exp to one minute ago) and confirm decoding it raises an error rather than returning the payload.
    import os
    from datetime import datetime, timedelta, timezone

    from jose import jwt, JWTError

    SECRET = os.environ["SECRET_KEY"]
    past = datetime.now(timezone.utc) - timedelta(minutes=1)
    token = jwt.encode({"sub": "sam@uae.dev", "exp": past}, SECRET, algorithm="HS256")
    try:
        jwt.decode(token, SECRET, algorithms=["HS256"])
    except JWTError as e:
        print("rejected:", e)  # rejected: Signature has expired.
You never hand-check the clock — the library enforces exp for you and raises. Your job is to catch it and turn it into a 401.
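For intuition only, the check the library performs on your behalf amounts to comparing the exp claim with the current time. The sketch below is a simplified stand-in, not python-jose's implementation: TokenExpired and check_exp are hypothetical names, and it ignores details such as leeway. In real code, keep relying on jwt.decode.

```python
from datetime import datetime, timedelta, timezone


class TokenExpired(Exception):
    """Hypothetical stand-in for the library's expiry error."""


def check_exp(payload: dict, now: datetime) -> None:
    """Raise if the 'exp' claim (seconds since the epoch) is in the past."""
    exp = payload.get("exp")
    if exp is not None and exp < now.timestamp():
        raise TokenExpired("Signature has expired.")


now = datetime.now(timezone.utc)
fresh = {"sub": "sam@uae.dev", "exp": (now + timedelta(minutes=30)).timestamp()}
stale = {"sub": "sam@uae.dev", "exp": (now - timedelta(minutes=1)).timestamp()}

check_exp(fresh, now)  # passes silently
try:
    check_exp(stale, now)
except TokenExpired as e:
    print("rejected:", e)  # rejected: Signature has expired.
```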
Drill 4 dependency
Write a get_current_user dependency: it takes the bearer token, decodes it, reads sub, looks the user up, and returns it — raising 401 if anything is off.
    from fastapi import Depends, HTTPException, status
    from fastapi.security import OAuth2PasswordBearer
    from jose import jwt, JWTError

    # Assumes SECRET (from Drill 2) and a users dict keyed by email are already defined.
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

    def get_current_user(token: str = Depends(oauth2_scheme)):
        err = HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            "Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
        try:
            email = jwt.decode(token, SECRET, algorithms=["HS256"]).get("sub")
        except JWTError:
            raise err
        user = users.get(email)
        if user is None:
            raise err
        return user
One catch-all err for every failure path (bad signature, expired, missing sub, unknown user) — never reveal which check failed.
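The "one error for every path" idea can be exercised without FastAPI or a JWT library. In this sketch every name is a hypothetical stand-in: Unauthorized plays the role of the 401 HTTPException, BadToken plays JWTError, and fake_decode looks tokens up in a dict instead of verifying a signature. What it shows is that a garbage token, a missing sub, and an unknown user all surface as the same message.

```python
class Unauthorized(Exception):
    """Hypothetical stand-in for the 401 HTTPException."""


class BadToken(Exception):
    """Hypothetical stand-in for JWTError."""


USERS = {"sam@uae.dev": {"email": "sam@uae.dev"}}

# Fake "decoded" tokens so the sketch needs no JWT library.
FAKE_TOKENS = {
    "good": {"sub": "sam@uae.dev"},
    "no-sub": {},
    "stranger": {"sub": "nobody@uae.dev"},
}


def fake_decode(token: str) -> dict:
    """Return the payload, or raise BadToken for anything unrecognised."""
    try:
        return FAKE_TOKENS[token]
    except KeyError:
        raise BadToken(token) from None


def current_user(token: str) -> dict:
    """Same shape as get_current_user: one shared error for every failure."""
    err = Unauthorized("Could not validate credentials")
    try:
        email = fake_decode(token).get("sub")
    except BadToken:
        raise err from None
    user = USERS.get(email)
    if user is None:
        raise err
    return user


messages = []
for token in ("garbage", "no-sub", "stranger"):
    try:
        current_user(token)
    except Unauthorized as e:
        messages.append(str(e))

print(set(messages))                  # {'Could not validate credentials'}
print(current_user("good")["email"])  # sam@uae.dev
```

Because the caller sees an identical response in all three cases, it learns nothing about which check failed.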
Drill 5 401
Given a get_current_user that decodes a token, make it return a proper 401 (with the WWW-Authenticate: Bearer header) when the token is garbage — not a 500.
    # Wrap the decode in try/except: a bad token raises JWTError,
    # which you translate into a 401 (NOT an uncaught 500).
    try:
        payload = jwt.decode(token, SECRET, algorithms=["HS256"])
    except JWTError:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            "Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
Test it: curl /me -H "Authorization: Bearer not-a-real-token" should return 401, not crash.
Drill 6 protect
Protect a GET /me endpoint so it only works with a valid token, and have it return the current user's email. Add Depends(get_current_user) — that's the whole guard.
    from fastapi import FastAPI, Depends

    app = FastAPI()

    @app.get("/me")
    def read_me(user = Depends(get_current_user)):
        return {"email": user["email"]}
No token in the header → FastAPI's OAuth2PasswordBearer rejects with 401 before your code even runs. Adding auth to any route is now a one-line decision.
Everything goes in one file, auth.py. By the end you can create an account, exchange credentials for a JWT, and use that JWT to reach a guarded route: the gate every DocChat endpoint will sit behind.
Build · the full auth flow, step by step
Step 1 — setup. One context for hashing, one scheme for the bearer token, a secret from the environment, and a user store (an in-memory dict now; your Postgres model later).
auth.py
    import os
    from datetime import datetime, timedelta, timezone

    from fastapi import FastAPI, Depends, HTTPException, status
    from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    from passlib.context import CryptContext
    from jose import jwt, JWTError
    from pydantic import BaseModel

    SECRET = os.environ["SECRET_KEY"]
    ALGO, TTL = "HS256", 30

    pwd = CryptContext(schemes=["bcrypt"], deprecated="auto")
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
    app = FastAPI()
    users: dict = {}
Step 2 — register hashes the password and stores the hash (never the raw text, never returned).
    class Register(BaseModel):
        email: str
        password: str

    @app.post("/register")
    def register(body: Register):
        if body.email in users:
            raise HTTPException(status.HTTP_400_BAD_REQUEST, "Email already registered")
        users[body.email] = {
            "email": body.email,
            "hashed_password": pwd.hash(body.password),
        }
        return {"email": body.email}
Step 3 — login verifies the password and, on success, issues a short-lived signed JWT.
    @app.post("/login")
    def login(form: OAuth2PasswordRequestForm = Depends()):
        user = users.get(form.username)
        if not user or not pwd.verify(form.password, user["hashed_password"]):
            raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Bad credentials")
        exp = datetime.now(timezone.utc) + timedelta(minutes=TTL)
        token = jwt.encode({"sub": user["email"], "exp": exp}, SECRET, algorithm=ALGO)
        return {"access_token": token, "token_type": "bearer"}
Step 4 — guard decodes the token, loads the user, and 401s on any failure. Then protect /me.
    def get_current_user(token: str = Depends(oauth2_scheme)):
        err = HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            "Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
        try:
            email = jwt.decode(token, SECRET, algorithms=[ALGO]).get("sub")
        except JWTError:
            raise err
        user = users.get(email)
        if user is None:
            raise err
        return user

    @app.get("/me")
    def me(user = Depends(get_current_user)):
        return {"email": user["email"]}
Step 5 — walk the flow with curl and confirm both the happy path and the 401s.
    # one-off: generate and export a strong secret
    export SECRET_KEY=$(python -c "import secrets; print(secrets.token_hex(32))")
    uvicorn auth:app --reload

    # 1. register (JSON body)
    curl -X POST localhost:8000/register -H "Content-Type: application/json" \
      -d '{"email":"sam@uae.dev","password":"hunter2pls"}'

    # 2. login (FORM body) -> grab the access_token
    curl -X POST localhost:8000/login -d "username=sam@uae.dev&password=hunter2pls"

    # 3. access the protected route with the token
    curl localhost:8000/me -H "Authorization: Bearer eyJ..."
    # {"email":"sam@uae.dev"}

    # 4. confirm a bad token is rejected
    curl -i localhost:8000/me -H "Authorization: Bearer garbage"
    # HTTP/1.1 401
Notice login takes form fields (username/password) because that's the OAuth2 password-flow contract, while register takes JSON. Swap the users dict for your SQLAlchemy model and this is production-shaped.
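The difference between the two request bodies is easy to see by building them with the standard library. This is only a sketch of the encodings involved, assuming the same example credentials used in the curl walkthrough; it sends nothing over the network.

```python
import json
from urllib.parse import urlencode

# /register expects JSON (Content-Type: application/json).
register_body = json.dumps({"email": "sam@uae.dev", "password": "hunter2pls"})

# /login expects form fields (Content-Type: application/x-www-form-urlencoded),
# which is what curl -d sends by default.
login_body = urlencode({"username": "sam@uae.dev", "password": "hunter2pls"})

print(register_body)  # {"email": "sam@uae.dev", "password": "hunter2pls"}
print(login_body)     # username=sam%40uae.dev&password=hunter2pls
```

Note the field name: the form carries the email under username, because that is the name the OAuth2 password flow defines.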
Click a card to flip it. Say the answer out loud before you flip — that's the rep that builds storage strength.
header . payload . signature: the signature is what makes it tamper-proof.
os.environ["SECRET_KEY"]), git-ignored, never hard-coded.
exp claim do?
jwt.decode auto-rejects once it passes, raising JWTError.
get_current_user for?
Depends.
Tick each only if you can do it without looking:
exp, and name its three parts
get_current_user dependency that returns 401 on a bad or expired token
/me and tested the failure cases