بناء حواجز المدخلات والمخرجات

تطهير المخرجات

3 دقيقة للقراءة

حتى مع التحقق المثالي من المدخلات، مخرجات LLM قد تحتوي محتوى ضار. تطهير المخرجات يضمن أن الاستجابات آمنة قبل وصولها للمستخدمين أو الأنظمة اللاحقة.

لماذا تطهير المخرجات مهم

┌─────────────────────────────────────────────────────────────┐
│                    سيناريوهات مخاطر المخرجات                 │
│                                                             │
│   1. XSS: النموذج يولد <script>alert('xss')</script>       │
│   2. تسريب بيانات: النموذج يُخرج بيانات تدريب (PII)         │
│   3. محتوى ضار: عنف، خطاب كراهية                           │
│   4. هلوسة: معلومات خاطئة مقدمة كحقائق                     │
│   5. تسريب تعليمات: كشف محث النظام                         │
└─────────────────────────────────────────────────────────────┘

تطهير HTML/XSS

عندما تُعرض مخرجات LLM في سياقات الويب:

import html
import re
from typing import Optional

def sanitize_for_html(llm_output: str) -> str:
    """تطهير مخرجات LLM للعرض الآمن في HTML."""
    # تهريب أحرف HTML الخاصة
    sanitized = html.escape(llm_output)

    # إزالة أي أنماط شبيهة بالسكريبت المتبقية
    sanitized = re.sub(
        r'javascript:',
        '[removed]:',
        sanitized,
        flags=re.IGNORECASE
    )
    sanitized = re.sub(
        r'on\w+\s*=',
        '[removed]=',
        sanitized,
        flags=re.IGNORECASE
    )

    return sanitized

def sanitize_for_markdown(llm_output: str) -> str:
    """تطهير لعرض markdown (يسمح بالتنسيق)."""
    # السماح بـ markdown الآمن لكن تهريب HTML
    sanitized = html.escape(llm_output)

    # إعادة تمكين أنماط markdown الآمنة
    # عريض: **text** أو __text__
    sanitized = re.sub(r'&lt;b&gt;(.+?)&lt;/b&gt;', r'**\1**', sanitized)

    return sanitized

# مثال
raw_output = "Here's code: <script>steal_cookies()</script>"
safe_output = sanitize_for_html(raw_output)
print(safe_output)
# المخرج: Here's code: &lt;script&gt;steal_cookies()&lt;/script&gt;

كشف وتنقيح PII

import re
from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class PIIMatch:
    type: str
    value: str
    start: int
    end: int

class PIIRedactor:
    """كشف وتنقيح معلومات التعريف الشخصية."""

    PATTERNS = {
        "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
        "phone_us": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
        "credit_card": r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
        "ip_address": r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b',
    }

    def detect(self, text: str) -> List[PIIMatch]:
        """إيجاد كل PII في النص."""
        matches = []
        for pii_type, pattern in self.PATTERNS.items():
            for match in re.finditer(pattern, text):
                matches.append(PIIMatch(
                    type=pii_type,
                    value=match.group(),
                    start=match.start(),
                    end=match.end()
                ))
        return matches

    def redact(self, text: str) -> Tuple[str, List[PIIMatch]]:
        """تنقيح كل PII من النص."""
        matches = self.detect(text)
        redacted = text

        # الترتيب حسب الموقع (عكسي) للحفاظ على الفهارس
        for match in sorted(matches, key=lambda m: m.start, reverse=True):
            placeholder = f"[REDACTED_{match.type.upper()}]"
            redacted = redacted[:match.start] + placeholder + redacted[match.end:]

        return redacted, matches

# الاستخدام
redactor = PIIRedactor()
llm_output = "Contact John at john@example.com or 555-123-4567"
safe_output, detected = redactor.redact(llm_output)
print(safe_output)
# المخرج: Contact John at [REDACTED_EMAIL] or [REDACTED_PHONE_US]

إشراف المحتوى

from enum import Enum
from typing import Dict, List

class ContentCategory(Enum):
    SAFE = "safe"
    VIOLENCE = "violence"
    HATE_SPEECH = "hate_speech"
    SELF_HARM = "self_harm"
    SEXUAL = "sexual"
    DANGEROUS = "dangerous"

class OutputModerator:
    """الإشراف على مخرجات LLM للمحتوى الضار."""

    def __init__(self):
        # في الإنتاج، استخدم نموذج إشراف مناسب
        self.blocked_phrases = self._load_blocked_phrases()

    def _load_blocked_phrases(self) -> Dict[ContentCategory, List[str]]:
        """تحميل أنماط العبارات المحظورة."""
        return {
            ContentCategory.VIOLENCE: [
                "how to make a bomb",
                "how to kill",
                "attack instructions",
            ],
            ContentCategory.DANGEROUS: [
                "synthesize drugs",
                "create malware",
                "hack into",
            ],
        }

    def check(self, text: str) -> Tuple[bool, List[ContentCategory]]:
        """تحقق إذا كان المخرج يحتوي محتوى ضار."""
        text_lower = text.lower()
        detected_categories = []

        for category, phrases in self.blocked_phrases.items():
            for phrase in phrases:
                if phrase in text_lower:
                    detected_categories.append(category)
                    break

        is_safe = len(detected_categories) == 0
        return is_safe, detected_categories

    def filter(self, text: str) -> str:
        """تصفية المحتوى الضار من المخرج."""
        is_safe, categories = self.check(text)

        if not is_safe:
            return (
                "لا أستطيع تقديم تلك المعلومات. "
                "يرجى السؤال عن شيء آخر."
            )
        return text

# للإنتاج، استخدم إشراف عبر API:
# - OpenAI Moderation API
# - Google Perspective API
# - Azure Content Safety

كشف تسريب محث النظام

def detect_system_prompt_leak(
    llm_output: str,
    system_prompt: str,
    threshold: float = 0.5
) -> Tuple[bool, float]:
    """كشف إذا كان المخرج يحتوي محتوى محث النظام."""
    # كشف التداخل البسيط
    system_words = set(system_prompt.lower().split())
    output_words = set(llm_output.lower().split())

    if not system_words:
        return False, 0.0

    overlap = len(system_words & output_words)
    overlap_ratio = overlap / len(system_words)

    is_leaked = overlap_ratio > threshold
    return is_leaked, overlap_ratio

def sanitize_system_prompt_leak(
    llm_output: str,
    system_prompt: str
) -> str:
    """إزالة تسريبات محث النظام المحتملة من المخرج."""
    is_leaked, ratio = detect_system_prompt_leak(llm_output, system_prompt)

    if is_leaked:
        return (
            "لا أستطيع مشاركة تلك المعلومات. "
            "كيف يمكنني مساعدتك بطريقة أخرى؟"
        )
    return llm_output

مطهر المخرجات الكامل

from dataclasses import dataclass
from typing import Optional

@dataclass
class SanitizationResult:
    original: str
    sanitized: str
    was_modified: bool
    blocked: bool
    reason: Optional[str] = None

class OutputSanitizer:
    """خط أنابيب تطهير المخرجات الكامل."""

    def __init__(self, system_prompt: str = ""):
        self.pii_redactor = PIIRedactor()
        self.moderator = OutputModerator()
        self.system_prompt = system_prompt

    def sanitize(
        self,
        llm_output: str,
        context: str = "html"
    ) -> SanitizationResult:
        """تشغيل خط أنابيب التطهير الكامل."""
        original = llm_output
        current = llm_output

        # الخطوة 1: تحقق من المحتوى الضار
        is_safe, categories = self.moderator.check(current)
        if not is_safe:
            return SanitizationResult(
                original=original,
                sanitized="لا أستطيع تقديم تلك الاستجابة.",
                was_modified=True,
                blocked=True,
                reason=f"فئات محظورة: {categories}"
            )

        # الخطوة 2: تحقق من تسريب محث النظام
        if self.system_prompt:
            is_leaked, _ = detect_system_prompt_leak(current, self.system_prompt)
            if is_leaked:
                return SanitizationResult(
                    original=original,
                    sanitized="لا أستطيع مشاركة تلك المعلومات.",
                    was_modified=True,
                    blocked=True,
                    reason="تم كشف تسريب محث النظام"
                )

        # الخطوة 3: تنقيح PII
        current, pii_matches = self.pii_redactor.redact(current)

        # الخطوة 4: التطهير حسب السياق
        if context == "html":
            current = sanitize_for_html(current)
        elif context == "markdown":
            current = sanitize_for_markdown(current)

        was_modified = current != original
        return SanitizationResult(
            original=original,
            sanitized=current,
            was_modified=was_modified,
            blocked=False
        )

# الاستخدام
sanitizer = OutputSanitizer(system_prompt="أنت مساعد مفيد...")
result = sanitizer.sanitize(llm_response, context="html")
if not result.blocked:
    display_to_user(result.sanitized)

النقطة الرئيسية: تطهير المخرجات هو خط دفاعك الأخير. طهّر دائماً لسياق المخرج المحدد (HTML، markdown، JSON) وتحقق من تسريبات PII والمحتوى الضار وكشف محث النظام. :::

اختبار

الوحدة 4: بناء حواجز المدخلات والمخرجات

خذ الاختبار