Multi-Agent Systems with LangGraph
Inter-Agent Communication
In multi-agent systems, how agents share information determines system effectiveness. LangGraph uses state-based communication rather than direct messaging--agents read from and write to shared state fields. This lesson covers communication patterns from simple message passing to complex coordination protocols.
Why State-Based Communication?
Traditional multi-agent systems use message queues or direct RPC calls. LangGraph takes a different approach: all communication happens through state.
# Traditional approach (NOT how LangGraph works)
# agent_a.send_message(agent_b, data) # Direct messaging
# LangGraph approach: Communication through state
from typing import TypedDict, Annotated
import operator

class TeamState(TypedDict):
"""Shared state is the communication medium."""
# Task context - read by all agents
task: str
# Message board - agents append messages here
messages: Annotated[list[dict], operator.add]
# Agent outputs - written by specific agents
research_output: str
analysis_output: str
final_report: str
Benefits of state-based communication:
| Benefit | Explanation |
|---|---|
| Automatic persistence | Messages saved with checkpoints |
| Time-travel debugging | Replay any point in conversation |
| Deterministic replay | Same state produces same behavior |
| No message loss | State is always consistent |
| Simple testing | Mock state, not message queues (see the sketch below) |
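Because each agent is just a function of state, the testing benefit is concrete: you can unit test a node with a plain dict, no queue or broker required. The tiny node below is illustrative only, not part of this lesson's running example:
# Illustrative node used only to demonstrate testing against mocked state
def summarizer(state: dict) -> dict:
    """Toy node: reads the task and posts a one-line summary message."""
    return {
        "messages": [{
            "from_agent": "summarizer",
            "to_agent": "all",
            "content": f"Summary of: {state['task']}"
        }]
    }

def test_summarizer_posts_message():
    # No queues, no brokers--a plain dict stands in for shared state
    fake_state = {"task": "Evaluate vendor proposals", "messages": []}
    update = summarizer(fake_state)
    assert update["messages"][0]["to_agent"] == "all"
    assert "vendor proposals" in update["messages"][0]["content"]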
Message Board Pattern
The most common pattern is a shared message board where agents post and read messages.
from typing import TypedDict, Annotated, Literal
from datetime import datetime
import operator
import uuid
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
# ============================================================
# STATE DEFINITION WITH MESSAGE BOARD
# ============================================================
class Message(TypedDict):
"""Structured message format for agent communication."""
id: str # Unique message ID
from_agent: str # Sender agent name
to_agent: str # Recipient ("all" for broadcast)
message_type: str # Category: finding, request, response, feedback
content: str # Message content
timestamp: str # ISO format timestamp
metadata: dict # Additional context
class CollaborativeState(TypedDict):
"""State with shared message board for agent communication."""
# Input
task: str
# Message board - the communication channel
messages: Annotated[list[Message], operator.add]
# Agent outputs
research_findings: str
analysis_result: str
final_output: str
# Workflow control
iteration: int
max_iterations: int
# ============================================================
# HELPER FUNCTIONS
# ============================================================
def create_message(
from_agent: str,
to_agent: str,
message_type: str,
content: str,
    metadata: dict | None = None
) -> Message:
"""Create a standardized message."""
return {
"id": str(uuid.uuid4()),
"from_agent": from_agent,
"to_agent": to_agent,
"message_type": message_type,
"content": content,
"timestamp": datetime.now().isoformat(),
"metadata": metadata or {}
}
def get_messages_for_agent(
messages: list[Message],
agent_name: str,
    message_type: str | None = None
) -> list[Message]:
"""Filter messages addressed to a specific agent."""
filtered = [
m for m in messages
if m["to_agent"] in [agent_name, "all"]
]
if message_type:
filtered = [m for m in filtered if m["message_type"] == message_type]
return filtered
def get_latest_message(
messages: list[Message],
    from_agent: str | None = None,
    message_type: str | None = None
) -> Message | None:
"""Get the most recent message matching criteria."""
filtered = messages
if from_agent:
filtered = [m for m in filtered if m["from_agent"] == from_agent]
if message_type:
filtered = [m for m in filtered if m["message_type"] == message_type]
return filtered[-1] if filtered else None
# ============================================================
# AGENT IMPLEMENTATIONS
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def researcher_agent(state: CollaborativeState) -> dict:
"""
Researcher agent that:
1. Reads task from state
2. Checks for feedback from supervisor
3. Posts findings to message board
"""
task = state["task"]
messages = state.get("messages", [])
# Check for feedback requiring revision
feedback = get_messages_for_agent(messages, "researcher", "feedback")
if feedback:
latest_feedback = feedback[-1]
prompt = f"""You are a research agent. You received feedback on your previous research.
Task: {task}
Previous findings: {state.get('research_findings', 'None')}
Feedback: {latest_feedback['content']}
Please revise your research based on the feedback. Be thorough and address all points."""
else:
prompt = f"""You are a research agent. Research the following task thoroughly.
Task: {task}
Provide detailed findings with:
1. Key facts and data
2. Sources and evidence
3. Important considerations"""
response = llm.invoke(prompt)
findings = response.content
# Post findings to message board
finding_message = create_message(
from_agent="researcher",
to_agent="all", # Broadcast to all agents
message_type="finding",
content=findings,
metadata={
"iteration": state.get("iteration", 1),
"is_revision": bool(feedback)
}
)
return {
"research_findings": findings,
"messages": [finding_message]
}
def analyst_agent(state: CollaborativeState) -> dict:
"""
Analyst agent that:
1. Reads research findings from messages
2. Checks for specific requests
3. Posts analysis to message board
"""
messages = state.get("messages", [])
# Get research findings from message board
research_messages = [
m for m in messages
if m["from_agent"] == "researcher" and m["message_type"] == "finding"
]
if not research_messages:
# No research to analyze yet
return {
"messages": [create_message(
from_agent="analyst",
to_agent="supervisor",
message_type="status",
content="Waiting for research findings"
)]
}
# Get latest research
latest_research = research_messages[-1]["content"]
# Check for specific analysis requests
requests = get_messages_for_agent(messages, "analyst", "request")
if requests:
latest_request = requests[-1]
prompt = f"""You are an analysis agent. You have a specific request to address.
Research findings:
{latest_research}
Request: {latest_request['content']}
Provide a focused analysis addressing the specific request."""
else:
prompt = f"""You are an analysis agent. Analyze the following research findings.
Research findings:
{latest_research}
Provide:
1. Key insights and patterns
2. Strengths and weaknesses
3. Actionable recommendations"""
response = llm.invoke(prompt)
analysis = response.content
# Post analysis to message board
analysis_message = create_message(
from_agent="analyst",
to_agent="all",
message_type="analysis",
content=analysis,
metadata={"based_on_research_id": research_messages[-1]["id"]}
)
return {
"analysis_result": analysis,
"messages": [analysis_message]
}
def supervisor_agent(state: CollaborativeState) -> dict:
"""
Supervisor that coordinates by reading and posting messages.
Does NOT call agents directly--uses messages for coordination.
"""
messages = state.get("messages", [])
iteration = state.get("iteration", 1)
max_iterations = state.get("max_iterations", 3)
# Gather outputs from message board
research_msgs = [m for m in messages if m["message_type"] == "finding"]
analysis_msgs = [m for m in messages if m["message_type"] == "analysis"]
# Build context for quality assessment
context = f"""Task: {state['task']}
Research findings: {research_msgs[-1]['content'] if research_msgs else 'None'}
Analysis: {analysis_msgs[-1]['content'] if analysis_msgs else 'None'}
Iteration: {iteration}/{max_iterations}"""
quality_prompt = f"""You are a supervisor reviewing team output.
{context}
Evaluate:
1. Is the research thorough and well-sourced? (1-10)
2. Is the analysis insightful and actionable? (1-10)
3. Are there gaps that need addressing?
If quality is below 7/10 in any area, specify what needs improvement.
If quality is good (7+/10), say "APPROVED".
Format: APPROVED or NEEDS_REVISION: [specific feedback]"""
response = llm.invoke(quality_prompt)
evaluation = response.content
output_messages = []
if "APPROVED" in evaluation.upper() or iteration >= max_iterations:
# Quality approved or max iterations reached
output_messages.append(create_message(
from_agent="supervisor",
to_agent="writer",
message_type="request",
content="Please compile the final report",
metadata={"approved": True}
))
return {
"messages": output_messages,
"iteration": iteration
}
else:
# Send feedback for revision
if "research" in evaluation.lower():
output_messages.append(create_message(
from_agent="supervisor",
to_agent="researcher",
message_type="feedback",
content=evaluation
))
if "analysis" in evaluation.lower():
output_messages.append(create_message(
from_agent="supervisor",
to_agent="analyst",
message_type="feedback",
content=evaluation
))
return {
"messages": output_messages,
"iteration": iteration + 1
}
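One way to wire these agents into a runnable graph is sketched below. The writer node and the routing rule (reading the supervisor's latest directive off the message board) are illustrative assumptions--the pattern itself does not prescribe a specific topology:
# Illustrative wiring for the message-board team (writer node and routing
# rule are assumptions, not defined by the pattern itself)
def writer_agent(state: CollaborativeState) -> dict:
    """Compiles the final report once the supervisor approves."""
    prompt = f"""Write a final report for this task: {state['task']}
Research: {state.get('research_findings', '')}
Analysis: {state.get('analysis_result', '')}"""
    response = llm.invoke(prompt)
    return {"final_output": response.content}

def route_after_supervisor(state: CollaborativeState) -> str:
    """Route based on the supervisor's most recent directive on the message board."""
    last = get_latest_message(state["messages"], from_agent="supervisor")
    if last and last["to_agent"] == "writer":
        return "writer"
    if last and last["to_agent"] == "researcher":
        return "researcher"
    return "analyst"

builder = StateGraph(CollaborativeState)
builder.add_node("researcher", researcher_agent)
builder.add_node("analyst", analyst_agent)
builder.add_node("supervisor", supervisor_agent)
builder.add_node("writer", writer_agent)

builder.add_edge(START, "researcher")
builder.add_edge("researcher", "analyst")
builder.add_edge("analyst", "supervisor")
builder.add_conditional_edges(
    "supervisor",
    route_after_supervisor,
    {"writer": "writer", "researcher": "researcher", "analyst": "analyst"}
)
builder.add_edge("writer", END)

collab_app = builder.compile()

result = collab_app.invoke({
    "task": "Assess the EMEA expansion plan",
    "messages": [],
    "iteration": 1,
    "max_iterations": 3
})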
Request-Response Pattern
For direct agent-to-agent communication, use the request-response pattern with correlation IDs.
from typing import TypedDict, Annotated
from datetime import datetime
import operator
import uuid
class RequestResponseState(TypedDict):
"""State supporting request-response communication."""
task: str
messages: Annotated[list[dict], operator.add]
pending_requests: list[str] # Track awaited responses
completed_responses: dict # Map request_id -> response
def data_requester_agent(state: RequestResponseState) -> dict:
"""
Agent that requests data from another agent.
Uses correlation IDs to track request-response pairs.
"""
request_id = str(uuid.uuid4())
request_message = {
"id": request_id,
"from_agent": "requester",
"to_agent": "data_provider",
"message_type": "data_request",
"content": {
"query": "Get sales data for Q4 2025",
"format": "json",
"filters": {"region": "EMEA"}
},
"expects_response": True,
"timestamp": datetime.now().isoformat()
}
return {
"messages": [request_message],
"pending_requests": [request_id]
}
def data_provider_agent(state: RequestResponseState) -> dict:
"""
Agent that responds to data requests.
Links response to original request via response_to field.
"""
messages = state.get("messages", [])
# Find requests addressed to this agent
my_requests = [
m for m in messages
if m["to_agent"] == "data_provider"
and m["message_type"] == "data_request"
]
response_messages = []
for request in my_requests:
# Check if we already responded
existing_responses = [
m for m in messages
if m.get("response_to") == request["id"]
]
if existing_responses:
continue # Already responded
# Process the request
query = request["content"]["query"]
# Simulate data retrieval
data = {
"query": query,
"results": [
{"month": "October", "sales": 150000},
{"month": "November", "sales": 175000},
{"month": "December", "sales": 220000}
],
"total": 545000
}
response_messages.append({
"id": str(uuid.uuid4()),
"from_agent": "data_provider",
"to_agent": request["from_agent"], # Reply to requester
"message_type": "data_response",
"response_to": request["id"], # Correlation ID
"content": data,
"timestamp": datetime.now().isoformat()
})
return {"messages": response_messages}
def process_responses_agent(state: RequestResponseState) -> dict:
"""
Agent that processes received responses.
Matches responses to pending requests.
"""
messages = state.get("messages", [])
pending = state.get("pending_requests", [])
completed = state.get("completed_responses", {})
# Find responses to our requests
responses = [
m for m in messages
if m["message_type"] == "data_response"
and m.get("response_to") in pending
]
new_completed = dict(completed)
new_pending = list(pending)
for response in responses:
request_id = response["response_to"]
new_completed[request_id] = response["content"]
if request_id in new_pending:
new_pending.remove(request_id)
return {
"completed_responses": new_completed,
"pending_requests": new_pending
}
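A minimal wiring for this pattern could simply run the three agents in sequence; the linear topology below is an assumption for illustration, not the only option:
# Illustrative linear wiring: request -> respond -> process
rr_builder = StateGraph(RequestResponseState)
rr_builder.add_node("requester", data_requester_agent)
rr_builder.add_node("data_provider", data_provider_agent)
rr_builder.add_node("process_responses", process_responses_agent)

rr_builder.add_edge(START, "requester")
rr_builder.add_edge("requester", "data_provider")
rr_builder.add_edge("data_provider", "process_responses")
rr_builder.add_edge("process_responses", END)

rr_app = rr_builder.compile()

result = rr_app.invoke({
    "task": "Quarterly sales review",
    "messages": [],
    "pending_requests": [],
    "completed_responses": {}
})
print(result["completed_responses"])  # maps request_id -> returned data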
Pub-Sub Pattern with Topics
For complex systems, implement topic-based messaging where agents subscribe to specific topics.
from typing import TypedDict, Annotated
from datetime import datetime
import operator
import uuid
class TopicMessage(TypedDict):
"""Message with topic for pub-sub pattern."""
id: str
from_agent: str
topic: str # e.g., "research.findings", "analysis.complete"
content: str
timestamp: str
class PubSubState(TypedDict):
"""State for pub-sub communication pattern."""
task: str
messages: Annotated[list[TopicMessage], operator.add]
# Agent subscriptions (defined at design time)
# In practice, this is often implicit in agent logic
# Topic hierarchy examples:
# research.findings - New research discovered
# research.revision - Research needs revision
# analysis.complete - Analysis finished
# analysis.needs_data - Analysis requests more data
# quality.approved - Quality check passed
# quality.rejected - Quality check failed
# workflow.complete - Entire workflow done
def publish_message(
from_agent: str,
topic: str,
content: str
) -> TopicMessage:
"""Create a topic-based message."""
return {
"id": str(uuid.uuid4()),
"from_agent": from_agent,
"topic": topic,
"content": content,
"timestamp": datetime.now().isoformat()
}
def subscribe_to_topics(
messages: list[TopicMessage],
topics: list[str]
) -> list[TopicMessage]:
"""
Get messages matching subscribed topics.
Supports wildcards: "research.*" matches all research topics.
"""
matched = []
for message in messages:
msg_topic = message["topic"]
for pattern in topics:
if pattern.endswith(".*"):
# Wildcard match
prefix = pattern[:-2]
if msg_topic.startswith(prefix):
matched.append(message)
break
elif msg_topic == pattern:
matched.append(message)
break
return matched
def research_agent_pubsub(state: PubSubState) -> dict:
"""Research agent using pub-sub pattern."""
messages = state.get("messages", [])
# Subscribe to revision requests
revisions = subscribe_to_topics(messages, ["research.revision"])
if revisions:
latest = revisions[-1]
findings = f"Revised research based on: {latest['content']}"
else:
findings = f"Initial research on: {state['task']}"
# Publish findings
return {
"messages": [
publish_message(
from_agent="researcher",
topic="research.findings",
content=findings
)
]
}
def analyst_agent_pubsub(state: PubSubState) -> dict:
"""Analyst agent subscribing to research topics."""
messages = state.get("messages", [])
# Subscribe to all research topics
research_msgs = subscribe_to_topics(messages, ["research.*"])
if not research_msgs:
return {"messages": []}
latest_research = research_msgs[-1]["content"]
analysis = f"Analysis of: {latest_research}"
return {
"messages": [
publish_message(
from_agent="analyst",
topic="analysis.complete",
content=analysis
)
]
}
def quality_agent_pubsub(state: PubSubState) -> dict:
"""Quality agent subscribing to completion topics."""
messages = state.get("messages", [])
    # Subscribe separately to each completion topic
    research_done = subscribe_to_topics(messages, ["research.findings"])
    analysis_done = subscribe_to_topics(messages, ["analysis.complete"])
    if not (research_done and analysis_done):
        return {"messages": []}  # Wait until both research and analysis have published
# Quality check
all_good = True # Simplified
if all_good:
return {
"messages": [
publish_message(
from_agent="quality",
topic="quality.approved",
content="All outputs meet quality standards"
)
]
}
else:
return {
"messages": [
publish_message(
from_agent="quality",
topic="research.revision",
content="Research needs more depth"
)
]
}
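These agents could be wired with a quality gate that loops back whenever a revision topic is published. The routing rule below is one possible way to close that loop:
# Illustrative wiring with a quality gate that can trigger a revision loop
def route_after_quality(state: PubSubState) -> str:
    """Loop back to research unless a quality.approved message was published."""
    approved = subscribe_to_topics(state["messages"], ["quality.approved"])
    return END if approved else "researcher"

ps_builder = StateGraph(PubSubState)
ps_builder.add_node("researcher", research_agent_pubsub)
ps_builder.add_node("analyst", analyst_agent_pubsub)
ps_builder.add_node("quality", quality_agent_pubsub)

ps_builder.add_edge(START, "researcher")
ps_builder.add_edge("researcher", "analyst")
ps_builder.add_edge("analyst", "quality")
ps_builder.add_conditional_edges(
    "quality",
    route_after_quality,
    {END: END, "researcher": "researcher"}
)

pubsub_app = ps_builder.compile()
result = pubsub_app.invoke({"task": "Market sizing for EV charging", "messages": []})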
Blackboard Pattern
The blackboard pattern uses typed sections of state for different data categories, enabling structured collaboration.
from typing import TypedDict, Annotated, Literal
import operator
class BlackboardState(TypedDict):
"""
Blackboard pattern: Structured sections for different data types.
Agents read from sections relevant to them and write to their section.
"""
# Input section
problem: str
constraints: list[str]
# Hypothesis section (written by hypothesis agents)
hypotheses: Annotated[list[dict], operator.add]
# Evidence section (written by research agents)
evidence: Annotated[list[dict], operator.add]
# Evaluation section (written by evaluation agents)
evaluations: Annotated[list[dict], operator.add]
# Solution section (written by synthesis agent)
current_solution: dict
solution_history: Annotated[list[dict], operator.add]
# Control section
phase: Literal["hypothesize", "gather_evidence", "evaluate", "synthesize"]
iteration: int
def hypothesis_agent(state: BlackboardState) -> dict:
"""
Generates hypotheses based on problem and existing evidence.
Reads: problem, constraints, evidence
Writes: hypotheses
"""
problem = state["problem"]
constraints = state.get("constraints", [])
evidence = state.get("evidence", [])
# Generate hypothesis based on available information
existing_hypotheses = state.get("hypotheses", [])
prompt = f"""Generate a hypothesis for this problem:
Problem: {problem}
Constraints: {constraints}
Existing evidence: {[e['finding'] for e in evidence]}
Previous hypotheses: {[h['statement'] for h in existing_hypotheses]}
Provide a NEW hypothesis not already proposed."""
response = llm.invoke(prompt)
new_hypothesis = {
"id": str(uuid.uuid4()),
"statement": response.content,
"confidence": 0.5, # Initial confidence
"supporting_evidence": [],
"contradicting_evidence": [],
"created_at": datetime.now().isoformat()
}
return {"hypotheses": [new_hypothesis]}
def evidence_gatherer_agent(state: BlackboardState) -> dict:
"""
Gathers evidence related to current hypotheses.
Reads: hypotheses, problem
Writes: evidence
"""
hypotheses = state.get("hypotheses", [])
if not hypotheses:
return {"evidence": []}
# Focus on most recent hypothesis
target_hypothesis = hypotheses[-1]
prompt = f"""Find evidence related to this hypothesis:
Hypothesis: {target_hypothesis['statement']}
Provide specific evidence that either supports or contradicts this hypothesis."""
response = llm.invoke(prompt)
new_evidence = {
"id": str(uuid.uuid4()),
"finding": response.content,
"related_hypothesis_id": target_hypothesis["id"],
"type": "supporting", # or "contradicting"
"source": "research",
"created_at": datetime.now().isoformat()
}
return {"evidence": [new_evidence]}
def evaluation_agent(state: BlackboardState) -> dict:
"""
Evaluates hypotheses against collected evidence.
Reads: hypotheses, evidence
Writes: evaluations
"""
hypotheses = state.get("hypotheses", [])
evidence = state.get("evidence", [])
evaluations = []
for hypothesis in hypotheses:
# Get evidence related to this hypothesis
related_evidence = [
e for e in evidence
if e.get("related_hypothesis_id") == hypothesis["id"]
]
supporting = [e for e in related_evidence if e["type"] == "supporting"]
contradicting = [e for e in related_evidence if e["type"] == "contradicting"]
# Calculate confidence
if related_evidence:
confidence = len(supporting) / len(related_evidence)
else:
confidence = 0.5
evaluations.append({
"hypothesis_id": hypothesis["id"],
"confidence": confidence,
"supporting_count": len(supporting),
"contradicting_count": len(contradicting),
"recommendation": "accept" if confidence > 0.7 else "reject" if confidence < 0.3 else "investigate",
"evaluated_at": datetime.now().isoformat()
})
return {"evaluations": evaluations}
def synthesis_agent(state: BlackboardState) -> dict:
"""
Synthesizes final solution from evaluated hypotheses.
Reads: hypotheses, evaluations, evidence
Writes: current_solution, solution_history
"""
hypotheses = state.get("hypotheses", [])
evaluations = state.get("evaluations", [])
# Find accepted hypotheses
accepted_ids = {
e["hypothesis_id"] for e in evaluations
if e["recommendation"] == "accept"
}
accepted_hypotheses = [
h for h in hypotheses
if h["id"] in accepted_ids
]
if not accepted_hypotheses:
return {
"current_solution": {"status": "no_accepted_hypotheses"},
"solution_history": []
}
solution = {
"id": str(uuid.uuid4()),
"accepted_hypotheses": [h["statement"] for h in accepted_hypotheses],
"synthesis": f"Solution based on {len(accepted_hypotheses)} accepted hypotheses",
"confidence": sum(
e["confidence"] for e in evaluations
if e["hypothesis_id"] in accepted_ids
) / len(accepted_ids),
"created_at": datetime.now().isoformat()
}
return {
"current_solution": solution,
"solution_history": [solution]
}
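The blackboard agents can be driven in hypothesize -> gather_evidence -> evaluate -> synthesize passes. The iteration-based controller and three-pass limit below are illustrative assumptions, not part of the pattern itself:
# Illustrative loop: hypothesize -> gather_evidence -> evaluate -> synthesize,
# with an iteration cap as the stop condition
def blackboard_controller(state: BlackboardState) -> dict:
    """Advances the iteration counter after each full pass over the blackboard."""
    return {"iteration": state.get("iteration", 0) + 1}

def route_blackboard(state: BlackboardState) -> str:
    """Stop once a solution with accepted hypotheses exists, or after three passes."""
    if state.get("current_solution", {}).get("accepted_hypotheses"):
        return END
    if state.get("iteration", 0) >= 3:
        return END
    return "hypothesize"

bb_builder = StateGraph(BlackboardState)
bb_builder.add_node("hypothesize", hypothesis_agent)
bb_builder.add_node("gather_evidence", evidence_gatherer_agent)
bb_builder.add_node("evaluate", evaluation_agent)
bb_builder.add_node("synthesize", synthesis_agent)
bb_builder.add_node("controller", blackboard_controller)

bb_builder.add_edge(START, "hypothesize")
bb_builder.add_edge("hypothesize", "gather_evidence")
bb_builder.add_edge("gather_evidence", "evaluate")
bb_builder.add_edge("evaluate", "synthesize")
bb_builder.add_edge("synthesize", "controller")
bb_builder.add_conditional_edges(
    "controller",
    route_blackboard,
    {END: END, "hypothesize": "hypothesize"}
)

blackboard_app = bb_builder.compile()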
Consensus Protocol
When multiple agents need to agree on a decision, implement a consensus protocol.
from typing import TypedDict, Annotated, Callable
from collections import Counter
from datetime import datetime
import operator
class ConsensusState(TypedDict):
"""State for multi-agent consensus."""
proposal: str
votes: Annotated[list[dict], operator.add]
consensus_reached: bool
final_decision: str
round: int
max_rounds: int
def voting_agent(
agent_name: str,
expertise: str
) -> Callable[[ConsensusState], dict]:
"""
Factory for voting agents with different expertise.
Each agent votes based on their perspective.
"""
def agent(state: ConsensusState) -> dict:
proposal = state["proposal"]
existing_votes = state.get("votes", [])
current_round = state.get("round", 1)
# Check if this agent already voted this round
my_votes = [
v for v in existing_votes
if v["agent"] == agent_name and v["round"] == current_round
]
if my_votes:
return {"votes": []} # Already voted
# Make decision based on expertise
prompt = f"""You are {agent_name}, an expert in {expertise}.
Proposal: {proposal}
Previous votes this round: {[
f"{v['agent']}: {v['vote']} ({v['reasoning'][:50]}...)"
for v in existing_votes
if v['round'] == current_round
]}
Vote APPROVE, REJECT, or ABSTAIN with brief reasoning.
Format: VOTE: [your vote] REASON: [one sentence]"""
response = llm.invoke(prompt)
content = response.content
# Parse vote
if "APPROVE" in content.upper():
vote = "approve"
elif "REJECT" in content.upper():
vote = "reject"
else:
vote = "abstain"
return {
"votes": [{
"agent": agent_name,
"vote": vote,
"reasoning": content,
"round": current_round,
"timestamp": datetime.now().isoformat()
}]
}
return agent
def consensus_checker(state: ConsensusState) -> dict:
"""
Checks if consensus has been reached.
Requires 2/3 majority for approval.
"""
votes = state.get("votes", [])
current_round = state.get("round", 1)
max_rounds = state.get("max_rounds", 3)
# Count votes for current round
round_votes = [v for v in votes if v["round"] == current_round]
vote_counts = Counter(v["vote"] for v in round_votes)
total_voters = len(set(v["agent"] for v in round_votes))
approve_count = vote_counts.get("approve", 0)
reject_count = vote_counts.get("reject", 0)
# Check for 2/3 majority
threshold = total_voters * 2 / 3
if approve_count >= threshold:
return {
"consensus_reached": True,
"final_decision": "approved",
"round": current_round
}
elif reject_count >= threshold:
return {
"consensus_reached": True,
"final_decision": "rejected",
"round": current_round
}
elif current_round >= max_rounds:
# No consensus after max rounds
return {
"consensus_reached": True,
"final_decision": "no_consensus",
"round": current_round
}
else:
# Need another round
return {
"consensus_reached": False,
"round": current_round + 1
}
# Build consensus workflow
def build_consensus_workflow(agent_configs: list[tuple[str, str]]):
"""
Build a consensus workflow with multiple voting agents.
Args:
agent_configs: List of (agent_name, expertise) tuples
"""
builder = StateGraph(ConsensusState)
# Add voting agents
for name, expertise in agent_configs:
builder.add_node(f"voter_{name}", voting_agent(name, expertise))
builder.add_node("check_consensus", consensus_checker)
# Wire up: START -> all voters -> consensus check
for name, _ in agent_configs:
builder.add_edge(START, f"voter_{name}")
builder.add_edge(f"voter_{name}", "check_consensus")
    # Consensus check either ends or starts a new voting round with ALL voters
    def route_after_consensus(state: ConsensusState) -> str | list[str]:
        if state.get("consensus_reached"):
            return END
        # No consensus yet: fan out to every voter for the next round
        return [f"voter_{name}" for name, _ in agent_configs]
builder.add_conditional_edges(
"check_consensus",
route_after_consensus,
{END: END, **{f"voter_{name}": f"voter_{name}" for name, _ in agent_configs}}
)
return builder.compile()
# Usage example
consensus_app = build_consensus_workflow([
("tech_lead", "technical feasibility"),
("product_manager", "business value"),
("security_expert", "security implications"),
])
result = consensus_app.invoke({
"proposal": "Implement SSO authentication using OAuth 2.0",
"votes": [],
"consensus_reached": False,
"final_decision": "",
"round": 1,
"max_rounds": 3
})
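After invoking, the outcome and the per-round tallies can be read straight from the returned state:
# Inspect the final decision and how votes broke down in each round
print(result["final_decision"])   # "approved", "rejected", or "no_consensus"

for round_num in range(1, result["round"] + 1):
    round_votes = [v for v in result["votes"] if v["round"] == round_num]
    tally = Counter(v["vote"] for v in round_votes)
    print(f"Round {round_num}: {dict(tally)}")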
Production Pattern: Communication Audit Trail
In production systems, maintain an audit trail of all inter-agent communication.
from typing import TypedDict, Annotated
from collections import Counter
from datetime import datetime
import json
import operator
import uuid
class AuditableMessage(TypedDict):
"""Message with full audit trail information."""
id: str
from_agent: str
to_agent: str
message_type: str
content: str
timestamp: str
# Audit fields
parent_message_id: str | None # For threading
conversation_id: str # Group related messages
step_number: int # Position in workflow
checkpoint_id: str | None # Link to checkpoint
class AuditedState(TypedDict):
"""State with auditable communication."""
task: str
messages: Annotated[list[AuditableMessage], operator.add]
# Audit metadata
conversation_id: str
step_count: int
def create_auditable_message(
state: AuditedState,
from_agent: str,
to_agent: str,
message_type: str,
content: str,
    parent_message_id: str | None = None
) -> AuditableMessage:
"""Create message with audit trail."""
return {
"id": str(uuid.uuid4()),
"from_agent": from_agent,
"to_agent": to_agent,
"message_type": message_type,
"content": content,
"timestamp": datetime.now().isoformat(),
"parent_message_id": parent_message_id,
"conversation_id": state.get("conversation_id", "unknown"),
"step_number": state.get("step_count", 0),
"checkpoint_id": None # Set by checkpointer
}
def audit_communication(
messages: list[AuditableMessage],
conversation_id: str
) -> dict:
"""
Generate audit report for a conversation.
Useful for debugging and compliance.
"""
conv_messages = [
m for m in messages
if m["conversation_id"] == conversation_id
]
# Sort by timestamp
conv_messages.sort(key=lambda m: m["timestamp"])
# Build communication graph
agent_interactions = {}
for msg in conv_messages:
key = (msg["from_agent"], msg["to_agent"])
if key not in agent_interactions:
agent_interactions[key] = []
agent_interactions[key].append(msg["message_type"])
# Generate statistics
return {
"conversation_id": conversation_id,
"total_messages": len(conv_messages),
"agents_involved": list(set(
m["from_agent"] for m in conv_messages
) | set(
m["to_agent"] for m in conv_messages
)),
"message_types": dict(Counter(m["message_type"] for m in conv_messages)),
"agent_interactions": {
f"{k[0]}->{k[1]}": v
for k, v in agent_interactions.items()
},
"timeline": [
{
"step": m["step_number"],
"from": m["from_agent"],
"to": m["to_agent"],
"type": m["message_type"],
"timestamp": m["timestamp"]
}
for m in conv_messages
],
"first_message": conv_messages[0]["timestamp"] if conv_messages else None,
"last_message": conv_messages[-1]["timestamp"] if conv_messages else None
}
def export_communication_log(
messages: list[AuditableMessage],
filepath: str
):
"""Export communication log for analysis."""
with open(filepath, "w") as f:
json.dump(messages, f, indent=2, default=str)
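A short usage sketch, where final_state stands in for the state returned by an audited workflow's invoke():
# `final_state` is a stand-in for the output of an audited workflow run
final_state: AuditedState = {
    "task": "Summarize incident reports",
    "messages": [],          # populated by agent nodes during the run
    "conversation_id": "conv-001",
    "step_count": 0
}

report = audit_communication(final_state["messages"], "conv-001")
print(f"{report['total_messages']} messages; agents involved: {report['agents_involved']}")

export_communication_log(final_state["messages"], "communication_log.json")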
Interview Questions
Q: How do agents communicate in LangGraph?
"Through shared state fields, not direct messaging. Agents write to state fields with reducers (like
Annotated[list, operator.add]) that accumulate messages without overwriting. This provides automatic persistence, deterministic replay, and time-travel debugging. The message board pattern is most common--agents post to and read from a shared messages list."
Q: What is the difference between broadcast and directed messages?
"Broadcast messages use
to_agent: 'all'and are read by all agents. Directed messages target a specific agent (to_agent: 'analyst') and are filtered by the recipient. Agents filter messages using list comprehensions:[m for m in messages if m['to_agent'] in [agent_name, 'all']]. This enables both team-wide announcements and private communication."
Q: How do you implement request-response patterns in LangGraph?
"Use correlation IDs. The requester creates a unique request_id, includes it in the message, and tracks it in a pending_requests list. The responder includes
response_to: request_idin the response. The requester matches responses to pending requests by this ID. This maintains correlation in stateless execution."
Q: When would you use pub-sub vs direct messaging?
"Pub-sub is better for loosely coupled systems where agents don't know about each other--they publish to topics and subscribe to topics they care about. Direct messaging is better for tightly coupled workflows where agents have known relationships. Pub-sub scales better but adds complexity; direct messaging is simpler but creates tight coupling."
Q: How do you implement consensus among multiple agents?
"Create voting agents that each cast a vote with reasoning. A consensus checker tallies votes and determines if threshold is met (typically 2/3 majority). If not, increment round and continue voting. Track votes with round numbers to distinguish between voting rounds. Set max_rounds to prevent infinite loops."
Key Takeaways
- State is the communication channel - Agents read from and write to shared state, not direct messages
- Reducers enable accumulation - `Annotated[list, operator.add]` appends without overwriting
- Message structure matters - Include from, to, type, content, and timestamps
- Correlation IDs for request-response - Link requests to responses with unique IDs
- Pub-sub for loose coupling - Topic-based messaging decouples agents
- Blackboard for structured collaboration - Typed sections for different data categories
- Consensus for group decisions - Voting rounds with thresholds and max rounds
- Audit trails for production - Track conversation_id, step_number, parent_message_id