الإنسان في الحلقة وأنماط الإنتاج
أنماط الإنسان في الحلقة
أنظمة AI الإنتاجية نادراً ما تعمل بشكل مستقل تماماً. من مراجعة العقود القانونية إلى الموافقات المالية إلى إشراف المحتوى، الرقابة البشرية ضرورية. يوفر LangGraph أنماط مقاطعة واستئناف أصلية تمكّن سير عمل توقف-استئناف نظيف بدون استقصاء أو طوابير خارجية.
لماذا الإنسان في الحلقة؟
| حالة الاستخدام | المتطلب |
|---|---|
| قرارات عالية المخاطر | موافقات مالية، مستندات قانونية، توصيات طبية |
| ضمان الجودة | مراجعة المحتوى، التحقق من الترجمة، التحقق من البيانات |
| الامتثال | سجلات التدقيق، سلاسل الموافقة، المتطلبات التنظيمية |
| التعلم | تصحيحات البشر تحسّن سلوك الوكيل المستقبلي |
| الحالات الاستثنائية | الوكيل يتعامل مع 95% تلقائياً، البشر يتعاملون مع الاستثناءات |
سيناريو حقيقي (يناير 2026): AI قانوني يصيغ عقود احتاج موافقة المحامي قبل الإرسال. نمط المقاطعة الأصلي: الرسم البياني يتوقف، المحامي يراجع في لوحة التحكم، يوافق أو يطلب تغييرات، الرسم البياني يستأنف. صفر استقصاء، صفر طوابير خارجية.
دالة interrupt()
يوفر LangGraph 1.0.5 دعم مقاطعة أصلي. عندما يتم استدعاء interrupt()، يتوقف التنفيذ ويعود التحكم للمتصل.
from typing import TypedDict, Literal
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_openai import ChatOpenAI
# ============================================================
# تعريف الحالة
# ============================================================
class ContractState(TypedDict):
"""حالة لسير عمل صياغة العقود."""
# الإدخال
client_name: str
contract_type: str
terms: dict
# المسودة
draft: str
draft_version: int
# المراجعة
review_decision: str
reviewer_feedback: str
approved: bool
# البيانات الوصفية
review_history: list[dict]
# ============================================================
# تنفيذ العقد
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def draft_contract(state: ContractState) -> dict:
"""توليد أو مراجعة مسودة العقد."""
version = state.get("draft_version", 0) + 1
feedback = state.get("reviewer_feedback", "")
if feedback:
# دمج التغذية الراجعة في المراجعة
prompt = f"""راجع هذا العقد بناءً على تغذية المراجع الراجعة.
المسودة الحالية:
{state['draft']}
تغذية المراجع الراجعة:
{feedback}
قدم العقد المنقح."""
else:
# المسودة الأولية
prompt = f"""صِغ عقد {state['contract_type']} لـ {state['client_name']}.
الشروط:
{state['terms']}
قدم مسودة عقد احترافية."""
response = llm.invoke(prompt)
return {
"draft": response.content,
"draft_version": version,
"reviewer_feedback": "" # مسح التغذية الراجعة بعد المعالجة
}
def human_review(state: ContractState) -> Command:
"""
توقف للمراجعة البشرية باستخدام المقاطعة الأصلية.
استدعاء interrupt():
1. حفظ الحالة الحالية في نقطة تفتيش
2. إعادة التحكم للمتصل مع حمولة المقاطعة
3. انتظار Command(resume=data) للمتابعة
"""
# إنشاء حمولة المقاطعة للمراجع
review_request = interrupt({
"type": "contract_review",
"contract_id": f"contract-{state['client_name'].lower().replace(' ', '-')}",
"draft": state["draft"],
"version": state["draft_version"],
"client": state["client_name"],
"contract_type": state["contract_type"],
"options": ["approve", "reject", "request_changes"],
"instructions": "يرجى مراجعة مسودة العقد هذه وتقديم قرارك."
})
# عند الاستئناف، review_request يحتوي استجابة الإنسان
action = review_request.get("action", "reject")
feedback = review_request.get("feedback", "")
# تسجيل المراجعة في السجل
review_record = {
"version": state["draft_version"],
"action": action,
"feedback": feedback,
"reviewer": review_request.get("reviewer", "unknown")
}
history = state.get("review_history", []) + [review_record]
if action == "approve":
return Command(
goto="finalize_contract",
update={
"approved": True,
"review_decision": "approved",
"review_history": history
}
)
elif action == "request_changes":
return Command(
goto="draft_contract",
update={
"reviewer_feedback": feedback,
"review_decision": "changes_requested",
"review_history": history
}
)
else: # reject
return Command(
goto=END,
update={
"approved": False,
"review_decision": "rejected",
"reviewer_feedback": feedback,
"review_history": history
}
)
def finalize_contract(state: ContractState) -> dict:
"""إنهاء العقد المعتمد مع التوقيعات والبيانات الوصفية."""
final_draft = f"""
{state['draft']}
---
سجل الموافقة
العميل: {state['client_name']}
نوع العقد: {state['contract_type']}
الإصدار: {state['draft_version']}
الحالة: معتمد
سجل المراجعة: {len(state.get('review_history', []))} مراجعات
---
"""
return {"draft": final_draft}
# ============================================================
# بناء الرسم البياني
# ============================================================
def build_contract_workflow():
"""بناء سير عمل مراجعة العقد."""
builder = StateGraph(ContractState)
# إضافة العقد
builder.add_node("draft_contract", draft_contract)
builder.add_node("human_review", human_review)
builder.add_node("finalize_contract", finalize_contract)
# تحديد التدفق
builder.add_edge(START, "draft_contract")
builder.add_edge("draft_contract", "human_review")
# human_review يستخدم Command للتوجيه
builder.add_edge("finalize_contract", END)
return builder
# الاستخدام
workflow = build_contract_workflow()
app = workflow.compile(checkpointer=PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/langgraph"
))
الاستئناف بعد المقاطعة
عندما يصل الرسم البياني إلى مقاطعة، يعود فوراً مع بيانات المقاطعة. العميل يتعامل مع التفاعل البشري، ثم يستأنف.
from langgraph.types import Command
# ============================================================
# الاستدعاء الأولي - سيتوقف عند المقاطعة
# ============================================================
config = {"configurable": {"thread_id": "contract-acme-2026-001"}}
# بدء سير العمل
result = app.invoke(
{
"client_name": "شركة ACME",
"contract_type": "اتفاقية خدمة",
"terms": {
"duration": "12 شهر",
"value": "50,000 دولار",
"payment_terms": "صافي 30"
},
"draft": "",
"draft_version": 0,
"approved": False,
"review_history": []
},
config
)
# النتيجة تحتوي حمولة المقاطعة
print(result)
# ============================================================
# المراجعة البشرية (خارج الرسم البياني)
# ============================================================
# ... يمر الوقت، الإنسان يراجع في واجهة المستخدم ...
# الإنسان يقرر: طلب تغييرات مع تغذية راجعة
# ============================================================
# الاستئناف مع قرار الإنسان
# ============================================================
resume_result = app.invoke(
Command(resume={
"action": "request_changes",
"feedback": "يرجى إضافة بند إنهاء وتوضيح عقوبات الدفع.",
"reviewer": "jane.attorney@firm.com"
}),
config # نفس thread_id!
)
# الرسم البياني يستمر: draft_contract يراجع بناءً على التغذية الراجعة،
# ثم human_review يُشغّل مرة أخرى
# استئناف مرة أخرى مع الموافقة
final_result = app.invoke(
Command(resume={
"action": "approve",
"reviewer": "jane.attorney@firm.com"
}),
config
)
# الرسم البياني يكتمل: finalize_contract يعمل، سير العمل ينتهي
print(final_result["approved"]) # True
print(final_result["draft_version"]) # 2 (بعد المراجعة)
مراحل موافقة متعددة
سير العمل المعقد غالباً يتطلب نقاط اتصال بشرية متعددة مع مراجعين مختلفين.
from typing import TypedDict, Literal
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
class MultiStageState(TypedDict):
"""حالة لسير عمل موافقة متعدد المراحل."""
document: str
document_type: str
# تتبع المرحلة
current_stage: Literal["draft", "manager", "legal", "executive", "complete"]
stages_completed: list[str]
# الموافقات
manager_approval: bool
legal_approval: bool
executive_approval: bool
# التغذية الراجعة من كل مرحلة
stage_feedback: dict
def manager_review(state: MultiStageState) -> Command:
"""المرحلة الأولى: مراجعة المدير."""
decision = interrupt({
"stage": "manager_review",
"reviewer_role": "مدير القسم",
"document": state["document"],
"document_type": state["document_type"],
"required_action": "مراجعة الدقة والاكتمال التجاري",
"options": ["approve", "reject", "request_changes"],
"escalate_option": True # يمكن التخطي للتنفيذي
})
action = decision.get("action")
feedback = decision.get("feedback", "")
completed = state.get("stages_completed", []) + ["manager"]
stage_feedback = state.get("stage_feedback", {})
stage_feedback["manager"] = feedback
if action == "approve":
return Command(
goto="legal_review",
update={
"manager_approval": True,
"current_stage": "legal",
"stages_completed": completed,
"stage_feedback": stage_feedback
}
)
elif action == "escalate":
# تخطي القانوني، الذهاب مباشرة للتنفيذي
return Command(
goto="executive_review",
update={
"manager_approval": True,
"current_stage": "executive",
"stages_completed": completed,
"stage_feedback": stage_feedback
}
)
elif action == "request_changes":
return Command(
goto="revise_document",
update={
"current_stage": "draft",
"stage_feedback": stage_feedback
}
)
else: # reject
return Command(
goto=END,
update={
"manager_approval": False,
"stage_feedback": stage_feedback
}
)
def legal_review(state: MultiStageState) -> Command:
"""المرحلة الثانية: المراجعة القانونية."""
decision = interrupt({
"stage": "legal_review",
"reviewer_role": "المستشار القانوني",
"document": state["document"],
"document_type": state["document_type"],
"required_action": "مراجعة الامتثال القانوني والمخاطر",
"previous_approvals": state.get("stages_completed", []),
"options": ["approve", "reject", "request_changes"]
})
action = decision.get("action")
feedback = decision.get("feedback", "")
completed = state.get("stages_completed", []) + ["legal"]
stage_feedback = state.get("stage_feedback", {})
stage_feedback["legal"] = feedback
if action == "approve":
return Command(
goto="executive_review",
update={
"legal_approval": True,
"current_stage": "executive",
"stages_completed": completed,
"stage_feedback": stage_feedback
}
)
elif action == "request_changes":
return Command(
goto="revise_document",
update={
"current_stage": "draft",
"stage_feedback": stage_feedback
}
)
else:
return Command(
goto=END,
update={
"legal_approval": False,
"stage_feedback": stage_feedback
}
)
def executive_review(state: MultiStageState) -> Command:
"""المرحلة النهائية: موافقة التنفيذي."""
decision = interrupt({
"stage": "executive_review",
"reviewer_role": "التنفيذي",
"document": state["document"],
"document_type": state["document_type"],
"required_action": "الموافقة النهائية",
"previous_approvals": state.get("stages_completed", []),
"options": ["approve", "reject"]
})
action = decision.get("action")
completed = state.get("stages_completed", []) + ["executive"]
if action == "approve":
return Command(
goto="finalize",
update={
"executive_approval": True,
"current_stage": "complete",
"stages_completed": completed
}
)
else:
return Command(
goto=END,
update={
"executive_approval": False,
"stages_completed": completed
}
)
أنماط المهلة والتصعيد
أنظمة الإنتاج تحتاج للتعامل مع الحالات التي لا يستجيب فيها البشر في الوقت المحدد.
from datetime import datetime, timedelta
from typing import TypedDict
from langgraph.types import interrupt, Command
class TimeoutState(TypedDict):
"""حالة مع تتبع المهلة."""
task: str
result: str
# تتبع المهلة
interrupt_timestamp: str
timeout_hours: int
escalated: bool
escalation_reason: str
def review_with_timeout(state: TimeoutState) -> Command:
"""
مراجعة بشرية مع تصعيد تلقائي عند انتهاء المهلة.
ملاحظة: المهلة تُفحص عند الاستئناف، ليس أثناء الانتظار.
مجدول خارجي يجب أن يشغّل الاستئناف بعد المهلة.
"""
timeout_hours = state.get("timeout_hours", 24)
interrupt_time = datetime.now()
decision = interrupt({
"type": "review_with_timeout",
"task": state["task"],
"current_result": state["result"],
"deadline": (interrupt_time + timedelta(hours=timeout_hours)).isoformat(),
"escalation_contact": "senior.manager@company.com",
"instructions": f"يرجى المراجعة خلال {timeout_hours} ساعة أو المهمة ستُصعَّد."
})
# التحقق مما إذا استأنفنا بعد المهلة
resume_time = datetime.now()
elapsed = resume_time - interrupt_time
if elapsed > timedelta(hours=timeout_hours):
# حدثت مهلة - تصعيد
return Command(
goto="escalate",
update={
"escalated": True,
"escalation_reason": f"مهلة المراجعة بعد {elapsed.total_seconds() / 3600:.1f} ساعات"
}
)
# استجابة عادية ضمن المهلة
return Command(
goto="continue_workflow",
update={
"result": decision.get("result", state["result"]),
"escalated": False
}
)
def escalate(state: TimeoutState) -> dict:
"""التعامل مع المهام المُصعَّدة."""
# إخطار المدير الأقدم، إنشاء تذكرة، إلخ.
return {
"result": f"مُصعَّد: {state['task']} - {state['escalation_reason']}"
}
# ============================================================
# معالج المهلة الخارجي
# ============================================================
async def timeout_monitor(app, thread_id: str, timeout_hours: int = 24):
"""
خدمة خارجية تشغّل الاستئناف عند المهلة.
شغّل هذا كمهمة خلفية أو وظيفة مجدولة.
"""
import asyncio
await asyncio.sleep(timeout_hours * 3600)
# التحقق مما إذا كان لا يزال ينتظر (لم يُستأنف بالفعل)
state = app.get_state({"configurable": {"thread_id": thread_id}})
if state.next: # لا يزال لديه عقد معلقة (ينتظر عند المقاطعة)
# فرض الاستئناف مع مؤشر المهلة
app.invoke(
Command(resume={
"action": "timeout",
"auto_escalate": True
}),
{"configurable": {"thread_id": thread_id}}
)
البث مع المقاطعات
لتحديثات واجهة المستخدم في الوقت الفعلي، ادمج البث مع معالجة المقاطعات.
from langgraph.types import interrupt, Command, StreamWriter
async def streaming_review(
state: ContractState,
writer: StreamWriter
) -> Command:
"""بث تحديثات التقدم أثناء انتظار المراجعة البشرية."""
# بث تحديث الحالة
writer({"status": "awaiting_review", "draft": state["draft"]})
# مقاطعة للإدخال البشري
decision = interrupt({
"type": "review",
"draft": state["draft"]
})
# بث القرار المستلم
writer({"status": "review_complete", "decision": decision["action"]})
if decision["action"] == "approve":
return Command(goto="finalize")
else:
return Command(goto="revise")
# معالج البث من جانب العميل
async def handle_workflow_stream(app, config, initial_state):
"""معالجة سير العمل المتدفق مع المقاطعات."""
async for event in app.astream(initial_state, config):
if "status" in event:
print(f"الحالة: {event['status']}")
if event.get("type") == "interrupt":
# سير العمل متوقف - عرض واجهة المراجعة
print(f"مراجعة مطلوبة: {event['payload']}")
# الحصول على قرار الإنسان (من واجهة المستخدم، API، إلخ.)
decision = await get_human_decision(event["payload"])
# استئناف سير العمل
async for resume_event in app.astream(
Command(resume=decision),
config
):
print(f"حدث الاستئناف: {resume_event}")
أسئلة المقابلة
س: كيف تنفذ الإنسان في الحلقة في LangGraph؟
"استخدم دالة
interrupt()الأصلية. عند التنفيذ، توقف الرسم البياني وتعيد التحكم للمتصل مع حمولة المقاطعة. حالة الرسم البياني تُحفظ تلقائياً في نقطة تفتيش. لاحقاً، استئناف معCommand(resume=data)لمتابعة التنفيذ من حيث توقف. استجابة الإنسان متاحة كقيمة إرجاعinterrupt()."
س: لماذا تستخدم interrupt() بدلاً من الاستقصاء البسيط أو الطوابير الخارجية؟
"المقاطعة أصلية لنموذج تنفيذ LangGraph. تحفظ الحالة بشكل صحيح في نقطة التوقف الدقيقة، تتكامل مع طابور مهام منصة LangGraph، تدعم أنماط المهلة والتصعيد النظيفة، وتمكّن دلالات توقف-استئناف حقيقية. على عكس الاستقصاء، لا يوجد انتظار مشغول. على عكس الطوابير الخارجية، اتساق الحالة مضمون لأن حافظ نقاط التفتيش يتعامل معه."
س: كيف تتعامل مع المهلات في سير عمل الإنسان في الحلقة؟
"سجّل الطابع الزمني للمقاطعة في حمولة المقاطعة، ضمّن موعد نهائي. عند الاستئناف، قارن الوقت الحالي بالموعد النهائي. إذا تجاوز، وجّه إلى عقدة تصعيد. مجدول خارجي أو وظيفة cron يمكنها تشغيل استئناف تلقائي بعد المهلة باستدعاء
Command(resume=timeout_indicator). هذا الفصل يحافظ على منطق المهلة صريح وقابل للاختبار."
س: كيف تنفذ موافقات متعددة المراحل؟
"سلسل نقاط مقاطعة متعددة بأدوار مختلفة. كل مقاطعة تحدد دور المراجع والإجراء المطلوب. استخدم الحالة لتتبع المراحل المكتملة. منطق التوجيه بعد كل مقاطعة يقرر ما إذا كان يجب المتابعة للمرحلة التالية، طلب مراجعات، أو الرفض. بنية الرسم البياني تدعم طبيعياً سلاسل الموافقة المعقدة بما في ذلك تخطي المستويات والموافقات المتوازية."
النقاط الرئيسية
- interrupt() يوقف التنفيذ ويعيد التحكم للمتصل مع الحمولة
- Command(resume=data) يستمر التنفيذ من نقطة التوقف مع استجابة الإنسان
- التفتيش مطلوب - المقاطعة تعمل فقط مع حافظ نقاط تفتيش مُكوَّن
- نفس thread_id - الاستئناف يجب أن يستخدم نفس thread_id كالاستدعاء الأولي
- مراحل متعددة - سلسل المقاطعات لسير عمل موافقة معقد
- معالجة المهلة - تحقق من الوقت المنقضي عند الاستئناف، استخدم مجدول خارجي للتصعيد التلقائي
- متوافق مع البث - ادمج مع astream لتحديثات واجهة المستخدم في الوقت الفعلي
- تكامل قاعدة البيانات - خزّن طلبات الموافقة للوصول من لوحة التحكم/API
:::