Phase 4: Testing & Production Hardening

Your TaskFlow API has authentication, database models, and CRUD endpoints. Before shipping it, you need two things: tests that prove it works and production hardening that keeps it running.

The Testing Pyramid for APIs

| Layer | What It Tests | Speed | Tools |
|---|---|---|---|
| Unit | Individual functions, validators, utilities | Fastest | pytest |
| Integration | Endpoint + database + auth together | Medium | httpx + TestClient |
| End-to-end | Full request lifecycle, multi-step flows | Slowest | httpx.AsyncClient |

For a REST API like TaskFlow, integration tests give you the most value. They exercise your routes, database queries, and authentication in one shot.
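To make the unit layer of the pyramid concrete, here is a minimal sketch of a unit test. `is_strong_password` is a hypothetical validator invented for illustration, and its rules are assumptions, not TaskFlow's actual password policy.

```python
import re


def is_strong_password(password: str) -> bool:
    """Hypothetical validator: 10+ chars with upper, lower, digit, and symbol."""
    if len(password) < 10:
        return False
    required = (r"[A-Z]", r"[a-z]", r"\d", r"[^A-Za-z0-9]")
    return all(re.search(pattern, password) for pattern in required)


def test_accepts_strong_password():
    assert is_strong_password("SecurePass123!")


def test_rejects_weak_passwords():
    # Too short, missing a digit, and missing a symbol, respectively
    for weak in ("Ab1!", "SecurePassword!", "SecurePass1234"):
        assert not is_strong_password(weak)
```

Tests like these need no database or HTTP client, which is why the unit layer is the fastest.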

pytest 9.0 Fixtures and Async Testing

pytest uses fixtures to set up and tear down test dependencies. For FastAPI, you need three core fixtures: a test database, a test session, and a test client.

# tests/conftest.py
import pytest
from httpx import ASGITransport, AsyncClient
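from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

from app.main import app
from app.database import Base, get_db
from app.config import settings

# Use a separate test database. make_url swaps only the database name;
# str.replace("/taskflow", ...) would also rewrite a "taskflow" username in the URL.
TEST_DATABASE_URL = make_url(settings.database_url).set(database="taskflow_test")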

engine_test = create_async_engine(TEST_DATABASE_URL, echo=False)
async_session_test = async_sessionmaker(engine_test, class_=AsyncSession, expire_on_commit=False)


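@pytest.fixture(scope="session")
def anyio_backend():
    """Run async tests on asyncio; session scope lets session-scoped async fixtures use it."""
    return "asyncio"


@pytest.fixture(scope="session", autouse=True)
async def setup_database():
    """Create all tables before tests, drop after."""
    async with engine_test.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    async with engine_test.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine_test.dispose()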


@pytest.fixture
async def db_session():
    """Provide one session per test and roll back uncommitted work afterwards.

    Rows the app commits during a test are not undone by this rollback.
    """
    async with async_session_test() as session:
        yield session
        await session.rollback()


@pytest.fixture
async def client(db_session):
    """HTTP test client with database dependency override."""
    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac
    app.dependency_overrides.clear()

TestClient vs httpx.AsyncClient

| Feature | TestClient (sync) | httpx.AsyncClient (async) |
|---|---|---|
| Import | from fastapi.testclient import TestClient | from httpx import AsyncClient |
| Async support | No (wraps sync calls) | Yes (native async/await) |
| Use when | Quick sync tests, no async deps | Async database sessions, real-world patterns |
| FastAPI recommendation | Simple cases | Preferred for async apps |

Since TaskFlow uses async SQLAlchemy, we use httpx.AsyncClient with ASGITransport.
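ASGITransport sends requests to the app in-process instead of over a network socket. The sketch below uses only the standard library to call a tiny hand-written ASGI app directly, which shows roughly what "in-process" means. `tiny_app` and `call_app` are illustrative names, and this is a simplification rather than a description of how httpx implements the transport.

```python
import asyncio
import json


async def tiny_app(scope, receive, send):
    """A hand-written ASGI app: answers every HTTP request with a JSON body."""
    assert scope["type"] == "http"
    body = json.dumps({"path": scope["path"]}).encode()
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"application/json")],
    })
    await send({"type": "http.response.body", "body": body})


async def call_app(app, method: str, path: str) -> tuple[int, dict]:
    """Invoke an ASGI app in-process and collect its response; no socket is opened."""
    scope = {"type": "http", "method": method, "path": path, "headers": []}
    messages = []

    async def receive():
        # This sketch sends no request body
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        messages.append(message)

    await app(scope, receive, send)
    status = next(m["status"] for m in messages if m["type"] == "http.response.start")
    body = b"".join(m.get("body", b"") for m in messages if m["type"] == "http.response.body")
    return status, json.loads(body)
```

Running `asyncio.run(call_app(tiny_app, "GET", "/health"))` exercises the app the same way a test would: as an ordinary awaited function call.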

Writing Effective API Tests

Structure your tests around actions and permissions:

# tests/test_auth.py
import pytest

@pytest.mark.anyio
async def test_register_user(client):
    response = await client.post("/api/v1/auth/register", json={
        "email": "test@example.com",
        "password": "SecurePass123!",
        "full_name": "Test User"
    })
    assert response.status_code == 201
    assert response.json()["email"] == "test@example.com"
    assert "password" not in response.json()


@pytest.mark.anyio
async def test_login_returns_token(client):
    # Register first, then login
    await client.post("/api/v1/auth/register", json={
        "email": "login@example.com",
        "password": "SecurePass123!",
        "full_name": "Login User"
    })
    response = await client.post("/api/v1/auth/login", json={
        "email": "login@example.com",
        "password": "SecurePass123!"
    })
    assert response.status_code == 200
    assert "access_token" in response.json()


@pytest.mark.anyio
async def test_protected_route_without_token(client):
    response = await client.get("/api/v1/projects")
    assert response.status_code == 401

Redis Caching: Cache-Aside Pattern

The cache-aside (lazy-loading) pattern is ideal for GET endpoints: check Redis first, fall back to the database, then cache the result.

# app/cache.py
import json
from redis.asyncio import Redis

redis_client = Redis(host="localhost", port=6379, db=0, decode_responses=True)

CACHE_TTL = 60  # seconds


async def get_cached(key: str) -> dict | list | None:
    """Return cached JSON data, or None on a cache miss."""
    data = await redis_client.get(key)
    return json.loads(data) if data else None


async def set_cached(key: str, value: dict | list, ttl: int = CACHE_TTL):
    """Cache JSON-serializable data with a TTL in seconds."""
    await redis_client.set(key, json.dumps(value), ex=ttl)


async def invalidate_cache(pattern: str):
    """Delete all keys matching a pattern."""
    keys = []
    async for key in redis_client.scan_iter(match=pattern):
        keys.append(key)
    if keys:
        await redis_client.delete(*keys)

Usage in a route:

@router.get("/projects")
async def list_projects(current_user: User = Depends(get_current_user)):
    cache_key = f"projects:user:{current_user.id}"
    cached = await get_cached(cache_key)
    if cached is not None:  # an empty list is still a valid cache hit
        return cached

    projects = await project_service.list_for_user(current_user.id)
    # mode="json" turns datetimes, UUIDs, etc. into JSON-safe values for json.dumps
    result = [ProjectSchema.model_validate(p).model_dump(mode="json") for p in projects]
    await set_cached(cache_key, result, ttl=60)
    return result

When data changes, invalidate the cache:

@router.post("/projects", status_code=201)
async def create_project(data: ProjectCreate, current_user: User = Depends(get_current_user)):
    project = await project_service.create(data, current_user.id)
    # Match the exact key: a trailing * would also match user ids 10, 11, ... for user 1
    await invalidate_cache(f"projects:user:{current_user.id}")
    return project

Error Handling Middleware

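A global exception handler ensures that unhandled exceptions return consistent JSON instead of raw stack traces. Exceptions FastAPI already handles itself, such as HTTPException and request validation errors, never reach this middleware and keep their own response format: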

# app/middleware/error_handler.py
import logging

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

from app.config import settings  # provides the DEBUG flag used below

logger = logging.getLogger(__name__)


class ErrorHandlerMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        try:
            return await call_next(request)
        except Exception as exc:
            logger.exception(f"Unhandled error: {exc}")
            return JSONResponse(
                status_code=500,
                content={
                    "error": "internal_server_error",
                    "message": "An unexpected error occurred",
                    "detail": str(exc) if settings.DEBUG else None,
                },
            )

Register it in main.py:

from app.middleware.error_handler import ErrorHandlerMiddleware

app.add_middleware(ErrorHandlerMiddleware)
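The same catch-and-convert idea can also be written as a plain ASGI middleware, which makes one edge case visible: once response headers have been sent, the status code can no longer be changed. This is a sketch of that alternative using only the standard library; the class name `PureASGIErrorHandler` and its `debug` parameter are assumptions for illustration.

```python
import asyncio
import json
import logging

logger = logging.getLogger(__name__)


class PureASGIErrorHandler:
    """Catch unhandled exceptions from the wrapped ASGI app and answer with JSON."""

    def __init__(self, app, debug: bool = False):
        self.app = app
        self.debug = debug

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        response_started = False

        async def send_wrapper(message):
            nonlocal response_started
            if message["type"] == "http.response.start":
                response_started = True
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        except Exception as exc:
            logger.exception("Unhandled error")
            if response_started:
                raise  # headers already sent; too late to replace the response
            body = json.dumps({
                "error": "internal_server_error",
                "message": "An unexpected error occurred",
                "detail": str(exc) if self.debug else None,
            }).encode()
            await send({
                "type": "http.response.start",
                "status": 500,
                "headers": [
                    (b"content-type", b"application/json"),
                    (b"content-length", str(len(body)).encode()),
                ],
            })
            await send({"type": "http.response.body", "body": body})
```

Keeping `detail` empty unless debugging is enabled avoids leaking internal error messages to clients in production.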

Rate Limiting

Protect auth endpoints from brute-force attacks using slowapi:

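# app/middleware/rate_limit.py
from slowapi import Limiter
from slowapi.util import get_remote_address

# Key each limit by the client's IP address
limiter = Limiter(key_func=get_remote_address)

# In main.py, register the limiter and its 429 handler:
#   from slowapi import _rate_limit_exceeded_handler
#   from slowapi.errors import RateLimitExceeded
#   app.state.limiter = limiter
#   app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# In your auth router (the endpoint must accept `request: Request`):
from fastapi import Request
from app.middleware.rate_limit import limiter

@router.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, credentials: LoginSchema):
    ...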
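To show what a limit such as "5/minute" means in practice, here is a minimal fixed-window counter in plain Python. It is one common rate-limiting strategy, sketched for illustration only; it is not a description of slowapi's internals, and `FixedWindowLimiter` is a hypothetical name.

```python
import time
from collections import defaultdict


class FixedWindowLimiter:
    """Allow at most `limit` hits per `window` seconds for each key."""

    def __init__(self, limit: int, window: float, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self._clock = clock
        # key -> (window index, hits counted in that window)
        self._hits: dict[str, tuple[int, int]] = defaultdict(lambda: (0, 0))

    def allow(self, key: str) -> bool:
        current_window = int(self._clock() // self.window)
        window, count = self._hits[key]
        if window != current_window:
            window, count = current_window, 0  # new window: reset the counter
        if count >= self.limit:
            self._hits[key] = (window, count)
            return False  # over the limit; the caller would respond with 429
        self._hits[key] = (window, count + 1)
        return True
```

Each key (here, a client IP) gets its own counter, so one noisy client does not lock out others. An in-memory counter like this only works within a single process; multiple workers would need shared storage.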

Request Logging Middleware

Log every request with method, path, status code, and duration:

# app/middleware/logging.py
import logging
import time

from starlette.middleware.base import BaseHTTPMiddleware

logger = logging.getLogger("taskflow.access")


class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start = time.perf_counter()
        status_code = 500  # reported if call_next raises before a response exists
        try:
            response = await call_next(request)
            status_code = response.status_code
            return response
        finally:
            # Runs for both successful and failed requests
            duration_ms = (time.perf_counter() - start) * 1000
            logger.info(
                "%s %s status=%d duration=%.1fms",
                request.method, request.url.path, status_code, duration_ms,
            )

Health Check Endpoint

A production API needs a health check that verifies all dependencies:

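from fastapi import Depends
from fastapi.responses import JSONResponse
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.cache import redis_client
from app.database import get_db


@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):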
    checks = {"api": "healthy"}
    # Database check
    try:
        await db.execute(text("SELECT 1"))
        checks["database"] = "healthy"
    except Exception:
        checks["database"] = "unhealthy"
    # Redis check
    try:
        await redis_client.ping()
        checks["redis"] = "healthy"
    except Exception:
        checks["redis"] = "unhealthy"

    status = 200 if all(v == "healthy" for v in checks.values()) else 503
    return JSONResponse(content=checks, status_code=status)
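A health check that waits indefinitely on a hung dependency is itself unhealthy. As a possible refinement, the sketch below runs the checks concurrently and applies a timeout to each. `run_checks` and its `timeout` default are assumptions for illustration; the status rule (200 only if everything is healthy, otherwise 503) matches the endpoint above.

```python
import asyncio
from typing import Awaitable, Callable

Check = Callable[[], Awaitable[object]]


async def run_checks(
    checks: dict[str, Check], timeout: float = 2.0
) -> tuple[int, dict[str, str]]:
    """Run all dependency checks concurrently; slow or failing checks are unhealthy."""

    async def run_one(check: Check) -> str:
        try:
            await asyncio.wait_for(check(), timeout=timeout)
            return "healthy"
        except Exception:  # includes the timeout raised by wait_for
            return "unhealthy"

    names = list(checks)
    results = await asyncio.gather(*(run_one(checks[name]) for name in names))
    report = {"api": "healthy", **dict(zip(names, results))}
    status = 200 if all(value == "healthy" for value in report.values()) else 503
    return status, report
```

In the route, the checks could be passed as zero-argument callables, for example `{"database": lambda: db.execute(text("SELECT 1")), "redis": redis_client.ping}`, with the returned status and report handed to JSONResponse.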

In the lab, you will implement all of these: a full test suite, Redis caching, error handling, rate limiting, and a health check for TaskFlow.
