Intermediate

Step 2: LLM Conversation Engine

Build the brain of your voice assistant. You will create a dialog manager with conversation memory, define tools the assistant can call, implement streaming response generation, and wire everything together into a conversation engine powered by GPT-4o.

Conversation Memory

A voice assistant must remember what was said earlier in the conversation. We implement a sliding-window memory that keeps the last N turns and summarizes older context when the window is full.

# app/llm/conversation.py
"""Conversation memory and dialog management."""
import logging
from typing import Optional
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)


@dataclass
class Message:
    """A single message in the conversation."""
    role: str          # "system", "user", "assistant", or "tool"
    content: str
    name: Optional[str] = None       # Tool name (for tool messages)
    tool_call_id: Optional[str] = None  # For tool response messages


class ConversationMemory:
    """Sliding-window conversation memory with summarization.

    Keeps the last `max_turns` exchanges in full detail.
    When the window overflows, older messages are summarized
    into a compact context string that is prepended to the
    system message.
    """

    def __init__(self, max_turns: int = 20, system_prompt: str = ""):
        self.max_turns = max_turns
        self.system_prompt = system_prompt
        self.messages: list[dict] = []
        self._summary: str = ""

    def get_system_message(self) -> dict:
        """Build the system message with optional summary context."""
        content = self.system_prompt

        if self._summary:
            content += (
                f"\n\n[Previous conversation summary: {self._summary}]"
            )

        return {"role": "system", "content": content}

    def add_user_message(self, text: str):
        """Add a user message to the conversation."""
        self.messages.append({"role": "user", "content": text})
        self._trim_if_needed()
        logger.debug(f"Added user message: '{text[:50]}...'")

    def add_assistant_message(self, text: str):
        """Add an assistant response to the conversation."""
        self.messages.append({"role": "assistant", "content": text})
        self._trim_if_needed()

    def add_tool_call(self, tool_call_id: str, name: str, arguments: str):
        """Record a tool call made by the assistant."""
        self.messages.append({
            "role": "assistant",
            "content": None,
            "tool_calls": [{
                "id": tool_call_id,
                "type": "function",
                "function": {"name": name, "arguments": arguments}
            }]
        })

    def add_tool_result(self, tool_call_id: str, name: str, result: str):
        """Add the result of a tool call."""
        self.messages.append({
            "role": "tool",
            "tool_call_id": tool_call_id,
            "name": name,
            "content": result
        })

    def get_messages(self) -> list[dict]:
        """Get the full message list for the API call."""
        return [self.get_system_message()] + self.messages

    def _trim_if_needed(self):
        """Trim old messages when the window overflows."""
        # Count user+assistant pairs as "turns"
        turn_count = sum(
            1 for m in self.messages if m["role"] == "user"
        )

        if turn_count > self.max_turns:
            # Summarize the oldest messages
            old_messages = self.messages[:4]  # First 2 turns
            summary_parts = []
            for msg in old_messages:
                role = msg["role"]
                content = msg.get("content", "")
                if content:
                    summary_parts.append(f"{role}: {content[:100]}")

            self._summary += " | ".join(summary_parts) + " | "

            # Remove the summarized messages
            self.messages = self.messages[4:]
            logger.debug(
                f"Trimmed conversation: {turn_count} turns -> "
                f"{turn_count - 2} turns"
            )

    def clear(self):
        """Clear the conversation history."""
        self.messages = []
        self._summary = ""

Tool Definitions

Tools let the assistant take real-world actions: check the weather, set a timer, search the web, or control smart devices. We define tools as OpenAI function-calling schemas:

# app/llm/tools.py
"""Tool definitions and execution for the voice assistant."""
import json
import logging
from datetime import datetime, timedelta
from typing import Any

logger = logging.getLogger(__name__)

# ============================================================
# Tool Schemas (OpenAI function calling format)
# ============================================================

TOOL_SCHEMAS = [
    {
        "type": "function",
        "function": {
            "name": "get_current_time",
            "description": "Get the current date and time.",
            "parameters": {
                "type": "object",
                "properties": {
                    "timezone": {
                        "type": "string",
                        "description": "Timezone (e.g., 'US/Eastern', 'Europe/London'). Defaults to UTC."
                    }
                },
                "required": []
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "set_timer",
            "description": "Set a countdown timer for a specified duration.",
            "parameters": {
                "type": "object",
                "properties": {
                    "duration_seconds": {
                        "type": "integer",
                        "description": "Timer duration in seconds"
                    },
                    "label": {
                        "type": "string",
                        "description": "Optional label for the timer (e.g., 'pasta timer')"
                    }
                },
                "required": ["duration_seconds"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a location.",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "City name (e.g., 'San Francisco, CA')"
                    }
                },
                "required": ["location"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "web_search",
            "description": "Search the web for current information.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The search query"
                    }
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "set_reminder",
            "description": "Set a reminder for a specific time.",
            "parameters": {
                "type": "object",
                "properties": {
                    "message": {
                        "type": "string",
                        "description": "The reminder message"
                    },
                    "minutes_from_now": {
                        "type": "integer",
                        "description": "Minutes from now to trigger the reminder"
                    }
                },
                "required": ["message", "minutes_from_now"]
            }
        }
    }
]


# ============================================================
# Tool Implementations
# ============================================================

# In-memory store for timers and reminders
_active_timers: list[dict] = []
_active_reminders: list[dict] = []


def execute_tool(name: str, arguments: str) -> str:
    """Execute a tool by name and return the result as a string.

    Args:
        name: Tool function name
        arguments: JSON string of arguments

    Returns:
        Result string to send back to the LLM
    """
    args = json.loads(arguments)
    logger.info(f"Executing tool: {name}({args})")

    if name == "get_current_time":
        return _get_current_time(args.get("timezone", "UTC"))
    elif name == "set_timer":
        return _set_timer(
            args["duration_seconds"],
            args.get("label", "Timer")
        )
    elif name == "get_weather":
        return _get_weather(args["location"])
    elif name == "web_search":
        return _web_search(args["query"])
    elif name == "set_reminder":
        return _set_reminder(
            args["message"],
            args["minutes_from_now"]
        )
    else:
        return f"Unknown tool: {name}"


def _get_current_time(timezone: str = "UTC") -> str:
    """Get the current time."""
    now = datetime.utcnow()
    return json.dumps({
        "time": now.strftime("%I:%M %p"),
        "date": now.strftime("%A, %B %d, %Y"),
        "timezone": timezone
    })


def _set_timer(duration_seconds: int, label: str = "Timer") -> str:
    """Set a countdown timer."""
    end_time = datetime.utcnow() + timedelta(seconds=duration_seconds)
    timer = {
        "label": label,
        "duration_seconds": duration_seconds,
        "end_time": end_time.isoformat()
    }
    _active_timers.append(timer)

    minutes = duration_seconds // 60
    seconds = duration_seconds % 60
    time_str = f"{minutes}m {seconds}s" if minutes else f"{seconds}s"

    return json.dumps({
        "status": "Timer set",
        "label": label,
        "duration": time_str,
        "end_time": end_time.strftime("%I:%M %p")
    })


def _get_weather(location: str) -> str:
    """Get weather for a location.

    NOTE: In production, integrate a real weather API
    (OpenWeatherMap, WeatherAPI, etc.). This is a stub.
    """
    # Stub response - replace with real API call
    return json.dumps({
        "location": location,
        "temperature": "72F / 22C",
        "condition": "Partly cloudy",
        "humidity": "45%",
        "wind": "10 mph",
        "note": "This is simulated data. Integrate a weather API for real results."
    })


def _web_search(query: str) -> str:
    """Search the web.

    NOTE: In production, integrate a real search API
    (SerpAPI, Brave Search, Tavily, etc.). This is a stub.
    """
    return json.dumps({
        "query": query,
        "results": [
            {"title": "Search result placeholder",
             "snippet": f"Results for: {query}",
             "url": "https://example.com"}
        ],
        "note": "This is simulated data. Integrate a search API for real results."
    })


def _set_reminder(message: str, minutes_from_now: int) -> str:
    """Set a reminder."""
    trigger_time = datetime.utcnow() + timedelta(minutes=minutes_from_now)
    reminder = {
        "message": message,
        "trigger_time": trigger_time.isoformat()
    }
    _active_reminders.append(reminder)

    return json.dumps({
        "status": "Reminder set",
        "message": message,
        "trigger_time": trigger_time.strftime("%I:%M %p")
    })

The Conversation Engine

The engine orchestrates everything: it takes a user transcript, sends it to GPT-4o with conversation history and tools, handles any tool calls, and streams the response text back.

# app/llm/engine.py
"""LLM conversation engine with streaming and tool support."""
import json
import logging
from typing import AsyncGenerator, Optional
from openai import AsyncOpenAI

from app.config import get_settings
from app.llm.conversation import ConversationMemory
from app.llm.tools import TOOL_SCHEMAS, execute_tool

logger = logging.getLogger(__name__)
settings = get_settings()

# System prompt for the voice assistant
SYSTEM_PROMPT = """You are a helpful voice assistant. You respond conversationally \
and concisely because your responses will be spoken aloud.

Guidelines:
- Keep responses brief (1-3 sentences) unless the user asks for detail.
- Avoid bullet points, markdown, or formatting - use natural spoken language.
- Use contractions and conversational tone (e.g., "I'll" not "I will").
- When using tools, briefly explain what you're doing.
- If you don't know something, say so honestly.
- For complex questions, give a concise answer first, then offer to elaborate.
- Never include URLs, code blocks, or special characters in responses.
- Pronounce numbers naturally (e.g., "seventy-two degrees" not "72°F").
"""


class ConversationEngine:
    """LLM-powered conversation engine with memory and tools.

    Manages the full lifecycle of a conversation turn:
    1. Accept user transcript
    2. Send to GPT-4o with conversation history
    3. Handle any tool calls
    4. Stream the response text

    Usage:
        engine = ConversationEngine()
        async for chunk in engine.generate_response("What time is it?"):
            print(chunk, end="")
    """

    def __init__(self):
        self.client = AsyncOpenAI(api_key=settings.openai_api_key)
        self.memory = ConversationMemory(
            max_turns=20,
            system_prompt=SYSTEM_PROMPT
        )
        self.model = settings.llm_model
        self.temperature = settings.llm_temperature
        self.max_tokens = settings.llm_max_tokens

    async def generate_response(
        self, user_text: str
    ) -> AsyncGenerator[str, None]:
        """Generate a streaming response to user input.

        Handles the full conversation turn including tool calls.
        Yields text chunks as they arrive from the LLM.

        Args:
            user_text: The user's transcribed speech

        Yields:
            Text chunks of the assistant's response
        """
        # Add user message to memory
        self.memory.add_user_message(user_text)

        # Build the API request
        messages = self.memory.get_messages()

        logger.info(f"LLM request: '{user_text}' "
                     f"({len(messages)} messages in context)")

        # First call - may include tool calls
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=TOOL_SCHEMAS,
            tool_choice="auto",
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            stream=False  # Non-streaming for tool call detection
        )

        choice = response.choices[0]
        message = choice.message

        # Handle tool calls if present
        if message.tool_calls:
            logger.info(
                f"LLM requested {len(message.tool_calls)} tool call(s)"
            )

            for tool_call in message.tool_calls:
                func = tool_call.function

                # Record the tool call in memory
                self.memory.add_tool_call(
                    tool_call.id, func.name, func.arguments
                )

                # Execute the tool
                result = execute_tool(func.name, func.arguments)
                logger.info(
                    f"Tool '{func.name}' result: {result[:100]}..."
                )

                # Record the result in memory
                self.memory.add_tool_result(
                    tool_call.id, func.name, result
                )

            # Second call - stream the response after tool results
            messages = self.memory.get_messages()

            stream = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=self.temperature,
                max_tokens=self.max_tokens,
                stream=True
            )

            full_response = ""
            async for chunk in stream:
                delta = chunk.choices[0].delta
                if delta.content:
                    full_response += delta.content
                    yield delta.content

            # Save full response to memory
            self.memory.add_assistant_message(full_response)

        else:
            # No tool calls - stream the response directly
            # Re-request with streaming enabled
            stream = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=self.temperature,
                max_tokens=self.max_tokens,
                stream=True
            )

            full_response = ""
            async for chunk in stream:
                delta = chunk.choices[0].delta
                if delta.content:
                    full_response += delta.content
                    yield delta.content

            # Save full response to memory
            self.memory.add_assistant_message(full_response)

        logger.info(f"LLM response: '{full_response[:100]}...'")

    async def generate_response_full(self, user_text: str) -> str:
        """Generate a complete (non-streaming) response.

        Useful for testing and when streaming is not needed.

        Args:
            user_text: The user's transcribed speech

        Returns:
            Complete response text
        """
        chunks = []
        async for chunk in self.generate_response(user_text):
            chunks.append(chunk)
        return "".join(chunks)

    def reset(self):
        """Clear conversation memory and start fresh."""
        self.memory.clear()
        logger.info("Conversation memory cleared")
💡
Why Two API Calls for Tools? When the LLM decides to use a tool, the first (non-streaming) call returns the tool call request. We execute the tool, add the result to the conversation, and then make a second (streaming) call to get the natural language response. This two-step approach ensures tool results are incorporated before generating the spoken response.

Optimizing for Voice Output

The system prompt is critical for voice assistants. Unlike chat interfaces, responses must be spoken aloud. Here are key optimizations:

# app/llm/voice_optimizations.py
"""Post-processing to optimize LLM output for speech synthesis."""
import re


def clean_for_speech(text: str) -> str:
    """Clean LLM output so it sounds natural when spoken.

    Removes markdown, normalizes numbers, and converts
    abbreviations to spoken form.

    Args:
        text: Raw LLM response text

    Returns:
        Speech-optimized text
    """
    # Remove markdown formatting
    text = re.sub(r'\*\*(.*?)\*\*', r'\1', text)  # bold
    text = re.sub(r'\*(.*?)\*', r'\1', text)       # italic
    text = re.sub(r'`(.*?)`', r'\1', text)         # code
    text = re.sub(r'#{1,6}\s', '', text)            # headers
    text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)  # links

    # Remove bullet points and list markers
    text = re.sub(r'^\s*[-*+]\s+', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)

    # Normalize temperature
    text = re.sub(r'(\d+)°F', r'\1 degrees Fahrenheit', text)
    text = re.sub(r'(\d+)°C', r'\1 degrees Celsius', text)
    text = re.sub(r'(\d+)°', r'\1 degrees', text)

    # Normalize common abbreviations
    abbreviations = {
        'e.g.': 'for example',
        'i.e.': 'that is',
        'etc.': 'and so on',
        'vs.': 'versus',
        'approx.': 'approximately',
        'Dr.': 'Doctor',
        'Mr.': 'Mister',
        'Mrs.': 'Missus',
        'Ms.': 'Miz',
    }
    for abbr, expansion in abbreviations.items():
        text = text.replace(abbr, expansion)

    # Normalize URLs (in case any slip through)
    text = re.sub(r'https?://\S+', 'a web link', text)

    # Clean up extra whitespace
    text = re.sub(r'\n+', ' ', text)
    text = re.sub(r'\s+', ' ', text)

    return text.strip()


def split_into_sentences(text: str) -> list[str]:
    """Split text into sentences for incremental TTS.

    Sending complete sentences to TTS produces more natural
    prosody than sending arbitrary chunks.

    Args:
        text: Text to split

    Returns:
        List of sentences
    """
    # Split on sentence boundaries
    sentences = re.split(r'(?<=[.!?])\s+', text)
    return [s.strip() for s in sentences if s.strip()]

Testing the Conversation Engine

# tests/test_llm.py
"""Test the LLM conversation engine."""
import asyncio
from app.llm.engine import ConversationEngine
from app.llm.voice_optimizations import clean_for_speech, split_into_sentences


def test_clean_for_speech():
    """Test speech text cleaning."""
    raw = "The temperature is **72°F** with `partly cloudy` skies."
    cleaned = clean_for_speech(raw)
    assert "**" not in cleaned
    assert "`" not in cleaned
    assert "degrees Fahrenheit" in cleaned
    print(f"Clean: '{cleaned}'")


def test_sentence_splitting():
    """Test sentence splitting for TTS."""
    text = "Hello there! How can I help you today? I can check the weather or set a timer."
    sentences = split_into_sentences(text)
    assert len(sentences) == 3
    print(f"Sentences: {sentences}")


async def test_conversation():
    """Test a multi-turn conversation."""
    engine = ConversationEngine()

    # Turn 1
    print("\nUser: Hello!")
    response1 = await engine.generate_response_full("Hello!")
    print(f"Assistant: {response1}")

    # Turn 2 - should remember context
    print("\nUser: What's your name?")
    response2 = await engine.generate_response_full("What's your name?")
    print(f"Assistant: {response2}")

    # Turn 3 - tool use
    print("\nUser: What time is it?")
    response3 = await engine.generate_response_full("What time is it?")
    print(f"Assistant: {response3}")


async def test_streaming():
    """Test streaming response."""
    engine = ConversationEngine()

    print("\nUser: Tell me a fun fact about space.")
    print("Assistant: ", end="")
    async for chunk in engine.generate_response(
        "Tell me a fun fact about space."
    ):
        print(chunk, end="", flush=True)
    print()


if __name__ == "__main__":
    test_clean_for_speech()
    test_sentence_splitting()
    asyncio.run(test_conversation())
    asyncio.run(test_streaming())
    print("\nAll LLM tests passed!")
📝
Checkpoint: At this point the conversation engine can accept text input, maintain multi-turn context, call tools (time, weather, timers), and stream responses. The voice optimization layer ensures output is clean for speech synthesis. In the next lesson we will convert this text output into spoken audio.

Key Takeaways

  • The ConversationMemory class maintains a sliding window of conversation turns and summarizes older context to fit within the LLM context window.
  • Tool calling uses OpenAI's function-calling API: the LLM decides when to call a tool, we execute it, and then the LLM incorporates the result into its response.
  • The system prompt is critical for voice: it instructs the LLM to respond concisely and conversationally for spoken output.
  • Post-processing removes markdown, normalizes abbreviations, and splits text into sentences for natural TTS prosody.

What Is Next

In the next lesson, you will build the text-to-speech module — integrating ElevenLabs and OpenAI TTS to convert the LLM response text into natural-sounding streaming audio.