الدرس 19 من 23

الإنتاج والموثوقية

الأمان والحواجز

3 دقيقة للقراءة

أنظمة الذكاء الاصطناعي في الإنتاج تحتاج طبقات متعددة من الأمان—التحقق من المدخلات، تصفية المخرجات، والقيود السلوكية. يغطي هذا الدرس تنفيذ حواجز شاملة.

الدفاع في العمق

┌─────────────────────────────────────────────────────────────┐
│                     مدخل المستخدم                            │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│  الطبقة 1: التحقق من المدخلات                                │
│  - حدود الطول                                                │
│  - التحقق من التنسيق                                         │
│  - كشف الحقن                                                 │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│  الطبقة 2: إشراف المحتوى                                     │
│  - كشف المحتوى الضار                                         │
│  - كشف المعلومات الشخصية                                     │
│  - تصنيف الموضوع                                             │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│  الطبقة 3: معالجة LLM                                        │
│  - قيود موجه النظام                                          │
│  - قيود الأدوات                                              │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│  الطبقة 4: تصفية المخرجات                                    │
│  - التحقق من الاستجابة                                       │
│  - كشف الهلوسة                                               │
│  - التحقق من الاقتباسات                                      │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                     استجابة المستخدم                         │
└─────────────────────────────────────────────────────────────┘

التحقق من المدخلات

import re
from dataclasses import dataclass
from typing import Optional

@dataclass
class ValidationResult:
    is_valid: bool
    error_message: Optional[str] = None
    sanitized_input: Optional[str] = None

class InputValidator:
    def __init__(
        self,
        max_length: int = 10000,
        max_tokens: int = 4000,
        blocked_patterns: list = None
    ):
        self.max_length = max_length
        self.max_tokens = max_tokens
        self.blocked_patterns = blocked_patterns or []

        # أنماط الحقن الشائعة
        self.injection_patterns = [
            r"ignore\s+(previous|above)\s+instructions",
            r"disregard\s+your\s+(system|initial)",
            r"you\s+are\s+now\s+",
            r"pretend\s+you\s+are",
            r"act\s+as\s+if",
        ]

    def validate(self, user_input: str) -> ValidationResult:
        # فحص الطول
        if len(user_input) > self.max_length:
            return ValidationResult(
                is_valid=False,
                error_message=f"المدخل يتجاوز {self.max_length} حرف"
            )

        # فحص محاولات الحقن
        input_lower = user_input.lower()
        for pattern in self.injection_patterns:
            if re.search(pattern, input_lower):
                return ValidationResult(
                    is_valid=False,
                    error_message="المدخل يحتوي أنماط محظورة"
                )

        # فحص الأنماط المحظورة (مخصصة)
        for pattern in self.blocked_patterns:
            if re.search(pattern, user_input, re.IGNORECASE):
                return ValidationResult(
                    is_valid=False,
                    error_message="المدخل يحتوي محتوى محظور"
                )

        # تنظيف (إزالة البايتات الفارغة، أحرف التحكم)
        sanitized = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', user_input)

        return ValidationResult(
            is_valid=True,
            sanitized_input=sanitized
        )

إشراف المحتوى

from enum import Enum

class ContentCategory(Enum):
    SAFE = "safe"
    HARMFUL = "harmful"
    SEXUAL = "sexual"
    HATE = "hate"
    VIOLENCE = "violence"
    SELF_HARM = "self_harm"
    PII = "pii"

class ContentModerator:
    def __init__(self, moderation_model):
        self.model = moderation_model
        self.pii_patterns = {
            "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b"
        }

    async def moderate(self, text: str) -> dict:
        results = {
            "flagged": False,
            "categories": [],
            "pii_detected": [],
            "confidence": 0.0
        }

        # فحص المعلومات الشخصية
        for pii_type, pattern in self.pii_patterns.items():
            if re.search(pattern, text):
                results["pii_detected"].append(pii_type)
                results["flagged"] = True

        # استخدام نموذج الإشراف لفئات المحتوى
        moderation = await self.model.moderate(text)

        for category, score in moderation.scores.items():
            if score > 0.7:  # العتبة
                results["categories"].append(category)
                results["flagged"] = True
                results["confidence"] = max(results["confidence"], score)

        return results

    def redact_pii(self, text: str) -> str:
        """استبدال المعلومات الشخصية بعناصر نائبة."""
        redacted = text
        for pii_type, pattern in self.pii_patterns.items():
            redacted = re.sub(pattern, f"[{pii_type.upper()}_محجوب]", redacted)
        return redacted

تصفية المخرجات

class OutputFilter:
    def __init__(self, allowed_domains: list = None):
        self.allowed_domains = allowed_domains or []

    async def filter(self, response: str, context: dict = None) -> dict:
        """تصفية والتحقق من مخرجات LLM."""
        issues = []

        # فحص مؤشرات تسريب موجه النظام
        system_leak_patterns = [
            "my system prompt",
            "i was instructed to",
            "my instructions say",
            "i am programmed to"
        ]

        for pattern in system_leak_patterns:
            if pattern in response.lower():
                issues.append("potential_system_leak")

        # التحقق من أن الروابط من النطاقات المسموحة
        urls = re.findall(r'https?://[^\s<>"{}|\\^`\[\]]+', response)
        for url in urls:
            domain = url.split('/')[2]
            if self.allowed_domains and domain not in self.allowed_domains:
                issues.append(f"unauthorized_domain: {domain}")

        # فحص الادعاءات الواقعية مقابل السياق (إذا RAG)
        if context and "sources" in context:
            # فحص هلوسة مبسط
            claims = self._extract_claims(response)
            for claim in claims:
                if not self._verify_claim(claim, context["sources"]):
                    issues.append(f"unverified_claim: {claim[:50]}...")

        return {
            "filtered_response": response,
            "issues": issues,
            "passed": len(issues) == 0
        }

    def _extract_claims(self, text: str) -> list:
        """استخراج الادعاءات الواقعية من النص."""
        # مبسط: تقسيم بالجمل
        sentences = text.split('.')
        return [s.strip() for s in sentences if len(s.strip()) > 20]

    def _verify_claim(self, claim: str, sources: list) -> bool:
        """التحقق إذا الادعاء مدعوم بالمصادر."""
        # في الإنتاج: استخدم تشابه التضمين أو نموذج NLI
        claim_lower = claim.lower()
        for source in sources:
            if any(word in source.lower() for word in claim_lower.split()[:5]):
                return True
        return False

تحديد المعدل

import time
from collections import defaultdict

class RateLimiter:
    def __init__(
        self,
        requests_per_minute: int = 60,
        requests_per_hour: int = 1000,
        tokens_per_day: int = 1000000
    ):
        self.rpm = requests_per_minute
        self.rph = requests_per_hour
        self.tpd = tokens_per_day

        self.request_times = defaultdict(list)
        self.token_usage = defaultdict(int)
        self.token_reset_time = defaultdict(float)

    def check_limit(self, user_id: str) -> tuple[bool, str]:
        """التحقق إذا المستخدم ضمن حدود المعدل."""
        now = time.time()

        # تنظيف المدخلات القديمة
        minute_ago = now - 60
        hour_ago = now - 3600

        self.request_times[user_id] = [
            t for t in self.request_times[user_id] if t > hour_ago
        ]

        # فحص الطلبات في الدقيقة
        recent_minute = sum(1 for t in self.request_times[user_id] if t > minute_ago)
        if recent_minute >= self.rpm:
            return False, f"حد المعدل: تم تجاوز {self.rpm} طلب/دقيقة"

        # فحص الطلبات في الساعة
        if len(self.request_times[user_id]) >= self.rph:
            return False, f"حد المعدل: تم تجاوز {self.rph} طلب/ساعة"

        # فحص حد الرموز اليومي
        if now - self.token_reset_time[user_id] > 86400:
            self.token_usage[user_id] = 0
            self.token_reset_time[user_id] = now

        if self.token_usage[user_id] >= self.tpd:
            return False, f"تم تجاوز حد الرموز اليومي {self.tpd}"

        return True, "موافق"

    def record_request(self, user_id: str, tokens_used: int = 0):
        """تسجيل طلب لتحديد المعدل."""
        self.request_times[user_id].append(time.time())
        self.token_usage[user_id] += tokens_used

خط أنابيب الحواجز المجمعة

class GuardrailPipeline:
    def __init__(self):
        self.input_validator = InputValidator()
        self.moderator = ContentModerator(moderation_model)
        self.output_filter = OutputFilter()
        self.rate_limiter = RateLimiter()

    async def process_request(
        self,
        user_id: str,
        user_input: str
    ) -> dict:
        # الخطوة 1: تحديد المعدل
        allowed, message = self.rate_limiter.check_limit(user_id)
        if not allowed:
            return {"error": message, "blocked_at": "rate_limit"}

        # الخطوة 2: التحقق من المدخلات
        validation = self.input_validator.validate(user_input)
        if not validation.is_valid:
            return {"error": validation.error_message, "blocked_at": "validation"}

        # الخطوة 3: إشراف المحتوى
        moderation = await self.moderator.moderate(validation.sanitized_input)
        if moderation["flagged"]:
            return {
                "error": "انتهاك سياسة المحتوى",
                "categories": moderation["categories"],
                "blocked_at": "moderation"
            }

        return {
            "approved": True,
            "sanitized_input": validation.sanitized_input
        }

نصيحة للمقابلة

عند مناقشة الأمان:

  1. النهج الطبقي - لا نقطة فشل واحدة
  2. الإيجابيات الكاذبة - التوازن بين الأمان وقابلية الاستخدام
  3. التسجيل - تتبع المحتوى المحظور للتحليل
  4. التحديثات - كيف تتعامل مع نواقل الهجوم الجديدة؟

بعد ذلك، سنغطي استراتيجيات النشر لأنظمة الذكاء الاصطناعي. :::

اختبار

الوحدة 5: الإنتاج والموثوقية

خذ الاختبار