Intermediate

LLM Gateway & Router

Calling OpenAI directly from your application code is like making raw HTTP calls to a database. It works until it does not. An LLM gateway gives you multi-provider routing, automatic failover, rate limiting, cost tracking, and response caching — all behind a single interface.

Why You Need an LLM Gateway

Without a gateway, every service in your application makes direct calls to LLM providers. This creates several production problems:

  • Single point of failure: When OpenAI goes down (and it does, regularly), your entire application stops working
  • No cost visibility: You cannot tell which feature or team is consuming the most tokens
  • No rate control: A traffic spike can exhaust your quota and cause 429 errors across all services
  • Vendor lock-in: Switching providers requires changing code in every service

Gateway Architecture

We will build the gateway piece by piece: first the shared data structures and model registry, then the rate limiter, the semantic cache, and finally the gateway class that ties them together. Every LLM call in your application goes through this gateway.

import json
import time
from dataclasses import dataclass
from typing import Optional
from collections import defaultdict
from enum import Enum


class Provider(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    LOCAL = "local"


@dataclass
class ModelConfig:
    """Configuration for a specific model."""
    provider: Provider
    model_id: str
    cost_per_input_token: float    # USD per token
    cost_per_output_token: float   # USD per token
    max_tokens: int                # context window size, in tokens
    requests_per_minute: int
    priority: int = 0              # lower = higher priority


@dataclass
class GatewayResponse:
    """Standardized response from any provider."""
    content: str
    model: str
    provider: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    latency_ms: float
    cached: bool = False


# Model registry with pricing (as of 2026)
MODEL_REGISTRY = {
    "gpt-4o": ModelConfig(
        provider=Provider.OPENAI, model_id="gpt-4o",
        cost_per_input_token=2.50 / 1_000_000,
        cost_per_output_token=10.00 / 1_000_000,
        max_tokens=128_000, requests_per_minute=500
    ),
    "gpt-4o-mini": ModelConfig(
        provider=Provider.OPENAI, model_id="gpt-4o-mini",
        cost_per_input_token=0.15 / 1_000_000,
        cost_per_output_token=0.60 / 1_000_000,
        max_tokens=128_000, requests_per_minute=1000
    ),
    "claude-sonnet": ModelConfig(
        provider=Provider.ANTHROPIC, model_id="claude-sonnet-4-20250514",
        cost_per_input_token=3.00 / 1_000_000,
        cost_per_output_token=15.00 / 1_000_000,
        max_tokens=200_000, requests_per_minute=400
    ),
    "claude-haiku": ModelConfig(
        provider=Provider.ANTHROPIC, model_id="claude-haiku-4-20250514",
        cost_per_input_token=0.25 / 1_000_000,
        cost_per_output_token=1.25 / 1_000_000,
        max_tokens=200_000, requests_per_minute=1000
    ),
}
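The pricing arithmetic is worth sanity-checking once by hand. As a quick illustration, here is the cost of a hypothetical request with 1,000 input tokens and 500 output tokens at the gpt-4o-mini rates from the registry above (the token counts are made up for the example):

```python
# Sanity-check the registry's pricing arithmetic for a hypothetical request.
# Prices mirror the gpt-4o-mini entry above: $0.15 / $0.60 per million tokens.
cost_per_input_token = 0.15 / 1_000_000
cost_per_output_token = 0.60 / 1_000_000

input_tokens, output_tokens = 1_000, 500
cost = (input_tokens * cost_per_input_token
        + output_tokens * cost_per_output_token)

print(f"${cost:.6f}")  # $0.000450
```

Fractions of a tenth of a cent per call look negligible, which is exactly why per-request tracking matters: at millions of requests the totals are anything but.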

Rate Limiter

The rate limiter prevents you from exceeding provider quotas and protects against cost spikes from traffic bursts.

class TokenBucketRateLimiter:
    """Token bucket rate limiter for LLM API calls."""

    def __init__(self, requests_per_minute: int):
        self.rate = requests_per_minute / 60.0  # requests per second
        self.max_tokens = requests_per_minute
        self.tokens = float(requests_per_minute)
        self.last_refill = time.monotonic()

    def acquire(self) -> bool:
        """Try to acquire a token. Returns True if allowed."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.max_tokens, self.tokens + elapsed * self.rate)
        self.last_refill = now

        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

    def wait_time(self) -> float:
        """Seconds until a token is available."""
        if self.tokens >= 1:
            return 0
        return (1 - self.tokens) / self.rate
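To see the bucket in action, here is a small standalone demo (the class is repeated in condensed form so the snippet runs on its own). A bucket sized for 6 requests per minute starts full, allows a burst of 6 immediate requests, and throttles the 7th until tokens refill:

```python
import time

# Condensed copy of TokenBucketRateLimiter from above, so this snippet
# runs standalone.
class TokenBucketRateLimiter:
    def __init__(self, requests_per_minute: int):
        self.rate = requests_per_minute / 60.0   # refill rate, tokens/sec
        self.max_tokens = requests_per_minute    # burst capacity
        self.tokens = float(requests_per_minute) # bucket starts full
        self.last_refill = time.monotonic()

    def acquire(self) -> bool:
        now = time.monotonic()
        self.tokens = min(self.max_tokens,
                          self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

limiter = TokenBucketRateLimiter(requests_per_minute=6)
results = [limiter.acquire() for _ in range(7)]
print(results)  # the first 6 succeed; the 7th is throttled
```

Note the design choice: because the bucket starts full and refills continuously, short bursts up to the per-minute quota are allowed through at full speed, while sustained traffic is smoothed to the average rate.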

Semantic Cache

The semantic cache stores LLM responses and returns cached results for semantically similar queries. In production workloads with repetitive traffic, this commonly saves 40-60% of LLM costs.

class SemanticCache:
    """Cache LLM responses by semantic similarity of the prompt."""

    def __init__(self, similarity_threshold: float = 0.95, ttl_seconds: int = 3600):
        self.threshold = similarity_threshold
        self.ttl = ttl_seconds
        self.entries: list[dict] = []

    def _get_embedding(self, text: str) -> list[float]:
        """Get embedding for cache key (use a cheap, fast model)."""
        from openai import OpenAI
        client = OpenAI()
        response = client.embeddings.create(
            input=text, model="text-embedding-3-small"
        )
        return response.data[0].embedding

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x * x for x in a) ** 0.5
        norm_b = sum(x * x for x in b) ** 0.5
        return dot / (norm_a * norm_b) if norm_a and norm_b else 0

    def get(self, prompt: str, model: str) -> Optional[GatewayResponse]:
        """Check cache for a semantically similar prompt."""
        now = time.time()
        prompt_embedding = self._get_embedding(prompt)

        for entry in self.entries:
            # Check TTL
            if now - entry["timestamp"] > self.ttl:
                continue
            # Check model match
            if entry["model"] != model:
                continue
            # Check semantic similarity
            similarity = self._cosine_similarity(prompt_embedding, entry["embedding"])
            if similarity >= self.threshold:
                response = entry["response"]
                response.cached = True
                return response

        return None

    def put(self, prompt: str, model: str, response: GatewayResponse):
        """Store a response in the cache."""
        embedding = self._get_embedding(prompt)
        self.entries.append({
            "embedding": embedding,
            "model": model,
            "response": response,
            "timestamp": time.time()
        })

    def evict_expired(self):
        """Remove expired cache entries."""
        now = time.time()
        self.entries = [e for e in self.entries if now - e["timestamp"] <= self.ttl]
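The class above calls the OpenAI embeddings API, which makes it awkward to exercise offline. The following sketch shows the same lookup logic with a toy stand-in: a character-frequency vector instead of a real embedding, and hypothetical `cache_put`/`cache_get` helpers in place of the class. It only illustrates the similarity-threshold mechanics, not production-quality matching:

```python
from typing import Optional

def toy_embedding(text: str) -> list[float]:
    """Toy stand-in for a real embedding: letter-frequency vector."""
    vec = [0.0] * 26
    for ch in text.lower():
        if "a" <= ch <= "z":
            vec[ord(ch) - ord("a")] += 1.0
    return vec

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = sum(x * x for x in a) ** 0.5
    nb = sum(x * x for x in b) ** 0.5
    return dot / (na * nb) if na and nb else 0.0

entries: list[tuple[list[float], str]] = []  # (embedding, cached response)

def cache_put(prompt: str, response: str) -> None:
    entries.append((toy_embedding(prompt), response))

def cache_get(prompt: str, threshold: float = 0.95) -> Optional[str]:
    emb = toy_embedding(prompt)
    for stored_emb, response in entries:
        if cosine(emb, stored_emb) >= threshold:
            return response
    return None

cache_put("explain microservices", "Microservices split an app ...")
print(cache_get("explain microservices"))     # identical prompt -> cache hit
print(cache_get("what is a database index"))  # dissimilar prompt -> None
```

The threshold is the key tuning knob: too low and you serve wrong answers to merely related questions; too high and near-duplicate prompts miss the cache.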

The Complete LLM Gateway

Here is the full gateway that ties together routing, rate limiting, fallbacks, cost tracking, and caching:

class LLMGateway:
    """Production LLM Gateway with routing, fallbacks, caching, and cost tracking."""

    def __init__(self, enable_cache: bool = True):
        self.rate_limiters: dict[str, TokenBucketRateLimiter] = {}
        self.cache = SemanticCache() if enable_cache else None
        self.cost_tracker: dict[str, float] = defaultdict(float)  # model -> total cost
        self.request_log: list[dict] = []
        self._init_rate_limiters()

    def _init_rate_limiters(self):
        for model_name, config in MODEL_REGISTRY.items():
            self.rate_limiters[model_name] = TokenBucketRateLimiter(
                config.requests_per_minute
            )

    def _call_openai(self, model_id: str, messages: list[dict],
                     temperature: float, max_tokens: int) -> dict:
        from openai import OpenAI
        client = OpenAI()
        response = client.chat.completions.create(
            model=model_id, messages=messages,
            temperature=temperature, max_tokens=max_tokens
        )
        return {
            "content": response.choices[0].message.content,
            "input_tokens": response.usage.prompt_tokens,
            "output_tokens": response.usage.completion_tokens
        }

    def _call_anthropic(self, model_id: str, messages: list[dict],
                        temperature: float, max_tokens: int) -> dict:
        import anthropic
        client = anthropic.Anthropic()

        # Extract system message if present
        system = ""
        user_messages = []
        for msg in messages:
            if msg["role"] == "system":
                system = msg["content"]
            else:
                user_messages.append(msg)

        response = client.messages.create(
            model=model_id, system=system,
            messages=user_messages,
            temperature=temperature, max_tokens=max_tokens
        )
        return {
            "content": response.content[0].text,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens
        }

    def complete(self, messages: list[dict], model: str = "gpt-4o",
                 temperature: float = 0.7, max_tokens: int = 1024,
                 fallback_models: Optional[list[str]] = None,
                 use_cache: bool = True,
                 metadata: Optional[dict] = None) -> GatewayResponse:
        """Send a completion request through the gateway.

        Args:
            messages: Chat messages in OpenAI format
            model: Primary model to use
            temperature: Sampling temperature
            max_tokens: Max output tokens
            fallback_models: Models to try if primary fails
            use_cache: Whether to check the semantic cache
            metadata: Additional tracking metadata (team, feature, user_id)
        """
        # Build the full prompt for caching
        prompt_key = json.dumps(messages)

        # Check cache first. Only deterministic (temperature=0) responses are
        # cached, so only deterministic requests should be served from cache.
        if use_cache and self.cache and temperature == 0:
            cached = self.cache.get(prompt_key, model)
            if cached:
                self._log_request(model, cached, metadata, from_cache=True)
                return cached

        # Try primary model, then fallbacks
        models_to_try = [model] + (fallback_models or [])

        for model_name in models_to_try:
            config = MODEL_REGISTRY.get(model_name)
            if not config:
                continue

            # Check rate limit
            limiter = self.rate_limiters.get(model_name)
            if limiter and not limiter.acquire():
                continue  # Try next model

            try:
                start = time.monotonic()

                if config.provider == Provider.OPENAI:
                    result = self._call_openai(
                        config.model_id, messages, temperature, max_tokens
                    )
                elif config.provider == Provider.ANTHROPIC:
                    result = self._call_anthropic(
                        config.model_id, messages, temperature, max_tokens
                    )
                else:
                    continue

                latency = (time.monotonic() - start) * 1000
                cost = (
                    result["input_tokens"] * config.cost_per_input_token +
                    result["output_tokens"] * config.cost_per_output_token
                )

                response = GatewayResponse(
                    content=result["content"],
                    model=model_name,
                    provider=config.provider.value,
                    input_tokens=result["input_tokens"],
                    output_tokens=result["output_tokens"],
                    cost_usd=cost,
                    latency_ms=latency
                )

                # Cache the response
                if use_cache and self.cache and temperature == 0:
                    self.cache.put(prompt_key, model_name, response)

                # Track costs
                self._log_request(model_name, response, metadata)
                return response

            except Exception as e:
                print(f"[Gateway] {model_name} failed: {e}")
                continue

        raise RuntimeError(f"All models failed: {models_to_try}")

    def _log_request(self, model: str, response: GatewayResponse,
                     metadata: Optional[dict] = None, from_cache: bool = False):
        """Log request for cost tracking and analytics."""
        if not from_cache:  # cache hits cost nothing extra; don't double-count
            self.cost_tracker[model] += response.cost_usd
        self.request_log.append({
            "timestamp": time.time(),
            "model": model,
            "input_tokens": response.input_tokens,
            "output_tokens": response.output_tokens,
            "cost_usd": response.cost_usd,
            "latency_ms": response.latency_ms,
            "cached": from_cache,
            **(metadata or {})
        })

    def get_cost_report(self, hours: int = 24) -> dict:
        """Get cost breakdown for the last N hours."""
        cutoff = time.time() - (hours * 3600)
        recent = [r for r in self.request_log if r["timestamp"] > cutoff]

        by_model = defaultdict(lambda: {"requests": 0, "cost": 0, "tokens": 0})
        by_feature = defaultdict(lambda: {"requests": 0, "cost": 0})

        for r in recent:
            # Cache hits are logged with the original call's cost so savings
            # can be reported; exclude them from actual spend.
            spend = 0.0 if r.get("cached") else r["cost_usd"]
            by_model[r["model"]]["requests"] += 1
            by_model[r["model"]]["cost"] += spend
            by_model[r["model"]]["tokens"] += r["input_tokens"] + r["output_tokens"]

            feature = r.get("feature", "unknown")
            by_feature[feature]["requests"] += 1
            by_feature[feature]["cost"] += spend

        total_cost = sum(r["cost_usd"] for r in recent if not r.get("cached"))
        cache_hits = sum(1 for r in recent if r.get("cached"))

        return {
            "total_cost_usd": round(total_cost, 4),
            "total_requests": len(recent),
            "cache_hit_rate": round(cache_hits / max(len(recent), 1), 2),
            "cost_saved_by_cache": round(
                sum(r["cost_usd"] for r in recent if r.get("cached")), 4
            ),
            "by_model": dict(by_model),
            "by_feature": dict(by_feature)
        }


# Usage
gateway = LLMGateway(enable_cache=True)

response = gateway.complete(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain microservices vs monolith in 3 sentences."}
    ],
    model="gpt-4o",
    fallback_models=["claude-sonnet", "gpt-4o-mini"],
    temperature=0,  # deterministic = cacheable
    metadata={"team": "backend", "feature": "docs_search"}
)

print(f"Response: {response.content}")
print(f"Cost: ${response.cost_usd:.4f}")
print(f"Cached: {response.cached}")
print(f"Provider: {response.provider}")

# Get cost report
report = gateway.get_cost_report(hours=24)
print(f"\nLast 24h: ${report['total_cost_usd']} across {report['total_requests']} requests")
print(f"Cache hit rate: {report['cache_hit_rate']:.0%}")
💡 Apply at work: Start by routing all LLM calls through a gateway, even if you only use one provider. The cost tracking alone pays for the engineering investment within the first month. Most teams discover that 20% of their features consume 80% of their LLM budget once they have visibility.

Key Takeaways

  • An LLM gateway is the single point through which all LLM calls flow. It gives you routing, fallbacks, rate limiting, caching, and cost tracking.
  • Always configure fallback models. OpenAI and Anthropic both have regular outages — your app should survive them transparently.
  • Semantic caching saves 40-60% of costs in production. Only cache deterministic responses (temperature=0).
  • Per-request cost tracking is essential. Tag every request with team, feature, and user to understand where your budget goes.
  • Token bucket rate limiting prevents quota exhaustion and cost spikes from traffic bursts.

What's Next

In the next lesson, we will build the guardrails and safety layer — the component that protects your application from prompt injection, PII leakage, harmful content, and invalid outputs.