الإنتاج والموثوقية
معالجة الأخطاء والاحتياطات
4 دقيقة للقراءة
أنظمة الذكاء الاصطناعي تواجه أنماط فشل فريدة—انقطاع API، حدود المعدل، الهلوسة، وحلقات المهلة. يغطي هذا الدرس بناء أنظمة مرنة تتعامل مع الفشل بشكل رشيق.
أنماط الفشل الشائعة
| نوع الفشل | السبب | التأثير |
|---|---|---|
| مهلة API | مشاكل الشبكة، الحمل الزائد | فشل الطلب |
| تحديد المعدل | طلبات كثيرة جداً | رفض الطلبات |
| تجاوز السياق | المدخل طويل جداً | اقتطاع أو خطأ |
| حلقات لا نهائية | الوكيل عالق في دورة أدوات | استنزاف الموارد |
| الهلوسة | النموذج يختلق حقائق | مخرجات غير صحيحة |
| انفجار التكلفة | استخدام رموز خارج عن السيطرة | تجاوز الميزانية |
إعادة المحاولة مع التراجع الأسي
import asyncio
import random
from functools import wraps
class RetryConfig:
def __init__(
self,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True
):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.exponential_base = exponential_base
self.jitter = jitter
def retry_with_backoff(config: RetryConfig = None):
config = config or RetryConfig()
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(config.max_retries + 1):
try:
return await func(*args, **kwargs)
except (RateLimitError, TimeoutError, APIError) as e:
last_exception = e
if attempt == config.max_retries:
raise
# حساب التأخير مع التراجع الأسي
delay = min(
config.base_delay * (config.exponential_base ** attempt),
config.max_delay
)
# إضافة اهتزاز لمنع تدافع القطيع
if config.jitter:
delay = delay * (0.5 + random.random())
await asyncio.sleep(delay)
raise last_exception
return wrapper
return decorator
# الاستخدام
@retry_with_backoff(RetryConfig(max_retries=3))
async def call_llm(messages):
return await llm_client.complete(messages)
نمط قاطع الدائرة
منع الفشل المتتالي:
import time
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # التشغيل الطبيعي
OPEN = "open" # فاشل، رفض الطلبات
HALF_OPEN = "half_open" # اختبار إذا تعافى
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
half_open_requests: int = 3
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_requests = half_open_requests
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = None
self.half_open_successes = 0
async def call(self, func, *args, **kwargs):
# التحقق إذا يجب الانتقال من OPEN إلى HALF_OPEN
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_successes = 0
else:
raise CircuitOpenError("قاطع الدائرة مفتوح")
try:
result = await func(*args, **kwargs)
# معالجة النجاح
if self.state == CircuitState.HALF_OPEN:
self.half_open_successes += 1
if self.half_open_successes >= self.half_open_requests:
self.state = CircuitState.CLOSED
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
raise
# الاستخدام
openai_circuit = CircuitBreaker(failure_threshold=5)
async def safe_llm_call(messages):
return await openai_circuit.call(llm_client.complete, messages)
سلسلة النماذج الاحتياطية
عند فشل النموذج الأساسي، استخدم البدائل:
from dataclasses import dataclass
from typing import Optional
@dataclass
class ModelConfig:
name: str
client: any
max_tokens: int
cost_per_1k: float
timeout: float = 30.0
class FallbackChain:
def __init__(self, models: list[ModelConfig]):
self.models = models
self.circuit_breakers = {
m.name: CircuitBreaker() for m in models
}
async def complete(self, messages: list, **kwargs) -> tuple:
"""جرب النماذج بالترتيب حتى ينجح واحد."""
last_error = None
for model in self.models:
try:
circuit = self.circuit_breakers[model.name]
response = await circuit.call(
model.client.complete,
messages=messages,
timeout=model.timeout,
**kwargs
)
return response, model.name
except CircuitOpenError:
# تخطي هذا النموذج، الدائرة مفتوحة
continue
except Exception as e:
last_error = e
# سجل وجرب النموذج التالي
continue
raise FallbackExhaustedError(
f"جميع النماذج فشلت. آخر خطأ: {last_error}"
)
# الاستخدام
fallback = FallbackChain([
ModelConfig("gpt-4", openai_client, 8192, 0.03),
ModelConfig("claude-3", anthropic_client, 100000, 0.025),
ModelConfig("gpt-3.5-turbo", openai_client, 4096, 0.002),
])
response, used_model = await fallback.complete(messages)
منع المهلة والحلقات
import asyncio
from contextlib import asynccontextmanager
class AgentGuardrails:
def __init__(
self,
max_iterations: int = 20,
max_tool_calls: int = 50,
max_runtime_seconds: float = 300.0,
max_tokens_per_run: int = 100000
):
self.max_iterations = max_iterations
self.max_tool_calls = max_tool_calls
self.max_runtime_seconds = max_runtime_seconds
self.max_tokens_per_run = max_tokens_per_run
@asynccontextmanager
async def guard(self):
"""مدير سياق لتشغيل الوكيل مع حدود."""
state = {
"iterations": 0,
"tool_calls": 0,
"tokens_used": 0,
"start_time": time.time()
}
try:
yield state
finally:
# تسجيل الإحصائيات النهائية
pass
def check_limits(self, state: dict):
"""أطلق استثناء إذا تم تجاوز أي حد."""
elapsed = time.time() - state["start_time"]
if state["iterations"] >= self.max_iterations:
raise MaxIterationsExceeded(
f"الوكيل تجاوز {self.max_iterations} تكرار"
)
if state["tool_calls"] >= self.max_tool_calls:
raise MaxToolCallsExceeded(
f"الوكيل تجاوز {self.max_tool_calls} استدعاء أداة"
)
if elapsed >= self.max_runtime_seconds:
raise AgentTimeoutError(
f"الوكيل تجاوز {self.max_runtime_seconds} ثانية"
)
if state["tokens_used"] >= self.max_tokens_per_run:
raise TokenBudgetExceeded(
f"الوكيل تجاوز {self.max_tokens_per_run} رمز"
)
# الاستخدام
guardrails = AgentGuardrails()
async def run_agent(task: str):
async with guardrails.guard() as state:
while True:
state["iterations"] += 1
guardrails.check_limits(state)
response = await llm.complete(messages)
state["tokens_used"] += response.usage.total_tokens
if response.tool_calls:
state["tool_calls"] += len(response.tool_calls)
# تنفيذ الأدوات...
else:
return response.content
التدهور الرشيق
class DegradedResponse:
"""استجابة عندما يكون النظام متدهوراً."""
def __init__(self, message: str, severity: str, fallback_used: bool):
self.message = message
self.severity = severity # "partial", "cached", "unavailable"
self.fallback_used = fallback_used
async def handle_with_degradation(request):
"""معالجة الطلب مع تدهور رشيق."""
try:
# جرب القدرة الكاملة
return await full_agent_response(request)
except ModelUnavailableError:
# الرجوع لنموذج أبسط
return DegradedResponse(
message=await simple_model_response(request),
severity="partial",
fallback_used=True
)
except CacheHitError as e:
# إرجاع الاستجابة المخزنة مؤقتاً
return DegradedResponse(
message=e.cached_response,
severity="cached",
fallback_used=True
)
except Exception:
# إرجاع احتياطي ثابت
return DegradedResponse(
message="أواجه مشاكل. يرجى المحاولة مرة أخرى لاحقاً.",
severity="unavailable",
fallback_used=True
)
نصيحة للمقابلة
عند مناقشة معالجة الأخطاء:
- التكافؤ - هل يمكن إعادة محاولة الطلبات بأمان؟
- الفشل الجزئي - ماذا لو فشلت أداة واحدة فقط؟
- تجربة المستخدم - كيف تتواصل بشأن التدهور؟
- التعافي - كيف يتعافى النظام بعد الانقطاعات؟
بعد ذلك، سنغطي حواجز الأمان وتصفية المحتوى. :::