Multi-Agent Architecture
Communication Patterns
Multi-agent systems need reliable ways to exchange information. The communication pattern you choose affects everything from latency to debugging complexity.
Three Core Patterns
1. Message Passing
Agents communicate through discrete messages, like sending emails.
```python
# Message passing with queues
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class AgentMessage:
    sender: str
    receiver: str
    content: Any
    message_type: str  # "request", "response", "broadcast"


class MessageBus:
    def __init__(self):
        # One inbox per agent, created lazily on first send or receive
        self.queues: dict[str, asyncio.Queue] = {}

    def _inbox(self, agent_id: str) -> asyncio.Queue:
        if agent_id not in self.queues:
            self.queues[agent_id] = asyncio.Queue()
        return self.queues[agent_id]

    async def send(self, message: AgentMessage):
        await self._inbox(message.receiver).put(message)

    async def receive(self, agent_id: str) -> AgentMessage:
        # Suspends until a message for this agent arrives
        return await self._inbox(agent_id).get()
```
Best for: Loosely coupled agents, async operations, audit trails
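To make the request/response flow concrete, here is a minimal, self-contained sketch of one round trip between two agents. The agent names (`writer`, `researcher`), the `Message` dataclass, and the `round_trip` helper are illustrative assumptions, not part of any framework; the sketch uses plain `asyncio.Queue` inboxes rather than the `MessageBus` class.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class Message:
    sender: str
    receiver: str
    content: Any
    message_type: str  # "request" or "response"


async def researcher(inbox: asyncio.Queue, inboxes: dict[str, asyncio.Queue]) -> None:
    """Wait for one request, then reply to whoever sent it."""
    request = await inbox.get()
    reply = Message(
        sender="researcher",
        receiver=request.sender,
        content=f"notes on {request.content}",
        message_type="response",
    )
    await inboxes[request.sender].put(reply)


async def round_trip(topic: str) -> Message:
    """Send one request from a hypothetical 'writer' agent and await the reply."""
    inboxes = {"writer": asyncio.Queue(), "researcher": asyncio.Queue()}
    worker = asyncio.create_task(researcher(inboxes["researcher"], inboxes))
    await inboxes["researcher"].put(Message("writer", "researcher", topic, "request"))
    reply = await inboxes["writer"].get()
    await worker
    return reply


reply = asyncio.run(round_trip("queue semantics"))
print(reply.message_type, "-", reply.content)  # response - notes on queue semantics
```

Because every exchange is an explicit `Message` object, each hop can be logged or replayed, which is where the audit-trail benefit comes from.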
2. Shared State
Agents read from and write to a common in-memory store.
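```python
# Shared state with thread-safe access
import threading
import time
from typing import Any


class SharedState:
    def __init__(self):
        self._state: dict[str, dict[str, Any]] = {}
        # RLock lets a thread that already holds the lock re-enter it
        self._lock = threading.RLock()

    def read(self, key: str) -> Any:
        # Returns the stored record (value plus metadata), or None if the key is unset
        with self._lock:
            return self._state.get(key)

    def write(self, key: str, value: Any, agent_id: str):
        with self._lock:
            self._state[key] = {
                "value": value,
                "updated_by": agent_id,
                "timestamp": time.time(),
            }

    def get_snapshot(self) -> dict:
        # Shallow copy: the outer dict is new, but records are shared with the live state
        with self._lock:
            return dict(self._state)
```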
Best for: Real-time collaboration, simple coordination, small agent teams
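One thing separate `read` and `write` calls do not cover is a read-modify-write sequence: two agents can read the same value and overwrite each other's update. A possible remedy is an `update` helper that holds the lock across both steps. The sketch below is an assumption for illustration (the `CounterState` class, `update` method, and `run_agents` function are hypothetical names), with threads standing in for agents.

```python
import threading
from typing import Any, Callable


class CounterState:
    """Tiny shared store with an atomic read-modify-write helper."""

    def __init__(self) -> None:
        self._state: dict[str, Any] = {}
        self._lock = threading.Lock()

    def update(self, key: str, fn: Callable[[Any], Any], default: Any = None) -> Any:
        """Apply fn to the current value while holding the lock."""
        with self._lock:
            new_value = fn(self._state.get(key, default))
            self._state[key] = new_value
            return new_value

    def read(self, key: str) -> Any:
        with self._lock:
            return self._state.get(key)


def run_agents(n_agents: int = 4, increments: int = 1000) -> int:
    """Have several threads (standing in for agents) bump one counter."""
    state = CounterState()

    def agent() -> None:
        for _ in range(increments):
            state.update("tasks_done", lambda v: v + 1, default=0)

    threads = [threading.Thread(target=agent) for _ in range(n_agents)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state.read("tasks_done")


print(run_agents())  # 4000: every increment happens under the lock
```

The callback passed to `update` should stay short, since every other agent waits on the lock while it runs.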
3. Event-Driven
Agents publish events; interested agents subscribe and react.
```python
# Event-driven communication
from typing import Any, Callable


class EventBus:
    def __init__(self):
        self.subscribers: dict[str, list[Callable[[Any], None]]] = {}

    def subscribe(self, event_type: str, handler: Callable[[Any], None]):
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)

    def publish(self, event_type: str, data: Any):
        # Handlers run synchronously, in subscription order
        for handler in self.subscribers.get(event_type, []):
            handler(data)


# Usage (writer_agent and reviewer_agent are agent objects defined elsewhere)
bus = EventBus()
bus.subscribe("research_complete", writer_agent.on_research_ready)
bus.subscribe("draft_ready", reviewer_agent.on_draft_ready)
```
Best for: Reactive workflows, decoupled systems, scalable architectures
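The usage lines in the EventBus example rely on agent objects defined elsewhere. As a runnable illustration of the same idea, the sketch below chains two events with plain functions standing in for agents. `MiniEventBus`, `run_pipeline`, and the handler bodies are assumptions made for this example only.

```python
from typing import Any, Callable

Handler = Callable[[Any], None]


class MiniEventBus:
    """Synchronous publish/subscribe, kept minimal for the demo."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Handler]] = {}

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event_type: str, data: Any) -> None:
        # Iterate over a copy so handlers may subscribe during dispatch
        for handler in list(self._subscribers.get(event_type, [])):
            handler(data)


def run_pipeline(topic: str) -> list[str]:
    """Publish one event and record how it ripples through two handlers."""
    bus = MiniEventBus()
    log: list[str] = []

    def on_research_ready(notes: str) -> None:
        log.append(f"writer got: {notes}")
        bus.publish("draft_ready", f"draft based on {notes}")

    def on_draft_ready(draft: str) -> None:
        log.append(f"reviewer got: {draft}")

    bus.subscribe("research_complete", on_research_ready)
    bus.subscribe("draft_ready", on_draft_ready)
    bus.publish("research_complete", f"notes on {topic}")
    return log


for line in run_pipeline("agents"):
    print(line)
```

Neither handler knows who publishes the event it reacts to, which is the decoupling this pattern is chosen for. The trade-off is that the control flow is implicit, so tracing a chain of events takes more effort than reading a direct call.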
Pattern Comparison
| Pattern | Latency | Coupling | Debugging | Scale |
|---|---|---|---|---|
| Message Passing | Medium | Low | Easy (logs) | High |
| Shared State | Low | High | Hard | Low |
| Event-Driven | Low | Low | Medium | High |
Hybrid Approaches
Production systems often combine patterns:
- Shared state for current task context
- Message passing for agent-to-agent requests
- Events for system-wide notifications
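One way the three roles listed above might fit together is sketched below. The `Hub` class and its method names are hypothetical, chosen only for this illustration. Everything runs on a single asyncio event loop, so the shared context needs no lock here; a threaded design would.

```python
import asyncio
from typing import Any, Callable


class Hub:
    """Hypothetical coordinator holding all three channels."""

    def __init__(self) -> None:
        self.context: dict[str, Any] = {}                            # shared state
        self.inboxes: dict[str, asyncio.Queue] = {}                  # message passing
        self.listeners: dict[str, list[Callable[[Any], None]]] = {}  # events

    def inbox(self, agent_id: str) -> asyncio.Queue:
        if agent_id not in self.inboxes:
            self.inboxes[agent_id] = asyncio.Queue()
        return self.inboxes[agent_id]

    def on(self, event_type: str, handler: Callable[[Any], None]) -> None:
        self.listeners.setdefault(event_type, []).append(handler)

    def emit(self, event_type: str, data: Any) -> None:
        for handler in self.listeners.get(event_type, []):
            handler(data)


async def researcher(hub: Hub) -> None:
    request = await hub.inbox("researcher").get()   # agent-to-agent request
    hub.context["notes"] = f"notes on {request}"     # current task context
    hub.emit("research_complete", "researcher")      # system-wide notification


async def run_hybrid(topic: str) -> tuple[dict[str, Any], list[str]]:
    hub = Hub()
    notifications: list[str] = []
    hub.on("research_complete", lambda who: notifications.append(f"{who} finished"))
    task = asyncio.create_task(researcher(hub))
    await hub.inbox("researcher").put(topic)
    await task
    return hub.context, notifications


context, notifications = asyncio.run(run_hybrid("caching"))
print(context, notifications)
```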
Nerd Note: Many bugs in multi-agent systems surface at communication boundaries, where one agent's output becomes another's input. Log every message in development.
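A low-effort way to log every message is to wrap the queue itself, so no agent can send or receive without leaving a trace. The sketch below uses the standard `logging` module; the `LoggedQueue` class, the logger name `agent.messages`, and the `demo` function are assumptions for illustration.

```python
import asyncio
import logging
from typing import Any

logger = logging.getLogger("agent.messages")


class LoggedQueue:
    """Wraps asyncio.Queue and logs every message that crosses it."""

    def __init__(self, owner: str) -> None:
        self.owner = owner
        self._queue: asyncio.Queue = asyncio.Queue()

    async def put(self, sender: str, content: Any) -> None:
        logger.debug("SEND %s -> %s: %r", sender, self.owner, content)
        await self._queue.put((sender, content))

    async def get(self) -> tuple[str, Any]:
        sender, content = await self._queue.get()
        logger.debug("RECV %s <- %s: %r", self.owner, sender, content)
        return sender, content


async def demo() -> tuple[str, Any]:
    inbox = LoggedQueue("reviewer")
    await inbox.put("writer", {"draft": "v1"})
    return await inbox.get()


# DEBUG level is verbose; raise it outside development
logging.basicConfig(level=logging.DEBUG)
asyncio.run(demo())
```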
Next: How to coordinate these communicating agents. :::