Feature Stores & Feature Engineering

Feature Engineering Pipelines

3 min read

Feature engineering transforms raw data into meaningful inputs for ML models. Building robust pipelines ensures your features are fresh, consistent, and production-ready.

Feature Engineering Pipeline Architecture

┌─────────────────────────────────────────────────────────────────┐
│                       Raw Data Sources                          │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐           │
│  │Database │  │ Events  │  │  Logs   │  │  APIs   │           │
│  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘           │
└───────┼────────────┼────────────┼────────────┼─────────────────┘
        │            │            │            │
        └────────────┴─────┬──────┴────────────┘
              ┌────────────────────────┐
              │   Feature Pipeline     │
              │  ┌──────────────────┐  │
              │  │  Ingestion       │  │
              │  ├──────────────────┤  │
              │  │  Transformation  │  │
              │  ├──────────────────┤  │
              │  │  Validation      │  │
              │  ├──────────────────┤  │
              │  │  Storage         │  │
              │  └──────────────────┘  │
              └───────────┬────────────┘
        ┌─────────────────┴─────────────────┐
        │                                   │
        ▼                                   ▼
┌───────────────────┐            ┌───────────────────┐
│  Offline Store    │            │   Online Store    │
│  (Training)       │            │   (Inference)     │
└───────────────────┘            └───────────────────┘

Feature Transformation Patterns

Aggregations

import pandas as pd

def compute_customer_aggregates(transactions_df: pd.DataFrame) -> pd.DataFrame:
    """Compute customer-level aggregations.

    Groups the transactions by ``customer_id`` and summarises the ``amount``
    and ``transaction_date`` columns. Named aggregation is used so the result
    has flat, self-describing column names instead of the two-level
    MultiIndex that a dict-of-lists ``agg`` spec produces.

    Args:
        transactions_df: Transactions containing ``customer_id``, ``amount``
            and ``transaction_date`` columns.

    Returns:
        One row per customer with the columns ``customer_id``,
        ``amount_sum``, ``amount_mean``, ``amount_std``, ``amount_count``,
        ``transaction_date_min`` and ``transaction_date_max``.
    """
    return transactions_df.groupby("customer_id").agg(
        amount_sum=("amount", "sum"),
        amount_mean=("amount", "mean"),
        amount_std=("amount", "std"),
        amount_count=("amount", "count"),
        transaction_date_min=("transaction_date", "min"),
        transaction_date_max=("transaction_date", "max"),
    ).reset_index()

# Output columns:
# - amount_sum (total spend)
# - amount_mean (average transaction)
# - amount_std (spending variability)
# - amount_count (number of transactions)
# - transaction_date_min (first purchase)
# - transaction_date_max (last purchase)

Time-Based Features

from datetime import datetime, timedelta

def compute_time_features(df: pd.DataFrame, reference_date: datetime) -> pd.DataFrame:
    """Derive recency and order-frequency features on a copy of ``df``.

    Adds ``days_since_last_order``, ``orders_last_7d``, ``orders_last_30d``
    and ``order_frequency_trend``. The input frame is not modified.
    """
    enriched = df.copy()

    # Recency: whole days between the reference date and the last order
    last_order = pd.to_datetime(enriched["last_order_date"])
    enriched["days_since_last_order"] = (reference_date - last_order).dt.days

    # Rolling order counts, looked up per customer for each window length
    for window_days in (7, 30):
        enriched[f"orders_last_{window_days}d"] = enriched.apply(
            lambda row, days=window_days: count_orders_in_window(
                row["customer_id"], days=days
            ),
            axis=1,
        )

    # Trend: last week's orders relative to a quarter of the 30-day count
    weekly_baseline = enriched["orders_last_30d"] / 4
    enriched["order_frequency_trend"] = enriched["orders_last_7d"] / weekly_baseline

    return enriched

Categorical Encoding

from sklearn.preprocessing import LabelEncoder, OneHotEncoder

def encode_categoricals(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with encoded categorical columns appended.

    ``customer_tier`` is label-encoded into ``customer_tier_encoded`` and
    ``product_category`` is expanded into ``category_*`` indicator columns.
    """
    encoded = df.copy()

    # Integer codes for the tier column, fitted on the data passed in.
    # NOTE(review): the encoder is refit on every call, so codes depend on the
    # values present in this frame — confirm that is acceptable for serving.
    tier_codes = LabelEncoder().fit_transform(encoded["customer_tier"])
    encoded["customer_tier_encoded"] = tier_codes

    # Indicator columns for the nominal product category
    category_dummies = pd.get_dummies(encoded["product_category"], prefix="category")

    return pd.concat([encoded, category_dummies], axis=1)

Feature Crosses

def create_feature_crosses(df: pd.DataFrame) -> pd.DataFrame:
    """Create interaction features on a copy of ``df``.

    Adds two ratio features (``spend_per_order``, ``orders_per_day``) and one
    categorical cross (``region_segment``). The input frame is not modified.

    A zero denominator yields NaN rather than ``inf``, so customers with no
    orders or zero tenure produce a missing value instead of an infinite one.

    Args:
        df: Frame containing ``total_spend``, ``order_count``,
            ``days_as_customer``, ``region`` and ``customer_segment``.

    Returns:
        A new frame with the three derived columns appended.
    """
    df = df.copy()

    # Numeric crosses: mask zero denominators so the ratios become NaN, not inf
    order_count = df["order_count"].where(df["order_count"] != 0)
    days_as_customer = df["days_as_customer"].where(df["days_as_customer"] != 0)
    df["spend_per_order"] = df["total_spend"] / order_count
    df["orders_per_day"] = df["order_count"] / days_as_customer

    # Category crosses
    df["region_segment"] = df["region"] + "_" + df["customer_segment"]

    return df

Pipeline Scheduling Patterns

Batch Pipeline (Daily)

from prefect import flow, task
from datetime import datetime

@task
def extract_raw_data(date: str) -> pd.DataFrame:
    """Extract raw transactions created on a specific date.

    Args:
        date: Calendar date in ``YYYY-MM-DD`` form.

    Returns:
        All rows of ``transactions`` whose ``created_at`` falls on that date.

    Raises:
        ValueError: If ``date`` is not a valid ``YYYY-MM-DD`` date.
    """
    # The value is spliced into the SQL text, so round-trip it through a real
    # date first: anything that is not a calendar date is rejected here and
    # can never reach the query string.
    # NOTE(review): once the driver's paramstyle for `connection` is known,
    # prefer binding the value via pd.read_sql(..., params=...).
    day = datetime.strptime(date, "%Y-%m-%d").date().isoformat()
    query = f"""
        SELECT * FROM transactions
        WHERE DATE(created_at) = '{day}'
    """
    return pd.read_sql(query, connection)

@task
def transform_features(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Run the transformation chain: aggregates, time features, encoding."""
    aggregated = compute_customer_aggregates(raw_df)
    # Time features are measured against the wall-clock time of this run
    with_time_features = compute_time_features(aggregated, datetime.now())
    return encode_categoricals(with_time_features)

@task
def load_to_feature_store(features_df: pd.DataFrame):
    """Write the feature frame to the Parquet file that Feast reads from."""
    output_path = "data/customer_features.parquet"
    features_df.to_parquet(output_path)
    # Follow-up step outside this task: feast materialize-incremental

@flow(name="daily_feature_pipeline")
def daily_features(date: str):
    """Daily batch flow: extract one day's data, transform it, then load it."""
    day_transactions = extract_raw_data(date)
    customer_features = transform_features(day_transactions)
    load_to_feature_store(customer_features)

Streaming Pipeline (Real-time)

# Conceptual streaming feature pipeline
from kafka import KafkaConsumer
import json

def process_event(event: dict) -> dict:
    """Build the feature record for a single raw event.

    Copies the customer id, timestamp, page count and cart value from the
    event and adds the session duration computed by
    ``calculate_session_duration``.
    """
    features = {
        "customer_id": event["customer_id"],
        "event_timestamp": event["timestamp"],
    }
    features["session_duration"] = calculate_session_duration(event)
    # These fields are passed through unchanged under the same key
    for passthrough_key in ("pages_viewed", "cart_value"):
        features[passthrough_key] = event[passthrough_key]
    return features

def stream_features():
    """Consume ``user_events`` and push each event's features to the online store."""
    for record in KafkaConsumer("user_events"):
        payload = json.loads(record.value)
        event_features = process_event(payload)

        # Keep the online store in step with the latest event
        update_online_store(event_features)

Feature Validation

Schema Validation

from pydantic import BaseModel, validator
from typing import Optional

# Schema for one row of customer features. Each field below is type-annotated,
# and the two validators additionally reject negative spend and order counts.
class CustomerFeatures(BaseModel):
    customer_id: int
    total_spend: float
    order_count: int
    days_since_last_order: int
    customer_segment: str

    @validator("total_spend")
    def spend_must_be_positive(cls, v):
        """Reject a negative ``total_spend``; zero is accepted despite the name."""
        if v < 0:
            raise ValueError("total_spend must be >= 0")
        return v

    @validator("order_count")
    def order_count_must_be_positive(cls, v):
        """Reject a negative ``order_count``; zero is accepted despite the name."""
        if v < 0:
            raise ValueError("order_count must be >= 0")
        return v

def validate_features(df: pd.DataFrame) -> pd.DataFrame:
    """Validate all rows against the ``CustomerFeatures`` schema.

    Each row is passed to ``CustomerFeatures``; rows that fail are reported
    on stdout and dropped.

    Args:
        df: Feature frame whose columns match the schema's field names.

    Returns:
        The rows of ``df`` that passed validation, with the original index,
        columns and dtypes. If no row passes (or ``df`` is empty), the result
        is an empty frame that still has all of ``df``'s columns.
    """
    valid_positions = []
    for position, record in enumerate(df.to_dict(orient="records")):
        try:
            CustomerFeatures(**record)
        except Exception as e:
            # Best-effort filter: report the bad row and keep going
            print(f"Invalid row: {e}")
        else:
            valid_positions.append(position)
    # Select from the original frame rather than rebuilding it from row
    # objects, so the schema survives even when nothing is valid.
    return df.iloc[valid_positions]

Statistical Validation

def check_feature_distributions(
    current_df: pd.DataFrame,
    reference_df: pd.DataFrame,
    threshold: float = 0.1
) -> dict:
    """Check for distribution drift between two feature frames.

    For every float/int column of ``current_df``, compares its mean with the
    mean of the same column in ``reference_df`` and reports the relative
    change.

    Args:
        current_df: Newly computed features.
        reference_df: Baseline features; must contain the same numeric columns.
        threshold: Relative change above which a column is flagged.

    Returns:
        Mapping of column name to ``{"drift": <relative change>, "alert": <bool>}``.

    Raises:
        KeyError: If a numeric column of ``current_df`` is missing from
            ``reference_df``.
    """
    drift_report = {}

    for column in current_df.select_dtypes(include=[float, int]).columns:
        current_mean = current_df[column].mean()
        reference_mean = reference_df[column].mean()
        shift = abs(current_mean - reference_mean)

        if reference_mean == 0:
            # A relative change is undefined against a zero baseline: report
            # no drift if nothing moved, otherwise unbounded drift.
            drift = 0.0 if shift == 0 else float("inf")
        else:
            # Divide by the magnitude so a negative baseline cannot produce a
            # negative drift that would never exceed the threshold.
            drift = shift / abs(reference_mean)

        drift_report[column] = {
            "drift": drift,
            "alert": drift > threshold
        }

    return drift_report

Best Practices

| Practice | Why |
|----------|-----|
| Compute once, use everywhere | Consistency across training/serving |
| Version transformations | Reproduce any feature set |
| Validate early | Catch bad data before it affects models |
| Monitor freshness | Stale features degrade predictions |
| Document features | Help team members understand features |

Feature Documentation

# Good feature documentation: the description states what the feature means,
# how it is calculated, who owns it, and when it is refreshed.
customer_lifetime_value = FeatureView(
    name="customer_lifetime_value",
    description="""
    Predicted lifetime value of a customer based on historical
    purchase behavior. Updated daily. Used by marketing and
    churn prediction models.

    Calculation: Sum of all orders * retention probability

    Owner: data-science@company.com
    SLA: Updated by 6 AM UTC daily
    """,
    entities=[customer],
    # ...
)

Key insight: Feature engineering pipelines should be as well-tested and monitored as your production code. Poor features are the #1 cause of model degradation.

Next, we'll explore managed feature store alternatives like Tecton and cloud-native solutions.

Quick check: how does this lesson land for you?

Quiz

Module 4: Feature Stores & Feature Engineering

Take Quiz
FREE WEEKLY NEWSLETTER

Stay on the Nerd Track

One email per week — courses, deep dives, tools, and AI experiments.

No spam. Unsubscribe anytime.