LLM Gateway & Router
Calling OpenAI directly from your application code is like making raw HTTP calls to a database. It works until it does not. An LLM gateway gives you multi-provider routing, automatic failover, rate limiting, cost tracking, and response caching — all behind a single interface.
Why You Need an LLM Gateway
Without a gateway, every service in your application makes direct calls to LLM providers. This creates several production problems:
- Single point of failure: When OpenAI goes down (and it does, regularly), your entire application stops working
- No cost visibility: You cannot tell which feature or team is consuming the most tokens
- No rate control: A traffic spike can exhaust your quota and cause 429 errors across all services
- Vendor lock-in: Switching providers requires changing code in every service
Gateway Architecture
Here is the complete LLM gateway implementation. Every LLM call in your application goes through this gateway.
import asyncio
import hashlib
import json
import time
from collections import defaultdict
from dataclasses import dataclass, field, replace
from enum import Enum
from typing import Optional
class Provider(Enum):
    """LLM backends the gateway can route to.

    The string values are used as tracking identifiers (see
    GatewayResponse.provider). LOCAL has no client implemented in this
    file — the routing loop skips it.
    """
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    LOCAL = "local"
@dataclass
class ModelConfig:
    """Configuration for a specific model: routing target, pricing, limits."""
    provider: Provider  # backend that serves this model
    model_id: str  # provider-side model identifier sent on the API call
    cost_per_input_token: float  # USD per token
    cost_per_output_token: float  # USD per token
    max_tokens: int  # presumably the context window; not read anywhere in this file — TODO confirm
    requests_per_minute: int  # per-model rate limit (token-bucket capacity)
    priority: int = 0  # lower = higher priority; not consulted by the routing code in this file
@dataclass
class GatewayResponse:
    """Standardized response from any provider."""
    content: str  # assistant message text
    model: str  # registry name of the model that served the request
    provider: str  # Provider enum value, e.g. "openai" / "anthropic"
    input_tokens: int  # prompt tokens reported by the provider
    output_tokens: int  # completion tokens reported by the provider
    cost_usd: float  # computed from the registry's per-token pricing
    latency_ms: float  # wall-clock duration of the provider call
    cached: bool = False  # True when served from the semantic cache
# Model registry with pricing (as of 2026).
# Keys are the gateway-facing names accepted by LLMGateway.complete();
# per-token prices are the list price per 1M tokens divided by 1_000_000.
MODEL_REGISTRY = {
    "gpt-4o": ModelConfig(
        provider=Provider.OPENAI, model_id="gpt-4o",
        cost_per_input_token=2.50 / 1_000_000,
        cost_per_output_token=10.00 / 1_000_000,
        max_tokens=128_000, requests_per_minute=500
    ),
    "gpt-4o-mini": ModelConfig(
        provider=Provider.OPENAI, model_id="gpt-4o-mini",
        cost_per_input_token=0.15 / 1_000_000,
        cost_per_output_token=0.60 / 1_000_000,
        max_tokens=128_000, requests_per_minute=1000
    ),
    "claude-sonnet": ModelConfig(
        provider=Provider.ANTHROPIC, model_id="claude-sonnet-4-20250514",
        cost_per_input_token=3.00 / 1_000_000,
        cost_per_output_token=15.00 / 1_000_000,
        max_tokens=200_000, requests_per_minute=400
    ),
    "claude-haiku": ModelConfig(
        provider=Provider.ANTHROPIC, model_id="claude-haiku-4-20250514",
        cost_per_input_token=0.25 / 1_000_000,
        cost_per_output_token=1.25 / 1_000_000,
        max_tokens=200_000, requests_per_minute=1000
    ),
}
Rate Limiter
The rate limiter prevents you from exceeding provider quotas and protects against cost spikes from traffic bursts.
class TokenBucketRateLimiter:
    """Token bucket rate limiter for LLM API calls.

    The bucket refills continuously at ``requests_per_minute / 60`` tokens
    per second and holds at most one minute's worth of requests, allowing
    short bursts up to that capacity.
    """

    def __init__(self, requests_per_minute: int):
        # Refill rate expressed in requests per second.
        self.rate = requests_per_minute / 60.0
        # Burst capacity: one full minute of requests.
        self.max_tokens = requests_per_minute
        # Start with a full bucket so cold starts are never throttled.
        self.tokens = float(requests_per_minute)
        self.last_refill = time.monotonic()

    def _refill(self) -> None:
        """Top the bucket up in proportion to time elapsed since last refill."""
        now = time.monotonic()
        earned = (now - self.last_refill) * self.rate
        self.tokens = min(self.max_tokens, self.tokens + earned)
        self.last_refill = now

    def acquire(self) -> bool:
        """Try to acquire a token. Returns True if allowed."""
        self._refill()
        if self.tokens < 1:
            return False
        self.tokens -= 1
        return True

    def wait_time(self) -> float:
        """Seconds until a token is available."""
        deficit = 1 - self.tokens
        return deficit / self.rate if deficit > 0 else 0
Semantic Cache
The semantic cache stores LLM responses and returns cached results for semantically similar queries. This typically saves 40-60% of LLM costs in production.
class SemanticCache:
    """Cache LLM responses keyed by semantic similarity of the prompt.

    A lookup matches when the cosine similarity between the prompt's
    embedding and a stored entry's embedding meets ``similarity_threshold``
    and the entry is younger than ``ttl_seconds``.

    NOTE(review): entries are scanned linearly and grow without bound until
    evict_expired() is called — callers should invoke it periodically.
    """

    def __init__(self, similarity_threshold: float = 0.95, ttl_seconds: int = 3600):
        self.threshold = similarity_threshold
        self.ttl = ttl_seconds
        # Each entry: {"embedding", "model", "response", "timestamp"}.
        self.entries: list[dict] = []

    def _get_embedding(self, text: str) -> list[float]:
        """Get embedding for cache key (use a cheap, fast model)."""
        from openai import OpenAI
        client = OpenAI()
        response = client.embeddings.create(
            input=text, model="text-embedding-3-small"
        )
        return response.data[0].embedding

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """Cosine similarity of two vectors; 0 when either has zero norm."""
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x * x for x in a) ** 0.5
        norm_b = sum(x * x for x in b) ** 0.5
        return dot / (norm_a * norm_b) if norm_a and norm_b else 0

    def get(self, prompt: str, model: str) -> "Optional[GatewayResponse]":
        """Check cache for a semantically similar prompt.

        Returns a *copy* of the stored response with cached=True, or None on
        a miss. Returning a copy (instead of the stored object, as the
        original code did) prevents the cached=True flag from sticking to the
        cache entry and stops callers from mutating cached state in place.
        """
        # Fast path: an empty cache cannot match, so skip the paid
        # embedding API call entirely.
        if not self.entries:
            return None
        now = time.time()
        prompt_embedding = self._get_embedding(prompt)
        for entry in self.entries:
            # Skip entries past their TTL (evict_expired() removes them for real).
            if now - entry["timestamp"] > self.ttl:
                continue
            # Responses are only reusable for the exact same model.
            if entry["model"] != model:
                continue
            similarity = self._cosine_similarity(prompt_embedding, entry["embedding"])
            if similarity >= self.threshold:
                return replace(entry["response"], cached=True)
        return None

    def put(self, prompt: str, model: str, response: "GatewayResponse"):
        """Store a response in the cache."""
        embedding = self._get_embedding(prompt)
        self.entries.append({
            "embedding": embedding,
            "model": model,
            "response": response,
            "timestamp": time.time()
        })

    def evict_expired(self):
        """Remove expired cache entries."""
        now = time.time()
        self.entries = [e for e in self.entries if now - e["timestamp"] <= self.ttl]
The Complete LLM Gateway
Here is the full gateway that ties together routing, rate limiting, fallbacks, cost tracking, and caching:
class LLMGateway:
    """Production LLM Gateway with routing, fallbacks, caching, and cost tracking.

    Every call flows through complete(), which:
      1. checks the semantic cache,
      2. walks the primary model and its fallbacks in order,
      3. enforces a per-model token-bucket rate limit,
      4. records cost and latency per request for reporting.
    """

    def __init__(self, enable_cache: bool = True):
        # One token-bucket limiter per registered model, keyed by registry name.
        self.rate_limiters: dict[str, TokenBucketRateLimiter] = {}
        self.cache = SemanticCache() if enable_cache else None
        # model name -> cumulative USD actually spent (cache hits excluded).
        self.cost_tracker: dict[str, float] = defaultdict(float)
        # One dict per request; consumed by get_cost_report().
        self.request_log: list[dict] = []
        self._init_rate_limiters()

    def _init_rate_limiters(self):
        """Create a rate limiter for every model in the registry."""
        for model_name, config in MODEL_REGISTRY.items():
            self.rate_limiters[model_name] = TokenBucketRateLimiter(
                config.requests_per_minute
            )

    def _call_openai(self, model_id: str, messages: list[dict],
                     temperature: float, max_tokens: int) -> dict:
        """Call the OpenAI chat completions API; return content and token usage."""
        from openai import OpenAI
        client = OpenAI()
        response = client.chat.completions.create(
            model=model_id, messages=messages,
            temperature=temperature, max_tokens=max_tokens
        )
        return {
            "content": response.choices[0].message.content,
            "input_tokens": response.usage.prompt_tokens,
            "output_tokens": response.usage.completion_tokens
        }

    def _call_anthropic(self, model_id: str, messages: list[dict],
                        temperature: float, max_tokens: int) -> dict:
        """Call the Anthropic messages API; return content and token usage.

        Anthropic takes the system prompt as a separate argument rather than
        as a chat message, so it is split out of the OpenAI-format list here.
        """
        import anthropic
        client = anthropic.Anthropic()
        # Extract system message if present.
        system = ""
        user_messages = []
        for msg in messages:
            if msg["role"] == "system":
                system = msg["content"]
            else:
                user_messages.append(msg)
        response = client.messages.create(
            model=model_id, system=system,
            messages=user_messages,
            temperature=temperature, max_tokens=max_tokens
        )
        return {
            "content": response.content[0].text,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens
        }

    def complete(self, messages: list[dict], model: str = "gpt-4o",
                 temperature: float = 0.7, max_tokens: int = 1024,
                 fallback_models: Optional[list[str]] = None,
                 use_cache: bool = True,
                 metadata: Optional[dict] = None) -> GatewayResponse:
        """Send a completion request through the gateway.

        Args:
            messages: Chat messages in OpenAI format
            model: Primary model to use
            temperature: Sampling temperature
            max_tokens: Max output tokens
            fallback_models: Models to try if primary fails
            use_cache: Whether to check the semantic cache
            metadata: Additional tracking metadata (team, feature, user_id)

        Raises:
            RuntimeError: if the primary model and every fallback fail,
                are unknown, or are rate-limited.
        """
        # Cache key: the serialized message list. sort_keys makes equivalent
        # dicts with different key order hit the same cache entry.
        prompt_key = json.dumps(messages, sort_keys=True)
        # Check cache first.
        if use_cache and self.cache:
            cached = self.cache.get(prompt_key, model)
            if cached:
                self._log_request(model, cached, metadata, from_cache=True)
                return cached
        # Try primary model, then fallbacks, in order.
        models_to_try = [model] + (fallback_models or [])
        for model_name in models_to_try:
            config = MODEL_REGISTRY.get(model_name)
            if not config:
                continue  # unknown model name: skip rather than crash
            # A rate-limited model fails over to the next candidate instead
            # of blocking the caller.
            limiter = self.rate_limiters.get(model_name)
            if limiter and not limiter.acquire():
                continue
            try:
                start = time.monotonic()
                if config.provider == Provider.OPENAI:
                    result = self._call_openai(
                        config.model_id, messages, temperature, max_tokens
                    )
                elif config.provider == Provider.ANTHROPIC:
                    result = self._call_anthropic(
                        config.model_id, messages, temperature, max_tokens
                    )
                else:
                    continue  # e.g. Provider.LOCAL: no client implemented
                latency = (time.monotonic() - start) * 1000
                cost = (
                    result["input_tokens"] * config.cost_per_input_token +
                    result["output_tokens"] * config.cost_per_output_token
                )
                response = GatewayResponse(
                    content=result["content"],
                    model=model_name,
                    provider=config.provider.value,
                    input_tokens=result["input_tokens"],
                    output_tokens=result["output_tokens"],
                    cost_usd=cost,
                    latency_ms=latency
                )
                # Only deterministic (temperature=0) outputs are safe to reuse.
                if use_cache and self.cache and temperature == 0:
                    self.cache.put(prompt_key, model_name, response)
                # Track costs.
                self._log_request(model_name, response, metadata)
                return response
            except Exception as e:
                # Best-effort failover: any provider error moves on to the
                # next model rather than surfacing to the caller.
                print(f"[Gateway] {model_name} failed: {e}")
                continue
        raise RuntimeError(f"All models failed: {models_to_try}")

    def _log_request(self, model: str, response: GatewayResponse,
                     metadata: Optional[dict] = None, from_cache: bool = False):
        """Log request for cost tracking and analytics.

        Cache hits record the cost that *would* have been spent (so savings
        can be reported) but do not add to the actual-spend tracker — the
        original code double-counted cached cost here.
        """
        if not from_cache:
            self.cost_tracker[model] += response.cost_usd
        self.request_log.append({
            "timestamp": time.time(),
            "model": model,
            "input_tokens": response.input_tokens,
            "output_tokens": response.output_tokens,
            "cost_usd": response.cost_usd,
            "latency_ms": response.latency_ms,
            "cached": from_cache,
            **(metadata or {})
        })

    def get_cost_report(self, hours: int = 24) -> dict:
        """Get cost breakdown for the last N hours.

        Cache hits count toward request totals and the savings figure, but
        are excluded from all spend sums (total, per-model, per-feature) and
        from token counts, since no provider call was made for them. The
        original code counted cached cost as both spent and saved.
        """
        cutoff = time.time() - (hours * 3600)
        recent = [r for r in self.request_log if r["timestamp"] > cutoff]
        spent = [r for r in recent if not r.get("cached")]
        by_model = defaultdict(lambda: {"requests": 0, "cost": 0, "tokens": 0})
        by_feature = defaultdict(lambda: {"requests": 0, "cost": 0})
        for r in recent:
            model_stats = by_model[r["model"]]
            feature_stats = by_feature[r.get("feature", "unknown")]
            model_stats["requests"] += 1
            feature_stats["requests"] += 1
            if not r.get("cached"):
                model_stats["cost"] += r["cost_usd"]
                model_stats["tokens"] += r["input_tokens"] + r["output_tokens"]
                feature_stats["cost"] += r["cost_usd"]
        total_cost = sum(r["cost_usd"] for r in spent)
        cache_hits = len(recent) - len(spent)
        return {
            "total_cost_usd": round(total_cost, 4),
            "total_requests": len(recent),
            "cache_hit_rate": round(cache_hits / max(len(recent), 1), 2),
            "cost_saved_by_cache": round(
                sum(r["cost_usd"] for r in recent if r.get("cached")), 4
            ),
            "by_model": dict(by_model),
            "by_feature": dict(by_feature)
        }
# Usage: one request routed through the gateway — primary gpt-4o, with two
# fallbacks tried in order if it errors or is rate-limited.
gateway = LLMGateway(enable_cache=True)
response = gateway.complete(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain microservices vs monolith in 3 sentences."}
    ],
    model="gpt-4o",
    fallback_models=["claude-sonnet", "gpt-4o-mini"],
    temperature=0, # deterministic = cacheable
    metadata={"team": "backend", "feature": "docs_search"}  # tags surfaced by get_cost_report()
)
print(f"Response: {response.content}")
print(f"Cost: ${response.cost_usd:.4f}")
print(f"Cached: {response.cached}")
print(f"Provider: {response.provider}")
# Get cost report for the trailing 24-hour window.
report = gateway.get_cost_report(hours=24)
print(f"\nLast 24h: ${report['total_cost_usd']} across {report['total_requests']} requests")
print(f"Cache hit rate: {report['cache_hit_rate']:.0%}")
Key Takeaways
- An LLM gateway is the single point through which all LLM calls flow. It gives you routing, fallbacks, rate limiting, caching, and cost tracking.
- Always configure fallback models. OpenAI and Anthropic both have regular outages — your app should survive them transparently.
- Semantic caching saves 40-60% of costs in production. Only cache deterministic responses (temperature=0).
- Per-request cost tracking is essential. Tag every request with team, feature, and user to understand where your budget goes.
- Token bucket rate limiting prevents quota exhaustion and cost spikes from traffic bursts.
What Is Next
In the next lesson, we will build the guardrails and safety layer — the component that protects your application from prompt injection, PII leakage, harmful content, and invalid outputs.
Lilly Tech Systems