Production & Enterprise
Security Best Practices
4 min read
An agent with access to your systems is a powerful tool—and a powerful attack surface. Security isn't optional; it's existential.
The Threat Model
| Threat | Example | Mitigation |
|---|---|---|
| Prompt Injection | User tricks agent into running malicious code | Input sanitization, sandboxing |
| Data Exfiltration | Agent sends sensitive data to external service | Network policies, output filtering |
| Privilege Escalation | Agent gains access beyond its scope | Least privilege, capability tokens |
| Resource Abuse | Agent runs infinite loops, consumes all tokens | Rate limits, budgets, timeouts |
| Tool Misuse | Agent deletes wrong files, drops tables | Confirmation flows, dry-run modes |
Input Sanitization
Never trust user input—or agent-generated content:
class InputSanitizer:
def __init__(self):
self.dangerous_patterns = [
r"ignore previous instructions",
r"system prompt",
r"<\|.*?\|>", # Special tokens
r"```.*?exec\(.*?```", # Code execution
]
def sanitize(self, text: str) -> tuple[str, list[str]]:
"""Remove dangerous patterns and return warnings."""
warnings = []
for pattern in self.dangerous_patterns:
if re.search(pattern, text, re.IGNORECASE | re.DOTALL):
warnings.append(f"Blocked pattern: {pattern}")
text = re.sub(pattern, "[BLOCKED]", text, flags=re.IGNORECASE | re.DOTALL)
return text, warnings
def validate_tool_args(self, tool_name: str, args: dict) -> tuple[bool, str]:
"""Validate tool arguments before execution."""
if tool_name == "run_command":
cmd = args.get("command", "")
# Block dangerous commands
blocked = ["rm -rf", "sudo", "chmod 777", "curl | bash", "eval"]
for b in blocked:
if b in cmd:
return False, f"Blocked command pattern: {b}"
if tool_name == "write_file":
path = args.get("path", "")
# Prevent path traversal
if ".." in path or path.startswith("/etc") or path.startswith("/root"):
return False, f"Blocked path: {path}"
return True, ""
Sandboxed Execution
Run code in isolated environments:
import docker
class SandboxExecutor:
def __init__(self, image: str = "python:3.11-slim"):
self.client = docker.from_env()
self.image = image
self.timeout = 30
self.memory_limit = "256m"
self.network_disabled = True
async def execute(self, code: str) -> dict:
"""Run code in an isolated container."""
container = self.client.containers.run(
self.image,
command=["python", "-c", code],
detach=True,
mem_limit=self.memory_limit,
network_disabled=self.network_disabled,
read_only=True,
security_opt=["no-new-privileges:true"]
)
try:
result = container.wait(timeout=self.timeout)
logs = container.logs().decode()
return {
"success": result["StatusCode"] == 0,
"output": logs,
"exit_code": result["StatusCode"]
}
except Exception as e:
return {"success": False, "error": str(e)}
finally:
container.remove(force=True)
Output Filtering
Prevent sensitive data leakage:
class OutputFilter:
def __init__(self):
self.patterns = {
"api_key": r"(sk-[a-zA-Z0-9]{48}|api[_-]?key['\"]?\s*[:=]\s*['\"][^'\"]+)",
"password": r"password['\"]?\s*[:=]\s*['\"][^'\"]+",
"jwt": r"eyJ[a-zA-Z0-9_-]+\.eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+",
"private_key": r"-----BEGIN (RSA |EC )?PRIVATE KEY-----",
"aws_key": r"AKIA[0-9A-Z]{16}",
}
def filter_output(self, text: str) -> tuple[str, list[str]]:
"""Redact sensitive information from agent output."""
redacted = []
for name, pattern in self.patterns.items():
matches = re.findall(pattern, text, re.IGNORECASE)
if matches:
redacted.append(f"Redacted {len(matches)} {name} pattern(s)")
text = re.sub(pattern, f"[REDACTED_{name.upper()}]", text, flags=re.IGNORECASE)
return text, redacted
def validate_urls(self, text: str, allowed_domains: list[str]) -> tuple[str, list[str]]:
"""Block requests to unauthorized domains."""
blocked = []
urls = re.findall(r'https?://([^/\s]+)', text)
for url in urls:
domain = url.split(':')[0] # Remove port
if not any(domain.endswith(d) for d in allowed_domains):
blocked.append(domain)
text = text.replace(url, "[BLOCKED_DOMAIN]")
return text, blocked
Confirmation for Destructive Actions
Require explicit approval for dangerous operations:
class ConfirmationGate:
def __init__(self, dangerous_tools: list[str]):
self.dangerous_tools = dangerous_tools
self.pending_confirmations = {}
async def check_tool_call(self, tool_name: str, args: dict, context: dict) -> dict:
"""Intercept dangerous tool calls for confirmation."""
if tool_name not in self.dangerous_tools:
return {"approved": True}
# Generate confirmation request
confirmation_id = str(uuid.uuid4())
self.pending_confirmations[confirmation_id] = {
"tool": tool_name,
"args": args,
"context": context,
"created_at": datetime.now()
}
return {
"approved": False,
"requires_confirmation": True,
"confirmation_id": confirmation_id,
"message": f"Action '{tool_name}' requires approval. Args: {args}"
}
async def confirm(self, confirmation_id: str, approved: bool) -> dict:
"""Process a confirmation decision."""
if confirmation_id not in self.pending_confirmations:
return {"error": "Unknown confirmation ID"}
pending = self.pending_confirmations.pop(confirmation_id)
if approved:
return {"approved": True, "tool": pending["tool"], "args": pending["args"]}
else:
return {"approved": False, "reason": "User rejected"}
Audit Logging
Log everything for forensics:
class SecurityAuditLog:
def __init__(self, log_file: str):
self.log_file = log_file
def log_event(self, event_type: str, details: dict):
"""Immutable audit log entry."""
entry = {
"timestamp": datetime.now().isoformat(),
"event_type": event_type,
"details": details,
"checksum": None
}
# Add integrity checksum
entry["checksum"] = hashlib.sha256(
json.dumps(entry, sort_keys=True).encode()
).hexdigest()
with open(self.log_file, "a") as f:
f.write(json.dumps(entry) + "\n")
# Log all tool executions
audit.log_event("tool_execution", {
"tool": "run_command",
"args": {"command": "ls -la"},
"user_id": "user_123",
"agent_session": "session_456",
"result": "success"
})
Nerd Note: Defense in depth. Assume every layer will fail and add another one. Input validation + sandboxing + output filtering + confirmation + audit logging.
Next: Managing who can access what. :::