الدرس 20 من 23

الإنتاج والموثوقية

استراتيجيات النشر

3 دقيقة للقراءة

نشر أنظمة الذكاء الاصطناعي يتطلب استراتيجيات دقيقة لتقليل المخاطر مع تعظيم التعلم. يغطي هذا الدرس أنماط النشر الخاصة بتطبيقات LLM.

النشر الأزرق-الأخضر

تشغيل بيئتين متطابقتين، تبديل الحركة فوراً:

┌─────────────────────────────────────────────────────────────┐
│                     موزع الحمل                               │
└─────────────────────────────────────────────────────────────┘
              │                           │
              │ (100%)                    │ (0%)
              ▼                           ▼
┌──────────────────────┐     ┌──────────────────────┐
│    البيئة الزرقاء    │     │   البيئة الخضراء     │
│   (الحالية v1.2)     │     │   (الجديدة v1.3)     │
│                      │     │                      │
│   - الوكيل v1.2      │     │   - الوكيل v1.3      │
│   - الموجهات v5      │     │   - الموجهات v6      │
│   - النموذج: GPT-4   │     │   - النموذج: GPT-4   │
└──────────────────────┘     └──────────────────────┘
class BlueGreenDeployer:
    def __init__(self, load_balancer, environments: dict):
        self.lb = load_balancer
        self.environments = environments
        self.active = "blue"

    async def deploy_new_version(self, version: str):
        """النشر للبيئة غير النشطة والتبديل."""
        inactive = "green" if self.active == "blue" else "blue"

        # الخطوة 1: النشر للبيئة غير النشطة
        await self.environments[inactive].deploy(version)

        # الخطوة 2: تشغيل فحوصات الصحة
        health = await self.environments[inactive].health_check()
        if not health["healthy"]:
            raise DeploymentError(f"فشل فحص الصحة: {health}")

        # الخطوة 3: تشغيل اختبارات الدخان
        smoke = await self.run_smoke_tests(inactive)
        if not smoke["passed"]:
            raise DeploymentError(f"فشلت اختبارات الدخان: {smoke}")

        # الخطوة 4: تبديل الحركة
        await self.lb.set_weights({
            self.active: 0,
            inactive: 100
        })

        self.active = inactive
        return {"status": "success", "active": self.active}

    async def rollback(self):
        """التراجع الفوري للبيئة السابقة."""
        inactive = "green" if self.active == "blue" else "blue"

        await self.lb.set_weights({
            self.active: 0,
            inactive: 100
        })

        self.active = inactive
        return {"status": "rolled_back", "active": self.active}

نشر الكناري

تحويل الحركة تدريجياً للتحقق من النسخة الجديدة:

import asyncio

class CanaryDeployer:
    def __init__(self, load_balancer, metrics_client):
        self.lb = load_balancer
        self.metrics = metrics_client

    async def canary_deploy(
        self,
        new_version: str,
        stages: list = None,
        validation_period: int = 300  # 5 دقائق لكل مرحلة
    ):
        """الطرح التدريجي مع التحقق التلقائي."""
        stages = stages or [5, 10, 25, 50, 100]  # النسبة المئوية

        for percentage in stages:
            # تحويل الحركة
            await self.lb.set_weights({
                "stable": 100 - percentage,
                "canary": percentage
            })

            # الانتظار وجمع المقاييس
            await asyncio.sleep(validation_period)

            # التحقق من صحة الكناري
            validation = await self.validate_canary()

            if not validation["passed"]:
                # التراجع التلقائي
                await self.lb.set_weights({
                    "stable": 100,
                    "canary": 0
                })
                return {
                    "status": "rolled_back",
                    "failed_at": percentage,
                    "reason": validation["reason"]
                }

        return {"status": "success", "version": new_version}

    async def validate_canary(self) -> dict:
        """مقارنة مقاييس الكناري مع المستقر."""
        stable_metrics = await self.metrics.get("stable", window="5m")
        canary_metrics = await self.metrics.get("canary", window="5m")

        checks = {
            "error_rate": canary_metrics["error_rate"] <= stable_metrics["error_rate"] * 1.1,
            "latency_p95": canary_metrics["latency_p95"] <= stable_metrics["latency_p95"] * 1.2,
            "user_satisfaction": canary_metrics.get("satisfaction", 1.0) >= 0.9
        }

        failed = [k for k, v in checks.items() if not v]

        return {
            "passed": len(failed) == 0,
            "reason": failed if failed else None
        }

نشر الظل

اختبار النسخة الجديدة بحركة حقيقية، بدون تأثير على المستخدم:

import asyncio
from typing import Any

class ShadowDeployer:
    def __init__(self, production_agent, shadow_agent, comparator):
        self.production = production_agent
        self.shadow = shadow_agent
        self.comparator = comparator
        self.results = []

    async def handle_request(self, request: dict) -> Any:
        """معالجة الطلب في الإنتاج، والظل في الخلفية."""

        # استجابة الإنتاج (تُرجع للمستخدم)
        production_task = asyncio.create_task(
            self.production.run(request)
        )

        # استجابة الظل (لا تُرجع للمستخدم)
        shadow_task = asyncio.create_task(
            self.shadow.run(request)
        )

        # انتظار الإنتاج (واجهة المستخدم)
        production_response = await production_task

        # لا تنتظر الظل، لكن التقط عندما يجهز
        asyncio.create_task(
            self._compare_results(request, production_response, shadow_task)
        )

        return production_response

    async def _compare_results(
        self,
        request: dict,
        production_response: Any,
        shadow_task: asyncio.Task
    ):
        """مقارنة استجابات الإنتاج والظل."""
        try:
            shadow_response = await asyncio.wait_for(shadow_task, timeout=60)

            comparison = await self.comparator.compare(
                request=request,
                production=production_response,
                shadow=shadow_response
            )

            self.results.append(comparison)

            # تسجيل الاختلافات الكبيرة
            if comparison["divergence_score"] > 0.3:
                await self.log_divergence(comparison)

        except asyncio.TimeoutError:
            self.results.append({"error": "shadow_timeout"})

    def get_shadow_report(self) -> dict:
        """توليد تقرير عن أداء الظل."""
        if not self.results:
            return {"status": "no_data"}

        valid_results = [r for r in self.results if "error" not in r]

        return {
            "total_requests": len(self.results),
            "successful_shadows": len(valid_results),
            "avg_divergence": sum(r["divergence_score"] for r in valid_results) / len(valid_results),
            "shadow_better_count": sum(1 for r in valid_results if r["shadow_preferred"]),
            "ready_for_promotion": self._assess_readiness(valid_results)
        }

أعلام الميزات للذكاء الاصطناعي

التحكم بميزات الذكاء الاصطناعي ديناميكياً:

class AIFeatureFlags:
    def __init__(self, flag_service):
        self.flags = flag_service

    async def get_config(self, user_id: str, context: dict = None) -> dict:
        """الحصول على تكوين الذكاء الاصطناعي بناءً على أعلام الميزات."""
        config = {
            "model": "gpt-3.5-turbo",  # الافتراضي
            "max_tokens": 1000,
            "tools_enabled": [],
            "features": {}
        }

        # فحص علم ترقية النموذج
        if await self.flags.is_enabled("gpt4_rollout", user_id):
            config["model"] = "gpt-4"
            config["max_tokens"] = 4000

        # فحص علم الأداة الجديدة
        if await self.flags.is_enabled("code_execution_tool", user_id):
            config["tools_enabled"].append("code_executor")

        # فحص الميزات التجريبية
        if await self.flags.is_enabled("streaming_responses", user_id):
            config["features"]["streaming"] = True

        # الطرح بالنسبة المئوية
        if await self.flags.percentage_enabled("new_prompt_v2", user_id, 25):
            config["prompt_version"] = "v2"
        else:
            config["prompt_version"] = "v1"

        return config

# الاستخدام
flags = AIFeatureFlags(flag_service)

async def handle_request(user_id: str, request: dict):
    config = await flags.get_config(user_id)

    agent = create_agent(
        model=config["model"],
        tools=config["tools_enabled"],
        prompt_version=config["prompt_version"]
    )

    return await agent.run(request)

قائمة فحص النشر

deployment_checklist = {
    "قبل_النشر": [
        "تشغيل مجموعة الاختبارات الكاملة",
        "التحقق من نجاح اختبارات انحدار الموجهات",
        "فحص توافق إصدار النموذج",
        "مراجعة تقديرات تأثير التكلفة",
        "إعداد خطة التراجع"
    ],
    "أثناء_النشر": [
        "مراقبة معدلات الأخطاء (< عتبة 1%)",
        "مشاهدة زمن الاستجابة P95 (< 2x الأساس)",
        "فحص استخدام الرموز (ضمن الميزانية)",
        "التحقق من نشاط مرشحات الأمان",
        "اختبار مسارات المستخدم الحرجة"
    ],
    "بعد_النشر": [
        "مراجعة ملاحظات المستخدم",
        "تحليل التكلفة لكل طلب",
        "فحص تسريب الموجهات",
        "التحقق من عمل التسجيل",
        "تحديث كتب التشغيل إذا لزم"
    ]
}

نصيحة للمقابلة

عند مناقشة النشر:

  1. تقييم المخاطر - ما الذي يمكن أن يخطئ في هذا النشر؟
  2. وقت التراجع - كم السرعة التي يمكنك الرجوع بها؟
  3. مقاييس التحقق - كيف تعرف أن النشر نجح؟
  4. مراقبة التكلفة - النسخة الجديدة قد تستخدم رموزاً أكثر

مع اكتمال معرفة الإنتاج، لنطبق كل شيء على دراسات حالة المقابلات الحقيقية. :::

اختبار

الوحدة 5: الإنتاج والموثوقية

خذ الاختبار