Module 2 · FastAPI · Deep Dive
The capstone of Module 2. The four FastAPI features that turn a toy API into the real DocChat backend — getting PDFs in, streaming AI answers out, and keeping clients open for the whole app's life.
Intermediate · Advanced · Build
This is the feature for DocChat — no upload, no documents, no chat. FastAPI handles uploads with UploadFile, a type you put straight into your function signature. Because a file arrives as multipart form data (not JSON), you need one extra package installed:
# required for any form/file handling — install it once
pip install python-multipart
The simplest possible route. UploadFile is an async, spooled file — small files stay in memory, large ones spill to a temp file on disk, so you don't blow up RAM on a 200 MB PDF:

    from fastapi import FastAPI, UploadFile

    app = FastAPI()


    @app.post("/upload")
    async def upload(file: UploadFile):
        contents = await file.read()  # bytes of the whole file
        return {
            "filename": file.filename,
            "content_type": file.content_type,
            "size": len(contents),
        }

Note await file.read() — the file methods are async, so the route must be async def. Useful attributes on every UploadFile:
| Attribute / method | What it gives you |
|---|---|
| file.filename | Original name, e.g. "report.pdf". |
| file.content_type | MIME type the browser sent, e.g. "application/pdf". |
| await file.read() | Reads the whole file into bytes. Fine for small files. |
| await file.read(1024) | Reads one chunk of up to 1024 bytes; call in a loop to stream a big file. |
| await file.seek(0) | Rewind to the start (after reading, before re-reading). |
await file.read() pulls the entire file into memory at once — simple, and fine for a few-MB PDF. For genuinely large files, stream it out to disk in chunks so memory stays flat:

    async def save_to_disk(file: UploadFile, dest: str):
        with open(dest, "wb") as out:
            while chunk := await file.read(1024 * 1024):  # 1 MB at a time
                out.write(chunk)

Never trust the upload. Reject anything that isn't a PDF, and cap the size so nobody DOSes you with a 5 GB file. Raise HTTPException with the right status code:

    from fastapi import FastAPI, UploadFile, HTTPException

    app = FastAPI()
    MAX_BYTES = 20 * 1024 * 1024  # 20 MB


    @app.post("/upload")
    async def upload(file: UploadFile):
        # Gate 1: the declared MIME type must be PDF
        if file.content_type != "application/pdf":
            raise HTTPException(415, "Only PDF files are accepted")
        # Gate 2: size cap (this simple version reads the whole file first)
        contents = await file.read()
        if len(contents) > MAX_BYTES:
            raise HTTPException(413, "File too large (20 MB max)")
        # ... hand off to the ingestion pipeline ...
        return {"filename": file.filename, "status": "received"}

The client supplies content_type, so a malicious client can lie. For production, also sniff the real bytes (a PDF starts with the bytes %PDF) or use a library like python-magic. For DocChat's MVP, the header check is a fine first gate.
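The byte-sniffing idea above can be sketched in a few lines. This is a minimal illustration, not DocChat's actual code: `looks_like_pdf` and `sniff_pdf` are hypothetical helper names, and `sniff_pdf` assumes only that the object it receives has the async `read` and `seek` methods that UploadFile provides.

```python
PDF_MAGIC = b"%PDF-"  # a well-formed PDF starts with this signature


def looks_like_pdf(head: bytes) -> bool:
    """Return True if the leading bytes carry the PDF signature."""
    return head.startswith(PDF_MAGIC)


async def sniff_pdf(file) -> bool:
    """Peek at the first bytes of an UploadFile-like object, then rewind.

    Assumption: `file` exposes async read(size) and async seek(offset).
    """
    head = await file.read(len(PDF_MAGIC))
    await file.seek(0)  # rewind so later code still sees the whole file
    return looks_like_pdf(head)
```

In a route you would call `await sniff_pdf(file)` right after the content_type check and raise the same 415 when it returns False.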
Accept several files at once by typing the parameter as list[UploadFile]:

    @app.post("/upload-many")
    async def upload_many(files: list[UploadFile]):
        return [{"name": f.filename} for f in files]

You'll sometimes see File() used as well — it's the marker that tells FastAPI "this comes from the request body as a file", and it lets you attach metadata or accept raw bytes instead of an UploadFile. With a plain UploadFile annotation you don't need it; use it when you want extra validation:

    from typing import Annotated

    from fastapi import File, UploadFile


    @app.post("/upload")
    async def upload(
        file: Annotated[UploadFile, File(description="The PDF to ingest")],
    ):
        return {"filename": file.filename}

PHP bridge: UploadFile ≈ PHP's $_FILES['file'], but typed and async. file.filename ≈ $_FILES['file']['name'], await file.read() ≈ file_get_contents($_FILES['file']['tmp_name']). The big upgrade: it's a validated parameter, not a magic superglobal you have to remember to sanitize.
DocChat needs a database connection pool and an embedding/LLM client. Opening those on every request would be slow and wasteful — you want to open them once when the app boots and close them cleanly on shutdown. The modern way is a lifespan: an async context manager you hand to FastAPI().

    from contextlib import asynccontextmanager

    from fastapi import FastAPI


    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # ---- startup: runs ONCE before the first request ----
        app.state.db = await open_db_pool()
        app.state.embedder = load_embedding_client()
        yield  # app runs here, serving requests
        # ---- shutdown: runs ONCE when the app stops ----
        await app.state.db.close()


    app = FastAPI(lifespan=lifespan)

Everything before yield is startup; everything after is shutdown. Store shared objects on app.state and reach them from any route via request.app.state.
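To see the ordering without starting a server, here is a minimal sketch that drives a lifespan-style context manager by hand. The app object is a stand-in built from `SimpleNamespace` (not a real FastAPI instance), `"fake-pool"` replaces a real connection pool, and `run_once` is a hypothetical name used only for this illustration.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace


@asynccontextmanager
async def lifespan(app):
    # startup: everything before `yield`
    app.state.events.append("startup")
    app.state.db = "fake-pool"  # stand-in for a real connection pool
    yield
    # shutdown: everything after `yield`
    app.state.events.append("shutdown")


async def run_once() -> list[str]:
    # Stand-in for the app: only `.state` matters for this demonstration.
    app = SimpleNamespace(state=SimpleNamespace(events=[]))
    async with lifespan(app):
        # While the context is open, "requests" can use the shared resource.
        app.state.events.append(f"request served with {app.state.db}")
    return app.state.events


print(asyncio.run(run_once()))
# ['startup', 'request served with fake-pool', 'shutdown']
```

The shared object is created once, is available for as long as the context is open, and the shutdown code runs only after the block exits.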
Older tutorials use @app.on_event("startup") and @app.on_event("shutdown"). Those are deprecated. The current, recommended pattern is the lifespan context manager shown above: it keeps startup and shutdown logic together and plays nicely with shared resources. If an interviewer sees on_event in your code, they'll clock you as out of date.
When the LLM answers a DocChat question, it generates the response token by token. If you wait for the whole answer before sending it, the user stares at a spinner for 10 seconds. If you stream it, words appear as they're generated — the exact ChatGPT experience, and a huge UX win. FastAPI does this with StreamingResponse wrapping an async generator:

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()


    @app.get("/chat")
    async def chat(q: str):
        async def token_stream():
            # llm is your LLM client, created elsewhere (e.g. in lifespan)
            async for token in llm.stream(q):  # LLM yields chunks
                yield token  # send each one immediately

        return StreamingResponse(token_stream(), media_type="text/plain")

The generator yields pieces; FastAPI flushes each to the client the moment it's produced, without buffering the whole thing.
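The generator half of this pattern can be tried without FastAPI or a real model. In the sketch below, `fake_llm_stream` is a hypothetical stand-in for `llm.stream()`, and `collect` plays the role of the client reading the response; both names are assumptions made for illustration.

```python
import asyncio
from collections.abc import AsyncIterator


async def fake_llm_stream(prompt: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for llm.stream(): yields one word at a time."""
    for word in f"You asked: {prompt}".split():
        await asyncio.sleep(0)  # pretend to wait on the model
        yield word + " "


async def token_stream(prompt: str) -> AsyncIterator[str]:
    # Same shape as the generator handed to StreamingResponse.
    async for token in fake_llm_stream(prompt):
        yield token


async def collect(prompt: str) -> list[str]:
    # A consumer receives the chunks one by one, not as a single string.
    return [chunk async for chunk in token_stream(prompt)]


print(asyncio.run(collect("hi there")))
# ['You ', 'asked: ', 'hi ', 'there ']
```

Each `yield` hands one chunk to whoever is iterating, which is what lets the response start before the full answer exists.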
For a browser chat UI, the cleanest transport is Server-Sent Events: a one-way stream the browser consumes with the native EventSource API. The media type is text/event-stream, and each message is formatted as data: ...\n\n:

    @app.get("/chat")
    async def chat(q: str):
        async def event_stream():
            async for token in llm.stream(q):
                yield f"data: {token}\n\n"  # SSE frame format
            yield "data: [DONE]\n\n"  # tell the client we're finished

        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
        )

On the browser side, jQuery isn't the tool — you use the native EventSource:

    const es = new EventSource("/chat?q=hello");
    es.onmessage = (e) => {
      if (e.data === "[DONE]") return es.close();
      output.textContent += e.data; // append each token as it arrives
    };

PHP bridge: in PHP you can stream with echo + flush() in a loop, but it fights the request model the whole way. An async generator + StreamingResponse is the native, non-blocking version: the server keeps serving other requests while this stream trickles out.
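One detail of the `data: ...\n\n` frame format is worth a sketch: a token that itself contains a newline would break the frame, because in the SSE format every line of the payload needs its own `data:` field. The helper below is one way to handle that; `sse_frame` is a hypothetical name, not part of FastAPI.

```python
def sse_frame(data: str) -> str:
    """Format one Server-Sent Events message.

    Each line of the payload gets its own `data:` field, and a blank
    line ends the message. The browser joins the lines with a newline.
    """
    lines = data.split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


print(repr(sse_frame("hello")))
# 'data: hello\n\n'
print(repr(sse_frame("line one\nline two")))
# 'data: line one\ndata: line two\n\n'
```

Inside the event generator you would then yield `sse_frame(token)` instead of building the string inline.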
By default FastAPI returns its own JSON for errors, which is fine but generic. To return a consistent error shape across your whole API — so the frontend always parses errors the same way — register custom handlers with @app.exception_handler(...):

    from fastapi import FastAPI, Request
    from fastapi.responses import JSONResponse

    app = FastAPI()


    class DocumentNotFound(Exception):
        def __init__(self, doc_id: str):
            self.doc_id = doc_id


    # Runs whenever a route (or anything it calls) raises DocumentNotFound
    @app.exception_handler(DocumentNotFound)
    async def not_found_handler(request: Request, exc: DocumentNotFound):
        return JSONResponse(
            status_code=404,
            content={"error": "document_not_found", "id": exc.doc_id},
        )

Now anywhere in your code you can raise DocumentNotFound(doc_id) and the handler turns it into clean JSON. You can also override the handler for FastAPI's built-in RequestValidationError (raised when an incoming body fails Pydantic validation) to reshape those 422 responses:

    from fastapi.exceptions import RequestValidationError


    @app.exception_handler(RequestValidationError)
    async def validation_handler(request: Request, exc: RequestValidationError):
        return JSONResponse(
            status_code=422,
            content={"error": "validation_failed", "detail": exc.errors()},
        )

PHP bridge: this is a global set_exception_handler() / a Laravel Handler::render(): one place that catches a type of error and renders a uniform response, instead of try/catch scattered through every route.
Sometimes you want to do something after sending the response — fire-and-forget work the client doesn't need to wait on. FastAPI's BackgroundTasks handles the light cases: log an event, send a confirmation email after an upload:

    from fastapi import BackgroundTasks, UploadFile


    def send_confirmation(email: str, filename: str):
        # quick, non-critical side effect
        ...


    @app.post("/upload")
    async def upload(file: UploadFile, tasks: BackgroundTasks):
        # ... save the file ...
        tasks.add_task(send_confirmation, "user@x.com", file.filename)
        return {"status": "received"}  # response sent now; task runs after

BackgroundTasks runs inside your web server process. That's fine for a 50 ms email, but wrong for DocChat's real work — embedding a 200-page PDF can take two minutes. If you do that in a background task, you tie up a server worker, and a crash or restart loses the job silently with no retry. When work is heavy, slow, or must not be lost, you graduate to a real task queue: Celery or RQ with Redis. The request hands the job to a queue and returns instantly; separate worker processes pick it up, retry on failure, and scale independently of your web servers.
The rule of thumb: BackgroundTasks for short, fire-and-forget side effects; Celery/RQ for anything heavy, long, retryable, or that must survive a restart. The PDF ingestion pipeline belongs in a queue.
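The hand-off described above (the request records a job and returns, a worker does the work later) can be sketched with an in-memory store. Everything here is illustrative: `JobStore` and its methods are hypothetical names, the `"processing"` status is an assumption, and a real system would keep jobs in Redis or a database and run `run` in a separate Celery/RQ worker process.

```python
from dataclasses import dataclass, field
from uuid import uuid4


@dataclass
class JobStore:
    """In-memory stand-in for a jobs table (illustration only)."""

    jobs: dict[str, str] = field(default_factory=dict)

    def enqueue(self) -> str:
        # Request side: record the job and return immediately.
        job_id = str(uuid4())
        self.jobs[job_id] = "queued"
        return job_id

    def run(self, job_id: str, work) -> None:
        # Worker side: runs outside the request and records the outcome.
        self.jobs[job_id] = "processing"
        try:
            work()
            self.jobs[job_id] = "done"
        except Exception:
            self.jobs[job_id] = "failed"

    def status(self, job_id: str) -> str:
        # What a status endpoint could return to a polling client.
        return self.jobs.get(job_id, "unknown")
```

A route would call `enqueue()` and return the id right away; the slow embedding call is whatever gets passed as `work` on the worker side.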
"A user uploads a 200-page PDF. Embedding it takes two minutes. How do you handle the request so it doesn't time out?"
You never do the heavy work inside the request. The pattern:

1. Accept the upload, save the file, and create a job record with status "queued".
2. Return 202 Accepted. The response carries a job_id. The request is done in milliseconds; nothing times out.
3. A worker process picks up the job, runs the embedding, and sets the status to "done" (or "failed", with retry).
4. The client learns the result by polling GET /jobs/{job_id} until status is done, or via a webhook/push notification if you have one.

The one-liner: accept → 202 → process off-request in a worker → poll or webhook. That phrase alone signals you've built real systems.
Two small things that make your API look professional.
Set the success code per route with status_code=. Picking the right one is a quiet signal of competence:
| Code | Meaning | Use it for |
|---|---|---|
| 201 | Created | A POST that created a resource (a new document record). |
| 202 | Accepted | Work accepted but not finished: the async ingest job above. |
| 204 | No Content | Success with nothing to return (a DELETE). |

    from fastapi import status


    @app.post("/upload", status_code=status.HTTP_202_ACCEPTED)
    async def upload(file: UploadFile):
        return {"job_id": "abc123", "status": "queued"}

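The constants in `fastapi.status` are plain integers, so `status.HTTP_202_ACCEPTED` is simply 202. Python's standard library carries the same codes together with their reason phrases; the lookup below is a small sketch (the `PHRASES` name is just for illustration) covering codes used in this module.

```python
from http import HTTPStatus

# Map each numeric code to its standard reason phrase.
PHRASES = {code: HTTPStatus(code).phrase for code in (201, 202, 204, 415)}

for code, phrase in PHRASES.items():
    print(code, phrase)
# 201 Created
# 202 Accepted
# 204 No Content
# 415 Unsupported Media Type
```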
The current, recommended way to declare dependencies and parameter metadata is Annotated[...] — it's reusable and type-checker-friendly, and it's what the FastAPI docs use now:
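
    from typing import Annotated

    from fastapi import Depends


    async def get_db():
        # yield a connection from the pool opened in lifespan
        ...


    # Connection is a placeholder for your DB driver's connection type
    DB = Annotated[Connection, Depends(get_db)]  # name it once


    # "/documents" rather than "/docs": FastAPI serves its interactive
    # API docs at /docs by default, which would shadow this route
    @app.get("/documents")
    async def list_docs(db: DB):  # reuse the alias everywhere
        return await db.fetch_documents()
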
Older code writes db: Connection = Depends(get_db). It still works, but the Annotated form is preferred now: you can define the dependency type once (DB) and reuse it across dozens of routes without repeating Depends(...).
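Why can one alias carry both the type and the dependency? Annotated stores extra metadata next to the type, and that metadata is readable at runtime. The sketch below shows only the standard-library mechanism; it is not FastAPI's implementation, and `Marker` and `resolve` are hypothetical names standing in for `Depends` and the framework's injection step.

```python
from typing import Annotated, get_args, get_type_hints


class Marker:
    """Hypothetical stand-in for Depends(...): just wraps a callable."""

    def __init__(self, provider):
        self.provider = provider


def get_db() -> str:
    return "connection"


DB = Annotated[str, Marker(get_db)]  # type plus metadata, named once


def list_docs(db: DB) -> str:
    return db


def resolve(func):
    """Call `func`, filling each Annotated parameter from its Marker."""
    hints = get_type_hints(func, include_extras=True)
    kwargs = {}
    for name, hint in hints.items():
        if name == "return":
            continue
        # For Annotated[T, m1, m2], get_args returns (T, m1, m2).
        for meta in get_args(hint)[1:]:
            if isinstance(meta, Marker):
                kwargs[name] = meta.provider()
    return func(**kwargs)


print(resolve(list_docs))
# connection
```

Because the metadata travels with the alias, every function annotated with `DB` gets the same provider without repeating it.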
Build the /upload route: accept a PDF, reject non-PDFs and oversized files, save the bytes, queue a background confirmation, and return 202 with a job id. This is the actual entry point of the whole product.
Drill build
Build a POST /upload route that:
- Accepts a single UploadFile.
- Returns 415 if content_type isn't application/pdf.
- Returns 413 if the file is over 20 MB.
- Streams the file to ./uploads/ in 1 MB chunks.
- Queues a BackgroundTasks job, and returns 202 with a job_id.

upload.py

    from pathlib import Path
    from uuid import uuid4

    from fastapi import (
        FastAPI,
        UploadFile,
        HTTPException,
        BackgroundTasks,
        status,
    )

    app = FastAPI()
    UPLOAD_DIR = Path("./uploads")
    UPLOAD_DIR.mkdir(exist_ok=True)
    MAX_BYTES = 20 * 1024 * 1024


    def queue_ingest(job_id: str, path: Path):
        # in real life: hand this to Celery/RQ, not a background task
        ...


    @app.post("/upload", status_code=status.HTTP_202_ACCEPTED)
    async def upload(file: UploadFile, tasks: BackgroundTasks):
        if file.content_type != "application/pdf":
            raise HTTPException(415, "Only PDF files are accepted")

        job_id = str(uuid4())
        dest = UPLOAD_DIR / f"{job_id}.pdf"
        written = 0
        with open(dest, "wb") as out:
            while chunk := await file.read(1024 * 1024):
                written += len(chunk)
                if written > MAX_BYTES:
                    out.close()
                    dest.unlink(missing_ok=True)
                    raise HTTPException(413, "File too large (20 MB max)")
                out.write(chunk)

        tasks.add_task(queue_ingest, job_id, dest)
        return {"job_id": job_id, "status": "queued"}

Note we check size while streaming, not after reading the whole thing into memory — that way an oversized upload never gets fully buffered.
- python-multipart. Forms and files arrive as multipart data, and FastAPI needs it to parse them.
- UploadFile: an async, spooled file. Large files spill to a temp file so you don't exhaust RAM.
- The lifespan async context manager passed to FastAPI(lifespan=...). Code before yield = startup, after = shutdown. @app.on_event is deprecated.
- StreamingResponse wrapping an async generator that yields chunks. For browsers, use SSE with media type text/event-stream.
- 202 Accepted: return it when you queue a job and process it off-request. Pair with a job_id the client polls.

Answer from memory: retrieval is what turns "I read it" into "I know it".
Which package is required to accept uploads?
The modern startup/shutdown mechanism is the:
To stream an answer token-by-token you return a:
The media type for Server-Sent Events is:
For a two-minute PDF embed job you should: