Enhancements & Best Practices
You have built a complete multi-agent system. Now let us make it production-grade with parallel execution, streaming output, deployment strategies, and proven patterns for scaling multi-agent workflows.
Parallel Agent Execution
When the supervisor determines that multiple agents can work independently on subtasks, run them in parallel to reduce latency:
# graph/parallel_workflow.py
"""Multi-agent workflow with parallel execution support."""
import asyncio

from langgraph.graph import StateGraph, START, END
from langchain_core.messages import AIMessage

from agents.state import AgentState
from agents.supervisor import supervisor_node
from agents.researcher import researcher_node
from agents.coder import coder_node
from agents.analyst import analyst_node


async def parallel_agents_node(state: AgentState) -> dict:
    """Run multiple agents in parallel when tasks are independent.

    The supervisor can set next_agent to 'parallel:researcher,coder'
    to run both agents simultaneously.
    """
    next_agent = state.get("next_agent", "")
    if not next_agent.startswith("parallel:"):
        # Not a parallel task - should not reach here.
        # Return an empty update instead of echoing the whole state back.
        return {}

    agent_map = {
        "researcher": researcher_node,
        "coder": coder_node,
        "analyst": analyst_node,
    }

    # Parse which agents to run, dropping unknown names so that each
    # result lines up with the agent that produced it
    requested = [a.strip() for a in next_agent.removeprefix("parallel:").split(",")]
    agent_names = [name for name in requested if name in agent_map]

    # Run agents concurrently; to_thread wraps the sync node functions
    tasks = [asyncio.to_thread(agent_map[name], state) for name in agent_names]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Merge results (copy first so the incoming state is not mutated)
    merged_results = dict(state.get("results", {}))
    merged_messages = []
    for name, result in zip(agent_names, results):
        if isinstance(result, Exception):
            merged_messages.append(
                AIMessage(content=f"Agent {name} failed: {result}")
            )
        elif isinstance(result, dict):
            if "results" in result:
                merged_results.update(result["results"])
            if "messages" in result:
                merged_messages.extend(result["messages"])

    return {
        "messages": merged_messages,
        "results": merged_results,
    }
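The core pattern above (fan out with asyncio.to_thread, collect with asyncio.gather, and treat exceptions as values) can be tried without LangGraph. The following is a minimal sketch under stated assumptions: researcher_stub, coder_stub, and run_parallel are hypothetical stand-ins, not part of the course code, and the "errors" key is invented for the demo.

```python
import asyncio


def researcher_stub(state: dict) -> dict:
    """Hypothetical stand-in for a real agent node."""
    return {"results": {"research": f"notes on {state['task']}"}}


def coder_stub(state: dict) -> dict:
    """Hypothetical agent that always fails, to show error isolation."""
    raise RuntimeError("sandbox unavailable")


async def run_parallel(state: dict, agents: dict) -> dict:
    names = list(agents)
    # to_thread runs each blocking function in a worker thread
    outcomes = await asyncio.gather(
        *(asyncio.to_thread(agents[name], state) for name in names),
        return_exceptions=True,  # a failure becomes a value, not a crash
    )
    merged = {"results": {}, "errors": {}}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, Exception):
            merged["errors"][name] = str(outcome)
        else:
            merged["results"].update(outcome.get("results", {}))
    return merged


merged = asyncio.run(
    run_parallel(
        {"task": "FastAPI"},
        {"researcher": researcher_stub, "coder": coder_stub},
    )
)
print(merged)
```

Because gather preserves input order, zipping names with outcomes keeps each result attached to the agent that produced it, even when one of them fails.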
Streaming Output
Stream workflow events step by step, one per completed node, so users see progress in real time:
# graph/streaming.py
"""Stream workflow execution step-by-step."""
from graph.workflow import build_workflow


def stream_workflow(task: str):
    """Generator that yields workflow events as they happen.

    Usage:
        for event in stream_workflow("Research Python frameworks"):
            print(event)
    """
    app = build_workflow()
    initial_state = {
        "messages": [],
        "next_agent": "",
        "task": task,
        "results": {},
        "status": "in_progress",
        "iteration": 0,
    }
    # stream() yields each node's output as it completes
    for step in app.stream(initial_state):
        for node_name, node_output in step.items():
            yield {
                "node": node_name,
                "status": node_output.get("status", "in_progress"),
                "next_agent": node_output.get("next_agent", ""),
                "messages": [
                    m.content for m in node_output.get("messages", [])
                ],
            }
# --- FastAPI streaming endpoint ---
def stream_endpoint_example():
    """Example FastAPI app that streams workflow events via SSE."""
    import json

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()

    @app.post("/api/workflow/stream")
    async def stream_workflow_api(request: dict):
        task = request.get("task", "")

        def event_generator():
            # A plain (sync) generator: StreamingResponse iterates it in a
            # thread pool, so the blocking workflow does not stall the event loop
            for event in stream_workflow(task):
                yield f"data: {json.dumps(event)}\n\n"
            yield 'data: {"done": true}\n\n'

        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream"
        )

    return app
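The endpoint relies on the Server-Sent Events wire format: each message is a "data:" line followed by a blank line. A small helper makes that framing explicit and easy to test in isolation. This is a sketch only; format_sse and its optional event_type parameter are hypothetical names, not part of the course code.

```python
import json
from typing import Optional


def format_sse(event: dict, event_type: Optional[str] = None) -> str:
    """Frame one event as a Server-Sent Events message."""
    lines = []
    if event_type:
        # Optional named event, which a client can listen for specifically
        lines.append(f"event: {event_type}")
    # json.dumps escapes newlines inside strings, so one data: line suffices
    lines.append(f"data: {json.dumps(event)}")
    # A blank line terminates the message
    return "\n".join(lines) + "\n\n"


frame = format_sse({"node": "researcher", "status": "in_progress"})
print(repr(frame))
```

Keeping the framing in one function also gives a single place to add fields such as "id:" or "retry:" later, should the client need reconnection support.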
Deployment Strategies
There are several ways to deploy a multi-agent workflow in production:
Option 1: Monolithic API (Simplest)
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: "3.8"
services:
  workflow-api:
    build: .
    ports:
      - "8000:8000"
    env_file:
      - .env
    restart: unless-stopped
Option 2: Agent-per-Service (Scalable)
# Deploy each agent as a separate microservice
# The supervisor calls agents via HTTP instead of in-process
# agent_service.py - generic agent service
from fastapi import FastAPI

app = FastAPI()


@app.post("/run")
async def run_agent(request: dict):
    """Run the agent and return results."""
    from agents.researcher import researcher_node  # or coder, analyst

    state = request.get("state", {})
    result = researcher_node(state)
    return result
# Benefits:
# - Scale agents independently (more coder instances for code-heavy workloads)
# - Different resource requirements per agent
# - Independent deployments and updates
# - Fault isolation (one agent crashing doesn't affect others)
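On the supervisor side, the in-process call becomes an HTTP POST to the agent's /run endpoint. The sketch below uses only the standard library and assumes the request body shape {"state": ...} shown in the service above. The service URLs, AGENT_URLS, build_agent_request, and call_agent are hypothetical names chosen for illustration.

```python
import json
import urllib.request

# Hypothetical service addresses; real hostnames depend on your deployment
AGENT_URLS = {
    "researcher": "http://researcher:8000/run",
    "coder": "http://coder:8000/run",
}


def build_agent_request(agent_name: str, state: dict) -> urllib.request.Request:
    """Build the POST request matching the {"state": ...} body of /run."""
    body = json.dumps({"state": state}).encode("utf-8")
    return urllib.request.Request(
        AGENT_URLS[agent_name],
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def call_agent(agent_name: str, state: dict, timeout: float = 60.0) -> dict:
    """Send the state to a remote agent and return its JSON result."""
    request = build_agent_request(agent_name, state)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


request = build_agent_request("coder", {"task": "Write a parser"})
print(request.get_method(), request.full_url)
```

The state must be JSON-serializable to cross the network, so LangChain message objects would need converting to plain dicts before the call and back afterwards. A timeout is worth setting explicitly, since an agent that hangs would otherwise stall the supervisor.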
Option 3: LangGraph Cloud (Managed)
# langgraph.json - LangGraph Cloud configuration
{
  "dependencies": ["."],
  "graphs": {
    "multi_agent": "./graph/workflow.py:build_workflow"
  },
  "env": ".env"
}
# Deploy with:
# langgraph deploy --config langgraph.json
# Benefits:
# - Managed infrastructure (no Docker, no servers)
# - Built-in checkpointing and state persistence
# - Automatic scaling
# - Integrated with LangSmith for monitoring
Scaling Patterns
Model Tiering
Use cheaper models for simple tasks and expensive models for complex reasoning:
# agents/model_selector.py
"""Dynamic model selection based on task complexity."""
from langchain_openai import ChatOpenAI


def get_model_for_task(task: str, agent_type: str) -> ChatOpenAI:
    """Select the appropriate model based on task complexity.

    Simple tasks (tool calls, summarization) -> gpt-4o-mini ($0.15/1M input)
    Complex tasks (multi-step reasoning) -> gpt-4o ($2.50/1M input)
    """
    # Use cheaper model for routine operations
    if agent_type in ("researcher", "analyst"):
        model = "gpt-4o-mini"
    elif agent_type == "supervisor":
        # Supervisor needs good reasoning for routing decisions
        model = "gpt-4o-mini"  # Still sufficient for routing
    elif agent_type == "coder":
        # Use a better model for code generation
        model = "gpt-4o-mini"  # Upgrade to gpt-4o for complex code
    else:
        model = "gpt-4o-mini"
    return ChatOpenAI(model=model, temperature=0)
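As written, the selector returns gpt-4o-mini for every agent type and never inspects the task. One possible way to act on the "upgrade to gpt-4o for complex code" comment is a cheap heuristic on the task text. The sketch below is an assumption, not the course's method: COMPLEX_HINTS, the 50-word threshold, and pick_model_name are all hypothetical, and it returns only the model name so it can run without an API key.

```python
# Hypothetical keyword list; tune it against your own task mix
COMPLEX_HINTS = ("refactor", "architecture", "multi-step", "debug", "optimize")


def pick_model_name(task: str, agent_type: str) -> str:
    """Return a model name; only complex coder tasks get the larger model."""
    looks_complex = len(task.split()) > 50 or any(
        hint in task.lower() for hint in COMPLEX_HINTS
    )
    if agent_type == "coder" and looks_complex:
        return "gpt-4o"
    # Everything else stays on the cheaper tier
    return "gpt-4o-mini"


print(pick_model_name("Refactor the payment module", "coder"))
print(pick_model_name("Summarize this article", "researcher"))
```

The returned name could then be passed to ChatOpenAI(model=..., temperature=0) as in the selector above. Keyword matching is crude, so it is worth logging which tier each task lands on and checking the split against real results.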
Caching
# monitoring/cache.py
"""Simple caching layer for repeated queries."""
import hashlib


class AgentCache:
    """Cache agent results for identical inputs."""

    def __init__(self, max_size: int = 1000):
        self._cache = {}
        self._max_size = max_size

    def _key(self, agent_name: str, task: str) -> str:
        content = f"{agent_name}:{task}"
        return hashlib.sha256(content.encode()).hexdigest()

    def get(self, agent_name: str, task: str):
        key = self._key(agent_name, task)
        return self._cache.get(key)

    def set(self, agent_name: str, task: str, result: dict):
        key = self._key(agent_name, task)
        # Evict only when adding a new key to a full cache
        if key not in self._cache and len(self._cache) >= self._max_size:
            # Remove oldest entry (simple FIFO; dicts keep insertion order)
            oldest = next(iter(self._cache))
            del self._cache[oldest]
        self._cache[key] = result


# Usage in agent nodes:
# cache = AgentCache()
# cached = cache.get("researcher", state["task"])
# if cached is not None:
#     return cached
# result = run_agent(state)
# cache.set("researcher", state["task"], result)
# return result
Best Practices Checklist
- Keep agents focused. Each agent should have one clear responsibility. A "do everything" agent is just a chatbot with extra steps.
- Limit tools per agent. 3-5 tools per agent is ideal. Too many tools confuse the LLM and increase hallucinated tool calls.
- Set iteration limits. Always cap the supervisor loop (we used 10). Infinite loops burn tokens and time.
- Use structured output for routing. Pydantic models for supervisor decisions prevent parsing errors.
- Test agents individually first. Verify each agent works alone before wiring them into the graph.
- Log everything. Every agent call, tool invocation, and routing decision should be logged or traced.
- Gate irreversible actions. Code execution, API mutations, and file writes should require approval in production.
- Cache repeated queries. If the same research question comes up, return the cached result.
- Monitor costs per agent. One runaway agent can consume your entire API budget in minutes.
- Graceful degradation. If an agent fails, return partial results rather than nothing.
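Two of the items above, the iteration cap and graceful degradation, can be sketched as plain functions over the state dict. This is an illustrative sketch only: next_step and finalize are hypothetical names, the "complete" status value and the "partial" flag are assumptions, and only the "iteration", "status", "next_agent", and "results" keys come from the state used in this lesson.

```python
MAX_ITERATIONS = 10  # the cap used in this course


def next_step(state: dict) -> str:
    """Decide where the graph goes next; 'end' stops the loop."""
    if state.get("status") == "complete":  # assumed terminal status value
        return "end"
    if state.get("iteration", 0) >= MAX_ITERATIONS:
        # Cap reached: stop even though the supervisor wants to continue
        return "end"
    return state.get("next_agent") or "end"


def finalize(state: dict) -> dict:
    """Return whatever results exist, flagged if the run was cut short."""
    finished = state.get("status") == "complete"
    return {"results": dict(state.get("results", {})), "partial": not finished}


capped = {
    "iteration": 10,
    "status": "in_progress",
    "next_agent": "coder",
    "results": {"research": "notes"},
}
print(next_step(capped), finalize(capped))
```

Returning partial results with an explicit flag lets the caller decide whether to show them, retry, or escalate to a human reviewer.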
Frequently Asked Questions
How many agents should I have?
Start with 2-3 agents. Add more only when you have a clear specialization that does not fit existing agents. Most production systems use 3-5 agents. More agents mean more routing complexity and higher latency.
Should I use LangGraph or CrewAI?
LangGraph gives you full control over the graph structure, state management, and routing logic. CrewAI provides higher-level abstractions with less code but less flexibility. Use LangGraph when you need fine-grained control over agent interactions. Use CrewAI for simpler workflows where the built-in patterns fit your needs.
How do I handle agent disagreements?
The supervisor is the arbiter. If two agents produce conflicting results, the supervisor should route to a third agent to verify, or present both results to the human reviewer. You can also add a "validator" agent that cross-checks outputs.
What about long-running tasks?
Use LangGraph's checkpointing to persist state across interrupts. For tasks that take minutes, use background execution with a webhook notification when the workflow completes. The MemorySaver checkpointer stores state in memory; for production, use SqliteSaver or PostgresSaver for persistence across restarts.
How do I add a new agent?
- Create the agent file in agents/ with a system prompt, tools, and node function.
- Add the agent as a node in graph/workflow.py.
- Add a route from the supervisor to the new agent (conditional edge).
- Add an edge back from the new agent to the supervisor.
- Update the supervisor's system prompt to include the new agent in its team description.
- Update graph/routing.py to handle the new agent name.
What is the cost of a typical workflow run?
With gpt-4o-mini, a 5-iteration workflow (supervisor + 3 agent calls) typically costs $0.001-$0.005 per run. With gpt-4o, the same workflow costs $0.01-$0.05. The main cost driver is context length — keep agent prompts concise and truncate large results before passing them back to the supervisor.
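The ranges above can be sanity-checked with simple arithmetic. The sketch below uses the input prices quoted in the model tiering section; the call count and tokens per call are hypothetical figures chosen for illustration, and output tokens, which are billed separately, are left out, so real totals will be somewhat higher.

```python
# Input prices in USD per 1M tokens, as quoted in the model tiering section
INPUT_PRICE_PER_MILLION = {"gpt-4o-mini": 0.15, "gpt-4o": 2.50}


def input_cost(model: str, tokens_per_call: int, calls: int) -> float:
    """Estimate input-token cost in USD for one workflow run."""
    total_tokens = tokens_per_call * calls
    return INPUT_PRICE_PER_MILLION[model] * total_tokens / 1_000_000


# Hypothetical run: 8 LLM calls averaging 1,500 input tokens each
mini = input_cost("gpt-4o-mini", 1500, 8)
full = input_cost("gpt-4o", 1500, 8)
print(f"gpt-4o-mini: ${mini:.4f}  gpt-4o: ${full:.4f}")
```

Under these assumptions the estimates come to about $0.0018 and $0.03, inside the quoted ranges. Doubling the average context doubles the cost, which is why truncating large results matters.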
Can I use open-source models instead of OpenAI?
Yes. LangChain supports any model with a ChatModel interface. Replace ChatOpenAI with ChatOllama (local), ChatAnthropic, ChatGoogleGenerativeAI, or any other provider. The agent patterns work identically — you only change the model instantiation line. Note that smaller models may struggle with tool calling and structured output.
Complete Project Summary
Here is everything you built in this course:
Lesson 1: Project Setup
Architecture overview, LangGraph fundamentals, project structure, dependency installation, environment configuration.
Lesson 2: Single Agent
ReAct agents with tools, shared state schema, researcher/coder/analyst agents, individual agent testing.
Lesson 3: Tool Infrastructure
Web search, sandboxed code execution, file I/O, HTTP API client, tool registry organized by agent role.
Lesson 4: Multi-Agent Orchestration
Supervisor with structured output, conditional routing, result aggregation, complete LangGraph StateGraph.
Lesson 5: Human-in-the-Loop
Approval gates, LangGraph interrupts, checkpointing with MemorySaver, structured feedback collection.
Lesson 6: Monitoring
LangSmith tracing, cost tracking per agent, structured error handling with retries, debug utilities.
Next Steps
Now that you have a working multi-agent system, here are directions to explore:
- Add memory: Give agents long-term memory with a vector store so they remember previous conversations and user preferences.
- Build a web UI: Create a chat interface that shows which agent is working and streams their progress in real time.
- Add evaluation: Use LangSmith datasets to systematically test and improve agent performance on known tasks.
- Try hierarchical agents: Build a supervisor that manages other supervisors for complex organizational structures.
- Explore MCP (Model Context Protocol): Connect your agents to external tools and data sources via the standardized MCP protocol.