Multi-Agent System Design
State & Memory Management
Agents need memory to maintain context across interactions. Designing effective state management is crucial for building reliable, consistent AI systems.
Types of Agent Memory
┌─────────────────────────────────────────────────────────────┐
│                     Agent Memory Types                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Short-Term                    Long-Term                    │
│  ──────────────────────        ───────────────────────────  │
│  • Current conversation        • User preferences           │
│  • Working context             • Past interactions summary  │
│  • Tool call results           • Learned facts              │
│  • In-flight state             • Persistent knowledge       │
│                                                             │
│  Episodic                      Semantic                     │
│  ──────────────────────        ───────────────────────────  │
│  • What happened               • What is true               │
│  • Specific events             • General knowledge          │
│  • Timestamped                 • Timeless                   │
│                                                             │
└─────────────────────────────────────────────────────────────┘
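One way to make these distinctions concrete is to tag every stored record with its type. A minimal sketch (the field names here are illustrative, not a standard schema):

from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class MemoryRecord:
    content: str
    memory_type: str  # e.g. "episodic", "semantic", "preference"
    # Episodic memories describe timestamped events;
    # semantic ones are timeless facts
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    metadata: dict = field(default_factory=dict)

episode = MemoryRecord("User asked about pricing on the demo call", "episodic")
fact = MemoryRecord("User's company has about 50 employees", "semantic")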
Short-Term Memory
Conversation context within a single session:
import json


class ShortTermMemory:
    def __init__(self, max_tokens: int = 4000):
        self.messages = []
        self.max_tokens = max_tokens

    def add(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        self._trim_if_needed()

    def get_context(self) -> list:
        return self.messages.copy()

    def _trim_if_needed(self):
        """Keep memory within the token limit."""
        while self._estimate_tokens() > self.max_tokens and len(self.messages) > 2:
            # Drop the oldest message after index 0 (preserve the system prompt)
            self.messages.pop(1)

    def _estimate_tokens(self) -> int:
        # Rough heuristic: ~4 characters per token
        return sum(len(m["content"]) // 4 for m in self.messages)

    def summarize_and_compress(self, llm) -> None:
        """Summarize old context to save tokens."""
        if len(self.messages) < 10:
            return
        # Keep the five most recent messages verbatim
        recent = self.messages[-5:]
        old = self.messages[:-5]
        # Summarize everything older
        summary = llm.complete(
            "Summarize this conversation briefly:\n"
            f"{json.dumps(old, indent=2)}"
        )
        # Replace the old messages with a single summary message
        self.messages = [
            {"role": "system", "content": f"Previous context: {summary}"}
        ] + recent
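A short usage sketch (the llm object is assumed to expose a complete(prompt) method, matching the class above):

memory = ShortTermMemory(max_tokens=4000)
memory.add("system", "You are a helpful assistant.")
memory.add("user", "My name is Ada and I prefer short answers.")
memory.add("assistant", "Noted, Ada. I'll keep it brief.")

# Trimming happens automatically inside add(); compression is opt-in:
# memory.summarize_and_compress(llm)

context = memory.get_context()  # pass as the messages array in your LLM call

The character-count heuristic (~4 characters per token) is deliberately cheap; swap in a real tokenizer if you need precise budgeting.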
Long-Term Memory
Persistent storage across sessions:
from datetime import datetime, timezone


class LongTermMemory:
    def __init__(self, vector_store, embedding_model):
        self.store = vector_store
        self.embedder = embedding_model

    async def store_memory(
        self,
        user_id: str,
        content: str,
        memory_type: str,  # "fact", "preference", "episode"
        metadata: dict = None
    ):
        """Store a memory for later retrieval."""
        embedding = await self.embedder.embed(content)
        await self.store.insert({
            "embedding": embedding,
            "content": content,
            "user_id": user_id,
            "memory_type": memory_type,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "metadata": metadata or {}
        })

    async def recall(
        self,
        user_id: str,
        query: str,
        memory_types: list = None,
        top_k: int = 5
    ) -> list:
        """Recall the memories most relevant to a query."""
        query_embedding = await self.embedder.embed(query)
        filters = {"user_id": user_id}
        if memory_types:
            filters["memory_type"] = {"$in": memory_types}
        results = await self.store.search(
            embedding=query_embedding,
            filter=filters,
            top_k=top_k
        )
        return [r["content"] for r in results]

    async def update_memory(self, memory_id: str, new_content: str):
        """Update an existing memory and re-embed its content."""
        new_embedding = await self.embedder.embed(new_content)
        await self.store.update(
            memory_id,
            {"content": new_content, "embedding": new_embedding}
        )
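A hedged usage sketch; my_vector_store and my_embedder stand in for whatever store and embedding model you use, as long as they expose the insert/search/update and embed methods assumed above:

import asyncio

async def demo(long_term: LongTermMemory):
    # Store a preference and a fact for one user
    await long_term.store_memory("user-123", "Prefers metric units", "preference")
    await long_term.store_memory("user-123", "Works as a marine biologist", "fact")

    # Later, recall only preferences relevant to the current question
    memories = await long_term.recall(
        "user-123",
        query="How should I format measurements?",
        memory_types=["preference"],
    )
    print(memories)  # e.g. ["Prefers metric units"]

# asyncio.run(demo(LongTermMemory(my_vector_store, my_embedder)))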
Working Memory Pattern
Combine short- and long-term memory to build the full context:
class WorkingMemory:
    def __init__(self, short_term, long_term, max_context_tokens: int = 6000):
        self.short_term = short_term
        self.long_term = long_term
        self.max_tokens = max_context_tokens

    async def build_context(self, user_id: str, current_query: str) -> list:
        """Build the complete context for the LLM."""
        context = []
        # 1. Recall relevant long-term memories
        memories = await self.long_term.recall(user_id, current_query)
        if memories:
            memory_text = "\n".join(memories)
            context.append({
                "role": "system",
                "content": f"Relevant memories about this user:\n{memory_text}"
            })
        # 2. Add conversation history
        context.extend(self.short_term.get_context())
        # 3. Add the current query
        context.append({"role": "user", "content": current_query})
        return self._fit_to_token_limit(context)

    def _fit_to_token_limit(self, context: list) -> list:
        """Trim context to fit the token limit."""
        while self._estimate_tokens(context) > self.max_tokens:
            # Remove the oldest conversation message (keep system + memories)
            for i, msg in enumerate(context):
                if msg["role"] != "system":
                    context.pop(i)
                    break
            else:
                # Only system messages remain; stop trimming
                break
        return context

    def _estimate_tokens(self, context: list) -> int:
        # Same ~4 characters per token heuristic as ShortTermMemory
        return sum(len(m["content"]) // 4 for m in context)
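Tying the two layers together in a request handler might look like this (llm.chat is an assumed async chat-completion interface, not a specific library):

async def answer(working: WorkingMemory, llm, user_id: str, query: str) -> str:
    # Recalled memories + conversation history + the new query, trimmed to budget
    messages = await working.build_context(user_id, query)
    reply = await llm.chat(messages)  # assumed async chat interface
    # Record the turn so the next build_context() call sees it
    working.short_term.add("user", query)
    working.short_term.add("assistant", reply)
    return reply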
Distributed State Management
For multi-agent systems with shared state:
class DistributedAgentState:
    def __init__(self, redis_client):
        self.redis = redis_client

    async def get_state(self, agent_id: str, session_id: str) -> dict:
        """Get an agent's state for a session."""
        key = f"agent:{agent_id}:session:{session_id}"
        data = await self.redis.get(key)
        return json.loads(data) if data else {}

    async def set_state(self, agent_id: str, session_id: str, state: dict):
        """Update an agent's state."""
        key = f"agent:{agent_id}:session:{session_id}"
        await self.redis.setex(key, 3600, json.dumps(state))  # 1 hour TTL

    async def share_state(
        self,
        from_agent: str,
        to_agent: str,
        session_id: str,
        data: dict
    ):
        """Share state between agents."""
        key = f"shared:{from_agent}:{to_agent}:{session_id}"
        await self.redis.setex(key, 600, json.dumps(data))  # 10 min TTL

    async def get_shared_state(
        self,
        from_agent: str,
        to_agent: str,
        session_id: str
    ) -> dict:
        """Retrieve shared state."""
        key = f"shared:{from_agent}:{to_agent}:{session_id}"
        data = await self.redis.get(key)
        return json.loads(data) if data else {}
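A sketch of a handoff between two agents, using the redis-py asyncio client (any client exposing get/setex would work):

import asyncio
from redis.asyncio import Redis  # pip install redis

async def handoff_demo():
    state = DistributedAgentState(Redis.from_url("redis://localhost:6379"))

    # A research agent records its progress, then hands findings to a writer agent
    await state.set_state("researcher", "sess-1", {"step": "done", "sources": 3})
    await state.share_state(
        "researcher", "writer", "sess-1",
        {"summary": "Three relevant papers found"}
    )
    handed_off = await state.get_shared_state("researcher", "writer", "sess-1")
    print(handed_off["summary"])

# asyncio.run(handoff_demo())

The TTLs keep abandoned sessions from accumulating: per-agent state survives an hour, while handoff payloads are expected to be consumed within minutes.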
Memory Consolidation
Periodically convert short-term context into long-term memories:
class MemoryConsolidator:
    def __init__(self, short_term, long_term, llm):
        self.short_term = short_term
        self.long_term = long_term
        self.llm = llm

    async def consolidate(self, user_id: str):
        """Extract important facts from the conversation and store them long-term."""
        conversation = self.short_term.get_context()
        if len(conversation) < 5:
            return  # Not enough to consolidate

        # Extract structured facts using the LLM
        prompt = """From this conversation, extract:
1. User preferences (things they like/dislike)
2. Important facts about them
3. Key decisions made

Return as JSON: {"preferences": [], "facts": [], "decisions": []}

Conversation:
""" + json.dumps(conversation, indent=2)
        extraction = await self.llm.complete(prompt)
        data = json.loads(extraction)

        # Store each extracted memory under its type
        for pref in data.get("preferences", []):
            await self.long_term.store_memory(user_id, pref, "preference")
        for fact in data.get("facts", []):
            await self.long_term.store_memory(user_id, fact, "fact")
        for decision in data.get("decisions", []):
            # Decisions are specific events, so store them as episodes
            await self.long_term.store_memory(user_id, decision, "episode")
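A natural place to trigger consolidation is at session end, before the short-term buffer is discarded; a minimal sketch:

async def end_session(consolidator: MemoryConsolidator, user_id: str):
    # Persist durable facts first, then drop the ephemeral buffer
    await consolidator.consolidate(user_id)
    consolidator.short_term.messages.clear()

Because the extraction step trusts the LLM to return valid JSON, production code would typically validate the parsed structure (or retry the call) before writing to long-term storage.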
Next, we'll explore communication patterns between agents.