Phase 4: Testing & Production Hardening
Testing & Production Hardening
Your TaskFlow API has authentication, database models, and CRUD endpoints. Before shipping it, you need two things: tests that prove it works and production hardening that keeps it running.
The Testing Pyramid for APIs
| Layer | What It Tests | Speed | Tools |
|---|---|---|---|
| Unit | Individual functions, validators, utilities | Fastest | pytest |
| Integration | Endpoint + database + auth together | Medium | httpx + TestClient |
| End-to-end | Full request lifecycle, multi-step flows | Slowest | httpx.AsyncClient |
For a REST API like TaskFlow, integration tests give you the most value. They exercise your routes, database queries, and authentication in one shot.
pytest Fixtures and Async Testing
pytest uses fixtures to set up and tear down test dependencies. For FastAPI, you need three core fixtures: a test database, a test session, and a test client.
# tests/conftest.py
import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from app.main import app
from app.database import Base, get_db
from app.config import settings
# Point the engine at a dedicated "_test" database derived from the
# configured URL, so tests can never touch development data.
TEST_DATABASE_URL = settings.database_url.replace("/taskflow", "/taskflow_test")

engine_test = create_async_engine(TEST_DATABASE_URL, echo=False)
async_session_test = async_sessionmaker(
    engine_test,
    class_=AsyncSession,
    expire_on_commit=False,
)
@pytest.fixture(scope="session")
def anyio_backend():
    """Run anyio-marked tests on the asyncio backend.

    Declared session-scoped because anyio's built-in ``anyio_backend``
    fixture is function-scoped, and a function-scoped backend cannot
    satisfy the session-scoped async ``setup_database`` fixture below
    (pytest raises a ScopeMismatch error).
    """
    return "asyncio"


@pytest.fixture(scope="session", autouse=True)
async def setup_database():
    """Create all tables once before the test session, drop them after.

    autouse=True means every test implicitly depends on this fixture,
    so the schema is guaranteed to exist before the first test runs.
    """
    async with engine_test.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # Teardown: drop everything so repeated runs start from a clean schema.
    async with engine_test.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
@pytest.fixture
async def db_session():
    """Yield an AsyncSession whose writes are rolled back after each test."""
    session = async_session_test()
    try:
        yield session
        # Undo anything the test wrote so tests stay independent of each other.
        await session.rollback()
    finally:
        await session.close()
@pytest.fixture
async def client(db_session):
    """Yield an httpx.AsyncClient wired to the app with ``get_db`` overridden.

    The override is installed before the client opens and restored in a
    ``finally`` block so that (a) a failure while the client is open cannot
    leak the test session into later tests, and (b) any overrides installed
    by other fixtures survive — the original ``dependency_overrides.clear()``
    wiped ALL overrides, not just this one.
    """

    async def override_get_db():
        yield db_session

    previous = app.dependency_overrides.get(get_db)
    app.dependency_overrides[get_db] = override_get_db
    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as ac:
            yield ac
    finally:
        # Restore whatever was registered before rather than clearing everything.
        if previous is None:
            app.dependency_overrides.pop(get_db, None)
        else:
            app.dependency_overrides[get_db] = previous
TestClient vs httpx.AsyncClient

| Feature | TestClient (sync) | httpx.AsyncClient (async) |
|---|---|---|
| Import | `from fastapi.testclient import TestClient` | `from httpx import AsyncClient` |
| Async support | No (wraps sync calls) | Yes (native async/await) |
| Use when | Quick sync tests, no async deps | Async database sessions, real-world patterns |
| FastAPI recommendation | Simple cases | Preferred for async apps |
Since TaskFlow uses async SQLAlchemy, we use httpx.AsyncClient with ASGITransport.
Writing Effective API Tests
Structure your tests around actions and permissions:
# tests/test_auth.py
import pytest
@pytest.mark.anyio
async def test_register_user(client):
    """A new user can register; the response echoes the email, never the password."""
    payload = {
        "email": "test@example.com",
        "password": "SecurePass123!",
        "full_name": "Test User",
    }
    response = await client.post("/api/v1/auth/register", json=payload)
    body = response.json()
    assert response.status_code == 201
    assert body["email"] == "test@example.com"
    assert "password" not in body
@pytest.mark.anyio
async def test_login_returns_token(client):
    """Logging in with valid credentials returns an access token."""
    # Arrange: create the account we are about to log in as.
    register_payload = {
        "email": "login@example.com",
        "password": "SecurePass123!",
        "full_name": "Login User",
    }
    await client.post("/api/v1/auth/register", json=register_payload)

    # Act: authenticate with the same credentials.
    login_payload = {
        "email": "login@example.com",
        "password": "SecurePass123!",
    }
    response = await client.post("/api/v1/auth/login", json=login_payload)

    # Assert: success and a token in the body.
    assert response.status_code == 200
    assert "access_token" in response.json()
@pytest.mark.anyio
async def test_protected_route_without_token(client):
    """Requests without an Authorization header are rejected with 401."""
    response = await client.get("/api/v1/projects")
    status = response.status_code
    assert status == 401
Redis Caching: Cache-Aside Pattern
The cache-aside (lazy-loading) pattern is ideal for GET endpoints: check Redis first, fall back to the database, then cache the result.
# app/cache.py
import json
from redis.asyncio import Redis
# NOTE(review): connection parameters are hard-coded; in production these
# should come from settings (host/port/db), the same way the database URL
# does in app.config — confirm before deploying.
# decode_responses=True makes get() return str instead of bytes.
redis_client = Redis(host="localhost", port=6379, db=0, decode_responses=True)
CACHE_TTL = 60  # seconds — default expiry for cache-aside entries
async def get_cached(key: str) -> dict | None:
    """Look up *key* in Redis and return the decoded payload, or None on a miss."""
    raw = await redis_client.get(key)
    if not raw:
        return None
    return json.loads(raw)
async def set_cached(key: str, value: dict, ttl: int = CACHE_TTL):
    """Serialize *value* as JSON and store it under *key*, expiring after *ttl* seconds."""
    payload = json.dumps(value)
    await redis_client.set(key, payload, ex=ttl)
async def invalidate_cache(pattern: str):
    """Delete every key matching *pattern* (glob-style, e.g. "projects:user:1").

    Uses SCAN (via scan_iter) rather than KEYS so large keyspaces are walked
    incrementally without blocking Redis.
    """
    # Async list comprehension instead of a manual append loop; all matches
    # are then deleted in a single round trip.
    keys = [key async for key in redis_client.scan_iter(match=pattern)]
    if keys:
        await redis_client.delete(*keys)
Usage in a route:
@router.get("/projects")
async def list_projects(current_user: User = Depends(get_current_user)):
    """List the current user's projects, serving from Redis when possible (cache-aside)."""
    cache_key = f"projects:user:{current_user.id}"

    # Fast path: a prior call already cached this user's project list.
    if cached := await get_cached(cache_key):
        return cached

    # Slow path: hit the database, serialize, then populate the cache.
    rows = await project_service.list_for_user(current_user.id)
    payload = [ProjectSchema.model_validate(row).model_dump() for row in rows]
    await set_cached(cache_key, payload, ttl=60)
    return payload
When data changes, invalidate the cache:
@router.post("/projects", status_code=201)
async def create_project(data: ProjectCreate, current_user: User = Depends(get_current_user)):
    """Create a project for the current user and drop their cached project list."""
    project = await project_service.create(data, current_user.id)
    # Invalidate the exact key written by list_projects. The previous pattern,
    # f"projects:user:{id}*", also matched OTHER users whose ids share this id
    # as a string prefix — user 1's "projects:user:1*" would wipe user 12's
    # cache entry too.
    await invalidate_cache(f"projects:user:{current_user.id}")
    return project
Error Handling Middleware
A global exception handler ensures every error returns consistent JSON instead of raw stack traces:
# app/middleware/error_handler.py
import logging

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

from app.config import settings
logger = logging.getLogger(__name__)
class ErrorHandlerMiddleware(BaseHTTPMiddleware):
    """Catch any unhandled exception and return a consistent JSON 500 body.

    This is a last-resort boundary: specific HTTPExceptions are handled by
    FastAPI before they reach here, so anything caught below is a genuine bug.
    """

    async def dispatch(self, request: Request, call_next):
        try:
            return await call_next(request)
        except Exception as exc:
            # Lazy %-style args instead of an f-string: the message is only
            # formatted if the log record is emitted, and logger.exception
            # attaches the full traceback automatically.
            logger.exception("Unhandled error: %s", exc)
            return JSONResponse(
                status_code=500,
                content={
                    "error": "internal_server_error",
                    "message": "An unexpected error occurred",
                    # Leak exception details only in debug mode.
                    # NOTE(review): other snippets use lowercase attributes
                    # (settings.database_url) — confirm DEBUG vs debug casing.
                    "detail": str(exc) if settings.DEBUG else None,
                },
            )
Register it in main.py:
from app.middleware.error_handler import ErrorHandlerMiddleware
app.add_middleware(ErrorHandlerMiddleware)
Rate Limiting
Protect auth endpoints from brute-force attacks using slowapi:
# app/middleware/rate_limit.py
from slowapi import Limiter
from slowapi.util import get_remote_address
# Shared limiter keyed on the client's remote IP address.
# NOTE(review): slowapi also requires `app.state.limiter = limiter` and an
# exception handler for RateLimitExceeded registered in main.py for the
# decorator below to actually return 429s — confirm both are wired up.
limiter = Limiter(key_func=get_remote_address)
# In your auth router:
@router.post("/login")
@limiter.limit("5/minute")  # at most 5 attempts per client IP per minute
async def login(request: Request, credentials: LoginSchema):
    # slowapi requires the `request: Request` parameter on the decorated
    # endpoint so it can extract the client address for rate-limit keying.
    ...
Request Logging Middleware
Log every request with method, path, status code, and duration:
# app/middleware/logging.py
import time, logging
from starlette.middleware.base import BaseHTTPMiddleware
logger = logging.getLogger("taskflow.access")
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Log one access line per request: method, path, status code, latency."""

    async def dispatch(self, request, call_next):
        # perf_counter is monotonic and high-resolution — correct for durations,
        # unlike time.time(), which can jump on clock adjustments.
        start = time.perf_counter()
        response = await call_next(request)
        duration_ms = (time.perf_counter() - start) * 1000
        # Lazy %-style args instead of an f-string: formatting is skipped
        # entirely when the logger's level filters the record out.
        logger.info(
            "%s %s status=%s duration=%.1fms",
            request.method,
            request.url.path,
            response.status_code,
            duration_ms,
        )
        return response
Health Check Endpoint
A production API needs a health check that verifies all dependencies:
@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
    """Report per-dependency health; 200 when everything is up, 503 otherwise."""
    checks = {"api": "healthy"}

    # Database: a trivial round-trip proves connectivity and credentials.
    try:
        await db.execute(text("SELECT 1"))
    except Exception:
        checks["database"] = "unhealthy"
    else:
        checks["database"] = "healthy"

    # Redis: PING is the cheapest possible liveness probe.
    try:
        await redis_client.ping()
    except Exception:
        checks["redis"] = "unhealthy"
    else:
        checks["redis"] = "healthy"

    all_healthy = all(state == "healthy" for state in checks.values())
    return JSONResponse(content=checks, status_code=200 if all_healthy else 503)
In the lab, you will implement all of these: a full test suite, Redis caching, error handling, rate limiting, and a health check for TaskFlow.