Advanced

Scaling RAG in Production

A demo RAG system handles one user at a time. A production RAG system handles thousands of concurrent queries across multiple tenants while keeping costs under control. This lesson covers the architecture patterns, caching strategies, and operational practices that make RAG systems work at scale.

Multi-Tenant RAG Architecture

Most production RAG systems serve multiple customers, departments, or teams, each with their own document collections and access controls. There are three common multi-tenancy patterns: collection-per-tenant, namespace-per-tenant, and shared collection with metadata filtering.

Pattern 1: Collection-per-Tenant (Recommended)

Each tenant gets its own vector collection (or, on Pinecone, its own namespace within a shared index). This provides strong data isolation and allows per-tenant configuration.

# Qdrant: separate collection per tenant
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance

qdrant_client = QdrantClient(url="http://localhost:6333")

def create_tenant(tenant_id: str, vector_size: int = 1536):
    qdrant_client.create_collection(
        collection_name=f"tenant_{tenant_id}",
        vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE)
    )

def query_tenant(tenant_id: str, query_vector: list[float], top_k: int = 5):
    return qdrant_client.search(
        collection_name=f"tenant_{tenant_id}",
        query_vector=query_vector,
        limit=top_k
    )

# Pinecone: namespace-per-tenant (same index, isolated data)
from pinecone import Pinecone

pc = Pinecone(api_key="...")  # older SDKs use pinecone.init(...) instead
index = pc.Index("production-rag")

def query_tenant_pinecone(tenant_id: str, query_vector, top_k=5):
    return index.query(
        vector=query_vector,
        top_k=top_k,
        namespace=tenant_id,  # Data isolation via namespaces
        include_metadata=True
    )

Pattern 2: Shared Collection with Metadata Filtering

All tenants share one collection, with a tenant_id metadata field used for filtering. This is simpler to operate, but isolation is only as strong as your filtering code: one missing filter clause leaks data across tenants.

# Shared collection with metadata filter
from qdrant_client.models import Filter, FieldCondition, MatchValue, MatchAny

results = qdrant_client.search(
    collection_name="shared_docs",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(key="tenant_id", match=MatchValue(value="acme-corp")),
            FieldCondition(key="access_level", match=MatchAny(any=["public", "internal"])),
        ]
    ),
    limit=5
)
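Because isolation in this pattern lives entirely in application code, a defense-in-depth check on query results is cheap insurance. A minimal sketch (the dict shape of `results` is an assumption for illustration, not a Qdrant type):

```python
def filter_cross_tenant(results: list[dict], tenant_id: str) -> list[dict]:
    """Drop any retrieved chunk whose payload belongs to another tenant."""
    safe = []
    for r in results:
        if r.get("payload", {}).get("tenant_id") == tenant_id:
            safe.append(r)
        else:
            # A mismatch here means the query filter is broken -- log loudly.
            print(f"WARNING: cross-tenant result blocked: {r.get('id')}")
    return safe
```

If this check ever fires, treat it as a bug in the filtering layer, not as routine noise.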

Multi-Tenancy Comparison

Pattern                  | Isolation                | Ops Complexity            | Cost                             | Best For
-------------------------|--------------------------|---------------------------|----------------------------------|-----------------------------------
Collection-per-tenant    | Strong                   | Higher (many collections) | Higher (per-collection overhead) | Enterprise B2B, compliance-heavy
Namespace-per-tenant     | Good (Pinecone-specific) | Low                       | Medium                           | SaaS products on Pinecone
Shared + metadata filter | Weak (app-level only)    | Lowest                    | Lowest                           | Internal tools, low-risk use cases
💡 Apply at work: Use collection-per-tenant for B2B SaaS where data leaks between tenants would be a security incident. Use shared-collection with metadata filtering for internal tools where all users are within the same organization.
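One way to operationalize the comparison above is to pin the isolation pattern to a tenant tier at provisioning time. A sketch with hypothetical tier names (your tiers and strategy labels will differ):

```python
from dataclasses import dataclass

@dataclass
class TenantConfig:
    tenant_id: str
    tier: str  # hypothetical tiers: "enterprise", "standard", "internal"

def isolation_strategy(cfg: TenantConfig) -> str:
    """Map a tenant tier to one of the three multi-tenancy patterns."""
    if cfg.tier == "enterprise":
        return "collection"       # dedicated collection, strongest isolation
    if cfg.tier == "standard":
        return "namespace"        # namespace within a shared index
    return "metadata_filter"      # shared collection, app-level filtering
```

Recording the chosen strategy per tenant keeps query routing a simple lookup rather than an if-ladder scattered across the codebase.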

Caching Strategies

Caching is the most effective way to reduce RAG costs and latency. There are three layers of caching you should implement:

Layer 1: Semantic Query Cache

Cache answers for semantically similar questions. If a user asks "How do I reset my password?" and another asks "password reset steps," they should get the same cached answer.

import hashlib
import redis
import numpy as np

class SemanticCache:
    def __init__(self, redis_client, embeddings, threshold=0.95):
        self.redis = redis_client
        self.embeddings = embeddings
        self.threshold = threshold
        self.cache_vectors = []  # In-memory embedding index (rebuilt on restart)
        self.cache_keys = []     # Answers themselves persist in Redis

    def get(self, query: str) -> str | None:
        """Check if a semantically similar query was already answered."""
        query_vec = self.embeddings.embed_query(query)

        # Linear scan; for a large cache, store these in a small vector index.
        # np.dot equals cosine similarity only for unit-normalized embeddings
        # (true of OpenAI's embedding models).
        for i, cached_vec in enumerate(self.cache_vectors):
            similarity = np.dot(query_vec, cached_vec)
            if similarity >= self.threshold:
                cached_answer = self.redis.get(self.cache_keys[i])
                if cached_answer:
                    return cached_answer.decode()
        return None

    def set(self, query: str, answer: str, ttl=3600):
        """Cache a query-answer pair."""
        query_vec = self.embeddings.embed_query(query)
        cache_key = f"rag:cache:{hashlib.md5(query.encode()).hexdigest()}"

        self.redis.setex(cache_key, ttl, answer)
        self.cache_vectors.append(query_vec)
        self.cache_keys.append(cache_key)

# Usage in RAG pipeline
cache = SemanticCache(redis.Redis(), embeddings, threshold=0.95)

def rag_with_cache(query: str) -> str:
    # Check cache first
    cached = cache.get(query)
    if cached:
        return cached  # Cache hit: $0, ~5ms

    # Cache miss: run full RAG pipeline
    answer = full_rag_pipeline(query)  # ~$0.01, ~2000ms
    cache.set(query, answer, ttl=3600)
    return answer

Layer 2: Embedding Cache

# Cache embeddings to avoid re-computing for the same text.
# An in-process dict is unbounded; use an LRU cache or Redis in production.
embedding_cache: dict[str, list[float]] = {}

def cached_embed(text: str) -> list[float]:
    cache_key = hashlib.md5(text.encode()).hexdigest()
    if cache_key not in embedding_cache:
        embedding_cache[cache_key] = embeddings.embed_query(text)
    return embedding_cache[cache_key]

Layer 3: LLM Response Cache

# Use LangChain's built-in caching
import redis
from langchain_community.cache import RedisCache  # langchain.cache in older versions
from langchain.globals import set_llm_cache

set_llm_cache(RedisCache(redis.Redis()))
# Identical prompts (same context + same question) return the cached response

Cost Optimization: Per-Query Cost Analysis

Understanding your per-query cost breakdown is essential for budgeting and optimization. Here is a typical production RAG query cost analysis:

Component              | Cost per Query | % of Total | Optimization Strategy
-----------------------|----------------|------------|----------------------------------------------
Query embedding        | $0.000002      | 0.02%      | Cache repeated queries
Vector search          | $0.0001        | 1%         | Optimize index, reduce top-K
Re-ranking             | $0.001         | 10%        | Self-host cross-encoder model
LLM generation         | $0.008         | 80%        | Reduce context size, use smaller model, cache
Multi-query (optional) | $0.001         | 9%         | Use gpt-4o-mini for query generation

Total: ~$0.01 per query (~$10 per 1,000 queries)
💡 Apply at work: LLM generation is 80% of your cost. The highest-impact cost optimizations are: (1) semantic caching (eliminates 30–50% of LLM calls), (2) context compression (reduces prompt tokens by ~30%), (3) routing simple queries to gpt-4o-mini instead of gpt-4o ($0.15 vs $2.50 per 1M input tokens, roughly 16x cheaper). A tiered model approach can cut costs by 60%.
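The arithmetic behind the caching claim is simple: a cache hit costs effectively nothing, so blended cost scales with the miss rate. A quick estimator using the table's ~$0.01/query figure:

```python
def blended_cost(per_query_cost: float, cache_hit_rate: float, queries: int) -> float:
    """Total cost when cache hits are (approximately) free."""
    return per_query_cost * (1.0 - cache_hit_rate) * queries

# 100,000 queries/month at $0.01 each, with a 40% semantic-cache hit rate:
monthly = blended_cost(0.01, 0.40, 100_000)  # $600 instead of $1,000
```

The same function doubles as a what-if tool: re-run it with your measured hit rate before and after tuning the similarity threshold.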

Tiered Model Strategy

def select_model(query: str, context_complexity: str) -> str:
    """Route to cheaper model when possible."""
    if context_complexity == "simple":
        return "gpt-4o-mini"      # $0.15/1M input tokens
    elif context_complexity == "moderate":
        return "gpt-4o-mini"      # Still sufficient
    else:
        return "gpt-4o"           # $2.50/1M input tokens, for complex reasoning

def classify_complexity(query: str, contexts: list) -> str:
    """Quick classification of query complexity."""
    if len(contexts) <= 2 and len(query.split()) < 20:
        return "simple"
    elif len(contexts) <= 5:
        return "moderate"
    return "complex"

Monitoring Retrieval Quality

In production, you need continuous monitoring to detect quality degradation before users complain.

import time
from dataclasses import dataclass

@dataclass
class RAGMetrics:
    query: str
    latency_ms: float
    num_chunks_retrieved: int
    avg_similarity_score: float
    model_used: str
    cache_hit: bool
    estimated_cost: float
    user_feedback: str | None = None  # "thumbs_up", "thumbs_down", None

class RAGMonitor:
    def __init__(self, metrics_backend):
        self.backend = metrics_backend  # Prometheus, Datadog, etc.

    def record_query(self, metrics: RAGMetrics):
        """Record metrics for dashboarding and alerting."""
        self.backend.histogram("rag.latency_ms", metrics.latency_ms)
        self.backend.gauge("rag.avg_similarity", metrics.avg_similarity_score)
        self.backend.counter("rag.queries_total", 1, tags={
            "model": metrics.model_used,
            "cache_hit": str(metrics.cache_hit),
        })
        self.backend.histogram("rag.cost_per_query", metrics.estimated_cost)

        # Alert on quality degradation
        if metrics.avg_similarity_score < 0.3:
            self.backend.event(
                "Low retrieval quality",
                f"Query '{metrics.query[:50]}' had avg similarity {metrics.avg_similarity_score:.2f}",
                alert_type="warning"
            )

    def daily_quality_report(self):
        """Generate daily quality dashboard."""
        return {
            "total_queries": self.backend.query("sum(rag.queries_total)"),
            "cache_hit_rate": self.backend.query("rag.cache_hit_rate"),
            "p50_latency_ms": self.backend.query("p50(rag.latency_ms)"),
            "p99_latency_ms": self.backend.query("p99(rag.latency_ms)"),
            "avg_similarity": self.backend.query("avg(rag.avg_similarity)"),
            "thumbs_down_rate": self.backend.query("rag.negative_feedback_rate"),
            "daily_cost": self.backend.query("sum(rag.cost_per_query)"),
        }
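To populate latency_ms without sprinkling timing code through the pipeline, a small decorator works. A sketch; `fake_pipeline` is a stand-in for your actual RAG function:

```python
import time

def timed(fn):
    """Return (result, wall-clock latency in ms) for any pipeline stage."""
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        latency_ms = (time.perf_counter() - start) * 1000.0
        return result, latency_ms
    return wrapper

@timed
def fake_pipeline(query: str) -> str:
    return f"answer to: {query}"

answer, latency_ms = fake_pipeline("test query")
# Feed latency_ms into RAGMetrics.latency_ms when building the metrics record.
```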

Incremental Index Updates

Re-indexing your entire document collection every time a document changes is wasteful. Implement incremental updates that only process changed documents.

class IncrementalIndexer:
    """Keep the vector index in sync with a document directory.

    The helpers `_scan_directory` (maps file path -> content hash) and
    `_ingest` (chunk, embed, upsert) are elided for brevity.
    """

    def __init__(self, vector_store, embeddings, doc_tracker_db):
        self.vector_store = vector_store
        self.embeddings = embeddings
        self.tracker = doc_tracker_db  # Tracks document hashes

    def sync(self, documents_dir: str):
        """Sync vector index with document directory."""
        current_docs = self._scan_directory(documents_dir)
        tracked_docs = self.tracker.get_all()

        # Find new and modified documents
        to_add = []
        to_update = []
        to_delete = []

        for path, content_hash in current_docs.items():
            if path not in tracked_docs:
                to_add.append(path)
            elif tracked_docs[path] != content_hash:
                to_update.append(path)

        for path in tracked_docs:
            if path not in current_docs:
                to_delete.append(path)

        # Process changes
        for path in to_delete:
            self.vector_store.delete(filter={"source": path})
            self.tracker.remove(path)

        for path in to_update:
            self.vector_store.delete(filter={"source": path})
            self._ingest(path)
            self.tracker.update(path, current_docs[path])

        for path in to_add:
            self._ingest(path)
            self.tracker.add(path, current_docs[path])

        return {
            "added": len(to_add),
            "updated": len(to_update),
            "deleted": len(to_delete),
            "unchanged": len(current_docs) - len(to_add) - len(to_update),
        }

# Run on a schedule (e.g., every 15 minutes)
# indexer.sync("/data/knowledge-base/")
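The `_scan_directory` helper is elided above; one possible implementation hashes file contents with SHA-256 so the tracker can detect modifications regardless of timestamps:

```python
import hashlib
import os

def scan_directory(root: str) -> dict[str, str]:
    """Map each file under root to a hex digest of its contents."""
    hashes: dict[str, str] = {}
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            with open(path, "rb") as f:
                hashes[path] = hashlib.sha256(f.read()).hexdigest()
    return hashes
```

Content hashing is slower than comparing mtimes but immune to clock skew and file copies that preserve timestamps; for very large corpora, hash only files whose size or mtime changed.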

Key Takeaways

  • Use collection-per-tenant for B2B SaaS with strict data isolation; shared collections with metadata filtering for internal tools.
  • Implement three caching layers: semantic query cache, embedding cache, and LLM response cache.
  • LLM generation is 80% of per-query cost — optimize with caching, context compression, and tiered model routing.
  • Monitor retrieval quality continuously: track similarity scores, latency, cache hit rates, and user feedback.
  • Build incremental indexing from day one to avoid full re-indexing on every document change.