Feature Stores & Feature Engineering
Feature Engineering Pipelines
3 min read
Feature engineering transforms raw data into meaningful inputs for ML models. Building robust pipelines ensures your features are fresh, consistent, and production-ready.
Feature Engineering Pipeline Architecture
┌──────────────────────────────────────────────────────────┐
│                     Raw Data Sources                      │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐   │
│  │Database │   │ Events  │   │  Logs   │   │  APIs   │   │
│  └────┬────┘   └────┬────┘   └────┬────┘   └────┬────┘   │
└───────┼─────────────┼─────────────┼─────────────┼────────┘
        │             │             │             │
        └─────────────┴──────┬──────┴─────────────┘
                             │
                             ▼
                 ┌────────────────────────┐
                 │    Feature Pipeline    │
                 │  ┌──────────────────┐  │
                 │  │    Ingestion     │  │
                 │  ├──────────────────┤  │
                 │  │  Transformation  │  │
                 │  ├──────────────────┤  │
                 │  │    Validation    │  │
                 │  ├──────────────────┤  │
                 │  │     Storage      │  │
                 │  └──────────────────┘  │
                 └───────────┬────────────┘
                             │
           ┌─────────────────┴─────────────────┐
           │                                   │
           ▼                                   ▼
 ┌───────────────────┐               ┌───────────────────┐
 │   Offline Store   │               │   Online Store    │
 │    (Training)     │               │    (Inference)    │
 └───────────────────┘               └───────────────────┘
Feature Transformation Patterns
Aggregations
import pandas as pd

def compute_customer_aggregates(transactions_df: pd.DataFrame) -> pd.DataFrame:
    """Compute customer-level aggregations."""
    aggregated = transactions_df.groupby("customer_id").agg({
        "amount": ["sum", "mean", "std", "count"],
        "transaction_date": ["min", "max"],
    })
    # Flatten the MultiIndex columns produced by .agg() (("amount", "sum") -> "amount_sum")
    # so the output matches the column names documented below
    aggregated.columns = ["_".join(col) for col in aggregated.columns]
    return aggregated.reset_index()
# Output columns:
# - amount_sum (total spend)
# - amount_mean (average transaction)
# - amount_std (spending variability)
# - amount_count (number of transactions)
# - transaction_date_min (first purchase)
# - transaction_date_max (last purchase)
Time-Based Features
from datetime import datetime, timedelta

def compute_time_features(df: pd.DataFrame, reference_date: datetime) -> pd.DataFrame:
    """Compute time-based features."""
    df = df.copy()

    # Days since last activity
    df["days_since_last_order"] = (
        reference_date - pd.to_datetime(df["last_order_date"])
    ).dt.days

    # Rolling aggregates (count_orders_in_window is sketched below)
    df["orders_last_7d"] = df.apply(
        lambda row: count_orders_in_window(row["customer_id"], days=7),
        axis=1,
    )
    df["orders_last_30d"] = df.apply(
        lambda row: count_orders_in_window(row["customer_id"], days=30),
        axis=1,
    )

    # Ratio of last week's orders to the average week over the last 30 days (~4 weeks)
    df["order_frequency_trend"] = df["orders_last_7d"] / (df["orders_last_30d"] / 4)
    return df
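The rolling aggregates above rely on a count_orders_in_window helper that isn't shown. A minimal sketch of one possible implementation, assuming a global orders_df DataFrame with customer_id and order_date columns (both names are assumptions):

def count_orders_in_window(customer_id: int, days: int, reference_date: datetime = None) -> int:
    """Count a customer's orders in the trailing window (orders_df is assumed to exist)."""
    reference_date = reference_date or datetime.now()
    window_start = reference_date - timedelta(days=days)
    order_dates = pd.to_datetime(orders_df["order_date"])
    mask = (
        (orders_df["customer_id"] == customer_id)
        & (order_dates >= window_start)
        & (order_dates <= reference_date)
    )
    return int(mask.sum())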
Categorical Encoding
from sklearn.preprocessing import LabelEncoder

def encode_categoricals(df: pd.DataFrame) -> pd.DataFrame:
    """Encode categorical features."""
    df = df.copy()

    # Label encoding for ordinal categories
    label_encoder = LabelEncoder()
    df["customer_tier_encoded"] = label_encoder.fit_transform(df["customer_tier"])

    # One-hot encoding for nominal categories
    one_hot = pd.get_dummies(df["product_category"], prefix="category")
    df = pd.concat([df, one_hot], axis=1)
    return df
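One caveat: LabelEncoder assigns integer codes in alphabetical order, which may not match the true tier order. For genuinely ordinal categories an explicit mapping is often safer; a minimal sketch, using hypothetical tier names:

# Explicit ordinal mapping preserves the business order (tier names are hypothetical)
TIER_ORDER = {"bronze": 0, "silver": 1, "gold": 2, "platinum": 3}

def encode_customer_tier(df: pd.DataFrame) -> pd.DataFrame:
    """Map customer_tier to integers that respect the tier hierarchy."""
    df = df.copy()
    df["customer_tier_encoded"] = df["customer_tier"].map(TIER_ORDER)
    return df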
Feature Crosses
def create_feature_crosses(df: pd.DataFrame) -> pd.DataFrame:
    """Create interaction features."""
    df = df.copy()

    # Numeric crosses (assumes order_count and days_as_customer are non-zero)
    df["spend_per_order"] = df["total_spend"] / df["order_count"]
    df["orders_per_day"] = df["order_count"] / df["days_as_customer"]

    # Category crosses
    df["region_segment"] = df["region"] + "_" + df["customer_segment"]
    return df
Pipeline Scheduling Patterns
Batch Pipeline (Daily)
from prefect import flow, task
from datetime import datetime

@task
def extract_raw_data(date: str) -> pd.DataFrame:
    """Extract raw data for a specific date."""
    query = f"""
        SELECT * FROM transactions
        WHERE DATE(created_at) = '{date}'
    """
    # `connection` is assumed to be an existing database connection or engine
    return pd.read_sql(query, connection)

@task
def transform_features(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Apply feature transformations."""
    df = compute_customer_aggregates(raw_df)
    df = compute_time_features(df, datetime.now())
    df = encode_categoricals(df)
    return df

@task
def load_to_feature_store(features_df: pd.DataFrame):
    """Load features to Feast."""
    features_df.to_parquet("data/customer_features.parquet")
    # Then run: feast materialize-incremental

@flow(name="daily_feature_pipeline")
def daily_features(date: str):
    raw_data = extract_raw_data(date)
    features = transform_features(raw_data)
    load_to_feature_store(features)
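The load step above writes a Parquet file and defers syncing to the Feast CLI. The same materialization can also be triggered from Python; a minimal sketch, assuming a Feast feature repository already exists at feature_repo/:

from datetime import datetime
from feast import FeatureStore

def materialize_features():
    """Sync newly computed features into the online store via Feast."""
    store = FeatureStore(repo_path="feature_repo")  # repo path is an assumption
    # Materialize from the last run up to now (equivalent to `feast materialize-incremental`)
    store.materialize_incremental(end_date=datetime.utcnow())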
Streaming Pipeline (Real-time)
# Conceptual streaming feature pipeline
from kafka import KafkaConsumer
import json

def process_event(event: dict) -> dict:
    """Process a single event into features."""
    return {
        "customer_id": event["customer_id"],
        "event_timestamp": event["timestamp"],
        "session_duration": calculate_session_duration(event),  # helper defined elsewhere
        "pages_viewed": event["pages_viewed"],
        "cart_value": event["cart_value"],
    }

def stream_features():
    consumer = KafkaConsumer("user_events")
    for message in consumer:
        event = json.loads(message.value)
        features = process_event(event)
        # Update online store (one possible implementation sketched below)
        update_online_store(features)
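update_online_store is left abstract above. One common approach is to overwrite the latest feature values per entity in a low-latency key-value store; a minimal sketch using Redis (host, port, and key scheme are assumptions):

import json
import redis

redis_client = redis.Redis(host="localhost", port=6379)

def update_online_store(features: dict) -> None:
    """Overwrite the latest feature values for this customer in the online store."""
    key = f"features:customer:{features['customer_id']}"
    redis_client.set(key, json.dumps(features))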
Feature Validation
Schema Validation
from pydantic import BaseModel, validator

class CustomerFeatures(BaseModel):
    customer_id: int
    total_spend: float
    order_count: int
    days_since_last_order: int
    customer_segment: str

    @validator("total_spend")
    def spend_must_be_positive(cls, v):
        if v < 0:
            raise ValueError("total_spend must be >= 0")
        return v

    @validator("order_count")
    def order_count_must_be_positive(cls, v):
        if v < 0:
            raise ValueError("order_count must be >= 0")
        return v

def validate_features(df: pd.DataFrame) -> pd.DataFrame:
    """Validate all rows against the schema, dropping rows that fail."""
    valid_rows = []
    for _, row in df.iterrows():
        try:
            CustomerFeatures(**row.to_dict())
            valid_rows.append(row)
        except Exception as e:
            print(f"Invalid row: {e}")
    return pd.DataFrame(valid_rows)
Statistical Validation
def check_feature_distributions(
    current_df: pd.DataFrame,
    reference_df: pd.DataFrame,
    threshold: float = 0.1,
) -> dict:
    """Check for distribution drift via relative shift in column means."""
    drift_report = {}
    for column in current_df.select_dtypes(include=[float, int]).columns:
        current_mean = current_df[column].mean()
        reference_mean = reference_df[column].mean()
        if reference_mean == 0:
            # Avoid division by zero; treat any change from a zero baseline as full drift
            drift = float(current_mean != 0)
        else:
            drift = abs(current_mean - reference_mean) / abs(reference_mean)
        drift_report[column] = {
            "drift": drift,
            "alert": drift > threshold,
        }
    return drift_report
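A brief usage sketch: compare the latest feature batch against a stored reference snapshot (for example, the features the model was trained on) and surface any columns that drifted. The file paths here are assumptions:

import pandas as pd

reference = pd.read_parquet("data/reference_features.parquet")  # training-time snapshot (assumed path)
current = pd.read_parquet("data/customer_features.parquet")

report = check_feature_distributions(current, reference, threshold=0.1)
for column, stats in report.items():
    if stats["alert"]:
        print(f"Drift alert on {column}: {stats['drift']:.1%} shift in mean")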
Best Practices
| Practice | Why |
|---|---|
| Compute once, use everywhere | Consistency across training and serving |
| Version transformations | Reproduce any feature set (see the sketch after this table) |
| Validate early | Catch bad data before it reaches models |
| Monitor freshness | Stale features degrade predictions |
| Document features | So consumers know what a feature means and how it is computed |
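On versioning transformations, one lightweight approach is to tag each feature set with a version label plus a fingerprint of the transformation code that produced it. A minimal sketch (the version scheme and file naming are assumptions):

import hashlib
import inspect

TRANSFORM_VERSION = "2024.06.01"  # bumped whenever transformation logic changes (hypothetical scheme)

def transformation_fingerprint(*funcs) -> str:
    """Hash the source of the given transformation functions so a feature
    set can be traced back to the exact code that produced it."""
    source = "".join(inspect.getsource(f) for f in funcs)
    return hashlib.sha256(source.encode()).hexdigest()[:12]

# Example: include version and fingerprint in the output path
# fingerprint = transformation_fingerprint(compute_customer_aggregates, compute_time_features)
# features_df.to_parquet(f"data/customer_features_{TRANSFORM_VERSION}_{fingerprint}.parquet")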
Feature Documentation
# Good feature documentation
customer_lifetime_value = FeatureView(
    name="customer_lifetime_value",
    description="""
    Predicted lifetime value of a customer based on historical
    purchase behavior. Updated daily. Used by marketing and
    churn prediction models.

    Calculation: Sum of all orders * retention probability
    Owner: data-science@company.com
    SLA: Updated by 6 AM UTC daily
    """,
    entities=[customer],
    # ...
)
Key insight: Feature engineering pipelines should be tested and monitored as rigorously as the rest of your production code. Poor or stale features are among the most common causes of model degradation.
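As one concrete example of that testing, here is a minimal unit-test sketch for compute_customer_aggregates (pytest is assumed as the runner):

import pandas as pd

def test_compute_customer_aggregates_single_customer():
    transactions = pd.DataFrame({
        "customer_id": [1, 1],
        "amount": [10.0, 30.0],
        "transaction_date": pd.to_datetime(["2024-01-01", "2024-01-15"]),
    })
    result = compute_customer_aggregates(transactions)
    assert len(result) == 1
    assert result.loc[0, "amount_sum"] == 40.0
    assert result.loc[0, "amount_count"] == 2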
Next, we'll explore managed feature store alternatives like Tecton and cloud-native solutions.