Lesson 15 of 20

Production & Enterprise

Identity & Access Control

4 min read

When agents act on behalf of users, you need robust identity systems. Who is this agent working for? What are they allowed to do?

The Identity Chain

User → Session → Agent → Tool → Resource
  ↓        ↓        ↓       ↓        ↓
 Auth   Context   Scope  Permission  Data

Every link in this chain needs verification.

Capability-Based Access

Instead of checking permissions at every tool call, issue capability tokens:

from dataclasses import dataclass
from datetime import datetime, timedelta
import jwt

@dataclass
class Capability:
    resource: str      # What resource (e.g., "files:/project/*")
    actions: list[str] # What actions (e.g., ["read", "write"])
    expires_at: datetime
    constraints: dict  # Additional limits

class CapabilityManager:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key

    def issue_capability(
        self,
        user_id: str,
        resource: str,
        actions: list[str],
        ttl_minutes: int = 60,
        constraints: dict = None
    ) -> str:
        """Issue a capability token for specific access."""
        payload = {
            "sub": user_id,
            "resource": resource,
            "actions": actions,
            "constraints": constraints or {},
            "exp": datetime.utcnow() + timedelta(minutes=ttl_minutes),
            "iat": datetime.utcnow()
        }
        return jwt.encode(payload, self.secret_key, algorithm="HS256")

    def verify_capability(self, token: str, resource: str, action: str) -> dict:
        """Verify a capability token for a specific action."""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])

            # Check resource match (supports wildcards)
            if not self.resource_matches(payload["resource"], resource):
                return {"valid": False, "error": "Resource mismatch"}

            # Check action allowed
            if action not in payload["actions"]:
                return {"valid": False, "error": f"Action '{action}' not permitted"}

            return {"valid": True, "user_id": payload["sub"], "constraints": payload["constraints"]}

        except jwt.ExpiredSignatureError:
            return {"valid": False, "error": "Token expired"}
        except jwt.InvalidTokenError as e:
            return {"valid": False, "error": str(e)}

    def resource_matches(self, pattern: str, resource: str) -> bool:
        """Check if resource matches pattern with wildcards."""
        import fnmatch
        return fnmatch.fnmatch(resource, pattern)

Scoped Agent Sessions

Create agent instances with limited scope:

class ScopedAgentSession:
    def __init__(
        self,
        user_id: str,
        capabilities: list[str],
        tool_whitelist: list[str],
        resource_paths: list[str]
    ):
        self.user_id = user_id
        self.capabilities = capabilities
        self.tool_whitelist = tool_whitelist
        self.resource_paths = resource_paths
        self.session_id = str(uuid.uuid4())
        self.created_at = datetime.now()

    def can_use_tool(self, tool_name: str) -> bool:
        """Check if tool is in whitelist."""
        return tool_name in self.tool_whitelist

    def can_access_resource(self, path: str) -> bool:
        """Check if path is within allowed resources."""
        return any(
            path.startswith(allowed) for allowed in self.resource_paths
        )

    def get_filtered_tools(self, all_tools: list[dict]) -> list[dict]:
        """Return only tools this session can use."""
        return [t for t in all_tools if t["name"] in self.tool_whitelist]


# Create a session for a specific user role
def create_developer_session(user_id: str) -> ScopedAgentSession:
    return ScopedAgentSession(
        user_id=user_id,
        capabilities=["code:read", "code:write", "shell:execute"],
        tool_whitelist=["read_file", "write_file", "run_command", "git_commit"],
        resource_paths=["/home/projects/", "/tmp/"]
    )

def create_readonly_session(user_id: str) -> ScopedAgentSession:
    return ScopedAgentSession(
        user_id=user_id,
        capabilities=["code:read"],
        tool_whitelist=["read_file", "search_files", "list_directory"],
        resource_paths=["/home/projects/"]
    )

Role-Based Tool Access

Map organizational roles to tool permissions:

ROLE_PERMISSIONS = {
    "viewer": {
        "tools": ["read_file", "search", "list_directory"],
        "actions": ["read"],
        "resource_patterns": ["*"]
    },
    "developer": {
        "tools": ["read_file", "write_file", "run_tests", "git_*"],
        "actions": ["read", "write", "execute"],
        "resource_patterns": ["/projects/*", "/tmp/*"]
    },
    "admin": {
        "tools": ["*"],
        "actions": ["*"],
        "resource_patterns": ["*"]
    },
    "ci_bot": {
        "tools": ["run_tests", "build", "deploy_staging"],
        "actions": ["read", "execute"],
        "resource_patterns": ["/ci/*", "/builds/*"]
    }
}

class RoleBasedAccess:
    def __init__(self):
        self.user_roles = {}  # user_id -> role

    def assign_role(self, user_id: str, role: str):
        if role not in ROLE_PERMISSIONS:
            raise ValueError(f"Unknown role: {role}")
        self.user_roles[user_id] = role

    def get_user_tools(self, user_id: str, all_tools: list[dict]) -> list[dict]:
        """Filter tools based on user's role."""
        role = self.user_roles.get(user_id, "viewer")
        permissions = ROLE_PERMISSIONS[role]

        allowed_tools = []
        for tool in all_tools:
            if self.tool_matches(tool["name"], permissions["tools"]):
                allowed_tools.append(tool)

        return allowed_tools

    def tool_matches(self, tool_name: str, patterns: list[str]) -> bool:
        """Check if tool matches any pattern (supports wildcards)."""
        import fnmatch
        return any(fnmatch.fnmatch(tool_name, p) for p in patterns)

Multi-Tenant Isolation

When serving multiple organizations:

class TenantIsolation:
    def __init__(self):
        self.tenant_configs = {}

    def register_tenant(self, tenant_id: str, config: dict):
        """Register tenant with their specific configuration."""
        self.tenant_configs[tenant_id] = {
            "allowed_models": config.get("allowed_models", ["claude-sonnet-4-20250514"]),
            "max_tokens_per_request": config.get("max_tokens", 4096),
            "allowed_tools": config.get("tools", []),
            "data_region": config.get("region", "us"),
            "custom_system_prompt": config.get("system_prompt", "")
        }

    def get_tenant_agent_config(self, tenant_id: str) -> dict:
        """Get agent configuration for a specific tenant."""
        if tenant_id not in self.tenant_configs:
            raise ValueError(f"Unknown tenant: {tenant_id}")

        config = self.tenant_configs[tenant_id]

        return {
            "model": config["allowed_models"][0],
            "max_tokens": config["max_tokens_per_request"],
            "tools": self.filter_tools(config["allowed_tools"]),
            "system_prompt": config["custom_system_prompt"]
        }

    def validate_request(self, tenant_id: str, request: dict) -> tuple[bool, str]:
        """Validate a request against tenant policies."""
        config = self.tenant_configs.get(tenant_id)
        if not config:
            return False, "Unknown tenant"

        if request.get("model") not in config["allowed_models"]:
            return False, f"Model not allowed for tenant"

        if request.get("max_tokens", 0) > config["max_tokens_per_request"]:
            return False, f"Token limit exceeded"

        return True, ""

Audit Trail for Access

Log all access decisions:

class AccessAuditLog:
    async def log_access(
        self,
        user_id: str,
        session_id: str,
        resource: str,
        action: str,
        granted: bool,
        reason: str = None
    ):
        """Log every access decision for compliance."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "session_id": session_id,
            "resource": resource,
            "action": action,
            "granted": granted,
            "reason": reason
        }
        # Write to immutable audit log
        await self.write_to_audit_store(entry)

Nerd Note: The principle of least privilege means agents should have the minimum access needed for their task—no more. When in doubt, deny.

Next: Controlling costs at scale. :::

Quiz

Module 4: Production & Enterprise

Take Quiz