Monitoring & Observability

Logging and Debugging ML Systems

4 min read

Production ML debugging requires specialized techniques. Interviewers test your ability to diagnose issues that span data, models, and infrastructure.

ML-Specific Logging Requirements

| Log Type | What to Capture | Why It Matters |
| --- | --- | --- |
| Request logs | Input features, timestamps | Reproduce issues |
| Prediction logs | Output, confidence, latency | Debug wrong predictions |
| Model logs | Version, load time, errors | Track model lifecycle |
| Data logs | Schema, null rates, ranges | Detect data issues (see sketch below) |
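
To make the "data logs" row concrete, here is a minimal sketch of a data-quality logging helper. It assumes batches arrive as a pandas DataFrame and uses structlog (configured in the next section); the function name and field layout are illustrative, not a standard API.

import pandas as pd
import structlog

logger = structlog.get_logger()

def log_data_quality(batch: pd.DataFrame, pipeline: str):
    """Log schema, null rates, and value ranges for an incoming feature batch."""
    numeric = batch.select_dtypes(include="number")
    logger.info(
        "data_quality_checked",
        pipeline=pipeline,
        row_count=len(batch),
        # Schema: column name -> dtype string
        schema={col: str(dtype) for col, dtype in batch.dtypes.items()},
        # Null rate per column (0.0 - 1.0)
        null_rates={col: float(batch[col].isna().mean()) for col in batch.columns},
        # Observed (min, max) per numeric column
        value_ranges={col: (float(numeric[col].min()), float(numeric[col].max()))
                      for col in numeric.columns},
    )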

Structured Logging for ML

import structlog
from datetime import datetime

# Configure structured logging
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()

def log_prediction(request_id, model_name, features, prediction, latency_ms):
    """Emit one structured log line per prediction, keyed by request_id."""
    logger.info(
        "prediction_completed",
        request_id=request_id,
        model_name=model_name,
        # get_model_version() is a project helper (e.g., a model-registry lookup)
        model_version=get_model_version(model_name),
        feature_count=len(features),
        # Sample a few features for debugging (not all - privacy + payload size)
        feature_sample={k: features[k] for k in list(features.keys())[:5]},
        prediction=prediction,
        confidence=prediction.get("score"),
        latency_ms=latency_ms,
        # Timestamp is added automatically by the TimeStamper processor above
    )

Output:

{
  "event": "prediction_completed",
  "request_id": "abc-123",
  "model_name": "fraud_detector",
  "model_version": "v2.3.1",
  "feature_count": 45,
  "feature_sample": {"amount": 150.0, "merchant_id": "M123"},
  "prediction": {"label": "fraud", "score": 0.87},
  "confidence": 0.87,
  "latency_ms": 23,
  "timestamp": "2026-01-04T10:30:00Z"
}

Interview Question: Debug Wrong Predictions

Question: "Users report the fraud model is flagging legitimate transactions. How do you debug?"

Structured Debugging Approach:

def debug_false_positives(flagged_transactions):
    """
    Step-by-step checklist for debugging wrong predictions,
    applied to the transactions users reported as false positives.
    """
    debug_steps = {}

    debug_steps["1_replay_prediction"] = """
    - Fetch the exact input features from prediction logs
    - Run the same model version with those features
    - Verify we get the same prediction (reproducibility)
    """

    debug_steps["2_analyze_features"] = """
    - Compare flagged transactions to training data
    - Identify outlier features (amount, time, location)
    - Check for missing or unexpected values
    """

    debug_steps["3_check_feature_pipeline"] = """
    - Verify feature store is returning correct values
    - Check for stale cached features
    - Look for feature transformation bugs
    """

    debug_steps["4_analyze_model_behavior"] = """
    - Get feature importance for these predictions
    - Use SHAP/LIME for local explanations
    - Identify which features drove the decision
    """

    debug_steps["5_check_for_drift"] = """
    - Compare production data distribution to training
    - Check if this represents a new user segment
    - Evaluate if model was trained on similar cases
    """

    return debug_steps
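
A minimal sketch of step 1, replaying a logged prediction. It assumes the prediction log retains the full input features (the logging example above only samples them), and that query_prediction_log and load_model are hypothetical project helpers rather than a standard API.

def replay_prediction(request_id: str):
    """Re-run a logged prediction with the exact features and model version that served it."""
    # Fetch the original log entry by its correlation key (hypothetical log-store lookup)
    entry = query_prediction_log(request_id)

    # Load the exact model version that produced the prediction (hypothetical registry helper)
    model = load_model(entry["model_name"], entry["model_version"])

    # Re-run inference on the logged features
    replayed = model.predict(entry["features"])

    # A mismatch points to non-determinism, a feature-pipeline change, or a bad rollback
    return {
        "original": entry["prediction"],
        "replayed": replayed,
        "matches": replayed == entry["prediction"],
    }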

Tracing for ML Systems

import time

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Setup tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

@tracer.start_as_current_span("predict")
def predict(request):
    # Trace feature retrieval
    with tracer.start_as_current_span("fetch_features") as fetch_span:
        features = fetch_from_feature_store(request.user_id)
        fetch_span.set_attribute("feature_count", len(features))

    # Trace preprocessing
    with tracer.start_as_current_span("preprocess") as preprocess_span:
        start = time.perf_counter()
        processed = preprocess(features)
        preprocess_span.set_attribute(
            "preprocessing_time_ms", (time.perf_counter() - start) * 1000
        )

    # Trace inference
    with tracer.start_as_current_span("model_inference") as inference_span:
        prediction = model.predict(processed)
        inference_span.set_attribute("model_version", model.version)
        inference_span.set_attribute("prediction", str(prediction))

    return prediction
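
Nesting the fetch, preprocess, and inference spans under the top-level predict span makes it easy to see which stage dominates latency for a slow request, and attributes like model_version and feature_count let you filter traces when a specific model rollout is suspected.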

Log Aggregation Strategy

# Recommended log pipeline
log_architecture:
  application_logs:
    format: "JSON structured"
    destination: "Loki or Elasticsearch"
    retention: "30 days"

  prediction_logs:
    format: "JSON with request_id correlation"
    destination: "Dedicated ML log store (BigQuery)"
    retention: "90 days (for model debugging)"

  metrics:
    format: "Prometheus exposition format"
    destination: "Prometheus + Thanos (long-term)"
    retention: "1 year"

  traces:
    format: "OpenTelemetry"
    destination: "Jaeger or Tempo"
    retention: "7 days"

Common Debugging Scenarios

| Symptom | Likely Cause | Investigation |
| --- | --- | --- |
| Sudden accuracy drop | Data pipeline change | Check upstream schema |
| Gradual accuracy drop | Concept drift | Compare distributions (see sketch below) |
| High latency spikes | Memory pressure | Check GPU memory logs |
| Random errors | Intermittent dependency | Check trace spans |
| Wrong predictions | Feature bug | Compare logged vs. expected features |
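
As a concrete example of the "compare distributions" investigation, here is a minimal drift check using a two-sample Kolmogorov-Smirnov test from SciPy. The 0.05 threshold and per-feature usage are placeholder choices; in practice you would tune the test and threshold for each feature.

import numpy as np
from scipy.stats import ks_2samp

def detect_drift(training_values: np.ndarray, production_values: np.ndarray, alpha: float = 0.05):
    """Flag a feature as drifted if the KS test rejects 'same distribution'."""
    statistic, p_value = ks_2samp(training_values, production_values)
    return {"ks_statistic": float(statistic), "p_value": float(p_value), "drifted": p_value < alpha}

# Example: compare the logged production values of one feature against training data
# result = detect_drift(train_df["amount"].to_numpy(), prod_df["amount"].to_numpy())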

Pro Tip: "We keep prediction logs for 90 days so we can replay any prediction when users report issues. The request_id is our correlation key across all systems."
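
One way to make that correlation key work end to end is sketched below with structlog's bound loggers and an OpenTelemetry span attribute. It reuses the logger and tracer from the earlier snippets; handle_request is a stand-in for your serving framework's request handler, and predict is the traced function from the tracing example.

def handle_request(request_id, request):
    # Bind request_id once so every log line emitted for this request carries it
    log = logger.bind(request_id=request_id)

    with tracer.start_as_current_span("handle_request") as span:
        # Tag the trace with the same key used in the logs and prediction store
        span.set_attribute("request_id", request_id)

        prediction = predict(request)
        log.info("prediction_completed", prediction=str(prediction))
        return prediction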

The next module covers CI/CD for ML Systems interview questions.
