أنظمة الوكلاء المتعددة مع LangGraph

التواصل بين الوكلاء

5 دقيقة للقراءة

في أنظمة الوكلاء المتعددة، تحدد كيفية مشاركة الوكلاء للمعلومات فعالية النظام. يستخدم LangGraph التواصل القائم على الحالة بدلاً من الرسائل المباشرة--يقرأ الوكلاء من ويكتبون إلى حقول الحالة المشتركة. يغطي هذا الدرس أنماط التواصل من تمرير الرسائل البسيط إلى بروتوكولات التنسيق المعقدة.


لماذا التواصل القائم على الحالة؟

تستخدم أنظمة الوكلاء المتعددة التقليدية طوابير الرسائل أو استدعاءات RPC المباشرة. يتبع LangGraph نهجاً مختلفاً: كل التواصل يحدث عبر الحالة.

# النهج التقليدي (ليس كيفية عمل LangGraph)
# agent_a.send_message(agent_b, data)  # رسائل مباشرة

# نهج LangGraph: التواصل عبر الحالة
class TeamState(TypedDict):
    """الحالة المشتركة هي وسيط التواصل."""
    # سياق المهمة - يقرأه جميع الوكلاء
    task: str

    # لوحة الرسائل - يلحق الوكلاء الرسائل هنا
    messages: Annotated[list[dict], operator.add]

    # مخرجات الوكلاء - يكتبها وكلاء محددون
    research_output: str
    analysis_output: str
    final_report: str

فوائد التواصل القائم على الحالة:

الفائدة الشرح
الاستمرارية التلقائية الرسائل تُحفظ مع نقاط التفتيش
تصحيح السفر عبر الزمن إعادة تشغيل أي نقطة في المحادثة
إعادة التشغيل الحتمية نفس الحالة تنتج نفس السلوك
لا فقدان للرسائل الحالة دائماً متسقة
اختبار بسيط محاكاة الحالة، ليس طوابير الرسائل

نمط لوحة الرسائل

النمط الأكثر شيوعاً هو لوحة رسائل مشتركة حيث ينشر الوكلاء ويقرأون الرسائل.

from typing import TypedDict, Annotated, Literal
from datetime import datetime
import operator
import uuid
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END

# ============================================================
# تعريف الحالة مع لوحة الرسائل
# ============================================================

class Message(TypedDict):
    """صيغة رسالة منظمة للتواصل بين الوكلاء."""
    id: str                    # معرف الرسالة الفريد
    from_agent: str            # اسم الوكيل المرسل
    to_agent: str              # المستلم ("all" للبث)
    message_type: str          # الفئة: finding, request, response, feedback
    content: str               # محتوى الرسالة
    timestamp: str             # الطابع الزمني بصيغة ISO
    metadata: dict             # سياق إضافي


class CollaborativeState(TypedDict):
    """حالة مع لوحة رسائل مشتركة للتواصل بين الوكلاء."""
    # الإدخال
    task: str

    # لوحة الرسائل - قناة التواصل
    messages: Annotated[list[Message], operator.add]

    # مخرجات الوكلاء
    research_findings: str
    analysis_result: str
    final_output: str

    # التحكم في سير العمل
    iteration: int
    max_iterations: int


# ============================================================
# الدوال المساعدة
# ============================================================

def create_message(
    from_agent: str,
    to_agent: str,
    message_type: str,
    content: str,
    metadata: dict = None
) -> Message:
    """إنشاء رسالة موحدة."""
    return {
        "id": str(uuid.uuid4()),
        "from_agent": from_agent,
        "to_agent": to_agent,
        "message_type": message_type,
        "content": content,
        "timestamp": datetime.now().isoformat(),
        "metadata": metadata or {}
    }


def get_messages_for_agent(
    messages: list[Message],
    agent_name: str,
    message_type: str = None
) -> list[Message]:
    """تصفية الرسائل الموجهة لوكيل محدد."""
    filtered = [
        m for m in messages
        if m["to_agent"] in [agent_name, "all"]
    ]

    if message_type:
        filtered = [m for m in filtered if m["message_type"] == message_type]

    return filtered


def get_latest_message(
    messages: list[Message],
    from_agent: str = None,
    message_type: str = None
) -> Message | None:
    """الحصول على أحدث رسالة تطابق المعايير."""
    filtered = messages

    if from_agent:
        filtered = [m for m in filtered if m["from_agent"] == from_agent]

    if message_type:
        filtered = [m for m in filtered if m["message_type"] == message_type]

    return filtered[-1] if filtered else None


# ============================================================
# تنفيذ الوكلاء
# ============================================================

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


def researcher_agent(state: CollaborativeState) -> dict:
    """
    وكيل الباحث الذي:
    1. يقرأ المهمة من الحالة
    2. يتحقق من التغذية الراجعة من المشرف
    3. ينشر النتائج في لوحة الرسائل
    """
    task = state["task"]
    messages = state.get("messages", [])

    # التحقق من التغذية الراجعة التي تتطلب المراجعة
    feedback = get_messages_for_agent(messages, "researcher", "feedback")

    if feedback:
        latest_feedback = feedback[-1]
        prompt = f"""أنت وكيل بحث. تلقيت تغذية راجعة على بحثك السابق.

المهمة: {task}

النتائج السابقة: {state.get('research_findings', 'لا يوجد')}

التغذية الراجعة: {latest_feedback['content']}

يرجى مراجعة بحثك بناءً على التغذية الراجعة. كن شاملاً وعالج جميع النقاط."""
    else:
        prompt = f"""أنت وكيل بحث. ابحث في المهمة التالية بشكل شامل.

المهمة: {task}

قدم نتائج مفصلة مع:
1. الحقائق والبيانات الرئيسية
2. المصادر والأدلة
3. الاعتبارات المهمة"""

    response = llm.invoke(prompt)
    findings = response.content

    # نشر النتائج في لوحة الرسائل
    finding_message = create_message(
        from_agent="researcher",
        to_agent="all",  # بث لجميع الوكلاء
        message_type="finding",
        content=findings,
        metadata={
            "iteration": state.get("iteration", 1),
            "is_revision": bool(feedback)
        }
    )

    return {
        "research_findings": findings,
        "messages": [finding_message]
    }


def analyst_agent(state: CollaborativeState) -> dict:
    """
    وكيل المحلل الذي:
    1. يقرأ نتائج البحث من الرسائل
    2. يتحقق من الطلبات المحددة
    3. ينشر التحليل في لوحة الرسائل
    """
    messages = state.get("messages", [])

    # الحصول على نتائج البحث من لوحة الرسائل
    research_messages = [
        m for m in messages
        if m["from_agent"] == "researcher" and m["message_type"] == "finding"
    ]

    if not research_messages:
        # لا يوجد بحث للتحليل بعد
        return {
            "messages": [create_message(
                from_agent="analyst",
                to_agent="supervisor",
                message_type="status",
                content="في انتظار نتائج البحث"
            )]
        }

    # الحصول على أحدث بحث
    latest_research = research_messages[-1]["content"]

    # التحقق من طلبات التحليل المحددة
    requests = get_messages_for_agent(messages, "analyst", "request")

    if requests:
        latest_request = requests[-1]
        prompt = f"""أنت وكيل تحليل. لديك طلب محدد للتعامل معه.

نتائج البحث:
{latest_research}

الطلب: {latest_request['content']}

قدم تحليلاً مركزاً يعالج الطلب المحدد."""
    else:
        prompt = f"""أنت وكيل تحليل. حلل نتائج البحث التالية.

نتائج البحث:
{latest_research}

قدم:
1. الرؤى والأنماط الرئيسية
2. نقاط القوة والضعف
3. التوصيات القابلة للتنفيذ"""

    response = llm.invoke(prompt)
    analysis = response.content

    # نشر التحليل في لوحة الرسائل
    analysis_message = create_message(
        from_agent="analyst",
        to_agent="all",
        message_type="analysis",
        content=analysis,
        metadata={"based_on_research_id": research_messages[-1]["id"]}
    )

    return {
        "analysis_result": analysis,
        "messages": [analysis_message]
    }


def supervisor_agent(state: CollaborativeState) -> dict:
    """
    المشرف الذي ينسق بقراءة ونشر الرسائل.
    لا يستدعي الوكلاء مباشرة--يستخدم الرسائل للتنسيق.
    """
    messages = state.get("messages", [])
    iteration = state.get("iteration", 1)
    max_iterations = state.get("max_iterations", 3)

    # جمع المخرجات من لوحة الرسائل
    research_msgs = [m for m in messages if m["message_type"] == "finding"]
    analysis_msgs = [m for m in messages if m["message_type"] == "analysis"]

    # بناء السياق لتقييم الجودة
    context = f"""المهمة: {state['task']}

نتائج البحث: {research_msgs[-1]['content'] if research_msgs else 'لا يوجد'}

التحليل: {analysis_msgs[-1]['content'] if analysis_msgs else 'لا يوجد'}

التكرار: {iteration}/{max_iterations}"""

    quality_prompt = f"""أنت مشرف يراجع مخرجات الفريق.

{context}

قيّم:
1. هل البحث شامل وموثق جيداً؟ (1-10)
2. هل التحليل ثاقب وقابل للتنفيذ؟ (1-10)
3. هل هناك فجوات تحتاج معالجة؟

إذا كانت الجودة أقل من 7/10 في أي مجال، حدد ما يحتاج تحسين.
إذا كانت الجودة جيدة (7+/10)، قل "APPROVED".

الصيغة: APPROVED أو NEEDS_REVISION: [تغذية راجعة محددة]"""

    response = llm.invoke(quality_prompt)
    evaluation = response.content

    output_messages = []

    if "APPROVED" in evaluation.upper() or iteration >= max_iterations:
        # الجودة معتمدة أو وصلنا للحد الأقصى من التكرارات
        output_messages.append(create_message(
            from_agent="supervisor",
            to_agent="writer",
            message_type="request",
            content="يرجى تجميع التقرير النهائي",
            metadata={"approved": True}
        ))

        return {
            "messages": output_messages,
            "iteration": iteration
        }
    else:
        # إرسال تغذية راجعة للمراجعة
        if "research" in evaluation.lower():
            output_messages.append(create_message(
                from_agent="supervisor",
                to_agent="researcher",
                message_type="feedback",
                content=evaluation
            ))

        if "analysis" in evaluation.lower():
            output_messages.append(create_message(
                from_agent="supervisor",
                to_agent="analyst",
                message_type="feedback",
                content=evaluation
            ))

        return {
            "messages": output_messages,
            "iteration": iteration + 1
        }

نمط الطلب-الاستجابة

للتواصل المباشر بين الوكلاء، استخدم نمط الطلب-الاستجابة مع معرفات الربط.

from typing import TypedDict, Annotated
import operator
import uuid


class RequestResponseState(TypedDict):
    """حالة تدعم تواصل الطلب-الاستجابة."""
    task: str
    messages: Annotated[list[dict], operator.add]
    pending_requests: list[str]  # تتبع الاستجابات المنتظرة
    completed_responses: dict    # ربط request_id -> الاستجابة


def data_requester_agent(state: RequestResponseState) -> dict:
    """
    وكيل يطلب بيانات من وكيل آخر.
    يستخدم معرفات الربط لتتبع أزواج الطلب-الاستجابة.
    """
    request_id = str(uuid.uuid4())

    request_message = {
        "id": request_id,
        "from_agent": "requester",
        "to_agent": "data_provider",
        "message_type": "data_request",
        "content": {
            "query": "الحصول على بيانات المبيعات للربع الرابع 2025",
            "format": "json",
            "filters": {"region": "EMEA"}
        },
        "expects_response": True,
        "timestamp": datetime.now().isoformat()
    }

    return {
        "messages": [request_message],
        "pending_requests": [request_id]
    }


def data_provider_agent(state: RequestResponseState) -> dict:
    """
    وكيل يستجيب لطلبات البيانات.
    يربط الاستجابة بالطلب الأصلي عبر حقل response_to.
    """
    messages = state.get("messages", [])

    # البحث عن الطلبات الموجهة لهذا الوكيل
    my_requests = [
        m for m in messages
        if m["to_agent"] == "data_provider"
        and m["message_type"] == "data_request"
    ]

    response_messages = []

    for request in my_requests:
        # التحقق مما إذا كنا قد استجبنا بالفعل
        existing_responses = [
            m for m in messages
            if m.get("response_to") == request["id"]
        ]

        if existing_responses:
            continue  # استجبنا بالفعل

        # معالجة الطلب
        query = request["content"]["query"]

        # محاكاة استرجاع البيانات
        data = {
            "query": query,
            "results": [
                {"month": "أكتوبر", "sales": 150000},
                {"month": "نوفمبر", "sales": 175000},
                {"month": "ديسمبر", "sales": 220000}
            ],
            "total": 545000
        }

        response_messages.append({
            "id": str(uuid.uuid4()),
            "from_agent": "data_provider",
            "to_agent": request["from_agent"],  # الرد على الطالب
            "message_type": "data_response",
            "response_to": request["id"],  # معرف الربط
            "content": data,
            "timestamp": datetime.now().isoformat()
        })

    return {"messages": response_messages}


def process_responses_agent(state: RequestResponseState) -> dict:
    """
    وكيل يعالج الاستجابات المستلمة.
    يطابق الاستجابات مع الطلبات المعلقة.
    """
    messages = state.get("messages", [])
    pending = state.get("pending_requests", [])
    completed = state.get("completed_responses", {})

    # البحث عن استجابات لطلباتنا
    responses = [
        m for m in messages
        if m["message_type"] == "data_response"
        and m.get("response_to") in pending
    ]

    new_completed = dict(completed)
    new_pending = list(pending)

    for response in responses:
        request_id = response["response_to"]
        new_completed[request_id] = response["content"]

        if request_id in new_pending:
            new_pending.remove(request_id)

    return {
        "completed_responses": new_completed,
        "pending_requests": new_pending
    }

نمط النشر-الاشتراك مع المواضيع

للأنظمة المعقدة، نفذ الرسائل القائمة على المواضيع حيث يشترك الوكلاء في مواضيع محددة.

from typing import TypedDict, Annotated, Literal
import operator


class TopicMessage(TypedDict):
    """رسالة مع موضوع لنمط النشر-الاشتراك."""
    id: str
    from_agent: str
    topic: str  # مثل "research.findings", "analysis.complete"
    content: str
    timestamp: str


class PubSubState(TypedDict):
    """حالة لنمط تواصل النشر-الاشتراك."""
    task: str
    messages: Annotated[list[TopicMessage], operator.add]


# أمثلة على التسلسل الهرمي للمواضيع:
# research.findings      - بحث جديد تم اكتشافه
# research.revision      - البحث يحتاج مراجعة
# analysis.complete      - التحليل اكتمل
# analysis.needs_data    - التحليل يطلب المزيد من البيانات
# quality.approved       - فحص الجودة نجح
# quality.rejected       - فحص الجودة فشل
# workflow.complete      - سير العمل بالكامل اكتمل


def publish_message(
    from_agent: str,
    topic: str,
    content: str
) -> TopicMessage:
    """إنشاء رسالة قائمة على الموضوع."""
    return {
        "id": str(uuid.uuid4()),
        "from_agent": from_agent,
        "topic": topic,
        "content": content,
        "timestamp": datetime.now().isoformat()
    }


def subscribe_to_topics(
    messages: list[TopicMessage],
    topics: list[str]
) -> list[TopicMessage]:
    """
    الحصول على الرسائل المطابقة للمواضيع المشترك بها.
    يدعم أحرف البدل: "research.*" يطابق جميع مواضيع البحث.
    """
    matched = []

    for message in messages:
        msg_topic = message["topic"]

        for pattern in topics:
            if pattern.endswith(".*"):
                # مطابقة حرف البدل
                prefix = pattern[:-2]
                if msg_topic.startswith(prefix):
                    matched.append(message)
                    break
            elif msg_topic == pattern:
                matched.append(message)
                break

    return matched


def research_agent_pubsub(state: PubSubState) -> dict:
    """وكيل البحث باستخدام نمط النشر-الاشتراك."""
    messages = state.get("messages", [])

    # الاشتراك في طلبات المراجعة
    revisions = subscribe_to_topics(messages, ["research.revision"])

    if revisions:
        latest = revisions[-1]
        findings = f"بحث منقح بناءً على: {latest['content']}"
    else:
        findings = f"بحث أولي عن: {state['task']}"

    # نشر النتائج
    return {
        "messages": [
            publish_message(
                from_agent="researcher",
                topic="research.findings",
                content=findings
            )
        ]
    }


def analyst_agent_pubsub(state: PubSubState) -> dict:
    """وكيل المحلل يشترك في مواضيع البحث."""
    messages = state.get("messages", [])

    # الاشتراك في جميع مواضيع البحث
    research_msgs = subscribe_to_topics(messages, ["research.*"])

    if not research_msgs:
        return {"messages": []}

    latest_research = research_msgs[-1]["content"]
    analysis = f"تحليل لـ: {latest_research}"

    return {
        "messages": [
            publish_message(
                from_agent="analyst",
                topic="analysis.complete",
                content=analysis
            )
        ]
    }


def quality_agent_pubsub(state: PubSubState) -> dict:
    """وكيل الجودة يشترك في أحداث الاكتمال."""
    messages = state.get("messages", [])

    # الاشتراك في أحداث الاكتمال
    completions = subscribe_to_topics(
        messages,
        ["research.findings", "analysis.complete"]
    )

    if len(completions) < 2:
        return {"messages": []}  # انتظار كليهما

    # فحص الجودة
    all_good = True  # مبسط

    if all_good:
        return {
            "messages": [
                publish_message(
                    from_agent="quality",
                    topic="quality.approved",
                    content="جميع المخرجات تستوفي معايير الجودة"
                )
            ]
        }
    else:
        return {
            "messages": [
                publish_message(
                    from_agent="quality",
                    topic="research.revision",
                    content="البحث يحتاج المزيد من العمق"
                )
            ]
        }

نمط السبورة

يستخدم نمط السبورة أقساماً مصنفة من الحالة لفئات بيانات مختلفة، مما يمكّن التعاون المنظم.

from typing import TypedDict, Annotated
from dataclasses import dataclass
import operator


class BlackboardState(TypedDict):
    """
    نمط السبورة: أقسام منظمة لأنواع بيانات مختلفة.
    الوكلاء يقرأون من الأقسام ذات الصلة بهم ويكتبون إلى قسمهم.
    """
    # قسم الإدخال
    problem: str
    constraints: list[str]

    # قسم الفرضيات (يكتبه وكلاء الفرضيات)
    hypotheses: Annotated[list[dict], operator.add]

    # قسم الأدلة (يكتبه وكلاء البحث)
    evidence: Annotated[list[dict], operator.add]

    # قسم التقييم (يكتبه وكلاء التقييم)
    evaluations: Annotated[list[dict], operator.add]

    # قسم الحل (يكتبه وكيل التجميع)
    current_solution: dict
    solution_history: Annotated[list[dict], operator.add]

    # قسم التحكم
    phase: Literal["hypothesize", "gather_evidence", "evaluate", "synthesize"]
    iteration: int


def hypothesis_agent(state: BlackboardState) -> dict:
    """
    يولد الفرضيات بناءً على المشكلة والأدلة الموجودة.
    يقرأ: problem, constraints, evidence
    يكتب: hypotheses
    """
    problem = state["problem"]
    constraints = state.get("constraints", [])
    evidence = state.get("evidence", [])

    # توليد فرضية بناءً على المعلومات المتاحة
    existing_hypotheses = state.get("hypotheses", [])

    prompt = f"""ولّد فرضية لهذه المشكلة:

المشكلة: {problem}
القيود: {constraints}
الأدلة الموجودة: {[e['finding'] for e in evidence]}
الفرضيات السابقة: {[h['statement'] for h in existing_hypotheses]}

قدم فرضية جديدة لم تُقترح بعد."""

    response = llm.invoke(prompt)

    new_hypothesis = {
        "id": str(uuid.uuid4()),
        "statement": response.content,
        "confidence": 0.5,  # الثقة الأولية
        "supporting_evidence": [],
        "contradicting_evidence": [],
        "created_at": datetime.now().isoformat()
    }

    return {"hypotheses": [new_hypothesis]}


def evidence_gatherer_agent(state: BlackboardState) -> dict:
    """
    يجمع الأدلة المتعلقة بالفرضيات الحالية.
    يقرأ: hypotheses, problem
    يكتب: evidence
    """
    hypotheses = state.get("hypotheses", [])

    if not hypotheses:
        return {"evidence": []}

    # التركيز على أحدث فرضية
    target_hypothesis = hypotheses[-1]

    prompt = f"""اعثر على أدلة متعلقة بهذه الفرضية:

الفرضية: {target_hypothesis['statement']}

قدم أدلة محددة تدعم أو تناقض هذه الفرضية."""

    response = llm.invoke(prompt)

    new_evidence = {
        "id": str(uuid.uuid4()),
        "finding": response.content,
        "related_hypothesis_id": target_hypothesis["id"],
        "type": "supporting",  # أو "contradicting"
        "source": "research",
        "created_at": datetime.now().isoformat()
    }

    return {"evidence": [new_evidence]}


def evaluation_agent(state: BlackboardState) -> dict:
    """
    يقيّم الفرضيات مقابل الأدلة المجمعة.
    يقرأ: hypotheses, evidence
    يكتب: evaluations
    """
    hypotheses = state.get("hypotheses", [])
    evidence = state.get("evidence", [])

    evaluations = []

    for hypothesis in hypotheses:
        # الحصول على الأدلة المتعلقة بهذه الفرضية
        related_evidence = [
            e for e in evidence
            if e.get("related_hypothesis_id") == hypothesis["id"]
        ]

        supporting = [e for e in related_evidence if e["type"] == "supporting"]
        contradicting = [e for e in related_evidence if e["type"] == "contradicting"]

        # حساب الثقة
        if related_evidence:
            confidence = len(supporting) / len(related_evidence)
        else:
            confidence = 0.5

        evaluations.append({
            "hypothesis_id": hypothesis["id"],
            "confidence": confidence,
            "supporting_count": len(supporting),
            "contradicting_count": len(contradicting),
            "recommendation": "accept" if confidence > 0.7 else "reject" if confidence < 0.3 else "investigate",
            "evaluated_at": datetime.now().isoformat()
        })

    return {"evaluations": evaluations}


def synthesis_agent(state: BlackboardState) -> dict:
    """
    يجمع الحل النهائي من الفرضيات المقيّمة.
    يقرأ: hypotheses, evaluations, evidence
    يكتب: current_solution, solution_history
    """
    hypotheses = state.get("hypotheses", [])
    evaluations = state.get("evaluations", [])

    # البحث عن الفرضيات المقبولة
    accepted_ids = {
        e["hypothesis_id"] for e in evaluations
        if e["recommendation"] == "accept"
    }

    accepted_hypotheses = [
        h for h in hypotheses
        if h["id"] in accepted_ids
    ]

    if not accepted_hypotheses:
        return {
            "current_solution": {"status": "no_accepted_hypotheses"},
            "solution_history": []
        }

    solution = {
        "id": str(uuid.uuid4()),
        "accepted_hypotheses": [h["statement"] for h in accepted_hypotheses],
        "synthesis": f"حل قائم على {len(accepted_hypotheses)} فرضية مقبولة",
        "confidence": sum(
            e["confidence"] for e in evaluations
            if e["hypothesis_id"] in accepted_ids
        ) / len(accepted_ids),
        "created_at": datetime.now().isoformat()
    }

    return {
        "current_solution": solution,
        "solution_history": [solution]
    }

بروتوكول الإجماع

عندما يحتاج عدة وكلاء للاتفاق على قرار، نفذ بروتوكول الإجماع.

from typing import TypedDict, Annotated, Literal
from collections import Counter
import operator


class ConsensusState(TypedDict):
    """حالة لإجماع الوكلاء المتعددة."""
    proposal: str
    votes: Annotated[list[dict], operator.add]
    consensus_reached: bool
    final_decision: str
    round: int
    max_rounds: int


def voting_agent(
    agent_name: str,
    expertise: str
) -> callable:
    """
    مصنع لوكلاء التصويت بخبرات مختلفة.
    كل وكيل يصوت بناءً على منظوره.
    """
    def agent(state: ConsensusState) -> dict:
        proposal = state["proposal"]
        existing_votes = state.get("votes", [])
        current_round = state.get("round", 1)

        # التحقق مما إذا كان هذا الوكيل قد صوت بالفعل في هذه الجولة
        my_votes = [
            v for v in existing_votes
            if v["agent"] == agent_name and v["round"] == current_round
        ]

        if my_votes:
            return {"votes": []}  # صوت بالفعل

        # اتخاذ القرار بناءً على الخبرة
        prompt = f"""أنت {agent_name}، خبير في {expertise}.

الاقتراح: {proposal}

الأصوات السابقة في هذه الجولة: {[
    f"{v['agent']}: {v['vote']} ({v['reasoning'][:50]}...)"
    for v in existing_votes
    if v['round'] == current_round
]}

صوت APPROVE أو REJECT أو ABSTAIN مع تبرير موجز.
الصيغة: VOTE: [تصويتك] REASON: [جملة واحدة]"""

        response = llm.invoke(prompt)
        content = response.content

        # تحليل التصويت
        if "APPROVE" in content.upper():
            vote = "approve"
        elif "REJECT" in content.upper():
            vote = "reject"
        else:
            vote = "abstain"

        return {
            "votes": [{
                "agent": agent_name,
                "vote": vote,
                "reasoning": content,
                "round": current_round,
                "timestamp": datetime.now().isoformat()
            }]
        }

    return agent


def consensus_checker(state: ConsensusState) -> dict:
    """
    يتحقق مما إذا تم الوصول إلى الإجماع.
    يتطلب أغلبية 2/3 للموافقة.
    """
    votes = state.get("votes", [])
    current_round = state.get("round", 1)
    max_rounds = state.get("max_rounds", 3)

    # عد الأصوات للجولة الحالية
    round_votes = [v for v in votes if v["round"] == current_round]
    vote_counts = Counter(v["vote"] for v in round_votes)

    total_voters = len(set(v["agent"] for v in round_votes))
    approve_count = vote_counts.get("approve", 0)
    reject_count = vote_counts.get("reject", 0)

    # التحقق من أغلبية 2/3
    threshold = total_voters * 2 / 3

    if approve_count >= threshold:
        return {
            "consensus_reached": True,
            "final_decision": "approved",
            "round": current_round
        }
    elif reject_count >= threshold:
        return {
            "consensus_reached": True,
            "final_decision": "rejected",
            "round": current_round
        }
    elif current_round >= max_rounds:
        # لا إجماع بعد الحد الأقصى من الجولات
        return {
            "consensus_reached": True,
            "final_decision": "no_consensus",
            "round": current_round
        }
    else:
        # نحتاج جولة أخرى
        return {
            "consensus_reached": False,
            "round": current_round + 1
        }


# بناء سير عمل الإجماع
def build_consensus_workflow(agent_configs: list[tuple[str, str]]):
    """
    بناء سير عمل إجماع مع عدة وكلاء تصويت.

    المعاملات:
        agent_configs: قائمة من أزواج (اسم_الوكيل، الخبرة)
    """
    builder = StateGraph(ConsensusState)

    # إضافة وكلاء التصويت
    for name, expertise in agent_configs:
        builder.add_node(f"voter_{name}", voting_agent(name, expertise))

    builder.add_node("check_consensus", consensus_checker)

    # التوصيل: START -> جميع المصوتين -> فحص الإجماع
    for name, _ in agent_configs:
        builder.add_edge(START, f"voter_{name}")
        builder.add_edge(f"voter_{name}", "check_consensus")

    # فحص الإجماع يوجه للخلف أو ينهي
    def route_after_consensus(state: ConsensusState) -> str:
        if state.get("consensus_reached"):
            return END
        else:
            # العودة للمصوت الأول لجولة جديدة
            return f"voter_{agent_configs[0][0]}"

    builder.add_conditional_edges(
        "check_consensus",
        route_after_consensus,
        {END: END, **{f"voter_{name}": f"voter_{name}" for name, _ in agent_configs}}
    )

    return builder.compile()


# مثال الاستخدام
consensus_app = build_consensus_workflow([
    ("tech_lead", "الجدوى التقنية"),
    ("product_manager", "القيمة التجارية"),
    ("security_expert", "الآثار الأمنية"),
])

result = consensus_app.invoke({
    "proposal": "تنفيذ مصادقة SSO باستخدام OAuth 2.0",
    "votes": [],
    "consensus_reached": False,
    "final_decision": "",
    "round": 1,
    "max_rounds": 3
})

نمط الإنتاج: سجل تدقيق التواصل

في أنظمة الإنتاج، حافظ على سجل تدقيق لجميع التواصل بين الوكلاء.

from typing import TypedDict, Annotated
from datetime import datetime
import operator
import json


class AuditableMessage(TypedDict):
    """رسالة مع معلومات سجل تدقيق كامل."""
    id: str
    from_agent: str
    to_agent: str
    message_type: str
    content: str
    timestamp: str

    # حقول التدقيق
    parent_message_id: str | None  # للتسلسل
    conversation_id: str           # تجميع الرسائل ذات الصلة
    step_number: int               # الموقع في سير العمل
    checkpoint_id: str | None      # الربط بنقطة التفتيش


class AuditedState(TypedDict):
    """حالة مع تواصل قابل للتدقيق."""
    task: str
    messages: Annotated[list[AuditableMessage], operator.add]

    # بيانات التدقيق الوصفية
    conversation_id: str
    step_count: int


def create_auditable_message(
    state: AuditedState,
    from_agent: str,
    to_agent: str,
    message_type: str,
    content: str,
    parent_message_id: str = None
) -> AuditableMessage:
    """إنشاء رسالة مع سجل تدقيق."""
    return {
        "id": str(uuid.uuid4()),
        "from_agent": from_agent,
        "to_agent": to_agent,
        "message_type": message_type,
        "content": content,
        "timestamp": datetime.now().isoformat(),
        "parent_message_id": parent_message_id,
        "conversation_id": state.get("conversation_id", "unknown"),
        "step_number": state.get("step_count", 0),
        "checkpoint_id": None  # يُعيّنه حافظ نقاط التفتيش
    }


def audit_communication(
    messages: list[AuditableMessage],
    conversation_id: str
) -> dict:
    """
    إنشاء تقرير تدقيق لمحادثة.
    مفيد للتصحيح والامتثال.
    """
    conv_messages = [
        m for m in messages
        if m["conversation_id"] == conversation_id
    ]

    # الترتيب حسب الطابع الزمني
    conv_messages.sort(key=lambda m: m["timestamp"])

    # بناء رسم بياني للتواصل
    agent_interactions = {}
    for msg in conv_messages:
        key = (msg["from_agent"], msg["to_agent"])
        if key not in agent_interactions:
            agent_interactions[key] = []
        agent_interactions[key].append(msg["message_type"])

    # إنشاء الإحصائيات
    return {
        "conversation_id": conversation_id,
        "total_messages": len(conv_messages),
        "agents_involved": list(set(
            m["from_agent"] for m in conv_messages
        ) | set(
            m["to_agent"] for m in conv_messages
        )),
        "message_types": dict(Counter(m["message_type"] for m in conv_messages)),
        "agent_interactions": {
            f"{k[0]}->{k[1]}": v
            for k, v in agent_interactions.items()
        },
        "timeline": [
            {
                "step": m["step_number"],
                "from": m["from_agent"],
                "to": m["to_agent"],
                "type": m["message_type"],
                "timestamp": m["timestamp"]
            }
            for m in conv_messages
        ],
        "first_message": conv_messages[0]["timestamp"] if conv_messages else None,
        "last_message": conv_messages[-1]["timestamp"] if conv_messages else None
    }


def export_communication_log(
    messages: list[AuditableMessage],
    filepath: str
):
    """تصدير سجل التواصل للتحليل."""
    with open(filepath, "w") as f:
        json.dump(messages, f, indent=2, default=str)

أسئلة المقابلة

س: كيف يتواصل الوكلاء في LangGraph؟

"عبر حقول الحالة المشتركة، ليس الرسائل المباشرة. يكتب الوكلاء إلى حقول الحالة مع المخفضات (مثل Annotated[list, operator.add]) التي تراكم الرسائل بدون الكتابة فوقها. هذا يوفر الاستمرارية التلقائية، وإعادة التشغيل الحتمية، وتصحيح السفر عبر الزمن. نمط لوحة الرسائل هو الأكثر شيوعاً--الوكلاء ينشرون إلى ويقرأون من قائمة رسائل مشتركة."

س: ما الفرق بين رسائل البث والرسائل الموجهة؟

"رسائل البث تستخدم to_agent: 'all' ويقرأها جميع الوكلاء. الرسائل الموجهة تستهدف وكيلاً محدداً (to_agent: 'analyst') ويصفيها المستلم. يصفي الوكلاء الرسائل باستخدام list comprehensions: [m for m in messages if m['to_agent'] in [agent_name, 'all']]. هذا يمكّن كلاً من الإعلانات على مستوى الفريق والتواصل الخاص."

س: كيف تنفذ أنماط الطلب-الاستجابة في LangGraph؟

"استخدم معرفات الربط. الطالب ينشئ request_id فريد، يضمّنه في الرسالة، ويتتبعه في قائمة pending_requests. المستجيب يضمّن response_to: request_id في الاستجابة. الطالب يطابق الاستجابات مع الطلبات المعلقة بهذا المعرف. هذا يحافظ على الربط في التنفيذ عديم الحالة."

س: متى تستخدم النشر-الاشتراك مقابل الرسائل المباشرة؟

"النشر-الاشتراك أفضل للأنظمة ذات الارتباط الضعيف حيث الوكلاء لا يعرفون بعضهم البعض--ينشرون إلى مواضيع ويشتركون في المواضيع التي يهتمون بها. الرسائل المباشرة أفضل لسير العمل ذي الارتباط القوي حيث الوكلاء لديهم علاقات معروفة. النشر-الاشتراك يتوسع بشكل أفضل لكنه يضيف تعقيداً؛ الرسائل المباشرة أبسط لكنها تخلق ارتباطاً قوياً."

س: كيف تنفذ الإجماع بين عدة وكلاء؟

"أنشئ وكلاء تصويت كل منهم يدلي بصوت مع تبرير. فاحص الإجماع يجمع الأصوات ويحدد إذا تم تحقيق العتبة (عادةً أغلبية 2/3). إذا لم يتحقق، زِد الجولة واستمر في التصويت. تتبع الأصوات بأرقام الجولات للتمييز بين جولات التصويت. عيّن max_rounds لمنع الحلقات اللانهائية."


النقاط الرئيسية

  • الحالة هي قناة التواصل - الوكلاء يقرأون من ويكتبون إلى حالة مشتركة، ليس رسائل مباشرة
  • المخفضات تمكّن التراكم - Annotated[list, operator.add] يلحق بدون الكتابة فوق
  • بنية الرسالة مهمة - تضمين from, to, type, content, والطوابع الزمنية
  • معرفات الربط للطلب-الاستجابة - ربط الطلبات بالاستجابات بمعرفات فريدة
  • النشر-الاشتراك للارتباط الضعيف - الرسائل القائمة على المواضيع تفصل الوكلاء
  • السبورة للتعاون المنظم - أقسام مصنفة لفئات بيانات مختلفة
  • الإجماع للقرارات الجماعية - جولات التصويت مع عتبات وحد أقصى للجولات
  • سجلات التدقيق للإنتاج - تتبع conversation_id, step_number, parent_message_id

:::

اختبار

الوحدة 4: أنظمة الوكلاء المتعددة مع LangGraph

خذ الاختبار