Production & Enterprise
Cost Management
3 min read
Agents can burn through API budgets fast. A runaway loop or verbose prompts can cost hundreds of dollars in minutes. Cost management isn't optional—it's survival.
Understanding Agent Costs
| Cost Driver | Impact | Mitigation |
|---|---|---|
| Input tokens | System prompts, context, tool results | Compression, caching |
| Output tokens | Verbose responses, repeated explanations | Concise prompts, max_tokens |
| Tool loops | Agent calls same tool repeatedly | Loop detection, limits |
| Model choice | Opus vs Sonnet vs Haiku | Right-size per task |
| Failed retries | Wasted tokens on errors | Better error handling |
Budget Enforcement
Hard limits that actually stop spending:
import json
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
@dataclass
class Budget:
    """Per-user spending limits plus running usage counters for one period.

    `BudgetEnforcer` mutates the usage fields in place and resets them
    when the period rolls over.
    """

    # Hard limits for a single period.
    max_tokens: int        # max total tokens per period
    max_requests: int      # max API requests per period
    max_cost_usd: float    # max spend in USD per period
    period: timedelta      # length of the budget window

    # Running usage within the current period (reset on rollover).
    tokens_used: int = 0
    requests_made: int = 0
    cost_incurred: float = 0.0
    # NOTE: naive local time — presumably fine for single-host use; confirm
    # before deploying across time zones.
    period_start: datetime = field(default_factory=datetime.now)
class BudgetEnforcer:
    """Tracks per-user budgets and rejects requests that would exceed them.

    Usage: `set_budget` once per user, `check_budget` before each request,
    `record_usage` after the response with actual token/cost figures.
    """

    def __init__(self):
        # user_id -> Budget
        self.budgets = {}

    def set_budget(self, user_id: str, budget: "Budget"):
        """Register (or replace) the budget for `user_id`."""
        self.budgets[user_id] = budget

    def check_budget(
        self,
        user_id: str,
        estimated_tokens: int,
        model: str = "claude-sonnet-4-20250514",
    ) -> tuple[bool, str]:
        """Check if request fits within budget.

        Returns (allowed, reason). `model` selects the pricing tier used for
        the cost estimate; the default preserves the previous behavior, so
        existing callers are unaffected.
        """
        budget = self.budgets.get(user_id)
        if budget is None:
            return True, ""  # No budget set for this user

        self._reset_if_expired(budget)

        # Check all limits; first violation wins.
        if budget.tokens_used + estimated_tokens > budget.max_tokens:
            return False, f"Token budget exceeded: {budget.tokens_used}/{budget.max_tokens}"
        if budget.requests_made >= budget.max_requests:
            return False, f"Request limit reached: {budget.requests_made}/{budget.max_requests}"
        estimated_cost = self.estimate_cost(estimated_tokens, model)
        if budget.cost_incurred + estimated_cost > budget.max_cost_usd:
            return False, f"Cost budget exceeded: ${budget.cost_incurred:.2f}/${budget.max_cost_usd:.2f}"
        return True, ""

    @staticmethod
    def _reset_if_expired(budget: "Budget") -> None:
        """Zero the usage counters when the budget period has rolled over."""
        if datetime.now() - budget.period_start > budget.period:
            budget.tokens_used = 0
            budget.requests_made = 0
            budget.cost_incurred = 0.0
            budget.period_start = datetime.now()

    def record_usage(self, user_id: str, tokens: int, cost: float):
        """Record actual usage after request (no-op if the user has no budget)."""
        budget = self.budgets.get(user_id)
        if budget:
            budget.tokens_used += tokens
            budget.requests_made += 1
            budget.cost_incurred += cost

    def estimate_cost(self, tokens: int, model: str = "claude-sonnet-4-20250514") -> float:
        """Estimate cost in USD for `tokens` total tokens on `model`.

        Unknown models fall back to Sonnet pricing.
        """
        # Prices in USD per million tokens, as of December 2025.
        prices = {
            "claude-opus-4-20250514": {"input": 15.0, "output": 75.0},
            "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
            "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25}
        }
        rate = prices.get(model, prices["claude-sonnet-4-20250514"])
        # Assume a 50/50 input/output split for estimation purposes.
        return (tokens / 2 * rate["input"] + tokens / 2 * rate["output"]) / 1_000_000
Model Selection by Task
Use the cheapest model that works:
class ModelSelector:
    """Picks the cheapest Claude model tier that can handle a given task."""

    def __init__(self):
        # Tiers ordered cheapest -> most capable; a tier qualifies when it
        # both lists the task type and tolerates the complexity score.
        self.model_tiers = {
            "haiku": {
                "model": "claude-3-haiku-20240307",
                "tasks": ["classification", "extraction", "simple_qa", "routing"],
                "max_complexity": 2
            },
            "sonnet": {
                "model": "claude-sonnet-4-20250514",
                "tasks": ["coding", "analysis", "summarization", "tool_use"],
                "max_complexity": 7
            },
            "opus": {
                "model": "claude-opus-4-20250514",
                "tasks": ["research", "complex_reasoning", "creative", "multi_step"],
                "max_complexity": 10
            }
        }

    def select_model(self, task_type: str, complexity: int = 5) -> str:
        """Return the model id of the cheapest qualifying tier.

        Falls back to the mid-tier (Sonnet) model when no tier matches.
        """
        for tier_name in ("haiku", "sonnet", "opus"):
            info = self.model_tiers[tier_name]
            within_budget = complexity <= info["max_complexity"]
            if within_budget and task_type in info["tasks"]:
                return info["model"]
        # Nothing matched: safe mid-tier default.
        return self.model_tiers["sonnet"]["model"]
# Usage: the selector returns the cheapest tier that lists the task and
# tolerates the complexity score (see ModelSelector.select_model).
selector = ModelSelector()
model = selector.select_model("classification", complexity=1)  # Returns haiku
model = selector.select_model("coding", complexity=6)  # Returns sonnet
model = selector.select_model("research", complexity=9)  # Returns opus
Loop Detection and Limits
Prevent runaway agents:
class LoopDetector:
    """Guards an agent run against runaway or repetitive tool-call loops."""

    def __init__(self, max_iterations: int = 20, similarity_threshold: float = 0.9):
        self.max_iterations = max_iterations
        # Reserved for fuzzy-similarity detection; not used by the exact check.
        self.similarity_threshold = similarity_threshold
        self.history = []

    def check_iteration(self, tool_call: dict) -> tuple[bool, str]:
        """Return (allowed, reason); records the call signature when allowed."""
        # Hard ceiling on total tool calls this run.
        if len(self.history) >= self.max_iterations:
            return False, f"Max iterations ({self.max_iterations}) reached"

        # Canonical signature: tool name plus sorted-key JSON of its args.
        signature = tool_call['name'] + ":" + json.dumps(tool_call['args'], sort_keys=True)

        # Three identical calls within the last five is treated as a loop.
        recent = self.history[-5:]
        if recent.count(signature) >= 3:
            return False, "Detected repeated identical tool calls"

        self.history.append(signature)
        return True, ""

    def reset(self):
        """Clear history for a fresh agent run."""
        self.history = []
Prompt Caching
Reuse expensive context:
class PromptCache:
def __init__(self, ttl_seconds: int = 300):
self.cache = {}
self.ttl = ttl_seconds
def get_cached_context(self, context_key: str) -> str | None:
"""Retrieve cached context if still valid."""
if context_key in self.cache:
entry = self.cache[context_key]
if time.time() - entry["timestamp"] < self.ttl:
return entry["content"]
del self.cache[context_key]
return None
def cache_context(self, context_key: str, content: str):
"""Cache expensive context for reuse."""
self.cache[context_key] = {
"content": content,
"timestamp": time.time()
}
# Example: Cache system prompts and large file contents
async def get_agent_context(project_id: str) -> str:
    """Return the agent context for `project_id`, preferring the prompt cache."""
    cache_key = f"project:{project_id}"
    hit = prompt_cache.get_cached_context(cache_key)
    if hit:
        # Cache hit: no tokens spent rebuilding the context.
        return hit
    # Miss (or stale): rebuild the expensive context and cache it for next time.
    context = await build_project_context(project_id)
    prompt_cache.cache_context(cache_key, context)
    return context
Cost Dashboard
Track and visualize spending:
class CostDashboard:
def __init__(self):
self.usage_log = []
def log_request(self, user_id: str, model: str, input_tokens: int, output_tokens: int):
"""Log each request with full cost breakdown."""
cost = self.calculate_cost(model, input_tokens, output_tokens)
self.usage_log.append({
"timestamp": datetime.now(),
"user_id": user_id,
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cost_usd": cost
})
def get_daily_summary(self, date: datetime = None) -> dict:
"""Get cost summary for a specific day."""
date = date or datetime.now()
day_start = date.replace(hour=0, minute=0, second=0)
day_end = day_start + timedelta(days=1)
day_logs = [l for l in self.usage_log
if day_start <= l["timestamp"] < day_end]
return {
"date": date.date().isoformat(),
"total_requests": len(day_logs),
"total_tokens": sum(l["input_tokens"] + l["output_tokens"] for l in day_logs),
"total_cost_usd": sum(l["cost_usd"] for l in day_logs),
"by_model": self.group_by_model(day_logs),
"by_user": self.group_by_user(day_logs)
}
Nerd Note: The best cost optimization is not making the call at all. Can you cache it? Can a smaller model do it? Does the user actually need this feature?
Next module: Where agents are headed.