Scaling RAG in Production
A demo RAG system handles one user at a time. A production RAG system handles thousands of concurrent queries across multiple tenants while keeping costs under control. This lesson covers the architecture patterns, caching strategies, and operational practices that make RAG systems work at scale.
Multi-Tenant RAG Architecture
Most production RAG systems serve multiple customers, departments, or teams, each with their own document collections and access controls. There are three multi-tenancy patterns:
Pattern 1: Collection-per-Tenant (Recommended)
Each tenant gets their own vector collection (or namespace). This provides strong data isolation and allows per-tenant configuration.
# Qdrant: separate collection per tenant
from qdrant_client.models import Distance, VectorParams

def create_tenant(tenant_id: str, vector_size: int = 1536):
    qdrant_client.create_collection(
        collection_name=f"tenant_{tenant_id}",
        vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
    )

def query_tenant(tenant_id: str, query_vector: list[float], top_k: int = 5):
    return qdrant_client.search(
        collection_name=f"tenant_{tenant_id}",
        query_vector=query_vector,
        limit=top_k,
    )
# Pinecone: namespace-per-tenant (same index, isolated data)
index = pinecone.Index("production-rag")

def query_tenant_pinecone(tenant_id: str, query_vector, top_k=5):
    return index.query(
        vector=query_vector,
        top_k=top_k,
        namespace=tenant_id,  # Data isolation via namespaces
        include_metadata=True,
    )
Pattern 2: Shared Collection with Metadata Filtering
All tenants share one collection, with a tenant_id metadata field used for filtering. Simpler to manage but weaker isolation.
# Shared collection with metadata filter
from qdrant_client.models import FieldCondition, Filter, MatchAny, MatchValue

results = qdrant_client.search(
    collection_name="shared_docs",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(key="tenant_id", match=MatchValue(value="acme-corp")),
            FieldCondition(key="access_level", match=MatchAny(any=["public", "internal"])),
        ]
    ),
    limit=5,
)
Multi-Tenancy Comparison
| Pattern | Isolation | Ops Complexity | Cost | Best For |
|---|---|---|---|---|
| Collection-per-tenant | Strong | Higher (many collections) | Higher (per-collection overhead) | Enterprise B2B, compliance-heavy |
| Namespace-per-tenant | Good (Pinecone-specific) | Low | Medium | SaaS products on Pinecone |
| Shared + metadata filter | Weak (app-level only) | Lowest | Lowest | Internal tools, low-risk use cases |
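Whichever pattern you choose, the tenant identifier ends up interpolated into a collection name, namespace, or filter value, so it is worth validating before use. A minimal sketch (the allowed-character policy and the `TenantError` name are assumptions, not part of any vendor API):

```python
import re

class TenantError(ValueError):
    """Raised when a tenant identifier fails validation."""

# Hypothetical policy: lowercase alphanumerics and hyphens, 1-64 chars
_TENANT_ID_RE = re.compile(r"^[a-z0-9-]{1,64}$")

def safe_collection_name(tenant_id: str) -> str:
    """Validate a tenant id before building a collection name from it."""
    if not _TENANT_ID_RE.fullmatch(tenant_id):
        raise TenantError(f"invalid tenant id: {tenant_id!r}")
    return f"tenant_{tenant_id}"
```

This matters most for the shared-collection pattern, where isolation is enforced only at the application layer.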
Caching Strategies
Caching is the most effective way to reduce RAG costs and latency. There are three layers of caching you should implement:
Layer 1: Semantic Query Cache
Cache answers for semantically similar questions. If a user asks "How do I reset my password?" and another asks "password reset steps," they should get the same cached answer.
import hashlib

import numpy as np
import redis

class SemanticCache:
    def __init__(self, redis_client, embeddings, threshold=0.95):
        self.redis = redis_client
        self.embeddings = embeddings
        self.threshold = threshold
        self.cache_vectors = []
        self.cache_keys = []

    def _embed(self, query: str) -> np.ndarray:
        # Normalize so the dot product below is true cosine similarity,
        # regardless of whether the embedding model returns unit vectors
        vec = np.asarray(self.embeddings.embed_query(query))
        return vec / np.linalg.norm(vec)

    def get(self, query: str) -> str | None:
        """Check if a semantically similar query was already answered."""
        query_vec = self._embed(query)
        for i, cached_vec in enumerate(self.cache_vectors):
            similarity = float(np.dot(query_vec, cached_vec))
            if similarity >= self.threshold:
                cached_answer = self.redis.get(self.cache_keys[i])
                if cached_answer:
                    return cached_answer.decode()
        return None

    def set(self, query: str, answer: str, ttl=3600):
        """Cache a query-answer pair."""
        query_vec = self._embed(query)
        cache_key = f"rag:cache:{hashlib.md5(query.encode()).hexdigest()}"
        self.redis.setex(cache_key, ttl, answer)
        self.cache_vectors.append(query_vec)
        self.cache_keys.append(cache_key)
# Usage in RAG pipeline
cache = SemanticCache(redis.Redis(), embeddings, threshold=0.95)

def rag_with_cache(query: str) -> str:
    # Check cache first
    cached = cache.get(query)
    if cached:
        return cached  # Cache hit: $0, ~5ms
    # Cache miss: run full RAG pipeline
    answer = full_rag_pipeline(query)  # ~$0.01, ~2000ms
    cache.set(query, answer, ttl=3600)
    return answer
Layer 2: Embedding Cache
# Cache embeddings to avoid re-computing for the same text
embedding_cache = {}

def cached_embed(text: str) -> list[float]:
    cache_key = hashlib.md5(text.encode()).hexdigest()
    if cache_key not in embedding_cache:
        embedding_cache[cache_key] = embeddings.embed_query(text)
    return embedding_cache[cache_key]
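The module-level dict above grows without bound. The standard library's `functools.lru_cache` gives the same behavior plus an eviction policy; a sketch, where the body is a stand-in for the real `embeddings.embed_query` call:

```python
from functools import lru_cache

@lru_cache(maxsize=10_000)  # Evicts least-recently-used entries past 10k texts
def cached_embed(text: str) -> tuple[float, ...]:
    # Stand-in for embeddings.embed_query(text)
    return tuple(float(ord(c)) for c in text[:3])
```

Returning a tuple rather than a list prevents callers from accidentally mutating the shared cached value.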
Layer 3: LLM Response Cache
# Use LangChain's built-in caching
from langchain.cache import RedisCache  # langchain_community.cache in newer releases
from langchain.globals import set_llm_cache

set_llm_cache(RedisCache(redis_client=redis.Redis()))
# Identical prompts (same context + same question) return the cached response
Cost Optimization: Per-Query Cost Analysis
Understanding your per-query cost breakdown is essential for budgeting and optimization. Here is a typical production RAG query cost analysis:
| Component | Cost per Query | % of Total | Optimization Strategy |
|---|---|---|---|
| Query embedding | $0.000002 | 0.02% | Cache repeated queries |
| Vector search | $0.0001 | 1% | Optimize index, reduce top-K |
| Re-ranking | $0.001 | 10% | Self-host cross-encoder model |
| LLM generation | $0.008 | 80% | Reduce context size, use smaller model, cache |
| Multi-query (optional) | $0.001 | 9% | Use gpt-4o-mini for query generation |
| Total | ~$0.01 | 100% | ~$10 per 1,000 queries |
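These figures compose with the semantic cache from the previous section: a cache hit costs roughly nothing, so effective cost per query is about `miss_rate × full_cost`. A quick sketch using the table's numbers (all figures illustrative):

```python
def effective_cost(full_cost_per_query: float, cache_hit_rate: float,
                   queries_per_day: int) -> dict:
    """Estimate spend given a cache hit rate (cache hits assumed ~free)."""
    miss_rate = 1.0 - cache_hit_rate
    per_query = full_cost_per_query * miss_rate
    return {
        "effective_cost_per_query": per_query,
        "daily_cost": per_query * queries_per_day,
    }

# With the ~$0.01/query figure above and a 40% hit rate at 100k queries/day:
# effective_cost(0.01, 0.40, 100_000) -> ~$0.006/query, ~$600/day
```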
Tiered Model Strategy
def select_model(query: str, context_complexity: str) -> str:
    """Route to cheaper model when possible."""
    if context_complexity == "simple":
        return "gpt-4o-mini"  # $0.15/1M input tokens
    elif context_complexity == "moderate":
        return "gpt-4o-mini"  # Still sufficient
    else:
        return "gpt-4o"  # $2.50/1M input tokens, for complex reasoning

def classify_complexity(query: str, contexts: list) -> str:
    """Quick classification of query complexity."""
    if len(contexts) <= 2 and len(query.split()) < 20:
        return "simple"
    elif len(contexts) <= 5:
        return "moderate"
    return "complex"
Monitoring Retrieval Quality
In production, you need continuous monitoring to detect quality degradation before users complain.
from dataclasses import dataclass

@dataclass
class RAGMetrics:
    query: str
    latency_ms: float
    num_chunks_retrieved: int
    avg_similarity_score: float
    model_used: str
    cache_hit: bool
    estimated_cost: float
    user_feedback: str | None = None  # "thumbs_up", "thumbs_down", None

class RAGMonitor:
    def __init__(self, metrics_backend):
        self.backend = metrics_backend  # Prometheus, Datadog, etc.

    def record_query(self, metrics: RAGMetrics):
        """Record metrics for dashboarding and alerting."""
        self.backend.histogram("rag.latency_ms", metrics.latency_ms)
        self.backend.gauge("rag.avg_similarity", metrics.avg_similarity_score)
        self.backend.counter("rag.queries_total", 1, tags={
            "model": metrics.model_used,
            "cache_hit": str(metrics.cache_hit),
        })
        self.backend.histogram("rag.cost_per_query", metrics.estimated_cost)
        # Alert on quality degradation
        if metrics.avg_similarity_score < 0.3:
            self.backend.event(
                "Low retrieval quality",
                f"Query '{metrics.query[:50]}' had avg similarity {metrics.avg_similarity_score:.2f}",
                alert_type="warning",
            )

    def daily_quality_report(self):
        """Generate daily quality dashboard."""
        return {
            "total_queries": self.backend.query("sum(rag.queries_total)"),
            "cache_hit_rate": self.backend.query("rag.cache_hit_rate"),
            "p50_latency_ms": self.backend.query("p50(rag.latency_ms)"),
            "p99_latency_ms": self.backend.query("p99(rag.latency_ms)"),
            "avg_similarity": self.backend.query("avg(rag.avg_similarity)"),
            "thumbs_down_rate": self.backend.query("rag.negative_feedback_rate"),
            "daily_cost": self.backend.query("sum(rag.cost_per_query)"),
        }
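The `metrics_backend` interface is easy to stub in memory for local development and tests before wiring up Prometheus or Datadog. A minimal sketch (method names match the calls `RAGMonitor` makes; everything else is an assumption):

```python
from collections import defaultdict

class InMemoryMetricsBackend:
    """Collects metrics in plain dicts; no aggregation or query() support."""

    def __init__(self):
        self.histograms = defaultdict(list)
        self.gauges = {}
        self.counters = defaultdict(int)
        self.events = []

    def histogram(self, name, value):
        self.histograms[name].append(value)

    def gauge(self, name, value):
        self.gauges[name] = value

    def counter(self, name, value, tags=None):
        # Key on name plus sorted tag pairs so tagged series stay distinct
        key = (name, tuple(sorted((tags or {}).items())))
        self.counters[key] += value

    def event(self, title, text, alert_type="info"):
        self.events.append((alert_type, title, text))
```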
Incremental Index Updates
Re-indexing your entire document collection every time a document changes is wasteful. Implement incremental updates that only process changed documents.
class IncrementalIndexer:
    def __init__(self, vector_store, embeddings, doc_tracker_db):
        self.vector_store = vector_store
        self.embeddings = embeddings
        self.tracker = doc_tracker_db  # Tracks document hashes

    def sync(self, documents_dir: str):
        """Sync vector index with document directory."""
        current_docs = self._scan_directory(documents_dir)  # {path: content_hash}
        tracked_docs = self.tracker.get_all()

        # Find new and modified documents
        to_add = []
        to_update = []
        to_delete = []
        for path, content_hash in current_docs.items():
            if path not in tracked_docs:
                to_add.append(path)
            elif tracked_docs[path] != content_hash:
                to_update.append(path)
        for path in tracked_docs:
            if path not in current_docs:
                to_delete.append(path)

        # Process changes
        for path in to_delete:
            self.vector_store.delete(filter={"source": path})
            self.tracker.remove(path)
        for path in to_update:
            self.vector_store.delete(filter={"source": path})
            self._ingest(path)
            self.tracker.update(path, current_docs[path])
        for path in to_add:
            self._ingest(path)
            self.tracker.add(path, current_docs[path])

        return {
            "added": len(to_add),
            "updated": len(to_update),
            "deleted": len(to_delete),
            "unchanged": len(current_docs) - len(to_add) - len(to_update),
        }

# Run on a schedule (e.g., every 15 minutes)
# indexer.sync("/data/knowledge-base/")
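The `_scan_directory` helper is left abstract above. One way to implement it is to hash file contents, so unchanged files and no-op saves are detected cheaply; a sketch (SHA-256 and the `.md`/`.txt` extension filter are assumptions):

```python
import hashlib
from pathlib import Path

def scan_directory(documents_dir: str, extensions=(".md", ".txt")) -> dict[str, str]:
    """Return {relative_path: sha256(content)} for every tracked document."""
    root = Path(documents_dir)
    hashes = {}
    for path in root.rglob("*"):
        if path.is_file() and path.suffix in extensions:
            digest = hashlib.sha256(path.read_bytes()).hexdigest()
            hashes[str(path.relative_to(root))] = digest
    return hashes
```

Content hashing (rather than mtime comparison) means a file that is re-saved without changes is correctly reported as unchanged.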
Key Takeaways
- Use collection-per-tenant for B2B SaaS with strict data isolation; shared collections with metadata filtering for internal tools.
- Implement three caching layers: semantic query cache, embedding cache, and LLM response cache.
- LLM generation is 80% of per-query cost — optimize with caching, context compression, and tiered model routing.
- Monitor retrieval quality continuously: track similarity scores, latency, cache hit rates, and user feedback.
- Build incremental indexing from day one to avoid full re-indexing on every document change.
Lilly Tech Systems