Multi-Agent Architecture
Orchestration Strategies
How do you coordinate multiple agents? The orchestration strategy determines who decides what happens next and how work flows through the system.
Three Main Approaches
1. Hierarchical (Manager-Workers)
A central manager agent assigns tasks and coordinates workers.
# Hierarchical orchestration with CrewAI
from crewai import Crew, Process

# manager, researcher, writer, and reviewer are Agent instances and the
# *_task objects are Task instances, all defined elsewhere.
crew = Crew(
    agents=[researcher, writer, reviewer],  # workers only; the manager is passed separately
    tasks=[research_task, write_task, review_task],
    process=Process.hierarchical,  # Manager controls flow
    manager_agent=manager,
)

# The manager decides:
# - Which agent handles each task
# - When to move to the next task
# - How to handle failures
Pros: Clear accountability, predictable flow, easy to debug
Cons: Manager bottleneck, single point of failure
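To make the manager's three decisions concrete without any framework, here is a minimal plain-Python sketch. The `Manager` class, its `workers` mapping, and `max_retries` are hypothetical names chosen for illustration; this is not how CrewAI implements its manager.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Manager:
    # Hypothetical mapping: task type -> worker function that handles it
    workers: dict[str, Callable[[str], str]]
    max_retries: int = 1  # assumed retry budget per task
    log: list[str] = field(default_factory=list)

    def run(self, tasks: list[tuple[str, str]]) -> list[str]:
        results = []
        for task_type, payload in tasks:        # manager decides when to move on
            worker = self.workers[task_type]    # manager decides who handles the task
            for _ in range(self.max_retries + 1):
                try:
                    results.append(worker(payload))
                    self.log.append(f"{task_type}: ok")
                    break
                except Exception as exc:        # manager decides how to handle failures
                    self.log.append(f"{task_type}: failed ({exc})")
            else:
                # Retries exhausted: record a placeholder and continue
                results.append(f"[{task_type} skipped]")
        return results
```

Because every assignment and failure passes through one place, `log` gives a single ordered trace of the run, which is where the "easy to debug" advantage comes from. The same central loop is also the bottleneck: nothing proceeds unless the manager does.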
2. Peer-to-Peer (Collaborative)
Agents communicate directly without a central coordinator.
# Peer-to-peer with direct handoffs
class PeerAgent:
    def __init__(self, name: str, peers: dict):
        self.name = name
        self.peers = peers  # name -> agent instance

    async def do_work(self, task: dict) -> dict:
        # Agent-specific logic goes here; subclasses return a result dict
        raise NotImplementedError

    async def process(self, task: dict) -> dict:
        result = await self.do_work(task)
        # Decide next peer based on result; a missing flag counts as False
        if result.get("needs_review"):
            return await self.peers["reviewer"].process(result)
        elif result.get("needs_more_data"):
            return await self.peers["researcher"].process(result)
        else:
            return result
Pros: No bottleneck, resilient, flexible
Cons: Complex coordination, harder to trace, potential loops
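Two of these drawbacks, tracing and loops, can be softened by carrying a handoff trace inside the task itself. The sketch below is one possible approach under stated assumptions: `GuardedPeer`, the `route` callback, and the `MAX_HOPS` value are all hypothetical and not part of any library.

```python
import asyncio

MAX_HOPS = 5  # assumed limit; tune for your workflow


class GuardedPeer:
    def __init__(self, name: str, peers: dict, route):
        self.name = name
        self.peers = peers  # shared dict: name -> GuardedPeer
        self.route = route  # hypothetical callback: task -> next peer name, or None

    async def process(self, task: dict) -> dict:
        # Append this agent to the trace carried along with the task
        trace = task.get("trace", []) + [self.name]
        if len(trace) > MAX_HOPS:
            raise RuntimeError(f"handoff loop suspected: {' -> '.join(trace)}")
        task = {**task, "trace": trace}

        next_peer = self.route(task)
        if next_peer is None:
            return task  # no further handoff; work is finished
        return await self.peers[next_peer].process(task)


# Usage: writer hands off to reviewer, reviewer finishes
peers: dict = {}
peers["writer"] = GuardedPeer("writer", peers, lambda t: "reviewer")
peers["reviewer"] = GuardedPeer("reviewer", peers, lambda t: None)
result = asyncio.run(peers["writer"].process({"text": "draft"}))
```

The returned task records which agents touched it, and two peers that keep bouncing work between each other fail with an error instead of running forever.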
3. Swarm (Emergent Behavior)
Agents act independently following simple rules; complex behavior emerges.
# Swarm-like orchestration
import asyncio

class SwarmAgent:
    # TaskBoard is a shared task queue assumed to be defined elsewhere
    def __init__(self, role: str, shared_board: "TaskBoard"):
        self.role = role
        self.board = shared_board

    async def run(self):
        while True:
            # Each agent picks tasks matching their role
            task = await self.board.claim_task(
                filter=lambda t: t.type == self.role
            )
            if not task:
                # Nothing to claim right now; back off before polling again
                await asyncio.sleep(1)
                continue
            # self.process is the agent's own task handler (not shown)
            result = await self.process(task)
            # Post results or new tasks to board
            if result.creates_new_tasks:
                for new_task in result.new_tasks:
                    await self.board.post_task(new_task)
            await self.board.complete_task(task.id, result)
Pros: Highly scalable, self-organizing, fault tolerant
Cons: Unpredictable, hard to reason about, debugging nightmare
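The swarm code relies on a `TaskBoard` that the lesson never defines. A minimal in-memory sketch, assuming a single event loop and the method names used above (`claim_task`, `post_task`, `complete_task`, `all_complete`), might look like this. The `Task` dataclass and the `results` attribute are illustrative assumptions, and `get_status()` with stuck-task detection is left out.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Task:
    id: int
    type: str
    payload: Any = None


class TaskBoard:
    """In-memory board. The methods contain no await points, so each call
    runs atomically on a single event loop and no lock is needed."""

    def __init__(self) -> None:
        self._open: list[Task] = []
        self._claimed: dict[int, Task] = {}
        self.results: dict[int, Any] = {}

    async def post_task(self, task: Task) -> None:
        self._open.append(task)

    async def claim_task(self, filter: Callable[[Task], bool]) -> Optional[Task]:
        # Hand out the first open task the caller's filter accepts
        for i, task in enumerate(self._open):
            if filter(task):
                self._claimed[task.id] = self._open.pop(i)
                return task
        return None

    async def complete_task(self, task_id: int, result: Any) -> None:
        self._claimed.pop(task_id)
        self.results[task_id] = result

    def all_complete(self) -> bool:
        return not self._open and not self._claimed
```

A claimed task leaves the open list immediately, so two agents with the same role cannot pick up the same task. A board shared across processes or machines would need a real queue or database with atomic claims instead.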
Choosing Your Strategy
| Scenario | Best Strategy |
|---|---|
| Well-defined workflow | Hierarchical |
| Dynamic, changing tasks | Peer-to-Peer |
| Massive parallelism | Swarm |
| Need audit trail | Hierarchical |
| Real-time adaptation | Peer-to-Peer |
| Research/exploration | Swarm |
Hybrid Example: Supervised Swarm
# Supervisor + swarm workers
import asyncio

class SupervisedSwarm:
    def __init__(self, supervisor: "Agent", workers: list["SwarmAgent"], board: "TaskBoard"):
        self.supervisor = supervisor
        self.workers = workers
        self.board = board  # must be the same board the workers were created with

    async def execute(self, goal: str):
        # Supervisor breaks down goal into tasks
        tasks = await self.supervisor.decompose(goal)
        for task in tasks:
            await self.board.post_task(task)
        # Workers swarm on tasks; create_task schedules each run() loop
        worker_tasks = [asyncio.create_task(w.run()) for w in self.workers]
        try:
            # Supervisor monitors and intervenes if needed
            while not self.board.all_complete():
                status = self.board.get_status()
                if status.stuck_tasks:
                    await self.supervisor.unstick(status.stuck_tasks)
                await asyncio.sleep(5)
        finally:
            # run() loops forever, so stop the workers once the board is done
            for t in worker_tasks:
                t.cancel()
Nerd Note: Start hierarchical, evolve to peer-to-peer as you understand your domain. Swarm only for truly parallelizable problems.
Next: Microsoft AutoGen's unique approach to multi-agent conversations.