Advanced Scaling & Operations
Going from a demo chatbot to one serving thousands of concurrent users across multiple tenants requires careful architecture. This lesson covers multi-tenant isolation, intelligent rate limiting, conversation analytics, A/B testing response quality, LLM cost management with token budgets, and production monitoring.
Multi-Tenant Architecture
If you're building a chatbot platform (not just a single bot), you need tenant isolation for data, configuration, system prompts, and usage limits.
# Multi-tenant chatbot platform
from dataclasses import dataclass, field
from typing import Dict, Optional
import json
@dataclass
class TenantConfig:
    """Per-tenant configuration for a multi-tenant chatbot platform.

    Loaded from the `tenants` table by TenantManager and cached in Redis
    and in-process; serialized with vars()/json, so fields must stay
    JSON-compatible.
    """

    tenant_id: str  # unique tenant identifier (DB primary key)
    name: str  # human-readable tenant name
    system_prompt: str  # tenant-specific LLM system prompt
    model: str = "gpt-4o-mini"  # LLM model used for this tenant
    max_tokens_per_response: int = 500  # cap on a single completion
    monthly_token_budget: int = 1_000_000  # enforced by TenantManager.check_budget
    rate_limit_per_minute: int = 60  # per-tenant message rate limit
    allowed_tools: list = field(default_factory=list)  # tool names the bot may invoke
    blocked_topics: list = field(default_factory=list)  # topics the bot must refuse
    escalation_email: str = ""  # where human-handoff notifications go
    custom_greeting: str = "Hello! How can I help you today?"  # first message shown to users
class TenantManager:
    """Manage tenant configurations and isolation.

    Lookup order: in-process cache -> Redis (5 min TTL) -> database.
    In-process entries carry their own expiry: the original cached them
    forever, so tenant config changes never reached long-lived workers
    even after the Redis entry expired.
    """

    # Seconds a config may be served from the in-process cache.
    LOCAL_CACHE_TTL = 60
    # Seconds a config lives in the Redis cache (matches original 300).
    REDIS_CACHE_TTL = 300

    def __init__(self, redis_client, db_pool):
        self.redis = redis_client
        self.db = db_pool
        # tenant_id -> (config, monotonic expiry timestamp)
        self._cache: dict = {}

    async def get_tenant(self, tenant_id: str) -> Optional["TenantConfig"]:
        """Return the active tenant's config, or None if unknown/inactive."""
        # 1) In-process cache, honoring the short TTL.
        entry = self._cache.get(tenant_id)
        if entry is not None and entry[1] > time.monotonic():
            return entry[0]
        # 2) Redis cache (shared across workers).
        cached = self.redis.get(f"tenant:{tenant_id}")
        if cached:
            config = TenantConfig(**json.loads(cached))
            self._store_local(tenant_id, config)
            return config
        # 3) Database — the source of truth.
        async with self.db.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT * FROM tenants WHERE tenant_id = $1 AND active = true",
                tenant_id
            )
        if not row:
            return None
        config = TenantConfig(
            tenant_id=row["tenant_id"],
            name=row["name"],
            system_prompt=row["system_prompt"],
            model=row["model"],
            max_tokens_per_response=row["max_tokens_per_response"],
            monthly_token_budget=row["monthly_token_budget"],
            rate_limit_per_minute=row["rate_limit_per_minute"],
            # assumes these columns hold JSON array text — TODO confirm schema
            allowed_tools=json.loads(row["allowed_tools"]),
            blocked_topics=json.loads(row["blocked_topics"]),
            escalation_email=row["escalation_email"],
            custom_greeting=row["custom_greeting"]
        )
        self.redis.setex(
            f"tenant:{tenant_id}", self.REDIS_CACHE_TTL, json.dumps(vars(config))
        )
        self._store_local(tenant_id, config)
        return config

    def _store_local(self, tenant_id: str, config: "TenantConfig") -> None:
        """Cache a config in-process with a short expiry."""
        self._cache[tenant_id] = (config, time.monotonic() + self.LOCAL_CACHE_TTL)

    async def check_budget(self, tenant_id: str, tokens_used: int) -> bool:
        """Return True if spending `tokens_used` keeps the tenant within its
        monthly token budget; False for unknown/inactive tenants.

        NOTE(review): check_budget + record_usage is not atomic, so
        concurrent requests can slightly overshoot the budget — confirm
        whether a Lua script / INCRBY-with-cap is needed.
        """
        config = await self.get_tenant(tenant_id)
        if not config:
            return False  # unknown tenant: deny rather than spend
        key = f"usage:{tenant_id}:{self._current_month()}"
        current = int(self.redis.get(key) or 0)
        return current + tokens_used <= config.monthly_token_budget

    async def record_usage(self, tenant_id: str, tokens: int):
        """Add `tokens` to the tenant's usage counter for the current month."""
        key = f"usage:{tenant_id}:{self._current_month()}"
        self.redis.incrby(key, tokens)
        self.redis.expire(key, 86400 * 35)  # Keep for ~1 month

    def _current_month(self) -> str:
        """Current UTC month as 'YYYY-MM' (suffix for usage-counter keys)."""
        # timezone-aware now(): datetime.utcnow() is deprecated since 3.12.
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).strftime("%Y-%m")
Intelligent Rate Limiting
Rate limit per user, per tenant, and globally. Use sliding windows for smooth enforcement and different limits for different operations.
# Multi-level rate limiter
import time
class SlidingWindowRateLimiter:
    """Sliding-window-log rate limiter backed by a Redis sorted set.

    Each request is stored as a uniquely-named member scored by its
    timestamp; members older than the window are pruned on every check.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(self, key: str, max_requests: int,
                   window_seconds: int) -> dict:
        """Check if one more request fits under `max_requests` per
        `window_seconds` for `key`.

        Returns a dict with: allowed, current (count before this request),
        limit, remaining, reset_at (epoch seconds), retry_after (seconds,
        0 when allowed).
        """
        import uuid  # local import: only this method needs it

        now = time.time()
        window_start = now - window_seconds
        zset_key = f"ratelimit:{key}"
        # Unique member per request: the original used str(now) as the
        # member, so two requests with an identical timestamp collapsed
        # into one sorted-set entry and were undercounted.
        member = f"{now}:{uuid.uuid4().hex}"
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(zset_key, 0, window_start)  # drop expired entries
        pipe.zcard(zset_key)                              # count before this request
        pipe.zadd(zset_key, {member: now})                # record this request
        pipe.expire(zset_key, window_seconds)             # GC idle keys
        results = pipe.execute()
        current_count = results[1]
        allowed = current_count < max_requests
        if not allowed:
            # Don't let rejected requests consume window capacity — the
            # original kept them, so a client hammering the limit could
            # lock itself out indefinitely.
            self.redis.zrem(zset_key, member)
        return {
            "allowed": allowed,
            "current": current_count,
            "limit": max_requests,
            "remaining": max(0, max_requests - current_count),
            "reset_at": now + window_seconds,
            "retry_after": window_seconds if not allowed else 0
        }
class ChatbotRateLimiter:
    """Multi-level rate limiting for chatbot platforms.

    Checks three sliding windows in order — per-user, per-tenant, then
    global — and rejects at the first level over its limit.
    """

    def __init__(self, redis_client, *,
                 user_limit_per_minute: int = 30,
                 tenant_limit_per_minute: int = 1000,
                 global_limit_per_minute: int = 10000):
        # Limits were hard-coded in check(); keyword-only parameters keep
        # the original call signature working while letting deployments
        # tune each level. Defaults match the original constants.
        self.limiter = SlidingWindowRateLimiter(redis_client)
        self.user_limit = user_limit_per_minute
        self.tenant_limit = tenant_limit_per_minute
        self.global_limit = global_limit_per_minute

    def check(self, tenant_id: str, user_id: str) -> dict:
        """Return {"allowed": True, "remaining": <user-level remaining>} or
        {"allowed": False, "level": <user|tenant|global>, ...limit info}."""
        levels = [
            ("user", f"user:{tenant_id}:{user_id}", self.user_limit),
            ("tenant", f"tenant:{tenant_id}", self.tenant_limit),
            ("global", "global", self.global_limit),
        ]
        user_result = None
        for level, key, limit in levels:
            result = self.limiter.is_allowed(
                key, max_requests=limit, window_seconds=60
            )
            if level == "user":
                user_result = result  # reported back on overall success
            if not result["allowed"]:
                return {"allowed": False, "level": level, **result}
        return {"allowed": True, "remaining": user_result["remaining"]}
Conversation Analytics
Track key metrics to understand chatbot performance: resolution rate, user satisfaction, escalation rate, and conversation quality.
# Conversation analytics tracker
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List
@dataclass
class ConversationMetrics:
    """Per-session metrics persisted by AnalyticsCollector when a
    conversation ends."""

    session_id: str  # unique conversation/session identifier
    tenant_id: str  # owning tenant
    started_at: datetime
    ended_at: Optional[datetime] = None  # None while the session is still open
    turn_count: int = 0  # number of user/bot exchanges
    resolved: bool = False  # bot solved the issue without a human
    escalated: bool = False  # handed off to a human agent
    user_satisfaction: int = 0  # 1-5 star rating
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    avg_response_time_ms: float = 0.0
    tools_used: Optional[List[str]] = None  # None is treated as [] downstream
    channel: str = "web"  # e.g. web, slack, sms
class AnalyticsCollector:
    """Collect and aggregate chatbot analytics in PostgreSQL (asyncpg-style
    pool with $n positional parameters)."""

    def __init__(self, db_pool):
        self.db = db_pool

    async def record_conversation(self, metrics: "ConversationMetrics"):
        """Persist one finished conversation's metrics."""
        async with self.db.acquire() as conn:
            await conn.execute("""
                INSERT INTO conversation_analytics
                (session_id, tenant_id, started_at, ended_at, turn_count,
                 resolved, escalated, user_satisfaction, total_tokens,
                 total_cost_usd, avg_response_time_ms, tools_used, channel)
                VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
            """, metrics.session_id, metrics.tenant_id,
                metrics.started_at, metrics.ended_at, metrics.turn_count,
                metrics.resolved, metrics.escalated, metrics.user_satisfaction,
                metrics.total_tokens, metrics.total_cost_usd,
                metrics.avg_response_time_ms,
                # tools stored as a comma-joined string; None -> empty
                ",".join(metrics.tools_used or []), metrics.channel)

    async def get_dashboard_data(self, tenant_id: str, days: int = 7) -> dict:
        """Aggregate KPI stats for `tenant_id` over the last `days` days.

        Percent rates use NULLIF to avoid division by zero when there are
        no conversations; unrated satisfaction (0) is excluded via NULLIF.
        """
        async with self.db.acquire() as conn:
            # `days` is bound as a query parameter; the original %-formatted
            # it into the SQL string — an injection-prone pattern and
            # inconsistent with the $n style used everywhere else here.
            stats = await conn.fetchrow("""
                SELECT
                    COUNT(*) as total_conversations,
                    AVG(turn_count) as avg_turns,
                    SUM(CASE WHEN resolved THEN 1 ELSE 0 END)::float
                        / NULLIF(COUNT(*), 0) * 100 as resolution_rate,
                    SUM(CASE WHEN escalated THEN 1 ELSE 0 END)::float
                        / NULLIF(COUNT(*), 0) * 100 as escalation_rate,
                    AVG(NULLIF(user_satisfaction, 0)) as avg_satisfaction,
                    SUM(total_tokens) as total_tokens,
                    SUM(total_cost_usd) as total_cost,
                    AVG(avg_response_time_ms) as avg_latency_ms
                FROM conversation_analytics
                WHERE tenant_id = $1
                  AND started_at >= NOW() - ($2 * INTERVAL '1 day')
            """, tenant_id, days)
        # SQL aggregates return NULL on empty sets — default everything to 0.
        return {
            "total_conversations": stats["total_conversations"],
            "avg_turns_per_conversation": round(float(stats["avg_turns"] or 0), 1),
            "resolution_rate_pct": round(float(stats["resolution_rate"] or 0), 1),
            "escalation_rate_pct": round(float(stats["escalation_rate"] or 0), 1),
            "avg_satisfaction": round(float(stats["avg_satisfaction"] or 0), 2),
            "total_tokens_used": stats["total_tokens"] or 0,
            "total_cost_usd": round(float(stats["total_cost"] or 0), 2),
            "avg_response_time_ms": round(float(stats["avg_latency_ms"] or 0))
        }
A/B Testing Chatbot Responses
Test different system prompts, models, and temperature settings to optimize conversation quality.
# A/B testing framework for chatbot responses
import hashlib
import random
@dataclass
class Experiment:
    """Definition of one A/B test over chatbot configuration."""

    name: str  # experiment identifier, unique per ABTestManager
    variants: Dict[str, dict]  # variant_name -> config overrides
    traffic_split: Dict[str, float]  # variant_name -> percentage (0.0-1.0)
    # NOTE(review): splits are assumed to sum to <= 1.0; users hashing past
    # the cumulative total fall back to "control" — confirm intended.
    active: bool = True  # inactive experiments always assign "control"
class ABTestManager:
    """Assign users to experiment variants and expose their config
    overrides. Assignment is sticky: cached in Redis for 30 days and,
    independently, deterministic via hashing, so a user always lands in
    the same variant."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.experiments: Dict[str, "Experiment"] = {}

    def register_experiment(self, experiment: "Experiment"):
        """Make `experiment` available for assignment under its name."""
        self.experiments[experiment.name] = experiment

    def assign_variant(self, experiment_name: str, user_id: str) -> str:
        """Deterministically assign user to a variant (consistent hashing)."""
        exp = self.experiments.get(experiment_name)
        if exp is None or not exp.active:
            return "control"
        # Sticky assignment cached in Redis.
        cache_key = f"ab:{experiment_name}:{user_id}"
        prior = self.redis.get(cache_key)
        if prior:
            return prior.decode()
        # Map the (experiment, user) pair onto [0, 1) via an MD5 bucket.
        digest = hashlib.md5(f"{experiment_name}:{user_id}".encode()).hexdigest()
        bucket = (int(digest, 16) % 10000) / 10000.0
        # Walk the cumulative traffic split; default to control if the
        # bucket falls past the declared shares.
        chosen = "control"
        threshold = 0.0
        for variant, share in exp.traffic_split.items():
            threshold += share
            if bucket < threshold:
                chosen = variant
                break
        self.redis.setex(cache_key, 86400 * 30, chosen)
        return chosen

    def get_config_overrides(self, experiment_name: str,
                             user_id: str) -> dict:
        """Config overrides for the user's variant ({} when the experiment
        is unknown or the variant has no overrides)."""
        exp = self.experiments.get(experiment_name)
        if exp is None:
            return {}
        variant = self.assign_variant(experiment_name, user_id)
        return exp.variants.get(variant, {})
# --- Example experiment ---
# Three system-prompt variants with a roughly one-third traffic split each
# (0.34/0.33/0.33 sums to 1.0, so every user lands in some variant).
ab_manager = ABTestManager(redis_client=None)  # Pass your Redis client
ab_manager.register_experiment(Experiment(
    name="system_prompt_v2",
    variants={
        "control": {"system_prompt": "You are a helpful customer support agent..."},
        "concise": {"system_prompt": "You are a customer support agent. Be extremely concise. Max 2 sentences per response."},
        "empathetic": {"system_prompt": "You are a warm, empathetic customer support agent. Acknowledge feelings before solving problems."}
    },
    traffic_split={"control": 0.34, "concise": 0.33, "empathetic": 0.33}
))
Cost Management with Token Budgets
LLM API costs can spiral out of control. Implement per-conversation and per-tenant token budgets to keep costs predictable.
# Token budget manager
class TokenBudgetManager:
    """Track and enforce token spending limits per session and tenant."""

    # Pricing per 1M tokens (input/output) as of 2026
    MODEL_PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet": {"input": 3.00, "output": 15.00},
        "claude-haiku": {"input": 0.25, "output": 1.25},
    }
    # Conservative fallback for models missing from the table (was an
    # inline magic dict; named so the policy is visible and overridable).
    DEFAULT_PRICING = {"input": 5.0, "output": 15.0}

    def __init__(self, redis_client):
        self.redis = redis_client

    def estimate_cost(self, model: str, input_tokens: int,
                      output_tokens: int) -> float:
        """USD cost of one call; unknown models use DEFAULT_PRICING."""
        pricing = self.MODEL_PRICING.get(model, self.DEFAULT_PRICING)
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (output_tokens / 1_000_000) * pricing["output"]
        return input_cost + output_cost

    def check_conversation_budget(self, session_id: str,
                                  max_cost_usd: float = 0.50) -> bool:
        """True while the session's accumulated spend is under the cap."""
        spent = float(self.redis.get(f"budget:session:{session_id}") or 0)
        return spent < max_cost_usd

    def record_spend(self, session_id: str, tenant_id: str,
                     model: str, input_tokens: int, output_tokens: int):
        """Record one call's cost at session and tenant granularity.

        Returns the estimated cost in USD.
        """
        cost = self.estimate_cost(model, input_tokens, output_tokens)
        pipe = self.redis.pipeline()
        # Per-session tracking (2 h TTL — outlives any live conversation).
        pipe.incrbyfloat(f"budget:session:{session_id}", cost)
        pipe.expire(f"budget:session:{session_id}", 7200)
        # Per-tenant daily tracking, keyed by UTC date.
        today = self._utc_day()
        pipe.incrbyfloat(f"budget:tenant:{tenant_id}:{today}", cost)
        pipe.expire(f"budget:tenant:{tenant_id}:{today}", 86400 * 2)
        # Lifetime token counters (no TTL).
        pipe.incrby(f"tokens:tenant:{tenant_id}:input", input_tokens)
        pipe.incrby(f"tokens:tenant:{tenant_id}:output", output_tokens)
        pipe.execute()
        return cost

    def get_cost_summary(self, tenant_id: str) -> dict:
        """Daily costs for the past 7 UTC days plus total and average."""
        from datetime import datetime, timedelta, timezone
        now = datetime.now(timezone.utc)
        daily_costs = {}
        for offset in range(7):
            day = (now - timedelta(days=offset)).strftime("%Y-%m-%d")
            spent = float(self.redis.get(f"budget:tenant:{tenant_id}:{day}") or 0)
            daily_costs[day] = round(spent, 4)
        return {
            "daily_costs": daily_costs,
            "total_7d": round(sum(daily_costs.values()), 2),
            "avg_daily": round(sum(daily_costs.values()) / 7, 2)
        }

    @staticmethod
    def _utc_day() -> str:
        """Current UTC date as 'YYYY-MM-DD'.

        Uses timezone-aware now(): datetime.utcnow() is deprecated in 3.12.
        """
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).strftime("%Y-%m-%d")
Monitoring Dashboard Metrics
Key metrics to track in your monitoring dashboard (Grafana, Datadog, or custom).
| Metric | Target | Alert Threshold | Why It Matters |
|---|---|---|---|
| P50 Response Time | <2s | >5s | User experience; slow bots get abandoned |
| P99 Response Time | <8s | >15s | Tail latency indicates system issues |
| Resolution Rate | >75% | <50% | Core measure of chatbot effectiveness |
| Escalation Rate | <15% | >30% | Too high means bot isn't handling enough |
| Error Rate | <1% | >5% | Tool failures, LLM errors, timeouts |
| Daily Active Users | Growing | -20% WoW | Adoption and engagement health |
| Cost per Conversation | <$0.10 | >$0.50 | Financial sustainability |
| User Satisfaction (CSAT) | >4.0/5 | <3.0/5 | Direct measure of quality |
| Safety Violations Blocked | Tracked | Spike >3x | Attack detection, policy gaps |
Lilly Tech Systems