المرحلة 4: الاختبارات وتقوية الإنتاج

الاختبارات وتقوية الإنتاج

3 دقيقة للقراءة

واجهة TaskFlow البرمجية لديها المصادقة ونماذج قاعدة البيانات ونقاط نهاية CRUD. قبل شحنها، تحتاج شيئين: اختبارات تثبت أنها تعمل وتقوية إنتاجية تبقيها تعمل.

هرم الاختبارات لواجهات البرمجة

الطبقة ما تختبره السرعة الأدوات
الوحدة الدوال الفردية، المدققات، الأدوات المساعدة الأسرع pytest
التكامل نقطة النهاية + قاعدة البيانات + المصادقة معًا متوسط httpx + TestClient
من البداية للنهاية دورة الطلب الكاملة، التدفقات متعددة الخطوات الأبطأ httpx.AsyncClient

لواجهة REST API مثل TaskFlow، اختبارات التكامل تعطيك أكبر قيمة. فهي تمارس المسارات واستعلامات قاعدة البيانات والمصادقة في ضربة واحدة.

pytest 9.0: التركيبات واختبارات async

يستخدم pytest التركيبات (fixtures) لإعداد وتفكيك تبعيات الاختبار. لـ FastAPI، تحتاج ثلاث تركيبات أساسية: قاعدة بيانات اختبار، وجلسة اختبار، وعميل اختبار.

# tests/conftest.py
import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

from app.main import app
from app.database import Base, get_db
from app.config import settings

# استخدام قاعدة بيانات اختبار منفصلة
TEST_DATABASE_URL = settings.database_url.replace(
    "/taskflow", "/taskflow_test"
)

engine_test = create_async_engine(TEST_DATABASE_URL, echo=False)
async_session_test = async_sessionmaker(engine_test, class_=AsyncSession, expire_on_commit=False)


@pytest.fixture(scope="session", autouse=True)
async def setup_database():
    """إنشاء جميع الجداول قبل الاختبارات وحذفها بعدها."""
    async with engine_test.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    async with engine_test.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)


@pytest.fixture
async def db_session():
    """توفير جلسة قاعدة بيانات معاملاتية تتراجع بعد كل اختبار."""
    async with async_session_test() as session:
        yield session
        await session.rollback()


@pytest.fixture
async def client(db_session):
    """عميل HTTP للاختبار مع تجاوز تبعية قاعدة البيانات."""
    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac
    app.dependency_overrides.clear()

TestClient مقابل httpx.AsyncClient

الميزة TestClient (متزامن) httpx.AsyncClient (غير متزامن)
الاستيراد from fastapi.testclient import TestClient from httpx import AsyncClient
دعم async لا (يغلف استدعاءات متزامنة) نعم (async/await أصلي)
استخدمه عندما اختبارات متزامنة سريعة، بدون تبعيات async جلسات قاعدة بيانات async، أنماط واقعية
توصية FastAPI الحالات البسيطة مفضل للتطبيقات غير المتزامنة

بما أن TaskFlow يستخدم SQLAlchemy غير المتزامن، نستخدم httpx.AsyncClient مع ASGITransport.

كتابة اختبارات API فعالة

نظّم اختباراتك حول الإجراءات والصلاحيات:

# tests/test_auth.py
import pytest

@pytest.mark.anyio
async def test_register_user(client):
    response = await client.post("/api/v1/auth/register", json={
        "email": "test@example.com",
        "password": "SecurePass123!",
        "full_name": "Test User"
    })
    assert response.status_code == 201
    assert response.json()["email"] == "test@example.com"
    assert "password" not in response.json()


@pytest.mark.anyio
async def test_login_returns_token(client):
    # التسجيل أولاً ثم تسجيل الدخول
    await client.post("/api/v1/auth/register", json={
        "email": "login@example.com",
        "password": "SecurePass123!",
        "full_name": "Login User"
    })
    response = await client.post("/api/v1/auth/login", json={
        "email": "login@example.com",
        "password": "SecurePass123!"
    })
    assert response.status_code == 200
    assert "access_token" in response.json()


@pytest.mark.anyio
async def test_protected_route_without_token(client):
    response = await client.get("/api/v1/projects")
    assert response.status_code == 401

تخزين Redis المؤقت: نمط Cache-Aside

نمط cache-aside (التحميل الكسول) مثالي لنقاط نهاية GET: تحقق من Redis أولاً، ثم ارجع لقاعدة البيانات، ثم خزّن النتيجة.

# app/cache.py
import json
from redis.asyncio import Redis

redis_client = Redis(host="localhost", port=6379, db=0, decode_responses=True)

CACHE_TTL = 60  # ثواني


async def get_cached(key: str) -> dict | None:
    """إرجاع البيانات المخزنة مؤقتًا أو None."""
    data = await redis_client.get(key)
    return json.loads(data) if data else None


async def set_cached(key: str, value: dict, ttl: int = CACHE_TTL):
    """تخزين البيانات مع مدة صلاحية."""
    await redis_client.set(key, json.dumps(value), ex=ttl)


async def invalidate_cache(pattern: str):
    """حذف جميع المفاتيح المطابقة لنمط."""
    keys = []
    async for key in redis_client.scan_iter(match=pattern):
        keys.append(key)
    if keys:
        await redis_client.delete(*keys)

الاستخدام في مسار:

@router.get("/projects")
async def list_projects(current_user: User = Depends(get_current_user)):
    cache_key = f"projects:user:{current_user.id}"
    cached = await get_cached(cache_key)
    if cached:
        return cached

    projects = await project_service.list_for_user(current_user.id)
    result = [ProjectSchema.model_validate(p).model_dump() for p in projects]
    await set_cached(cache_key, result, ttl=60)
    return result

عند تغيير البيانات، أبطل التخزين المؤقت:

@router.post("/projects", status_code=201)
async def create_project(data: ProjectCreate, current_user: User = Depends(get_current_user)):
    project = await project_service.create(data, current_user.id)
    await invalidate_cache(f"projects:user:{current_user.id}*")
    return project

وسيط معالجة الأخطاء

معالج استثناءات عام يضمن أن كل خطأ يرجع JSON متسق بدلاً من تتبعات المكدس الخام:

# app/middleware/error_handler.py
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import logging

logger = logging.getLogger(__name__)


class ErrorHandlerMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        try:
            return await call_next(request)
        except Exception as exc:
            logger.exception(f"Unhandled error: {exc}")
            return JSONResponse(
                status_code=500,
                content={
                    "error": "internal_server_error",
                    "message": "An unexpected error occurred",
                    "detail": str(exc) if settings.DEBUG else None,
                },
            )

سجّله في main.py:

from app.middleware.error_handler import ErrorHandlerMiddleware

app.add_middleware(ErrorHandlerMiddleware)

تحديد المعدل

احمِ نقاط نهاية المصادقة من هجمات القوة الغاشمة باستخدام slowapi:

# app/middleware/rate_limit.py
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

# في مسار المصادقة:
@router.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, credentials: LoginSchema):
    ...

وسيط تسجيل الطلبات

سجّل كل طلب مع الطريقة والمسار ورمز الحالة والمدة:

# app/middleware/logging.py
import time, logging
from starlette.middleware.base import BaseHTTPMiddleware

logger = logging.getLogger("taskflow.access")


class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        duration_ms = (time.perf_counter() - start) * 1000
        logger.info(
            f"{request.method} {request.url.path} "
            f"status={response.status_code} duration={duration_ms:.1f}ms"
        )
        return response

نقطة نهاية فحص الصحة

واجهة API إنتاجية تحتاج فحص صحة يتحقق من جميع التبعيات:

@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
    checks = {"api": "healthy"}
    # فحص قاعدة البيانات
    try:
        await db.execute(text("SELECT 1"))
        checks["database"] = "healthy"
    except Exception:
        checks["database"] = "unhealthy"
    # فحص Redis
    try:
        await redis_client.ping()
        checks["redis"] = "healthy"
    except Exception:
        checks["redis"] = "unhealthy"

    status = 200 if all(v == "healthy" for v in checks.values()) else 503
    return JSONResponse(content=checks, status_code=status)

في المعمل، ستنفذ كل هذا: مجموعة اختبارات كاملة، تخزين Redis المؤقت، معالجة الأخطاء، تحديد المعدل، وفحص صحة لـ TaskFlow. :::

اختبار

اختبار الوحدة 4: الاختبارات وتقوية الإنتاج

خذ الاختبار