Feature Stores & Feature Engineering

Feature Engineering Pipelines

Feature engineering transforms raw data into meaningful inputs for ML models. Building robust pipelines ensures your features are fresh, consistent, and production-ready.

Feature Engineering Pipeline Architecture

┌─────────────────────────────────────────────────────────────────┐
│                       Raw Data Sources                          │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐           │
│  │Database │  │ Events  │  │  Logs   │  │  APIs   │           │
│  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘           │
└───────┼────────────┼────────────┼────────────┼─────────────────┘
        │            │            │            │
        └────────────┴─────┬──────┴────────────┘
              ┌────────────────────────┐
              │   Feature Pipeline     │
              │  ┌──────────────────┐  │
              │  │  Ingestion       │  │
              │  ├──────────────────┤  │
              │  │  Transformation  │  │
              │  ├──────────────────┤  │
              │  │  Validation      │  │
              │  ├──────────────────┤  │
              │  │  Storage         │  │
              │  └──────────────────┘  │
              └───────────┬────────────┘
        ┌─────────────────┴─────────────────┐
        │                                   │
        ▼                                   ▼
┌───────────────────┐            ┌───────────────────┐
│  Offline Store    │            │   Online Store    │
│  (Training)       │            │   (Inference)     │
└───────────────────┘            └───────────────────┘
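
Each stage in the diagram maps onto a small, composable function; the rest of this lesson fills these stages in. A minimal sketch of the skeleton (the function names and bodies here are illustrative placeholders, not a specific framework's API):

import pandas as pd

def ingest(sources: list) -> pd.DataFrame:
    """Ingestion: pull raw records from databases, event streams, logs, APIs."""
    return pd.concat([pd.read_parquet(path) for path in sources], ignore_index=True)

def transform(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Transformation: aggregations, time features, encodings, crosses."""
    return raw_df  # placeholder; see the transformation patterns below

def validate(features_df: pd.DataFrame) -> pd.DataFrame:
    """Validation: schema and distribution checks."""
    return features_df  # placeholder; see Feature Validation below

def store(features_df: pd.DataFrame) -> None:
    """Storage: write the offline store, then sync the online store."""
    features_df.to_parquet("data/features.parquet")

def run_pipeline(sources: list) -> None:
    store(validate(transform(ingest(sources))))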

Feature Transformation Patterns

Aggregations

import pandas as pd

def compute_customer_aggregates(transactions_df: pd.DataFrame) -> pd.DataFrame:
    """Compute customer-level aggregations."""
    aggregates = transactions_df.groupby("customer_id").agg({
        "amount": ["sum", "mean", "std", "count"],
        "transaction_date": ["min", "max"]
    })
    # Flatten the MultiIndex columns: ("amount", "sum") -> "amount_sum"
    aggregates.columns = ["_".join(col) for col in aggregates.columns]
    return aggregates.reset_index()

# Output columns:
# - amount_sum (total spend)
# - amount_mean (average transaction)
# - amount_std (spending variability)
# - amount_count (number of transactions)
# - transaction_date_min (first purchase)
# - transaction_date_max (last purchase)

Time-Based Features

from datetime import datetime

def compute_time_features(df: pd.DataFrame, reference_date: datetime) -> pd.DataFrame:
    """Compute time-based features."""
    df = df.copy()

    # Days since last activity
    df["days_since_last_order"] = (
        reference_date - pd.to_datetime(df["last_order_date"])
    ).dt.days

    # Rolling aggregates (count_orders_in_window is a helper that counts a
    # customer's orders in a trailing window; see the sketch after this block)
    df["orders_last_7d"] = df.apply(
        lambda row: count_orders_in_window(row["customer_id"], days=7),
        axis=1
    )

    df["orders_last_30d"] = df.apply(
        lambda row: count_orders_in_window(row["customer_id"], days=30),
        axis=1
    )

    # Time-based ratio: last week's orders vs. the average weekly rate over
    # the last 30 days (values above 1 mean ordering is speeding up)
    df["order_frequency_trend"] = df["orders_last_7d"] / (df["orders_last_30d"] / 4)

    return df
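
The count_orders_in_window helper used above is not defined in this snippet; a minimal sketch, assuming the full order history is available as an orders_df DataFrame with customer_id and order_date columns, and that reference_date is the same cutoff passed to compute_time_features:

# Hypothetical helper: counts a customer's orders in a trailing window.
# Assumes module-level `orders_df` and `reference_date` (both illustrative).
def count_orders_in_window(customer_id: int, days: int) -> int:
    cutoff = reference_date - pd.Timedelta(days=days)
    customer_orders = orders_df[orders_df["customer_id"] == customer_id]
    return int((pd.to_datetime(customer_orders["order_date"]) >= cutoff).sum())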

Categorical Encoding

from sklearn.preprocessing import LabelEncoder

def encode_categoricals(df: pd.DataFrame) -> pd.DataFrame:
    """Encode categorical features."""
    df = df.copy()

    # Label encoding for ordinal categories (LabelEncoder assigns codes
    # alphabetically; see the explicit mapping sketch below if order matters)
    label_encoder = LabelEncoder()
    df["customer_tier_encoded"] = label_encoder.fit_transform(df["customer_tier"])

    # One-hot encoding for nominal categories
    one_hot = pd.get_dummies(df["product_category"], prefix="category")
    df = pd.concat([df, one_hot], axis=1)

    return df
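
If the tier hierarchy matters, an explicit mapping is safer than LabelEncoder's alphabetical codes. A small sketch with an assumed bronze-to-platinum ordering:

# Assumed tier hierarchy; adjust to the actual business ordering.
TIER_ORDER = {"bronze": 0, "silver": 1, "gold": 2, "platinum": 3}

def encode_customer_tier(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    # Unknown tiers map to NaN instead of silently getting an arbitrary code.
    df["customer_tier_encoded"] = df["customer_tier"].map(TIER_ORDER)
    return df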

Feature Crosses

def create_feature_crosses(df: pd.DataFrame) -> pd.DataFrame:
    """Create interaction features."""
    df = df.copy()

    # Numeric crosses (a zero denominator yields inf; clean or clip downstream)
    df["spend_per_order"] = df["total_spend"] / df["order_count"]
    df["orders_per_day"] = df["order_count"] / df["days_as_customer"]

    # Category crosses
    df["region_segment"] = df["region"] + "_" + df["customer_segment"]

    return df

Pipeline Scheduling Patterns

Batch Pipeline (Daily)

from prefect import flow, task
from datetime import datetime

@task
def extract_raw_data(date: str) -> pd.DataFrame:
    """Extract raw data for a specific date."""
    # Parameterized query (assumes a psycopg2/SQLAlchemy-style `connection` is
    # in scope; adjust the placeholder style to your driver). Parameters avoid
    # interpolating `date` directly into the SQL string.
    query = """
        SELECT * FROM transactions
        WHERE DATE(created_at) = %(date)s
    """
    return pd.read_sql(query, connection, params={"date": date})

@task
def transform_features(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Apply feature transformations."""
    df = compute_customer_aggregates(raw_df)
    df = compute_time_features(df, datetime.now())
    df = encode_categoricals(df)
    return df

@task
def load_to_feature_store(features_df: pd.DataFrame):
    """Load features to Feast."""
    features_df.to_parquet("data/customer_features.parquet")
    # Then run: feast materialize-incremental

@flow(name="daily_feature_pipeline")
def daily_features(date: str):
    raw_data = extract_raw_data(date)
    features = transform_features(raw_data)
    load_to_feature_store(features)
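
A sketch of tying the flow to Feast materialization: after the flow writes the parquet file, an incremental materialization (here via the Python API, equivalent to the feast materialize-incremental CLI) copies the latest feature values into the online store. This assumes a Feast repository in the working directory whose FeatureView reads from data/customer_features.parquet:

from datetime import datetime, timezone
from feast import FeatureStore

if __name__ == "__main__":
    # Run the batch pipeline for one day's partition (date is illustrative).
    daily_features(date="2024-01-15")

    # Sync the newly written offline features into the online store.
    # Equivalent to the CLI: feast materialize-incremental <end_timestamp>
    store = FeatureStore(repo_path=".")
    store.materialize_incremental(end_date=datetime.now(timezone.utc))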

Streaming Pipeline (Real-time)

# Conceptual streaming feature pipeline (calculate_session_duration and
# update_online_store are placeholders; one sketch of the latter follows below)
from kafka import KafkaConsumer
import json

def process_event(event: dict) -> dict:
    """Process a single event into features."""
    return {
        "customer_id": event["customer_id"],
        "event_timestamp": event["timestamp"],
        "session_duration": calculate_session_duration(event),
        "pages_viewed": event["pages_viewed"],
        "cart_value": event["cart_value"],
    }

def stream_features():
    consumer = KafkaConsumer("user_events")

    for message in consumer:
        event = json.loads(message.value)
        features = process_event(event)

        # Update online store
        update_online_store(features)
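
update_online_store is left abstract above. One option is Feast's push API, assuming the feature repository defines a PushSource (named user_events_push here purely as an example) that feeds the relevant online feature view:

import pandas as pd
from feast import FeatureStore

feature_store = FeatureStore(repo_path=".")  # assumes a local Feast repo

def update_online_store(features: dict) -> None:
    """Push one event's features directly to the online store."""
    feature_store.push("user_events_push", pd.DataFrame([features]))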

Feature Validation

Schema Validation

from pydantic import BaseModel, validator  # pydantic v1-style validators

class CustomerFeatures(BaseModel):
    customer_id: int
    total_spend: float
    order_count: int
    days_since_last_order: int
    customer_segment: str

    @validator("total_spend")
    def spend_must_be_positive(cls, v):
        if v < 0:
            raise ValueError("total_spend must be >= 0")
        return v

    @validator("order_count")
    def order_count_must_be_positive(cls, v):
        if v < 0:
            raise ValueError("order_count must be >= 0")
        return v

def validate_features(df: pd.DataFrame) -> pd.DataFrame:
    """Validate all rows against schema."""
    valid_rows = []
    for _, row in df.iterrows():
        try:
            CustomerFeatures(**row.to_dict())
            valid_rows.append(row)
        except Exception as e:
            print(f"Invalid row: {e}")
    return pd.DataFrame(valid_rows)
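
Usage: run the assembled feature frame through the validator and keep only the rows that pass. A small illustrative example:

features_df = pd.DataFrame([
    {"customer_id": 1, "total_spend": 250.0, "order_count": 3,
     "days_since_last_order": 12, "customer_segment": "retail"},
    {"customer_id": 2, "total_spend": -10.0, "order_count": 1,
     "days_since_last_order": 5, "customer_segment": "retail"},  # fails validation
])

clean_df = validate_features(features_df)  # second row is rejected and logged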

Statistical Validation

def check_feature_distributions(
    current_df: pd.DataFrame,
    reference_df: pd.DataFrame,
    threshold: float = 0.1
) -> dict:
    """Check for distribution drift."""
    drift_report = {}

    for column in current_df.select_dtypes(include="number").columns:
        current_mean = current_df[column].mean()
        reference_mean = reference_df[column].mean()

        # Relative shift in the mean; guard against a zero reference mean
        drift = abs(current_mean - reference_mean) / max(abs(reference_mean), 1e-9)
        drift_report[column] = {
            "drift": drift,
            "alert": drift > threshold
        }

    return drift_report
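
In a scheduled job, this check typically compares the day's freshly computed features against a snapshot saved at training time, alerting on any drifting column. A brief usage sketch (the variable names and alerting call are illustrative):

drift_report = check_feature_distributions(
    current_df=todays_features,       # output of today's pipeline run (assumed)
    reference_df=training_features,   # snapshot saved at training time (assumed)
    threshold=0.1,
)

drifted = [col for col, stats in drift_report.items() if stats["alert"]]
if drifted:
    print(f"Feature drift detected in: {drifted}")  # swap in real alerting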

Best Practices

Practice                      | Why
------------------------------|----------------------------------------
Compute once, use everywhere  | Consistency across training/serving
Version transformations       | Reproduce any feature set
Validate early                | Catch bad data before it affects models
Monitor freshness             | Stale features degrade predictions
Document features             | Help team members understand features

Feature Documentation

# Good feature documentation
customer_lifetime_value = FeatureView(
    name="customer_lifetime_value",
    description="""
    Predicted lifetime value of a customer based on historical
    purchase behavior. Updated daily. Used by marketing and
    churn prediction models.

    Calculation: Sum of all orders * retention probability

    Owner: data-science@company.com
    SLA: Updated by 6 AM UTC daily
    """,
    entities=[customer],
    # ...
)

Key insight: Feature engineering pipelines should be as well-tested and monitored as your production code. Poor feature quality is one of the most common causes of model degradation.

Next, we'll explore managed feature store alternatives like Tecton and cloud-native solutions.
