المرحلة 4: الاختبارات وتقوية الإنتاج
الاختبارات وتقوية الإنتاج
واجهة TaskFlow البرمجية لديها المصادقة ونماذج قاعدة البيانات ونقاط نهاية CRUD. قبل شحنها، تحتاج شيئين: اختبارات تثبت أنها تعمل وتقوية إنتاجية تبقيها تعمل.
هرم الاختبارات لواجهات البرمجة
| الطبقة | ما تختبره | السرعة | الأدوات |
|---|---|---|---|
| الوحدة | الدوال الفردية، المدققات، الأدوات المساعدة | الأسرع | pytest |
| التكامل | نقطة النهاية + قاعدة البيانات + المصادقة معًا | متوسط | httpx + TestClient |
| من البداية للنهاية | دورة الطلب الكاملة، التدفقات متعددة الخطوات | الأبطأ | httpx.AsyncClient |
لواجهة REST API مثل TaskFlow، اختبارات التكامل تعطيك أكبر قيمة. فهي تمارس المسارات واستعلامات قاعدة البيانات والمصادقة في ضربة واحدة.
pytest 9.0: التركيبات واختبارات async
يستخدم pytest التركيبات (fixtures) لإعداد وتفكيك تبعيات الاختبار. لـ FastAPI، تحتاج ثلاث تركيبات أساسية: قاعدة بيانات اختبار، وجلسة اختبار، وعميل اختبار.
# tests/conftest.py
import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from app.main import app
from app.database import Base, get_db
from app.config import settings
# استخدام قاعدة بيانات اختبار منفصلة
TEST_DATABASE_URL = settings.database_url.replace(
"/taskflow", "/taskflow_test"
)
engine_test = create_async_engine(TEST_DATABASE_URL, echo=False)
async_session_test = async_sessionmaker(engine_test, class_=AsyncSession, expire_on_commit=False)
@pytest.fixture(scope="session", autouse=True)
async def setup_database():
"""إنشاء جميع الجداول قبل الاختبارات وحذفها بعدها."""
async with engine_test.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
async with engine_test.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
@pytest.fixture
async def db_session():
"""توفير جلسة قاعدة بيانات معاملاتية تتراجع بعد كل اختبار."""
async with async_session_test() as session:
yield session
await session.rollback()
@pytest.fixture
async def client(db_session):
"""عميل HTTP للاختبار مع تجاوز تبعية قاعدة البيانات."""
async def override_get_db():
yield db_session
app.dependency_overrides[get_db] = override_get_db
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
app.dependency_overrides.clear()
TestClient مقابل httpx.AsyncClient
| الميزة | TestClient (متزامن) |
httpx.AsyncClient (غير متزامن) |
|---|---|---|
| الاستيراد | from fastapi.testclient import TestClient |
from httpx import AsyncClient |
| دعم async | لا (يغلف استدعاءات متزامنة) | نعم (async/await أصلي) |
| استخدمه عندما | اختبارات متزامنة سريعة، بدون تبعيات async | جلسات قاعدة بيانات async، أنماط واقعية |
| توصية FastAPI | الحالات البسيطة | مفضل للتطبيقات غير المتزامنة |
بما أن TaskFlow يستخدم SQLAlchemy غير المتزامن، نستخدم httpx.AsyncClient مع ASGITransport.
كتابة اختبارات API فعالة
نظّم اختباراتك حول الإجراءات والصلاحيات:
# tests/test_auth.py
import pytest
@pytest.mark.anyio
async def test_register_user(client):
response = await client.post("/api/v1/auth/register", json={
"email": "test@example.com",
"password": "SecurePass123!",
"full_name": "Test User"
})
assert response.status_code == 201
assert response.json()["email"] == "test@example.com"
assert "password" not in response.json()
@pytest.mark.anyio
async def test_login_returns_token(client):
# التسجيل أولاً ثم تسجيل الدخول
await client.post("/api/v1/auth/register", json={
"email": "login@example.com",
"password": "SecurePass123!",
"full_name": "Login User"
})
response = await client.post("/api/v1/auth/login", json={
"email": "login@example.com",
"password": "SecurePass123!"
})
assert response.status_code == 200
assert "access_token" in response.json()
@pytest.mark.anyio
async def test_protected_route_without_token(client):
response = await client.get("/api/v1/projects")
assert response.status_code == 401
تخزين Redis المؤقت: نمط Cache-Aside
نمط cache-aside (التحميل الكسول) مثالي لنقاط نهاية GET: تحقق من Redis أولاً، ثم ارجع لقاعدة البيانات، ثم خزّن النتيجة.
# app/cache.py
import json
from redis.asyncio import Redis
redis_client = Redis(host="localhost", port=6379, db=0, decode_responses=True)
CACHE_TTL = 60 # ثواني
async def get_cached(key: str) -> dict | None:
"""إرجاع البيانات المخزنة مؤقتًا أو None."""
data = await redis_client.get(key)
return json.loads(data) if data else None
async def set_cached(key: str, value: dict, ttl: int = CACHE_TTL):
"""تخزين البيانات مع مدة صلاحية."""
await redis_client.set(key, json.dumps(value), ex=ttl)
async def invalidate_cache(pattern: str):
"""حذف جميع المفاتيح المطابقة لنمط."""
keys = []
async for key in redis_client.scan_iter(match=pattern):
keys.append(key)
if keys:
await redis_client.delete(*keys)
الاستخدام في مسار:
@router.get("/projects")
async def list_projects(current_user: User = Depends(get_current_user)):
cache_key = f"projects:user:{current_user.id}"
cached = await get_cached(cache_key)
if cached:
return cached
projects = await project_service.list_for_user(current_user.id)
result = [ProjectSchema.model_validate(p).model_dump() for p in projects]
await set_cached(cache_key, result, ttl=60)
return result
عند تغيير البيانات، أبطل التخزين المؤقت:
@router.post("/projects", status_code=201)
async def create_project(data: ProjectCreate, current_user: User = Depends(get_current_user)):
project = await project_service.create(data, current_user.id)
await invalidate_cache(f"projects:user:{current_user.id}*")
return project
وسيط معالجة الأخطاء
معالج استثناءات عام يضمن أن كل خطأ يرجع JSON متسق بدلاً من تتبعات المكدس الخام:
# app/middleware/error_handler.py
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import logging
logger = logging.getLogger(__name__)
class ErrorHandlerMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
try:
return await call_next(request)
except Exception as exc:
logger.exception(f"Unhandled error: {exc}")
return JSONResponse(
status_code=500,
content={
"error": "internal_server_error",
"message": "An unexpected error occurred",
"detail": str(exc) if settings.DEBUG else None,
},
)
سجّله في main.py:
from app.middleware.error_handler import ErrorHandlerMiddleware
app.add_middleware(ErrorHandlerMiddleware)
تحديد المعدل
احمِ نقاط نهاية المصادقة من هجمات القوة الغاشمة باستخدام slowapi:
# app/middleware/rate_limit.py
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
# في مسار المصادقة:
@router.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, credentials: LoginSchema):
...
وسيط تسجيل الطلبات
سجّل كل طلب مع الطريقة والمسار ورمز الحالة والمدة:
# app/middleware/logging.py
import time, logging
from starlette.middleware.base import BaseHTTPMiddleware
logger = logging.getLogger("taskflow.access")
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start = time.perf_counter()
response = await call_next(request)
duration_ms = (time.perf_counter() - start) * 1000
logger.info(
f"{request.method} {request.url.path} "
f"status={response.status_code} duration={duration_ms:.1f}ms"
)
return response
نقطة نهاية فحص الصحة
واجهة API إنتاجية تحتاج فحص صحة يتحقق من جميع التبعيات:
@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
checks = {"api": "healthy"}
# فحص قاعدة البيانات
try:
await db.execute(text("SELECT 1"))
checks["database"] = "healthy"
except Exception:
checks["database"] = "unhealthy"
# فحص Redis
try:
await redis_client.ping()
checks["redis"] = "healthy"
except Exception:
checks["redis"] = "unhealthy"
status = 200 if all(v == "healthy" for v in checks.values()) else 503
return JSONResponse(content=checks, status_code=status)
في المعمل، ستنفذ كل هذا: مجموعة اختبارات كاملة، تخزين Redis المؤقت، معالجة الأخطاء، تحديد المعدل، وفحص صحة لـ TaskFlow. :::