Lesson 8 of 20

Long-Running Agents

Recovery & Resumption

4 min read

Things will fail. Networks drop, APIs time out, bugs crash your agent. The question isn't if but how gracefully you recover.

Failure Modes

| Failure Type | Cause | Recovery Strategy |
| --- | --- | --- |
| Network timeout | Slow API, connection drop | Retry with backoff |
| Context overflow | Too much accumulated context | Fresh session, load checkpoint |
| Tool error | External service failed | Skip or retry task |
| Agent confusion | Bad output, hallucination | Rollback, re-prompt |
| Process crash | OOM, exception | Resume from last checkpoint |
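
In code, the first step is classifying a caught exception into one of these buckets. A minimal sketch; the exception choices and strategy labels here are illustrative, not from any particular framework:

RECOVERABLE = (TimeoutError, ConnectionError)  # network timeout / connection drop

def classify_failure(exc: Exception) -> str:
    """Map a caught exception onto a recovery strategy from the table."""
    if isinstance(exc, RECOVERABLE):
        return "retry_with_backoff"
    if isinstance(exc, MemoryError):
        return "resume_from_checkpoint"  # treat like a process crash
    return "rollback_and_reprompt"       # e.g. bad output, hallucination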

Graceful Restart Pattern

import time

class RecoverableError(Exception):
    """Transient failure worth retrying (timeout, rate limit)."""

class FatalError(Exception):
    """Unrecoverable failure: save progress and exit."""

class ResilientAgent:
    def __init__(self, state_file: str, max_retries: int = 3):
        self.state_file = state_file
        self.max_retries = max_retries
        self.state = self.load_state()

    def run_with_recovery(self):
        """Main loop with automatic recovery."""
        while not self.is_complete():
            task = self.get_current_task()

            for attempt in range(self.max_retries):
                try:
                    result = self.execute_task(task)
                    self.checkpoint(task, result)  # persist result and advance past this task
                    break
                except RecoverableError as e:
                    self.log(f"Attempt {attempt + 1} failed: {e}")
                    if attempt < self.max_retries - 1:
                        time.sleep(2 ** attempt)  # Exponential backoff
                    else:
                        self.mark_task_failed(task, str(e))
                        self.handle_failed_task(task)
                except FatalError as e:
                    self.log(f"Fatal error: {e}")
                    self.save_state()  # Preserve progress
                    raise  # Can't recover, exit

    def handle_failed_task(self, task: dict):
        """Decide what to do with failed tasks."""
        if task.get("critical"):
            # Can't continue without this task
            raise FatalError(f"Critical task failed: {task['id']}")
        else:
            # Skip and continue
            self.log(f"Skipping non-critical task: {task['id']}")
            self.advance_to_next_task()
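
The task hooks (get_current_task, execute_task, checkpoint, load_state, and friends) are left abstract above. Here is a minimal sketch of a concrete subclass; TodoAgent and its fixed task list are illustrative stand-ins, not part of the lesson's API:

import json
import os

class TodoAgent(ResilientAgent):
    """Toy subclass: works through a fixed list of tasks in order."""

    def __init__(self, state_file: str, tasks: list[dict]):
        super().__init__(state_file)
        self.state.setdefault("tasks", tasks)
        self.state.setdefault("index", 0)

    def load_state(self) -> dict:
        if os.path.exists(self.state_file):
            with open(self.state_file) as f:
                return json.load(f)
        return {}

    def save_state(self):
        with open(self.state_file, "w") as f:
            json.dump(self.state, f)

    def is_complete(self) -> bool:
        return self.state["index"] >= len(self.state["tasks"])

    def get_current_task(self) -> dict:
        return self.state["tasks"][self.state["index"]]

    def execute_task(self, task: dict) -> str:
        return f"did {task['id']}"  # stand-in for real model/tool calls

    def checkpoint(self, task: dict, result: str):
        self.state["index"] += 1  # advance past the finished task
        self.save_state()

agent = TodoAgent("state.json", [{"id": "t1"}, {"id": "t2"}])
agent.run_with_recovery()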

Environment Setup on Resume

When resuming, the environment may have changed:

import os
import subprocess

class EnvironmentManager:
    def __init__(self, required_state: dict):
        self.required = required_state

    def verify_environment(self) -> list[str]:
        """Check if environment matches expected state."""
        issues = []

        # Check working directory
        if not os.path.exists(self.required.get("work_dir", ".")):
            issues.append("Working directory missing")

        # Check required files exist
        for file in self.required.get("required_files", []):
            if not os.path.exists(file):
                issues.append(f"Required file missing: {file}")

        # Check git state
        if self.required.get("expected_branch"):
            current = self.get_current_branch()
            if current != self.required["expected_branch"]:
                issues.append(f"Wrong branch: {current} vs {self.required['expected_branch']}")

        return issues

    def get_current_branch(self) -> str:
        """Name of the branch currently checked out."""
        result = subprocess.run(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            capture_output=True, text=True,
        )
        return result.stdout.strip()

    def restore_environment(self):
        """Attempt to restore the expected environment."""
        # Check out the correct branch
        if self.required.get("expected_branch"):
            subprocess.run(
                ["git", "checkout", self.required["expected_branch"]],
                check=True,  # fail loudly rather than resume on the wrong branch
            )

        # Restore from the checkpoint commit if available
        if self.required.get("checkpoint_commit"):
            subprocess.run(
                ["git", "reset", "--hard", self.required["checkpoint_commit"]],
                check=True,
            )

        # Reinstall dependencies if needed
        if os.path.exists("requirements.txt"):
            subprocess.run(["pip", "install", "-r", "requirements.txt"], check=True)

Checkpoint Validation

Don't blindly trust checkpoints; validate them before resuming:

from datetime import datetime

def validate_checkpoint(state: dict) -> tuple[bool, list[str]]:
    """Verify checkpoint integrity before resuming."""
    errors = []

    # Check required fields
    required = ["task", "subtasks", "current_index", "created_at"]
    for field in required:
        if field not in state:
            errors.append(f"Missing field: {field}")

    # Check index bounds
    if state.get("current_index", 0) > len(state.get("subtasks", [])):
        errors.append("Current index out of bounds")

    # Check timestamps are valid
    try:
        datetime.fromisoformat(state["created_at"])
    except (KeyError, ValueError):
        errors.append("Invalid timestamp format")

    # Check completed tasks have results
    for completed in state.get("completed", []):
        if "result" not in completed:
            errors.append(f"Completed task missing result: {completed.get('task')}")

    return len(errors) == 0, errors

The Resume Flow

import json
import os

def resume_agent(state_file: str):
    """Full resume flow with validation."""

    # 1. Load checkpoint
    if not os.path.exists(state_file):
        raise FileNotFoundError("No checkpoint found")

    with open(state_file) as f:
        state = json.load(f)

    # 2. Validate checkpoint
    valid, errors = validate_checkpoint(state)
    if not valid:
        print(f"Checkpoint validation failed: {errors}")
        if input("Attempt repair? (y/n): ") == "y":
            state = repair_checkpoint(state, errors)
        else:
            raise ValueError("Invalid checkpoint")

    # 3. Verify environment
    env_manager = EnvironmentManager(state.get("environment", {}))
    issues = env_manager.verify_environment()
    if issues:
        print(f"Environment issues: {issues}")
        env_manager.restore_environment()

    # 4. Create agent and resume
    agent = ResilientAgent(state_file)
    agent.run_with_recovery()
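
The flow above calls a repair_checkpoint helper the lesson doesn't define. A minimal sketch, assuming repair means normalizing every field to a safe default (it ignores the specific errors list and just fixes everything validate_checkpoint inspects):

from datetime import datetime

def repair_checkpoint(state: dict, errors: list[str]) -> dict:
    """Best-effort repair: fill safe defaults for missing or invalid fields."""
    state.setdefault("task", "unknown")
    state.setdefault("subtasks", [])
    state.setdefault("completed", [])
    state.setdefault("current_index", 0)
    # Clamp the index back into bounds rather than guessing forward
    state["current_index"] = min(state["current_index"], len(state["subtasks"]))
    # Replace a missing or unparseable timestamp
    try:
        datetime.fromisoformat(str(state.get("created_at", "")))
    except ValueError:
        state["created_at"] = datetime.now().isoformat()
    return state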

Nerd Note: Test your recovery logic by randomly killing your agent mid-task. If you can't resume cleanly, your checkpointing is broken.
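
A crude way to run that experiment, assuming your agent launches as a script (agent.py and state.json are placeholder names):

import random
import subprocess
import time

# Launch the agent, kill it hard at a random moment, then try to resume.
proc = subprocess.Popen(["python", "agent.py"])
time.sleep(random.uniform(1, 30))  # let it get partway through some task
proc.kill()   # SIGKILL: no cleanup, no atexit handlers, like a real crash
proc.wait()

resume_agent("state.json")  # if this doesn't work, checkpointing is broken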

Next module: The Model Context Protocol and building agent skills.
