Caching & Performance
Caching is the single highest-ROI optimization in an AI gateway. Organizations routinely ask the same questions, use identical system prompts, and repeat similar queries. A gateway cache can reduce API costs by 30-60% and cut response times from seconds to milliseconds for cache hits.
Exact Match vs Semantic Caching
| Strategy | How It Works | Hit Rate | Use Case |
|---|---|---|---|
| Exact Match | SHA-256 hash of full request body, lookup in Redis | 10-30% | Templated prompts, batch jobs, repeated classification tasks |
| Semantic | Embed the prompt, vector-search for similar cached prompts | 20-50% | Customer support, search queries, FAQ-type questions |
Exact Match Cache
Start with exact match caching — it is simpler, faster, and has zero false positives:
import hashlib
import json
import time
import redis.asyncio as aioredis
class ExactMatchCache:
"""Cache LLM responses by exact request hash in Redis."""
def __init__(self, redis_url: str, default_ttl: int = 3600):
self.redis = aioredis.from_url(redis_url)
self.ttl = default_ttl
def _cache_key(self, body: dict) -> str:
"""Deterministic cache key from request body."""
# Only include fields that affect the response
relevant = {
"model": body.get("model"),
"messages": body.get("messages"),
"temperature": body.get("temperature", 1.0),
"max_tokens": body.get("max_tokens"),
"tools": body.get("tools"),
}
serialized = json.dumps(
{k: v for k, v in relevant.items() if v is not None},
sort_keys=True
)
return f"llm:exact:{hashlib.sha256(serialized.encode()).hexdigest()}"
async def get(self, body: dict) -> dict | None:
"""Check cache. Returns None on miss."""
# Only cache deterministic requests
if body.get("temperature", 1.0) > 0.5:
return None
if body.get("stream"):
return None
key = self._cache_key(body)
cached = await self.redis.get(key)
if cached:
result = json.loads(cached)
result["_cached"] = True
result["_cache_type"] = "exact"
return result
return None
async def set(self, body: dict, response: dict, ttl: int = None):
"""Cache a successful response."""
if body.get("temperature", 1.0) > 0.5 or body.get("stream"):
return
if "error" in response:
return
key = self._cache_key(body)
cache_data = {
"choices": response.get("choices"),
"usage": response.get("usage"),
"model": response.get("model"),
"_cached_at": time.time(),
}
await self.redis.setex(key, ttl or self.ttl, json.dumps(cache_data))
Semantic Cache
Semantic caching catches paraphrased queries. "What is the capital of France?" and "Tell me the capital city of France" return the same cached answer:
from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
class SemanticCache:
"""Cache LLM responses using embedding similarity."""
def __init__(
self,
qdrant_url: str = "http://localhost:6333",
threshold: float = 0.95, # Similarity threshold (0.95 = very similar)
ttl: int = 3600,
):
self.qdrant = QdrantClient(url=qdrant_url)
self.embedder = OpenAI()
self.threshold = threshold
self.ttl = ttl
self.collection = "gateway_semantic_cache"
self._init_collection()
def _init_collection(self):
collections = [c.name for c in self.qdrant.get_collections().collections]
if self.collection not in collections:
self.qdrant.create_collection(
collection_name=self.collection,
vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
)
def _embed(self, text: str) -> list[float]:
return self.embedder.embeddings.create(
input=text[:2000], # Limit embedding input
model="text-embedding-3-small"
).data[0].embedding
def _prompt_text(self, body: dict) -> str:
"""Extract cacheable text from request."""
parts = []
for msg in body.get("messages", []):
if msg["role"] in ("system", "user"):
content = msg.get("content", "")
if isinstance(content, str):
parts.append(content)
return " ".join(parts)[-2000:]
async def get(self, body: dict) -> dict | None:
"""Find semantically similar cached response."""
if body.get("temperature", 1.0) > 0.3:
return None # Only for very deterministic requests
text = self._prompt_text(body)
embedding = self._embed(text)
results = self.qdrant.search(
collection_name=self.collection,
query_vector=embedding,
limit=1,
score_threshold=self.threshold
)
if results:
cached = results[0].payload
# Check TTL
if time.time() - cached["cached_at"] > self.ttl:
return None # Expired
response = json.loads(cached["response_json"])
response["_cached"] = True
response["_cache_type"] = "semantic"
response["_similarity"] = round(results[0].score, 4)
return response
return None
async def set(self, body: dict, response: dict):
if body.get("temperature", 1.0) > 0.3 or "error" in response:
return
text = self._prompt_text(body)
embedding = self._embed(text)
point_id = hashlib.md5(text.encode()).hexdigest()
self.qdrant.upsert(
collection_name=self.collection,
points=[PointStruct(
id=point_id,
vector=embedding,
payload={
"model": body.get("model"),
"prompt_preview": text[:200],
"response_json": json.dumps({
"choices": response.get("choices"),
"usage": response.get("usage"),
"model": response.get("model"),
}),
"cached_at": time.time(),
}
)]
)
Response Streaming Through Gateway
Streaming is critical for user-facing applications. The gateway must forward tokens as they arrive while tracking usage for cost and rate limiting:
import uuid

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """Gateway chat-completions endpoint.

    Non-streaming requests are proxied as a normal JSON round trip.
    Streaming requests are forwarded as SSE events the moment they arrive,
    while content/usage is accumulated so cost can be recorded after the
    stream completes.
    """
    body = await request.json()
    gw_key = request.headers.get("Authorization", "").replace("Bearer ", "")

    # Auth, rate limit, PII filter, cache check (same as non-streaming)
    caller = key_manager.validate(gw_key)
    endpoint = router.select(body["model"])[0]

    if not body.get("stream"):
        # Non-streaming: standard proxy
        return await proxy_request(endpoint, body)

    # Streaming: forward tokens as they arrive
    async def stream_generator():
        full_content = []
        usage_data = {}
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"{endpoint.base_url}/chat/completions",
                headers={
                    "Authorization": f"Bearer {endpoint.api_key}",
                    "Content-Type": "application/json",
                },
                json=body,
                timeout=120.0
            ) as resp:
                async for line in resp.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    yield line + "\n\n"  # Forward SSE event immediately

                    # Best-effort parse for usage tracking. A malformed or
                    # unexpected chunk must never break the client's stream,
                    # since bytes for this event were already yielded.
                    data = line[6:]
                    if data == "[DONE]":
                        continue
                    try:
                        chunk = json.loads(data)
                    except (ValueError, TypeError):
                        continue
                    # `choices` can be empty — e.g. the final usage-only
                    # chunk when stream_options.include_usage is enabled —
                    # so guard before indexing.
                    choices = chunk.get("choices") or [{}]
                    delta = choices[0].get("delta", {})
                    if delta.get("content"):
                        full_content.append(delta["content"])
                    if chunk.get("usage"):
                        usage_data = chunk["usage"]

        # After stream completes: record cost. Fall back to a rough
        # ~4-chars-per-token estimate when the provider sent no usage data.
        output_tokens = usage_data.get(
            "completion_tokens", len("".join(full_content)) // 4
        )
        input_tokens = usage_data.get("prompt_tokens", 0)
        cost = calculate_request_cost(body["model"], input_tokens, output_tokens)
        await budget_enforcer.record_spend(caller.team_id, cost["total_cost"])

    return StreamingResponse(
        stream_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Gateway-Request-Id": str(uuid.uuid4()),
        }
    )
Latency Optimization
Target: gateway overhead under 5ms for cache hits, under 15ms for routed requests:
# 1. Connection pooling (biggest single win)
# Reuse HTTP connections to providers - saves 50-100ms per request
# (skips the TCP + TLS handshake on every call after the first).
provider_pool = httpx.AsyncClient(
    # Cap total concurrent connections; keep up to 20 idle ones warm for reuse.
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
    http2=True,  # HTTP/2 multiplexing — NOTE(review): requires the httpx[http2] extra (h2 package); confirm it is installed
    timeout=httpx.Timeout(60.0, connect=5.0)  # 5s to connect, 60s overall
)
# 2. Parallel pre-checks (auth + rate limit + cache in parallel)
import asyncio
async def pre_flight(body, api_key):
    """Run auth validation, rate-limit check, and cache lookup concurrently.

    Total latency is the max() of the three checks instead of their sum.
    Returns the (auth, rate, cached) results as a tuple.
    """
    checks = (
        key_manager.validate_async(api_key),
        rate_limiter.check_async(api_key),
        cache.get(body),
    )
    auth_result, rate_result, cache_hit = await asyncio.gather(*checks)
    return auth_result, rate_result, cache_hit
# Takes max(auth_time, rate_time, cache_time) instead of sum
# 3. Fast serialization with orjson (10x faster than stdlib json)
import orjson
# NOTE: orjson.dumps returns bytes, not str — callers expecting stdlib
# json.dumps output must .decode() before string operations.
data = orjson.dumps(response) # 50us vs 500us
# 4. In-memory caching for hot data
from functools import lru_cache
@lru_cache(maxsize=500)
def get_pricing(model: str):
    """Look up the pricing entry for `model` (None if unknown), memoized."""
    try:
        return PRICING[model]
    except KeyError:
        return None
# Gateway latency budget:
# Key validation: < 0.5ms (Redis hash lookup)
# Rate limit: < 0.5ms (Redis Lua script)
# PII regex scan: < 2ms (compiled patterns)
# Cache lookup: < 1ms (Redis GET) or < 3ms (vector search)
# Routing logic: < 0.1ms (in-memory)
# ─────────────────────────────
# Total overhead: < 5ms (cache hit path)
Cache Hit Rate Monitoring
Track cache performance to validate that your caching strategy is working and to detect regressions:
class CacheMetrics:
    """Track and report cache hit/miss counts in Redis, bucketed by UTC day."""

    # NOTE(review): assumes `from datetime import datetime, timezone` at
    # module level — not visible in this section.

    def __init__(self, redis_url):
        # decode_responses=True is required here: without it, hgetall returns
        # bytes keys (b"hit"), so the str lookups in get_daily_report would
        # always fall through to their defaults and report a 0% hit rate.
        self.redis = aioredis.from_url(redis_url, decode_responses=True)

    async def record(self, hit: bool, cache_type: str, model: str, team: str):
        """Increment today's counters for one cache lookup.

        Writes both a fine-grained counter (type/model/team breakdown) and a
        global hit/miss counter used for the daily hit-rate report.
        """
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        key = f"cache:metrics:{today}"
        outcome = "hit" if hit else "miss"
        field = f"{outcome}:{cache_type}:{model}:{team}"
        await self.redis.hincrby(key, field, 1)
        await self.redis.hincrby(key, outcome, 1)
        # Keep daily buckets for 7 days so day-over-day drops are detectable.
        await self.redis.expire(key, 604800)  # 7 days

    async def get_daily_report(self, date: str | None = None) -> dict:
        """Summarize hit rate for `date` (YYYY-MM-DD; default: today, UTC)."""
        date = date or datetime.now(timezone.utc).strftime("%Y-%m-%d")
        data = await self.redis.hgetall(f"cache:metrics:{date}")
        hits = int(data.get("hit", 0))
        misses = int(data.get("miss", 0))
        total = hits + misses
        return {
            "date": date,
            "total": total,
            "hits": hits,
            "misses": misses,
            "hit_rate": round(100 * hits / total, 1) if total else 0,
            # A hit avoids ~95% of a request's cost (cache overhead eats
            # the rest), hence the 0.95 factor.
            "estimated_savings_pct": round(95 * hits / total, 1) if total else 0,
        }
# Alerting thresholds:
# - Hit rate < 10%: Cache may be misconfigured or TTL too short
# - Hit rate > 60%: Great! Consider longer TTLs
# - Hit rate drops 10%+ from yesterday: Investigate (new prompts? config change?)
Key Takeaways
- Implement exact match caching first (Redis, 30 minutes to deploy). Add semantic caching only if paraphrased queries are common.
- Only cache low-temperature requests (temperature ≤ 0.5 for exact, ≤ 0.3 for semantic). High temperature requests want variety.
- Stream responses through the gateway by forwarding SSE events immediately while accumulating tokens for cost tracking after the stream completes.
- Target under 5ms gateway overhead with connection pooling, parallel pre-checks, orjson, and in-memory caching of hot data.
- Monitor cache hit rate daily. Target 20%+ for most workloads. A 25% hit rate saves approximately 24% on total API costs.
What Is Next
In the final lesson, we cover best practices and deployment checklist — how to migrate teams from direct API calls, deploy across multiple regions, and a comprehensive FAQ for AI gateway platform engineers.
Lilly Tech Systems