Multi-Agent Systems with LangGraph

Inter-Agent Communication

5 min read

In multi-agent systems, how agents share information determines system effectiveness. LangGraph uses state-based communication rather than direct messaging--agents read from and write to shared state fields. This lesson covers communication patterns from simple message passing to complex coordination protocols.


Why State-Based Communication?

Traditional multi-agent systems use message queues or direct RPC calls. LangGraph takes a different approach: all communication happens through state.

# Traditional approach (NOT how LangGraph works)
# agent_a.send_message(agent_b, data)  # Direct messaging

# LangGraph approach: Communication through state
class TeamState(TypedDict):
    """Shared state is the communication medium."""
    # Task context - read by all agents
    task: str

    # Message board - agents append messages here
    messages: Annotated[list[dict], operator.add]

    # Agent outputs - written by specific agents
    research_output: str
    analysis_output: str
    final_report: str

Benefits of state-based communication:

Benefit Explanation
Automatic persistence Messages saved with checkpoints
Time-travel debugging Replay any point in conversation
Deterministic replay Same state produces same behavior
No message loss State is always consistent
Simple testing Mock state, not message queues

Message Board Pattern

The most common pattern is a shared message board where agents post and read messages.

from typing import TypedDict, Annotated, Literal
from datetime import datetime
import operator
import uuid
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END

# ============================================================
# STATE DEFINITION WITH MESSAGE BOARD
# ============================================================

class Message(TypedDict):
    """Structured message format for agent communication."""
    id: str                    # Unique message ID
    from_agent: str            # Sender agent name
    to_agent: str              # Recipient ("all" for broadcast)
    message_type: str          # Category: finding, request, response, feedback
    content: str               # Message content
    timestamp: str             # ISO format timestamp
    metadata: dict             # Additional context


class CollaborativeState(TypedDict):
    """State with shared message board for agent communication."""
    # Input
    task: str

    # Message board - the communication channel
    messages: Annotated[list[Message], operator.add]

    # Agent outputs
    research_findings: str
    analysis_result: str
    final_output: str

    # Workflow control
    iteration: int
    max_iterations: int


# ============================================================
# HELPER FUNCTIONS
# ============================================================

def create_message(
    from_agent: str,
    to_agent: str,
    message_type: str,
    content: str,
    metadata: dict = None
) -> Message:
    """Create a standardized message."""
    return {
        "id": str(uuid.uuid4()),
        "from_agent": from_agent,
        "to_agent": to_agent,
        "message_type": message_type,
        "content": content,
        "timestamp": datetime.now().isoformat(),
        "metadata": metadata or {}
    }


def get_messages_for_agent(
    messages: list[Message],
    agent_name: str,
    message_type: str = None
) -> list[Message]:
    """Filter messages addressed to a specific agent."""
    filtered = [
        m for m in messages
        if m["to_agent"] in [agent_name, "all"]
    ]

    if message_type:
        filtered = [m for m in filtered if m["message_type"] == message_type]

    return filtered


def get_latest_message(
    messages: list[Message],
    from_agent: str = None,
    message_type: str = None
) -> Message | None:
    """Get the most recent message matching criteria."""
    filtered = messages

    if from_agent:
        filtered = [m for m in filtered if m["from_agent"] == from_agent]

    if message_type:
        filtered = [m for m in filtered if m["message_type"] == message_type]

    return filtered[-1] if filtered else None


# ============================================================
# AGENT IMPLEMENTATIONS
# ============================================================

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


def researcher_agent(state: CollaborativeState) -> dict:
    """
    Researcher agent that:
    1. Reads task from state
    2. Checks for feedback from supervisor
    3. Posts findings to message board
    """
    task = state["task"]
    messages = state.get("messages", [])

    # Check for feedback requiring revision
    feedback = get_messages_for_agent(messages, "researcher", "feedback")

    if feedback:
        latest_feedback = feedback[-1]
        prompt = f"""You are a research agent. You received feedback on your previous research.

Task: {task}

Previous findings: {state.get('research_findings', 'None')}

Feedback: {latest_feedback['content']}

Please revise your research based on the feedback. Be thorough and address all points."""
    else:
        prompt = f"""You are a research agent. Research the following task thoroughly.

Task: {task}

Provide detailed findings with:
1. Key facts and data
2. Sources and evidence
3. Important considerations"""

    response = llm.invoke(prompt)
    findings = response.content

    # Post findings to message board
    finding_message = create_message(
        from_agent="researcher",
        to_agent="all",  # Broadcast to all agents
        message_type="finding",
        content=findings,
        metadata={
            "iteration": state.get("iteration", 1),
            "is_revision": bool(feedback)
        }
    )

    return {
        "research_findings": findings,
        "messages": [finding_message]
    }


def analyst_agent(state: CollaborativeState) -> dict:
    """
    Analyst agent that:
    1. Reads research findings from messages
    2. Checks for specific requests
    3. Posts analysis to message board
    """
    messages = state.get("messages", [])

    # Get research findings from message board
    research_messages = [
        m for m in messages
        if m["from_agent"] == "researcher" and m["message_type"] == "finding"
    ]

    if not research_messages:
        # No research to analyze yet
        return {
            "messages": [create_message(
                from_agent="analyst",
                to_agent="supervisor",
                message_type="status",
                content="Waiting for research findings"
            )]
        }

    # Get latest research
    latest_research = research_messages[-1]["content"]

    # Check for specific analysis requests
    requests = get_messages_for_agent(messages, "analyst", "request")

    if requests:
        latest_request = requests[-1]
        prompt = f"""You are an analysis agent. You have a specific request to address.

Research findings:
{latest_research}

Request: {latest_request['content']}

Provide a focused analysis addressing the specific request."""
    else:
        prompt = f"""You are an analysis agent. Analyze the following research findings.

Research findings:
{latest_research}

Provide:
1. Key insights and patterns
2. Strengths and weaknesses
3. Actionable recommendations"""

    response = llm.invoke(prompt)
    analysis = response.content

    # Post analysis to message board
    analysis_message = create_message(
        from_agent="analyst",
        to_agent="all",
        message_type="analysis",
        content=analysis,
        metadata={"based_on_research_id": research_messages[-1]["id"]}
    )

    return {
        "analysis_result": analysis,
        "messages": [analysis_message]
    }


def supervisor_agent(state: CollaborativeState) -> dict:
    """
    Supervisor that coordinates by reading and posting messages.
    Does NOT call agents directly--uses messages for coordination.
    """
    messages = state.get("messages", [])
    iteration = state.get("iteration", 1)
    max_iterations = state.get("max_iterations", 3)

    # Gather outputs from message board
    research_msgs = [m for m in messages if m["message_type"] == "finding"]
    analysis_msgs = [m for m in messages if m["message_type"] == "analysis"]

    # Build context for quality assessment
    context = f"""Task: {state['task']}

Research findings: {research_msgs[-1]['content'] if research_msgs else 'None'}

Analysis: {analysis_msgs[-1]['content'] if analysis_msgs else 'None'}

Iteration: {iteration}/{max_iterations}"""

    quality_prompt = f"""You are a supervisor reviewing team output.

{context}

Evaluate:
1. Is the research thorough and well-sourced? (1-10)
2. Is the analysis insightful and actionable? (1-10)
3. Are there gaps that need addressing?

If quality is below 7/10 in any area, specify what needs improvement.
If quality is good (7+/10), say "APPROVED".

Format: APPROVED or NEEDS_REVISION: [specific feedback]"""

    response = llm.invoke(quality_prompt)
    evaluation = response.content

    output_messages = []

    if "APPROVED" in evaluation.upper() or iteration >= max_iterations:
        # Quality approved or max iterations reached
        output_messages.append(create_message(
            from_agent="supervisor",
            to_agent="writer",
            message_type="request",
            content="Please compile the final report",
            metadata={"approved": True}
        ))

        return {
            "messages": output_messages,
            "iteration": iteration
        }
    else:
        # Send feedback for revision
        if "research" in evaluation.lower():
            output_messages.append(create_message(
                from_agent="supervisor",
                to_agent="researcher",
                message_type="feedback",
                content=evaluation
            ))

        if "analysis" in evaluation.lower():
            output_messages.append(create_message(
                from_agent="supervisor",
                to_agent="analyst",
                message_type="feedback",
                content=evaluation
            ))

        return {
            "messages": output_messages,
            "iteration": iteration + 1
        }

Request-Response Pattern

For direct agent-to-agent communication, use the request-response pattern with correlation IDs.

from typing import TypedDict, Annotated
import operator
import uuid


class RequestResponseState(TypedDict):
    """State supporting request-response communication."""
    task: str
    messages: Annotated[list[dict], operator.add]
    pending_requests: list[str]  # Track awaited responses
    completed_responses: dict    # Map request_id -> response


def data_requester_agent(state: RequestResponseState) -> dict:
    """
    Agent that requests data from another agent.
    Uses correlation IDs to track request-response pairs.
    """
    request_id = str(uuid.uuid4())

    request_message = {
        "id": request_id,
        "from_agent": "requester",
        "to_agent": "data_provider",
        "message_type": "data_request",
        "content": {
            "query": "Get sales data for Q4 2025",
            "format": "json",
            "filters": {"region": "EMEA"}
        },
        "expects_response": True,
        "timestamp": datetime.now().isoformat()
    }

    return {
        "messages": [request_message],
        "pending_requests": [request_id]
    }


def data_provider_agent(state: RequestResponseState) -> dict:
    """
    Agent that responds to data requests.
    Links response to original request via response_to field.
    """
    messages = state.get("messages", [])

    # Find requests addressed to this agent
    my_requests = [
        m for m in messages
        if m["to_agent"] == "data_provider"
        and m["message_type"] == "data_request"
    ]

    response_messages = []

    for request in my_requests:
        # Check if we already responded
        existing_responses = [
            m for m in messages
            if m.get("response_to") == request["id"]
        ]

        if existing_responses:
            continue  # Already responded

        # Process the request
        query = request["content"]["query"]

        # Simulate data retrieval
        data = {
            "query": query,
            "results": [
                {"month": "October", "sales": 150000},
                {"month": "November", "sales": 175000},
                {"month": "December", "sales": 220000}
            ],
            "total": 545000
        }

        response_messages.append({
            "id": str(uuid.uuid4()),
            "from_agent": "data_provider",
            "to_agent": request["from_agent"],  # Reply to requester
            "message_type": "data_response",
            "response_to": request["id"],  # Correlation ID
            "content": data,
            "timestamp": datetime.now().isoformat()
        })

    return {"messages": response_messages}


def process_responses_agent(state: RequestResponseState) -> dict:
    """
    Agent that processes received responses.
    Matches responses to pending requests.
    """
    messages = state.get("messages", [])
    pending = state.get("pending_requests", [])
    completed = state.get("completed_responses", {})

    # Find responses to our requests
    responses = [
        m for m in messages
        if m["message_type"] == "data_response"
        and m.get("response_to") in pending
    ]

    new_completed = dict(completed)
    new_pending = list(pending)

    for response in responses:
        request_id = response["response_to"]
        new_completed[request_id] = response["content"]

        if request_id in new_pending:
            new_pending.remove(request_id)

    return {
        "completed_responses": new_completed,
        "pending_requests": new_pending
    }

Pub-Sub Pattern with Topics

For complex systems, implement topic-based messaging where agents subscribe to specific topics.

from typing import TypedDict, Annotated, Literal
import operator


class TopicMessage(TypedDict):
    """Message with topic for pub-sub pattern."""
    id: str
    from_agent: str
    topic: str  # e.g., "research.findings", "analysis.complete"
    content: str
    timestamp: str


class PubSubState(TypedDict):
    """State for pub-sub communication pattern."""
    task: str
    messages: Annotated[list[TopicMessage], operator.add]

    # Agent subscriptions (defined at design time)
    # In practice, this is often implicit in agent logic


# Topic hierarchy examples:
# research.findings      - New research discovered
# research.revision      - Research needs revision
# analysis.complete      - Analysis finished
# analysis.needs_data    - Analysis requests more data
# quality.approved       - Quality check passed
# quality.rejected       - Quality check failed
# workflow.complete      - Entire workflow done


def publish_message(
    from_agent: str,
    topic: str,
    content: str
) -> TopicMessage:
    """Create a topic-based message."""
    return {
        "id": str(uuid.uuid4()),
        "from_agent": from_agent,
        "topic": topic,
        "content": content,
        "timestamp": datetime.now().isoformat()
    }


def subscribe_to_topics(
    messages: list[TopicMessage],
    topics: list[str]
) -> list[TopicMessage]:
    """
    Get messages matching subscribed topics.
    Supports wildcards: "research.*" matches all research topics.
    """
    matched = []

    for message in messages:
        msg_topic = message["topic"]

        for pattern in topics:
            if pattern.endswith(".*"):
                # Wildcard match
                prefix = pattern[:-2]
                if msg_topic.startswith(prefix):
                    matched.append(message)
                    break
            elif msg_topic == pattern:
                matched.append(message)
                break

    return matched


def research_agent_pubsub(state: PubSubState) -> dict:
    """Research agent using pub-sub pattern."""
    messages = state.get("messages", [])

    # Subscribe to revision requests
    revisions = subscribe_to_topics(messages, ["research.revision"])

    if revisions:
        latest = revisions[-1]
        findings = f"Revised research based on: {latest['content']}"
    else:
        findings = f"Initial research on: {state['task']}"

    # Publish findings
    return {
        "messages": [
            publish_message(
                from_agent="researcher",
                topic="research.findings",
                content=findings
            )
        ]
    }


def analyst_agent_pubsub(state: PubSubState) -> dict:
    """Analyst agent subscribing to research topics."""
    messages = state.get("messages", [])

    # Subscribe to all research topics
    research_msgs = subscribe_to_topics(messages, ["research.*"])

    if not research_msgs:
        return {"messages": []}

    latest_research = research_msgs[-1]["content"]
    analysis = f"Analysis of: {latest_research}"

    return {
        "messages": [
            publish_message(
                from_agent="analyst",
                topic="analysis.complete",
                content=analysis
            )
        ]
    }


def quality_agent_pubsub(state: PubSubState) -> dict:
    """Quality agent subscribing to completion topics."""
    messages = state.get("messages", [])

    # Subscribe to completion events
    completions = subscribe_to_topics(
        messages,
        ["research.findings", "analysis.complete"]
    )

    if len(completions) < 2:
        return {"messages": []}  # Wait for both

    # Quality check
    all_good = True  # Simplified

    if all_good:
        return {
            "messages": [
                publish_message(
                    from_agent="quality",
                    topic="quality.approved",
                    content="All outputs meet quality standards"
                )
            ]
        }
    else:
        return {
            "messages": [
                publish_message(
                    from_agent="quality",
                    topic="research.revision",
                    content="Research needs more depth"
                )
            ]
        }

Blackboard Pattern

The blackboard pattern uses typed sections of state for different data categories, enabling structured collaboration.

from typing import TypedDict, Annotated
from dataclasses import dataclass
import operator


class BlackboardState(TypedDict):
    """
    Blackboard pattern: Structured sections for different data types.
    Agents read from sections relevant to them and write to their section.
    """
    # Input section
    problem: str
    constraints: list[str]

    # Hypothesis section (written by hypothesis agents)
    hypotheses: Annotated[list[dict], operator.add]

    # Evidence section (written by research agents)
    evidence: Annotated[list[dict], operator.add]

    # Evaluation section (written by evaluation agents)
    evaluations: Annotated[list[dict], operator.add]

    # Solution section (written by synthesis agent)
    current_solution: dict
    solution_history: Annotated[list[dict], operator.add]

    # Control section
    phase: Literal["hypothesize", "gather_evidence", "evaluate", "synthesize"]
    iteration: int


def hypothesis_agent(state: BlackboardState) -> dict:
    """
    Generates hypotheses based on problem and existing evidence.
    Reads: problem, constraints, evidence
    Writes: hypotheses
    """
    problem = state["problem"]
    constraints = state.get("constraints", [])
    evidence = state.get("evidence", [])

    # Generate hypothesis based on available information
    existing_hypotheses = state.get("hypotheses", [])

    prompt = f"""Generate a hypothesis for this problem:

Problem: {problem}
Constraints: {constraints}
Existing evidence: {[e['finding'] for e in evidence]}
Previous hypotheses: {[h['statement'] for h in existing_hypotheses]}

Provide a NEW hypothesis not already proposed."""

    response = llm.invoke(prompt)

    new_hypothesis = {
        "id": str(uuid.uuid4()),
        "statement": response.content,
        "confidence": 0.5,  # Initial confidence
        "supporting_evidence": [],
        "contradicting_evidence": [],
        "created_at": datetime.now().isoformat()
    }

    return {"hypotheses": [new_hypothesis]}


def evidence_gatherer_agent(state: BlackboardState) -> dict:
    """
    Gathers evidence related to current hypotheses.
    Reads: hypotheses, problem
    Writes: evidence
    """
    hypotheses = state.get("hypotheses", [])

    if not hypotheses:
        return {"evidence": []}

    # Focus on most recent hypothesis
    target_hypothesis = hypotheses[-1]

    prompt = f"""Find evidence related to this hypothesis:

Hypothesis: {target_hypothesis['statement']}

Provide specific evidence that either supports or contradicts this hypothesis."""

    response = llm.invoke(prompt)

    new_evidence = {
        "id": str(uuid.uuid4()),
        "finding": response.content,
        "related_hypothesis_id": target_hypothesis["id"],
        "type": "supporting",  # or "contradicting"
        "source": "research",
        "created_at": datetime.now().isoformat()
    }

    return {"evidence": [new_evidence]}


def evaluation_agent(state: BlackboardState) -> dict:
    """
    Evaluates hypotheses against collected evidence.
    Reads: hypotheses, evidence
    Writes: evaluations
    """
    hypotheses = state.get("hypotheses", [])
    evidence = state.get("evidence", [])

    evaluations = []

    for hypothesis in hypotheses:
        # Get evidence related to this hypothesis
        related_evidence = [
            e for e in evidence
            if e.get("related_hypothesis_id") == hypothesis["id"]
        ]

        supporting = [e for e in related_evidence if e["type"] == "supporting"]
        contradicting = [e for e in related_evidence if e["type"] == "contradicting"]

        # Calculate confidence
        if related_evidence:
            confidence = len(supporting) / len(related_evidence)
        else:
            confidence = 0.5

        evaluations.append({
            "hypothesis_id": hypothesis["id"],
            "confidence": confidence,
            "supporting_count": len(supporting),
            "contradicting_count": len(contradicting),
            "recommendation": "accept" if confidence > 0.7 else "reject" if confidence < 0.3 else "investigate",
            "evaluated_at": datetime.now().isoformat()
        })

    return {"evaluations": evaluations}


def synthesis_agent(state: BlackboardState) -> dict:
    """
    Synthesizes final solution from evaluated hypotheses.
    Reads: hypotheses, evaluations, evidence
    Writes: current_solution, solution_history
    """
    hypotheses = state.get("hypotheses", [])
    evaluations = state.get("evaluations", [])

    # Find accepted hypotheses
    accepted_ids = {
        e["hypothesis_id"] for e in evaluations
        if e["recommendation"] == "accept"
    }

    accepted_hypotheses = [
        h for h in hypotheses
        if h["id"] in accepted_ids
    ]

    if not accepted_hypotheses:
        return {
            "current_solution": {"status": "no_accepted_hypotheses"},
            "solution_history": []
        }

    solution = {
        "id": str(uuid.uuid4()),
        "accepted_hypotheses": [h["statement"] for h in accepted_hypotheses],
        "synthesis": f"Solution based on {len(accepted_hypotheses)} accepted hypotheses",
        "confidence": sum(
            e["confidence"] for e in evaluations
            if e["hypothesis_id"] in accepted_ids
        ) / len(accepted_ids),
        "created_at": datetime.now().isoformat()
    }

    return {
        "current_solution": solution,
        "solution_history": [solution]
    }

Consensus Protocol

When multiple agents need to agree on a decision, implement a consensus protocol.

from typing import TypedDict, Annotated, Literal
from collections import Counter
import operator


class ConsensusState(TypedDict):
    """State for multi-agent consensus."""
    proposal: str
    votes: Annotated[list[dict], operator.add]
    consensus_reached: bool
    final_decision: str
    round: int
    max_rounds: int


def voting_agent(
    agent_name: str,
    expertise: str
) -> callable:
    """
    Factory for voting agents with different expertise.
    Each agent votes based on their perspective.
    """
    def agent(state: ConsensusState) -> dict:
        proposal = state["proposal"]
        existing_votes = state.get("votes", [])
        current_round = state.get("round", 1)

        # Check if this agent already voted this round
        my_votes = [
            v for v in existing_votes
            if v["agent"] == agent_name and v["round"] == current_round
        ]

        if my_votes:
            return {"votes": []}  # Already voted

        # Make decision based on expertise
        prompt = f"""You are {agent_name}, an expert in {expertise}.

Proposal: {proposal}

Previous votes this round: {[
    f"{v['agent']}: {v['vote']} ({v['reasoning'][:50]}...)"
    for v in existing_votes
    if v['round'] == current_round
]}

Vote APPROVE, REJECT, or ABSTAIN with brief reasoning.
Format: VOTE: [your vote] REASON: [one sentence]"""

        response = llm.invoke(prompt)
        content = response.content

        # Parse vote
        if "APPROVE" in content.upper():
            vote = "approve"
        elif "REJECT" in content.upper():
            vote = "reject"
        else:
            vote = "abstain"

        return {
            "votes": [{
                "agent": agent_name,
                "vote": vote,
                "reasoning": content,
                "round": current_round,
                "timestamp": datetime.now().isoformat()
            }]
        }

    return agent


def consensus_checker(state: ConsensusState) -> dict:
    """
    Checks if consensus has been reached.
    Requires 2/3 majority for approval.
    """
    votes = state.get("votes", [])
    current_round = state.get("round", 1)
    max_rounds = state.get("max_rounds", 3)

    # Count votes for current round
    round_votes = [v for v in votes if v["round"] == current_round]
    vote_counts = Counter(v["vote"] for v in round_votes)

    total_voters = len(set(v["agent"] for v in round_votes))
    approve_count = vote_counts.get("approve", 0)
    reject_count = vote_counts.get("reject", 0)

    # Check for 2/3 majority
    threshold = total_voters * 2 / 3

    if approve_count >= threshold:
        return {
            "consensus_reached": True,
            "final_decision": "approved",
            "round": current_round
        }
    elif reject_count >= threshold:
        return {
            "consensus_reached": True,
            "final_decision": "rejected",
            "round": current_round
        }
    elif current_round >= max_rounds:
        # No consensus after max rounds
        return {
            "consensus_reached": True,
            "final_decision": "no_consensus",
            "round": current_round
        }
    else:
        # Need another round
        return {
            "consensus_reached": False,
            "round": current_round + 1
        }


# Build consensus workflow
def build_consensus_workflow(agent_configs: list[tuple[str, str]]):
    """
    Build a consensus workflow with multiple voting agents.

    Args:
        agent_configs: List of (agent_name, expertise) tuples
    """
    builder = StateGraph(ConsensusState)

    # Add voting agents
    for name, expertise in agent_configs:
        builder.add_node(f"voter_{name}", voting_agent(name, expertise))

    builder.add_node("check_consensus", consensus_checker)

    # Wire up: START -> all voters -> consensus check
    for name, _ in agent_configs:
        builder.add_edge(START, f"voter_{name}")
        builder.add_edge(f"voter_{name}", "check_consensus")

    # Consensus check routes back or ends
    def route_after_consensus(state: ConsensusState) -> str:
        if state.get("consensus_reached"):
            return END
        else:
            # Go back to first voter for new round
            return f"voter_{agent_configs[0][0]}"

    builder.add_conditional_edges(
        "check_consensus",
        route_after_consensus,
        {END: END, **{f"voter_{name}": f"voter_{name}" for name, _ in agent_configs}}
    )

    return builder.compile()


# Usage example
consensus_app = build_consensus_workflow([
    ("tech_lead", "technical feasibility"),
    ("product_manager", "business value"),
    ("security_expert", "security implications"),
])

result = consensus_app.invoke({
    "proposal": "Implement SSO authentication using OAuth 2.0",
    "votes": [],
    "consensus_reached": False,
    "final_decision": "",
    "round": 1,
    "max_rounds": 3
})

Production Pattern: Communication Audit Trail

In production systems, maintain an audit trail of all inter-agent communication.

from typing import TypedDict, Annotated
from datetime import datetime
import operator
import json


class AuditableMessage(TypedDict):
    """Message with full audit trail information."""
    id: str
    from_agent: str
    to_agent: str
    message_type: str
    content: str
    timestamp: str

    # Audit fields
    parent_message_id: str | None  # For threading
    conversation_id: str           # Group related messages
    step_number: int               # Position in workflow
    checkpoint_id: str | None      # Link to checkpoint


class AuditedState(TypedDict):
    """State with auditable communication."""
    task: str
    messages: Annotated[list[AuditableMessage], operator.add]

    # Audit metadata
    conversation_id: str
    step_count: int


def create_auditable_message(
    state: AuditedState,
    from_agent: str,
    to_agent: str,
    message_type: str,
    content: str,
    parent_message_id: str = None
) -> AuditableMessage:
    """Create message with audit trail."""
    return {
        "id": str(uuid.uuid4()),
        "from_agent": from_agent,
        "to_agent": to_agent,
        "message_type": message_type,
        "content": content,
        "timestamp": datetime.now().isoformat(),
        "parent_message_id": parent_message_id,
        "conversation_id": state.get("conversation_id", "unknown"),
        "step_number": state.get("step_count", 0),
        "checkpoint_id": None  # Set by checkpointer
    }


def audit_communication(
    messages: list[AuditableMessage],
    conversation_id: str
) -> dict:
    """
    Generate audit report for a conversation.
    Useful for debugging and compliance.
    """
    conv_messages = [
        m for m in messages
        if m["conversation_id"] == conversation_id
    ]

    # Sort by timestamp
    conv_messages.sort(key=lambda m: m["timestamp"])

    # Build communication graph
    agent_interactions = {}
    for msg in conv_messages:
        key = (msg["from_agent"], msg["to_agent"])
        if key not in agent_interactions:
            agent_interactions[key] = []
        agent_interactions[key].append(msg["message_type"])

    # Generate statistics
    return {
        "conversation_id": conversation_id,
        "total_messages": len(conv_messages),
        "agents_involved": list(set(
            m["from_agent"] for m in conv_messages
        ) | set(
            m["to_agent"] for m in conv_messages
        )),
        "message_types": dict(Counter(m["message_type"] for m in conv_messages)),
        "agent_interactions": {
            f"{k[0]}->{k[1]}": v
            for k, v in agent_interactions.items()
        },
        "timeline": [
            {
                "step": m["step_number"],
                "from": m["from_agent"],
                "to": m["to_agent"],
                "type": m["message_type"],
                "timestamp": m["timestamp"]
            }
            for m in conv_messages
        ],
        "first_message": conv_messages[0]["timestamp"] if conv_messages else None,
        "last_message": conv_messages[-1]["timestamp"] if conv_messages else None
    }


def export_communication_log(
    messages: list[AuditableMessage],
    filepath: str
):
    """Export communication log for analysis."""
    with open(filepath, "w") as f:
        json.dump(messages, f, indent=2, default=str)

Interview Questions

Q: How do agents communicate in LangGraph?

"Through shared state fields, not direct messaging. Agents write to state fields with reducers (like Annotated[list, operator.add]) that accumulate messages without overwriting. This provides automatic persistence, deterministic replay, and time-travel debugging. The message board pattern is most common--agents post to and read from a shared messages list."

Q: What is the difference between broadcast and directed messages?

"Broadcast messages use to_agent: 'all' and are read by all agents. Directed messages target a specific agent (to_agent: 'analyst') and are filtered by the recipient. Agents filter messages using list comprehensions: [m for m in messages if m['to_agent'] in [agent_name, 'all']]. This enables both team-wide announcements and private communication."

Q: How do you implement request-response patterns in LangGraph?

"Use correlation IDs. The requester creates a unique request_id, includes it in the message, and tracks it in a pending_requests list. The responder includes response_to: request_id in the response. The requester matches responses to pending requests by this ID. This maintains correlation in stateless execution."

Q: When would you use pub-sub vs direct messaging?

"Pub-sub is better for loosely coupled systems where agents don't know about each other--they publish to topics and subscribe to topics they care about. Direct messaging is better for tightly coupled workflows where agents have known relationships. Pub-sub scales better but adds complexity; direct messaging is simpler but creates tight coupling."

Q: How do you implement consensus among multiple agents?

"Create voting agents that each cast a vote with reasoning. A consensus checker tallies votes and determines if threshold is met (typically 2/3 majority). If not, increment round and continue voting. Track votes with round numbers to distinguish between voting rounds. Set max_rounds to prevent infinite loops."


Key Takeaways

  • State is the communication channel - Agents read from and write to shared state, not direct messages
  • Reducers enable accumulation - Annotated[list, operator.add] appends without overwriting
  • Message structure matters - Include from, to, type, content, and timestamps
  • Correlation IDs for request-response - Link requests to responses with unique IDs
  • Pub-sub for loose coupling - Topic-based messaging decouples agents
  • Blackboard for structured collaboration - Typed sections for different data categories
  • Consensus for group decisions - Voting rounds with thresholds and max rounds
  • Audit trails for production - Track conversation_id, step_number, parent_message_id

:::

Quiz

Module 4: Multi-Agent Systems with LangGraph

Take Quiz