Lesson 3 of 20

Multi-Agent Architecture

Orchestration Strategies

4 min read

How do you coordinate multiple agents? The orchestration strategy determines who decides what happens next and how work flows through the system.

Three Main Approaches

1. Hierarchical (Manager-Workers)

A central manager agent assigns tasks and coordinates workers.

# Hierarchical orchestration with CrewAI
from crewai import Crew, Process

crew = Crew(
    agents=[manager, researcher, writer, reviewer],
    tasks=[research_task, write_task, review_task],
    process=Process.hierarchical,  # Manager controls flow
    manager_agent=manager
)

# The manager decides:
# - Which agent handles each task
# - When to move to next task
# - How to handle failures

Pros: Clear accountability, predictable flow, easy to debug Cons: Manager bottleneck, single point of failure

2. Peer-to-Peer (Collaborative)

Agents communicate directly without a central coordinator.

# Peer-to-peer with direct handoffs
class PeerAgent:
    def __init__(self, name: str, peers: dict):
        self.name = name
        self.peers = peers  # name -> agent instance

    async def process(self, task: dict) -> dict:
        result = await self.do_work(task)

        # Decide next peer based on result
        if result["needs_review"]:
            return await self.peers["reviewer"].process(result)
        elif result["needs_more_data"]:
            return await self.peers["researcher"].process(result)
        else:
            return result

Pros: No bottleneck, resilient, flexible Cons: Complex coordination, harder to trace, potential loops

3. Swarm (Emergent Behavior)

Agents act independently following simple rules; complex behavior emerges.

# Swarm-like orchestration
class SwarmAgent:
    def __init__(self, role: str, shared_board: TaskBoard):
        self.role = role
        self.board = shared_board

    async def run(self):
        while True:
            # Each agent picks tasks matching their role
            task = await self.board.claim_task(
                filter=lambda t: t.type == self.role
            )
            if not task:
                await asyncio.sleep(1)
                continue

            result = await self.process(task)

            # Post results or new tasks to board
            if result.creates_new_tasks:
                for new_task in result.new_tasks:
                    await self.board.post_task(new_task)

            await self.board.complete_task(task.id, result)

Pros: Highly scalable, self-organizing, fault tolerant Cons: Unpredictable, hard to reason about, debugging nightmare

Choosing Your Strategy

Scenario Best Strategy
Well-defined workflow Hierarchical
Dynamic, changing tasks Peer-to-Peer
Massive parallelism Swarm
Need audit trail Hierarchical
Real-time adaptation Peer-to-Peer
Research/exploration Swarm

Hybrid Example: Supervised Swarm

# Supervisor + swarm workers
class SupervisedSwarm:
    def __init__(self, supervisor: Agent, workers: list[SwarmAgent]):
        self.supervisor = supervisor
        self.workers = workers
        self.board = TaskBoard()

    async def execute(self, goal: str):
        # Supervisor breaks down goal into tasks
        tasks = await self.supervisor.decompose(goal)
        for task in tasks:
            await self.board.post_task(task)

        # Workers swarm on tasks
        worker_tasks = [w.run() for w in self.workers]

        # Supervisor monitors and intervenes if needed
        while not self.board.all_complete():
            status = self.board.get_status()
            if status.stuck_tasks:
                await self.supervisor.unstick(status.stuck_tasks)
            await asyncio.sleep(5)

Nerd Note: Start hierarchical, evolve to peer-to-peer as you understand your domain. Swarm only for truly parallelizable problems.

Next: Microsoft AutoGen's unique approach to multi-agent conversations. :::

Quiz

Module 1: Multi-Agent Architecture

Take Quiz