Step 2: LLM Conversation Engine
Build the brain of your voice assistant. You will create a dialog manager with conversation memory, define tools the assistant can call, implement streaming response generation, and wire everything together into a conversation engine powered by GPT-4o.
Conversation Memory
A voice assistant must remember what was said earlier in the conversation. We implement a sliding-window memory that keeps the last N turns and summarizes older context when the window is full.
# app/llm/conversation.py
"""Conversation memory and dialog management."""
import logging
from typing import Optional
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class Message:
    """One entry in the conversation exchanged with the chat model.

    ``name`` and ``tool_call_id`` are optional and default to ``None``;
    they are only meaningful for tool-related messages.
    """

    # Speaker of the message: "system", "user", "assistant", or "tool".
    role: str
    # Text payload of the message.
    content: str
    # Tool name (for tool messages).
    name: Optional[str] = None
    # Identifier of the tool call being answered (for tool response messages).
    tool_call_id: Optional[str] = None
class ConversationMemory:
    """Sliding-window conversation memory with summarization.

    Keeps the most recent exchanges in full detail. Once more than
    `max_turns` user turns are stored, the oldest turns are condensed
    into a compact summary string that is appended to the system prompt.

    Trimming always cuts on a turn boundary (immediately before a user
    message). An assistant tool call is therefore never separated from
    its tool results, and the newest turn is never discarded.

    Args:
        max_turns: Maximum number of user turns kept verbatim.
        system_prompt: Base text of the system message.
        max_summary_chars: Upper bound on the length of the running
            summary; the oldest summary entries are dropped first.
    """

    # Number of oldest turns folded into the summary per overflow.
    _TRIM_TURNS = 2
    # Characters of each summarized message that are kept.
    _SNIPPET_CHARS = 100
    # Separator written after every summary entry.
    _SEPARATOR = " | "

    def __init__(
        self,
        max_turns: int = 20,
        system_prompt: str = "",
        max_summary_chars: int = 2000,
    ):
        self.max_turns = max_turns
        self.system_prompt = system_prompt
        self.max_summary_chars = max_summary_chars
        self.messages: list[dict] = []
        self._summary: str = ""

    def get_system_message(self) -> dict:
        """Build the system message with optional summary context."""
        content = self.system_prompt
        if self._summary:
            content += (
                f"\n\n[Previous conversation summary: {self._summary}]"
            )
        return {"role": "system", "content": content}

    def add_user_message(self, text: str) -> None:
        """Add a user message to the conversation (starts a new turn)."""
        self.messages.append({"role": "user", "content": text})
        self._trim_if_needed()
        logger.debug(f"Added user message: '{text[:50]}...'")

    def add_assistant_message(self, text: str) -> None:
        """Add an assistant response to the conversation."""
        self.messages.append({"role": "assistant", "content": text})
        self._trim_if_needed()

    def add_tool_call(self, tool_call_id: str, name: str, arguments: str) -> None:
        """Record a tool call made by the assistant.

        Args:
            tool_call_id: Identifier assigned to the call by the LLM.
            name: Name of the tool function.
            arguments: JSON string of the call arguments.
        """
        self.messages.append({
            "role": "assistant",
            "content": None,
            "tool_calls": [{
                "id": tool_call_id,
                "type": "function",
                "function": {"name": name, "arguments": arguments},
            }],
        })

    def add_tool_result(self, tool_call_id: str, name: str, result: str) -> None:
        """Add the result of a tool call."""
        self.messages.append({
            "role": "tool",
            "tool_call_id": tool_call_id,
            "name": name,
            "content": result,
        })

    def get_messages(self) -> list[dict]:
        """Get the full message list (system message first) for the API call."""
        return [self.get_system_message()] + self.messages

    def _trim_if_needed(self) -> None:
        """Fold the oldest turns into the summary when the window overflows."""
        # Each user message marks the start of a turn.
        user_positions = [
            index for index, msg in enumerate(self.messages)
            if msg["role"] == "user"
        ]
        turn_count = len(user_positions)
        if turn_count <= self.max_turns:
            return

        # Drop up to _TRIM_TURNS whole turns, but always keep the newest one
        # so the message that triggered the trim is never lost.
        turns_to_drop = min(self._TRIM_TURNS, turn_count - 1)
        # Cutting right before a user message keeps every assistant
        # tool-call message together with its tool results.
        cut = user_positions[turns_to_drop]
        if cut == 0:
            return

        summary_parts = []
        for msg in self.messages[:cut]:
            # Tool-call messages carry content=None and are skipped.
            content = msg.get("content") or ""
            if content:
                summary_parts.append(
                    f"{msg['role']}: {content[:self._SNIPPET_CHARS]}"
                )
        if summary_parts:
            self._summary += (
                self._SEPARATOR.join(summary_parts) + self._SEPARATOR
            )
        self._bound_summary()

        self.messages = self.messages[cut:]
        logger.debug(
            "Trimmed conversation: %d turns -> %d turns",
            turn_count,
            turn_count - turns_to_drop,
        )

    def _bound_summary(self) -> None:
        """Drop the oldest summary entries so the summary stays bounded."""
        overflow = len(self._summary) - self.max_summary_chars
        if overflow <= 0:
            return
        # Prefer cutting right after a separator so no entry is left half-cut.
        boundary = self._summary.find(self._SEPARATOR, overflow)
        if boundary == -1:
            self._summary = self._summary[overflow:]
        else:
            self._summary = self._summary[boundary + len(self._SEPARATOR):]

    def clear(self) -> None:
        """Clear the conversation history and the accumulated summary."""
        self.messages = []
        self._summary = ""
Tool Definitions
Tools let the assistant take real-world actions: check the weather, set a timer, search the web, or control smart devices. We define tools as OpenAI function-calling schemas:
# app/llm/tools.py
"""Tool definitions and execution for the voice assistant."""
import json
import logging
from datetime import datetime, timedelta
from typing import Any
logger = logging.getLogger(__name__)
# ============================================================
# Tool Schemas (OpenAI function calling format)
# ============================================================
def _function_tool(name, description, properties, required):
    """Wrap one tool description in the function-calling envelope."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": list(required),
            },
        },
    }


# One entry per tool the assistant may call; the names must match the
# dispatch in execute_tool.
TOOL_SCHEMAS = [
    _function_tool(
        "get_current_time",
        "Get the current date and time.",
        {
            "timezone": {
                "type": "string",
                "description": "Timezone (e.g., 'US/Eastern', 'Europe/London'). Defaults to UTC.",
            },
        },
        [],
    ),
    _function_tool(
        "set_timer",
        "Set a countdown timer for a specified duration.",
        {
            "duration_seconds": {
                "type": "integer",
                "description": "Timer duration in seconds",
            },
            "label": {
                "type": "string",
                "description": "Optional label for the timer (e.g., 'pasta timer')",
            },
        },
        ["duration_seconds"],
    ),
    _function_tool(
        "get_weather",
        "Get the current weather for a location.",
        {
            "location": {
                "type": "string",
                "description": "City name (e.g., 'San Francisco, CA')",
            },
        },
        ["location"],
    ),
    _function_tool(
        "web_search",
        "Search the web for current information.",
        {
            "query": {
                "type": "string",
                "description": "The search query",
            },
        },
        ["query"],
    ),
    _function_tool(
        "set_reminder",
        "Set a reminder for a specific time.",
        {
            "message": {
                "type": "string",
                "description": "The reminder message",
            },
            "minutes_from_now": {
                "type": "integer",
                "description": "Minutes from now to trigger the reminder",
            },
        },
        ["message", "minutes_from_now"],
    ),
]
# ============================================================
# Tool Implementations
# ============================================================
# In-memory stores for timers and reminders.
# _set_timer and _set_reminder append plain dicts to these lists. Nothing
# visible in this module reads, fires, or expires the entries, and because
# they live only in process memory they are lost when the process exits.
_active_timers: list[dict] = []
_active_reminders: list[dict] = []
def execute_tool(name: str, arguments: str) -> str:
    """Execute a tool by name and return the result as a string.

    Problems with the arguments produced by the LLM (malformed JSON, a
    non-object payload, or a missing required argument) are reported back
    as a JSON ``{"error": ...}`` string instead of raising, so the caller
    can always record a tool result and the model can recover.

    Args:
        name: Tool function name
        arguments: JSON string of arguments; an empty string is treated
            as "no arguments"

    Returns:
        Result string to send back to the LLM
    """
    try:
        # Models may send an empty string for tools without parameters.
        args = json.loads(arguments) if arguments and arguments.strip() else {}
    except json.JSONDecodeError as exc:
        logger.warning("Tool %s received malformed arguments: %s", name, exc)
        return json.dumps({
            "error": f"Invalid JSON arguments for tool '{name}': {exc.msg}"
        })
    if args is None:
        args = {}
    if not isinstance(args, dict):
        logger.warning("Tool %s received non-object arguments", name)
        return json.dumps({
            "error": f"Arguments for tool '{name}' must be a JSON object"
        })

    logger.info(f"Executing tool: {name}({args})")

    # Extract arguments separately from the call itself so that only a
    # missing-argument KeyError is translated into an error result, never
    # a KeyError raised inside a tool implementation.
    try:
        if name == "get_current_time":
            call_args = (args.get("timezone", "UTC"),)
        elif name == "set_timer":
            call_args = (args["duration_seconds"], args.get("label", "Timer"))
        elif name == "get_weather":
            call_args = (args["location"],)
        elif name == "web_search":
            call_args = (args["query"],)
        elif name == "set_reminder":
            call_args = (args["message"], args["minutes_from_now"])
        else:
            return f"Unknown tool: {name}"
    except KeyError as exc:
        logger.warning("Tool %s is missing argument %s", name, exc)
        return json.dumps({
            "error": f"Missing required argument '{exc.args[0]}' for tool '{name}'"
        })

    handlers = {
        "get_current_time": _get_current_time,
        "set_timer": _set_timer,
        "get_weather": _get_weather,
        "web_search": _web_search,
        "set_reminder": _set_reminder,
    }
    return handlers[name](*call_args)
def _get_current_time(timezone: str = "UTC") -> str:
    """Get the current date and time in the requested timezone.

    Args:
        timezone: IANA timezone name (e.g. 'US/Eastern', 'Europe/London').
            Empty values and 'UTC' mean UTC.

    Returns:
        JSON string with "time", "date" and "timezone" keys. If the
        timezone cannot be resolved, the time is reported in UTC, the
        "timezone" key says "UTC", and a "note" key explains the fallback.
    """
    # Imported locally under an alias because the parameter is named
    # `timezone`, which would shadow datetime.timezone.
    from datetime import timezone as _dt_timezone
    from zoneinfo import ZoneInfo

    requested = str(timezone or "UTC").strip() or "UTC"
    note = None
    if requested.upper() == "UTC":
        # Avoid a tz-database lookup for the default case.
        tzinfo, label = _dt_timezone.utc, "UTC"
    else:
        try:
            tzinfo, label = ZoneInfo(requested), requested
        except (KeyError, ValueError, OSError):
            # ZoneInfoNotFoundError is a KeyError; malformed keys raise
            # ValueError; some keys that map to directories raise OSError.
            tzinfo, label = _dt_timezone.utc, "UTC"
            note = f"Unknown timezone '{requested}'; reporting UTC instead."

    now = datetime.now(tzinfo)
    payload = {
        "time": now.strftime("%I:%M %p"),
        "date": now.strftime("%A, %B %d, %Y"),
        "timezone": label,
    }
    if note:
        payload["note"] = note
    return json.dumps(payload)
def _set_timer(duration_seconds: int, label: str = "Timer") -> str:
    """Register a countdown timer and describe it.

    Appends a record (label, duration, ISO end time) to ``_active_timers``
    and returns a JSON string with the status, label, a short
    "Xm Ys" / "Ys" duration and the end time formatted as "HH:MM AM/PM".
    """
    expires_at = datetime.utcnow() + timedelta(seconds=duration_seconds)
    _active_timers.append({
        "label": label,
        "duration_seconds": duration_seconds,
        "end_time": expires_at.isoformat(),
    })

    whole_minutes, leftover_seconds = divmod(duration_seconds, 60)
    if whole_minutes:
        spoken_duration = f"{whole_minutes}m {leftover_seconds}s"
    else:
        # Under a minute: omit the minutes part entirely.
        spoken_duration = f"{leftover_seconds}s"

    confirmation = {
        "status": "Timer set",
        "label": label,
        "duration": spoken_duration,
        "end_time": expires_at.strftime("%I:%M %p"),
    }
    return json.dumps(confirmation)
def _get_weather(location: str) -> str:
    """Return simulated weather for a location as a JSON string.

    NOTE: This is a stub. In production, integrate a real weather API
    (OpenWeatherMap, WeatherAPI, etc.).
    """
    # Fixed placeholder values - swap for a real API call.
    simulated_report = {
        "location": location,
        "temperature": "72F / 22C",
        "condition": "Partly cloudy",
        "humidity": "45%",
        "wind": "10 mph",
        "note": "This is simulated data. Integrate a weather API for real results.",
    }
    return json.dumps(simulated_report)
def _web_search(query: str) -> str:
    """Return a simulated web-search result for a query as a JSON string.

    NOTE: This is a stub. In production, integrate a real search API
    (SerpAPI, Brave Search, Tavily, etc.).
    """
    # Single placeholder hit that simply echoes the query.
    placeholder_hit = {
        "title": "Search result placeholder",
        "snippet": f"Results for: {query}",
        "url": "https://example.com",
    }
    response = {
        "query": query,
        "results": [placeholder_hit],
        "note": "This is simulated data. Integrate a search API for real results.",
    }
    return json.dumps(response)
def _set_reminder(message: str, minutes_from_now: int) -> str:
    """Register a reminder and describe it.

    Appends a record (message, ISO trigger time) to ``_active_reminders``
    and returns a JSON string with the status, the message and the
    trigger time formatted as "HH:MM AM/PM".
    """
    fires_at = datetime.utcnow() + timedelta(minutes=minutes_from_now)
    _active_reminders.append({
        "message": message,
        "trigger_time": fires_at.isoformat(),
    })
    confirmation = {
        "status": "Reminder set",
        "message": message,
        "trigger_time": fires_at.strftime("%I:%M %p"),
    }
    return json.dumps(confirmation)
The Conversation Engine
The engine orchestrates everything: it takes a user transcript, sends it to GPT-4o with conversation history and tools, handles any tool calls, and streams the response text back.
# app/llm/engine.py
"""LLM conversation engine with streaming and tool support."""
import json
import logging
from typing import AsyncGenerator, Optional
from openai import AsyncOpenAI
from app.config import get_settings
from app.llm.conversation import ConversationMemory
from app.llm.tools import TOOL_SCHEMAS, execute_tool
logger = logging.getLogger(__name__)
settings = get_settings()
# System prompt for the voice assistant.
# Passed to ConversationMemory as the base system message. The guidelines
# steer the model toward short, conversational, formatting-free replies
# because the text is meant to be spoken aloud. The trailing backslash on
# the first line joins it with the second so the opening sentence contains
# no line break.
SYSTEM_PROMPT = """You are a helpful voice assistant. You respond conversationally \
and concisely because your responses will be spoken aloud.
Guidelines:
- Keep responses brief (1-3 sentences) unless the user asks for detail.
- Avoid bullet points, markdown, or formatting - use natural spoken language.
- Use contractions and conversational tone (e.g., "I'll" not "I will").
- When using tools, briefly explain what you're doing.
- If you don't know something, say so honestly.
- For complex questions, give a concise answer first, then offer to elaborate.
- Never include URLs, code blocks, or special characters in responses.
- Pronounce numbers naturally (e.g., "seventy-two degrees" not "72°F").
"""
class ConversationEngine:
    """LLM-powered conversation engine with memory and tools.

    Manages the full lifecycle of a conversation turn:
        1. Accept user transcript
        2. Stream a completion from the LLM with conversation history
           and tool schemas
        3. If the model requested tools, execute them and stream a
           follow-up completion that uses the results
        4. Store the spoken text in conversation memory

    Usage:
        engine = ConversationEngine()
        async for chunk in engine.generate_response("What time is it?"):
            print(chunk, end="")
    """

    def __init__(self):
        self.client = AsyncOpenAI(api_key=settings.openai_api_key)
        self.memory = ConversationMemory(
            max_turns=20,
            system_prompt=SYSTEM_PROMPT,
        )
        self.model = settings.llm_model
        self.temperature = settings.llm_temperature
        self.max_tokens = settings.llm_max_tokens

    async def generate_response(
        self, user_text: str
    ) -> AsyncGenerator[str, None]:
        """Generate a streaming response to user input.

        Handles the full conversation turn including tool calls.
        The first request is already streamed, so a plain answer is
        generated exactly once and its first words are available
        immediately; tool calls are detected from the same stream.

        Args:
            user_text: The user's transcribed speech

        Yields:
            Text chunks of the assistant's response
        """
        self.memory.add_user_message(user_text)
        messages = self.memory.get_messages()
        logger.info(f"LLM request: '{user_text}' "
                    f"({len(messages)} messages in context)")

        spoken_parts: list[str] = []
        # Filled by _stream_completion: tool-call index -> id/name/arguments.
        pending_calls: dict[int, dict] = {}
        async for text in self._stream_completion(messages, pending_calls):
            spoken_parts.append(text)
            yield text

        if pending_calls:
            logger.info(
                f"LLM requested {len(pending_calls)} tool call(s)"
            )
            self._run_tool_calls(pending_calls)

            # Keep any text spoken before the tool call separate from the
            # follow-up answer so downstream sentence splitting still works.
            if spoken_parts and not spoken_parts[-1].endswith((" ", "\n")):
                spoken_parts.append(" ")
                yield " "

            # Second call - stream the answer that uses the tool results.
            follow_up = self._stream_completion(self.memory.get_messages())
            async for text in follow_up:
                spoken_parts.append(text)
                yield text

        # Save full response to memory
        full_response = "".join(spoken_parts)
        self.memory.add_assistant_message(full_response)
        logger.info(f"LLM response: '{full_response[:100]}...'")

    async def _stream_completion(
        self,
        messages: list,
        tool_calls: Optional[dict] = None,
    ) -> AsyncGenerator[str, None]:
        """Stream one chat completion, yielding its text chunks.

        Args:
            messages: Full message list for the request.
            tool_calls: When given, tools are offered to the model and any
                streamed tool-call fragments are accumulated into this
                dict, keyed by the tool call's index. When None, the
                request is sent without tools.
        """
        request = {
            "model": self.model,
            "messages": messages,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "stream": True,
        }
        if tool_calls is not None:
            request["tools"] = TOOL_SCHEMAS
            request["tool_choice"] = "auto"

        stream = await self.client.chat.completions.create(**request)
        async for chunk in stream:
            # Some chunks (e.g. usage-only chunks) carry no choices.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            if delta.content:
                yield delta.content
            if tool_calls is None:
                continue
            # Tool calls arrive as fragments: the id and name come first and
            # the argument JSON is spread over the following chunks.
            for fragment in getattr(delta, "tool_calls", None) or []:
                slot = tool_calls.setdefault(
                    fragment.index, {"id": "", "name": "", "arguments": ""}
                )
                if fragment.id:
                    slot["id"] = fragment.id
                function = fragment.function
                if function is not None:
                    if function.name:
                        slot["name"] += function.name
                    if function.arguments:
                        slot["arguments"] += function.arguments

    def _run_tool_calls(self, pending_calls: dict) -> None:
        """Execute the requested tools and record calls and results in memory."""
        for index in sorted(pending_calls):
            call = pending_calls[index]
            name = call["name"]
            # Tools without parameters may stream no argument text at all.
            arguments = call["arguments"] or "{}"

            # Record the tool call in memory
            self.memory.add_tool_call(call["id"], name, arguments)

            # Every recorded tool call must be followed by a tool result,
            # otherwise later requests built from this history are invalid;
            # a failing tool is therefore reported to the model instead.
            try:
                result = execute_tool(name, arguments)
            except Exception as exc:
                logger.exception(f"Tool '{name}' failed")
                result = json.dumps({"error": f"Tool '{name}' failed: {exc}"})
            logger.info(
                f"Tool '{name}' result: {result[:100]}..."
            )

            # Record the result in memory
            self.memory.add_tool_result(call["id"], name, result)

    async def generate_response_full(self, user_text: str) -> str:
        """Generate a complete (non-streaming) response.

        Useful for testing and when streaming is not needed.

        Args:
            user_text: The user's transcribed speech

        Returns:
            Complete response text
        """
        chunks = [
            chunk async for chunk in self.generate_response(user_text)
        ]
        return "".join(chunks)

    def reset(self):
        """Clear conversation memory and start fresh."""
        self.memory.clear()
        logger.info("Conversation memory cleared")
Optimizing for Voice Output
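The system prompt is critical for voice assistants: unlike chat interfaces, responses must be spoken aloud. Because the model does not always follow those instructions perfectly, we also post-process its output before sending it to text-to-speech. Here are the key optimizations: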
# app/llm/voice_optimizations.py
"""Post-processing to optimize LLM output for speech synthesis."""
import re
# Written abbreviations and the words a speaker would say instead.
_SPOKEN_ABBREVIATIONS = {
    'e.g.': 'for example',
    'i.e.': 'that is',
    'etc.': 'and so on',
    'vs.': 'versus',
    'approx.': 'approximately',
    'Dr.': 'Doctor',
    'Mr.': 'Mister',
    'Mrs.': 'Missus',
    'Ms.': 'Miz',
}
# Match an abbreviation only at the start of a word, so text such as
# "ATMs." or "revs." is left untouched.
_ABBREVIATION_PATTERN = re.compile(
    r'(?<!\w)(?:'
    + '|'.join(re.escape(abbr) for abbr in _SPOKEN_ABBREVIATIONS)
    + r')'
)


def clean_for_speech(text: str) -> str:
    """Clean LLM output so it sounds natural when spoken.

    Removes markdown, normalizes temperatures, converts abbreviations
    to spoken form, replaces URLs, and collapses whitespace.

    Args:
        text: Raw LLM response text

    Returns:
        Speech-optimized text
    """
    # Strip line-leading markers first (bullets, numbered lists, headers).
    # Doing this before emphasis removal keeps a "* " bullet from being
    # paired with a later "*" as if it opened italic text, and anchoring
    # headers to the line start leaves text like "C# is" intact.
    text = re.sub(r'^\s*[-*+]\s+', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*#{1,6}\s+', '', text, flags=re.MULTILINE)

    # Remove inline markdown formatting
    text = re.sub(r'\*\*(.*?)\*\*', r'\1', text)                # bold
    # Italic markers must hug the text, so "2 * 3 * 4" is not touched.
    text = re.sub(r'\*(?=\S)(.+?)(?<=\S)\*', r'\1', text)       # italic
    text = re.sub(r'`(.*?)`', r'\1', text)                      # code
    text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)       # links

    # Normalize temperature
    text = re.sub(r'(\d+)°F', r'\1 degrees Fahrenheit', text)
    text = re.sub(r'(\d+)°C', r'\1 degrees Celsius', text)
    text = re.sub(r'(\d+)°', r'\1 degrees', text)

    # Normalize common abbreviations
    text = _ABBREVIATION_PATTERN.sub(
        lambda match: _SPOKEN_ABBREVIATIONS[match.group(0)], text
    )

    # Normalize URLs (in case any slip through). Trailing punctuation is
    # left in place so the sentence boundary after the URL survives.
    text = re.sub(
        r'https?://\S+?(?=[.,;:!?)\]]*(?:\s|$))', 'a web link', text
    )

    # Collapse newlines and repeated whitespace into single spaces
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
def split_into_sentences(text: str) -> list[str]:
    """Break text into sentences for incremental TTS.

    Whole sentences give the synthesizer enough context for natural
    prosody, unlike arbitrary chunks.

    Args:
        text: Text to split

    Returns:
        List of non-empty, whitespace-trimmed sentences
    """
    # A boundary is any whitespace run that follows '.', '!' or '?'.
    fragments = re.split(r'(?<=[.!?])\s+', text)
    trimmed = (fragment.strip() for fragment in fragments)
    return [sentence for sentence in trimmed if sentence]
Testing the Conversation Engine
# tests/test_llm.py
"""Test the LLM conversation engine."""
import asyncio
from app.llm.engine import ConversationEngine
from app.llm.voice_optimizations import clean_for_speech, split_into_sentences
def test_clean_for_speech():
    """Check that markdown is stripped and temperatures are spelled out."""
    cleaned = clean_for_speech(
        "The temperature is **72°F** with `partly cloudy` skies."
    )
    # No markdown markers may survive the cleaning.
    for marker in ("**", "`"):
        assert marker not in cleaned
    assert "degrees Fahrenheit" in cleaned
    print(f"Clean: '{cleaned}'")
def test_sentence_splitting():
    """Check that a three-sentence reply is split into three TTS units."""
    sentences = split_into_sentences(
        "Hello there! How can I help you today? "
        "I can check the weather or set a timer."
    )
    assert len(sentences) == 3
    print(f"Sentences: {sentences}")
async def test_conversation():
    """Run a three-turn conversation against one engine instance."""
    engine = ConversationEngine()
    prompts = (
        "Hello!",             # Turn 1
        "What's your name?",  # Turn 2 - should remember context
        "What time is it?",   # Turn 3 - tool use
    )
    for prompt in prompts:
        print(f"\nUser: {prompt}")
        reply = await engine.generate_response_full(prompt)
        print(f"Assistant: {reply}")
async def test_streaming():
    """Print a streamed response chunk by chunk as it arrives."""
    prompt = "Tell me a fun fact about space."
    engine = ConversationEngine()
    print(f"\nUser: {prompt}")
    print("Assistant: ", end="")
    async for piece in engine.generate_response(prompt):
        # Flush on every chunk so the text appears incrementally.
        print(piece, end="", flush=True)
    print()
if __name__ == "__main__":
    # Offline checks first, then the checks that call the LLM API.
    for sync_check in (test_clean_for_speech, test_sentence_splitting):
        sync_check()
    for async_check in (test_conversation, test_streaming):
        # Each async check runs in its own event loop.
        asyncio.run(async_check())
    print("\nAll LLM tests passed!")
Key Takeaways
- The `ConversationMemory` class maintains a sliding window of conversation turns and summarizes older context to fit within the LLM context window.
- Tool calling uses OpenAI's function-calling API: the LLM decides when to call a tool, we execute it, and then the LLM incorporates the result into its response.
- The system prompt is critical for voice: it instructs the LLM to respond concisely and conversationally for spoken output.
- Post-processing removes markdown, normalizes abbreviations, and splits text into sentences for natural TTS prosody.
What Is Next
In the next lesson, you will build the text-to-speech module — integrating ElevenLabs and OpenAI TTS to convert the LLM response text into natural-sounding streaming audio.