LLM-Specific Monitoring (Intermediate)

LLMs introduce monitoring challenges that traditional ML models don't have: unpredictable token costs, variable latency, hallucinations, prompt injection attacks, and quality that's hard to quantify. This lesson covers production monitoring patterns specifically designed for LLM-powered applications.

LLM Monitoring Dimensions

| Dimension | What to Track | Why It Matters | Alert Threshold |
|---|---|---|---|
| Cost | Tokens per request, cost per query, daily spend | LLM costs can spike 10x overnight from prompt changes | Cost > 2x daily budget |
| Latency | TTFT, total response time, tokens/sec | User experience degrades sharply above 2-3s TTFT | p95 > 5s or TTFT > 2s |
| Quality | Relevance scores, factual accuracy, format compliance | Quality degradation is silent and hard to detect | Quality score < 0.7 |
| Safety | Guardrail trigger rate, toxic content, PII leakage | Compliance and reputation risk | Guardrail rate > 5% |
| Reliability | Error rate, rate limit hits, timeout rate | Provider outages directly impact your product | Error rate > 1% |

Production LLM Request Logger

# Complete LLM request monitoring system
import hashlib
import json
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

@dataclass
class LLMRequestLog:
    """Structured log for every LLM API call.

    One record is created per request by LLMMonitor.log_request and
    covers the monitoring dimensions: cost, latency, quality, safety,
    and reliability.
    """
    request_id: str               # Caller-supplied unique id for this call
    timestamp: str                # ISO-8601 timestamp, set at log time
    model: str                    # Model identifier, e.g. "gpt-4o"
    prompt_template: str          # Which template was used
    prompt_hash: str              # Hash of actual prompt (for dedup)
    input_tokens: int             # Prompt-side token count
    output_tokens: int            # Completion-side token count
    total_tokens: int             # input_tokens + output_tokens
    latency_ms: float             # End-to-end response time
    ttft_ms: float                # Time to first token
    cost_usd: float               # Computed from per-model pricing table
    status: str                   # success, error, timeout, rate_limited
    error_message: Optional[str] = None   # Provider/client error text, if any
    quality_score: Optional[float] = None # Not set by log_request; presumably
                                          # filled by a later scoring step — TODO confirm
    guardrail_triggered: bool = False     # True if a safety guardrail fired
    guardrail_type: Optional[str] = None  # Which guardrail fired, if any
    user_feedback: Optional[str] = None  # thumbs_up, thumbs_down, None


class LLMMonitor:
    """Production LLM monitoring system.

    Tracks cost, latency, quality, and safety across all LLM calls in
    your application.  Timestamps are stored as timezone-aware UTC
    ISO-8601 strings (``datetime.utcnow()`` is deprecated and produces
    naive datetimes, which break comparisons against aware cutoffs).
    """

    # Pricing per 1M tokens (update as prices change)
    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-4-turbo": {"input": 10.00, "output": 30.00},
        "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
        "claude-3-haiku": {"input": 0.25, "output": 1.25},
    }

    # Conservative fallback for models missing from PRICING, so unknown
    # models over-estimate spend rather than under-estimate it.
    _FALLBACK_PRICING = {"input": 5.0, "output": 15.0}

    def __init__(self):
        self.logs: List[LLMRequestLog] = []           # every request, in order
        self.daily_cost: Dict[str, float] = {}        # "YYYY-MM-DD" -> USD spent
        self.prompt_performance: Dict[str, List[dict]] = {}  # template -> samples

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        return datetime.now(timezone.utc)

    def calculate_cost(self, model: str, input_tokens: int,
                        output_tokens: int) -> float:
        """Return the USD cost of a single call.

        Args:
            model: Model name looked up in PRICING; unknown models use
                the conservative fallback rates.
            input_tokens: Prompt-side token count.
            output_tokens: Completion-side token count.

        Returns:
            Cost in USD, rounded to 6 decimal places.
        """
        pricing = self.PRICING.get(model, self._FALLBACK_PRICING)
        cost = (input_tokens * pricing["input"] / 1_000_000 +
                output_tokens * pricing["output"] / 1_000_000)
        return round(cost, 6)

    def log_request(self, request_id: str, model: str,
                     prompt_template: str, prompt_text: str,
                     input_tokens: int, output_tokens: int,
                     latency_ms: float, ttft_ms: float,
                     status: str = "success",
                     error_message: Optional[str] = None,
                     guardrail_triggered: bool = False,
                     guardrail_type: Optional[str] = None) -> "LLMRequestLog":
        """Log an LLM request with all monitoring dimensions.

        Computes cost, hashes the prompt for deduplication, appends the
        record to ``self.logs``, and updates the daily-cost and
        per-template rollups.

        Returns:
            The LLMRequestLog record that was stored.
        """
        cost = self.calculate_cost(model, input_tokens, output_tokens)
        # MD5 here is for deduplication only, not security.
        prompt_hash = hashlib.md5(prompt_text.encode()).hexdigest()[:12]

        now = self._utcnow()
        log = LLMRequestLog(
            request_id=request_id,
            timestamp=now.isoformat(),
            model=model,
            prompt_template=prompt_template,
            prompt_hash=prompt_hash,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            total_tokens=input_tokens + output_tokens,
            latency_ms=latency_ms,
            ttft_ms=ttft_ms,
            cost_usd=cost,
            status=status,
            error_message=error_message,
            guardrail_triggered=guardrail_triggered,
            guardrail_type=guardrail_type
        )

        self.logs.append(log)

        # Track daily cost in UTC day buckets.
        date_key = now.strftime("%Y-%m-%d")
        self.daily_cost[date_key] = self.daily_cost.get(date_key, 0) + cost

        # Track per-prompt performance samples.
        self.prompt_performance.setdefault(prompt_template, []).append({
            "latency_ms": latency_ms,
            "tokens": input_tokens + output_tokens,
            "cost": cost,
            "status": status
        })

        return log

    def get_cost_report(self, hours: int = 24) -> dict:
        """Generate a cost report for the last N hours.

        Returns totals, per-request average, a naive projection to a
        24-hour spend, and a per-model breakdown.  Returns a
        ``{"status": "no_data", ...}`` dict when no logs fall in the window.
        """
        cutoff = self._utcnow() - timedelta(hours=hours)
        recent = [entry for entry in self.logs
                  if datetime.fromisoformat(entry.timestamp) > cutoff]

        if not recent:
            return {"status": "no_data", "period_hours": hours}

        total_cost = sum(entry.cost_usd for entry in recent)
        by_model: Dict[str, dict] = {}
        for entry in recent:
            stats = by_model.setdefault(
                entry.model, {"cost": 0, "requests": 0, "tokens": 0})
            stats["cost"] += entry.cost_usd
            stats["requests"] += 1
            stats["tokens"] += entry.total_tokens

        return {
            "period_hours": hours,
            "total_cost_usd": round(total_cost, 4),
            "total_requests": len(recent),
            "avg_cost_per_request": round(total_cost / len(recent), 6),
            "projected_daily_cost": round(total_cost * 24 / hours, 2),
            "by_model": {name: {key: round(val, 4) if isinstance(val, float) else val
                                for key, val in stats.items()}
                         for name, stats in by_model.items()}
        }

    def get_latency_report(self, hours: int = 1) -> dict:
        """Generate a latency report (p50/p95/p99 and TTFT percentiles).

        Only successful requests are included, since failed calls
        typically have unrepresentative latencies.
        """
        cutoff = self._utcnow() - timedelta(hours=hours)
        recent = [entry for entry in self.logs
                  if datetime.fromisoformat(entry.timestamp) > cutoff
                  and entry.status == "success"]

        if not recent:
            return {"status": "no_data", "period_hours": hours}

        import numpy as np
        latencies = [entry.latency_ms for entry in recent]
        ttfts = [entry.ttft_ms for entry in recent]

        return {
            "period_hours": hours,
            "request_count": len(recent),
            "latency": {
                "p50_ms": round(np.percentile(latencies, 50), 1),
                "p95_ms": round(np.percentile(latencies, 95), 1),
                "p99_ms": round(np.percentile(latencies, 99), 1),
                "mean_ms": round(np.mean(latencies), 1)
            },
            "time_to_first_token": {
                "p50_ms": round(np.percentile(ttfts, 50), 1),
                "p95_ms": round(np.percentile(ttfts, 95), 1),
                "mean_ms": round(np.mean(ttfts), 1)
            }
        }

    def get_quality_report(self, hours: int = 24) -> dict:
        """Report on error, guardrail, and user-feedback rates.

        ``user_satisfaction`` is None when no feedback was collected;
        ``feedback_coverage`` shows what fraction of requests got any
        feedback at all, so satisfaction can be weighted by sample size.
        """
        cutoff = self._utcnow() - timedelta(hours=hours)
        recent = [entry for entry in self.logs
                  if datetime.fromisoformat(entry.timestamp) > cutoff]

        total = len(recent)
        if total == 0:
            return {"status": "no_data", "period_hours": hours}

        guardrail_triggered = sum(1 for entry in recent if entry.guardrail_triggered)
        errors = sum(1 for entry in recent if entry.status != "success")
        thumbs_up = sum(1 for entry in recent if entry.user_feedback == "thumbs_up")
        thumbs_down = sum(1 for entry in recent if entry.user_feedback == "thumbs_down")
        feedback_total = thumbs_up + thumbs_down

        return {
            "period_hours": hours,
            "total_requests": total,
            "error_rate": round(errors / total, 4),
            "guardrail_trigger_rate": round(guardrail_triggered / total, 4),
            "user_satisfaction": round(thumbs_up / feedback_total, 4)
                                 if feedback_total > 0 else None,
            "feedback_coverage": round(feedback_total / total, 4)
        }

Hallucination Detection

# Hallucination detection strategies for production LLMs
import re
from typing import List, Dict

class HallucinationDetector:
    """Detect potential hallucinations in LLM outputs.

    Three detection strategies:
    1. Factual grounding: check claims against provided context
    2. Self-consistency: ask the same question multiple times
    3. Structural validation: check format, citations, numbers
    """

    # Sentences carrying verifiable specifics: digits, month names,
    # currency/percent symbols, or explicit attribution phrases.
    _SPECIFICS_PATTERN = re.compile(
        r'\d+|January|February|March|April|May|June|July|August|'
        r'September|October|November|December|\$|%|according to',
        re.IGNORECASE
    )

    def check_grounding(self, response: str, context: str,
                         claim_extractor=None) -> dict:
        """Check if response claims are grounded in the provided context.
        Used for RAG systems where context is retrieved documents."""
        # Simple lexical approach: a claim counts as grounded when
        # enough of its words also appear in the retrieved context.
        context_vocab = set(context.lower().split())

        grounded: List[str] = []
        ungrounded: List[str] = []

        for chunk in response.split('.'):
            sentence = chunk.strip()
            # Ignore short fragments — they rarely carry full claims.
            if len(sentence) <= 20:
                continue
            # Only sentences with concrete specifics are treated as claims.
            if not self._SPECIFICS_PATTERN.search(sentence):
                continue

            claim_vocab = set(sentence.lower().split())
            overlap_ratio = len(claim_vocab & context_vocab) / len(claim_vocab)
            target = grounded if overlap_ratio > 0.4 else ungrounded
            target.append(sentence)

        total_claims = len(grounded) + len(ungrounded)
        grounding_score = (len(grounded) / total_claims
                           if total_claims > 0 else 1.0)

        return {
            "grounding_score": round(grounding_score, 3),
            "total_claims_checked": total_claims,
            "grounded_claims": len(grounded),
            "potentially_hallucinated": len(ungrounded),
            "ungrounded_sentences": ungrounded[:5],  # Top 5
            "alert": grounding_score < 0.7
        }

    def check_self_consistency(self, responses: List[str],
                                 similarity_threshold: float = 0.6) -> dict:
        """Check if multiple responses to the same query are consistent.
        High inconsistency suggests hallucination.

        Generate 3-5 responses with temperature > 0 and compare.
        """
        if len(responses) < 2:
            return {"status": "need_multiple_responses"}

        def jaccard(left: str, right: str) -> float:
            # Word-level Jaccard similarity between two responses.
            bag_left = set(left.lower().split())
            bag_right = set(right.lower().split())
            if not bag_left or not bag_right:
                return 0.0
            return len(bag_left & bag_right) / len(bag_left | bag_right)

        # All unique unordered pairs of responses.
        pair_scores = [jaccard(responses[i], responses[j])
                       for i in range(len(responses))
                       for j in range(i + 1, len(responses))]

        mean_score = sum(pair_scores) / len(pair_scores)

        return {
            "avg_consistency": round(mean_score, 3),
            "min_consistency": round(min(pair_scores), 3),
            "num_responses": len(responses),
            "likely_hallucination": mean_score < similarity_threshold,
            "confidence": "high" if len(responses) >= 5 else "medium"
        }

    def check_format_compliance(self, response: str,
                                  expected_format: dict) -> dict:
        """Validate LLM output matches expected format.
        Catches structural hallucinations (wrong JSON, missing fields)."""
        issues: List[str] = []

        # JSON structure check, when requested.
        if expected_format.get("type") == "json":
            try:
                parsed = json.loads(response)
            except json.JSONDecodeError:
                issues.append("Response is not valid JSON")
            else:
                for name in expected_format.get("required_fields", []):
                    if name not in parsed:
                        issues.append(f"Missing required field: {name}")

        # Length constraint check.
        max_length = expected_format.get("max_length")
        if max_length and len(response) > max_length:
            issues.append(f"Response exceeds max length ({len(response)} > {max_length})")

        # Forbidden-pattern check (case-insensitive regex search).
        for pattern in expected_format.get("forbidden_patterns", []):
            if re.search(pattern, response, re.IGNORECASE):
                issues.append(f"Contains forbidden pattern: {pattern}")

        return {
            "compliant": len(issues) == 0,
            "issues": issues,
            "issue_count": len(issues)
        }

Prompt Performance Tracking

# Track and compare prompt template performance
from collections import defaultdict
import numpy as np

class PromptPerformanceTracker:
    """Track how different prompt templates perform in production.

    Helps answer: "Did our prompt change improve things?"
    """

    def __init__(self):
        # Aggregates keyed by template name; the defaultdict factory
        # initializes counters on a template's first log() call.
        self.prompt_metrics = defaultdict(lambda: {
            "requests": 0,
            "total_tokens": 0,
            "total_cost": 0,
            "latencies": [],
            "quality_scores": [],
            "guardrail_triggers": 0,
            "errors": 0,
            "thumbs_up": 0,
            "thumbs_down": 0
        })

    def log(self, prompt_template: str, tokens: int, cost: float,
            latency_ms: float, quality_score: Optional[float] = None,
            guardrail_triggered: bool = False, error: bool = False,
            feedback: Optional[str] = None):
        """Log metrics for one request made with a prompt template.

        Args:
            prompt_template: Template identifier to aggregate under.
            tokens: Total (input + output) tokens for the request.
            cost: Request cost in USD.
            latency_ms: End-to-end latency in milliseconds.
            quality_score: Optional quality score; ``is not None`` check
                means a score of 0.0 is still recorded.
            guardrail_triggered: Whether a safety guardrail fired.
            error: Whether the request failed.
            feedback: "thumbs_up", "thumbs_down", or None.
        """
        m = self.prompt_metrics[prompt_template]
        m["requests"] += 1
        m["total_tokens"] += tokens
        m["total_cost"] += cost
        m["latencies"].append(latency_ms)
        if quality_score is not None:
            m["quality_scores"].append(quality_score)
        if guardrail_triggered:
            m["guardrail_triggers"] += 1
        if error:
            m["errors"] += 1
        if feedback == "thumbs_up":
            m["thumbs_up"] += 1
        elif feedback == "thumbs_down":
            m["thumbs_down"] += 1

    @staticmethod
    def _summarize(metrics: dict) -> dict:
        """Collapse raw per-template counters into reportable averages."""
        reqs = metrics["requests"]
        fb_total = metrics["thumbs_up"] + metrics["thumbs_down"]
        return {
            "requests": reqs,
            "avg_tokens": round(metrics["total_tokens"] / max(reqs, 1)),
            "avg_cost": round(metrics["total_cost"] / max(reqs, 1), 6),
            "avg_latency_ms": round(np.mean(metrics["latencies"]), 1)
                              if metrics["latencies"] else 0,
            "p95_latency_ms": round(np.percentile(metrics["latencies"], 95), 1)
                              if metrics["latencies"] else 0,
            "avg_quality": round(np.mean(metrics["quality_scores"]), 3)
                          if metrics["quality_scores"] else None,
            "error_rate": round(metrics["errors"] / max(reqs, 1), 4),
            "guardrail_rate": round(metrics["guardrail_triggers"] / max(reqs, 1), 4),
            "satisfaction": round(metrics["thumbs_up"] / max(fb_total, 1), 3)
                           if fb_total > 0 else None
        }

    def compare_prompts(self, template_a: str, template_b: str) -> dict:
        """Compare two prompt templates head-to-head.

        Returns per-template summaries plus a winner per metric:
        lower-is-better for cost/latency/error/guardrail rates,
        higher-is-better for quality and satisfaction.
        """
        # .get() rather than [] so missing templates don't get created
        # by the defaultdict factory as a side effect of comparing.
        a = self.prompt_metrics.get(template_a, {})
        b = self.prompt_metrics.get(template_b, {})

        if not a or not b:
            return {"status": "insufficient_data"}

        summary_a = self._summarize(a)
        summary_b = self._summarize(b)

        winners = {}
        # Lower is better for these metrics.
        for metric in ["avg_cost", "avg_latency_ms", "error_rate", "guardrail_rate"]:
            if summary_a[metric] < summary_b[metric]:
                winners[metric] = template_a
            elif summary_b[metric] < summary_a[metric]:
                winners[metric] = template_b

        # Higher is better.  BUG FIX: the original truthiness test
        # (`if summary_a[metric] and ...`) silently skipped comparison
        # when either score was 0.0; explicit None checks let a
        # legitimate zero score participate.
        for metric in ["avg_quality", "satisfaction"]:
            if summary_a[metric] is not None and summary_b[metric] is not None:
                if summary_a[metric] > summary_b[metric]:
                    winners[metric] = template_a
                elif summary_b[metric] > summary_a[metric]:
                    winners[metric] = template_b

        return {
            template_a: summary_a,
            template_b: summary_b,
            "winners_by_metric": winners
        }

Cost Monitoring and Budget Alerts

Cost Control Strategies:
  • Set daily/hourly budget caps with automatic model fallback (e.g., GPT-4o to GPT-4o-mini when budget is 80% consumed)
  • Track cost per user/feature to identify which parts of your app are most expensive
  • Monitor token efficiency: output_tokens / input_tokens ratio reveals if prompts are too verbose or context is bloated
  • Cache identical queries: semantic caching can reduce costs by 20-40% for many applications
  • Alert on cost anomalies: a runaway loop or prompt injection can generate thousands of dollars in API costs in minutes
Real Cost Incident: A production app had a retry loop bug that resubmitted failed LLM calls up to 100 times per request. Combined with a provider outage that returned 500 errors, this generated $12,000 in API costs in 2 hours before the budget alert fired. Always implement exponential backoff AND hard per-request cost caps.