Long-Running Agents
Recovery & Resumption
4 min read
Things will fail. Networks drop, APIs time out, bugs crash your agent. The question isn't if failures happen but how gracefully you recover.
Failure Modes
| Failure Type | Cause | Recovery Strategy |
|---|---|---|
| Network timeout | Slow API, connection drop | Retry with backoff |
| Context overflow | Too much accumulated context | Fresh session, load checkpoint |
| Tool error | External service failed | Skip or retry task |
| Agent confusion | Bad output, hallucination | Rollback, re-prompt |
| Process crash | OOM, exception | Resume from last checkpoint |
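The code in this section splits these failure modes into two exception classes. Here's a minimal sketch of that split, with an illustrative mapping from raw Python exceptions onto it (the class names are the ones the restart pattern below uses; the mapping itself is an assumption you'd tune for your stack):

```python
class RecoverableError(Exception):
    """Transient failure: retry with backoff, or skip the task."""

class FatalError(Exception):
    """Failure the agent can't work around: save state and exit."""

def classify(exc: Exception) -> Exception:
    """Illustrative mapping of raw exceptions onto the two recovery categories."""
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return RecoverableError(f"transient: {exc}")  # retry with backoff
    if isinstance(exc, MemoryError):
        return FatalError(f"process-level: {exc}")  # resume from checkpoint in a fresh process
    return RecoverableError(f"unclassified: {exc}")  # default to retrying before giving up
```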
Graceful Restart Pattern
```python
import time

class ResilientAgent:
    def __init__(self, state_file: str, max_retries: int = 3):
        self.state_file = state_file
        self.max_retries = max_retries
        self.state = self.load_state()

    def run_with_recovery(self):
        """Main loop with automatic recovery."""
        while not self.is_complete():
            task = self.get_current_task()
            for attempt in range(self.max_retries):
                try:
                    result = self.execute_task(task)
                    self.checkpoint(task, result)  # persist progress before moving on
                    break
                except RecoverableError as e:
                    self.log(f"Attempt {attempt + 1} failed: {e}")
                    if attempt < self.max_retries - 1:
                        time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s...
                    else:
                        self.mark_task_failed(task, str(e))
                        self.handle_failed_task(task)
                except FatalError as e:
                    self.log(f"Fatal error: {e}")
                    self.save_state()  # preserve progress for the next resume
                    raise  # can't recover, exit

    def handle_failed_task(self, task: dict):
        """Decide what to do with failed tasks."""
        if task.get("critical"):
            # Can't continue without this task
            raise FatalError(f"Critical task failed: {task['id']}")
        else:
            # Skip and continue
            self.log(f"Skipping non-critical task: {task['id']}")
            self.advance_to_next_task()
```
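The class above leaves its persistence helpers abstract. A minimal sketch of the load/save pair, written as free functions for brevity (on `ResilientAgent` they would be methods reading `self.state_file`): the state shape matches what the validation code later in this section expects, and the temp-file-plus-rename makes each save atomic, so a crash mid-write can't corrupt the last good checkpoint.

```python
import json
import os
from datetime import datetime, timezone

def load_state(state_file: str) -> dict:
    """Load the last checkpoint, or start a fresh plan if none exists."""
    if os.path.exists(state_file):
        with open(state_file) as f:
            return json.load(f)
    return {
        "task": "",
        "subtasks": [],
        "completed": [],
        "current_index": 0,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

def save_state(state: dict, state_file: str):
    """Write the checkpoint atomically: temp file first, then rename over the original."""
    tmp = state_file + ".tmp"
    with open(tmp, "w") as f:
        json.dump(state, f, indent=2)
    os.replace(tmp, state_file)  # os.replace is atomic on the same filesystem
```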
Environment Setup on Resume
When resuming, the environment may have changed since the checkpoint was written:
```python
import os
import subprocess

class EnvironmentManager:
    def __init__(self, required_state: dict):
        self.required = required_state

    def get_current_branch(self) -> str:
        """Ask git which branch we're on."""
        result = subprocess.run(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            capture_output=True, text=True,
        )
        return result.stdout.strip()

    def verify_environment(self) -> list[str]:
        """Check if environment matches expected state."""
        issues = []
        # Check working directory
        if not os.path.exists(self.required.get("work_dir", ".")):
            issues.append("Working directory missing")
        # Check required files exist
        for file in self.required.get("required_files", []):
            if not os.path.exists(file):
                issues.append(f"Required file missing: {file}")
        # Check git state
        if self.required.get("expected_branch"):
            current = self.get_current_branch()
            if current != self.required["expected_branch"]:
                issues.append(f"Wrong branch: {current} vs {self.required['expected_branch']}")
        return issues

    def restore_environment(self):
        """Attempt to restore expected environment."""
        # Checkout correct branch (check=True: fail loudly rather than resume in the wrong place)
        if self.required.get("expected_branch"):
            subprocess.run(["git", "checkout", self.required["expected_branch"]], check=True)
        # Restore from checkpoint commit if available
        if self.required.get("checkpoint_commit"):
            subprocess.run(["git", "reset", "--hard", self.required["checkpoint_commit"]], check=True)
        # Reinstall dependencies if needed
        if os.path.exists("requirements.txt"):
            subprocess.run(["pip", "install", "-r", "requirements.txt"], check=True)
```
Checkpoint Validation
Don't blindly trust checkpoints—validate them:
```python
from datetime import datetime

def validate_checkpoint(state: dict) -> tuple[bool, list[str]]:
    """Verify checkpoint integrity before resuming."""
    errors = []
    # Check required fields
    required = ["task", "subtasks", "current_index", "created_at"]
    for field in required:
        if field not in state:
            errors.append(f"Missing field: {field}")
    # Check index bounds (index == len(subtasks) means everything is done)
    if state.get("current_index", 0) > len(state.get("subtasks", [])):
        errors.append("Current index out of bounds")
    # Check timestamps are valid
    try:
        datetime.fromisoformat(state["created_at"])
    except (KeyError, ValueError):
        errors.append("Invalid timestamp format")
    # Check completed tasks have results
    for completed in state.get("completed", []):
        if "result" not in completed:
            errors.append(f"Completed task missing result: {completed.get('task')}")
    return len(errors) == 0, errors
```
The Resume Flow
```python
import json
import os

def resume_agent(state_file: str):
    """Full resume flow with validation."""
    # 1. Load checkpoint
    if not os.path.exists(state_file):
        raise FileNotFoundError("No checkpoint found")
    with open(state_file) as f:
        state = json.load(f)
    # 2. Validate checkpoint
    valid, errors = validate_checkpoint(state)
    if not valid:
        print(f"Checkpoint validation failed: {errors}")
        if input("Attempt repair? (y/n): ") == "y":
            state = repair_checkpoint(state, errors)
            with open(state_file, "w") as f:
                json.dump(state, f)  # persist the repair so the agent loads it
        else:
            raise ValueError("Invalid checkpoint")
    # 3. Verify environment
    env_manager = EnvironmentManager(state.get("environment", {}))
    issues = env_manager.verify_environment()
    if issues:
        print(f"Environment issues: {issues}")
        env_manager.restore_environment()
    # 4. Create agent and resume
    agent = ResilientAgent(state_file)
    agent.run_with_recovery()
```
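`repair_checkpoint` isn't defined above, and what it can safely fix depends on your state schema. A minimal sketch that fills defaults, clamps the index, and refuses to guess at anything it can't reconstruct:

```python
from datetime import datetime, timezone

def repair_checkpoint(state: dict, errors: list[str]) -> dict:
    """Best-effort repair of a checkpoint that failed validation."""
    if "task" not in state:
        # The original task description can't be reconstructed; fail loudly
        raise ValueError("Cannot repair: task description is lost")
    state.setdefault("subtasks", [])
    state.setdefault("completed", [])
    state.setdefault("created_at", datetime.now(timezone.utc).isoformat())
    # Clamp the index into bounds rather than guessing where the agent was
    index = state.get("current_index", 0)
    state["current_index"] = max(0, min(index, len(state["subtasks"])))
    return state
```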
Nerd Note: Test your recovery logic by randomly killing your agent mid-task. If you can't resume cleanly, your checkpointing is broken.
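A minimal version of that test, assuming the agent runs as `python agent.py --state state.json` (the command and timings are placeholders): start it, kill it at a random moment, resume, and assert the resumed run finishes cleanly.

```python
import random
import subprocess
import time

def chaos_test(runs: int = 10):
    """Kill the agent mid-run, then verify it resumes and completes."""
    for i in range(runs):
        proc = subprocess.Popen(["python", "agent.py", "--state", "state.json"])
        time.sleep(random.uniform(1, 30))  # let it make some progress first
        proc.kill()  # simulate a hard crash: no cleanup, no final save
        proc.wait()
        # Resume from the checkpoint; a clean run should exit 0
        result = subprocess.run(["python", "agent.py", "--state", "state.json"])
        assert result.returncode == 0, f"run {i}: agent failed to resume cleanly"
```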
Next module: The Model Context Protocol and building agent skills.