Incident Response for Prompt Leaks
Detecting Prompt Security Incidents
5 min read
Most prompt leaks aren't detected until the damage is done. This lesson covers detection mechanisms that catch incidents early.
Detection Mechanisms
1. Canary Token Monitoring
import hashlib
import logging
from datetime import datetime
class CanaryMonitor:
def __init__(self, canary_registry: dict):
self.canary_registry = canary_registry # token -> metadata
self.alerts = []
def check_output(self, session_id: str, output: str) -> bool:
"""Check if any canary tokens appear in output."""
for token, metadata in self.canary_registry.items():
if token in output:
self._trigger_alert(session_id, token, metadata, output)
return True
return False
def _trigger_alert(self, session_id: str, token: str, metadata: dict, output: str):
alert = {
"timestamp": datetime.utcnow().isoformat(),
"type": "canary_leaked",
"severity": "critical",
"session_id": session_id,
"canary_type": metadata.get("type", "unknown"),
"canary_location": metadata.get("location", "unknown"),
"output_snippet": self._extract_context(output, token),
}
self.alerts.append(alert)
self._notify_security_team(alert)
def _extract_context(self, output: str, token: str) -> str:
"""Extract surrounding context for investigation."""
idx = output.find(token)
start = max(0, idx - 100)
end = min(len(output), idx + len(token) + 100)
return output[start:end]
def _notify_security_team(self, alert: dict):
logging.critical(f"CANARY LEAK DETECTED: {alert}")
# Send to PagerDuty, Slack, etc.
2. Anomaly Detection
from collections import defaultdict
from datetime import datetime, timedelta
class AnomalyDetector:
def __init__(self):
self.user_patterns = defaultdict(list)
self.global_patterns = []
self.thresholds = {
"extraction_keywords_per_session": 5,
"failed_requests_per_minute": 10,
"unusual_query_length": 5000,
"rapid_fire_queries": 20, # queries per minute
}
def analyze_request(self, user_id: str, query: str, response: str) -> list:
"""Analyze request for anomalous patterns."""
anomalies = []
# Check for extraction keyword density
extraction_keywords = [
"system prompt", "instructions", "reveal", "show me",
"configuration", "ignore previous", "forget",
]
keyword_count = sum(1 for kw in extraction_keywords if kw in query.lower())
if keyword_count >= 2:
anomalies.append({
"type": "high_extraction_keyword_density",
"severity": "medium",
"details": f"Found {keyword_count} extraction keywords"
})
# Check for unusual query length
if len(query) > self.thresholds["unusual_query_length"]:
anomalies.append({
"type": "unusual_query_length",
"severity": "low",
"details": f"Query length: {len(query)} chars"
})
# Check for rapid fire queries
recent_requests = self._get_recent_requests(user_id, minutes=1)
if len(recent_requests) > self.thresholds["rapid_fire_queries"]:
anomalies.append({
"type": "rapid_fire_queries",
"severity": "high",
"details": f"{len(recent_requests)} queries in last minute"
})
# Check for escalation pattern
if self._detect_escalation_pattern(user_id, query):
anomalies.append({
"type": "escalation_pattern",
"severity": "high",
"details": "Detected gradual escalation toward sensitive topics"
})
self._record_request(user_id, query, response, anomalies)
return anomalies
def _detect_escalation_pattern(self, user_id: str, current_query: str) -> bool:
"""Detect Crescendo-style escalation attacks."""
history = self.user_patterns.get(user_id, [])
if len(history) < 3:
return False
# Check if queries are getting progressively more sensitive
sensitivity_scores = []
sensitive_terms = ["prompt", "instructions", "system", "secret", "reveal"]
for entry in history[-5:]:
score = sum(1 for term in sensitive_terms if term in entry["query"].lower())
sensitivity_scores.append(score)
# Check for increasing pattern
if len(sensitivity_scores) >= 3:
return all(
sensitivity_scores[i] <= sensitivity_scores[i+1]
for i in range(len(sensitivity_scores) - 1)
) and sensitivity_scores[-1] >= 2
return False
3. Output Pattern Matching
class OutputMonitor:
def __init__(self):
self.leak_patterns = [
# System prompt fragments
r"##\s*(SYSTEM|PRIORITY|CRITICAL|IMMUTABLE)",
r"<system>.*</system>",
r"(CANARY|SECURITY_TOKEN):\s*[a-zA-Z0-9]+",
# Internal structure exposure
r"(api|endpoint):\s*https?://",
r"(database|db):\s*\w+://",
r"(key|token|secret):\s*[\w\-]+",
# Configuration exposure
r"model:\s*(claude|gpt|gemini)",
r"temperature:\s*[\d.]+",
r"max_tokens:\s*\d+",
]
def check_output(self, output: str) -> list:
"""Check output for potential leakage patterns."""
matches = []
for pattern in self.leak_patterns:
import re
if re.search(pattern, output, re.IGNORECASE):
matches.append({
"pattern": pattern,
"severity": "high" if "CANARY" in pattern or "secret" in pattern else "medium"
})
return matches
4. External Monitoring
class ExternalLeakMonitor:
"""Monitor external sources for leaked prompts."""
def __init__(self, company_identifiers: list):
self.identifiers = company_identifiers
self.sources = [
"github.com",
"gitlab.com",
"pastebin.com",
"reddit.com",
"twitter.com",
"hacker news",
]
async def scan_for_leaks(self) -> list:
"""Scan external sources for company prompt leaks."""
findings = []
# Search code repositories
github_results = await self._search_github()
findings.extend(github_results)
# Search social media mentions
social_results = await self._search_social_media()
findings.extend(social_results)
# Search paste sites
paste_results = await self._search_paste_sites()
findings.extend(paste_results)
return findings
async def _search_github(self) -> list:
"""Search GitHub for leaked prompts."""
queries = [
f'"{identifier}" "system prompt"'
for identifier in self.identifiers
]
# Use GitHub API to search
findings = []
for query in queries:
# results = await github_api.search_code(query)
# Process results...
pass
return findings
Alert Severity Matrix
| Detection Type | Severity | Response Time | Escalation |
|---|---|---|---|
| Canary token in output | Critical | Immediate | Auto-block + Page on-call |
| Extraction keyword burst | High | < 5 min | Security team Slack |
| Escalation pattern | High | < 5 min | Security team Slack |
| Unusual query length | Low | < 1 hour | Log only |
| Rapid fire queries | Medium | < 15 min | Rate limit + Alert |
| External leak found | Critical | Immediate | Exec + Legal + PR |
Detection Dashboard
# Grafana dashboard configuration
dashboard:
title: "AI Security Monitoring"
panels:
- title: "Canary Leaks (24h)"
type: stat
query: |
sum(increase(canary_leaks_total[24h]))
thresholds:
- value: 0
color: green
- value: 1
color: red
- title: "Extraction Attempts by Type"
type: piechart
query: |
sum by (type) (extraction_attempts_total)
- title: "Anomaly Score Over Time"
type: timeseries
query: |
avg(anomaly_score) by (user_id)
- title: "Rate Limit Triggers"
type: timeseries
query: |
sum(rate(rate_limit_triggers[5m]))
Automated Response Rules
DETECTION_RULES = {
"canary_leaked": {
"severity": "critical",
"auto_actions": [
"block_session",
"page_oncall",
"create_incident",
"capture_forensics",
],
},
"escalation_pattern": {
"severity": "high",
"auto_actions": [
"elevate_monitoring",
"notify_security",
"capture_session_log",
],
},
"rapid_fire_queries": {
"severity": "medium",
"auto_actions": [
"apply_rate_limit",
"notify_security",
],
},
}
async def handle_detection(detection: dict):
"""Execute automated responses based on detection type."""
rules = DETECTION_RULES.get(detection["type"], {})
for action in rules.get("auto_actions", []):
await execute_action(action, detection)
Key Insight: Detection is only valuable if it leads to action. Every detection mechanism should have a defined response. The goal isn't just to know about incidents—it's to stop them before damage spreads.
Next: Containing and assessing prompt security incidents. :::