ML Workflow Orchestration

Prefect & Alternatives

Prefect offers a modern, Python-native approach to workflow orchestration. Let's explore its core features and compare it with the main alternatives.

Why Prefect?

Feature           | Benefit
------------------|----------------------------------
Python-native     | Just decorators, no config files
Dynamic workflows | Build DAGs at runtime
Modern UI         | Beautiful dashboard
Hybrid execution  | Local or cloud
Easy debugging    | Run flows as regular Python

Installation

# Install Prefect
pip install prefect

# Verify installation
prefect version

Core Concepts

Flows and Tasks

from prefect import flow, task

@task
def load_data(path: str) -> dict:
    """Load data from path."""
    import pandas as pd
    df = pd.read_csv(path)
    return {"data": df, "rows": len(df)}

@task
def train_model(data: dict) -> str:
    """Train ML model."""
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    df = data["data"]
    X = df.drop("target", axis=1)
    y = df["target"]

    model = RandomForestClassifier()
    model.fit(X, y)

    model_path = "/tmp/model.pkl"
    joblib.dump(model, model_path)
    return model_path

@task
def evaluate_model(model_path: str, data: dict) -> float:
    """Evaluate model accuracy."""
    import joblib
    from sklearn.metrics import accuracy_score

    model = joblib.load(model_path)
    df = data["data"]
    X = df.drop("target", axis=1)
    y = df["target"]

    predictions = model.predict(X)
    return accuracy_score(y, predictions)

@flow(name="ML Training Pipeline")
def ml_pipeline(data_path: str):
    """End-to-end ML training flow."""
    data = load_data(data_path)
    model_path = train_model(data)
    accuracy = evaluate_model(model_path, data)

    print(f"Model accuracy: {accuracy:.2%}")
    return accuracy

# Run the flow
if __name__ == "__main__":
    ml_pipeline("data/train.csv")

Advanced Features

Retries and Caching

from prefect import task, flow
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
def expensive_computation(data: dict) -> dict:
    """Cached computation with retries."""
    # Repeated calls with the same inputs within the 1-hour window reuse the
    # cached result; failures are retried up to 3 times, 60 seconds apart.
    # `process_data` stands in for your own processing logic.
    return process_data(data)

Parallel Execution

from prefect import flow, task

@task
def process_partition(partition_id: int) -> dict:
    return {"partition": partition_id, "result": partition_id * 2}

@flow
def parallel_flow():
    # Submit tasks in parallel
    futures = [process_partition.submit(i) for i in range(10)]

    # Wait for all results
    results = [f.result() for f in futures]
    return results
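
By default, .submit() hands tasks to the flow's task runner, which executes them concurrently (threads in the same process). For process- or cluster-level parallelism you can swap in a different task runner. A minimal sketch, assuming the optional prefect-dask integration is installed (pip install prefect-dask):

from prefect import flow, task
from prefect_dask import DaskTaskRunner  # optional integration package

@task
def double(partition_id: int) -> int:
    return partition_id * 2

# DaskTaskRunner() spins up a temporary local Dask cluster for this flow run
@flow(task_runner=DaskTaskRunner())
def dask_parallel_flow():
    futures = [double.submit(i) for i in range(10)]
    return [f.result() for f in futures]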

Conditional Logic

from prefect import flow

# deploy_to_production, deploy_to_staging, and notify_team are placeholder
# tasks defined elsewhere in your project.
@flow
def conditional_flow(accuracy: float):
    if accuracy > 0.9:
        deploy_to_production()
    elif accuracy > 0.8:
        deploy_to_staging()
    else:
        notify_team("Model accuracy too low")

Scheduling Deployments

from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

@flow
def daily_training():
    # Training logic
    pass

# Create deployment
deployment = Deployment.build_from_flow(
    flow=daily_training,
    name="daily-training",
    schedule=CronSchedule(cron="0 2 * * *"),  # 2 AM daily
    work_queue_name="ml-queue"
)

deployment.apply()
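
Deployment.build_from_flow is the Prefect 2.x block-based deployment API and has been deprecated in later releases in favor of flow.serve() and flow.deploy(). A minimal sketch of the same schedule using serve(), assuming a recent Prefect version:

from prefect import flow

@flow
def daily_training():
    # Training logic
    pass

if __name__ == "__main__":
    # Registers a deployment and polls for scheduled runs from this process
    daily_training.serve(name="daily-training", cron="0 2 * * *")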

Running with Prefect Cloud/Server

# Start local server
prefect server start

# Create deployment via CLI
prefect deployment build ./pipeline.py:ml_pipeline -n "ml-training" -q "ml-queue"
prefect deployment apply ml_pipeline-deployment.yaml

# Start an agent to pick up runs from the queue (Prefect 2.x agent model)
prefect agent start -q "ml-queue"

# With the newer worker/work-pool model, run instead:
# prefect worker start --pool "ml-pool"
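
To target Prefect Cloud instead of a local server, authenticate the CLI first; the API key below is a placeholder, and the exact flags can vary by version:

# Log in to Prefect Cloud (opens a browser, or accepts an API key)
prefect cloud login
prefect cloud login --key "<YOUR_API_KEY>"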

Comparison: Modern Orchestrators

Prefect vs Airflow

Aspect        | Prefect           | Airflow
--------------|-------------------|-------------------------------
Syntax        | Python decorators | Python + config (sketch below)
Dynamic DAGs  | Native            | Limited
Local testing | Just call flow()  | Requires setup
Deployment    | Simple            | Complex
Maturity      | Newer             | Battle-tested
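
For comparison, the same decorator-driven style in Airflow's TaskFlow API (Airflow 2.4+) still needs DAG-level configuration such as schedule, start_date, and catchup. A minimal sketch, not taken from the official tutorials:

from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def ml_pipeline_dag():
    @task
    def load_data() -> dict:
        return {"rows": 100}

    @task
    def train_model(data: dict) -> None:
        print(f"Training on {data['rows']} rows")

    # Passing the output creates the load_data -> train_model dependency
    train_model(load_data())

# Instantiate the DAG so the scheduler can discover it
ml_pipeline_dag()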

Prefect vs Dagster

Aspect   | Prefect                | Dagster
---------|------------------------|---------------------
Focus    | Workflow orchestration | Data orchestration
Assets   | Task outputs           | First-class assets
Testing  | Standard pytest        | Built-in framework
Best for | General workflows      | Data pipelines

When to Use What

Tool     | Best For
---------|------------------------------------------------
Prefect  | Python teams, rapid development
Airflow  | Large teams, data engineering
Kubeflow | Kubernetes-native ML
Dagster  | Data-centric pipelines
DVC      | Simple ML pipelines, versioning (sketch below)
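
As an example of the "simple" end of the table, DVC builds pipelines from shell commands recorded in dvc.yaml. A minimal sketch, assuming train.py and data/train.csv exist in a DVC-initialized repository:

# Register a pipeline stage (written to dvc.yaml)
dvc stage add -n train \
    -d train.py -d data/train.csv \
    -o model.pkl \
    python train.py

# Re-run only the stages whose dependencies changed
dvc repro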

Other Alternatives

Dagster

from dagster import asset, Definitions
import pandas as pd

@asset
def raw_data() -> pd.DataFrame:
    """Load raw data."""
    return pd.read_csv("data.csv")

@asset
def processed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Process raw data."""
    return raw_data.dropna()

@asset
def trained_model(processed_data: pd.DataFrame):
    """Train model on processed data."""
    # `train` is a placeholder for your own training function.
    model = train(processed_data)
    return model

defs = Definitions(assets=[raw_data, processed_data, trained_model])
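
For local testing, Dagster can materialize the asset graph in-process without a long-running daemon; a minimal sketch:

from dagster import materialize

# Executes the assets in dependency order and returns an in-process result
result = materialize([raw_data, processed_data, trained_model])
assert result.success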

Metaflow (Netflix)

from metaflow import FlowSpec, step

# load_data, train_model, and evaluate are placeholder helpers
# defined elsewhere in your project.
class MLFlow(FlowSpec):
    @step
    def start(self):
        self.data = load_data()
        self.next(self.train)

    @step
    def train(self):
        self.model = train_model(self.data)
        self.next(self.evaluate)

    @step
    def evaluate(self):
        self.accuracy = evaluate(self.model)
        self.next(self.end)

    @step
    def end(self):
        print(f"Final accuracy: {self.accuracy}")

if __name__ == "__main__":
    MLFlow()
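
Metaflow flows are driven from the command line. Assuming the class above is saved as ml_flow.py, a typical local run looks like:

# Execute every step locally; artifacts from each run are versioned and stored
python ml_flow.py run

# Print the flow's structure and step docstrings
python ml_flow.py show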

Best Practices

Practice           | Why
-------------------|------------------------------------------
Start simple       | Add complexity as needed
Use type hints     | Better IDE support and validation
Log extensively    | Debug production issues (sketch below)
Test locally first | Catch errors early
Version your flows | Track changes over time
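
For the "log extensively" practice, Prefect provides a run-aware logger that sends messages to the API and UI as well as the console; a minimal sketch:

from prefect import flow, task, get_run_logger

@task
def score_model(accuracy: float):
    logger = get_run_logger()
    logger.info("Model accuracy: %.2f", accuracy)
    if accuracy < 0.8:
        logger.warning("Accuracy below threshold; check the training data")

@flow
def logged_flow():
    score_model(0.85)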

Choosing the Right Tool

    Simple ◄──────────────────────────────────────────► Complex

      DVC      Prefect      Dagster      Airflow      Kubeflow

Key insight: Start with the simplest tool that meets your needs. You can always migrate to more complex solutions as your requirements grow.

Next module: We'll explore feature stores for training-serving consistency.
