The Future of Agents
Emerging Patterns
After thousands of production agent deployments in 2025, clear patterns have emerged. These aren't theoretical—they're battle-tested.
Pattern 1: Human-in-the-Loop Checkpoints
The best agents know when to pause and ask:
class HumanCheckpointAgent:
    def __init__(self):
        self.checkpoint_rules = [
            {"condition": "cost_exceeds", "threshold": 10.0},
            {"condition": "destructive_action", "types": ["delete", "deploy", "publish"]},
            {"condition": "confidence_below", "threshold": 0.7},
            {"condition": "external_api", "domains": ["production", "payment"]}
        ]

    async def execute_with_checkpoints(self, task: str) -> dict:
        """Execute with human checkpoints at critical points."""
        plan = await self.create_plan(task)
        for step in plan["steps"]:
            checkpoint_needed = self.check_rules(step)
            if checkpoint_needed:
                approval = await self.request_human_approval(step)
                if not approval["approved"]:
                    return {"status": "cancelled", "reason": approval.get("reason")}
            await self.execute_step(step)
        return {"status": "completed"}

    def check_rules(self, step: dict) -> bool:
        """Check if any checkpoint rule triggers.

        Assumes plan steps carry "estimated_cost", "action_type",
        "confidence", and "api_domain" fields where relevant.
        """
        for rule in self.checkpoint_rules:
            if rule["condition"] == "cost_exceeds":
                if step.get("estimated_cost", 0.0) > rule["threshold"]:
                    return True
            elif rule["condition"] == "destructive_action":
                if step.get("action_type") in rule["types"]:
                    return True
            elif rule["condition"] == "confidence_below":
                if step.get("confidence", 1.0) < rule["threshold"]:
                    return True
            elif rule["condition"] == "external_api":
                if step.get("api_domain") in rule["domains"]:
                    return True
        return False
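The listing leaves request_human_approval abstract. Here's a minimal sketch you could drop into HumanCheckpointAgent, assuming a console-driven workflow; real deployments would route this to Slack, a ticket queue, or a review UI instead:

import asyncio

# Hypothetical method for HumanCheckpointAgent: approve via a console prompt.
async def request_human_approval(self, step: dict) -> dict:
    prompt = f"Approve step '{step.get('action_type')}'? [y/N]: "
    # input() blocks, so run it in a thread to keep the event loop responsive.
    answer = await asyncio.to_thread(input, prompt)
    if answer.strip().lower() == "y":
        return {"approved": True}
    return {"approved": False, "reason": "rejected at console"}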
Pattern 2: Graceful Degradation
When capabilities fail, fall back intelligently:
class GracefulAgent:
    def __init__(self):
        self.capability_fallbacks = {
            "web_search": ["cached_search", "ask_user"],
            "code_execution": ["static_analysis", "explanation_only"],
            "file_write": ["suggest_changes", "clipboard_copy"],
            "api_call": ["mock_response", "manual_instruction"]
        }

    async def execute_with_fallback(self, capability: str, task: dict) -> dict:
        """Try capability with fallback chain."""
        fallbacks = [capability] + self.capability_fallbacks.get(capability, [])
        for attempt_capability in fallbacks:
            try:
                result = await self.try_capability(attempt_capability, task)
                if result["success"]:
                    return {
                        "success": True,
                        "result": result,
                        "used_fallback": attempt_capability != capability,
                        "capability_used": attempt_capability
                    }
            except Exception as e:
                self.log(f"Capability {attempt_capability} failed: {e}")
                continue
        return {
            "success": False,
            "error": f"All fallbacks exhausted for {capability}"
        }
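Calling it looks like this, assuming try_capability dispatches to real tool implementations behind the scenes (the query and the printed message are illustrative):

# Hypothetical usage, inside an async context.
async def answer_with_degradation(agent: GracefulAgent, query: str) -> None:
    outcome = await agent.execute_with_fallback("web_search", {"query": query})
    if outcome["success"] and outcome["used_fallback"]:
        # Surface the degradation instead of hiding it from the caller.
        print(f"Answered via {outcome['capability_used']}, not live web_search")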
Pattern 3: Context Compression
Keep context small but useful:
class ContextCompressor:
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens

    async def compress_context(self, full_context: str) -> str:
        """Intelligently compress context to fit limits."""
        current_tokens = self.count_tokens(full_context)
        if current_tokens <= self.max_tokens:
            return full_context
        # Strategy 1: Summarize old messages
        compressed = await self.summarize_history(full_context)
        # Strategy 2: Keep only relevant code blocks
        if self.count_tokens(compressed) > self.max_tokens:
            compressed = await self.extract_relevant_code(compressed)
        # Strategy 3: Remove verbose explanations
        if self.count_tokens(compressed) > self.max_tokens:
            compressed = await self.strip_explanations(compressed)
        return compressed

    async def summarize_history(self, context: str) -> str:
        """Summarize older parts of conversation."""
        # "llm" is assumed to be an async model client.
        response = await llm.chat(
            model="claude-3-haiku-20240307",  # Fast summarizer
            messages=[{
                "role": "user",
                "content": f"""Summarize this conversation history, keeping:
- Key decisions made
- Current task state
- Important file paths and names
- Error messages encountered

Full context:
{context[:10000]}

Return a concise summary."""
            }]
        )
        return response.content
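One gap worth flagging: the class calls a count_tokens helper it never defines. A rough stand-in, assuming you don't have the model's tokenizer wired up, is the common ~4-characters-per-token heuristic; swap in the provider's real tokenizer when accuracy matters:

# Rough stand-in for the missing helper on ContextCompressor.
def count_tokens(self, text: str) -> int:
    """Approximate: ~4 characters per token for typical English text."""
    return max(1, len(text) // 4)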
Pattern 4: Speculative Execution
Do likely-needed work while waiting:
import asyncio
import json

class SpeculativeAgent:
    def __init__(self):
        self.speculation_cache = {}

    async def execute_speculatively(self, task: str) -> dict:
        """Start likely follow-up tasks early."""
        # Start main task
        main_task = asyncio.create_task(self.execute_task(task))
        # Predict and start likely next steps
        predictions = await self.predict_followups(task)
        speculative_tasks = [
            asyncio.create_task(self.speculative_execute(p))
            for p in predictions[:3]  # Top 3 predictions
        ]
        # Wait for main task
        main_result = await main_task
        # Check if any speculation was useful
        for pred, spec_task in zip(predictions[:3], speculative_tasks):
            result = await spec_task
            self.speculation_cache[pred["task"]] = result
        return main_result

    async def predict_followups(self, task: str) -> list[dict]:
        """Predict likely follow-up tasks."""
        response = await llm.chat(
            model="claude-3-haiku-20240307",
            messages=[{
                "role": "user",
                "content": f"""Given this task, predict 3 likely follow-up tasks:

Task: {task}

Return JSON array of: [{{"task": "...", "probability": 0.X}}]"""
            }]
        )
        return json.loads(response.content)
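The cache only pays off if later requests actually consult it. A sketch of that lookup, assuming follow-up tasks arrive as the same strings the predictor emitted:

# Hypothetical follow-up path on SpeculativeAgent: serve speculated work on a hit.
async def handle_next_task(self, task: str) -> dict:
    if task in self.speculation_cache:
        # The answer was computed while the previous task was still running.
        return {"result": self.speculation_cache.pop(task), "speculative_hit": True}
    return {"result": await self.execute_task(task), "speculative_hit": False}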
Pattern 5: Tool Result Caching
Don't repeat expensive operations:
import hashlib
import json
import time

class CachingToolExecutor:
    def __init__(self, cache_ttl: int = 300):
        self.cache = {}
        self.cache_ttl = cache_ttl

    def get_cache_key(self, tool: str, args: dict) -> str | None:
        """Generate cache key for tool call, or None if the tool isn't cacheable."""
        # Only cache deterministic tools
        cacheable = ["read_file", "search", "list_directory", "get_config"]
        if tool not in cacheable:
            return None
        args_hash = hashlib.md5(json.dumps(args, sort_keys=True).encode()).hexdigest()
        return f"{tool}:{args_hash}"

    async def execute_tool(self, tool: str, args: dict) -> dict:
        """Execute tool with caching."""
        cache_key = self.get_cache_key(tool, args)
        if cache_key and cache_key in self.cache:
            entry = self.cache[cache_key]
            if time.time() - entry["timestamp"] < self.cache_ttl:
                return {"result": entry["result"], "cached": True}
        result = await self.actual_execute(tool, args)
        if cache_key:
            self.cache[cache_key] = {
                "result": result,
                "timestamp": time.time()
            }
        return {"result": result, "cached": False}
Pattern 6: Parallel Tool Execution
Run independent tools concurrently:
import asyncio

class ParallelToolExecutor:
    async def execute_tool_batch(self, tool_calls: list[dict]) -> list[dict]:
        """Execute independent tool calls in parallel."""
        # Analyze dependencies
        independent, dependent = self.analyze_dependencies(tool_calls)
        # Run independent calls in parallel
        results = {}
        if independent:
            parallel_results = await asyncio.gather(*[
                self.execute_tool(tc["tool"], tc["args"])
                for tc in independent
            ])
            for tc, result in zip(independent, parallel_results):
                results[tc["id"]] = result
        # Run dependent calls sequentially
        for tc in dependent:
            # Substitute results from dependencies
            resolved_args = self.resolve_args(tc["args"], results)
            results[tc["id"]] = await self.execute_tool(tc["tool"], resolved_args)
        return [results[tc["id"]] for tc in tool_calls]
Nerd Note: These patterns emerged from pain. Each one represents thousands of failed runs before someone figured out the fix. Learn from our mistakes.
Next: Resources to continue your journey.