الاختبار ومشروع التخرج
التصحيح والتصور
تصحيح سير عمل LangGraph يتطلب تقنيات متخصصة تتجاوز التصحيح التقليدي. يغطي هذا الدرس استراتيجيات التصحيح الشاملة، أدوات التصور، تكامل LangSmith للتتبع الإنتاجي، والأنماط المتقدمة لتحديد وحل المشكلات المعقدة في أنظمة الوكلاء ذات الحالة.
تحدي تصحيح سير عمل الوكلاء
تصحيح تطبيقات LangGraph يقدم تحديات فريدة لا تستطيع أدوات التصحيح التقليدية معالجتها بالكامل.
لماذا تصحيح الوكلاء مختلف
"""
التصحيح التقليدي مقابل تصحيح سير عمل الوكلاء
التطبيق التقليدي:
- تدفق تنفيذ خطي
- استدعاءات دوال متوقعة
- مخرجات حتمية
- سهل إعادة إنتاج الأخطاء
سير عمل الوكيل:
- تنفيذ غير خطي، شرطي
- مخرجات LLM تتغير بين التشغيلات
- الحالة تتراكم عبر العقد
- ظروف سباق في التنفيذ غير المتزامن
- اعتماديات API خارجية
- مقاطعات الإنسان في الحلقة
تحديات التصحيح الرئيسية:
1. عدم الحتمية: نفس المدخلات تنتج مخرجات مختلفة
2. تعقيد الحالة: المخفضات تدمج التحديثات بطرق خفية
3. قرارات التوجيه: الحواف الشرطية قد تأخذ مسارات غير متوقعة
4. تاريخ نقاط التفتيش: تحتاج فهم تطور الحالة
5. إسناد الكمون: أي عقدة بطيئة؟
6. انتشار الأخطاء: الأخطاء قد تظهر بعيداً عن المصدر
"""
from typing import TypedDict, Annotated, Literal, Optional, Any
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
import operator
from datetime import datetime
import json
import logging
import traceback
# تكوين التسجيل الشامل للتصحيح
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s | %(levelname)s | %(name)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("langgraph.debug")
class DebugState(TypedDict):
"""حالة مع معلومات التصحيح."""
query: str
documents: Annotated[list[dict], operator.add]
analysis: Optional[str]
next_step: Literal["research", "analyze", "summarize", "done"]
iteration: int
max_iterations: int
# حقول التصحيح
_debug_trace: Annotated[list[dict], operator.add]
_node_timings: Annotated[list[dict], operator.add]
تقنيات تصور الرسم البياني
تصور هيكل الرسم البياني يساعد في تحديد مشاكل التوجيه، الحواف المفقودة، والمشاكل المعمارية قبل أن تسبب أخطاء وقت التشغيل.
توليد مخطط Mermaid المدمج
from langgraph.graph import StateGraph
from IPython.display import Image, display
import base64
def create_sample_graph():
"""إنشاء رسم بياني نموذجي لأمثلة التصور."""
class WorkflowState(TypedDict):
query: str
documents: list[dict]
analysis: Optional[str]
summary: Optional[str]
next_step: str
def supervisor(state: WorkflowState) -> dict:
if not state.get("documents"):
return {"next_step": "research"}
elif not state.get("analysis"):
return {"next_step": "analyze"}
elif not state.get("summary"):
return {"next_step": "summarize"}
return {"next_step": "done"}
def researcher(state: WorkflowState) -> dict:
return {"documents": [{"source": "web", "content": "نتائج البحث"}]}
def analyzer(state: WorkflowState) -> dict:
return {"analysis": "تحليل تفصيلي للمستندات"}
def summarizer(state: WorkflowState) -> dict:
return {"summary": "ملخص تنفيذي"}
def route_supervisor(state: WorkflowState) -> str:
return state["next_step"]
# بناء الرسم البياني
graph = StateGraph(WorkflowState)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher)
graph.add_node("analyzer", analyzer)
graph.add_node("summarizer", summarizer)
graph.add_conditional_edges(
"supervisor",
route_supervisor,
{
"research": "researcher",
"analyze": "analyzer",
"summarize": "summarizer",
"done": END
}
)
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyzer", "supervisor")
graph.add_edge("summarizer", "supervisor")
graph.set_entry_point("supervisor")
return graph
def visualize_graph_mermaid(graph: StateGraph, title: str = "سير العمل"):
"""
توليد مخطط Mermaid من الرسم البياني.
صيغة Mermaid مدعومة على نطاق واسع في:
- ملفات README على GitHub
- مستندات Notion
- دفاتر Jupyter
- مواقع التوثيق
"""
app = graph.compile()
# الحصول على مخطط Mermaid كنص
mermaid_code = app.get_graph().draw_mermaid()
print(f"=== مخطط Mermaid لـ {title} ===")
print(mermaid_code)
print()
return mermaid_code
def visualize_graph_png(graph: StateGraph, output_path: str = "graph.png"):
"""
تصدير الرسم البياني كصورة PNG.
يتطلب تثبيت graphviz:
- macOS: brew install graphviz
- Ubuntu: apt-get install graphviz
- Windows: choco install graphviz
"""
app = graph.compile()
try:
# الحصول على بايتات PNG
png_bytes = app.get_graph().draw_mermaid_png()
# حفظ في ملف
with open(output_path, "wb") as f:
f.write(png_bytes)
print(f"تم حفظ الرسم البياني في {output_path}")
return png_bytes
except Exception as e:
print(f"فشل توليد PNG: {e}")
print("العودة إلى مخرجات نص Mermaid")
return visualize_graph_mermaid(graph)
def visualize_graph_ascii(graph: StateGraph):
"""
توليد تمثيل ASCII للرسم البياني.
مفيد لبيئات الطرفية/CLI.
"""
app = graph.compile()
graph_repr = app.get_graph()
print("=== هيكل الرسم البياني (ASCII) ===")
print()
# الحصول على العقد
nodes = list(graph_repr.nodes.keys())
print(f"العقد ({len(nodes)}):")
for node in nodes:
print(f" - {node}")
print()
# الحصول على الحواف
edges = graph_repr.edges
print(f"الحواف ({len(edges)}):")
for edge in edges:
if hasattr(edge, 'source') and hasattr(edge, 'target'):
print(f" {edge.source} --> {edge.target}")
else:
print(f" {edge}")
print()
return {"nodes": nodes, "edges": edges}
# مثال الاستخدام
graph = create_sample_graph()
visualize_graph_mermaid(graph, "سير عمل البحث")
visualize_graph_ascii(graph)
تصور الرسوم الفرعية والتسلسلات المعقدة
def visualize_hierarchical_graph(main_graph, subgraphs: dict):
"""
تصور رسم بياني هرمي مع رسوم فرعية.
Args:
main_graph: الرسم البياني الأب
subgraphs: قاموس {الاسم: الرسم الفرعي}
"""
print("=== هيكل الرسم البياني الهرمي ===")
print()
# الرسم البياني الرئيسي
print("الرسم البياني الرئيسي:")
main_app = main_graph.compile()
print(main_app.get_graph().draw_mermaid())
print()
# كل رسم فرعي
for name, subgraph in subgraphs.items():
print(f"الرسم الفرعي: {name}")
sub_app = subgraph.compile()
print(sub_app.get_graph().draw_mermaid())
print()
def create_graph_documentation(graph: StateGraph, output_file: str):
"""
توليد توثيق markdown للرسم البياني.
مفيد لتوثيق المشروع.
"""
app = graph.compile()
graph_repr = app.get_graph()
doc = []
doc.append("# توثيق الرسم البياني لسير العمل")
doc.append("")
doc.append("## المخطط المرئي")
doc.append("")
doc.append("```mermaid")
doc.append(app.get_graph().draw_mermaid().replace("```mermaid\n", "").replace("\n```", ""))
doc.append("```")
doc.append("")
doc.append("## العقد")
doc.append("")
doc.append("| العقدة | الوصف |")
doc.append("|--------|-------|")
for node in graph_repr.nodes.keys():
doc.append(f"| `{node}` | TODO: أضف الوصف |")
doc.append("")
doc.append("## الحواف")
doc.append("")
doc.append("| من | إلى | الشرط |")
doc.append("|----|-----|-------|")
for edge in graph_repr.edges:
if hasattr(edge, 'source') and hasattr(edge, 'target'):
condition = getattr(edge, 'data', '-')
doc.append(f"| `{edge.source}` | `{edge.target}` | {condition} |")
doc.append("")
doc.append(f"*تم التوليد: {datetime.now().isoformat()}*")
content = "\n".join(doc)
with open(output_file, "w") as f:
f.write(content)
print(f"تم حفظ التوثيق في {output_file}")
return content
تصحيح التنفيذ خطوة بخطوة
فهم ما يحدث بالضبط في كل خطوة من التنفيذ أمر حاسم لتصحيح سير العمل المعقدة.
البث مع وضع التحديثات
def debug_execution_stream(app, input_state: dict, config: dict):
"""
بث التنفيذ مع مخرجات تصحيح تفصيلية.
وضع البث 'updates' يُظهر بالضبط ما تُرجعه
كل عقدة كتحديثات حالة.
"""
print("=" * 60)
print("تنفيذ التصحيح - وضع التحديثات")
print("=" * 60)
print()
print(f"حالة المدخلات: {json.dumps(input_state, indent=2, default=str)}")
print()
print("-" * 60)
step = 0
node_timings = []
for event in app.stream(input_state, config, stream_mode="updates"):
step += 1
timestamp = datetime.now()
# استخراج اسم العقدة والتحديثات
node_name = list(event.keys())[0]
state_update = event[node_name]
# تسجيل التوقيت
timing = {
"step": step,
"node": node_name,
"timestamp": timestamp.isoformat()
}
node_timings.append(timing)
# طباعة معلومات التصحيح
print(f"الخطوة {step}: {node_name}")
print(f" الطابع الزمني: {timestamp.strftime('%H:%M:%S.%f')}")
print(f" مفاتيح التحديث: {list(state_update.keys())}")
# طباعة منسقة للتحديث
for key, value in state_update.items():
if isinstance(value, str) and len(value) > 100:
print(f" {key}: {value[:100]}... (مقتطع)")
elif isinstance(value, list) and len(value) > 3:
print(f" {key}: [{len(value)} عنصر]")
else:
print(f" {key}: {value}")
print("-" * 60)
# طباعة ملخص التوقيت
print()
print("ملخص التنفيذ:")
print(f" إجمالي الخطوات: {step}")
if len(node_timings) >= 2:
start = datetime.fromisoformat(node_timings[0]["timestamp"])
end = datetime.fromisoformat(node_timings[-1]["timestamp"])
duration = (end - start).total_seconds()
print(f" المدة الإجمالية: {duration:.3f}ث")
print()
return node_timings
def debug_execution_values(app, input_state: dict, config: dict):
"""
بث التنفيذ مع الحالة الكاملة بعد كل عقدة.
وضع البث 'values' يُظهر الحالة المتراكمة
الكاملة بعد تنفيذ كل عقدة.
"""
print("=" * 60)
print("تنفيذ التصحيح - وضع القيم")
print("=" * 60)
print()
step = 0
for state in app.stream(input_state, config, stream_mode="values"):
step += 1
print(f"الحالة بعد الخطوة {step}:")
for key, value in state.items():
if key.startswith("_"):
continue # تخطي الحقول الداخلية
if isinstance(value, str) and len(value) > 200:
print(f" {key}: {value[:200]}...")
elif isinstance(value, list):
print(f" {key}: [{len(value)} عنصر]")
elif isinstance(value, dict):
print(f" {key}: {{...}}")
else:
print(f" {key}: {value}")
print("-" * 60)
return step
def compare_stream_modes(app, input_state: dict, config: dict):
"""
إظهار الفرق بين أوضاع البث.
أوضاع البث في LangGraph 1.0.5:
- "values": الحالة الكاملة بعد كل عقدة
- "updates": فقط التحديثات من كل عقدة
- "messages": لتطبيقات الدردشة
- "events": أحداث منخفضة المستوى للتصحيح المتقدم
"""
print("الوضع: updates")
print("-" * 40)
updates_result = []
for event in app.stream(input_state, config, stream_mode="updates"):
updates_result.append(event)
print(event)
print()
# إعادة تعيين الحالة للمقارنة النظيفة
new_config = {"configurable": {"thread_id": f"compare-{datetime.now().timestamp()}"}}
print("الوضع: values")
print("-" * 40)
values_result = []
for state in app.stream(input_state, new_config, stream_mode="values"):
values_result.append(state)
print(f"المفاتيح: {list(state.keys())}")
print()
return {
"updates_count": len(updates_result),
"values_count": len(values_result)
}
التصحيح التفاعلي مع نقاط التوقف
class DebugBreakpoint:
"""
مساعد تصحيح يوقف التنفيذ عند عقد محددة.
مفيد لجلسات التصحيح التفاعلية.
"""
def __init__(self, break_at_nodes: list[str] = None, break_on_condition=None):
self.break_at_nodes = break_at_nodes or []
self.break_on_condition = break_on_condition
self.execution_log = []
self.paused = False
def wrap_node(self, node_func, node_name: str):
"""تغليف دالة عقدة بمنطق نقطة التوقف."""
def wrapped(state: dict) -> dict:
# تسجيل الدخول
entry = {
"node": node_name,
"timestamp": datetime.now().isoformat(),
"input_state": {k: v for k, v in state.items() if not k.startswith("_")}
}
# التحقق مما إذا كان يجب التوقف
should_break = node_name in self.break_at_nodes
if self.break_on_condition and self.break_on_condition(state, node_name):
should_break = True
if should_break:
print()
print("=" * 50)
print(f"نقطة توقف: {node_name}")
print("=" * 50)
print(f"الحالة الحالية:")
for k, v in state.items():
if not k.startswith("_"):
print(f" {k}: {v}")
print()
print("الأوامر: [c]استمرار، [s]حالة، [q]خروج")
while True:
cmd = input("> ").strip().lower()
if cmd == 'c':
break
elif cmd == 's':
print(json.dumps(state, indent=2, default=str))
elif cmd == 'q':
raise KeyboardInterrupt("طُلب خروج التصحيح")
else:
print("أمر غير معروف")
# تنفيذ العقدة
result = node_func(state)
# تسجيل الخروج
entry["output"] = result
self.execution_log.append(entry)
return result
return wrapped
def get_log(self) -> list[dict]:
"""الحصول على سجل التنفيذ."""
return self.execution_log.copy()
def print_log(self):
"""طباعة سجل التنفيذ المنسق."""
print("=== سجل التنفيذ ===")
for i, entry in enumerate(self.execution_log):
print(f"\n{i+1}. {entry['node']} @ {entry['timestamp']}")
print(f" مفاتيح المخرجات: {list(entry['output'].keys())}")
فحص تاريخ الحالة
فهم كيف تتطور الحالة خلال التنفيذ أمر ضروري لتصحيح المشكلات المتعلقة بالحالة.
تحليل تاريخ الحالة الشامل
def inspect_full_state_history(app, config: dict):
"""
فحص تاريخ الحالة الكامل من نقاط التفتيش.
هذا يُظهر كل حالة محفوظة، مما يسمح لك بـ:
- تتبع كيف تطورت الحالة
- إيجاد أين أصبحت الحالة غير صحيحة
- تحديد أي عقدة سببت المشكلة
"""
print("=" * 60)
print("فحص تاريخ الحالة")
print("=" * 60)
print()
history = list(app.get_state_history(config))
print(f"إجمالي نقاط التفتيش: {len(history)}")
print()
for i, state_snapshot in enumerate(reversed(history)):
checkpoint_id = state_snapshot.config.get("configurable", {}).get("checkpoint_id", "غير معروف")
print(f"نقطة التفتيش {i + 1}: {checkpoint_id[:20]}...")
print(f" عقدة المصدر: {state_snapshot.metadata.get('source', 'غير معروف')}")
print(f" الخطوة: {state_snapshot.metadata.get('step', 'غير معروف')}")
print(f" تاريخ الإنشاء: {state_snapshot.metadata.get('created_at', 'غير معروف')}")
# عرض قيم الحالة
print(" قيم الحالة:")
for key, value in state_snapshot.values.items():
if key.startswith("_"):
continue
value_preview = str(value)
if len(value_preview) > 80:
value_preview = value_preview[:80] + "..."
print(f" {key}: {value_preview}")
# عرض العقد التالية
if hasattr(state_snapshot, 'next') and state_snapshot.next:
print(f" العقد التالية: {state_snapshot.next}")
print("-" * 40)
return history
def compare_checkpoints(app, config: dict, checkpoint_a: int, checkpoint_b: int):
"""
مقارنة نقطتي تفتيش لمعرفة ما تغير.
مفيد لإيجاد بالضبط أي عقدة غيرت أي حقول.
"""
history = list(app.get_state_history(config))
history.reverse() # ترتيب زمني
if checkpoint_a >= len(history) or checkpoint_b >= len(history):
print("فهارس نقاط التفتيش غير صالحة")
return None
state_a = history[checkpoint_a].values
state_b = history[checkpoint_b].values
print(f"مقارنة نقطة التفتيش {checkpoint_a} مع نقطة التفتيش {checkpoint_b}")
print("=" * 60)
# إيجاد الاختلافات
all_keys = set(state_a.keys()) | set(state_b.keys())
changes = {
"added": [],
"removed": [],
"modified": []
}
for key in all_keys:
if key.startswith("_"):
continue
in_a = key in state_a
in_b = key in state_b
if in_a and not in_b:
changes["removed"].append(key)
elif in_b and not in_a:
changes["added"].append(key)
elif state_a.get(key) != state_b.get(key):
changes["modified"].append({
"key": key,
"before": state_a.get(key),
"after": state_b.get(key)
})
# طباعة التغييرات
if changes["added"]:
print("\nمُضاف:")
for key in changes["added"]:
print(f" + {key}: {state_b[key]}")
if changes["removed"]:
print("\nمُحذوف:")
for key in changes["removed"]:
print(f" - {key}: {state_a[key]}")
if changes["modified"]:
print("\nمُعدَّل:")
for change in changes["modified"]:
print(f" {change['key']}:")
print(f" قبل: {change['before']}")
print(f" بعد: {change['after']}")
if not any(changes.values()):
print("\nلم يتم العثور على اختلافات")
return changes
def find_state_issue(app, config: dict, predicate) -> Optional[dict]:
"""
إيجاد أول نقطة تفتيش حيث يصبح الشرط صحيحاً.
مثال الاستخدام:
find_state_issue(app, config, lambda s: s.get("error") is not None)
find_state_issue(app, config, lambda s: len(s.get("documents", [])) > 10)
"""
history = list(app.get_state_history(config))
history.reverse() # ترتيب زمني
for i, state_snapshot in enumerate(history):
if predicate(state_snapshot.values):
print(f"تطابق الشرط عند نقطة التفتيش {i}")
print(f" العقدة: {state_snapshot.metadata.get('source')}")
print(f" الخطوة: {state_snapshot.metadata.get('step')}")
return {
"checkpoint_index": i,
"metadata": state_snapshot.metadata,
"values": state_snapshot.values
}
print("الشرط لم يتطابق أبداً في تاريخ الحالة")
return None
تصحيح السفر عبر الزمن
def replay_from_checkpoint(app, config: dict, checkpoint_index: int, new_input: dict = None):
"""
إعادة تشغيل التنفيذ من نقطة تفتيش محددة.
هذا قوي لـ:
- اختبار الإصلاحات دون إعادة تشغيل سير العمل بالكامل
- استكشاف مسارات تنفيذ بديلة
- تصحيح المشكلات المتقطعة
"""
history = list(app.get_state_history(config))
history.reverse()
if checkpoint_index >= len(history):
raise ValueError(f"نقطة التفتيش {checkpoint_index} غير موجودة")
target_checkpoint = history[checkpoint_index]
checkpoint_config = target_checkpoint.config
print(f"إعادة التشغيل من نقطة التفتيش {checkpoint_index}")
print(f" العقدة: {target_checkpoint.metadata.get('source')}")
print(f" الحالة: {list(target_checkpoint.values.keys())}")
print()
# تحديث الحالة إذا تم توفير new_input
if new_input:
print(f"تطبيق التعديلات: {new_input}")
app.update_state(checkpoint_config, new_input)
# استئناف التنفيذ
result = app.invoke(None, checkpoint_config)
print("اكتملت إعادة التشغيل")
return result
def fork_execution(app, original_config: dict, checkpoint_index: int, modifications: dict):
"""
تفرع التنفيذ من نقطة تفتيش مع تعديلات.
ينشئ خيطاً جديداً ينحرف عن الأصلي،
مما يسمح لك باختبار سيناريوهات "ماذا لو".
"""
history = list(app.get_state_history(original_config))
history.reverse()
if checkpoint_index >= len(history):
raise ValueError(f"نقطة التفتيش {checkpoint_index} غير موجودة")
target_state = history[checkpoint_index]
# إنشاء معرف خيط جديد للتفرع
fork_thread_id = f"fork-{datetime.now().timestamp()}"
fork_config = {"configurable": {"thread_id": fork_thread_id}}
# تهيئة الحالة المتفرعة
forked_state = {**target_state.values, **modifications}
print(f"التفرع إلى الخيط: {fork_thread_id}")
print(f"التعديلات: {modifications}")
# تشغيل التنفيذ المتفرع
result = app.invoke(forked_state, fork_config)
return {
"fork_thread_id": fork_thread_id,
"result": result
}
كشف الحلقات اللانهائية ومنعها
الحلقات اللانهائية مشكلة شائعة في سير عمل الوكلاء. إليك استراتيجيات شاملة للكشف والمنع.
أنماط كشف الحلقات
import hashlib
def create_state_hash(state: dict) -> str:
"""
إنشاء تجزئة حتمية للحالة لكشف الحلقات.
يستثني:
- الحقول الداخلية (التي تبدأ بـ _)
- الطوابع الزمنية والعدادات التي تتغير طبيعياً
"""
hashable_state = {}
for key, value in state.items():
if key.startswith("_"):
continue
if key in ["iteration", "timestamp", "step"]:
continue
# التحويل إلى تمثيل قابل للتجزئة
if isinstance(value, list):
hashable_state[key] = tuple(
json.dumps(v, sort_keys=True, default=str) if isinstance(v, dict) else v
for v in value
)
elif isinstance(value, dict):
hashable_state[key] = json.dumps(value, sort_keys=True, default=str)
else:
hashable_state[key] = value
state_str = json.dumps(hashable_state, sort_keys=True, default=str)
return hashlib.sha256(state_str.encode()).hexdigest()[:16]
class LoopDetector:
"""
كشف الحلقات اللانهائية في تنفيذ الرسم البياني.
الاستراتيجيات:
1. تجزئة الحالة: نفس الحالة مرتين تشير إلى حلقة
2. تسلسل العقد: نفس تسلسل العقد يتكرر
3. عد التكرارات: تجاوز الحد الأقصى للتكرارات
"""
def __init__(self, max_iterations: int = 50, max_same_state: int = 2):
self.max_iterations = max_iterations
self.max_same_state = max_same_state
self.state_hashes = []
self.node_sequence = []
self.iteration = 0
def check(self, state: dict, node_name: str) -> tuple[bool, str]:
"""
التحقق من شرط الحلقة.
يُرجع: (هل_حلقة، السبب)
"""
self.iteration += 1
self.node_sequence.append(node_name)
# التحقق من حد التكرار
if self.iteration > self.max_iterations:
return True, f"تم تجاوز الحد الأقصى للتكرارات ({self.max_iterations})"
# التحقق من تجزئة الحالة
state_hash = create_state_hash(state)
hash_count = self.state_hashes.count(state_hash)
if hash_count >= self.max_same_state:
return True, f"نفس الحالة شوهدت {hash_count + 1} مرات"
self.state_hashes.append(state_hash)
# التحقق من تسلسل العقد المتكرر
if len(self.node_sequence) >= 6:
last_3 = tuple(self.node_sequence[-3:])
prev_3 = tuple(self.node_sequence[-6:-3])
if last_3 == prev_3:
return True, f"تسلسل عقد متكرر: {last_3}"
return False, "موافق"
def get_report(self) -> dict:
"""الحصول على تقرير التصحيح."""
return {
"iterations": self.iteration,
"unique_states": len(set(self.state_hashes)),
"node_sequence": self.node_sequence,
"state_hash_history": self.state_hashes[-10:] # آخر 10
}
def create_loop_safe_node(node_func, loop_detector: LoopDetector, node_name: str):
"""
تغليف عقدة بكشف الحلقات.
"""
def wrapped(state: dict) -> dict:
is_loop, reason = loop_detector.check(state, node_name)
if is_loop:
logger.error(f"تم كشف حلقة في {node_name}: {reason}")
logger.error(f"التقرير: {loop_detector.get_report()}")
# إرجاع حالة توجه إلى END
return {
"_loop_detected": True,
"_loop_reason": reason,
"next_step": "done" # عدّل بناءً على حقل التوجيه لديك
}
return node_func(state)
return wrapped
def add_iteration_guard(graph: StateGraph, max_iterations: int = 50):
"""
إضافة حارس تكرار إلى رسم بياني.
يعدل التوجيه للتحقق من عدد التكرارات
والتوجيه إلى END إذا تم تجاوزه.
"""
def iteration_check(state: dict) -> str:
"""عقدة حارس تتحقق من عدد التكرارات."""
iteration = state.get("iteration", 0)
if iteration >= max_iterations:
logger.warning(f"تم الوصول لحد التكرار: {iteration}")
return "done"
return state.get("next_step", "done")
# إضافة كأول فحص توجيه
graph.add_node("_iteration_guard", lambda s: {"iteration": s.get("iteration", 0) + 1})
print(f"تمت إضافة حارس التكرار بحد أقصى={max_iterations}")
return graph
تصور مشاكل الحلقات
def visualize_execution_path(app, config: dict):
"""
تصور مسار التنفيذ الفعلي المتخذ.
يساعد في تحديد:
- الحلقات (نفس العقد تظهر بشكل متكرر)
- النهايات الميتة (المسارات التي لا تصل إلى END)
- المسارات غير المتوقعة
"""
history = list(app.get_state_history(config))
history.reverse()
path = []
for state in history:
node = state.metadata.get("source", "غير معروف")
path.append(node)
print("=== مسار التنفيذ ===")
print()
# طباعة كمخطط تدفق
for i, node in enumerate(path):
indent = " " if i > 0 else ""
connector = "└─> " if i > 0 else ""
print(f"{indent}{connector}{node}")
print()
# كشف التسلسلات المتكررة
node_counts = {}
for node in path:
node_counts[node] = node_counts.get(node, 0) + 1
repeated = {n: c for n, c in node_counts.items() if c > 1}
if repeated:
print("العقد المتكررة:")
for node, count in repeated.items():
print(f" {node}: {count} مرات")
return path
def create_execution_timeline(app, config: dict):
"""
إنشاء تصور جدول زمني للتنفيذ.
"""
history = list(app.get_state_history(config))
history.reverse()
print("=== الجدول الزمني للتنفيذ ===")
print()
for i, state in enumerate(history):
node = state.metadata.get("source", "غير معروف")
step = state.metadata.get("step", i)
# إنشاء جدول زمني ASCII
marker = "●" if node != "__start__" else "○"
line = "│" if i < len(history) - 1 else " "
print(f" {marker} الخطوة {step}: {node}")
print(f" {line}")
print()
تكامل LangSmith للتصحيح الإنتاجي
يوفر LangSmith تتبعاً وتصحيحاً شاملاً لتطبيقات LangGraph الإنتاجية.
إعداد تتبع LangSmith
import os
from langsmith import traceable
from langsmith.run_trees import RunTree
# تكوين LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "مفتاح-api-الخاص-بك"
os.environ["LANGCHAIN_PROJECT"] = "langgraph-debugging"
# اختياري: تمكين التسجيل المفصل
os.environ["LANGCHAIN_VERBOSE"] = "true"
@traceable(name="research_node", run_type="chain")
def traced_research_node(state: dict) -> dict:
"""
عقدة مع تتبع LangSmith.
مزين @traceable يقوم تلقائياً بـ:
- تسجيل المدخلات/المخرجات
- تتبع وقت التنفيذ
- التقاط الأخطاء
- الربط بالآثار الأب
"""
# منطق العقدة هنا
return {"documents": [{"source": "traced", "content": "نتائج البحث"}]}
@traceable(name="analysis_node", run_type="chain", tags=["critical"])
def traced_analysis_node(state: dict) -> dict:
"""
عقدة مع وسوم مخصصة للتصفية.
الوسوم تساعدك في تصفية الآثار في واجهة LangSmith:
- "critical": عقد عالية الأولوية
- "llm-call": عقد تستدعي LLMs
- "external-api": عقد مع اعتماديات خارجية
"""
return {"analysis": "مخرجات تحليل متتبعة"}
class LangSmithDebugger:
"""
فئة مساعدة لعمليات تصحيح LangSmith.
"""
def __init__(self, project_name: str = "langgraph-debug"):
self.project_name = project_name
os.environ["LANGCHAIN_PROJECT"] = project_name
@traceable(name="debug_session", run_type="chain")
def run_with_tracing(self, app, input_state: dict, config: dict):
"""
تشغيل الرسم البياني مع تتبع شامل.
"""
result = app.invoke(input_state, config)
return result
@traceable(name="debug_stream", run_type="chain")
def stream_with_tracing(self, app, input_state: dict, config: dict):
"""
بث الرسم البياني مع تتبع لكل خطوة.
"""
results = []
for event in app.stream(input_state, config, stream_mode="updates"):
results.append(event)
return results
# مثال: التتبع مع بيانات وصفية مخصصة
@traceable(
name="supervised_task",
run_type="chain",
metadata={"version": "1.0", "environment": "development"}
)
def traced_supervisor(state: dict) -> dict:
"""
مشرف ببيانات وصفية غنية.
البيانات الوصفية تظهر في واجهة LangSmith وتساعد في:
- التصفية حسب الإصدار
- مقارنة البيئات
- تتبع عمليات النشر
"""
if not state.get("documents"):
return {"next_step": "research"}
return {"next_step": "done"}
التصحيح الإنتاجي مع LangSmith
from langsmith import Client
from datetime import datetime, timedelta
class ProductionDebugger:
"""
تصحيح تطبيقات LangGraph الإنتاجية باستخدام LangSmith.
"""
def __init__(self, api_key: str = None):
self.client = Client(api_key=api_key)
def find_errors(self, project_name: str, hours: int = 24):
"""
إيجاد جميع آثار الأخطاء في آخر N ساعة.
"""
start_time = datetime.now() - timedelta(hours=hours)
runs = self.client.list_runs(
project_name=project_name,
error=True,
start_time=start_time
)
errors = []
for run in runs:
errors.append({
"run_id": str(run.id),
"name": run.name,
"error": run.error,
"start_time": run.start_time,
"inputs": run.inputs,
"trace_id": str(run.trace_id)
})
print(f"تم العثور على {len(errors)} أخطاء في آخر {hours} ساعة")
return errors
def analyze_slow_runs(self, project_name: str, threshold_seconds: float = 30.0):
"""
إيجاد التشغيلات التي تجاوزت عتبة الكمون.
"""
runs = self.client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(hours=24)
)
slow_runs = []
for run in runs:
if run.end_time and run.start_time:
duration = (run.end_time - run.start_time).total_seconds()
if duration > threshold_seconds:
slow_runs.append({
"run_id": str(run.id),
"name": run.name,
"duration_seconds": duration,
"trace_id": str(run.trace_id)
})
# ترتيب حسب المدة
slow_runs.sort(key=lambda x: x["duration_seconds"], reverse=True)
print(f"تم العثور على {len(slow_runs)} تشغيلات بطيئة (>{threshold_seconds}ث)")
return slow_runs
def get_run_details(self, run_id: str):
"""
الحصول على معلومات تفصيلية حول تشغيل محدد.
"""
run = self.client.read_run(run_id)
return {
"id": str(run.id),
"name": run.name,
"status": run.status,
"error": run.error,
"inputs": run.inputs,
"outputs": run.outputs,
"start_time": run.start_time,
"end_time": run.end_time,
"latency_ms": run.latency_ms,
"token_usage": run.total_tokens,
"feedback": list(self.client.list_feedback(run_ids=[run_id]))
}
def compare_runs(self, run_id_a: str, run_id_b: str):
"""
مقارنة تشغيلين لإيجاد الاختلافات.
"""
run_a = self.get_run_details(run_id_a)
run_b = self.get_run_details(run_id_b)
comparison = {
"latency_diff_ms": (run_a.get("latency_ms", 0) or 0) - (run_b.get("latency_ms", 0) or 0),
"token_diff": (run_a.get("token_usage", 0) or 0) - (run_b.get("token_usage", 0) or 0),
"status_a": run_a.get("status"),
"status_b": run_b.get("status"),
"error_a": run_a.get("error"),
"error_b": run_b.get("error")
}
return comparison
أنماط التصحيح المتقدمة
تتبع تنفيذ العقد
from functools import wraps
from typing import Callable
import time
def trace_node(name: str = None, log_state: bool = True):
"""
مزين للتتبع الشامل للعقد.
الاستخدام:
@trace_node("research")
def research_node(state: dict) -> dict:
...
"""
def decorator(func: Callable):
node_name = name or func.__name__
@wraps(func)
def wrapper(state: dict) -> dict:
start_time = time.time()
# تسجيل الدخول
logger.debug(f"[{node_name}] دخول")
if log_state:
logger.debug(f"[{node_name}] المدخلات: {_truncate_state(state)}")
try:
result = func(state)
# تسجيل النجاح
elapsed = time.time() - start_time
logger.debug(f"[{node_name}] خروج ({elapsed:.3f}ث)")
if log_state:
logger.debug(f"[{node_name}] المخرجات: {_truncate_state(result)}")
return result
except Exception as e:
# تسجيل الخطأ
elapsed = time.time() - start_time
logger.error(f"[{node_name}] خطأ ({elapsed:.3f}ث): {e}")
logger.error(f"[{node_name}] التتبع:\n{traceback.format_exc()}")
raise
return wrapper
return decorator
def _truncate_state(state: dict, max_len: int = 200) -> str:
"""اقتطاع الحالة للتسجيل."""
result = {}
for key, value in state.items():
if key.startswith("_"):
continue
str_value = str(value)
if len(str_value) > max_len:
result[key] = str_value[:max_len] + "..."
else:
result[key] = value
return str(result)
# مثال الاستخدام
@trace_node("supervisor")
def traced_supervisor_node(state: dict) -> dict:
if not state.get("documents"):
return {"next_step": "research"}
return {"next_step": "done"}
@trace_node("researcher", log_state=True)
def traced_researcher_node(state: dict) -> dict:
# عمل محاكى
time.sleep(0.1)
return {"documents": [{"content": "نتائج البحث"}]}
التصحيح القائم على التأكيدات
class StateAssertion:
"""
إضافة تأكيدات وقت التشغيل للتحقق من الحالة.
يلتقط المشكلات مبكراً قبل انتشارها.
"""
def __init__(self):
self.failures = []
def assert_field(self, state: dict, field: str, predicate: Callable, message: str = None):
"""التأكيد على شرط في حقل حالة."""
value = state.get(field)
if not predicate(value):
failure = {
"field": field,
"value": value,
"message": message or f"فشل التأكيد لـ {field}"
}
self.failures.append(failure)
logger.error(f"فشل التأكيد: {failure}")
return False
return True
def assert_not_none(self, state: dict, field: str):
"""التأكيد أن الحقل ليس None."""
return self.assert_field(
state, field,
lambda v: v is not None,
f"{field} يجب ألا يكون None"
)
def assert_non_empty_list(self, state: dict, field: str):
"""التأكيد أن الحقل قائمة غير فارغة."""
return self.assert_field(
state, field,
lambda v: isinstance(v, list) and len(v) > 0,
f"{field} يجب أن يكون قائمة غير فارغة"
)
def assert_type(self, state: dict, field: str, expected_type: type):
"""التأكيد أن الحقل من النوع المتوقع."""
return self.assert_field(
state, field,
lambda v: isinstance(v, expected_type),
f"{field} يجب أن يكون من نوع {expected_type.__name__}"
)
def get_failures(self) -> list[dict]:
"""الحصول على جميع فشلات التأكيد."""
return self.failures.copy()
def clear(self):
"""مسح الفشلات."""
self.failures = []
def create_validated_node(node_func, input_assertions: list = None, output_assertions: list = None):
"""
تغليف عقدة بالتحقق من المدخلات/المخرجات.
Args:
node_func: دالة العقدة الأصلية
input_assertions: قائمة من (الحقل، الشرط، الرسالة) tuples
output_assertions: قائمة من (الحقل، الشرط، الرسالة) tuples
"""
validator = StateAssertion()
def wrapped(state: dict) -> dict:
# التحقق من المدخلات
if input_assertions:
for field, predicate, message in input_assertions:
validator.assert_field(state, field, predicate, message)
if validator.failures:
logger.error(f"فشل التحقق من المدخلات: {validator.get_failures()}")
validator.clear()
# تنفيذ العقدة
result = node_func(state)
# التحقق من المخرجات
if output_assertions:
for field, predicate, message in output_assertions:
validator.assert_field(result, field, predicate, message)
if validator.failures:
logger.error(f"فشل التحقق من المخرجات: {validator.get_failures()}")
validator.clear()
return result
return wrapped
# مثال الاستخدام
validated_analyzer = create_validated_node(
lambda s: {"analysis": "النتيجة"},
input_assertions=[
("documents", lambda v: v and len(v) > 0, "تحتاج مستندات للتحليل")
],
output_assertions=[
("analysis", lambda v: v and len(v) > 50, "التحليل قصير جداً")
]
)
تصحيح استعادة الأخطاء
class ErrorRecoveryDebugger:
"""
تصحيح سلوك استعادة الأخطاء في تطبيقات LangGraph.
"""
def __init__(self):
self.error_log = []
self.recovery_log = []
def log_error(self, node: str, error: Exception, state: dict):
"""تسجيل حدوث خطأ."""
entry = {
"timestamp": datetime.now().isoformat(),
"node": node,
"error_type": type(error).__name__,
"error_message": str(error),
"state_snapshot": {k: v for k, v in state.items() if not k.startswith("_")}
}
self.error_log.append(entry)
logger.error(f"خطأ في {node}: {error}")
def log_recovery(self, node: str, strategy: str, success: bool):
"""تسجيل محاولة استعادة."""
entry = {
"timestamp": datetime.now().isoformat(),
"node": node,
"strategy": strategy,
"success": success
}
self.recovery_log.append(entry)
if success:
logger.info(f"نجحت الاستعادة في {node} باستخدام {strategy}")
else:
logger.warning(f"فشلت الاستعادة في {node} باستخدام {strategy}")
def create_recoverable_node(self, node_func, node_name: str,
recovery_strategies: list = None):
"""
تغليف عقدة باستعادة الأخطاء والتسجيل.
"""
strategies = recovery_strategies or [
("retry_once", lambda s: s),
("fallback_value", lambda s: {"_fallback": True})
]
def wrapped(state: dict) -> dict:
try:
return node_func(state)
except Exception as e:
self.log_error(node_name, e, state)
# تجربة استراتيجيات الاستعادة
for strategy_name, strategy_func in strategies:
try:
result = strategy_func(state)
self.log_recovery(node_name, strategy_name, True)
return result
except Exception as recovery_error:
self.log_recovery(node_name, strategy_name, False)
continue
# فشلت جميع الاستراتيجيات
raise RuntimeError(f"فشلت جميع استراتيجيات الاستعادة لـ {node_name}") from e
return wrapped
def get_error_summary(self) -> dict:
"""الحصول على ملخص الأخطاء ومحاولات الاستعادة."""
return {
"total_errors": len(self.error_log),
"errors_by_node": self._count_by_field(self.error_log, "node"),
"errors_by_type": self._count_by_field(self.error_log, "error_type"),
"recovery_attempts": len(self.recovery_log),
"recovery_success_rate": self._calculate_success_rate()
}
def _count_by_field(self, log: list, field: str) -> dict:
counts = {}
for entry in log:
value = entry.get(field, "غير معروف")
counts[value] = counts.get(value, 0) + 1
return counts
def _calculate_success_rate(self) -> float:
if not self.recovery_log:
return 0.0
successes = sum(1 for r in self.recovery_log if r["success"])
return successes / len(self.recovery_log)
أسئلة المقابلة
س1: كيف تصحح سير عمل LangGraph؟
الإجابة:
"أستخدم نهج تصحيح متعدد الطبقات لسير عمل LangGraph:
1. التصور أولاً
# ابدأ دائماً بتصور هيكل الرسم البياني
app = graph.compile()
print(app.get_graph().draw_mermaid())
هذا يكشف المشاكل الهيكلية مثل الحواف المفقودة أو التوجيه غير الصحيح.
2. البث خطوة بخطوة
# استخدم وضع 'updates' لرؤية بالضبط ما تُرجعه كل عقدة
for event in app.stream(input_state, config, stream_mode='updates'):
node = list(event.keys())[0]
print(f'{node}: {event[node]}')
هذا يُظهر تغييرات الحالة الفعلية في كل خطوة.
3. فحص تاريخ الحالة
# مراجعة تاريخ نقاط التفتيش الكامل
for state in app.get_state_history(config):
print(f"العقدة: {state.metadata.get('source')}")
print(f"الحالة: {state.values}")
هذا يساعد في تتبع كيف تطورت الحالة وتحديد أين بدأت المشكلات.
4. LangSmith للإنتاج
- تمكين التتبع بـ
LANGCHAIN_TRACING_V2=true - استخدام مزين
@traceableعلى العقد الحرجة - الاستعلام عن الآثار حسب حالة الخطأ، الكمون، أو الوسوم المخصصة
5. كشف الحلقات
# تجزئة الحالة لكشف التكرار
state_hashes = []
for state in history:
h = hash(frozenset(state.values.items()))
if h in state_hashes:
print('تم كشف حلقة!')
state_hashes.append(h)
المفتاح هو الجمع بين التصور (الهيكل)، البث (وقت التشغيل)، نقاط التفتيش (التاريخ)، والتتبع (الإنتاج)."
س2: كيف تحدد وتمنع الحلقات اللانهائية؟
الإجابة:
"الحلقات اللانهائية في LangGraph تحدث عادةً من مشاكل التوجيه أو الحالة التي لا تتغير أبداً. إليك نهجي:
استراتيجيات الكشف:
- تجزئة الحالة:
def create_state_hash(state):
# استثناء الحقول المتغيرة مثل الطوابع الزمنية
hashable = {k: v for k, v in state.items()
if k not in ['iteration', 'timestamp']}
return hashlib.sha256(
json.dumps(hashable, sort_keys=True).encode()
).hexdigest()
# إذا ظهرت نفس التجزئة مرتين، نحن في حلقة
- عدادات التكرار:
class State(TypedDict):
iteration: int
max_iterations: int
def supervisor(state):
if state['iteration'] >= state['max_iterations']:
return {'next_step': 'done'} # إجبار الخروج
- كشف تسلسل العقد:
# إذا تكرر نفس تسلسل العقد، نحن في حلقة
if node_sequence[-3:] == node_sequence[-6:-3]:
print('تم كشف تسلسل متكرر')
استراتيجيات المنع:
- دائماً ضمّن حارس تكرار في الحالة
- تصور الرسم البياني للتأكد من أن جميع المسارات تصل إلى END
- أضف max_iterations لمنطق التوجيه
- استخدم غلاف كشف الحلقات على العقد
- اختبر مع مدخلات متنوعة قبل الإنتاج
الأسباب الأكثر شيوعاً التي رأيتها هي:
- حافة مفقودة إلى END من معالجات الأخطاء
- منطق توجيه لا يستوفي أبداً شروط الخروج
- تحديثات الحالة التي لا تغير الحقول ذات الصلة بالتوجيه"
س3: ما أوضاع البث التي يدعمها LangGraph ومتى تستخدم كلاً منها؟
الإجابة:
"LangGraph 1.0.5 يدعم عدة أوضاع بث، كل منها مُحسَّن لحالات استخدام مختلفة:
1. stream_mode='updates'
for event in app.stream(input, config, stream_mode='updates'):
# event = {'node_name': {'field': 'new_value'}}
- يُظهر فقط ما أرجعته كل عقدة
- الأفضل لتصحيح سلوك العقد
- أصغر حجم حمولة
- الاستخدام: تصحيح مخرجات عقد محددة
2. stream_mode='values'
for state in app.stream(input, config, stream_mode='values'):
# state = الحالة المتراكمة الكاملة
- يُظهر الحالة الكاملة بعد كل عقدة
- الأفضل لفهم تطور الحالة
- حجم حمولة أكبر
- الاستخدام: تتبع سلوك المخفضات
3. stream_mode='messages'
- مُحسَّن لتطبيقات الدردشة
- يبث أجزاء الرسائل الفردية
- الاستخدام: بناء واجهات محادثة
4. stream_mode='events'
- أحداث منخفضة المستوى للتصحيح المتقدم
- يتضمن أحداث LangGraph الداخلية
- الاستخدام: التصحيح العميق لداخليات الرسم البياني
للتصحيح الإنتاجي، أبدأ عادةً بـ 'updates' لرؤية ما تغير، ثم أنتقل إلى 'values' إذا احتجت لفهم الحالة المتراكمة. لتطبيقات الدردشة في الوقت الفعلي، 'messages' يوفر أفضل تجربة مستخدم."
س4: كيف تستخدم LangSmith للتصحيح الإنتاجي؟
الإجابة:
"LangSmith ضروري لتصحيح تطبيقات LangGraph الإنتاجية. إليك إعدادي:
1. تمكين التتبع:
import os
os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['LANGCHAIN_API_KEY'] = 'مفتاحك'
os.environ['LANGCHAIN_PROJECT'] = 'production-agents'
2. تزيين العقد الحرجة:
from langsmith import traceable
@traceable(name='supervisor', tags=['critical'])
def supervisor(state):
# الآثار تظهر في LangSmith مع سياق غني
...
3. الاستعلام عن آثار الأخطاء:
from langsmith import Client
client = Client()
errors = client.list_runs(
project_name='production-agents',
error=True,
start_time=datetime.now() - timedelta(hours=24)
)
4. تحليل الكمون:
slow_runs = client.list_runs(
project_name='production-agents',
filter='latency > 30000' # أكثر من 30 ثانية
)
الفوائد الرئيسية:
- تاريخ تتبع كامل لأي تنفيذ
- التقاط المدخلات/المخرجات لإعادة الإنتاج
- تفصيل الكمون حسب العقدة
- تجميع الأخطاء والتنبيهات
- المقارنة بين التشغيلات
- جمع التعليقات للتقييم
أستخدم أيضاً البيانات الوصفية المخصصة لوسم الآثار حسب الإصدار، البيئة، ومعرف المستخدم، مما يسهل التصفية وتحليل سيناريوهات محددة."
النقاط الرئيسية
| التقنية | الغرض | متى تُستخدم |
|---|---|---|
draw_mermaid() |
تصور هيكل الرسم البياني | التطوير، التوثيق |
stream(mode='updates') |
رؤية التغييرات عقدة بعقدة | تصحيح سلوك وقت التشغيل |
stream(mode='values') |
رؤية تطور الحالة الكامل | فهم المخفضات |
get_state_history() |
فحص الجدول الزمني لنقاط التفتيش | تصحيح السفر عبر الزمن |
| تجزئة الحالة | كشف الحلقات اللانهائية | منع الحلقات |
@traceable |
تكامل LangSmith | مراقبة الإنتاج |
| حراس التكرار | منع التنفيذ الجامح | آلية أمان |
| أغلفة التأكيد | التحقق من الحالة وقت التشغيل | الكشف المبكر عن الأخطاء |
ملخص أفضل الممارسات
-
دائماً تصور أولاً - مخططات الرسم البياني تكشف المشاكل الهيكلية فوراً
-
استخدم أوضاع البث بشكل استراتيجي -
updatesللتصحيح،valuesلتتبع الحالة -
مكّن LangSmith في الإنتاج - ضروري لتصحيح المشكلات التي لا يمكنك إعادة إنتاجها محلياً
-
أضف حراس التكرار - امنع الحلقات اللانهائية بفحوصات max_iterations
-
سجّل دخول/خروج العقد - استخدم المزينات للتتبع المتسق
-
جزّئ الحالة لكشف الحلقات - قارن تجزئات الحالة لكشف التكرار
-
تحقق من انتقالات الحالة - أضف تأكيدات لالتقاط المشكلات مبكراً
-
استخدم تصحيح السفر عبر الزمن - أعد التشغيل من نقاط التفتيش لاختبار الإصلاحات
-
وثّق عملية التصحيح - أنشئ كتيبات تشغيل للمشكلات الشائعة
-
اختبر مع حالات الحافة - كثير من الأخطاء تظهر فقط مع مدخلات غير عادية
التالي: مشروع التخرج - نظام البحث الإنتاجي
:::