Orchestration Frameworks
Multi-model AI applications require coordinating LLMs, embedding models, retrieval systems, and external tools into cohesive pipelines. Orchestration frameworks provide the abstractions and plumbing to make this manageable.
What Is Model Orchestration?
Model orchestration is the process of coordinating multiple AI models, data sources, and tools within a single application pipeline. Instead of calling one model and returning its output, an orchestrated system might:
- Route a user query to the appropriate model based on complexity or domain
- Chain multiple models together — one for retrieval, another for generation, a third for fact-checking
- Manage memory across conversation turns and model calls
- Handle errors with retries, fallbacks, and graceful degradation
- Observe the entire pipeline for debugging, cost tracking, and performance monitoring
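Routing, the first capability above, does not need an LLM call at all. A rule-based function can pick a model tier before any model is invoked. The sketch below is illustrative only. The thresholds, the keyword list, and the model names (`small-fast-model`, `code-model`, `large-reasoning-model`) are hypothetical placeholders, not recommendations.

```python
from dataclasses import dataclass

# Hypothetical model identifiers; substitute real model names from your providers.
SMALL_MODEL = "small-fast-model"
CODE_MODEL = "code-model"
LARGE_MODEL = "large-reasoning-model"

# Hypothetical keywords that mark a query as code-related.
CODE_KEYWORDS = ("python", "function", "bug", "stack trace", "compile")


@dataclass
class Route:
    model: str
    reason: str


def route_query(query: str, long_query_words: int = 50) -> Route:
    """Pick a model with cheap heuristics: domain first, then complexity."""
    text = query.lower()
    # Domain routing: code questions go to a code-specialized model.
    if any(keyword in text for keyword in CODE_KEYWORDS):
        return Route(CODE_MODEL, "code domain")
    # Complexity routing: long or multi-part questions go to the larger model.
    if len(text.split()) > long_query_words or text.count("?") > 1:
        return Route(LARGE_MODEL, "complex query")
    # Everything else goes to the cheaper, faster model.
    return Route(SMALL_MODEL, "simple query")


for q in [
    "What is the capital of France?",
    "Why does this Python function raise a KeyError?",
    "Compare Q3 revenue with headcount. Are we scaling efficiently? Why?",
]:
    print(route_query(q))
```

In practice the classifier is often a small, cheap model. A rule-based version like this keeps routing cost and latency near zero, at the price of cruder decisions.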
LangChain Deep Dive
LangChain is one of the most widely adopted orchestration frameworks, with a large ecosystem and an active community. It provides abstractions for most components of an LLM application, from prompts and models to tools, memory, and retrieval.
Core Concepts
- Chains: Sequences of operations (prompt → LLM → output parser). The fundamental building block.
- Agents: LLM-driven decision makers that choose which tools to use and in what order.
- Tools: Functions the agent can call — search, calculators, databases, APIs, code execution.
- Memory: Conversation history management — buffer, summary, vector-backed, and entity memory.
- Callbacks: Hooks into every step of execution for logging, streaming, and monitoring.
- LCEL (LangChain Expression Language): A declarative syntax for composing chains using the pipe (|) operator.
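The pipe operator composes cleanly because each step is an object that overloads `|` to return a new step that runs both operands in sequence. Here is a toy re-implementation of that idea in plain Python. It is a conceptual sketch only. LangChain's real `Runnable` interface also handles streaming, async, and configuration. The `Step` class and the `fake_llm` stand-in are hypothetical.

```python
from typing import Any, Callable


class Step:
    """Toy composable step (not LangChain's implementation): `a | b` runs a, then b."""

    def __init__(self, fn: Callable[[Any], Any]):
        self.fn = fn

    def __or__(self, other: "Step") -> "Step":
        # Composition returns a new Step; neither operand is modified.
        return Step(lambda value: other.fn(self.fn(value)))

    def invoke(self, value: Any) -> Any:
        return self.fn(value)

    def batch(self, values: list) -> list:
        # Naive batch: one sequential invoke per input.
        return [self.invoke(v) for v in values]


# Hypothetical stand-ins for a prompt template, an LLM, and an output parser.
prompt = Step(lambda d: f"Explain {d['topic']}: {d['question']}")
fake_llm = Step(lambda text: {"content": f"[answer to: {text}]"})
parser = Step(lambda message: message["content"])

chain = prompt | fake_llm | parser
print(chain.invoke({"topic": "ML", "question": "What is gradient descent?"}))
# prints: [answer to: Explain ML: What is gradient descent?]
```

Because composition only builds a new object, the same `prompt` step can be reused in several chains. This is why swapping one model for another in LCEL is a one-line change.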
LCEL: LangChain Expression Language
LCEL is the modern way to build chains in LangChain. Its Unix-inspired pipe syntax feeds each component's output into the next component's input, which keeps chains readable and easy to recombine:
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

# Define the prompt template
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant that explains {topic} concepts clearly."),
    ("human", "{question}"),
])

# Create the chain using LCEL pipe syntax: prompt -> model -> string parser
chain = prompt | ChatOpenAI(model="gpt-4o") | StrOutputParser()

# Swapping providers only changes the model component
claude_chain = prompt | ChatAnthropic(model="claude-sonnet-4-20250514") | StrOutputParser()

# Invoke the chain
result = chain.invoke({
    "topic": "machine learning",
    "question": "What is gradient descent?",
})
print(result)

# Streaming is built in
for chunk in chain.stream({"topic": "AI", "question": "Explain transformers"}):
    print(chunk, end="", flush=True)

# Batch processing
results = chain.batch([
    {"topic": "NLP", "question": "What is tokenization?"},
    {"topic": "CV", "question": "How do CNNs work?"},
])
```
LangChain Agent with Multiple Tools
Agents are among LangChain's most flexible patterns. Rather than following a fixed sequence, the LLM decides which tools to call, and in what order, based on the user's request:
```python
import subprocess
import sys

from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI


# Define custom tools
@tool
def calculator(expression: str) -> str:
    """Evaluate a mathematical expression. Input should be a valid Python math expression."""
    # WARNING: eval() with empty builtins is NOT a sandbox; crafted input can still
    # escape it. Replace with a restricted expression parser before production use.
    try:
        result = eval(expression, {"__builtins__": {}}, {})
        return str(result)
    except Exception as e:
        return f"Error: {e}"


@tool
def run_python_code(code: str) -> str:
    """Execute Python code and return the output. Use for data analysis or computations."""
    # WARNING: this runs arbitrary model-generated code on the host. Isolate it
    # (container, VM, or hosted sandbox) in any real deployment.
    try:
        result = subprocess.run(
            [sys.executable, "-c", code],  # same interpreter as this process
            capture_output=True,
            text=True,
            timeout=30,
        )
        return result.stdout or result.stderr
    except subprocess.TimeoutExpired:
        return "Code execution timed out (30s limit)"


# Web search tool
search = DuckDuckGoSearchRun()

# Assemble tools
tools = [search, calculator, run_python_code]

# Create the prompt; {agent_scratchpad} holds the agent's intermediate tool calls
prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a helpful research assistant. You have access to:
- Web search for finding current information
- A calculator for math expressions
- A Python executor for data analysis and complex computations

Always show your reasoning and cite sources when using search."""),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

# Create and run the agent
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = create_tool_calling_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# The agent decides which tools to use
result = executor.invoke({
    "input": "What is the current population of Tokyo? "
             "Calculate what percentage it is of Japan's total population."
})
print(result["output"])
```
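The `calculator` tool above relies on `eval`, and stripping `__builtins__` does not make `eval` safe against crafted input. If only arithmetic is needed, a safer alternative is to parse the expression with Python's standard `ast` module and evaluate a whitelist of numeric operators. This is a sketch under that assumption. `safe_eval` is a hypothetical helper name, and the exponent cap of 100 is an arbitrary choice.

```python
import ast
import operator

# Whitelisted operators; names, calls, attributes, etc. are all rejected.
_BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_eval(expression: str):
    """Hypothetical helper: evaluate a purely arithmetic expression without eval()."""

    def _eval(node: ast.AST):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (isinstance(node, ast.Constant)
                and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            left, right = _eval(node.left), _eval(node.right)
            # Guard against huge exponents such as 9**9**9 exhausting CPU/memory.
            if isinstance(node.op, ast.Pow) and abs(right) > 100:
                raise ValueError("Exponent too large")
            return _BIN_OPS[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    return _eval(ast.parse(expression, mode="eval"))


print(safe_eval("2 * (3 + 4) ** 2"))  # 98
```

Inside the `@tool` function, `return str(safe_eval(expression))` could replace the `eval` call. Anything that is not plain arithmetic, such as `__import__('os')`, raises `ValueError`, and the tool's existing `except` clause already turns that into an error string for the agent.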
LlamaIndex Deep Dive
LlamaIndex specializes in connecting LLMs to your data. While LangChain is a general-purpose orchestration framework, LlamaIndex excels at retrieval-augmented generation (RAG) and structured data access.
Core Concepts
- Data Connectors: Ingest data from 160+ sources — PDFs, databases, APIs, Slack, Notion, Google Drive.
- Indexes: Structure your data for efficient retrieval — vector, keyword, tree, knowledge graph indexes.
- Query Engines: Interfaces for querying your indexed data with natural language.
- Routers: Intelligently route queries to the best index or data source.
- Agents: LlamaIndex agents that combine tool use with deep data retrieval capabilities.
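At its core, a vector index stores one embedding per chunk and returns the chunks whose embeddings are most similar to the query's. The number returned is what `similarity_top_k` controls in LlamaIndex query engines. The sketch below illustrates that retrieval step using hand-written 3-dimensional vectors and exact cosine similarity. Real indexes use model-generated embeddings with hundreds or thousands of dimensions, and often use approximate nearest-neighbor search. The `ToyVectorIndex` class and its sample texts are hypothetical.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


class ToyVectorIndex:
    """Hypothetical brute-force index: exact cosine search over every stored chunk."""

    def __init__(self):
        self.entries: list[tuple[str, list[float]]] = []

    def add(self, text: str, embedding: list[float]) -> None:
        self.entries.append((text, embedding))

    def query(self, query_embedding: list[float], top_k: int = 2) -> list[tuple[str, float]]:
        scored = [
            (text, cosine_similarity(query_embedding, emb))
            for text, emb in self.entries
        ]
        # Highest similarity first, truncated to top_k results.
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return scored[:top_k]


# Hand-made "embeddings": dimensions loosely mean (finance, engineering, marketing).
index = ToyVectorIndex()
index.add("Q3 revenue summary", [0.9, 0.1, 0.2])
index.add("Kubernetes migration notes", [0.1, 0.9, 0.1])
index.add("Spring campaign results", [0.3, 0.1, 0.9])

for text, score in index.query([1.0, 0.0, 0.1], top_k=2):
    print(f"{score:.3f}  {text}")
```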
LlamaIndex RAG with Sub-Question Query Engine
The sub-question query engine breaks complex queries into simpler sub-questions, routes each to the appropriate data source, and synthesizes the results:
```python
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.query_engine import SubQuestionQueryEngine
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI

# Configure LLM and embedding model. They are passed explicitly below, so the
# deprecated ServiceContext is not needed.
llm = OpenAI(model="gpt-4o", temperature=0)
embed_model = OpenAIEmbedding(model="text-embedding-3-small")

# Load documents from different sources
finance_docs = SimpleDirectoryReader("./data/finance").load_data()
engineering_docs = SimpleDirectoryReader("./data/engineering").load_data()
marketing_docs = SimpleDirectoryReader("./data/marketing").load_data()

# Create separate indexes for each domain
finance_index = VectorStoreIndex.from_documents(finance_docs, embed_model=embed_model)
engineering_index = VectorStoreIndex.from_documents(engineering_docs, embed_model=embed_model)
marketing_index = VectorStoreIndex.from_documents(marketing_docs, embed_model=embed_model)

# Create query engines
finance_engine = finance_index.as_query_engine(llm=llm, similarity_top_k=5)
engineering_engine = engineering_index.as_query_engine(llm=llm, similarity_top_k=5)
marketing_engine = marketing_index.as_query_engine(llm=llm, similarity_top_k=5)

# Wrap as tools with descriptions; the names and descriptions guide routing
query_engine_tools = [
    QueryEngineTool(
        query_engine=finance_engine,
        metadata=ToolMetadata(
            name="finance_data",
            description="Financial reports, revenue data, budgets, and forecasts",
        ),
    ),
    QueryEngineTool(
        query_engine=engineering_engine,
        metadata=ToolMetadata(
            name="engineering_data",
            description="Technical docs, architecture decisions, sprint reports",
        ),
    ),
    QueryEngineTool(
        query_engine=marketing_engine,
        metadata=ToolMetadata(
            name="marketing_data",
            description="Campaign results, brand guidelines, market research",
        ),
    ),
]

# Sub-question engine decomposes complex queries
sub_question_engine = SubQuestionQueryEngine.from_defaults(
    query_engine_tools=query_engine_tools,
    llm=llm,
)

# Complex query that spans multiple data sources
response = sub_question_engine.query(
    "Compare our Q3 revenue growth with engineering headcount changes "
    "and marketing spend. Are we scaling efficiently?"
)

# The engine will:
# 1. Break the query into sub-questions: "What was Q3 revenue growth?",
#    "How did engineering headcount change?", etc.
# 2. Route each sub-question to the right index
# 3. Synthesize a unified answer
print(response)
```
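Conceptually, the decompose, route, and synthesize steps listed in the example's closing comments follow a fan-out/fan-in pattern. The sketch below shows that shape with plain `asyncio`. The sub-questions are hard-coded and the per-source engines are hypothetical async stubs. The real `SubQuestionQueryEngine`, by contrast, asks an LLM to generate the sub-questions and then calls your actual query engines.

```python
import asyncio


# Hypothetical per-source engines; each maps a sub-question to an answer.
async def finance_engine(question: str) -> str:
    await asyncio.sleep(0.01)  # stands in for retrieval + LLM latency
    return f"finance answer to '{question}'"


async def engineering_engine(question: str) -> str:
    await asyncio.sleep(0.01)
    return f"engineering answer to '{question}'"


ENGINES = {"finance_data": finance_engine, "engineering_data": engineering_engine}


def decompose(query: str) -> list[tuple[str, str]]:
    """Stand-in for the LLM decomposition step: returns (tool_name, sub_question) pairs."""
    return [
        ("finance_data", "What was Q3 revenue growth?"),
        ("engineering_data", "How did engineering headcount change in Q3?"),
    ]


async def answer(query: str) -> str:
    sub_questions = decompose(query)
    # Route each sub-question to its engine and run them concurrently.
    answers = await asyncio.gather(
        *(ENGINES[tool](question) for tool, question in sub_questions)
    )
    # Synthesis: a real engine passes these answers to an LLM; here we just join them.
    return "\n".join(answers)


print(asyncio.run(answer("Compare Q3 revenue growth with engineering headcount changes.")))
```

Running sub-questions concurrently means the total latency is roughly that of the slowest source rather than the sum of all of them. `asyncio.gather` also returns results in the order the sub-questions were issued, which keeps the synthesis step deterministic.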
Semantic Kernel (Microsoft)
Semantic Kernel is Microsoft's open-source SDK for building AI agents and multi-model applications. It integrates deeply with the Azure ecosystem and supports C#, Python, and Java.
Key Features
- Plugins: Modular functions (native code or LLM prompts) that the kernel can orchestrate.
- Planners: Automatically create execution plans by combining available plugins to achieve a goal. Recent releases favor automatic function calling over the original planners.
- Connectors: Integrations with OpenAI, Azure OpenAI, Hugging Face, and other model providers.
- Memory: Built-in semantic memory using embeddings and vector stores.
```python
import asyncio

from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior
from semantic_kernel.connectors.ai.open_ai import (
    AzureChatCompletion,
    AzureChatPromptExecutionSettings,
)
from semantic_kernel.functions import KernelArguments, kernel_function


# Define a native plugin
class WeatherPlugin:
    @kernel_function(description="Get the current weather for a city")
    def get_weather(self, city: str) -> str:
        # In production, call a real weather API
        return f"Weather in {city}: 72F, sunny, humidity 45%"

    @kernel_function(description="Get a 5-day weather forecast")
    def get_forecast(self, city: str) -> str:
        return f"5-day forecast for {city}: Mon 70F, Tue 72F, Wed 68F, Thu 75F, Fri 71F"


async def main() -> None:
    # Initialize the kernel
    kernel = Kernel()

    # Add Azure OpenAI service (load the key from config or env vars in real code)
    kernel.add_service(AzureChatCompletion(
        deployment_name="gpt-4o",
        endpoint="https://your-resource.openai.azure.com/",
        api_key="your-api-key",
    ))

    # Register the plugin
    kernel.add_plugin(WeatherPlugin(), plugin_name="weather")

    # Let the model call plugin functions automatically (function calling)
    settings = AzureChatPromptExecutionSettings(
        function_choice_behavior=FunctionChoiceBehavior.Auto()
    )
    result = await kernel.invoke_prompt(
        "What's the weather like in Seattle and should I bring an umbrella this week?",
        arguments=KernelArguments(settings=settings),
    )
    print(result)


asyncio.run(main())
```
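Roughly speaking, automatic function calling works because each decorated method is turned into a description the model can read: a name, a human-readable description, and typed parameters. The sketch below imitates that step with Python's `inspect` module. The `describe` decorator and the `to_tool_schema` helper are hypothetical, and the `plugin-function` naming is an illustrative assumption. The output approximates the JSON-schema tool format that providers such as OpenAI accept. It is not Semantic Kernel's internal representation.

```python
import inspect
import json
from typing import Callable

# Map Python annotations to JSON-schema type names (only simple types handled).
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def describe(description: str) -> Callable:
    """Hypothetical decorator: attach a human-readable description to a function."""
    def wrap(fn: Callable) -> Callable:
        fn.__tool_description__ = description
        return fn
    return wrap


def to_tool_schema(plugin_name: str, fn: Callable) -> dict:
    """Hypothetical helper: build an OpenAI-style tool definition from a signature."""
    properties, required = {}, []
    for name, param in inspect.signature(fn).parameters.items():
        if name == "self":
            continue
        properties[name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "type": "function",
        "function": {
            "name": f"{plugin_name}-{fn.__name__}",  # illustrative naming scheme
            "description": getattr(fn, "__tool_description__", ""),
            "parameters": {"type": "object", "properties": properties, "required": required},
        },
    }


class WeatherPlugin:
    @describe("Get the current weather for a city")
    def get_weather(self, city: str) -> str:
        return f"Weather in {city}: 72F, sunny"


schema = to_tool_schema("weather", WeatherPlugin.get_weather)
print(json.dumps(schema, indent=2))
```

The model sees only this description and never the function body. Clear descriptions and precise parameter types therefore matter as much as the implementation when the kernel lets the model choose functions.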
Haystack (deepset)
Haystack by deepset is a production-grade framework focused on building search and RAG pipelines. It uses a pipeline-based architecture where components are connected like building blocks.
```python
from haystack import Document, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Create document store and add documents
doc_store = InMemoryDocumentStore()
doc_store.write_documents([
    Document(content="vLLM is a fast inference engine for LLMs using PagedAttention."),
    Document(content="TGI by Hugging Face provides production-ready LLM serving."),
    Document(content="BentoML allows packaging models as production services."),
])

# Define the prompt template
template = """
Given the following documents, answer the question.

Documents:
{% for doc in documents %}
- {{ doc.content }}
{% endfor %}

Question: {{ question }}
Answer:
"""

# Build the pipeline
pipe = Pipeline()
pipe.add_component("retriever", InMemoryBM25Retriever(document_store=doc_store))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component("llm", OpenAIGenerator(model="gpt-4o-mini"))

# Connect components
pipe.connect("retriever", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")

# Run the pipeline
result = pipe.run({
    "retriever": {"query": "What is vLLM?"},
    "prompt_builder": {"question": "What is vLLM and why is it fast?"},
})
print(result["llm"]["replies"][0])
```
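Conceptually, a pipeline like this is a directed graph. Connecting components records which output feeds which input, and running the pipeline executes components in dependency order. The following is a stripped-down sketch of that idea using the standard library's `graphlib` (Python 3.9+). It assumes each component is a plain function that takes keyword arguments and returns a dict of named outputs. `MiniPipeline`, its `"component.socket"` connect syntax, and the stub components are hypothetical and far simpler than Haystack. For simplicity, `run` returns every component's outputs.

```python
from graphlib import TopologicalSorter
from typing import Callable


class MiniPipeline:
    """Hypothetical toy DAG pipeline: components are functions returning dicts."""

    def __init__(self):
        self.components: dict[str, Callable[..., dict]] = {}
        # edges[receiver] = list of (sender, sender_output, receiver_input)
        self.edges: dict[str, list[tuple[str, str, str]]] = {}

    def add_component(self, name: str, fn: Callable[..., dict]) -> None:
        self.components[name] = fn
        self.edges.setdefault(name, [])

    def connect(self, sender: str, receiver: str) -> None:
        # Both sides use "component.socket" syntax in this toy version.
        s_name, s_out = sender.split(".")
        r_name, r_in = receiver.split(".")
        self.edges[r_name].append((s_name, s_out, r_in))

    def run(self, inputs: dict[str, dict]) -> dict[str, dict]:
        # graphlib expects {node: set_of_predecessors}.
        graph = {name: {s for s, _, _ in deps} for name, deps in self.edges.items()}
        outputs: dict[str, dict] = {}
        for name in TopologicalSorter(graph).static_order():
            kwargs = dict(inputs.get(name, {}))
            # Pull each connected input from an upstream component's outputs.
            for s_name, s_out, r_in in self.edges[name]:
                kwargs[r_in] = outputs[s_name][s_out]
            outputs[name] = self.components[name](**kwargs)
        return outputs


DOCS = ["vLLM uses PagedAttention.", "TGI serves LLMs in production."]


def retriever(query: str) -> dict:
    # Naive keyword match standing in for BM25 scoring.
    terms = query.lower().split()
    return {"documents": [d for d in DOCS if any(t in d.lower() for t in terms)]}


def prompt_builder(documents: list, question: str) -> dict:
    return {"prompt": f"Docs: {' | '.join(documents)}\nQ: {question}"}


def fake_llm(prompt: str) -> dict:
    return {"replies": [f"LLM saw: {prompt}"]}


pipe = MiniPipeline()
pipe.add_component("retriever", retriever)
pipe.add_component("prompt_builder", prompt_builder)
pipe.add_component("llm", fake_llm)
pipe.connect("retriever.documents", "prompt_builder.documents")
pipe.connect("prompt_builder.prompt", "llm.prompt")

result = pipe.run({"retriever": {"query": "vllm"}, "prompt_builder": {"question": "What is vLLM?"}})
print(result["llm"]["replies"][0])
```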
Framework Comparison
| Feature | LangChain | LlamaIndex | Semantic Kernel | Haystack |
|---|---|---|---|---|
| Languages | Python, JS/TS | Python, TS | C#, Python, Java | Python |
| Primary Strength | General orchestration, agents | Data indexing & RAG | Enterprise & Azure integration | Search & retrieval pipelines |
| Best For | Complex agent workflows | Data-heavy RAG apps | .NET / Azure teams | Production search systems |
| Agent Support | Excellent (LangGraph) | Good (tool agents) | Good (planners) | Basic |
| Ecosystem | Largest (3k+ integrations) | Large (160+ connectors) | Azure-focused | Growing |
| Observability | LangSmith | Built-in callbacks | Azure Monitor | Pipeline tracing |
| Learning Curve | Moderate (many abstractions) | Moderate | Moderate | Low (simple pipeline model) |
| Production Ready | Yes (LangServe) | Yes | Yes | Yes |
Building a Custom Orchestration Layer
Sometimes frameworks add unnecessary complexity. Here is a lightweight custom orchestration layer that handles model routing, retries, and fallbacks without any framework dependencies:
```python
import asyncio
import time
from dataclasses import dataclass
from typing import Callable, Optional

from anthropic import AsyncAnthropic
from openai import AsyncOpenAI


@dataclass
class ModelConfig:
    provider: str  # "anthropic" or "openai"
    model: str  # model name
    max_tokens: int = 4096
    temperature: float = 0.0


@dataclass
class PipelineStep:
    name: str
    model: ModelConfig
    prompt_template: str
    output_parser: Optional[Callable] = None
    fallback_model: Optional[ModelConfig] = None
    max_retries: int = 3
    retry_delay: float = 1.0


class MultiModelOrchestrator:
    def __init__(self):
        self.anthropic = AsyncAnthropic()
        self.openai = AsyncOpenAI()
        self.steps: list[PipelineStep] = []
        self.trace: list[dict] = []

    def add_step(self, step: PipelineStep):
        self.steps.append(step)
        return self  # Enable chaining

    async def _call_model(self, config: ModelConfig, prompt: str) -> str:
        if config.provider == "anthropic":
            response = await self.anthropic.messages.create(
                model=config.model,
                max_tokens=config.max_tokens,
                temperature=config.temperature,
                messages=[{"role": "user", "content": prompt}],
            )
            return response.content[0].text
        if config.provider == "openai":
            response = await self.openai.chat.completions.create(
                model=config.model,
                max_tokens=config.max_tokens,
                temperature=config.temperature,
                messages=[{"role": "user", "content": prompt}],
            )
            return response.choices[0].message.content
        raise ValueError(f"Unknown provider: {config.provider!r}")

    async def _execute_step(self, step: PipelineStep, context: dict) -> str:
        prompt = step.prompt_template.format(**context)
        last_error: Optional[Exception] = None
        for attempt in range(step.max_retries):
            try:
                start = time.perf_counter()
                result = await self._call_model(step.model, prompt)
                self.trace.append({
                    "step": step.name,
                    "model": step.model.model,
                    "latency_ms": round((time.perf_counter() - start) * 1000),
                    "attempt": attempt + 1,
                })
                return step.output_parser(result) if step.output_parser else result
            except Exception as e:
                last_error = e
                if attempt < step.max_retries - 1:
                    # Exponential backoff: retry_delay * 1, 2, 4, ...
                    await asyncio.sleep(step.retry_delay * 2 ** attempt)
        # Primary model exhausted its retries: try the fallback model once
        if step.fallback_model:
            result = await self._call_model(step.fallback_model, prompt)
            self.trace.append({"step": step.name, "model": step.fallback_model.model, "fallback": True})
            return step.output_parser(result) if step.output_parser else result
        raise RuntimeError(
            f"Step '{step.name}' failed after {step.max_retries} retries"
        ) from last_error

    async def run(self, initial_context: dict) -> dict:
        self.trace = []  # Fresh trace for each run
        context = {**initial_context}
        for step in self.steps:
            # Each step's output is stored under its name for later templates
            context[step.name] = await self._execute_step(step, context)
        context["_trace"] = self.trace
        return context


# Usage: 3-step pipeline with different models
pipeline = MultiModelOrchestrator()
pipeline.add_step(PipelineStep(
    name="research",
    model=ModelConfig(provider="anthropic", model="claude-sonnet-4-20250514"),
    prompt_template="Research this topic thoroughly: {topic}",
)).add_step(PipelineStep(
    name="draft",
    model=ModelConfig(provider="openai", model="gpt-4o"),
    prompt_template="Based on this research:\n{research}\n\nWrite a blog post about: {topic}",
)).add_step(PipelineStep(
    name="review",
    model=ModelConfig(provider="anthropic", model="claude-opus-4-20250514"),
    prompt_template="Review and improve this draft for accuracy and clarity:\n{draft}",
    fallback_model=ModelConfig(provider="openai", model="gpt-4o"),
))

result = asyncio.run(pipeline.run({"topic": "Vector databases for AI applications"}))
print(result["review"])  # Final reviewed blog post
print(result["_trace"])  # Execution trace with latencies
```
When to Use a Framework vs Build Your Own
Use a framework when:
- You need many integrations (vector stores, tools, model providers)
- Your team wants established patterns and community support
- You need observability tools like LangSmith or Haystack pipeline tracing
- Rapid prototyping is more important than performance optimization
Build your own when:
- You have specific performance requirements the framework cannot meet
- Your pipeline is simple (2-3 steps) and you want minimal dependencies
- You need fine-grained control over retries, caching, and error handling
- Framework abstractions hide important details for your use case
Error Handling & Retry Strategies
Multi-model pipelines have more failure points than single-model calls. Key strategies include:
- Exponential backoff: Increase delay between retries (1s, 2s, 4s, 8s) to avoid overwhelming rate-limited APIs.
- Model fallbacks: If Claude is unavailable, fall back to GPT-4o. If GPT-4o fails, try a smaller model.
- Circuit breakers: After N consecutive failures, stop trying a model for a cooldown period.
- Partial results: If step 3 of 5 fails, return what you have rather than failing the entire pipeline.
- Timeout budgets: Allocate an overall time budget and track how much each step consumes. If step 1 takes 5s of a 10s budget, the remaining steps share the remaining 5s.
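Of these strategies, the circuit breaker is the one most often hand-rolled. The following is a minimal sketch for a single-threaded process, with an injectable clock so the cooldown can be tested without sleeping. `CircuitBreaker`, `CircuitOpenError`, and the half-open behavior (let a call through after the cooldown and re-trip if it fails) are illustrative choices, not any specific library's API.

```python
import time
from typing import Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is refused because the breaker is open."""


class CircuitBreaker:
    """Stops calling a failing model for `cooldown_s` after repeated failures."""

    def __init__(self, failure_threshold: int = 3, cooldown_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self.clock = clock
        self.consecutive_failures = 0
        self.opened_at: Optional[float] = None  # None means the circuit is closed

    def call(self, fn: Callable, *args, **kwargs):
        if self.opened_at is not None and self.clock() - self.opened_at < self.cooldown_s:
            # Open: fail fast without touching the model.
            raise CircuitOpenError("circuit open; skipping call")
        # Closed, or cooldown elapsed (half-open): let this call through.
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.consecutive_failures += 1
            half_open = self.opened_at is not None
            if half_open or self.consecutive_failures >= self.failure_threshold:
                # Trip the breaker, or re-trip after a failed trial call.
                self.opened_at = self.clock()
            raise
        # Success closes the circuit and clears the failure count.
        self.consecutive_failures = 0
        self.opened_at = None
        return result


# Demo with a fake clock so the cooldown can be simulated instantly.
now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, cooldown_s=10.0, clock=lambda: now[0])


def failing_model():
    raise ConnectionError("provider unavailable")


for _ in range(2):
    try:
        breaker.call(failing_model)
    except ConnectionError:
        pass

try:
    breaker.call(lambda: "ok")
except CircuitOpenError as e:
    print("refused:", e)

now[0] = 11.0  # cooldown has elapsed
print(breaker.call(lambda: "ok"))  # trial call succeeds and closes the circuit
```

An async variant would `await fn(...)` instead. Keeping one breaker per model lets a fallback model serve traffic while the primary cools down.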
Observability: Monitoring Multi-Model Pipelines
Debugging a multi-step AI pipeline requires visibility into every model call, its latency, token usage, and output quality.
Key Observability Tools
- LangSmith: LangChain's tracing and evaluation platform. Records every LLM call, chain execution, and agent decision for debugging and optimization.
- Weights & Biases (W&B): Tracks experiments, model performance, and prompt iterations. Integrates with most frameworks.
- Phoenix (Arize): Open-source observability for LLM applications. Visualizes traces, evaluates retrieval quality, and monitors embedding drift.
- OpenTelemetry: Standard tracing protocol. Many frameworks emit OTEL spans that you can send to any observability backend (Datadog, Grafana, etc.).
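```python
import os

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-model-app"

# All LangChain calls are now automatically traced
# View traces at https://smith.langchain.com
# Each trace shows: input, output, latency, tokens, cost, child spans

from langsmith import traceable


# Placeholder step functions (hypothetical stand-ins for your own code).
# Decorating them with @traceable makes each one a child span of the pipeline.
@traceable(name="retrieve")
def retrieve_documents(query: str) -> list[str]:
    return [f"document about {query}"]


@traceable(name="generate")
def generate_answer(query: str, docs: list[str]) -> str:
    return f"Answer to {query!r} based on {len(docs)} document(s)"


@traceable(name="validate")
def validate_response(response: str) -> str:
    return response.strip()


# You can also trace custom functions
@traceable(name="my-custom-pipeline")
def my_pipeline(query: str) -> str:
    # Step 1: Retrieve
    docs = retrieve_documents(query)
    # Step 2: Generate
    response = generate_answer(query, docs)
    # Step 3: Validate
    validated = validate_response(response)
    return validated


print(my_pipeline("What is vector search?"))
```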
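Outside LangChain, a similar parent/child trace structure can be approximated with a small decorator. This sketch records nested spans, each with its parent, latency, and status, and uses `contextvars` to track the current parent span. The `span` decorator, the in-memory `SPANS` list, and the stub steps are hypothetical. In production you would more likely emit OpenTelemetry spans to a real backend.

```python
import contextvars
import functools
import time
import uuid
from typing import Callable

SPANS: list[dict] = []  # in-memory sink; a real system would export these
_current_span = contextvars.ContextVar("current_span", default=None)


def span(name: str) -> Callable:
    """Hypothetical decorator: record name, parent, latency, and status per call."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            span_id = uuid.uuid4().hex[:8]
            parent_id = _current_span.get()
            token = _current_span.set(span_id)  # nested calls see this span as parent
            start = time.perf_counter()
            status = "ok"
            try:
                return fn(*args, **kwargs)
            except Exception:
                status = "error"
                raise
            finally:
                _current_span.reset(token)
                SPANS.append({
                    "id": span_id,
                    "parent": parent_id,
                    "name": name,
                    "latency_ms": round((time.perf_counter() - start) * 1000, 2),
                    "status": status,
                })
        return wrapper
    return decorator


@span("retrieve")
def retrieve(query: str) -> list[str]:
    return [f"doc about {query}"]


@span("generate")
def generate(query: str, docs: list[str]) -> str:
    return f"answer to {query} using {len(docs)} doc(s)"


@span("pipeline")
def pipeline(query: str) -> str:
    return generate(query, retrieve(query))


pipeline("vector databases")
for s in SPANS:
    print(s)
```

Spans are appended when they finish, so child steps appear before their parent. Because `contextvars` values follow the current task, the same decorator pattern also keeps parent links correct across concurrent `asyncio` tasks if the wrapper is adapted to `await` the function.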