Production Security Patterns

Rate Limiting & Abuse Prevention

3 min read

Rate limiting protects your LLM application from abuse, controls costs, and ensures fair usage. This lesson covers implementation strategies and advanced abuse prevention techniques.

Why Rate Limiting Matters

┌─────────────────────────────────────────────────────────────┐
│                    Rate Limiting Benefits                    │
│                                                             │
│   Without Rate Limiting:                                    │
│   • Single user can exhaust API budget                      │
│   • DoS attacks can take down service                       │
│   • Automated abuse goes unchecked                          │
│   • Unfair resource distribution                            │
│                                                             │
│   With Rate Limiting:                                       │
│   ✓ Predictable costs                                       │
│   ✓ Service availability                                   │
│   ✓ Fair usage across users                                │
│   ✓ Abuse mitigation                                       │
└─────────────────────────────────────────────────────────────┘

Token Bucket Algorithm

import time
from dataclasses import dataclass
from typing import Dict, Optional
from threading import Lock

@dataclass
class Bucket:
    tokens: float
    last_update: float

class TokenBucketRateLimiter:
    """Token bucket rate limiter with burst support."""

    def __init__(
        self,
        rate: float,  # Tokens per second
        capacity: float  # Maximum burst capacity
    ):
        self.rate = rate
        self.capacity = capacity
        self.buckets: Dict[str, Bucket] = {}
        self.lock = Lock()

    def _get_bucket(self, key: str) -> Bucket:
        """Get or create bucket for key."""
        if key not in self.buckets:
            self.buckets[key] = Bucket(
                tokens=self.capacity,
                last_update=time.time()
            )
        return self.buckets[key]

    def _refill(self, bucket: Bucket) -> None:
        """Refill bucket based on time elapsed."""
        now = time.time()
        elapsed = now - bucket.last_update
        bucket.tokens = min(
            self.capacity,
            bucket.tokens + elapsed * self.rate
        )
        bucket.last_update = now

    def consume(self, key: str, tokens: float = 1.0) -> bool:
        """Try to consume tokens. Returns True if allowed."""
        with self.lock:
            bucket = self._get_bucket(key)
            self._refill(bucket)

            if bucket.tokens >= tokens:
                bucket.tokens -= tokens
                return True
            return False

    def get_wait_time(self, key: str, tokens: float = 1.0) -> float:
        """Get seconds until tokens are available."""
        with self.lock:
            bucket = self._get_bucket(key)
            self._refill(bucket)

            if bucket.tokens >= tokens:
                return 0.0

            needed = tokens - bucket.tokens
            return needed / self.rate

# Usage
limiter = TokenBucketRateLimiter(
    rate=1.0,  # 1 token per second
    capacity=10.0  # Allow burst of 10
)

user_id = "user123"
if limiter.consume(user_id):
    print("Request allowed")
else:
    wait = limiter.get_wait_time(user_id)
    print(f"Rate limited. Retry in {wait:.1f}s")

Sliding Window Rate Limiter

from collections import deque
from datetime import datetime, timedelta
from typing import Dict, Deque
import time

class SlidingWindowLimiter:
    """Sliding window rate limiter for precise control."""

    def __init__(
        self,
        max_requests: int,
        window_seconds: int
    ):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Dict[str, Deque[float]] = {}

    def is_allowed(self, key: str) -> bool:
        """Check if request is allowed."""
        now = time.time()
        window_start = now - self.window_seconds

        # Initialize queue if needed
        if key not in self.requests:
            self.requests[key] = deque()

        # Remove expired timestamps
        while (
            self.requests[key] and
            self.requests[key][0] < window_start
        ):
            self.requests[key].popleft()

        # Check limit
        if len(self.requests[key]) >= self.max_requests:
            return False

        # Record request
        self.requests[key].append(now)
        return True

    def get_remaining(self, key: str) -> int:
        """Get remaining requests in current window."""
        now = time.time()
        window_start = now - self.window_seconds

        if key not in self.requests:
            return self.max_requests

        # Clean expired
        while (
            self.requests[key] and
            self.requests[key][0] < window_start
        ):
            self.requests[key].popleft()

        return max(0, self.max_requests - len(self.requests[key]))

# Usage
limiter = SlidingWindowLimiter(
    max_requests=100,
    window_seconds=60
)

if limiter.is_allowed("user123"):
    remaining = limiter.get_remaining("user123")
    print(f"Allowed. {remaining} requests remaining")

Multi-Tier Rate Limiting

from dataclasses import dataclass
from typing import Dict, List
from enum import Enum

class UserTier(Enum):
    FREE = "free"
    BASIC = "basic"
    PRO = "pro"
    ENTERPRISE = "enterprise"

@dataclass
class TierLimits:
    requests_per_minute: int
    requests_per_day: int
    max_tokens_per_request: int
    max_concurrent: int

class MultiTierLimiter:
    """Rate limiter with different tiers."""

    TIER_LIMITS = {
        UserTier.FREE: TierLimits(
            requests_per_minute=10,
            requests_per_day=100,
            max_tokens_per_request=1000,
            max_concurrent=1
        ),
        UserTier.BASIC: TierLimits(
            requests_per_minute=30,
            requests_per_day=1000,
            max_tokens_per_request=4000,
            max_concurrent=3
        ),
        UserTier.PRO: TierLimits(
            requests_per_minute=100,
            requests_per_day=10000,
            max_tokens_per_request=8000,
            max_concurrent=10
        ),
        UserTier.ENTERPRISE: TierLimits(
            requests_per_minute=500,
            requests_per_day=100000,
            max_tokens_per_request=32000,
            max_concurrent=50
        ),
    }

    def __init__(self):
        self.minute_limiter = SlidingWindowLimiter(1, 60)
        self.day_limiter = SlidingWindowLimiter(1, 86400)
        self.concurrent: Dict[str, int] = {}

    def check_all_limits(
        self,
        user_id: str,
        tier: UserTier,
        token_count: int
    ) -> tuple[bool, Optional[str]]:
        """Check all rate limits for a request."""
        limits = self.TIER_LIMITS[tier]

        # Check tokens per request
        if token_count > limits.max_tokens_per_request:
            return False, f"Max {limits.max_tokens_per_request} tokens per request"

        # Check concurrent requests
        current_concurrent = self.concurrent.get(user_id, 0)
        if current_concurrent >= limits.max_concurrent:
            return False, f"Max {limits.max_concurrent} concurrent requests"

        # Check minute limit
        minute_key = f"{user_id}:minute"
        # Temporarily update limiter max for this user
        self.minute_limiter.max_requests = limits.requests_per_minute
        if not self.minute_limiter.is_allowed(minute_key):
            return False, f"Rate limit: {limits.requests_per_minute}/minute"

        # Check daily limit
        day_key = f"{user_id}:day"
        self.day_limiter.max_requests = limits.requests_per_day
        if not self.day_limiter.is_allowed(day_key):
            return False, f"Daily limit: {limits.requests_per_day}/day"

        return True, None

    def start_request(self, user_id: str):
        """Mark request as started (for concurrent tracking)."""
        self.concurrent[user_id] = self.concurrent.get(user_id, 0) + 1

    def end_request(self, user_id: str):
        """Mark request as completed."""
        if user_id in self.concurrent:
            self.concurrent[user_id] = max(0, self.concurrent[user_id] - 1)

# Usage
limiter = MultiTierLimiter()

allowed, error = limiter.check_all_limits(
    user_id="user123",
    tier=UserTier.BASIC,
    token_count=2000
)

if allowed:
    limiter.start_request("user123")
    try:
        # Process request
        pass
    finally:
        limiter.end_request("user123")
else:
    print(f"Rate limited: {error}")

Abuse Prevention

from collections import defaultdict
from datetime import datetime, timedelta
from typing import Set

class AbusePreventor:
    """Detect and prevent abuse patterns."""

    def __init__(self):
        self.blocked_ips: Set[str] = set()
        self.suspicious_users: Dict[str, int] = defaultdict(int)
        self.failed_attempts: Dict[str, List[datetime]] = defaultdict(list)

    def check_ip(self, ip_address: str) -> bool:
        """Check if IP is blocked."""
        return ip_address not in self.blocked_ips

    def record_suspicious_activity(
        self,
        user_id: str,
        activity_type: str
    ):
        """Record suspicious activity."""
        self.suspicious_users[user_id] += 1

        # Auto-block after threshold
        if self.suspicious_users[user_id] >= 10:
            self._escalate_user(user_id)

    def record_failed_attempt(
        self,
        identifier: str,
        attempt_type: str
    ):
        """Record failed attempt (auth, validation, etc.)."""
        now = datetime.utcnow()
        self.failed_attempts[identifier].append(now)

        # Clean old attempts
        cutoff = now - timedelta(hours=1)
        self.failed_attempts[identifier] = [
            t for t in self.failed_attempts[identifier]
            if t > cutoff
        ]

        # Check for brute force
        if len(self.failed_attempts[identifier]) > 50:
            self._handle_brute_force(identifier)

    def _escalate_user(self, user_id: str):
        """Handle user that exceeded suspicious threshold."""
        print(f"[ALERT] User {user_id} flagged for review")
        # In production: notify security team, temporarily restrict

    def _handle_brute_force(self, identifier: str):
        """Handle detected brute force attack."""
        print(f"[BLOCK] Brute force detected from {identifier}")
        self.blocked_ips.add(identifier)

class CostProtector:
    """Protect against cost explosion."""

    def __init__(self, daily_budget: float):
        self.daily_budget = daily_budget
        self.daily_spend: Dict[str, float] = defaultdict(float)
        self.last_reset = datetime.utcnow().date()

    def _check_reset(self):
        """Reset daily counters if new day."""
        today = datetime.utcnow().date()
        if today > self.last_reset:
            self.daily_spend.clear()
            self.last_reset = today

    def check_budget(self, user_id: str, estimated_cost: float) -> bool:
        """Check if request fits within budget."""
        self._check_reset()

        projected = self.daily_spend[user_id] + estimated_cost
        return projected <= self.daily_budget

    def record_spend(self, user_id: str, cost: float):
        """Record actual spend."""
        self._check_reset()
        self.daily_spend[user_id] += cost

    def get_remaining_budget(self, user_id: str) -> float:
        """Get remaining daily budget for user."""
        self._check_reset()
        return max(0, self.daily_budget - self.daily_spend[user_id])

# Combined protection
class ProtectionMiddleware:
    """Combined rate limiting and abuse prevention."""

    def __init__(self):
        self.rate_limiter = MultiTierLimiter()
        self.abuse_preventer = AbusePreventor()
        self.cost_protector = CostProtector(daily_budget=10.0)

    def check_request(
        self,
        user_id: str,
        ip_address: str,
        tier: UserTier,
        token_count: int,
        estimated_cost: float
    ) -> tuple[bool, Optional[str]]:
        """Run all protection checks."""
        # Check IP block
        if not self.abuse_preventer.check_ip(ip_address):
            return False, "IP blocked"

        # Check rate limits
        allowed, error = self.rate_limiter.check_all_limits(
            user_id, tier, token_count
        )
        if not allowed:
            return False, error

        # Check budget
        if not self.cost_protector.check_budget(user_id, estimated_cost):
            return False, "Daily budget exceeded"

        return True, None

Key Takeaway: Implement multi-layer rate limiting that considers requests per minute, daily limits, concurrent connections, and cost budgets. Combine with abuse detection for comprehensive protection. :::

Quiz

Module 5: Production Security Patterns

Take Quiz