أنماط أمن الإنتاج
تحديد المعدل ومنع الإساءة
3 دقيقة للقراءة
تحديد المعدل يحمي تطبيق LLM الخاص بك من الإساءة، ويتحكم في التكاليف، ويضمن الاستخدام العادل. يغطي هذا الدرس استراتيجيات التنفيذ وتقنيات منع الإساءة المتقدمة.
لماذا تحديد المعدل مهم
┌─────────────────────────────────────────────────────────────┐
│ فوائد تحديد المعدل │
│ │
│ بدون تحديد المعدل: │
│ • مستخدم واحد يمكنه استنفاد ميزانية API │
│ • هجمات DoS يمكنها إسقاط الخدمة │
│ • الإساءة الآلية تمر دون فحص │
│ • توزيع غير عادل للموارد │
│ │
│ مع تحديد المعدل: │
│ ✓ تكاليف متوقعة │
│ ✓ توفر الخدمة │
│ ✓ استخدام عادل عبر المستخدمين │
│ ✓ تخفيف الإساءة │
└─────────────────────────────────────────────────────────────┘
خوارزمية دلو الرموز
import time
from dataclasses import dataclass
from typing import Dict, Optional
from threading import Lock
@dataclass
class Bucket:
tokens: float
last_update: float
class TokenBucketRateLimiter:
"""محدد معدل بدلو الرموز مع دعم الاندفاع."""
def __init__(
self,
rate: float, # رموز في الثانية
capacity: float # سعة الاندفاع القصوى
):
self.rate = rate
self.capacity = capacity
self.buckets: Dict[str, Bucket] = {}
self.lock = Lock()
def _get_bucket(self, key: str) -> Bucket:
"""الحصول على أو إنشاء دلو للمفتاح."""
if key not in self.buckets:
self.buckets[key] = Bucket(
tokens=self.capacity,
last_update=time.time()
)
return self.buckets[key]
def _refill(self, bucket: Bucket) -> None:
"""إعادة ملء الدلو بناءً على الوقت المنقضي."""
now = time.time()
elapsed = now - bucket.last_update
bucket.tokens = min(
self.capacity,
bucket.tokens + elapsed * self.rate
)
bucket.last_update = now
def consume(self, key: str, tokens: float = 1.0) -> bool:
"""محاولة استهلاك الرموز. يُرجع True إذا مسموح."""
with self.lock:
bucket = self._get_bucket(key)
self._refill(bucket)
if bucket.tokens >= tokens:
bucket.tokens -= tokens
return True
return False
def get_wait_time(self, key: str, tokens: float = 1.0) -> float:
"""الحصول على الثواني حتى تتوفر الرموز."""
with self.lock:
bucket = self._get_bucket(key)
self._refill(bucket)
if bucket.tokens >= tokens:
return 0.0
needed = tokens - bucket.tokens
return needed / self.rate
# الاستخدام
limiter = TokenBucketRateLimiter(
rate=1.0, # رمز واحد في الثانية
capacity=10.0 # السماح باندفاع 10
)
user_id = "user123"
if limiter.consume(user_id):
print("الطلب مسموح")
else:
wait = limiter.get_wait_time(user_id)
print(f"محدود المعدل. أعد المحاولة خلال {wait:.1f} ثانية")
محدد معدل النافذة المنزلقة
from collections import deque
from datetime import datetime, timedelta
from typing import Dict, Deque
import time
class SlidingWindowLimiter:
"""محدد معدل النافذة المنزلقة للتحكم الدقيق."""
def __init__(
self,
max_requests: int,
window_seconds: int
):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests: Dict[str, Deque[float]] = {}
def is_allowed(self, key: str) -> bool:
"""تحقق إذا كان الطلب مسموحاً."""
now = time.time()
window_start = now - self.window_seconds
# تهيئة الطابور إذا لزم
if key not in self.requests:
self.requests[key] = deque()
# إزالة الطوابع الزمنية المنتهية
while (
self.requests[key] and
self.requests[key][0] < window_start
):
self.requests[key].popleft()
# فحص الحد
if len(self.requests[key]) >= self.max_requests:
return False
# تسجيل الطلب
self.requests[key].append(now)
return True
def get_remaining(self, key: str) -> int:
"""الحصول على الطلبات المتبقية في النافذة الحالية."""
now = time.time()
window_start = now - self.window_seconds
if key not in self.requests:
return self.max_requests
# تنظيف المنتهية
while (
self.requests[key] and
self.requests[key][0] < window_start
):
self.requests[key].popleft()
return max(0, self.max_requests - len(self.requests[key]))
# الاستخدام
limiter = SlidingWindowLimiter(
max_requests=100,
window_seconds=60
)
if limiter.is_allowed("user123"):
remaining = limiter.get_remaining("user123")
print(f"مسموح. {remaining} طلب متبقي")
تحديد المعدل متعدد المستويات
from dataclasses import dataclass
from typing import Dict, List
from enum import Enum
class UserTier(Enum):
FREE = "free"
BASIC = "basic"
PRO = "pro"
ENTERPRISE = "enterprise"
@dataclass
class TierLimits:
requests_per_minute: int
requests_per_day: int
max_tokens_per_request: int
max_concurrent: int
class MultiTierLimiter:
"""محدد معدل بمستويات مختلفة."""
TIER_LIMITS = {
UserTier.FREE: TierLimits(
requests_per_minute=10,
requests_per_day=100,
max_tokens_per_request=1000,
max_concurrent=1
),
UserTier.BASIC: TierLimits(
requests_per_minute=30,
requests_per_day=1000,
max_tokens_per_request=4000,
max_concurrent=3
),
UserTier.PRO: TierLimits(
requests_per_minute=100,
requests_per_day=10000,
max_tokens_per_request=8000,
max_concurrent=10
),
UserTier.ENTERPRISE: TierLimits(
requests_per_minute=500,
requests_per_day=100000,
max_tokens_per_request=32000,
max_concurrent=50
),
}
def __init__(self):
self.minute_limiter = SlidingWindowLimiter(1, 60)
self.day_limiter = SlidingWindowLimiter(1, 86400)
self.concurrent: Dict[str, int] = {}
def check_all_limits(
self,
user_id: str,
tier: UserTier,
token_count: int
) -> tuple[bool, Optional[str]]:
"""فحص جميع حدود المعدل للطلب."""
limits = self.TIER_LIMITS[tier]
# فحص الرموز لكل طلب
if token_count > limits.max_tokens_per_request:
return False, f"الحد الأقصى {limits.max_tokens_per_request} رمز لكل طلب"
# فحص الطلبات المتزامنة
current_concurrent = self.concurrent.get(user_id, 0)
if current_concurrent >= limits.max_concurrent:
return False, f"الحد الأقصى {limits.max_concurrent} طلب متزامن"
# فحص حد الدقيقة
minute_key = f"{user_id}:minute"
self.minute_limiter.max_requests = limits.requests_per_minute
if not self.minute_limiter.is_allowed(minute_key):
return False, f"حد المعدل: {limits.requests_per_minute}/دقيقة"
# فحص الحد اليومي
day_key = f"{user_id}:day"
self.day_limiter.max_requests = limits.requests_per_day
if not self.day_limiter.is_allowed(day_key):
return False, f"الحد اليومي: {limits.requests_per_day}/يوم"
return True, None
def start_request(self, user_id: str):
"""تحديد الطلب كمبدوء (لتتبع التزامن)."""
self.concurrent[user_id] = self.concurrent.get(user_id, 0) + 1
def end_request(self, user_id: str):
"""تحديد الطلب كمكتمل."""
if user_id in self.concurrent:
self.concurrent[user_id] = max(0, self.concurrent[user_id] - 1)
# الاستخدام
limiter = MultiTierLimiter()
allowed, error = limiter.check_all_limits(
user_id="user123",
tier=UserTier.BASIC,
token_count=2000
)
if allowed:
limiter.start_request("user123")
try:
# معالجة الطلب
pass
finally:
limiter.end_request("user123")
else:
print(f"محدود المعدل: {error}")
منع الإساءة
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Set
class AbusePreventor:
"""كشف ومنع أنماط الإساءة."""
def __init__(self):
self.blocked_ips: Set[str] = set()
self.suspicious_users: Dict[str, int] = defaultdict(int)
self.failed_attempts: Dict[str, List[datetime]] = defaultdict(list)
def check_ip(self, ip_address: str) -> bool:
"""تحقق إذا كان IP محظوراً."""
return ip_address not in self.blocked_ips
def record_suspicious_activity(
self,
user_id: str,
activity_type: str
):
"""تسجيل النشاط المشبوه."""
self.suspicious_users[user_id] += 1
# حظر تلقائي بعد العتبة
if self.suspicious_users[user_id] >= 10:
self._escalate_user(user_id)
def record_failed_attempt(
self,
identifier: str,
attempt_type: str
):
"""تسجيل محاولة فاشلة (مصادقة، تحقق، إلخ)."""
now = datetime.utcnow()
self.failed_attempts[identifier].append(now)
# تنظيف المحاولات القديمة
cutoff = now - timedelta(hours=1)
self.failed_attempts[identifier] = [
t for t in self.failed_attempts[identifier]
if t > cutoff
]
# فحص هجوم القوة الغاشمة
if len(self.failed_attempts[identifier]) > 50:
self._handle_brute_force(identifier)
def _escalate_user(self, user_id: str):
"""التعامل مع المستخدم الذي تجاوز عتبة الشبهة."""
print(f"[تنبيه] المستخدم {user_id} تم تعليمه للمراجعة")
# في الإنتاج: إخطار فريق الأمان، تقييد مؤقت
def _handle_brute_force(self, identifier: str):
"""التعامل مع هجوم القوة الغاشمة المكتشف."""
print(f"[حظر] تم كشف هجوم قوة غاشمة من {identifier}")
self.blocked_ips.add(identifier)
class CostProtector:
"""الحماية من انفجار التكاليف."""
def __init__(self, daily_budget: float):
self.daily_budget = daily_budget
self.daily_spend: Dict[str, float] = defaultdict(float)
self.last_reset = datetime.utcnow().date()
def _check_reset(self):
"""إعادة تعيين العدادات اليومية إذا يوم جديد."""
today = datetime.utcnow().date()
if today > self.last_reset:
self.daily_spend.clear()
self.last_reset = today
def check_budget(self, user_id: str, estimated_cost: float) -> bool:
"""تحقق إذا كان الطلب يناسب الميزانية."""
self._check_reset()
projected = self.daily_spend[user_id] + estimated_cost
return projected <= self.daily_budget
def record_spend(self, user_id: str, cost: float):
"""تسجيل الإنفاق الفعلي."""
self._check_reset()
self.daily_spend[user_id] += cost
def get_remaining_budget(self, user_id: str) -> float:
"""الحصول على الميزانية اليومية المتبقية للمستخدم."""
self._check_reset()
return max(0, self.daily_budget - self.daily_spend[user_id])
# الحماية المدمجة
class ProtectionMiddleware:
"""تحديد المعدل ومنع الإساءة المدمج."""
def __init__(self):
self.rate_limiter = MultiTierLimiter()
self.abuse_preventer = AbusePreventor()
self.cost_protector = CostProtector(daily_budget=10.0)
def check_request(
self,
user_id: str,
ip_address: str,
tier: UserTier,
token_count: int,
estimated_cost: float
) -> tuple[bool, Optional[str]]:
"""تشغيل جميع فحوصات الحماية."""
# فحص حظر IP
if not self.abuse_preventer.check_ip(ip_address):
return False, "IP محظور"
# فحص حدود المعدل
allowed, error = self.rate_limiter.check_all_limits(
user_id, tier, token_count
)
if not allowed:
return False, error
# فحص الميزانية
if not self.cost_protector.check_budget(user_id, estimated_cost):
return False, "تم تجاوز الميزانية اليومية"
return True, None
النقطة الرئيسية: طبّق تحديد معدل متعدد الطبقات يأخذ في الاعتبار الطلبات في الدقيقة، الحدود اليومية، الاتصالات المتزامنة، وميزانيات التكلفة. ادمجه مع كشف الإساءة للحماية الشاملة. :::