Intermediate

Orchestration Frameworks

Multi-model AI applications require coordinating LLMs, embedding models, retrieval systems, and external tools into cohesive pipelines. Orchestration frameworks provide the abstractions and plumbing to make this manageable.

What Is Model Orchestration?

Model orchestration is the process of coordinating multiple AI models, data sources, and tools within a single application pipeline. Instead of calling one model and returning its output, an orchestrated system might:

  • Route a user query to the appropriate model based on complexity or domain
  • Chain multiple models together — one for retrieval, another for generation, a third for fact-checking
  • Manage memory across conversation turns and model calls
  • Handle errors with retries, fallbacks, and graceful degradation
  • Observe the entire pipeline for debugging, cost tracking, and performance monitoring
💡
Why not just call APIs directly? You absolutely can for simple use cases. But once you have 3+ models, retrieval, memory, tool use, and error handling, the boilerplate becomes overwhelming. Frameworks handle the common patterns so you can focus on your application logic.

LangChain Deep Dive

LangChain is the most widely adopted orchestration framework, with a massive ecosystem and active community. It provides abstractions for every component of an LLM application.

Core Concepts

  • Chains: Sequences of operations (prompt → LLM → output parser). The fundamental building block.
  • Agents: LLM-driven decision makers that choose which tools to use and in what order.
  • Tools: Functions the agent can call — search, calculators, databases, APIs, code execution.
  • Memory: Conversation history management — buffer, summary, vector-backed, and entity memory.
  • Callbacks: Hooks into every step of execution for logging, streaming, and monitoring.
  • LCEL (LangChain Expression Language): A declarative syntax for composing chains using the pipe (|) operator.

LCEL: LangChain Expression Language

LCEL is the modern way to build chains in LangChain. It uses a pipe syntax inspired by Unix, making chain composition readable and composable:

Python — LCEL Chain Composition
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

# Define the prompt template
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant that explains {topic} concepts clearly."),
    ("human", "{question}"),
])

# Create the chain using LCEL pipe syntax
chain = prompt | ChatOpenAI(model="gpt-4o") | StrOutputParser()

# Invoke the chain
result = chain.invoke({
    "topic": "machine learning",
    "question": "What is gradient descent?"
})

# Streaming is built in
for chunk in chain.stream({"topic": "AI", "question": "Explain transformers"}):
    print(chunk, end="")

# Batch processing
results = chain.batch([
    {"topic": "NLP", "question": "What is tokenization?"},
    {"topic": "CV", "question": "How do CNNs work?"},
])

LangChain Agent with Multiple Tools

Agents are the most powerful LangChain pattern. The LLM decides which tools to use based on the user's request:

Python — LangChain Agent with Tools
from langchain_openai import ChatOpenAI
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_community.tools import DuckDuckGoSearchRun
import subprocess

# Define custom tools
@tool
def calculator(expression: str) -> str:
    """Evaluate a mathematical expression. Input should be a valid Python math expression."""
    try:
        result = eval(expression, {"__builtins__": {}}, {"__import__": None})
        return str(result)
    except Exception as e:
        return f"Error: {e}"

@tool
def run_python_code(code: str) -> str:
    """Execute Python code and return the output. Use for data analysis or computations."""
    try:
        result = subprocess.run(
            ["python", "-c", code],
            capture_output=True, text=True, timeout=30
        )
        return result.stdout or result.stderr
    except subprocess.TimeoutExpired:
        return "Code execution timed out (30s limit)"

# Web search tool
search = DuckDuckGoSearchRun()

# Assemble tools
tools = [search, calculator, run_python_code]

# Create the prompt
prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a helpful research assistant. You have access to:
- Web search for finding current information
- A calculator for math expressions
- A Python executor for data analysis and complex computations
Always show your reasoning and cite sources when using search."""),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

# Create and run the agent
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = create_tool_calling_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# The agent decides which tools to use
result = executor.invoke({
    "input": "What is the current population of Tokyo? Calculate what percentage it is of Japan's total population."
})
print(result["output"])

LlamaIndex Deep Dive

LlamaIndex specializes in connecting LLMs to your data. While LangChain is a general-purpose orchestration framework, LlamaIndex excels at retrieval-augmented generation (RAG) and structured data access.

Core Concepts

  • Data Connectors: Ingest data from 160+ sources — PDFs, databases, APIs, Slack, Notion, Google Drive.
  • Indexes: Structure your data for efficient retrieval — vector, keyword, tree, knowledge graph indexes.
  • Query Engines: Interfaces for querying your indexed data with natural language.
  • Routers: Intelligently route queries to the best index or data source.
  • Agents: LlamaIndex agents that combine tool use with deep data retrieval capabilities.

LlamaIndex RAG with Sub-Question Query Engine

The sub-question query engine breaks complex queries into simpler sub-questions, routes each to the appropriate data source, and synthesizes the results:

Python — LlamaIndex Sub-Question RAG Pipeline
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    ServiceContext,
)
from llama_index.core.query_engine import SubQuestionQueryEngine
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding

# Configure LLM and embedding model
llm = OpenAI(model="gpt-4o", temperature=0)
embed_model = OpenAIEmbedding(model="text-embedding-3-small")

# Load documents from different sources
finance_docs = SimpleDirectoryReader("./data/finance").load_data()
engineering_docs = SimpleDirectoryReader("./data/engineering").load_data()
marketing_docs = SimpleDirectoryReader("./data/marketing").load_data()

# Create separate indexes for each domain
finance_index = VectorStoreIndex.from_documents(
    finance_docs, embed_model=embed_model
)
engineering_index = VectorStoreIndex.from_documents(
    engineering_docs, embed_model=embed_model
)
marketing_index = VectorStoreIndex.from_documents(
    marketing_docs, embed_model=embed_model
)

# Create query engines
finance_engine = finance_index.as_query_engine(llm=llm, similarity_top_k=5)
engineering_engine = engineering_index.as_query_engine(llm=llm, similarity_top_k=5)
marketing_engine = marketing_index.as_query_engine(llm=llm, similarity_top_k=5)

# Wrap as tools with descriptions
query_engine_tools = [
    QueryEngineTool(
        query_engine=finance_engine,
        metadata=ToolMetadata(
            name="finance_data",
            description="Financial reports, revenue data, budgets, and forecasts",
        ),
    ),
    QueryEngineTool(
        query_engine=engineering_engine,
        metadata=ToolMetadata(
            name="engineering_data",
            description="Technical docs, architecture decisions, sprint reports",
        ),
    ),
    QueryEngineTool(
        query_engine=marketing_engine,
        metadata=ToolMetadata(
            name="marketing_data",
            description="Campaign results, brand guidelines, market research",
        ),
    ),
]

# Sub-question engine decomposes complex queries
sub_question_engine = SubQuestionQueryEngine.from_defaults(
    query_engine_tools=query_engine_tools,
    llm=llm,
)

# Complex query that spans multiple data sources
response = sub_question_engine.query(
    "Compare our Q3 revenue growth with engineering headcount changes "
    "and marketing spend. Are we scaling efficiently?"
)
# The engine will:
# 1. Break into sub-questions: "What was Q3 revenue growth?", "How did engineering headcount change?", etc.
# 2. Route each sub-question to the right index
# 3. Synthesize a unified answer
print(response)

Semantic Kernel (Microsoft)

Semantic Kernel is Microsoft's open-source SDK for building AI agents and multi-model applications. It integrates deeply with the Azure ecosystem and supports C#, Python, and Java.

Key Features

  • Plugins: Modular functions (native code or LLM prompts) that the kernel can orchestrate.
  • Planners: Automatically create execution plans by combining available plugins to achieve a goal.
  • Connectors: Integrations with OpenAI, Azure OpenAI, Hugging Face, and other model providers.
  • Memory: Built-in semantic memory using embeddings and vector stores.
Python — Semantic Kernel Plugin Example
import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.functions import kernel_function

# Initialize the kernel
kernel = sk.Kernel()

# Add Azure OpenAI service
kernel.add_service(AzureChatCompletion(
    deployment_name="gpt-4o",
    endpoint="https://your-resource.openai.azure.com/",
    api_key="your-api-key",
))

# Define a native plugin
class WeatherPlugin:
    @kernel_function(description="Get the current weather for a city")
    def get_weather(self, city: str) -> str:
        # In production, call a real weather API
        return f"Weather in {city}: 72F, sunny, humidity 45%"

    @kernel_function(description="Get a 5-day weather forecast")
    def get_forecast(self, city: str) -> str:
        return f"5-day forecast for {city}: Mon 70F, Tue 72F, Wed 68F, Thu 75F, Fri 71F"

# Register the plugin
kernel.add_plugin(WeatherPlugin(), plugin_name="weather")

# The kernel can now use these functions via the planner or direct invocation
result = await kernel.invoke_prompt(
    "What's the weather like in Seattle and should I bring an umbrella this week?",
    function_choice_behavior="auto",
)

Haystack (deepset)

Haystack by deepset is a production-grade framework focused on building search and RAG pipelines. It uses a pipeline-based architecture where components are connected like building blocks.

Python — Haystack RAG Pipeline
from haystack import Pipeline
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack import Document

# Create document store and add documents
doc_store = InMemoryDocumentStore()
doc_store.write_documents([
    Document(content="vLLM is a fast inference engine for LLMs using PagedAttention."),
    Document(content="TGI by Hugging Face provides production-ready LLM serving."),
    Document(content="BentoML allows packaging models as production services."),
])

# Define the prompt template
template = """
Given the following documents, answer the question.
Documents:
{% for doc in documents %}
  - {{ doc.content }}
{% endfor %}
Question: {{ question }}
Answer:
"""

# Build the pipeline
pipe = Pipeline()
pipe.add_component("retriever", InMemoryBM25Retriever(document_store=doc_store))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component("llm", OpenAIGenerator(model="gpt-4o-mini"))

# Connect components
pipe.connect("retriever", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")

# Run the pipeline
result = pipe.run({
    "retriever": {"query": "What is vLLM?"},
    "prompt_builder": {"question": "What is vLLM and why is it fast?"},
})
print(result["llm"]["replies"][0])

Framework Comparison

FeatureLangChainLlamaIndexSemantic KernelHaystack
LanguagesPython, JS/TSPython, TSC#, Python, JavaPython
Primary StrengthGeneral orchestration, agentsData indexing & RAGEnterprise & Azure integrationSearch & retrieval pipelines
Best ForComplex agent workflowsData-heavy RAG apps.NET / Azure teamsProduction search systems
Agent SupportExcellent (LangGraph)Good (tool agents)Good (planners)Basic
EcosystemLargest (3k+ integrations)Large (160+ connectors)Azure-focusedGrowing
ObservabilityLangSmithBuilt-in callbacksAzure MonitorPipeline tracing
Learning CurveModerate (many abstractions)ModerateModerateLow (simple pipeline model)
Production ReadyYes (LangServe)YesYesYes

Building a Custom Orchestration Layer

Sometimes frameworks add unnecessary complexity. Here is a lightweight custom orchestration layer that handles model routing, retries, and fallbacks without any framework dependencies:

Python — Custom Orchestration Layer (No Framework)
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI

@dataclass
class ModelConfig:
    provider: str          # "anthropic" or "openai"
    model: str             # model name
    max_tokens: int = 4096
    temperature: float = 0.0

@dataclass
class PipelineStep:
    name: str
    model: ModelConfig
    prompt_template: str
    output_parser: Optional[Callable] = None
    fallback_model: Optional[ModelConfig] = None
    max_retries: int = 3
    retry_delay: float = 1.0

class MultiModelOrchestrator:
    def __init__(self):
        self.anthropic = AsyncAnthropic()
        self.openai = AsyncOpenAI()
        self.steps: list[PipelineStep] = []
        self.trace: list[dict] = []

    def add_step(self, step: PipelineStep):
        self.steps.append(step)
        return self  # Enable chaining

    async def _call_model(self, config: ModelConfig, prompt: str) -> str:
        if config.provider == "anthropic":
            response = await self.anthropic.messages.create(
                model=config.model,
                max_tokens=config.max_tokens,
                temperature=config.temperature,
                messages=[{"role": "user", "content": prompt}],
            )
            return response.content[0].text
        elif config.provider == "openai":
            response = await self.openai.chat.completions.create(
                model=config.model,
                max_tokens=config.max_tokens,
                temperature=config.temperature,
                messages=[{"role": "user", "content": prompt}],
            )
            return response.choices[0].message.content

    async def _execute_step(self, step: PipelineStep, context: dict) -> str:
        prompt = step.prompt_template.format(**context)
        for attempt in range(step.max_retries):
            try:
                start = time.time()
                result = await self._call_model(step.model, prompt)
                elapsed = time.time() - start
                self.trace.append({
                    "step": step.name, "model": step.model.model,
                    "latency_ms": round(elapsed * 1000), "attempt": attempt + 1,
                })
                if step.output_parser:
                    result = step.output_parser(result)
                return result
            except Exception as e:
                if attempt == step.max_retries - 1 and step.fallback_model:
                    return await self._call_model(step.fallback_model, prompt)
                await asyncio.sleep(step.retry_delay * (attempt + 1))
        raise RuntimeError(f"Step '{step.name}' failed after {step.max_retries} retries")

    async def run(self, initial_context: dict) -> dict:
        context = {**initial_context}
        for step in self.steps:
            result = await self._execute_step(step, context)
            context[step.name] = result
        context["_trace"] = self.trace
        return context

# Usage: 3-step pipeline with different models
pipeline = MultiModelOrchestrator()
pipeline.add_step(PipelineStep(
    name="research",
    model=ModelConfig(provider="anthropic", model="claude-sonnet-4-20250514"),
    prompt_template="Research this topic thoroughly: {topic}",
)).add_step(PipelineStep(
    name="draft",
    model=ModelConfig(provider="openai", model="gpt-4o"),
    prompt_template="Based on this research:\n{research}\n\nWrite a blog post about: {topic}",
)).add_step(PipelineStep(
    name="review",
    model=ModelConfig(provider="anthropic", model="claude-opus-4-20250514"),
    prompt_template="Review and improve this draft for accuracy and clarity:\n{draft}",
    fallback_model=ModelConfig(provider="openai", model="gpt-4o"),
))

result = asyncio.run(pipeline.run({"topic": "Vector databases for AI applications"}))
print(result["review"])  # Final reviewed blog post
print(result["_trace"])  # Execution trace with latencies

When to Use a Framework vs Build Your Own

Use a framework when:
  • You need many integrations (vector stores, tools, model providers)
  • Your team wants established patterns and community support
  • You need observability tools like LangSmith or Haystack pipeline tracing
  • Rapid prototyping is more important than performance optimization
Build your own when:
  • You have specific performance requirements the framework cannot meet
  • Your pipeline is simple (2-3 steps) and you want minimal dependencies
  • You need fine-grained control over retries, caching, and error handling
  • Framework abstractions hide important details for your use case

Error Handling & Retry Strategies

Multi-model pipelines have more failure points than single-model calls. Key strategies include:

  • Exponential backoff: Increase delay between retries (1s, 2s, 4s, 8s) to avoid overwhelming rate-limited APIs.
  • Model fallbacks: If Claude is unavailable, fall back to GPT-4o. If GPT-4o fails, try a smaller model.
  • Circuit breakers: After N consecutive failures, stop trying a model for a cooldown period.
  • Partial results: If step 3 of 5 fails, return what you have rather than failing the entire pipeline.
  • Timeout budgets: Allocate time budgets to each step. If step 1 takes 5s of a 10s budget, step 2 only gets 5s.

Observability: Monitoring Multi-Model Pipelines

Debugging a multi-step AI pipeline requires visibility into every model call, its latency, token usage, and output quality.

Key Observability Tools

  • LangSmith: LangChain's tracing and evaluation platform. Records every LLM call, chain execution, and agent decision for debugging and optimization.
  • Weights & Biases (W&B): Tracks experiments, model performance, and prompt iterations. Integrates with most frameworks.
  • Phoenix (Arize): Open-source observability for LLM applications. Visualizes traces, evaluates retrieval quality, and monitors embeddings drift.
  • OpenTelemetry: Standard tracing protocol. Many frameworks emit OTEL spans that you can send to any observability backend (Datadog, Grafana, etc.).
Python — LangSmith Tracing Example
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-model-app"

# All LangChain calls are now automatically traced
# View traces at https://smith.langchain.com
# Each trace shows: input, output, latency, tokens, cost, child spans

from langsmith import traceable

# You can also trace custom functions
@traceable(name="my-custom-pipeline")
def my_pipeline(query: str) -> str:
    # Step 1: Retrieve
    docs = retrieve_documents(query)
    # Step 2: Generate
    response = generate_answer(query, docs)
    # Step 3: Validate
    validated = validate_response(response)
    return validated