Lesson 14 of 20

Production & Enterprise

Security Best Practices

4 min read

An agent with access to your systems is a powerful tool—and a powerful attack surface. Security isn't optional; it's existential.

The Threat Model

ThreatExampleMitigation
Prompt InjectionUser tricks agent into running malicious codeInput sanitization, sandboxing
Data ExfiltrationAgent sends sensitive data to external serviceNetwork policies, output filtering
Privilege EscalationAgent gains access beyond its scopeLeast privilege, capability tokens
Resource AbuseAgent runs infinite loops, consumes all tokensRate limits, budgets, timeouts
Tool MisuseAgent deletes wrong files, drops tablesConfirmation flows, dry-run modes

Input Sanitization

Never trust user input—or agent-generated content:

class InputSanitizer:
    def __init__(self):
        self.dangerous_patterns = [
            r"ignore previous instructions",
            r"system prompt",
            r"<\|.*?\|>",  # Special tokens
            r"```.*?exec\(.*?```",  # Code execution
        ]

    def sanitize(self, text: str) -> tuple[str, list[str]]:
        """Remove dangerous patterns and return warnings."""
        warnings = []

        for pattern in self.dangerous_patterns:
            if re.search(pattern, text, re.IGNORECASE | re.DOTALL):
                warnings.append(f"Blocked pattern: {pattern}")
                text = re.sub(pattern, "[BLOCKED]", text, flags=re.IGNORECASE | re.DOTALL)

        return text, warnings

    def validate_tool_args(self, tool_name: str, args: dict) -> tuple[bool, str]:
        """Validate tool arguments before execution."""
        if tool_name == "run_command":
            cmd = args.get("command", "")
            # Block dangerous commands
            blocked = ["rm -rf", "sudo", "chmod 777", "curl | bash", "eval"]
            for b in blocked:
                if b in cmd:
                    return False, f"Blocked command pattern: {b}"

        if tool_name == "write_file":
            path = args.get("path", "")
            # Prevent path traversal
            if ".." in path or path.startswith("/etc") or path.startswith("/root"):
                return False, f"Blocked path: {path}"

        return True, ""

Sandboxed Execution

Run code in isolated environments:

import docker

class SandboxExecutor:
    def __init__(self, image: str = "python:3.11-slim"):
        self.client = docker.from_env()
        self.image = image
        self.timeout = 30
        self.memory_limit = "256m"
        self.network_disabled = True

    async def execute(self, code: str) -> dict:
        """Run code in an isolated container."""
        container = self.client.containers.run(
            self.image,
            command=["python", "-c", code],
            detach=True,
            mem_limit=self.memory_limit,
            network_disabled=self.network_disabled,
            read_only=True,
            security_opt=["no-new-privileges:true"]
        )

        try:
            result = container.wait(timeout=self.timeout)
            logs = container.logs().decode()
            return {
                "success": result["StatusCode"] == 0,
                "output": logs,
                "exit_code": result["StatusCode"]
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
        finally:
            container.remove(force=True)

Output Filtering

Prevent sensitive data leakage:

class OutputFilter:
    def __init__(self):
        self.patterns = {
            "api_key": r"(sk-[a-zA-Z0-9]{48}|api[_-]?key['\"]?\s*[:=]\s*['\"][^'\"]+)",
            "password": r"password['\"]?\s*[:=]\s*['\"][^'\"]+",
            "jwt": r"eyJ[a-zA-Z0-9_-]+\.eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+",
            "private_key": r"-----BEGIN (RSA |EC )?PRIVATE KEY-----",
            "aws_key": r"AKIA[0-9A-Z]{16}",
        }

    def filter_output(self, text: str) -> tuple[str, list[str]]:
        """Redact sensitive information from agent output."""
        redacted = []

        for name, pattern in self.patterns.items():
            matches = re.findall(pattern, text, re.IGNORECASE)
            if matches:
                redacted.append(f"Redacted {len(matches)} {name} pattern(s)")
                text = re.sub(pattern, f"[REDACTED_{name.upper()}]", text, flags=re.IGNORECASE)

        return text, redacted

    def validate_urls(self, text: str, allowed_domains: list[str]) -> tuple[str, list[str]]:
        """Block requests to unauthorized domains."""
        blocked = []
        urls = re.findall(r'https?://([^/\s]+)', text)

        for url in urls:
            domain = url.split(':')[0]  # Remove port
            if not any(domain.endswith(d) for d in allowed_domains):
                blocked.append(domain)
                text = text.replace(url, "[BLOCKED_DOMAIN]")

        return text, blocked

Confirmation for Destructive Actions

Require explicit approval for dangerous operations:

class ConfirmationGate:
    def __init__(self, dangerous_tools: list[str]):
        self.dangerous_tools = dangerous_tools
        self.pending_confirmations = {}

    async def check_tool_call(self, tool_name: str, args: dict, context: dict) -> dict:
        """Intercept dangerous tool calls for confirmation."""
        if tool_name not in self.dangerous_tools:
            return {"approved": True}

        # Generate confirmation request
        confirmation_id = str(uuid.uuid4())
        self.pending_confirmations[confirmation_id] = {
            "tool": tool_name,
            "args": args,
            "context": context,
            "created_at": datetime.now()
        }

        return {
            "approved": False,
            "requires_confirmation": True,
            "confirmation_id": confirmation_id,
            "message": f"Action '{tool_name}' requires approval. Args: {args}"
        }

    async def confirm(self, confirmation_id: str, approved: bool) -> dict:
        """Process a confirmation decision."""
        if confirmation_id not in self.pending_confirmations:
            return {"error": "Unknown confirmation ID"}

        pending = self.pending_confirmations.pop(confirmation_id)

        if approved:
            return {"approved": True, "tool": pending["tool"], "args": pending["args"]}
        else:
            return {"approved": False, "reason": "User rejected"}

Audit Logging

Log everything for forensics:

class SecurityAuditLog:
    def __init__(self, log_file: str):
        self.log_file = log_file

    def log_event(self, event_type: str, details: dict):
        """Immutable audit log entry."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "details": details,
            "checksum": None
        }
        # Add integrity checksum
        entry["checksum"] = hashlib.sha256(
            json.dumps(entry, sort_keys=True).encode()
        ).hexdigest()

        with open(self.log_file, "a") as f:
            f.write(json.dumps(entry) + "\n")

# Log all tool executions
audit.log_event("tool_execution", {
    "tool": "run_command",
    "args": {"command": "ls -la"},
    "user_id": "user_123",
    "agent_session": "session_456",
    "result": "success"
})

Nerd Note: Defense in depth. Assume every layer will fail and add another one. Input validation + sandboxing + output filtering + confirmation + audit logging.

Next: Managing who can access what. :::

Quick check: how does this lesson land for you?

Quiz

Module 4: Production & Enterprise

Take Quiz
FREE WEEKLY NEWSLETTER

Stay on the Nerd Track

One email per week — courses, deep dives, tools, and AI experiments.

No spam. Unsubscribe anytime.