مراقبة ML والخطوات التالية
اكتشاف انحراف البيانات
3 دقيقة للقراءة
نماذج ML في الإنتاج تتدهور بصمت. البيانات التي تواجهها تتغير مع الوقت، مما يُسبب تقليل دقة التنبؤات. اكتشاف هذا الانحراف مبكراً أمر حاسم.
أنواع الانحراف
| النوع | ما يتغير | مثال |
|---|---|---|
| انحراف البيانات | توزيعات الميزات | ديموغرافيات المستخدمين تتغير |
| انحراف المفهوم | علاقة المدخل←المخرج | أنماط الاحتيال تتطور |
| انحراف التسمية | توزيع الهدف | معدل التسرب يزيد |
| انحراف التنبؤ | توزيع مخرجات النموذج | تنبؤات عالية المخاطر أكثر |
لماذا تتدهور النماذج
وقت التدريب وقت الإنتاج
┌─────────────────┐ ┌─────────────────┐
│ بيانات التدريب │ │ بيانات جديدة │
│ │ │ │
│ متوسط العمر: 35 │ انحراف │ متوسط العمر: 28 │
│ 70% حضري │ ────► │ 85% حضري │
│ متوسط الدخل: 50K│ │ متوسط الدخل: 45K│
└─────────────────┘ └─────────────────┘
│ │
▼ ▼
النموذج مُدرب تنبؤات النموذج
لهذه البيانات تدهورت على البيانات الجديدة
اكتشاف الانحراف مع Evidently
التثبيت
pip install evidently
تقرير انحراف أساسي
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
# البيانات المرجعية (توزيع التدريب)
reference_data = pd.read_csv("training_data.csv")
# البيانات الحالية (الإنتاج)
current_data = pd.read_csv("production_batch.csv")
# أنشئ تقرير انحراف
report = Report(metrics=[DataDriftPreset()])
report.run(
reference_data=reference_data,
current_data=current_data
)
# عرض في notebook
report
# أو احفظ كـ HTML
report.save_html("drift_report.html")
نتائج اكتشاف الانحراف
# احصل على نتائج اكتشاف الانحراف كـ dict
results = report.as_dict()
# تحقق إذا انحرفت مجموعة البيانات
dataset_drift = results["metrics"][0]["result"]["dataset_drift"]
print(f"اكتُشف انحراف مجموعة البيانات: {dataset_drift}")
# تحقق من انحراف الميزات الفردية
for feature in results["metrics"][0]["result"]["drift_by_columns"]:
col = feature["column_name"]
drifted = feature["drift_detected"]
score = feature["drift_score"]
print(f"{col}: انحراف={drifted}, درجة={score:.3f}")
مقاييس الانحراف على مستوى الأعمدة
from evidently.report import Report
from evidently.metrics import (
ColumnDriftMetric,
DataDriftTable
)
report = Report(metrics=[
DataDriftTable(),
ColumnDriftMetric(column_name="age"),
ColumnDriftMetric(column_name="income"),
ColumnDriftMetric(column_name="transaction_amount"),
])
report.run(reference_data=reference, current_data=current)
اختبارات إحصائية للانحراف
اختبار Kolmogorov-Smirnov
from scipy import stats
def detect_drift_ks(reference: pd.Series, current: pd.Series, threshold: float = 0.05):
"""اكتشف الانحراف باستخدام اختبار KS للميزات الرقمية."""
statistic, p_value = stats.ks_2samp(reference, current)
drift_detected = p_value < threshold
return {
"statistic": statistic,
"p_value": p_value,
"drift_detected": drift_detected
}
# مثال
result = detect_drift_ks(
reference_data["transaction_amount"],
current_data["transaction_amount"]
)
print(f"اكتُشف انحراف: {result['drift_detected']}, p-value: {result['p_value']:.4f}")
مؤشر استقرار السكان (PSI)
import numpy as np
def calculate_psi(reference: pd.Series, current: pd.Series, bins: int = 10) -> float:
"""احسب مؤشر استقرار السكان."""
# أنشئ bins من المرجع
breakpoints = np.percentile(reference, np.linspace(0, 100, bins + 1))
breakpoints[0] = -np.inf
breakpoints[-1] = np.inf
# احسب النسب
ref_counts = np.histogram(reference, bins=breakpoints)[0] / len(reference)
cur_counts = np.histogram(current, bins=breakpoints)[0] / len(current)
# تجنب القسمة على صفر
ref_counts = np.clip(ref_counts, 0.0001, None)
cur_counts = np.clip(cur_counts, 0.0001, None)
# احسب PSI
psi = np.sum((cur_counts - ref_counts) * np.log(cur_counts / ref_counts))
return psi
# التفسير
# PSI < 0.1: لا انحراف ذو دلالة
# 0.1 <= PSI < 0.2: انحراف متوسط (تحقق)
# PSI >= 0.2: انحراف كبير (أعد التدريب)
psi = calculate_psi(reference_data["income"], current_data["income"])
print(f"PSI: {psi:.3f}")
مراقبة الانحراف الآلية
مهمة مراقبة مجدولة
# drift_monitor.py
import schedule
import time
from datetime import datetime, timedelta
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
def load_reference_data():
"""حمّل بيانات التدريب كمرجع."""
return pd.read_parquet("s3://ml-data/reference/training.parquet")
def load_current_data(hours: int = 24):
"""حمّل بيانات الإنتاج الحديثة."""
end = datetime.now()
start = end - timedelta(hours=hours)
return pd.read_parquet(
f"s3://ml-data/predictions/{start.date()}/",
)
def check_drift():
"""شغّل اكتشاف الانحراف وأنبه إذا لزم."""
reference = load_reference_data()
current = load_current_data(hours=24)
report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=reference, current_data=current)
results = report.as_dict()
drift_detected = results["metrics"][0]["result"]["dataset_drift"]
if drift_detected:
drifted_features = [
f["column_name"]
for f in results["metrics"][0]["result"]["drift_by_columns"]
if f["drift_detected"]
]
send_alert(f"اكتُشف انحراف بيانات في: {drifted_features}")
report.save_html(f"drift_report_{datetime.now().isoformat()}.html")
return drift_detected
def send_alert(message: str):
"""أرسل تنبيه عبر Slack/PagerDuty/email."""
print(f"تنبيه: {message}")
# slack.post(channel="#ml-alerts", message=message)
# شغّل كل 6 ساعات
schedule.every(6).hours.do(check_drift)
while True:
schedule.run_pending()
time.sleep(60)
مقاييس Prometheus
from prometheus_client import Gauge, start_http_server
# عرّف المقاييس
drift_score = Gauge('ml_drift_score', 'درجة انحراف البيانات', ['feature'])
drift_detected = Gauge('ml_drift_detected', 'اكتُشف انحراف بيانات', ['feature'])
def export_drift_metrics(results: dict):
"""صدّر مقاييس الانحراف لـ Prometheus."""
for feature in results["metrics"][0]["result"]["drift_by_columns"]:
col = feature["column_name"]
drift_score.labels(feature=col).set(feature["drift_score"])
drift_detected.labels(feature=col).set(int(feature["drift_detected"]))
# ابدأ خادم المقاييس
start_http_server(8000)
التعامل مع الانحراف
| الشدة | الإجراء |
|---|---|
| منخفضة (PSI < 0.1) | راقب، لا إجراء |
| متوسطة (0.1-0.2) | تحقق، وثّق |
| عالية (PSI > 0.2) | أطلق خط أنابيب إعادة التدريب |
إطلاق إعادة التدريب الآلي
def trigger_retraining_if_needed(psi_scores: dict, threshold: float = 0.2):
"""أطلق إعادة التدريب عند تجاوز الانحراف العتبة."""
max_psi = max(psi_scores.values())
if max_psi >= threshold:
# أطلق خط أنابيب Kubeflow/Airflow
trigger_training_pipeline(
reason="data_drift",
drift_scores=psi_scores
)
return True
return False
الرؤية الرئيسية: النماذج لا تفشل بصوت عالٍ—تتدهور بصمت. مراقبة الانحراف المستمرة تكتشف المشاكل قبل أن تؤثر على المستخدمين.
التالي، سنستكشف مراقبة أداء النموذج والتنبيهات. :::