تصميم أنظمة الوكلاء المتعددين
تواصل الوكلاء
3 دقيقة للقراءة
عندما يعمل عدة وكلاء معاً، يحتاجون أنماط تواصل فعالة. يغطي هذا الدرس كيف يتشارك الوكلاء المعلومات وينسقون ويتعاونون.
أنماط التواصل
1. تمرير الرسائل
التواصل المباشر بين الوكلاء:
from dataclasses import dataclass
from typing import Any
import asyncio
@dataclass
class AgentMessage:
sender: str
receiver: str
content: Any
message_type: str # "request", "response", "broadcast"
correlation_id: str = None # لربط الطلب-الاستجابة
class MessageBus:
def __init__(self):
self.queues = {} # معرف_الوكيل -> asyncio.Queue
self.handlers = {} # معرف_الوكيل -> callback
def register_agent(self, agent_id: str):
self.queues[agent_id] = asyncio.Queue()
async def send(self, message: AgentMessage):
"""إرسال رسالة لوكيل محدد."""
if message.receiver in self.queues:
await self.queues[message.receiver].put(message)
async def broadcast(self, sender: str, content: Any, exclude: list = None):
"""الإرسال لجميع الوكلاء باستثناء المستبعدين."""
exclude = exclude or []
for agent_id, queue in self.queues.items():
if agent_id != sender and agent_id not in exclude:
await queue.put(AgentMessage(
sender=sender,
receiver=agent_id,
content=content,
message_type="broadcast"
))
async def receive(self, agent_id: str, timeout: float = None) -> AgentMessage:
"""استلام الرسالة التالية للوكيل."""
try:
return await asyncio.wait_for(
self.queues[agent_id].get(),
timeout=timeout
)
except asyncio.TimeoutError:
return None
2. الذاكرة المشتركة
الوكلاء يقرؤون/يكتبون إلى حالة مشتركة:
class SharedBlackboard:
def __init__(self):
self.data = {}
self.locks = {}
self.watchers = {} # المفتاح -> قائمة callbacks
async def write(self, key: str, value: Any, agent_id: str):
"""الكتابة للسبورة مع قفل."""
if key not in self.locks:
self.locks[key] = asyncio.Lock()
async with self.locks[key]:
self.data[key] = {
"value": value,
"written_by": agent_id,
"timestamp": datetime.utcnow()
}
# إعلام المراقبين
if key in self.watchers:
for callback in self.watchers[key]:
await callback(key, value, agent_id)
async def read(self, key: str) -> Any:
"""القراءة من السبورة."""
entry = self.data.get(key)
return entry["value"] if entry else None
def watch(self, key: str, callback):
"""تسجيل callback لتغييرات المفتاح."""
if key not in self.watchers:
self.watchers[key] = []
self.watchers[key].append(callback)
# الاستخدام
blackboard = SharedBlackboard()
# وكيل البحث يكتب النتائج
await blackboard.write("research_results", findings, "research_agent")
# وكيل التلخيص يراقب النتائج
blackboard.watch("research_results", summary_agent.on_research_complete)
3. مدفوع بالأحداث
الوكلاء يتفاعلون مع الأحداث:
class EventEmitter:
def __init__(self):
self.listeners = {}
def on(self, event: str, callback):
"""تسجيل مستمع للأحداث."""
if event not in self.listeners:
self.listeners[event] = []
self.listeners[event].append(callback)
async def emit(self, event: str, data: Any):
"""إصدار حدث لجميع المستمعين."""
if event in self.listeners:
tasks = [callback(data) for callback in self.listeners[event]]
await asyncio.gather(*tasks)
class EventDrivenAgent:
def __init__(self, agent_id: str, emitter: EventEmitter):
self.id = agent_id
self.emitter = emitter
async def start(self):
"""تسجيل معالجات الأحداث."""
self.emitter.on("task_assigned", self.handle_task)
self.emitter.on("task_completed", self.handle_completion)
async def handle_task(self, data):
if data["assigned_to"] == self.id:
result = await self.process_task(data)
await self.emitter.emit("task_completed", {
"task_id": data["task_id"],
"result": result,
"completed_by": self.id
})
أنماط التنسيق
بروتوكول التسليم
نقل العمل بين الوكلاء:
class HandoffManager:
def __init__(self, message_bus: MessageBus):
self.bus = message_bus
self.pending_handoffs = {}
async def initiate_handoff(
self,
from_agent: str,
to_agent: str,
context: dict,
reason: str
) -> str:
"""بدء عملية التسليم."""
handoff_id = str(uuid.uuid4())
self.pending_handoffs[handoff_id] = {
"from": from_agent,
"to": to_agent,
"status": "pending",
"context": context
}
await self.bus.send(AgentMessage(
sender=from_agent,
receiver=to_agent,
content={
"handoff_id": handoff_id,
"context": context,
"reason": reason
},
message_type="handoff_request"
))
return handoff_id
async def accept_handoff(self, handoff_id: str, agent_id: str):
"""قبول والبدء بالعمل على التسليم."""
handoff = self.pending_handoffs.get(handoff_id)
if handoff and handoff["to"] == agent_id:
handoff["status"] = "accepted"
# إعلام الوكيل الأصلي
await self.bus.send(AgentMessage(
sender=agent_id,
receiver=handoff["from"],
content={"handoff_id": handoff_id, "status": "accepted"},
message_type="handoff_ack"
))
بروتوكول الإجماع
عدة وكلاء يتفقون على قرار:
class ConsensusManager:
def __init__(self, agents: list, threshold: float = 0.66):
self.agents = agents
self.threshold = threshold
self.votes = {}
async def propose(self, proposal_id: str, proposal: Any) -> bool:
"""الحصول على إجماع على اقتراح."""
self.votes[proposal_id] = {}
# جمع الأصوات من جميع الوكلاء
tasks = [
agent.vote(proposal_id, proposal)
for agent in self.agents
]
results = await asyncio.gather(*tasks)
# عد الأصوات
approvals = sum(1 for r in results if r["vote"] == "approve")
approval_rate = approvals / len(self.agents)
return approval_rate >= self.threshold
أفضل الممارسات
| النمط | استخدمه عندما | تجنبه عندما |
|---|---|---|
| تمرير الرسائل | تواصل نقطة لنقطة | مستلمين كثر |
| الذاكرة المشتركة | قراء متعددون، كتّاب قليلون | تنافس عالي |
| مدفوع بالأحداث | الحاجة لاقتران فضفاض | الحاجة لترتيب صارم |
| التسليم | معالجة تسلسلية | عمل متوازي |
| الإجماع | قرارات حرجة | عمليات حساسة للوقت |
نصيحة للمقابلة
عند مناقشة تواصل الوكلاء:
- أوضاع الفشل - ماذا لو ضاعت رسالة؟
- الترتيب - هل يهم الترتيب؟
- زمن الاستجابة - مقايضات متزامن مقابل غير متزامن
- التصحيح - كيف تتبع المشاكل؟
الآن لننتقل لاهتمامات الإنتاج—المراقبة والسلامة والموثوقية. :::