ML Workflow Orchestration

Prefect & Alternatives


Prefect offers a modern, Python-native approach to workflow orchestration. Let's explore it and compare it with the alternatives.

Why Prefect?

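Feature | Benefit
Python-native | Just decorators, no separate config files
Dynamic workflows | DAGs are built at runtime by ordinary Python (loops, conditionals)
Modern UI | Web dashboard for monitoring runs, logs, and schedules
Hybrid execution | Run locally, on your own server, or with Prefect Cloud
Easy debugging | Run flows as regular Python functions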

Installation

# Install Prefect
pip install prefect

# Verify installation
prefect version

Core Concepts

Flows and Tasks

from prefect import flow, task

@task
def load_data(path: str) -> dict:
    """Load data from path."""
    import pandas as pd
    df = pd.read_csv(path)
    return {"data": df, "rows": len(df)}

@task
def train_model(data: dict) -> str:
    """Train ML model."""
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    df = data["data"]
    X = df.drop("target", axis=1)
    y = df["target"]

    model = RandomForestClassifier()
    model.fit(X, y)

    model_path = "/tmp/model.pkl"
    joblib.dump(model, model_path)
    return model_path

@task
def evaluate_model(model_path: str, data: dict) -> float:
    """Evaluate model accuracy."""
    import joblib
    from sklearn.metrics import accuracy_score

    model = joblib.load(model_path)
    df = data["data"]
    X = df.drop("target", axis=1)
    y = df["target"]

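    # Note: this scores the model on its own training data, which overstates
    # accuracy; use a held-out split for a realistic estimate
    predictions = model.predict(X)
    return accuracy_score(y, predictions)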

@flow(name="ML Training Pipeline")
def ml_pipeline(data_path: str):
    """End-to-end ML training flow."""
    data = load_data(data_path)
    model_path = train_model(data)
    accuracy = evaluate_model(model_path, data)

    print(f"Model accuracy: {accuracy:.2%}")
    return accuracy

# Run the flow
if __name__ == "__main__":
    ml_pipeline("data/train.csv")

Advanced Features

Retries and Caching

from prefect import task, flow
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
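def expensive_computation(data: dict) -> dict:
    """Cached computation with retries."""
    # Retried up to 3 times, 60 s apart, if it raises. The cache key is a hash
    # of the inputs, so a call with identical `data` within 1 hour reuses the
    # stored result instead of recomputing.
    # Placeholder: replace with your own processing logic
    return dict(data)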
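To make these two options concrete, here is a simplified, pure-Python model of their behavior. The cache key is a hash of the task's inputs, a cached value is reused until it expires, and a failing call is retried after a delay. This is an illustrative sketch under our own assumptions, not Prefect's implementation: the `cached_with_retries` decorator, `input_hash`, JSON-based hashing, and in-memory storage are all hypothetical. Prefect itself persists results and tracks task state through its backend.

```python
import hashlib
import json
import time
from datetime import timedelta
from functools import wraps


def input_hash(*args, **kwargs) -> str:
    """Stable hash of a call's inputs (assumes JSON-serializable values)."""
    payload = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()


def cached_with_retries(retries: int, retry_delay_seconds: float, cache_expiration: timedelta):
    """Hypothetical decorator imitating retries plus input-hash caching."""
    cache = {}  # key -> (expires_at, value); in-memory, lost when the process exits

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            key = input_hash(fn.__name__, *args, **kwargs)
            hit = cache.get(key)
            if hit is not None and hit[0] > time.monotonic():
                return hit[1]  # unexpired cached result: skip the work
            for attempt in range(retries + 1):
                try:
                    value = fn(*args, **kwargs)
                    break
                except Exception:
                    if attempt == retries:
                        raise  # retries exhausted: surface the last error
                    time.sleep(retry_delay_seconds)
            cache[key] = (time.monotonic() + cache_expiration.total_seconds(), value)
            return value

        return wrapper

    return decorator


calls = {"n": 0}


@cached_with_retries(retries=2, retry_delay_seconds=0, cache_expiration=timedelta(hours=1))
def flaky_square(x: int) -> int:
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient failure")  # only the very first call fails
    return x * x


print(flaky_square(4))  # 16, after one retry
print(flaky_square(4))  # 16, served from the cache
print(calls["n"])       # 2
```

The second call never reaches the function body. Caching on an input hash relies on this: identical inputs are assumed to produce identical outputs, so caching suits deterministic work and not tasks whose result depends on external state.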

Parallel Execution

from prefect import flow, task

@task
def process_partition(partition_id: int) -> dict:
    return {"partition": partition_id, "result": partition_id * 2}

@flow
def parallel_flow():
    # Submit all tasks to the task runner so they can run concurrently
    futures = [process_partition.submit(i) for i in range(10)]

    # Wait for all results
    results = [f.result() for f in futures]
    return results
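In the flow above, `.submit()` returns a future right away, and `.result()` blocks until that task run finishes. As an analogy only (this is not Prefect code), the same submit-then-gather pattern can be written with the standard library's `concurrent.futures`. The `parallel_run` helper and its `max_workers` default are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def process_partition(partition_id: int) -> dict:
    return {"partition": partition_id, "result": partition_id * 2}


def parallel_run(n_partitions: int = 10, max_workers: int = 4) -> list:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit every partition first so they can run concurrently...
        futures = [pool.submit(process_partition, i) for i in range(n_partitions)]
        # ...then block on each result; the list keeps submission order
        return [f.result() for f in futures]


results = parallel_run()
print(results[:3])  # [{'partition': 0, 'result': 0}, {'partition': 1, 'result': 2}, {'partition': 2, 'result': 4}]
```

The key design point is to submit everything before waiting on anything. Calling `.result()` inside the submission loop would serialize the work. Threads help most when tasks wait on I/O. For CPU-bound work in CPython, a process-based or distributed runner is usually the better fit.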

Conditional Logic

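from prefect import flow

# deploy_to_production, deploy_to_staging and notify_team stand for your own
# @task functions (not shown); they are not part of Prefect.
@flow
def conditional_flow(accuracy: float):
    # Ordinary Python if/elif/else picks the branch at runtime
    if accuracy > 0.9:
        deploy_to_production()
    elif accuracy > 0.8:
        deploy_to_staging()
    else:
        notify_team("Model accuracy too low")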
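Because the branching is ordinary Python, one option is to pull the thresholds into a plain function that can be unit-tested without running Prefect at all. This is a minimal sketch. The `choose_action` name and its return labels are hypothetical, while the 0.9 and 0.8 thresholds come from the flow above.

```python
def choose_action(accuracy: float, prod_threshold: float = 0.9, staging_threshold: float = 0.8) -> str:
    """Return which branch conditional_flow should take for a given accuracy."""
    if not 0.0 <= accuracy <= 1.0:
        raise ValueError(f"accuracy must be in [0, 1], got {accuracy}")
    if accuracy > prod_threshold:
        return "production"
    if accuracy > staging_threshold:
        return "staging"
    return "notify"


# Boundary values fall to the lower tier because the comparisons are strict (>)
print(choose_action(0.95))  # production
print(choose_action(0.9))   # staging
print(choose_action(0.8))   # notify
```

The flow then only dispatches on the returned label. The edge cases, such as exactly 0.9, are pinned down by fast tests instead of by full pipeline runs.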

Scheduling Deployments

from prefect import flow

@flow
def daily_training():
    # Training logic
    pass

if __name__ == "__main__":
    # Creates a deployment with a cron schedule and keeps this process running
    # to execute scheduled runs (recent Prefect 2.x releases and 3.x). This
    # replaces the older Deployment.build_from_flow API, removed in Prefect 3.
    daily_training.serve(
        name="daily-training",
        cron="0 2 * * *",  # 2 AM daily
    )
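The cron string `0 2 * * *` reads as minute 0, hour 2, any day of the month, any month, any weekday: 02:00 every day. The small helper below computes the next firing time for that one fixed schedule so you can sanity-check it. `next_daily_run` is hypothetical and uses only the standard library. It is not a general cron parser, and it ignores time zones, which real schedules also depend on.

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int = 2, minute: int = 0) -> datetime:
    """Next occurrence of a daily HH:MM schedule strictly after `now` (naive datetimes)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has already passed
    return candidate


print(next_daily_run(datetime(2024, 5, 1, 1, 30)))   # 2024-05-01 02:00:00
print(next_daily_run(datetime(2024, 5, 1, 2, 0)))    # 2024-05-02 02:00:00
print(next_daily_run(datetime(2024, 5, 31, 23, 0)))  # 2024-06-01 02:00:00
```

Adding a `timedelta` rather than incrementing the day number handles month and year rollovers automatically.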

Running with Prefect Cloud/Server

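# Start a local Prefect server (the UI defaults to http://127.0.0.1:4200)
prefect server start

# Create a work pool that workers pull scheduled runs from
prefect work-pool create "ml-pool" --type process

# Register the deployment (Prefect 2.10+ and 3.x); this replaces the older
# `prefect deployment build` / `prefect deployment apply` workflow, removed in Prefect 3
prefect deploy ./pipeline.py:ml_pipeline -n "ml-training" -p "ml-pool"

# Start a worker that executes runs from the pool
prefect worker start -p "ml-pool"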

Comparison: Modern Orchestrators

Prefect vs Airflow

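Aspect | Prefect | Airflow
Syntax | Python decorators (@flow, @task) | Python DAG files (operators or TaskFlow decorators) plus Airflow configuration
Dynamic DAGs | Native (structure decided at runtime) | More limited (dynamic task mapping since Airflow 2.3)
Local testing | Call the flow like a function, e.g. ml_pipeline("data/train.csv") | Requires an Airflow environment (metadata database)
Deployment | Simpler (Prefect server or Cloud plus workers) | More moving parts (scheduler, webserver, metadata database)
Maturity | Newer | Battle-tested, large ecosystem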

Prefect vs Dagster

Aspect | Prefect | Dagster
Focus | Workflow orchestration | Data orchestration
Assets | Task outputs | First-class assets
Testing | Standard pytest | pytest plus built-in testing utilities
Best for | General workflows | Data pipelines

When to Use What

Tool | Best for
Prefect | Python teams, rapid development
Airflow | Large teams, data engineering
Kubeflow | Kubernetes-native ML
Dagster | Data-centric pipelines
DVC | Simple ML pipelines, versioning

Other Alternatives

Dagster

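import pandas as pd
from dagster import asset, Definitions
from sklearn.ensemble import RandomForestClassifier

@asset
def raw_data():
    """Load raw data."""
    return pd.read_csv("data.csv")

@asset
def processed_data(raw_data):
    """Process raw data."""
    # The parameter name `raw_data` makes this asset depend on the raw_data asset
    return raw_data.dropna()

@asset
def trained_model(processed_data):
    """Train model on processed data (assumes a "target" label column)."""
    X = processed_data.drop("target", axis=1)
    y = processed_data["target"]
    return RandomForestClassifier().fit(X, y)

defs = Definitions(assets=[raw_data, processed_data, trained_model])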
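Dagster infers the dependency graph from parameter names: `processed_data(raw_data)` depends on the `raw_data` asset. The toy sketch below imitates that idea with the standard library (`inspect.signature` plus `graphlib.TopologicalSorter`, Python 3.9+) to show why the names must match. The `materialize_all` helper and the in-memory list data are illustrative stand-ins, not Dagster's API.

```python
import inspect
from graphlib import TopologicalSorter


def raw_data():
    return [3, None, 1, None, 2]


def processed_data(raw_data):
    return [x for x in raw_data if x is not None]  # stand-in for dropna()


def trained_model(processed_data):
    return {"mean": sum(processed_data) / len(processed_data)}  # stand-in "model"


def materialize_all(assets):
    """Run asset functions in dependency order, wiring inputs by parameter name."""
    by_name = {fn.__name__: fn for fn in assets}
    # Each asset depends on the assets named by its parameters
    graph = {name: set(inspect.signature(fn).parameters) for name, fn in by_name.items()}
    values = {}
    for name in TopologicalSorter(graph).static_order():
        fn = by_name[name]  # KeyError here means a parameter names no known asset
        values[name] = fn(**{dep: values[dep] for dep in graph[name]})
    return values


# The list order does not matter; the graph determines execution order
results = materialize_all([trained_model, raw_data, processed_data])
print(results["trained_model"])  # {'mean': 2.0}
```

Renaming a parameter silently changes or breaks the graph. Dagster's real implementation reports this as a definition-time error rather than a `KeyError`.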

Metaflow (Netflix)

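from metaflow import FlowSpec, step

class MLFlow(FlowSpec):
    @step
    def start(self):
        # Illustrative data: the iris dataset stands in for your own loader
        from sklearn.datasets import load_iris
        self.X, self.y = load_iris(return_X_y=True)  # attributes on self persist between steps
        self.next(self.train)

    @step
    def train(self):
        from sklearn.ensemble import RandomForestClassifier
        self.model = RandomForestClassifier().fit(self.X, self.y)
        self.next(self.evaluate)

    @step
    def evaluate(self):
        # Training-set accuracy, for illustration only
        self.accuracy = self.model.score(self.X, self.y)
        self.next(self.end)

    @step
    def end(self):
        print(f"Final accuracy: {self.accuracy:.2%}")

if __name__ == "__main__":
    # Run from the command line: python your_flow_file.py run
    MLFlow()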
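Each Metaflow step names its successor with `self.next(...)`, and attributes set on `self` become artifacts available to later steps. The toy, single-process imitation below sketches that control flow. The `ToyFlow` and `DemoFlow` classes are hypothetical. Real Metaflow persists artifacts between steps and can run steps in separate processes or on remote compute.

```python
class ToyFlow:
    """Run steps from start() by following the successor each step registers."""

    def next(self, step):
        self._next_step = step

    def run(self) -> list:
        order = []
        step = self.start
        while step is not None:
            self._next_step = None
            order.append(step.__name__)
            step()  # a step may set attributes on self and call self.next(...)
            step = self._next_step
        return order


class DemoFlow(ToyFlow):
    def start(self):
        self.data = [0.7, 0.9, 0.8]
        self.next(self.train)

    def train(self):
        self.model = sum(self.data) / len(self.data)  # stand-in "model"
        self.next(self.end)

    def end(self):
        print(f"Model value: {self.model:.2f}")  # the end step has no successor


demo = DemoFlow()
print(demo.run())  # ['start', 'train', 'end'], after printing "Model value: 0.80"
```

Because the graph is declared step by step, an `end` step that never calls `self.next` is what terminates the run.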

Best Practices

Practice | Why
Start simple | Add complexity only as needed
Use type hints | Better IDE support; Prefect also uses them to validate flow parameters
Log extensively | Easier debugging of production issues
Test locally first | Catch errors early
Version your flows | Track changes over time

Choosing the Right Tool

                    Simple                 Complex
                      │                      │
    DVC ◄─────────────┼──────────────────────┼────► Kubeflow
                      │                      │
                      │     Prefect          │
                      │        │             │
                      │        ▼             │
                      │    Dagster           │
                      │        │             │
                      └────────┼─────────────┘
                            Airflow

Key insight: Start with the simplest tool that meets your needs. You can always migrate to more complex solutions as your requirements grow.

Next module: We'll explore feature stores for training-serving consistency.

Module 3: ML Workflow Orchestration

Take Quiz
FREE WEEKLY NEWSLETTER

Stay on the Nerd Track

One email per week — courses, deep dives, tools, and AI experiments.

No spam. Unsubscribe anytime.