Lesson 16 of 18

Incident Response for Prompt Leaks

Detecting Prompt Security Incidents

5 min read

Most prompt leaks aren't detected until the damage is done. This lesson covers detection mechanisms that catch incidents early.

Detection Mechanisms

1. Canary Token Monitoring

import hashlib
import logging
from datetime import datetime

class CanaryMonitor:
    def __init__(self, canary_registry: dict):
        self.canary_registry = canary_registry  # token -> metadata
        self.alerts = []

    def check_output(self, session_id: str, output: str) -> bool:
        """Check if any canary tokens appear in output."""
        for token, metadata in self.canary_registry.items():
            if token in output:
                self._trigger_alert(session_id, token, metadata, output)
                return True
        return False

    def _trigger_alert(self, session_id: str, token: str, metadata: dict, output: str):
        alert = {
            "timestamp": datetime.utcnow().isoformat(),
            "type": "canary_leaked",
            "severity": "critical",
            "session_id": session_id,
            "canary_type": metadata.get("type", "unknown"),
            "canary_location": metadata.get("location", "unknown"),
            "output_snippet": self._extract_context(output, token),
        }

        self.alerts.append(alert)
        self._notify_security_team(alert)

    def _extract_context(self, output: str, token: str) -> str:
        """Extract surrounding context for investigation."""
        idx = output.find(token)
        start = max(0, idx - 100)
        end = min(len(output), idx + len(token) + 100)
        return output[start:end]

    def _notify_security_team(self, alert: dict):
        logging.critical(f"CANARY LEAK DETECTED: {alert}")
        # Send to PagerDuty, Slack, etc.

2. Anomaly Detection

from collections import defaultdict
from datetime import datetime, timedelta

class AnomalyDetector:
    def __init__(self):
        self.user_patterns = defaultdict(list)
        self.global_patterns = []
        self.thresholds = {
            "extraction_keywords_per_session": 5,
            "failed_requests_per_minute": 10,
            "unusual_query_length": 5000,
            "rapid_fire_queries": 20,  # queries per minute
        }

    def analyze_request(self, user_id: str, query: str, response: str) -> list:
        """Analyze request for anomalous patterns."""
        anomalies = []

        # Check for extraction keyword density
        extraction_keywords = [
            "system prompt", "instructions", "reveal", "show me",
            "configuration", "ignore previous", "forget",
        ]
        keyword_count = sum(1 for kw in extraction_keywords if kw in query.lower())
        if keyword_count >= 2:
            anomalies.append({
                "type": "high_extraction_keyword_density",
                "severity": "medium",
                "details": f"Found {keyword_count} extraction keywords"
            })

        # Check for unusual query length
        if len(query) > self.thresholds["unusual_query_length"]:
            anomalies.append({
                "type": "unusual_query_length",
                "severity": "low",
                "details": f"Query length: {len(query)} chars"
            })

        # Check for rapid fire queries
        recent_requests = self._get_recent_requests(user_id, minutes=1)
        if len(recent_requests) > self.thresholds["rapid_fire_queries"]:
            anomalies.append({
                "type": "rapid_fire_queries",
                "severity": "high",
                "details": f"{len(recent_requests)} queries in last minute"
            })

        # Check for escalation pattern
        if self._detect_escalation_pattern(user_id, query):
            anomalies.append({
                "type": "escalation_pattern",
                "severity": "high",
                "details": "Detected gradual escalation toward sensitive topics"
            })

        self._record_request(user_id, query, response, anomalies)
        return anomalies

    def _detect_escalation_pattern(self, user_id: str, current_query: str) -> bool:
        """Detect Crescendo-style escalation attacks."""
        history = self.user_patterns.get(user_id, [])
        if len(history) < 3:
            return False

        # Check if queries are getting progressively more sensitive
        sensitivity_scores = []
        sensitive_terms = ["prompt", "instructions", "system", "secret", "reveal"]

        for entry in history[-5:]:
            score = sum(1 for term in sensitive_terms if term in entry["query"].lower())
            sensitivity_scores.append(score)

        # Check for increasing pattern
        if len(sensitivity_scores) >= 3:
            return all(
                sensitivity_scores[i] <= sensitivity_scores[i+1]
                for i in range(len(sensitivity_scores) - 1)
            ) and sensitivity_scores[-1] >= 2

        return False

3. Output Pattern Matching

class OutputMonitor:
    def __init__(self):
        self.leak_patterns = [
            # System prompt fragments
            r"##\s*(SYSTEM|PRIORITY|CRITICAL|IMMUTABLE)",
            r"<system>.*</system>",
            r"(CANARY|SECURITY_TOKEN):\s*[a-zA-Z0-9]+",

            # Internal structure exposure
            r"(api|endpoint):\s*https?://",
            r"(database|db):\s*\w+://",
            r"(key|token|secret):\s*[\w\-]+",

            # Configuration exposure
            r"model:\s*(claude|gpt|gemini)",
            r"temperature:\s*[\d.]+",
            r"max_tokens:\s*\d+",
        ]

    def check_output(self, output: str) -> list:
        """Check output for potential leakage patterns."""
        matches = []

        for pattern in self.leak_patterns:
            import re
            if re.search(pattern, output, re.IGNORECASE):
                matches.append({
                    "pattern": pattern,
                    "severity": "high" if "CANARY" in pattern or "secret" in pattern else "medium"
                })

        return matches

4. External Monitoring

class ExternalLeakMonitor:
    """Monitor external sources for leaked prompts."""

    def __init__(self, company_identifiers: list):
        self.identifiers = company_identifiers
        self.sources = [
            "github.com",
            "gitlab.com",
            "pastebin.com",
            "reddit.com",
            "twitter.com",
            "hacker news",
        ]

    async def scan_for_leaks(self) -> list:
        """Scan external sources for company prompt leaks."""
        findings = []

        # Search code repositories
        github_results = await self._search_github()
        findings.extend(github_results)

        # Search social media mentions
        social_results = await self._search_social_media()
        findings.extend(social_results)

        # Search paste sites
        paste_results = await self._search_paste_sites()
        findings.extend(paste_results)

        return findings

    async def _search_github(self) -> list:
        """Search GitHub for leaked prompts."""
        queries = [
            f'"{identifier}" "system prompt"'
            for identifier in self.identifiers
        ]

        # Use GitHub API to search
        findings = []
        for query in queries:
            # results = await github_api.search_code(query)
            # Process results...
            pass

        return findings

Alert Severity Matrix

Detection Type Severity Response Time Escalation
Canary token in output Critical Immediate Auto-block + Page on-call
Extraction keyword burst High < 5 min Security team Slack
Escalation pattern High < 5 min Security team Slack
Unusual query length Low < 1 hour Log only
Rapid fire queries Medium < 15 min Rate limit + Alert
External leak found Critical Immediate Exec + Legal + PR

Detection Dashboard

# Grafana dashboard configuration
dashboard:
  title: "AI Security Monitoring"

  panels:
    - title: "Canary Leaks (24h)"
      type: stat
      query: |
        sum(increase(canary_leaks_total[24h]))
      thresholds:
        - value: 0
          color: green
        - value: 1
          color: red

    - title: "Extraction Attempts by Type"
      type: piechart
      query: |
        sum by (type) (extraction_attempts_total)

    - title: "Anomaly Score Over Time"
      type: timeseries
      query: |
        avg(anomaly_score) by (user_id)

    - title: "Rate Limit Triggers"
      type: timeseries
      query: |
        sum(rate(rate_limit_triggers[5m]))

Automated Response Rules

DETECTION_RULES = {
    "canary_leaked": {
        "severity": "critical",
        "auto_actions": [
            "block_session",
            "page_oncall",
            "create_incident",
            "capture_forensics",
        ],
    },
    "escalation_pattern": {
        "severity": "high",
        "auto_actions": [
            "elevate_monitoring",
            "notify_security",
            "capture_session_log",
        ],
    },
    "rapid_fire_queries": {
        "severity": "medium",
        "auto_actions": [
            "apply_rate_limit",
            "notify_security",
        ],
    },
}

async def handle_detection(detection: dict):
    """Execute automated responses based on detection type."""
    rules = DETECTION_RULES.get(detection["type"], {})

    for action in rules.get("auto_actions", []):
        await execute_action(action, detection)

Key Insight: Detection is only valuable if it leads to action. Every detection mechanism should have a defined response. The goal isn't just to know about incidents—it's to stop them before damage spreads.

Next: Containing and assessing prompt security incidents. :::

Quiz

Module 6: Incident Response for Prompt Leaks

Take Quiz