Lesson 2 of 20

Multi-Agent Architecture

Communication Patterns

Multi-agent systems need reliable ways to exchange information. The communication pattern you choose affects everything from latency to debugging complexity.

Three Core Patterns

1. Message Passing

Agents communicate through discrete messages, like sending emails.

# Message passing with queues
from dataclasses import dataclass
from typing import Any
import asyncio

@dataclass
class AgentMessage:
    sender: str
    receiver: str
    content: Any
    message_type: str  # "request", "response", "broadcast"

class MessageBus:
    def __init__(self):
        self.queues: dict[str, asyncio.Queue] = {}

    async def send(self, message: AgentMessage):
        if message.receiver not in self.queues:
            self.queues[message.receiver] = asyncio.Queue()
        await self.queues[message.receiver].put(message)

    async def receive(self, agent_id: str) -> AgentMessage:
        if agent_id not in self.queues:
            self.queues[agent_id] = asyncio.Queue()
        return await self.queues[agent_id].get()

Best for: Loosely coupled agents, async operations, audit trails
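
To make the request/response flow concrete, here is a minimal sketch of one round trip over a queue-per-agent bus. The bus is repeated so the example runs on its own; the `researcher` coroutine, the `run_round_trip` helper, and the agent names are hypothetical and exist only for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class AgentMessage:
    sender: str
    receiver: str
    content: Any
    message_type: str  # "request", "response", "broadcast"


class MessageBus:
    """Queue-per-agent bus, repeated here so the sketch runs on its own."""

    def __init__(self) -> None:
        self.queues: dict[str, asyncio.Queue] = {}

    def _queue_for(self, agent_id: str) -> asyncio.Queue:
        # Create the inbox lazily the first time an agent is mentioned.
        if agent_id not in self.queues:
            self.queues[agent_id] = asyncio.Queue()
        return self.queues[agent_id]

    async def send(self, message: AgentMessage) -> None:
        await self._queue_for(message.receiver).put(message)

    async def receive(self, agent_id: str) -> AgentMessage:
        return await self._queue_for(agent_id).get()


async def researcher(bus: MessageBus) -> None:
    """Hypothetical agent: waits for one request and replies to its sender."""
    request = await bus.receive("researcher")
    await bus.send(
        AgentMessage(
            sender="researcher",
            receiver=request.sender,
            content=f"notes on {request.content}",
            message_type="response",
        )
    )


async def run_round_trip() -> AgentMessage:
    bus = MessageBus()
    worker = asyncio.create_task(researcher(bus))
    await bus.send(AgentMessage("writer", "researcher", "vector databases", "request"))
    response = await bus.receive("writer")
    await worker  # surface any exception raised inside the researcher task
    return response


if __name__ == "__main__":
    print(asyncio.run(run_round_trip()).content)
```

Neither agent holds a reference to the other; each knows only an agent ID and the bus. That indirection is what keeps the coupling low.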

2. Shared State

Agents read and write to a common memory store.

# Shared state with thread-safe access
import threading
import time
from typing import Any

class SharedState:
    def __init__(self):
        self._state = {}
        self._lock = threading.RLock()

    def read(self, key: str) -> Any:
        # Returns the stored record (value, updated_by, timestamp), or None if unset
        with self._lock:
            return self._state.get(key)

    def write(self, key: str, value: Any, agent_id: str):
        with self._lock:
            self._state[key] = {
                "value": value,
                "updated_by": agent_id,
                "timestamp": time.time()
            }

    def get_snapshot(self) -> dict:
        # Shallow copy: the top-level dict is new, but each record is still shared
        with self._lock:
            return dict(self._state)

Best for: Real-time collaboration, simple coordination, small agent teams
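
Locking each read and each write separately does not make a read-then-write sequence atomic: two agents can read the same old value and each write back their own update, losing one of them. A minimal sketch of one way around this is below. It assumes a hypothetical `update` method that holds the lock across both steps. `CounterState` and `run_agents` are illustrative names and not part of the lesson's `SharedState` class.

```python
import threading
from typing import Any, Callable


class CounterState:
    """Hypothetical minimal store with an atomic read-modify-write helper."""

    def __init__(self) -> None:
        self._state: dict[str, Any] = {}
        self._lock = threading.RLock()

    def read(self, key: str, default: Any = None) -> Any:
        with self._lock:
            return self._state.get(key, default)

    def update(self, key: str, fn: Callable[[Any], Any], default: Any = None) -> Any:
        # Hold the lock across the read AND the write so no other agent
        # can modify the key between the two steps.
        with self._lock:
            new_value = fn(self._state.get(key, default))
            self._state[key] = new_value
            return new_value


def run_agents(n_agents: int = 8, increments: int = 1000) -> int:
    """Start n_agents threads that each bump a shared counter."""
    state = CounterState()

    def agent() -> None:
        for _ in range(increments):
            state.update("tasks_done", lambda v: v + 1, default=0)

    threads = [threading.Thread(target=agent) for _ in range(n_agents)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state.read("tasks_done")


if __name__ == "__main__":
    print(run_agents())
```

Because every increment happens inside the lock, the final count always equals `n_agents * increments`.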

3. Event-Driven

Agents publish events; interested agents subscribe and react.

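# Event-driven communication
from typing import Any, Callable

class EventBus:
    def __init__(self):
        # Maps each event type to the handlers registered for it
        self.subscribers: dict[str, list[Callable[[Any], None]]] = {}

    def subscribe(self, event_type: str, handler: Callable[[Any], None]):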
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)

    def publish(self, event_type: str, data: Any):
        for handler in self.subscribers.get(event_type, []):
            handler(data)

# Usage (writer_agent and reviewer_agent are agent objects created elsewhere)
bus = EventBus()
bus.subscribe("research_complete", writer_agent.on_research_ready)
bus.subscribe("draft_ready", reviewer_agent.on_draft_ready)

Best for: Reactive workflows, decoupled systems, scalable architectures
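
The following sketch shows a full event chain running end to end. The bus is repeated so the example is self-contained; `WriterAgent`, `ReviewerAgent`, and `run_pipeline` are hypothetical stand-ins for real agents, written only to show how one event can trigger the next.

```python
from typing import Any, Callable

Handler = Callable[[Any], None]


class EventBus:
    def __init__(self) -> None:
        self.subscribers: dict[str, list[Handler]] = {}

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self.subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event_type: str, data: Any) -> None:
        # Iterate over a copy so handlers may subscribe during dispatch.
        for handler in list(self.subscribers.get(event_type, [])):
            handler(data)


class WriterAgent:
    """Hypothetical agent: turns research notes into a draft."""

    def __init__(self, bus: EventBus) -> None:
        self.bus = bus

    def on_research_ready(self, notes: str) -> None:
        self.bus.publish("draft_ready", f"Draft based on: {notes}")


class ReviewerAgent:
    """Hypothetical agent: records every draft it is asked to review."""

    def __init__(self) -> None:
        self.reviewed: list[str] = []

    def on_draft_ready(self, draft: str) -> None:
        self.reviewed.append(draft)


def run_pipeline(notes: str) -> list[str]:
    bus = EventBus()
    writer = WriterAgent(bus)
    reviewer = ReviewerAgent()
    bus.subscribe("research_complete", writer.on_research_ready)
    bus.subscribe("draft_ready", reviewer.on_draft_ready)
    bus.publish("research_complete", notes)
    return reviewer.reviewed


if __name__ == "__main__":
    print(run_pipeline("three sources on vector databases"))
```

In this sketch `publish` is synchronous, so handlers run inside the publisher's call stack and a slow or failing handler blocks or breaks the publisher. A production bus would likely dispatch through a queue or task pool instead.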

Pattern Comparison

Pattern          Latency  Coupling  Debugging    Scale
Message Passing  Medium   Low       Easy (logs)  High
Shared State     Low      High      Hard         Low
Event-Driven     Low      Low       Medium       High

Hybrid Approaches

Production systems often combine patterns:

  • Shared state for current task context
  • Message passing for agent-to-agent requests
  • Events for system-wide notifications
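
As a rough illustration of the combination listed above, the sketch below puts all three mechanisms behind one object. The `Hub` class, its method names, and the `demo` function are assumptions made for this example, not an API from any framework. It uses the synchronous `queue.Queue` from the standard library to stay short.

```python
import queue
import threading
from typing import Any, Callable


class Hub:
    """Hypothetical coordinator combining the three patterns."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._context: dict[str, Any] = {}           # shared state
        self._inboxes: dict[str, queue.Queue] = {}   # message passing
        self._subscribers: dict[str, list[Callable[[Any], None]]] = {}  # events

    # Shared state: current task context
    def set_context(self, key: str, value: Any) -> None:
        with self._lock:
            self._context[key] = value

    def get_context(self, key: str) -> Any:
        with self._lock:
            return self._context.get(key)

    # Message passing: agent-to-agent requests
    def _inbox(self, agent_id: str) -> queue.Queue:
        with self._lock:
            return self._inboxes.setdefault(agent_id, queue.Queue())

    def send(self, receiver: str, content: Any) -> None:
        self._inbox(receiver).put(content)

    def receive(self, agent_id: str) -> Any:
        # Raises queue.Empty if nothing is waiting.
        return self._inbox(agent_id).get_nowait()

    # Events: system-wide notifications
    def subscribe(self, event_type: str, handler: Callable[[Any], None]) -> None:
        with self._lock:
            self._subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event_type: str, data: Any) -> None:
        with self._lock:
            handlers = list(self._subscribers.get(event_type, []))
        for handler in handlers:  # call handlers outside the lock
            handler(data)


def demo() -> list[str]:
    hub = Hub()
    log: list[str] = []
    hub.subscribe("task_done", lambda data: log.append(f"notified: {data}"))
    hub.set_context("topic", "vector databases")
    hub.send("researcher", "summarize")
    request = hub.receive("researcher")
    topic = hub.get_context("topic")
    log.append(f"researcher handled '{request}' for '{topic}'")
    hub.publish("task_done", topic)
    return log


if __name__ == "__main__":
    print(demo())
```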

Nerd Note: Many multi-agent bugs surface at communication boundaries, where one agent's output becomes another agent's input. Log every message in development.
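
One way to follow that advice is to log inside the bus itself, so no agent can send or receive without leaving a trace. The sketch below assumes a queue-based bus like the message-passing example and uses Python's standard `logging` module. `LoggedMessageBus`, the logger name, and `demo` are illustrative choices.

```python
import asyncio
import logging
from dataclasses import dataclass
from typing import Any

logger = logging.getLogger("agents.messages")  # logger name is an arbitrary choice


@dataclass
class AgentMessage:
    sender: str
    receiver: str
    content: Any
    message_type: str  # "request", "response", "broadcast"


class LoggedMessageBus:
    """Hypothetical bus variant that logs every send and receive."""

    def __init__(self) -> None:
        self.queues: dict[str, asyncio.Queue] = {}

    def _queue_for(self, agent_id: str) -> asyncio.Queue:
        if agent_id not in self.queues:
            self.queues[agent_id] = asyncio.Queue()
        return self.queues[agent_id]

    async def send(self, message: AgentMessage) -> None:
        logger.debug(
            "SEND %s -> %s [%s]: %r",
            message.sender, message.receiver, message.message_type, message.content,
        )
        await self._queue_for(message.receiver).put(message)

    async def receive(self, agent_id: str) -> AgentMessage:
        message = await self._queue_for(agent_id).get()
        logger.debug("RECV %s <- %s [%s]", agent_id, message.sender, message.message_type)
        return message


async def demo() -> AgentMessage:
    bus = LoggedMessageBus()
    await bus.send(AgentMessage("writer", "reviewer", "draft v1", "request"))
    return await bus.receive("reviewer")


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)  # show debug output in development
    asyncio.run(demo())
```

Logging at `DEBUG` level keeps the trace available in development while letting production configurations filter it out.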

Next: How to coordinate these communicating agents.