
Voice by aiConnected — Voice Pipeline Architecture

Document Information

| Field        | Value              |
|--------------|--------------------|
| Document ID  | ARCH-003           |
| Version      | 1.0                |
| Last Updated | 2026-01-16         |
| Status       | Draft              |
| Owner        | Engineering        |
| Dependencies | ARCH-001, ARCH-002 |


  1. Introduction

1.1 Purpose

This document specifies the voice pipeline architecture for Voice by aiConnected. The voice pipeline is the core processing chain that transforms caller speech into AI responses and back to synthesized speech. This is where the “magic” happens—creating the illusion of a natural, responsive AI conversation partner. The pipeline must achieve sub-second response times to feel natural while maintaining conversation coherence, handling interruptions gracefully, and executing tool calls seamlessly.

1.2 Scope

This document covers:
  • Complete audio processing pipeline from microphone to speaker
  • Component-level architecture for VAD, STT, LLM, and TTS
  • Streaming strategies for minimal latency
  • Interruption detection and handling
  • Tool calling integration in voice context
  • Conversation state management
  • Error handling and fallback strategies
This document does not cover:
  • Telephony integration (see ARCH-002)
  • LiveKit room management (see ARCH-004)
  • Business logic for specific use cases

1.3 Design Goals

| Goal                      | Target      | Priority |
|---------------------------|-------------|----------|
| End-to-end latency        | < 1000ms    | Critical |
| Time to first byte (TTS)  | < 500ms     | Critical |
| Interruption response     | < 200ms     | High     |
| Transcription accuracy    | > 95%       | High     |
| Natural conversation flow | Subjective  | High     |
| Graceful degradation      | 100% uptime | Medium   |

1.4 Key Terminology

| Term        | Definition                                                    |
|-------------|---------------------------------------------------------------|
| VAD         | Voice Activity Detection - detecting when someone is speaking |
| STT         | Speech-to-Text - converting audio to text (transcription)     |
| LLM         | Large Language Model - generating conversational responses    |
| TTS         | Text-to-Speech - converting text to audio (synthesis)         |
| TTFB        | Time to First Byte - latency until first audio chunk          |
| Barge-in    | User interrupting the AI while it's speaking                  |
| Turn        | One party's complete utterance in conversation                |
| Endpointing | Detecting when a speaker has finished their turn              |

  2. Pipeline Overview

2.1 High-Level Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                         VOICE PIPELINE ARCHITECTURE                         │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                         AUDIO INPUT STAGE                           │   │
│   │                                                                     │   │
│   │   Caller Audio ──▶ [LiveKit] ──▶ [Audio Buffer] ──▶ [VAD]          │   │
│   │                                        │              │             │   │
│   │                                        │              ▼             │   │
│   │                                        │         Voice Active?      │   │
│   │                                        │         ┌────┴────┐        │   │
│   │                                        │        Yes        No       │   │
│   │                                        │         │          │       │   │
│   │                                        ▼         ▼          ▼       │   │
│   │                                   [STT Engine]   │     (silence)    │   │
│   │                                        │         │                  │   │
│   └────────────────────────────────────────┼─────────┼──────────────────┘   │
│                                            │         │                      │
│   ┌────────────────────────────────────────▼─────────▼──────────────────┐   │
│   │                       PROCESSING STAGE                              │   │
│   │                                                                     │   │
│   │   Partial          Final              Endpointing                   │   │
│   │   Transcript ───▶  Transcript ───────▶ Detected?                   │   │
│   │       │                │               ┌────┴────┐                  │   │
│   │       │                │              Yes        No                 │   │
│   │       │                ▼               │          │                 │   │
│   │       │         [Context Assembly]     │     (continue              │   │
│   │       │                │               │      buffering)            │   │
│   │       ▼                ▼               ▼                            │   │
│   │   (display)     [LLM Processing] ◀────┘                            │   │
│   │                        │                                            │   │
│   │                        │ streaming tokens                           │   │
│   │                        ▼                                            │   │
│   │                 [Response Router]                                   │   │
│   │                   ┌────┴────┐                                       │   │
│   │              Speech      Tool Call                                  │   │
│   │                 │            │                                      │   │
│   └─────────────────┼────────────┼──────────────────────────────────────┘   │
│                     │            │                                          │
│   ┌─────────────────▼────────────▼──────────────────────────────────────┐   │
│   │                       OUTPUT STAGE                                  │   │
│   │                                                                     │   │
│   │   [Sentence Buffer] ──▶ [TTS Engine] ──▶ [Audio Queue] ──▶ LiveKit │   │
│   │         │                     │                │                    │   │
│   │         │                     │                │                    │   │
│   │         ▼                     ▼                ▼                    │   │
│   │   Natural break         Audio chunks      Playback to              │   │
│   │   detection             generated         caller                    │   │
│   │                                                                     │   │
│   │                                                                     │   │
│   │   [Tool Executor] ──▶ [Result Formatter] ──▶ (back to LLM)         │   │
│   │                                                                     │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                      INTERRUPTION HANDLING                          │   │
│   │                                                                     │   │
│   │   VAD during playback ──▶ [Interrupt Detector] ──▶ Stop TTS        │   │
│   │                                   │                    │            │   │
│   │                                   ▼                    ▼            │   │
│   │                           Threshold met?          Clear queue       │   │
│   │                                   │                    │            │   │
│   │                                   ▼                    ▼            │   │
│   │                           Process new input      Resume listening   │   │
│   │                                                                     │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
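The stages in the diagram map naturally onto concurrent tasks connected by queues: the input stage feeds transcripts to the processing stage, which feeds sentences to the output stage. A minimal asyncio sketch of that shape (the stage bodies are trivial stand-ins, not the actual orchestrator; names like `run_pipeline` are illustrative):

```python
import asyncio

async def input_stage(audio_in: asyncio.Queue, transcripts: asyncio.Queue) -> None:
    """Consume audio frames and emit transcripts (stand-in for VAD + STT)."""
    while True:
        frame = await audio_in.get()
        if frame is None:                      # end-of-stream sentinel
            await transcripts.put(None)
            return
        await transcripts.put(f"transcript:{frame}")

async def processing_stage(transcripts: asyncio.Queue, sentences: asyncio.Queue) -> None:
    """Turn transcripts into response sentences (stand-in for LLM)."""
    while True:
        text = await transcripts.get()
        if text is None:
            await sentences.put(None)
            return
        await sentences.put(text.upper())

async def output_stage(sentences: asyncio.Queue, played: list) -> None:
    """Synthesize and play each sentence (stand-in for TTS + LiveKit)."""
    while True:
        sentence = await sentences.get()
        if sentence is None:
            return
        played.append(sentence)

async def run_pipeline(frames: list) -> list:
    audio_in, transcripts, sentences = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    played: list = []
    stages = asyncio.gather(
        input_stage(audio_in, transcripts),
        processing_stage(transcripts, sentences),
        output_stage(sentences, played),
    )
    for f in frames + [None]:
        await audio_in.put(f)
    await stages
    return played

played = asyncio.run(run_pipeline(["f1", "f2"]))
```

Because every stage runs concurrently and only blocks on its input queue, audio can keep arriving while earlier utterances are still being synthesized — the property the streaming sections below depend on.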

2.2 Component Summary

| Component       | Technology      | Location      | Purpose                   |
|-----------------|-----------------|---------------|---------------------------|
| Audio Transport | LiveKit         | Cloud         | Real-time audio streaming |
| VAD             | Silero VAD      | Agent Service | Detect speech activity    |
| STT             | Deepgram Nova-2 | External API  | Transcribe speech         |
| LLM             | Claude Sonnet   | External API  | Generate responses        |
| TTS             | Chatterbox      | RunPod GPU    | Synthesize speech         |
| State Manager   | Redis           | Platform      | Track conversation state  |
| Tool Executor   | n8n / Internal  | Platform      | Execute function calls    |

2.3 Data Flow Summary

┌─────────────────────────────────────────────────────────────────────────────┐
│                           DATA FLOW SUMMARY                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   STAGE 1: Audio Capture                                                    │
│   ───────────────────────                                                   │
│   Format: PCM 16-bit, 16kHz mono (STT) / 48kHz stereo (transport)          │
│   Chunk size: 20ms frames (320 samples at 16kHz)                           │
│   Buffer: 100ms sliding window for VAD                                      │
│                                                                             │
│   STAGE 2: Speech Detection                                                 │
│   ─────────────────────────                                                 │
│   VAD checks each 20ms frame                                                │
│   Speech threshold: 0.5 probability                                         │
│   Minimum speech duration: 250ms                                            │
│   Silence padding: 300ms before endpoint                                    │
│                                                                             │
│   STAGE 3: Transcription                                                    │
│   ─────────────────────────                                                 │
│   Streaming WebSocket to Deepgram                                           │
│   Interim results every ~100ms                                              │
│   Final transcript on endpoint                                              │
│   Output: JSON with text, confidence, timestamps                            │
│                                                                             │
│   STAGE 4: Context Assembly                                                 │
│   ─────────────────────────                                                 │
│   Retrieve conversation history (last 10 turns)                             │
│   Add system prompt + knowledge base context                                │
│   Include tool definitions                                                  │
│   Format for Claude API                                                     │
│                                                                             │
│   STAGE 5: LLM Processing                                                   │
│   ──────────────────────                                                    │
│   Streaming API call to Claude                                              │
│   Tokens arrive in ~50ms chunks                                             │
│   Parse for tool calls vs speech                                            │
│   Output: Text stream or tool call JSON                                     │
│                                                                             │
│   STAGE 6: Speech Synthesis                                                 │
│   ────────────────────────                                                  │
│   Buffer text until sentence boundary                                       │
│   Send to Chatterbox TTS                                                    │
│   Stream audio chunks back                                                  │
│   Output: PCM audio at 24kHz                                                │
│                                                                             │
│   STAGE 7: Audio Playback                                                   │
│   ───────────────────────                                                   │
│   Resample to 48kHz if needed                                               │
│   Queue chunks for smooth playback                                          │
│   Publish to LiveKit track                                                  │
│   Monitor for interruptions                                                 │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
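A quick check of the Stage 1 arithmetic: 20ms at 16kHz is 320 samples, or 640 bytes of 16-bit mono PCM. A sketch of splitting an incoming buffer into complete frames (illustrative only — a real implementation, like the VAD processor in Section 4, keeps the trailing remainder buffered for the next chunk):

```python
SAMPLE_RATE = 16_000          # Hz, STT-side audio format
FRAME_MS = 20                 # frame duration expected by the VAD
SAMPLES_PER_FRAME = SAMPLE_RATE * FRAME_MS // 1000   # 320 samples
BYTES_PER_FRAME = SAMPLES_PER_FRAME * 2              # 16-bit PCM -> 640 bytes

def split_frames(pcm: bytes) -> list[bytes]:
    """Split a 16-bit mono PCM buffer into complete 20ms frames.

    A trailing partial frame is simply dropped here; production code
    would keep it until more audio arrives.
    """
    n = len(pcm) // BYTES_PER_FRAME
    return [pcm[i * BYTES_PER_FRAME:(i + 1) * BYTES_PER_FRAME] for i in range(n)]

# 1500 bytes = 2 complete frames, with 220 bytes left over
frames = split_frames(b"\x00" * 1500)
```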

2.4 Pipeline States

┌─────────────────────────────────────────────────────────────────────────────┐
│                         PIPELINE STATE MACHINE                              │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                              ┌─────────┐                                    │
│                              │  IDLE   │                                    │
│                              └────┬────┘                                    │
│                                   │                                         │
│                          voice detected                                     │
│                                   │                                         │
│                                   ▼                                         │
│                           ┌───────────────┐                                 │
│                           │   LISTENING   │◀──────────────┐                │
│                           └───────┬───────┘               │                │
│                                   │                       │                │
│                          speech ended                     │                │
│                                   │                       │                │
│                                   ▼                       │                │
│                          ┌───────────────┐                │                │
│                          │  PROCESSING   │                │                │
│                          └───────┬───────┘                │                │
│                                  │                        │                │
│                    ┌─────────────┼─────────────┐          │                │
│                    │             │             │          │                │
│               tool call     response      error          │                │
│                    │         ready          │             │                │
│                    ▼             │           │             │                │
│           ┌───────────────┐     │           │             │                │
│           │EXECUTING_TOOL │     │           │             │                │
│           └───────┬───────┘     │           │             │                │
│                   │             │           │             │                │
│              result ready       │           │             │                │
│                   │             │           │             │                │
│                   └──────┬──────┘           │             │                │
│                          │                  │             │                │
│                          ▼                  │             │                │
│                   ┌───────────────┐         │             │                │
│                   │   SPEAKING    │─────────┼─────────────┘                │
│                   └───────┬───────┘         │       (on completion         │
│                           │                 │        or interruption)      │
│                    interrupted              │                              │
│                           │                 │                              │
│                           ▼                 │                              │
│                  ┌─────────────────┐        │                              │
│                  │  INTERRUPTED   │────────┘                               │
│                  └─────────────────┘   (resume listening)                  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
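The state machine translates directly into an explicit transition table, which makes illegal transitions fail loudly instead of silently corrupting the turn. A minimal sketch (state names follow the diagram; the `transition` guard function is illustrative, not the production state manager):

```python
from enum import Enum, auto

class PipelineState(Enum):
    IDLE = auto()
    LISTENING = auto()
    PROCESSING = auto()
    EXECUTING_TOOL = auto()
    SPEAKING = auto()
    INTERRUPTED = auto()

# Legal transitions, read off the diagram above; anything else is a bug.
TRANSITIONS: dict[PipelineState, set[PipelineState]] = {
    PipelineState.IDLE: {PipelineState.LISTENING},          # voice detected
    PipelineState.LISTENING: {PipelineState.PROCESSING},    # speech ended
    PipelineState.PROCESSING: {
        PipelineState.EXECUTING_TOOL,                       # tool call
        PipelineState.SPEAKING,                             # response ready
        PipelineState.LISTENING,                            # error -> resume
    },
    PipelineState.EXECUTING_TOOL: {PipelineState.SPEAKING}, # result ready
    PipelineState.SPEAKING: {
        PipelineState.LISTENING,                            # completed
        PipelineState.INTERRUPTED,                          # barge-in
    },
    PipelineState.INTERRUPTED: {PipelineState.LISTENING},   # resume listening
}

def transition(current: PipelineState, nxt: PipelineState) -> PipelineState:
    """Return the new state, or raise if the move is not in the diagram."""
    if nxt not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.name} -> {nxt.name}")
    return nxt
```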

  3. Latency Budget

3.1 Target Latency

Human conversation has natural response gaps. Studies show:
  • 200-300ms: Feels instantaneous; can come across as unnaturally quick
  • 500-700ms: Natural conversation pace
  • 800-1000ms: Acceptable, feels thoughtful
  • >1200ms: Noticeably slow, awkward
Our target: < 1000ms end-to-end with < 500ms time-to-first-byte.

3.2 Latency Budget Breakdown

┌─────────────────────────────────────────────────────────────────────────────┐
│                         LATENCY BUDGET BREAKDOWN                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   COMPONENT                    TARGET      P50       P95       P99          │
│   ─────────────────────────────────────────────────────────────────────     │
│                                                                             │
│   1. Endpointing delay         200ms      150ms     250ms     350ms        │
│      (silence detection)                                                    │
│                                                                             │
│   2. STT finalization          100ms       80ms     150ms     200ms        │
│      (final transcript)                                                     │
│                                                                             │
│   3. Context assembly           20ms       15ms      30ms      50ms        │
│      (build prompt)                                                         │
│                                                                             │
│   4. Network to LLM             30ms       20ms      50ms      80ms        │
│      (API request)                                                          │
│                                                                             │
│   5. LLM TTFB                  200ms      150ms     300ms     500ms        │
│      (first token)                                                          │
│                                                                             │
│   6. Sentence accumulation     100ms       80ms     150ms     200ms        │
│      (buffer until boundary)                                                │
│                                                                             │
│   7. Network to TTS             20ms       15ms      30ms      50ms        │
│      (API request)                                                          │
│                                                                             │
│   8. TTS TTFB                  150ms      100ms     200ms     300ms        │
│      (first audio chunk)                                                    │
│                                                                             │
│   9. Audio transport            30ms       20ms      40ms      60ms        │
│      (to caller)                                                            │
│   ─────────────────────────────────────────────────────────────────────     │
│                                                                             │
│   TOTAL TTFB                   850ms      630ms    1200ms    1790ms        │
│                                                                             │
│   Note: Components 1-5 are on the critical path to TTFB                    │
│         Components 6-9 add to perceived response time                       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
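As a sanity check, the target column sums exactly to the TOTAL row, leaving 150ms of headroom against the 1000ms end-to-end goal (the dictionary keys below are illustrative labels for the nine table rows):

```python
# Latency budget from the table above, target column, in milliseconds.
BUDGET_MS = {
    "endpointing_delay": 200,
    "stt_finalization": 100,
    "context_assembly": 20,
    "network_to_llm": 30,
    "llm_ttfb": 200,
    "sentence_accumulation": 100,
    "network_to_tts": 20,
    "tts_ttfb": 150,
    "audio_transport": 30,
}

total_ttfb = sum(BUDGET_MS.values())   # 850ms, matching the TOTAL TTFB row
headroom = 1000 - total_ttfb           # 150ms of slack against the 1000ms goal
```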

3.3 Latency Optimization Strategies

| Strategy               | Impact | Implementation                           |
|------------------------|--------|------------------------------------------|
| Streaming STT          | -200ms | Use interim results for early processing |
| Streaming LLM          | -300ms | Start TTS before complete response       |
| Sentence-level TTS     | -200ms | Don't wait for full response             |
| Connection pooling     | -50ms  | Reuse HTTP/WebSocket connections         |
| Edge deployment        | -30ms  | Deploy close to users                    |
| Aggressive endpointing | -100ms | Shorter silence threshold                |
| Speculative execution  | -150ms | Start processing on interim transcript   |
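Sentence-level TTS (the -200ms row) hinges on flushing text to the synthesizer at each sentence boundary rather than at end of response. A simplified sketch of that buffering (the boundary regex is deliberately naive and only an illustration of the idea):

```python
import re

# Sentence-ending punctuation followed by whitespace marks a flush point.
_SENTENCE_END = re.compile(r'([.!?])\s')

def sentences_from_tokens(tokens):
    """Accumulate streamed LLM tokens and yield complete sentences as
    soon as a boundary appears, so TTS can start before the full
    response exists."""
    buffer = ""
    for token in tokens:
        buffer += token
        while True:
            match = _SENTENCE_END.search(buffer)
            if not match:
                break
            end = match.end(1)
            yield buffer[:end].strip()
            buffer = buffer[end:]
    if buffer.strip():               # flush whatever remains at stream end
        yield buffer.strip()

chunks = list(sentences_from_tokens(["Hel", "lo there. How can", " I help? Than", "ks"]))
```

Each yielded chunk can be sent to TTS immediately, so synthesis of the first sentence overlaps with generation of the rest.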

3.4 Latency Monitoring Points

# Pipeline latency instrumentation points

import time
import logging
from typing import Optional

logger = logging.getLogger(__name__)

LATENCY_CHECKPOINTS = {
    "vad_speech_start": "Voice activity detected",
    "vad_speech_end": "Voice activity ended (endpoint)",
    "stt_interim_first": "First interim transcript",
    "stt_final": "Final transcript received",
    "context_assembled": "Prompt built",
    "llm_request_sent": "LLM API request sent",
    "llm_first_token": "First LLM token received",
    "llm_sentence_complete": "First sentence complete",
    "tts_request_sent": "TTS API request sent",
    "tts_first_chunk": "First audio chunk received",
    "audio_playback_start": "Audio playback began",
}

class LatencyTracker:
    """Track latency through the pipeline."""
    
    def __init__(self, call_id: str, turn_id: str):
        self.call_id = call_id
        self.turn_id = turn_id
        self.checkpoints: dict[str, float] = {}
        self.start_time: Optional[float] = None
    
    def mark(self, checkpoint: str) -> None:
        """Record a checkpoint timestamp."""
        now = time.perf_counter()
        
        if self.start_time is None:
            self.start_time = now
        
        self.checkpoints[checkpoint] = now
        
        # Log checkpoint
        elapsed = (now - self.start_time) * 1000
        logger.debug(
            f"[{self.call_id}:{self.turn_id}] "
            f"{checkpoint}: {elapsed:.1f}ms"
        )
    
    def get_metrics(self) -> dict:
        """Calculate latency metrics."""
        if not self.checkpoints:
            return {}
        
        metrics = {}
        
        # Calculate durations between checkpoints
        if "vad_speech_end" in self.checkpoints and "stt_final" in self.checkpoints:
            metrics["stt_latency"] = (
                self.checkpoints["stt_final"] - 
                self.checkpoints["vad_speech_end"]
            ) * 1000
        
        if "stt_final" in self.checkpoints and "llm_first_token" in self.checkpoints:
            metrics["llm_ttfb"] = (
                self.checkpoints["llm_first_token"] - 
                self.checkpoints["stt_final"]
            ) * 1000
        
        if "llm_sentence_complete" in self.checkpoints and "tts_first_chunk" in self.checkpoints:
            metrics["tts_ttfb"] = (
                self.checkpoints["tts_first_chunk"] - 
                self.checkpoints["llm_sentence_complete"]
            ) * 1000
        
        # End-to-end latency
        if "vad_speech_end" in self.checkpoints and "audio_playback_start" in self.checkpoints:
            metrics["e2e_latency"] = (
                self.checkpoints["audio_playback_start"] - 
                self.checkpoints["vad_speech_end"]
            ) * 1000
        
        return metrics
    
    def emit_metrics(self) -> None:
        """Emit metrics to monitoring system."""
        metrics = self.get_metrics()
        
        for name, value in metrics.items():
            prometheus_histogram(
                f"voice_pipeline_{name}_ms",
                value,
                labels={
                    "call_id": self.call_id,
                }
            )

3.5 Latency Alerts

| Metric          | Warning  | Critical |
|-----------------|----------|----------|
| E2E Latency P50 | > 800ms  | > 1200ms |
| E2E Latency P95 | > 1200ms | > 2000ms |
| LLM TTFB P50    | > 300ms  | > 500ms  |
| TTS TTFB P50    | > 200ms  | > 400ms  |
| STT Latency P50 | > 150ms  | > 300ms  |

  4. Voice Activity Detection

4.1 VAD Overview

Voice Activity Detection is the first processing stage, determining when the caller is speaking. Good VAD is critical for:
  • Knowing when to start transcription
  • Detecting end of utterance (endpointing)
  • Detecting interruptions during AI speech

4.2 Silero VAD Integration

We use Silero VAD, a lightweight neural network model optimized for real-time processing.
# pipeline/vad/silero_vad.py

import torch
import numpy as np
from typing import Optional
from dataclasses import dataclass

@dataclass
class VADConfig:
    """Configuration for Silero VAD."""
    threshold: float = 0.5          # Speech probability threshold
    min_speech_ms: int = 250        # Minimum speech duration
    min_silence_ms: int = 300       # Silence before endpoint
    window_size_ms: int = 100       # Analysis window
    sample_rate: int = 16000        # Expected sample rate
    
class SileroVAD:
    """
    Silero VAD wrapper for real-time voice activity detection.
    
    Processes audio in 20ms frames and maintains state for
    speech detection and endpointing.
    """
    
    def __init__(self, config: Optional[VADConfig] = None):
        self.config = config or VADConfig()
        
        # Load Silero VAD model
        self.model, self.utils = torch.hub.load(
            repo_or_dir='snakers4/silero-vad',
            model='silero_vad',
            force_reload=False
        )
        
        # State tracking
        self._is_speaking = False
        self._speech_start_time: Optional[float] = None
        self._silence_start_time: Optional[float] = None
        self._speech_frames: int = 0
        self._silence_frames: int = 0
        
        # Frame timing
        self._frame_duration_ms = 20  # Silero expects 20ms frames
        self._samples_per_frame = int(
            self.config.sample_rate * self._frame_duration_ms / 1000
        )
    
    def reset(self) -> None:
        """Reset VAD state for new utterance."""
        self.model.reset_states()
        self._is_speaking = False
        self._speech_start_time = None
        self._silence_start_time = None
        self._speech_frames = 0
        self._silence_frames = 0
    
    def process_frame(
        self,
        audio_frame: np.ndarray,
        timestamp: float
    ) -> dict:
        """
        Process a single audio frame.
        
        Args:
            audio_frame: PCM audio samples (16-bit, 16kHz)
            timestamp: Frame timestamp in seconds
        
        Returns:
            Dict with speech detection results
        """
        # Ensure correct frame size
        if len(audio_frame) != self._samples_per_frame:
            raise ValueError(
                f"Expected {self._samples_per_frame} samples, "
                f"got {len(audio_frame)}"
            )
        
        # Convert to float32 tensor
        audio_tensor = torch.from_numpy(audio_frame).float()
        audio_tensor = audio_tensor / 32768.0  # Normalize 16-bit
        
        # Get speech probability
        speech_prob = self.model(
            audio_tensor,
            self.config.sample_rate
        ).item()
        
        # Determine if this frame contains speech
        is_speech = speech_prob >= self.config.threshold
        
        # Update state machine
        result = self._update_state(is_speech, timestamp)
        result["speech_probability"] = speech_prob
        result["is_speech_frame"] = is_speech
        
        return result
    
    def _update_state(
        self,
        is_speech: bool,
        timestamp: float
    ) -> dict:
        """Update VAD state machine."""
        result = {
            "event": None,
            "is_speaking": self._is_speaking,
        }
        
        min_speech_frames = self.config.min_speech_ms // self._frame_duration_ms
        min_silence_frames = self.config.min_silence_ms // self._frame_duration_ms
        
        if is_speech:
            self._silence_frames = 0
            self._speech_frames += 1
            
            # Speech start detection
            if not self._is_speaking and self._speech_frames >= min_speech_frames:
                self._is_speaking = True
                self._speech_start_time = timestamp - (
                    self._speech_frames * self._frame_duration_ms / 1000
                )
                result["event"] = "speech_start"
                result["speech_start_time"] = self._speech_start_time
        else:
            self._speech_frames = 0
            
            if self._is_speaking:
                self._silence_frames += 1
                
                # Endpoint detection
                if self._silence_frames >= min_silence_frames:
                    self._is_speaking = False
                    speech_duration = timestamp - self._speech_start_time
                    result["event"] = "speech_end"
                    result["speech_duration"] = speech_duration
                    self._speech_start_time = None
        
        result["is_speaking"] = self._is_speaking
        return result
    
    @property
    def is_speaking(self) -> bool:
        """Current speech state."""
        return self._is_speaking


class VADProcessor:
    """
    High-level VAD processor with buffering and event emission.
    """
    
    def __init__(
        self,
        config: VADConfig = None,
        on_speech_start: callable = None,
        on_speech_end: callable = None
    ):
        self.vad = SileroVAD(config)
        self.on_speech_start = on_speech_start
        self.on_speech_end = on_speech_end
        
        self._audio_buffer = []
        self._frame_size = self.vad._samples_per_frame
    
    async def process_audio(
        self,
        audio_chunk: bytes,
        timestamp: float
    ) -> None:
        """
        Process incoming audio chunk.
        
        Audio may arrive in varying chunk sizes; this method
        buffers and processes in correct frame sizes.
        """
        # Convert bytes to numpy array
        samples = np.frombuffer(audio_chunk, dtype=np.int16)
        
        # Add to buffer
        self._audio_buffer.extend(samples)
        
        # Process complete frames
        while len(self._audio_buffer) >= self._frame_size:
            frame = np.array(self._audio_buffer[:self._frame_size])
            self._audio_buffer = self._audio_buffer[self._frame_size:]
            
            result = self.vad.process_frame(frame, timestamp)
            
            # Emit events
            if result["event"] == "speech_start" and self.on_speech_start:
                await self.on_speech_start(result)
            elif result["event"] == "speech_end" and self.on_speech_end:
                await self.on_speech_end(result)
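
The frame-buffering behavior above can be illustrated in isolation. This is a simplified replica of `process_audio`'s re-slicing logic, not the production class; 512 samples (32 ms at 16 kHz) is a typical Silero frame size, but the real value comes from the VAD configuration.

```python
import numpy as np

# Standalone replica of VADProcessor's buffering: audio arrives in
# arbitrary-sized chunks and is re-sliced into fixed-size VAD frames.
FRAME_SIZE = 512  # assumed; actual value comes from VADConfig

def frames_from_chunks(chunks: list[bytes]):
    buffer: list = []
    frames = []
    for chunk in chunks:
        buffer.extend(np.frombuffer(chunk, dtype=np.int16))
        while len(buffer) >= FRAME_SIZE:
            frames.append(np.array(buffer[:FRAME_SIZE]))
            buffer = buffer[FRAME_SIZE:]
    return frames, len(buffer)

# Two 400-sample chunks (800 bytes of 16-bit PCM each) yield one full
# 512-sample frame, with 288 samples left buffered for the next chunk.
frames, leftover = frames_from_chunks([bytes(800), bytes(800)])
```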

4.3 Endpointing Strategies \

Endpointing determines when a speaker has finished their turn. Multiple strategies can be combined:

# pipeline/vad/endpointing.py

from enum import Enum
from dataclasses import dataclass
from typing import Optional
import re

class EndpointReason(Enum):
    SILENCE = "silence"              # Silence duration exceeded
    PUNCTUATION = "punctuation"      # Sentence-ending punctuation detected
    TURN_TAKING = "turn_taking"      # Linguistic turn-taking cue
    TIMEOUT = "timeout"              # Maximum utterance duration
    FORCED = "forced"                # Manually forced endpoint

@dataclass
class EndpointConfig:
    """Configuration for endpoint detection."""
    silence_threshold_ms: int = 300       # Silence before endpoint
    min_utterance_ms: int = 500           # Minimum utterance length
    max_utterance_ms: int = 30000         # Maximum utterance length
    punctuation_endpoint: bool = True     # Use punctuation as endpoint hint
    aggressive_mode: bool = False         # Faster endpoints for quick responses

class EndpointDetector:
    """
    Multi-strategy endpoint detection.
    
    Combines VAD silence detection with linguistic cues
    from interim transcripts.
    """
    
    def __init__(self, config: Optional[EndpointConfig] = None):
        self.config = config or EndpointConfig()
        
        # Sentence-ending patterns
        self._sentence_end_pattern = re.compile(
            r'[.!?][\s]*$'
        )
        
        # Turn-taking cue patterns (questions, trailing off)
        self._turn_taking_patterns = [
            re.compile(r'\?[\s]*$'),           # Questions
            re.compile(r'(?:right|okay|yeah|yes|no)[.?]?[\s]*$', re.I),
            re.compile(r'\.{3,}[\s]*$'),       # Ellipsis
        ]
        
        self._utterance_start: Optional[float] = None
        self._last_speech_time: Optional[float] = None
        self._current_transcript: str = ""
    
    def start_utterance(self, timestamp: float) -> None:
        """Mark the start of a new utterance."""
        self._utterance_start = timestamp
        self._last_speech_time = timestamp
        self._current_transcript = ""
    
    def update_transcript(self, transcript: str) -> None:
        """Update with latest transcript."""
        self._current_transcript = transcript
    
    def update_speech(self, timestamp: float) -> None:
        """Update last speech time."""
        self._last_speech_time = timestamp
    
    def check_endpoint(
        self,
        timestamp: float,
        vad_is_speaking: bool
    ) -> Optional[EndpointReason]:
        """
        Check if we should endpoint now.
        
        Returns EndpointReason if endpoint detected, None otherwise.
        """
        if self._utterance_start is None:
            return None
        
        utterance_duration = (timestamp - self._utterance_start) * 1000
        silence_duration = (timestamp - self._last_speech_time) * 1000
        
        # Check maximum duration
        if utterance_duration >= self.config.max_utterance_ms:
            return EndpointReason.TIMEOUT
        
        # Check minimum duration
        if utterance_duration < self.config.min_utterance_ms:
            return None
        
        # VAD-based silence endpoint
        if not vad_is_speaking:
            threshold = self.config.silence_threshold_ms
            
            # Reduce threshold if we have a complete sentence
            if self.config.punctuation_endpoint:
                if self._sentence_end_pattern.search(self._current_transcript):
                    threshold = threshold * 0.7  # 30% faster endpoint
            
            # Reduce threshold in aggressive mode
            if self.config.aggressive_mode:
                threshold = threshold * 0.6
            
            if silence_duration >= threshold:
                return EndpointReason.SILENCE
        
        # Linguistic turn-taking cues (even during speech)
        if self._current_transcript:
            for pattern in self._turn_taking_patterns:
                if pattern.search(self._current_transcript):
                    # Still need some silence
                    if silence_duration >= self.config.silence_threshold_ms * 0.5:
                        return EndpointReason.TURN_TAKING
        
        return None
    
    def force_endpoint(self) -> EndpointReason:
        """Force an immediate endpoint."""
        return EndpointReason.FORCED
    
    def reset(self) -> None:
        """Reset state for next utterance."""
        self._utterance_start = None
        self._last_speech_time = None
        self._current_transcript = ""

4.4 VAD Configuration by Use Case \

| Use Case | Silence Threshold | Min Speech | Aggressive |
|---|---|---|---|
| Customer Service | 400ms | 300ms | No |
| Quick Q&A | 250ms | 200ms | Yes |
| Appointment Booking | 350ms | 250ms | No |
| Technical Support | 500ms | 400ms | No |
| Survey/IVR | 300ms | 200ms | Yes |
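
One way to wire this table into the detector from section 4.3 is a preset map keyed by use case. The `EndpointConfig` dataclass is repeated here so the sketch is self-contained; the Min Speech column maps to the VAD configuration, whose fields are defined elsewhere, so it is omitted.

```python
from dataclasses import dataclass

# Mirrors EndpointConfig from section 4.3 (min-speech lives on VADConfig)
@dataclass
class EndpointConfig:
    silence_threshold_ms: int = 300
    min_utterance_ms: int = 500
    max_utterance_ms: int = 30000
    punctuation_endpoint: bool = True
    aggressive_mode: bool = False

# Per-use-case presets matching the table above
ENDPOINT_PRESETS = {
    "customer_service": EndpointConfig(silence_threshold_ms=400),
    "quick_qa": EndpointConfig(silence_threshold_ms=250, aggressive_mode=True),
    "appointment_booking": EndpointConfig(silence_threshold_ms=350),
    "technical_support": EndpointConfig(silence_threshold_ms=500),
    "survey_ivr": EndpointConfig(silence_threshold_ms=300, aggressive_mode=True),
}
```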

5. Speech-to-Text Integration \

5.1 Deepgram Nova-2 \

We use Deepgram Nova-2 for streaming speech-to-text:
  • Accuracy: 95%+ word accuracy
  • Latency: ~100ms interim, ~200ms final
  • Features: Streaming, punctuation, word timestamps
  • Languages: 36+ languages supported

5.2 Deepgram Client \

# pipeline/stt/deepgram_client.py

import asyncio
import json
import logging
from typing import Optional, Callable
from dataclasses import dataclass
import websockets

logger = logging.getLogger(__name__)

@dataclass
class DeepgramConfig:
    """Configuration for Deepgram STT."""
    api_key: str
    model: str = "nova-2"
    language: str = "en-US"
    punctuate: bool = True
    interim_results: bool = True
    utterance_end_ms: int = 1000
    vad_events: bool = True
    smart_format: bool = True
    diarize: bool = False
    sample_rate: int = 16000
    encoding: str = "linear16"
    channels: int = 1

@dataclass
class TranscriptResult:
    """Transcription result from Deepgram."""
    text: str
    is_final: bool
    confidence: float
    words: list
    speech_final: bool
    start_time: float
    end_time: float

class DeepgramSTT:
    """
    Deepgram streaming STT client.
    
    Maintains a WebSocket connection for real-time transcription
    with interim and final results.
    """
    
    WS_URL = "wss://api.deepgram.com/v1/listen"
    
    def __init__(
        self,
        config: DeepgramConfig,
        on_transcript: Callable[[TranscriptResult], None] = None,
        on_utterance_end: Callable[[], None] = None
    ):
        self.config = config
        self.on_transcript = on_transcript
        self.on_utterance_end = on_utterance_end
        
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._running = False
        self._receive_task: Optional[asyncio.Task] = None
    
    def _build_url(self) -> str:
        """Build WebSocket URL with query parameters."""
        params = {
            "model": self.config.model,
            "language": self.config.language,
            "punctuate": str(self.config.punctuate).lower(),
            "interim_results": str(self.config.interim_results).lower(),
            "utterance_end_ms": str(self.config.utterance_end_ms),
            "vad_events": str(self.config.vad_events).lower(),
            "smart_format": str(self.config.smart_format).lower(),
            "diarize": str(self.config.diarize).lower(),
            "sample_rate": str(self.config.sample_rate),
            "encoding": self.config.encoding,
            "channels": str(self.config.channels),
        }
        
        query_string = "&".join(f"{k}={v}" for k, v in params.items())
        return f"{self.WS_URL}?{query_string}"
    
    async def connect(self) -> None:
        """Establish WebSocket connection to Deepgram."""
        url = self._build_url()
        
        self._ws = await websockets.connect(
            url,
            extra_headers={
                "Authorization": f"Token {self.config.api_key}",
            },
            ping_interval=20,
            ping_timeout=10,
        )
        
        self._running = True
        self._receive_task = asyncio.create_task(self._receive_loop())
        
        logger.info("Deepgram STT connected")
    
    async def disconnect(self) -> None:
        """Close the WebSocket connection."""
        self._running = False
        
        if self._ws:
            # Send close message
            try:
                await self._ws.send(json.dumps({"type": "CloseStream"}))
            except Exception:
                pass
            
            await self._ws.close()
            self._ws = None
        
        if self._receive_task:
            self._receive_task.cancel()
            try:
                await self._receive_task
            except asyncio.CancelledError:
                pass
        
        logger.info("Deepgram STT disconnected")
    
    async def send_audio(self, audio_chunk: bytes) -> None:
        """
        Send audio data to Deepgram.
        
        Args:
            audio_chunk: PCM audio bytes (16-bit, 16kHz)
        """
        if self._ws and self._running:
            try:
                await self._ws.send(audio_chunk)
            except websockets.ConnectionClosed:
                logger.warning("Deepgram connection closed while sending")
    
    async def _receive_loop(self) -> None:
        """Receive and process transcription results."""
        while self._running and self._ws:
            try:
                message = await self._ws.recv()
                data = json.loads(message)
                
                await self._handle_message(data)
                
            except websockets.ConnectionClosed:
                logger.warning("Deepgram connection closed")
                break
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON from Deepgram: {e}")
            except Exception as e:
                logger.error(f"Error in Deepgram receive loop: {e}")
    
    async def _handle_message(self, data: dict) -> None:
        """Handle a message from Deepgram."""
        msg_type = data.get("type")
        
        if msg_type == "Results":
            await self._handle_results(data)
        elif msg_type == "UtteranceEnd":
            if self.on_utterance_end:
                await self.on_utterance_end()
        elif msg_type == "SpeechStarted":
            logger.debug("Deepgram: Speech started")
        elif msg_type == "Metadata":
            logger.debug(f"Deepgram metadata: {data}")
        elif msg_type == "Error":
            logger.error(f"Deepgram error: {data}")
    
    async def _handle_results(self, data: dict) -> None:
        """Handle transcription results."""
        channel = data.get("channel", {})
        alternatives = channel.get("alternatives", [])
        
        if not alternatives:
            return
        
        alt = alternatives[0]
        transcript = alt.get("transcript", "")
        
        if not transcript:
            return
        
        result = TranscriptResult(
            text=transcript,
            is_final=data.get("is_final", False),
            confidence=alt.get("confidence", 0.0),
            words=alt.get("words", []),
            speech_final=data.get("speech_final", False),
            start_time=data.get("start", 0.0),
            end_time=data.get("duration", 0.0) + data.get("start", 0.0),
        )
        
        if self.on_transcript:
            await self.on_transcript(result)


class STTManager:
    """
    High-level STT manager with connection lifecycle.
    """
    
    def __init__(self, config: DeepgramConfig):
        self.config = config
        self._client: Optional[DeepgramSTT] = None
        self._transcript_buffer: str = ""
        self._final_transcript: str = ""
    
    async def start_session(
        self,
        on_interim: Callable[[str], None] = None,
        on_final: Callable[[str], None] = None
    ) -> None:
        """Start a new transcription session."""
        
        async def handle_transcript(result: TranscriptResult):
            if result.is_final:
                self._final_transcript += result.text + " "
                self._transcript_buffer = ""
                if on_final:
                    await on_final(self._final_transcript.strip())
            else:
                self._transcript_buffer = result.text
                if on_interim:
                    full_transcript = self._final_transcript + result.text
                    await on_interim(full_transcript.strip())
        
        self._client = DeepgramSTT(
            config=self.config,
            on_transcript=handle_transcript,
        )
        
        await self._client.connect()
    
    async def send_audio(self, audio_chunk: bytes) -> None:
        """Send audio to the STT engine."""
        if self._client:
            await self._client.send_audio(audio_chunk)
    
    async def end_session(self) -> str:
        """End the session and return final transcript."""
        if self._client:
            await self._client.disconnect()
            self._client = None
        
        final = self._final_transcript.strip()
        self._final_transcript = ""
        self._transcript_buffer = ""
        
        return final
    
    def get_current_transcript(self) -> str:
        """Get the current transcript including buffer."""
        return (self._final_transcript + self._transcript_buffer).strip()
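
The interim/final merging inside `start_session` can be shown in isolation. This is a simplified replica of the callback logic, not the production class: interim results overwrite the buffer, final results append and clear it.

```python
from dataclasses import dataclass

@dataclass
class TranscriptResult:
    text: str
    is_final: bool

class TranscriptAccumulator:
    """Mirrors STTManager's interim/final transcript handling."""

    def __init__(self):
        self.final = ""
        self.buffer = ""

    def feed(self, result: TranscriptResult) -> str:
        if result.is_final:
            self.final += result.text + " "  # append finalized text
            self.buffer = ""                  # discard superseded interim
        else:
            self.buffer = result.text         # interim replaces prior interim
        return (self.final + self.buffer).strip()

acc = TranscriptAccumulator()
acc.feed(TranscriptResult("hello", False))        # interim
acc.feed(TranscriptResult("hello there", True))   # final supersedes interim
text = acc.feed(TranscriptResult("how are", False))
# text == "hello there how are"
```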

5.3 Transcript Processing \

# pipeline/stt/transcript_processor.py

import re
from dataclasses import dataclass
from typing import Optional

@dataclass
class ProcessedTranscript:
    """Processed and normalized transcript."""
    raw_text: str
    normalized_text: str
    detected_intent: Optional[str]
    contains_question: bool
    word_count: int

class TranscriptProcessor:
    """
    Post-process transcripts for LLM consumption.
    """
    
    def __init__(self):
        # Common speech recognition errors to fix
        self._corrections = {
            r'\bum+\b': '',
            r'\buh+\b': '',
            r'\blike,?\s+': '',
            r'\byou know,?\s+': '',
            r'\bI mean,?\s+': '',
            r'\s+': ' ',  # Multiple spaces
        }
        
        # Intent patterns
        self._intent_patterns = {
            "greeting": r'^(?:hi|hello|hey|good\s+(?:morning|afternoon|evening))',
            "goodbye": r'(?:bye|goodbye|see you|talk later|have a good)',
            "help": r'(?:help|assist|support|problem|issue)',
            "transfer": r'(?:speak to|talk to|transfer|human|agent|person)',
            "appointment": r'(?:schedule|book|appointment|meeting|calendar)',
            "cancel": r'(?:cancel|nevermind|forget it|stop)',
        }
    
    def process(self, transcript: str) -> ProcessedTranscript:
        """
        Process a raw transcript.
        
        Args:
            transcript: Raw transcript from STT
        
        Returns:
            Processed transcript with metadata
        """
        # Normalize
        normalized = transcript.strip()
        for pattern, replacement in self._corrections.items():
            normalized = re.sub(pattern, replacement, normalized, flags=re.I)
        normalized = normalized.strip()
        
        # Detect intent
        detected_intent = None
        for intent, pattern in self._intent_patterns.items():
            if re.search(pattern, normalized, re.I):
                detected_intent = intent
                break
        
        # Check for question
        contains_question = bool(re.search(r'\?|^(?:what|where|when|who|why|how|is|are|do|does|can|could|would|will)\b', normalized, re.I))
        
        return ProcessedTranscript(
            raw_text=transcript,
            normalized_text=normalized,
            detected_intent=detected_intent,
            contains_question=contains_question,
            word_count=len(normalized.split()),
        )
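
The filler-word pass can be exercised standalone. This extracts just the normalization step from `process`, with a trimmed corrections map:

```python
import re

# Subset of TranscriptProcessor's corrections, applied in insertion order
CORRECTIONS = {
    r'\bum+\b': '',
    r'\buh+\b': '',
    r'\byou know,?\s+': '',
    r'\s+': ' ',  # collapse whitespace left behind by removals
}

def normalize(transcript: str) -> str:
    text = transcript.strip()
    for pattern, replacement in CORRECTIONS.items():
        text = re.sub(pattern, replacement, text, flags=re.I)
    return text.strip()

normalize("I um need help you know, with my account")
# → "I need help with my account"
```

Note the order matters: the whitespace-collapsing rule runs last so it cleans up the double spaces that filler removal leaves behind.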

5.4 STT Fallback Strategy \

# pipeline/stt/fallback.py

import logging

logger = logging.getLogger(__name__)

class STTWithFallback:
    """
    STT with automatic fallback to a backup provider.

    Assumes both providers expose a batch-style ``transcribe(audio) -> str``
    method (e.g. a thin wrapper around the streaming DeepgramSTT client);
    the streaming client itself does not define one.
    """
    
    def __init__(
        self,
        primary: DeepgramSTT,
        fallback: "WhisperSTT",  # Local Whisper as fallback
    ):
        self.primary = primary
        self.fallback = fallback
        self._using_fallback = False
        self._primary_failures = 0
        self._max_failures = 3
    
    async def transcribe(self, audio: bytes) -> str:
        """Transcribe with automatic fallback."""
        if self._using_fallback:
            return await self._fallback_transcribe(audio)
        
        try:
            result = await self.primary.transcribe(audio)
            self._primary_failures = 0
            return result
        except Exception as e:
            logger.warning(f"Primary STT failed: {e}")
            self._primary_failures += 1
            
            if self._primary_failures >= self._max_failures:
                logger.warning("Switching to fallback STT")
                self._using_fallback = True
            
            return await self._fallback_transcribe(audio)
    
    async def _fallback_transcribe(self, audio: bytes) -> str:
        """Use fallback transcription."""
        return await self.fallback.transcribe(audio)
    
    async def reset_to_primary(self) -> None:
        """Attempt to switch back to primary."""
        self._using_fallback = False
        self._primary_failures = 0

6. Context Assembly \

6.1 Context Overview \

Before sending to the LLM, we assemble a complete context including:
  • System prompt with agent personality
  • Knowledge base context
  • Conversation history
  • Tool definitions
  • Current user input

6.2 Context Assembly Pipeline \

┌─────────────────────────────────────────────────────────────────────────────┐
│                        CONTEXT ASSEMBLY PIPELINE                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌──────────────┐     ┌──────────────┐     ┌──────────────┐               │
│   │    Agent     │     │  Knowledge   │     │    Tool      │               │
│   │   Config     │     │    Base      │     │ Definitions  │               │
│   └──────┬───────┘     └──────┬───────┘     └──────┬───────┘               │
│          │                    │                    │                        │
│          ▼                    ▼                    ▼                        │
│   ┌─────────────────────────────────────────────────────────────────┐      │
│   │                      SYSTEM PROMPT                               │      │
│   │                                                                  │      │
│   │  • Agent identity and personality                               │      │
│   │  • Business context and rules                                   │      │
│   │  • Knowledge base excerpts (RAG)                                │      │
│   │  • Available tools and when to use them                         │      │
│   │  • Response guidelines (concise, natural)                       │      │
│   │                                                                  │      │
│   └─────────────────────────────────────────────────────────────────┘      │
│                                    │                                        │
│                                    ▼                                        │
│   ┌──────────────┐     ┌─────────────────────────────────────────────┐     │
│   │ Conversation │     │              MESSAGE HISTORY                 │     │
│   │   History    │────▶│                                              │     │
│   │   (Redis)    │     │  [user]: "Hi, I'd like to schedule..."      │     │
│   └──────────────┘     │  [assistant]: "Of course! What day..."      │     │
│                        │  [user]: "How about Tuesday?"                │     │
│                        │  [assistant]: "Tuesday works..."             │     │
│                        │                                              │     │
│                        └─────────────────────────────────────────────┘     │
│                                    │                                        │
│                                    ▼                                        │
│   ┌──────────────┐     ┌─────────────────────────────────────────────┐     │
│   │   Current    │     │              CURRENT INPUT                   │     │
│   │  Transcript  │────▶│                                              │     │
│   │              │     │  [user]: "Actually, can we do 3pm?"          │     │
│   └──────────────┘     │                                              │     │
│                        └─────────────────────────────────────────────┘     │
│                                    │                                        │
│                                    ▼                                        │
│                        ┌─────────────────────────────────────────────┐     │
│                        │           ASSEMBLED CONTEXT                  │     │
│                        │                                              │     │
│                        │  Ready for LLM API call                     │     │
│                        │  Token count: ~2,000-4,000                  │     │
│                        │                                              │     │
│                        └─────────────────────────────────────────────┘     │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

6.3 Context Manager Implementation \

# pipeline/context/context_manager.py

import json
import logging
import time
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)

@dataclass
class AgentConfig:
    """Agent configuration from database."""
    agent_id: str
    name: str
    personality: str
    business_context: str
    greeting: str
    voice_id: str
    tools: list[dict]
    knowledge_base_id: Optional[str]
    max_turns: int = 50
    response_style: str = "concise"  # concise, detailed, friendly

@dataclass
class AssembledContext:
    """Fully assembled context ready for LLM."""
    system_prompt: str
    messages: list[dict]
    tools: list[dict]
    token_estimate: int

class ContextManager:
    """
    Manages conversation context assembly for LLM calls.
    """
    
    MAX_HISTORY_TURNS = 10
    MAX_CONTEXT_TOKENS = 8000  # Leave room for response
    
    def __init__(
        self,
        redis_client,
        knowledge_base_service,
    ):
        self.redis = redis_client
        self.kb_service = knowledge_base_service
    
    async def assemble_context(
        self,
        call_id: str,
        agent_config: AgentConfig,
        current_input: str,
    ) -> AssembledContext:
        """
        Assemble complete context for LLM call.
        
        Args:
            call_id: Current call ID
            agent_config: Agent configuration
            current_input: Current user transcript
        
        Returns:
            Assembled context ready for API call
        """
        # Build system prompt
        system_prompt = await self._build_system_prompt(
            agent_config,
            current_input,
        )
        
        # Get conversation history
        history = await self._get_conversation_history(call_id)
        
        # Build messages array
        messages = []
        for turn in history[-self.MAX_HISTORY_TURNS:]:
            messages.append({
                "role": turn["role"],
                "content": turn["content"],
            })
        
        # Add current input
        messages.append({
            "role": "user",
            "content": current_input,
        })
        
        # Format tools for Claude
        tools = self._format_tools(agent_config.tools)
        
        # Estimate tokens
        token_estimate = self._estimate_tokens(
            system_prompt,
            messages,
            tools,
        )
        
        # Trim history if needed
        while token_estimate > self.MAX_CONTEXT_TOKENS and len(messages) > 2:
            messages.pop(0)
            token_estimate = self._estimate_tokens(
                system_prompt,
                messages,
                tools,
            )
        
        return AssembledContext(
            system_prompt=system_prompt,
            messages=messages,
            tools=tools,
            token_estimate=token_estimate,
        )
    
    async def _build_system_prompt(
        self,
        config: AgentConfig,
        current_input: str,
    ) -> str:
        """Build the system prompt with all context."""
        
        # Base system prompt
        prompt_parts = [
            f"You are {config.name}, an AI voice assistant.",
            f"\n\n## Personality\n{config.personality}",
            f"\n\n## Business Context\n{config.business_context}",
        ]
        
        # Add knowledge base context if available
        if config.knowledge_base_id:
            kb_context = await self._get_knowledge_context(
                config.knowledge_base_id,
                current_input,
            )
            if kb_context:
                prompt_parts.append(
                    f"\n\n## Relevant Information\n{kb_context}"
                )
        
        # Add response guidelines
        style_guidelines = {
            "concise": """
## Response Guidelines
- Keep responses brief and conversational (1-2 sentences when possible)
- Speak naturally as if on a phone call
- Avoid lists or formatted text - this is a voice conversation
- Ask one question at a time
- Confirm understanding before taking actions
""",
            "detailed": """
## Response Guidelines
- Provide thorough explanations when needed
- Still maintain a conversational tone
- Break complex information into digestible parts
- Offer to elaborate on any points
""",
            "friendly": """
## Response Guidelines  
- Be warm, personable, and empathetic
- Use casual language appropriate for voice
- Show genuine interest in helping
- Add appropriate conversational acknowledgments
""",
        }
        
        prompt_parts.append(
            style_guidelines.get(config.response_style, style_guidelines["concise"])
        )
        
        # Add tool usage instructions if tools are available
        if config.tools:
            prompt_parts.append("""
## Tool Usage
- Use tools when you need to perform actions or look up information
- Always confirm with the user before executing consequential actions
- If a tool call fails, explain the issue and offer alternatives
""")
        
        return "\n".join(prompt_parts)
    
    async def _get_knowledge_context(
        self,
        knowledge_base_id: str,
        query: str,
    ) -> Optional[str]:
        """Retrieve relevant context from knowledge base."""
        try:
            results = await self.kb_service.search(
                knowledge_base_id=knowledge_base_id,
                query=query,
                top_k=3,
            )
            
            if not results:
                return None
            
            context_parts = []
            for result in results:
                context_parts.append(result["content"])
            
            return "\n\n".join(context_parts)
            
        except Exception as e:
            logger.warning(f"Knowledge base lookup failed: {e}")
            return None
    
    async def _get_conversation_history(
        self,
        call_id: str,
    ) -> list[dict]:
        """Get conversation history from Redis."""
        history_key = f"call:{call_id}:history"
        history_json = await self.redis.get(history_key)
        
        if history_json:
            return json.loads(history_json)
        return []
    
    async def add_to_history(
        self,
        call_id: str,
        role: str,
        content: str,
    ) -> None:
        """Add a turn to conversation history."""
        history_key = f"call:{call_id}:history"
        
        history = await self._get_conversation_history(call_id)
        history.append({
            "role": role,
            "content": content,
            "timestamp": time.time(),
        })
        
        # Keep last N turns
        history = history[-self.MAX_HISTORY_TURNS * 2:]
        
        await self.redis.set(
            history_key,
            json.dumps(history),
            ex=3600,  # 1 hour TTL
        )
    
    def _format_tools(self, tools: list[dict]) -> list[dict]:
        """Format tools for Claude API."""
        formatted = []
        
        for tool in tools:
            formatted.append({
                "name": tool["name"],
                "description": tool["description"],
                "input_schema": tool.get("parameters", {}),
            })
        
        return formatted
    
    def _estimate_tokens(
        self,
        system_prompt: str,
        messages: list[dict],
        tools: list[dict],
    ) -> int:
        """Rough token estimation (4 chars per token)."""
        total_chars = len(system_prompt)
        
        for msg in messages:
            total_chars += len(msg.get("content", ""))
        
        for tool in tools:
            total_chars += len(json.dumps(tool))
        
        return total_chars // 4
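
The trimming loop and the 4-chars-per-token heuristic in `assemble_context` can be exercised standalone. This is a simplified replica (the production method also counts tool JSON): oldest turns are dropped first until the estimate fits the budget, keeping at least two messages.

```python
def estimate_tokens(system_prompt: str, messages: list[dict]) -> int:
    """Rough estimate: ~4 characters per token."""
    total = len(system_prompt) + sum(len(m["content"]) for m in messages)
    return total // 4

def trim_history(system_prompt: str, messages: list[dict], budget: int) -> list[dict]:
    messages = list(messages)
    while estimate_tokens(system_prompt, messages) > budget and len(messages) > 2:
        messages.pop(0)  # drop the oldest turn first
    return messages

msgs = [
    {"role": "user", "content": "x" * 400},
    {"role": "assistant", "content": "y" * 400},
    {"role": "user", "content": "z" * 40},
]
# 880 chars of history + 40 of prompt -> 230 token estimate; with a budget
# of 150 the oldest turn is dropped, bringing the estimate down to 120.
trimmed = trim_history("s" * 40, msgs, budget=150)
```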

6.4 System Prompt Templates \

# pipeline/context/prompts.py

VOICE_AGENT_BASE_PROMPT = """You are {agent_name}, an AI voice assistant for {company_name}.

## Your Role
{role_description}

## Communication Style
- Speak naturally and conversationally - you're on a phone call
- Keep responses concise (1-3 sentences typically)
- Use contractions and casual language
- Never use markdown, bullet points, or formatting
- Don't say "I'm an AI" unless directly asked
- Acknowledge what the caller says before responding

## Current Context
- Caller phone number: {caller_phone}
- Time: {current_time}
- Previous calls with this number: {call_history_summary}

{additional_context}
"""

TOOL_USAGE_PROMPT = """
## Available Actions
You can perform these actions using the tools provided:
{tool_descriptions}

When using tools:
1. Only use tools when necessary to help the caller
2. Confirm consequential actions before executing
3. If a tool fails, apologize and offer alternatives
4. Don't mention "tools" or "functions" to the caller
"""

ESCALATION_PROMPT = """
## When to Transfer
Transfer to a human agent if:
- The caller explicitly requests to speak with a person
- You cannot resolve their issue after 2-3 attempts
- The situation involves sensitive matters you cannot handle
- The caller becomes frustrated or upset

To transfer, use the transfer_to_agent tool.
"""

7. LLM Integration \

7.1 Claude API Integration \

We use Claude Sonnet for response generation via the streaming Messages API:

# pipeline/llm/claude_client.py

import anthropic
from typing import AsyncIterator, Optional
from dataclasses import dataclass
import json

@dataclass
class LLMConfig:
    """Configuration for Claude LLM."""
    api_key: str
    model: str = "claude-sonnet-4-20250514"
    max_tokens: int = 1024
    temperature: float = 0.7
    timeout: float = 30.0

@dataclass
class LLMResponse:
    """Streamed response from LLM."""
    text: str
    is_complete: bool
    tool_calls: list[dict]
    stop_reason: Optional[str]
    usage: Optional[dict]

class ClaudeLLM:
    """
    Claude LLM client with streaming support.
    
    Handles streaming responses and tool calling
    in voice conversation context.
    """
    
    def __init__(self, config: LLMConfig):
        self.config = config
        self.client = anthropic.AsyncAnthropic(
            api_key=config.api_key,
            timeout=config.timeout,
        )
    
    async def generate_stream(
        self,
        system_prompt: str,
        messages: list[dict],
        tools: list[dict] = None,
    ) -> AsyncIterator[LLMResponse]:
        """
        Generate a streaming response.
        
        Args:
            system_prompt: System prompt with context
            messages: Conversation history + current input
            tools: Available tools for the agent
        
        Yields:
            LLMResponse objects with incremental text
        """
        request_params = {
            "model": self.config.model,
            "max_tokens": self.config.max_tokens,
            "temperature": self.config.temperature,
            "system": system_prompt,
            "messages": messages,
        }
        
        if tools:
            request_params["tools"] = tools
        
        accumulated_text = ""
        tool_calls = []
        
        async with self.client.messages.stream(**request_params) as stream:
            async for event in stream:
                if event.type == "content_block_delta":
                    if hasattr(event.delta, "text"):
                        accumulated_text += event.delta.text
                        
                        yield LLMResponse(
                            text=accumulated_text,
                            is_complete=False,
                            tool_calls=[],
                            stop_reason=None,
                            usage=None,
                        )
                    
                    elif hasattr(event.delta, "partial_json"):
                        # Tool call being streamed
                        pass
                
                elif event.type == "content_block_stop":
                    pass
                
                elif event.type == "message_delta":
                    stop_reason = event.delta.stop_reason
                    usage = {
                        "output_tokens": event.usage.output_tokens,
                    }
            
            # Get final message for tool calls
            final_message = await stream.get_final_message()
            
            for block in final_message.content:
                if block.type == "tool_use":
                    tool_calls.append({
                        "id": block.id,
                        "name": block.name,
                        "input": block.input,
                    })
            
            yield LLMResponse(
                text=accumulated_text,
                is_complete=True,
                tool_calls=tool_calls,
                stop_reason=final_message.stop_reason,
                usage={
                    "input_tokens": final_message.usage.input_tokens,
                    "output_tokens": final_message.usage.output_tokens,
                },
            )
    
    async def generate_with_tools(
        self,
        system_prompt: str,
        messages: list[dict],
        tools: list[dict],
        tool_results: list[dict] = None,
    ) -> AsyncIterator[LLMResponse]:
        """
        Generate response with tool calling support.
        
        If tool_results are provided, continues conversation
        after tool execution.
        """
        # If we have tool results, add them to messages
        if tool_results:
            for result in tool_results:
                messages.append({
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": result["tool_use_id"],
                            "content": result["content"],
                        }
                    ],
                })
        
        async for response in self.generate_stream(
            system_prompt=system_prompt,
            messages=messages,
            tools=tools,
        ):
            yield response


class LLMResponseProcessor:
    """
    Process streaming LLM responses for voice output.
    
    Buffers text until natural speech boundaries
    and handles tool calls appropriately.
    """
    
    def __init__(self):
        self._buffer = ""
        self._sentence_endings = re.compile(r'[.!?]\s+')
        self._clause_endings = re.compile(r'[,;:]\s+')
    
    async def process_stream(
        self,
        response_stream: AsyncIterator[LLMResponse],
        on_sentence: callable,
        on_tool_call: callable,
    ) -> None:
        """
        Process a streaming response.
        
        Args:
            response_stream: Stream of LLM responses
            on_sentence: Called with each complete sentence
            on_tool_call: Called when tool calls are detected
        """
        full_text = ""
        last_sent_position = 0
        
        async for response in response_stream:
            full_text = response.text
            
            # Find sentence boundaries in new text
            new_text = full_text[last_sent_position:]
            
            # Look for sentence endings
            matches = list(self._sentence_endings.finditer(new_text))
            
            for match in matches:
                sentence_end = last_sent_position + match.end()
                sentence = full_text[last_sent_position:sentence_end].strip()
                
                if sentence:
                    await on_sentence(sentence)
                
                last_sent_position = sentence_end
            
            # Handle tool calls when response is complete
            if response.is_complete:
                # Send any remaining text
                remaining = full_text[last_sent_position:].strip()
                if remaining:
                    await on_sentence(remaining)
                
                # Process tool calls
                for tool_call in response.tool_calls:
                    await on_tool_call(tool_call)
    
    def extract_speakable_chunk(
        self,
        text: str,
        min_length: int = 10,
    ) -> tuple[str, str]:
        """
        Extract a chunk suitable for TTS.
        
        Returns:
            Tuple of (chunk_to_speak, remaining_text)
        """
        if len(text) < min_length:
            return "", text
        
        # Try to find sentence boundary
        match = self._sentence_endings.search(text)
        if match:
            return text[:match.end()].strip(), text[match.end():]
        
        # Try clause boundary if text is long enough
        if len(text) > 50:
            match = self._clause_endings.search(text)
            if match and match.start() > min_length:
                return text[:match.end()].strip(), text[match.end():]
        
        return "", text

7.2 Response Routing \

Determine whether the LLM output should be spoken, executed as a tool call, or both:
# pipeline/llm/response_router.py

from enum import Enum
from dataclasses import dataclass
from typing import Optional

from pipeline.llm.claude_client import LLMResponse

class ResponseType(Enum):
    SPEECH = "speech"
    TOOL_CALL = "tool_call"
    MIXED = "mixed"  # Contains both speech and tool calls

@dataclass
class RoutedResponse:
    """Response after routing decision."""
    response_type: ResponseType
    speech_text: Optional[str]
    tool_calls: list[dict]
    should_wait_for_tool: bool

class ResponseRouter:
    """
    Route LLM responses to appropriate handlers.
    """
    
    def route(self, response: LLMResponse) -> RoutedResponse:
        """
        Determine how to handle the response.
        
        Args:
            response: Complete LLM response
        
        Returns:
            Routing decision with extracted components
        """
        has_text = bool(response.text.strip())
        has_tools = bool(response.tool_calls)
        
        if has_tools and not has_text:
            return RoutedResponse(
                response_type=ResponseType.TOOL_CALL,
                speech_text=None,
                tool_calls=response.tool_calls,
                should_wait_for_tool=True,
            )
        
        elif has_text and not has_tools:
            return RoutedResponse(
                response_type=ResponseType.SPEECH,
                speech_text=response.text,
                tool_calls=[],
                should_wait_for_tool=False,
            )
        
        elif has_text and has_tools:
            # Text before tool call - speak first, then execute
            return RoutedResponse(
                response_type=ResponseType.MIXED,
                speech_text=response.text,
                tool_calls=response.tool_calls,
                should_wait_for_tool=True,
            )
        
        else:
            # Empty response
            return RoutedResponse(
                response_type=ResponseType.SPEECH,
                speech_text="I'm sorry, I didn't catch that. Could you please repeat?",
                tool_calls=[],
                should_wait_for_tool=False,
            )
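The four branches reduce to a small decision table driven only by which components the response contains. A standalone condensation (names mirror the classes above; the `RoutedResponse` wrapper is dropped for brevity):

```python
from enum import Enum

class ResponseType(Enum):
    SPEECH = "speech"
    TOOL_CALL = "tool_call"
    MIXED = "mixed"

def route(text: str, tool_calls: list) -> ResponseType:
    # Condensed form of ResponseRouter.route: the routing decision
    # depends only on whether text and/or tool calls are present.
    has_text, has_tools = bool(text.strip()), bool(tool_calls)
    if has_tools and not has_text:
        return ResponseType.TOOL_CALL
    if has_text and has_tools:
        return ResponseType.MIXED
    return ResponseType.SPEECH  # plain speech, or the empty-response reprompt

decision = route("One moment, let me check.", [{"name": "lookup_order"}])
# MIXED: speak the preamble first, then execute the tool
```

The MIXED case matters for perceived latency: the caller hears "One moment, let me check" while the tool is still running.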

7.3 LLM Fallback Strategy \

# pipeline/llm/fallback.py

from __future__ import annotations

import logging
import time
from typing import AsyncIterator

from pipeline.llm.claude_client import ClaudeLLM, LLMResponse

# ResponseCache is defined in the caching module (not shown here)

logger = logging.getLogger(__name__)

class LLMWithFallback:
    """
    LLM client with automatic fallback.
    
    Falls back to Haiku for faster responses if Sonnet is slow,
    or to cached responses if API is unavailable.
    """
    
    def __init__(
        self,
        primary: ClaudeLLM,       # Sonnet
        fast_fallback: ClaudeLLM,  # Haiku
        cache: ResponseCache,
    ):
        self.primary = primary
        self.fast_fallback = fast_fallback
        self.cache = cache
        
        self._primary_timeout = 5.0  # Switch to fallback if TTFB > 5s
        self._failure_count = 0
        self._max_failures = 3
    
    async def generate_stream(
        self,
        system_prompt: str,
        messages: list[dict],
        tools: list[dict] = None,
    ) -> AsyncIterator[LLMResponse]:
        """Generate with automatic fallback."""
        
        # Check if we should use fallback due to recent failures
        if self._failure_count >= self._max_failures:
            async for response in self._use_fast_fallback(
                system_prompt, messages, tools
            ):
                yield response
            return
        
        # Try primary with timeout monitoring
        try:
            first_token_received = False
            start_time = time.time()
            
            async for response in self.primary.generate_stream(
                system_prompt, messages, tools
            ):
                if not first_token_received:
                    first_token_received = True
                    ttfb = time.time() - start_time
                    
                    # If TTFB is too slow, switch to fallback for next request
                    if ttfb > self._primary_timeout:
                        logger.warning(f"Primary LLM slow ({ttfb:.1f}s), consider fallback")
                
                yield response
            
            # Success - reset failure count
            self._failure_count = 0
            
        except Exception as e:
            logger.error(f"Primary LLM failed: {e}")
            self._failure_count += 1
            
            # Try fast fallback
            async for response in self._use_fast_fallback(
                system_prompt, messages, tools
            ):
                yield response
    
    async def _use_fast_fallback(
        self,
        system_prompt: str,
        messages: list[dict],
        tools: list[dict] = None,
    ) -> AsyncIterator[LLMResponse]:
        """Use the fast fallback model."""
        try:
            async for response in self.fast_fallback.generate_stream(
                system_prompt, messages, tools
            ):
                yield response
        except Exception as e:
            logger.error(f"Fallback LLM also failed: {e}")
            
            # Last resort - cached response
            cached = await self.cache.get_fallback_response(messages)
            if cached:
                yield LLMResponse(
                    text=cached,
                    is_complete=True,
                    tool_calls=[],
                    stop_reason="cache",
                    usage=None,
                )
            else:
                yield LLMResponse(
                    text="I apologize, I'm having technical difficulties. Please try again in a moment.",
                    is_complete=True,
                    tool_calls=[],
                    stop_reason="error",
                    usage=None,
                )

8. Text-to-Speech Integration \

8.1 Chatterbox TTS \

We self-host Chatterbox TTS on RunPod, which eliminates per-minute usage fees:
  • Quality: Natural, expressive speech
  • Latency: ~150ms TTFB
  • Streaming: Chunk-based audio output
  • Cost: Fixed GPU cost, no per-minute fees

8.2 Chatterbox Client \

# pipeline/tts/chatterbox_client.py

import asyncio
import logging
from typing import AsyncIterator, Optional
from dataclasses import dataclass

import httpx

logger = logging.getLogger(__name__)

@dataclass
class TTSConfig:
    """Configuration for Chatterbox TTS."""
    api_url: str  # RunPod endpoint
    api_key: str
    voice_id: str = "default"
    sample_rate: int = 24000
    speed: float = 1.0
    pitch: float = 1.0
    exaggeration: float = 0.5  # Emotion intensity
    cfg_weight: float = 0.5   # Adherence to reference
    timeout: float = 30.0

@dataclass
class AudioChunk:
    """Chunk of synthesized audio."""
    audio_data: bytes
    sample_rate: int
    duration_ms: float
    is_final: bool

class ChatterboxTTS:
    """
    Chatterbox TTS client with streaming support.
    
    Connects to self-hosted Chatterbox on RunPod
    for high-quality, cost-effective speech synthesis.
    """
    
    def __init__(self, config: TTSConfig):
        self.config = config
        self._client = httpx.AsyncClient(timeout=config.timeout)
    
    async def synthesize_stream(
        self,
        text: str,
        voice_id: str = None,
    ) -> AsyncIterator[AudioChunk]:
        """
        Synthesize text to speech with streaming output.
        
        Args:
            text: Text to synthesize
            voice_id: Voice to use (overrides config)
        
        Yields:
            AudioChunk objects with PCM audio data
        """
        voice = voice_id or self.config.voice_id
        
        request_data = {
            "text": text,
            "voice_id": voice,
            "sample_rate": self.config.sample_rate,
            "speed": self.config.speed,
            "pitch": self.config.pitch,
            "exaggeration": self.config.exaggeration,
            "cfg_weight": self.config.cfg_weight,
            "stream": True,
        }
        
        async with self._client.stream(
            "POST",
            f"{self.config.api_url}/synthesize",
            json=request_data,
            headers={
                "Authorization": f"Bearer {self.config.api_key}",
                "Accept": "audio/pcm",
            },
        ) as response:
            response.raise_for_status()
            
            chunk_index = 0
            total_samples = 0
            
            async for chunk in response.aiter_bytes(chunk_size=4800):  # 100ms chunks
                if chunk:
                    samples = len(chunk) // 2  # 16-bit audio
                    duration_ms = (samples / self.config.sample_rate) * 1000
                    total_samples += samples
                    
                    yield AudioChunk(
                        audio_data=chunk,
                        sample_rate=self.config.sample_rate,
                        duration_ms=duration_ms,
                        is_final=False,
                    )
                    
                    chunk_index += 1
            
            # Final chunk marker
            yield AudioChunk(
                audio_data=b"",
                sample_rate=self.config.sample_rate,
                duration_ms=0,
                is_final=True,
            )
    
    async def synthesize(
        self,
        text: str,
        voice_id: str = None,
    ) -> bytes:
        """
        Synthesize text to speech (non-streaming).
        
        Returns complete audio as bytes.
        """
        chunks = []
        async for chunk in self.synthesize_stream(text, voice_id):
            if chunk.audio_data:
                chunks.append(chunk.audio_data)
        
        return b"".join(chunks)
    
    async def get_voices(self) -> list[dict]:
        """Get available voices."""
        response = await self._client.get(
            f"{self.config.api_url}/voices",
            headers={"Authorization": f"Bearer {self.config.api_key}"},
        )
        response.raise_for_status()
        return response.json()
    
    async def clone_voice(
        self,
        name: str,
        audio_samples: list[bytes],
    ) -> str:
        """
        Clone a voice from audio samples.
        
        Returns the new voice ID.
        """
        # Implementation depends on the Chatterbox voice cloning API
        raise NotImplementedError("voice cloning not yet implemented")
    
    async def close(self) -> None:
        """Close the client."""
        await self._client.aclose()


class TTSManager:
    """
    High-level TTS manager with sentence queuing.
    """
    
    def __init__(self, config: TTSConfig):
        self.config = config
        self._client = ChatterboxTTS(config)
        self._sentence_queue: asyncio.Queue = asyncio.Queue()
        self._audio_queue: asyncio.Queue = asyncio.Queue()
        self._running = False
    
    async def start(self) -> None:
        """Start the TTS processing loop."""
        self._running = True
        # Keep a reference so the task isn't garbage-collected mid-flight
        self._task = asyncio.create_task(self._process_queue())
    
    async def stop(self) -> None:
        """Stop the TTS processing loop."""
        self._running = False
    
    async def queue_sentence(self, text: str) -> None:
        """Add a sentence to the synthesis queue."""
        await self._sentence_queue.put(text)
    
    async def get_audio(self) -> Optional[AudioChunk]:
        """Get the next audio chunk."""
        try:
            return await asyncio.wait_for(
                self._audio_queue.get(),
                timeout=0.1,
            )
        except asyncio.TimeoutError:
            return None
    
    async def clear_queue(self) -> None:
        """Clear pending sentences (for interruption)."""
        while not self._sentence_queue.empty():
            try:
                self._sentence_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
        
        while not self._audio_queue.empty():
            try:
                self._audio_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
    
    async def _process_queue(self) -> None:
        """Process sentences from the queue."""
        while self._running:
            try:
                text = await asyncio.wait_for(
                    self._sentence_queue.get(),
                    timeout=0.5,
                )
                
                async for chunk in self._client.synthesize_stream(text):
                    await self._audio_queue.put(chunk)
                    
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"TTS error: {e}")

8.3 TTS Fallback Strategy \

# pipeline/tts/fallback.py

from __future__ import annotations

import logging
from typing import AsyncIterator

from pipeline.tts.chatterbox_client import AudioChunk, ChatterboxTTS

logger = logging.getLogger(__name__)

class TTSWithFallback:
    """
    TTS with automatic fallback providers.
    
    Fallback chain:
    1. Chatterbox (primary - self-hosted, lowest cost)
    2. Resemble.ai (cloud fallback - good quality)
    3. Pre-recorded phrases (emergency fallback)
    """
    
    def __init__(
        self,
        primary: ChatterboxTTS,
        cloud_fallback: "ResembleTTS",
        phrase_cache: "PhraseCache",
    ):
        self.primary = primary
        self.cloud_fallback = cloud_fallback
        self.phrase_cache = phrase_cache
        
        self._primary_healthy = True
        self._failure_count = 0
        self._max_failures = 3
    
    async def synthesize_stream(
        self,
        text: str,
        voice_id: str = None,
    ) -> AsyncIterator[AudioChunk]:
        """Synthesize with automatic fallback."""
        
        # Try primary (Chatterbox)
        if self._primary_healthy:
            try:
                async for chunk in self.primary.synthesize_stream(text, voice_id):
                    yield chunk
                self._failure_count = 0
                return
            except Exception as e:
                logger.warning(f"Primary TTS failed: {e}")
                self._failure_count += 1
                if self._failure_count >= self._max_failures:
                    self._primary_healthy = False
        
        # Try cloud fallback (Resemble)
        try:
            async for chunk in self.cloud_fallback.synthesize_stream(text):
                yield chunk
            return
        except Exception as e:
            logger.warning(f"Cloud TTS fallback failed: {e}")
        
        # Last resort - pre-recorded phrases
        cached_audio = await self.phrase_cache.get_similar(text)
        if cached_audio:
            yield AudioChunk(
                audio_data=cached_audio,
                sample_rate=24000,
                duration_ms=len(cached_audio) / 48,  # Approximate
                is_final=True,
            )
        else:
            # Generate a generic error message audio
            error_audio = await self.phrase_cache.get(
                "I apologize, I'm having some technical difficulties."
            )
            if error_audio:
                yield AudioChunk(
                    audio_data=error_audio,
                    sample_rate=24000,
                    duration_ms=len(error_audio) / 48,
                    is_final=True,
                )
    
    async def health_check(self) -> bool:
        """Check if primary TTS is healthy."""
        try:
            audio = await self.primary.synthesize("Test")
            self._primary_healthy = True
            self._failure_count = 0
            return True
        except Exception:
            return False

8.4 Voice Configuration \

# pipeline/tts/voices.py

from dataclasses import dataclass

@dataclass
class VoiceProfile:
    """Voice configuration for an agent."""
    voice_id: str
    name: str
    gender: str
    accent: str
    age_range: str
    style: str  # professional, friendly, casual
    speed: float = 1.0
    pitch: float = 1.0
    
    # Chatterbox-specific
    exaggeration: float = 0.5
    cfg_weight: float = 0.5

# Pre-configured voices
VOICE_PROFILES = {
    "professional_female": VoiceProfile(
        voice_id="alex_professional",
        name="Alex",
        gender="female",
        accent="American",
        age_range="30-40",
        style="professional",
        speed=1.0,
        pitch=1.0,
        exaggeration=0.3,
    ),
    "friendly_male": VoiceProfile(
        voice_id="jordan_friendly",
        name="Jordan",
        gender="male",
        accent="American",
        age_range="25-35",
        style="friendly",
        speed=1.05,
        pitch=1.0,
        exaggeration=0.5,
    ),
    "warm_female": VoiceProfile(
        voice_id="sam_warm",
        name="Sam",
        gender="female",
        accent="American",
        age_range="35-45",
        style="warm",
        speed=0.95,
        pitch=0.98,
        exaggeration=0.4,
    ),
}
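A profile is applied by overlaying its TTS-relevant fields onto the agent's `TTSConfig` before constructing the client. A minimal sketch — the `tts_overrides` helper is hypothetical, and `VoiceProfile` is trimmed to the fields the TTS request actually uses:

```python
from dataclasses import dataclass

@dataclass
class VoiceProfile:
    # Trimmed copy of the profile above — only TTS-relevant fields.
    voice_id: str
    speed: float = 1.0
    pitch: float = 1.0
    exaggeration: float = 0.5
    cfg_weight: float = 0.5

def tts_overrides(profile: VoiceProfile) -> dict:
    # Hypothetical helper: keyword overrides to apply when building a
    # TTSConfig for an agent that uses this profile.
    return {
        "voice_id": profile.voice_id,
        "speed": profile.speed,
        "pitch": profile.pitch,
        "exaggeration": profile.exaggeration,
        "cfg_weight": profile.cfg_weight,
    }

overrides = tts_overrides(VoiceProfile(voice_id="alex_professional", exaggeration=0.3))
```

Keeping the overlay in one place means a new agent persona is just a new `VOICE_PROFILES` entry, with no per-agent TTS plumbing.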

9. Streaming Architecture \

9.1 End-to-End Streaming \

The entire pipeline operates in streaming mode to minimize latency:
┌─────────────────────────────────────────────────────────────────────────────┐
│                      END-TO-END STREAMING FLOW                              │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   TIME ───────────────────────────────────────────────────────────────▶    │
│                                                                             │
│   AUDIO IN:    ████████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░     │
│   (caller)     [speaking.......]                                            │
│                                                                             │
│   VAD:         ░░████████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░     │
│                  [voice detected][endpoint]                                 │
│                                                                             │
│   STT:         ░░░░██░░██░░██░░████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░     │
│                    [interim][interim][final]                                │
│                                                                             │
│   LLM:         ░░░░░░░░░░░░░░░░░░██░░██░░██░░██░░██░░░░░░░░░░░░░░░░░░     │
│                                  [tokens streaming...]                      │
│                                                                             │
│   SENTENCE:    ░░░░░░░░░░░░░░░░░░░░░░░░░░████░░░░░░████░░░░░░░░░░░░░░     │
│   BUFFER                                [sent1]    [sent2]                  │
│                                                                             │
│   TTS:         ░░░░░░░░░░░░░░░░░░░░░░░░░░░░██████░░░░██████░░░░░░░░░░     │
│                                            [synth1]  [synth2]               │
│                                                                             │
│   AUDIO OUT:   ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░████████████████████████     │
│   (to caller)                                [audio playing............]    │
│                                                                             │
│   LATENCY:     |◀──────── ~850ms TTFB ────────▶|                           │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
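The ~850ms TTFB in the diagram decomposes into per-stage contributions. The component values below are illustrative assumptions chosen so the arithmetic matches the diagram — the authoritative numbers live in the latency budget in §3.2:

```python
# Illustrative TTFB decomposition for the diagram above.
# These stage values are assumptions, not measured figures.
budget_ms = {
    "vad_endpoint": 300,       # silence required to confirm end of speech
    "stt_final": 100,          # final transcript after the endpoint fires
    "context_assembly": 50,    # prompt + history assembly
    "llm_first_sentence": 250, # tokens until the first sentence boundary
    "tts_first_chunk": 150,    # Chatterbox TTFB (see §8.1)
}
ttfb_ms = sum(budget_ms.values())
# 300 + 100 + 50 + 250 + 150 = 850
```

The key point the diagram makes is that only these stages are serial; everything downstream of the first sentence overlaps with playback.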

9.2 Pipeline Orchestrator \

# pipeline/orchestrator.py

from __future__ import annotations

import asyncio
import logging
import time
from typing import Optional
from dataclasses import dataclass
from enum import Enum

# Component classes and configs (VADProcessor, STTManager, ClaudeLLM,
# TTSManager, ContextManager, LatencyTracker, ...) are imported from
# the pipeline modules defined in earlier sections.

logger = logging.getLogger(__name__)

class PipelineState(Enum):
    IDLE = "idle"
    LISTENING = "listening"
    PROCESSING = "processing"
    SPEAKING = "speaking"
    INTERRUPTED = "interrupted"

@dataclass
class PipelineConfig:
    """Configuration for the voice pipeline."""
    vad_config: VADConfig
    stt_config: DeepgramConfig
    llm_config: LLMConfig
    tts_config: TTSConfig
    agent_config: AgentConfig
    
    # Timing
    max_silence_ms: int = 400
    max_turn_duration_ms: int = 60000
    interrupt_threshold_ms: int = 200

class VoicePipelineOrchestrator:
    """
    Orchestrates the complete voice pipeline.
    
    Manages the flow from audio input through processing
    to audio output, handling interruptions and errors.
    """
    
    def __init__(
        self,
        config: PipelineConfig,
        audio_input: "AudioInputStream",
        audio_output: "AudioOutputStream",
        call_id: str,
    ):
        self.config = config
        self.audio_input = audio_input
        self.audio_output = audio_output
        self.call_id = call_id
        
        # Initialize components
        self.vad = VADProcessor(config.vad_config)
        self.stt = STTManager(config.stt_config)
        self.llm = ClaudeLLM(config.llm_config)
        self.tts = TTSManager(config.tts_config)
        self.context = ContextManager(redis_client, kb_service)
        
        # State
        self._state = PipelineState.IDLE
        self._current_transcript = ""
        self._latency_tracker: Optional[LatencyTracker] = None
        self._turn_id = 0
        
        # Control flags
        self._running = False
        self._interrupted = False
    
    async def start(self) -> None:
        """Start the pipeline."""
        self._running = True
        self._state = PipelineState.LISTENING
        
        # Start component tasks
        await self.tts.start()
        
        # Main processing loop
        asyncio.create_task(self._audio_input_loop())
        asyncio.create_task(self._audio_output_loop())
    
    async def stop(self) -> None:
        """Stop the pipeline."""
        self._running = False
        await self.stt.end_session()
        await self.tts.stop()
    
    async def _audio_input_loop(self) -> None:
        """Process incoming audio."""
        while self._running:
            try:
                audio_chunk = await self.audio_input.read()
                if not audio_chunk:
                    continue
                
                timestamp = time.time()
                
                # Check for interruption during speaking
                if self._state == PipelineState.SPEAKING:
                    await self._check_interruption(audio_chunk, timestamp)
                
                # Process through VAD
                await self.vad.process_audio(
                    audio_chunk,
                    timestamp,
                    on_speech_start=self._on_speech_start,
                    on_speech_end=self._on_speech_end,
                )
                
                # Send to STT if listening
                if self._state in (PipelineState.LISTENING, PipelineState.INTERRUPTED):
                    await self.stt.send_audio(audio_chunk)
                
            except Exception as e:
                logger.error(f"Error in audio input loop: {e}")
    
    async def _audio_output_loop(self) -> None:
        """Send audio to output."""
        while self._running:
            try:
                chunk = await self.tts.get_audio()
                
                if chunk and chunk.audio_data:
                    if not self._interrupted:
                        await self.audio_output.write(chunk.audio_data)
                    
                    if chunk.is_final:
                        self._state = PipelineState.LISTENING
                
                await asyncio.sleep(0.01)  # Small yield
                
            except Exception as e:
                logger.error(f"Error in audio output loop: {e}")
    
    async def _on_speech_start(self, event: dict) -> None:
        """Handle speech start from VAD."""
        logger.debug(f"Speech started: {event}")
        
        # Start new turn
        self._turn_id += 1
        self._latency_tracker = LatencyTracker(self.call_id, str(self._turn_id))
        self._latency_tracker.mark("vad_speech_start")
        
        # Start STT session
        await self.stt.start_session(
            on_interim=self._on_interim_transcript,
            on_final=self._on_final_transcript,
        )
        
        self._state = PipelineState.LISTENING
    
    async def _on_speech_end(self, event: dict) -> None:
        """Handle speech end from VAD (endpointing)."""
        logger.debug(f"Speech ended: {event}")
        
        if self._latency_tracker:
            self._latency_tracker.mark("vad_speech_end")
        
        # Get final transcript
        final_transcript = await self.stt.end_session()
        
        if final_transcript:
            await self._process_turn(final_transcript)
    
    async def _on_interim_transcript(self, transcript: str) -> None:
        """Handle interim transcript."""
        self._current_transcript = transcript
        
        if self._latency_tracker and "stt_interim_first" not in self._latency_tracker.checkpoints:
            self._latency_tracker.mark("stt_interim_first")
    
    async def _on_final_transcript(self, transcript: str) -> None:
        """Handle final transcript."""
        if self._latency_tracker:
            self._latency_tracker.mark("stt_final")
    
    async def _process_turn(self, transcript: str) -> None:
        """Process a complete user turn."""
        self._state = PipelineState.PROCESSING
        
        logger.info(f"Processing turn: {transcript}")
        
        # Assemble context
        context = await self.context.assemble_context(
            call_id=self.call_id,
            agent_config=self.config.agent_config,
            current_input=transcript,
        )
        
        if self._latency_tracker:
            self._latency_tracker.mark("context_assembled")
            self._latency_tracker.mark("llm_request_sent")
        
        # Generate response, accumulating streamed deltas so complete sentences
        # can be queued for TTS while the full text is kept for history
        first_token = False
        first_sentence = False
        text_buffer = ""
        full_response = ""
        
        async for response in self.llm.generate_stream(
            system_prompt=context.system_prompt,
            messages=context.messages,
            tools=context.tools,
        ):
            if not first_token and response.text:
                first_token = True
                if self._latency_tracker:
                    self._latency_tracker.mark("llm_first_token")
            
            # Extract complete sentences and queue them for TTS
            if response.text:
                text_buffer += response.text
                full_response += response.text
                chunk, text_buffer = self._extract_sentence(text_buffer)
                
                while chunk:
                    if not first_sentence:
                        first_sentence = True
                        if self._latency_tracker:
                            self._latency_tracker.mark("llm_sentence_complete")
                    
                    await self.tts.queue_sentence(chunk)
                    self._state = PipelineState.SPEAKING
                    chunk, text_buffer = self._extract_sentence(text_buffer)
            
            # Handle tool calls
            if response.is_complete and response.tool_calls:
                for tool_call in response.tool_calls:
                    await self._execute_tool(tool_call)
        
        # Flush any trailing partial sentence
        if text_buffer.strip():
            await self.tts.queue_sentence(text_buffer.strip())
        
        # Add to conversation history
        await self.context.add_to_history(self.call_id, "user", transcript)
        await self.context.add_to_history(self.call_id, "assistant", full_response)
        
        # Emit latency metrics
        if self._latency_tracker:
            self._latency_tracker.emit_metrics()
    
    async def _check_interruption(
        self,
        audio_chunk: bytes,
        timestamp: float,
    ) -> None:
        """Check if user is interrupting."""
        # Quick VAD check
        vad_result = self.vad.vad.process_frame(
            np.frombuffer(audio_chunk, dtype=np.int16),
            timestamp,
        )
        
        if vad_result.get("speech_probability", 0) > 0.7:
            # User might be interrupting
            self._handle_interruption()
    
    def _handle_interruption(self) -> None:
        """Handle user interruption."""
        logger.info("User interruption detected")
        
        self._interrupted = True
        self._state = PipelineState.INTERRUPTED
        
        # Clear TTS queue so no further audio is spoken
        asyncio.create_task(self.tts.clear_queue())
        
        # Reset the flag so the next turn starts clean
        self._interrupted = False
    
    def _extract_sentence(self, text: str) -> tuple[str, str]:
        """Extract a complete sentence from text, splitting at the earliest boundary."""
        # Simple sentence boundary detection
        indices = [
            text.index(end_char) + 1
            for end_char in ['. ', '! ', '? ', '.\n', '!\n', '?\n']
            if end_char in text
        ]
        if indices:
            idx = min(indices)
            return text[:idx].strip(), text[idx:]
        return "", text
    
    async def _execute_tool(self, tool_call: dict) -> None:
        """Execute a tool call."""
        logger.info(f"Executing tool: {tool_call['name']}")
        
        # Tool execution logic (see Section 11)
        pass
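The streaming sentence-extraction loop in `_process_turn` can be sketched standalone. This sketch uses earliest-boundary matching and hypothetical sample deltas; in the real pipeline the chunks come from the LLM stream and extracted sentences are queued for TTS:

```python
def extract_sentence(text: str) -> tuple[str, str]:
    """Return (sentence, remainder), splitting at the earliest boundary found."""
    indices = [
        text.index(c) + 1
        for c in ('. ', '! ', '? ', '.\n', '!\n', '?\n')
        if c in text
    ]
    if not indices:
        return "", text
    idx = min(indices)
    return text[:idx].strip(), text[idx:]

buffer = ""
sentences = []
for delta in ["Hello! I can he", "lp with that. What ", "time works?"]:
    buffer += delta
    chunk, buffer = extract_sentence(buffer)
    while chunk:
        sentences.append(chunk)              # in the pipeline: queued for TTS
        chunk, buffer = extract_sentence(buffer)

print(sentences)         # ['Hello!', 'I can help with that.']
print(buffer.strip())    # 'What time works?' (flushed when the stream ends)
```

Note that the partial sentence left in the buffer must be flushed once the stream completes, otherwise the final clause is never spoken.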

9.3 Audio Buffer Management \

# pipeline/audio/buffer.py

import asyncio
import collections
from typing import Optional
import numpy as np

class CircularAudioBuffer:
    """
    Circular buffer for audio samples.
    
    Maintains a fixed-size buffer for audio processing
    with efficient append and read operations.
    """
    
    def __init__(
        self,
        duration_ms: int,
        sample_rate: int = 16000,
        channels: int = 1,
    ):
        self.sample_rate = sample_rate
        self.channels = channels
        
        # Calculate buffer size
        samples = int(sample_rate * duration_ms / 1000)
        self._buffer = collections.deque(maxlen=samples)
    
    def write(self, samples: np.ndarray) -> None:
        """Write samples to the buffer (oldest samples are evicted when full)."""
        self._buffer.extend(samples)
    
    def read(self, num_samples: int) -> Optional[np.ndarray]:
        """Read and consume samples from the buffer."""
        if len(self._buffer) < num_samples:
            return None
        
        return np.array(
            [self._buffer.popleft() for _ in range(num_samples)],
            dtype=np.int16,
        )
    
    def peek(self, num_samples: int) -> Optional[np.ndarray]:
        """Peek at samples without removing them."""
        if len(self._buffer) < num_samples:
            return None
        
        return np.array(list(self._buffer)[:num_samples], dtype=np.int16)
    
    def clear(self) -> None:
        """Clear the buffer."""
        self._buffer.clear()
    
    @property
    def available(self) -> int:
        """Number of samples available."""
        return len(self._buffer)
    
    @property
    def duration_ms(self) -> float:
        """Duration of buffered audio in milliseconds."""
        return (len(self._buffer) / self.sample_rate) * 1000


class JitterBuffer:
    """
    Jitter buffer for smooth audio playback.
    
    Buffers audio chunks to handle network jitter
    while maintaining low latency.
    """
    
    def __init__(
        self,
        target_latency_ms: int = 60,
        max_latency_ms: int = 200,
        sample_rate: int = 24000,
    ):
        self.target_latency_ms = target_latency_ms
        self.max_latency_ms = max_latency_ms
        self.sample_rate = sample_rate
        
        self._buffer: asyncio.Queue = asyncio.Queue()
        self._buffered_duration_ms: float = 0
        self._is_playing = False
    
    async def add_chunk(self, audio_data: bytes) -> None:
        """Add an audio chunk to the buffer."""
        duration_ms = (len(audio_data) / 2) / self.sample_rate * 1000
        
        await self._buffer.put(audio_data)
        self._buffered_duration_ms += duration_ms
        
        # Drop old chunks if buffer is too full
        while self._buffered_duration_ms > self.max_latency_ms:
            try:
                old_chunk = self._buffer.get_nowait()
                old_duration = (len(old_chunk) / 2) / self.sample_rate * 1000
                self._buffered_duration_ms -= old_duration
            except asyncio.QueueEmpty:
                break
    
    async def get_chunk(self) -> Optional[bytes]:
        """Get the next chunk for playback."""
        # Wait until we have enough buffered
        if not self._is_playing:
            while self._buffered_duration_ms < self.target_latency_ms:
                await asyncio.sleep(0.01)
            self._is_playing = True
        
        try:
            chunk = await asyncio.wait_for(
                self._buffer.get(),
                timeout=0.1,
            )
            duration_ms = (len(chunk) / 2) / self.sample_rate * 1000
            self._buffered_duration_ms -= duration_ms
            return chunk
        except asyncio.TimeoutError:
            self._is_playing = False
            return None
    
    def clear(self) -> None:
        """Clear the buffer."""
        while not self._buffer.empty():
            try:
                self._buffer.get_nowait()
            except asyncio.QueueEmpty:
                break
        self._buffered_duration_ms = 0
        self._is_playing = False

10. Interruption Handling \

10.1 Interruption Types \

Users may interrupt the AI for various reasons:
| Type | Description | Response |
| --- | --- | --- |
| Correction | User wants to correct a misunderstanding | Stop, acknowledge, listen |
| Completion | User finishes the AI's sentence | Stop, continue flow |
| Urgency | User has urgent new information | Stop immediately, process |
| Impatience | AI is too verbose | Stop, be more concise |
| Backchannel | "Uh-huh", "okay"; not a true interruption | Continue speaking |

10.2 Interruption Detection \

# pipeline/interruption/detector.py

from dataclasses import dataclass
from typing import Optional
import numpy as np

@dataclass
class InterruptionConfig:
    """Configuration for interruption detection."""
    vad_threshold: float = 0.7           # Higher than normal VAD
    min_duration_ms: int = 150           # Minimum speech to consider
    energy_threshold: float = 0.02       # Minimum audio energy
    backchannel_max_ms: int = 500        # Max duration for backchannel
    confirmation_delay_ms: int = 100     # Wait before confirming interrupt

class InterruptionDetector:
    """
    Detect user interruptions during AI speech.
    
    Distinguishes between intentional interruptions
    and backchannels/noise.
    """
    
    def __init__(self, config: Optional[InterruptionConfig] = None):
        self.config = config or InterruptionConfig()
        
        self._speech_start_time: Optional[float] = None
        self._speech_energy_sum: float = 0
        self._frame_count: int = 0
        self._confirmed_interruption: bool = False
    
    def process_frame(
        self,
        audio_frame: np.ndarray,
        vad_probability: float,
        timestamp: float,
        ai_is_speaking: bool,
    ) -> dict:
        """
        Process an audio frame for interruption detection.
        
        Args:
            audio_frame: Audio samples
            vad_probability: VAD output for this frame
            timestamp: Current timestamp
            ai_is_speaking: Whether AI is currently outputting audio
        
        Returns:
            Detection result with interruption status
        """
        result = {
            "is_interruption": False,
            "is_backchannel": False,
            "should_stop": False,
            "confidence": 0.0,
        }
        
        if not ai_is_speaking:
            self._reset()
            return result
        
        # Calculate frame energy
        energy = np.sqrt(np.mean(audio_frame.astype(np.float32) ** 2)) / 32768
        
        # Check if this frame has speech
        is_speech = (
            vad_probability >= self.config.vad_threshold and
            energy >= self.config.energy_threshold
        )
        
        if is_speech:
            if self._speech_start_time is None:
                self._speech_start_time = timestamp
            
            self._speech_energy_sum += energy
            self._frame_count += 1
            
            # Calculate speech duration
            duration_ms = (timestamp - self._speech_start_time) * 1000
            avg_energy = self._speech_energy_sum / self._frame_count
            
            # Determine if this is an interruption
            if duration_ms >= self.config.min_duration_ms:
                if duration_ms <= self.config.backchannel_max_ms:
                    # Could be backchannel - wait to confirm
                    result["confidence"] = 0.5
                else:
                    # Likely real interruption
                    result["is_interruption"] = True
                    result["should_stop"] = True
                    result["confidence"] = min(0.9, 0.5 + (duration_ms / 1000))
            
        else:
            # No speech - check if we had a short backchannel
            if self._speech_start_time is not None:
                duration_ms = (timestamp - self._speech_start_time) * 1000
                
                if duration_ms <= self.config.backchannel_max_ms:
                    result["is_backchannel"] = True
                
                self._reset()
        
        return result
    
    def _reset(self) -> None:
        """Reset detection state."""
        self._speech_start_time = None
        self._speech_energy_sum = 0
        self._frame_count = 0
        self._confirmed_interruption = False
    
    def force_interrupt(self) -> None:
        """Force an interruption (external trigger)."""
        self._confirmed_interruption = True


class InterruptionHandler:
    """
    Handle interruptions in the voice pipeline.
    """
    
    def __init__(
        self,
        tts_manager: TTSManager,
        audio_output: "AudioOutputStream",
        context_manager: ContextManager,
    ):
        self.tts = tts_manager
        self.audio_output = audio_output
        self.context = context_manager
        
        self._interrupted_text: str = ""
        self._interrupt_count: int = 0
    
    async def handle_interruption(
        self,
        call_id: str,
        interrupted_response: str,
        interrupt_position: int,
    ) -> None:
        """
        Handle a user interruption.
        
        Args:
            call_id: Current call ID
            interrupted_response: The AI response that was interrupted
            interrupt_position: Character position where interrupted
        """
        logger.info(f"Handling interruption at position {interrupt_position}")
        
        # Stop TTS immediately
        await self.tts.clear_queue()
        
        # Stop audio output
        await self.audio_output.clear()
        
        # Track what was said vs interrupted
        spoken_text = interrupted_response[:interrupt_position]
        unspoken_text = interrupted_response[interrupt_position:]
        
        self._interrupted_text = unspoken_text
        self._interrupt_count += 1
        
        # Update conversation history with partial response
        if spoken_text.strip():
            await self.context.add_to_history(
                call_id,
                "assistant",
                spoken_text + "... [interrupted]",
            )
        
        # If user interrupts frequently, note this for context
        if self._interrupt_count >= 3:
            # User might prefer shorter responses
            logger.info("User frequently interrupts - suggesting shorter responses")
    
    async def get_interrupted_context(self) -> Optional[str]:
        """Get context about what was interrupted."""
        if self._interrupted_text:
            return f"(You were about to say: {self._interrupted_text[:100]}...)"
        return None
    
    def reset_interrupt_tracking(self) -> None:
        """Reset interruption tracking for new call."""
        self._interrupted_text = ""
        self._interrupt_count = 0

10.3 Graceful Interruption Flow \

┌─────────────────────────────────────────────────────────────────────────────┐
│                       INTERRUPTION HANDLING FLOW                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   AI Speaking          User Input          System Response                  │
│        │                   │                     │                          │
│        │  ◀── audio ──     │                     │                          │
│        │                   │                     │                          │
│        │              VAD detects speech         │                          │
│        │                   │                     │                          │
│        │                   │  ─── Is this ───▶   │                          │
│        │                   │      interruption?  │                          │
│        │                   │                     │                          │
│        │                   │                     │                          │
│        │                   │     ◀── Check ───   │                          │
│        │                   │         duration    │                          │
│        │                   │         & energy    │                          │
│        │                   │                     │                          │
│    ┌───┴───────────────────┴─────────────────────┴───┐                     │
│    │                                                  │                     │
│    │  If duration < 500ms AND low energy:            │                     │
│    │    → Likely backchannel, continue speaking      │                     │
│    │                                                  │                     │
│    │  If duration >= 150ms AND high energy:          │                     │
│    │    → Likely interruption                        │                     │
│    │    → Stop TTS                                   │                     │
│    │    → Clear audio queue                          │                     │
│    │    → Start processing new input                 │                     │
│    │                                                  │                     │
│    └─────────────────────────────────────────────────┘                     │
│                                                                             │
│   Post-Interruption:                                                        │
│   1. Log partial response for context                                       │
│   2. Begin STT on new user input                                           │
│   3. Don't repeat interrupted content unless relevant                       │
│   4. If frequent interrupts → respond more concisely                       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

10.4 Backchannel Recognition \

# pipeline/interruption/backchannel.py

import re
from typing import Optional

class BackchannelRecognizer:
    """
    Recognize backchannel utterances that shouldn't interrupt.
    
    Backchannels are short acknowledgments like "uh-huh", "okay",
    "right" that indicate listening without intent to take the turn.
    """
    
    # Common backchannel patterns
    BACKCHANNEL_PATTERNS = [
        r'^(uh[- ]?huh|mm[- ]?hmm|hmm+)$',
        r'^(okay|ok|yep|yeah|yes|no|right|sure)$',
        r'^(i see|got it|makes sense)$',
        r'^(oh|ah|wow)$',
    ]
    
    def __init__(self):
        self._patterns = [
            re.compile(p, re.IGNORECASE)
            for p in self.BACKCHANNEL_PATTERNS
        ]
    
    def is_backchannel(
        self,
        transcript: str,
        duration_ms: float,
    ) -> bool:
        """
        Check if utterance is a backchannel.
        
        Args:
            transcript: The transcribed text
            duration_ms: Duration of the utterance
        
        Returns:
            True if likely a backchannel
        """
        # Backchannels are short
        if duration_ms > 1000:
            return False
        
        # Clean transcript
        text = transcript.strip().lower()
        
        # Check patterns
        for pattern in self._patterns:
            if pattern.match(text):
                return True
        
        # Very short utterances (1-2 words) are often backchannels
        words = text.split()
        if len(words) <= 2 and duration_ms < 500:
            return True
        
        return False
    
    def get_backchannel_type(
        self,
        transcript: str,
    ) -> Optional[str]:
        """Categorize the backchannel type."""
        text = transcript.strip().lower()
        
        if re.match(r'^(yes|yeah|yep|uh[- ]?huh|mm[- ]?hmm)$', text):
            return "agreement"
        elif re.match(r'^(no|nope)$', text):
            return "disagreement"
        elif re.match(r'^(okay|ok|got it|i see)$', text):
            return "acknowledgment"
        elif re.match(r'^(oh|ah|wow)$', text):
            return "surprise"
        elif re.match(r'^(right|sure)$', text):
            return "confirmation"
        
        return None
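A compact, standalone version of the backchannel check above makes the heuristic easy to test in isolation (same patterns and thresholds as `BackchannelRecognizer`):

```python
import re

BACKCHANNEL_PATTERNS = [
    re.compile(p, re.IGNORECASE)
    for p in [
        r'^(uh[- ]?huh|mm[- ]?hmm|hmm+)$',
        r'^(okay|ok|yep|yeah|yes|no|right|sure)$',
        r'^(i see|got it|makes sense)$',
        r'^(oh|ah|wow)$',
    ]
]

def is_backchannel(transcript: str, duration_ms: float) -> bool:
    if duration_ms > 1000:                      # backchannels are short
        return False
    text = transcript.strip().lower()
    if any(p.match(text) for p in BACKCHANNEL_PATTERNS):
        return True
    # 1-2 word utterances under 500 ms are usually backchannels, but this
    # fallback can misfire on short commands like "wait, stop"
    return len(text.split()) <= 2 and duration_ms < 500

print(is_backchannel("uh-huh", 300))                               # True
print(is_backchannel("actually I need to change the date", 1500))  # False
```

The word-count fallback is deliberately loose; tightening it (e.g. requiring a pattern match for any utterance that starts with "wait" or "stop") is a reasonable tuning step.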

11. Tool Calling in Voice Context \

11.1 Voice-Appropriate Tools \

Tools in voice context need special handling:
  • Results must be speakable
  • Execution should be fast
  • Failures need graceful verbal handling
# pipeline/tools/voice_tools.py

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class ToolCategory(Enum):
    INSTANT = "instant"        # < 500ms, can execute inline
    ASYNC = "async"            # > 500ms, need filler phrase
    BACKGROUND = "background"  # Can run without response

@dataclass
class VoiceToolDefinition:
    """Tool definition optimized for voice."""
    name: str
    description: str
    parameters: dict
    category: ToolCategory
    
    # Voice-specific
    filler_phrase: str = "Let me check that for you..."
    success_template: str = "{result}"
    failure_phrase: str = "I wasn't able to complete that action."
    max_execution_ms: int = 5000

# Example tool definitions
APPOINTMENT_TOOLS = [
    VoiceToolDefinition(
        name="check_availability",
        description="Check available appointment slots for a given date",
        parameters={
            "type": "object",
            "properties": {
                "date": {
                    "type": "string",
                    "description": "Date to check in YYYY-MM-DD format",
                },
                "service_type": {
                    "type": "string",
                    "description": "Type of appointment",
                },
            },
            "required": ["date"],
        },
        category=ToolCategory.INSTANT,
        filler_phrase="Let me check what's available...",
        success_template="I have availability at {slots}. Which time works best for you?",
    ),
    VoiceToolDefinition(
        name="book_appointment",
        description="Book an appointment for the caller",
        parameters={
            "type": "object",
            "properties": {
                "date": {"type": "string"},
                "time": {"type": "string"},
                "service_type": {"type": "string"},
                "customer_name": {"type": "string"},
                "customer_phone": {"type": "string"},
            },
            "required": ["date", "time", "customer_name"],
        },
        category=ToolCategory.ASYNC,
        filler_phrase="I'm booking that for you now...",
        success_template="I've booked your appointment for {date} at {time}. You'll receive a confirmation shortly.",
        failure_phrase="I wasn't able to book that appointment. Would you like to try a different time?",
    ),
    VoiceToolDefinition(
        name="transfer_to_agent",
        description="Transfer the call to a human agent",
        parameters={
            "type": "object",
            "properties": {
                "reason": {"type": "string"},
                "department": {"type": "string"},
            },
        },
        category=ToolCategory.INSTANT,
        filler_phrase="I'll connect you with someone who can help...",
        success_template="I'm transferring you now. Please hold.",
    ),
]

11.2 Tool Executor \

# pipeline/tools/executor.py

import asyncio
import time
from typing import Any, Optional
from dataclasses import dataclass

@dataclass
class ToolResult:
    """Result from tool execution."""
    tool_name: str
    tool_call_id: str
    success: bool
    result: Any
    error: Optional[str]
    execution_time_ms: float
    speakable_response: str

class VoiceToolExecutor:
    """
    Execute tools in voice context.
    
    Handles timing, filler phrases, and result formatting
    for natural voice conversation flow.
    """
    
    def __init__(
        self,
        tool_definitions: list[VoiceToolDefinition],
        webhook_executor: "WebhookExecutor",
        tts_manager: TTSManager,
    ):
        self.tools = {t.name: t for t in tool_definitions}
        self.webhook_executor = webhook_executor
        self.tts = tts_manager
    
    async def execute(
        self,
        tool_call: dict,
        call_context: dict,
    ) -> ToolResult:
        """
        Execute a tool call.
        
        Args:
            tool_call: Tool call from LLM (name, id, input)
            call_context: Current call context
        
        Returns:
            Tool execution result
        """
        tool_name = tool_call["name"]
        tool_id = tool_call["id"]
        tool_input = tool_call["input"]
        
        tool_def = self.tools.get(tool_name)
        if not tool_def:
            return ToolResult(
                tool_name=tool_name,
                tool_call_id=tool_id,
                success=False,
                result=None,
                error=f"Unknown tool: {tool_name}",
                execution_time_ms=0,
                speakable_response="I'm not able to do that right now.",
            )
        
        # Queue filler phrase for async tools
        if tool_def.category == ToolCategory.ASYNC:
            await self.tts.queue_sentence(tool_def.filler_phrase)
        
        # Execute with timeout
        start_time = time.time()
        
        try:
            result = await asyncio.wait_for(
                self._execute_tool(tool_name, tool_input, call_context),
                timeout=tool_def.max_execution_ms / 1000,
            )
            
            execution_time = (time.time() - start_time) * 1000
            
            # Format speakable response
            speakable = self._format_response(tool_def, result)
            
            return ToolResult(
                tool_name=tool_name,
                tool_call_id=tool_id,
                success=True,
                result=result,
                error=None,
                execution_time_ms=execution_time,
                speakable_response=speakable,
            )
            
        except asyncio.TimeoutError:
            execution_time = (time.time() - start_time) * 1000
            return ToolResult(
                tool_name=tool_name,
                tool_call_id=tool_id,
                success=False,
                result=None,
                error="Tool execution timed out",
                execution_time_ms=execution_time,
                speakable_response=tool_def.failure_phrase,
            )
            
        except Exception as e:
            execution_time = (time.time() - start_time) * 1000
            logger.error(f"Tool execution error: {e}")
            return ToolResult(
                tool_name=tool_name,
                tool_call_id=tool_id,
                success=False,
                result=None,
                error=str(e),
                execution_time_ms=execution_time,
                speakable_response=tool_def.failure_phrase,
            )
    
    async def _execute_tool(
        self,
        tool_name: str,
        tool_input: dict,
        call_context: dict,
    ) -> Any:
        """Execute the actual tool logic."""
        
        # Built-in tools
        if tool_name == "transfer_to_agent":
            return await self._transfer_to_agent(tool_input, call_context)
        
        elif tool_name == "end_call":
            return await self._end_call(tool_input, call_context)
        
        elif tool_name == "send_sms":
            return await self._send_sms(tool_input, call_context)
        
        # External webhook tools
        else:
            return await self.webhook_executor.execute(
                tool_name,
                tool_input,
                call_context,
            )
    
    def _format_response(
        self,
        tool_def: VoiceToolDefinition,
        result: Any,
    ) -> str:
        """Format tool result as speakable text."""
        try:
            if isinstance(result, dict):
                return tool_def.success_template.format(**result)
            else:
                return tool_def.success_template.format(result=result)
        except KeyError:
            return str(result)
    
    async def _transfer_to_agent(
        self,
        params: dict,
        context: dict,
    ) -> dict:
        """Built-in transfer functionality."""
        from integrations.gotoconnect import call_control
        
        target = params.get("department", "support")
        reason = params.get("reason", "Customer requested transfer")
        
        # Map department to extension
        department_extensions = {
            "support": "ext:1001",
            "sales": "ext:1002",
            "billing": "ext:1003",
        }
        
        dial_string = department_extensions.get(target, "ext:1001")
        
        await call_control.blind_transfer(
            call_id=context["external_call_id"],
            dial_string=dial_string,
        )
        
        return {"transferred_to": target, "reason": reason}
    
    async def _end_call(
        self,
        params: dict,
        context: dict,
    ) -> dict:
        """Built-in end call functionality."""
        from integrations.gotoconnect import call_control
        
        await call_control.hangup(
            call_id=context["external_call_id"],
        )
        
        return {"ended": True}
    
    async def _send_sms(
        self,
        params: dict,
        context: dict,
    ) -> dict:
        """Send SMS to caller."""
        # Implementation depends on SMS provider; raising here routes the
        # call through the executor's failure handling rather than silently
        # reporting success with an empty result
        raise NotImplementedError("SMS provider integration pending")

11.3 Webhook Integration with n8n \

# pipeline/tools/webhook_executor.py

import time
import httpx
from typing import Any

class WebhookExecutor:
    """
    Execute tools via n8n webhooks.
    
    n8n workflows can be triggered to perform
    complex operations and return results.
    """
    
    def __init__(
        self,
        n8n_base_url: str,
        timeout: float = 10.0,
    ):
        self.base_url = n8n_base_url
        self.timeout = timeout
        self._client = httpx.AsyncClient(timeout=timeout)
    
    async def execute(
        self,
        tool_name: str,
        tool_input: dict,
        call_context: dict,
    ) -> Any:
        """
        Execute a tool via n8n webhook.
        
        Args:
            tool_name: Name of the tool (maps to webhook path)
            tool_input: Input parameters from LLM
            call_context: Current call context
        
        Returns:
            Result from n8n workflow
        """
        webhook_url = f"{self.base_url}/webhook/{tool_name}"
        
        payload = {
            "input": tool_input,
            "context": {
                "call_id": call_context.get("call_id"),
                "tenant_id": call_context.get("tenant_id"),
                "caller_phone": call_context.get("caller_phone"),
                "timestamp": time.time(),
            },
        }
        
        response = await self._client.post(
            webhook_url,
            json=payload,
            headers={
                "Content-Type": "application/json",
            },
        )
        
        response.raise_for_status()
        
        result = response.json()
        return result.get("data", result)
    
    async def close(self) -> None:
        """Close the client."""
        await self._client.aclose()

12. Conversation State Management \

12.1 State Structure \

# pipeline/state/models.py

from dataclasses import dataclass, field
from typing import Optional, Any
from enum import Enum
from datetime import datetime

class ConversationPhase(Enum):
    GREETING = "greeting"
    DISCOVERY = "discovery"
    RESOLUTION = "resolution"
    CLOSING = "closing"

@dataclass
class ConversationState:
    """Complete state of a conversation."""
    call_id: str
    tenant_id: str
    agent_id: str
    
    # Call metadata
    caller_phone: str
    called_number: str
    direction: str
    started_at: datetime
    
    # Conversation tracking
    turn_count: int = 0
    current_phase: ConversationPhase = ConversationPhase.GREETING
    detected_intent: Optional[str] = None
    
    # Context accumulation
    collected_info: dict = field(default_factory=dict)
    pending_actions: list = field(default_factory=list)
    completed_actions: list = field(default_factory=list)
    
    # State flags
    is_on_hold: bool = False
    is_muted: bool = False
    transfer_pending: bool = False
    
    # Performance tracking
    total_latency_ms: float = 0
    average_latency_ms: float = 0
    interrupt_count: int = 0

@dataclass
class TurnState:
    """State for a single conversation turn."""
    turn_id: str
    started_at: float
    
    # Input
    transcript: str = ""
    transcript_confidence: float = 0.0
    
    # Processing
    llm_response: str = ""
    tool_calls: list = field(default_factory=list)
    tool_results: list = field(default_factory=list)
    
    # Output
    spoken_response: str = ""
    was_interrupted: bool = False
    
    # Timing
    stt_latency_ms: float = 0
    llm_latency_ms: float = 0
    tts_latency_ms: float = 0
    total_latency_ms: float = 0
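Two fields in these dataclasses are not directly JSON-serializable: the `ConversationPhase` enum and the `started_at` datetime. A minimal round-trip sketch (standalone; it redeclares a trimmed `ConversationPhase` for illustration):

```python
import json
from datetime import datetime
from enum import Enum

class ConversationPhase(Enum):
    GREETING = "greeting"
    DISCOVERY = "discovery"

# Serialize: store the enum's .value and the datetime as ISO-8601 text.
encoded = json.dumps({
    "current_phase": ConversationPhase.DISCOVERY.value,
    "started_at": datetime(2026, 1, 16, 9, 30).isoformat(),
})

# Deserialize: rebuild the rich types from their string forms.
raw = json.loads(encoded)
phase = ConversationPhase(raw["current_phase"])
started_at = datetime.fromisoformat(raw["started_at"])

assert phase is ConversationPhase.DISCOVERY
assert started_at.hour == 9
```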

### 12.2 State Manager

# pipeline/state/manager.py

import json
import time
from datetime import datetime
from typing import Optional

from redis.asyncio import Redis

from pipeline.state.models import ConversationPhase, ConversationState, TurnState

class ConversationStateManager:
    """
    Manage conversation state in Redis.
    
    Provides fast access to conversation state
    with automatic expiration and recovery.
    """
    
    STATE_TTL = 3600  # 1 hour
    
    def __init__(self, redis: Redis):
        self.redis = redis
    
    async def get_state(self, call_id: str) -> Optional[ConversationState]:
        """Get conversation state."""
        key = f"call:{call_id}:state"
        data = await self.redis.get(key)
        
        if data:
            state_dict = json.loads(data)
            # Rebuild the rich types that were serialized as strings
            state_dict["started_at"] = datetime.fromisoformat(state_dict["started_at"])
            state_dict["current_phase"] = ConversationPhase(state_dict["current_phase"])
            return ConversationState(**state_dict)
        return None
    
    async def save_state(self, state: ConversationState) -> None:
        """Save conversation state."""
        key = f"call:{state.call_id}:state"
        
        state_dict = {
            "call_id": state.call_id,
            "tenant_id": state.tenant_id,
            "agent_id": state.agent_id,
            "caller_phone": state.caller_phone,
            "called_number": state.called_number,
            "direction": state.direction,
            "started_at": state.started_at.isoformat(),
            "turn_count": state.turn_count,
            "current_phase": state.current_phase.value,
            "detected_intent": state.detected_intent,
            "collected_info": state.collected_info,
            "pending_actions": state.pending_actions,
            "completed_actions": state.completed_actions,
            "is_on_hold": state.is_on_hold,
            "is_muted": state.is_muted,
            "transfer_pending": state.transfer_pending,
            "total_latency_ms": state.total_latency_ms,
            "average_latency_ms": state.average_latency_ms,
            "interrupt_count": state.interrupt_count,
        }
        
        await self.redis.setex(
            key,
            self.STATE_TTL,
            json.dumps(state_dict),
        )
    
    async def update_turn(
        self,
        call_id: str,
        turn: TurnState,
    ) -> None:
        """Update state after a turn."""
        state = await self.get_state(call_id)
        if not state:
            return
        
        state.turn_count += 1
        state.total_latency_ms += turn.total_latency_ms
        state.average_latency_ms = state.total_latency_ms / state.turn_count
        
        if turn.was_interrupted:
            state.interrupt_count += 1
        
        # Update collected info from tool results
        for result in turn.tool_results:
            if result.get("collected_data"):
                state.collected_info.update(result["collected_data"])
        
        await self.save_state(state)
    
    async def get_history(self, call_id: str) -> list[dict]:
        """Get conversation history."""
        key = f"call:{call_id}:history"
        data = await self.redis.get(key)
        
        if data:
            return json.loads(data)
        return []
    
    async def add_to_history(
        self,
        call_id: str,
        role: str,
        content: str,
    ) -> None:
        """Add a message to history."""
        key = f"call:{call_id}:history"
        
        history = await self.get_history(call_id)
        history.append({
            "role": role,
            "content": content,
            "timestamp": time.time(),
        })
        
        # Keep the last 50 turns (one user plus one
        # assistant message per turn = 100 entries)
        history = history[-100:]
        
        await self.redis.setex(
            key,
            self.STATE_TTL,
            json.dumps(history),
        )
    
    async def cleanup(self, call_id: str) -> None:
        """Clean up state after call ends."""
        keys = [
            f"call:{call_id}:state",
            f"call:{call_id}:history",
            f"call:{call_id}:webrtc",
            f"call:{call_id}:context",
        ]
        
        for key in keys:
            await self.redis.delete(key)
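The history window kept by `add_to_history` is bounded in messages, not turns: each turn contributes a user and an assistant entry, so the `[-100:]` slice retains roughly the last 50 turns. A standalone sketch of the trim:

```python
# Build a simulated history: 120 turns, two messages per turn.
history = []
for turn in range(120):
    history.append({"role": "user", "content": f"question {turn}"})
    history.append({"role": "assistant", "content": f"answer {turn}"})

# Trim exactly as the manager does: keep the last 100 messages.
history = history[-100:]

assert len(history) == 100
assert history[0]["content"] == "question 70"  # oldest surviving turn
```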

## 13. Error Handling and Fallbacks

### 13.1 Error Categories

# pipeline/errors.py

from enum import Enum

class ErrorCategory(Enum):
    RECOVERABLE = "recoverable"     # Can retry or fallback
    DEGRADED = "degraded"           # Partial functionality
    FATAL = "fatal"                 # Must end call

class PipelineError(Exception):
    """Base pipeline error."""
    category: ErrorCategory = ErrorCategory.RECOVERABLE
    user_message: str = "I apologize, I encountered an issue."

class STTError(PipelineError):
    """Speech-to-text error."""
    user_message = "I'm having trouble hearing you. Could you please repeat that?"

class LLMError(PipelineError):
    """LLM processing error."""
    user_message = "I need a moment to think. Could you repeat your question?"

class TTSError(PipelineError):
    """Text-to-speech error."""
    category = ErrorCategory.DEGRADED
    user_message = ""  # Can't speak this error!

class ToolError(PipelineError):
    """Tool execution error."""
    user_message = "I wasn't able to complete that action. Let me try another way."

class FatalError(PipelineError):
    """Unrecoverable error."""
    category = ErrorCategory.FATAL
    user_message = "I apologize, I'm experiencing technical difficulties. Please call back or hold for an agent."

### 13.2 Error Handler

# pipeline/error_handler.py

import asyncio
import logging

logger = logging.getLogger(__name__)

class PipelineErrorHandler:
    """
    Handle errors in the voice pipeline.
    
    Provides graceful degradation and user-friendly
    error messages for voice context.
    """
    
    def __init__(
        self,
        tts_manager: TTSManager,
        call_control: "GoToCallControl",
        state_manager: ConversationStateManager,
    ):
        self.tts = tts_manager
        self.call_control = call_control
        self.state = state_manager
        
        self._error_count = 0
        self._max_errors = 5
    
    async def handle_error(
        self,
        error: Exception,
        call_id: str,
        context: str = "",
    ) -> bool:
        """
        Handle a pipeline error.
        
        Args:
            error: The error that occurred
            call_id: Current call ID
            context: Additional context
        
        Returns:
            True if recovered, False if fatal
        """
        self._error_count += 1
        
        logger.error(
            f"Pipeline error in {context}: {error}",
            exc_info=True,
            extra={"call_id": call_id},
        )
        
        # Check if too many errors
        if self._error_count >= self._max_errors:
            await self._handle_fatal(call_id)
            return False
        
        # Handle by error type
        if isinstance(error, PipelineError):
            return await self._handle_pipeline_error(error, call_id)
        else:
            return await self._handle_unexpected_error(error, call_id)
    
    async def _handle_pipeline_error(
        self,
        error: PipelineError,
        call_id: str,
    ) -> bool:
        """Handle a known pipeline error."""
        
        if error.category == ErrorCategory.FATAL:
            await self._handle_fatal(call_id)
            return False
        
        elif error.category == ErrorCategory.DEGRADED:
            # Log but continue
            logger.warning(f"Degraded operation: {error}")
            return True
        
        else:  # RECOVERABLE
            # Speak the error message
            if error.user_message:
                await self.tts.queue_sentence(error.user_message)
            return True
    
    async def _handle_unexpected_error(
        self,
        error: Exception,
        call_id: str,
    ) -> bool:
        """Handle an unexpected error."""
        # Generic recovery attempt
        await self.tts.queue_sentence(
            "I apologize, I encountered an issue. Let me try again."
        )
        return True
    
    async def _handle_fatal(self, call_id: str) -> None:
        """Handle a fatal error - transfer to agent."""
        logger.error(f"Fatal error, transferring call {call_id}")
        
        try:
            # Inform user
            await self.tts.queue_sentence(
                "I apologize, I'm experiencing technical difficulties. "
                "Let me connect you with someone who can help."
            )
            
            # Wait for message to play
            await asyncio.sleep(3)
            
            # Transfer to agent
            await self.call_control.blind_transfer(
                call_id=call_id,
                dial_string="ext:1000",  # Overflow/error queue
            )
        except Exception as e:
            logger.error(f"Failed to transfer on fatal error: {e}")
            # Last resort - hang up
            await self.call_control.hangup(call_id)
    
    def reset_error_count(self) -> None:
        """Reset error count (e.g., after successful turn)."""
        self._error_count = 0

### 13.3 Fallback Hierarchy

┌─────────────────────────────────────────────────────────────────────────────┐
│                        FALLBACK HIERARCHY                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   COMPONENT          PRIMARY           FALLBACK 1        FALLBACK 2        │
│   ─────────────────────────────────────────────────────────────────────    │
│                                                                             │
│   STT                Deepgram          Local Whisper     Transfer           │
│                      Nova-2            (on-device)       to agent           │
│                                                                             │
│   LLM                Claude            Claude            Cached             │
│                      Sonnet            Haiku             responses          │
│                                                                             │
│   TTS                Chatterbox        Resemble.ai       Pre-recorded       │
│                      (self-hosted)     (cloud)           phrases            │
│                                                                             │
│   Tools              Primary           Retry with        Apologize          │
│                      webhook           backoff           & skip             │
│                                                                             │
│   Call Control       GoToConnect       Retry             Transfer           │
│                      API               (3 attempts)      queue              │
│                                                                             │
│                                                                             │
│   DECISION LOGIC:                                                           │
│   1. Try primary with timeout                                              │
│   2. If fail, increment failure counter                                    │
│   3. If counter > threshold, switch to fallback                            │
│   4. Periodically health-check primary to restore                          │
│   5. If all fallbacks fail, transfer to human                              │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
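The five-step decision logic above can be sketched as a small failover selector. The class and provider names below are illustrative, not part of the source:

```python
class FallbackSelector:
    """Track failures per component and walk down its fallback chain."""

    def __init__(self, providers: list[str], threshold: int = 3):
        self.providers = providers      # ordered: primary first
        self.threshold = threshold
        self.failures = 0
        self.active_index = 0

    def current(self) -> str:
        return self.providers[self.active_index]

    def record_failure(self) -> None:
        # Steps 2-3: count failures; past the threshold, switch to the next fallback.
        self.failures += 1
        if self.failures >= self.threshold:
            self.failures = 0
            self.active_index = min(self.active_index + 1, len(self.providers) - 1)

    def record_success(self) -> None:
        self.failures = 0

    def restore_primary(self) -> None:
        # Step 4: called when a periodic health check sees the primary recover.
        self.active_index = 0
        self.failures = 0

    def exhausted(self) -> bool:
        # Step 5: already on the last fallback -> transfer to a human.
        return self.active_index == len(self.providers) - 1

stt = FallbackSelector(["deepgram-nova-2", "local-whisper"], threshold=3)
for _ in range(3):
    stt.record_failure()
assert stt.current() == "local-whisper"
stt.restore_primary()
assert stt.current() == "deepgram-nova-2"
```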

## 14. Performance Optimization

### 14.1 Optimization Techniques

| Technique | Component | Impact | Implementation |
| --- | --- | --- | --- |
| Connection pooling | All APIs | -30ms | Reuse HTTP connections |
| Streaming | STT, LLM, TTS | -300ms | Process incrementally |
| Sentence buffering | TTS | -100ms | Start TTS before response complete |
| Speculative execution | STT→LLM | -150ms | Start LLM on interim transcript |
| Pre-warming | All | -50ms | Keep connections hot |
| Edge caching | TTS | -100ms | Cache common phrases |
| Parallel processing | Tools | Variable | Execute tools concurrently |
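Sentence buffering from the table above can be sketched as a generator that flushes complete sentences to TTS as tokens stream in, rather than waiting for the full LLM response (the token feed here is simulated):

```python
import re

# A sentence boundary is terminal punctuation followed by whitespace.
_SENTENCE_END = re.compile(r"([.!?])\s+")

def stream_sentences(tokens):
    """Yield complete sentences as tokens arrive; flush the remainder at end."""
    buffer = ""
    for token in tokens:
        buffer += token
        while (match := _SENTENCE_END.search(buffer)):
            yield buffer[: match.end(1)]
            buffer = buffer[match.end():]
    if buffer.strip():
        yield buffer.strip()

tokens = ["Hello", " there. ", "Let me", " check that. ", "One moment"]
sentences = list(stream_sentences(tokens))
assert sentences == ["Hello there.", "Let me check that.", "One moment"]
```

Each yielded sentence can be handed to TTS immediately, which is where the table's roughly 100ms saving comes from.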

### 14.2 Connection Management

# pipeline/optimization/connections.py

import httpx
from typing import Dict

class ConnectionPool:
    """
    Manage persistent connections to external services.
    """
    
    def __init__(self):
        self._clients: Dict[str, httpx.AsyncClient] = {}
    
    async def get_client(
        self,
        service: str,
        base_url: str,
        **kwargs,
    ) -> httpx.AsyncClient:
        """Get or create a persistent client."""
        if service not in self._clients:
            self._clients[service] = httpx.AsyncClient(
                base_url=base_url,
                http2=True,  # HTTP/2 multiplexing (requires the httpx[http2] extra)
                limits=httpx.Limits(
                    max_keepalive_connections=10,
                    max_connections=20,
                ),
                timeout=httpx.Timeout(30.0, connect=5.0),
                **kwargs,
            )
        
        return self._clients[service]
    
    async def close_all(self) -> None:
        """Close all clients."""
        for client in self._clients.values():
            await client.aclose()
        self._clients.clear()

### 14.3 Phrase Caching

# pipeline/optimization/phrase_cache.py

import hashlib
import logging
from typing import Optional

logger = logging.getLogger(__name__)

class PhraseCache:
    """
    Cache synthesized audio for common phrases.
    
    Pre-generates and caches audio for frequently
    used phrases to reduce TTS latency.
    """
    
    COMMON_PHRASES = [
        "Hello, thank you for calling.",
        "How can I help you today?",
        "Let me check that for you.",
        "One moment please.",
        "Is there anything else I can help you with?",
        "Thank you for calling. Goodbye.",
        "I apologize, I didn't catch that. Could you please repeat?",
        "I'm having trouble understanding. Let me connect you with someone.",
    ]
    
    def __init__(
        self,
        redis: "Redis",
        tts: "ChatterboxTTS",
    ):
        self.redis = redis
        self.tts = tts
    
    async def warm_cache(self, voice_id: str) -> None:
        """Pre-generate audio for common phrases."""
        for phrase in self.COMMON_PHRASES:
            key = self._cache_key(phrase, voice_id)
            
            # Check if already cached
            if await self.redis.exists(key):
                continue
            
            # Generate and cache
            audio = await self.tts.synthesize(phrase, voice_id)
            await self.redis.setex(
                key,
                86400,  # 24 hours
                audio,
            )
        
        logger.info(f"Warmed phrase cache for voice {voice_id}")
    
    async def get(
        self,
        phrase: str,
        voice_id: str = "default",
    ) -> Optional[bytes]:
        """Get cached audio for a phrase."""
        key = self._cache_key(phrase, voice_id)
        return await self.redis.get(key)
    
    async def get_similar(
        self,
        phrase: str,
        voice_id: str = "default",
    ) -> Optional[bytes]:
        """Find a similar cached phrase."""
        # Normalize phrase
        normalized = phrase.lower().strip()
        
        for cached_phrase in self.COMMON_PHRASES:
            if self._is_similar(normalized, cached_phrase.lower()):
                return await self.get(cached_phrase, voice_id)
        
        return None
    
    def _cache_key(self, phrase: str, voice_id: str) -> str:
        """Generate cache key for phrase."""
        phrase_hash = hashlib.md5(phrase.encode()).hexdigest()[:8]
        return f"tts:cache:{voice_id}:{phrase_hash}"
    
    def _is_similar(self, a: str, b: str) -> bool:
        """Check if two phrases are similar enough."""
        # Simple word overlap check
        words_a = set(a.split())
        words_b = set(b.split())
        
        overlap = len(words_a & words_b)
        total = len(words_a | words_b)
        
        return overlap / total > 0.7 if total > 0 else False
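The `_is_similar` check above is word-level Jaccard similarity: shared words over total distinct words, against a 0.7 cutoff. A worked example with a standalone copy of the computation:

```python
def jaccard(a: str, b: str) -> float:
    """Word-level Jaccard similarity: |A ∩ B| / |A ∪ B|."""
    words_a, words_b = set(a.split()), set(b.split())
    union = words_a | words_b
    return len(words_a & words_b) / len(union) if union else 0.0

score = jaccard("one moment please", "one brief moment please")
# intersection = {one, moment, please} (3 words); union adds "brief" (4 words)
assert score == 0.75
```

At the 0.7 threshold this pair would match the cached phrase, so the pre-synthesized audio is reused.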

## 15. Monitoring and Debugging

### 15.1 Metrics

# pipeline/monitoring/metrics.py

from prometheus_client import Counter, Histogram, Gauge

# Latency metrics
PIPELINE_LATENCY = Histogram(
    "voice_pipeline_latency_ms",
    "End-to-end pipeline latency",
    ["phase"],  # per-call labels (e.g. call_id) would create unbounded cardinality
    buckets=[100, 200, 300, 500, 750, 1000, 1500, 2000, 3000, 5000],
)

STT_LATENCY = Histogram(
    "voice_stt_latency_ms",
    "Speech-to-text latency",
    buckets=[50, 100, 150, 200, 300, 500],
)

LLM_TTFB = Histogram(
    "voice_llm_ttfb_ms",
    "LLM time to first token",
    buckets=[100, 200, 300, 500, 750, 1000, 1500],
)

TTS_TTFB = Histogram(
    "voice_tts_ttfb_ms",
    "TTS time to first byte",
    buckets=[50, 100, 150, 200, 300, 500],
)

# Counter metrics
TURNS_TOTAL = Counter(
    "voice_turns_total",
    "Total conversation turns",
    ["tenant_id", "agent_id"],
)

INTERRUPTIONS_TOTAL = Counter(
    "voice_interruptions_total",
    "Total user interruptions",
    ["tenant_id"],
)

ERRORS_TOTAL = Counter(
    "voice_errors_total",
    "Pipeline errors",
    ["component", "error_type"],
)

TOOL_CALLS_TOTAL = Counter(
    "voice_tool_calls_total",
    "Tool call executions",
    ["tool_name", "success"],
)

# Gauge metrics
ACTIVE_CALLS = Gauge(
    "voice_active_calls",
    "Currently active calls",
    ["tenant_id"],
)

PIPELINE_STATE = Gauge(
    "voice_pipeline_state",
    "Calls currently in each pipeline state",
    ["state"],  # per-call labels would create unbounded cardinality
)
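Prometheus histogram buckets are cumulative upper bounds: an observation is counted in every bucket whose bound it does not exceed. The helper below (illustrative, not part of the source) finds the smallest bound a latency falls under:

```python
import bisect

# Bounds mirror the pipeline latency histogram above.
BUCKETS = [100, 200, 300, 500, 750, 1000, 1500, 2000, 3000, 5000]

def first_bucket(latency_ms: float):
    """Return the smallest bucket bound >= latency, or None for the +Inf bucket."""
    index = bisect.bisect_left(BUCKETS, latency_ms)
    return BUCKETS[index] if index < len(BUCKETS) else None

assert first_bucket(230) == 300     # counted in the 300ms bucket and above
assert first_bucket(6000) is None   # only the implicit +Inf bucket
```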

### 15.2 Logging

# pipeline/monitoring/logging.py

import logging

import structlog

def configure_logging():
    """Configure structured logging for the pipeline."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

class PipelineLogger:
    """
    Structured logger for voice pipeline events.
    """
    
    def __init__(self, call_id: str, tenant_id: str):
        self.logger = structlog.get_logger()
        self.call_id = call_id
        self.tenant_id = tenant_id
    
    def turn_started(self, turn_id: str) -> None:
        """Log turn start."""
        self.logger.info(
            "turn_started",
            call_id=self.call_id,
            tenant_id=self.tenant_id,
            turn_id=turn_id,
        )
    
    def transcript_received(
        self,
        turn_id: str,
        transcript: str,
        is_final: bool,
        latency_ms: float,
    ) -> None:
        """Log transcript."""
        self.logger.info(
            "transcript_received",
            call_id=self.call_id,
            turn_id=turn_id,
            transcript=transcript[:100],  # Truncate
            is_final=is_final,
            latency_ms=latency_ms,
        )
    
    def llm_response(
        self,
        turn_id: str,
        response_length: int,
        ttfb_ms: float,
        total_ms: float,
        has_tool_calls: bool,
    ) -> None:
        """Log LLM response."""
        self.logger.info(
            "llm_response",
            call_id=self.call_id,
            turn_id=turn_id,
            response_length=response_length,
            ttfb_ms=ttfb_ms,
            total_ms=total_ms,
            has_tool_calls=has_tool_calls,
        )
    
    def tool_executed(
        self,
        turn_id: str,
        tool_name: str,
        success: bool,
        execution_ms: float,
    ) -> None:
        """Log tool execution."""
        self.logger.info(
            "tool_executed",
            call_id=self.call_id,
            turn_id=turn_id,
            tool_name=tool_name,
            success=success,
            execution_ms=execution_ms,
        )
    
    def interruption_detected(
        self,
        turn_id: str,
        spoken_duration_ms: float,
    ) -> None:
        """Log interruption."""
        self.logger.info(
            "interruption_detected",
            call_id=self.call_id,
            turn_id=turn_id,
            spoken_duration_ms=spoken_duration_ms,
        )
    
    def error(
        self,
        component: str,
        error: Exception,
        context: dict | None = None,
    ) -> None:
        """Log error."""
        self.logger.error(
            "pipeline_error",
            call_id=self.call_id,
            component=component,
            error=str(error),
            error_type=type(error).__name__,
            context=context or {},
            exc_info=True,
        )

### 15.3 Debug Tools

# pipeline/monitoring/debug.py

import json
from typing import Optional

class PipelineDebugger:
    """
    Debug tools for voice pipeline development.
    """
    
    def __init__(self, redis: "Redis"):
        self.redis = redis
    
    async def capture_turn(
        self,
        call_id: str,
        turn_id: str,
        data: dict,
    ) -> None:
        """Capture turn data for debugging."""
        key = f"debug:{call_id}:{turn_id}"
        await self.redis.setex(
            key,
            3600,  # 1 hour
            json.dumps(data),
        )
    
    async def get_turn_capture(
        self,
        call_id: str,
        turn_id: str,
    ) -> Optional[dict]:
        """Retrieve captured turn data."""
        key = f"debug:{call_id}:{turn_id}"
        data = await self.redis.get(key)
        return json.loads(data) if data else None
    
    async def replay_turn(
        self,
        call_id: str,
        turn_id: str,
    ) -> dict:
        """Replay a captured turn for debugging."""
        capture = await self.get_turn_capture(call_id, turn_id)
        if not capture:
            raise ValueError(f"No capture found for {call_id}:{turn_id}")
        
        # Re-run through pipeline components
        results = {
            "original": capture,
            "replay": {},
        }
        
        # Replay STT would require audio
        # Replay LLM
        if capture.get("transcript"):
            # ... replay logic
            pass
        
        return results

## Appendix A: Configuration Reference {#appendix-a:-configuration-reference}

# config/pipeline.yaml

pipeline:
  # VAD Configuration
  vad:
    model: "silero"
    threshold: 0.5
    min_speech_ms: 250
    min_silence_ms: 300
    window_size_ms: 100
  
  # STT Configuration
  stt:
    provider: "deepgram"
    model: "nova-2"
    language: "en-US"
    punctuate: true
    interim_results: true
    utterance_end_ms: 1000
  
  # LLM Configuration
  llm:
    provider: "anthropic"
    model: "claude-sonnet-4-20250514"
    max_tokens: 1024
    temperature: 0.7
    timeout_s: 30
  
  # TTS Configuration
  tts:
    provider: "chatterbox"
    endpoint: "${CHATTERBOX_URL}"
    sample_rate: 24000
    default_voice: "professional_female"
  
  # Latency Targets
  latency:
    e2e_target_ms: 1000
    ttfb_target_ms: 500
    stt_target_ms: 200
    llm_ttfb_target_ms: 300
    tts_ttfb_target_ms: 200
  
  # Interruption
  interruption:
    vad_threshold: 0.7
    min_duration_ms: 150
    backchannel_max_ms: 500
  
  # Error Handling
  errors:
    max_retries: 3
    max_errors_per_call: 5
    fallback_to_agent: true
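The `${CHATTERBOX_URL}` placeholder suggests environment-variable expansion when the config is loaded. One way to implement it, assuming the YAML has already been parsed into a dict (the walker below is a sketch, not part of the source):

```python
import os

def expand_env(node):
    """Recursively expand ${VAR} references in a parsed config tree."""
    if isinstance(node, dict):
        return {key: expand_env(value) for key, value in node.items()}
    if isinstance(node, list):
        return [expand_env(item) for item in node]
    if isinstance(node, str):
        return os.path.expandvars(node)  # leaves unset variables untouched
    return node

os.environ["CHATTERBOX_URL"] = "http://tts.internal:8004"
config = {"tts": {"provider": "chatterbox", "endpoint": "${CHATTERBOX_URL}"}}

assert expand_env(config)["tts"]["endpoint"] == "http://tts.internal:8004"
```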

## Appendix B: Sequence Diagrams {#appendix-b:-sequence-diagrams}

### B.1 Normal Turn Flow {#b.1-normal-turn-flow}

User          VAD           STT           LLM           TTS          Speaker
  │            │             │             │             │             │
  │──audio────▶│             │             │             │             │
  │            │──speech────▶│             │             │             │
  │            │  detected   │             │             │             │
  │──audio────▶│─────────────│──audio─────▶│             │             │
  │            │             │             │             │             │
  │            │             │◀──interim───│             │             │
  │──silence──▶│             │             │             │             │
  │            │──endpoint──▶│             │             │             │
  │            │             │◀──final─────│             │             │
  │            │             │             │             │             │
  │            │             │─────────────│──context───▶│             │
  │            │             │             │             │             │
  │            │             │             │◀─tokens─────│             │
  │            │             │             │             │             │
  │            │             │             │──sentence──▶│             │
  │            │             │             │             │──audio─────▶│
  │            │             │             │◀─tokens─────│             │
  │            │             │             │──sentence──▶│             │
  │            │             │             │             │──audio─────▶│
  │            │             │             │             │             │

## Document History

| Version | Date | Author | Changes |
| --- | --- | --- | --- |
| 1.0 | 2026-01-16 | Claude | Initial document |

End of Document
Last modified on April 18, 2026