Scaling & Operations: Advanced Topics

Going from a demo chatbot to one serving thousands of concurrent users across multiple tenants requires careful architecture. This lesson covers multi-tenant isolation, intelligent rate limiting, conversation analytics, A/B testing response quality, LLM cost management with token budgets, and production monitoring.

Multi-Tenant Architecture

If you're building a chatbot platform (not just a single bot), you need tenant isolation for data, configuration, system prompts, and usage limits.

# Multi-tenant chatbot platform
from dataclasses import dataclass, field
from typing import Dict, Optional
import json

@dataclass
class TenantConfig:
    """Per-tenant chatbot settings: identity, prompt, model, and usage limits."""
    tenant_id: str          # unique tenant identifier (row key in the `tenants` table)
    name: str               # human-readable tenant name
    system_prompt: str      # base system prompt for this tenant's bot
    model: str = "gpt-4o-mini"              # LLM model used for responses
    max_tokens_per_response: int = 500      # cap on output tokens per reply
    monthly_token_budget: int = 1_000_000   # monthly allowance enforced by TenantManager.check_budget
    rate_limit_per_minute: int = 60         # per-tenant message rate limit
    allowed_tools: list = field(default_factory=list)    # tool names the bot may call
    blocked_topics: list = field(default_factory=list)   # topics the bot must not engage with
    escalation_email: str = ""              # escalation contact (not used in this snippet — presumably for human handoff)
    custom_greeting: str = "Hello! How can I help you today?"  # opening message shown to users

class TenantManager:
    """Manage tenant configurations with two-level caching (process -> Redis -> DB).

    NOTE(review): the Redis client is called synchronously (no ``await``)
    inside async methods — confirm it is the sync redis-py client, not
    ``redis.asyncio``.
    """

    def __init__(self, redis_client, db_pool):
        self.redis = redis_client
        self.db = db_pool
        # Process-local cache. It is never invalidated until restart, so
        # config edits may take a process lifetime to show up here even
        # after the 5-minute Redis entry expires.
        self._cache: Dict[str, "TenantConfig"] = {}

    async def get_tenant(self, tenant_id: str) -> Optional["TenantConfig"]:
        """Resolve a tenant's config, or None if unknown or inactive."""
        # 1) Process-local cache
        if tenant_id in self._cache:
            return self._cache[tenant_id]

        # 2) Redis cache (shared across processes, 300s TTL set below)
        cached = self.redis.get(f"tenant:{tenant_id}")
        if cached:
            config = TenantConfig(**json.loads(cached))
            self._cache[tenant_id] = config
            return config

        # 3) Database — the source of truth. Inactive tenants resolve to None.
        async with self.db.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT * FROM tenants WHERE tenant_id = $1 AND active = true",
                tenant_id
            )
            if not row:
                return None
            config = TenantConfig(
                tenant_id=row["tenant_id"],
                name=row["name"],
                system_prompt=row["system_prompt"],
                model=row["model"],
                max_tokens_per_response=row["max_tokens_per_response"],
                monthly_token_budget=row["monthly_token_budget"],
                rate_limit_per_minute=row["rate_limit_per_minute"],
                # list columns are stored as JSON text — TODO confirm schema
                allowed_tools=json.loads(row["allowed_tools"]),
                blocked_topics=json.loads(row["blocked_topics"]),
                escalation_email=row["escalation_email"],
                custom_greeting=row["custom_greeting"]
            )
            self.redis.setex(f"tenant:{tenant_id}", 300, json.dumps(vars(config)))
            self._cache[tenant_id] = config
            return config

    async def check_budget(self, tenant_id: str, tokens_used: int) -> bool:
        """Return True if spending `tokens_used` keeps the tenant within budget.

        NOTE(review): this check and record_usage are separate Redis ops, so
        concurrent requests can race slightly past the budget — fine for a
        soft limit, not for a hard billing cap.
        """
        key = f"usage:{tenant_id}:{self._current_month()}"
        current = int(self.redis.get(key) or 0)
        config = await self.get_tenant(tenant_id)
        if not config:
            return False
        return current + tokens_used <= config.monthly_token_budget

    async def record_usage(self, tenant_id: str, tokens: int):
        """Accumulate token usage against the tenant's monthly counter."""
        key = f"usage:{tenant_id}:{self._current_month()}"
        self.redis.incrby(key, tokens)
        self.redis.expire(key, 86400 * 35)  # Keep for ~1 month

    def _current_month(self) -> str:
        """Return the current UTC month as 'YYYY-MM' (monthly usage bucket key)."""
        # datetime.utcnow() is deprecated (Python 3.12+); use an aware UTC time.
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).strftime("%Y-%m")

Intelligent Rate Limiting

Rate limit per user, per tenant, and globally. Use sliding windows for smooth enforcement and different limits for different operations.

# Multi-level rate limiter
import time

class SlidingWindowRateLimiter:
    """Sliding window rate limiter backed by one Redis sorted set per key.

    Each request is stored as a member scored by its timestamp; entries
    older than the window are pruned before counting.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(self, key: str, max_requests: int,
                   window_seconds: int) -> dict:
        """Check if a request is allowed under the rate limit.

        Returns a dict with allowed/current/limit/remaining/reset_at/
        retry_after. Note: the request is recorded even when denied, so a
        client that keeps hammering while limited keeps its window full.
        """
        import uuid  # local import keeps this snippet self-contained

        now = time.time()
        window_start = now - window_seconds
        pipe_key = f"ratelimit:{key}"

        pipe = self.redis.pipeline()
        # Remove entries that fell out of the window
        pipe.zremrangebyscore(pipe_key, 0, window_start)
        # Count what remains (requests before this one)
        pipe.zcard(pipe_key)
        # Record this request. The member must be unique: using str(now)
        # alone lets two requests with the same timestamp overwrite each
        # other in the sorted set, silently under-counting.
        pipe.zadd(pipe_key, {f"{now}:{uuid.uuid4().hex}": now})
        # Refresh expiry so idle keys clean themselves up
        pipe.expire(pipe_key, window_seconds)
        results = pipe.execute()

        current_count = results[1]  # zcard result from the pipeline
        allowed = current_count < max_requests

        return {
            "allowed": allowed,
            "current": current_count,
            "limit": max_requests,
            "remaining": max(0, max_requests - current_count),
            "reset_at": now + window_seconds,
            "retry_after": window_seconds if not allowed else 0
        }


class ChatbotRateLimiter:
    """Layered rate limiting for chatbot platforms.

    Limits are checked from narrowest to broadest scope — per user, then
    per tenant, then globally — and enforcement stops at the first level
    that is exceeded.
    """

    def __init__(self, redis_client):
        self.limiter = SlidingWindowRateLimiter(redis_client)

    def check(self, tenant_id: str, user_id: str) -> dict:
        """Run all three limit levels; deny with details at the first failure."""
        # (level name, redis key, max requests per 60s window)
        levels = [
            ("user", f"user:{tenant_id}:{user_id}", 30),
            ("tenant", f"tenant:{tenant_id}", 1000),
            ("global", "global", 10000),
        ]

        user_remaining = None
        for level_name, limit_key, max_requests in levels:
            verdict = self.limiter.is_allowed(
                limit_key, max_requests=max_requests, window_seconds=60
            )
            if user_remaining is None:
                # The per-user level runs first; its remaining count is
                # what we report back on success.
                user_remaining = verdict["remaining"]
            if not verdict["allowed"]:
                return {"allowed": False, "level": level_name, **verdict}

        return {"allowed": True, "remaining": user_remaining}

Conversation Analytics

Track key metrics to understand chatbot performance: resolution rate, user satisfaction, escalation rate, and conversation quality.

# Conversation analytics tracker
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List

@dataclass
class ConversationMetrics:
    """Summary metrics for one chatbot conversation session."""
    session_id: str                 # unique conversation/session identifier
    tenant_id: str                  # owning tenant
    started_at: datetime
    # Optional: None while the conversation is still open.
    ended_at: Optional[datetime] = None
    turn_count: int = 0             # number of user/bot exchange turns
    resolved: bool = False          # bot resolved the issue without a human
    escalated: bool = False         # handed off to a human agent
    user_satisfaction: int = 0      # 1-5 star rating; 0 = not rated
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    avg_response_time_ms: float = 0.0
    # default_factory avoids both the None-vs-list type mismatch of the
    # original and the shared-mutable-default pitfall.
    tools_used: List[str] = field(default_factory=list)
    channel: str = "web"            # e.g. web, slack, sms — TODO confirm valid set

class AnalyticsCollector:
    """Collect and aggregate chatbot analytics (asyncpg-style Postgres pool)."""

    def __init__(self, db_pool):
        self.db = db_pool

    async def record_conversation(self, metrics: "ConversationMetrics"):
        """Persist one conversation's metrics as a single analytics row.

        tools_used is stored comma-joined; a tool name containing a comma
        would corrupt the column — NOTE(review): consider a JSON/array
        column if tool names are not controlled.
        """
        async with self.db.acquire() as conn:
            await conn.execute("""
                INSERT INTO conversation_analytics
                (session_id, tenant_id, started_at, ended_at, turn_count,
                 resolved, escalated, user_satisfaction, total_tokens,
                 total_cost_usd, avg_response_time_ms, tools_used, channel)
                VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
            """, metrics.session_id, metrics.tenant_id,
                metrics.started_at, metrics.ended_at, metrics.turn_count,
                metrics.resolved, metrics.escalated, metrics.user_satisfaction,
                metrics.total_tokens, metrics.total_cost_usd,
                metrics.avg_response_time_ms,
                ",".join(metrics.tools_used or []), metrics.channel)

    async def get_dashboard_data(self, tenant_id: str, days: int = 7) -> dict:
        """Aggregate dashboard KPIs for a tenant over the trailing `days` days."""
        async with self.db.acquire() as conn:
            # `days` is bound as query parameter $2 instead of being
            # %-interpolated into the SQL text: never build SQL by string
            # formatting, even for values that look safe.
            stats = await conn.fetchrow("""
                SELECT
                    COUNT(*) as total_conversations,
                    AVG(turn_count) as avg_turns,
                    SUM(CASE WHEN resolved THEN 1 ELSE 0 END)::float
                        / NULLIF(COUNT(*), 0) * 100 as resolution_rate,
                    SUM(CASE WHEN escalated THEN 1 ELSE 0 END)::float
                        / NULLIF(COUNT(*), 0) * 100 as escalation_rate,
                    AVG(NULLIF(user_satisfaction, 0)) as avg_satisfaction,
                    SUM(total_tokens) as total_tokens,
                    SUM(total_cost_usd) as total_cost,
                    AVG(avg_response_time_ms) as avg_latency_ms
                FROM conversation_analytics
                WHERE tenant_id = $1
                AND started_at >= NOW() - ($2 * INTERVAL '1 day')
            """, tenant_id, days)

            # Aggregates are NULL when no rows match; coalesce to zeros so
            # the dashboard always renders numbers.
            return {
                "total_conversations": stats["total_conversations"],
                "avg_turns_per_conversation": round(float(stats["avg_turns"] or 0), 1),
                "resolution_rate_pct": round(float(stats["resolution_rate"] or 0), 1),
                "escalation_rate_pct": round(float(stats["escalation_rate"] or 0), 1),
                "avg_satisfaction": round(float(stats["avg_satisfaction"] or 0), 2),
                "total_tokens_used": stats["total_tokens"] or 0,
                "total_cost_usd": round(float(stats["total_cost"] or 0), 2),
                "avg_response_time_ms": round(float(stats["avg_latency_ms"] or 0))
            }

A/B Testing Chatbot Responses

Test different system prompts, models, and temperature settings to optimize conversation quality.

# A/B testing framework for chatbot responses
import hashlib
import random

@dataclass
class Experiment:
    """Definition of one A/B experiment over chatbot configuration."""
    name: str
    variants: Dict[str, dict]   # variant_name -> config overrides
    traffic_split: Dict[str, float]  # variant_name -> percentage (0.0-1.0)
    active: bool = True  # inactive experiments resolve every user to "control"

class ABTestManager:
    """A/B test different chatbot configurations.

    Assignment is deterministic (hash of experiment+user) and additionally
    pinned in Redis for 30 days, so a user keeps the same variant across
    sessions even if the traffic split later changes.
    """

    def __init__(self, redis_client):
        self.redis = redis_client
        self.experiments: Dict[str, "Experiment"] = {}

    def register_experiment(self, experiment: "Experiment"):
        """Register (or replace) an experiment definition, keyed by name."""
        self.experiments[experiment.name] = experiment

    def assign_variant(self, experiment_name: str, user_id: str) -> str:
        """Deterministically assign user to a variant (consistent hashing)."""
        experiment = self.experiments.get(experiment_name)
        if not experiment or not experiment.active:
            return "control"

        # Reuse a prior assignment if one is pinned in Redis.
        cache_key = f"ab:{experiment_name}:{user_id}"
        cached = self.redis.get(cache_key)
        if cached:
            # redis-py returns bytes by default but str when the client was
            # created with decode_responses=True — handle both instead of
            # crashing on .decode().
            return cached.decode() if isinstance(cached, bytes) else cached

        # Deterministic bucket in [0, 1). MD5 is used as a cheap, stable
        # bucketing hash here, not as a security primitive.
        hash_val = int(hashlib.md5(f"{experiment_name}:{user_id}".encode()).hexdigest(), 16)
        normalized = (hash_val % 10000) / 10000.0

        # Walk the cumulative traffic split; if the splits sum to < 1.0,
        # the remainder of users falls through to "control".
        cumulative = 0.0
        assigned = "control"
        for variant_name, split in experiment.traffic_split.items():
            cumulative += split
            if normalized < cumulative:
                assigned = variant_name
                break

        self.redis.setex(cache_key, 86400 * 30, assigned)
        return assigned

    def get_config_overrides(self, experiment_name: str,
                              user_id: str) -> dict:
        """Return the config overrides for the user's assigned variant ({} if none)."""
        experiment = self.experiments.get(experiment_name)
        if not experiment:
            return {}
        variant = self.assign_variant(experiment_name, user_id)
        return experiment.variants.get(variant, {})


# --- Example experiment ---
# Register a three-way system-prompt test; traffic splits sum to 1.00 so
# every user is assigned one of the listed variants.
ab_manager = ABTestManager(redis_client=None)  # Pass your Redis client
ab_manager.register_experiment(Experiment(
    name="system_prompt_v2",
    variants={
        # variant name -> config overrides merged over the tenant's defaults
        "control": {"system_prompt": "You are a helpful customer support agent..."},
        "concise": {"system_prompt": "You are a customer support agent. Be extremely concise. Max 2 sentences per response."},
        "empathetic": {"system_prompt": "You are a warm, empathetic customer support agent. Acknowledge feelings before solving problems."}
    },
    traffic_split={"control": 0.34, "concise": 0.33, "empathetic": 0.33}
))

Cost Management with Token Budgets

LLM API costs can spiral out of control. Implement per-conversation and per-tenant token budgets to keep costs predictable.

# Token budget manager
class TokenBudgetManager:
    """Track and enforce token spending limits per session and per tenant."""

    # Pricing per 1M tokens (input/output) as of 2026
    MODEL_PRICING = {
        "gpt-4o":       {"input": 2.50,  "output": 10.00},
        "gpt-4o-mini":  {"input": 0.15,  "output": 0.60},
        "claude-sonnet": {"input": 3.00,  "output": 15.00},
        "claude-haiku":  {"input": 0.25,  "output": 1.25},
    }

    def __init__(self, redis_client):
        self.redis = redis_client

    def estimate_cost(self, model: str, input_tokens: int,
                      output_tokens: int) -> float:
        """Return the USD cost for a call; unknown models get a conservative default."""
        pricing = self.MODEL_PRICING.get(model, {"input": 5.0, "output": 15.0})
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (output_tokens / 1_000_000) * pricing["output"]
        return input_cost + output_cost

    def check_conversation_budget(self, session_id: str,
                                   max_cost_usd: float = 0.50) -> bool:
        """Return True while the session's accumulated spend is under budget."""
        spent = float(self.redis.get(f"budget:session:{session_id}") or 0)
        return spent < max_cost_usd

    def record_spend(self, session_id: str, tenant_id: str,
                     model: str, input_tokens: int, output_tokens: int):
        """Record one call's cost against session/tenant counters; returns the cost."""
        cost = self.estimate_cost(model, input_tokens, output_tokens)

        pipe = self.redis.pipeline()
        # Per-session tracking (2h TTL covers a long conversation)
        pipe.incrbyfloat(f"budget:session:{session_id}", cost)
        pipe.expire(f"budget:session:{session_id}", 7200)
        # Per-tenant daily tracking, bucketed by UTC date.
        # datetime.utcnow() is deprecated (3.12+); use aware UTC time.
        from datetime import datetime, timezone
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        pipe.incrbyfloat(f"budget:tenant:{tenant_id}:{today}", cost)
        pipe.expire(f"budget:tenant:{tenant_id}:{today}", 86400 * 2)
        # Token counters (note: these never expire and grow unbounded —
        # NOTE(review): confirm that is intentional)
        pipe.incrby(f"tokens:tenant:{tenant_id}:input", input_tokens)
        pipe.incrby(f"tokens:tenant:{tenant_id}:output", output_tokens)
        pipe.execute()

        return cost

    def get_cost_summary(self, tenant_id: str) -> dict:
        """Return per-day costs for the last 7 UTC days plus totals."""
        from datetime import datetime, timedelta, timezone
        today = datetime.now(timezone.utc)  # aware UTC; utcnow() is deprecated
        daily_costs = {}
        for i in range(7):
            day = (today - timedelta(days=i)).strftime("%Y-%m-%d")
            cost = float(self.redis.get(f"budget:tenant:{tenant_id}:{day}") or 0)
            daily_costs[day] = round(cost, 4)

        return {
            "daily_costs": daily_costs,
            "total_7d": round(sum(daily_costs.values()), 2),
            "avg_daily": round(sum(daily_costs.values()) / 7, 2)
        }

Monitoring Dashboard Metrics

Key metrics to track in your monitoring dashboard (Grafana, Datadog, or custom).

| Metric | Target | Alert Threshold | Why It Matters |
|---|---|---|---|
| P50 Response Time | <2s | >5s | User experience; slow bots get abandoned |
| P99 Response Time | <8s | >15s | Tail latency indicates system issues |
| Resolution Rate | >75% | <50% | Core measure of chatbot effectiveness |
| Escalation Rate | <15% | >30% | Too high means the bot isn't handling enough |
| Error Rate | <1% | >5% | Tool failures, LLM errors, timeouts |
| Daily Active Users | Growing | -20% WoW | Adoption and engagement health |
| Cost per Conversation | <$0.10 | >$0.50 | Financial sustainability |
| User Satisfaction (CSAT) | >4.0/5 | <3.0/5 | Direct measure of quality |
| Safety Violations Blocked | Tracked | Spike >3x | Attack detection, policy gaps |
Cost Tip: Use cheaper models (GPT-4o-mini, Claude Haiku) for classification, summarization, and simple queries. Reserve expensive models (GPT-4o, Claude Sonnet) for complex reasoning. This hybrid approach can reduce costs by 60-80% with minimal quality impact.