الإنسان في الحلقة وأنماط الإنتاج
استراتيجيات النشر
نقل تطبيق LangGraph من التطوير إلى الإنتاج يتطلب دراسة متأنية للبنية التحتية والتوسع والموثوقية والاهتمامات التشغيلية. يغطي هذا الدرس المشهد الكامل للنشر - من المنصات السحابية المُدارة إلى الحلول المستضافة ذاتياً عبر خيارات البنية التحتية المختلفة.
السياق الواقعي
قصة إنتاج يناير 2026: شركة ناشئة في التكنولوجيا المالية بنت خط أنابيب معالجة مستندات مع 5 وكلاء متخصصين. رحلتهم: بدأوا بمنصة LangGraph لإطلاق MVP في يومين. بعد الوصول إلى 50,000 مستند/يوم والحاجة لضوابط امتثال محددة، انتقلوا إلى Kubernetes مستضاف ذاتياً مع PostgresSaver. استغرق الانتقال أسبوعين لكنه منحهم امتثال إقامة البيانات وتخفيض التكلفة بنسبة 40% على نطاق واسع.
إطار قرار النشر:
فريق صغير، إطلاق سريع → منصة LangGraph
متطلبات امتثال → استضافة ذاتية
اقتصاديات الحجم مهمة → Kubernetes مستضاف ذاتياً
احتياجات هجينة → منصة LangGraph + عمال مستضافين ذاتياً
منصة LangGraph (السحابة المُدارة)
توفر منصة LangGraph بيئة نشر مُدارة بالكامل مع صفر تكوين للبنية التحتية، توسع تلقائي، استمرارية مدمجة، ورصد متكامل.
نظرة عامة على بنية المنصة
┌─────────────────────────────────────────────────────────────────┐
│ منصة LangGraph │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ بوابة API │ │ موازن الحمل │ │ إنهاء SSL │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ ┌────────▼────────────────────▼────────────────────▼────────┐ │
│ │ طبقة الحوسبة ذاتية التوسع │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ عامل 1 │ │ عامل 2 │ │ عامل 3 │ │ عامل N │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └────────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────▼──────────────────────────────┐ │
│ │ طبقة الاستمرارية المُدارة │ │
│ │ ┌─────────────────┐ ┌─────────────────────────────┐ │ │
│ │ │ قاعدة التفتيش │ │ تخزين الكائنات (المرفقات) │ │ │
│ │ └─────────────────┘ └─────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ رصد LangSmith المتكامل │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
تكوين المشروع
أنشئ ملف langgraph.json في جذر مشروعك لتكوين النشر:
{
"graphs": {
"research_agent": "./src/graphs/research_graph.py:app",
"document_processor": "./src/graphs/document_graph.py:app",
"customer_support": "./src/graphs/support_graph.py:app"
},
"env": ".env.production",
"python_version": "3.11",
"dependencies": [
".",
"langchain-openai>=0.1.0",
"langchain-anthropic>=0.1.0",
"langgraph>=0.2.0",
"httpx>=0.25.0",
"pydantic>=2.0.0"
],
"pip_config": {
"extra_index_url": "https://my-private-pypi.example.com/simple/"
}
}
سير عمل النشر الكامل
# تثبيت CLI الخاص بـ LangGraph
pip install langgraph-cli
# التحقق من التكوين محلياً
langgraph validate
# اختبار محلياً قبل النشر
langgraph dev
# النشر إلى منصة LangGraph
langgraph deploy --project my-research-agent
# النشر مع بيئة محددة
langgraph deploy --project my-research-agent --env production
# التحقق من حالة النشر
langgraph status --project my-research-agent
# عرض سجلات النشر
langgraph logs --project my-research-agent --follow
# التراجع إلى الإصدار السابق إذا لزم الأمر
langgraph rollback --project my-research-agent --version v1.2.3
استدعاء الرسوم البيانية المنشورة
بمجرد النشر، تصبح الرسوم البيانية متاحة عبر REST API:
"""
كود العميل لاستدعاء نشرات منصة LangGraph.
"""
import httpx
from typing import Any
class LangGraphClient:
"""عميل للتفاعل مع نشرات منصة LangGraph."""
def __init__(self, project_url: str, api_key: str):
self.base_url = project_url
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
self.client = httpx.AsyncClient(
base_url=self.base_url,
headers=self.headers,
timeout=300.0 # مهلة 5 دقائق للرسوم البيانية طويلة التشغيل
)
async def invoke(
self,
graph_name: str,
input_data: dict[str, Any],
thread_id: str,
config: dict[str, Any] | None = None
) -> dict[str, Any]:
"""
استدعاء رسم بياني بشكل متزامن وانتظار النتيجة.
المعاملات:
graph_name: اسم الرسم البياني للاستدعاء
input_data: حالة الإدخال للرسم البياني
thread_id: معرف الخيط للاستمرارية
config: تكوين إضافي اختياري
يعيد:
الحالة النهائية من تنفيذ الرسم البياني
"""
payload = {
"input": input_data,
"config": {
"configurable": {"thread_id": thread_id},
**(config or {})
}
}
response = await self.client.post(
f"/graphs/{graph_name}/invoke",
json=payload
)
response.raise_for_status()
return response.json()
async def stream(
self,
graph_name: str,
input_data: dict[str, Any],
thread_id: str
):
"""
بث أحداث تنفيذ الرسم البياني.
يُنتج الأحداث عند حدوثها أثناء تنفيذ الرسم البياني.
"""
payload = {
"input": input_data,
"config": {
"configurable": {"thread_id": thread_id}
}
}
async with self.client.stream(
"POST",
f"/graphs/{graph_name}/stream",
json=payload
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
import json
event = json.loads(line[6:])
yield event
async def get_state(
self,
graph_name: str,
thread_id: str
) -> dict[str, Any]:
"""الحصول على الحالة الحالية لخيط."""
response = await self.client.get(
f"/graphs/{graph_name}/state",
params={"thread_id": thread_id}
)
response.raise_for_status()
return response.json()
async def update_state(
self,
graph_name: str,
thread_id: str,
values: dict[str, Any],
as_node: str | None = None
) -> dict[str, Any]:
"""تحديث الحالة لخيط (مفيد للإنسان في الحلقة)."""
payload = {
"values": values,
"as_node": as_node
}
response = await self.client.post(
f"/graphs/{graph_name}/state",
params={"thread_id": thread_id},
json=payload
)
response.raise_for_status()
return response.json()
async def resume(
self,
graph_name: str,
thread_id: str,
resume_data: Any
) -> dict[str, Any]:
"""استئناف رسم بياني متوقف مع إدخال بشري."""
from langgraph.types import Command
payload = {
"input": Command(resume=resume_data),
"config": {
"configurable": {"thread_id": thread_id}
}
}
response = await self.client.post(
f"/graphs/{graph_name}/invoke",
json=payload
)
response.raise_for_status()
return response.json()
# مثال الاستخدام
async def main():
client = LangGraphClient(
project_url="https://my-research-agent.langgraph.app",
api_key="lgp_xxxxx"
)
# استدعاء رسم البحث البياني
result = await client.invoke(
graph_name="research_agent",
input_data={"query": "أحدث التطورات في الحوسبة الكمية"},
thread_id="user-123-session-456"
)
print(f"نتيجة البحث: {result}")
# بث الأحداث للتحديثات في الوقت الفعلي
async for event in client.stream(
graph_name="research_agent",
input_data={"query": "بحث سلامة الذكاء الاصطناعي 2026"},
thread_id="user-123-session-789"
):
print(f"حدث: {event}")
الاستضافة الذاتية مع FastAPI
للفرق التي تتطلب تحكماً كاملاً في البنية التحتية أو إقامة البيانات أو تحسين التكلفة على نطاق واسع، يوفر النشر المستضاف ذاتياً مع FastAPI أساساً قوياً.
تطبيق FastAPI جاهز للإنتاج
"""
نشر FastAPI للإنتاج لتطبيقات LangGraph.
الميزات:
- تجميع الاتصالات مع asyncpg
- معالجة الإيقاف السلس
- فحوصات الصحة والجاهزية
- التحقق من الطلبات مع Pydantic
- تسجيل منظم
- معالجة الأخطاء والاستعادة
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
from pydantic import BaseModel, Field
from typing import Any
import asyncpg
import asyncio
import logging
import json
import os
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.types import Command
# استيراد تعريفات الرسوم البيانية
from src.graphs.research_graph import research_graph
from src.graphs.document_graph import document_graph
# تكوين التسجيل المنظم
logging.basicConfig(
level=logging.INFO,
format='{"time": "%(asctime)s", "level": "%(levelname)s", "message": "%(message)s"}'
)
logger = logging.getLogger(__name__)
# تكوين قاعدة البيانات
DATABASE_URL = os.getenv("DATABASE_URL")
POOL_MIN_SIZE = int(os.getenv("POOL_MIN_SIZE", "5"))
POOL_MAX_SIZE = int(os.getenv("POOL_MAX_SIZE", "20"))
# تجمع الاتصالات العالمي
pool: asyncpg.Pool | None = None
async def init_database_pool() -> asyncpg.Pool:
"""تهيئة تجمع اتصالات قاعدة البيانات بإعدادات مناسبة."""
return await asyncpg.create_pool(
DATABASE_URL,
min_size=POOL_MIN_SIZE,
max_size=POOL_MAX_SIZE,
command_timeout=60,
max_inactive_connection_lifetime=300,
)
async def setup_checkpointer_tables(pool: asyncpg.Pool):
"""ضمان وجود جداول نقاط التفتيش في قاعدة البيانات."""
async with pool.acquire() as conn:
# AsyncPostgresSaver سينشئ الجداول تلقائياً،
# لكن يمكننا التحقق من الاتصال هنا
await conn.execute("SELECT 1")
logger.info("تم التحقق من اتصال قاعدة البيانات")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
مدير دورة حياة التطبيق.
يعالج البدء (إنشاء تجمع الاتصالات) والإيقاف (التنظيف).
"""
global pool
# البدء
logger.info("بدء التطبيق...")
pool = await init_database_pool()
await setup_checkpointer_tables(pool)
logger.info(f"تم تهيئة تجمع قاعدة البيانات مع {POOL_MIN_SIZE}-{POOL_MAX_SIZE} اتصال")
yield
# الإيقاف
logger.info("إيقاف التطبيق...")
if pool:
await pool.close()
logger.info("تم إغلاق تجمع قاعدة البيانات")
# إنشاء تطبيق FastAPI
app = FastAPI(
title="LangGraph Production API",
version="1.0.0",
lifespan=lifespan
)
# تكوين CORS
app.add_middleware(
CORSMiddleware,
allow_origins=os.getenv("ALLOWED_ORIGINS", "*").split(","),
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# نماذج الطلب/الاستجابة
class InvokeRequest(BaseModel):
"""نموذج الطلب لاستدعاء الرسم البياني."""
input: dict[str, Any] = Field(..., description="حالة الإدخال للرسم البياني")
thread_id: str = Field(..., description="معرف الخيط للاستمرارية")
config: dict[str, Any] | None = Field(None, description="تكوين إضافي")
class ResumeRequest(BaseModel):
"""نموذج الطلب لاستئناف الرسوم البيانية المتوقفة."""
thread_id: str = Field(..., description="معرف الخيط للاستئناف")
resume_data: Any = Field(..., description="البيانات للاستئناف بها")
class StateUpdateRequest(BaseModel):
"""نموذج الطلب لتحديثات الحالة."""
thread_id: str = Field(..., description="معرف الخيط للتحديث")
values: dict[str, Any] = Field(..., description="قيم الحالة للتحديث")
as_node: str | None = Field(None, description="العقدة لنسب التحديث إليها")
class InvokeResponse(BaseModel):
"""نموذج الاستجابة لاستدعاء الرسم البياني."""
result: dict[str, Any]
thread_id: str
class HealthResponse(BaseModel):
"""استجابة فحص الصحة."""
status: str
version: str
database: str
# سجل الرسوم البيانية - يربط الأسماء بالرسوم البيانية المجمعة
GRAPHS = {
"research": research_graph,
"document": document_graph,
}
def get_compiled_graph(graph_name: str, checkpointer):
"""الحصول على رسم بياني مجمع مع حافظ نقاط التفتيش مرفق."""
if graph_name not in GRAPHS:
raise HTTPException(
status_code=404,
detail=f"الرسم البياني '{graph_name}' غير موجود. المتاحة: {list(GRAPHS.keys())}"
)
return GRAPHS[graph_name].compile(checkpointer=checkpointer)
# نقاط نهاية فحص الصحة
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""
فحص صحة أساسي - يتحقق من تشغيل التطبيق.
يُستخدم بواسطة موازنات الحمل لفحوصات الحياة الأساسية.
"""
return HealthResponse(
status="healthy",
version="1.0.0",
database="not_checked"
)
@app.get("/ready", response_model=HealthResponse)
async def readiness_check():
"""
فحص الجاهزية - يتحقق من توفر جميع التبعيات.
يُستخدم بواسطة Kubernetes لتحديد ما إذا كان يجب أن يستقبل pod حركة المرور.
"""
# فحص اتصال قاعدة البيانات
db_status = "error"
try:
async with pool.acquire() as conn:
await conn.execute("SELECT 1")
db_status = "connected"
except Exception as e:
logger.error(f"فشل فحص صحة قاعدة البيانات: {e}")
raise HTTPException(
status_code=503,
detail=f"قاعدة البيانات غير متاحة: {str(e)}"
)
return HealthResponse(
status="ready",
version="1.0.0",
database=db_status
)
# نقاط نهاية استدعاء الرسم البياني
@app.post("/graphs/{graph_name}/invoke", response_model=InvokeResponse)
async def invoke_graph(graph_name: str, request: InvokeRequest):
"""
استدعاء رسم بياني بشكل متزامن.
تنتظر نقطة النهاية هذه اكتمال الرسم البياني وتعيد الحالة النهائية.
للرسوم البيانية طويلة التشغيل، ضع في اعتبارك استخدام نقطة نهاية البث.
"""
logger.info(f"استدعاء الرسم البياني '{graph_name}' للخيط '{request.thread_id}'")
try:
async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
await checkpointer.setup() # ضمان وجود الجداول
compiled_graph = get_compiled_graph(graph_name, checkpointer)
config = {
"configurable": {
"thread_id": request.thread_id,
**(request.config.get("configurable", {}) if request.config else {})
},
**(request.config or {})
}
result = await compiled_graph.ainvoke(request.input, config)
logger.info(f"اكتمل الرسم البياني '{graph_name}' للخيط '{request.thread_id}'")
return InvokeResponse(
result=result,
thread_id=request.thread_id
)
except Exception as e:
logger.error(f"فشل استدعاء الرسم البياني: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"فشل تنفيذ الرسم البياني: {str(e)}"
)
@app.post("/graphs/{graph_name}/stream")
async def stream_graph(graph_name: str, request: InvokeRequest):
"""
بث أحداث تنفيذ الرسم البياني.
يعيد Server-Sent Events (SSE) للتحديثات في الوقت الفعلي أثناء التنفيذ.
"""
logger.info(f"بث الرسم البياني '{graph_name}' للخيط '{request.thread_id}'")
async def event_generator():
try:
async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
await checkpointer.setup()
compiled_graph = get_compiled_graph(graph_name, checkpointer)
config = {
"configurable": {"thread_id": request.thread_id}
}
async for event in compiled_graph.astream_events(
request.input,
config,
version="v2"
):
yield f"data: {json.dumps(event)}\n\n"
yield "data: {\"event\": \"done\"}\n\n"
except Exception as e:
logger.error(f"خطأ في البث: {e}", exc_info=True)
yield f"data: {json.dumps({'error': str(e)})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
@app.post("/graphs/{graph_name}/resume", response_model=InvokeResponse)
async def resume_graph(graph_name: str, request: ResumeRequest):
"""
استئناف تنفيذ رسم بياني متوقف.
استخدم هذا بعد استدعاء interrupt() لتوفير إدخال بشري ومتابعة التنفيذ.
"""
logger.info(f"استئناف الرسم البياني '{graph_name}' للخيط '{request.thread_id}'")
try:
async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
await checkpointer.setup()
compiled_graph = get_compiled_graph(graph_name, checkpointer)
config = {
"configurable": {"thread_id": request.thread_id}
}
# استخدم Command للاستئناف مع البيانات
result = await compiled_graph.ainvoke(
Command(resume=request.resume_data),
config
)
logger.info(f"تم استئناف الرسم البياني '{graph_name}' للخيط '{request.thread_id}'")
return InvokeResponse(
result=result,
thread_id=request.thread_id
)
except Exception as e:
logger.error(f"فشل الاستئناف: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"فشل الاستئناف: {str(e)}"
)
# تشغيل مع: uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
تشغيل تطبيق FastAPI
# التطوير
uvicorn main:app --reload --host 0.0.0.0 --port 8000
# الإنتاج مع عمال متعددين
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 --loop uvloop
# مع Gunicorn لمزيد من التحكم
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000
نشر Docker
توفر الحاويات اتساقاً عبر البيئات وتبسط تنسيق النشر.
Dockerfile للإنتاج
# Dockerfile
# بناء متعدد المراحل لصورة إنتاج أصغر
# مرحلة البناء
FROM python:3.11-slim as builder
WORKDIR /app
# تثبيت تبعيات البناء
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
# إنشاء بيئة افتراضية
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# تثبيت تبعيات Python
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# مرحلة الإنتاج
FROM python:3.11-slim as production
WORKDIR /app
# تثبيت تبعيات وقت التشغيل فقط
RUN apt-get update && apt-get install -y --no-install-recommends \
libpq5 \
curl \
&& rm -rf /var/lib/apt/lists/* \
&& useradd --create-home --shell /bin/bash appuser
# نسخ البيئة الافتراضية من البناء
COPY /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# نسخ كود التطبيق
COPY . .
# التبديل إلى مستخدم غير جذري
USER appuser
# فحص الصحة
HEALTHCHECK \
CMD curl -f http://localhost:8000/health || exit 1
# كشف المنفذ
EXPOSE 8000
# تشغيل التطبيق
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
ملف المتطلبات
# requirements.txt
langgraph>=0.2.0
langchain>=0.2.0
langchain-openai>=0.1.0
langchain-anthropic>=0.1.0
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
asyncpg>=0.29.0
pydantic>=2.5.0
httpx>=0.25.0
python-dotenv>=1.0.0
structlog>=24.0.0
Docker Compose للمجموعة الكاملة
# docker-compose.yml
version: '3.8'
services:
# تطبيق LangGraph
langgraph-app:
build:
context: .
dockerfile: Dockerfile
target: production
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://langgraph:langgraph_password@db:5432/langgraph
- REDIS_URL=redis://redis:6379/0
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- LANGCHAIN_TRACING_V2=true
- LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}
- LANGCHAIN_PROJECT=langgraph-production
- POOL_MIN_SIZE=5
- POOL_MAX_SIZE=20
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
deploy:
replicas: 3
resources:
limits:
cpus: '2'
memory: 2G
reservations:
cpus: '0.5'
memory: 512M
restart_policy:
condition: on-failure
delay: 5s
max_attempts: 3
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 10s
networks:
- langgraph-network
# قاعدة بيانات PostgreSQL
db:
image: postgres:16-alpine
volumes:
- pgdata:/var/lib/postgresql/data
- ./init-db.sql:/docker-entrypoint-initdb.d/init.sql:ro
environment:
- POSTGRES_DB=langgraph
- POSTGRES_USER=langgraph
- POSTGRES_PASSWORD=langgraph_password
healthcheck:
test: ["CMD-SHELL", "pg_isready -U langgraph -d langgraph"]
interval: 10s
timeout: 5s
retries: 5
deploy:
resources:
limits:
cpus: '1'
memory: 1G
networks:
- langgraph-network
# Redis للتخزين المؤقت وتحديد المعدل
redis:
image: redis:7-alpine
volumes:
- redisdata:/data
command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
deploy:
resources:
limits:
cpus: '0.5'
memory: 512M
networks:
- langgraph-network
# وكيل Nginx العكسي
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./ssl:/etc/nginx/ssl:ro
depends_on:
- langgraph-app
networks:
- langgraph-network
volumes:
pgdata:
redisdata:
networks:
langgraph-network:
driver: bridge
أوامر Docker
# بناء وبدء جميع الخدمات
docker-compose up -d --build
# عرض السجلات
docker-compose logs -f langgraph-app
# توسيع التطبيق أفقياً
docker-compose up -d --scale langgraph-app=5
# تحديث متدرج (بدون توقف)
docker-compose up -d --no-deps --build langgraph-app
# إيقاف جميع الخدمات
docker-compose down
# إيقاف وإزالة الأحجام (تحذير: يحذف البيانات)
docker-compose down -v
نشر Kubernetes
للنشر على مستوى المؤسسات، يوفر Kubernetes تنسيقاً متقدماً وتوسعاً تلقائياً وقدرات شفاء ذاتي.
بيانات Kubernetes الكاملة
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
name: langgraph
labels:
name: langgraph
---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: langgraph-config
namespace: langgraph
data:
POOL_MIN_SIZE: "5"
POOL_MAX_SIZE: "20"
LANGCHAIN_TRACING_V2: "true"
LANGCHAIN_PROJECT: "langgraph-production"
LOG_LEVEL: "INFO"
---
# k8s/secrets.yaml
apiVersion: v1
kind: Secret
metadata:
name: langgraph-secrets
namespace: langgraph
type: Opaque
stringData:
DATABASE_URL: postgresql://langgraph:password@postgres-service:5432/langgraph
OPENAI_API_KEY: sk-xxxxx
ANTHROPIC_API_KEY: sk-ant-xxxxx
LANGCHAIN_API_KEY: lsv2_xxxxx
---
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: langgraph-agent
namespace: langgraph
labels:
app: langgraph-agent
version: v1
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
selector:
matchLabels:
app: langgraph-agent
template:
metadata:
labels:
app: langgraph-agent
version: v1
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8000"
prometheus.io/path: "/metrics"
spec:
serviceAccountName: langgraph-sa
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
containers:
- name: langgraph
image: my-registry/langgraph-agent:v1.0.0
imagePullPolicy: Always
ports:
- name: http
containerPort: 8000
protocol: TCP
envFrom:
- configMapRef:
name: langgraph-config
- secretRef:
name: langgraph-secrets
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /health
port: http
initialDelaySeconds: 10
periodSeconds: 30
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
httpGet:
path: /ready
port: http
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
startupProbe:
httpGet:
path: /health
port: http
initialDelaySeconds: 5
periodSeconds: 5
failureThreshold: 30
volumeMounts:
- name: tmp
mountPath: /tmp
volumes:
- name: tmp
emptyDir: {}
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app: langgraph-agent
topologyKey: kubernetes.io/hostname
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: langgraph-hpa
namespace: langgraph
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: langgraph-agent
minReplicas: 3
maxReplicas: 50
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 60
- type: Pods
value: 4
periodSeconds: 60
selectPolicy: Max
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 25
periodSeconds: 60
---
# k8s/pdb.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: langgraph-pdb
namespace: langgraph
spec:
minAvailable: 2
selector:
matchLabels:
app: langgraph-agent
أوامر نشر Kubernetes
# تطبيق جميع البيانات
kubectl apply -f k8s/
# التحقق من حالة النشر
kubectl -n langgraph get pods -w
# عرض السجلات
kubectl -n langgraph logs -f deployment/langgraph-agent
# التوسع يدوياً
kubectl -n langgraph scale deployment/langgraph-agent --replicas=10
# التحقق من حالة HPA
kubectl -n langgraph get hpa
# إعادة التشغيل المتدرج
kubectl -n langgraph rollout restart deployment/langgraph-agent
# عرض تاريخ النشر
kubectl -n langgraph rollout history deployment/langgraph-agent
# التراجع إلى الإصدار السابق
kubectl -n langgraph rollout undo deployment/langgraph-agent
مراقبة الإنتاج والتنبيه
نقطة نهاية مقاييس Prometheus
"""
إضافة مقاييس Prometheus إلى تطبيق FastAPI الخاص بك.
"""
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Request
from fastapi.responses import PlainTextResponse
import time
# تعريف المقاييس
REQUEST_COUNT = Counter(
'langgraph_requests_total',
'إجمالي عدد الطلبات',
['method', 'endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
'langgraph_request_latency_seconds',
'زمن استجابة الطلب بالثواني',
['method', 'endpoint'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0]
)
GRAPH_EXECUTIONS = Counter(
'langgraph_graph_executions_total',
'إجمالي تنفيذات الرسم البياني',
['graph_name', 'status']
)
GRAPH_EXECUTION_TIME = Histogram(
'langgraph_graph_execution_seconds',
'وقت تنفيذ الرسم البياني بالثواني',
['graph_name'],
buckets=[1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0]
)
ACTIVE_THREADS = Gauge(
'langgraph_active_threads',
'عدد خيوط سير العمل النشطة'
)
DB_POOL_SIZE = Gauge(
'langgraph_db_pool_size',
'حجم تجمع اتصالات قاعدة البيانات',
['state']
)
# البرمجية الوسيطة للمقاييس التلقائية
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
latency = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_LATENCY.labels(
method=request.method,
endpoint=request.url.path
).observe(latency)
return response
# نقطة نهاية المقاييس
@app.get("/metrics")
async def metrics():
"""نقطة نهاية مقاييس Prometheus."""
# تحديث مقاييس التجمع
if pool:
DB_POOL_SIZE.labels(state="free").set(pool.get_idle_size())
DB_POOL_SIZE.labels(state="used").set(pool.get_size() - pool.get_idle_size())
return PlainTextResponse(
generate_latest(),
media_type="text/plain"
)
قواعد التنبيه لـ Prometheus
# prometheus-rules.yaml
groups:
- name: langgraph-alerts
rules:
- alert: LangGraphHighErrorRate
expr: |
rate(langgraph_requests_total{status=~"5.."}[5m])
/ rate(langgraph_requests_total[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "معدل أخطاء مرتفع مكتشف"
description: "معدل الخطأ أعلى من 5% خلال الدقائق 5 الأخيرة"
- alert: LangGraphSlowResponses
expr: |
histogram_quantile(0.95, rate(langgraph_request_latency_seconds_bucket[5m])) > 30
for: 5m
labels:
severity: warning
annotations:
summary: "أوقات استجابة بطيئة"
description: "الشريحة المئوية 95 للزمن أعلى من 30 ثانية"
- alert: LangGraphDatabasePoolExhausted
expr: |
langgraph_db_pool_size{state="free"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "استنفد تجمع اتصالات قاعدة البيانات"
description: "لا توجد اتصالات قاعدة بيانات متاحة"
مصفوفة قرار النشر
| العامل | منصة LangGraph | Docker مستضاف ذاتياً | K8s مستضاف ذاتياً |
|---|---|---|---|
| وقت الإعداد | ساعات | أيام | أسابيع |
| تكلفة البنية التحتية | على أساس الاستخدام | ثابت + استخدام | ثابت + استخدام |
| التوسع | تلقائي | يدوي/compose | تلقائي (HPA) |
| إقامة البيانات | محدود | تحكم كامل | تحكم كامل |
| الامتثال (SOC2, HIPAA) | يعتمد على المنصة | تحكم كامل | تحكم كامل |
| التخصيص | محدود | معتدل | كامل |
| العبء التشغيلي | لا يوجد | معتدل | عالي |
| الأفضل لـ | الشركات الناشئة، MVPs | الفرق الصغيرة | المؤسسات |
أسئلة المقابلة
س: متى تختار منصة LangGraph مقابل Kubernetes المستضاف ذاتياً؟
"منصة LangGraph للنشر السريع عندما يكون الوقت للسوق أهم من التحكم في البنية التحتية - الشركات الناشئة، MVPs، الفرق الصغيرة. Kubernetes المستضاف ذاتياً عندما تحتاج: امتثال إقامة البيانات (GDPR, HIPAA)، متطلبات أمان محددة، تحسين التكلفة على نطاق واسع (ما يتجاوز ~10,000 دولار/شهر تكاليف المنصة)، أو تكاملات بنية تحتية مخصصة. الانتقال عادة يحدث عندما يتجاوز الإنفاق الشهري 8-10 آلاف دولار أو تتطلب متطلبات الامتثال التحكم في البيانات."
س: كيف تضمن نشرات بدون توقف لتطبيقات LangGraph؟
"ثلاث استراتيجيات رئيسية: أولاً، النشر المتدرج مع فحوصات الجاهزية المناسبة - Kubernetes يوجه حركة المرور فقط إلى pods التي تجتاز فحوصات الصحة. ثانياً، تصريف الاتصالات - اضبط terminationGracePeriodSeconds للسماح للطلبات الجارية بالاكتمال (مهم بشكل خاص للتنفيذات طويلة الرسوم البيانية). ثالثاً، الاستعادة القائمة على نقاط التفتيش - إذا مات pod أثناء التنفيذ، يمكن لـ pod آخر الاستئناف من آخر نقطة تفتيش. PodDisruptionBudgets تضمن الحد الأدنى من التوفر أثناء الاضطرابات الطوعية."
س: ما استراتيجية قاعدة البيانات الخاصة بك لنشرات LangGraph عالية الإنتاجية؟
"AsyncPostgresSaver مع تجميع الاتصالات ضروري - استخدم asyncpg مع أحجام تجمع مضبوطة لعبء عملك (عادة 5-20 اتصال لكل pod). للإنتاجية العالية جداً، ضع في اعتبارك النسخ المتماثلة للقراءة لاستعلامات الحالة. قسّم جداول نقاط التفتيش حسب التاريخ للتنظيف الفعال. استخدم pg_partman لإدارة الأقسام التلقائية. راقب استنفاد تجمع الاتصالات وزمن استجابة الاستعلام. للمقياس الشديد، ضع في اعتبارك التقسيم حسب thread_id عبر قواعد بيانات متعددة."
س: كيف تتعامل مع إدارة الأسرار في نشرات LangGraph للإنتاج؟
"لا تضمّن الأسرار أبداً في الصور. استخدم Kubernetes Secrets (مشفرة في الراحة مع KMS)، أو أفضل، مديري الأسرار الخارجيين مثل HashiCorp Vault أو AWS Secrets Manager أو GCP Secret Manager. External Secrets Operator يزامن الأسرار الخارجية مع Kubernetes. قم بتدوير مفاتيح API بانتظام. استخدم هوية عبء العمل حيثما أمكن - pods تفترض أدوار IAM السحابية بدلاً من استخدام بيانات اعتماد ثابتة."
النقاط الرئيسية
| خيار النشر | أفضل حالة استخدام | الاعتبار الرئيسي |
|---|---|---|
| منصة LangGraph | النشر السريع، البنية التحتية المُدارة | صفر عبء تشغيلي |
| FastAPI مستضاف ذاتياً | تحكم كامل، متطلبات مخصصة | تجميع الاتصالات، أنماط غير متزامنة |
| Docker Compose | التطوير، إنتاج صغير | فحوصات الصحة، حدود الموارد |
| Kubernetes | مقياس المؤسسات، التوسع التلقائي | HPA، PDB، فحوصات مناسبة |
| هجين | متطلبات معقدة | المنصة للتدفقات البسيطة، K8s للحساسة |
قائمة فحص الإنتاج الحرجة:
- حافظات نقاط التفتيش غير المتزامنة (AsyncPostgresSaver) للاستمرارية غير المحجوبة
- تجميع الاتصالات مع التحجيم المناسب
- فحوصات الصحة والجاهزية
- معالجة الإيقاف السلس
- مقاييس Prometheus للرصد
- PodDisruptionBudgets للتوفر
- سياسات الشبكة للأمان
- النشر المتدرج لصفر توقف
اكتملت الوحدة 5 - جاهز لاختبار الوحدة!
:::