
Step 2: Tool Infrastructure

Agents are only as capable as their tools. In this lesson, you will build a production-quality tool library with web search, sandboxed code execution, file I/O, and HTTP API integrations. These tools are shared across all agents in the system.

Tool Design Principles

Good tools follow these principles:

  • Clear docstrings: The LLM reads the docstring to decide when and how to use the tool. Vague descriptions cause wrong tool selections.
  • Typed arguments: Use Python type hints. The LLM uses these to construct correct function calls.
  • Graceful errors: Never raise exceptions. Return error strings so the agent can reason about failures and retry.
  • Safety guards: Sandbox file operations, limit execution time, validate inputs before processing.
  • Deterministic when possible: Same inputs should produce same outputs. Avoid hidden state.
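Put together, a minimal tool that follows all five principles might look like this. The `convert_temperature` name and behavior are illustrative only, not part of the lesson's tool library; in the files below, such functions are additionally wrapped with the `@tool` decorator so the LLM can call them.

```python
def convert_temperature(value: float, unit: str = "C") -> str:
    """Convert a temperature between Celsius and Fahrenheit.

    Use this tool when the user asks to convert temperatures.

    Args:
        value: The temperature to convert.
        unit: Unit of `value` - "C" (Celsius) or "F" (Fahrenheit).

    Returns:
        The converted temperature, or an error string.
    """
    unit = unit.upper()
    if unit == "C":
        return f"{value}C = {value * 9 / 5 + 32:.1f}F"
    if unit == "F":
        return f"{value}F = {(value - 32) * 5 / 9:.1f}C"
    # Graceful error: return a string the agent can reason about, never raise
    return f"Error: unknown unit '{unit}'. Use 'C' or 'F'."


print(convert_temperature(100, "C"))  # 100C = 212.0F
```

Note how each principle shows up: the docstring tells the model when to use the tool, the type hints constrain the call, the bad-unit case returns an error string instead of raising, and the output depends only on the inputs.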

Web Search Tool

A production-grade web search tool built on the Tavily API, with a graceful fallback for when the API is unavailable:

# tools/search.py
"""Web search tool using Tavily API with fallback."""
import os
from datetime import datetime
from langchain_core.tools import tool


@tool
def web_search(query: str, max_results: int = 5) -> str:
    """Search the web for current information.

    Use this tool when you need up-to-date information, facts about
    recent events, current statistics, or to verify claims.

    Args:
        query: The search query. Be specific for better results.
        max_results: Number of results to return (1-10).

    Returns:
        Formatted search results with titles, URLs, and snippets.
    """
    max_results = max(1, min(max_results, 10))

    # Try Tavily first
    tavily_key = os.getenv("TAVILY_API_KEY")
    if tavily_key:
        try:
            from tavily import TavilyClient
            client = TavilyClient(api_key=tavily_key)
            response = client.search(query, max_results=max_results)

            results = []
            for i, r in enumerate(response.get("results", []), 1):
                results.append(
                    f"[{i}] {r['title']}\n"
                    f"    URL: {r['url']}\n"
                    f"    {r['content'][:300]}"
                )
            if results:
                header = f"Search results for: '{query}' ({datetime.now().strftime('%Y-%m-%d')})\n"
                return header + "\n\n".join(results)
            return f"No results found for: '{query}'"

        except Exception as e:
            return f"Tavily search failed ({str(e)}). Use your training knowledge instead."

    return (
        f"No search API configured (TAVILY_API_KEY not set). "
        f"Please answer '{query}' using your training knowledge and "
        f"clearly indicate the information may not be current."
    )


@tool
def web_search_with_context(query: str, context: str = "") -> str:
    """Search the web with additional context for better results.

    Use this when you need to refine a search based on previous findings
    or when the initial search was too broad.

    Args:
        query: The search query.
        context: Additional context to refine the search.

    Returns:
        Formatted search results.
    """
    refined_query = f"{query} {context}".strip() if context else query
    return web_search.invoke({"query": refined_query, "max_results": 5})

Code Execution Tool

A sandboxed Python execution environment with timeout protection and output capture:

# tools/code_executor.py
"""Sandboxed Python code execution tool."""
import sys
import io
import traceback
import signal
from contextlib import contextmanager
from langchain_core.tools import tool


class TimeoutError(Exception):
    """Raised when code execution exceeds the time limit."""
    pass


@contextmanager
def time_limit(seconds: int):
    """Context manager that raises TimeoutError after N seconds.
    Note: signal.alarm only works on Unix. On Windows, use threading.
    """
    if not hasattr(signal, "SIGALRM"):
        # Windows fallback - no timeout protection
        yield
        return

    def _handler(signum, frame):
        raise TimeoutError("Timed out")

    old_handler = signal.signal(signal.SIGALRM, _handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Always cancel the alarm and restore the previous handler,
        # even when the timeout fires mid-execution
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)


# Dangerous modules that should not be importable
BLOCKED_MODULES = {
    "subprocess", "shutil", "pathlib", "ctypes",
    "socket", "http.server", "ftplib", "smtplib",
}


@tool
def execute_python(code: str, timeout_seconds: int = 30) -> str:
    """Execute Python code and return the output.

    The code runs in a sandboxed environment. Dangerous operations
    like file deletion, network servers, and subprocess calls are blocked.

    Args:
        code: Python code to execute. Use print() to produce output.
        timeout_seconds: Maximum execution time (default 30 seconds).

    Returns:
        The stdout output, or an error message with traceback.
    """
    # Basic safety checks
    for module in BLOCKED_MODULES:
        if f"import {module}" in code or f"from {module}" in code:
            return f"Error: Import of '{module}' is blocked for security."

    if any(dangerous in code for dangerous in ["os.remove", "os.rmdir", "shutil.rmtree", "os.system"]):
        return "Error: Destructive operations are blocked."

    # Capture stdout and stderr
    old_stdout = sys.stdout
    old_stderr = sys.stderr
    sys.stdout = captured_out = io.StringIO()
    sys.stderr = captured_err = io.StringIO()

    try:
        with time_limit(timeout_seconds):
            exec_globals = {
                "__builtins__": __builtins__,
                "__name__": "__main__",
            }
            exec(code, exec_globals)

        stdout = captured_out.getvalue()
        stderr = captured_err.getvalue()

        output_parts = []
        if stdout:
            output_parts.append(stdout)
        if stderr:
            output_parts.append(f"[stderr]\n{stderr}")
        if not output_parts:
            output_parts.append("Code executed successfully (no output).")

        return "\n".join(output_parts)

    except TimeoutError:
        return f"Error: Code execution timed out after {timeout_seconds} seconds."
    except Exception as e:
        error_msg = traceback.format_exc()
        return f"Error: {type(e).__name__}: {str(e)}\n\nTraceback:\n{error_msg}"
    finally:
        sys.stdout = old_stdout
        sys.stderr = old_stderr


@tool
def execute_python_with_inputs(code: str, inputs: str = "{}") -> str:
    """Execute Python code with pre-defined input variables.

    Args:
        code: Python code to execute. Input variables are available in scope.
        inputs: JSON string of variable name-value pairs to inject.

    Returns:
        The stdout output, or an error message.
    """
    import json

    try:
        input_vars = json.loads(inputs)
    except json.JSONDecodeError:
        return "Error: 'inputs' must be a valid JSON string."

    # Prepend variable assignments to the code
    setup_lines = []
    for key, value in input_vars.items():
        setup_lines.append(f"{key} = {repr(value)}")

    full_code = "\n".join(setup_lines) + "\n\n" + code
    return execute_python.invoke({"code": full_code, "timeout_seconds": 30})
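The docstring of `time_limit` notes that `signal.alarm` is Unix-only and hints at threading as the Windows alternative. A minimal sketch of that approach, assuming a hypothetical `run_with_timeout` helper: the caller waits on a worker thread with a deadline. Be aware that a hung worker thread cannot actually be killed, so this bounds only how long the caller waits, not how long the code runs.

```python
"""Portable timeout sketch: wait on a worker thread instead of signal.alarm."""
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeout


def run_with_timeout(code: str, timeout_seconds: int = 5) -> str:
    """Execute code in a worker thread; stop waiting after the deadline."""
    def _run() -> str:
        exec(code, {"__name__": "__main__"})
        return "Code executed successfully."

    pool = ThreadPoolExecutor(max_workers=1)
    try:
        return pool.submit(_run).result(timeout=timeout_seconds)
    except FuturesTimeout:
        return f"Error: Code execution timed out after {timeout_seconds} seconds."
    finally:
        # Do not block waiting for a possibly still-running worker
        pool.shutdown(wait=False)


print(run_with_timeout("x = 1 + 1"))
print(run_with_timeout("import time; time.sleep(3)", timeout_seconds=1))
```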

File I/O Tools

Safe file read and write operations restricted to a workspace directory:

# tools/file_io.py
"""File I/O tools with workspace sandboxing."""
import os
from langchain_core.tools import tool

# All file operations are restricted to this directory
WORKSPACE_DIR = os.getenv("AGENT_WORKSPACE", "./workspace")


def _safe_path(filepath: str) -> str:
    """Resolve a filepath to the workspace directory safely."""
    # Prevent directory traversal
    clean = os.path.normpath(filepath).lstrip(os.sep)
    if ".." in clean.split(os.sep):
        raise ValueError("Directory traversal not allowed.")
    full_path = os.path.join(WORKSPACE_DIR, clean)
    # Verify it is still under workspace. commonpath avoids the prefix
    # pitfall where e.g. "/workspace2" would pass a startswith("/workspace") check.
    workspace_abs = os.path.abspath(WORKSPACE_DIR)
    if os.path.commonpath([os.path.abspath(full_path), workspace_abs]) != workspace_abs:
        raise ValueError("Path escapes workspace directory.")
    return full_path


@tool
def read_file(filepath: str) -> str:
    """Read the contents of a file from the workspace.

    Args:
        filepath: Relative path within the workspace directory.

    Returns:
        File contents as a string, or an error message.
    """
    try:
        safe = _safe_path(filepath)
        if not os.path.exists(safe):
            return f"Error: File not found: {filepath}"
        with open(safe, "r", encoding="utf-8") as f:
            content = f.read()
        size = os.path.getsize(safe)
        return f"[{filepath} - {size} bytes]\n\n{content}"
    except ValueError as e:
        return f"Error: {str(e)}"
    except Exception as e:
        return f"Error reading file: {str(e)}"


@tool
def write_file(filepath: str, content: str) -> str:
    """Write content to a file in the workspace.

    Creates parent directories if they do not exist.

    Args:
        filepath: Relative path within the workspace directory.
        content: Content to write to the file.

    Returns:
        Confirmation message with file path and size.
    """
    try:
        safe = _safe_path(filepath)
        os.makedirs(os.path.dirname(safe), exist_ok=True)
        with open(safe, "w", encoding="utf-8") as f:
            f.write(content)
        size = os.path.getsize(safe)
        return f"Written: {filepath} ({size} bytes)"
    except ValueError as e:
        return f"Error: {str(e)}"
    except Exception as e:
        return f"Error writing file: {str(e)}"


@tool
def list_files(directory: str = ".") -> str:
    """List files in a workspace directory.

    Args:
        directory: Relative directory path (default: workspace root).

    Returns:
        A formatted list of files and directories.
    """
    try:
        safe = _safe_path(directory)
        if not os.path.isdir(safe):
            return f"Error: Not a directory: {directory}"

        entries = []
        for entry in sorted(os.listdir(safe)):
            full = os.path.join(safe, entry)
            if os.path.isdir(full):
                entries.append(f"  [DIR]  {entry}/")
            else:
                size = os.path.getsize(full)
                entries.append(f"  [FILE] {entry} ({size} bytes)")

        if not entries:
            return f"Directory '{directory}' is empty."
        return f"Contents of '{directory}':\n" + "\n".join(entries)
    except ValueError as e:
        return f"Error: {str(e)}"
    except Exception as e:
        return f"Error listing directory: {str(e)}"
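The intent of the traversal guard can be checked in isolation. This standalone sketch mirrors the logic of `_safe_path` as a boolean predicate; the `is_inside_workspace` helper is illustrative, not part of the tool library:

```python
import os

WORKSPACE = os.path.abspath("./workspace")


def is_inside_workspace(filepath: str) -> bool:
    """Return True if filepath resolves to a location under WORKSPACE."""
    clean = os.path.normpath(filepath).lstrip(os.sep)
    full = os.path.abspath(os.path.join(WORKSPACE, clean))
    # The resolved path must have the workspace root as its common prefix
    return os.path.commonpath([full, WORKSPACE]) == WORKSPACE


print(is_inside_workspace("notes/todo.txt"))    # True
print(is_inside_workspace("../../etc/passwd"))  # False
```

The second call fails because `abspath` resolves the `..` components to a location above the workspace root, so the common path is no longer the workspace itself.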

HTTP API Client Tool

A generic HTTP client tool for calling external APIs:

# tools/api_client.py
"""HTTP API client tool for calling external services."""
import json
from langchain_core.tools import tool


@tool
def http_request(url: str, method: str = "GET", headers: str = "{}", body: str = "") -> str:
    """Make an HTTP request to an external API.

    Args:
        url: The full URL to request (must start with https://).
        method: HTTP method - GET, POST, PUT, DELETE.
        headers: JSON string of HTTP headers.
        body: Request body (for POST/PUT). JSON string or plain text.

    Returns:
        Response status code and body.
    """
    import httpx

    # Safety: only allow HTTPS
    if not url.startswith("https://"):
        return "Error: Only HTTPS URLs are allowed for security."

    method = method.upper()
    if method not in ("GET", "POST", "PUT", "DELETE"):
        return f"Error: Unsupported method: {method}. Use GET, POST, PUT, or DELETE."

    try:
        parsed_headers = json.loads(headers)
    except json.JSONDecodeError:
        return "Error: 'headers' must be a valid JSON string."

    try:
        with httpx.Client(timeout=30) as client:
            if method == "GET":
                response = client.get(url, headers=parsed_headers)
            elif method == "POST":
                response = client.post(url, headers=parsed_headers, content=body)
            elif method == "PUT":
                response = client.put(url, headers=parsed_headers, content=body)
            elif method == "DELETE":
                response = client.delete(url, headers=parsed_headers)

        # Format the response
        output = f"Status: {response.status_code}\n"

        # Try to format JSON response
        try:
            body_json = response.json()
            output += f"Body:\n{json.dumps(body_json, indent=2)}"
        except Exception:  # body is not valid JSON
            text = response.text[:2000]  # Limit response size
            output += f"Body:\n{text}"

        return output

    except httpx.TimeoutException:
        return "Error: Request timed out after 30 seconds."
    except httpx.ConnectError:
        return f"Error: Could not connect to {url}."
    except Exception as e:
        return f"Error: {type(e).__name__}: {str(e)}"
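The response-formatting branch can be exercised without a network call. A small standalone sketch, with a hypothetical `format_response` helper that mirrors the tool's formatting logic on canned data:

```python
import json


def format_response(status_code: int, text: str) -> str:
    """Mirror http_request's formatting: pretty-print JSON bodies,
    truncate non-JSON bodies to 2000 characters."""
    output = f"Status: {status_code}\n"
    try:
        output += f"Body:\n{json.dumps(json.loads(text), indent=2)}"
    except json.JSONDecodeError:
        output += f"Body:\n{text[:2000]}"
    return output


print(format_response(200, '{"ok": true, "items": [1, 2]}'))
print(format_response(404, "Not found"))
```

Pretty-printing JSON bodies matters in practice: the formatted response goes straight back into the agent's context, and indented JSON is easier for the model to read than a single long line.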

Tool Registry

Create a central registry so agents can pick tools by category:

# tools/__init__.py
"""Tool registry - central place to access all tools by category."""
from tools.search import web_search, web_search_with_context
from tools.code_executor import execute_python, execute_python_with_inputs
from tools.file_io import read_file, write_file, list_files
from tools.api_client import http_request


# Tool sets organized by agent role
RESEARCH_TOOLS = [web_search, web_search_with_context]
CODER_TOOLS = [execute_python, execute_python_with_inputs, read_file, write_file, list_files]
ANALYST_TOOLS = [execute_python, read_file, write_file, list_files, http_request]
ALL_TOOLS = [
    web_search, web_search_with_context,
    execute_python, execute_python_with_inputs,
    read_file, write_file, list_files,
    http_request,
]


def get_tools_for_agent(agent_type: str) -> list:
    """Get the appropriate tool set for an agent type.

    Args:
        agent_type: One of 'researcher', 'coder', 'analyst', 'all'.

    Returns:
        List of tool functions for that agent type.
    """
    registry = {
        "researcher": RESEARCH_TOOLS,
        "coder": CODER_TOOLS,
        "analyst": ANALYST_TOOLS,
        "all": ALL_TOOLS,
    }
    return registry.get(agent_type, ALL_TOOLS)

Test the Tools

# tests/test_tools.py
"""Test each tool independently."""
from tools.search import web_search
from tools.code_executor import execute_python
from tools.file_io import read_file, write_file, list_files
from tools.api_client import http_request
from tools import get_tools_for_agent


def test_code_execution():
    """Test the sandboxed code executor."""
    # Happy path
    result = execute_python.invoke({"code": "print(2 + 2)"})
    assert "4" in result
    print(f"Code execution OK: {result.strip()}")

    # Error handling
    result = execute_python.invoke({"code": "import subprocess"})
    assert "blocked" in result.lower()
    print(f"Security block OK: {result.strip()}")

    # Timeout (would need a long-running code to test properly)
    print("Code execution tests passed!")


def test_file_io():
    """Test file read/write/list."""
    import os
    os.makedirs("./workspace", exist_ok=True)

    result = write_file.invoke({"filepath": "test.txt", "content": "Hello, agents!"})
    assert "Written" in result
    print(f"File write OK: {result}")

    result = read_file.invoke({"filepath": "test.txt"})
    assert "Hello, agents!" in result
    print("File read OK")

    result = list_files.invoke({"directory": "."})
    assert "test.txt" in result
    print("File list OK")


def test_tool_registry():
    """Test the tool registry."""
    researcher_tools = get_tools_for_agent("researcher")
    assert len(researcher_tools) == 2
    coder_tools = get_tools_for_agent("coder")
    assert len(coder_tools) == 5
    print(f"Registry OK: researcher={len(researcher_tools)}, coder={len(coder_tools)} tools")


if __name__ == "__main__":
    test_code_execution()
    test_file_io()
    test_tool_registry()
    print("\nAll tool tests passed!")
💡 Production tip: In a real deployment, replace the exec()-based code executor with a containerized sandbox like E2B or Modal Sandboxes. The exec() approach is fine for development and learning but should not run untrusted code in production.

Key Takeaways

  • Tools are the interface between agents and the real world. Well-designed tools with clear docstrings lead to fewer agent errors.
  • Every tool returns strings (never raises exceptions) so the agent can reason about failures.
  • Security is built into each tool: workspace sandboxing, HTTPS-only, blocked dangerous modules, and timeout protection.
  • The tool registry organizes tools by agent role. Each agent only sees tools relevant to its specialization.

What Is Next

In the next lesson, you will build the multi-agent orchestration layer — the LangGraph StateGraph that connects the supervisor to all worker agents with conditional routing and handoff logic.