Advanced

Enhancements & Best Practices

You have built a complete multi-agent system. Now let us make it production-grade with parallel execution, streaming output, deployment strategies, and proven patterns for scaling multi-agent workflows.

Parallel Agent Execution

When the supervisor determines that multiple agents can work independently on subtasks, run them in parallel to reduce latency:

# graph/parallel_workflow.py
"""Multi-agent workflow with parallel execution support."""
import asyncio
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import AIMessage

from agents.state import AgentState
from agents.supervisor import supervisor_node
from agents.researcher import researcher_node
from agents.coder import coder_node
from agents.analyst import analyst_node


async def parallel_agents_node(state: AgentState) -> dict:
    """Run multiple agents in parallel when tasks are independent.

    The supervisor can set next_agent to 'parallel:researcher,coder'
    to run both agents simultaneously.
    """
    next_agent = state.get("next_agent", "")

    if not next_agent.startswith("parallel:"):
        # Not a parallel task - should not reach here
        return state

    # Parse which agents to run
    agent_names = next_agent.replace("parallel:", "").split(",")
    agent_names = [a.strip() for a in agent_names]

    agent_map = {
        "researcher": researcher_node,
        "coder": coder_node,
        "analyst": analyst_node,
    }

    # Run agents concurrently
    tasks = []
    for name in agent_names:
        if name in agent_map:
            # Wrap sync functions in async
            tasks.append(asyncio.to_thread(agent_map[name], state))

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Merge results
    merged_results = state.get("results", {})
    merged_messages = []

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            merged_messages.append(
                AIMessage(content=f"Agent {agent_names[i]} failed: {str(result)}")
            )
        elif isinstance(result, dict):
            if "results" in result:
                merged_results.update(result["results"])
            if "messages" in result:
                merged_messages.extend(result["messages"])

    return {
        "messages": merged_messages,
        "results": merged_results,
    }
💡
When to parallelize. Run agents in parallel when their tasks are independent (e.g., "research topic A" and "write code for feature B"). Do not parallelize when one agent's output is the input for another (e.g., "research, then write code based on findings").

Streaming Output

Stream agent responses token-by-token so users see progress in real time:

# graph/streaming.py
"""Stream workflow execution step-by-step."""
from graph.workflow import build_workflow


def stream_workflow(task: str):
    """Generator that yields workflow events as they happen.

    Usage:
        for event in stream_workflow("Research Python frameworks"):
            print(event)
    """
    app = build_workflow()

    initial_state = {
        "messages": [],
        "next_agent": "",
        "task": task,
        "results": {},
        "status": "in_progress",
        "iteration": 0,
    }

    # stream() yields each node's output as it completes
    for step in app.stream(initial_state):
        for node_name, node_output in step.items():
            yield {
                "node": node_name,
                "status": node_output.get("status", "in_progress"),
                "next_agent": node_output.get("next_agent", ""),
                "messages": [
                    m.content for m in node_output.get("messages", [])
                ],
            }


# --- FastAPI streaming endpoint ---

async def stream_endpoint_example():
    """Example FastAPI endpoint that streams workflow events via SSE."""
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    import json

    app = FastAPI()

    @app.post("/api/workflow/stream")
    async def stream_workflow_api(request: dict):
        task = request.get("task", "")

        async def event_generator():
            for event in stream_workflow(task):
                yield f"data: {json.dumps(event)}\n\n"
            yield "data: {\"done\": true}\n\n"

        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream"
        )

Deployment Strategies

There are several ways to deploy a multi-agent workflow in production:

Option 1: Monolithic API (Simplest)

# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]

# docker-compose.yml
version: "3.8"
services:
  workflow-api:
    build: .
    ports:
      - "8000:8000"
    env_file:
      - .env
    restart: unless-stopped

Option 2: Agent-per-Service (Scalable)

# Deploy each agent as a separate microservice
# The supervisor calls agents via HTTP instead of in-process

# agent_service.py - generic agent service
from fastapi import FastAPI

app = FastAPI()

@app.post("/run")
async def run_agent(request: dict):
    """Run the agent and return results."""
    from agents.researcher import researcher_node  # or coder, analyst

    state = request.get("state", {})
    result = researcher_node(state)
    return result

# Benefits:
# - Scale agents independently (more coder instances for code-heavy workloads)
# - Different resource requirements per agent
# - Independent deployments and updates
# - Fault isolation (one agent crashing doesn't affect others)

Option 3: LangGraph Cloud (Managed)

# langgraph.json - LangGraph Cloud configuration
{
  "dependencies": ["requirements.txt"],
  "graphs": {
    "multi_agent": {
      "module": "graph.workflow",
      "function": "build_workflow"
    }
  },
  "env": ".env"
}

# Deploy with:
# langgraph deploy --config langgraph.json

# Benefits:
# - Managed infrastructure (no Docker, no servers)
# - Built-in checkpointing and state persistence
# - Automatic scaling
# - Integrated with LangSmith for monitoring

Scaling Patterns

Model Tiering

Use cheaper models for simple tasks and expensive models for complex reasoning:

# agents/model_selector.py
"""Dynamic model selection based on task complexity."""
from langchain_openai import ChatOpenAI


def get_model_for_task(task: str, agent_type: str) -> ChatOpenAI:
    """Select the appropriate model based on task complexity.

    Simple tasks (tool calls, summarization) -> gpt-4o-mini ($0.15/1M input)
    Complex tasks (multi-step reasoning)     -> gpt-4o ($2.50/1M input)
    """
    # Use cheaper model for routine operations
    if agent_type in ("researcher", "analyst"):
        model = "gpt-4o-mini"
    elif agent_type == "supervisor":
        # Supervisor needs good reasoning for routing decisions
        model = "gpt-4o-mini"  # Still sufficient for routing
    elif agent_type == "coder":
        # Use a better model for code generation
        model = "gpt-4o-mini"  # Upgrade to gpt-4o for complex code
    else:
        model = "gpt-4o-mini"

    return ChatOpenAI(model=model, temperature=0)

Caching

# monitoring/cache.py
"""Simple caching layer for repeated queries."""
import hashlib
import json
from functools import lru_cache


class AgentCache:
    """Cache agent results for identical inputs."""

    def __init__(self, max_size: int = 1000):
        self._cache = {}
        self._max_size = max_size

    def _key(self, agent_name: str, task: str) -> str:
        content = f"{agent_name}:{task}"
        return hashlib.sha256(content.encode()).hexdigest()

    def get(self, agent_name: str, task: str):
        key = self._key(agent_name, task)
        return self._cache.get(key)

    def set(self, agent_name: str, task: str, result: dict):
        if len(self._cache) >= self._max_size:
            # Remove oldest entry (simple FIFO)
            oldest = next(iter(self._cache))
            del self._cache[oldest]
        key = self._key(agent_name, task)
        self._cache[key] = result

# Usage in agent nodes:
# cache = AgentCache()
# cached = cache.get("researcher", state["task"])
# if cached:
#     return cached
# result = run_agent(state)
# cache.set("researcher", state["task"], result)
# return result

Best Practices Checklist

  • Keep agents focused. Each agent should have one clear responsibility. A "do everything" agent is just a chatbot with extra steps.
  • Limit tools per agent. 3-5 tools per agent is ideal. Too many tools confuse the LLM and increase hallucinated tool calls.
  • Set iteration limits. Always cap the supervisor loop (we used 10). Infinite loops burn tokens and time.
  • Use structured output for routing. Pydantic models for supervisor decisions prevent parsing errors.
  • Test agents individually first. Verify each agent works alone before wiring them into the graph.
  • Log everything. Every agent call, tool invocation, and routing decision should be logged or traced.
  • Gate irreversible actions. Code execution, API mutations, and file writes should require approval in production.
  • Cache repeated queries. If the same research question comes up, return the cached result.
  • Monitor costs per agent. One runaway agent can consume your entire API budget in minutes.
  • Graceful degradation. If an agent fails, return partial results rather than nothing.

Frequently Asked Questions

How many agents should I have?

Start with 2-3 agents. Add more only when you have a clear specialization that does not fit existing agents. Most production systems use 3-5 agents. More agents mean more routing complexity and higher latency.

Should I use LangGraph or CrewAI?

LangGraph gives you full control over the graph structure, state management, and routing logic. CrewAI provides higher-level abstractions with less code but less flexibility. Use LangGraph when you need fine-grained control over agent interactions. Use CrewAI for simpler workflows where the built-in patterns fit your needs.

How do I handle agent disagreements?

The supervisor is the arbiter. If two agents produce conflicting results, the supervisor should route to a third agent to verify, or present both results to the human reviewer. You can also add a "validator" agent that cross-checks outputs.

What about long-running tasks?

Use LangGraph's checkpointing to persist state across interrupts. For tasks that take minutes, use background execution with a webhook notification when the workflow completes. The MemorySaver checkpointer stores state in memory; for production, use SqliteSaver or PostgresSaver for persistence across restarts.

How do I add a new agent?

  1. Create the agent file in agents/ with a system prompt, tools, and node function.
  2. Add the agent as a node in graph/workflow.py.
  3. Add a route from the supervisor to the new agent (conditional edge).
  4. Add an edge back from the new agent to the supervisor.
  5. Update the supervisor's system prompt to include the new agent in its team description.
  6. Update graph/routing.py to handle the new agent name.

What is the cost of a typical workflow run?

With gpt-4o-mini, a 5-iteration workflow (supervisor + 3 agent calls) typically costs $0.001-$0.005 per run. With gpt-4o, the same workflow costs $0.01-$0.05. The main cost driver is context length — keep agent prompts concise and truncate large results before passing them back to the supervisor.

Can I use open-source models instead of OpenAI?

Yes. LangChain supports any model with a ChatModel interface. Replace ChatOpenAI with ChatOllama (local), ChatAnthropic, ChatGoogleGenerativeAI, or any other provider. The agent patterns work identically — you only change the model instantiation line. Note that smaller models may struggle with tool calling and structured output.

Complete Project Summary

Here is everything you built in this course:

Lesson 1: Project Setup

Architecture overview, LangGraph fundamentals, project structure, dependency installation, environment configuration.

Lesson 2: Single Agent

ReAct agents with tools, shared state schema, researcher/coder/analyst agents, individual agent testing.

Lesson 3: Tool Infrastructure

Web search, sandboxed code execution, file I/O, HTTP API client, tool registry organized by agent role.

Lesson 4: Multi-Agent Orchestration

Supervisor with structured output, conditional routing, result aggregation, complete LangGraph StateGraph.

Lesson 5: Human-in-the-Loop

Approval gates, LangGraph interrupts, checkpointing with MemorySaver, structured feedback collection.

Lesson 6: Monitoring

LangSmith tracing, cost tracking per agent, structured error handling with retries, debug utilities.

Next Steps

Now that you have a working multi-agent system, here are directions to explore:

  • Add memory: Give agents long-term memory with a vector store so they remember previous conversations and user preferences.
  • Build a web UI: Create a chat interface that shows which agent is working and streams their progress in real time.
  • Add evaluation: Use LangSmith datasets to systematically test and improve agent performance on known tasks.
  • Try hierarchical agents: Build a supervisor that manages other supervisors for complex organizational structures.
  • Explore MCP (Model Context Protocol): Connect your agents to external tools and data sources via the standardized MCP protocol.
💡
Congratulations! You have built a production-quality multi-agent workflow from scratch. The patterns you learned — supervisor routing, tool infrastructure, human approval gates, and cost-aware monitoring — are the same ones used in production agent systems at leading AI companies.