Production Security Patterns
Secure Agent Design
3 min read
LLM agents that can take actions in the real world require careful security design. This lesson covers the principle of least privilege, tool sandboxing, and human-in-the-loop patterns.
The Agent Security Challenge
┌─────────────────────────────────────────────────────────────┐
│ Agent Security Risks │
│ │
│ LLM Agent │
│ ↓ │
│ ┌─────────────────────────────────────────┐ │
│ │ Tools Available: │ │
│ │ • File system access │ ⚠️ Risk! │
│ │ • Database queries │ ⚠️ Risk! │
│ │ • API calls │ ⚠️ Risk! │
│ │ • Email sending │ ⚠️ Risk! │
│ │ • Code execution │ ⚠️ Risk! │
│ └─────────────────────────────────────────┘ │
│ │
│ If compromised via prompt injection: │
│ • Data exfiltration │
│ • Unauthorized actions │
│ • System compromise │
└─────────────────────────────────────────────────────────────┘
Principle of Least Privilege
from dataclasses import dataclass
from typing import Set, List, Callable, Dict, Any
from enum import Enum
class Permission(Enum):
FILE_READ = "file:read"
FILE_WRITE = "file:write"
DB_READ = "db:read"
DB_WRITE = "db:write"
API_CALL = "api:call"
EMAIL_SEND = "email:send"
CODE_EXECUTE = "code:execute"
@dataclass
class ToolDefinition:
name: str
function: Callable
required_permissions: Set[Permission]
description: str
is_destructive: bool = False
class SecureToolRegistry:
"""Registry with permission-based access control."""
def __init__(self):
self.tools: Dict[str, ToolDefinition] = {}
self.permission_cache: Dict[str, Set[Permission]] = {}
def register(self, tool: ToolDefinition):
"""Register a tool with its permissions."""
self.tools[tool.name] = tool
def get_allowed_tools(
self,
granted_permissions: Set[Permission]
) -> List[ToolDefinition]:
"""Get tools the agent is allowed to use."""
allowed = []
for tool in self.tools.values():
if tool.required_permissions.issubset(granted_permissions):
allowed.append(tool)
return allowed
def can_execute(
self,
tool_name: str,
granted_permissions: Set[Permission]
) -> bool:
"""Check if tool can be executed with given permissions."""
if tool_name not in self.tools:
return False
tool = self.tools[tool_name]
return tool.required_permissions.issubset(granted_permissions)
# Example tools
def read_file(path: str) -> str:
"""Read file contents."""
from pathlib import Path
return Path(path).read_text()
def write_file(path: str, content: str) -> bool:
"""Write content to file."""
from pathlib import Path
Path(path).write_text(content)
return True
def query_database(query: str) -> List[Dict]:
"""Execute read-only database query."""
# In production: actual DB connection
return [{"result": "data"}]
# Register with permissions
registry = SecureToolRegistry()
registry.register(ToolDefinition(
name="read_file",
function=read_file,
required_permissions={Permission.FILE_READ},
description="Read a file",
is_destructive=False
))
registry.register(ToolDefinition(
name="write_file",
function=write_file,
required_permissions={Permission.FILE_WRITE},
description="Write to a file",
is_destructive=True
))
registry.register(ToolDefinition(
name="query_db",
function=query_database,
required_permissions={Permission.DB_READ},
description="Query database (read-only)",
is_destructive=False
))
# Agent with limited permissions
agent_permissions = {Permission.FILE_READ, Permission.DB_READ}
allowed_tools = registry.get_allowed_tools(agent_permissions)
print(f"Agent can use: {[t.name for t in allowed_tools]}")
# Output: Agent can use: ['read_file', 'query_db']
Tool Sandboxing
from pathlib import Path
from typing import Optional, Any
import os
class SandboxedFileSystem:
"""File operations restricted to a sandbox directory."""
def __init__(self, sandbox_root: Path):
self.sandbox_root = sandbox_root.resolve()
self.sandbox_root.mkdir(parents=True, exist_ok=True)
def _validate_path(self, path: str) -> Path:
"""Ensure path is within sandbox."""
# Resolve to absolute path
full_path = (self.sandbox_root / path).resolve()
# Check if within sandbox (prevent path traversal)
try:
full_path.relative_to(self.sandbox_root)
except ValueError:
raise PermissionError(
f"Access denied: path outside sandbox"
)
return full_path
def read(self, path: str) -> str:
"""Read file from sandbox."""
safe_path = self._validate_path(path)
return safe_path.read_text()
def write(self, path: str, content: str) -> bool:
"""Write file to sandbox."""
safe_path = self._validate_path(path)
safe_path.parent.mkdir(parents=True, exist_ok=True)
safe_path.write_text(content)
return True
def list_files(self, path: str = ".") -> list:
"""List files in sandbox directory."""
safe_path = self._validate_path(path)
return [str(p.relative_to(self.sandbox_root)) for p in safe_path.iterdir()]
# Usage
sandbox = SandboxedFileSystem(Path("./agent_sandbox"))
# Safe - within sandbox
sandbox.write("data/output.txt", "Hello World")
content = sandbox.read("data/output.txt")
# Blocked - path traversal attempt
try:
sandbox.read("../../etc/passwd")
except PermissionError as e:
print(f"Blocked: {e}")
Human-in-the-Loop for High-Risk Actions
from dataclasses import dataclass
from typing import Callable, Optional, Any
from enum import Enum
import asyncio
class RiskLevel(Enum):
LOW = "low" # Auto-approve
MEDIUM = "medium" # Log but approve
HIGH = "high" # Require human approval
CRITICAL = "critical" # Require human approval + confirmation
@dataclass
class ActionRequest:
action_name: str
parameters: dict
risk_level: RiskLevel
justification: str
@dataclass
class ApprovalResult:
approved: bool
approver: Optional[str] = None
notes: Optional[str] = None
class HumanApprovalGate:
"""Gate for human approval of high-risk actions."""
def __init__(self, approval_handler: Callable):
self.approval_handler = approval_handler
self.pending_approvals: Dict[str, ActionRequest] = {}
self.approval_timeout = 300 # 5 minutes
async def request_approval(
self,
request: ActionRequest
) -> ApprovalResult:
"""Request human approval for action."""
# Auto-approve low risk
if request.risk_level == RiskLevel.LOW:
return ApprovalResult(approved=True)
# Log medium risk but approve
if request.risk_level == RiskLevel.MEDIUM:
self._log_action(request)
return ApprovalResult(approved=True)
# Require approval for high/critical
return await self._get_human_approval(request)
async def _get_human_approval(
self,
request: ActionRequest
) -> ApprovalResult:
"""Get human approval with timeout."""
request_id = self._generate_id()
self.pending_approvals[request_id] = request
# Notify human (webhook, email, Slack, etc.)
await self.approval_handler(request_id, request)
# Wait for approval with timeout
try:
result = await asyncio.wait_for(
self._wait_for_approval(request_id),
timeout=self.approval_timeout
)
return result
except asyncio.TimeoutError:
return ApprovalResult(
approved=False,
notes="Approval timed out"
)
async def _wait_for_approval(
self,
request_id: str
) -> ApprovalResult:
"""Wait for human to approve/deny."""
# In production: webhook callback, polling, etc.
# Placeholder implementation
await asyncio.sleep(1)
return ApprovalResult(approved=True, approver="human@example.com")
def _log_action(self, request: ActionRequest):
"""Log action for audit."""
print(f"[AUDIT] {request.action_name}: {request.parameters}")
def _generate_id(self) -> str:
import uuid
return str(uuid.uuid4())[:8]
# Integration with agent
class SecureAgent:
"""Agent with human-in-the-loop for dangerous actions."""
RISK_LEVELS = {
"read_file": RiskLevel.LOW,
"write_file": RiskLevel.MEDIUM,
"delete_file": RiskLevel.HIGH,
"send_email": RiskLevel.HIGH,
"execute_code": RiskLevel.CRITICAL,
"make_payment": RiskLevel.CRITICAL,
}
def __init__(self, approval_gate: HumanApprovalGate):
self.approval_gate = approval_gate
async def execute_tool(
self,
tool_name: str,
parameters: dict,
justification: str
) -> Any:
"""Execute tool with appropriate approval."""
risk_level = self.RISK_LEVELS.get(tool_name, RiskLevel.HIGH)
request = ActionRequest(
action_name=tool_name,
parameters=parameters,
risk_level=risk_level,
justification=justification
)
approval = await self.approval_gate.request_approval(request)
if not approval.approved:
raise PermissionError(
f"Action '{tool_name}' was not approved: {approval.notes}"
)
# Execute the actual tool
return self._do_execute(tool_name, parameters)
def _do_execute(self, tool_name: str, parameters: dict) -> Any:
"""Actually execute the tool."""
# Tool execution logic here
pass
Complete Secure Agent
class SecureLLMAgent:
"""Production-ready secure LLM agent."""
def __init__(
self,
llm_client,
permissions: Set[Permission],
sandbox_path: Path
):
self.llm = llm_client
self.permissions = permissions
self.sandbox = SandboxedFileSystem(sandbox_path)
self.registry = self._setup_tools()
self.approval_gate = HumanApprovalGate(self._notify_approver)
def _setup_tools(self) -> SecureToolRegistry:
"""Setup tool registry with sandboxed functions."""
registry = SecureToolRegistry()
# Wrap tools with sandbox
registry.register(ToolDefinition(
name="read_file",
function=self.sandbox.read,
required_permissions={Permission.FILE_READ},
description="Read file from sandbox",
is_destructive=False
))
registry.register(ToolDefinition(
name="write_file",
function=self.sandbox.write,
required_permissions={Permission.FILE_WRITE},
description="Write file to sandbox",
is_destructive=True
))
return registry
async def _notify_approver(self, request_id: str, request: ActionRequest):
"""Send approval request to human."""
print(f"[APPROVAL NEEDED] {request_id}: {request.action_name}")
print(f" Parameters: {request.parameters}")
print(f" Justification: {request.justification}")
async def run(self, task: str) -> str:
"""Run agent on task with security controls."""
# Get allowed tools for this agent
allowed_tools = self.registry.get_allowed_tools(self.permissions)
tool_descriptions = [
f"{t.name}: {t.description}"
for t in allowed_tools
]
# Generate plan with LLM
plan = await self.llm.plan(
task=task,
available_tools=tool_descriptions
)
# Execute plan with approval gates
results = []
for step in plan.steps:
if not self.registry.can_execute(step.tool, self.permissions):
results.append(f"Skipped {step.tool}: insufficient permissions")
continue
try:
result = await self._execute_step(step)
results.append(result)
except PermissionError as e:
results.append(f"Blocked: {e}")
return "\n".join(str(r) for r in results)
async def _execute_step(self, step) -> Any:
"""Execute a single step with security checks."""
tool = self.registry.tools.get(step.tool)
if tool.is_destructive:
# Require approval for destructive actions
approval = await self.approval_gate.request_approval(
ActionRequest(
action_name=step.tool,
parameters=step.parameters,
risk_level=RiskLevel.HIGH,
justification=step.justification
)
)
if not approval.approved:
raise PermissionError(f"Action not approved")
return tool.function(**step.parameters)
# Usage
agent = SecureLLMAgent(
llm_client=llm,
permissions={Permission.FILE_READ, Permission.FILE_WRITE},
sandbox_path=Path("./agent_workspace")
)
result = await agent.run("Summarize the files in the data folder")
Key Takeaway: Secure agents require multiple layers: minimal permissions, sandboxed environments, and human approval for high-risk actions. Never give an LLM agent more access than it needs for its specific task. :::