مخازن الميزات وهندسة الميزات
خطوط أنابيب هندسة الميزات
3 دقيقة للقراءة
هندسة الميزات تُحول البيانات الخام إلى مدخلات ذات معنى لنماذج ML. بناء خطوط أنابيب قوية يضمن أن ميزاتك حديثة ومتسقة وجاهزة للإنتاج.
معمارية خط أنابيب هندسة الميزات
┌─────────────────────────────────────────────────────────────────┐
│ مصادر البيانات الخام │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │قاعدة │ │ أحداث │ │ سجلات │ │ APIs │ │
│ │بيانات │ │ │ │ │ │ │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
└───────┼────────────┼────────────┼────────────┼─────────────────┘
│ │ │ │
└────────────┴─────┬──────┴────────────┘
│
▼
┌────────────────────────┐
│ خط أنابيب الميزات │
│ ┌──────────────────┐ │
│ │ الاستيعاب │ │
│ ├──────────────────┤ │
│ │ التحويل │ │
│ ├──────────────────┤ │
│ │ التحقق │ │
│ ├──────────────────┤ │
│ │ التخزين │ │
│ └──────────────────┘ │
└───────────┬────────────┘
│
┌─────────────────┴─────────────────┐
│ │
▼ ▼
┌───────────────────┐ ┌───────────────────┐
│ المخزن غير │ │ المخزن الفوري │
│ الفوري (التدريب)│ │ (الاستدلال) │
└───────────────────┘ └───────────────────┘
أنماط تحويل الميزات
التجميعات
import pandas as pd
def compute_customer_aggregates(transactions_df: pd.DataFrame) -> pd.DataFrame:
"""حساب تجميعات مستوى العميل."""
return transactions_df.groupby("customer_id").agg({
"amount": ["sum", "mean", "std", "count"],
"transaction_date": ["min", "max"]
}).reset_index()
# أعمدة المخرجات:
# - amount_sum (إجمالي الإنفاق)
# - amount_mean (متوسط المعاملة)
# - amount_std (تقلب الإنفاق)
# - amount_count (عدد المعاملات)
# - transaction_date_min (أول شراء)
# - transaction_date_max (آخر شراء)
الميزات المبنية على الوقت
from datetime import datetime, timedelta
def compute_time_features(df: pd.DataFrame, reference_date: datetime) -> pd.DataFrame:
"""حساب الميزات المبنية على الوقت."""
df = df.copy()
# الأيام منذ آخر نشاط
df["days_since_last_order"] = (
reference_date - pd.to_datetime(df["last_order_date"])
).dt.days
# التجميعات المتدحرجة
df["orders_last_7d"] = df.apply(
lambda row: count_orders_in_window(row["customer_id"], days=7),
axis=1
)
df["orders_last_30d"] = df.apply(
lambda row: count_orders_in_window(row["customer_id"], days=30),
axis=1
)
# نسب مبنية على الوقت
df["order_frequency_trend"] = df["orders_last_7d"] / (df["orders_last_30d"] / 4)
return df
ترميز الفئات
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
def encode_categoricals(df: pd.DataFrame) -> pd.DataFrame:
"""ترميز الميزات الفئوية."""
df = df.copy()
# ترميز التسمية للفئات الترتيبية
label_encoder = LabelEncoder()
df["customer_tier_encoded"] = label_encoder.fit_transform(df["customer_tier"])
# ترميز one-hot للفئات الاسمية
one_hot = pd.get_dummies(df["product_category"], prefix="category")
df = pd.concat([df, one_hot], axis=1)
return df
تقاطعات الميزات
def create_feature_crosses(df: pd.DataFrame) -> pd.DataFrame:
"""إنشاء ميزات التفاعل."""
df = df.copy()
# تقاطعات رقمية
df["spend_per_order"] = df["total_spend"] / df["order_count"]
df["orders_per_day"] = df["order_count"] / df["days_as_customer"]
# تقاطعات الفئات
df["region_segment"] = df["region"] + "_" + df["customer_segment"]
return df
أنماط جدولة خطوط الأنابيب
خط أنابيب دفعي (يومي)
from prefect import flow, task
from datetime import datetime
@task
def extract_raw_data(date: str) -> pd.DataFrame:
"""استخراج البيانات الخام لتاريخ محدد."""
query = f"""
SELECT * FROM transactions
WHERE DATE(created_at) = '{date}'
"""
return pd.read_sql(query, connection)
@task
def transform_features(raw_df: pd.DataFrame) -> pd.DataFrame:
"""تطبيق تحويلات الميزات."""
df = compute_customer_aggregates(raw_df)
df = compute_time_features(df, datetime.now())
df = encode_categoricals(df)
return df
@task
def load_to_feature_store(features_df: pd.DataFrame):
"""تحميل الميزات إلى Feast."""
features_df.to_parquet("data/customer_features.parquet")
# ثم شغّل: feast materialize-incremental
@flow(name="daily_feature_pipeline")
def daily_features(date: str):
raw_data = extract_raw_data(date)
features = transform_features(raw_data)
load_to_feature_store(features)
خط أنابيب تدفقي (فوري)
# خط أنابيب ميزات تدفقي مفاهيمي
from kafka import KafkaConsumer
import json
def process_event(event: dict) -> dict:
"""معالجة حدث واحد إلى ميزات."""
return {
"customer_id": event["customer_id"],
"event_timestamp": event["timestamp"],
"session_duration": calculate_session_duration(event),
"pages_viewed": event["pages_viewed"],
"cart_value": event["cart_value"],
}
def stream_features():
consumer = KafkaConsumer("user_events")
for message in consumer:
event = json.loads(message.value)
features = process_event(event)
# حدّث المخزن الفوري
update_online_store(features)
التحقق من الميزات
التحقق من المخطط
from pydantic import BaseModel, validator
from typing import Optional
class CustomerFeatures(BaseModel):
customer_id: int
total_spend: float
order_count: int
days_since_last_order: int
customer_segment: str
@validator("total_spend")
def spend_must_be_positive(cls, v):
if v < 0:
raise ValueError("total_spend يجب أن يكون >= 0")
return v
@validator("order_count")
def order_count_must_be_positive(cls, v):
if v < 0:
raise ValueError("order_count يجب أن يكون >= 0")
return v
def validate_features(df: pd.DataFrame) -> pd.DataFrame:
"""التحقق من جميع الصفوف مقابل المخطط."""
valid_rows = []
for _, row in df.iterrows():
try:
CustomerFeatures(**row.to_dict())
valid_rows.append(row)
except Exception as e:
print(f"صف غير صالح: {e}")
return pd.DataFrame(valid_rows)
التحقق الإحصائي
def check_feature_distributions(
current_df: pd.DataFrame,
reference_df: pd.DataFrame,
threshold: float = 0.1
) -> dict:
"""تحقق من انجراف التوزيع."""
drift_report = {}
for column in current_df.select_dtypes(include=[float, int]).columns:
current_mean = current_df[column].mean()
reference_mean = reference_df[column].mean()
drift = abs(current_mean - reference_mean) / reference_mean
drift_report[column] = {
"drift": drift,
"alert": drift > threshold
}
return drift_report
أفضل الممارسات
| الممارسة | لماذا |
|---|---|
| احسب مرة، استخدم في كل مكان | الاتساق عبر التدريب/التقديم |
| أصدر التحويلات | أعد إنتاج أي مجموعة ميزات |
| تحقق مبكراً | اكتشف البيانات السيئة قبل تأثيرها على النماذج |
| راقب الحداثة | الميزات القديمة تُدهور التوقعات |
| وثّق الميزات | ساعد أعضاء الفريق على فهم الميزات |
توثيق الميزات
# توثيق ميزات جيد
customer_lifetime_value = FeatureView(
name="customer_lifetime_value",
description="""
القيمة المتوقعة مدى الحياة للعميل بناءً على سلوك
الشراء التاريخي. يُحدث يومياً. يُستخدم بواسطة نماذج
التسويق والتنبؤ بالانسحاب.
الحساب: مجموع جميع الطلبات * احتمالية الاحتفاظ
المالك: data-science@company.com
SLA: يُحدث بحلول 6 صباحاً UTC يومياً
""",
entities=[customer],
# ...
)
الرؤية الرئيسية: خطوط أنابيب هندسة الميزات يجب أن تكون مُختبرة ومُراقبة بشكل جيد مثل كود الإنتاج. الميزات السيئة هي السبب #1 لتدهور النماذج.
التالي، سنستكشف بدائل مخازن الميزات المُدارة مثل Tecton والحلول السحابية الأصلية. :::