Lesson 19 of 23

RAG Evaluation & Testing

Automated Testing Pipelines

3 min read

Manual evaluation doesn't scale. This lesson covers how to set up automated testing pipelines that run on every change and catch regressions before they reach production.

Pipeline Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    RAG Testing Pipeline                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐  │
│  │  TRIGGER │───▶│   RUN    │───▶│ EVALUATE │───▶│  REPORT  │  │
│  └──────────┘    └──────────┘    └──────────┘    └──────────┘  │
│       │               │               │               │         │
│       ▼               ▼               ▼               ▼         │
│  • Git push      • Load test set  • RAGAS metrics • Dashboard   │
│  • PR created    • Run RAG        • Thresholds    • Alerts      │
│  • Schedule      • Collect output • Compare       • Artifacts   │
│  • Manual        • Log latency    • to baseline   • Slack/Email │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
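
The four stages above map onto a small amount of orchestration code. Below is a minimal sketch of the RUN, EVALUATE, and REPORT stages, assuming the RAGTestPipeline, RegressionDetector, and AlertManager classes built later in this lesson (the TRIGGER stage is handled by CI, shown in the GitHub Actions section; the function name here is illustrative):

import os

def run_quality_gate(rag_function) -> bool:
    """Sketch of one pipeline run: execute, score, compare, report."""
    # RUN + EVALUATE: answer every test question and score it with RAGAS
    pipeline = RAGTestPipeline(rag_function, "tests/data/test_set.json")
    result = pipeline.run()

    # Compare against recent history before recording the new run
    detector = RegressionDetector("test_results/history.json")
    regressions = detector.check_regression(result.metrics)
    detector.record_run(result)

    # REPORT: push alerts on failure; the return value gates the CI job
    AlertManager(slack_webhook=os.environ.get("SLACK_WEBHOOK")).alert_failure(result)
    return result.passed and not regressions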

Test Framework Setup

import json
import time
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass
from typing import Callable
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision

@dataclass
class TestResult:
    """Results from a single test run."""
    question: str
    answer: str
    contexts: list[str]
    latency_ms: float
    metrics: dict

@dataclass
class PipelineResult:
    """Results from a full pipeline run."""
    timestamp: str
    total_questions: int
    avg_latency_ms: float
    metrics: dict
    failures: list[dict]
    passed: bool

class RAGTestPipeline:
    """Automated RAG testing pipeline."""

    def __init__(
        self,
        rag_function: Callable,
        test_set_path: str,
        thresholds: dict | None = None,
    ):
        self.rag_function = rag_function
        self.test_set = self._load_test_set(test_set_path)
        self.thresholds = thresholds or {
            "faithfulness": 0.8,
            "answer_relevancy": 0.75,
            "context_precision": 0.7,
            "avg_latency_ms": 2000,
        }

    def _load_test_set(self, path: str) -> list[dict]:
        return json.loads(Path(path).read_text())

    def run(self) -> PipelineResult:
        """Execute the test pipeline."""

        results = []
        latencies = []

        # Run RAG on each test question
        for test_case in self.test_set:
            start = time.time()

            output = self.rag_function(test_case["question"])

            latency = (time.time() - start) * 1000
            latencies.append(latency)

            results.append(TestResult(
                question=test_case["question"],
                answer=output["answer"],
                contexts=output["contexts"],
                latency_ms=latency,
                metrics={},
            ))

        # Evaluate with RAGAS
        from datasets import Dataset
        eval_dataset = Dataset.from_dict({
            "question": [r.question for r in results],
            "answer": [r.answer for r in results],
            "contexts": [r.contexts for r in results],
        })

        scores = evaluate(
            eval_dataset,
            metrics=[faithfulness, answer_relevancy, context_precision],
        )

        # Check thresholds
        avg_latency = sum(latencies) / len(latencies)
        failures = []
        passed = True

        for metric, threshold in self.thresholds.items():
            if metric == "avg_latency_ms":
                # Latency must stay at or below the threshold
                actual = avg_latency
                failed = actual > threshold
                expected = f"<= {threshold}"
            else:
                # Quality scores must stay at or above the threshold
                actual = scores.get(metric, 0)
                failed = actual < threshold
                expected = f">= {threshold}"

            if failed:
                failures.append({
                    "metric": metric,
                    "expected": expected,
                    "actual": actual,
                })
                passed = False

        return PipelineResult(
            timestamp=datetime.now().isoformat(),
            total_questions=len(results),
            avg_latency_ms=avg_latency,
            metrics=dict(scores),
            failures=failures,
            passed=passed,
        )
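
A hypothetical usage sketch follows: rag_function is any callable that takes a question and returns a dict with "answer" and "contexts" keys, which is the shape run() reads. The retriever and generator names below are placeholders for your own components.

# Hypothetical usage sketch -- retriever and generator stand in for
# whatever your RAG stack actually uses.
def my_rag(question: str) -> dict:
    docs = retriever.search(question, k=5)
    answer = generator.generate(question, docs)
    return {"answer": answer, "contexts": [d.text for d in docs]}

pipeline = RAGTestPipeline(
    rag_function=my_rag,
    test_set_path="tests/data/test_set.json",  # JSON list of {"question": ...} objects
    thresholds={
        "faithfulness": 0.8,
        "answer_relevancy": 0.75,
        "context_precision": 0.7,
        "avg_latency_ms": 2000,
    },
)

result = pipeline.run()
print(f"passed={result.passed}  avg_latency={result.avg_latency_ms:.0f}ms")
for failure in result.failures:
    print(failure)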

GitHub Actions Integration

# .github/workflows/rag-tests.yml
name: RAG Evaluation

on:
  push:
    branches: [main]
    paths:
      - 'src/rag/**'
      - 'prompts/**'
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 0 * * *'  # Daily at midnight

jobs:
  evaluate:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install ragas pytest

      - name: Run RAG evaluation
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          python -m pytest tests/test_rag_quality.py \
            --json-report \
            --json-report-file=results.json

      - name: Check thresholds
        run: python scripts/check_thresholds.py results.json

      - name: Upload results
        uses: actions/upload-artifact@v4
        with:
          name: rag-evaluation-results
          path: results.json

      - name: Comment on PR
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('results.json'));

            const body = `## RAG Evaluation Results

            | Metric | Score | Threshold | Status |
            |--------|-------|-----------|--------|
            | Faithfulness | ${results.faithfulness.toFixed(2)} | 0.80 | ${results.faithfulness >= 0.8 ? '✅' : '❌'} |
            | Relevancy | ${results.answer_relevancy.toFixed(2)} | 0.75 | ${results.answer_relevancy >= 0.75 ? '✅' : '❌'} |
            | Precision | ${results.context_precision.toFixed(2)} | 0.70 | ${results.context_precision >= 0.7 ? '✅' : '❌'} |
            `;

            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: body
            });
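
The workflow calls scripts/check_thresholds.py, which is not shown above. Here is a minimal sketch, assuming results.json exposes the three RAGAS scores as top-level keys (the same flat structure the PR-comment step reads); adapt the key names to whatever your evaluation step actually writes.

# scripts/check_thresholds.py -- hypothetical sketch
import json
import sys
from pathlib import Path

THRESHOLDS = {
    "faithfulness": 0.80,
    "answer_relevancy": 0.75,
    "context_precision": 0.70,
}

def main(path: str) -> int:
    # Assumes the scores sit at the top level of the results file;
    # adapt to your real report format.
    results = json.loads(Path(path).read_text())
    failed = False
    for metric, threshold in THRESHOLDS.items():
        score = results.get(metric, 0.0)
        status = "PASS" if score >= threshold else "FAIL"
        print(f"{metric}: {score:.2f} (threshold {threshold:.2f}) {status}")
        failed = failed or score < threshold
    return 1 if failed else 0  # non-zero exit fails the CI job

if __name__ == "__main__":
    sys.exit(main(sys.argv[1]))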

pytest Integration

# tests/test_rag_quality.py
import json
import time
from pathlib import Path

import pytest
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision

def load_test_questions(path: str) -> list[dict]:
    """Load the JSON test set of question/ground-truth pairs."""
    return json.loads(Path(path).read_text())

@pytest.fixture(scope="module")
def rag_results():
    """Run RAG pipeline on test set once per module."""
    from src.rag import RAGPipeline

    pipeline = RAGPipeline()
    test_questions = load_test_questions("tests/data/test_set.json")

    results = []
    for q in test_questions:
        start = time.time()
        output = pipeline.query(q["question"])
        latency_ms = (time.time() - start) * 1000

        results.append({
            "question": q["question"],
            "answer": output["answer"],
            "contexts": output["contexts"],
            "ground_truth": q.get("ground_truth"),
            "latency_ms": latency_ms,  # recorded so the performance tests below have real data
        })

    return results

@pytest.fixture(scope="module")
def ragas_scores(rag_results):
    """Evaluate RAG results with RAGAS."""
    dataset = Dataset.from_dict({
        "question": [r["question"] for r in rag_results],
        "answer": [r["answer"] for r in rag_results],
        "contexts": [r["contexts"] for r in rag_results],
    })

    return evaluate(
        dataset,
        metrics=[faithfulness, answer_relevancy, context_precision],
    )

class TestRAGQuality:
    """RAG quality tests with thresholds."""

    def test_faithfulness_threshold(self, ragas_scores):
        """Answers should be grounded in context."""
        assert ragas_scores["faithfulness"] >= 0.8, \
            f"Faithfulness {ragas_scores['faithfulness']:.2f} below threshold 0.8"

    def test_answer_relevancy_threshold(self, ragas_scores):
        """Answers should address the questions."""
        assert ragas_scores["answer_relevancy"] >= 0.75, \
            f"Relevancy {ragas_scores['answer_relevancy']:.2f} below threshold 0.75"

    def test_context_precision_threshold(self, ragas_scores):
        """Retrieved context should be relevant."""
        assert ragas_scores["context_precision"] >= 0.7, \
            f"Precision {ragas_scores['context_precision']:.2f} below threshold 0.7"

class TestRAGPerformance:
    """RAG performance tests."""

    def test_latency_p95(self, rag_results):
        """95th percentile latency should be under 3 seconds."""
        latencies = [r.get("latency_ms", 0) for r in rag_results]
        p95 = sorted(latencies)[int(len(latencies) * 0.95)]

        assert p95 < 3000, f"P95 latency {p95}ms exceeds 3000ms threshold"

Regression Detection

import json
from pathlib import Path

class RegressionDetector:
    """Detect quality regressions between runs."""

    def __init__(self, history_path: str):
        self.history_path = Path(history_path)
        self.history = self._load_history()

    def _load_history(self) -> list[dict]:
        if self.history_path.exists():
            return json.loads(self.history_path.read_text())
        return []

    def record_run(self, result: PipelineResult):
        """Record a new test run."""
        self.history.append({
            "timestamp": result.timestamp,
            "metrics": result.metrics,
            "avg_latency_ms": result.avg_latency_ms,
        })
        self.history_path.write_text(json.dumps(self.history, indent=2))

    def check_regression(
        self,
        current: dict,
        threshold: float = 0.05
    ) -> list[dict]:
        """Check if current metrics regressed from baseline."""

        if len(self.history) < 5:
            return []  # Not enough history

        # Use last 5 runs as baseline
        baseline = {}
        for metric in current.keys():
            values = [h["metrics"].get(metric, 0) for h in self.history[-5:]]
            baseline[metric] = sum(values) / len(values)

        regressions = []
        for metric, current_value in current.items():
            baseline_value = baseline.get(metric, 0)

            if baseline_value > 0:
                change = (current_value - baseline_value) / baseline_value

                if change < -threshold:  # More than 5% regression
                    regressions.append({
                        "metric": metric,
                        "baseline": baseline_value,
                        "current": current_value,
                        "change_pct": change * 100,
                    })

        return regressions

# Usage in pipeline
detector = RegressionDetector("test_results/history.json")

result = pipeline.run()
detector.record_run(result)

regressions = detector.check_regression(result.metrics)
if regressions:
    print("REGRESSION DETECTED!")
    for r in regressions:
        print(f"  {r['metric']}: {r['baseline']:.2f} -> {r['current']:.2f} ({r['change_pct']:.1f}%)")

Alerting and Monitoring

import os

import requests

class AlertManager:
    """Send alerts for test failures."""

    def __init__(self, slack_webhook: str | None = None, email_config: dict | None = None):
        self.slack_webhook = slack_webhook
        self.email_config = email_config

    def alert_failure(self, result: PipelineResult):
        """Send alerts for failed tests."""

        if result.passed:
            return

        message = self._format_message(result)

        if self.slack_webhook:
            self._send_slack(message)

        if self.email_config:
            self._send_email(message)

    def _format_message(self, result: PipelineResult) -> str:
        failures = "\n".join([
            f"• {f['metric']}: {f['actual']:.2f} (expected {f['expected']})"
            for f in result.failures
        ])

        return f"""RAG Quality Alert

Test run failed at {result.timestamp}

Failures:
{failures}

Metrics:
• Faithfulness: {result.metrics.get('faithfulness', 0):.2f}
• Relevancy: {result.metrics.get('answer_relevancy', 0):.2f}
• Precision: {result.metrics.get('context_precision', 0):.2f}
• Avg Latency: {result.avg_latency_ms:.0f}ms
"""

    def _send_slack(self, message: str):
        requests.post(self.slack_webhook, json={"text": message})
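
    def _send_email(self, message: str):
        # Hypothetical sketch: send the alert via SMTP using the standard
        # library. The email_config keys used here (from, to, host, port,
        # user, password) are assumptions -- adapt them to your own config.
        import smtplib
        from email.message import EmailMessage

        msg = EmailMessage()
        msg["Subject"] = "RAG Quality Alert"
        msg["From"] = self.email_config["from"]
        msg["To"] = self.email_config["to"]
        msg.set_content(message)

        with smtplib.SMTP(self.email_config["host"], self.email_config.get("port", 587)) as smtp:
            smtp.starttls()
            smtp.login(self.email_config["user"], self.email_config["password"])
            smtp.send_message(msg)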

# Integration
alert_manager = AlertManager(
    slack_webhook=os.environ.get("SLACK_WEBHOOK"),
)

result = pipeline.run()
alert_manager.alert_failure(result)

Best Practices

Aspect          Recommendation
--------------  --------------------------------------------------------
Frequency       Run on every PR + daily scheduled run
Test set size   50-100 questions (balance cost vs. coverage)
Thresholds      Start conservative, tighten over time
History         Keep 30+ days for trend analysis
Alerts          Alert on failures AND regressions
Cost            Use cheaper models for frequent runs (see sketch below)
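
On the cost point: RAGAS lets you pass your own judge LLM to evaluate(), so frequent CI runs can use a cheaper model than your nightly full evaluation. A sketch, assuming a recent ragas release and the langchain-openai package; the wrapper API varies between versions, so check your installed release.

# Sketch: use a cheaper judge model for frequent runs. Assumes a recent
# ragas release and langchain-openai; the exact wrapper API may differ.
from langchain_openai import ChatOpenAI
from ragas import evaluate
from ragas.llms import LangchainLLMWrapper
from ragas.metrics import faithfulness, answer_relevancy, context_precision

cheap_judge = LangchainLLMWrapper(ChatOpenAI(model="gpt-4o-mini"))

scores = evaluate(
    eval_dataset,  # the Dataset built earlier in the lesson
    metrics=[faithfulness, answer_relevancy, context_precision],
    llm=cheap_judge,
)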

Key Insight: Automated testing catches issues early, but review failures manually. Low scores might indicate test set problems, not RAG problems. Update your test set when you find false positives.

Next module: Production RAG Systems - optimization, reliability, and monitoring.
