Lesson 16 of 20

Production & Enterprise

Cost Management

3 min read

Agents can burn through API budgets fast. A runaway loop or verbose prompts can cost hundreds of dollars in minutes. Cost management isn't optional—it's survival.

Understanding Agent Costs

| Cost Driver | Typical Cause | Mitigation |
| --- | --- | --- |
| Input tokens | System prompts, context, tool results | Compression, caching |
| Output tokens | Verbose responses, repeated explanations | Concise prompts, `max_tokens` |
| Tool loops | Agent calls the same tool repeatedly | Loop detection, iteration limits |
| Model choice | Opus vs. Sonnet vs. Haiku | Right-size the model per task |
| Failed retries | Wasted tokens on errors | Better error handling |
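Two of those mitigations are one-liners at the call site: cap output with `max_tokens` and right-size the model. A minimal sketch using the Anthropic Python SDK (the prompt and limits are illustrative):

import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

response = client.messages.create(
    model="claude-3-haiku-20240307",  # cheapest tier for a simple classification task
    max_tokens=200,                   # hard cap on billable output tokens
    messages=[{"role": "user", "content": "Classify this ticket: 'App crashes on login'"}],
)
print(response.usage.input_tokens, response.usage.output_tokens)  # actuals for cost tracking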

Budget Enforcement

Hard limits that actually stop spending:

from dataclasses import dataclass, field
from datetime import datetime, timedelta

@dataclass
class Budget:
    max_tokens: int
    max_requests: int
    max_cost_usd: float
    period: timedelta
    tokens_used: int = 0
    requests_made: int = 0
    cost_incurred: float = 0.0
    period_start: datetime = field(default_factory=datetime.now)

class BudgetEnforcer:
    def __init__(self):
        self.budgets = {}  # user_id -> Budget

    def set_budget(self, user_id: str, budget: Budget):
        self.budgets[user_id] = budget

    def check_budget(self, user_id: str, estimated_tokens: int) -> tuple[bool, str]:
        """Check if request fits within budget."""
        budget = self.budgets.get(user_id)
        if not budget:
            return True, ""  # No budget set

        # Reset if period expired
        if datetime.now() - budget.period_start > budget.period:
            budget.tokens_used = 0
            budget.requests_made = 0
            budget.cost_incurred = 0.0
            budget.period_start = datetime.now()

        # Check all limits
        if budget.tokens_used + estimated_tokens > budget.max_tokens:
            return False, f"Token budget exceeded: {budget.tokens_used}/{budget.max_tokens}"

        if budget.requests_made >= budget.max_requests:
            return False, f"Request limit reached: {budget.requests_made}/{budget.max_requests}"

        estimated_cost = self.estimate_cost(estimated_tokens)
        if budget.cost_incurred + estimated_cost > budget.max_cost_usd:
            return False, f"Cost budget exceeded: ${budget.cost_incurred:.2f}/${budget.max_cost_usd:.2f}"

        return True, ""

    def record_usage(self, user_id: str, tokens: int, cost: float):
        """Record actual usage after request."""
        budget = self.budgets.get(user_id)
        if budget:
            budget.tokens_used += tokens
            budget.requests_made += 1
            budget.cost_incurred += cost

    def estimate_cost(self, tokens: int, model: str = "claude-sonnet-4-20250514") -> float:
        """Estimate cost based on token count."""
        # Prices in USD per million tokens, as of December 2025
        prices = {
            "claude-opus-4-20250514": {"input": 15.0, "output": 75.0},
            "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
            "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25}
        }
        rate = prices.get(model, prices["claude-sonnet-4-20250514"])
        # Assume 50/50 input/output split for estimation
        return (tokens / 2 * rate["input"] + tokens / 2 * rate["output"]) / 1_000_000
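Wiring the enforcer into a request path looks like this (the user ID, limits, and token counts are illustrative):

enforcer = BudgetEnforcer()
enforcer.set_budget("user-42", Budget(
    max_tokens=500_000,
    max_requests=200,
    max_cost_usd=10.0,
    period=timedelta(days=1),
))

allowed, reason = enforcer.check_budget("user-42", estimated_tokens=4_000)
if not allowed:
    raise RuntimeError(f"Request blocked: {reason}")

# ...make the API call, then record what it actually used
enforcer.record_usage("user-42", tokens=3_512, cost=0.031)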

Model Selection by Task

Use the cheapest model that works:

class ModelSelector:
    def __init__(self):
        self.model_tiers = {
            "haiku": {
                "model": "claude-3-haiku-20240307",
                "tasks": ["classification", "extraction", "simple_qa", "routing"],
                "max_complexity": 2
            },
            "sonnet": {
                "model": "claude-sonnet-4-20250514",
                "tasks": ["coding", "analysis", "summarization", "tool_use"],
                "max_complexity": 7
            },
            "opus": {
                "model": "claude-opus-4-20250514",
                "tasks": ["research", "complex_reasoning", "creative", "multi_step"],
                "max_complexity": 10
            }
        }

    def select_model(self, task_type: str, complexity: int = 5) -> str:
        """Select the most cost-effective model for the task."""
        for tier_name in ["haiku", "sonnet", "opus"]:
            tier = self.model_tiers[tier_name]
            if task_type in tier["tasks"] and complexity <= tier["max_complexity"]:
                return tier["model"]

        return self.model_tiers["sonnet"]["model"]  # Default

# Usage
selector = ModelSelector()
model = selector.select_model("classification", complexity=1)  # Returns haiku
model = selector.select_model("coding", complexity=6)  # Returns sonnet
model = selector.select_model("research", complexity=9)  # Returns opus

Loop Detection and Limits

Prevent runaway agents:

import json

class LoopDetector:
    def __init__(self, max_iterations: int = 20):
        self.max_iterations = max_iterations
        self.history = []  # signatures of recent tool calls

    def check_iteration(self, tool_call: dict) -> tuple[bool, str]:
        """Check if agent is stuck in a loop."""
        # Check total iterations
        if len(self.history) >= self.max_iterations:
            return False, f"Max iterations ({self.max_iterations}) reached"

        # Check for repeated identical calls
        call_signature = f"{tool_call['name']}:{json.dumps(tool_call['args'], sort_keys=True)}"

        identical_count = sum(1 for h in self.history[-5:] if h == call_signature)
        if identical_count >= 3:
            return False, "Detected repeated identical tool calls"

        self.history.append(call_signature)
        return True, ""

    def reset(self):
        self.history = []
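Check every proposed tool call before executing it; `agent_tool_calls` and `execute_tool` below are hypothetical stand-ins for your agent loop:

detector = LoopDetector(max_iterations=20)

for tool_call in agent_tool_calls:  # hypothetical: dicts like {"name": ..., "args": {...}}
    ok, reason = detector.check_iteration(tool_call)
    if not ok:
        print(f"Stopping agent: {reason}")
        break
    execute_tool(tool_call)  # hypothetical executor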

Prompt Caching

Reuse expensive context instead of rebuilding it on every request. The class below is a simple application-level cache; the Anthropic API also offers native prompt caching, shown after the example.

import time

class PromptCache:
    def __init__(self, ttl_seconds: int = 300):
        self.cache = {}
        self.ttl = ttl_seconds

    def get_cached_context(self, context_key: str) -> str | None:
        """Retrieve cached context if still valid."""
        if context_key in self.cache:
            entry = self.cache[context_key]
            if time.time() - entry["timestamp"] < self.ttl:
                return entry["content"]
            del self.cache[context_key]
        return None

    def cache_context(self, context_key: str, content: str):
        """Cache expensive context for reuse."""
        self.cache[context_key] = {
            "content": content,
            "timestamp": time.time()
        }

# Example: cache system prompts and large file contents
prompt_cache = PromptCache(ttl_seconds=300)

async def get_agent_context(project_id: str) -> str:
    cache_key = f"project:{project_id}"

    cached = prompt_cache.get_cached_context(cache_key)
    if cached:
        return cached  # Save tokens!

    # Build expensive context (build_project_context is app code defined elsewhere)
    context = await build_project_context(project_id)
    prompt_cache.cache_context(cache_key, context)
    return context
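For the API-level version, you can mark a long, stable prompt prefix for server-side caching with `cache_control`; repeated calls then bill the cached prefix at a reduced rate. A minimal sketch, assuming `LARGE_SYSTEM_PROMPT` is a long system prompt defined elsewhere (prefixes below the per-model cache minimum are not cached):

import anthropic

client = anthropic.Anthropic()

response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    system=[{
        "type": "text",
        "text": LARGE_SYSTEM_PROMPT,              # long, stable prefix worth caching
        "cache_control": {"type": "ephemeral"},   # cache this block server-side
    }],
    messages=[{"role": "user", "content": "Summarize the open issues."}],
)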

Cost Dashboard

Track and visualize spending:

class CostDashboard:
    def __init__(self):
        self.usage_log = []

    def log_request(self, user_id: str, model: str, input_tokens: int, output_tokens: int):
        """Log each request with full cost breakdown."""
        cost = self.calculate_cost(model, input_tokens, output_tokens)
        self.usage_log.append({
            "timestamp": datetime.now(),
            "user_id": user_id,
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost
        })

    def get_daily_summary(self, date: datetime | None = None) -> dict:
        """Get cost summary for a specific day."""
        date = date or datetime.now()
        day_start = date.replace(hour=0, minute=0, second=0, microsecond=0)
        day_end = day_start + timedelta(days=1)

        day_logs = [l for l in self.usage_log
                    if day_start <= l["timestamp"] < day_end]

        return {
            "date": date.date().isoformat(),
            "total_requests": len(day_logs),
            "total_tokens": sum(l["input_tokens"] + l["output_tokens"] for l in day_logs),
            "total_cost_usd": sum(l["cost_usd"] for l in day_logs),
            "by_model": self.group_by_model(day_logs),
            "by_user": self.group_by_user(day_logs)
        }

    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Actual cost from per-direction token counts (USD per million tokens)."""
        prices = {
            "claude-opus-4-20250514": {"input": 15.0, "output": 75.0},
            "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
            "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25}
        }
        rate = prices.get(model, prices["claude-sonnet-4-20250514"])
        return (input_tokens * rate["input"] + output_tokens * rate["output"]) / 1_000_000

    def group_by_model(self, logs: list) -> dict:
        """Total cost per model."""
        totals = {}
        for l in logs:
            totals[l["model"]] = totals.get(l["model"], 0.0) + l["cost_usd"]
        return totals

    def group_by_user(self, logs: list) -> dict:
        """Total cost per user."""
        totals = {}
        for l in logs:
            totals[l["user_id"]] = totals.get(l["user_id"], 0.0) + l["cost_usd"]
        return totals
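A quick smoke test (numbers illustrative; at Sonnet rates, 2,000 input plus 800 output tokens comes to about $0.018):

dashboard = CostDashboard()
dashboard.log_request("user-42", "claude-sonnet-4-20250514",
                      input_tokens=2_000, output_tokens=800)
print(dashboard.get_daily_summary()["total_cost_usd"])  # 0.018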

Nerd Note: The best cost optimization is not making the call at all. Can you cache it? Can a smaller model do it? Does the user actually need this feature?

Next module: Where agents are headed.
