Step 3: Multi-Agent Orchestration
This is the core of the system. You will build a supervisor agent that analyzes incoming tasks, routes them to the right worker agent, collects results, and decides when the workflow is complete. Everything is wired together using a LangGraph StateGraph.
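Before bringing in LangGraph, the control flow described above can be sketched in plain Python. This is only an illustration under stated assumptions: `scripted_supervisor`, the `WORKERS` stubs, and `run_loop` are hypothetical stand-ins, no LLM is called, and the real implementation later in this step uses a StateGraph rather than a `while` loop.

```python
from typing import Callable

# Hypothetical worker stubs: each takes an instruction and returns a result string.
WORKERS: dict[str, Callable[[str], str]] = {
    "researcher": lambda instruction: f"notes on: {instruction}",
    "coder": lambda instruction: f"code for: {instruction}",
    "analyst": lambda instruction: f"report on: {instruction}",
}

MAX_ITERATIONS = 10  # same cap the supervisor uses later in this step


def scripted_supervisor(results: dict[str, str]) -> str:
    """Stand-in for the LLM: pick the first worker that has not run yet."""
    for name in ("researcher", "coder", "analyst"):
        if name not in results:
            return name
    return "FINISH"


def run_loop(task: str, supervisor: Callable[[dict[str, str]], str]) -> dict:
    """Route -> work -> return to the supervisor, until FINISH or the cap."""
    results: dict[str, str] = {}
    iteration = 0
    while iteration < MAX_ITERATIONS:
        next_agent = supervisor(results)
        iteration += 1
        if next_agent == "FINISH":
            break
        results[next_agent] = WORKERS[next_agent](task)
    return {"results": results, "iteration": iteration}


state = run_loop("compare async frameworks", scripted_supervisor)
print(list(state["results"]), state["iteration"])
```

The supervisor only ever chooses the next step; the workers only ever produce results. The iteration cap matters because a supervisor that never returns FINISH would otherwise loop forever.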
The Supervisor Agent
The supervisor is the brain of the multi-agent system. It does not do the actual work — it decides who should do the work and when the task is complete.
# agents/supervisor.py
"""Supervisor agent - routes tasks to specialized worker agents."""
import os
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from pydantic import BaseModel, Field
from typing import Literal
from agents.state import AgentState


# --- Structured output for routing decisions ---
class SupervisorDecision(BaseModel):
    """The supervisor's routing decision."""
    next_agent: Literal["researcher", "coder", "analyst", "FINISH"]
    reasoning: str = Field(description="Why this agent was chosen")
    task_for_agent: str = Field(description="Specific task instruction for the selected agent")


# --- System prompt ---
SUPERVISOR_PROMPT = """You are a Supervisor Agent that coordinates a team of specialized AI agents.

Your team:
1. RESEARCHER - Searches the web, finds information, verifies facts, provides citations.
2. CODER - Writes Python code, executes it, debugs errors, handles files.
3. ANALYST - Analyzes data, identifies trends, generates structured reports.

Your job:
- Analyze the user's request and break it into subtasks
- Route each subtask to the most appropriate agent
- Review agent results and decide if more work is needed
- Return FINISH when the task is fully complete

Rules:
1. Route to ONE agent at a time. You will see their result before deciding the next step.
2. Be specific in your task instructions - tell the agent exactly what to do.
3. If an agent's result is incomplete or wrong, you can send the task back with corrections.
4. Maximum 10 routing iterations to prevent infinite loops.
5. Always route to FINISH when you have a satisfactory answer.

Decision format (you MUST respond with valid JSON):
{
  "next_agent": "researcher" | "coder" | "analyst" | "FINISH",
  "reasoning": "Why this agent is the best choice",
  "task_for_agent": "Specific instruction for the agent"
}"""


def supervisor_node(state: AgentState) -> dict:
    """LangGraph node that makes routing decisions.

    The supervisor examines the current state (task + any previous agent results)
    and decides which agent to call next, or FINISH if done.
    """
    llm = ChatOpenAI(
        model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
        temperature=0
    )

    # Build context from current state
    context_parts = [f"User task: {state['task']}"]

    # Include results from previous agent runs
    results = state.get("results", {})
    if results:
        context_parts.append("\n--- Previous agent results ---")
        for agent_name, result in results.items():
            # Truncate long results to keep context manageable
            truncated = result[:2000] + "..." if len(result) > 2000 else result
            context_parts.append(f"\n[{agent_name}]:\n{truncated}")

    # Check iteration count (hard stop, independent of the LLM's decision)
    iteration = state.get("iteration", 0)
    if iteration >= 10:
        return {
            "next_agent": "FINISH",
            "status": "completed",
            "iteration": iteration,
            "messages": [AIMessage(content="Maximum iterations reached. Returning best available results.")]
        }

    # Get structured decision from the LLM
    messages = [
        SystemMessage(content=SUPERVISOR_PROMPT),
        HumanMessage(content="\n".join(context_parts))
    ]

    # Use structured output for reliable routing
    structured_llm = llm.with_structured_output(SupervisorDecision)
    try:
        decision = structured_llm.invoke(messages)
    except Exception as e:
        # Fallback: if structured output fails, finish gracefully
        return {
            "next_agent": "FINISH",
            "status": "error",
            "iteration": iteration + 1,
            "messages": [AIMessage(content=f"Supervisor error: {e}. Returning available results.")]
        }

    # Update state based on decision
    new_status = "completed" if decision.next_agent == "FINISH" else "in_progress"
    return {
        "next_agent": decision.next_agent,
        # Note: this replaces state["task"] with the sub-task instruction, so later
        # supervisor passes see that instruction as the "User task" (assuming the
        # `task` field in AgentState has no custom reducer).
        "task": decision.task_for_agent if decision.next_agent != "FINISH" else state["task"],
        "status": new_status,
        "iteration": iteration + 1,
        "messages": [AIMessage(content=f"Supervisor: {decision.reasoning} -> {decision.next_agent}")]
    }
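The fallback branch above finishes the workflow whenever the structured call raises. The same fail-closed idea can be sketched with only the standard library, assuming the model returned raw JSON text. `Decision` and `parse_decision` are hypothetical names used for illustration; this is not how `with_structured_output` works internally.

```python
import json
from dataclasses import dataclass

# A tuple (not a set) so membership tests never fail on unhashable JSON values.
VALID_AGENTS = ("researcher", "coder", "analyst", "FINISH")


@dataclass
class Decision:
    next_agent: str
    reasoning: str
    task_for_agent: str


def parse_decision(raw: str) -> Decision:
    """Parse a routing decision, falling back to FINISH on any invalid input."""
    fallback = Decision("FINISH", "Could not parse supervisor output", "")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return fallback
    if not isinstance(data, dict) or data.get("next_agent") not in VALID_AGENTS:
        return fallback
    return Decision(
        next_agent=data["next_agent"],
        reasoning=str(data.get("reasoning", "")),
        task_for_agent=str(data.get("task_for_agent", "")),
    )


good = parse_decision(
    '{"next_agent": "coder", "reasoning": "needs code", "task_for_agent": "write merge()"}'
)
bad = parse_decision("I think the researcher should go next")
print(good.next_agent, bad.next_agent)
```

Defaulting to FINISH means a malformed reply ends the run with whatever results exist instead of routing to an arbitrary agent.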
The Routing Logic
Conditional edges in LangGraph determine which node runs next based on the current state:
# graph/routing.py
"""Conditional routing logic for the multi-agent workflow."""
from agents.state import AgentState


def route_from_supervisor(state: AgentState) -> str:
    """Determine which node to execute next based on the supervisor's decision.

    This function is used as a conditional edge in the LangGraph StateGraph.
    It reads the 'next_agent' field from the state and returns the
    corresponding node name.

    Args:
        state: Current workflow state.

    Returns:
        The name of the next node to execute.
    """
    next_agent = state.get("next_agent", "FINISH")
    if next_agent == "FINISH":
        return "aggregate_results"

    # Map agent names to graph node names
    agent_map = {
        "researcher": "researcher",
        "coder": "coder",
        "analyst": "analyst",
    }
    # Unknown names fall through to aggregation instead of raising
    return agent_map.get(next_agent, "aggregate_results")
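Because the router is a pure function of the state, it can be checked without an LLM or a compiled graph. The sketch below repeats the function with a plain `dict` in place of `AgentState` (an assumption made so the snippet runs standalone) and exercises a known agent, FINISH, an unexpected value, and a missing key.

```python
def route_from_supervisor(state: dict) -> str:
    """Standalone copy of the router, typed against a plain dict."""
    next_agent = state.get("next_agent", "FINISH")
    if next_agent == "FINISH":
        return "aggregate_results"
    agent_map = {"researcher": "researcher", "coder": "coder", "analyst": "analyst"}
    return agent_map.get(next_agent, "aggregate_results")


for value in ("coder", "FINISH", "translator"):
    print(value, "->", route_from_supervisor({"next_agent": value}))
print("(missing) ->", route_from_supervisor({}))
```

An unrecognized agent name ends the workflow quietly rather than raising, so a routing mistake shows up as an early finish, not as an exception.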
Result Aggregation
When the supervisor decides the workflow is complete, an aggregation node assembles the final response:
# graph/workflow.py (partial - aggregation node)
def aggregate_results(state: AgentState) -> dict:
    """Aggregate results from all agents into a final response.

    This node runs when the supervisor routes to FINISH.
    It combines all agent outputs into a coherent final answer.
    """
    results = state.get("results", {})
    if not results:
        return {
            "messages": [AIMessage(content="No results were generated.")],
            "status": "completed"
        }

    # Build a structured summary
    summary_parts = ["## Workflow Results\n"]
    for agent_name, result in results.items():
        summary_parts.append(f"### {agent_name.title()} Agent\n{result}\n")
    summary_parts.append(f"\n---\n*Completed in {state.get('iteration', 0)} iterations*")

    final_response = "\n".join(summary_parts)
    return {
        "messages": [AIMessage(content=final_response)],
        "status": "completed"
    }
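To see what the final message looks like, here is a standalone sketch of the same formatting logic that returns a plain string instead of an `AIMessage`. `format_results` is a hypothetical helper, and the sample results are invented placeholders.

```python
def format_results(results: dict[str, str], iteration: int) -> str:
    """Mirror the aggregation node's Markdown layout as a plain string."""
    if not results:
        return "No results were generated."
    parts = ["## Workflow Results\n"]
    for agent_name, result in results.items():
        parts.append(f"### {agent_name.title()} Agent\n{result}\n")
    parts.append(f"\n---\n*Completed in {iteration} iterations*")
    return "\n".join(parts)


sample = {
    "researcher": "Found three frameworks.",
    "analyst": "Framework A looks fastest.",
}
print(format_results(sample, iteration=3))
```

Sections appear in the dictionary's insertion order, so the report lists agents in the order their results were first stored.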
Wire It All Together
Now build the complete LangGraph StateGraph that connects everything:
# graph/workflow.py
"""Complete multi-agent workflow graph using LangGraph."""
from langgraph.graph import StateGraph, START, END
from langgraph.graph.state import CompiledStateGraph
from langchain_core.messages import AIMessage
from agents.state import AgentState
from agents.supervisor import supervisor_node
from agents.researcher import researcher_node
from agents.coder import coder_node
from agents.analyst import analyst_node
from graph.routing import route_from_supervisor


def aggregate_results(state: AgentState) -> dict:
    """Aggregate results from all agents into a final response."""
    results = state.get("results", {})
    if not results:
        return {
            "messages": [AIMessage(content="No results were generated.")],
            "status": "completed"
        }

    summary_parts = ["## Workflow Results\n"]
    for agent_name, result in results.items():
        summary_parts.append(f"### {agent_name.title()} Agent\n{result}\n")
    summary_parts.append(f"\n---\n*Completed in {state.get('iteration', 0)} iterations*")

    return {
        "messages": [AIMessage(content="\n".join(summary_parts))],
        "status": "completed"
    }


def build_workflow() -> CompiledStateGraph:
    """Build and compile the multi-agent workflow graph.

    Returns:
        A compiled LangGraph application ready to invoke.

    Graph structure:
        START -> supervisor -> (conditional) -> researcher/coder/analyst
                     ^                                    |
                     |                                    |
                     +-------- back to supervisor --------+
                     |
                     v
             aggregate_results -> END
    """
    # Create the graph with our state schema
    workflow = StateGraph(AgentState)

    # --- Add nodes ---
    workflow.add_node("supervisor", supervisor_node)
    workflow.add_node("researcher", researcher_node)
    workflow.add_node("coder", coder_node)
    workflow.add_node("analyst", analyst_node)
    workflow.add_node("aggregate_results", aggregate_results)

    # --- Add edges ---
    # Entry point: always start with the supervisor
    workflow.add_edge(START, "supervisor")

    # Supervisor routes to agents or to aggregation.
    # Keys are the router's return values; values are the target node names.
    workflow.add_conditional_edges(
        "supervisor",
        route_from_supervisor,
        {
            "researcher": "researcher",
            "coder": "coder",
            "analyst": "analyst",
            "aggregate_results": "aggregate_results",
        }
    )

    # After each agent runs, go back to the supervisor for the next decision
    workflow.add_edge("researcher", "supervisor")
    workflow.add_edge("coder", "supervisor")
    workflow.add_edge("analyst", "supervisor")

    # Aggregation leads to END
    workflow.add_edge("aggregate_results", END)

    # Compile the graph (compile() returns the runnable app, not the StateGraph builder)
    app = workflow.compile()
    return app
# --- Entry point ---
def run_workflow(task: str) -> dict:
    """Run the multi-agent workflow with a given task.

    Args:
        task: The user's task description.

    Returns:
        The final state after workflow completion.
    """
    app = build_workflow()
    initial_state = {
        "messages": [],
        "next_agent": "",
        "task": task,
        "results": {},
        "status": "in_progress",
        "iteration": 0,
    }

    # Run the workflow
    final_state = app.invoke(initial_state)
    return final_state
The Main Entry Point
# main.py
"""Entry point for the multi-agent workflow."""
from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown

load_dotenv()
console = Console()


def main():
    # Imported here so environment variables are loaded before the graph modules
    from graph.workflow import run_workflow

    console.print(Panel(
        "[bold blue]Multi-Agent Workflow[/bold blue]\n"
        "Enter a task and the agents will collaborate to complete it.\n"
        "Type 'quit' to exit.",
        title="Welcome"
    ))

    while True:
        task = console.input("\n[bold green]Task:[/bold green] ").strip()
        if task.lower() in ("quit", "exit", "q"):
            break
        if not task:
            continue

        console.print(f"\n[dim]Running workflow for: {task}[/dim]\n")
        try:
            result = run_workflow(task)

            # Display the final response
            final_messages = result.get("messages", [])
            if final_messages:
                last_message = final_messages[-1].content
                console.print(Panel(
                    Markdown(last_message),
                    title=f"Result (iterations: {result.get('iteration', '?')})",
                    border_style="green"
                ))
            else:
                console.print("[yellow]No response generated.[/yellow]")
        except Exception as e:
            console.print(f"[red]Error: {e}[/red]")


if __name__ == "__main__":
    main()
Test the Complete Workflow
# tests/test_workflow.py
"""Test the complete multi-agent workflow."""
from dotenv import load_dotenv

load_dotenv()


def test_research_task():
    """Test a pure research task."""
    from graph.workflow import run_workflow

    result = run_workflow("What is LangGraph and how does it compare to CrewAI?")
    assert result["status"] == "completed"
    assert "researcher" in result.get("results", {})
    print(f"Research task OK - iterations: {result['iteration']}")
    print(f"Result preview: {result['results']['researcher'][:200]}")


def test_coding_task():
    """Test a coding task."""
    from graph.workflow import run_workflow

    result = run_workflow("Write a Python function to merge two sorted lists and test it.")
    assert result["status"] == "completed"
    assert "coder" in result.get("results", {})
    print(f"\nCoding task OK - iterations: {result['iteration']}")
    print(f"Result preview: {result['results']['coder'][:200]}")


def test_multi_agent_task():
    """Test a task that requires multiple agents."""
    from graph.workflow import run_workflow

    result = run_workflow(
        "Research the latest Python web frameworks, write code to compare "
        "their request handling speed with a simple benchmark, and create "
        "a summary report of the findings."
    )
    assert result["status"] == "completed"
    print(f"\nMulti-agent task OK - iterations: {result['iteration']}")
    print(f"Agents used: {list(result.get('results', {}).keys())}")


if __name__ == "__main__":
    test_research_task()
    test_coding_task()
    test_multi_agent_task()
    print("\nAll workflow tests passed!")
# Run the complete system
python main.py
# Example interaction:
# Task: Research the top 3 Python async frameworks, write a comparison table, and summarize
#
# The supervisor will:
# 1. Route to researcher -> search for Python async frameworks
# 2. Route to coder -> create a comparison table in code
# 3. Route to analyst -> synthesize a summary report
# 4. Route to FINISH -> aggregate all results
Visualizing the Graph
LangGraph can render the workflow as a diagram:
# Visualize the workflow graph
from graph.workflow import build_workflow

app = build_workflow()

# Print the graph structure as ASCII (requires the grandalf package)
print(app.get_graph().draw_ascii())

# Or save as a PNG image (requires the pygraphviz package and a Graphviz install)
# app.get_graph().draw_png("workflow_graph.png")
Key Takeaways
- The supervisor uses structured output (SupervisorDecision) for reliable, parseable routing decisions.
- Conditional edges (add_conditional_edges) let the graph branch dynamically based on the supervisor's choice.
- After each worker agent runs, control returns to the supervisor for the next decision; this is the feedback loop.
- A maximum iteration limit (10) prevents infinite loops when the supervisor cannot decide to stop.
- The aggregation node assembles all agent results into a single coherent response.
What Is Next
In the next lesson, you will add human-in-the-loop controls — approval gates that pause the workflow for human review before executing critical actions.