الإنسان في الحلقة وأنماط الإنتاج

استراتيجيات النشر

5 دقيقة للقراءة

نقل تطبيق LangGraph من التطوير إلى الإنتاج يتطلب دراسة متأنية للبنية التحتية والتوسع والموثوقية والاهتمامات التشغيلية. يغطي هذا الدرس المشهد الكامل للنشر - من المنصات السحابية المُدارة إلى الحلول المستضافة ذاتياً عبر خيارات البنية التحتية المختلفة.

السياق الواقعي

قصة إنتاج يناير 2026: شركة ناشئة في التكنولوجيا المالية بنت خط أنابيب معالجة مستندات مع 5 وكلاء متخصصين. رحلتهم: بدأوا بمنصة LangGraph لإطلاق MVP في يومين. بعد الوصول إلى 50,000 مستند/يوم والحاجة لضوابط امتثال محددة، انتقلوا إلى Kubernetes مستضاف ذاتياً مع PostgresSaver. استغرق الانتقال أسبوعين لكنه منحهم امتثال إقامة البيانات وتخفيض التكلفة بنسبة 40% على نطاق واسع.

إطار قرار النشر:

فريق صغير، إطلاق سريع → منصة LangGraph
متطلبات امتثال → استضافة ذاتية
اقتصاديات الحجم مهمة → Kubernetes مستضاف ذاتياً
احتياجات هجينة → منصة LangGraph + عمال مستضافين ذاتياً

منصة LangGraph (السحابة المُدارة)

توفر منصة LangGraph بيئة نشر مُدارة بالكامل مع صفر تكوين للبنية التحتية، توسع تلقائي، استمرارية مدمجة، ورصد متكامل.

نظرة عامة على بنية المنصة

┌─────────────────────────────────────────────────────────────────┐
│                    منصة LangGraph                                │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │   بوابة API     │  │  موازن الحمل    │  │   إنهاء SSL    │  │
│  └────────┬────────┘  └────────┬────────┘  └────────┬────────┘  │
│           │                    │                    │            │
│  ┌────────▼────────────────────▼────────────────────▼────────┐  │
│  │              طبقة الحوسبة ذاتية التوسع                     │  │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐   │  │
│  │  │ عامل 1  │  │ عامل 2  │  │ عامل 3  │  │ عامل N  │   │  │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘   │  │
│  └────────────────────────────┬──────────────────────────────┘  │
│                               │                                  │
│  ┌────────────────────────────▼──────────────────────────────┐  │
│  │              طبقة الاستمرارية المُدارة                     │  │
│  │  ┌─────────────────┐      ┌─────────────────────────────┐ │  │
│  │  │ قاعدة التفتيش  │      │ تخزين الكائنات (المرفقات)   │ │  │
│  │  └─────────────────┘      └─────────────────────────────┘ │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │           رصد LangSmith المتكامل                           │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

تكوين المشروع

أنشئ ملف langgraph.json في جذر مشروعك لتكوين النشر:

{
    "graphs": {
        "research_agent": "./src/graphs/research_graph.py:app",
        "document_processor": "./src/graphs/document_graph.py:app",
        "customer_support": "./src/graphs/support_graph.py:app"
    },
    "env": ".env.production",
    "python_version": "3.11",
    "dependencies": [
        ".",
        "langchain-openai>=0.1.0",
        "langchain-anthropic>=0.1.0",
        "langgraph>=0.2.0",
        "httpx>=0.25.0",
        "pydantic>=2.0.0"
    ],
    "pip_config": {
        "extra_index_url": "https://my-private-pypi.example.com/simple/"
    }
}

سير عمل النشر الكامل

# تثبيت CLI الخاص بـ LangGraph
pip install langgraph-cli

# التحقق من التكوين محلياً
langgraph validate

# اختبار محلياً قبل النشر
langgraph dev

# النشر إلى منصة LangGraph
langgraph deploy --project my-research-agent

# النشر مع بيئة محددة
langgraph deploy --project my-research-agent --env production

# التحقق من حالة النشر
langgraph status --project my-research-agent

# عرض سجلات النشر
langgraph logs --project my-research-agent --follow

# التراجع إلى الإصدار السابق إذا لزم الأمر
langgraph rollback --project my-research-agent --version v1.2.3

استدعاء الرسوم البيانية المنشورة

بمجرد النشر، تصبح الرسوم البيانية متاحة عبر REST API:

"""
كود العميل لاستدعاء نشرات منصة LangGraph.
"""
import httpx
from typing import Any


class LangGraphClient:
    """عميل للتفاعل مع نشرات منصة LangGraph."""

    def __init__(self, project_url: str, api_key: str):
        self.base_url = project_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            headers=self.headers,
            timeout=300.0  # مهلة 5 دقائق للرسوم البيانية طويلة التشغيل
        )

    async def invoke(
        self,
        graph_name: str,
        input_data: dict[str, Any],
        thread_id: str,
        config: dict[str, Any] | None = None
    ) -> dict[str, Any]:
        """
        استدعاء رسم بياني بشكل متزامن وانتظار النتيجة.

        المعاملات:
            graph_name: اسم الرسم البياني للاستدعاء
            input_data: حالة الإدخال للرسم البياني
            thread_id: معرف الخيط للاستمرارية
            config: تكوين إضافي اختياري

        يعيد:
            الحالة النهائية من تنفيذ الرسم البياني
        """
        payload = {
            "input": input_data,
            "config": {
                "configurable": {"thread_id": thread_id},
                **(config or {})
            }
        }

        response = await self.client.post(
            f"/graphs/{graph_name}/invoke",
            json=payload
        )
        response.raise_for_status()
        return response.json()

    async def stream(
        self,
        graph_name: str,
        input_data: dict[str, Any],
        thread_id: str
    ):
        """
        بث أحداث تنفيذ الرسم البياني.

        يُنتج الأحداث عند حدوثها أثناء تنفيذ الرسم البياني.
        """
        payload = {
            "input": input_data,
            "config": {
                "configurable": {"thread_id": thread_id}
            }
        }

        async with self.client.stream(
            "POST",
            f"/graphs/{graph_name}/stream",
            json=payload
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    import json
                    event = json.loads(line[6:])
                    yield event

    async def get_state(
        self,
        graph_name: str,
        thread_id: str
    ) -> dict[str, Any]:
        """الحصول على الحالة الحالية لخيط."""
        response = await self.client.get(
            f"/graphs/{graph_name}/state",
            params={"thread_id": thread_id}
        )
        response.raise_for_status()
        return response.json()

    async def update_state(
        self,
        graph_name: str,
        thread_id: str,
        values: dict[str, Any],
        as_node: str | None = None
    ) -> dict[str, Any]:
        """تحديث الحالة لخيط (مفيد للإنسان في الحلقة)."""
        payload = {
            "values": values,
            "as_node": as_node
        }

        response = await self.client.post(
            f"/graphs/{graph_name}/state",
            params={"thread_id": thread_id},
            json=payload
        )
        response.raise_for_status()
        return response.json()

    async def resume(
        self,
        graph_name: str,
        thread_id: str,
        resume_data: Any
    ) -> dict[str, Any]:
        """استئناف رسم بياني متوقف مع إدخال بشري."""
        from langgraph.types import Command

        payload = {
            "input": Command(resume=resume_data),
            "config": {
                "configurable": {"thread_id": thread_id}
            }
        }

        response = await self.client.post(
            f"/graphs/{graph_name}/invoke",
            json=payload
        )
        response.raise_for_status()
        return response.json()


# مثال الاستخدام
async def main():
    client = LangGraphClient(
        project_url="https://my-research-agent.langgraph.app",
        api_key="lgp_xxxxx"
    )

    # استدعاء رسم البحث البياني
    result = await client.invoke(
        graph_name="research_agent",
        input_data={"query": "أحدث التطورات في الحوسبة الكمية"},
        thread_id="user-123-session-456"
    )

    print(f"نتيجة البحث: {result}")

    # بث الأحداث للتحديثات في الوقت الفعلي
    async for event in client.stream(
        graph_name="research_agent",
        input_data={"query": "بحث سلامة الذكاء الاصطناعي 2026"},
        thread_id="user-123-session-789"
    ):
        print(f"حدث: {event}")

الاستضافة الذاتية مع FastAPI

للفرق التي تتطلب تحكماً كاملاً في البنية التحتية أو إقامة البيانات أو تحسين التكلفة على نطاق واسع، يوفر النشر المستضاف ذاتياً مع FastAPI أساساً قوياً.

تطبيق FastAPI جاهز للإنتاج

"""
نشر FastAPI للإنتاج لتطبيقات LangGraph.

الميزات:
- تجميع الاتصالات مع asyncpg
- معالجة الإيقاف السلس
- فحوصات الصحة والجاهزية
- التحقق من الطلبات مع Pydantic
- تسجيل منظم
- معالجة الأخطاء والاستعادة
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
from pydantic import BaseModel, Field
from typing import Any
import asyncpg
import asyncio
import logging
import json
import os

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.types import Command

# استيراد تعريفات الرسوم البيانية
from src.graphs.research_graph import research_graph
from src.graphs.document_graph import document_graph

# تكوين التسجيل المنظم
logging.basicConfig(
    level=logging.INFO,
    format='{"time": "%(asctime)s", "level": "%(levelname)s", "message": "%(message)s"}'
)
logger = logging.getLogger(__name__)

# تكوين قاعدة البيانات
DATABASE_URL = os.getenv("DATABASE_URL")
POOL_MIN_SIZE = int(os.getenv("POOL_MIN_SIZE", "5"))
POOL_MAX_SIZE = int(os.getenv("POOL_MAX_SIZE", "20"))

# تجمع الاتصالات العالمي
pool: asyncpg.Pool | None = None


async def init_database_pool() -> asyncpg.Pool:
    """تهيئة تجمع اتصالات قاعدة البيانات بإعدادات مناسبة."""
    return await asyncpg.create_pool(
        DATABASE_URL,
        min_size=POOL_MIN_SIZE,
        max_size=POOL_MAX_SIZE,
        command_timeout=60,
        max_inactive_connection_lifetime=300,
    )


async def setup_checkpointer_tables(pool: asyncpg.Pool):
    """ضمان وجود جداول نقاط التفتيش في قاعدة البيانات."""
    async with pool.acquire() as conn:
        # AsyncPostgresSaver سينشئ الجداول تلقائياً،
        # لكن يمكننا التحقق من الاتصال هنا
        await conn.execute("SELECT 1")
        logger.info("تم التحقق من اتصال قاعدة البيانات")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    مدير دورة حياة التطبيق.

    يعالج البدء (إنشاء تجمع الاتصالات) والإيقاف (التنظيف).
    """
    global pool

    # البدء
    logger.info("بدء التطبيق...")
    pool = await init_database_pool()
    await setup_checkpointer_tables(pool)
    logger.info(f"تم تهيئة تجمع قاعدة البيانات مع {POOL_MIN_SIZE}-{POOL_MAX_SIZE} اتصال")

    yield

    # الإيقاف
    logger.info("إيقاف التطبيق...")
    if pool:
        await pool.close()
        logger.info("تم إغلاق تجمع قاعدة البيانات")


# إنشاء تطبيق FastAPI
app = FastAPI(
    title="LangGraph Production API",
    version="1.0.0",
    lifespan=lifespan
)

# تكوين CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=os.getenv("ALLOWED_ORIGINS", "*").split(","),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# نماذج الطلب/الاستجابة
class InvokeRequest(BaseModel):
    """نموذج الطلب لاستدعاء الرسم البياني."""
    input: dict[str, Any] = Field(..., description="حالة الإدخال للرسم البياني")
    thread_id: str = Field(..., description="معرف الخيط للاستمرارية")
    config: dict[str, Any] | None = Field(None, description="تكوين إضافي")


class ResumeRequest(BaseModel):
    """نموذج الطلب لاستئناف الرسوم البيانية المتوقفة."""
    thread_id: str = Field(..., description="معرف الخيط للاستئناف")
    resume_data: Any = Field(..., description="البيانات للاستئناف بها")


class StateUpdateRequest(BaseModel):
    """نموذج الطلب لتحديثات الحالة."""
    thread_id: str = Field(..., description="معرف الخيط للتحديث")
    values: dict[str, Any] = Field(..., description="قيم الحالة للتحديث")
    as_node: str | None = Field(None, description="العقدة لنسب التحديث إليها")


class InvokeResponse(BaseModel):
    """نموذج الاستجابة لاستدعاء الرسم البياني."""
    result: dict[str, Any]
    thread_id: str


class HealthResponse(BaseModel):
    """استجابة فحص الصحة."""
    status: str
    version: str
    database: str


# سجل الرسوم البيانية - يربط الأسماء بالرسوم البيانية المجمعة
GRAPHS = {
    "research": research_graph,
    "document": document_graph,
}


def get_compiled_graph(graph_name: str, checkpointer):
    """الحصول على رسم بياني مجمع مع حافظ نقاط التفتيش مرفق."""
    if graph_name not in GRAPHS:
        raise HTTPException(
            status_code=404,
            detail=f"الرسم البياني '{graph_name}' غير موجود. المتاحة: {list(GRAPHS.keys())}"
        )
    return GRAPHS[graph_name].compile(checkpointer=checkpointer)


# نقاط نهاية فحص الصحة
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """
    فحص صحة أساسي - يتحقق من تشغيل التطبيق.
    يُستخدم بواسطة موازنات الحمل لفحوصات الحياة الأساسية.
    """
    return HealthResponse(
        status="healthy",
        version="1.0.0",
        database="not_checked"
    )


@app.get("/ready", response_model=HealthResponse)
async def readiness_check():
    """
    فحص الجاهزية - يتحقق من توفر جميع التبعيات.
    يُستخدم بواسطة Kubernetes لتحديد ما إذا كان يجب أن يستقبل pod حركة المرور.
    """
    # فحص اتصال قاعدة البيانات
    db_status = "error"
    try:
        async with pool.acquire() as conn:
            await conn.execute("SELECT 1")
            db_status = "connected"
    except Exception as e:
        logger.error(f"فشل فحص صحة قاعدة البيانات: {e}")
        raise HTTPException(
            status_code=503,
            detail=f"قاعدة البيانات غير متاحة: {str(e)}"
        )

    return HealthResponse(
        status="ready",
        version="1.0.0",
        database=db_status
    )


# نقاط نهاية استدعاء الرسم البياني
@app.post("/graphs/{graph_name}/invoke", response_model=InvokeResponse)
async def invoke_graph(graph_name: str, request: InvokeRequest):
    """
    استدعاء رسم بياني بشكل متزامن.

    تنتظر نقطة النهاية هذه اكتمال الرسم البياني وتعيد الحالة النهائية.
    للرسوم البيانية طويلة التشغيل، ضع في اعتبارك استخدام نقطة نهاية البث.
    """
    logger.info(f"استدعاء الرسم البياني '{graph_name}' للخيط '{request.thread_id}'")

    try:
        async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
            await checkpointer.setup()  # ضمان وجود الجداول

            compiled_graph = get_compiled_graph(graph_name, checkpointer)

            config = {
                "configurable": {
                    "thread_id": request.thread_id,
                    **(request.config.get("configurable", {}) if request.config else {})
                },
                **(request.config or {})
            }

            result = await compiled_graph.ainvoke(request.input, config)

            logger.info(f"اكتمل الرسم البياني '{graph_name}' للخيط '{request.thread_id}'")

            return InvokeResponse(
                result=result,
                thread_id=request.thread_id
            )

    except Exception as e:
        logger.error(f"فشل استدعاء الرسم البياني: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"فشل تنفيذ الرسم البياني: {str(e)}"
        )


@app.post("/graphs/{graph_name}/stream")
async def stream_graph(graph_name: str, request: InvokeRequest):
    """
    بث أحداث تنفيذ الرسم البياني.

    يعيد Server-Sent Events (SSE) للتحديثات في الوقت الفعلي أثناء التنفيذ.
    """
    logger.info(f"بث الرسم البياني '{graph_name}' للخيط '{request.thread_id}'")

    async def event_generator():
        try:
            async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
                await checkpointer.setup()

                compiled_graph = get_compiled_graph(graph_name, checkpointer)

                config = {
                    "configurable": {"thread_id": request.thread_id}
                }

                async for event in compiled_graph.astream_events(
                    request.input,
                    config,
                    version="v2"
                ):
                    yield f"data: {json.dumps(event)}\n\n"

                yield "data: {\"event\": \"done\"}\n\n"

        except Exception as e:
            logger.error(f"خطأ في البث: {e}", exc_info=True)
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )


@app.post("/graphs/{graph_name}/resume", response_model=InvokeResponse)
async def resume_graph(graph_name: str, request: ResumeRequest):
    """
    استئناف تنفيذ رسم بياني متوقف.

    استخدم هذا بعد استدعاء interrupt() لتوفير إدخال بشري ومتابعة التنفيذ.
    """
    logger.info(f"استئناف الرسم البياني '{graph_name}' للخيط '{request.thread_id}'")

    try:
        async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
            await checkpointer.setup()

            compiled_graph = get_compiled_graph(graph_name, checkpointer)

            config = {
                "configurable": {"thread_id": request.thread_id}
            }

            # استخدم Command للاستئناف مع البيانات
            result = await compiled_graph.ainvoke(
                Command(resume=request.resume_data),
                config
            )

            logger.info(f"تم استئناف الرسم البياني '{graph_name}' للخيط '{request.thread_id}'")

            return InvokeResponse(
                result=result,
                thread_id=request.thread_id
            )

    except Exception as e:
        logger.error(f"فشل الاستئناف: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"فشل الاستئناف: {str(e)}"
        )


# تشغيل مع: uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

تشغيل تطبيق FastAPI

# التطوير
uvicorn main:app --reload --host 0.0.0.0 --port 8000

# الإنتاج مع عمال متعددين
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 --loop uvloop

# مع Gunicorn لمزيد من التحكم
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000

نشر Docker

توفر الحاويات اتساقاً عبر البيئات وتبسط تنسيق النشر.

Dockerfile للإنتاج

# Dockerfile
# بناء متعدد المراحل لصورة إنتاج أصغر

# مرحلة البناء
FROM python:3.11-slim as builder

WORKDIR /app

# تثبيت تبعيات البناء
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# إنشاء بيئة افتراضية
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# تثبيت تبعيات Python
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt


# مرحلة الإنتاج
FROM python:3.11-slim as production

WORKDIR /app

# تثبيت تبعيات وقت التشغيل فقط
RUN apt-get update && apt-get install -y --no-install-recommends \
    libpq5 \
    curl \
    && rm -rf /var/lib/apt/lists/* \
    && useradd --create-home --shell /bin/bash appuser

# نسخ البيئة الافتراضية من البناء
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# نسخ كود التطبيق
COPY --chown=appuser:appuser . .

# التبديل إلى مستخدم غير جذري
USER appuser

# فحص الصحة
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# كشف المنفذ
EXPOSE 8000

# تشغيل التطبيق
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

ملف المتطلبات

# requirements.txt
langgraph>=0.2.0
langchain>=0.2.0
langchain-openai>=0.1.0
langchain-anthropic>=0.1.0
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
asyncpg>=0.29.0
pydantic>=2.5.0
httpx>=0.25.0
python-dotenv>=1.0.0
structlog>=24.0.0

Docker Compose للمجموعة الكاملة

# docker-compose.yml
version: '3.8'

services:
  # تطبيق LangGraph
  langgraph-app:
    build:
      context: .
      dockerfile: Dockerfile
      target: production
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://langgraph:langgraph_password@db:5432/langgraph
      - REDIS_URL=redis://redis:6379/0
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - LANGCHAIN_TRACING_V2=true
      - LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}
      - LANGCHAIN_PROJECT=langgraph-production
      - POOL_MIN_SIZE=5
      - POOL_MAX_SIZE=20
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '2'
          memory: 2G
        reservations:
          cpus: '0.5'
          memory: 512M
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 3
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 10s
    networks:
      - langgraph-network

  # قاعدة بيانات PostgreSQL
  db:
    image: postgres:16-alpine
    volumes:
      - pgdata:/var/lib/postgresql/data
      - ./init-db.sql:/docker-entrypoint-initdb.d/init.sql:ro
    environment:
      - POSTGRES_DB=langgraph
      - POSTGRES_USER=langgraph
      - POSTGRES_PASSWORD=langgraph_password
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U langgraph -d langgraph"]
      interval: 10s
      timeout: 5s
      retries: 5
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 1G
    networks:
      - langgraph-network

  # Redis للتخزين المؤقت وتحديد المعدل
  redis:
    image: redis:7-alpine
    volumes:
      - redisdata:/data
    command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
    networks:
      - langgraph-network

  # وكيل Nginx العكسي
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - langgraph-app
    networks:
      - langgraph-network

volumes:
  pgdata:
  redisdata:

networks:
  langgraph-network:
    driver: bridge

أوامر Docker

# بناء وبدء جميع الخدمات
docker-compose up -d --build

# عرض السجلات
docker-compose logs -f langgraph-app

# توسيع التطبيق أفقياً
docker-compose up -d --scale langgraph-app=5

# تحديث متدرج (بدون توقف)
docker-compose up -d --no-deps --build langgraph-app

# إيقاف جميع الخدمات
docker-compose down

# إيقاف وإزالة الأحجام (تحذير: يحذف البيانات)
docker-compose down -v

نشر Kubernetes

للنشر على مستوى المؤسسات، يوفر Kubernetes تنسيقاً متقدماً وتوسعاً تلقائياً وقدرات شفاء ذاتي.

بيانات Kubernetes الكاملة

# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: langgraph
  labels:
    name: langgraph
---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: langgraph-config
  namespace: langgraph
data:
  POOL_MIN_SIZE: "5"
  POOL_MAX_SIZE: "20"
  LANGCHAIN_TRACING_V2: "true"
  LANGCHAIN_PROJECT: "langgraph-production"
  LOG_LEVEL: "INFO"
---
# k8s/secrets.yaml
apiVersion: v1
kind: Secret
metadata:
  name: langgraph-secrets
  namespace: langgraph
type: Opaque
stringData:
  DATABASE_URL: postgresql://langgraph:password@postgres-service:5432/langgraph
  OPENAI_API_KEY: sk-xxxxx
  ANTHROPIC_API_KEY: sk-ant-xxxxx
  LANGCHAIN_API_KEY: lsv2_xxxxx
---
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-agent
  namespace: langgraph
  labels:
    app: langgraph-agent
    version: v1
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: langgraph-agent
  template:
    metadata:
      labels:
        app: langgraph-agent
        version: v1
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      serviceAccountName: langgraph-sa
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
      containers:
      - name: langgraph
        image: my-registry/langgraph-agent:v1.0.0
        imagePullPolicy: Always
        ports:
        - name: http
          containerPort: 8000
          protocol: TCP
        envFrom:
        - configMapRef:
            name: langgraph-config
        - secretRef:
            name: langgraph-secrets
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 10
          periodSeconds: 30
          timeoutSeconds: 5
          failureThreshold: 3
        readinessProbe:
          httpGet:
            path: /ready
            port: http
          initialDelaySeconds: 5
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 3
        startupProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 5
          periodSeconds: 5
          failureThreshold: 30
        volumeMounts:
        - name: tmp
          mountPath: /tmp
      volumes:
      - name: tmp
        emptyDir: {}
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchLabels:
                  app: langgraph-agent
              topologyKey: kubernetes.io/hostname
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: langgraph-hpa
  namespace: langgraph
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: langgraph-agent
  minReplicas: 3
  maxReplicas: 50
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 60
      - type: Pods
        value: 4
        periodSeconds: 60
      selectPolicy: Max
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 25
        periodSeconds: 60
---
# k8s/pdb.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: langgraph-pdb
  namespace: langgraph
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: langgraph-agent

أوامر نشر Kubernetes

# تطبيق جميع البيانات
kubectl apply -f k8s/

# التحقق من حالة النشر
kubectl -n langgraph get pods -w

# عرض السجلات
kubectl -n langgraph logs -f deployment/langgraph-agent

# التوسع يدوياً
kubectl -n langgraph scale deployment/langgraph-agent --replicas=10

# التحقق من حالة HPA
kubectl -n langgraph get hpa

# إعادة التشغيل المتدرج
kubectl -n langgraph rollout restart deployment/langgraph-agent

# عرض تاريخ النشر
kubectl -n langgraph rollout history deployment/langgraph-agent

# التراجع إلى الإصدار السابق
kubectl -n langgraph rollout undo deployment/langgraph-agent

مراقبة الإنتاج والتنبيه

نقطة نهاية مقاييس Prometheus

"""
إضافة مقاييس Prometheus إلى تطبيق FastAPI الخاص بك.
"""
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Request
from fastapi.responses import PlainTextResponse
import time

# تعريف المقاييس
REQUEST_COUNT = Counter(
    'langgraph_requests_total',
    'إجمالي عدد الطلبات',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'langgraph_request_latency_seconds',
    'زمن استجابة الطلب بالثواني',
    ['method', 'endpoint'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0]
)

GRAPH_EXECUTIONS = Counter(
    'langgraph_graph_executions_total',
    'إجمالي تنفيذات الرسم البياني',
    ['graph_name', 'status']
)

GRAPH_EXECUTION_TIME = Histogram(
    'langgraph_graph_execution_seconds',
    'وقت تنفيذ الرسم البياني بالثواني',
    ['graph_name'],
    buckets=[1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0]
)

ACTIVE_THREADS = Gauge(
    'langgraph_active_threads',
    'عدد خيوط سير العمل النشطة'
)

DB_POOL_SIZE = Gauge(
    'langgraph_db_pool_size',
    'حجم تجمع اتصالات قاعدة البيانات',
    ['state']
)


# البرمجية الوسيطة للمقاييس التلقائية
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()

    response = await call_next(request)

    latency = time.time() - start_time

    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(latency)

    return response


# نقطة نهاية المقاييس
@app.get("/metrics")
async def metrics():
    """نقطة نهاية مقاييس Prometheus."""
    # تحديث مقاييس التجمع
    if pool:
        DB_POOL_SIZE.labels(state="free").set(pool.get_idle_size())
        DB_POOL_SIZE.labels(state="used").set(pool.get_size() - pool.get_idle_size())

    return PlainTextResponse(
        generate_latest(),
        media_type="text/plain"
    )

قواعد التنبيه لـ Prometheus

# prometheus-rules.yaml
groups:
- name: langgraph-alerts
  rules:
  - alert: LangGraphHighErrorRate
    expr: |
      rate(langgraph_requests_total{status=~"5.."}[5m])
      / rate(langgraph_requests_total[5m]) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "معدل أخطاء مرتفع مكتشف"
      description: "معدل الخطأ أعلى من 5% خلال الدقائق 5 الأخيرة"

  - alert: LangGraphSlowResponses
    expr: |
      histogram_quantile(0.95, rate(langgraph_request_latency_seconds_bucket[5m])) > 30
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "أوقات استجابة بطيئة"
      description: "الشريحة المئوية 95 للزمن أعلى من 30 ثانية"

  - alert: LangGraphDatabasePoolExhausted
    expr: |
      langgraph_db_pool_size{state="free"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "استنفد تجمع اتصالات قاعدة البيانات"
      description: "لا توجد اتصالات قاعدة بيانات متاحة"

مصفوفة قرار النشر

العامل منصة LangGraph Docker مستضاف ذاتياً K8s مستضاف ذاتياً
وقت الإعداد ساعات أيام أسابيع
تكلفة البنية التحتية على أساس الاستخدام ثابت + استخدام ثابت + استخدام
التوسع تلقائي يدوي/compose تلقائي (HPA)
إقامة البيانات محدود تحكم كامل تحكم كامل
الامتثال (SOC2, HIPAA) يعتمد على المنصة تحكم كامل تحكم كامل
التخصيص محدود معتدل كامل
العبء التشغيلي لا يوجد معتدل عالي
الأفضل لـ الشركات الناشئة، MVPs الفرق الصغيرة المؤسسات

أسئلة المقابلة

س: متى تختار منصة LangGraph مقابل Kubernetes المستضاف ذاتياً؟

"منصة LangGraph للنشر السريع عندما يكون الوقت للسوق أهم من التحكم في البنية التحتية - الشركات الناشئة، MVPs، الفرق الصغيرة. Kubernetes المستضاف ذاتياً عندما تحتاج: امتثال إقامة البيانات (GDPR, HIPAA)، متطلبات أمان محددة، تحسين التكلفة على نطاق واسع (ما يتجاوز ~10,000 دولار/شهر تكاليف المنصة)، أو تكاملات بنية تحتية مخصصة. الانتقال عادة يحدث عندما يتجاوز الإنفاق الشهري 8-10 آلاف دولار أو تتطلب متطلبات الامتثال التحكم في البيانات."

س: كيف تضمن نشرات بدون توقف لتطبيقات LangGraph؟

"ثلاث استراتيجيات رئيسية: أولاً، النشر المتدرج مع فحوصات الجاهزية المناسبة - Kubernetes يوجه حركة المرور فقط إلى pods التي تجتاز فحوصات الصحة. ثانياً، تصريف الاتصالات - اضبط terminationGracePeriodSeconds للسماح للطلبات الجارية بالاكتمال (مهم بشكل خاص للتنفيذات طويلة الرسوم البيانية). ثالثاً، الاستعادة القائمة على نقاط التفتيش - إذا مات pod أثناء التنفيذ، يمكن لـ pod آخر الاستئناف من آخر نقطة تفتيش. PodDisruptionBudgets تضمن الحد الأدنى من التوفر أثناء الاضطرابات الطوعية."

س: ما استراتيجية قاعدة البيانات الخاصة بك لنشرات LangGraph عالية الإنتاجية؟

"AsyncPostgresSaver مع تجميع الاتصالات ضروري - استخدم asyncpg مع أحجام تجمع مضبوطة لعبء عملك (عادة 5-20 اتصال لكل pod). للإنتاجية العالية جداً، ضع في اعتبارك النسخ المتماثلة للقراءة لاستعلامات الحالة. قسّم جداول نقاط التفتيش حسب التاريخ للتنظيف الفعال. استخدم pg_partman لإدارة الأقسام التلقائية. راقب استنفاد تجمع الاتصالات وزمن استجابة الاستعلام. للمقياس الشديد، ضع في اعتبارك التقسيم حسب thread_id عبر قواعد بيانات متعددة."

س: كيف تتعامل مع إدارة الأسرار في نشرات LangGraph للإنتاج؟

"لا تضمّن الأسرار أبداً في الصور. استخدم Kubernetes Secrets (مشفرة في الراحة مع KMS)، أو أفضل، مديري الأسرار الخارجيين مثل HashiCorp Vault أو AWS Secrets Manager أو GCP Secret Manager. External Secrets Operator يزامن الأسرار الخارجية مع Kubernetes. قم بتدوير مفاتيح API بانتظام. استخدم هوية عبء العمل حيثما أمكن - pods تفترض أدوار IAM السحابية بدلاً من استخدام بيانات اعتماد ثابتة."


النقاط الرئيسية

خيار النشر أفضل حالة استخدام الاعتبار الرئيسي
منصة LangGraph النشر السريع، البنية التحتية المُدارة صفر عبء تشغيلي
FastAPI مستضاف ذاتياً تحكم كامل، متطلبات مخصصة تجميع الاتصالات، أنماط غير متزامنة
Docker Compose التطوير، إنتاج صغير فحوصات الصحة، حدود الموارد
Kubernetes مقياس المؤسسات، التوسع التلقائي HPA، PDB، فحوصات مناسبة
هجين متطلبات معقدة المنصة للتدفقات البسيطة، K8s للحساسة

قائمة فحص الإنتاج الحرجة:

  • حافظات نقاط التفتيش غير المتزامنة (AsyncPostgresSaver) للاستمرارية غير المحجوبة
  • تجميع الاتصالات مع التحجيم المناسب
  • فحوصات الصحة والجاهزية
  • معالجة الإيقاف السلس
  • مقاييس Prometheus للرصد
  • PodDisruptionBudgets للتوفر
  • سياسات الشبكة للأمان
  • النشر المتدرج لصفر توقف

اكتملت الوحدة 5 - جاهز لاختبار الوحدة!

:::

اختبار

الوحدة 5: الإنسان في الحلقة وأنماط الإنتاج

خذ الاختبار