Advanced

Cost Optimization & Scaling

LLM costs can escalate from $100/month to $100,000/month as your application scales. The techniques in this lesson (model routing, semantic caching, token optimization, and batch processing) typically reduce costs by 50-65% without a noticeable loss in quality.

Real Cost Breakdown

Before optimizing, you need to understand where your money goes. Here is a realistic cost breakdown for a production LLM application handling 100,000 requests per day:

Component | Before Optimization | After Optimization | Savings
LLM API calls (GPT-4o) | $3,000/month | $900/month | 70% (model routing + caching)
Embedding calls | $150/month | $50/month | 67% (batch embeddings)
Vector database | $200/month | $200/month | 0% (fixed cost)
Infrastructure (cache, queue) | $100/month | $150/month | -50% (cache costs money but saves more)
Total | $3,450/month | $1,300/month | 62% savings
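
The savings column can be reproduced directly from the before and after figures. The sketch below is only a sanity check on the table's arithmetic; the `savings_pct` helper is an illustrative name, not part of any library.

```python
# Monthly costs in USD copied from the table above: (before, after).
costs = {
    "LLM API calls": (3000, 900),
    "Embedding calls": (150, 50),
    "Vector database": (200, 200),
    "Infrastructure": (100, 150),
}


def savings_pct(before: float, after: float) -> float:
    """Percentage saved relative to the original cost (negative means cost went up)."""
    return (before - after) / before * 100


for name, (before, after) in costs.items():
    print(f"{name:<16} {savings_pct(before, after):6.1f}%")

total_before = sum(before for before, _ in costs.values())
total_after = sum(after for _, after in costs.values())
total_savings = savings_pct(total_before, total_after)
print(f"{'Total':<16} {total_savings:6.1f}%")
```

Note that the infrastructure line is negative: the cache and queue add cost, and the overall figure only improves because the LLM API line shrinks by much more.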

Strategy 1: Model Routing (Save 50-70%)

The single biggest cost optimization. Route simple requests to cheap models and only use expensive models when necessary. Most requests do not need GPT-4o.

class CostAwareRouter:
    """Route requests to the cheapest model that can handle them."""

    # Cost per 1M tokens (simple average of input and output prices)
    MODEL_COSTS = {
        "gpt-4o": 6.25,        # ($2.50 input + $10 output) / 2
        "gpt-4o-mini": 0.375,  # ($0.15 input + $0.60 output) / 2
        "claude-sonnet": 9.0,  # ($3 input + $15 output) / 2
        "claude-haiku": 0.75,  # ($0.25 input + $1.25 output) / 2
    }

    def __init__(self, gateway):
        self.gateway = gateway

    def classify_complexity(self, prompt: str, messages: list[dict]) -> str:
        """Classify request complexity to determine model tier."""

        # Simple heuristics first (free, instant)
        total_chars = sum(len(m["content"]) for m in messages)

        # Check for complexity indicators
        complexity_signals = [
            "analyze", "compare", "evaluate", "design", "architect",
            "debug", "optimize", "trade-off", "pros and cons",
            "step by step", "explain why", "reasoning"
        ]

        prompt_lower = prompt.lower()
        signal_count = sum(1 for s in complexity_signals if s in prompt_lower)

        # Strong signals win regardless of length, so a short but demanding
        # prompt is not misrouted to the cheap model
        if signal_count >= 2:
            return "complex"

        # Short requests without strong signals -> cheap model
        if total_chars < 500:
            return "simple"

        if signal_count == 1:
            return "medium"
        return "simple"

    def route(self, messages: list[dict], **kwargs) -> str:
        """Select the optimal model based on request complexity."""
        prompt = messages[-1]["content"] if messages else ""
        complexity = self.classify_complexity(prompt, messages)

        model_map = {
            "simple": "gpt-4o-mini",    # $0.375/1M tokens
            "medium": "gpt-4o-mini",    # Still cheap enough
            "complex": "gpt-4o"         # $6.25/1M tokens
        }

        selected = model_map[complexity]
        return selected

    def complete(self, messages: list[dict], **kwargs):
        """Route and complete in one call."""
        model = kwargs.pop("model", None) or self.route(messages)
        return self.gateway.complete(messages=messages, model=model, **kwargs)


# Usage
router = CostAwareRouter(gateway=gateway)

# Simple request -> gpt-4o-mini ($0.375/1M tokens)
response = router.complete(
    messages=[{"role": "user", "content": "What is the capital of France?"}]
)
# Cost: ~$0.0001

# Complex request -> gpt-4o ($6.25/1M tokens)
response = router.complete(
    messages=[{"role": "user", "content": "Analyze the trade-offs between event-driven and request-driven architectures for a real-time trading platform. Evaluate latency, throughput, fault tolerance, and operational complexity."}]
)
# Cost: ~$0.005
💡
Apply at work: Analyze your last 1,000 LLM requests. You will likely find that 60-80% are simple classification, extraction, or formatting tasks that a cheap model handles just as well. Route those to GPT-4o-mini or Claude Haiku and keep GPT-4o for the 20-40% that need reasoning.
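
To see how the routed share translates into savings, here is a rough sketch using the averaged prices from MODEL_COSTS. It assumes simple and complex requests use about the same number of tokens, which is rarely true in practice (simple requests tend to be shorter), so treat the result as an upper estimate. The function names are illustrative.

```python
# Averaged prices per 1M tokens, taken from MODEL_COSTS above.
CHEAP_COST = 0.375      # gpt-4o-mini
EXPENSIVE_COST = 6.25   # gpt-4o


def blended_cost(cheap_share: float) -> float:
    """Average cost per 1M tokens when `cheap_share` of traffic uses the cheap model."""
    return cheap_share * CHEAP_COST + (1 - cheap_share) * EXPENSIVE_COST


def routing_savings(cheap_share: float) -> float:
    """Fraction saved compared with sending everything to the expensive model."""
    return 1 - blended_cost(cheap_share) / EXPENSIVE_COST


for share in (0.6, 0.7, 0.8):
    print(f"{share:.0%} routed to cheap model -> {routing_savings(share):.0%} saved")
```

Under these assumptions, routing 60-80% of traffic to the cheap model saves roughly 56-75%, which is in line with the range quoted for this strategy.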

Strategy 2: Semantic Caching (Save 40-60%)

Many users ask similar questions. Semantic caching returns cached responses for queries that are semantically equivalent, avoiding redundant LLM calls entirely.

import time


class ProductionSemanticCache:
    """In-memory semantic cache. Swap the entry list for Redis plus a vector index in production."""

    def __init__(self, similarity_threshold: float = 0.95,
                 ttl_seconds: int = 3600, max_entries: int = 10000):
        self.threshold = similarity_threshold
        self.ttl = ttl_seconds
        self.max_entries = max_entries
        # In production, use Redis + vector index
        self.entries: list[dict] = []
        self.stats = {"hits": 0, "misses": 0, "total_saved_usd": 0}

    def _embed(self, text: str) -> list[float]:
        from openai import OpenAI
        client = OpenAI()
        response = client.embeddings.create(
            input=text, model="text-embedding-3-small"
        )
        return response.data[0].embedding

    def _cosine_sim(self, a, b):
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x ** 2 for x in a) ** 0.5
        norm_b = sum(x ** 2 for x in b) ** 0.5
        return dot / (norm_a * norm_b) if norm_a and norm_b else 0

    def get(self, prompt: str) -> dict | None:
        """Check cache. Returns cached response or None."""
        prompt_emb = self._embed(prompt)
        now = time.time()

        best_match = None
        best_score = 0

        for entry in self.entries:
            if now - entry["created_at"] > self.ttl:
                continue
            score = self._cosine_sim(prompt_emb, entry["embedding"])
            if score > best_score and score >= self.threshold:
                best_score = score
                best_match = entry

        if best_match:
            self.stats["hits"] += 1
            self.stats["total_saved_usd"] += best_match["estimated_cost"]
            return best_match["response"]

        self.stats["misses"] += 1
        return None

    def put(self, prompt: str, response: dict, estimated_cost: float):
        """Store response in cache."""
        if len(self.entries) >= self.max_entries:
            # Evict oldest
            self.entries.sort(key=lambda e: e["created_at"])
            self.entries = self.entries[len(self.entries) // 4:]  # keep 75%

        self.entries.append({
            "embedding": self._embed(prompt),
            "response": response,
            "estimated_cost": estimated_cost,
            "created_at": time.time()
        })

    def get_stats(self) -> dict:
        total = self.stats["hits"] + self.stats["misses"]
        return {
            "hit_rate": self.stats["hits"] / max(total, 1),
            "total_saved_usd": round(self.stats["total_saved_usd"], 2),
            "total_requests": total,
            "cache_size": len(self.entries)
        }


# Real-world impact example:
# 100K requests/day, 45% cache hit rate, avg $0.003/request
# Savings: 100K * 0.45 * $0.003 = $135/day = $4,050/month
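
The estimate above ignores the embedding call that every cache lookup makes. A minimal sketch of the net figure follows; the `embed_cost` parameter and its example value are assumptions for illustration, not measured prices.

```python
def cache_savings_per_day(requests_per_day: int, hit_rate: float,
                          avg_llm_cost: float, embed_cost: float = 0.0) -> float:
    """Daily savings in USD: avoided LLM calls minus embedding lookups.

    Every request pays `embed_cost` for the cache lookup, whether it hits or misses.
    """
    avoided = requests_per_day * hit_rate * avg_llm_cost
    overhead = requests_per_day * embed_cost
    return avoided - overhead


# Figures from the example above, with no embedding overhead.
gross = cache_savings_per_day(100_000, 0.45, 0.003)

# Same traffic with a hypothetical embedding cost of $0.00001 per lookup.
net = cache_savings_per_day(100_000, 0.45, 0.003, embed_cost=0.00001)

print(f"gross: ${gross:,.2f}/day, net: ${net:,.2f}/day, net monthly: ${net * 30:,.2f}")
```

As long as an embedding lookup costs far less than an LLM call, the overhead barely changes the result, but it is worth measuring because the cache above embeds the prompt on every `get` and again on every `put`.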

Strategy 3: Token Optimization (Save 20-30%)

Every token costs money. Reducing token usage in prompts and responses directly reduces costs.

class TokenOptimizer:
    """Reduce token usage without sacrificing output quality."""

    @staticmethod
    def compress_prompt(prompt: str) -> str:
        """Remove unnecessary tokens from prompts."""
        # Remove excessive whitespace
        import re
        prompt = re.sub(r'\n{3,}', '\n\n', prompt)
        prompt = re.sub(r' {2,}', ' ', prompt)

        # Remove filler phrases that don't affect output quality
        fillers = [
            "Please note that ", "It is important to mention that ",
            "As you may know, ", "In other words, ",
            "Basically, ", "Essentially, ",
            "I would like you to ", "Could you please "
        ]
        for filler in fillers:
            prompt = prompt.replace(filler, "")

        return prompt.strip()

    @staticmethod
    def optimize_system_prompt(system_prompt: str) -> str:
        """Compress system prompt while preserving instructions."""
        # System prompts are sent with every request - optimize them aggressively
        lines = system_prompt.split('\n')
        optimized = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            if line.startswith('#') and not line.startswith('##'):
                continue  # Remove top-level comments
            optimized.append(line)
        return '\n'.join(optimized)

    @staticmethod
    def limit_response_tokens(task_type: str) -> int:
        """Set appropriate max_tokens based on task type."""
        limits = {
            "classification": 10,       # "positive" / "negative"
            "extraction": 200,          # JSON with extracted fields
            "short_answer": 100,        # 1-2 sentences
            "summary": 300,             # 1 paragraph
            "explanation": 500,         # detailed answer
            "code_generation": 1000,    # code block
            "long_form": 2000,          # article, report
        }
        return limits.get(task_type, 500)


# Before optimization:
# system_prompt = 500 tokens, max_tokens = 4096 (default)
# Worst case per request: 500 + 200 input + up to 4096 output = 4,796 tokens

# After optimization:
# system_prompt = 300 tokens (compressed), max_tokens = 200 (extraction task)
# Worst case per request: 300 + 200 input + up to 200 output = 700 tokens
# Output is billed per generated token, so max_tokens caps the worst case;
# the real saving depends on how long responses actually run

optimizer = TokenOptimizer()
compressed = optimizer.compress_prompt("""
    Please note that I would like you to analyze the following
    customer review and classify the sentiment. It is important
    to mention that you should respond with only one word:
    positive, negative, or neutral.
""")
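# Result: "Please note that " and "I would like you to " are removed and the
# indentation is collapsed; the remaining wording is unchanged.
# "It is important to mention that " survives because it is split across two
# lines, so join wrapped lines first if you want it caught.
# Saved: about 20% of the characters in this example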
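
Because the system prompt is resent with every request, small reductions add up. The sketch below estimates that effect under stated assumptions: the request volume is hypothetical, and the price is the GPT-4o input price quoted earlier in this lesson, which may have changed since.

```python
def monthly_prompt_cost(prompt_tokens: int, requests_per_day: int,
                        price_per_million: float, days: int = 30) -> float:
    """USD spent per month on tokens that are resent with every request."""
    total_tokens = prompt_tokens * requests_per_day * days
    return total_tokens / 1_000_000 * price_per_million


# Hypothetical volume: 10,000 requests/day at $2.50 per 1M input tokens.
before = monthly_prompt_cost(500, 10_000, 2.50)
after = monthly_prompt_cost(300, 10_000, 2.50)

print(f"before: ${before:,.0f}  after: ${after:,.0f}  saved: ${before - after:,.0f}")
```

The saving scales linearly with traffic, so the same 200-token trim is worth ten times as much at ten times the volume.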

Strategy 4: Batch Processing (Save 30-50%)

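When you have multiple requests that do not need real-time responses, process them as a batch. Running them concurrently on the client improves throughput but does not change the per-token price; the cost saving comes from provider batch endpoints such as OpenAI's Batch API, which trade slower turnaround for a discount.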

import asyncio
from collections import deque


class BatchProcessor:
    """Batch LLM requests for non-real-time workloads."""

    def __init__(self, gateway, batch_size: int = 20, max_wait_seconds: float = 5.0):
        self.gateway = gateway
        self.batch_size = batch_size
        self.max_wait = max_wait_seconds
        self.queue: deque[dict] = deque()
        self.results: dict[str, dict] = {}

    def enqueue(self, request_id: str, messages: list[dict],
                model: str = "gpt-4o-mini", **kwargs):
        """Add a request to the batch queue."""
        self.queue.append({
            "id": request_id,
            "messages": messages,
            "model": model,
            **kwargs
        })

    async def process_batch(self):
        """Process a batch of requests concurrently."""
        batch = []
        while self.queue and len(batch) < self.batch_size:
            batch.append(self.queue.popleft())

        if not batch:
            return

        # Process concurrently with asyncio
        tasks = []
        for req in batch:
            task = asyncio.create_task(self._process_one(req))
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)

        for req, result in zip(batch, results):
            if isinstance(result, Exception):
                self.results[req["id"]] = {"error": str(result)}
            else:
                self.results[req["id"]] = result

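    async def _process_one(self, request: dict) -> dict:
        """Process a single request without blocking the event loop."""
        # gateway.complete is synchronous, so run it in a worker thread;
        # calling it directly would make the batch run one request at a time
        response = await asyncio.to_thread(
            self.gateway.complete,
            messages=request["messages"],
            model=request["model"]
        )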
        return {
            "content": response.content,
            "cost_usd": response.cost_usd,
            "model": response.model
        }

    def get_result(self, request_id: str) -> dict | None:
        return self.results.get(request_id)


# Usage: Batch classify 1000 customer reviews
processor = BatchProcessor(gateway=gateway, batch_size=20)

reviews = [
    "Great product, love it!",
    "Terrible experience, want a refund.",
    "It's okay, nothing special.",
    # ... 997 more reviews
]

for i, review in enumerate(reviews):
    processor.enqueue(
        request_id=f"review_{i}",
        messages=[{
            "role": "user",
            "content": f"Classify sentiment (positive/negative/neutral): {review}"
        }],
        model="gpt-4o-mini"
    )

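# Process all batches (each process_batch call drains up to batch_size requests)
# In production, use OpenAI's Batch API for 50% discount
async def process_all():
    while processor.queue:
        await processor.process_batch()

asyncio.run(process_all())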
📝
Production reality: OpenAI's Batch API offers a 50% discount for requests that can tolerate up to 24-hour completion times. If you have classification, extraction, or evaluation tasks that are not time-sensitive, always use the Batch API. At scale, this single change can save thousands of dollars per month.
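
The Batch API takes its input as a JSONL file with one request per line. The sketch below only builds those lines for the sentiment task used earlier; the `build_batch_lines` helper is an illustrative name, and the upload and submission steps are left out, so check OpenAI's Batch API documentation for the current client calls before relying on it.

```python
import json


def build_batch_lines(reviews: list[str], model: str = "gpt-4o-mini") -> list[str]:
    """Return one JSON string per request, in the Batch API's JSONL input shape."""
    lines = []
    for i, review in enumerate(reviews):
        request = {
            "custom_id": f"review_{i}",  # used to match results back to inputs
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": model,
                "messages": [{
                    "role": "user",
                    "content": f"Classify sentiment (positive/negative/neutral): {review}",
                }],
                "max_tokens": 10,  # classification needs only a word or two
            },
        }
        lines.append(json.dumps(request))
    return lines


lines = build_batch_lines([
    "Great product, love it!",
    "Terrible experience, want a refund.",
])
jsonl = "\n".join(lines)
print(jsonl)
```

Results come back in a separate output file and are not guaranteed to be in input order, which is why each request carries its own `custom_id`.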

Cost Monitoring Dashboard

You cannot optimize what you cannot see. Here is a cost monitoring system that tracks spend by model, feature, team, and time period:

import time
from collections import defaultdict
from datetime import datetime


class CostDashboard:
    """Track and visualize LLM costs across the organization."""

    def __init__(self):
        self.events: list[dict] = []

    def record(self, model: str, input_tokens: int, output_tokens: int,
               cost_usd: float, feature: str, team: str,
               cached: bool = False):
        """Record a cost event."""
        self.events.append({
            "timestamp": time.time(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost_usd,
            "feature": feature,
            "team": team,
            "cached": cached
        })

    def daily_report(self, days: int = 7) -> dict:
        """Generate a daily cost report."""
        cutoff = time.time() - (days * 86400)
        recent = [e for e in self.events if e["timestamp"] > cutoff]

        # Group by day
        by_day = defaultdict(lambda: {"cost": 0, "requests": 0, "tokens": 0})
        for e in recent:
            day = datetime.fromtimestamp(e["timestamp"]).strftime("%Y-%m-%d")
            by_day[day]["cost"] += e["cost_usd"]
            by_day[day]["requests"] += 1
            by_day[day]["tokens"] += e["input_tokens"] + e["output_tokens"]

        # Top cost drivers
        by_feature = defaultdict(float)
        by_team = defaultdict(float)
        by_model = defaultdict(float)
        for e in recent:
            by_feature[e["feature"]] += e["cost_usd"]
            by_team[e["team"]] += e["cost_usd"]
            by_model[e["model"]] += e["cost_usd"]

        total_cost = sum(e["cost_usd"] for e in recent)
        cache_savings = sum(e["cost_usd"] for e in recent if e["cached"])

        return {
            "period_days": days,
            "total_cost_usd": round(total_cost, 2),
            "total_requests": len(recent),
            "avg_cost_per_request": round(total_cost / max(len(recent), 1), 4),
            "cache_savings_usd": round(cache_savings, 2),
            "daily_breakdown": dict(by_day),
            "top_features": dict(sorted(by_feature.items(), key=lambda x: -x[1])[:5]),
            "top_teams": dict(sorted(by_team.items(), key=lambda x: -x[1])[:5]),
            "by_model": dict(by_model),
            "projected_monthly_cost": round(total_cost / max(days, 1) * 30, 2)
        }

    def set_budget_alert(self, daily_budget_usd: float) -> dict | None:
        """Check if today's spend exceeds the budget."""
        today = datetime.now().strftime("%Y-%m-%d")
        today_cost = sum(
            e["cost_usd"] for e in self.events
            if datetime.fromtimestamp(e["timestamp"]).strftime("%Y-%m-%d") == today
        )
        if today_cost > daily_budget_usd:
            return {
                "alert": "BUDGET_EXCEEDED",
                "daily_budget": daily_budget_usd,
                "current_spend": round(today_cost, 2),
                "overage": round(today_cost - daily_budget_usd, 2)
            }
        return None


# Usage
dashboard = CostDashboard()

# After every LLM call:
dashboard.record(
    model="gpt-4o", input_tokens=500, output_tokens=200,
    cost_usd=0.003, feature="chat", team="product"
)

# Generate weekly report
report = dashboard.daily_report(days=7)
print(f"Weekly LLM cost: ${report['total_cost_usd']}")
print(f"Projected monthly: ${report['projected_monthly_cost']}")
print(f"Top feature: {list(report['top_features'].keys())[0] if report['top_features'] else 'N/A'}")

# Set budget alerts
alert = dashboard.set_budget_alert(daily_budget_usd=100)
if alert:
    print(f"ALERT: {alert['alert']} - ${alert['current_spend']} / ${alert['daily_budget']}")
💡
Apply at work: Implement cost monitoring before you implement cost optimization. Most teams discover surprising cost drivers once they have visibility. The top three optimizations (model routing, semantic caching, and max_tokens limits) typically save 50-65% and take less than a week to implement.
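
The dashboard expects a `cost_usd` value for each call. One way to derive it is from the token counts and per-model prices, sketched below. The prices are the ones quoted in the MODEL_COSTS comments earlier in this lesson and may be out of date, and `estimate_cost` is an illustrative helper, not part of any SDK.

```python
# USD per 1M tokens as (input, output), copied from the MODEL_COSTS comments.
PRICES = {
    "gpt-4o": (2.50, 10.00),
    "gpt-4o-mini": (0.15, 0.60),
}


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate the USD cost of one call from its token counts."""
    input_price, output_price = PRICES[model]
    return (input_tokens * input_price + output_tokens * output_price) / 1_000_000


cost = estimate_cost("gpt-4o", input_tokens=500, output_tokens=200)
mini_cost = estimate_cost("gpt-4o-mini", input_tokens=500, output_tokens=200)
print(f"gpt-4o: ${cost:.5f}  gpt-4o-mini: ${mini_cost:.6f}")
```

Pricing input and output separately matters because output tokens cost several times more than input tokens at these rates, so a single averaged price misstates the cost of short-prompt, long-answer calls.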

Optimization Impact Summary

Strategy | Savings | Implementation Effort | Impact on Quality
Model routing | 50-70% | 1-2 days | Minimal (cheap models handle simple tasks well)
Semantic caching | 40-60% | 2-3 days | None (identical responses for similar queries)
Token optimization | 20-30% | 1 day | None (removes filler, not content)
Batch processing | 30-50% | 1-2 days | None (same quality, delayed response)
max_tokens limits | 10-20% | 1 hour | None (prevents wasted output tokens)
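
The savings in this table do not simply add up, because each strategy only acts on the spend the previous one left behind. The sketch below shows that compounding under the simplifying assumption that the strategies are independent; the 50% and 40% split is hypothetical.

```python
def combined_savings(*savings: float) -> float:
    """Combine fractional savings that each apply to the spend left by the previous one."""
    remaining = 1.0
    for s in savings:
        remaining *= (1 - s)
    return 1 - remaining


# Hypothetical split: routing saves 50%, then caching removes 40% of what is left.
total = combined_savings(0.50, 0.40)
print(f"combined: {total:.0%}")
```

With these inputs the combined saving is 70%, not 90%. Real strategies also overlap (a cached response saves less if the request would have gone to a cheap model anyway), so measured totals are usually lower still.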

Key Takeaways

  • Model routing is the biggest lever: route 60-80% of requests to cheap models (GPT-4o-mini, Claude Haiku) for 50-70% cost savings.
  • Semantic caching saves 40-60% by returning cached responses for semantically similar queries.
  • Set explicit max_tokens based on task type. Classification needs 10 tokens, not 4,096.
  • Use OpenAI Batch API for non-real-time tasks to get a 50% discount.
  • Build a cost monitoring dashboard before optimizing. You need to see where the money goes.
  • Combined, these strategies typically reduce costs by 50-65% — turning a $10,000/month bill into $3,500-$5,000/month.

What Is Next

In the final lesson, we will compile everything into a production LLM checklist with best practices, common failure modes, debugging techniques, and a comprehensive FAQ to reference when building your next LLM application.