Intermediate

Conversational AI

Build voice-enabled AI assistants by combining speech-to-text, intent classification, LLM response generation, and text-to-speech into a seamless conversational pipeline that handles real-time interactions.

The Multi-Model Conversation Pipeline

A modern conversational AI system is not a single model — it is a carefully orchestrated pipeline of four or more specialized models working together. When a user speaks to a voice assistant, the audio passes through a speech-to-text model (Whisper), the transcript is analyzed by an intent classifier to determine what the user wants, the classified intent routes to an LLM for intelligent response generation, and finally a text-to-speech model converts the response back to natural-sounding audio.

This staged design is common across voice assistants, from consumer products such as Alexa and Siri to enterprise call center systems that handle millions of customer interactions daily. The key is choosing the right model for each stage and connecting the stages with minimal latency.

💡
Pipeline overview: Audio Input → Speech-to-Text (Whisper) → Intent Classification (BERT/DistilBERT) → LLM Response (Claude/GPT) → Text-to-Speech (ElevenLabs/OpenAI) → Audio Output. Total target latency for real-time: under 2 seconds end-to-end.
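
The stage chain and latency target above can be sketched as a list of callables timed against a budget. This is a minimal illustration: the four stage functions are hypothetical stand-ins rather than real model calls, and only the 2-second target comes from the overview.

```python
import time
from typing import Any, Callable

# The 2-second end-to-end target from the pipeline overview.
LATENCY_BUDGET_MS = 2000

Stage = tuple[str, Callable[[Any], Any]]


def run_pipeline(stages: list[Stage], payload: Any) -> tuple[Any, dict[str, float]]:
    """Run each stage in order, feeding its output to the next stage,
    and record how long each stage took in milliseconds."""
    timings: dict[str, float] = {}
    for name, stage in stages:
        start = time.perf_counter()
        payload = stage(payload)
        timings[name] = (time.perf_counter() - start) * 1000
    return payload, timings


def within_budget(timings: dict[str, float],
                  budget_ms: float = LATENCY_BUDGET_MS) -> bool:
    """True when the summed stage latencies fit inside the budget."""
    return sum(timings.values()) <= budget_ms


# Hypothetical stand-ins for the four model calls.
stages: list[Stage] = [
    ("stt", lambda audio: "what time do you open"),
    ("intent", lambda text: {"text": text, "intent": "question"}),
    ("llm", lambda req: f"Answering a {req['intent']}: we open at 9am."),
    ("tts", lambda reply: reply.encode("utf-8")),
]

output, timings = run_pipeline(stages, b"raw-audio-bytes")
print(timings, within_budget(timings))
```

Recording per-stage timings separately makes it clear which stage to optimize first when the total exceeds the budget.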

Speech-to-Text (STT) Providers

The first step is converting user speech to text. Here is how the leading STT solutions compare:

| Provider | Model | Languages | Real-time | Accuracy (WER) | Cost |
|---|---|---|---|---|---|
| OpenAI | Whisper large-v3 | 99 | Yes (API) | ~5% (English) | $0.006/min |
| OpenAI | Whisper (self-hosted) | 99 | With streaming setup | ~5% (English) | Free (GPU cost) |
| Deepgram | Nova-2 | 36 | Yes (WebSocket) | ~4% (English) | $0.0043/min |
| Google | Chirp 2 | 100+ | Yes | ~5% (English) | $0.016/min |
| AWS | Transcribe | 100+ | Yes | ~6% (English) | $0.024/min |
| AssemblyAI | Universal-2 | 30+ | Yes | ~4.5% (English) | $0.015/min |
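
The accuracy column reports word error rate (WER): the number of substituted, deleted, and inserted words divided by the number of words in the reference transcript. A minimal sketch using word-level edit distance follows; the function name and sample sentences are illustrative, and real evaluations usually normalize punctuation and numbers first.

```python
def word_error_rate(reference: str, hypothesis: str) -> float:
    """WER = word-level edit distance / number of reference words."""
    ref = reference.lower().split()
    hyp = hypothesis.lower().split()
    if not ref:
        raise ValueError("reference transcript must not be empty")

    # prev[j] holds the edit distance between the reference words
    # processed so far and the first j hypothesis words.
    prev = list(range(len(hyp) + 1))
    for i, ref_word in enumerate(ref, start=1):
        curr = [i] + [0] * len(hyp)
        for j, hyp_word in enumerate(hyp, start=1):
            cost = 0 if ref_word == hyp_word else 1
            curr[j] = min(
                prev[j] + 1,         # deletion (reference word missing)
                curr[j - 1] + 1,     # insertion (extra hypothesis word)
                prev[j - 1] + cost,  # substitution or exact match
            )
        prev = curr
    return prev[-1] / len(ref)


# One substituted word out of seven reference words.
wer = word_error_rate(
    "please cancel my order from last week",
    "please cancel the order from last week",
)
print(f"{wer:.3f}")
```

A WER of about 5% therefore means roughly one word in twenty is wrong, which matters most when that word is an order number or a name.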

Text-to-Speech (TTS) Providers

| Provider | Quality | Voices | Latency | Custom Voice | Cost |
|---|---|---|---|---|---|
| ElevenLabs | Highest (near-human) | 1000+ prebuilt, clone any | ~300ms first byte | Yes (30s sample) | $0.30/1K chars |
| OpenAI TTS | Very High | 6 prebuilt | ~500ms first byte | No | $0.015/1K chars |
| Google TTS | High | 200+ prebuilt | ~200ms | Yes | $0.016/1K chars |
| Amazon Polly | Good | 60+ prebuilt | ~100ms | No | $4/1M chars |
| Bark (open-source) | Good | Generative | ~2s (GPU) | Prompt-based | Free (GPU cost) |
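
To see how these prices combine, the sketch below estimates the speech cost of a single conversation turn from the per-minute STT prices and per-1K-character TTS prices listed in the two tables. The function and dictionary names are illustrative, LLM token cost is deliberately left out, and the prices are the tables' figures rather than current quotes.

```python
# Prices copied from the STT and TTS tables above (USD).
STT_PRICE_PER_MIN = {"whisper_api": 0.006, "deepgram_nova2": 0.0043}
TTS_PRICE_PER_1K_CHARS = {"elevenlabs": 0.30, "openai_tts": 0.015}


def turn_cost(stt: str, tts: str,
              audio_seconds: float, reply_chars: int) -> float:
    """Speech cost of one turn: transcription plus synthesis.
    LLM token cost is not included."""
    stt_cost = STT_PRICE_PER_MIN[stt] * audio_seconds / 60
    tts_cost = TTS_PRICE_PER_1K_CHARS[tts] * reply_chars / 1000
    return stt_cost + tts_cost


# A 5-second utterance answered with a 200-character reply.
premium = turn_cost("whisper_api", "elevenlabs", 5, 200)
budget = turn_cost("whisper_api", "openai_tts", 5, 200)
print(f"premium: ${premium:.4f}  budget: ${budget:.4f}")
```

For this example the turn costs about $0.06 with ElevenLabs and about $0.0035 with OpenAI TTS, so at these prices synthesis dominates the speech cost and the TTS choice matters far more than the STT choice.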

Voice Assistant: Whisper + Claude + ElevenLabs

Here is a complete voice assistant implementation that records user speech, transcribes it with Whisper, generates an intelligent response with Claude, and speaks it back with ElevenLabs:

import io
import json
import time
import openai
import anthropic
import requests
import sounddevice as sd
import soundfile as sf
import numpy as np
from dataclasses import dataclass, field

@dataclass
class ConversationTurn:
    """Represents a single turn in the conversation."""
    user_audio_duration: float = 0
    user_text: str = ""
    intent: str = ""
    intent_confidence: float = 0
    assistant_text: str = ""
    stt_latency_ms: int = 0
    llm_latency_ms: int = 0
    tts_latency_ms: int = 0
    total_latency_ms: int = 0

class VoiceAssistant:
    """Multi-model voice assistant: Whisper + Claude + ElevenLabs."""

    def __init__(self, config: dict):
        self.openai_client = openai.OpenAI()
        self.anthropic_client = anthropic.Anthropic()
        self.elevenlabs_api_key = config["elevenlabs_api_key"]
        self.elevenlabs_voice_id = config.get(
            "elevenlabs_voice_id", "21m00Tcm4TlvDq8ikWAM"
        )
        self.sample_rate = 16000
        self.conversation_history = []
        self.system_prompt = config.get("system_prompt", """You are a
helpful voice assistant. Keep responses concise and conversational
(under 3 sentences unless the user asks for detail). Be warm and
natural - remember this will be spoken aloud.""")

    def record_audio(self, duration: float = 5.0) -> np.ndarray:
        """Record audio from microphone."""
        print("Listening...")
        audio = sd.rec(
            int(duration * self.sample_rate),
            samplerate=self.sample_rate,
            channels=1,
            dtype="float32"
        )
        sd.wait()
        print("Processing...")
        return audio.flatten()

    def speech_to_text(self, audio: np.ndarray) -> tuple[str, int]:
        """Transcribe audio using OpenAI Whisper API."""
        start = time.time()

        # Convert numpy array to WAV bytes
        buffer = io.BytesIO()
        sf.write(buffer, audio, self.sample_rate, format="WAV")
        buffer.seek(0)
        buffer.name = "audio.wav"

        response = self.openai_client.audio.transcriptions.create(
            model="whisper-1",
            file=buffer,
            language="en"
        )

        latency = int((time.time() - start) * 1000)
        return response.text, latency

    def classify_intent(self, text: str) -> tuple[str, float]:
        """Classify user intent for routing decisions."""
        # Load the zero-shot classifier once and reuse it. Building the
        # pipeline on every turn would reload the model each time.
        # Note: bart-large-mnli is a large model, so expect noticeable
        # latency on CPU.
        if not hasattr(self, "_intent_classifier"):
            from transformers import pipeline
            self._intent_classifier = pipeline(
                "zero-shot-classification",
                model="facebook/bart-large-mnli"
            )
        labels = [
            "question", "command", "complaint",
            "greeting", "farewell", "small_talk",
            "booking", "technical_support", "billing"
        ]
        result = self._intent_classifier(text, labels)
        return result["labels"][0], result["scores"][0]

    def generate_response(self, user_text: str,
                          intent: str) -> tuple[str, int]:
        """Generate response using Claude with conversation
        history and intent context."""
        start = time.time()

        # Add user message to history
        self.conversation_history.append({
            "role": "user",
            "content": user_text
        })

        # Build messages with intent context
        messages = self.conversation_history.copy()

        # Add intent as system context
        system = f"""{self.system_prompt}

Current user intent: {intent}. Tailor your response accordingly.
If intent is "complaint", be empathetic and solution-oriented.
If intent is "booking", help with scheduling.
If intent is "technical_support", be precise and step-by-step.
If intent is "billing", be accurate and offer to connect to billing."""

        response = self.anthropic_client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=300,
            system=system,
            messages=messages
        )

        assistant_text = response.content[0].text
        self.conversation_history.append({
            "role": "assistant",
            "content": assistant_text
        })

        # Keep conversation history manageable
        if len(self.conversation_history) > 20:
            self.conversation_history = (
                self.conversation_history[-16:]
            )

        latency = int((time.time() - start) * 1000)
        return assistant_text, latency

    def text_to_speech(self, text: str) -> tuple[bytes, int]:
        """Convert text to speech using ElevenLabs API."""
        start = time.time()

        url = (f"https://api.elevenlabs.io/v1/text-to-speech/"
               f"{self.elevenlabs_voice_id}/stream")

        response = requests.post(
            url,
            headers={
                "xi-api-key": self.elevenlabs_api_key,
                "Content-Type": "application/json"
            },
            json={
                "text": text,
                "model_id": "eleven_turbo_v2_5",
                "voice_settings": {
                    "stability": 0.5,
                    "similarity_boost": 0.75,
                    "style": 0.3
                }
            },
            stream=True
        )

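        # Fail fast on HTTP errors instead of treating an error body
        # as audio
        response.raise_for_status()

        # Join the chunks once rather than concatenating in a loop
        audio_bytes = b"".join(response.iter_content(chunk_size=1024))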

        latency = int((time.time() - start) * 1000)
        return audio_bytes, latency

    def play_audio(self, audio_bytes: bytes):
        """Play audio response through speakers."""
        buffer = io.BytesIO(audio_bytes)
        data, rate = sf.read(buffer)
        sd.play(data, rate)
        sd.wait()

    def conversation_turn(self) -> ConversationTurn:
        """Execute one full conversation turn."""
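        turn = ConversationTurn()

        # Step 1: Record user audio
        audio = self.record_audio(duration=5.0)
        turn.user_audio_duration = len(audio) / self.sample_rate

        # Start the clock after recording so the total does not
        # include the fixed 5-second capture window
        total_start = time.time()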

        # Step 2: Speech-to-Text
        turn.user_text, turn.stt_latency_ms = (
            self.speech_to_text(audio)
        )
        print(f"User: {turn.user_text}")

        if not turn.user_text.strip():
            turn.assistant_text = "I didn't catch that."
            return turn

        # Step 3: Intent Classification
        turn.intent, turn.intent_confidence = (
            self.classify_intent(turn.user_text)
        )

        # Step 4: LLM Response Generation
        turn.assistant_text, turn.llm_latency_ms = (
            self.generate_response(turn.user_text, turn.intent)
        )
        print(f"Assistant: {turn.assistant_text}")

        # Step 5: Text-to-Speech
        audio_response, turn.tts_latency_ms = (
            self.text_to_speech(turn.assistant_text)
        )

        # Stop the clock before playback so the total does not
        # include the time spent playing the reply
        turn.total_latency_ms = int(
            (time.time() - total_start) * 1000
        )

        # Step 6: Play audio
        self.play_audio(audio_response)
        return turn

    def run(self):
        """Run the voice assistant in a loop."""
        print("Voice Assistant ready. Speak to begin.")
        while True:
            try:
                turn = self.conversation_turn()
                print(f"  Latency - STT: {turn.stt_latency_ms}ms, "
                      f"LLM: {turn.llm_latency_ms}ms, "
                      f"TTS: {turn.tts_latency_ms}ms, "
                      f"Total: {turn.total_latency_ms}ms")
            except KeyboardInterrupt:
                print("\nGoodbye!")
                break


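# Run the assistant
if __name__ == "__main__":
    import os

    assistant = VoiceAssistant({
        # Read the key from the environment rather than hard-coding
        # it (the variable name here is only an example)
        "elevenlabs_api_key": os.environ["ELEVENLABS_API_KEY"],
        "system_prompt": "You are a helpful customer support agent "
                         "for an e-commerce company."
    })
    assistant.run()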

Intent Classification and Routing

In production conversational AI, intent classification determines which backend system handles the request. A billing question routes to the billing API, a technical issue routes to the knowledge base, and a general question routes to the LLM. Here is a full intent classification and routing system:

from transformers import pipeline
from dataclasses import dataclass
from typing import Callable

@dataclass
class IntentRoute:
    """Maps an intent to a handler function."""
    intent: str
    handler: Callable
    requires_auth: bool = False
    fallback_to_llm: bool = True

class IntentRouter:
    """Classify user intent and route to specialized handlers."""

    def __init__(self):
        self.classifier = pipeline(
            "zero-shot-classification",
            model="facebook/bart-large-mnli"
        )
        self.routes: dict[str, IntentRoute] = {}
        self.confidence_threshold = 0.6

        # Register default routes
        self._register_defaults()

    def _register_defaults(self):
        self.register("check_order_status", self._handle_order,
                      requires_auth=True)
        self.register("cancel_order", self._handle_cancellation,
                      requires_auth=True)
        self.register("technical_issue", self._handle_tech_support)
        self.register("billing_question", self._handle_billing,
                      requires_auth=True)
        self.register("product_question", self._handle_product_qa)
        self.register("greeting", self._handle_greeting)
        self.register("farewell", self._handle_farewell)

    def register(self, intent: str, handler: Callable,
                 requires_auth: bool = False):
        self.routes[intent] = IntentRoute(
            intent=intent,
            handler=handler,
            requires_auth=requires_auth
        )

    def classify_and_route(self, text: str,
                           user_context: dict) -> dict:
        """Classify intent and route to appropriate handler."""
        # Classify
        labels = list(self.routes.keys())
        result = self.classifier(text, labels)
        top_intent = result["labels"][0]
        confidence = result["scores"][0]

        # Check confidence threshold
        if confidence < self.confidence_threshold:
            return self._fallback_llm_response(text, user_context)

        route = self.routes[top_intent]

        # Check authentication if required
        if route.requires_auth and not user_context.get("authenticated"):
            return {
                "intent": top_intent,
                "response": "I need to verify your identity first. "
                           "Could you provide your account number?",
                "action": "request_auth"
            }

        # Execute handler
        response = route.handler(text, user_context)
        return {
            "intent": top_intent,
            "confidence": confidence,
            **response
        }

    def _handle_order(self, text, ctx) -> dict:
        # Query order API
        order_id = self._extract_order_id(text)
        return {
            "response": f"Let me look up order {order_id} for you.",
            "action": "query_order_api",
            "params": {"order_id": order_id}
        }

    def _handle_cancellation(self, text, ctx) -> dict:
        return {
            "response": "I can help you cancel that order. "
                       "Let me pull up the details.",
            "action": "initiate_cancellation"
        }

    def _handle_tech_support(self, text, ctx) -> dict:
        return {
            "response": "Let me search our knowledge base for "
                       "a solution.",
            "action": "search_knowledge_base",
            "params": {"query": text}
        }

    def _handle_billing(self, text, ctx) -> dict:
        return {
            "response": "I'll look into your billing question.",
            "action": "query_billing_api"
        }

    def _handle_product_qa(self, text, ctx) -> dict:
        return {
            "response": "Let me find information about that product.",
            "action": "search_product_catalog",
            "params": {"query": text}
        }

    def _handle_greeting(self, text, ctx) -> dict:
        name = ctx.get("user_name", "there")
        return {"response": f"Hello {name}! How can I help you?"}

    def _handle_farewell(self, text, ctx) -> dict:
        return {
            "response": "Thank you for contacting us. Have a "
                       "great day!",
            "action": "end_conversation"
        }

    def _fallback_llm_response(self, text, ctx) -> dict:
        return {
            "intent": "unknown",
            "response": None,
            "action": "route_to_llm",
            "params": {"original_text": text}
        }

    def _extract_order_id(self, text: str) -> str:
        import re
        match = re.search(r"[A-Z]{2,3}-?\d{4,}", text.upper())
        return match.group(0) if match else "unknown"
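
The routing decision in classify_and_route reduces to three outcomes: fall back to the LLM, ask for authentication, or call the handler. The sketch below isolates that decision so it can be checked without loading a model. It takes a result in the shape the zero-shot pipeline returns (labels and scores sorted best-first); `choose_action` and the sample scores are hypothetical.

```python
CONFIDENCE_THRESHOLD = 0.6  # same default as IntentRouter


def choose_action(result: dict, auth_required: set[str],
                  authenticated: bool,
                  threshold: float = CONFIDENCE_THRESHOLD) -> str:
    """Map a classifier result to one of three routing outcomes."""
    top_intent = result["labels"][0]
    confidence = result["scores"][0]

    # Uncertain classification: let the general-purpose LLM handle it.
    if confidence < threshold:
        return "route_to_llm"
    # Confident, but the intent touches account data.
    if top_intent in auth_required and not authenticated:
        return "request_auth"
    return f"handle:{top_intent}"


AUTH_REQUIRED = {"check_order_status", "cancel_order", "billing_question"}

confident = {"labels": ["cancel_order", "greeting"], "scores": [0.82, 0.10]}
uncertain = {"labels": ["cancel_order", "greeting"], "scores": [0.41, 0.30]}

print(choose_action(confident, AUTH_REQUIRED, authenticated=False))
print(choose_action(confident, AUTH_REQUIRED, authenticated=True))
print(choose_action(uncertain, AUTH_REQUIRED, authenticated=True))
```

Checking confidence before authentication means an uncertain classification never triggers an unnecessary identity prompt.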

WebSocket Streaming for Real-Time Interaction

For the lowest latency conversational experience, use WebSocket streaming to process audio in real time instead of waiting for the user to finish speaking:

import asyncio
import json

import anthropic
import openai
import websockets
from deepgram import DeepgramClient, LiveTranscriptionEvents

class RealtimeConversation:
    """Real-time conversational AI with streaming STT and TTS."""

    def __init__(self, config: dict):
        self.deepgram = DeepgramClient(config["deepgram_api_key"])
        self.anthropic = anthropic.Anthropic()
        # Used by _tts_async for speech synthesis
        self.openai_client = openai.OpenAI()
        self.partial_transcript = ""
        self.is_speaking = False

    async def handle_client(self, websocket):
        """Handle a WebSocket client connection."""
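        # Start Deepgram live transcription
        dg_connection = self.deepgram.listen.live.v("1")

        # The synchronous Deepgram client may invoke callbacks from
        # its own thread, so hand the coroutine to this event loop
        # thread-safely instead of calling asyncio.ensure_future there.
        loop = asyncio.get_running_loop()

        def on_transcript(_client, result, **kwargs):
            asyncio.run_coroutine_threadsafe(
                self._on_transcript(websocket, result), loop
            )

        dg_connection.on(
            LiveTranscriptionEvents.Transcript, on_transcript
        )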

        options = {
            "model": "nova-2",
            "language": "en",
            "smart_format": True,
            "interim_results": True,
            "endpointing": 300,  # 300ms silence = end of utterance
            "vad_events": True
        }
        dg_connection.start(options)

        try:
            async for message in websocket:
                if isinstance(message, bytes):
                    # Forward audio to Deepgram
                    dg_connection.send(message)
        finally:
            dg_connection.finish()

    async def _on_transcript(self, websocket, result):
        """Handle transcript results from Deepgram."""
        transcript = (
            result.channel.alternatives[0].transcript
        )
        is_final = result.is_final

        if not transcript:
            return

        if is_final:
            self.partial_transcript += " " + transcript

            # Check if utterance is complete
            if result.speech_final:
                full_text = self.partial_transcript.strip()
                self.partial_transcript = ""

                # Send transcript to client
                await websocket.send(json.dumps({
                    "type": "transcript",
                    "text": full_text,
                    "final": True
                }))

                # Generate and stream response
                await self._stream_response(websocket, full_text)
        else:
            # Send partial transcript for live display
            await websocket.send(json.dumps({
                "type": "transcript",
                "text": transcript,
                "final": False
            }))

    async def _stream_response(self, websocket, user_text: str):
        """Stream LLM response and TTS audio back to client."""
        # Stream LLM response
        full_response = ""
        with self.anthropic.messages.stream(
            model="claude-sonnet-4-20250514",
            max_tokens=200,
            messages=[{"role": "user", "content": user_text}]
        ) as stream:
            sentence_buffer = ""
            for text in stream.text_stream:
                full_response += text
                sentence_buffer += text

                # Send TTS for each complete sentence
                if any(sentence_buffer.endswith(p)
                       for p in [".", "!", "?", "\n"]):
                    audio = await self._tts_async(sentence_buffer)
                    await websocket.send(audio)  # binary frame
                    await websocket.send(json.dumps({
                        "type": "response_text",
                        "text": sentence_buffer
                    }))
                    sentence_buffer = ""

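            # Send remaining text (audio and transcript) once the
            # stream ends without closing punctuation
            if sentence_buffer.strip():
                audio = await self._tts_async(sentence_buffer)
                await websocket.send(audio)
                await websocket.send(json.dumps({
                    "type": "response_text",
                    "text": sentence_buffer
                }))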

    async def _tts_async(self, text: str) -> bytes:
        """Generate TTS audio without blocking the event loop."""
        # The OpenAI client call is synchronous, so run it in a
        # worker thread (asyncio.to_thread needs Python 3.9+)
        response = await asyncio.to_thread(
            self.openai_client.audio.speech.create,
            model="tts-1",
            voice="nova",
            input=text,
            response_format="opus"
        )
        return response.content


# Start server
async def main():
    server = RealtimeConversation({
        "deepgram_api_key": "your-key"
    })
    async with websockets.serve(
        server.handle_client, "localhost", 8765
    ):
        await asyncio.Future()  # run forever

asyncio.run(main())
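
The sentence buffering inside _stream_response can be isolated as a small generator, which makes it easier to test. The sketch below is one possible variant, not the code above: it scans each chunk character by character, so punctuation in the middle of a chunk also ends a sentence. The function name and sample chunks are illustrative.

```python
from typing import Iterable, Iterator

SENTENCE_ENDINGS = (".", "!", "?", "\n")


def sentences_from_stream(chunks: Iterable[str]) -> Iterator[str]:
    """Yield complete sentences as soon as they appear in a stream
    of text chunks, then flush whatever is left at the end."""
    buffer = ""
    for chunk in chunks:
        for char in chunk:
            buffer += char
            if char in SENTENCE_ENDINGS:
                if buffer.strip():
                    yield buffer.strip()
                buffer = ""
    # Trailing text without closing punctuation
    if buffer.strip():
        yield buffer.strip()


# Chunks split at arbitrary points, as an LLM stream would deliver them.
chunks = ["Sure", "! Your order", " shipped. It arri", "ves Friday"]
sentences = list(sentences_from_stream(chunks))
print(sentences)
```

Each yielded sentence can be sent to TTS immediately, so the first audio starts while the LLM is still generating. A splitter this simple also breaks on abbreviations and decimals such as "e.g." or "3.5", so a production system would likely need a more careful boundary rule.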

Emotion Detection and Adaptive Responses

Adding emotion detection allows your conversational AI to adjust its tone and approach based on the user's emotional state. An angry customer gets a more empathetic response; a confused user gets simpler explanations:

from transformers import pipeline

class EmotionAwareResponder:
    """Detect user emotion and adapt LLM response style."""

    def __init__(self):
        self.emotion_classifier = pipeline(
            "text-classification",
            model="j-hartmann/emotion-english-distilroberta-base",
            top_k=3
        )
        self.emotion_prompts = {
            "anger": "The user sounds frustrated or angry. "
                     "Be empathetic, apologize if appropriate, "
                     "and focus on solving their problem quickly.",
            "sadness": "The user sounds sad or disappointed. "
                       "Be warm, supportive, and understanding.",
            "fear": "The user sounds worried or anxious. "
                    "Be reassuring, provide clear information, "
                    "and reduce uncertainty.",
            "joy": "The user sounds happy. Match their positive "
                   "energy and be enthusiastic.",
            "surprise": "The user sounds surprised. Provide "
                        "clear context and explanation.",
            "neutral": "Respond in a friendly, professional tone.",
            "disgust": "The user is dissatisfied. Acknowledge "
                       "their concern and offer solutions."
        }

    def detect_emotion(self, text: str) -> dict:
        results = self.emotion_classifier(text)
        primary = results[0][0]
        return {
            "emotion": primary["label"],
            "confidence": primary["score"],
            "all_emotions": {
                r["label"]: round(r["score"], 3)
                for r in results[0]
            }
        }

    def get_emotion_context(self, text: str) -> str:
        emotion = self.detect_emotion(text)
        return self.emotion_prompts.get(
            emotion["emotion"], self.emotion_prompts["neutral"]
        )
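
One way to use the detected emotion is to append the matching guidance to the system prompt, falling back to neutral guidance when the classifier is unsure. This is a minimal sketch: `build_system_prompt` and the 0.5 confidence cutoff are assumptions for illustration, and the input dictionary mirrors the shape that detect_emotion returns.

```python
NEUTRAL_GUIDANCE = "Respond in a friendly, professional tone."


def build_system_prompt(base_prompt: str, emotion: dict,
                        guidance: dict[str, str],
                        min_confidence: float = 0.5) -> str:
    """Append tone guidance for the detected emotion. Uses neutral
    guidance when confidence is low or the label is unknown."""
    label = emotion["emotion"]
    if emotion["confidence"] < min_confidence or label not in guidance:
        tone = NEUTRAL_GUIDANCE
    else:
        tone = guidance[label]
    return f"{base_prompt}\n\nTone guidance: {tone}"


guidance = {"anger": "Be empathetic and focus on solving the problem."}

prompt = build_system_prompt(
    "You are a support agent.",
    {"emotion": "anger", "confidence": 0.91},
    guidance,
)
print(prompt)
```

The cutoff keeps a weak or ambiguous prediction from pushing the assistant into an apologetic tone the user did not call for.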

Multi-Turn Conversation State Management

Managing context across multiple turns is critical. Here are three patterns for conversation state:

💡

State management patterns:

  • Full history: Send all previous messages to the LLM. Simple but expensive and hits context limits on long conversations.
  • Sliding window: Keep only the last N turns. Good for most cases. Loses early context but keeps costs predictable.
  • Summary + recent: Summarize older turns into a system message, keep last 4–6 turns verbatim. Best balance of context and cost. Use this for production systems.
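
The second and third patterns can be sketched as two small functions over a message list. This is an illustration under stated assumptions: messages strictly alternate between user and assistant, and `summarize` is a caller-supplied function (in practice probably an LLM call) shown here as a hypothetical stub.

```python
from typing import Callable

Message = dict[str, str]


def sliding_window(history: list[Message], max_turns: int) -> list[Message]:
    """Keep only the last max_turns user/assistant pairs."""
    if max_turns <= 0:
        return []
    return history[-2 * max_turns:]


def summary_plus_recent(
    history: list[Message],
    keep_turns: int,
    summarize: Callable[[list[Message]], str],
) -> tuple[str, list[Message]]:
    """Split history into a summary of the older messages and the
    most recent keep_turns pairs kept verbatim."""
    cutoff = max(len(history) - 2 * keep_turns, 0)
    older, recent = history[:cutoff], history[cutoff:]
    summary = summarize(older) if older else ""
    return summary, recent


# Six turns of alternating user/assistant messages.
history = [
    {"role": "user" if i % 2 == 0 else "assistant",
     "content": f"message {i}"}
    for i in range(12)
]

# Hypothetical stub standing in for an LLM summarization call.
def stub_summarize(older: list[Message]) -> str:
    return f"{len(older)} earlier messages"

summary, recent = summary_plus_recent(history, 4, stub_summarize)
print(summary, len(recent))
```

The returned summary would typically be appended to the system prompt, with the recent messages passed through unchanged as the message list.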

Fallback Handling and Error Recovery

  • STT failure: If transcription returns empty or confidence is below threshold, ask the user to repeat. "I didn't quite catch that. Could you say that again?"
  • Low intent confidence: If the classifier is uncertain, route to the LLM for general handling rather than a specialized system that might give wrong results.
  • LLM timeout: Set a 5-second timeout on LLM calls. If exceeded, return a canned response: "Let me look into that and get back to you."
  • TTS failure: Fall back to a text response displayed on screen. Always have a text-based fallback for any voice feature.
  • Silence detection: If the user goes silent for 30+ seconds, prompt: "Are you still there? Is there anything else I can help with?"
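
The LLM timeout rule above can be sketched with asyncio.wait_for. This is a minimal example: `reply_with_timeout` and `fake_llm` are hypothetical names, and `fake_llm` stands in for a real asynchronous model call.

```python
import asyncio
from typing import Awaitable, Callable

CANNED_REPLY = "Let me look into that and get back to you."


async def reply_with_timeout(
    generate: Callable[[str], Awaitable[str]],
    user_text: str,
    timeout_s: float = 5.0,
) -> tuple[str, bool]:
    """Return (reply, timed_out). Falls back to a canned reply when
    the LLM call takes longer than timeout_s seconds."""
    try:
        reply = await asyncio.wait_for(generate(user_text),
                                       timeout=timeout_s)
        return reply, False
    except asyncio.TimeoutError:
        # wait_for cancels the pending call before raising
        return CANNED_REPLY, True


async def fake_llm(text: str, delay_s: float) -> str:
    """Hypothetical stand-in for an async LLM call."""
    await asyncio.sleep(delay_s)
    return f"Echo: {text}"


# A fast call returns normally; short delays keep the demo quick.
fast = asyncio.run(
    reply_with_timeout(lambda t: fake_llm(t, 0.01), "hi", timeout_s=1.0)
)
# A slow call is cut off and replaced with the canned reply.
slow = asyncio.run(
    reply_with_timeout(lambda t: fake_llm(t, 1.0), "hi", timeout_s=0.05)
)
print(fast, slow)
```

Returning the timed_out flag alongside the reply lets the caller log the failure or schedule a follow-up instead of silently treating the canned text as a real answer.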

Use Cases by Industry

| Industry | Application | Key Models | Scale |
|---|---|---|---|
| Call Centers | Automated customer support, call routing, agent assist | Whisper + BERT intent + Claude + TTS | Millions of calls/month |
| Healthcare | Patient intake, symptom triage, appointment scheduling | Whisper + medical NER + GPT-4 + TTS | Thousands/day |
| Education | Language tutoring, interactive learning, Q&A | Whisper + LLM + ElevenLabs | Hundreds of concurrent users |
| Accessibility | Voice control for disabled users, screen reader enhancement | Whisper + command classifier + TTS | Always-on per user |
| Retail | Voice shopping, order status, product recommendations | Whisper + intent + LLM + catalog API | Thousands/day |

What's Next

In the next lesson, we explore Content Creation Pipelines — combining LLMs for writing, image generation models for visuals, and TTS for voiceovers to build automated content production systems for marketing, education, and media.