Lesson 20 of 23

Production & Reliability

Deployment Strategies

3 min read

Deploying AI systems requires careful strategies to minimize risk while maximizing learning. This lesson covers deployment patterns specific to LLM-based applications.

Blue-Green Deployment

Run two identical environments, switch traffic instantly:

┌─────────────────────────────────────────────────────────────┐
│                     Load Balancer                            │
└─────────────────────────────────────────────────────────────┘
              │                           │
              │ (100%)                    │ (0%)
              ▼                           ▼
┌──────────────────────┐     ┌──────────────────────┐
│   Blue Environment   │     │   Green Environment  │
│   (Current v1.2)     │     │   (New v1.3)         │
│                      │     │                      │
│   - Agent v1.2       │     │   - Agent v1.3       │
│   - Prompts v5       │     │   - Prompts v6       │
│   - Model: GPT-4     │     │   - Model: GPT-4     │
└──────────────────────┘     └──────────────────────┘
class BlueGreenDeployer:
    def __init__(self, load_balancer, environments: dict):
        self.lb = load_balancer
        self.environments = environments
        self.active = "blue"

    async def deploy_new_version(self, version: str):
        """Deploy to inactive environment and switch."""
        inactive = "green" if self.active == "blue" else "blue"

        # Step 1: Deploy to inactive environment
        await self.environments[inactive].deploy(version)

        # Step 2: Run health checks
        health = await self.environments[inactive].health_check()
        if not health["healthy"]:
            raise DeploymentError(f"Health check failed: {health}")

        # Step 3: Run smoke tests
        smoke = await self.run_smoke_tests(inactive)
        if not smoke["passed"]:
            raise DeploymentError(f"Smoke tests failed: {smoke}")

        # Step 4: Switch traffic
        await self.lb.set_weights({
            self.active: 0,
            inactive: 100
        })

        self.active = inactive
        return {"status": "success", "active": self.active}

    async def rollback(self):
        """Instant rollback to previous environment."""
        inactive = "green" if self.active == "blue" else "blue"

        await self.lb.set_weights({
            self.active: 0,
            inactive: 100
        })

        self.active = inactive
        return {"status": "rolled_back", "active": self.active}

Canary Deployment

Gradually shift traffic to validate new version:

import asyncio

class CanaryDeployer:
    def __init__(self, load_balancer, metrics_client):
        self.lb = load_balancer
        self.metrics = metrics_client

    async def canary_deploy(
        self,
        new_version: str,
        stages: list = None,
        validation_period: int = 300  # 5 minutes per stage
    ):
        """Gradually roll out with automatic validation."""
        stages = stages or [5, 10, 25, 50, 100]  # Percentage

        for percentage in stages:
            # Shift traffic
            await self.lb.set_weights({
                "stable": 100 - percentage,
                "canary": percentage
            })

            # Wait and collect metrics
            await asyncio.sleep(validation_period)

            # Validate canary health
            validation = await self.validate_canary()

            if not validation["passed"]:
                # Automatic rollback
                await self.lb.set_weights({
                    "stable": 100,
                    "canary": 0
                })
                return {
                    "status": "rolled_back",
                    "failed_at": percentage,
                    "reason": validation["reason"]
                }

        return {"status": "success", "version": new_version}

    async def validate_canary(self) -> dict:
        """Compare canary metrics against stable."""
        stable_metrics = await self.metrics.get("stable", window="5m")
        canary_metrics = await self.metrics.get("canary", window="5m")

        checks = {
            "error_rate": canary_metrics["error_rate"] <= stable_metrics["error_rate"] * 1.1,
            "latency_p95": canary_metrics["latency_p95"] <= stable_metrics["latency_p95"] * 1.2,
            "user_satisfaction": canary_metrics.get("satisfaction", 1.0) >= 0.9
        }

        failed = [k for k, v in checks.items() if not v]

        return {
            "passed": len(failed) == 0,
            "reason": failed if failed else None
        }

Shadow Deployment

Test new version with real traffic, no user impact:

import asyncio
from typing import Any

class ShadowDeployer:
    def __init__(self, production_agent, shadow_agent, comparator):
        self.production = production_agent
        self.shadow = shadow_agent
        self.comparator = comparator
        self.results = []

    async def handle_request(self, request: dict) -> Any:
        """Process request in production, shadow in background."""

        # Production response (returned to user)
        production_task = asyncio.create_task(
            self.production.run(request)
        )

        # Shadow response (not returned to user)
        shadow_task = asyncio.create_task(
            self.shadow.run(request)
        )

        # Wait for production (user-facing)
        production_response = await production_task

        # Don't wait for shadow, but capture when ready
        asyncio.create_task(
            self._compare_results(request, production_response, shadow_task)
        )

        return production_response

    async def _compare_results(
        self,
        request: dict,
        production_response: Any,
        shadow_task: asyncio.Task
    ):
        """Compare production and shadow responses."""
        try:
            shadow_response = await asyncio.wait_for(shadow_task, timeout=60)

            comparison = await self.comparator.compare(
                request=request,
                production=production_response,
                shadow=shadow_response
            )

            self.results.append(comparison)

            # Log significant differences
            if comparison["divergence_score"] > 0.3:
                await self.log_divergence(comparison)

        except asyncio.TimeoutError:
            self.results.append({"error": "shadow_timeout"})

    def get_shadow_report(self) -> dict:
        """Generate report on shadow performance."""
        if not self.results:
            return {"status": "no_data"}

        valid_results = [r for r in self.results if "error" not in r]

        return {
            "total_requests": len(self.results),
            "successful_shadows": len(valid_results),
            "avg_divergence": sum(r["divergence_score"] for r in valid_results) / len(valid_results),
            "shadow_better_count": sum(1 for r in valid_results if r["shadow_preferred"]),
            "ready_for_promotion": self._assess_readiness(valid_results)
        }

Feature Flags for AI

Control AI features dynamically:

class AIFeatureFlags:
    def __init__(self, flag_service):
        self.flags = flag_service

    async def get_config(self, user_id: str, context: dict = None) -> dict:
        """Get AI configuration based on feature flags."""
        config = {
            "model": "gpt-3.5-turbo",  # Default
            "max_tokens": 1000,
            "tools_enabled": [],
            "features": {}
        }

        # Check model upgrade flag
        if await self.flags.is_enabled("gpt4_rollout", user_id):
            config["model"] = "gpt-4"
            config["max_tokens"] = 4000

        # Check new tool flag
        if await self.flags.is_enabled("code_execution_tool", user_id):
            config["tools_enabled"].append("code_executor")

        # Check experimental features
        if await self.flags.is_enabled("streaming_responses", user_id):
            config["features"]["streaming"] = True

        # Percentage-based rollout
        if await self.flags.percentage_enabled("new_prompt_v2", user_id, 25):
            config["prompt_version"] = "v2"
        else:
            config["prompt_version"] = "v1"

        return config

# Usage
flags = AIFeatureFlags(flag_service)

async def handle_request(user_id: str, request: dict):
    config = await flags.get_config(user_id)

    agent = create_agent(
        model=config["model"],
        tools=config["tools_enabled"],
        prompt_version=config["prompt_version"]
    )

    return await agent.run(request)

Deployment Checklist

deployment_checklist = {
    "pre_deployment": [
        "Run full test suite",
        "Verify prompt regression tests pass",
        "Check model version compatibility",
        "Review cost impact estimates",
        "Prepare rollback plan"
    ],
    "during_deployment": [
        "Monitor error rates (< 1% threshold)",
        "Watch latency P95 (< 2x baseline)",
        "Check token usage (within budget)",
        "Verify safety filters active",
        "Test critical user flows"
    ],
    "post_deployment": [
        "Review user feedback",
        "Analyze cost per request",
        "Check for prompt leakage",
        "Validate logging working",
        "Update runbooks if needed"
    ]
}

Interview Tip

When discussing deployment:

  1. Risk assessment - What could go wrong with this deployment?
  2. Rollback time - How fast can you revert?
  3. Validation metrics - How do you know the deployment succeeded?
  4. Cost monitoring - New version might use more tokens

With production knowledge complete, let's apply everything to real interview case studies. :::

Quiz

Module 5: Production & Reliability

Take Quiz