مخازن الميزات وهندسة الميزات

خطوط أنابيب هندسة الميزات

3 دقيقة للقراءة

هندسة الميزات تُحول البيانات الخام إلى مدخلات ذات معنى لنماذج ML. بناء خطوط أنابيب قوية يضمن أن ميزاتك حديثة ومتسقة وجاهزة للإنتاج.

معمارية خط أنابيب هندسة الميزات

┌─────────────────────────────────────────────────────────────────┐
│                       مصادر البيانات الخام                       │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐           │
│  │قاعدة   │  │ أحداث  │  │ سجلات  │  │  APIs   │           │
│  │بيانات  │  │         │  │         │  │         │           │
│  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘           │
└───────┼────────────┼────────────┼────────────┼─────────────────┘
        │            │            │            │
        └────────────┴─────┬──────┴────────────┘
              ┌────────────────────────┐
              │   خط أنابيب الميزات   │
              │  ┌──────────────────┐  │
              │  │  الاستيعاب      │  │
              │  ├──────────────────┤  │
              │  │  التحويل        │  │
              │  ├──────────────────┤  │
              │  │  التحقق         │  │
              │  ├──────────────────┤  │
              │  │  التخزين        │  │
              │  └──────────────────┘  │
              └───────────┬────────────┘
        ┌─────────────────┴─────────────────┐
        │                                   │
        ▼                                   ▼
┌───────────────────┐            ┌───────────────────┐
│  المخزن غير     │            │   المخزن الفوري   │
│  الفوري (التدريب)│            │   (الاستدلال)     │
└───────────────────┘            └───────────────────┘

أنماط تحويل الميزات

التجميعات

import pandas as pd

def compute_customer_aggregates(transactions_df: pd.DataFrame) -> pd.DataFrame:
    """حساب تجميعات مستوى العميل."""
    return transactions_df.groupby("customer_id").agg({
        "amount": ["sum", "mean", "std", "count"],
        "transaction_date": ["min", "max"]
    }).reset_index()

# أعمدة المخرجات:
# - amount_sum (إجمالي الإنفاق)
# - amount_mean (متوسط المعاملة)
# - amount_std (تقلب الإنفاق)
# - amount_count (عدد المعاملات)
# - transaction_date_min (أول شراء)
# - transaction_date_max (آخر شراء)

الميزات المبنية على الوقت

from datetime import datetime, timedelta

def compute_time_features(df: pd.DataFrame, reference_date: datetime) -> pd.DataFrame:
    """حساب الميزات المبنية على الوقت."""
    df = df.copy()

    # الأيام منذ آخر نشاط
    df["days_since_last_order"] = (
        reference_date - pd.to_datetime(df["last_order_date"])
    ).dt.days

    # التجميعات المتدحرجة
    df["orders_last_7d"] = df.apply(
        lambda row: count_orders_in_window(row["customer_id"], days=7),
        axis=1
    )

    df["orders_last_30d"] = df.apply(
        lambda row: count_orders_in_window(row["customer_id"], days=30),
        axis=1
    )

    # نسب مبنية على الوقت
    df["order_frequency_trend"] = df["orders_last_7d"] / (df["orders_last_30d"] / 4)

    return df

ترميز الفئات

from sklearn.preprocessing import LabelEncoder, OneHotEncoder

def encode_categoricals(df: pd.DataFrame) -> pd.DataFrame:
    """ترميز الميزات الفئوية."""
    df = df.copy()

    # ترميز التسمية للفئات الترتيبية
    label_encoder = LabelEncoder()
    df["customer_tier_encoded"] = label_encoder.fit_transform(df["customer_tier"])

    # ترميز one-hot للفئات الاسمية
    one_hot = pd.get_dummies(df["product_category"], prefix="category")
    df = pd.concat([df, one_hot], axis=1)

    return df

تقاطعات الميزات

def create_feature_crosses(df: pd.DataFrame) -> pd.DataFrame:
    """إنشاء ميزات التفاعل."""
    df = df.copy()

    # تقاطعات رقمية
    df["spend_per_order"] = df["total_spend"] / df["order_count"]
    df["orders_per_day"] = df["order_count"] / df["days_as_customer"]

    # تقاطعات الفئات
    df["region_segment"] = df["region"] + "_" + df["customer_segment"]

    return df

أنماط جدولة خطوط الأنابيب

خط أنابيب دفعي (يومي)

from prefect import flow, task
from datetime import datetime

@task
def extract_raw_data(date: str) -> pd.DataFrame:
    """استخراج البيانات الخام لتاريخ محدد."""
    query = f"""
        SELECT * FROM transactions
        WHERE DATE(created_at) = '{date}'
    """
    return pd.read_sql(query, connection)

@task
def transform_features(raw_df: pd.DataFrame) -> pd.DataFrame:
    """تطبيق تحويلات الميزات."""
    df = compute_customer_aggregates(raw_df)
    df = compute_time_features(df, datetime.now())
    df = encode_categoricals(df)
    return df

@task
def load_to_feature_store(features_df: pd.DataFrame):
    """تحميل الميزات إلى Feast."""
    features_df.to_parquet("data/customer_features.parquet")
    # ثم شغّل: feast materialize-incremental

@flow(name="daily_feature_pipeline")
def daily_features(date: str):
    raw_data = extract_raw_data(date)
    features = transform_features(raw_data)
    load_to_feature_store(features)

خط أنابيب تدفقي (فوري)

# خط أنابيب ميزات تدفقي مفاهيمي
from kafka import KafkaConsumer
import json

def process_event(event: dict) -> dict:
    """معالجة حدث واحد إلى ميزات."""
    return {
        "customer_id": event["customer_id"],
        "event_timestamp": event["timestamp"],
        "session_duration": calculate_session_duration(event),
        "pages_viewed": event["pages_viewed"],
        "cart_value": event["cart_value"],
    }

def stream_features():
    consumer = KafkaConsumer("user_events")

    for message in consumer:
        event = json.loads(message.value)
        features = process_event(event)

        # حدّث المخزن الفوري
        update_online_store(features)

التحقق من الميزات

التحقق من المخطط

from pydantic import BaseModel, validator
from typing import Optional

class CustomerFeatures(BaseModel):
    customer_id: int
    total_spend: float
    order_count: int
    days_since_last_order: int
    customer_segment: str

    @validator("total_spend")
    def spend_must_be_positive(cls, v):
        if v < 0:
            raise ValueError("total_spend يجب أن يكون >= 0")
        return v

    @validator("order_count")
    def order_count_must_be_positive(cls, v):
        if v < 0:
            raise ValueError("order_count يجب أن يكون >= 0")
        return v

def validate_features(df: pd.DataFrame) -> pd.DataFrame:
    """التحقق من جميع الصفوف مقابل المخطط."""
    valid_rows = []
    for _, row in df.iterrows():
        try:
            CustomerFeatures(**row.to_dict())
            valid_rows.append(row)
        except Exception as e:
            print(f"صف غير صالح: {e}")
    return pd.DataFrame(valid_rows)

التحقق الإحصائي

def check_feature_distributions(
    current_df: pd.DataFrame,
    reference_df: pd.DataFrame,
    threshold: float = 0.1
) -> dict:
    """تحقق من انجراف التوزيع."""
    drift_report = {}

    for column in current_df.select_dtypes(include=[float, int]).columns:
        current_mean = current_df[column].mean()
        reference_mean = reference_df[column].mean()

        drift = abs(current_mean - reference_mean) / reference_mean
        drift_report[column] = {
            "drift": drift,
            "alert": drift > threshold
        }

    return drift_report

أفضل الممارسات

الممارسة لماذا
احسب مرة، استخدم في كل مكان الاتساق عبر التدريب/التقديم
أصدر التحويلات أعد إنتاج أي مجموعة ميزات
تحقق مبكراً اكتشف البيانات السيئة قبل تأثيرها على النماذج
راقب الحداثة الميزات القديمة تُدهور التوقعات
وثّق الميزات ساعد أعضاء الفريق على فهم الميزات

توثيق الميزات

# توثيق ميزات جيد
customer_lifetime_value = FeatureView(
    name="customer_lifetime_value",
    description="""
    القيمة المتوقعة مدى الحياة للعميل بناءً على سلوك
    الشراء التاريخي. يُحدث يومياً. يُستخدم بواسطة نماذج
    التسويق والتنبؤ بالانسحاب.

    الحساب: مجموع جميع الطلبات * احتمالية الاحتفاظ

    المالك: data-science@company.com
    SLA: يُحدث بحلول 6 صباحاً UTC يومياً
    """,
    entities=[customer],
    # ...
)

الرؤية الرئيسية: خطوط أنابيب هندسة الميزات يجب أن تكون مُختبرة ومُراقبة بشكل جيد مثل كود الإنتاج. الميزات السيئة هي السبب #1 لتدهور النماذج.

التالي، سنستكشف بدائل مخازن الميزات المُدارة مثل Tecton والحلول السحابية الأصلية. :::

اختبار

الوحدة 4: مخازن الميزات وهندسة الميزات

خذ الاختبار