الدرس 16 من 23

تصميم أنظمة الوكلاء المتعددين

تواصل الوكلاء

3 دقيقة للقراءة

عندما يعمل عدة وكلاء معاً، يحتاجون أنماط تواصل فعالة. يغطي هذا الدرس كيف يتشارك الوكلاء المعلومات وينسقون ويتعاونون.

أنماط التواصل

1. تمرير الرسائل

التواصل المباشر بين الوكلاء:

from dataclasses import dataclass
from typing import Any
import asyncio

@dataclass
class AgentMessage:
    sender: str
    receiver: str
    content: Any
    message_type: str  # "request", "response", "broadcast"
    correlation_id: str = None  # لربط الطلب-الاستجابة

class MessageBus:
    def __init__(self):
        self.queues = {}  # معرف_الوكيل -> asyncio.Queue
        self.handlers = {}  # معرف_الوكيل -> callback

    def register_agent(self, agent_id: str):
        self.queues[agent_id] = asyncio.Queue()

    async def send(self, message: AgentMessage):
        """إرسال رسالة لوكيل محدد."""
        if message.receiver in self.queues:
            await self.queues[message.receiver].put(message)

    async def broadcast(self, sender: str, content: Any, exclude: list = None):
        """الإرسال لجميع الوكلاء باستثناء المستبعدين."""
        exclude = exclude or []
        for agent_id, queue in self.queues.items():
            if agent_id != sender and agent_id not in exclude:
                await queue.put(AgentMessage(
                    sender=sender,
                    receiver=agent_id,
                    content=content,
                    message_type="broadcast"
                ))

    async def receive(self, agent_id: str, timeout: float = None) -> AgentMessage:
        """استلام الرسالة التالية للوكيل."""
        try:
            return await asyncio.wait_for(
                self.queues[agent_id].get(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            return None

2. الذاكرة المشتركة

الوكلاء يقرؤون/يكتبون إلى حالة مشتركة:

class SharedBlackboard:
    def __init__(self):
        self.data = {}
        self.locks = {}
        self.watchers = {}  # المفتاح -> قائمة callbacks

    async def write(self, key: str, value: Any, agent_id: str):
        """الكتابة للسبورة مع قفل."""
        if key not in self.locks:
            self.locks[key] = asyncio.Lock()

        async with self.locks[key]:
            self.data[key] = {
                "value": value,
                "written_by": agent_id,
                "timestamp": datetime.utcnow()
            }

            # إعلام المراقبين
            if key in self.watchers:
                for callback in self.watchers[key]:
                    await callback(key, value, agent_id)

    async def read(self, key: str) -> Any:
        """القراءة من السبورة."""
        entry = self.data.get(key)
        return entry["value"] if entry else None

    def watch(self, key: str, callback):
        """تسجيل callback لتغييرات المفتاح."""
        if key not in self.watchers:
            self.watchers[key] = []
        self.watchers[key].append(callback)

# الاستخدام
blackboard = SharedBlackboard()

# وكيل البحث يكتب النتائج
await blackboard.write("research_results", findings, "research_agent")

# وكيل التلخيص يراقب النتائج
blackboard.watch("research_results", summary_agent.on_research_complete)

3. مدفوع بالأحداث

الوكلاء يتفاعلون مع الأحداث:

class EventEmitter:
    def __init__(self):
        self.listeners = {}

    def on(self, event: str, callback):
        """تسجيل مستمع للأحداث."""
        if event not in self.listeners:
            self.listeners[event] = []
        self.listeners[event].append(callback)

    async def emit(self, event: str, data: Any):
        """إصدار حدث لجميع المستمعين."""
        if event in self.listeners:
            tasks = [callback(data) for callback in self.listeners[event]]
            await asyncio.gather(*tasks)

class EventDrivenAgent:
    def __init__(self, agent_id: str, emitter: EventEmitter):
        self.id = agent_id
        self.emitter = emitter

    async def start(self):
        """تسجيل معالجات الأحداث."""
        self.emitter.on("task_assigned", self.handle_task)
        self.emitter.on("task_completed", self.handle_completion)

    async def handle_task(self, data):
        if data["assigned_to"] == self.id:
            result = await self.process_task(data)
            await self.emitter.emit("task_completed", {
                "task_id": data["task_id"],
                "result": result,
                "completed_by": self.id
            })

أنماط التنسيق

بروتوكول التسليم

نقل العمل بين الوكلاء:

class HandoffManager:
    def __init__(self, message_bus: MessageBus):
        self.bus = message_bus
        self.pending_handoffs = {}

    async def initiate_handoff(
        self,
        from_agent: str,
        to_agent: str,
        context: dict,
        reason: str
    ) -> str:
        """بدء عملية التسليم."""
        handoff_id = str(uuid.uuid4())

        self.pending_handoffs[handoff_id] = {
            "from": from_agent,
            "to": to_agent,
            "status": "pending",
            "context": context
        }

        await self.bus.send(AgentMessage(
            sender=from_agent,
            receiver=to_agent,
            content={
                "handoff_id": handoff_id,
                "context": context,
                "reason": reason
            },
            message_type="handoff_request"
        ))

        return handoff_id

    async def accept_handoff(self, handoff_id: str, agent_id: str):
        """قبول والبدء بالعمل على التسليم."""
        handoff = self.pending_handoffs.get(handoff_id)
        if handoff and handoff["to"] == agent_id:
            handoff["status"] = "accepted"

            # إعلام الوكيل الأصلي
            await self.bus.send(AgentMessage(
                sender=agent_id,
                receiver=handoff["from"],
                content={"handoff_id": handoff_id, "status": "accepted"},
                message_type="handoff_ack"
            ))

بروتوكول الإجماع

عدة وكلاء يتفقون على قرار:

class ConsensusManager:
    def __init__(self, agents: list, threshold: float = 0.66):
        self.agents = agents
        self.threshold = threshold
        self.votes = {}

    async def propose(self, proposal_id: str, proposal: Any) -> bool:
        """الحصول على إجماع على اقتراح."""
        self.votes[proposal_id] = {}

        # جمع الأصوات من جميع الوكلاء
        tasks = [
            agent.vote(proposal_id, proposal)
            for agent in self.agents
        ]
        results = await asyncio.gather(*tasks)

        # عد الأصوات
        approvals = sum(1 for r in results if r["vote"] == "approve")
        approval_rate = approvals / len(self.agents)

        return approval_rate >= self.threshold

أفضل الممارسات

النمط استخدمه عندما تجنبه عندما
تمرير الرسائل تواصل نقطة لنقطة مستلمين كثر
الذاكرة المشتركة قراء متعددون، كتّاب قليلون تنافس عالي
مدفوع بالأحداث الحاجة لاقتران فضفاض الحاجة لترتيب صارم
التسليم معالجة تسلسلية عمل متوازي
الإجماع قرارات حرجة عمليات حساسة للوقت

نصيحة للمقابلة

عند مناقشة تواصل الوكلاء:

  1. أوضاع الفشل - ماذا لو ضاعت رسالة؟
  2. الترتيب - هل يهم الترتيب؟
  3. زمن الاستجابة - مقايضات متزامن مقابل غير متزامن
  4. التصحيح - كيف تتبع المشاكل؟

الآن لننتقل لاهتمامات الإنتاج—المراقبة والسلامة والموثوقية. :::

اختبار

الوحدة 4: تصميم أنظمة الوكلاء المتعددين

خذ الاختبار