Intermediate

Step 2: Embedding & Vector Store

Now that we can ingest and chunk documents, we need to convert those chunks into vector embeddings and store them in Qdrant. This step makes your documents searchable by semantic meaning rather than just keywords.

What We Are Building

Two new modules that connect ingestion to search:

  • app/embeddings/embedder.py — Wrapper around OpenAI's embedding API with batching and retry logic
  • app/vectorstore/qdrant_store.py — Qdrant client that creates collections, upserts vectors, and supports filtered semantic search

Embedding Module

The embedder converts text into 1536-dimensional vectors using OpenAI's text-embedding-3-small model. We batch requests to stay under rate limits and add retry logic for reliability.

# app/embeddings/embedder.py
"""OpenAI embedding wrapper with batching and retry."""
import logging
import time
from openai import OpenAI, RateLimitError

from app.config import get_settings
from app.ingestion.chunker import Chunk

logger = logging.getLogger(__name__)
settings = get_settings()

# OpenAI client - initialized once at import time and reused by every call
# in this module.
_client = OpenAI(api_key=settings.openai_api_key)

# Maximum batch size for OpenAI embeddings API
# (assumed per-request input limit — confirm against current OpenAI docs)
MAX_BATCH_SIZE = 2048


def embed_texts(
    texts: list[str],
    model: str | None = None,
    max_retries: int = 3
) -> list[list[float]]:
    """Generate embeddings for a list of texts.

    Handles batching and retry logic automatically.

    Args:
        texts: List of text strings to embed.
        model: Embedding model name. Defaults to config value.
        max_retries: Number of retries on rate limit errors.

    Returns:
        List of embedding vectors (each is a list of floats), in the
        same order as the input texts.

    Raises:
        RateLimitError: If a batch is still rate-limited after
            max_retries attempts.
    """
    model = model or settings.openai_embedding_model
    all_embeddings: list[list[float]] = []

    # Process in API-sized batches
    for i in range(0, len(texts), MAX_BATCH_SIZE):
        batch = texts[i:i + MAX_BATCH_SIZE]

        for attempt in range(max_retries):
            try:
                response = _client.embeddings.create(
                    input=batch,
                    model=model
                )
                all_embeddings.extend(item.embedding for item in response.data)

                logger.debug(
                    "Embedded batch %d (%d texts, %d tokens)",
                    i // MAX_BATCH_SIZE + 1,
                    len(batch),
                    response.usage.total_tokens,
                )
                break

            except RateLimitError:
                # Fix: re-raise immediately once retries are exhausted,
                # instead of sleeping one last backoff period and then
                # raising anyway (the original slept before checking).
                if attempt == max_retries - 1:
                    raise

                wait_time = 2 ** attempt  # exponential backoff: 1s, 2s, 4s...
                logger.warning(
                    "Rate limited, retrying in %ds (attempt %d/%d)",
                    wait_time, attempt + 1, max_retries,
                )
                time.sleep(wait_time)

    logger.info("Generated %d embeddings", len(all_embeddings))
    return all_embeddings


def embed_query(query: str, model: str | None = None) -> list[float]:
    """Embed a single search query.

    Args:
        query: The search query text.
        model: Embedding model name.

    Returns:
        Embedding vector as a list of floats.
    """
    # Delegate to the batch path with a one-element batch and unpack it.
    [vector] = embed_texts([query], model=model)
    return vector


def embed_chunks(chunks: list[Chunk]) -> list[tuple[Chunk, list[float]]]:
    """Embed every chunk and pair each one with its vector.

    Args:
        chunks: List of Chunk objects to embed.

    Returns:
        List of (chunk, embedding) tuples.
    """
    vectors = embed_texts([c.text for c in chunks])
    return [(chunk, vector) for chunk, vector in zip(chunks, vectors)]

Qdrant Vector Store Module

This module handles all interactions with Qdrant: creating collections, upserting vectors with metadata, and searching.

# app/vectorstore/qdrant_store.py
"""Qdrant vector store wrapper."""
import logging
import uuid
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    VectorParams,
    PointStruct,
    Filter,
    FieldCondition,
    MatchValue,
    SearchParams,
)

from app.config import get_settings
from app.ingestion.chunker import Chunk

logger = logging.getLogger(__name__)
settings = get_settings()


class QdrantStore:
    """Manages the Qdrant vector database for RAG.

    Wraps collection creation, batched upserts with deterministic point
    IDs, filtered semantic search, and collection maintenance.
    """

    VECTOR_DIM = 1536  # text-embedding-3-small dimension

    def __init__(
        self,
        host: str | None = None,
        port: int | None = None,
        collection_name: str | None = None,
    ):
        """Connect to Qdrant.

        Args:
            host: Qdrant host. Defaults to config value.
            port: Qdrant port. Defaults to config value.
            collection_name: Target collection. Defaults to config value.
        """
        self.host = host or settings.qdrant_host
        self.port = port or settings.qdrant_port
        self.collection_name = collection_name or settings.qdrant_collection

        self.client = QdrantClient(host=self.host, port=self.port)
        logger.info(f"Connected to Qdrant at {self.host}:{self.port}")

    def ensure_collection(self) -> None:
        """Create the collection if it does not exist."""
        collections = self.client.get_collections().collections
        existing_names = [c.name for c in collections]

        if self.collection_name not in existing_names:
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.VECTOR_DIM,
                    distance=Distance.COSINE,
                ),
            )
            logger.info(f"Created collection: {self.collection_name}")
        else:
            logger.info(f"Collection already exists: {self.collection_name}")

    def upsert_chunks(
        self,
        chunks: list[Chunk],
        embeddings: list[list[float]],
        batch_size: int = 100,
    ) -> int:
        """Upsert chunk embeddings into Qdrant.

        Args:
            chunks: List of text chunks with metadata.
            embeddings: Corresponding embedding vectors.
            batch_size: Number of points per upsert request
                (generalized from the previous hard-coded 100).

        Returns:
            Number of points upserted.

        Raises:
            ValueError: If chunks and embeddings differ in length.
        """
        if len(chunks) != len(embeddings):
            raise ValueError(
                f"Chunks ({len(chunks)}) and embeddings ({len(embeddings)}) "
                f"must have the same length"
            )

        self.ensure_collection()

        points = []
        for chunk, embedding in zip(chunks, embeddings):
            # Deterministic uuid5 from the chunk ID: re-ingesting the same
            # chunk overwrites the same point instead of duplicating it.
            point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, chunk.chunk_id))
            points.append(PointStruct(
                id=point_id,
                vector=embedding,
                payload={
                    "text": chunk.text,
                    "chunk_id": chunk.chunk_id,
                    **chunk.metadata,
                }
            ))

        # Upsert in batches to keep individual requests small
        for i in range(0, len(points), batch_size):
            self.client.upsert(
                collection_name=self.collection_name,
                points=points[i:i + batch_size],
            )

        logger.info(f"Upserted {len(points)} points to {self.collection_name}")
        return len(points)

    def search(
        self,
        query_embedding: list[float],
        top_k: int | None = None,
        source_filter: str | None = None,
    ) -> list[dict]:
        """Search for similar documents.

        Args:
            query_embedding: The query vector.
            top_k: Number of results to return. Defaults to config value.
            source_filter: Optional filter by source filename (matches the
                "source" payload field).

        Returns:
            List of search results with text, score, and metadata.
        """
        top_k = top_k or settings.top_k

        # Build optional payload filter on the "source" field
        query_filter = None
        if source_filter:
            query_filter = Filter(
                must=[
                    FieldCondition(
                        key="source",
                        match=MatchValue(value=source_filter)
                    )
                ]
            )

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding,
            limit=top_k,
            query_filter=query_filter,
            # Approximate HNSW search; ef=128 trades a little recall for speed
            search_params=SearchParams(exact=False, hnsw_ef=128),
        )

        return [
            {
                "text": hit.payload.get("text", ""),
                "score": hit.score,
                "source": hit.payload.get("source", ""),
                "page_number": hit.payload.get("page_number"),
                "chunk_id": hit.payload.get("chunk_id", ""),
                "metadata": hit.payload,
            }
            for hit in results
        ]

    def get_collection_info(self) -> dict:
        """Get information about the collection.

        Returns:
            Dict of collection stats, or a minimal dict with
            status "not_found" if the lookup fails.
        """
        try:
            info = self.client.get_collection(self.collection_name)
            return {
                "name": self.collection_name,
                "points_count": info.points_count,
                "vectors_count": info.vectors_count,
                "status": info.status.value,
            }
        except Exception as e:
            # Fix: log why the lookup failed instead of swallowing the
            # error silently (best-effort behavior is preserved).
            logger.warning(f"Could not fetch collection info: {e}")
            return {"name": self.collection_name, "status": "not_found"}

    def delete_collection(self) -> None:
        """Delete the entire collection. Use with caution."""
        self.client.delete_collection(self.collection_name)
        logger.warning(f"Deleted collection: {self.collection_name}")
💡
Why UUID5 for point IDs? We generate deterministic UUIDs from chunk IDs using uuid5. This means re-ingesting the same document updates existing vectors instead of creating duplicates. Idempotent ingestion is critical for production systems where you re-index documents regularly.

Wire It All Together

Update the ingestion endpoint in app/main.py to embed and store chunks after ingestion:

# Add to app/main.py - update the ingest endpoint
from app.embeddings.embedder import embed_chunks
from app.vectorstore.qdrant_store import QdrantStore

# Initialize vector store once at module import so every endpoint reuses
# the same QdrantStore instance (and its underlying client connection).
vector_store = QdrantStore()


@app.post("/api/ingest")
async def ingest_documents(directory: str = "data/sample"):
    """Ingest, embed, and store documents from a directory.

    Args:
        directory: Directory containing the documents to ingest.

    Returns:
        Summary dict with file/chunk counts, vectors stored, and timing.

    Raises:
        HTTPException: 500 with the original error message if any step fails.
    """
    try:
        # Step 1: Load and chunk
        result = run_ingestion(directory, verbose=True)
        if not result.chunks:
            return {"status": "no_documents", "errors": result.errors}

        # Step 2: Generate embeddings
        chunk_embedding_pairs = embed_chunks(result.chunks)
        chunks = [pair[0] for pair in chunk_embedding_pairs]
        embeddings = [pair[1] for pair in chunk_embedding_pairs]

        # Step 3: Store in Qdrant
        num_stored = vector_store.upsert_chunks(chunks, embeddings)

        return {
            "status": "success",
            "total_files": result.total_files,
            "total_chunks": result.total_chunks,
            "vectors_stored": num_stored,
            "elapsed_seconds": result.elapsed_seconds,
        }
    except Exception as e:
        # Fix: chain the original exception so its traceback is preserved
        # in server logs instead of being replaced by the HTTPException.
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/api/collection")
async def collection_info():
    """Report current vector store collection statistics."""
    stats = vector_store.get_collection_info()
    return stats

Test Embedding and Storage

# 1. Make sure Qdrant is running
docker-compose up -d qdrant

# 2. Ingest the sample documents (this now embeds and stores them)
curl -X POST "http://localhost:8000/api/ingest?directory=data/sample"
# Expected:
# {
#   "status": "success",
#   "total_files": 1,
#   "total_chunks": 2,
#   "vectors_stored": 2,
#   "elapsed_seconds": 1.23
# }

# 3. Check the collection
curl http://localhost:8000/api/collection
# Expected:
# {
#   "name": "rag_chatbot_docs",
#   "points_count": 2,
#   "vectors_count": 2,
#   "status": "green"
# }

Key Takeaways

  • OpenAI's text-embedding-3-small produces 1536-dimensional vectors at $0.02 per million tokens — extremely cost-effective.
  • Batch embedding with retry logic handles rate limits gracefully without crashing the pipeline.
  • Qdrant stores vectors with full metadata payloads, enabling both semantic search and filtered queries.
  • Deterministic UUID5 IDs make ingestion idempotent — re-running updates rather than duplicates.
  • HNSW indexing with ef=128 gives a good balance of speed and recall for most use cases.

What Is Next

Your documents are now embedded and searchable. In the next lesson, you will build the retrieval pipeline with multi-query expansion, cross-encoder re-ranking, and citation tracking that links every answer back to its source.