Speech-to-Text Pipeline Intermediate

The ASR (Automatic Speech Recognition) stage converts raw audio into text. Getting this right is critical — every downstream error (wrong intent, bad response) usually traces back to a bad transcript. This lesson covers production ASR providers, streaming transcription, noise handling, speaker diarization, and custom vocabulary tuning with working code.

ASR Provider Comparison

| Provider | Model | Streaming | Latency | WER (Clean) | Cost per Hour | Best For |
|---|---|---|---|---|---|---|
| Deepgram | Nova-2 | Yes | ~200ms | ~8% | $0.0043 | Real-time voice apps, lowest latency |
| OpenAI | Whisper large-v3 | No (batch) | ~2-5s | ~5% | $0.006 | Best accuracy, 100+ languages |
| Google | Chirp 2 | Yes | ~300ms | ~7% | $0.016 | Multi-language, enterprise compliance |
| AWS | Transcribe | Yes | ~400ms | ~9% | $0.024 | AWS-native workloads |
| Azure | Speech-to-Text v3.2 | Yes | ~350ms | ~7% | $0.016 | Enterprise, custom models, Teams integration |
| Self-hosted Whisper | whisper-large-v3-turbo | Partial | ~500ms* | ~5% | GPU cost only | Data privacy, no external API calls |

Streaming ASR with Deepgram (Production Code)

import asyncio
import json
from dataclasses import dataclass
from typing import AsyncIterator, Optional
from urllib.parse import quote

import websockets

@dataclass
class TranscriptResult:
    """One transcript event (interim or final) from the streaming ASR."""
    text: str          # Transcript text for this result
    is_final: bool     # True when the provider marks the result final
    confidence: float  # Confidence of the top alternative
    start_time: float  # Utterance start, seconds from stream start
    end_time: float    # start_time + duration, seconds
    words: list  # Word-level timestamps
    speaker: Optional[int] = None  # Speaker ID if diarization enabled

class DeepgramStreamingASR:
    """Production streaming ASR client using Deepgram Nova-2.

    Opens a WebSocket to Deepgram's /listen endpoint, streams raw
    linear16 PCM audio, and yields interim and final transcripts
    as TranscriptResult objects.
    """

    WS_URL = "wss://api.deepgram.com/v1/listen"

    def __init__(self, api_key: str, sample_rate: int = 16000,
                 language: str = "en-US", model: str = "nova-2"):
        self.api_key = api_key
        self.sample_rate = sample_rate
        self.language = language
        self.model = model
        self.ws = None  # Populated by connect()

    def _build_url(self, diarize: bool = False,
                   custom_vocabulary: Optional[list] = None) -> str:
        """Build WebSocket URL with query parameters.

        Args:
            diarize: Ask Deepgram to attach speaker labels to words.
            custom_vocabulary: Terms whose recognition should be boosted.

        Returns:
            Fully-formed wss:// URL with all query parameters.
        """
        params = [
            f"model={self.model}",
            f"language={self.language}",
            f"sample_rate={self.sample_rate}",
            "encoding=linear16",
            "channels=1",
            "interim_results=true",      # Get partial transcripts
            "utterance_end_ms=1000",      # Silence = end of utterance
            "vad_events=true",            # Voice activity detection events
            "smart_format=true",          # Auto-punctuation and formatting
            "endpointing=300",            # 300ms silence = end of speech
        ]
        if diarize:
            params.append("diarize=true")
        if custom_vocabulary:
            # Boost recognition of specific terms. quote() is required:
            # multi-word phrases like "Lilly Tech" would otherwise inject
            # a raw space and corrupt the query string.
            params.extend(f"keywords={quote(term)}" for term in custom_vocabulary)
        return f"{self.WS_URL}?{'&'.join(params)}"

    async def connect(self, diarize: bool = False,
                      custom_vocabulary: Optional[list] = None):
        """Establish WebSocket connection to Deepgram."""
        url = self._build_url(diarize, custom_vocabulary)
        self.ws = await websockets.connect(
            url,
            extra_headers={"Authorization": f"Token {self.api_key}"},
            ping_interval=20,
            ping_timeout=10
        )

    async def stream_and_transcribe(
        self, audio_source: AsyncIterator[bytes]
    ) -> AsyncIterator[TranscriptResult]:
        """Send audio chunks and yield transcript results.

        This is the core streaming loop used in production voice pipelines.
        Audio chunks arrive every 100ms (1600 bytes at 16kHz 16-bit mono).

        Raises:
            RuntimeError: If connect() has not been called yet.
        """
        if not self.ws:
            raise RuntimeError("Call connect() first")

        # Start sender task (sends audio chunks to Deepgram)
        async def send_audio():
            async for chunk in audio_source:
                await self.ws.send(chunk)
            # Signal end of audio
            await self.ws.send(json.dumps({"type": "CloseStream"}))

        sender = asyncio.create_task(send_audio())

        # Receive transcripts as they arrive
        try:
            async for message in self.ws:
                data = json.loads(message)

                if data.get("type") == "Results":
                    channel = data["channel"]
                    alt = channel["alternatives"][0]

                    if alt["transcript"]:
                        words = [
                            {
                                "word": w["word"],
                                "start": w["start"],
                                "end": w["end"],
                                "confidence": w["confidence"],
                                "speaker": w.get("speaker")
                            }
                            for w in alt.get("words", [])
                        ]

                        yield TranscriptResult(
                            text=alt["transcript"],
                            is_final=data["is_final"],
                            confidence=alt["confidence"],
                            start_time=data["start"],
                            end_time=data["start"] + data["duration"],
                            words=words,
                            speaker=words[0].get("speaker") if words else None
                        )

                elif data.get("type") == "UtteranceEnd":
                    # Silence detected - good point to trigger NLU
                    pass

        finally:
            # Cancel AND await the sender so its cancellation (or any
            # pending exception) is actually retrieved, instead of being
            # dropped as a destroyed-pending task.
            sender.cancel()
            try:
                await sender
            except asyncio.CancelledError:
                pass
            await self.ws.close()


# --- Usage Example ---
async def voice_assistant_loop():
    """Main loop for a streaming voice assistant."""
    asr = DeepgramStreamingASR(
        api_key="YOUR_DEEPGRAM_KEY",
        sample_rate=16000,
        language="en-US"
    )
    await asr.connect(
        diarize=False,
        custom_vocabulary=["Lilly Tech", "ASR", "Deepgram"]
    )

    # audio_stream would come from microphone/WebRTC/phone
    mic_stream = get_audio_stream()  # Your audio source

    async for transcript in asr.stream_and_transcribe(mic_stream):
        if transcript.is_final:
            # Final transcript for this utterance
            print(f"  [final] {transcript.text} (confidence: {transcript.confidence:.2f})")

            # Hand the finished utterance to the NLU/Dialog pipeline
            reply = await process_voice_turn(transcript.text)
            await speak_response(reply)
        else:
            # Show partial transcript (like live captions)
            print(f"  [partial] {transcript.text}", end="\r")

Self-Hosted Whisper for Privacy-Sensitive Deployments

import torch
import numpy as np
from faster_whisper import WhisperModel
from typing import Optional
import time

class SelfHostedWhisperASR:
    """Self-hosted Whisper ASR for when audio cannot leave your infrastructure.

    Uses faster-whisper (CTranslate2) for 4x speedup over original Whisper.
    Runs on GPU or CPU. GPU recommended for real-time use.
    """

    def __init__(self, model_size: str = "large-v3",
                 device: str = "cuda", compute_type: str = "float16"):
        self.model = WhisperModel(
            model_size,
            device=device,
            compute_type=compute_type  # float16 on GPU, int8 on CPU
        )

    def transcribe(self, audio: "np.ndarray | str", language: str = "en",
                   beam_size: int = 5) -> dict:
        """Transcribe audio to text with word timestamps.

        Args:
            audio: NumPy float32 array (16kHz mono) or a file path
                (as forwarded by transcribe_file)
            language: ISO language code
            beam_size: Higher = more accurate but slower (1-10)

        Returns:
            dict with the joined text, detected language, word-level
            timings, wall-clock latency, audio duration and real-time
            factor (RTF).
        """
        t0 = time.monotonic()
        segments, info = self.model.transcribe(
            audio,
            language=language,
            beam_size=beam_size,
            word_timestamps=True,
            vad_filter=True,              # Skip silence
            vad_parameters={
                "min_silence_duration_ms": 500,
                "speech_pad_ms": 200
            }
        )

        words = []
        full_text = []
        # segments is a lazy generator; iterating it performs the decode.
        for segment in segments:
            full_text.append(segment.text)
            for word in segment.words:
                words.append({
                    "word": word.word.strip(),
                    "start": word.start,
                    "end": word.end,
                    "probability": word.probability
                })

        latency_ms = (time.monotonic() - t0) * 1000
        # Use the decoder-reported duration instead of len(audio) / 16000:
        # when transcribe_file passes a file *path*, len(audio) counted the
        # characters of the path, corrupting both duration and RTF.
        audio_duration_s = info.duration
        return {
            "text": " ".join(full_text).strip(),
            "language": info.language,
            "language_probability": info.language_probability,
            "words": words,
            "latency_ms": latency_ms,
            "audio_duration_s": audio_duration_s,
            # Real-time factor: processing time / audio time (lower is faster)
            "rtf": (latency_ms / (audio_duration_s * 1000)
                    if audio_duration_s else float("inf"))
        }

    def transcribe_file(self, file_path: str, **kwargs) -> dict:
        """Convenience method to transcribe from a file path."""
        return self.transcribe(file_path, **kwargs)


# --- GPU sizing guide ---
# Model Size  | VRAM   | RTF (GPU)  | RTF (CPU)  | Accuracy (WER)
# tiny        | 1 GB   | 0.02x      | 0.3x       | ~15%
# base        | 1 GB   | 0.03x      | 0.5x       | ~12%
# small       | 2 GB   | 0.05x      | 1.0x       | ~10%
# medium      | 5 GB   | 0.1x       | 2.5x       | ~7%
# large-v3    | 10 GB  | 0.15x      | 5.0x       | ~5%
#
# RTF = Real-Time Factor. 0.1x means 10 seconds of audio takes 1 second to process.
# For real-time use, you need RTF < 0.3x (streaming with buffering).

Noise Handling and Audio Preprocessing

import numpy as np
from scipy import signal

class AudioPreprocessor:
    """Production audio preprocessing for voice AI pipelines.

    Applied BEFORE sending audio to ASR to improve transcription quality.
    """

    def __init__(self, sample_rate: int = 16000):
        self.sample_rate = sample_rate

    def normalize_volume(self, audio: np.ndarray,
                         target_db: float = -20.0) -> np.ndarray:
        """Scale the signal so its RMS level sits at target_db.

        Too-quiet or too-loud audio degrades ASR accuracy; the output is
        clipped to [-1, 1] so the applied gain never causes distortion.
        """
        rms_level = np.sqrt(np.mean(audio ** 2))
        if rms_level == 0:
            # Pure silence: nothing to scale.
            return audio
        gain_factor = 10 ** ((target_db - 20 * np.log10(rms_level)) / 20)
        # Clip to prevent distortion
        return np.clip(audio * gain_factor, -1.0, 1.0)

    def apply_highpass_filter(self, audio: np.ndarray,
                              cutoff_hz: int = 80) -> np.ndarray:
        """Strip low-frequency noise (HVAC hum, traffic rumble).

        An 80Hz highpass is standard for voice - human speech rarely goes
        below 85Hz. filtfilt runs the filter forward and backward for
        zero phase shift.
        """
        normalized_cutoff = cutoff_hz / (self.sample_rate / 2)  # relative to Nyquist
        b, a = signal.butter(4, normalized_cutoff, btype='high')
        return signal.filtfilt(b, a, audio)

    def detect_speech_segments(self, audio: np.ndarray,
                                frame_ms: int = 30,
                                threshold_db: float = -40.0) -> list:
        """Simple energy-based Voice Activity Detection (VAD).

        Returns a list of (start_sample, end_sample) tuples for speech
        segments, each padded with 200ms of surrounding context.
        """
        samples_per_frame = int(self.sample_rate * frame_ms / 1000)
        pad_samples = int(self.sample_rate * 0.2)  # 200ms padding
        found = []
        speech_open = False
        speech_start = 0

        for offset in range(0, len(audio) - samples_per_frame, samples_per_frame):
            window = audio[offset:offset + samples_per_frame]
            # 1e-10 keeps log10 defined for all-zero frames.
            level_db = 20 * np.log10(np.sqrt(np.mean(window ** 2)) + 1e-10)
            is_loud = level_db > threshold_db

            if is_loud and not speech_open:
                speech_start = offset
                speech_open = True
            elif speech_open and not is_loud:
                found.append((max(0, speech_start - pad_samples),
                              min(len(audio), offset + pad_samples)))
                speech_open = False

        if speech_open:
            # Speech ran to the end of the buffer; close the last segment.
            found.append((max(0, speech_start - pad_samples), len(audio)))

        return found

    def preprocess(self, audio: np.ndarray) -> np.ndarray:
        """Full preprocessing pipeline: highpass first, then level-normalize."""
        return self.normalize_volume(self.apply_highpass_filter(audio))

Speaker Diarization

Speaker diarization answers "who spoke when" — critical for multi-speaker scenarios like meetings, call centers, and interview transcription.

# Speaker diarization with Deepgram (simplest production approach)
async def transcribe_with_speakers(audio_file: str, api_key: str) -> list:
    """Transcribe audio with speaker labels using Deepgram."""
    from deepgram import Deepgram
    import aiofiles

    client = Deepgram(api_key)

    async with aiofiles.open(audio_file, "rb") as handle:
        payload = await handle.read()

    response = await client.transcription.prerecorded(
        {"buffer": payload, "mimetype": "audio/wav"},
        {
            "model": "nova-2",
            "diarize": True,
            "utterances": True,
            "smart_format": True,
            "punctuate": True
        }
    )

    # Each utterance carries a speaker label plus timing and confidence.
    return [
        {
            "speaker": utt["speaker"],
            "text": utt["transcript"],
            "start": utt["start"],
            "end": utt["end"],
            "confidence": utt["confidence"]
        }
        for utt in response["results"]["utterances"]
    ]

# Output format:
# [
#   {"speaker": 0, "text": "Hi, I'd like to check my account balance.", "start": 0.5, "end": 2.8},
#   {"speaker": 1, "text": "Sure, can I have your account number?", "start": 3.1, "end": 4.9},
#   {"speaker": 0, "text": "It's 4 5 6 7 8 9.", "start": 5.2, "end": 6.8},
# ]

Custom Vocabulary and Domain Adaptation

# Custom vocabulary boosts recognition of domain-specific terms
# This is critical for medical, legal, financial, and technical voice apps

class CustomVocabularyManager:
    """Manage custom vocabulary for ASR accuracy improvement."""

    def __init__(self):
        # domain -> {"terms": [...], "boost_weight": float}
        self.vocabularies = {}

    def create_vocabulary(self, domain: str, terms: list,
                          boost_weight: float = 1.5):
        """Register a custom vocabulary for a specific domain.

        Args:
            domain: e.g., "medical", "financial", "tech_support"
            terms: List of terms that should be recognized accurately
            boost_weight: How much to boost these terms (1.0 = normal, 2.0 = strong)
        """
        self.vocabularies[domain] = {
            "terms": terms,
            "boost_weight": boost_weight,
        }

    def get_deepgram_keywords(self, domain: str) -> list:
        """Format vocabulary for Deepgram's keywords parameter (term:weight)."""
        entry = self.vocabularies.get(domain, {})
        boost = entry.get("boost_weight", 1.0)
        return [f"{t}:{boost}" for t in entry.get("terms", [])]

    def get_google_phrases(self, domain: str) -> dict:
        """Format vocabulary for Google STT speech_contexts."""
        entry = self.vocabularies.get(domain, {})
        return {
            "phrases": entry.get("terms", []),
            # Google uses a 1-20 boost scale, so scale our weight by 10.
            "boost": entry.get("boost_weight", 1.5) * 10,
        }

# Domain-specific vocabulary examples
vocab_manager = CustomVocabularyManager()

# Healthcare: drug names and lab/imaging terminology; strongest boost (2.0).
vocab_manager.create_vocabulary("healthcare", [
    "metformin", "lisinopril", "atorvastatin", "hypertension",
    "hemoglobin A1C", "CBC", "MRI", "CT scan", "systolic",
    "diastolic", "milligrams", "prescription refill"
], boost_weight=2.0)

# Financial services: banking and investment acronyms/product names (1.8).
vocab_manager.create_vocabulary("financial_services", [
    "FICO score", "APR", "amortization", "escrow",
    "Roth IRA", "401k", "CD rate", "wire transfer",
    "ACH payment", "overdraft protection", "FDIC"
], boost_weight=1.8)

# Tech support: infrastructure jargon; 1.5 matches the create_vocabulary default.
vocab_manager.create_vocabulary("tech_support", [
    "Kubernetes", "Docker", "API gateway", "load balancer",
    "SSL certificate", "DNS", "CIDR block", "VPC",
    "Redis cluster", "PostgreSQL", "microservice"
], boost_weight=1.5)
Production Tip: Always log ASR confidence scores alongside transcripts. Set up alerts when average confidence drops below 0.85 — it usually means either audio quality degraded (new phone system, different microphone) or users are using vocabulary your model hasn't seen. Both are fixable, but only if you detect them early.