# Cost Optimization & Scaling
LLM costs can escalate from $100/month to $100,000/month as your application scales. The techniques in this lesson — semantic caching, model routing, token optimization, and batch processing — can reduce your costs by 40-60% without sacrificing quality.
## Real Cost Breakdown
Before optimizing, you need to understand where your money goes. Here is a realistic cost breakdown for a production LLM application handling 100,000 requests per day:
| Component | Before Optimization | After Optimization | Savings |
|---|---|---|---|
| LLM API calls (GPT-4o) | $3,000/month | $900/month | 70% (model routing + caching) |
| Embedding calls | $150/month | $50/month | 67% (batch embeddings) |
| Vector database | $200/month | $200/month | 0% (fixed cost) |
| Infrastructure (cache, queue) | $100/month | $150/month | -50% (cache costs money but saves more) |
| Total | $3,450/month | $1,300/month | 62% savings |
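Numbers like these are easy to sanity-check from first principles: requests per day, times tokens per request, times the per-token price. A minimal sketch (the token counts and the 30-day month are illustrative assumptions; the prices are the GPT-4o list prices used later in this lesson):

```python
def monthly_llm_cost(requests_per_day: int, input_tokens: int, output_tokens: int,
                     in_price_per_m: float, out_price_per_m: float) -> float:
    """Back-of-envelope monthly API spend in USD, assuming a 30-day month."""
    per_request = (input_tokens / 1e6) * in_price_per_m \
                + (output_tokens / 1e6) * out_price_per_m
    return requests_per_day * 30 * per_request

# Illustrative: 100K requests/day at GPT-4o prices ($2.50/1M input, $10/1M output)
cost = monthly_llm_cost(100_000, 250, 35, 2.50, 10.00)  # ≈ $2,925/month
```

Plugging in your own traffic profile tells you which line of the table to attack first.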
## Strategy 1: Model Routing (Save 50-70%)
This is the single biggest cost optimization: route simple requests to cheap models and reach for expensive models only when necessary. Most requests do not need GPT-4o.
```python
class CostAwareRouter:
    """Route requests to the cheapest model that can handle them."""

    # Cost per 1M tokens (input + output averaged)
    MODEL_COSTS = {
        "gpt-4o": 6.25,        # ($2.50 input + $10 output) / 2
        "gpt-4o-mini": 0.375,  # ($0.15 input + $0.60 output) / 2
        "claude-sonnet": 9.0,  # ($3 input + $15 output) / 2
        "claude-haiku": 0.75,  # ($0.25 input + $1.25 output) / 2
    }

    def __init__(self, gateway):
        self.gateway = gateway

    def classify_complexity(self, prompt: str, messages: list[dict]) -> str:
        """Classify request complexity to determine the model tier."""
        # Simple heuristics only (free, instant)
        total_chars = sum(len(m["content"]) for m in messages)

        # Check complexity indicators before any length shortcut,
        # so a short but analytical prompt still gets a strong model
        complexity_signals = [
            "analyze", "compare", "evaluate", "design", "architect",
            "debug", "optimize", "trade-off", "pros and cons",
            "step by step", "explain why", "reasoning"
        ]
        prompt_lower = prompt.lower()
        signal_count = sum(1 for s in complexity_signals if s in prompt_lower)

        if signal_count >= 2:
            return "complex"
        if signal_count == 1:
            return "medium"
        # No signals: short requests are simple, long ones get the middle tier
        return "simple" if total_chars < 500 else "medium"

    def route(self, messages: list[dict], **kwargs) -> str:
        """Select the optimal model based on request complexity."""
        prompt = messages[-1]["content"] if messages else ""
        complexity = self.classify_complexity(prompt, messages)
        model_map = {
            "simple": "gpt-4o-mini",  # $0.375/1M tokens
            "medium": "gpt-4o-mini",  # still cheap enough
            "complex": "gpt-4o",      # $6.25/1M tokens
        }
        return model_map[complexity]

    def complete(self, messages: list[dict], **kwargs):
        """Route and complete in one call."""
        model = kwargs.pop("model", None) or self.route(messages)
        return self.gateway.complete(messages=messages, model=model, **kwargs)


# Usage
router = CostAwareRouter(gateway=gateway)

# Simple request -> gpt-4o-mini ($0.375/1M tokens). Cost: ~$0.0001
response = router.complete(
    messages=[{"role": "user", "content": "What is the capital of France?"}]
)

# Complex request -> gpt-4o ($6.25/1M tokens). Cost: ~$0.005
response = router.complete(
    messages=[{"role": "user", "content": (
        "Analyze the trade-offs between event-driven and request-driven "
        "architectures for a real-time trading platform. Evaluate latency, "
        "throughput, fault tolerance, and operational complexity."
    )}]
)
```
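The 50-70% headline is just arithmetic on the MODEL_COSTS table: compare the blended cost of a routed traffic mix against sending everything to gpt-4o. A quick sketch (the 75/25 traffic split is an illustrative assumption):

```python
MODEL_COSTS = {"gpt-4o": 6.25, "gpt-4o-mini": 0.375}  # $ per 1M tokens, averaged

def routing_savings(cheap_share: float) -> float:
    """Fraction saved versus routing all traffic to gpt-4o."""
    blended = cheap_share * MODEL_COSTS["gpt-4o-mini"] \
            + (1 - cheap_share) * MODEL_COSTS["gpt-4o"]
    return 1 - blended / MODEL_COSTS["gpt-4o"]

savings = routing_savings(0.75)  # ≈ 0.70, i.e. ~70% cheaper than all-gpt-4o
```

The savings scale with the share of traffic the cheap tier can absorb, which is why the complexity classifier matters more than the exact price list.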
## Strategy 2: Semantic Caching (Save 40-60%)
Many users ask similar questions. Semantic caching returns cached responses for queries that are semantically equivalent, avoiding redundant LLM calls entirely.
```python
import time

from openai import OpenAI


class ProductionSemanticCache:
    """Production-grade semantic cache (in-memory here; use Redis in production)."""

    def __init__(self, similarity_threshold: float = 0.95,
                 ttl_seconds: int = 3600, max_entries: int = 10000):
        self.threshold = similarity_threshold
        self.ttl = ttl_seconds
        self.max_entries = max_entries
        # In production, use Redis + a vector index instead of a plain list
        self.entries: list[dict] = []
        self.stats = {"hits": 0, "misses": 0, "total_saved_usd": 0.0}

    def _embed(self, text: str) -> list[float]:
        # In production, reuse one client instead of constructing per call
        client = OpenAI()
        response = client.embeddings.create(
            input=text, model="text-embedding-3-small"
        )
        return response.data[0].embedding

    def _cosine_sim(self, a: list[float], b: list[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x ** 2 for x in a) ** 0.5
        norm_b = sum(x ** 2 for x in b) ** 0.5
        return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

    def get(self, prompt: str) -> dict | None:
        """Check the cache. Returns the cached response or None."""
        prompt_emb = self._embed(prompt)
        now = time.time()
        best_match = None
        best_score = 0.0
        for entry in self.entries:
            if now - entry["created_at"] > self.ttl:
                continue  # expired
            score = self._cosine_sim(prompt_emb, entry["embedding"])
            if score > best_score and score >= self.threshold:
                best_score = score
                best_match = entry
        if best_match:
            self.stats["hits"] += 1
            self.stats["total_saved_usd"] += best_match["estimated_cost"]
            return best_match["response"]
        self.stats["misses"] += 1
        return None

    def put(self, prompt: str, response: dict, estimated_cost: float):
        """Store a response in the cache."""
        if len(self.entries) >= self.max_entries:
            # Evict the oldest 25% of entries, keep 75%
            self.entries.sort(key=lambda e: e["created_at"])
            self.entries = self.entries[len(self.entries) // 4:]
        self.entries.append({
            "embedding": self._embed(prompt),
            "response": response,
            "estimated_cost": estimated_cost,
            "created_at": time.time()
        })

    def get_stats(self) -> dict:
        total = self.stats["hits"] + self.stats["misses"]
        return {
            "hit_rate": self.stats["hits"] / max(total, 1),
            "total_saved_usd": round(self.stats["total_saved_usd"], 2),
            "total_requests": total,
            "cache_size": len(self.entries)
        }


# Real-world impact example:
# 100K requests/day, 45% cache hit rate, avg $0.003/request
# Savings: 100K * 0.45 * $0.003 = $135/day = $4,050/month
```
## Strategy 3: Token Optimization (Save 20-30%)
Every token costs money. Reducing token usage in prompts and responses directly reduces costs.
```python
import re


class TokenOptimizer:
    """Reduce token usage without sacrificing output quality."""

    @staticmethod
    def compress_prompt(prompt: str) -> str:
        """Remove unnecessary tokens from prompts."""
        # Remove excessive whitespace
        prompt = re.sub(r'\n{3,}', '\n\n', prompt)
        prompt = re.sub(r' {2,}', ' ', prompt)

        # Remove filler phrases that don't affect output quality
        fillers = [
            "Please note that ", "It is important to mention that ",
            "As you may know, ", "In other words, ",
            "Basically, ", "Essentially, ",
            "I would like you to ", "Could you please "
        ]
        for filler in fillers:
            # Match the filler even when soft line-wrapping splits it
            pattern = re.escape(filler.strip()).replace(" ", r"\s+") + r"\s*"
            prompt = re.sub(pattern, "", prompt)
        return prompt.strip()

    @staticmethod
    def optimize_system_prompt(system_prompt: str) -> str:
        """Compress a system prompt while preserving instructions."""
        # System prompts are sent with every request - optimize them aggressively
        lines = system_prompt.split('\n')
        optimized = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            if line.startswith('#') and not line.startswith('##'):
                continue  # drop top-level '#' heading lines
            optimized.append(line)
        return '\n'.join(optimized)

    @staticmethod
    def limit_response_tokens(task_type: str) -> int:
        """Set an appropriate max_tokens value for the task type."""
        limits = {
            "classification": 10,     # "positive" / "negative"
            "extraction": 200,        # JSON with extracted fields
            "short_answer": 100,      # 1-2 sentences
            "summary": 300,           # 1 paragraph
            "explanation": 500,       # detailed answer
            "code_generation": 1000,  # code block
            "long_form": 2000,        # article, report
        }
        return limits.get(task_type, 500)


# Before optimization:
#   system_prompt = 500 tokens, max_tokens = 4096 (default)
#   Per-request budget: ~500 + 200 input + 4096 max output = expensive
# After optimization:
#   system_prompt = 300 tokens (compressed), max_tokens = 200 (classification task)
#   Per-request budget: ~300 + 200 input + 200 max output = roughly 3x cheaper
optimizer = TokenOptimizer()
compressed = optimizer.compress_prompt("""
Please note that I would like you to analyze the following
customer review and classify the sentiment. It is important
to mention that you should respond with only one word:
positive, negative, or neutral.
""")
# All three filler phrases are stripped, leaving (modulo line breaks):
#   "analyze the following customer review and classify the sentiment.
#    you should respond with only one word: positive, negative, or neutral."
# Roughly a third of the characters are removed without changing the task.
```
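The budget math behind these limits is worth making explicit. A sketch at the GPT-4o-mini list prices quoted earlier ($0.15/1M input, $0.60/1M output); the token counts mirror the before/after comment above and are illustrative:

```python
def request_cost(input_tokens: int, output_tokens: int,
                 in_price_per_m: float = 0.15, out_price_per_m: float = 0.60) -> float:
    """Worst-case USD cost of one request, with output capped at max_tokens."""
    return (input_tokens / 1e6) * in_price_per_m \
         + (output_tokens / 1e6) * out_price_per_m

before = request_cost(500 + 200, 4096)  # verbose system prompt, default max_tokens cap
after = request_cost(300 + 200, 200)    # compressed prompt, task-sized max_tokens
# Worst-case spend per request drops by more than 10x; the realized savings
# depend on how much of the output budget the model actually uses.
```

Setting `max_tokens` does not make the model terser, but it bounds the damage when a model rambles, which is what you pay for.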
## Strategy 4: Batch Processing (Save 30-50%)
When you have multiple requests that do not need real-time responses, batch them together to reduce overhead and often get volume discounts.
```python
import asyncio
from collections import deque


class BatchProcessor:
    """Batch LLM requests for non-real-time workloads."""

    def __init__(self, gateway, batch_size: int = 20, max_wait_seconds: float = 5.0):
        self.gateway = gateway
        self.batch_size = batch_size
        # In a fuller implementation, flush a partial batch after this long
        self.max_wait = max_wait_seconds
        self.queue: deque[dict] = deque()
        self.results: dict[str, dict] = {}

    def enqueue(self, request_id: str, messages: list[dict],
                model: str = "gpt-4o-mini", **kwargs):
        """Add a request to the batch queue."""
        self.queue.append({
            "id": request_id,
            "messages": messages,
            "model": model,
            **kwargs
        })

    async def process_batch(self):
        """Process one batch of requests concurrently."""
        batch = []
        while self.queue and len(batch) < self.batch_size:
            batch.append(self.queue.popleft())
        if not batch:
            return

        # Run the requests concurrently; exceptions are captured per request
        tasks = [asyncio.create_task(self._process_one(req)) for req in batch]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for req, result in zip(batch, results):
            if isinstance(result, Exception):
                self.results[req["id"]] = {"error": str(result)}
            else:
                self.results[req["id"]] = result

    async def _process_one(self, request: dict) -> dict:
        """Process a single request without blocking the event loop."""
        # The gateway call is synchronous, so run it in a worker thread
        response = await asyncio.to_thread(
            self.gateway.complete,
            messages=request["messages"],
            model=request["model"]
        )
        return {
            "content": response.content,
            "cost_usd": response.cost_usd,
            "model": response.model
        }

    def get_result(self, request_id: str) -> dict | None:
        return self.results.get(request_id)


# Usage: batch-classify 1000 customer reviews
processor = BatchProcessor(gateway=gateway, batch_size=20)

reviews = [
    "Great product, love it!",
    "Terrible experience, want a refund.",
    "It's okay, nothing special.",
    # ... 997 more reviews
]

for i, review in enumerate(reviews):
    processor.enqueue(
        request_id=f"review_{i}",
        messages=[{
            "role": "user",
            "content": f"Classify sentiment (positive/negative/neutral): {review}"
        }],
        model="gpt-4o-mini"
    )


# Drain the queue batch by batch
# (in production, use OpenAI's Batch API for a 50% discount)
async def drain(p: BatchProcessor):
    while p.queue:
        await p.process_batch()

asyncio.run(drain(processor))
```
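For jobs that can wait, OpenAI's Batch API runs the same requests asynchronously (within 24 hours) at roughly half the synchronous price. The flow: write one request per line to a JSONL file, upload it with purpose `batch`, then create a batch job. A sketch (the file name and prompt wording are illustrative; check the current Batch API reference for exact fields):

```python
import json


def build_batch_file(reviews: list[str], path: str = "batch_input.jsonl") -> str:
    """Write one chat.completions request per line in the Batch API's JSONL format."""
    with open(path, "w") as f:
        for i, review in enumerate(reviews):
            line = {
                "custom_id": f"review_{i}",  # echoed back in the results file
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "max_tokens": 10,
                    "messages": [{
                        "role": "user",
                        "content": f"Classify sentiment (positive/negative/neutral): {review}"
                    }]
                }
            }
            f.write(json.dumps(line) + "\n")
    return path


# Then upload and submit; results arrive within the completion window:
# from openai import OpenAI
# client = OpenAI()
# batch_file = client.files.create(file=open("batch_input.jsonl", "rb"), purpose="batch")
# job = client.batches.create(input_file_id=batch_file.id,
#                             endpoint="/v1/chat/completions",
#                             completion_window="24h")
```

The `custom_id` is what lets you join results back to your own records, so make it stable and unique.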
## Cost Monitoring Dashboard
You cannot optimize what you cannot see. Here is a cost monitoring system that tracks spend by model, feature, team, and time period:
```python
import time
from collections import defaultdict
from datetime import datetime


class CostDashboard:
    """Track and visualize LLM costs across the organization."""

    def __init__(self):
        self.events: list[dict] = []

    def record(self, model: str, input_tokens: int, output_tokens: int,
               cost_usd: float, feature: str, team: str,
               cached: bool = False):
        """Record a cost event."""
        self.events.append({
            "timestamp": time.time(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost_usd,
            "feature": feature,
            "team": team,
            "cached": cached
        })

    def daily_report(self, days: int = 7) -> dict:
        """Generate a daily cost report."""
        cutoff = time.time() - (days * 86400)
        recent = [e for e in self.events if e["timestamp"] > cutoff]

        # Group by day
        by_day = defaultdict(lambda: {"cost": 0, "requests": 0, "tokens": 0})
        for e in recent:
            day = datetime.fromtimestamp(e["timestamp"]).strftime("%Y-%m-%d")
            by_day[day]["cost"] += e["cost_usd"]
            by_day[day]["requests"] += 1
            by_day[day]["tokens"] += e["input_tokens"] + e["output_tokens"]

        # Top cost drivers
        by_feature = defaultdict(float)
        by_team = defaultdict(float)
        by_model = defaultdict(float)
        for e in recent:
            by_feature[e["feature"]] += e["cost_usd"]
            by_team[e["team"]] += e["cost_usd"]
            by_model[e["model"]] += e["cost_usd"]

        total_cost = sum(e["cost_usd"] for e in recent)
        cache_savings = sum(e["cost_usd"] for e in recent if e["cached"])

        return {
            "period_days": days,
            "total_cost_usd": round(total_cost, 2),
            "total_requests": len(recent),
            "avg_cost_per_request": round(total_cost / max(len(recent), 1), 4),
            "cache_savings_usd": round(cache_savings, 2),
            "daily_breakdown": dict(by_day),
            "top_features": dict(sorted(by_feature.items(), key=lambda x: -x[1])[:5]),
            "top_teams": dict(sorted(by_team.items(), key=lambda x: -x[1])[:5]),
            "by_model": dict(by_model),
            "projected_monthly_cost": round(total_cost / max(days, 1) * 30, 2)
        }

    def set_budget_alert(self, daily_budget_usd: float) -> dict | None:
        """Check whether today's spend exceeds the budget."""
        today = datetime.now().strftime("%Y-%m-%d")
        today_cost = sum(
            e["cost_usd"] for e in self.events
            if datetime.fromtimestamp(e["timestamp"]).strftime("%Y-%m-%d") == today
        )
        if today_cost > daily_budget_usd:
            return {
                "alert": "BUDGET_EXCEEDED",
                "daily_budget": daily_budget_usd,
                "current_spend": round(today_cost, 2),
                "overage": round(today_cost - daily_budget_usd, 2)
            }
        return None


# Usage
dashboard = CostDashboard()

# After every LLM call:
dashboard.record(
    model="gpt-4o", input_tokens=500, output_tokens=200,
    cost_usd=0.003, feature="chat", team="product"
)

# Generate a weekly report
report = dashboard.daily_report(days=7)
print(f"Weekly LLM cost: ${report['total_cost_usd']}")
print(f"Projected monthly: ${report['projected_monthly_cost']}")
print(f"Top feature: {list(report['top_features'].keys())[0] if report['top_features'] else 'N/A'}")

# Budget alerts
alert = dashboard.set_budget_alert(daily_budget_usd=100)
if alert:
    print(f"ALERT: {alert['alert']} - ${alert['current_spend']} / ${alert['daily_budget']}")
```
## Optimization Impact Summary
| Strategy | Savings | Implementation Effort | Impact on Quality |
|---|---|---|---|
| Model routing | 50-70% | 1-2 days | Minimal (cheap models handle simple tasks well) |
| Semantic caching | 40-60% | 2-3 days | None (identical responses for similar queries) |
| Token optimization | 20-30% | 1 day | None (removes filler, not content) |
| Batch processing | 30-50% | 1-2 days | None (same quality, delayed response) |
| max_tokens limits | 10-20% | 1 hour | None (prevents wasted output tokens) |
## Key Takeaways
- Model routing is the biggest lever: route 60-80% of requests to cheap models (GPT-4o-mini, Claude Haiku) for 50-70% cost savings.
- Semantic caching saves 40-60% by returning cached responses for semantically similar queries.
- Set explicit max_tokens based on task type. Classification needs 10 tokens, not 4,096.
- Use OpenAI Batch API for non-real-time tasks to get a 50% discount.
- Build a cost monitoring dashboard before optimizing. You need to see where the money goes.
- Combined, these strategies typically reduce costs by 50-65% — turning a $10,000/month bill into $3,500-$5,000/month.
## What's Next
In the final lesson, we will compile everything into a production LLM checklist with best practices, common failure modes, debugging techniques, and a comprehensive FAQ to reference when building your next LLM application.
Lilly Tech Systems