Lesson 17 of 18

Incident Response for Prompt Leaks

Containment and Impact Assessment

5 min read

When a prompt security incident is detected, immediate containment prevents further damage while you assess the scope.

Immediate Containment Actions

1. Session Termination

from datetime import datetime

class IncidentContainment:
    async def contain_session(self, incident: dict):
        """Immediately contain an active incident."""

        session_id = incident.get("session_id")
        user_id = incident.get("user_id")

        # 1. Terminate active session
        await self.session_manager.terminate(session_id)

        # 2. Revoke user's active tokens
        await self.auth_service.revoke_tokens(user_id)

        # 3. Block future requests from this user temporarily
        await self.rate_limiter.block_user(
            user_id,
            duration_minutes=60,
            reason="Security incident investigation"
        )

        # 4. Capture forensic data before session cleanup
        # 4. Capture forensic data before session cleanup
        forensics = await self.capture_forensics(session_id, user_id)

        return {
            "session_terminated": True,
            "user_blocked": True,
            "forensics_captured": forensics is not None,
        }

    async def capture_forensics(self, session_id: str, user_id: str) -> dict:
        """Capture all relevant data for investigation."""
        return {
            "session_log": await self.get_full_session_log(session_id),
            "user_history": await self.get_user_request_history(user_id),
            "ip_address": await self.get_session_ip(session_id),
            "user_agent": await self.get_session_user_agent(session_id),
            "timestamp": datetime.utcnow().isoformat(),
        }

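The class above assumes injected `session_manager`, `auth_service`, and `rate_limiter` services. A minimal sketch of how a detection hook might wire the same three containment steps together, using in-memory stand-ins (all stub names here are hypothetical, not a real API):

```python
import asyncio

class StubSessions:
    def __init__(self):
        self.active = {"sess-42"}
    async def terminate(self, session_id):
        self.active.discard(session_id)

class StubAuth:
    def __init__(self):
        self.revoked = set()
    async def revoke_tokens(self, user_id):
        self.revoked.add(user_id)

class StubRateLimiter:
    def __init__(self):
        self.blocked = {}
    async def block_user(self, user_id, duration_minutes, reason):
        self.blocked[user_id] = (duration_minutes, reason)

async def on_canary_detected(incident, sessions, auth, limiter):
    """Detection hook: contain first, investigate afterwards."""
    await sessions.terminate(incident["session_id"])
    await auth.revoke_tokens(incident["user_id"])
    await limiter.block_user(
        incident["user_id"], 60, "Security incident investigation")

sessions, auth, limiter = StubSessions(), StubAuth(), StubRateLimiter()
asyncio.run(on_canary_detected(
    {"session_id": "sess-42", "user_id": "user-7"}, sessions, auth, limiter))
```

The point of the stubs is that containment has no decision points: once the detector fires, every step runs unconditionally, and human judgment enters only at the assessment stage.
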
2. Prompt Rotation

from datetime import datetime

class PromptRotation:
    def __init__(self, prompt_registry: dict):
        self.registry = prompt_registry
        self.rotation_history = []

    async def emergency_rotate(self, compromised_prompt_id: str):
        """Immediately rotate a compromised prompt."""

        # 1. Get backup prompt
        backup = self.get_backup_prompt(compromised_prompt_id)
        if not backup:
            backup = self.get_fallback_prompt()

        # 2. Deploy backup
        await self.deploy_prompt(backup)

        # 3. Invalidate old prompt
        await self.invalidate_prompt(compromised_prompt_id)

        # 4. Rotate canary tokens
        new_canary = self.generate_new_canary()
        await self.update_canary(new_canary)

        # 5. Log rotation
        self.rotation_history.append({
            "timestamp": datetime.utcnow(),
            "old_prompt_id": compromised_prompt_id,
            "new_prompt_id": backup["id"],
            "reason": "security_incident",
        })

        return backup

    def get_backup_prompt(self, prompt_id: str) -> dict:
        """Get pre-configured backup prompt."""
        backup_id = f"{prompt_id}_backup"
        return self.registry.get(backup_id)

    def get_fallback_prompt(self) -> dict:
        """Minimal safe fallback prompt."""
        return {
            "id": "fallback_minimal",
            "content": """
You are a helpful assistant. Due to temporary maintenance,
some features may be limited. Please contact support if
you need assistance with specific functionality.

You cannot:
- Access external systems
- Process sensitive data
- Execute any commands
""",
            "canary": self.generate_new_canary(),
        }

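`generate_new_canary` is left undefined above. A minimal sketch using Python's `secrets` module (the token format is an assumption; any unique, greppable string works):

```python
import secrets

def generate_new_canary(prefix: str = "CANARY") -> str:
    """Generate a unique, searchable canary token to embed in the prompt."""
    # 8 random bytes -> 16 hex chars: negligible collision or guessing risk.
    return f"{prefix}-{secrets.token_hex(8)}"

token = generate_new_canary()
```

Rotating the canary after an incident means any later sighting of the old token traces back to this specific leak, while the new token monitors for fresh extraction attempts.
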
3. Service Degradation

class ServiceDegradation:
    """Gracefully degrade service during incidents."""

    DEGRADATION_LEVELS = {
        "normal": {
            "features": "all",
            "rate_limit": 100,
            "model": "claude-sonnet-4-5-20250929",
        },
        "elevated": {
            "features": "reduced",
            "rate_limit": 20,
            "model": "claude-sonnet-4-5-20250929",
        },
        "critical": {
            "features": "minimal",
            "rate_limit": 5,
            "model": "claude-haiku-4-20261001",  # Faster, more constrained
        },
        "lockdown": {
            "features": "none",
            "rate_limit": 0,
            "model": None,  # No AI access
        },
    }

    async def set_degradation_level(self, level: str, reason: str):
        """Set service degradation level."""
        config = self.DEGRADATION_LEVELS.get(level)
        if not config:
            raise ValueError(f"Unknown degradation level: {level}")

        await self.apply_config(config)
        await self.notify_users(level, reason)
        await self.log_degradation(level, reason)

    async def notify_users(self, level: str, reason: str):
        """Notify users of service changes."""
        if level in ["critical", "lockdown"]:
            # Show maintenance banner
            await self.set_status_page("incident", reason)

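One way to drive degradation from assessed severity is a direct lookup. The mapping below is a policy choice, shown here purely as an assumption:

```python
# Hypothetical policy: worse severity -> more restrictive degradation level.
SEVERITY_TO_LEVEL = {
    "P0": "lockdown",
    "P1": "critical",
    "P2": "elevated",
    "P3": "normal",
}

def degradation_for(severity: str) -> str:
    # Fail safe: anything unrecognized gets the most restrictive level.
    return SEVERITY_TO_LEVEL.get(severity, "lockdown")
```

Failing closed on unknown input matters here: during an incident, a malformed severity string should never silently leave the service at full capability.
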
Impact Assessment Framework

Severity Classification

| Severity | Definition | Examples |
|---|---|---|
| P0 - Critical | Immediate business impact, data breach | Full prompt leaked publicly, customer data exposed |
| P1 - High | Significant security risk, limited exposure | Prompt leaked to single user, partial extraction |
| P2 - Medium | Security concern, no immediate risk | Extraction attempt detected but blocked |
| P3 - Low | Minor issue, informational | Unusual query patterns, false positive |

Assessment Checklist

from datetime import datetime

class ImpactAssessment:
    def assess(self, incident: dict) -> dict:
        """Comprehensive impact assessment."""

        assessment = {
            "incident_id": incident["id"],
            "timestamp": datetime.utcnow().isoformat(),
            "severity": None,
            "exposure": {},
            "business_impact": {},
            "recommendations": [],
        }

        # 1. What was exposed?
        assessment["exposure"] = self.assess_exposure(incident)

        # 2. Who was affected?
        assessment["affected_parties"] = self.assess_affected_parties(incident)

        # 3. What's the business impact?
        assessment["business_impact"] = self.assess_business_impact(incident)

        # 4. Calculate severity
        assessment["severity"] = self.calculate_severity(assessment)

        # 5. Generate recommendations
        assessment["recommendations"] = self.generate_recommendations(assessment)

        return assessment

    def assess_exposure(self, incident: dict) -> dict:
        """Determine what information was exposed."""
        exposed = {
            "system_prompt": False,
            "partial_prompt": False,
            "canary_tokens": False,
            "internal_apis": False,
            "customer_data": False,
            "credentials": False,
        }

        leaked_content = incident.get("leaked_content", "")

        # Check for each type of exposure
        if "SYSTEM" in leaked_content or "instructions" in leaked_content.lower():
            exposed["partial_prompt"] = True

        if incident.get("canary_leaked"):
            exposed["canary_tokens"] = True

        if "api" in leaked_content.lower() or "endpoint" in leaked_content.lower():
            exposed["internal_apis"] = True

        # ... additional checks

        return exposed

    def calculate_severity(self, assessment: dict) -> str:
        """Calculate incident severity based on exposure and impact."""
        exposure = assessment["exposure"]
        impact = assessment["business_impact"]

        if exposure.get("credentials") or exposure.get("customer_data"):
            return "P0"

        if exposure.get("system_prompt") and impact.get("external_visibility"):
            return "P0"

        if exposure.get("partial_prompt") or exposure.get("canary_tokens"):
            return "P1"

        if exposure.get("internal_apis"):
            return "P2"

        return "P3"

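As a quick sanity check, the severity rules can be exercised standalone (the logic is restated here as a free function, for illustration only):

```python
def calculate_severity(exposure: dict, impact: dict) -> str:
    """Same precedence order as the class method above: worst case first."""
    if exposure.get("credentials") or exposure.get("customer_data"):
        return "P0"
    if exposure.get("system_prompt") and impact.get("external_visibility"):
        return "P0"
    if exposure.get("partial_prompt") or exposure.get("canary_tokens"):
        return "P1"
    if exposure.get("internal_apis"):
        return "P2"
    return "P3"

# A leaked canary with no customer data is P1; a blocked attempt is P3.
canary_leak = calculate_severity({"canary_tokens": True}, {})
blocked_attempt = calculate_severity({}, {})
data_breach = calculate_severity({"customer_data": True}, {})
```

Because the checks run worst-case-first, a single incident that trips multiple categories always classifies at its highest applicable severity.
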
Investigation Queries

-- Find all related sessions
SELECT
    session_id,
    user_id,
    request_timestamp,
    query_text,
    response_text,
    security_flags
FROM ai_requests
WHERE user_id = :suspect_user_id
  AND request_timestamp BETWEEN :incident_time - INTERVAL '1 hour'
                             AND :incident_time + INTERVAL '10 minutes'
ORDER BY request_timestamp;

-- Check for similar attack patterns from other users
SELECT
    user_id,
    COUNT(*) as suspicious_requests,
    array_agg(DISTINCT security_flags) as flags
FROM ai_requests
WHERE request_timestamp > :incident_time - INTERVAL '24 hours'
  AND security_flags @> ARRAY['extraction_attempt']
GROUP BY user_id
HAVING COUNT(*) > 3;

-- Find if canary appeared elsewhere
SELECT
    session_id,
    user_id,
    request_timestamp,
    LEFT(response_text, 200) as response_preview
FROM ai_requests
WHERE response_text LIKE '%' || :canary_token || '%'
ORDER BY request_timestamp;

Communication Templates

Internal Escalation

# Security Incident Alert

**Severity:** P1 - High
**Time Detected:** 2026-01-06 14:23 UTC
**Status:** Containment in progress

## Summary
Canary token detected in AI assistant output, indicating potential
system prompt extraction.

## Immediate Actions Taken
- [ ] User session terminated
- [ ] User temporarily blocked
- [ ] Forensic data captured
- [ ] Prompt rotation initiated

## Impact Assessment (Preliminary)
- Exposure: Partial system prompt
- Affected Users: 1 confirmed
- External Visibility: Unknown (investigating)

## Next Steps
1. Complete forensic analysis
2. Determine full scope of exposure
3. Assess need for customer notification
4. Update defenses

## Assigned To
Primary: @security-oncall
Secondary: @ai-platform-lead

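Keeping the escalation template in code lets it be filled directly from the assessment dict, so on-call alerts stay consistent under pressure. A minimal sketch (field names follow the examples above; the rendering helper itself is hypothetical):

```python
from datetime import datetime, timezone

TEMPLATE = """# Security Incident Alert

**Severity:** {severity}
**Time Detected:** {detected} UTC
**Status:** {status}

## Summary
{summary}
"""

def render_alert(assessment: dict, summary: str,
                 status: str = "Containment in progress") -> str:
    """Fill the escalation template from an impact assessment."""
    return TEMPLATE.format(
        severity=assessment["severity"],
        detected=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M"),
        status=status,
        summary=summary,
    )

alert = render_alert(
    {"severity": "P1 - High"},
    "Canary token detected in AI assistant output, indicating "
    "potential system prompt extraction.",
)
```

The same structured assessment can then feed the status page and the degradation controls, so every channel reports from one source of truth.
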
User Communication (if needed)

# Service Notification

We detected unusual activity in your account and have
temporarily limited access as a precautionary measure.

This is not related to any action on your part—we're
conducting routine security maintenance.

Your access will be restored within [timeframe].
If you have questions, please contact support@company.com.

We apologize for any inconvenience.

Key Insight: The first hour after detection determines the incident's ultimate impact. Have containment procedures ready BEFORE you need them. Practice your response—the worst time to figure out your incident process is during an actual incident.

Next: Recovery and post-incident improvements.
