Security Best Practices

An agent with access to your systems is a powerful tool—and a powerful attack surface. Security isn't optional; it's existential.

The Threat Model

Threat               | Example                                           | Mitigation
---------------------|---------------------------------------------------|----------------------------------------------
Prompt Injection     | User tricks agent into running malicious code     | Input sanitization, sandboxing
Data Exfiltration    | Agent sends sensitive data to external service    | Network policies, output filtering
Privilege Escalation | Agent gains access beyond its scope               | Least privilege, capability tokens
Resource Abuse       | Agent runs infinite loops, consumes all tokens    | Rate limits, budgets, timeouts (sketch below)
Tool Misuse          | Agent deletes wrong files, drops tables           | Confirmation flows, dry-run modes
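
Most of these mitigations get working code later in this lesson; rate limits and budgets do not, so here is a minimal sketch of a per-session budget guard. The ResourceBudget name, the limit values, and the charge method are illustrative only, not an API from any particular framework.

import time

class ResourceBudget:
    """Per-session caps on tool calls, tokens, and wall-clock time (illustrative)."""

    def __init__(self, max_tool_calls: int = 50,
                 max_tokens: int = 100_000, max_seconds: float = 300.0):
        self.max_tool_calls = max_tool_calls
        self.max_tokens = max_tokens
        self.max_seconds = max_seconds
        self.tool_calls = 0
        self.tokens = 0
        self.started_at = time.monotonic()

    def charge(self, tokens: int = 0, tool_calls: int = 0) -> None:
        """Record usage and raise as soon as any limit is exceeded."""
        self.tokens += tokens
        self.tool_calls += tool_calls
        if self.tool_calls > self.max_tool_calls:
            raise RuntimeError("Tool-call budget exceeded")
        if self.tokens > self.max_tokens:
            raise RuntimeError("Token budget exceeded")
        if time.monotonic() - self.started_at > self.max_seconds:
            raise RuntimeError("Time budget exceeded")

Call charge() after every model response or tool invocation and abort the agent loop when it raises.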

Input Sanitization

Never trust user input—or agent-generated content:

import re

class InputSanitizer:
    def __init__(self):
        self.dangerous_patterns = [
            r"ignore previous instructions",
            r"system prompt",
            r"<\|.*?\|>",  # Special tokens
            r"```.*?exec\(.*?```",  # Code execution
        ]

    def sanitize(self, text: str) -> tuple[str, list[str]]:
        """Remove dangerous patterns and return warnings."""
        warnings = []

        for pattern in self.dangerous_patterns:
            if re.search(pattern, text, re.IGNORECASE | re.DOTALL):
                warnings.append(f"Blocked pattern: {pattern}")
                text = re.sub(pattern, "[BLOCKED]", text, flags=re.IGNORECASE | re.DOTALL)

        return text, warnings

    def validate_tool_args(self, tool_name: str, args: dict) -> tuple[bool, str]:
        """Validate tool arguments before execution."""
        if tool_name == "run_command":
            cmd = args.get("command", "")
            # Block dangerous commands
            blocked = ["rm -rf", "sudo", "chmod 777", "curl | bash", "eval"]
            for b in blocked:
                if b in cmd:
                    return False, f"Blocked command pattern: {b}"

        if tool_name == "write_file":
            path = args.get("path", "")
            # Prevent path traversal
            if ".." in path or path.startswith("/etc") or path.startswith("/root"):
                return False, f"Blocked path: {path}"

        return True, ""

Sandboxed Execution

Run code in isolated environments:

import docker

class SandboxExecutor:
    def __init__(self, image: str = "python:3.11-slim"):
        self.client = docker.from_env()
        self.image = image
        self.timeout = 30
        self.memory_limit = "256m"
        self.network_disabled = True

    async def execute(self, code: str) -> dict:
        """Run code in an isolated container."""
        container = self.client.containers.run(
            self.image,
            command=["python", "-c", code],
            detach=True,
            mem_limit=self.memory_limit,
            network_disabled=self.network_disabled,
            read_only=True,
            security_opt=["no-new-privileges:true"]
        )

        try:
            result = container.wait(timeout=self.timeout)
            logs = container.logs().decode()
            return {
                "success": result["StatusCode"] == 0,
                "output": logs,
                "exit_code": result["StatusCode"]
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
        finally:
            container.remove(force=True)
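
A usage sketch, assuming a local Docker daemon and the python:3.11-slim image are available. Note that docker-py calls are synchronous, so in a real async service you would push them onto a worker thread (for example with asyncio.to_thread):

import asyncio

async def main():
    sandbox = SandboxExecutor()
    result = await sandbox.execute("print(sum(range(10)))")
    print(result)  # e.g. {"success": True, "output": "45\n", "exit_code": 0}

asyncio.run(main())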

Output Filtering

Prevent sensitive data leakage:

import re

class OutputFilter:
    def __init__(self):
        self.patterns = {
            "api_key": r"(sk-[a-zA-Z0-9]{48}|api[_-]?key['\"]?\s*[:=]\s*['\"][^'\"]+)",
            "password": r"password['\"]?\s*[:=]\s*['\"][^'\"]+",
            "jwt": r"eyJ[a-zA-Z0-9_-]+\.eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+",
            "private_key": r"-----BEGIN (RSA |EC )?PRIVATE KEY-----",
            "aws_key": r"AKIA[0-9A-Z]{16}",
        }

    def filter_output(self, text: str) -> tuple[str, list[str]]:
        """Redact sensitive information from agent output."""
        redacted = []

        for name, pattern in self.patterns.items():
            matches = re.findall(pattern, text, re.IGNORECASE)
            if matches:
                redacted.append(f"Redacted {len(matches)} {name} pattern(s)")
                text = re.sub(pattern, f"[REDACTED_{name.upper()}]", text, flags=re.IGNORECASE)

        return text, redacted

    def validate_urls(self, text: str, allowed_domains: list[str]) -> tuple[str, list[str]]:
        """Block references to unauthorized domains in agent output."""
        blocked = []
        hosts = re.findall(r'https?://([^/\s]+)', text)

        for host in hosts:
            domain = host.split(':')[0]  # Strip any port
            # Require an exact match or a true subdomain; a bare endswith()
            # would let "evilexample.com" pass for "example.com"
            if not any(domain == d or domain.endswith("." + d) for d in allowed_domains):
                blocked.append(domain)
                text = text.replace(host, "[BLOCKED_DOMAIN]")

        return text, blocked
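
A usage sketch; the key and URL are fabricated examples:

out_filter = OutputFilter()

leaky = "Key: AKIAABCDEFGHIJKLMNOP, details at https://evil.example.net/payload"
safe, notices = out_filter.filter_output(leaky)
print(safe)     # "Key: [REDACTED_AWS_KEY], details at https://evil.example.net/payload"
print(notices)  # ["Redacted 1 aws_key pattern(s)"]

safe, blocked = out_filter.validate_urls(safe, allowed_domains=["example.com"])
print(blocked)  # ["evil.example.net"]
print(safe)     # "... details at https://[BLOCKED_DOMAIN]/payload"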

Confirmation for Destructive Actions

Require explicit approval for dangerous operations:

import uuid
from datetime import datetime

class ConfirmationGate:
    def __init__(self, dangerous_tools: list[str]):
        self.dangerous_tools = dangerous_tools
        self.pending_confirmations = {}

    async def check_tool_call(self, tool_name: str, args: dict, context: dict) -> dict:
        """Intercept dangerous tool calls for confirmation."""
        if tool_name not in self.dangerous_tools:
            return {"approved": True}

        # Generate confirmation request
        confirmation_id = str(uuid.uuid4())
        self.pending_confirmations[confirmation_id] = {
            "tool": tool_name,
            "args": args,
            "context": context,
            "created_at": datetime.now()
        }

        return {
            "approved": False,
            "requires_confirmation": True,
            "confirmation_id": confirmation_id,
            "message": f"Action '{tool_name}' requires approval. Args: {args}"
        }

    async def confirm(self, confirmation_id: str, approved: bool) -> dict:
        """Process a confirmation decision."""
        if confirmation_id not in self.pending_confirmations:
            return {"error": "Unknown confirmation ID"}

        pending = self.pending_confirmations.pop(confirmation_id)

        if approved:
            return {"approved": True, "tool": pending["tool"], "args": pending["args"]}
        else:
            return {"approved": False, "reason": "User rejected"}

Audit Logging

Log everything for forensics:

import hashlib
import json
from datetime import datetime

class SecurityAuditLog:
    def __init__(self, log_file: str):
        self.log_file = log_file

    def log_event(self, event_type: str, details: dict):
        """Immutable audit log entry."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "details": details,
            "checksum": None
        }
        # Checksum covers the entry while checksum is None; verify by recomputing the same way
        entry["checksum"] = hashlib.sha256(
            json.dumps(entry, sort_keys=True).encode()
        ).hexdigest()

        with open(self.log_file, "a") as f:
            f.write(json.dumps(entry) + "\n")

# Log all tool executions
audit = SecurityAuditLog("security_audit.jsonl")  # example log path
audit.log_event("tool_execution", {
    "tool": "run_command",
    "args": {"command": "ls -la"},
    "user_id": "user_123",
    "agent_session": "session_456",
    "result": "success"
})

Nerd Note: Defense in depth. Assume every layer will fail and add another one. Input validation + sandboxing + output filtering + confirmation + audit logging.
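
One way to see the layering end to end; a sketch only, assuming the classes above live in one module, with the model call and the confirmation gate omitted for brevity. handle_turn, agent_generated_code, and the log path are illustrative names:

async def handle_turn(user_input: str, agent_generated_code: str) -> str:
    sanitizer = InputSanitizer()
    sandbox = SandboxExecutor()
    out_filter = OutputFilter()
    audit = SecurityAuditLog("security_audit.jsonl")

    # Layer 1: sanitize what goes in
    clean_input, warnings = sanitizer.sanitize(user_input)
    audit.log_event("input_sanitized", {"warnings": warnings})
    # clean_input would feed the model call, which is elided here

    # Layer 2: agent-produced code runs only inside the sandbox
    result = await sandbox.execute(agent_generated_code)
    audit.log_event("sandbox_execution", {"success": result.get("success", False)})

    # Layer 3: filter what comes back out
    safe_output, redactions = out_filter.filter_output(result.get("output", ""))
    audit.log_event("output_filtered", {"redactions": redactions})

    return safe_output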

Next: Managing who can access what.
