أنماط الرسوم البيانية المتقدمة
التفرع الشرطي والتنفيذ المتوازي
لماذا التفرع مهم في الإنتاج
سيناريو إنتاجي حقيقي (يناير 2026):
احتاج نظام معالجة مستندات مؤسسي إلى توجيه المستندات إلى خطوط معالجة مختلفة بناءً على النوع (PDF، صورة، بيانات منظمة) ثم معالجة أقسام متعددة بالتوازي. مع المعالجة التسلسلية، استغرق تقرير من 100 صفحة 45 دقيقة. مع التفرع المتوازي باستخدام Send API، يستغرق الآن 8 دقائق.
هذا الدرس يعلمك: كيفية تنفيذ التوجيه الشرطي مع 3+ فروع والاستفادة من Send API للتنفيذ المتوازي في سير عمل LangGraph الإنتاجي.
الحواف الشرطية مع فروع متعددة
النمط الأساسي: فرعين
from typing import Literal
from langgraph.graph import StateGraph, END
def route_simple(state: dict) -> Literal["path_a", "path_b"]:
if state["score"] > 0.8:
return "path_a"
return "path_b"
graph.add_conditional_edges("router", route_simple, {
"path_a": "node_a",
"path_b": "node_b"
})
نمط الإنتاج: 3+ فروع
from typing import TypedDict, Literal, Optional
class DocumentState(TypedDict):
content: str
doc_type: Optional[str]
language: Optional[str]
priority: str
result: Optional[str]
def classify_document(state: DocumentState) -> Literal[
"process_pdf",
"process_image",
"process_structured",
"process_text",
"error_handler"
]:
"""التوجيه إلى المعالج المناسب بناءً على نوع المستند."""
doc_type = state.get("doc_type", "").lower()
# تجاوز الأولوية للمستندات العاجلة
if state.get("priority") == "urgent":
return "process_text" # المسار السريع
# التوجيه القائم على النوع
routing_map = {
"pdf": "process_pdf",
"image": "process_image",
"jpg": "process_image",
"png": "process_image",
"json": "process_structured",
"csv": "process_structured",
"xml": "process_structured",
}
route = routing_map.get(doc_type, "process_text")
return route
Send API: التنفيذ المتوازي (يناير 2026)
المشكلة: المعالجة التسلسلية للمهام المستقلة بطيئة.
الحل: Send API في LangGraph تطلق تنفيذات متوازية.
from langgraph.constants import Send
from typing import TypedDict, Annotated, List
import operator
class ParallelState(TypedDict):
documents: List[str]
results: Annotated[List[str], operator.add] # تتراكم من التشغيلات المتوازية
def fan_out(state: ParallelState) -> List[Send]:
"""إطلاق معالجة متوازية لكل مستند."""
sends = []
for i, doc in enumerate(state["documents"]):
sends.append(
Send(
"process_document", # العقدة الهدف
{"doc_id": i, "content": doc} # الحالة لهذا الفرع
)
)
return sends
def process_document(state: dict) -> dict:
"""معالجة مستند واحد (يعمل بالتوازي)."""
doc_id = state["doc_id"]
content = state["content"]
result = f"تمت معالجة المستند {doc_id}: {len(content)} حرف"
return {"results": [result]} # سيتم دمجها عبر operator.add
المفاهيم الرئيسية:
Send(node_name, state)ينشئ تنفيذاً متوازياً- أرجع
List[Send]من عقدة للتفرع - استخدم
Annotated[List, operator.add]لدمج النتائج المتوازية - جميع الفروع المتوازية تتزامن تلقائياً قبل العقدة التالية
نمط Map-Reduce مع Send
نمط الإنتاج: معالجة الأجزاء بالتوازي، ثم التخفيض:
from langgraph.constants import Send
from typing import TypedDict, Annotated, List, Optional
import operator
class MapReduceState(TypedDict):
# الإدخال
large_document: str
chunk_size: int
# المعالجة المتوازية
chunk_results: Annotated[List[dict], operator.add]
# المخرجات النهائية
final_summary: Optional[str]
def split_and_map(state: MapReduceState) -> List[Send]:
"""تقسيم المستند إلى أجزاء ومعالجتها بالتوازي."""
doc = state["large_document"]
chunk_size = state.get("chunk_size", 1000)
# التقسيم إلى أجزاء
chunks = [doc[i:i+chunk_size] for i in range(0, len(doc), chunk_size)]
# إطلاق المعالجة المتوازية
sends = []
for i, chunk in enumerate(chunks):
sends.append(Send(
"process_chunk",
{
"chunk_id": i,
"chunk_content": chunk,
"total_chunks": len(chunks)
}
))
return sends
def reduce_results(state: MapReduceState) -> dict:
"""دمج جميع نتائج الأجزاء في ملخص نهائي."""
results = state.get("chunk_results", [])
# الترتيب حسب chunk_id للحفاظ على الترتيب
sorted_results = sorted(results, key=lambda x: x["chunk_id"])
# دمج الملخصات
combined = "\n".join([r["summary"] for r in sorted_results])
return {"final_summary": combined}
معالجة الأخطاء في التنفيذ المتوازي
حرج: التعامل مع الفشل في الفروع الفردية:
def process_with_error_handling(state: dict) -> dict:
"""المعالجة مع التقاط الأخطاء."""
item_id = state["item_id"]
content = state["content"]
try:
# محاولة المعالجة
result = f"تمت المعالجة: {content}"
return {
"successes": [{"item_id": item_id, "result": result}]
}
except Exception as e:
return {
"failures": [{
"item_id": item_id,
"error": str(e)
}]
}
أسئلة المقابلة الشائعة
س1: "كيف يختلف Send API عن الحواف الشرطية البسيطة؟"
إجابة قوية:
"الحواف الشرطية توجه إلى فرع واحد في كل مرة—إنه اختيار واحد من N. Send API تمكّن التفرع إلى فروع متعددة في وقت واحد—إنه تنفيذ متوازي. استخدم الحواف الشرطية عندما تحتاج لاختيار مسار بناءً على الحالة. استخدم Send عندما تحتاج لمعالجة عناصر أو جوانب متعددة بالتوازي ودمج النتائج."
س2: "كيف تتعامل مع الفشل في الفروع المتوازية؟"
الإجابة:
"يجب على كل فرع متوازي أن يغلف منطقه في try-catch ويعيد إما إلى قائمة 'successes' أو 'failures' في الحالة، كلاهما يستخدم مخفضات operator.add. عقدة التجميع تلخص النتائج، وتحسب معدلات النجاح، ويمكنها تشغيل التنبيهات أو إعادة المحاولات للفشل."
النقاط الرئيسية للإنتاج
✅ استخدم توجيه 3+ فروع للتصنيف المعقد ✅ Send API للتنفيذ المتوازي - ضروري للأداء على نطاق واسع ✅ نمط map-reduce لمعالجة المستندات/البيانات الكبيرة ✅ تعامل دائماً مع الأخطاء في الفروع المتوازية بشكل فردي ✅ استخدم مخفضات operator.add لدمج النتائج المتوازية
التالي: تعلم أنماط استرداد الأخطاء بما في ذلك عقد try-catch، ومسارات الاحتياط، وقواطع الدائرة في الدرس 3.
:::