الإنسان في الحلقة وأنماط الإنتاج
مراقبة LangSmith
تطبيقات LangGraph الإنتاجية تحتاج رؤية عميقة في سلوك الوكيل. يوفر LangSmith تكاملاً أصلياً للتتبع والمراقبة وتصحيح الأخطاء في سير عمل LangGraph. يغطي هذا الدرس أنماط المراقبة على مستوى الإنتاج بما في ذلك الشروحات المخصصة وحلقات التغذية الراجعة والتنبيهات.
لماذا المراقبة مهمة
| التحدي | الحل |
|---|---|
| فشل غامض | تتبع كامل لكل عقدة، استدعاء LLM، وانتقال حالة |
| مشاكل الأداء | تحليل التأخير حسب العقدة، تحديد الاختناقات |
| التحكم في التكلفة | تتبع الرموز لكل طلب، مستخدم، وميزة |
| ضمان الجودة | ربط تغذية المستخدم الراجعة بتتبعات محددة |
| التصحيح | إعادة تشغيل أي تنفيذ بالمدخلات الدقيقة |
سيناريو حقيقي (يناير 2026): AI خدمة عملاء كان يفشل بشكل غامض في الساعة 3 صباحاً. تتبعات LangSmith كشفت: تجاوز حد الرموز في المحادثات الطويلة. تم العثور على السبب الجذري في 10 دقائق. تم الإصلاح بالتلخيص عند 80% من نافذة السياق. تقليل وقت التوقف من 4 ساعات إلى 15 دقيقة.
إعداد التتبع التلقائي
يتتبع LangSmith تنفيذات LangGraph تلقائياً عند التكوين عبر متغيرات البيئة.
import os
from typing import TypedDict, Annotated
import operator
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
# ============================================================
# تمكين تتبع LANGSMITH
# ============================================================
# عيّن هذه قبل استيراد LangGraph/LangChain
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls_your_api_key_here"
os.environ["LANGCHAIN_PROJECT"] = "research-agent-production"
# اختياري: تكوين إضافي
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com" # الافتراضي
os.environ["LANGCHAIN_TAGS"] = "production,v2.1.0" # وسوم لجميع التشغيلات
# ============================================================
# تعريف الحالة والرسم البياني
# ============================================================
class ResearchState(TypedDict):
"""حالة لسير عمل البحث."""
query: str
documents: Annotated[list[str], operator.add]
analysis: str
iteration: int
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def search_documents(state: ResearchState) -> dict:
"""البحث عن المستندات ذات الصلة."""
query = state["query"]
# بحث محاكى - في الإنتاج، اتصل بقاعدة بيانات متجهة
docs = [f"مستند عن {query} - نتيجة {i}" for i in range(3)]
return {"documents": docs}
def analyze_documents(state: ResearchState) -> dict:
"""تحليل المستندات مع LLM."""
docs = state["documents"]
prompt = f"""حلل هذه المستندات وقدم رؤى:
المستندات:
{docs}
قدم تحليلاً شاملاً."""
response = llm.invoke(prompt)
return {"analysis": response.content}
# بناء الرسم البياني
builder = StateGraph(ResearchState)
builder.add_node("search", search_documents)
builder.add_node("analyze", analyze_documents)
builder.add_edge(START, "search")
builder.add_edge("search", "analyze")
builder.add_edge("analyze", END)
app = builder.compile()
# ============================================================
# التنفيذ مع التتبع التلقائي
# ============================================================
# كل استدعاء متتبع بالكامل
result = app.invoke({"query": "اتجاهات AI 2026", "documents": [], "iteration": 0})
# التتبع يُظهر:
# - الجدول الزمني الكامل لتنفيذ الرسم البياني
# - مدخلات/مخرجات كل عقدة
# - استدعاءات LLM مع المطالبات/الإكمالات
# - استخدام الرموز والتأخير
# - لقطات الحالة في كل خطوة
شروحات Span المخصصة
أضف بيانات وصفية غنية للتتبعات لتحسين التصحيح والتحليل.
from langsmith import traceable
from langsmith.run_trees import RunTree
import langsmith
# ============================================================
# مزخرف @TRACEABLE الأساسي
# ============================================================
@traceable(name="research_agent", tags=["agent", "research"])
def research_node(state: ResearchState) -> dict:
"""البحث مع التتبع المخصص."""
results = search_documents(state["query"])
return {"documents": results}
@traceable(name="llm_analysis", run_type="llm")
def analyze_with_llm(documents: list[str]) -> str:
"""استدعاء LLM مع التصنيف الصحيح."""
# run_type="llm" يصنف هذا لتتبع التكلفة
return llm.invoke(f"حلل: {documents}").content
@traceable(name="tool_search", run_type="tool")
def tool_search(query: str) -> list[str]:
"""استدعاء أداة مع التصنيف الصحيح."""
# run_type="tool" لعمليات الأدوات/الاسترجاع
return [f"نتيجة لـ {query}"]
# ============================================================
# متقدم: البيانات الوصفية والسياق المخصص
# ============================================================
@traceable(
name="advanced_research",
tags=["agent", "research", "v2"],
metadata={"version": "2.0", "model": "gpt-4o-mini"}
)
def advanced_research_node(state: ResearchState) -> dict:
"""البحث مع تتبع البيانات الوصفية المفصل."""
query = state["query"]
existing_docs = state.get("documents", [])
# إضافة بيانات وصفية وقت التشغيل للتتبع
langsmith.set_trace_tags(
tags=[f"query_length_{len(query)}"]
)
# البحث مع تتبع السياق
results = search_documents(query)
# تسجيل المقاييس المخصصة
langsmith.log_metadata({
"query": query,
"doc_count_before": len(existing_docs),
"doc_count_after": len(existing_docs) + len(results),
"search_source": "vector_db",
"iteration": state.get("iteration", 0)
})
return {"documents": results}
# ============================================================
# التتبع مع سياق الخطأ
# ============================================================
@traceable(name="safe_llm_call", run_type="llm")
def safe_llm_call(prompt: str, max_retries: int = 3) -> str:
"""استدعاء LLM مع تتبع الأخطاء."""
for attempt in range(max_retries):
try:
response = llm.invoke(prompt)
langsmith.log_metadata({
"attempt": attempt + 1,
"success": True
})
return response.content
except Exception as e:
langsmith.log_metadata({
"attempt": attempt + 1,
"error": str(e),
"error_type": type(e).__name__
})
if attempt == max_retries - 1:
raise
return "" # يجب ألا يصل إلى هنا
# ============================================================
# تتبع العمليات غير المتزامنة
# ============================================================
@traceable(name="async_research", run_type="chain")
async def async_research_node(state: ResearchState) -> dict:
"""البحث غير المتزامن مع التتبع الصحيح."""
import asyncio
async def search_source(source: str) -> list[str]:
# محاكاة بحث غير متزامن
await asyncio.sleep(0.1)
return [f"نتيجة من {source}"]
# عمليات البحث المتوازية متتبعة كـ spans فرعية
results = await asyncio.gather(
search_source("web"),
search_source("database"),
search_source("archive")
)
all_docs = [doc for source_docs in results for doc in source_docs]
return {"documents": all_docs}
لوحة تحكم المراقبة الإنتاجية
المقاييس والاستعلامات الرئيسية لمراقبة LangGraph في الإنتاج.
from langsmith import Client
from datetime import datetime, timedelta
import json
client = Client()
# ============================================================
# المقاييس الرئيسية للتتبع
# ============================================================
"""
1. التأخير حسب العقدة
- p50، p95، p99 تأخير لكل عقدة
- تحديد العقد البطيئة والاختناقات
- تعيين تنبيهات لـ p95 > العتبة
2. استخدام الرموز
- تتبع حسب التشغيل، المستخدم، الميزة
- تخصيص التكلفة والميزانية
- كشف الشذوذ (ارتفاعات مفاجئة)
3. معدلات الخطأ
- حسب العقدة، حسب نوع الخطأ
- تنبيه تلقائي على الارتفاعات
- الربط مع المدخلات
4. معايير النجاح
- مقيّمات مخصصة للجودة الخاصة بالمجال
- تكامل التغذية الراجعة البشرية
- مقارنات اختبار A/B
5. الإنتاجية
- طلبات في الدقيقة حسب نقطة النهاية
- أعماق الطوابير للمعالجة غير المتزامنة
- تخطيط السعة
"""
# ============================================================
# استعلامات لوحة التحكم البرمجية
# ============================================================
def get_latency_metrics(project_name: str, hours: int = 24) -> dict:
"""الحصول على مقاييس التأخير لمشروع."""
runs = client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(hours=hours),
execution_order=1 # التشغيلات العلوية فقط
)
latencies = []
for run in runs:
if run.end_time and run.start_time:
latency = (run.end_time - run.start_time).total_seconds()
latencies.append(latency)
if not latencies:
return {"p50": 0, "p95": 0, "p99": 0}
latencies.sort()
n = len(latencies)
return {
"p50": latencies[int(n * 0.5)],
"p95": latencies[int(n * 0.95)],
"p99": latencies[int(n * 0.99)] if n > 100 else latencies[-1],
"count": n
}
def get_error_breakdown(project_name: str, hours: int = 24) -> dict:
"""الحصول على تفصيل الأخطاء حسب النوع."""
runs = client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(hours=hours),
error=True # التشغيلات الفاشلة فقط
)
error_counts = {}
for run in runs:
error_type = run.error.split(":")[0] if run.error else "Unknown"
error_counts[error_type] = error_counts.get(error_type, 0) + 1
return error_counts
def get_token_usage(project_name: str, hours: int = 24) -> dict:
"""الحصول على ملخص استخدام الرموز."""
runs = client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(hours=hours),
run_type="llm" # تشغيلات LLM فقط
)
total_tokens = 0
prompt_tokens = 0
completion_tokens = 0
for run in runs:
if run.extra and "token_usage" in run.extra:
usage = run.extra["token_usage"]
total_tokens += usage.get("total_tokens", 0)
prompt_tokens += usage.get("prompt_tokens", 0)
completion_tokens += usage.get("completion_tokens", 0)
return {
"total_tokens": total_tokens,
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"estimated_cost_usd": total_tokens * 0.00002 # تقدير تقريبي
}
جمع التغذية الراجعة والتقييم
ربط تغذية المستخدم الراجعة بالتتبعات لمراقبة الجودة.
from langsmith import Client
from langsmith.evaluation import evaluate
from typing import Optional
import uuid
client = Client()
# ============================================================
# جمع التغذية الراجعة الأساسي
# ============================================================
def collect_user_feedback(
run_id: str,
score: float,
comment: str = "",
feedback_type: str = "user_rating"
) -> None:
"""إرفاق تغذية المستخدم الراجعة بتتبع."""
client.create_feedback(
run_id=run_id,
key=feedback_type,
score=score, # 0.0 إلى 1.0
comment=comment
)
def collect_thumbs_feedback(run_id: str, thumbs_up: bool) -> None:
"""تغذية راجعة بسيطة إعجاب/عدم إعجاب."""
client.create_feedback(
run_id=run_id,
key="thumbs",
score=1.0 if thumbs_up else 0.0
)
def collect_categorical_feedback(
run_id: str,
category: str,
subcategory: Optional[str] = None
) -> None:
"""تغذية راجعة تصنيفية للتصنيف."""
client.create_feedback(
run_id=run_id,
key="category",
value=category,
comment=subcategory
)
# ============================================================
# التقاط معرف التشغيل أثناء التنفيذ
# ============================================================
def execute_with_feedback_capture(app, inputs: dict, config: dict) -> tuple:
"""تنفيذ سير العمل والتقاط معرف التشغيل للتغذية الراجعة."""
# الطريقة 1: استخدام callbacks
from langchain_core.callbacks import BaseCallbackHandler
class RunIDCapture(BaseCallbackHandler):
def __init__(self):
self.run_id = None
def on_chain_start(self, serialized, inputs, run_id, **kwargs):
if self.run_id is None: # التقاط معرف التشغيل العلوي
self.run_id = str(run_id)
callback = RunIDCapture()
result = app.invoke(inputs, config={"callbacks": [callback]})
return result, callback.run_id
# الطريقة 2: استخدام سياق شجرة التشغيل
def execute_with_run_tree(app, inputs: dict) -> tuple:
"""التنفيذ مع شجرة تشغيل صريحة لالتقاط المعرف."""
from langsmith.run_trees import RunTree
run_tree = RunTree(
name="research_workflow",
run_type="chain",
inputs=inputs
)
with run_tree:
result = app.invoke(inputs)
run_tree.end(outputs=result)
run_tree.post()
return result, str(run_tree.id)
# ============================================================
# تكامل التغذية الراجعة في الإنتاج
# ============================================================
class FeedbackCollector:
"""جمع التغذية الراجعة الإنتاجي مع التجميع."""
def __init__(self, project_name: str):
self.client = Client()
self.project_name = project_name
self.pending_feedback = []
self.batch_size = 10
def add_feedback(
self,
run_id: str,
feedback_type: str,
score: Optional[float] = None,
value: Optional[str] = None,
comment: str = ""
) -> None:
"""إضافة تغذية راجعة للدفعة."""
self.pending_feedback.append({
"run_id": run_id,
"key": feedback_type,
"score": score,
"value": value,
"comment": comment
})
if len(self.pending_feedback) >= self.batch_size:
self.flush()
def flush(self) -> None:
"""إرسال جميع التغذية الراجعة المعلقة."""
for feedback in self.pending_feedback:
try:
self.client.create_feedback(**feedback)
except Exception as e:
print(f"فشل إرسال التغذية الراجعة: {e}")
self.pending_feedback = []
def collect_detailed_feedback(
self,
run_id: str,
accuracy: float,
helpfulness: float,
relevance: float,
comment: str = ""
) -> None:
"""جمع تغذية راجعة متعددة الأبعاد."""
dimensions = {
"accuracy": accuracy,
"helpfulness": helpfulness,
"relevance": relevance
}
for dimension, score in dimensions.items():
self.add_feedback(
run_id=run_id,
feedback_type=dimension,
score=score
)
if comment:
self.add_feedback(
run_id=run_id,
feedback_type="comment",
value=comment
)
تكوين التنبيهات
إعداد تنبيهات الإنتاج لسير عمل LangGraph.
from langsmith import Client
from typing import Callable
import asyncio
from datetime import datetime, timedelta
# ============================================================
# أمثلة تنبيهات LANGSMITH
# ============================================================
"""
تنبيه: تأخير عالي
- الشرط: p95_latency > 30s
- النافذة: 5 دقائق
- الإجراء: إشعار Slack لـ #alerts-ai
تنبيه: ارتفاع الأخطاء
- الشرط: error_rate > 5%
- النافذة: 15 دقيقة
- المقارنة: مقابل خط الأساس لـ 24 ساعة السابقة
- الإجراء: حادثة PagerDuty
تنبيه: عتبة التكلفة
- الشرط: daily_token_cost > $100
- النافذة: 24 ساعة متحركة
- الإجراء: بريد للفريق + Slack
تنبيه: جودة منخفضة
- الشرط: avg(user_rating) < 0.7
- النافذة: ساعة واحدة
- الحد الأدنى للعينات: 10
- الإجراء: Slack + إنشاء تذكرة JIRA
"""
# ============================================================
# التنبيه المخصص باستخدام LANGSMITH API
# ============================================================
class AlertMonitor:
"""مراقبة التنبيهات المخصصة باستخدام LangSmith API."""
def __init__(self, project_name: str):
self.client = Client()
self.project_name = project_name
self.alert_handlers: dict[str, Callable] = {}
def register_alert(self, name: str, handler: Callable) -> None:
"""تسجيل معالج تنبيه."""
self.alert_handlers[name] = handler
async def check_latency_alert(
self,
threshold_seconds: float,
window_minutes: int = 5
) -> bool:
"""التحقق مما إذا كان التأخير يتجاوز العتبة."""
runs = list(self.client.list_runs(
project_name=self.project_name,
start_time=datetime.now() - timedelta(minutes=window_minutes),
execution_order=1
))
if not runs:
return False
latencies = []
for run in runs:
if run.end_time and run.start_time:
latency = (run.end_time - run.start_time).total_seconds()
latencies.append(latency)
if not latencies:
return False
latencies.sort()
p95_index = int(len(latencies) * 0.95)
p95_latency = latencies[p95_index] if p95_index < len(latencies) else latencies[-1]
if p95_latency > threshold_seconds:
if "high_latency" in self.alert_handlers:
await self.alert_handlers["high_latency"](
p95_latency=p95_latency,
threshold=threshold_seconds,
sample_size=len(latencies)
)
return True
return False
async def check_error_rate_alert(
self,
threshold_percent: float,
window_minutes: int = 15
) -> bool:
"""التحقق مما إذا كان معدل الخطأ يتجاوز العتبة."""
start_time = datetime.now() - timedelta(minutes=window_minutes)
all_runs = list(self.client.list_runs(
project_name=self.project_name,
start_time=start_time,
execution_order=1
))
error_runs = list(self.client.list_runs(
project_name=self.project_name,
start_time=start_time,
execution_order=1,
error=True
))
if not all_runs:
return False
error_rate = (len(error_runs) / len(all_runs)) * 100
if error_rate > threshold_percent:
if "error_spike" in self.alert_handlers:
await self.alert_handlers["error_spike"](
error_rate=error_rate,
threshold=threshold_percent,
error_count=len(error_runs),
total_count=len(all_runs)
)
return True
return False
async def run_continuous_monitoring(self, interval_seconds: int = 60):
"""تشغيل حلقة المراقبة المستمرة."""
while True:
await self.check_latency_alert(threshold_seconds=30)
await self.check_error_rate_alert(threshold_percent=5)
await asyncio.sleep(interval_seconds)
أسئلة المقابلة
س: كيف يتكامل LangSmith مع LangGraph؟
"تلقائياً عند تمكين
LANGCHAIN_TRACING_V2. كل تنفيذ عقدة، تحديث حالة، واستدعاء LLM متتبع بدون تغييرات في الكود. الشروحات المخصصة مع مزخرف@traceableتضيف بيانات وصفية ووسوم. لوحات التحكم الإنتاجية تُظهر التأخير حسب العقدة، واستخدام الرموز لتتبع التكلفة، ومعدلات الخطأ. يمكنك التصفية والتحليل حسب أي حقل بيانات وصفية."
س: ما المقاييس المهمة لتطبيقات LangGraph الإنتاجية؟
"خمسة مجالات رئيسية: (1) تأخير p50/p95/p99 حسب العقدة لتحديد الاختناقات، (2) استخدام الرموز لكل طلب/مستخدم/ميزة للتحكم في التكلفة، (3) معدلات الخطأ حسب العقدة ونوع الخطأ للموثوقية، (4) أحجام نقاط التفتيش لتخطيط التخزين، و(5) درجات تغذية المستخدم الراجعة للجودة. المقيّمات المخصصة تضيف مقاييس جودة خاصة بالمجال. جميع المقاييس يجب أن يكون لها تنبيهات بعتبات مناسبة."
س: كيف تنفذ حلقات التغذية الراجعة لتحسين الجودة؟
"التقط معرف التشغيل أثناء التنفيذ باستخدام callbacks أو سياق شجرة التشغيل. أعرض نقطة نهاية تغذية راجعة في API الخاص بك تستدعي
client.create_feedback()وتربط التقييمات بالتتبعات. استخدم تغذية راجعة متعددة الأبعاد (الدقة، الفائدة، الصلة) للتحليل المفصل. شغّل مقيّمات آلية وفق جدول لاكتشاف تراجعات الجودة. اربط التغذية الراجعة بأنماط المدخلات لتحديد مناطق المشاكل."
س: كيف تُعد التنبيهات لـ LangGraph في الإنتاج؟
"استخدم تنبيهات LangSmith المدمجة للأنماط الشائعة: تأخير p95، ارتفاعات معدل الخطأ، عتبات التكلفة. للتنبيهات المخصصة، استقصِ LangSmith API دورياً وتحقق من الشروط برمجياً. تكامل مع Slack، PagerDuty، أو OpsGenie للإشعارات. عيّن عتبات مناسبة بناءً على أداء خط الأساس--تجنب إرهاق التنبيهات باستخدام نوافذ مناسبة ومتطلبات حد أدنى للعينات."
النقاط الرئيسية
- التتبع التلقائي عبر متغيرات البيئة - لا حاجة لتغييرات في الكود
- مزخرف @traceable يضيف بيانات وصفية ووسوم وأنواع تشغيل للتتبعات
- أنواع التشغيل (llm, chain, tool) تصنف العمليات للتحليل الصحيح
- استعلامات برمجية باستخدام LangSmith Client للوحات تحكم مخصصة
- جمع التغذية الراجعة يربط تقييمات المستخدم بتتبعات محددة
- تغذية راجعة متعددة الأبعاد (الدقة، الفائدة، الصلة) لتتبع الجودة المفصل
- التقييم الآلي يشغل فحوصات الجودة على حركة الإنتاج
- تنبيهات مخصصة باستخدام LangSmith API مع تكاملات الإشعارات
:::