أساسيات إدارة الحالة
أنماط الحالة المتقدمة في الإنتاج
لماذا أنماط الحالة المتقدمة مهمة
حادثة إنتاجية حقيقية (يناير 2026):
تعطل نظام تحليل المستندات القانونية متعدد الوكلاء في مكتب محاماة بعد 48 ساعة من معالجة 100K+ صفحة. السبب الجذري؟ لا توجد مصادقة على الحالة: بيانات تالفة من عقدة OCR فاشلة انتشرت عبر خط الأنابيب، مما تسبب في فشل مكالمات LLM اللاحقة مع أخطاء تجاوز السياق.
بعد تنفيذ المصادقة، وحالات الخطأ، وحدود الحجم، عمل النظام لمدة 7 أيام متواصلة معالجاً 500K+ صفحة مع صفر أعطال.
هذا الدرس يعلمك: أنماط الحالة المعززة للإنتاج للمصادقة، واستعادة الخطأ، وسير العمل طويل المدى التي تتوقعها شركات مثل Anthropic و Meta في مهندسي LangGraph الكبار.
مصادقة الحالة: احمِ مدخلاتك
المصادقة في زمن التشغيل مع Pydantic
المشكلة: إدخال المستخدم أو واجهات برمجة التطبيقات الخارجية يمكن أن ترسل بيانات مشوهة.
الحل: المصادقة عند نقطة دخول الرسم البياني.
from pydantic import BaseModel, Field, field_validator, ValidationError
from typing import Annotated, Optional
import operator
from langgraph.graph import StateGraph
class ValidatedInput(BaseModel):
"""نموذج Pydantic لمصادقة إدخال API."""
query: str = Field(..., min_length=10, max_length=1000)
max_documents: int = Field(default=10, ge=1, le=100)
search_depth: int = Field(default=1, ge=1, le=3)
@field_validator('query')
@classmethod
def no_malicious_content(cls, v: str) -> str:
"""حظر حقن الأوامر المحتمل."""
forbidden = ['ignore previous', 'disregard', 'system:', 'admin']
if any(term in v.lower() for term in forbidden):
raise ValueError(f"Query contains forbidden terms")
return v
class AgentState(TypedDict):
"""الحالة الداخلية (TypedDict للأداء)."""
query: str
documents: Annotated[list[str], operator.add]
analysis: Optional[str]
error: Optional[str]
def validate_and_convert_input(user_input: dict) -> AgentState:
"""
نقطة الدخول: المصادقة مع Pydantic، التحويل إلى TypedDict.
اعتباراً من يناير 2026، هذا هو النمط الموصى به.
"""
try:
validated = ValidatedInput(**user_input)
return AgentState(
query=validated.query,
documents=[],
analysis=None,
error=None
)
except ValidationError as e:
# إرجاع حالة خطأ بدلاً من الرفع
return AgentState(
query=user_input.get("query", ""),
documents=[],
analysis=None,
error=f"Validation failed: {e.errors()}"
)
# الاستخدام
graph = StateGraph(AgentState)
# المصادقة قبل الاستدعاء
raw_input = {"query": "short", "max_documents": 200} # غير صالح!
state = validate_and_convert_input(raw_input)
if state["error"]:
print(f"Invalid input: {state['error']}")
else:
result = graph.invoke(state)
نمط الإنتاج:
Pydantic عند الحدود (إدخال/إخراج API)، TypedDict داخلياً (تكلفة صفرية).
إدارة حجم الحالة لسير العمل طويل المدى
المشكلة: النمو غير المحدود
# بعد 1000 تكرار:
state = {
"documents": ["doc1", "doc2", ..., "doc10000"], # 50MB+
"messages": [...], # 20MB أخرى
"traces": [...] # 30MB أخرى
}
# الإجمالي: 100MB+ لكل نقطة تفتيش! PostgresSaver يتباطأ.
الحل 1: التقليم الدوري
from typing import TypedDict, Annotated, Optional
import operator
class ManagedState(TypedDict):
"""حالة مع إدارة الحجم."""
documents: Annotated[list[str], operator.add]
messages: Annotated[list[dict], operator.add]
# تتبع الحجم
state_size_mb: float
max_state_size_mb: float # مثل، 10MB
def estimate_size_mb(state: dict) -> float:
"""تقدير حجم الحالة بالميغابايت (تقريب سريع)."""
import json
return len(json.dumps(state, default=str).encode('utf-8')) / (1024 * 1024)
def prune_state(state: ManagedState) -> dict:
"""
عقدة المشرف: تقليم الحالة إذا كانت كبيرة جداً.
تُستدعى كل N تكرار (مثل، كل 10 عقد).
"""
current_size = estimate_size_mb(state)
updates = {"state_size_mb": current_size}
if current_size > state["max_state_size_mb"]:
# الاستراتيجية 1: احتفظ بالبيانات الحديثة فقط
updates["documents"] = state["documents"][-50:] # آخر 50 مستند
updates["messages"] = state["messages"][-20:] # آخر 20 رسالة
print(f"⚠️ تم تقليم الحالة: {current_size:.2f}MB → {estimate_size_mb(updates):.2f}MB")
return updates
# بناء رسم بياني مع مشرف التقليم
def supervisor_node(state: ManagedState) -> dict:
"""تشغيل كل 10 تكرارات لتقليم الحالة."""
pruned = prune_state(state)
# ... منطق التوجيه ...
return pruned
الحل 2: النقل إلى التخزين الخارجي
import hashlib
import json
from typing import TypedDict, Annotated
import operator
class OffloadedState(TypedDict):
"""حالة مع حقول كبيرة منقولة."""
query: str
document_ids: Annotated[list[str], operator.add] # معرّفات، وليس مستندات كاملة
analysis: str
def store_document_external(doc: str, storage_client) -> str:
"""
تخزين مستند كبير في S3/Redis، إرجاع المعرّف.
استخدم للمستندات >10KB.
"""
doc_id = hashlib.sha256(doc.encode()).hexdigest()[:16]
storage_client.set(doc_id, doc) # Redis/S3
return doc_id
def retrieve_documents(doc_ids: list[str], storage_client) -> list[str]:
"""استرجاع المستندات عند الحاجة."""
return [storage_client.get(doc_id) for doc_id in doc_ids]
# الاستخدام
def research_node(state: OffloadedState, config: dict) -> dict:
"""تخزين المستندات الكبيرة خارجياً."""
storage = config["configurable"]["storage_client"]
large_docs = fetch_documents(state["query"]) # يعيد مستندات 100KB+
doc_ids = [store_document_external(doc, storage) for doc in large_docs]
return {"document_ids": doc_ids} # تخزين المعرّفات في الحالة (صغيرة)
def analysis_node(state: OffloadedState, config: dict) -> dict:
"""استرجاع المستندات للتحليل."""
storage = config["configurable"]["storage_client"]
docs = retrieve_documents(state["document_ids"], storage)
analysis = analyze(docs)
return {"analysis": analysis}
# الاستدعاء مع التخزين الخارجي
app.invoke(
{"query": "AI trends"},
config={"configurable": {"storage_client": redis_client}}
)
حدود الإنتاج (يناير 2026):
| حافظ نقاط التفتيش | الحجم الأقصى | التوصية |
|---|---|---|
| MemorySaver | 100MB | تقليم عند 50MB |
| SqliteSaver | 1GB | تقليم عند 100MB أو نقل |
| PostgresSaver | 1GB (قابل للتكوين) | تقليم عند 100MB، استخدم التخزين الخارجي لـ >10MB |
| RedisSaver | 512MB افتراضي | زيادة الحد أو النقل |
حالات الخطأ وبيانات الاستعادة الوصفية
النمط: لا ترفع أبداً، أعد دائماً حالة خطأ
from typing import TypedDict, Optional, Literal
class RobustState(TypedDict):
"""حالة مع تتبع الخطأ."""
query: str
documents: list[str]
analysis: Optional[str]
# حقول معالجة الأخطاء
error_message: Optional[str]
failed_node: Optional[str]
retry_count: int
max_retries: int
def fragile_node(state: RobustState) -> dict:
"""
عقدة يمكن أن تفشل.
❌ سيء: raise Exception("API failed")
✅ جيد: إرجاع حالة خطأ
"""
try:
# استدعاء API خارجي (يمكن أن يفشل)
docs = call_external_api(state["query"])
return {"documents": docs}
except Exception as e:
# إرجاع حالة خطأ بدلاً من التعطل
return {
"error_message": f"API call failed: {str(e)}",
"failed_node": "fragile_node",
"retry_count": state["retry_count"] + 1
}
def error_handler_node(state: RobustState) -> dict:
"""المشرف يتحقق من الأخطاء ويقرر إعادة المحاولة/الإنهاء."""
if state["error_message"]:
if state["retry_count"] < state["max_retries"]:
# مسح الخطأ وإعادة المحاولة
return {
"error_message": None, # مسح الخطأ
"next_action": "retry"
}
else:
# تم الوصول إلى الحد الأقصى من إعادة المحاولات، الإنهاء بأمان
return {
"next_action": "abort",
"analysis": f"Failed after {state['retry_count']} retries"
}
# لا يوجد خطأ، متابعة
return {"next_action": "continue"}
لماذا هذا مهم:
- ✅ الرسم البياني لا يتعطل أبداً، يكتمل دائماً بأمان
- ✅ تتبعات LangSmith تُظهر الأخطاء في السياق (وليس فقط تتبع المكدس)
- ✅ نقاط التفتيش يمكن أن تستأنف من حالة الخطأ
- ✅ لوحات معلومات الإنتاج يمكن أن تنبه على أنماط الخطأ
نمط الإنتاج: قاطع الدائرة
المشكلة: API خارجي معطل → 100 عقدة كلها تفشل → تضخم نقطة التفتيش.
الحل: قاطع الدائرة يوقف استدعاء الخدمة الفاشلة.
from typing import TypedDict, Literal
from datetime import datetime, timedelta
class CircuitBreakerState(TypedDict):
"""حالة مع نمط قاطع الدائرة."""
query: str
documents: list[str]
# حقول قاطع الدائرة
circuit_status: Literal["closed", "open", "half_open"]
failure_count: int
last_failure_time: Optional[str] # طابع زمني ISO
circuit_threshold: int # فتح الدائرة بعد N فشل
def call_with_circuit_breaker(state: CircuitBreakerState) -> dict:
"""تحقق من الدائرة قبل استدعاء الخدمة الخارجية."""
# الدائرة مفتوحة: تخطي الاستدعاء، إرجاع فوري
if state["circuit_status"] == "open":
# تحقق من مرور فترة التهدئة (مثل، 60 ثانية)
last_failure = datetime.fromisoformat(state["last_failure_time"])
if datetime.now() - last_failure > timedelta(seconds=60):
# حاول نصف مفتوح: اسمح باستدعاء اختبار واحد
return {**state, "circuit_status": "half_open"}
else:
# لا يزال في فترة التهدئة
return {
"error_message": "Circuit breaker open, skipping API call",
"circuit_status": "open"
}
# الدائرة مغلقة أو نصف مفتوحة: حاول الاستدعاء
try:
docs = call_external_api(state["query"])
# نجاح: إعادة تعيين الدائرة
return {
"documents": docs,
"circuit_status": "closed",
"failure_count": 0
}
except Exception as e:
# فشل: زيادة العداد
new_failure_count = state["failure_count"] + 1
if new_failure_count >= state["circuit_threshold"]:
# فتح الدائرة
return {
"error_message": f"API failed {new_failure_count} times, opening circuit",
"circuit_status": "open",
"failure_count": new_failure_count,
"last_failure_time": datetime.now().isoformat()
}
else:
# البقاء مغلقاً، زيادة العداد
return {
"error_message": str(e),
"failure_count": new_failure_count
}
فائدة الإنتاج:
بعد 3 فشل، تفتح الدائرة. العقد اللاحقة تتخطى استدعاءات API فوراً بدلاً من الانتظار لانتهاء المهلات. النظام يتدهور بأمان بدلاً من الفشل المتتالي.
البيانات الوصفية لقابلية المراقبة
from typing import TypedDict, Annotated
import operator
from datetime import datetime
class ObservableState(TypedDict):
"""حالة مع بيانات وصفية غنية لقابلية المراقبة."""
query: str
documents: Annotated[list[str], operator.add]
# البيانات الوصفية لقابلية المراقبة
workflow_id: str
started_at: str # طابع زمني ISO
last_updated_at: str
total_llm_calls: Annotated[int, operator.add]
total_tokens: Annotated[int, operator.add]
total_cost_usd: Annotated[float, operator.add]
node_execution_times: dict[str, float] # {"research": 2.3, "analysis": 5.1}
def instrumented_node(state: ObservableState) -> dict:
"""عقدة مع قياس."""
import time
start_time = time.time()
# ... قم بالعمل ...
docs = fetch_documents(state["query"])
tokens_used = len(docs) * 100
execution_time = time.time() - start_time
return {
"documents": docs,
"last_updated_at": datetime.now().isoformat(),
"total_llm_calls": 1,
"total_tokens": tokens_used,
"total_cost_usd": tokens_used * 0.00002,
"node_execution_times": {
**state["node_execution_times"],
"instrumented_node": execution_time
}
}
# بعد سير العمل: مقاييس غنية للوحات المعلومات
# total_cost_usd: $2.45
# total_tokens: 122,500
# node_execution_times: {"research": 2.3s, "analysis": 5.1s, "writer": 8.2s}
الاستخدام في الإنتاج:
- إرسال إلى LangSmith للوحات معلومات في الوقت الفعلي
- تنبيه عندما
total_cost_usd > $10(حد الميزانية) - تحليل العقد البطيئة:
node_execution_times > 30s
أسئلة المقابلة
س1: "كيف تمنع الحالة من النمو بلا حدود في سير عمل 24/7؟"
إجابة قوية:
"أستخدم ثلاث استراتيجيات: (1) التقليم الدوري في عقدة المشرف—احتفظ بآخر N مستندات/رسائل بناءً على حدود الذاكرة. (2) نقل الكائنات الكبيرة إلى التخزين الخارجي (Redis/S3) وتخزين المعرّفات فقط في الحالة. (3) تتبع حجم الحالة بـ
estimate_size_mb()وتنبيه عندما تتجاوز العتبات. على سبيل المثال، في نظام مستندات قانونية، قلّمت الحالة كل 100 مستند، مع الاحتفاظ بالحجم تحت 10MB لنقاط تفتيش PostgresSaver السريعة."
س2: "هل يجب أن ترفع العقد استثناءات أو تعيد حالات خطأ؟"
الإجابة:
"أعد حالات خطأ، وليس استثناءات. في الإنتاج، رفع الاستثناءات يعطل الرسم البياني ويفقد سياق نقطة التفتيش. بدلاً من ذلك، أعيد
{"error_message": str(e), "failed_node": "research"}وأترك عقدة المشرف تقرر ما إذا كانت ستعيد المحاولة أو تتخطى أو تنهي. هذا يحافظ على تشغيل الرسم البياني، ويحفظ سياق الخطأ في تتبعات LangSmith، ويسمح لنقاط التفتيش بالاستئناف من حالة الخطأ."
س3: "كيف تنفذ نمط قاطع الدائرة في LangGraph؟"
الإجابة:
"أضيف حقول قاطع الدائرة إلى الحالة:
circuit_status(closed/open/half_open)،failure_count،last_failure_time،circuit_threshold. قبل استدعاء API خارجي، أتحقق من أن الدائرة مفتوحة. إذا كان الأمر كذلك، أتخطى الاستدعاء وأعيد حالة خطأ فوراً. بعد N فشل متتالي (العتبة)، أفتح الدائرة وأدخل فترة تهدئة. بعد التهدئة، أحاول استدعاء اختبار واحد 'نصف مفتوح'. إذا نجح، أغلق الدائرة. هذا يمنع الفشل المتتالي عندما تكون واجهات برمجة التطبيقات معطلة."
مثال كود الإنتاج
from typing import TypedDict, Annotated, Optional, Literal
import operator
from datetime import datetime
import json
class ProductionState(TypedDict):
"""حالة جاهزة للإنتاج مع جميع الأنماط."""
# البيانات الأساسية
query: str
documents: Annotated[list[str], operator.add]
analysis: Optional[str]
# إدارة الحجم
state_size_mb: float
max_state_size_mb: float
# معالجة الأخطاء
error_message: Optional[str]
retry_count: int
max_retries: int
# قاطع الدائرة
circuit_status: Literal["closed", "open", "half_open"]
failure_count: int
circuit_threshold: int
last_failure_time: Optional[str]
# قابلية المراقبة
workflow_id: str
total_tokens: Annotated[int, operator.add]
total_cost_usd: Annotated[float, operator.add]
node_execution_times: dict[str, float]
def production_research_node(state: ProductionState) -> dict:
"""عقدة مع جميع أنماط الإنتاج."""
import time
start_time = time.time()
# تحقق من قاطع الدائرة
if state["circuit_status"] == "open":
return {"error_message": "Circuit open, skipping API call"}
try:
# استدعاء API خارجي
docs = fetch_documents(state["query"])
tokens = len(docs) * 100
# نجاح: إرجاع التحديثات
return {
"documents": docs,
"total_tokens": tokens,
"total_cost_usd": tokens * 0.00002,
"node_execution_times": {
**state["node_execution_times"],
"research": time.time() - start_time
},
"circuit_status": "closed",
"failure_count": 0
}
except Exception as e:
# فشل: تحديث حالة الخطأ
new_failure_count = state["failure_count"] + 1
circuit_status = "open" if new_failure_count >= state["circuit_threshold"] else "closed"
return {
"error_message": str(e),
"retry_count": state["retry_count"] + 1,
"failure_count": new_failure_count,
"circuit_status": circuit_status,
"last_failure_time": datetime.now().isoformat()
}
def supervisor_with_pruning(state: ProductionState) -> dict:
"""المشرف: استعادة الخطأ + إدارة الحجم."""
updates = {}
# استعادة الخطأ
if state["error_message"]:
if state["retry_count"] < state["max_retries"]:
updates["error_message"] = None # إعادة المحاولة
else:
updates["next_action"] = "abort" # الاستسلام
# إدارة الحجم
current_size = len(json.dumps(dict(state)).encode()) / (1024 * 1024)
updates["state_size_mb"] = current_size
if current_size > state["max_state_size_mb"]:
updates["documents"] = state["documents"][-50:] # تقليم
return updates
النقاط الرئيسية
✅ تحقق من الإدخال مع Pydantic عند دخول الرسم البياني (منع البيانات السيئة) ✅ قلّم الحالة دورياً في عقد المشرف (منع تضخم الذاكرة) ✅ انقل الكائنات الكبيرة إلى التخزين الخارجي (احتفظ بالحالة صغيرة) ✅ أعد حالات خطأ بدلاً من رفع الاستثناءات (تدهور أمين) ✅ نفّذ قاطع الدائرة لواجهات برمجة التطبيقات الخارجية (منع الفشل المتتالي) ✅ تتبع البيانات الوصفية لقابلية المراقبة (الرموز، التكلفة، وقت التنفيذ)
التالي: لقد أتقنت إدارة الحالة! انتقل إلى الوحدة 2 لتعلم أنماط الرسم البياني المتقدمة: الحلقات التكرارية، التفريع الديناميكي، واستعادة الخطأ. لا تنسَ إكمال اختبار الوحدة 1 لاختبار معرفتك!
:::