أنماط أمن الإنتاج

تحديد المعدل ومنع الإساءة

3 دقيقة للقراءة

تحديد المعدل يحمي تطبيق LLM الخاص بك من الإساءة، ويتحكم في التكاليف، ويضمن الاستخدام العادل. يغطي هذا الدرس استراتيجيات التنفيذ وتقنيات منع الإساءة المتقدمة.

لماذا تحديد المعدل مهم

┌─────────────────────────────────────────────────────────────┐
│                    فوائد تحديد المعدل                        │
│                                                             │
│   بدون تحديد المعدل:                                        │
│   • مستخدم واحد يمكنه استنفاد ميزانية API                   │
│   • هجمات DoS يمكنها إسقاط الخدمة                          │
│   • الإساءة الآلية تمر دون فحص                             │
│   • توزيع غير عادل للموارد                                 │
│                                                             │
│   مع تحديد المعدل:                                          │
│   ✓ تكاليف متوقعة                                          │
│   ✓ توفر الخدمة                                            │
│   ✓ استخدام عادل عبر المستخدمين                            │
│   ✓ تخفيف الإساءة                                          │
└─────────────────────────────────────────────────────────────┘

خوارزمية دلو الرموز

import time
from dataclasses import dataclass
from typing import Dict, Optional
from threading import Lock

@dataclass
class Bucket:
    tokens: float
    last_update: float

class TokenBucketRateLimiter:
    """محدد معدل بدلو الرموز مع دعم الاندفاع."""

    def __init__(
        self,
        rate: float,  # رموز في الثانية
        capacity: float  # سعة الاندفاع القصوى
    ):
        self.rate = rate
        self.capacity = capacity
        self.buckets: Dict[str, Bucket] = {}
        self.lock = Lock()

    def _get_bucket(self, key: str) -> Bucket:
        """الحصول على أو إنشاء دلو للمفتاح."""
        if key not in self.buckets:
            self.buckets[key] = Bucket(
                tokens=self.capacity,
                last_update=time.time()
            )
        return self.buckets[key]

    def _refill(self, bucket: Bucket) -> None:
        """إعادة ملء الدلو بناءً على الوقت المنقضي."""
        now = time.time()
        elapsed = now - bucket.last_update
        bucket.tokens = min(
            self.capacity,
            bucket.tokens + elapsed * self.rate
        )
        bucket.last_update = now

    def consume(self, key: str, tokens: float = 1.0) -> bool:
        """محاولة استهلاك الرموز. يُرجع True إذا مسموح."""
        with self.lock:
            bucket = self._get_bucket(key)
            self._refill(bucket)

            if bucket.tokens >= tokens:
                bucket.tokens -= tokens
                return True
            return False

    def get_wait_time(self, key: str, tokens: float = 1.0) -> float:
        """الحصول على الثواني حتى تتوفر الرموز."""
        with self.lock:
            bucket = self._get_bucket(key)
            self._refill(bucket)

            if bucket.tokens >= tokens:
                return 0.0

            needed = tokens - bucket.tokens
            return needed / self.rate

# الاستخدام
limiter = TokenBucketRateLimiter(
    rate=1.0,  # رمز واحد في الثانية
    capacity=10.0  # السماح باندفاع 10
)

user_id = "user123"
if limiter.consume(user_id):
    print("الطلب مسموح")
else:
    wait = limiter.get_wait_time(user_id)
    print(f"محدود المعدل. أعد المحاولة خلال {wait:.1f} ثانية")

محدد معدل النافذة المنزلقة

from collections import deque
from datetime import datetime, timedelta
from typing import Dict, Deque
import time

class SlidingWindowLimiter:
    """محدد معدل النافذة المنزلقة للتحكم الدقيق."""

    def __init__(
        self,
        max_requests: int,
        window_seconds: int
    ):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Dict[str, Deque[float]] = {}

    def is_allowed(self, key: str) -> bool:
        """تحقق إذا كان الطلب مسموحاً."""
        now = time.time()
        window_start = now - self.window_seconds

        # تهيئة الطابور إذا لزم
        if key not in self.requests:
            self.requests[key] = deque()

        # إزالة الطوابع الزمنية المنتهية
        while (
            self.requests[key] and
            self.requests[key][0] < window_start
        ):
            self.requests[key].popleft()

        # فحص الحد
        if len(self.requests[key]) >= self.max_requests:
            return False

        # تسجيل الطلب
        self.requests[key].append(now)
        return True

    def get_remaining(self, key: str) -> int:
        """الحصول على الطلبات المتبقية في النافذة الحالية."""
        now = time.time()
        window_start = now - self.window_seconds

        if key not in self.requests:
            return self.max_requests

        # تنظيف المنتهية
        while (
            self.requests[key] and
            self.requests[key][0] < window_start
        ):
            self.requests[key].popleft()

        return max(0, self.max_requests - len(self.requests[key]))

# الاستخدام
limiter = SlidingWindowLimiter(
    max_requests=100,
    window_seconds=60
)

if limiter.is_allowed("user123"):
    remaining = limiter.get_remaining("user123")
    print(f"مسموح. {remaining} طلب متبقي")

تحديد المعدل متعدد المستويات

from dataclasses import dataclass
from typing import Dict, List
from enum import Enum

class UserTier(Enum):
    FREE = "free"
    BASIC = "basic"
    PRO = "pro"
    ENTERPRISE = "enterprise"

@dataclass
class TierLimits:
    requests_per_minute: int
    requests_per_day: int
    max_tokens_per_request: int
    max_concurrent: int

class MultiTierLimiter:
    """محدد معدل بمستويات مختلفة."""

    TIER_LIMITS = {
        UserTier.FREE: TierLimits(
            requests_per_minute=10,
            requests_per_day=100,
            max_tokens_per_request=1000,
            max_concurrent=1
        ),
        UserTier.BASIC: TierLimits(
            requests_per_minute=30,
            requests_per_day=1000,
            max_tokens_per_request=4000,
            max_concurrent=3
        ),
        UserTier.PRO: TierLimits(
            requests_per_minute=100,
            requests_per_day=10000,
            max_tokens_per_request=8000,
            max_concurrent=10
        ),
        UserTier.ENTERPRISE: TierLimits(
            requests_per_minute=500,
            requests_per_day=100000,
            max_tokens_per_request=32000,
            max_concurrent=50
        ),
    }

    def __init__(self):
        self.minute_limiter = SlidingWindowLimiter(1, 60)
        self.day_limiter = SlidingWindowLimiter(1, 86400)
        self.concurrent: Dict[str, int] = {}

    def check_all_limits(
        self,
        user_id: str,
        tier: UserTier,
        token_count: int
    ) -> tuple[bool, Optional[str]]:
        """فحص جميع حدود المعدل للطلب."""
        limits = self.TIER_LIMITS[tier]

        # فحص الرموز لكل طلب
        if token_count > limits.max_tokens_per_request:
            return False, f"الحد الأقصى {limits.max_tokens_per_request} رمز لكل طلب"

        # فحص الطلبات المتزامنة
        current_concurrent = self.concurrent.get(user_id, 0)
        if current_concurrent >= limits.max_concurrent:
            return False, f"الحد الأقصى {limits.max_concurrent} طلب متزامن"

        # فحص حد الدقيقة
        minute_key = f"{user_id}:minute"
        self.minute_limiter.max_requests = limits.requests_per_minute
        if not self.minute_limiter.is_allowed(minute_key):
            return False, f"حد المعدل: {limits.requests_per_minute}/دقيقة"

        # فحص الحد اليومي
        day_key = f"{user_id}:day"
        self.day_limiter.max_requests = limits.requests_per_day
        if not self.day_limiter.is_allowed(day_key):
            return False, f"الحد اليومي: {limits.requests_per_day}/يوم"

        return True, None

    def start_request(self, user_id: str):
        """تحديد الطلب كمبدوء (لتتبع التزامن)."""
        self.concurrent[user_id] = self.concurrent.get(user_id, 0) + 1

    def end_request(self, user_id: str):
        """تحديد الطلب كمكتمل."""
        if user_id in self.concurrent:
            self.concurrent[user_id] = max(0, self.concurrent[user_id] - 1)

# الاستخدام
limiter = MultiTierLimiter()

allowed, error = limiter.check_all_limits(
    user_id="user123",
    tier=UserTier.BASIC,
    token_count=2000
)

if allowed:
    limiter.start_request("user123")
    try:
        # معالجة الطلب
        pass
    finally:
        limiter.end_request("user123")
else:
    print(f"محدود المعدل: {error}")

منع الإساءة

from collections import defaultdict
from datetime import datetime, timedelta
from typing import Set

class AbusePreventor:
    """كشف ومنع أنماط الإساءة."""

    def __init__(self):
        self.blocked_ips: Set[str] = set()
        self.suspicious_users: Dict[str, int] = defaultdict(int)
        self.failed_attempts: Dict[str, List[datetime]] = defaultdict(list)

    def check_ip(self, ip_address: str) -> bool:
        """تحقق إذا كان IP محظوراً."""
        return ip_address not in self.blocked_ips

    def record_suspicious_activity(
        self,
        user_id: str,
        activity_type: str
    ):
        """تسجيل النشاط المشبوه."""
        self.suspicious_users[user_id] += 1

        # حظر تلقائي بعد العتبة
        if self.suspicious_users[user_id] >= 10:
            self._escalate_user(user_id)

    def record_failed_attempt(
        self,
        identifier: str,
        attempt_type: str
    ):
        """تسجيل محاولة فاشلة (مصادقة، تحقق، إلخ)."""
        now = datetime.utcnow()
        self.failed_attempts[identifier].append(now)

        # تنظيف المحاولات القديمة
        cutoff = now - timedelta(hours=1)
        self.failed_attempts[identifier] = [
            t for t in self.failed_attempts[identifier]
            if t > cutoff
        ]

        # فحص هجوم القوة الغاشمة
        if len(self.failed_attempts[identifier]) > 50:
            self._handle_brute_force(identifier)

    def _escalate_user(self, user_id: str):
        """التعامل مع المستخدم الذي تجاوز عتبة الشبهة."""
        print(f"[تنبيه] المستخدم {user_id} تم تعليمه للمراجعة")
        # في الإنتاج: إخطار فريق الأمان، تقييد مؤقت

    def _handle_brute_force(self, identifier: str):
        """التعامل مع هجوم القوة الغاشمة المكتشف."""
        print(f"[حظر] تم كشف هجوم قوة غاشمة من {identifier}")
        self.blocked_ips.add(identifier)

class CostProtector:
    """الحماية من انفجار التكاليف."""

    def __init__(self, daily_budget: float):
        self.daily_budget = daily_budget
        self.daily_spend: Dict[str, float] = defaultdict(float)
        self.last_reset = datetime.utcnow().date()

    def _check_reset(self):
        """إعادة تعيين العدادات اليومية إذا يوم جديد."""
        today = datetime.utcnow().date()
        if today > self.last_reset:
            self.daily_spend.clear()
            self.last_reset = today

    def check_budget(self, user_id: str, estimated_cost: float) -> bool:
        """تحقق إذا كان الطلب يناسب الميزانية."""
        self._check_reset()

        projected = self.daily_spend[user_id] + estimated_cost
        return projected <= self.daily_budget

    def record_spend(self, user_id: str, cost: float):
        """تسجيل الإنفاق الفعلي."""
        self._check_reset()
        self.daily_spend[user_id] += cost

    def get_remaining_budget(self, user_id: str) -> float:
        """الحصول على الميزانية اليومية المتبقية للمستخدم."""
        self._check_reset()
        return max(0, self.daily_budget - self.daily_spend[user_id])

# الحماية المدمجة
class ProtectionMiddleware:
    """تحديد المعدل ومنع الإساءة المدمج."""

    def __init__(self):
        self.rate_limiter = MultiTierLimiter()
        self.abuse_preventer = AbusePreventor()
        self.cost_protector = CostProtector(daily_budget=10.0)

    def check_request(
        self,
        user_id: str,
        ip_address: str,
        tier: UserTier,
        token_count: int,
        estimated_cost: float
    ) -> tuple[bool, Optional[str]]:
        """تشغيل جميع فحوصات الحماية."""
        # فحص حظر IP
        if not self.abuse_preventer.check_ip(ip_address):
            return False, "IP محظور"

        # فحص حدود المعدل
        allowed, error = self.rate_limiter.check_all_limits(
            user_id, tier, token_count
        )
        if not allowed:
            return False, error

        # فحص الميزانية
        if not self.cost_protector.check_budget(user_id, estimated_cost):
            return False, "تم تجاوز الميزانية اليومية"

        return True, None

النقطة الرئيسية: طبّق تحديد معدل متعدد الطبقات يأخذ في الاعتبار الطلبات في الدقيقة، الحدود اليومية، الاتصالات المتزامنة، وميزانيات التكلفة. ادمجه مع كشف الإساءة للحماية الشاملة. :::

اختبار

الوحدة 5: أنماط أمن الإنتاج

خذ الاختبار