Production Security Patterns

Secure Agent Design

3 min read

LLM agents that can take actions in the real world require careful security design. This lesson covers the principle of least privilege, tool sandboxing, and human-in-the-loop patterns.

The Agent Security Challenge

┌─────────────────────────────────────────────────────────────┐
│                    Agent Security Risks                      │
│                                                             │
│   LLM Agent                                                 │
│       ↓                                                     │
│   ┌─────────────────────────────────────────┐               │
│   │  Tools Available:                       │               │
│   │  • File system access                   │  ⚠️ Risk!     │
│   │  • Database queries                     │  ⚠️ Risk!     │
│   │  • API calls                            │  ⚠️ Risk!     │
│   │  • Email sending                        │  ⚠️ Risk!     │
│   │  • Code execution                       │  ⚠️ Risk!     │
│   └─────────────────────────────────────────┘               │
│                                                             │
│   If compromised via prompt injection:                      │
│   • Data exfiltration                                       │
│   • Unauthorized actions                                    │
│   • System compromise                                       │
└─────────────────────────────────────────────────────────────┘

Principle of Least Privilege

from dataclasses import dataclass
from typing import Set, List, Callable, Dict, Any
from enum import Enum

class Permission(Enum):
    """Capabilities that can be granted to an agent.

    Each value is a ``"resource:action"`` string. Tools list the
    permissions they need in ``ToolDefinition.required_permissions``.
    """

    FILE_READ = "file:read"
    FILE_WRITE = "file:write"
    DB_READ = "db:read"
    DB_WRITE = "db:write"
    API_CALL = "api:call"
    EMAIL_SEND = "email:send"
    CODE_EXECUTE = "code:execute"

@dataclass
class ToolDefinition:
    """Describes one tool an agent may call and the permissions it needs."""

    # Key under which SecureToolRegistry.register stores the tool.
    name: str
    # Callable invoked to run the tool.
    function: Callable
    # All of these must be granted before the registry allows the tool.
    required_permissions: Set[Permission]
    # Human-readable summary of what the tool does.
    description: str
    # Flags the tool as destructive; defaults to False.
    is_destructive: bool = False

class SecureToolRegistry:
    """Catalogue of tools, each gated by the permissions it requires."""

    def __init__(self):
        self.permission_cache: Dict[str, Set[Permission]] = {}
        # Maps tool name -> definition; later registrations replace earlier ones.
        self.tools: Dict[str, ToolDefinition] = {}

    def register(self, tool: ToolDefinition):
        """Store ``tool`` under its own name."""
        self.tools[tool.name] = tool

    def get_allowed_tools(
        self,
        granted_permissions: Set[Permission]
    ) -> List[ToolDefinition]:
        """Return every registered tool whose requirements are all granted.

        Tools are returned in registration order.
        """
        return [
            candidate
            for candidate in self.tools.values()
            if candidate.required_permissions.issubset(granted_permissions)
        ]

    def can_execute(
        self,
        tool_name: str,
        granted_permissions: Set[Permission]
    ) -> bool:
        """Tell whether ``tool_name`` is registered and fully permitted.

        Unknown tool names are never executable.
        """
        entry = self.tools.get(tool_name)
        if entry is None:
            return False
        return entry.required_permissions.issubset(granted_permissions)

# Example tools
def read_file(path: str) -> str:
    """Return the text stored in the file at ``path``."""
    from pathlib import Path

    source = Path(path)
    return source.read_text()

def write_file(path: str, content: str) -> bool:
    """Write ``content`` to the file at ``path`` and return True."""
    from pathlib import Path

    destination = Path(path)
    destination.write_text(content)
    return True

def query_database(query: str) -> List[Dict]:
    """Stand-in for a read-only database query.

    The ``query`` argument is not used here; a fixed placeholder row is
    returned.
    """
    # In production: actual DB connection
    placeholder_row = {"result": "data"}
    return [placeholder_row]

# Build the registry and register each example tool with the permission it needs.
# ToolDefinition fields are passed positionally:
# (name, function, required_permissions, description[, is_destructive]).
registry = SecureToolRegistry()

registry.register(
    ToolDefinition("read_file", read_file, {Permission.FILE_READ}, "Read a file")
)

registry.register(
    ToolDefinition(
        "write_file",
        write_file,
        {Permission.FILE_WRITE},
        "Write to a file",
        True,  # destructive
    )
)

registry.register(
    ToolDefinition(
        "query_db",
        query_database,
        {Permission.DB_READ},
        "Query database (read-only)",
    )
)

# The agent is granted read-only permissions, so write_file is filtered out.
agent_permissions = {Permission.DB_READ, Permission.FILE_READ}
allowed_tools = registry.get_allowed_tools(agent_permissions)
print(f"Agent can use: {[t.name for t in allowed_tools]}")
# Output: Agent can use: ['read_file', 'query_db']

Tool Sandboxing

from pathlib import Path
from typing import Optional, Any
import os

class SandboxedFileSystem:
    """File operations confined to a single sandbox directory.

    Every path given to ``read``, ``write`` or ``list_files`` is joined onto
    the sandbox root, resolved, and rejected with ``PermissionError`` when
    the resolved result is not located under the root.
    """

    def __init__(self, sandbox_root: "str | os.PathLike[str]"):
        """Remember the resolved sandbox root and create it if missing.

        Args:
            sandbox_root: Directory to confine operations to. Either a
                ``Path`` or a plain path string is accepted.
        """
        # Coerce first so a plain string root works as well as a Path.
        self.sandbox_root = Path(sandbox_root).resolve()
        self.sandbox_root.mkdir(parents=True, exist_ok=True)

    def _validate_path(self, path: str) -> Path:
        """Resolve ``path`` inside the sandbox.

        Args:
            path: Path interpreted relative to the sandbox root.

        Returns:
            The resolved absolute path.

        Raises:
            PermissionError: If the resolved path is outside the sandbox.
        """
        # Resolving collapses ".." segments before the containment check.
        full_path = (self.sandbox_root / path).resolve()

        try:
            full_path.relative_to(self.sandbox_root)
        except ValueError:
            # "from None" keeps the internal ValueError, whose message names
            # the resolved paths, out of the reported traceback.
            raise PermissionError(
                "Access denied: path outside sandbox"
            ) from None

        return full_path

    def read(self, path: str) -> str:
        """Return the text of a file inside the sandbox.

        Raises:
            PermissionError: If ``path`` resolves outside the sandbox.
        """
        safe_path = self._validate_path(path)
        return safe_path.read_text()

    def write(self, path: str, content: str) -> bool:
        """Write ``content`` to a file inside the sandbox and return True.

        Missing parent directories are created.

        Raises:
            PermissionError: If ``path`` resolves outside the sandbox.
        """
        safe_path = self._validate_path(path)
        safe_path.parent.mkdir(parents=True, exist_ok=True)
        safe_path.write_text(content)
        return True

    def list_files(self, path: str = ".") -> list:
        """List directory entries as paths relative to the sandbox root.

        Raises:
            PermissionError: If ``path`` resolves outside the sandbox.
        """
        safe_path = self._validate_path(path)
        return [str(p.relative_to(self.sandbox_root)) for p in safe_path.iterdir()]

# Usage: every path below is interpreted relative to ./agent_sandbox.
sandbox = SandboxedFileSystem(Path("./agent_sandbox"))

# Allowed: the target resolves to a location inside the sandbox.
sandbox.write(path="data/output.txt", content="Hello World")
content = sandbox.read(path="data/output.txt")

# Rejected: the ".." segments would climb out of the sandbox root.
try:
    sandbox.read(path="../../etc/passwd")
except PermissionError as denied:
    print(f"Blocked: {denied}")

Human-in-the-Loop for High-Risk Actions

from dataclasses import dataclass
from typing import Callable, Optional, Any
from enum import Enum
import asyncio

class RiskLevel(Enum):
    """Risk tiers that decide how much oversight an action receives.

    NOTE: HumanApprovalGate.request_approval sends HIGH and CRITICAL down the
    same human-approval path; the extra confirmation described for CRITICAL
    is not implemented in this file.
    """

    LOW = "low"  # Auto-approve
    MEDIUM = "medium"  # Log but approve
    HIGH = "high"  # Require human approval
    CRITICAL = "critical"  # Require human approval + confirmation

@dataclass
class ActionRequest:
    """An action an agent wants to perform, submitted for approval."""

    # Name of the tool/action being requested.
    action_name: str
    # Arguments the action would be invoked with.
    parameters: dict
    # Tier that determines whether a human must approve.
    risk_level: RiskLevel
    # Reason given for the action; shown to the approver.
    justification: str

@dataclass
class ApprovalResult:
    """Outcome of an approval request."""

    # True when the action may proceed.
    approved: bool
    # Identifier of whoever decided; None for automatic decisions.
    approver: Optional[str] = None
    # Free-form explanation, e.g. the timeout message on denial.
    notes: Optional[str] = None

class HumanApprovalGate:
    """Gate that routes actions to a human approver according to risk level.

    LOW requests are approved silently, MEDIUM requests are approved after an
    audit line is printed, and HIGH/CRITICAL requests wait for a human
    decision and are denied if none arrives before ``approval_timeout``.
    """

    def __init__(self, approval_handler: Callable):
        """Create a gate.

        Args:
            approval_handler: Awaitable callback invoked as
                ``approval_handler(request_id, request)`` to notify the
                human approver.
        """
        self.approval_handler = approval_handler
        # Requests currently awaiting a decision, keyed by request id.
        self.pending_approvals: Dict[str, ActionRequest] = {}
        self.approval_timeout = 300  # 5 minutes

    async def request_approval(
        self,
        request: ActionRequest
    ) -> ApprovalResult:
        """Decide whether ``request`` may proceed.

        Returns:
            An ``ApprovalResult``; automatic for LOW/MEDIUM, human-driven
            for every other risk level.
        """
        # Auto-approve low risk
        if request.risk_level == RiskLevel.LOW:
            return ApprovalResult(approved=True)

        # Log medium risk but approve
        if request.risk_level == RiskLevel.MEDIUM:
            self._log_action(request)
            return ApprovalResult(approved=True)

        # Require approval for high/critical (both take the same path here)
        return await self._get_human_approval(request)

    async def _get_human_approval(
        self,
        request: ActionRequest
    ) -> ApprovalResult:
        """Notify the approver and wait for a decision, denying on timeout.

        The request is tracked in ``pending_approvals`` only while it is in
        flight; it is removed once a decision, a timeout, or an error ends
        the wait, so the dict cannot grow without bound.
        """
        request_id = self._generate_id()
        self.pending_approvals[request_id] = request

        try:
            # Notify human (webhook, email, Slack, etc.). Errors raised by
            # the handler propagate to the caller unchanged.
            await self.approval_handler(request_id, request)

            # Wait for approval with timeout; no answer means "denied".
            try:
                return await asyncio.wait_for(
                    self._wait_for_approval(request_id),
                    timeout=self.approval_timeout
                )
            except asyncio.TimeoutError:
                return ApprovalResult(
                    approved=False,
                    notes="Approval timed out"
                )
        finally:
            self.pending_approvals.pop(request_id, None)

    async def _wait_for_approval(
        self,
        request_id: str
    ) -> ApprovalResult:
        """Wait for human to approve/deny.

        WARNING: placeholder. It approves every request after one second
        without consulting anyone; replace it with a real callback or
        polling mechanism before relying on this gate.
        """
        # In production: webhook callback, polling, etc.
        await asyncio.sleep(1)
        return ApprovalResult(approved=True, approver="human@example.com")

    def _log_action(self, request: ActionRequest):
        """Print an audit line for an automatically approved action."""
        print(f"[AUDIT] {request.action_name}: {request.parameters}")

    def _generate_id(self) -> str:
        """Return a fresh random identifier for a pending request."""
        import uuid
        # Keep the whole UUID: truncating to 8 hex characters leaves only
        # 32 bits, which makes id collisions between requests plausible.
        return str(uuid.uuid4())

# Integration with agent
class SecureAgent:
    """Agent that sends each tool call through an approval gate first."""

    # Risk tier per known tool; tools missing from this table are treated
    # as HIGH by execute_tool.
    RISK_LEVELS = {
        "read_file": RiskLevel.LOW,
        "write_file": RiskLevel.MEDIUM,
        "delete_file": RiskLevel.HIGH,
        "send_email": RiskLevel.HIGH,
        "execute_code": RiskLevel.CRITICAL,
        "make_payment": RiskLevel.CRITICAL,
    }

    def __init__(self, approval_gate: HumanApprovalGate):
        self.approval_gate = approval_gate

    async def execute_tool(
        self,
        tool_name: str,
        parameters: dict,
        justification: str
    ) -> Any:
        """Run ``tool_name`` once the approval gate has allowed it.

        Raises:
            PermissionError: If the gate does not approve the action.
        """
        approval = await self.approval_gate.request_approval(
            ActionRequest(
                action_name=tool_name,
                parameters=parameters,
                risk_level=self.RISK_LEVELS.get(tool_name, RiskLevel.HIGH),
                justification=justification,
            )
        )

        if approval.approved:
            return self._do_execute(tool_name, parameters)

        raise PermissionError(
            f"Action '{tool_name}' was not approved: {approval.notes}"
        )

    def _do_execute(self, tool_name: str, parameters: dict) -> Any:
        """Placeholder for tool dispatch; currently does nothing."""
        # Tool execution logic here
        return None

Complete Secure Agent

class SecureLLMAgent:
    """LLM agent combining permission checks, a file sandbox and approvals.

    Tools are exposed through a ``SecureToolRegistry``, file access goes
    through a ``SandboxedFileSystem``, and destructive tools need approval
    from a ``HumanApprovalGate`` before they run.
    """

    def __init__(
        self,
        llm_client,
        permissions: Set[Permission],
        sandbox_path: Path
    ):
        """Wire up the sandbox, tool registry and approval gate.

        Args:
            llm_client: Object with an awaitable ``plan(task=...,
                available_tools=...)`` method whose result has ``steps``.
            permissions: Permissions granted to this agent.
            sandbox_path: Directory that file tools are confined to.
        """
        self.llm = llm_client
        self.permissions = permissions
        self.sandbox = SandboxedFileSystem(sandbox_path)
        self.registry = self._setup_tools()
        self.approval_gate = HumanApprovalGate(self._notify_approver)

    def _setup_tools(self) -> SecureToolRegistry:
        """Build a registry whose file tools are the sandboxed methods."""
        registry = SecureToolRegistry()

        # Wrap tools with sandbox
        registry.register(ToolDefinition(
            name="read_file",
            function=self.sandbox.read,
            required_permissions={Permission.FILE_READ},
            description="Read file from sandbox",
            is_destructive=False
        ))

        registry.register(ToolDefinition(
            name="write_file",
            function=self.sandbox.write,
            required_permissions={Permission.FILE_WRITE},
            description="Write file to sandbox",
            is_destructive=True
        ))

        return registry

    async def _notify_approver(self, request_id: str, request: ActionRequest):
        """Print the approval request for a human to review."""
        print(f"[APPROVAL NEEDED] {request_id}: {request.action_name}")
        print(f"  Parameters: {request.parameters}")
        print(f"  Justification: {request.justification}")

    async def run(self, task: str) -> str:
        """Plan ``task`` with the LLM and execute the permitted steps.

        Returns:
            One line per step: the tool result, a "Skipped ..." note for
            steps the agent lacks permission for, or a "Blocked: ..." note
            when a step raised ``PermissionError``.
        """
        # Only advertise tools this agent is actually allowed to use.
        allowed_tools = self.registry.get_allowed_tools(self.permissions)
        tool_descriptions = [
            f"{t.name}: {t.description}"
            for t in allowed_tools
        ]

        # Generate plan with LLM
        plan = await self.llm.plan(
            task=task,
            available_tools=tool_descriptions
        )

        # The plan is LLM output, so every step is re-checked before it runs.
        results = []
        for step in plan.steps:
            if not self.registry.can_execute(step.tool, self.permissions):
                results.append(f"Skipped {step.tool}: insufficient permissions")
                continue

            try:
                result = await self._execute_step(step)
                results.append(result)
            except PermissionError as e:
                results.append(f"Blocked: {e}")

        return "\n".join(str(r) for r in results)

    async def _execute_step(self, step) -> Any:
        """Execute a single step, enforcing every security check itself.

        The checks are repeated here rather than trusting the caller, so
        calling this method directly cannot bypass them.

        Raises:
            PermissionError: If the tool is unknown, the agent lacks the
                required permissions, or approval is refused.
        """
        tool = self.registry.tools.get(step.tool)
        if tool is None:
            raise PermissionError(f"Unknown tool '{step.tool}'")

        if not self.registry.can_execute(step.tool, self.permissions):
            raise PermissionError(
                f"Insufficient permissions for '{step.tool}'"
            )

        if tool.is_destructive:
            # Require approval for destructive actions
            approval = await self.approval_gate.request_approval(
                ActionRequest(
                    action_name=step.tool,
                    parameters=step.parameters,
                    risk_level=RiskLevel.HIGH,
                    justification=step.justification
                )
            )

            if not approval.approved:
                raise PermissionError("Action not approved")

        return tool.function(**step.parameters)

# Usage
# NOTE: `llm` is not defined in this snippet. Supply a client object with an
# awaitable `plan(task=..., available_tools=...)` method whose result exposes
# `.steps`, which is what SecureLLMAgent.run calls.
agent = SecureLLMAgent(
    llm_client=llm,
    permissions={Permission.FILE_READ, Permission.FILE_WRITE},
    sandbox_path=Path("./agent_workspace")
)

# NOTE: `await` is only valid inside an `async def` (or an asyncio-aware
# REPL/notebook). In a plain script, wrap this call in a coroutine and start
# it with asyncio.run(...).
result = await agent.run("Summarize the files in the data folder")

Key Takeaway: Secure agents require multiple layers: minimal permissions, sandboxed environments, and human approval for high-risk actions. Never give an LLM agent more access than it needs for its specific task.

Quiz

Module 5: Production Security Patterns

Take Quiz