LLM Application Architecture
LLM API Design Patterns
4 min read
How you design your LLM APIs significantly impacts user experience, cost, and scalability. This lesson covers the essential patterns every AI engineer should know.
Synchronous vs. Asynchronous
Synchronous Pattern
# Simple but blocks the client
@app.post("/chat")
async def chat(request: ChatRequest):
    response = await llm.complete(request.message)  # Blocks for 1-5s
    return {"response": response}
Use when:
- Response time < 3 seconds
- Simple, single-turn interactions
- Low traffic applications
Asynchronous Pattern
# Non-blocking with job polling
@app.post("/chat")
async def chat(request: ChatRequest):
    job_id = await queue.enqueue(process_chat, request)
    return {"job_id": job_id, "status": "processing"}

@app.get("/chat/{job_id}")
async def get_result(job_id: str):
    result = await queue.get_result(job_id)
    if result is None:
        return {"status": "processing"}
    return {"status": "complete", "response": result}
Use when:
- Response time > 5 seconds
- Complex multi-step workflows
- Need retry/recovery capability
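The queue object above is left abstract; in production it is usually Redis/RQ, Celery, or a managed task queue. For local experimentation, a minimal in-memory stand-in with the same enqueue/get_result shape (the names mirror the sketch above, not a real library API) could look like this:

import asyncio
import uuid

class InMemoryQueue:
    def __init__(self):
        self.results = {}   # job_id -> result, present only once finished
        self.tasks = set()  # keep references so background tasks aren't GC'd

    async def enqueue(self, func, *args):
        job_id = str(uuid.uuid4())

        async def run():
            self.results[job_id] = await func(*args)

        task = asyncio.create_task(run())
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)
        return job_id

    async def get_result(self, job_id):
        return self.results.get(job_id)  # None while still processing

queue = InMemoryQueue()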
Streaming Pattern
The most important pattern for modern AI applications.
import json

from fastapi.responses import StreamingResponse

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        async for chunk in llm.stream(request.message):
            yield f"data: {json.dumps({'chunk': chunk})}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
Client-side handling:
// JavaScript EventSource example
// Note: EventSource only issues GET requests, so expose the stream on a GET
// route (or use fetch() with a ReadableStream reader to send a POST body).
const eventSource = new EventSource('/chat/stream');
eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  const data = JSON.parse(event.data);
  appendToUI(data.chunk);
};
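A Python client can consume the same stream; a sketch using httpx (assumes the /chat/stream endpoint above is running locally):

import json
import httpx

async def consume_stream(message: str):
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8000/chat/stream", json={"message": message}
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[len("data: "):]
                if payload == "[DONE]":
                    break
                print(json.loads(payload)["chunk"], end="", flush=True)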
Why streaming matters:
- Time to first token: 200ms vs 3000ms for complete response
- Users see progress immediately
- Better perceived performance
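A quick way to see the difference yourself is to time the first chunk against the full response (a sketch; assumes the llm.stream() interface used above):

import time

async def measure_latency(message: str):
    start = time.perf_counter()
    first_token = None
    async for _chunk in llm.stream(message):
        if first_token is None:
            first_token = time.perf_counter() - start
    total = time.perf_counter() - start
    print(f"first token after {first_token:.2f}s, full response after {total:.2f}s")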
Request Batching
Group multiple requests to reduce overhead.
import asyncio

class BatchProcessor:
    def __init__(self, max_batch_size=10, max_wait_ms=100):
        self.batch = []  # pending (request, future) pairs
        self.max_size = max_batch_size
        self.max_wait = max_wait_ms
        self.lock = asyncio.Lock()

    async def process(self, request):
        # Each caller registers a future and awaits only its own result
        future = asyncio.get_running_loop().create_future()
        async with self.lock:
            self.batch.append((request, future))
            if len(self.batch) >= self.max_size:
                await self._flush()
        try:
            # Wait up to max_wait for the batch to fill before forcing a flush
            return await asyncio.wait_for(asyncio.shield(future), self.max_wait / 1000)
        except asyncio.TimeoutError:
            async with self.lock:
                if not future.done() and self.batch:
                    await self._flush()
            return await future

    async def _flush(self):
        batch, self.batch = self.batch, []
        # Process all requests in one LLM call, then fan results back out
        combined = self._combine_requests([req for req, _ in batch])
        response = await llm.complete(combined)
        results = self._split_response(response, len(batch))
        for (_, fut), result in zip(batch, results):
            fut.set_result(result)
Trade-off:
| Approach | Latency | Throughput | Cost |
|---|---|---|---|
| Individual | Low | Low | High |
| Batched | Medium | High | Low |
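Wiring the batcher behind an endpoint might look like this (the /chat/batched route name is illustrative; assumes the BatchProcessor and ChatRequest defined earlier):

batcher = BatchProcessor(max_batch_size=10, max_wait_ms=100)

@app.post("/chat/batched")
async def chat_batched(request: ChatRequest):
    # Each caller awaits only its own slice of the batched completion
    response = await batcher.process(request.message)
    return {"response": response}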
API Versioning
Always version your AI APIs—models and prompts evolve.
from fastapi import Header

# Version in URL
@app.post("/v1/chat")
async def chat_v1(request: ChatRequest):
    return await process_with_gpt35(request)

@app.post("/v2/chat")
async def chat_v2(request: ChatRequest):
    return await process_with_gpt4(request)

# Or version in header
@app.post("/chat")
async def chat(
    request: ChatRequest,
    api_version: str = Header(default="2024-01")
):
    if api_version == "2024-01":
        return await process_v1(request)
    return await process_v2(request)
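With the header variant, clients pin a version explicitly on every call; a small sketch with httpx (the base URL is illustrative; FastAPI maps the api_version parameter to an api-version header):

import httpx

response = httpx.post(
    "http://localhost:8000/chat",
    json={"message": "Hello"},
    headers={"api-version": "2024-01"},
)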
Error Response Design
AI-specific errors need clear communication:
from typing import Optional

from pydantic import BaseModel

class AIErrorResponse(BaseModel):
    error_code: str
    message: str
    retry_after: Optional[int] = None
    fallback_available: bool = False

# Example errors
ERRORS = {
    "RATE_LIMITED": AIErrorResponse(
        error_code="RATE_LIMITED",
        message="Too many requests. Please retry.",
        retry_after=30
    ),
    "CONTEXT_TOO_LONG": AIErrorResponse(
        error_code="CONTEXT_TOO_LONG",
        message="Input exceeds the maximum token limit.",
        fallback_available=True  # Can retry with a shorter context
    ),
    "CONTENT_FILTERED": AIErrorResponse(
        error_code="CONTENT_FILTERED",
        message="Request was filtered for safety.",
        fallback_available=False
    )
}
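One way to surface these is to map provider exceptions onto the error model with a FastAPI exception handler. The sketch below assumes a hypothetical RateLimitError raised by your LLM client and uses Pydantic v2's model_dump() (use .dict() on v1):

from fastapi import Request
from fastapi.responses import JSONResponse

@app.exception_handler(RateLimitError)  # RateLimitError is a stand-in for your client's 429 exception
async def handle_rate_limit(request: Request, exc: Exception):
    error = ERRORS["RATE_LIMITED"]
    return JSONResponse(
        status_code=429,
        content=error.model_dump(),
        headers={"Retry-After": str(error.retry_after)},
    )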
Next, we'll explore prompt management systems for production.