Production & Enterprise
Identity & Access Control
4 min read
When agents act on behalf of users, you need robust identity systems. Which user is this agent working for? What is the agent allowed to do on that user's behalf?
The Identity Chain
User → Session → Agent → Tool       → Resource
 ↓        ↓        ↓         ↓          ↓
Auth   Context   Scope   Permission   Data
Every link in this chain needs verification.
Capability-Based Access
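Instead of looking up a user's permissions on every tool call, issue short-lived, signed capability tokens that carry the grant with them; each tool call then only needs to verify the token: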
import posixpath
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone

import jwt
@dataclass
class Capability:
    """Describe a grant of specific actions on a resource with an expiry.

    ``constraints`` is optional and defaults to a fresh empty dict per
    instance, so a capability without extra limits can be built from the
    first three fields alone.
    """

    # Resource identifier or wildcard pattern, e.g. "files:/project/*".
    resource: str
    # Actions covered by the grant, e.g. ["read", "write"].
    actions: list[str]
    # Expiry time of the grant.
    expires_at: datetime
    # Additional limits on the grant; empty when there are none.
    constraints: dict = field(default_factory=dict)
class CapabilityManager:
    """Issue and verify capability tokens encoded as HS256-signed JWTs."""

    def __init__(self, secret_key: str):
        # Shared secret used both to sign and to verify tokens.
        self.secret_key = secret_key

    def issue_capability(
        self,
        user_id: str,
        resource: str,
        actions: list[str],
        ttl_minutes: int = 60,
        constraints: dict = None,
    ) -> str:
        """Issue a capability token for specific access.

        Args:
            user_id: Stored as the token's ``sub`` claim.
            resource: Resource identifier or wildcard pattern the token covers.
            actions: Actions the token permits on the resource.
            ttl_minutes: Minutes from now until the token expires.
            constraints: Optional extra limits; stored as ``{}`` when omitted.

        Returns:
            The encoded token as returned by ``jwt.encode``.
        """
        # Take one timezone-aware UTC timestamp so "iat" and "exp" are derived
        # from the same instant (and to avoid the deprecated utcnow()).
        issued_at = datetime.now(timezone.utc)
        payload = {
            "sub": user_id,
            "resource": resource,
            "actions": actions,
            "constraints": constraints or {},
            "exp": issued_at + timedelta(minutes=ttl_minutes),
            "iat": issued_at,
        }
        return jwt.encode(payload, self.secret_key, algorithm="HS256")

    def verify_capability(self, token: str, resource: str, action: str) -> dict:
        """Verify a capability token for a specific action.

        Returns:
            ``{"valid": True, "user_id": ..., "constraints": ...}`` when the
            token decodes, covers ``resource`` and permits ``action``;
            otherwise ``{"valid": False, "error": <message>}``.
        """
        # Only the decode step can raise JWT errors, so keep the try minimal.
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
        except jwt.ExpiredSignatureError:
            return {"valid": False, "error": "Token expired"}
        except jwt.InvalidTokenError as e:
            return {"valid": False, "error": str(e)}

        granted_resource = payload.get("resource")
        granted_actions = payload.get("actions")
        # A correctly signed token may still lack the claims this class
        # issues; deny instead of raising KeyError. Requiring a list also
        # prevents substring matches against a string-valued "actions" claim.
        if (
            "sub" not in payload
            or not isinstance(granted_resource, str)
            or not isinstance(granted_actions, list)
        ):
            return {"valid": False, "error": "Malformed capability token"}

        # Check resource match (supports wildcards)
        if not self.resource_matches(granted_resource, resource):
            return {"valid": False, "error": "Resource mismatch"}

        # Check action allowed
        if action not in granted_actions:
            return {"valid": False, "error": f"Action '{action}' not permitted"}

        return {
            "valid": True,
            "user_id": payload["sub"],
            "constraints": payload.get("constraints", {}),
        }

    def resource_matches(self, pattern: str, resource: str) -> bool:
        """Check if resource matches pattern with shell-style wildcards.

        Matching is case-sensitive on every platform. Note that ``*`` also
        matches ``/``, so ``"files:/project/*"`` covers nested paths and any
        un-normalized ``..`` segments; callers should pass normalized
        resource identifiers.
        """
        import fnmatch

        # fnmatchcase avoids fnmatch()'s OS-dependent case normalization,
        # which would make authorization results differ between platforms.
        return fnmatch.fnmatchcase(resource, pattern)
Scoped Agent Sessions
Create agent instances with limited scope:
class ScopedAgentSession:
    """An agent session limited to a tool whitelist and a set of path roots.

    Resource paths are treated as POSIX-style paths (e.g. "/home/projects/").
    """

    def __init__(
        self,
        user_id: str,
        capabilities: list[str],
        tool_whitelist: list[str],
        resource_paths: list[str],
    ):
        self.user_id = user_id
        self.capabilities = capabilities
        self.tool_whitelist = tool_whitelist
        self.resource_paths = resource_paths
        # Random UUID4 string identifying this session.
        self.session_id = str(uuid.uuid4())
        self.created_at = datetime.now()

    def can_use_tool(self, tool_name: str) -> bool:
        """Check if tool is in whitelist."""
        return tool_name in self.tool_whitelist

    def can_access_resource(self, path: str) -> bool:
        """Check if path is within allowed resources.

        Both ``path`` and each allowed root are normalized lexically before
        comparison, so ``..`` segments cannot escape an allowed root, and a
        root only matches on a path-component boundary ("/home/projects"
        does not cover "/home/projects_private"). The root directory itself
        counts as allowed. Empty root entries are ignored rather than
        matching every path.

        Normalization is purely textual: symbolic links are not resolved.
        """
        normalized = posixpath.normpath(path)
        for allowed in self.resource_paths:
            if not allowed:
                continue
            root = posixpath.normpath(allowed)
            # rstrip handles the root "/" so the prefix stays a single "/".
            if normalized == root or normalized.startswith(root.rstrip("/") + "/"):
                return True
        return False

    def get_filtered_tools(self, all_tools: list[dict]) -> list[dict]:
        """Return only tools this session can use.

        Each tool dict is looked up by its "name" key.
        """
        return [t for t in all_tools if self.can_use_tool(t["name"])]
# Create a session for a specific user role
def create_developer_session(user_id: str) -> ScopedAgentSession:
    """Build a session for ``user_id`` with the developer scope.

    The scope covers code read/write and shell execution capabilities, the
    file, command and git-commit tools, and the "/home/projects/" and
    "/tmp/" path roots.
    """
    developer_scope = {
        "capabilities": ["code:read", "code:write", "shell:execute"],
        "tool_whitelist": ["read_file", "write_file", "run_command", "git_commit"],
        "resource_paths": ["/home/projects/", "/tmp/"],
    }
    return ScopedAgentSession(user_id=user_id, **developer_scope)
def create_readonly_session(user_id: str) -> ScopedAgentSession:
    """Build a session for ``user_id`` with the read-only scope.

    The scope covers the code-read capability, the read/search/list tools,
    and the "/home/projects/" path root.
    """
    readonly_scope = {
        "capabilities": ["code:read"],
        "tool_whitelist": ["read_file", "search_files", "list_directory"],
        "resource_paths": ["/home/projects/"],
    }
    return ScopedAgentSession(user_id=user_id, **readonly_scope)
Role-Based Tool Access
Map organizational roles to tool permissions:
# Permissions granted to each organizational role, keyed by role name.
#
# "tools" entries are matched against tool names as shell-style wildcard
# patterns (e.g. "git_*" covers every tool whose name starts with "git_",
# and "*" covers every tool).
# NOTE(review): "actions" and "resource_patterns" are not read by any code
# visible in this file; presumably "*" is meant as a wildcard there too -
# confirm against the consumer of these fields.
ROLE_PERMISSIONS = {
    "viewer": {
        "tools": ["read_file", "search", "list_directory"],
        "actions": ["read"],
        "resource_patterns": ["*"]
    },
    "developer": {
        "tools": ["read_file", "write_file", "run_tests", "git_*"],
        "actions": ["read", "write", "execute"],
        "resource_patterns": ["/projects/*", "/tmp/*"]
    },
    "admin": {
        "tools": ["*"],
        "actions": ["*"],
        "resource_patterns": ["*"]
    },
    "ci_bot": {
        "tools": ["run_tests", "build", "deploy_staging"],
        "actions": ["read", "execute"],
        "resource_patterns": ["/ci/*", "/builds/*"]
    }
}
class RoleBasedAccess:
    """Track each user's role and filter tools by that role's tool patterns."""

    def __init__(self):
        self.user_roles = {}  # user_id -> role

    def assign_role(self, user_id: str, role: str):
        """Assign ``role`` to ``user_id``.

        Raises:
            ValueError: If ``role`` is not a key of ``ROLE_PERMISSIONS``.
        """
        if role not in ROLE_PERMISSIONS:
            raise ValueError(f"Unknown role: {role}")
        self.user_roles[user_id] = role

    def get_user_tools(self, user_id: str, all_tools: list[dict]) -> list[dict]:
        """Filter tools based on user's role.

        Users without an assigned role are treated as "viewer". Each tool
        dict is matched by its "name" key.
        """
        role = self.user_roles.get(user_id, "viewer")
        patterns = ROLE_PERMISSIONS[role]["tools"]
        return [tool for tool in all_tools if self.tool_matches(tool["name"], patterns)]

    def tool_matches(self, tool_name: str, patterns: list[str]) -> bool:
        """Check if tool matches any pattern (supports wildcards).

        Matching is case-sensitive on every platform; an empty pattern list
        matches nothing.
        """
        import fnmatch

        # fnmatchcase avoids fnmatch()'s OS-dependent case normalization, so
        # the same role grants the same tools regardless of the host OS.
        return any(fnmatch.fnmatchcase(tool_name, p) for p in patterns)
Multi-Tenant Isolation
When serving multiple organizations:
class TenantIsolation:
    """Keep per-tenant agent policies and validate requests against them."""

    def __init__(self):
        self.tenant_configs = {}  # tenant_id -> normalized policy dict

    def register_tenant(self, tenant_id: str, config: dict):
        """Register tenant with their specific configuration.

        Reads the optional keys "allowed_models", "max_tokens", "tools",
        "region" and "system_prompt" from ``config``, applying defaults for
        any that are missing. List values are copied so later changes to the
        caller's lists cannot silently alter the stored policy.
        """
        self.tenant_configs[tenant_id] = {
            "allowed_models": list(
                config.get("allowed_models", ["claude-sonnet-4-20250514"])
            ),
            "max_tokens_per_request": config.get("max_tokens", 4096),
            "allowed_tools": list(config.get("tools", [])),
            "data_region": config.get("region", "us"),
            "custom_system_prompt": config.get("system_prompt", ""),
        }

    def get_tenant_agent_config(self, tenant_id: str) -> dict:
        """Get agent configuration for a specific tenant.

        The first entry of the tenant's allowed models is used as the model.

        Raises:
            ValueError: If the tenant is unknown or has no allowed models.
        """
        if tenant_id not in self.tenant_configs:
            raise ValueError(f"Unknown tenant: {tenant_id}")
        config = self.tenant_configs[tenant_id]
        # Fail with a clear error rather than an IndexError on [0] below.
        if not config["allowed_models"]:
            raise ValueError(f"Tenant has no allowed models: {tenant_id}")
        return {
            "model": config["allowed_models"][0],
            "max_tokens": config["max_tokens_per_request"],
            "tools": self.filter_tools(config["allowed_tools"]),
            "system_prompt": config["custom_system_prompt"],
        }

    def filter_tools(self, allowed_tools: list) -> list:
        """Hook that turns a tenant's allowed-tools list into the "tools" value.

        It receives the tenant's stored "allowed_tools" list and its result
        is placed under "tools" by ``get_tenant_agent_config``. No
        implementation is provided here; subclasses must override it.
        """
        raise NotImplementedError(
            "TenantIsolation.filter_tools must be provided by a subclass"
        )

    def validate_request(self, tenant_id: str, request: dict) -> tuple[bool, str]:
        """Validate a request against tenant policies.

        Returns:
            ``(True, "")`` when the request is allowed, otherwise
            ``(False, <reason>)``. A missing or ``None`` "max_tokens" is
            treated as 0, i.e. within any limit.
        """
        config = self.tenant_configs.get(tenant_id)
        if not config:
            return False, "Unknown tenant"
        if request.get("model") not in config["allowed_models"]:
            return False, "Model not allowed for tenant"
        # "or 0" keeps an explicit None from raising TypeError in the compare.
        requested_tokens = request.get("max_tokens") or 0
        if requested_tokens > config["max_tokens_per_request"]:
            return False, "Token limit exceeded"
        return True, ""
Audit Trail for Access
Log all access decisions:
class AccessAuditLog:
    """Build access-decision records and hand them to an audit store."""

    async def log_access(
        self,
        user_id: str,
        session_id: str,
        resource: str,
        action: str,
        granted: bool,
        reason: str = None,
    ):
        """Log every access decision for compliance.

        Builds one entry containing the given fields plus a timestamp and
        awaits ``write_to_audit_store`` with it.
        """
        entry = {
            # Timezone-aware UTC so records from different hosts are
            # unambiguous and comparable; a naive local time is neither.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "user_id": user_id,
            "session_id": session_id,
            "resource": resource,
            "action": action,
            "granted": granted,
            "reason": reason,
        }
        # Write to the audit store (intended to be immutable).
        await self.write_to_audit_store(entry)

    async def write_to_audit_store(self, entry: dict) -> None:
        """Persist one audit entry; must be provided by a subclass.

        No storage backend is defined here, so this default raises instead
        of silently dropping the record.
        """
        raise NotImplementedError(
            "AccessAuditLog.write_to_audit_store must be provided by a subclass"
        )
Nerd Note: The principle of least privilege means agents should have the minimum access needed for their task—no more. When in doubt, deny.
Next: Controlling costs at scale.