Multi-Agent System Design
Agent Communication
3 min read
When multiple agents work together, they need effective communication patterns. This lesson covers how agents share information, coordinate, and collaborate.
Communication Patterns
1. Message Passing
Direct communication between agents:
import asyncio
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional
@dataclass
class AgentMessage:
    """A single message exchanged between agents over a message bus."""

    sender: str  # id of the sending agent
    receiver: str  # id of the target agent
    content: Any  # arbitrary payload
    # Free-form type tag; values used in this lesson include "request",
    # "response", "broadcast", "handoff_request" and "handoff_ack".
    message_type: str
    correlation_id: Optional[str] = None  # For request-response pairing
class MessageBus:
    """In-process point-to-point message bus with one asyncio.Queue per agent."""

    def __init__(self):
        self.queues = {}  # agent_id -> asyncio.Queue
        self.handlers = {}  # agent_id -> callback (not used by the methods below)

    def register_agent(self, agent_id: str):
        """Create a mailbox for ``agent_id``.

        Registering an agent that already has a mailbox keeps the existing
        queue, so messages already waiting for it are not silently discarded.
        """
        if agent_id not in self.queues:
            self.queues[agent_id] = asyncio.Queue()

    async def send(self, message: AgentMessage) -> bool:
        """Send message to specific agent.

        Returns:
            True if the message was queued for ``message.receiver``; False if
            that receiver is not registered (the message is dropped).
        """
        queue = self.queues.get(message.receiver)
        if queue is None:
            return False
        await queue.put(message)
        return True

    async def broadcast(self, sender: str, content: Any, exclude: Optional[list] = None):
        """Send ``content`` to every registered agent except ``sender`` and ``exclude``.

        Each recipient gets its own AgentMessage with ``message_type="broadcast"``.
        """
        exclude = exclude or []
        # Iterate a snapshot: agents registered during an await must not
        # change the dict while it is being iterated.
        for agent_id, queue in list(self.queues.items()):
            if agent_id == sender or agent_id in exclude:
                continue
            await queue.put(AgentMessage(
                sender=sender,
                receiver=agent_id,
                content=content,
                message_type="broadcast",
            ))

    async def receive(self, agent_id: str, timeout: Optional[float] = None) -> Optional[AgentMessage]:
        """Return the next message for ``agent_id``.

        Waits indefinitely when ``timeout`` is None. Returns None if
        ``timeout`` seconds elapse first. Raises KeyError if ``agent_id``
        was never registered.
        """
        try:
            return await asyncio.wait_for(
                self.queues[agent_id].get(),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            return None
2. Shared Memory
Agents read from and write to a common store:
class SharedBlackboard:
    """Shared key/value store that agents write to and watch for changes.

    Each entry records the value, the id of the agent that wrote it, and a
    UTC timestamp. Callbacks registered with ``watch`` are awaited after
    every write to their key.
    """

    def __init__(self):
        self.data = {}  # key -> {"value", "written_by", "timestamp"}
        self.locks = {}  # key -> asyncio.Lock serializing writes to that key
        self.watchers = {}  # key -> list of callbacks

    async def write(self, key: str, value: Any, agent_id: str):
        """Store ``value`` under ``key``, then notify that key's watchers.

        Args:
            key: Blackboard slot to write.
            value: Value to store.
            agent_id: Id of the writing agent, recorded as ``written_by``.
        """
        if key not in self.locks:
            self.locks[key] = asyncio.Lock()
        async with self.locks[key]:
            self.data[key] = {
                "value": value,
                "written_by": agent_id,
                # Timezone-aware UTC; datetime.utcnow() is naive and
                # deprecated since Python 3.12.
                "timestamp": datetime.now(timezone.utc),
            }
        # Notify only after the lock is released: asyncio.Lock is not
        # reentrant, so a watcher that writes to the same key would otherwise
        # deadlock. Iterate a copy in case a callback registers a new watcher.
        for callback in list(self.watchers.get(key, ())):
            await callback(key, value, agent_id)

    async def read(self, key: str) -> Any:
        """Return the current value for ``key``, or None if it was never written."""
        entry = self.data.get(key)
        return entry["value"] if entry is not None else None

    def watch(self, key: str, callback):
        """Register an async ``callback(key, value, agent_id)`` for writes to ``key``.

        Only writes that happen after registration trigger the callback.
        """
        self.watchers.setdefault(key, []).append(callback)
# Usage
blackboard = SharedBlackboard()
# Research agent writes findings
await blackboard.write("research_results", findings, "research_agent")
# Summary agent watches for results
blackboard.watch("research_results", summary_agent.on_research_complete)
3. Event-Driven
Agents react to events:
class EventEmitter:
    """Minimal async publish/subscribe hub keyed by event name."""

    def __init__(self):
        # event name -> async callbacks, kept in registration order
        self.listeners = {}

    def on(self, event: str, callback):
        """Subscribe ``callback`` to ``event``; it is awaited on every emit."""
        self.listeners.setdefault(event, []).append(callback)

    async def emit(self, event: str, data: Any):
        """Invoke every listener of ``event`` with ``data`` and await them together."""
        subscribers = self.listeners.get(event)
        if not subscribers:
            return
        await asyncio.gather(*(listener(data) for listener in subscribers))
class EventDrivenAgent:
    """Agent that reacts to task events published on a shared EventEmitter.

    Subclasses implement ``process_task`` and may override
    ``handle_completion``.
    """

    def __init__(self, agent_id: str, emitter: EventEmitter):
        self.id = agent_id
        self.emitter = emitter

    async def start(self):
        """Register event handlers."""
        self.emitter.on("task_assigned", self.handle_task)
        self.emitter.on("task_completed", self.handle_completion)

    async def handle_task(self, data):
        """Process a task if it is assigned to this agent, then emit ``task_completed``.

        ``data`` is expected to carry "assigned_to" and "task_id" keys.
        """
        # .get(): ignore events without "assigned_to" instead of raising
        # KeyError, which would propagate out of emit()'s gather to the publisher.
        if data.get("assigned_to") != self.id:
            return
        result = await self.process_task(data)
        await self.emitter.emit("task_completed", {
            "task_id": data["task_id"],
            "result": result,
            "completed_by": self.id,
        })

    async def handle_completion(self, data):
        """Hook awaited for every ``task_completed`` event; does nothing by default."""

    async def process_task(self, data):
        """Do the work for an assigned task and return its result; must be overridden."""
        raise NotImplementedError(
            f"{type(self).__name__} must implement process_task()"
        )
Coordination Patterns
Handoff Protocol
Transfer work between agents:
class HandoffManager:
    """Tracks agent-to-agent work handoffs and sends the handoff messages over a MessageBus."""

    def __init__(self, message_bus: MessageBus):
        self.bus = message_bus
        # handoff_id -> {"from", "to", "status", "context"}
        self.pending_handoffs = {}

    async def initiate_handoff(
        self,
        from_agent: str,
        to_agent: str,
        context: dict,
        reason: str
    ) -> str:
        """Start handoff process.

        Records the handoff as "pending" and sends a "handoff_request" message
        from ``from_agent`` to ``to_agent``.

        Returns:
            The new handoff id (a random UUID4 string).
        """
        handoff_id = str(uuid.uuid4())
        self.pending_handoffs[handoff_id] = {
            "from": from_agent,
            "to": to_agent,
            "status": "pending",
            "context": context,
        }
        await self.bus.send(AgentMessage(
            sender=from_agent,
            receiver=to_agent,
            content={
                "handoff_id": handoff_id,
                "context": context,
                "reason": reason,
            },
            message_type="handoff_request",
        ))
        return handoff_id

    async def accept_handoff(self, handoff_id: str, agent_id: str) -> bool:
        """Accept and begin working on handoff.

        Marks the handoff "accepted" and sends a "handoff_ack" back to the
        originating agent.

        Returns:
            True if the handoff was accepted. False if it is unknown, addressed
            to a different agent, or no longer pending (for example, already
            accepted). Checking the status prevents duplicate acks.
        """
        handoff = self.pending_handoffs.get(handoff_id)
        if (
            handoff is None
            or handoff["to"] != agent_id
            or handoff["status"] != "pending"
        ):
            return False
        handoff["status"] = "accepted"
        # Notify original agent
        await self.bus.send(AgentMessage(
            sender=agent_id,
            receiver=handoff["from"],
            content={"handoff_id": handoff_id, "status": "accepted"},
            message_type="handoff_ack",
        ))
        return True
Consensus Protocol
Multiple agents vote, and a decision passes only if enough of them approve:
class ConsensusManager:
    """Decides proposals by polling a fixed set of agents and comparing the approval ratio."""

    def __init__(self, agents: list, threshold: float = 0.66):
        self.agents = agents
        self.threshold = threshold  # minimum fraction of "approve" votes
        # proposal_id -> {index of agent in self.agents: vote result}
        self.votes = {}

    async def propose(self, proposal_id: str, proposal: Any) -> bool:
        """Get consensus on a proposal.

        Every agent's ``vote(proposal_id, proposal)`` coroutine is awaited
        concurrently. Each result is expected to be a mapping with a "vote"
        key. The ballots are recorded in ``self.votes[proposal_id]``.

        Returns:
            True if the fraction of "approve" votes is at least ``threshold``.
            False when it is below, or when there are no agents to vote
            (previously a ZeroDivisionError).
        """
        ballots = self.votes[proposal_id] = {}
        if not self.agents:
            return False

        # Collect votes from all agents
        results = await asyncio.gather(
            *(agent.vote(proposal_id, proposal) for agent in self.agents)
        )
        ballots.update(enumerate(results))

        # Count votes
        approvals = sum(1 for r in results if r["vote"] == "approve")
        return approvals / len(self.agents) >= self.threshold
Best Practices
| Pattern | Use When | Avoid When |
|---|---|---|
| Message Passing | Point-to-point communication | Many receivers |
| Shared Memory | Multiple readers, few writers | High contention |
| Event-Driven | Loose coupling needed | Strict ordering required |
| Handoff | Sequential processing | Parallel work |
| Consensus | Critical decisions | Time-sensitive operations |
Interview Tip
When discussing agent communication, be ready to address:
- Failure modes - What if a message is lost?
- Ordering - Does order matter?
- Latency - Sync vs async trade-offs
- Debugging - How do you trace issues?
Now let's move on to production concerns: monitoring, safety, and reliability.