Production Security Patterns
Rate Limiting & Abuse Prevention
3 min read
Rate limiting protects your LLM application from abuse, controls costs, and ensures fair usage. This lesson covers implementation strategies and advanced abuse prevention techniques.
Why Rate Limiting Matters
┌─────────────────────────────────────────────────────────────┐
│ Rate Limiting Benefits │
│ │
│ Without Rate Limiting: │
│ • Single user can exhaust API budget │
│ • DoS attacks can take down service │
│ • Automated abuse goes unchecked │
│ • Unfair resource distribution │
│ │
│ With Rate Limiting: │
│ ✓ Predictable costs │
│ ✓ Service availability │
│ ✓ Fair usage across users │
│ ✓ Abuse mitigation │
└─────────────────────────────────────────────────────────────┘
Token Bucket Algorithm
import time
from dataclasses import dataclass
from typing import Dict, Optional
from threading import Lock
@dataclass
class Bucket:
    """Mutable token-bucket state tracked for a single rate-limit key."""

    # Tokens currently available to spend (may be fractional).
    tokens: float
    # time.monotonic() reading taken when `tokens` was last brought up to date.
    last_update: float


class TokenBucketRateLimiter:
    """Token bucket rate limiter with burst support.

    Every key owns a bucket that starts full (``capacity`` tokens) and is
    refilled continuously at ``rate`` tokens per second, never exceeding
    ``capacity``. All bucket access happens under a single lock.

    Elapsed time is measured with ``time.monotonic()`` so that system clock
    adjustments can neither drain nor over-fill a bucket.
    """

    def __init__(
        self,
        rate: float,  # Tokens per second
        capacity: float,  # Maximum burst capacity
    ):
        """Create a limiter.

        Args:
            rate: Tokens added per second. ``0`` means buckets never refill.
            capacity: Maximum tokens a bucket can hold (the burst size).

        Raises:
            ValueError: If ``rate`` or ``capacity`` is negative.
        """
        if rate < 0:
            raise ValueError("rate must be non-negative")
        if capacity < 0:
            raise ValueError("capacity must be non-negative")
        self.rate = rate
        self.capacity = capacity
        self.buckets: Dict[str, Bucket] = {}
        self.lock = Lock()

    def _get_bucket(self, key: str) -> Bucket:
        """Get or create bucket for key. Caller must hold ``self.lock``."""
        bucket = self.buckets.get(key)
        if bucket is None:
            # New keys start with a full bucket so an initial burst is allowed.
            bucket = Bucket(tokens=self.capacity, last_update=time.monotonic())
            self.buckets[key] = bucket
        return bucket

    def _refill(self, bucket: Bucket) -> None:
        """Refill bucket based on time elapsed. Caller must hold ``self.lock``."""
        now = time.monotonic()
        # Clamp at zero: a timestamp that lies in the future must never
        # subtract tokens from the bucket.
        elapsed = max(0.0, now - bucket.last_update)
        bucket.tokens = min(self.capacity, bucket.tokens + elapsed * self.rate)
        bucket.last_update = now

    def consume(self, key: str, tokens: float = 1.0) -> bool:
        """Try to consume tokens. Returns True if allowed.

        Args:
            key: Identifier whose bucket is charged (e.g. a user id).
            tokens: Cost of the request.

        Returns:
            True if the bucket held at least ``tokens`` and was debited,
            False otherwise (the bucket is left untouched apart from refill).
        """
        with self.lock:
            bucket = self._get_bucket(key)
            self._refill(bucket)
            if bucket.tokens < tokens:
                return False
            bucket.tokens -= tokens
            return True

    def get_wait_time(self, key: str, tokens: float = 1.0) -> float:
        """Get seconds until tokens are available.

        Returns:
            ``0.0`` if the request could be served now, the number of seconds
            until enough tokens have accumulated otherwise, or ``inf`` when
            the request can never be served (it asks for more than
            ``capacity``, or the bucket does not refill).
        """
        with self.lock:
            bucket = self._get_bucket(key)
            self._refill(bucket)
            if bucket.tokens >= tokens:
                return 0.0
            # The bucket is capped at `capacity` and only grows when rate > 0,
            # so in these cases no finite wait would ever be enough.
            if tokens > self.capacity or self.rate <= 0:
                return float("inf")
            return (tokens - bucket.tokens) / self.rate
# Usage: one token per second, with bursts of up to ten requests.
limiter = TokenBucketRateLimiter(rate=1.0, capacity=10.0)
user_id = "user123"

if not limiter.consume(user_id):
    # Denied: tell the caller how long to back off before retrying.
    wait = limiter.get_wait_time(user_id)
    print(f"Rate limited. Retry in {wait:.1f}s")
else:
    print("Request allowed")
Sliding Window Rate Limiter
from collections import deque
from datetime import datetime, timedelta
from typing import Dict, Deque
import time
class SlidingWindowLimiter:
    """Sliding window rate limiter for precise control.

    For each key, the timestamps of accepted requests are kept oldest-first
    in a deque. A request is accepted while fewer than ``max_requests``
    timestamps fall inside the trailing ``window_seconds``.

    Timestamps come from ``time.monotonic()``: the pruning loop relies on the
    deque being ordered, which a wall clock that steps backwards would break.
    """

    def __init__(
        self,
        max_requests: int,
        window_seconds: int,
    ):
        """Create a limiter.

        Args:
            max_requests: Maximum accepted requests per key within the window.
            window_seconds: Length of the trailing window, in seconds.
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Dict[str, Deque[float]] = {}

    def _prune(self, key: str, now: float) -> int:
        """Drop expired timestamps for key; return how many are still in window.

        A key whose deque becomes empty is removed from ``self.requests`` so
        that keys with no live requests do not keep an entry around.
        """
        timestamps = self.requests.get(key)
        if timestamps is None:
            return 0
        window_start = now - self.window_seconds
        # Oldest entries sit at the left, so stop at the first live one.
        while timestamps and timestamps[0] < window_start:
            timestamps.popleft()
        if not timestamps:
            del self.requests[key]
            return 0
        return len(timestamps)

    def is_allowed(self, key: str) -> bool:
        """Check if request is allowed, recording it when it is.

        Returns:
            True if the request fits in the current window (its timestamp is
            then recorded); False if the limit is reached (nothing recorded).
        """
        now = time.monotonic()
        if self._prune(key, now) >= self.max_requests:
            return False
        self.requests.setdefault(key, deque()).append(now)
        return True

    def get_remaining(self, key: str) -> int:
        """Get remaining requests in current window, without recording one."""
        in_window = self._prune(key, time.monotonic())
        return max(0, self.max_requests - in_window)
# Usage: at most 100 requests per user in any 60-second span.
limiter = SlidingWindowLimiter(max_requests=100, window_seconds=60)

if limiter.is_allowed("user123"):
    # is_allowed() has already recorded this request, so the figure below
    # is what is left after it.
    remaining = limiter.get_remaining("user123")
    print(f"Allowed. {remaining} requests remaining")
Multi-Tier Rate Limiting
from dataclasses import dataclass
from typing import Dict, List
from enum import Enum
class UserTier(Enum):
    """Subscription tiers, each carrying its lowercase name as the value."""

    FREE = "free"
    BASIC = "basic"
    PRO = "pro"
    ENTERPRISE = "enterprise"
@dataclass
class TierLimits:
    """Quota values that apply to one tier."""

    # Requests accepted within a 60-second sliding window.
    requests_per_minute: int
    # Requests accepted within a 24-hour sliding window.
    requests_per_day: int
    # Largest token count a single request may carry.
    max_tokens_per_request: int
    # Requests that may be in flight at the same time.
    max_concurrent: int
class MultiTierLimiter:
    """Rate limiter with different tiers.

    One per-minute and one per-day ``SlidingWindowLimiter`` are shared by all
    users; before each check their ``max_requests`` is set to the calling
    user's tier limit. Concurrency is tracked separately via
    ``start_request`` / ``end_request``. No locking is performed here.
    """

    TIER_LIMITS: Dict[UserTier, TierLimits] = {
        UserTier.FREE: TierLimits(
            requests_per_minute=10,
            requests_per_day=100,
            max_tokens_per_request=1000,
            max_concurrent=1,
        ),
        UserTier.BASIC: TierLimits(
            requests_per_minute=30,
            requests_per_day=1000,
            max_tokens_per_request=4000,
            max_concurrent=3,
        ),
        UserTier.PRO: TierLimits(
            requests_per_minute=100,
            requests_per_day=10000,
            max_tokens_per_request=8000,
            max_concurrent=10,
        ),
        UserTier.ENTERPRISE: TierLimits(
            requests_per_minute=500,
            requests_per_day=100000,
            max_tokens_per_request=32000,
            max_concurrent=50,
        ),
    }

    def __init__(self):
        # The initial max_requests of 1 is a placeholder; check_all_limits()
        # overwrites it with the tier's limit before every check.
        self.minute_limiter = SlidingWindowLimiter(1, 60)
        self.day_limiter = SlidingWindowLimiter(1, 86400)
        # user_id -> number of requests currently in flight.
        self.concurrent: Dict[str, int] = {}

    def check_all_limits(
        self,
        user_id: str,
        tier: UserTier,
        token_count: int,
    ) -> tuple[bool, Optional[str]]:
        """Check all rate limits for a request.

        Checks run in this order: tokens per request, concurrent requests,
        per-minute window, per-day window. The request is recorded in the
        two windows only when every check passes, so a rejected request does
        not use up quota.

        Args:
            user_id: User the request is attributed to.
            tier: Tier whose limits apply.
            token_count: Token count of the request.

        Returns:
            ``(True, None)`` if allowed, otherwise ``(False, message)`` naming
            the first limit that was hit.
        """
        limits = self.TIER_LIMITS[tier]

        # Check tokens per request
        if token_count > limits.max_tokens_per_request:
            return False, f"Max {limits.max_tokens_per_request} tokens per request"

        # Check concurrent requests
        if self.concurrent.get(user_id, 0) >= limits.max_concurrent:
            return False, f"Max {limits.max_concurrent} concurrent requests"

        minute_key = f"{user_id}:minute"
        day_key = f"{user_id}:day"
        minute_error = f"Rate limit: {limits.requests_per_minute}/minute"
        day_error = f"Daily limit: {limits.requests_per_day}/day"

        # Temporarily update the shared limiters' max for this user's tier
        self.minute_limiter.max_requests = limits.requests_per_minute
        self.day_limiter.max_requests = limits.requests_per_day

        # Probe both windows without recording anything. is_allowed() records
        # on success, so calling it for the minute window first would spend
        # minute quota on a request that the daily limit then rejects.
        if self.minute_limiter.get_remaining(minute_key) <= 0:
            return False, minute_error
        if self.day_limiter.get_remaining(day_key) <= 0:
            return False, day_error

        # Both windows have room: record the request in each.
        if not self.minute_limiter.is_allowed(minute_key):
            return False, minute_error
        if not self.day_limiter.is_allowed(day_key):
            return False, day_error
        return True, None

    def start_request(self, user_id: str):
        """Mark request as started (for concurrent tracking)."""
        self.concurrent[user_id] = self.concurrent.get(user_id, 0) + 1

    def end_request(self, user_id: str):
        """Mark request as completed.

        The user's entry is removed once the count reaches zero; unknown
        users are ignored.
        """
        in_flight = self.concurrent.get(user_id, 0) - 1
        if in_flight > 0:
            self.concurrent[user_id] = in_flight
        else:
            self.concurrent.pop(user_id, None)
# Usage: gate a BASIC-tier request and track it while it is in flight.
limiter = MultiTierLimiter()
allowed, error = limiter.check_all_limits(
    user_id="user123",
    tier=UserTier.BASIC,
    token_count=2000,
)

if not allowed:
    print(f"Rate limited: {error}")
else:
    limiter.start_request("user123")
    try:
        # Process request
        pass
    finally:
        # Always release the concurrency slot, even if processing raised.
        limiter.end_request("user123")
Abuse Prevention
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Set
class AbusePreventor:
    """Detect and prevent abuse patterns.

    Tracks a per-user count of suspicious events and a per-identifier list
    of recent failed attempts. An identifier that exceeds the failed-attempt
    threshold is added to ``blocked_ips``.
    """

    def __init__(
        self,
        suspicious_threshold: int = 10,
        max_failed_attempts: int = 50,
        failed_attempt_window: timedelta = timedelta(hours=1),
    ):
        """Create a preventor.

        Args:
            suspicious_threshold: Suspicious-event count at which a user is
                escalated.
            max_failed_attempts: Failed attempts tolerated inside the window;
                one more than this triggers a block.
            failed_attempt_window: How far back failed attempts are counted.
        """
        self.suspicious_threshold = suspicious_threshold
        self.max_failed_attempts = max_failed_attempts
        self.failed_attempt_window = failed_attempt_window
        self.blocked_ips: Set[str] = set()
        self.suspicious_users: Dict[str, int] = defaultdict(int)
        self.failed_attempts: Dict[str, List[datetime]] = defaultdict(list)

    def check_ip(self, ip_address: str) -> bool:
        """Check if IP is allowed. Returns False when it is blocked."""
        return ip_address not in self.blocked_ips

    def record_suspicious_activity(
        self,
        user_id: str,
        activity_type: str,
    ):
        """Record suspicious activity.

        ``activity_type`` is accepted for the caller's benefit but is not
        used by the counting logic.
        """
        self.suspicious_users[user_id] += 1
        # Escalate once the threshold is reached, and on every event after.
        if self.suspicious_users[user_id] >= self.suspicious_threshold:
            self._escalate_user(user_id)

    def record_failed_attempt(
        self,
        identifier: str,
        attempt_type: str,
    ):
        """Record failed attempt (auth, validation, etc.).

        Attempts older than ``failed_attempt_window`` are discarded, then the
        identifier is blocked if more than ``max_failed_attempts`` remain.
        ``attempt_type`` is not used by the counting logic.
        """
        # Timezone-aware UTC; datetime.utcnow() is deprecated since 3.12.
        now = datetime.now(timezone.utc)
        cutoff = now - self.failed_attempt_window
        recent = [t for t in self.failed_attempts[identifier] if t > cutoff]
        recent.append(now)
        self.failed_attempts[identifier] = recent

        # Check for brute force
        if len(recent) > self.max_failed_attempts:
            self._handle_brute_force(identifier)

    def _escalate_user(self, user_id: str):
        """Handle user that exceeded suspicious threshold."""
        print(f"[ALERT] User {user_id} flagged for review")
        # In production: notify security team, temporarily restrict

    def _handle_brute_force(self, identifier: str):
        """Handle detected brute force attack.

        NOTE(review): the identifier is stored in ``blocked_ips`` whatever it
        is, but ``check_ip`` is only given IP addresses -- callers that pass
        a non-IP identifier here will not see it enforced. Confirm intent.
        """
        print(f"[BLOCK] Brute force detected from {identifier}")
        self.blocked_ips.add(identifier)
class CostProtector:
    """Protect against cost explosion.

    Tracks spend per user against a single ``daily_budget`` that applies to
    each user individually. All counters are cleared when the UTC date
    advances.
    """

    def __init__(self, daily_budget: float):
        """Create a protector with the given per-user daily budget."""
        self.daily_budget = daily_budget
        self.daily_spend: Dict[str, float] = defaultdict(float)
        self.last_reset = self._today()

    @staticmethod
    def _today():
        """Return the current UTC calendar date."""
        # Timezone-aware replacement for the deprecated datetime.utcnow().
        return datetime.now(timezone.utc).date()

    def _check_reset(self):
        """Reset daily counters if new day."""
        today = self._today()
        if today > self.last_reset:
            self.daily_spend.clear()
            self.last_reset = today

    def check_budget(self, user_id: str, estimated_cost: float) -> bool:
        """Check if request fits within budget.

        Read-only: nothing is recorded, and no entry is created for a user
        who has not spent anything yet.
        """
        self._check_reset()
        # .get() instead of indexing: indexing a defaultdict would insert an
        # entry for every user id that is merely probed.
        projected = self.daily_spend.get(user_id, 0.0) + estimated_cost
        return projected <= self.daily_budget

    def record_spend(self, user_id: str, cost: float):
        """Record actual spend."""
        self._check_reset()
        self.daily_spend[user_id] += cost

    def get_remaining_budget(self, user_id: str) -> float:
        """Get remaining daily budget for user (never below zero)."""
        self._check_reset()
        spent = self.daily_spend.get(user_id, 0.0)
        return max(0.0, self.daily_budget - spent)
# Combined protection
class ProtectionMiddleware:
    """Combined rate limiting and abuse prevention."""

    def __init__(self, daily_budget: float = 10.0):
        """Wire up the individual protections.

        Args:
            daily_budget: Per-user daily budget handed to ``CostProtector``.
        """
        self.rate_limiter = MultiTierLimiter()
        self.abuse_preventer = AbusePreventor()
        self.cost_protector = CostProtector(daily_budget=daily_budget)

    def check_request(
        self,
        user_id: str,
        ip_address: str,
        tier: UserTier,
        token_count: int,
        estimated_cost: float,
    ) -> tuple[bool, Optional[str]]:
        """Run all protection checks.

        Checks that only read state (IP block list, budget) run first. The
        rate-limit check runs last because it records the request in the
        sliding windows when it passes; running it earlier would charge rate
        quota for a request that is then rejected on budget.

        Returns:
            ``(True, None)`` if the request may proceed, otherwise
            ``(False, reason)`` for the first check that failed.
        """
        # Check IP block
        if not self.abuse_preventer.check_ip(ip_address):
            return False, "IP blocked"

        # Check budget
        if not self.cost_protector.check_budget(user_id, estimated_cost):
            return False, "Daily budget exceeded"

        # Check rate limits (records the request on success)
        allowed, error = self.rate_limiter.check_all_limits(
            user_id, tier, token_count
        )
        if not allowed:
            return False, error

        return True, None
Key Takeaway: Implement multi-layer rate limiting that considers requests per minute, daily limits, concurrent connections, and cost budgets. Combine it with abuse detection for comprehensive protection.