
Step 3: Multi-Agent Orchestration

This is the core of the system. You will build a supervisor agent that analyzes incoming tasks, routes them to the right worker agent, collects results, and decides when the workflow is complete. Everything is wired together using a LangGraph StateGraph.

The Supervisor Agent

The supervisor is the brain of the multi-agent system. It does not do the actual work — it decides who should do the work and when the task is complete.

# agents/supervisor.py
"""Supervisor agent - routes tasks to specialized worker agents."""
import os
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from pydantic import BaseModel, Field
from typing import Literal

from agents.state import AgentState


# --- Structured output for routing decisions ---

class SupervisorDecision(BaseModel):
    """The supervisor's routing decision."""
    next_agent: Literal["researcher", "coder", "analyst", "FINISH"]
    reasoning: str = Field(description="Why this agent was chosen")
    task_for_agent: str = Field(description="Specific task instruction for the selected agent")


# --- System prompt ---

SUPERVISOR_PROMPT = """You are a Supervisor Agent that coordinates a team of specialized AI agents.

Your team:
1. RESEARCHER - Searches the web, finds information, verifies facts, provides citations.
2. CODER - Writes Python code, executes it, debugs errors, handles files.
3. ANALYST - Analyzes data, identifies trends, generates structured reports.

Your job:
- Analyze the user's request and break it into subtasks
- Route each subtask to the most appropriate agent
- Review agent results and decide if more work is needed
- Return FINISH when the task is fully complete

Rules:
1. Route to ONE agent at a time. You will see their result before deciding the next step.
2. Be specific in your task instructions - tell the agent exactly what to do.
3. If an agent's result is incomplete or wrong, you can send the task back with corrections.
4. Maximum 10 routing iterations to prevent infinite loops.
5. Always route to FINISH when you have a satisfactory answer.

Decision format (you MUST respond with valid JSON):
{
  "next_agent": "researcher" | "coder" | "analyst" | "FINISH",
  "reasoning": "Why this agent is the best choice",
  "task_for_agent": "Specific instruction for the agent"
}"""


def supervisor_node(state: AgentState) -> dict:
    """LangGraph node that makes routing decisions.

    The supervisor examines the current state (task + any previous agent results)
    and decides which agent to call next, or FINISH if done.
    """
    llm = ChatOpenAI(
        model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
        temperature=0
    )

    # Build context from current state
    context_parts = [f"User task: {state['task']}"]

    # Include results from previous agent runs
    results = state.get("results", {})
    if results:
        context_parts.append("\n--- Previous agent results ---")
        for agent_name, result in results.items():
            # Truncate long results to keep context manageable
            truncated = result[:2000] + "..." if len(result) > 2000 else result
            context_parts.append(f"\n[{agent_name}]:\n{truncated}")

    # Check iteration count
    iteration = state.get("iteration", 0)
    if iteration >= 10:
        return {
            "next_agent": "FINISH",
            "status": "completed",
            "iteration": iteration,
            "messages": [AIMessage(content="Maximum iterations reached. Returning best available results.")]
        }

    # Get structured decision from the LLM
    messages = [
        SystemMessage(content=SUPERVISOR_PROMPT),
        HumanMessage(content="\n".join(context_parts))
    ]

    # Use structured output for reliable routing
    structured_llm = llm.with_structured_output(SupervisorDecision)

    try:
        decision = structured_llm.invoke(messages)
    except Exception as e:
        # Fallback: if structured output fails, finish gracefully
        return {
            "next_agent": "FINISH",
            "status": "error",
            "iteration": iteration + 1,
            "messages": [AIMessage(content=f"Supervisor error: {str(e)}. Returning available results.")]
        }

    # Update state based on decision
    new_status = "completed" if decision.next_agent == "FINISH" else "in_progress"

    # Note: "task" is overwritten with the sub-task instruction, so on the next
    # pass the supervisor's "User task:" line shows that instruction rather than
    # the user's original request.
    return {
        "next_agent": decision.next_agent,
        "task": decision.task_for_agent if decision.next_agent != "FINISH" else state["task"],
        "status": new_status,
        "iteration": iteration + 1,
        "messages": [AIMessage(content=f"Supervisor: {decision.reasoning} -> {decision.next_agent}")]
    }
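The context-building step inside supervisor_node can be checked without calling an LLM if it is pulled out into a pure function. The following is a minimal sketch under that assumption: `build_supervisor_context` and `max_chars` are hypothetical names that do not appear in the project, and the body mirrors the truncation logic shown above.

```python
from typing import Dict, Optional

MAX_RESULT_CHARS = 2000  # same cut-off the supervisor node uses


def build_supervisor_context(
    task: str,
    results: Optional[Dict[str, str]] = None,
    max_chars: int = MAX_RESULT_CHARS,
) -> str:
    """Assemble the text the supervisor LLM would see (hypothetical helper)."""
    parts = [f"User task: {task}"]
    if results:
        parts.append("\n--- Previous agent results ---")
        for agent_name, result in results.items():
            # Keep only the first max_chars characters of long outputs
            truncated = result[:max_chars] + "..." if len(result) > max_chars else result
            parts.append(f"\n[{agent_name}]:\n{truncated}")
    return "\n".join(parts)


context = build_supervisor_context(
    "Compare two sorting algorithms",
    {"researcher": "x" * 5000, "coder": "def merge(a, b): ..."},
)
print(context[:80])
```

With the 5000-character researcher result, only the first 2000 characters plus a trailing "..." reach the prompt, while the short coder result passes through unchanged.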

The Routing Logic

Conditional edges in LangGraph determine which node runs next based on the current state:

# graph/routing.py
"""Conditional routing logic for the multi-agent workflow."""
from agents.state import AgentState


def route_from_supervisor(state: AgentState) -> str:
    """Determine which node to execute next based on the supervisor's decision.

    This function is used as a conditional edge in the LangGraph StateGraph.
    It reads the 'next_agent' field from the state and returns the
    corresponding node name.

    Args:
        state: Current workflow state.

    Returns:
        The name of the next node to execute.
    """
    next_agent = state.get("next_agent", "FINISH")

    if next_agent == "FINISH":
        return "aggregate_results"

    # Map agent names to graph node names
    agent_map = {
        "researcher": "researcher",
        "coder": "coder",
        "analyst": "analyst",
    }

    return agent_map.get(next_agent, "aggregate_results")
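Because the routing function only reads the state, it can be exercised with plain dictionaries. The sketch below repeats a compact copy of route_from_supervisor (typed as a plain dict so it runs without the project's AgentState) and feeds it a few hand-written states; the "translator" value is a made-up agent name used to show the fallback.

```python
def route_from_supervisor(state: dict) -> str:
    """Compact copy of the routing function above, typed as a plain dict."""
    next_agent = state.get("next_agent", "FINISH")
    if next_agent == "FINISH":
        return "aggregate_results"
    agent_map = {"researcher": "researcher", "coder": "coder", "analyst": "analyst"}
    # Unknown names fall back to aggregation instead of raising
    return agent_map.get(next_agent, "aggregate_results")


sample_states = [
    {"next_agent": "researcher"},
    {"next_agent": "FINISH"},
    {"next_agent": "translator"},  # not a real worker in this system
    {},                            # key missing entirely
]
for state in sample_states:
    print(state, "->", route_from_supervisor(state))
```

Every input that is not one of the three worker names ends at aggregate_results, so a malformed decision ends the workflow rather than crashing the graph.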

Result Aggregation

When the supervisor decides the workflow is complete, an aggregation node assembles the final response:

# graph/workflow.py (partial - aggregation node)

def aggregate_results(state: AgentState) -> dict:
    """Aggregate results from all agents into a final response.

    This node runs when the supervisor routes to FINISH.
    It combines all agent outputs into a coherent final answer.
    """
    results = state.get("results", {})

    if not results:
        return {
            "messages": [AIMessage(content="No results were generated.")],
            "status": "completed"
        }

    # Build a structured summary
    summary_parts = ["## Workflow Results\n"]

    for agent_name, result in results.items():
        summary_parts.append(f"### {agent_name.title()} Agent\n{result}\n")

    summary_parts.append(f"\n---\n*Completed in {state.get('iteration', 0)} iterations*")

    final_response = "\n".join(summary_parts)

    return {
        "messages": [AIMessage(content=final_response)],
        "status": "completed"
    }
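To see what the aggregated text looks like, the formatting step can be run on its own. This sketch assumes a hypothetical helper named `format_summary` that applies the same string-building logic as aggregate_results to placeholder results; it leaves out the AIMessage wrapper so it runs with the standard library only.

```python
def format_summary(results: dict, iteration: int) -> str:
    """Build the Markdown summary the aggregation node produces (hypothetical helper)."""
    parts = ["## Workflow Results\n"]
    for agent_name, result in results.items():
        parts.append(f"### {agent_name.title()} Agent\n{result}\n")
    parts.append(f"\n---\n*Completed in {iteration} iterations*")
    return "\n".join(parts)


# Placeholder outputs, not real agent results
sample_results = {"researcher": "(research notes)", "coder": "(code output)"}
summary = format_summary(sample_results, 3)
print(summary)
```

Sections appear in the insertion order of the results dictionary. Because the dictionary is keyed by agent name, it holds at most one entry per agent.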

Wire It All Together

Now build the complete LangGraph StateGraph that connects everything:

# graph/workflow.py
"""Complete multi-agent workflow graph using LangGraph."""
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import AIMessage

from agents.state import AgentState
from agents.supervisor import supervisor_node
from agents.researcher import researcher_node
from agents.coder import coder_node
from agents.analyst import analyst_node
from graph.routing import route_from_supervisor


def aggregate_results(state: AgentState) -> dict:
    """Aggregate results from all agents into a final response."""
    results = state.get("results", {})

    if not results:
        return {
            "messages": [AIMessage(content="No results were generated.")],
            "status": "completed"
        }

    summary_parts = ["## Workflow Results\n"]
    for agent_name, result in results.items():
        summary_parts.append(f"### {agent_name.title()} Agent\n{result}\n")
    summary_parts.append(f"\n---\n*Completed in {state.get('iteration', 0)} iterations*")

    return {
        "messages": [AIMessage(content="\n".join(summary_parts))],
        "status": "completed"
    }


def build_workflow():
    """Build and compile the multi-agent workflow graph.

    Returns:
        A compiled LangGraph application ready to invoke. This is the object
        returned by StateGraph.compile(), not the StateGraph builder itself.

    Graph structure:
        START -> supervisor -> (conditional) -> researcher/coder/analyst
                    ^                                    |
                    |                                    |
                    +-------- back to supervisor ---------+
                    |
                    v
             aggregate_results -> END
    """
    # Create the graph with our state schema
    workflow = StateGraph(AgentState)

    # --- Add nodes ---
    workflow.add_node("supervisor", supervisor_node)
    workflow.add_node("researcher", researcher_node)
    workflow.add_node("coder", coder_node)
    workflow.add_node("analyst", analyst_node)
    workflow.add_node("aggregate_results", aggregate_results)

    # --- Add edges ---

    # Entry point: always start with the supervisor
    workflow.add_edge(START, "supervisor")

    # Supervisor routes to agents or to aggregation
    workflow.add_conditional_edges(
        "supervisor",
        route_from_supervisor,
        {
            "researcher": "researcher",
            "coder": "coder",
            "analyst": "analyst",
            "aggregate_results": "aggregate_results",
        }
    )

    # After each agent runs, go back to the supervisor for the next decision
    workflow.add_edge("researcher", "supervisor")
    workflow.add_edge("coder", "supervisor")
    workflow.add_edge("analyst", "supervisor")

    # Aggregation leads to END
    workflow.add_edge("aggregate_results", END)

    # Compile the graph
    app = workflow.compile()

    return app


# --- Entry point ---

def run_workflow(task: str) -> dict:
    """Run the multi-agent workflow with a given task.

    Args:
        task: The user's task description.

    Returns:
        The final state after workflow completion.
    """
    app = build_workflow()

    initial_state = {
        "messages": [],
        "next_agent": "",
        "task": task,
        "results": {},
        "status": "in_progress",
        "iteration": 0,
    }

    # Run the workflow
    final_state = app.invoke(initial_state)

    return final_state
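The control flow wired up in build_workflow can be traced without LangGraph or an API key. The sketch below is a plain-Python simulation, not the LangGraph runtime: `make_scripted_supervisor`, `make_worker`, and `simulate` are hypothetical stand-ins, the supervisor follows a fixed plan instead of calling an LLM, and node updates are merged into the state with `dict.update` as a simplification.

```python
from typing import Callable, Dict, List, Tuple

MAX_ITERATIONS = 10  # same cap that supervisor_node enforces
WORKERS = ("researcher", "coder", "analyst")


def make_scripted_supervisor(plan: List[str]) -> Callable[[dict], dict]:
    """Stand-in supervisor that follows a fixed plan, then finishes."""
    def node(state: dict) -> dict:
        iteration = state["iteration"]
        if iteration >= MAX_ITERATIONS:
            # Safety limit: stop without incrementing, as supervisor_node does
            return {"next_agent": "FINISH", "iteration": iteration}
        choice = plan[iteration] if iteration < len(plan) else "FINISH"
        return {"next_agent": choice, "iteration": iteration + 1}
    return node


def make_worker(name: str) -> Callable[[dict], dict]:
    """Stand-in worker that records a placeholder result under its own name."""
    def node(state: dict) -> dict:
        return {"results": {**state["results"], name: f"{name} output"}}
    return node


def simulate(plan: List[str]) -> Tuple[dict, List[str]]:
    """Run supervisor -> worker -> supervisor ... until FINISH, then aggregate."""
    state = {"next_agent": "", "results": {}, "iteration": 0}
    supervisor = make_scripted_supervisor(plan)
    workers: Dict[str, Callable[[dict], dict]] = {n: make_worker(n) for n in WORKERS}
    trace: List[str] = []
    while True:
        state.update(supervisor(state))          # supervisor decides
        trace.append(state["next_agent"])
        if state["next_agent"] == "FINISH":      # conditional edge to aggregation
            break
        state.update(workers[state["next_agent"]](state))  # worker, then back
    trace.append("aggregate_results")
    return state, trace


final_state, trace = simulate(["researcher", "coder", "analyst"])
print(trace, final_state["iteration"])

# A supervisor that never stops on its own is cut off by the iteration cap
stuck_state, stuck_trace = simulate(["coder"] * 50)
print(stuck_state["iteration"], len(stuck_trace))
```

The first run visits each worker once and then finishes. The second shows the iteration cap ending a loop that the scripted supervisor would otherwise never leave.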

The Main Entry Point

# main.py
"""Entry point for the multi-agent workflow."""
import os
from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown

load_dotenv()
console = Console()


def main():
    from graph.workflow import run_workflow

    console.print(Panel(
        "[bold blue]Multi-Agent Workflow[/bold blue]\n"
        "Enter a task and the agents will collaborate to complete it.\n"
        "Type 'quit' to exit.",
        title="Welcome"
    ))

    while True:
        task = console.input("\n[bold green]Task:[/bold green] ").strip()
        if task.lower() in ("quit", "exit", "q"):
            break
        if not task:
            continue

        console.print(f"\n[dim]Running workflow for: {task}[/dim]\n")

        try:
            result = run_workflow(task)

            # Display the final response
            final_messages = result.get("messages", [])
            if final_messages:
                last_message = final_messages[-1].content
                console.print(Panel(
                    Markdown(last_message),
                    title=f"Result (iterations: {result.get('iteration', '?')})",
                    border_style="green"
                ))
            else:
                console.print("[yellow]No response generated.[/yellow]")

        except Exception as e:
            console.print(f"[red]Error: {e}[/red]")


if __name__ == "__main__":
    main()

Test the Complete Workflow

# tests/test_workflow.py
"""Test the complete multi-agent workflow."""
import os
from dotenv import load_dotenv

load_dotenv()


def test_research_task():
    """Test a pure research task."""
    from graph.workflow import run_workflow

    result = run_workflow("What is LangGraph and how does it compare to CrewAI?")

    assert result["status"] == "completed"
    assert "researcher" in result.get("results", {})
    print(f"Research task OK - iterations: {result['iteration']}")
    print(f"Result preview: {result['results']['researcher'][:200]}")


def test_coding_task():
    """Test a coding task."""
    from graph.workflow import run_workflow

    result = run_workflow("Write a Python function to merge two sorted lists and test it.")

    assert result["status"] == "completed"
    assert "coder" in result.get("results", {})
    print(f"\nCoding task OK - iterations: {result['iteration']}")
    print(f"Result preview: {result['results']['coder'][:200]}")


def test_multi_agent_task():
    """Test a task that requires multiple agents."""
    from graph.workflow import run_workflow

    result = run_workflow(
        "Research the latest Python web frameworks, write code to compare "
        "their request handling speed with a simple benchmark, and create "
        "a summary report of the findings."
    )

    assert result["status"] == "completed"
    print(f"\nMulti-agent task OK - iterations: {result['iteration']}")
    print(f"Agents used: {list(result.get('results', {}).keys())}")


if __name__ == "__main__":
    test_research_task()
    test_coding_task()
    test_multi_agent_task()
    print("\nAll workflow tests passed!")

# Run the complete system
python main.py

# Example interaction:
# Task: Research the top 3 Python async frameworks, write a comparison table, and summarize
#
# The supervisor will:
# 1. Route to researcher -> search for Python async frameworks
# 2. Route to coder -> create a comparison table in code
# 3. Route to analyst -> synthesize a summary report
# 4. Route to FINISH -> aggregate all results
Checkpoint: The workflow should complete in 3-6 iterations for multi-agent tasks. Once the iteration counter reaches 10, the supervisor's safety limit forces FINISH and the workflow returns whatever results exist. If the supervisor keeps routing to the same agent, check the system prompt, which may need clearer stop conditions.
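One way to spot a supervisor that is stuck on a single agent is to look for long runs of identical routing choices. The helper below is a sketch that assumes you have already collected the supervisor's next_agent values into a list, for example by logging them during a run; `longest_repeat` is a hypothetical name, not part of the project.

```python
from itertools import groupby
from typing import List, Tuple


def longest_repeat(routes: List[str]) -> Tuple[str, int]:
    """Return (agent, length) for the longest run of consecutive identical routes."""
    best: Tuple[str, int] = ("", 0)
    for agent, run in groupby(routes):
        length = sum(1 for _ in run)
        if length > best[1]:
            best = (agent, length)
    return best


# Hand-written routing history used only for illustration
history = ["researcher", "coder", "coder", "coder", "analyst", "FINISH"]
agent, streak = longest_repeat(history)
print(f"Longest streak: {agent} x{streak}")
```

A streak of three or more on the same agent is a reasonable signal to review the stop conditions in the prompt, although the right threshold depends on the task.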

Visualizing the Graph

LangGraph can render the workflow as a diagram:

# Visualize the workflow graph
from graph.workflow import build_workflow

app = build_workflow()

# Print the graph structure as ASCII (requires the grandalf package)
print(app.get_graph().draw_ascii())

# Or save as a PNG image (requires pygraphviz, which needs Graphviz installed)
# app.get_graph().draw_png("workflow_graph.png")

Key Takeaways

  • The supervisor uses structured output (SupervisorDecision) for reliable, parseable routing decisions.
  • Conditional edges (add_conditional_edges) let the graph branch dynamically based on the supervisor's choice.
  • After each worker agent runs, control returns to the supervisor for the next decision — this is the feedback loop.
  • A maximum iteration limit (10) prevents infinite loops when the supervisor cannot decide to stop.
  • The aggregation node assembles all agent results into a single coherent response.
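The first takeaway can be checked in isolation. This sketch redefines the SupervisorDecision model from agents/supervisor.py and shows that Pydantic rejects an agent name outside the Literal set; "designer" is a made-up name used only to trigger the error, and the example assumes Pydantic is installed.

```python
from typing import Literal

from pydantic import BaseModel, Field, ValidationError


class SupervisorDecision(BaseModel):
    """Same fields as the model in agents/supervisor.py."""
    next_agent: Literal["researcher", "coder", "analyst", "FINISH"]
    reasoning: str = Field(description="Why this agent was chosen")
    task_for_agent: str = Field(description="Specific task instruction for the selected agent")


# A decision naming a known agent validates normally
good = SupervisorDecision(
    next_agent="coder",
    reasoning="The task needs code",
    task_for_agent="Write a function that merges two sorted lists",
)

# A decision naming an unknown agent is rejected before it reaches the router
try:
    SupervisorDecision(
        next_agent="designer",
        reasoning="placeholder",
        task_for_agent="placeholder",
    )
    rejected = False
except ValidationError:
    rejected = True

print(good.next_agent, rejected)
```

Inside supervisor_node a failure of this kind falls into the except branch, which ends the workflow with status "error".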

What Is Next

In the next lesson, you will add human-in-the-loop controls — approval gates that pause the workflow for human review before executing critical actions.