الدرس 18 من 23

الإنتاج والموثوقية

معالجة الأخطاء والاحتياطات

4 دقيقة للقراءة

أنظمة الذكاء الاصطناعي تواجه أنماط فشل فريدة—انقطاع API، حدود المعدل، الهلوسة، وحلقات المهلة. يغطي هذا الدرس بناء أنظمة مرنة تتعامل مع الفشل بشكل رشيق.

أنماط الفشل الشائعة

نوع الفشل السبب التأثير
مهلة API مشاكل الشبكة، الحمل الزائد فشل الطلب
تحديد المعدل طلبات كثيرة جداً رفض الطلبات
تجاوز السياق المدخل طويل جداً اقتطاع أو خطأ
حلقات لا نهائية الوكيل عالق في دورة أدوات استنزاف الموارد
الهلوسة النموذج يختلق حقائق مخرجات غير صحيحة
انفجار التكلفة استخدام رموز خارج عن السيطرة تجاوز الميزانية

إعادة المحاولة مع التراجع الأسي

import asyncio
import random
from functools import wraps

class RetryConfig:
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

def retry_with_backoff(config: RetryConfig = None):
    config = config or RetryConfig()

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(config.max_retries + 1):
                try:
                    return await func(*args, **kwargs)

                except (RateLimitError, TimeoutError, APIError) as e:
                    last_exception = e

                    if attempt == config.max_retries:
                        raise

                    # حساب التأخير مع التراجع الأسي
                    delay = min(
                        config.base_delay * (config.exponential_base ** attempt),
                        config.max_delay
                    )

                    # إضافة اهتزاز لمنع تدافع القطيع
                    if config.jitter:
                        delay = delay * (0.5 + random.random())

                    await asyncio.sleep(delay)

            raise last_exception

        return wrapper
    return decorator

# الاستخدام
@retry_with_backoff(RetryConfig(max_retries=3))
async def call_llm(messages):
    return await llm_client.complete(messages)

نمط قاطع الدائرة

منع الفشل المتتالي:

import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"      # التشغيل الطبيعي
    OPEN = "open"          # فاشل، رفض الطلبات
    HALF_OPEN = "half_open"  # اختبار إذا تعافى

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_requests: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_successes = 0

    async def call(self, func, *args, **kwargs):
        # التحقق إذا يجب الانتقال من OPEN إلى HALF_OPEN
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_successes = 0
            else:
                raise CircuitOpenError("قاطع الدائرة مفتوح")

        try:
            result = await func(*args, **kwargs)

            # معالجة النجاح
            if self.state == CircuitState.HALF_OPEN:
                self.half_open_successes += 1
                if self.half_open_successes >= self.half_open_requests:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0

            return result

        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

            raise

# الاستخدام
openai_circuit = CircuitBreaker(failure_threshold=5)

async def safe_llm_call(messages):
    return await openai_circuit.call(llm_client.complete, messages)

سلسلة النماذج الاحتياطية

عند فشل النموذج الأساسي، استخدم البدائل:

from dataclasses import dataclass
from typing import Optional

@dataclass
class ModelConfig:
    name: str
    client: any
    max_tokens: int
    cost_per_1k: float
    timeout: float = 30.0

class FallbackChain:
    def __init__(self, models: list[ModelConfig]):
        self.models = models
        self.circuit_breakers = {
            m.name: CircuitBreaker() for m in models
        }

    async def complete(self, messages: list, **kwargs) -> tuple:
        """جرب النماذج بالترتيب حتى ينجح واحد."""
        last_error = None

        for model in self.models:
            try:
                circuit = self.circuit_breakers[model.name]

                response = await circuit.call(
                    model.client.complete,
                    messages=messages,
                    timeout=model.timeout,
                    **kwargs
                )

                return response, model.name

            except CircuitOpenError:
                # تخطي هذا النموذج، الدائرة مفتوحة
                continue

            except Exception as e:
                last_error = e
                # سجل وجرب النموذج التالي
                continue

        raise FallbackExhaustedError(
            f"جميع النماذج فشلت. آخر خطأ: {last_error}"
        )

# الاستخدام
fallback = FallbackChain([
    ModelConfig("gpt-4", openai_client, 8192, 0.03),
    ModelConfig("claude-3", anthropic_client, 100000, 0.025),
    ModelConfig("gpt-3.5-turbo", openai_client, 4096, 0.002),
])

response, used_model = await fallback.complete(messages)

منع المهلة والحلقات

import asyncio
from contextlib import asynccontextmanager

class AgentGuardrails:
    def __init__(
        self,
        max_iterations: int = 20,
        max_tool_calls: int = 50,
        max_runtime_seconds: float = 300.0,
        max_tokens_per_run: int = 100000
    ):
        self.max_iterations = max_iterations
        self.max_tool_calls = max_tool_calls
        self.max_runtime_seconds = max_runtime_seconds
        self.max_tokens_per_run = max_tokens_per_run

    @asynccontextmanager
    async def guard(self):
        """مدير سياق لتشغيل الوكيل مع حدود."""
        state = {
            "iterations": 0,
            "tool_calls": 0,
            "tokens_used": 0,
            "start_time": time.time()
        }

        try:
            yield state
        finally:
            # تسجيل الإحصائيات النهائية
            pass

    def check_limits(self, state: dict):
        """أطلق استثناء إذا تم تجاوز أي حد."""
        elapsed = time.time() - state["start_time"]

        if state["iterations"] >= self.max_iterations:
            raise MaxIterationsExceeded(
                f"الوكيل تجاوز {self.max_iterations} تكرار"
            )

        if state["tool_calls"] >= self.max_tool_calls:
            raise MaxToolCallsExceeded(
                f"الوكيل تجاوز {self.max_tool_calls} استدعاء أداة"
            )

        if elapsed >= self.max_runtime_seconds:
            raise AgentTimeoutError(
                f"الوكيل تجاوز {self.max_runtime_seconds} ثانية"
            )

        if state["tokens_used"] >= self.max_tokens_per_run:
            raise TokenBudgetExceeded(
                f"الوكيل تجاوز {self.max_tokens_per_run} رمز"
            )

# الاستخدام
guardrails = AgentGuardrails()

async def run_agent(task: str):
    async with guardrails.guard() as state:
        while True:
            state["iterations"] += 1
            guardrails.check_limits(state)

            response = await llm.complete(messages)
            state["tokens_used"] += response.usage.total_tokens

            if response.tool_calls:
                state["tool_calls"] += len(response.tool_calls)
                # تنفيذ الأدوات...
            else:
                return response.content

التدهور الرشيق

class DegradedResponse:
    """استجابة عندما يكون النظام متدهوراً."""

    def __init__(self, message: str, severity: str, fallback_used: bool):
        self.message = message
        self.severity = severity  # "partial", "cached", "unavailable"
        self.fallback_used = fallback_used

async def handle_with_degradation(request):
    """معالجة الطلب مع تدهور رشيق."""

    try:
        # جرب القدرة الكاملة
        return await full_agent_response(request)

    except ModelUnavailableError:
        # الرجوع لنموذج أبسط
        return DegradedResponse(
            message=await simple_model_response(request),
            severity="partial",
            fallback_used=True
        )

    except CacheHitError as e:
        # إرجاع الاستجابة المخزنة مؤقتاً
        return DegradedResponse(
            message=e.cached_response,
            severity="cached",
            fallback_used=True
        )

    except Exception:
        # إرجاع احتياطي ثابت
        return DegradedResponse(
            message="أواجه مشاكل. يرجى المحاولة مرة أخرى لاحقاً.",
            severity="unavailable",
            fallback_used=True
        )

نصيحة للمقابلة

عند مناقشة معالجة الأخطاء:

  1. التكافؤ - هل يمكن إعادة محاولة الطلبات بأمان؟
  2. الفشل الجزئي - ماذا لو فشلت أداة واحدة فقط؟
  3. تجربة المستخدم - كيف تتواصل بشأن التدهور؟
  4. التعافي - كيف يتعافى النظام بعد الانقطاعات؟

بعد ذلك، سنغطي حواجز الأمان وتصفية المحتوى. :::

اختبار

الوحدة 5: الإنتاج والموثوقية

خذ الاختبار