Voice by aiConnected — Voice Pipeline Architecture \
Document Information \
| Field | Value |
|---|---|
| Document ID | ARCH-003 |
| Version | 1.0 |
| Last Updated | 2026-01-16 |
| Status | Draft |
| Owner | Engineering |
| Dependencies | ARCH-001, ARCH-002 |
Table of Contents \
- 1. Introduction (1.1 Purpose, 1.2 Scope, 1.3 Design Goals, 1.4 Key Terminology)
- 2. Pipeline Overview (2.1 High-Level Architecture, 2.2 Component Summary, 2.3 Data Flow Summary, 2.4 Pipeline States)
- 3. Latency Budget (3.1 Target Latency, 3.2 Latency Budget Breakdown, 3.3 Latency Optimization Strategies, 3.4 Latency Monitoring Points, 3.5 Latency Alerts)
- 4. Voice Activity Detection (4.1 VAD Overview, 4.2 Silero VAD Integration, 4.3 Endpointing Strategies, 4.4 VAD Configuration by Use Case)
- 5. Speech-to-Text Integration (5.1 Deepgram Nova-2, 5.2 Deepgram Client, 5.3 Transcript Processing, 5.4 STT Fallback Strategy)
- 6. Context Assembly (6.1 Context Overview, 6.2 Context Assembly Pipeline, 6.3 Context Manager Implementation, 6.4 System Prompt Templates)
- 7. LLM Integration (7.1 Claude API Integration, 7.2 Response Routing, 7.3 LLM Fallback Strategy)
- 8. Text-to-Speech Integration (8.1 Chatterbox TTS, 8.2 Chatterbox Client, 8.3 TTS Fallback Strategy, 8.4 Voice Configuration)
- 9. Streaming Architecture (9.1 End-to-End Streaming, 9.2 Pipeline Orchestrator, 9.3 Audio Buffer Management)
- 10. Interruption Handling (10.1 Interruption Types, 10.2 Interruption Detection, 10.3 Graceful Interruption Flow, 10.4 Backchannel Recognition)
- 11. Tool Calling in Voice Context (11.1 Voice-Appropriate Tools, 11.2 Tool Executor, 11.3 Webhook Integration with n8n)
- 12. Conversation State Management (12.1 State Structure, 12.2 State Manager)
- 13. Error Handling and Fallbacks (13.1 Error Categories, 13.2 Error Handler, 13.3 Fallback Hierarchy)
- 14. Performance Optimization (14.1 Optimization Techniques, 14.2 Connection Management, 14.3 Phrase Caching)
- 15. Monitoring and Debugging (15.1 Metrics, 15.2 Logging, 15.3 Debug Tools)
- Appendix A: Configuration Reference
- Appendix B: Sequence Diagrams (B.1 Normal Turn Flow)
- Document History
1. Introduction \
1.1 Purpose \
This document specifies the voice pipeline architecture for Voice by aiConnected. The voice pipeline is the core processing chain that transforms caller speech into AI responses and back into synthesized speech. This is where the “magic” happens: creating the illusion of a natural, responsive AI conversation partner. The pipeline must achieve sub-second response times to feel natural while maintaining conversation coherence, handling interruptions gracefully, and executing tool calls seamlessly.
1.2 Scope \
This document covers:
- Complete audio processing pipeline from microphone to speaker
- Component-level architecture for VAD, STT, LLM, and TTS
- Streaming strategies for minimal latency
- Interruption detection and handling
- Tool calling integration in voice context
- Conversation state management
- Error handling and fallback strategies
This document does not cover:
- Telephony integration (see ARCH-002)
- LiveKit room management (see ARCH-004)
- Business logic for specific use cases
1.3 Design Goals \
| Goal | Target | Priority |
|---|---|---|
| End-to-end latency | < 1000ms | Critical |
| Time to first byte (TTS) | < 500ms | Critical |
| Interruption response | < 200ms | High |
| Transcription accuracy | > 95% | High |
| Natural conversation flow | Subjective | High |
| Graceful degradation | 100% uptime | Medium |
1.4 Key Terminology \
| Term | Definition |
|---|---|
| VAD | Voice Activity Detection - detecting when someone is speaking |
| STT | Speech-to-Text - converting audio to text (transcription) |
| LLM | Large Language Model - generating conversational responses |
| TTS | Text-to-Speech - converting text to audio (synthesis) |
| TTFB | Time to First Byte - latency until first audio chunk |
| Barge-in | User interrupting the AI while it’s speaking |
| Turn | One party’s complete utterance in conversation |
| Endpointing | Detecting when a speaker has finished their turn |
2. Pipeline Overview \
2.1 High-Level Architecture \
┌─────────────────────────────────────────────────────────────────────────────┐
│ VOICE PIPELINE ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ AUDIO INPUT STAGE │ │
│ │ │ │
│ │ Caller Audio ──▶ [LiveKit] ──▶ [Audio Buffer] ──▶ [VAD] │ │
│ │ │ │ │ │
│ │ │ ▼ │ │
│ │ │ Voice Active? │ │
│ │ │ ┌────┴────┐ │ │
│ │ │ Yes No │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ [STT Engine] │ (silence) │ │
│ │ │ │ │ │
│ └────────────────────────────────────────┼─────────┼──────────────────┘ │
│ │ │ │
│ ┌────────────────────────────────────────▼─────────▼──────────────────┐ │
│ │ PROCESSING STAGE │ │
│ │ │ │
│ │ Partial Final Endpointing │ │
│ │ Transcript ───▶ Transcript ───────▶ Detected? │ │
│ │ │ │ ┌────┴────┐ │ │
│ │ │ │ Yes No │ │
│ │ │ ▼ │ │ │ │
│ │ │ [Context Assembly] │ (continue │ │
│ │ │ │ │ buffering) │ │
│ │ ▼ ▼ ▼ │ │
│ │ (display) [LLM Processing] ◀────┘ │ │
│ │ │ │ │
│ │ │ streaming tokens │ │
│ │ ▼ │ │
│ │ [Response Router] │ │
│ │ ┌────┴────┐ │ │
│ │ Speech Tool Call │ │
│ │ │ │ │ │
│ └─────────────────┼────────────┼──────────────────────────────────────┘ │
│ │ │ │
│ ┌─────────────────▼────────────▼──────────────────────────────────────┐ │
│ │ OUTPUT STAGE │ │
│ │ │ │
│ │ [Sentence Buffer] ──▶ [TTS Engine] ──▶ [Audio Queue] ──▶ LiveKit │ │
│ │ │ │ │ │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ Natural break Audio chunks Playback to │ │
│ │ detection generated caller │ │
│ │ │ │
│ │ │ │
│ │ [Tool Executor] ──▶ [Result Formatter] ──▶ (back to LLM) │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ INTERRUPTION HANDLING │ │
│ │ │ │
│ │ VAD during playback ──▶ [Interrupt Detector] ──▶ Stop TTS │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ Threshold met? Clear queue │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ Process new input Resume listening │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
2.2 Component Summary \
| Component | Technology | Location | Purpose |
|---|---|---|---|
| Audio Transport | LiveKit | Cloud | Real-time audio streaming |
| VAD | Silero VAD | Agent Service | Detect speech activity |
| STT | Deepgram Nova-2 | External API | Transcribe speech |
| LLM | Claude Sonnet | External API | Generate responses |
| TTS | Chatterbox | RunPod GPU | Synthesize speech |
| State Manager | Redis | Platform | Track conversation state |
| Tool Executor | n8n / Internal | Platform | Execute function calls |
2.3 Data Flow Summary \
┌─────────────────────────────────────────────────────────────────────────────┐
│ DATA FLOW SUMMARY │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ STAGE 1: Audio Capture │
│ ─────────────────────── │
│ Format: PCM 16-bit, 16kHz mono (STT) / 48kHz stereo (transport) │
│ Chunk size: 20ms frames (320 samples at 16kHz) │
│ Buffer: 100ms sliding window for VAD │
│ │
│ STAGE 2: Speech Detection │
│ ───────────────────────── │
│ VAD checks each 20ms frame │
│ Speech threshold: 0.5 probability │
│ Minimum speech duration: 250ms │
│ Silence padding: 300ms before endpoint │
│ │
│ STAGE 3: Transcription │
│ ───────────────────────── │
│ Streaming WebSocket to Deepgram │
│ Interim results every ~100ms │
│ Final transcript on endpoint │
│ Output: JSON with text, confidence, timestamps │
│ │
│ STAGE 4: Context Assembly │
│ ───────────────────────── │
│ Retrieve conversation history (last 10 turns) │
│ Add system prompt + knowledge base context │
│ Include tool definitions │
│ Format for Claude API │
│ │
│ STAGE 5: LLM Processing │
│ ────────────────────── │
│ Streaming API call to Claude │
│ Tokens arrive in ~50ms chunks │
│ Parse for tool calls vs speech │
│ Output: Text stream or tool call JSON │
│ │
│ STAGE 6: Speech Synthesis │
│ ──────────────────────── │
│ Buffer text until sentence boundary │
│ Send to Chatterbox TTS │
│ Stream audio chunks back │
│ Output: PCM audio at 24kHz │
│ │
│ STAGE 7: Audio Playback │
│ ─────────────────────── │
│ Resample to 48kHz if needed │
│ Queue chunks for smooth playback │
│ Publish to LiveKit track │
│ Monitor for interruptions │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
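The frame and buffer sizes in Stage 1 follow directly from the sample rate. A quick sanity check (the helper below is illustrative, not part of the pipeline code):

```python
def frame_samples(sample_rate_hz: int, frame_ms: int) -> int:
    """Number of PCM samples in one frame of the given duration."""
    return sample_rate_hz * frame_ms // 1000

# 20ms frames at 16kHz (STT path) -> 320 samples, as in Stage 1
assert frame_samples(16_000, 20) == 320
# The 100ms VAD sliding window therefore spans exactly 5 frames
assert frame_samples(16_000, 100) == 5 * frame_samples(16_000, 20)
```

The same arithmetic explains the transport path: at 48kHz, a 20ms frame carries 960 samples per channel.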
2.4 Pipeline States \
┌─────────────────────────────────────────────────────────────────────────────┐
│ PIPELINE STATE MACHINE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ │
│ │ IDLE │ │
│ └────┬────┘ │
│ │ │
│ voice detected │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ LISTENING │◀──────────────┐ │
│ └───────┬───────┘ │ │
│ │ │ │
│ speech ended │ │
│ │ │ │
│ ▼ │ │
│ ┌───────────────┐ │ │
│ │ PROCESSING │ │ │
│ └───────┬───────┘ │ │
│ │ │ │
│ ┌─────────────┼─────────────┐ │ │
│ │ │ │ │ │
│ tool call response error │ │
│ │ ready │ │ │
│ ▼ │ │ │ │
│ ┌───────────────┐ │ │ │ │
│ │EXECUTING_TOOL │ │ │ │ │
│ └───────┬───────┘ │ │ │ │
│ │ │ │ │ │
│ result ready │ │ │ │
│ │ │ │ │ │
│ └──────┬──────┘ │ │ │
│ │ │ │ │
│ ▼ │ │ │
│ ┌───────────────┐ │ │ │
│ │ SPEAKING │─────────┼─────────────┘ │
│ └───────┬───────┘ │ (on completion │
│ │ │ or interruption) │
│ interrupted │ │
│ │ │ │
│ ▼ │ │
│ ┌─────────────────┐ │ │
│ │ INTERRUPTED │────────┘ │
│ └─────────────────┘ (resume listening) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
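The state machine above can be captured as a small transition table, which is useful for guarding the orchestrator against illegal state changes. A sketch (state names mirror the diagram; the helper function is illustrative):

```python
from enum import Enum, auto

class PipelineState(Enum):
    IDLE = auto()
    LISTENING = auto()
    PROCESSING = auto()
    EXECUTING_TOOL = auto()
    SPEAKING = auto()
    INTERRUPTED = auto()

# Legal transitions, read directly off the diagram above
TRANSITIONS = {
    PipelineState.IDLE: {PipelineState.LISTENING},           # voice detected
    PipelineState.LISTENING: {PipelineState.PROCESSING},     # speech ended
    PipelineState.PROCESSING: {                              # tool call / response ready / error
        PipelineState.EXECUTING_TOOL,
        PipelineState.SPEAKING,
        PipelineState.LISTENING,
    },
    PipelineState.EXECUTING_TOOL: {PipelineState.SPEAKING},  # result ready
    PipelineState.SPEAKING: {                                # completion or interruption
        PipelineState.LISTENING,
        PipelineState.INTERRUPTED,
    },
    PipelineState.INTERRUPTED: {PipelineState.LISTENING},    # resume listening
}

def can_transition(current: PipelineState, target: PipelineState) -> bool:
    """Return True if the diagram permits current -> target."""
    return target in TRANSITIONS.get(current, set())
```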
3. Latency Budget \
3.1 Target Latency \
Human conversation has natural response gaps. Studies show:
- 200-300ms: Feels instantaneous, slightly unnatural
- 500-700ms: Natural conversation pace
- 800-1000ms: Acceptable, feels thoughtful
- >1200ms: Noticeably slow, awkward
3.2 Latency Budget Breakdown \
┌─────────────────────────────────────────────────────────────────────────────┐
│ LATENCY BUDGET BREAKDOWN │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ COMPONENT TARGET P50 P95 P99 │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 1. Endpointing delay 200ms 150ms 250ms 350ms │
│ (silence detection) │
│ │
│ 2. STT finalization 100ms 80ms 150ms 200ms │
│ (final transcript) │
│ │
│ 3. Context assembly 20ms 15ms 30ms 50ms │
│ (build prompt) │
│ │
│ 4. Network to LLM 30ms 20ms 50ms 80ms │
│ (API request) │
│ │
│ 5. LLM TTFB 200ms 150ms 300ms 500ms │
│ (first token) │
│ │
│ 6. Sentence accumulation 100ms 80ms 150ms 200ms │
│ (buffer until boundary) │
│ │
│ 7. Network to TTS 20ms 15ms 30ms 50ms │
│ (API request) │
│ │
│ 8. TTS TTFB 150ms 100ms 200ms 300ms │
│ (first audio chunk) │
│ │
│ 9. Audio transport 30ms 20ms 40ms 60ms │
│ (to caller) │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ TOTAL TTFB 850ms 630ms 1200ms 1790ms │
│ │
│ Note: Components 1-5 are on the critical path to TTFB │
│ Components 6-9 add to perceived response time │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
3.3 Latency Optimization Strategies \
| Strategy | Impact | Implementation |
|---|---|---|
| Streaming STT | -200ms | Use interim results for early processing |
| Streaming LLM | -300ms | Start TTS before complete response |
| Sentence-level TTS | -200ms | Don’t wait for full response |
| Connection pooling | -50ms | Reuse HTTP/WebSocket connections |
| Edge deployment | -30ms | Deploy close to users |
| Aggressive endpointing | -100ms | Shorter silence threshold |
| Speculative execution | -150ms | Start processing on interim transcript |
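Streaming LLM and sentence-level TTS combine into one pattern: buffer tokens only until a sentence boundary, then hand that sentence to TTS while the rest of the response is still generating. A minimal sketch of the buffering step (the boundary regex is deliberately simplistic; production code would also need abbreviation handling):

```python
import re

# Sentence boundary: terminal punctuation, optional closing quote/bracket
_SENTENCE_END = re.compile(r'[.!?]["\')\]]*\s*$')

def sentences_from_tokens(tokens):
    """Yield complete sentences from a stream of LLM tokens so TTS
    can start speaking before the full response has arrived."""
    buffer = ""
    for token in tokens:
        buffer += token
        if _SENTENCE_END.search(buffer):
            yield buffer.strip()
            buffer = ""
    if buffer.strip():
        # Flush any trailing partial sentence at end of stream
        yield buffer.strip()

# Tokens as they might arrive from a streaming completion API
chunks = list(sentences_from_tokens(["Sure", ", I can ", "help. ", "What time", " works?"]))
```

Here `chunks` contains two sentences, so the first TTS request goes out as soon as "Sure, I can help." completes rather than after the full response.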
3.4 Latency Monitoring Points \
```python
# Pipeline latency instrumentation points
import time
import logging
from typing import Optional

logger = logging.getLogger(__name__)

LATENCY_CHECKPOINTS = {
    "vad_speech_start": "Voice activity detected",
    "vad_speech_end": "Voice activity ended (endpoint)",
    "stt_interim_first": "First interim transcript",
    "stt_final": "Final transcript received",
    "context_assembled": "Prompt built",
    "llm_request_sent": "LLM API request sent",
    "llm_first_token": "First LLM token received",
    "llm_sentence_complete": "First sentence complete",
    "tts_request_sent": "TTS API request sent",
    "tts_first_chunk": "First audio chunk received",
    "audio_playback_start": "Audio playback began",
}


class LatencyTracker:
    """Track latency through the pipeline."""

    def __init__(self, call_id: str, turn_id: str):
        self.call_id = call_id
        self.turn_id = turn_id
        self.checkpoints: dict[str, float] = {}
        self.start_time: Optional[float] = None

    def mark(self, checkpoint: str) -> None:
        """Record a checkpoint timestamp."""
        now = time.perf_counter()
        if self.start_time is None:
            self.start_time = now
        self.checkpoints[checkpoint] = now
        # Log checkpoint
        elapsed = (now - self.start_time) * 1000
        logger.debug(
            f"[{self.call_id}:{self.turn_id}] "
            f"{checkpoint}: {elapsed:.1f}ms"
        )

    def get_metrics(self) -> dict:
        """Calculate latency metrics."""
        if not self.checkpoints:
            return {}
        metrics = {}
        # Calculate durations between checkpoints
        if "vad_speech_end" in self.checkpoints and "stt_final" in self.checkpoints:
            metrics["stt_latency"] = (
                self.checkpoints["stt_final"]
                - self.checkpoints["vad_speech_end"]
            ) * 1000
        if "stt_final" in self.checkpoints and "llm_first_token" in self.checkpoints:
            metrics["llm_ttfb"] = (
                self.checkpoints["llm_first_token"]
                - self.checkpoints["stt_final"]
            ) * 1000
        if "llm_sentence_complete" in self.checkpoints and "tts_first_chunk" in self.checkpoints:
            metrics["tts_ttfb"] = (
                self.checkpoints["tts_first_chunk"]
                - self.checkpoints["llm_sentence_complete"]
            ) * 1000
        # End-to-end latency
        if "vad_speech_end" in self.checkpoints and "audio_playback_start" in self.checkpoints:
            metrics["e2e_latency"] = (
                self.checkpoints["audio_playback_start"]
                - self.checkpoints["vad_speech_end"]
            ) * 1000
        return metrics

    def emit_metrics(self) -> None:
        """Emit metrics to monitoring system."""
        metrics = self.get_metrics()
        for name, value in metrics.items():
            prometheus_histogram(
                f"voice_pipeline_{name}_ms",
                value,
                labels={
                    "call_id": self.call_id,
                },
            )
```
3.5 Latency Alerts \
| Metric | Warning | Critical |
|---|---|---|
| E2E Latency P50 | > 800ms | > 1200ms |
| E2E Latency P95 | > 1200ms | > 2000ms |
| LLM TTFB P50 | > 300ms | > 500ms |
| TTS TTFB P50 | > 200ms | > 400ms |
| STT Latency P50 | > 150ms | > 300ms |
4. Voice Activity Detection \
4.1 VAD Overview \
Voice Activity Detection is the first processing stage, determining when the caller is speaking. Good VAD is critical for:
- Knowing when to start transcription
- Detecting end of utterance (endpointing)
- Detecting interruptions during AI speech
4.2 Silero VAD Integration \
We use Silero VAD, a lightweight neural network model optimized for real-time processing.

```python
# pipeline/vad/silero_vad.py
import torch
import numpy as np
from typing import Optional, Callable
from dataclasses import dataclass


@dataclass
class VADConfig:
    """Configuration for Silero VAD."""
    threshold: float = 0.5      # Speech probability threshold
    min_speech_ms: int = 250    # Minimum speech duration
    min_silence_ms: int = 300   # Silence before endpoint
    window_size_ms: int = 100   # Analysis window
    sample_rate: int = 16000    # Expected sample rate


class SileroVAD:
    """
    Silero VAD wrapper for real-time voice activity detection.

    Processes audio in 20ms frames and maintains state for
    speech detection and endpointing.
    """

    def __init__(self, config: VADConfig = None):
        self.config = config or VADConfig()
        # Load Silero VAD model
        self.model, self.utils = torch.hub.load(
            repo_or_dir='snakers4/silero-vad',
            model='silero_vad',
            force_reload=False,
        )
        # State tracking
        self._is_speaking = False
        self._speech_start_time: Optional[float] = None
        self._silence_start_time: Optional[float] = None
        self._speech_frames: int = 0
        self._silence_frames: int = 0
        # Frame timing
        self._frame_duration_ms = 20  # Silero expects 20ms frames
        self._samples_per_frame = int(
            self.config.sample_rate * self._frame_duration_ms / 1000
        )

    def reset(self) -> None:
        """Reset VAD state for new utterance."""
        self.model.reset_states()
        self._is_speaking = False
        self._speech_start_time = None
        self._silence_start_time = None
        self._speech_frames = 0
        self._silence_frames = 0

    def process_frame(
        self,
        audio_frame: np.ndarray,
        timestamp: float
    ) -> dict:
        """
        Process a single audio frame.

        Args:
            audio_frame: PCM audio samples (16-bit, 16kHz)
            timestamp: Frame timestamp in seconds

        Returns:
            Dict with speech detection results
        """
        # Ensure correct frame size
        if len(audio_frame) != self._samples_per_frame:
            raise ValueError(
                f"Expected {self._samples_per_frame} samples, "
                f"got {len(audio_frame)}"
            )
        # Convert to float32 tensor
        audio_tensor = torch.from_numpy(audio_frame).float()
        audio_tensor = audio_tensor / 32768.0  # Normalize 16-bit
        # Get speech probability
        speech_prob = self.model(
            audio_tensor,
            self.config.sample_rate
        ).item()
        # Determine if this frame contains speech
        is_speech = speech_prob >= self.config.threshold
        # Update state machine
        result = self._update_state(is_speech, timestamp)
        result["speech_probability"] = speech_prob
        result["is_speech_frame"] = is_speech
        return result

    def _update_state(
        self,
        is_speech: bool,
        timestamp: float
    ) -> dict:
        """Update VAD state machine."""
        result = {
            "event": None,
            "is_speaking": self._is_speaking,
        }
        min_speech_frames = self.config.min_speech_ms // self._frame_duration_ms
        min_silence_frames = self.config.min_silence_ms // self._frame_duration_ms
        if is_speech:
            self._silence_frames = 0
            self._speech_frames += 1
            # Speech start detection
            if not self._is_speaking and self._speech_frames >= min_speech_frames:
                self._is_speaking = True
                self._speech_start_time = timestamp - (
                    self._speech_frames * self._frame_duration_ms / 1000
                )
                result["event"] = "speech_start"
                result["speech_start_time"] = self._speech_start_time
        else:
            self._speech_frames = 0
            if self._is_speaking:
                self._silence_frames += 1
                # Endpoint detection
                if self._silence_frames >= min_silence_frames:
                    self._is_speaking = False
                    speech_duration = timestamp - self._speech_start_time
                    result["event"] = "speech_end"
                    result["speech_duration"] = speech_duration
                    self._speech_start_time = None
        result["is_speaking"] = self._is_speaking
        return result

    @property
    def is_speaking(self) -> bool:
        """Current speech state."""
        return self._is_speaking


class VADProcessor:
    """
    High-level VAD processor with buffering and event emission.
    """

    def __init__(
        self,
        config: VADConfig = None,
        on_speech_start: Callable = None,
        on_speech_end: Callable = None
    ):
        self.vad = SileroVAD(config)
        self.on_speech_start = on_speech_start
        self.on_speech_end = on_speech_end
        self._audio_buffer = []
        self._frame_size = self.vad._samples_per_frame

    async def process_audio(
        self,
        audio_chunk: bytes,
        timestamp: float
    ) -> None:
        """
        Process incoming audio chunk.

        Audio may arrive in varying chunk sizes; this method
        buffers and processes in correct frame sizes.
        """
        # Convert bytes to numpy array
        samples = np.frombuffer(audio_chunk, dtype=np.int16)
        # Add to buffer
        self._audio_buffer.extend(samples)
        # Process complete frames
        while len(self._audio_buffer) >= self._frame_size:
            frame = np.array(self._audio_buffer[:self._frame_size])
            self._audio_buffer = self._audio_buffer[self._frame_size:]
            result = self.vad.process_frame(frame, timestamp)
            # Emit events
            if result["event"] == "speech_start" and self.on_speech_start:
                await self.on_speech_start(result)
            elif result["event"] == "speech_end" and self.on_speech_end:
                await self.on_speech_end(result)
```
4.3 Endpointing Strategies \
Endpointing determines when a speaker has finished their turn. Multiple strategies can be combined:

```python
# pipeline/vad/endpointing.py
from enum import Enum
from dataclasses import dataclass
from typing import Optional
import re


class EndpointReason(Enum):
    SILENCE = "silence"          # Silence duration exceeded
    PUNCTUATION = "punctuation"  # Sentence-ending punctuation detected
    TURN_TAKING = "turn_taking"  # Linguistic turn-taking cue
    TIMEOUT = "timeout"          # Maximum utterance duration
    FORCED = "forced"            # Manually forced endpoint


@dataclass
class EndpointConfig:
    """Configuration for endpoint detection."""
    silence_threshold_ms: int = 300    # Silence before endpoint
    min_utterance_ms: int = 500        # Minimum utterance length
    max_utterance_ms: int = 30000      # Maximum utterance length
    punctuation_endpoint: bool = True  # Use punctuation as endpoint hint
    aggressive_mode: bool = False      # Faster endpoints for quick responses


class EndpointDetector:
    """
    Multi-strategy endpoint detection.

    Combines VAD silence detection with linguistic cues
    from interim transcripts.
    """

    def __init__(self, config: EndpointConfig = None):
        self.config = config or EndpointConfig()
        # Sentence-ending patterns
        self._sentence_end_pattern = re.compile(r'[.!?][\s]*$')
        # Turn-taking cue patterns (questions, trailing off)
        self._turn_taking_patterns = [
            re.compile(r'\?[\s]*$'),  # Questions
            re.compile(r'(?:right|okay|yeah|yes|no)[.?]?[\s]*$', re.I),
            re.compile(r'\.{3,}[\s]*$'),  # Ellipsis
        ]
        self._utterance_start: Optional[float] = None
        self._last_speech_time: Optional[float] = None
        self._current_transcript: str = ""

    def start_utterance(self, timestamp: float) -> None:
        """Mark the start of a new utterance."""
        self._utterance_start = timestamp
        self._last_speech_time = timestamp
        self._current_transcript = ""

    def update_transcript(self, transcript: str) -> None:
        """Update with latest transcript."""
        self._current_transcript = transcript

    def update_speech(self, timestamp: float) -> None:
        """Update last speech time."""
        self._last_speech_time = timestamp

    def check_endpoint(
        self,
        timestamp: float,
        vad_is_speaking: bool
    ) -> Optional[EndpointReason]:
        """
        Check if we should endpoint now.

        Returns EndpointReason if endpoint detected, None otherwise.
        """
        if self._utterance_start is None:
            return None
        utterance_duration = (timestamp - self._utterance_start) * 1000
        silence_duration = (timestamp - self._last_speech_time) * 1000
        # Check maximum duration
        if utterance_duration >= self.config.max_utterance_ms:
            return EndpointReason.TIMEOUT
        # Check minimum duration
        if utterance_duration < self.config.min_utterance_ms:
            return None
        # VAD-based silence endpoint
        if not vad_is_speaking:
            threshold = self.config.silence_threshold_ms
            # Reduce threshold if we have a complete sentence
            if self.config.punctuation_endpoint:
                if self._sentence_end_pattern.search(self._current_transcript):
                    threshold = threshold * 0.7  # 30% faster endpoint
            # Reduce threshold in aggressive mode
            if self.config.aggressive_mode:
                threshold = threshold * 0.6
            if silence_duration >= threshold:
                return EndpointReason.SILENCE
        # Linguistic turn-taking cues (even during speech)
        if self._current_transcript:
            for pattern in self._turn_taking_patterns:
                if pattern.search(self._current_transcript):
                    # Still need some silence
                    if silence_duration >= self.config.silence_threshold_ms * 0.5:
                        return EndpointReason.TURN_TAKING
        return None

    def force_endpoint(self) -> EndpointReason:
        """Force an immediate endpoint."""
        return EndpointReason.FORCED

    def reset(self) -> None:
        """Reset state for next utterance."""
        self._utterance_start = None
        self._last_speech_time = None
        self._current_transcript = ""
```
4.4 VAD Configuration by Use Case \
| Use Case | Silence Threshold | Min Speech | Aggressive |
|---|---|---|---|
| Customer Service | 400ms | 300ms | No |
| Quick Q&A | 250ms | 200ms | Yes |
| Appointment Booking | 350ms | 250ms | No |
| Technical Support | 500ms | 400ms | No |
| Survey/IVR | 300ms | 200ms | Yes |
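These table values map directly onto the `VADConfig` dataclass from section 4.2 (the "Aggressive" column corresponds to `EndpointConfig.aggressive_mode` from section 4.3 and is omitted here). A sketch of per-use-case presets; the preset names are illustrative, not an existing API:

```python
from dataclasses import dataclass

@dataclass
class VADConfig:
    """Subset of the VADConfig fields from section 4.2."""
    threshold: float = 0.5
    min_speech_ms: int = 250
    min_silence_ms: int = 300

# Values taken from the table above; keys are illustrative
VAD_PRESETS = {
    "customer_service":    VADConfig(min_silence_ms=400, min_speech_ms=300),
    "quick_qa":            VADConfig(min_silence_ms=250, min_speech_ms=200),
    "appointment_booking": VADConfig(min_silence_ms=350, min_speech_ms=250),
    "technical_support":   VADConfig(min_silence_ms=500, min_speech_ms=400),
    "survey_ivr":          VADConfig(min_silence_ms=300, min_speech_ms=200),
}
```

Longer silence thresholds (technical support) tolerate callers pausing to think; shorter ones (quick Q&A, IVR) favor snappy turn-taking at the cost of occasionally cutting a speaker off.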
5. Speech-to-Text Integration \
5.1 Deepgram Nova-2 \
We use Deepgram Nova-2 for streaming speech-to-text:
- Accuracy: 95%+ word accuracy
- Latency: ~100ms interim, ~200ms final
- Features: Streaming, punctuation, word timestamps
- Languages: 36+ languages supported
5.2 Deepgram Client \
```python
# pipeline/stt/deepgram_client.py
import asyncio
import json
import logging
from typing import Optional, Callable
from dataclasses import dataclass
import websockets

logger = logging.getLogger(__name__)


@dataclass
class DeepgramConfig:
    """Configuration for Deepgram STT."""
    api_key: str
    model: str = "nova-2"
    language: str = "en-US"
    punctuate: bool = True
    interim_results: bool = True
    utterance_end_ms: int = 1000
    vad_events: bool = True
    smart_format: bool = True
    diarize: bool = False
    sample_rate: int = 16000
    encoding: str = "linear16"
    channels: int = 1


@dataclass
class TranscriptResult:
    """Transcription result from Deepgram."""
    text: str
    is_final: bool
    confidence: float
    words: list
    speech_final: bool
    start_time: float
    end_time: float


class DeepgramSTT:
    """
    Deepgram streaming STT client.

    Maintains a WebSocket connection for real-time transcription
    with interim and final results.
    """

    WS_URL = "wss://api.deepgram.com/v1/listen"

    def __init__(
        self,
        config: DeepgramConfig,
        on_transcript: Callable[[TranscriptResult], None] = None,
        on_utterance_end: Callable[[], None] = None
    ):
        self.config = config
        self.on_transcript = on_transcript
        self.on_utterance_end = on_utterance_end
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._running = False
        self._receive_task: Optional[asyncio.Task] = None

    def _build_url(self) -> str:
        """Build WebSocket URL with query parameters."""
        params = {
            "model": self.config.model,
            "language": self.config.language,
            "punctuate": str(self.config.punctuate).lower(),
            "interim_results": str(self.config.interim_results).lower(),
            "utterance_end_ms": str(self.config.utterance_end_ms),
            "vad_events": str(self.config.vad_events).lower(),
            "smart_format": str(self.config.smart_format).lower(),
            "diarize": str(self.config.diarize).lower(),
            "sample_rate": str(self.config.sample_rate),
            "encoding": self.config.encoding,
            "channels": str(self.config.channels),
        }
        query_string = "&".join(f"{k}={v}" for k, v in params.items())
        return f"{self.WS_URL}?{query_string}"

    async def connect(self) -> None:
        """Establish WebSocket connection to Deepgram."""
        url = self._build_url()
        self._ws = await websockets.connect(
            url,
            extra_headers={
                "Authorization": f"Token {self.config.api_key}",
            },
            ping_interval=20,
            ping_timeout=10,
        )
        self._running = True
        self._receive_task = asyncio.create_task(self._receive_loop())
        logger.info("Deepgram STT connected")

    async def disconnect(self) -> None:
        """Close the WebSocket connection."""
        self._running = False
        if self._ws:
            # Send close message
            try:
                await self._ws.send(json.dumps({"type": "CloseStream"}))
            except Exception:
                pass
            await self._ws.close()
            self._ws = None
        if self._receive_task:
            self._receive_task.cancel()
            try:
                await self._receive_task
            except asyncio.CancelledError:
                pass
        logger.info("Deepgram STT disconnected")

    async def send_audio(self, audio_chunk: bytes) -> None:
        """
        Send audio data to Deepgram.

        Args:
            audio_chunk: PCM audio bytes (16-bit, 16kHz)
        """
        if self._ws and self._running:
            try:
                await self._ws.send(audio_chunk)
            except websockets.ConnectionClosed:
                logger.warning("Deepgram connection closed while sending")

    async def _receive_loop(self) -> None:
        """Receive and process transcription results."""
        while self._running and self._ws:
            try:
                message = await self._ws.recv()
                data = json.loads(message)
                await self._handle_message(data)
            except websockets.ConnectionClosed:
                logger.warning("Deepgram connection closed")
                break
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON from Deepgram: {e}")
            except Exception as e:
                logger.error(f"Error in Deepgram receive loop: {e}")

    async def _handle_message(self, data: dict) -> None:
        """Handle a message from Deepgram."""
        msg_type = data.get("type")
        if msg_type == "Results":
            await self._handle_results(data)
        elif msg_type == "UtteranceEnd":
            if self.on_utterance_end:
                await self.on_utterance_end()
        elif msg_type == "SpeechStarted":
            logger.debug("Deepgram: Speech started")
        elif msg_type == "Metadata":
            logger.debug(f"Deepgram metadata: {data}")
        elif msg_type == "Error":
            logger.error(f"Deepgram error: {data}")

    async def _handle_results(self, data: dict) -> None:
        """Handle transcription results."""
        channel = data.get("channel", {})
        alternatives = channel.get("alternatives", [])
        if not alternatives:
            return
        alt = alternatives[0]
        transcript = alt.get("transcript", "")
        if not transcript:
            return
        result = TranscriptResult(
            text=transcript,
            is_final=data.get("is_final", False),
            confidence=alt.get("confidence", 0.0),
            words=alt.get("words", []),
            speech_final=data.get("speech_final", False),
            start_time=data.get("start", 0.0),
            end_time=data.get("duration", 0.0) + data.get("start", 0.0),
        )
        if self.on_transcript:
            await self.on_transcript(result)


class STTManager:
    """
    High-level STT manager with connection lifecycle.
    """

    def __init__(self, config: DeepgramConfig):
        self.config = config
        self._client: Optional[DeepgramSTT] = None
        self._transcript_buffer: str = ""
        self._final_transcript: str = ""

    async def start_session(
        self,
        on_interim: Callable[[str], None] = None,
        on_final: Callable[[str], None] = None
    ) -> None:
        """Start a new transcription session."""

        async def handle_transcript(result: TranscriptResult):
            if result.is_final:
                self._final_transcript += result.text + " "
                self._transcript_buffer = ""
                if on_final:
                    await on_final(self._final_transcript.strip())
            else:
                self._transcript_buffer = result.text
                if on_interim:
                    full_transcript = self._final_transcript + result.text
                    await on_interim(full_transcript.strip())

        self._client = DeepgramSTT(
            config=self.config,
            on_transcript=handle_transcript,
        )
        await self._client.connect()

    async def send_audio(self, audio_chunk: bytes) -> None:
        """Send audio to the STT engine."""
        if self._client:
            await self._client.send_audio(audio_chunk)

    async def end_session(self) -> str:
        """End the session and return the final transcript."""
        if self._client:
            await self._client.disconnect()
            self._client = None
        final = self._final_transcript.strip()
        self._final_transcript = ""
        self._transcript_buffer = ""
        return final

    def get_current_transcript(self) -> str:
        """Get the current transcript including buffer."""
        return (self._final_transcript + self._transcript_buffer).strip()
```
5.3 Transcript Processing \
```python
# pipeline/stt/transcript_processor.py
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProcessedTranscript:
    """Processed and normalized transcript."""
    raw_text: str
    normalized_text: str
    detected_intent: Optional[str]
    contains_question: bool
    word_count: int


class TranscriptProcessor:
    """
    Post-process transcripts for LLM consumption.
    """

    def __init__(self):
        # Common speech recognition artifacts to remove
        self._corrections = {
            r'\bum+\b': '',
            r'\buh+\b': '',
            r'\blike,?\s+': '',
            r'\byou know,?\s+': '',
            r'\bI mean,?\s+': '',
            r'\s+': ' ',  # Multiple spaces
        }
        # Intent patterns
        self._intent_patterns = {
            "greeting": r'^(?:hi|hello|hey|good\s+(?:morning|afternoon|evening))',
            "goodbye": r'(?:bye|goodbye|see you|talk later|have a good)',
            "help": r'(?:help|assist|support|problem|issue)',
            "transfer": r'(?:speak to|talk to|transfer|human|agent|person)',
            "appointment": r'(?:schedule|book|appointment|meeting|calendar)',
            "cancel": r'(?:cancel|nevermind|forget it|stop)',
        }

    def process(self, transcript: str) -> ProcessedTranscript:
        """
        Process a raw transcript.

        Args:
            transcript: Raw transcript from STT

        Returns:
            Processed transcript with metadata
        """
        # Normalize
        normalized = transcript.strip()
        for pattern, replacement in self._corrections.items():
            normalized = re.sub(pattern, replacement, normalized, flags=re.I)
        normalized = normalized.strip()
        # Detect intent
        detected_intent = None
        for intent, pattern in self._intent_patterns.items():
            if re.search(pattern, normalized, re.I):
                detected_intent = intent
                break
        # Check for question
        contains_question = bool(re.search(
            r'\?|^(?:what|where|when|who|why|how|is|are|do|does|can|could|would|will)\b',
            normalized,
            re.I,
        ))
        return ProcessedTranscript(
            raw_text=transcript,
            normalized_text=normalized,
            detected_intent=detected_intent,
            contains_question=contains_question,
            word_count=len(normalized.split()),
        )
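Because the correction table is an ordered dict, the whitespace-collapse rule runs last, after the filler removals have left gaps behind. A quick check of that pass in isolation (the `normalize` helper below excerpts the loop from `TranscriptProcessor.process`):

```python
import re

corrections = {
    r'\bum+\b': '',
    r'\buh+\b': '',
    r'\s+': ' ',  # collapse whitespace; must run after the removals
}

def normalize(text: str) -> str:
    """Apply the correction patterns in order, then trim."""
    for pattern, replacement in corrections.items():
        text = re.sub(pattern, replacement, text, flags=re.I)
    return text.strip()
```

Note that removing a leading filler word leaves its trailing punctuation behind (e.g. "Um, I need help" keeps the comma); the full class has the same behavior, which is usually harmless for LLM input.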
5.4 STT Fallback Strategy \
# pipeline/stt/fallback.py
class STTWithFallback:
"""
STT with automatic fallback to backup provider.
"""
def __init__(
self,
primary: DeepgramSTT,
fallback: "WhisperSTT", # Local Whisper as fallback
):
self.primary = primary
self.fallback = fallback
self._using_fallback = False
self._primary_failures = 0
self._max_failures = 3
async def transcribe(self, audio: bytes) -> str:
"""Transcribe with automatic fallback."""
if self._using_fallback:
return await self._fallback_transcribe(audio)
try:
result = await self.primary.transcribe(audio)
self._primary_failures = 0
return result
except Exception as e:
logger.warning(f"Primary STT failed: {e}")
self._primary_failures += 1
if self._primary_failures >= self._max_failures:
logger.warning("Switching to fallback STT")
self._using_fallback = True
return await self._fallback_transcribe(audio)
async def _fallback_transcribe(self, audio: bytes) -> str:
"""Use fallback transcription."""
return await self.fallback.transcribe(audio)
async def reset_to_primary(self) -> None:
"""Attempt to switch back to primary."""
self._using_fallback = False
self._primary_failures = 0
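The failure-count switch-over can be exercised with stub providers. This is a condensed, self-contained copy of the circuit-breaker logic above; the stub class names are hypothetical:

```python
import asyncio

class STTWithFallback:
    """Condensed copy of the circuit-breaker logic above."""
    def __init__(self, primary, fallback, max_failures=3):
        self.primary, self.fallback = primary, fallback
        self._using_fallback = False
        self._failures = 0
        self._max_failures = max_failures

    async def transcribe(self, audio: bytes) -> str:
        if not self._using_fallback:
            try:
                result = await self.primary.transcribe(audio)
                self._failures = 0
                return result
            except Exception:
                self._failures += 1
                if self._failures >= self._max_failures:
                    self._using_fallback = True
        return await self.fallback.transcribe(audio)

class FailingPrimary:
    async def transcribe(self, audio):  # simulates a provider outage
        raise ConnectionError("simulated outage")

class LocalFallback:
    async def transcribe(self, audio):  # stands in for local Whisper
        return "fallback transcript"

async def demo():
    stt = STTWithFallback(FailingPrimary(), LocalFallback())
    for _ in range(3):
        await stt.transcribe(b"\x00\x00")
    return stt._using_fallback

print(asyncio.run(demo()))  # True: breaker opened after 3 failures
```

Every failed primary call still yields a usable transcript from the fallback; the breaker only controls whether the primary is attempted at all.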
6. Context Assembly \
6.1 Context Overview \
Before sending to the LLM, we assemble a complete context including:
- System prompt with agent personality
- Knowledge base context
- Conversation history
- Tool definitions
- Current user input
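Concretely, the assembled context handed to the LLM call has roughly this shape (mirroring the AssembledContext fields defined in 6.3, minus the token estimate; all values are illustrative, including the `book_appointment` tool):

```python
assembled = {
    "system_prompt": (
        "You are Alex, an AI voice assistant.\n\n"
        "## Relevant Information\n<top-3 knowledge base excerpts>\n\n"
        "## Response Guidelines\n- Keep responses brief and conversational"
    ),
    "messages": [
        {"role": "user", "content": "Hi, I'd like to schedule an appointment."},
        {"role": "assistant", "content": "Of course! What day works for you?"},
        {"role": "user", "content": "How about Tuesday at 3pm?"},
    ],
    "tools": [
        {"name": "book_appointment", "description": "Book a calendar slot",
         "input_schema": {"type": "object", "properties": {}}},
    ],
}

# History alternates roles and always ends with the current user input
assert assembled["messages"][-1]["role"] == "user"
```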
6.2 Context Assembly Pipeline \
┌─────────────────────────────────────────────────────────────────────────────┐
│ CONTEXT ASSEMBLY PIPELINE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Agent │ │ Knowledge │ │ Tool │ │
│ │ Config │ │ Base │ │ Definitions │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ SYSTEM PROMPT │ │
│ │ │ │
│ │ • Agent identity and personality │ │
│ │ • Business context and rules │ │
│ │ • Knowledge base excerpts (RAG) │ │
│ │ • Available tools and when to use them │ │
│ │ • Response guidelines (concise, natural) │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ ┌─────────────────────────────────────────────┐ │
│ │ Conversation │ │ MESSAGE HISTORY │ │
│ │ History │────▶│ │ │
│ │ (Redis) │ │ [user]: "Hi, I'd like to schedule..." │ │
│ └──────────────┘ │ [assistant]: "Of course! What day..." │ │
│ │ [user]: "How about Tuesday?" │ │
│ │ [assistant]: "Tuesday works..." │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ ┌─────────────────────────────────────────────┐ │
│ │ Current │ │ CURRENT INPUT │ │
│ │ Transcript │────▶│ │ │
│ │ │ │ [user]: "Actually, can we do 3pm?" │ │
│ └──────────────┘ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ ASSEMBLED CONTEXT │ │
│ │ │ │
│ │ Ready for LLM API call │ │
│ │ Token count: ~2,000-4,000 │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
6.3 Context Manager Implementation \
# pipeline/context/context_manager.py
import json
import logging
import time
from dataclasses import dataclass
from typing import Optional
logger = logging.getLogger(__name__)
@dataclass
class AgentConfig:
"""Agent configuration from database."""
agent_id: str
name: str
personality: str
business_context: str
greeting: str
voice_id: str
tools: list[dict]
knowledge_base_id: Optional[str]
max_turns: int = 50
response_style: str = "concise" # concise, detailed, friendly
@dataclass
class AssembledContext:
"""Fully assembled context ready for LLM."""
system_prompt: str
messages: list[dict]
tools: list[dict]
token_estimate: int
class ContextManager:
"""
Manages conversation context assembly for LLM calls.
"""
MAX_HISTORY_TURNS = 10
MAX_CONTEXT_TOKENS = 8000 # Leave room for response
def __init__(
self,
redis_client,
knowledge_base_service,
):
self.redis = redis_client
self.kb_service = knowledge_base_service
async def assemble_context(
self,
call_id: str,
agent_config: AgentConfig,
current_input: str,
) -> AssembledContext:
"""
Assemble complete context for LLM call.
Args:
call_id: Current call ID
agent_config: Agent configuration
current_input: Current user transcript
Returns:
Assembled context ready for API call
"""
# Build system prompt
system_prompt = await self._build_system_prompt(
agent_config,
current_input,
)
# Get conversation history
history = await self._get_conversation_history(call_id)
# Build messages array
messages = []
for turn in history[-self.MAX_HISTORY_TURNS:]:
messages.append({
"role": turn["role"],
"content": turn["content"],
})
# Add current input
messages.append({
"role": "user",
"content": current_input,
})
# Format tools for Claude
tools = self._format_tools(agent_config.tools)
# Estimate tokens
token_estimate = self._estimate_tokens(
system_prompt,
messages,
tools,
)
# Trim oldest history if over budget. Drop user/assistant pairs so the
# message list still starts with a user turn, as the Claude API requires.
while token_estimate > self.MAX_CONTEXT_TOKENS and len(messages) > 2:
    del messages[:2]
    token_estimate = self._estimate_tokens(
        system_prompt,
        messages,
        tools,
    )
return AssembledContext(
system_prompt=system_prompt,
messages=messages,
tools=tools,
token_estimate=token_estimate,
)
async def _build_system_prompt(
self,
config: AgentConfig,
current_input: str,
) -> str:
"""Build the system prompt with all context."""
# Base system prompt
prompt_parts = [
f"You are {config.name}, an AI voice assistant.",
f"\n\n## Personality\n{config.personality}",
f"\n\n## Business Context\n{config.business_context}",
]
# Add knowledge base context if available
if config.knowledge_base_id:
kb_context = await self._get_knowledge_context(
config.knowledge_base_id,
current_input,
)
if kb_context:
prompt_parts.append(
f"\n\n## Relevant Information\n{kb_context}"
)
# Add response guidelines
style_guidelines = {
"concise": """
## Response Guidelines
- Keep responses brief and conversational (1-2 sentences when possible)
- Speak naturally as if on a phone call
- Avoid lists or formatted text - this is a voice conversation
- Ask one question at a time
- Confirm understanding before taking actions
""",
"detailed": """
## Response Guidelines
- Provide thorough explanations when needed
- Still maintain a conversational tone
- Break complex information into digestible parts
- Offer to elaborate on any points
""",
"friendly": """
## Response Guidelines
- Be warm, personable, and empathetic
- Use casual language appropriate for voice
- Show genuine interest in helping
- Add appropriate conversational acknowledgments
""",
}
prompt_parts.append(
style_guidelines.get(config.response_style, style_guidelines["concise"])
)
# Add tool usage instructions if tools are available
if config.tools:
prompt_parts.append("""
## Tool Usage
- Use tools when you need to perform actions or look up information
- Always confirm with the user before executing consequential actions
- If a tool call fails, explain the issue and offer alternatives
""")
return "\n".join(prompt_parts)
async def _get_knowledge_context(
self,
knowledge_base_id: str,
query: str,
) -> Optional[str]:
"""Retrieve relevant context from knowledge base."""
try:
results = await self.kb_service.search(
knowledge_base_id=knowledge_base_id,
query=query,
top_k=3,
)
if not results:
return None
context_parts = []
for result in results:
context_parts.append(result["content"])
return "\n\n".join(context_parts)
except Exception as e:
logger.warning(f"Knowledge base lookup failed: {e}")
return None
async def _get_conversation_history(
self,
call_id: str,
) -> list[dict]:
"""Get conversation history from Redis."""
history_key = f"call:{call_id}:history"
history_json = await self.redis.get(history_key)
if history_json:
return json.loads(history_json)
return []
async def add_to_history(
self,
call_id: str,
role: str,
content: str,
) -> None:
"""Add a turn to conversation history."""
history_key = f"call:{call_id}:history"
history = await self._get_conversation_history(call_id)
history.append({
"role": role,
"content": content,
"timestamp": time.time(),
})
# Keep last N turns
history = history[-self.MAX_HISTORY_TURNS * 2:]
await self.redis.set(
history_key,
json.dumps(history),
ex=3600, # 1 hour TTL
)
def _format_tools(self, tools: list[dict]) -> list[dict]:
"""Format tools for Claude API."""
formatted = []
for tool in tools:
formatted.append({
"name": tool["name"],
"description": tool["description"],
"input_schema": tool.get("parameters", {}),
})
return formatted
def _estimate_tokens(
self,
system_prompt: str,
messages: list[dict],
tools: list[dict],
) -> int:
"""Rough token estimation (4 chars per token)."""
total_chars = len(system_prompt)
for msg in messages:
total_chars += len(msg.get("content", ""))
for tool in tools:
total_chars += len(json.dumps(tool))
return total_chars // 4
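The 4-characters-per-token heuristic used above is deliberately crude but cheap enough to run on every turn. A standalone copy of the estimator shows the arithmetic:

```python
import json

def estimate_tokens(system_prompt: str, messages: list, tools: list) -> int:
    """Standalone copy of ContextManager._estimate_tokens (4 chars/token)."""
    total_chars = len(system_prompt)
    for msg in messages:
        total_chars += len(msg.get("content", ""))
    for tool in tools:
        total_chars += len(json.dumps(tool))
    return total_chars // 4

prompt = "x" * 400                                  # ~100 tokens
msgs = [{"role": "user", "content": "y" * 200}]     # ~50 tokens
print(estimate_tokens(prompt, msgs, []))  # 150
```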
6.4 System Prompt Templates \
# pipeline/context/prompts.py
VOICE_AGENT_BASE_PROMPT = """You are {agent_name}, an AI voice assistant for {company_name}.
## Your Role
{role_description}
## Communication Style
- Speak naturally and conversationally - you're on a phone call
- Keep responses concise (1-3 sentences typically)
- Use contractions and casual language
- Never use markdown, bullet points, or formatting
- Don't say "I'm an AI" unless directly asked
- Acknowledge what the caller says before responding
## Current Context
- Caller phone number: {caller_phone}
- Time: {current_time}
- Previous calls with this number: {call_history_summary}
{additional_context}
"""
TOOL_USAGE_PROMPT = """
## Available Actions
You can perform these actions using the tools provided:
{tool_descriptions}
When using tools:
1. Only use tools when necessary to help the caller
2. Confirm consequential actions before executing
3. If a tool fails, apologize and offer alternatives
4. Don't mention "tools" or "functions" to the caller
"""
ESCALATION_PROMPT = """
## When to Transfer
Transfer to a human agent if:
- The caller explicitly requests to speak with a person
- You cannot resolve their issue after 2-3 attempts
- The situation involves sensitive matters you cannot handle
- The caller becomes frustrated or upset
To transfer, use the transfer_to_agent tool.
"""
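The templates above are plain `str.format` templates; every `{placeholder}` must be supplied at assembly time. A sketch with a shortened copy of the base template (company name, phone number, and timestamp are illustrative):

```python
# Shortened copy of VOICE_AGENT_BASE_PROMPT for illustration
template = (
    "You are {agent_name}, an AI voice assistant for {company_name}.\n"
    "- Caller phone number: {caller_phone}\n"
    "- Time: {current_time}\n"
    "{additional_context}"
)

prompt = template.format(
    agent_name="Alex",
    company_name="Acme Dental",          # illustrative
    caller_phone="+15551234567",
    current_time="2026-01-16 09:30",
    additional_context="## Relevant Information\nOffice hours: 9-5.",
)
print(prompt.splitlines()[0])
```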
7. LLM Integration \
7.1 Claude API Integration \
We use Claude Sonnet for response generation via the streaming API:
# pipeline/llm/claude_client.py
import re
import json
import anthropic
from typing import AsyncIterator, Optional
from dataclasses import dataclass
@dataclass
class LLMConfig:
"""Configuration for Claude LLM."""
api_key: str
model: str = "claude-sonnet-4-20250514"
max_tokens: int = 1024
temperature: float = 0.7
timeout: float = 30.0
@dataclass
class LLMResponse:
"""Streamed response from LLM."""
text: str
is_complete: bool
tool_calls: list[dict]
stop_reason: Optional[str]
usage: Optional[dict]
class ClaudeLLM:
"""
Claude LLM client with streaming support.
Handles streaming responses and tool calling
in voice conversation context.
"""
def __init__(self, config: LLMConfig):
self.config = config
self.client = anthropic.AsyncAnthropic(
api_key=config.api_key,
timeout=config.timeout,
)
async def generate_stream(
self,
system_prompt: str,
messages: list[dict],
tools: list[dict] = None,
) -> AsyncIterator[LLMResponse]:
"""
Generate a streaming response.
Args:
system_prompt: System prompt with context
messages: Conversation history + current input
tools: Available tools for the agent
Yields:
LLMResponse objects with incremental text
"""
request_params = {
"model": self.config.model,
"max_tokens": self.config.max_tokens,
"temperature": self.config.temperature,
"system": system_prompt,
"messages": messages,
}
if tools:
request_params["tools"] = tools
accumulated_text = ""
tool_calls = []
async with self.client.messages.stream(**request_params) as stream:
async for event in stream:
if event.type == "content_block_delta":
if hasattr(event.delta, "text"):
accumulated_text += event.delta.text
yield LLMResponse(
text=accumulated_text,
is_complete=False,
tool_calls=[],
stop_reason=None,
usage=None,
)
elif hasattr(event.delta, "partial_json"):
# Tool call being streamed
pass
elif event.type == "content_block_stop":
pass
elif event.type == "message_delta":
    # stop_reason and usage are read from get_final_message() below
    pass
# Get final message for tool calls
final_message = await stream.get_final_message()
for block in final_message.content:
if block.type == "tool_use":
tool_calls.append({
"id": block.id,
"name": block.name,
"input": block.input,
})
yield LLMResponse(
text=accumulated_text,
is_complete=True,
tool_calls=tool_calls,
stop_reason=final_message.stop_reason,
usage={
"input_tokens": final_message.usage.input_tokens,
"output_tokens": final_message.usage.output_tokens,
},
)
async def generate_with_tools(
self,
system_prompt: str,
messages: list[dict],
tools: list[dict],
tool_results: list[dict] = None,
) -> AsyncIterator[LLMResponse]:
"""
Generate response with tool calling support.
If tool_results are provided, continues conversation
after tool execution.
"""
# If we have tool results, add them to messages
if tool_results:
for result in tool_results:
messages.append({
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": result["tool_use_id"],
"content": result["content"],
}
],
})
async for response in self.generate_stream(
system_prompt=system_prompt,
messages=messages,
tools=tools,
):
yield response
class LLMResponseProcessor:
"""
Process streaming LLM responses for voice output.
Buffers text until natural speech boundaries
and handles tool calls appropriately.
"""
def __init__(self):
self._buffer = ""
self._sentence_endings = re.compile(r'[.!?]\s+')
self._clause_endings = re.compile(r'[,;:]\s+')
async def process_stream(
self,
response_stream: AsyncIterator[LLMResponse],
on_sentence: callable,
on_tool_call: callable,
) -> None:
"""
Process a streaming response.
Args:
response_stream: Stream of LLM responses
on_sentence: Called with each complete sentence
on_tool_call: Called when tool calls are detected
"""
full_text = ""
last_sent_position = 0
async for response in response_stream:
full_text = response.text
# Find sentence boundaries in text not yet sent. Match offsets are
# relative to new_text, so anchor them against a fixed base rather
# than the moving last_sent_position.
new_start = last_sent_position
new_text = full_text[new_start:]
for match in self._sentence_endings.finditer(new_text):
    sentence_end = new_start + match.end()
    sentence = full_text[last_sent_position:sentence_end].strip()
    if sentence:
        await on_sentence(sentence)
    last_sent_position = sentence_end
# Handle tool calls when response is complete
if response.is_complete:
# Send any remaining text
remaining = full_text[last_sent_position:].strip()
if remaining:
await on_sentence(remaining)
# Process tool calls
for tool_call in response.tool_calls:
await on_tool_call(tool_call)
def extract_speakable_chunk(
self,
text: str,
min_length: int = 10,
) -> tuple[str, str]:
"""
Extract a chunk suitable for TTS.
Returns:
Tuple of (chunk_to_speak, remaining_text)
"""
if len(text) < min_length:
return "", text
# Try to find sentence boundary
match = self._sentence_endings.search(text)
if match:
return text[:match.end()].strip(), text[match.end():]
# Try clause boundary if text is long enough
if len(text) > 50:
match = self._clause_endings.search(text)
if match and match.start() > min_length:
return text[:match.end()].strip(), text[match.end():]
return "", text
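The chunking policy (prefer sentence boundaries, accept clause boundaries only for long buffers) can be seen in a self-contained copy of the method:

```python
import re

sentence_endings = re.compile(r'[.!?]\s+')
clause_endings = re.compile(r'[,;:]\s+')

def extract_speakable_chunk(text: str, min_length: int = 10) -> tuple:
    """Standalone copy of LLMResponseProcessor.extract_speakable_chunk."""
    if len(text) < min_length:
        return "", text
    # Prefer a full sentence
    match = sentence_endings.search(text)
    if match:
        return text[:match.end()].strip(), text[match.end():]
    # Fall back to a clause boundary only for long-enough buffers
    if len(text) > 50:
        match = clause_endings.search(text)
        if match and match.start() > min_length:
            return text[:match.end()].strip(), text[match.end():]
    return "", text

chunk, rest = extract_speakable_chunk("Sure, I can help with that. What day works?")
print(chunk)  # "Sure, I can help with that."
```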
7.2 Response Routing \
Determine whether LLM output should be spoken, is a tool call, or both:
# pipeline/llm/response_router.py
from enum import Enum
from dataclasses import dataclass
from typing import Optional
from pipeline.llm.claude_client import LLMResponse
class ResponseType(Enum):
SPEECH = "speech"
TOOL_CALL = "tool_call"
MIXED = "mixed" # Contains both speech and tool calls
@dataclass
class RoutedResponse:
"""Response after routing decision."""
response_type: ResponseType
speech_text: Optional[str]
tool_calls: list[dict]
should_wait_for_tool: bool
class ResponseRouter:
"""
Route LLM responses to appropriate handlers.
"""
def route(self, response: LLMResponse) -> RoutedResponse:
"""
Determine how to handle the response.
Args:
response: Complete LLM response
Returns:
Routing decision with extracted components
"""
has_text = bool(response.text.strip())
has_tools = bool(response.tool_calls)
if has_tools and not has_text:
return RoutedResponse(
response_type=ResponseType.TOOL_CALL,
speech_text=None,
tool_calls=response.tool_calls,
should_wait_for_tool=True,
)
elif has_text and not has_tools:
return RoutedResponse(
response_type=ResponseType.SPEECH,
speech_text=response.text,
tool_calls=[],
should_wait_for_tool=False,
)
elif has_text and has_tools:
# Text before tool call - speak first, then execute
return RoutedResponse(
response_type=ResponseType.MIXED,
speech_text=response.text,
tool_calls=response.tool_calls,
should_wait_for_tool=True,
)
else:
# Empty response
return RoutedResponse(
response_type=ResponseType.SPEECH,
speech_text="I'm sorry, I didn't catch that. Could you please repeat?",
tool_calls=[],
should_wait_for_tool=False,
)
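The four-way decision above reduces to a small truth table over `has_text` and `has_tools`. A condensed standalone version makes it easy to check each branch:

```python
def route(text: str, tool_calls: list) -> str:
    """Condensed version of ResponseRouter.route: classify an LLM turn."""
    has_text, has_tools = bool(text.strip()), bool(tool_calls)
    if has_tools and not has_text:
        return "tool_call"
    if has_text and has_tools:
        return "mixed"
    # Plain text, or empty response (the re-prompt fallback is also spoken)
    return "speech"

print(route("Let me check that for you.", [{"name": "check_calendar"}]))  # mixed
```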
7.3 LLM Fallback Strategy \
# pipeline/llm/fallback.py
import logging
import time
from typing import AsyncIterator
logger = logging.getLogger(__name__)
class LLMWithFallback:
"""
LLM client with automatic fallback.
Falls back to Haiku after repeated Sonnet failures (slow first-token
latency is logged), or to cached responses if the API is unavailable.
"""
def __init__(
self,
primary: ClaudeLLM, # Sonnet
fast_fallback: ClaudeLLM, # Haiku
cache: ResponseCache,
):
self.primary = primary
self.fast_fallback = fast_fallback
self.cache = cache
self._primary_timeout = 5.0 # Switch to fallback if TTFB > 5s
self._failure_count = 0
self._max_failures = 3
async def generate_stream(
self,
system_prompt: str,
messages: list[dict],
tools: list[dict] = None,
) -> AsyncIterator[LLMResponse]:
"""Generate with automatic fallback."""
# Check if we should use fallback due to recent failures
if self._failure_count >= self._max_failures:
async for response in self._use_fast_fallback(
system_prompt, messages, tools
):
yield response
return
# Try primary with timeout monitoring
try:
first_token_received = False
start_time = time.time()
async for response in self.primary.generate_stream(
system_prompt, messages, tools
):
if not first_token_received:
first_token_received = True
ttfb = time.time() - start_time
# If TTFB is too slow, switch to fallback for next request
if ttfb > self._primary_timeout:
logger.warning(f"Primary LLM slow ({ttfb:.1f}s), consider fallback")
yield response
# Success - reset failure count
self._failure_count = 0
except Exception as e:
logger.error(f"Primary LLM failed: {e}")
self._failure_count += 1
# Try fast fallback
async for response in self._use_fast_fallback(
system_prompt, messages, tools
):
yield response
async def _use_fast_fallback(
self,
system_prompt: str,
messages: list[dict],
tools: list[dict] = None,
) -> AsyncIterator[LLMResponse]:
"""Use the fast fallback model."""
try:
async for response in self.fast_fallback.generate_stream(
system_prompt, messages, tools
):
yield response
except Exception as e:
logger.error(f"Fallback LLM also failed: {e}")
# Last resort - cached response
cached = await self.cache.get_fallback_response(messages)
if cached:
yield LLMResponse(
text=cached,
is_complete=True,
tool_calls=[],
stop_reason="cache",
usage=None,
)
else:
yield LLMResponse(
text="I apologize, I'm having technical difficulties. Please try again in a moment.",
is_complete=True,
tool_calls=[],
stop_reason="error",
usage=None,
)
8. Text-to-Speech Integration \
8.1 Chatterbox TTS \
We use Chatterbox TTS self-hosted on RunPod for zero per-minute cost:
- Quality: Natural, expressive speech
- Latency: ~150ms TTFB
- Streaming: Chunk-based audio output
- Cost: Fixed GPU cost, no per-minute fees
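The 100 ms streaming chunk size used by the client in 8.2 follows directly from the audio format (24 kHz, 16-bit mono PCM):

```python
SAMPLE_RATE = 24_000      # Hz, Chatterbox output
BYTES_PER_SAMPLE = 2      # 16-bit PCM, mono
CHUNK_MS = 100

# Bytes per 100 ms chunk -> the chunk_size used when streaming audio
chunk_bytes = SAMPLE_RATE * BYTES_PER_SAMPLE * CHUNK_MS // 1000
print(chunk_bytes)  # 4800

# Inverse: a chunk's duration from its byte length
duration_ms = (chunk_bytes // BYTES_PER_SAMPLE) / SAMPLE_RATE * 1000
print(duration_ms)  # 100.0
```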
8.2 Chatterbox Client \
# pipeline/tts/chatterbox_client.py
import asyncio
import logging
import httpx
from typing import AsyncIterator, Optional
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class TTSConfig:
"""Configuration for Chatterbox TTS."""
api_url: str # RunPod endpoint
api_key: str
voice_id: str = "default"
sample_rate: int = 24000
speed: float = 1.0
pitch: float = 1.0
exaggeration: float = 0.5 # Emotion intensity
cfg_weight: float = 0.5 # Adherence to reference
timeout: float = 30.0
@dataclass
class AudioChunk:
"""Chunk of synthesized audio."""
audio_data: bytes
sample_rate: int
duration_ms: float
is_final: bool
class ChatterboxTTS:
"""
Chatterbox TTS client with streaming support.
Connects to self-hosted Chatterbox on RunPod
for high-quality, cost-effective speech synthesis.
"""
def __init__(self, config: TTSConfig):
self.config = config
self._client = httpx.AsyncClient(timeout=config.timeout)
async def synthesize_stream(
self,
text: str,
voice_id: str = None,
) -> AsyncIterator[AudioChunk]:
"""
Synthesize text to speech with streaming output.
Args:
text: Text to synthesize
voice_id: Voice to use (overrides config)
Yields:
AudioChunk objects with PCM audio data
"""
voice = voice_id or self.config.voice_id
request_data = {
"text": text,
"voice_id": voice,
"sample_rate": self.config.sample_rate,
"speed": self.config.speed,
"pitch": self.config.pitch,
"exaggeration": self.config.exaggeration,
"cfg_weight": self.config.cfg_weight,
"stream": True,
}
async with self._client.stream(
"POST",
f"{self.config.api_url}/synthesize",
json=request_data,
headers={
"Authorization": f"Bearer {self.config.api_key}",
"Accept": "audio/pcm",
},
) as response:
response.raise_for_status()
chunk_index = 0
total_samples = 0
async for chunk in response.aiter_bytes(chunk_size=4800): # 100ms chunks
if chunk:
samples = len(chunk) // 2 # 16-bit audio
duration_ms = (samples / self.config.sample_rate) * 1000
total_samples += samples
yield AudioChunk(
audio_data=chunk,
sample_rate=self.config.sample_rate,
duration_ms=duration_ms,
is_final=False,
)
chunk_index += 1
# Final chunk marker
yield AudioChunk(
audio_data=b"",
sample_rate=self.config.sample_rate,
duration_ms=0,
is_final=True,
)
async def synthesize(
self,
text: str,
voice_id: str = None,
) -> bytes:
"""
Synthesize text to speech (non-streaming).
Returns complete audio as bytes.
"""
chunks = []
async for chunk in self.synthesize_stream(text, voice_id):
if chunk.audio_data:
chunks.append(chunk.audio_data)
return b"".join(chunks)
async def get_voices(self) -> list[dict]:
"""Get available voices."""
response = await self._client.get(
f"{self.config.api_url}/voices",
headers={"Authorization": f"Bearer {self.config.api_key}"},
)
response.raise_for_status()
return response.json()
async def clone_voice(
self,
name: str,
audio_samples: list[bytes],
) -> str:
"""
Clone a voice from audio samples.
Returns the new voice ID.
"""
# Implementation depends on Chatterbox voice cloning API
raise NotImplementedError("Voice cloning not yet wired to the Chatterbox API")
async def close(self) -> None:
"""Close the client."""
await self._client.aclose()
class TTSManager:
"""
High-level TTS manager with sentence queuing.
"""
def __init__(self, config: TTSConfig):
self.config = config
self._client = ChatterboxTTS(config)
self._sentence_queue: asyncio.Queue = asyncio.Queue()
self._audio_queue: asyncio.Queue = asyncio.Queue()
self._running = False
async def start(self) -> None:
"""Start the TTS processing loop."""
self._running = True
asyncio.create_task(self._process_queue())
async def stop(self) -> None:
"""Stop the TTS processing loop."""
self._running = False
async def queue_sentence(self, text: str) -> None:
"""Add a sentence to the synthesis queue."""
await self._sentence_queue.put(text)
async def get_audio(self) -> Optional[AudioChunk]:
"""Get the next audio chunk."""
try:
return await asyncio.wait_for(
self._audio_queue.get(),
timeout=0.1,
)
except asyncio.TimeoutError:
return None
async def clear_queue(self) -> None:
"""Clear pending sentences (for interruption)."""
while not self._sentence_queue.empty():
try:
self._sentence_queue.get_nowait()
except asyncio.QueueEmpty:
break
while not self._audio_queue.empty():
try:
self._audio_queue.get_nowait()
except asyncio.QueueEmpty:
break
async def _process_queue(self) -> None:
"""Process sentences from the queue."""
while self._running:
try:
text = await asyncio.wait_for(
self._sentence_queue.get(),
timeout=0.5,
)
async for chunk in self._client.synthesize_stream(text):
await self._audio_queue.put(chunk)
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"TTS error: {e}")
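The `clear_queue` drain pattern (non-blocking `get_nowait` until empty) is what makes interruption fast: no awaiting, just discard. A minimal sketch of the same pattern:

```python
import asyncio

async def drain(queue: asyncio.Queue) -> int:
    """Discard all pending items without blocking; returns count dropped."""
    dropped = 0
    while not queue.empty():
        try:
            queue.get_nowait()
            dropped += 1
        except asyncio.QueueEmpty:
            break
    return dropped

async def demo() -> tuple:
    q = asyncio.Queue()
    for sentence in ["One moment.", "Let me check.", "Almost done."]:
        await q.put(sentence)
    dropped = await drain(q)   # caller barge-in: discard queued speech
    return dropped, q.empty()

print(asyncio.run(demo()))  # (3, True)
```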
8.3 TTS Fallback Strategy \
# pipeline/tts/fallback.py
import logging
from typing import AsyncIterator
from pipeline.tts.chatterbox_client import AudioChunk, ChatterboxTTS
logger = logging.getLogger(__name__)
class TTSWithFallback:
"""
TTS with automatic fallback providers.
Fallback chain:
1. Chatterbox (primary - self-hosted, lowest cost)
2. Resemble.ai (cloud fallback - good quality)
3. Pre-recorded phrases (emergency fallback)
"""
def __init__(
self,
primary: ChatterboxTTS,
cloud_fallback: "ResembleTTS",
phrase_cache: "PhraseCache",
):
self.primary = primary
self.cloud_fallback = cloud_fallback
self.phrase_cache = phrase_cache
self._primary_healthy = True
self._failure_count = 0
self._max_failures = 3
async def synthesize_stream(
self,
text: str,
voice_id: str = None,
) -> AsyncIterator[AudioChunk]:
"""Synthesize with automatic fallback."""
# Try primary (Chatterbox)
if self._primary_healthy:
try:
async for chunk in self.primary.synthesize_stream(text, voice_id):
yield chunk
self._failure_count = 0
return
except Exception as e:
logger.warning(f"Primary TTS failed: {e}")
self._failure_count += 1
if self._failure_count >= self._max_failures:
self._primary_healthy = False
# Try cloud fallback (Resemble)
try:
async for chunk in self.cloud_fallback.synthesize_stream(text):
yield chunk
return
except Exception as e:
logger.warning(f"Cloud TTS fallback failed: {e}")
# Last resort - pre-recorded phrases
cached_audio = await self.phrase_cache.get_similar(text)
if cached_audio:
yield AudioChunk(
audio_data=cached_audio,
sample_rate=24000,
duration_ms=len(cached_audio) / 48, # Approximate
is_final=True,
)
else:
# Generate a generic error message audio
error_audio = await self.phrase_cache.get(
"I apologize, I'm having some technical difficulties."
)
if error_audio:
yield AudioChunk(
audio_data=error_audio,
sample_rate=24000,
duration_ms=len(error_audio) / 48,
is_final=True,
)
async def health_check(self) -> bool:
"""Check if primary TTS is healthy."""
try:
await self.primary.synthesize("Test")
self._primary_healthy = True
self._failure_count = 0
return True
except Exception:
return False
8.4 Voice Configuration \
# pipeline/tts/voices.py
from dataclasses import dataclass
@dataclass
class VoiceProfile:
"""Voice configuration for an agent."""
voice_id: str
name: str
gender: str
accent: str
age_range: str
style: str # professional, friendly, casual
speed: float = 1.0
pitch: float = 1.0
# Chatterbox-specific
exaggeration: float = 0.5
cfg_weight: float = 0.5
# Pre-configured voices
VOICE_PROFILES = {
"professional_female": VoiceProfile(
voice_id="alex_professional",
name="Alex",
gender="female",
accent="American",
age_range="30-40",
style="professional",
speed=1.0,
pitch=1.0,
exaggeration=0.3,
),
"friendly_male": VoiceProfile(
voice_id="jordan_friendly",
name="Jordan",
gender="male",
accent="American",
age_range="25-35",
style="friendly",
speed=1.05,
pitch=1.0,
exaggeration=0.5,
),
"warm_female": VoiceProfile(
voice_id="sam_warm",
name="Sam",
gender="female",
accent="American",
age_range="35-45",
style="warm",
speed=0.95,
pitch=0.98,
exaggeration=0.4,
),
}
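A profile's fields map one-to-one onto the Chatterbox request parameters shown in 8.2. This sketch uses a trimmed copy of the dataclass; the mapping function is illustrative, not part of the module above:

```python
from dataclasses import dataclass

@dataclass
class VoiceProfile:  # trimmed copy of the dataclass above
    voice_id: str
    speed: float = 1.0
    pitch: float = 1.0
    exaggeration: float = 0.5
    cfg_weight: float = 0.5

def to_tts_params(profile: VoiceProfile) -> dict:
    """Map a profile onto the Chatterbox request fields used in 8.2."""
    return {
        "voice_id": profile.voice_id,
        "speed": profile.speed,
        "pitch": profile.pitch,
        "exaggeration": profile.exaggeration,
        "cfg_weight": profile.cfg_weight,
    }

params = to_tts_params(VoiceProfile("alex_professional", exaggeration=0.3))
print(params["exaggeration"])  # 0.3
```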
9. Streaming Architecture \
9.1 End-to-End Streaming \
The entire pipeline operates in streaming mode to minimize latency:
┌─────────────────────────────────────────────────────────────────────────────┐
│ END-TO-END STREAMING FLOW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ TIME ───────────────────────────────────────────────────────────────▶ │
│ │
│ AUDIO IN: ████████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ │
│ (caller) [speaking.......] │
│ │
│ VAD: ░░████████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ │
│ [voice detected][endpoint] │
│ │
│ STT: ░░░░██░░██░░██░░████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ │
│ [interim][interim][final] │
│ │
│ LLM: ░░░░░░░░░░░░░░░░░░██░░██░░██░░██░░██░░░░░░░░░░░░░░░░░░ │
│ [tokens streaming...] │
│ │
│ SENTENCE: ░░░░░░░░░░░░░░░░░░░░░░░░░░████░░░░░░████░░░░░░░░░░░░░░ │
│ BUFFER [sent1] [sent2] │
│ │
│ TTS: ░░░░░░░░░░░░░░░░░░░░░░░░░░░░██████░░░░██████░░░░░░░░░░ │
│ [synth1] [synth2] │
│ │
│ AUDIO OUT: ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░████████████████████████ │
│ (to caller) [audio playing............] │
│ │
│ LATENCY: |◀──────── ~850ms TTFB ────────▶| │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
9.2 Pipeline Orchestrator \
# pipeline/orchestrator.py
import asyncio
import logging
import time
import numpy as np
from typing import Optional
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
class PipelineState(Enum):
IDLE = "idle"
LISTENING = "listening"
PROCESSING = "processing"
SPEAKING = "speaking"
INTERRUPTED = "interrupted"
@dataclass
class PipelineConfig:
"""Configuration for the voice pipeline."""
vad_config: VADConfig
stt_config: DeepgramConfig
llm_config: LLMConfig
tts_config: TTSConfig
agent_config: AgentConfig
# Timing
max_silence_ms: int = 400
max_turn_duration_ms: int = 60000
interrupt_threshold_ms: int = 200
class VoicePipelineOrchestrator:
"""
Orchestrates the complete voice pipeline.
Manages the flow from audio input through processing
to audio output, handling interruptions and errors.
"""
def __init__(
self,
config: PipelineConfig,
audio_input: "AudioInputStream",
audio_output: "AudioOutputStream",
call_id: str,
):
self.config = config
self.audio_input = audio_input
self.audio_output = audio_output
self.call_id = call_id
# Initialize components
self.vad = VADProcessor(config.vad_config)
self.stt = STTManager(config.stt_config)
self.llm = ClaudeLLM(config.llm_config)
self.tts = TTSManager(config.tts_config)
# redis_client and kb_service are module-level dependencies wired at startup
self.context = ContextManager(redis_client, kb_service)
# State
self._state = PipelineState.IDLE
self._current_transcript = ""
self._latency_tracker: Optional[LatencyTracker] = None
self._turn_id = 0
# Control flags
self._running = False
self._interrupted = False
async def start(self) -> None:
"""Start the pipeline."""
self._running = True
self._state = PipelineState.LISTENING
# Start component tasks
await self.tts.start()
# Main processing loop
asyncio.create_task(self._audio_input_loop())
asyncio.create_task(self._audio_output_loop())
async def stop(self) -> None:
"""Stop the pipeline."""
self._running = False
await self.stt.end_session()
await self.tts.stop()
async def _audio_input_loop(self) -> None:
"""Process incoming audio."""
while self._running:
try:
audio_chunk = await self.audio_input.read()
if not audio_chunk:
continue
timestamp = time.time()
# Check for interruption during speaking
if self._state == PipelineState.SPEAKING:
await self._check_interruption(audio_chunk, timestamp)
# Process through VAD
await self.vad.process_audio(
audio_chunk,
timestamp,
on_speech_start=self._on_speech_start,
on_speech_end=self._on_speech_end,
)
# Send to STT if listening
if self._state in (PipelineState.LISTENING, PipelineState.INTERRUPTED):
await self.stt.send_audio(audio_chunk)
except Exception as e:
logger.error(f"Error in audio input loop: {e}")
async def _audio_output_loop(self) -> None:
"""Send audio to output."""
while self._running:
try:
chunk = await self.tts.get_audio()
if chunk and chunk.audio_data:
if not self._interrupted:
await self.audio_output.write(chunk.audio_data)
if chunk.is_final:
self._state = PipelineState.LISTENING
await asyncio.sleep(0.01) # Small yield
except Exception as e:
logger.error(f"Error in audio output loop: {e}")
async def _on_speech_start(self, event: dict) -> None:
"""Handle speech start from VAD."""
logger.debug(f"Speech started: {event}")
# Start new turn
self._turn_id += 1
self._latency_tracker = LatencyTracker(self.call_id, str(self._turn_id))
self._latency_tracker.mark("vad_speech_start")
# Start STT session
await self.stt.start_session(
on_interim=self._on_interim_transcript,
on_final=self._on_final_transcript,
)
self._state = PipelineState.LISTENING
async def _on_speech_end(self, event: dict) -> None:
"""Handle speech end from VAD (endpointing)."""
logger.debug(f"Speech ended: {event}")
if self._latency_tracker:
self._latency_tracker.mark("vad_speech_end")
# Get final transcript
final_transcript = await self.stt.end_session()
if final_transcript:
await self._process_turn(final_transcript)
async def _on_interim_transcript(self, transcript: str) -> None:
"""Handle interim transcript."""
self._current_transcript = transcript
if self._latency_tracker and "stt_interim_first" not in self._latency_tracker.checkpoints:
self._latency_tracker.mark("stt_interim_first")
async def _on_final_transcript(self, transcript: str) -> None:
"""Handle final transcript."""
if self._latency_tracker:
self._latency_tracker.mark("stt_final")
async def _process_turn(self, transcript: str) -> None:
"""Process a complete user turn."""
self._state = PipelineState.PROCESSING
logger.info(f"Processing turn: {transcript}")
# Assemble context
context = await self.context.assemble_context(
call_id=self.call_id,
agent_config=self.config.agent_config,
current_input=transcript,
)
if self._latency_tracker:
self._latency_tracker.mark("context_assembled")
self._latency_tracker.mark("llm_request_sent")
        # Generate response; response.text is treated as an incremental delta.
        # Sentences are buffered so partial text is never re-queued for TTS.
        first_token = False
        first_sentence = False
        pending = ""  # Streamed text not yet sent to TTS
        full_text = ""  # Accumulated assistant response for history
        final_response = None
        async for response in self.llm.generate_stream(
            system_prompt=context.system_prompt,
            messages=context.messages,
            tools=context.tools,
        ):
            final_response = response
            if not first_token and response.text:
                first_token = True
                if self._latency_tracker:
                    self._latency_tracker.mark("llm_first_token")
            # Buffer text and queue each complete sentence for TTS
            if response.text:
                pending += response.text
                full_text += response.text
                chunk, pending = self._extract_sentence(pending)
                while chunk:
                    if not first_sentence:
                        first_sentence = True
                        if self._latency_tracker:
                            self._latency_tracker.mark("llm_sentence_complete")
                    await self.tts.queue_sentence(chunk)
                    self._state = PipelineState.SPEAKING
                    chunk, pending = self._extract_sentence(pending)
        # Flush any trailing text that lacked a sentence boundary
        if pending.strip():
            await self.tts.queue_sentence(pending.strip())
            self._state = PipelineState.SPEAKING
        # Handle tool calls
        if final_response and final_response.is_complete and final_response.tool_calls:
            for tool_call in final_response.tool_calls:
                await self._execute_tool(tool_call)
        # Add to conversation history
        await self.context.add_to_history(self.call_id, "user", transcript)
        await self.context.add_to_history(self.call_id, "assistant", full_text)
        # Emit latency metrics
        if self._latency_tracker:
            self._latency_tracker.emit_metrics()
async def _check_interruption(
self,
audio_chunk: bytes,
timestamp: float,
) -> None:
"""Check if user is interrupting."""
# Quick VAD check
vad_result = self.vad.vad.process_frame(
np.frombuffer(audio_chunk, dtype=np.int16),
timestamp,
)
if vad_result.get("speech_probability", 0) > 0.7:
# User might be interrupting
self._handle_interruption()
    def _handle_interruption(self) -> None:
        """Handle user interruption."""
        logger.info("User interruption detected")
        self._interrupted = True
        self._state = PipelineState.INTERRUPTED

        async def _clear_and_reset() -> None:
            # Keep the interrupted flag set until the TTS queue is actually
            # cleared, so the output loop cannot play stale audio first.
            await self.tts.clear_queue()
            self._interrupted = False

        asyncio.create_task(_clear_and_reset())
    def _extract_sentence(self, text: str) -> tuple[str, str]:
        """Extract the first complete sentence from text."""
        # Simple boundary detection: split at the earliest sentence end,
        # not merely the first delimiter found in the candidate list
        boundaries = ['. ', '! ', '? ', '.\n', '!\n', '?\n']
        positions = [text.index(b) for b in boundaries if b in text]
        if positions:
            idx = min(positions) + 1
            return text[:idx].strip(), text[idx:]
        return "", text
async def _execute_tool(self, tool_call: dict) -> None:
"""Execute a tool call."""
logger.info(f"Executing tool: {tool_call['name']}")
# Tool execution logic (see Section 11)
pass
9.3 Audio Buffer Management \
# pipeline/audio/buffer.py
import asyncio
import collections
from typing import Optional
import numpy as np
class CircularAudioBuffer:
"""
Circular buffer for audio samples.
Maintains a fixed-size buffer for audio processing
with efficient append and read operations.
"""
def __init__(
self,
duration_ms: int,
sample_rate: int = 16000,
channels: int = 1,
):
self.sample_rate = sample_rate
self.channels = channels
# Calculate buffer size
samples = int(sample_rate * duration_ms / 1000)
self._buffer = collections.deque(maxlen=samples)
    def write(self, samples: np.ndarray) -> None:
        """Write samples to the buffer (oldest samples drop when full)."""
        self._buffer.extend(samples)
def read(self, num_samples: int) -> Optional[np.ndarray]:
"""Read samples from the buffer."""
if len(self._buffer) < num_samples:
return None
samples = []
for _ in range(num_samples):
samples.append(self._buffer.popleft())
return np.array(samples, dtype=np.int16)
def peek(self, num_samples: int) -> Optional[np.ndarray]:
"""Peek at samples without removing them."""
if len(self._buffer) < num_samples:
return None
return np.array(list(self._buffer)[:num_samples], dtype=np.int16)
def clear(self) -> None:
"""Clear the buffer."""
self._buffer.clear()
@property
def available(self) -> int:
"""Number of samples available."""
return len(self._buffer)
@property
def duration_ms(self) -> float:
"""Duration of buffered audio in milliseconds."""
return (len(self._buffer) / self.sample_rate) * 1000
class JitterBuffer:
"""
Jitter buffer for smooth audio playback.
Buffers audio chunks to handle network jitter
while maintaining low latency.
"""
def __init__(
self,
target_latency_ms: int = 60,
max_latency_ms: int = 200,
sample_rate: int = 24000,
):
self.target_latency_ms = target_latency_ms
self.max_latency_ms = max_latency_ms
self.sample_rate = sample_rate
self._buffer: asyncio.Queue = asyncio.Queue()
self._buffered_duration_ms: float = 0
self._is_playing = False
async def add_chunk(self, audio_data: bytes) -> None:
"""Add an audio chunk to the buffer."""
duration_ms = (len(audio_data) / 2) / self.sample_rate * 1000
await self._buffer.put(audio_data)
self._buffered_duration_ms += duration_ms
# Drop old chunks if buffer is too full
while self._buffered_duration_ms > self.max_latency_ms:
try:
old_chunk = self._buffer.get_nowait()
old_duration = (len(old_chunk) / 2) / self.sample_rate * 1000
self._buffered_duration_ms -= old_duration
except asyncio.QueueEmpty:
break
async def get_chunk(self) -> Optional[bytes]:
"""Get the next chunk for playback."""
# Wait until we have enough buffered
if not self._is_playing:
while self._buffered_duration_ms < self.target_latency_ms:
await asyncio.sleep(0.01)
self._is_playing = True
try:
chunk = await asyncio.wait_for(
self._buffer.get(),
timeout=0.1,
)
duration_ms = (len(chunk) / 2) / self.sample_rate * 1000
self._buffered_duration_ms -= duration_ms
return chunk
except asyncio.TimeoutError:
self._is_playing = False
return None
def clear(self) -> None:
"""Clear the buffer."""
while not self._buffer.empty():
try:
self._buffer.get_nowait()
except asyncio.QueueEmpty:
break
self._buffered_duration_ms = 0
self._is_playing = False
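The buffer's bookkeeping rests on one piece of arithmetic: for 16-bit mono PCM, two bytes make one sample. A minimal sketch of the conversion the `JitterBuffer` uses internally (the helper name here is illustrative, not part of the pipeline API):

```python
# Chunk-duration arithmetic used by JitterBuffer, assuming 16-bit mono PCM:
# len(bytes) / 2 -> samples; samples / sample_rate -> seconds; * 1000 -> ms.
def chunk_duration_ms(audio_data: bytes, sample_rate: int = 24000) -> float:
    """Duration of a raw PCM chunk in milliseconds."""
    return (len(audio_data) / 2) / sample_rate * 1000

# A 20 ms chunk at 24 kHz is 480 samples, i.e. 960 bytes.
print(chunk_duration_ms(b"\x00" * 960))  # → 20.0
```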
10. Interruption Handling \
10.1 Interruption Types \
Users may interrupt the AI for various reasons:

| Type | Description | Response |
|---|---|---|
| Correction | User wants to correct a misunderstanding | Stop, acknowledge, listen |
| Completion | User finishes the AI's sentence | Stop, continue flow |
| Urgency | User has urgent new information | Stop immediately, process |
| Impatience | AI is too verbose | Stop, be more concise |
| Backchannel | "Uh-huh", "okay" - not an interruption | Continue speaking |
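One way to encode the table is a small policy map that downstream handlers can consult. The names and structure below are illustrative, not part of the pipeline's API:

```python
from enum import Enum

class InterruptionType(Enum):
    CORRECTION = "correction"
    COMPLETION = "completion"
    URGENCY = "urgency"
    IMPATIENCE = "impatience"
    BACKCHANNEL = "backchannel"

# Policy table derived from the rows above: whether TTS should stop,
# and what the pipeline should do next.
INTERRUPTION_POLICY: dict = {
    InterruptionType.CORRECTION: {"stop_tts": True, "next": "acknowledge_and_listen"},
    InterruptionType.COMPLETION: {"stop_tts": True, "next": "continue_flow"},
    InterruptionType.URGENCY: {"stop_tts": True, "next": "process_immediately"},
    InterruptionType.IMPATIENCE: {"stop_tts": True, "next": "shorten_responses"},
    InterruptionType.BACKCHANNEL: {"stop_tts": False, "next": "continue_speaking"},
}

def should_stop_tts(kind: InterruptionType) -> bool:
    """Only backchannels let the AI keep speaking."""
    return INTERRUPTION_POLICY[kind]["stop_tts"]
```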
10.2 Interruption Detection \
# pipeline/interruption/detector.py
from dataclasses import dataclass
from typing import Optional
import numpy as np
@dataclass
class InterruptionConfig:
"""Configuration for interruption detection."""
vad_threshold: float = 0.7 # Higher than normal VAD
min_duration_ms: int = 150 # Minimum speech to consider
energy_threshold: float = 0.02 # Minimum audio energy
backchannel_max_ms: int = 500 # Max duration for backchannel
confirmation_delay_ms: int = 100 # Wait before confirming interrupt
class InterruptionDetector:
"""
Detect user interruptions during AI speech.
Distinguishes between intentional interruptions
and backchannels/noise.
"""
    def __init__(self, config: Optional[InterruptionConfig] = None):
        self.config = config or InterruptionConfig()
self._speech_start_time: Optional[float] = None
self._speech_energy_sum: float = 0
self._frame_count: int = 0
self._confirmed_interruption: bool = False
def process_frame(
self,
audio_frame: np.ndarray,
vad_probability: float,
timestamp: float,
ai_is_speaking: bool,
) -> dict:
"""
Process an audio frame for interruption detection.
Args:
audio_frame: Audio samples
vad_probability: VAD output for this frame
timestamp: Current timestamp
ai_is_speaking: Whether AI is currently outputting audio
Returns:
Detection result with interruption status
"""
result = {
"is_interruption": False,
"is_backchannel": False,
"should_stop": False,
"confidence": 0.0,
}
if not ai_is_speaking:
self._reset()
return result
# Calculate frame energy
energy = np.sqrt(np.mean(audio_frame.astype(np.float32) ** 2)) / 32768
# Check if this frame has speech
is_speech = (
vad_probability >= self.config.vad_threshold and
energy >= self.config.energy_threshold
)
if is_speech:
if self._speech_start_time is None:
self._speech_start_time = timestamp
self._speech_energy_sum += energy
self._frame_count += 1
# Calculate speech duration
duration_ms = (timestamp - self._speech_start_time) * 1000
avg_energy = self._speech_energy_sum / self._frame_count
# Determine if this is an interruption
if duration_ms >= self.config.min_duration_ms:
if duration_ms <= self.config.backchannel_max_ms:
# Could be backchannel - wait to confirm
result["confidence"] = 0.5
else:
# Likely real interruption
result["is_interruption"] = True
result["should_stop"] = True
result["confidence"] = min(0.9, 0.5 + (duration_ms / 1000))
else:
# No speech - check if we had a short backchannel
if self._speech_start_time is not None:
duration_ms = (timestamp - self._speech_start_time) * 1000
if duration_ms <= self.config.backchannel_max_ms:
result["is_backchannel"] = True
self._reset()
return result
def _reset(self) -> None:
"""Reset detection state."""
self._speech_start_time = None
self._speech_energy_sum = 0
self._frame_count = 0
self._confirmed_interruption = False
def force_interrupt(self) -> None:
"""Force an interruption (external trigger)."""
self._confirmed_interruption = True
class InterruptionHandler:
"""
Handle interruptions in the voice pipeline.
"""
def __init__(
self,
tts_manager: TTSManager,
audio_output: "AudioOutputStream",
context_manager: ContextManager,
):
self.tts = tts_manager
self.audio_output = audio_output
self.context = context_manager
self._interrupted_text: str = ""
self._interrupt_count: int = 0
async def handle_interruption(
self,
call_id: str,
interrupted_response: str,
interrupt_position: int,
) -> None:
"""
Handle a user interruption.
Args:
call_id: Current call ID
interrupted_response: The AI response that was interrupted
interrupt_position: Character position where interrupted
"""
logger.info(f"Handling interruption at position {interrupt_position}")
# Stop TTS immediately
await self.tts.clear_queue()
# Stop audio output
await self.audio_output.clear()
# Track what was said vs interrupted
spoken_text = interrupted_response[:interrupt_position]
unspoken_text = interrupted_response[interrupt_position:]
self._interrupted_text = unspoken_text
self._interrupt_count += 1
# Update conversation history with partial response
if spoken_text.strip():
await self.context.add_to_history(
call_id,
"assistant",
spoken_text + "... [interrupted]",
)
# If user interrupts frequently, note this for context
if self._interrupt_count >= 3:
# User might prefer shorter responses
logger.info("User frequently interrupts - suggesting shorter responses")
async def get_interrupted_context(self) -> Optional[str]:
"""Get context about what was interrupted."""
if self._interrupted_text:
return f"(You were about to say: {self._interrupted_text[:100]}...)"
return None
def reset_interrupt_tracking(self) -> None:
"""Reset interruption tracking for new call."""
self._interrupted_text = ""
self._interrupt_count = 0
10.3 Graceful Interruption Flow \
┌─────────────────────────────────────────────────────────────────────────────┐
│ INTERRUPTION HANDLING FLOW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ AI Speaking User Input System Response │
│ │ │ │ │
│ │ ◀── audio ── │ │ │
│ │ │ │ │
│ │ VAD detects speech │ │
│ │ │ │ │
│ │ │ ─── Is this ───▶ │ │
│ │ │ interruption? │ │
│ │ │ │ │
│ │ │ │ │
│ │ │ ◀── Check ─── │ │
│ │ │ duration │ │
│ │ │ & energy │ │
│ │ │ │ │
│ ┌───┴───────────────────┴─────────────────────┴───┐ │
│ │ │ │
│ │ If duration < 500ms AND low energy: │ │
│ │ → Likely backchannel, continue speaking │ │
│ │ │ │
│ │ If duration >= 150ms AND high energy: │ │
│ │ → Likely interruption │ │
│ │ → Stop TTS │ │
│ │ → Clear audio queue │ │
│ │ → Start processing new input │ │
│ │ │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ Post-Interruption: │
│ 1. Log partial response for context │
│ 2. Begin STT on new user input │
│ 3. Don't repeat interrupted content unless relevant │
│ 4. If frequent interrupts → respond more concisely │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
10.4 Backchannel Recognition \
# pipeline/interruption/backchannel.py
import re
from typing import Optional
class BackchannelRecognizer:
"""
Recognize backchannel utterances that shouldn't interrupt.
Backchannels are short acknowledgments like "uh-huh", "okay",
"right" that indicate listening without intent to take the turn.
"""
# Common backchannel patterns
BACKCHANNEL_PATTERNS = [
r'^(uh[- ]?huh|mm[- ]?hmm|hmm+)$',
r'^(okay|ok|yep|yeah|yes|no|right|sure)$',
r'^(i see|got it|makes sense)$',
r'^(oh|ah|wow)$',
]
def __init__(self):
self._patterns = [
re.compile(p, re.IGNORECASE)
for p in self.BACKCHANNEL_PATTERNS
]
def is_backchannel(
self,
transcript: str,
duration_ms: float,
) -> bool:
"""
Check if utterance is a backchannel.
Args:
transcript: The transcribed text
duration_ms: Duration of the utterance
Returns:
True if likely a backchannel
"""
# Backchannels are short
if duration_ms > 1000:
return False
# Clean transcript
text = transcript.strip().lower()
# Check patterns
for pattern in self._patterns:
if pattern.match(text):
return True
# Very short utterances (1-2 words) are often backchannels
words = text.split()
if len(words) <= 2 and duration_ms < 500:
return True
return False
def get_backchannel_type(
self,
transcript: str,
) -> Optional[str]:
"""Categorize the backchannel type."""
text = transcript.strip().lower()
if re.match(r'^(yes|yeah|yep|uh[- ]?huh|mm[- ]?hmm)$', text):
return "agreement"
elif re.match(r'^(no|nope)$', text):
return "disagreement"
elif re.match(r'^(okay|ok|got it|i see)$', text):
return "acknowledgment"
elif re.match(r'^(oh|ah|wow)$', text):
return "surprise"
elif re.match(r'^(right|sure)$', text):
return "confirmation"
return None
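To see the heuristics in action, the core of `is_backchannel` can be exercised standalone. This trimmed-down sketch mirrors the class above with the same patterns and thresholds:

```python
import re

# The same patterns as BACKCHANNEL_PATTERNS above, compiled standalone
PATTERNS = [
    re.compile(p, re.IGNORECASE)
    for p in (
        r'^(uh[- ]?huh|mm[- ]?hmm|hmm+)$',
        r'^(okay|ok|yep|yeah|yes|no|right|sure)$',
        r'^(i see|got it|makes sense)$',
        r'^(oh|ah|wow)$',
    )
]

def is_backchannel(transcript: str, duration_ms: float) -> bool:
    """Mirror of BackchannelRecognizer.is_backchannel for quick testing."""
    if duration_ms > 1000:  # Backchannels are short
        return False
    text = transcript.strip().lower()
    if any(p.match(text) for p in PATTERNS):
        return True
    # Very short utterances (1-2 words) are often backchannels
    return len(text.split()) <= 2 and duration_ms < 500

print(is_backchannel("uh-huh", 300))  # → True
print(is_backchannel("wait, that is not my address", 900))  # → False
```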
11. Tool Calling in Voice Context \
11.1 Voice-Appropriate Tools \
Tools in voice context need special handling:

- Results must be speakable
- Execution should be fast
- Failures need graceful verbal handling
# pipeline/tools/voice_tools.py
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum
class ToolCategory(Enum):
INSTANT = "instant" # < 500ms, can execute inline
ASYNC = "async" # > 500ms, need filler phrase
BACKGROUND = "background" # Can run without response
@dataclass
class VoiceToolDefinition:
"""Tool definition optimized for voice."""
name: str
description: str
parameters: dict
category: ToolCategory
# Voice-specific
filler_phrase: str = "Let me check that for you..."
success_template: str = "{result}"
failure_phrase: str = "I wasn't able to complete that action."
max_execution_ms: int = 5000
# Example tool definitions
APPOINTMENT_TOOLS = [
VoiceToolDefinition(
name="check_availability",
description="Check available appointment slots for a given date",
parameters={
"type": "object",
"properties": {
"date": {
"type": "string",
"description": "Date to check in YYYY-MM-DD format",
},
"service_type": {
"type": "string",
"description": "Type of appointment",
},
},
"required": ["date"],
},
category=ToolCategory.INSTANT,
filler_phrase="Let me check what's available...",
success_template="I have availability at {slots}. Which time works best for you?",
),
VoiceToolDefinition(
name="book_appointment",
description="Book an appointment for the caller",
parameters={
"type": "object",
"properties": {
"date": {"type": "string"},
"time": {"type": "string"},
"service_type": {"type": "string"},
"customer_name": {"type": "string"},
"customer_phone": {"type": "string"},
},
"required": ["date", "time", "customer_name"],
},
category=ToolCategory.ASYNC,
filler_phrase="I'm booking that for you now...",
success_template="I've booked your appointment for {date} at {time}. You'll receive a confirmation shortly.",
failure_phrase="I wasn't able to book that appointment. Would you like to try a different time?",
),
VoiceToolDefinition(
name="transfer_to_agent",
description="Transfer the call to a human agent",
parameters={
"type": "object",
"properties": {
"reason": {"type": "string"},
"department": {"type": "string"},
},
},
category=ToolCategory.INSTANT,
filler_phrase="I'll connect you with someone who can help...",
success_template="I'm transferring you now. Please hold.",
),
]
11.2 Tool Executor \
# pipeline/tools/executor.py
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Optional
@dataclass
class ToolResult:
"""Result from tool execution."""
tool_name: str
tool_call_id: str
success: bool
result: Any
error: Optional[str]
execution_time_ms: float
speakable_response: str
class VoiceToolExecutor:
"""
Execute tools in voice context.
Handles timing, filler phrases, and result formatting
for natural voice conversation flow.
"""
def __init__(
self,
tool_definitions: list[VoiceToolDefinition],
webhook_executor: "WebhookExecutor",
tts_manager: TTSManager,
):
self.tools = {t.name: t for t in tool_definitions}
self.webhook_executor = webhook_executor
self.tts = tts_manager
async def execute(
self,
tool_call: dict,
call_context: dict,
) -> ToolResult:
"""
Execute a tool call.
Args:
tool_call: Tool call from LLM (name, id, input)
call_context: Current call context
Returns:
Tool execution result
"""
tool_name = tool_call["name"]
tool_id = tool_call["id"]
tool_input = tool_call["input"]
tool_def = self.tools.get(tool_name)
if not tool_def:
return ToolResult(
tool_name=tool_name,
tool_call_id=tool_id,
success=False,
result=None,
error=f"Unknown tool: {tool_name}",
execution_time_ms=0,
speakable_response="I'm not able to do that right now.",
)
# Queue filler phrase for async tools
if tool_def.category == ToolCategory.ASYNC:
await self.tts.queue_sentence(tool_def.filler_phrase)
# Execute with timeout
start_time = time.time()
try:
result = await asyncio.wait_for(
self._execute_tool(tool_name, tool_input, call_context),
timeout=tool_def.max_execution_ms / 1000,
)
execution_time = (time.time() - start_time) * 1000
# Format speakable response
speakable = self._format_response(tool_def, result)
return ToolResult(
tool_name=tool_name,
tool_call_id=tool_id,
success=True,
result=result,
error=None,
execution_time_ms=execution_time,
speakable_response=speakable,
)
except asyncio.TimeoutError:
execution_time = (time.time() - start_time) * 1000
return ToolResult(
tool_name=tool_name,
tool_call_id=tool_id,
success=False,
result=None,
error="Tool execution timed out",
execution_time_ms=execution_time,
speakable_response=tool_def.failure_phrase,
)
except Exception as e:
execution_time = (time.time() - start_time) * 1000
logger.error(f"Tool execution error: {e}")
return ToolResult(
tool_name=tool_name,
tool_call_id=tool_id,
success=False,
result=None,
error=str(e),
execution_time_ms=execution_time,
speakable_response=tool_def.failure_phrase,
)
async def _execute_tool(
self,
tool_name: str,
tool_input: dict,
call_context: dict,
) -> Any:
"""Execute the actual tool logic."""
# Built-in tools
if tool_name == "transfer_to_agent":
return await self._transfer_to_agent(tool_input, call_context)
elif tool_name == "end_call":
return await self._end_call(tool_input, call_context)
elif tool_name == "send_sms":
return await self._send_sms(tool_input, call_context)
# External webhook tools
else:
return await self.webhook_executor.execute(
tool_name,
tool_input,
call_context,
)
def _format_response(
self,
tool_def: VoiceToolDefinition,
result: Any,
) -> str:
"""Format tool result as speakable text."""
try:
if isinstance(result, dict):
return tool_def.success_template.format(**result)
else:
return tool_def.success_template.format(result=result)
except KeyError:
return str(result)
async def _transfer_to_agent(
self,
params: dict,
context: dict,
) -> dict:
"""Built-in transfer functionality."""
from integrations.gotoconnect import call_control
target = params.get("department", "support")
reason = params.get("reason", "Customer requested transfer")
# Map department to extension
department_extensions = {
"support": "ext:1001",
"sales": "ext:1002",
"billing": "ext:1003",
}
dial_string = department_extensions.get(target, "ext:1001")
await call_control.blind_transfer(
call_id=context["external_call_id"],
dial_string=dial_string,
)
return {"transferred_to": target, "reason": reason}
async def _end_call(
self,
params: dict,
context: dict,
) -> dict:
"""Built-in end call functionality."""
from integrations.gotoconnect import call_control
await call_control.hangup(
call_id=context["external_call_id"],
)
return {"ended": True}
async def _send_sms(
self,
params: dict,
context: dict,
) -> dict:
"""Send SMS to caller."""
# Implementation depends on SMS provider
pass
11.3 Webhook Integration with n8n \
# pipeline/tools/webhook_executor.py
import time
from typing import Any
import httpx
class WebhookExecutor:
"""
Execute tools via n8n webhooks.
n8n workflows can be triggered to perform
complex operations and return results.
"""
def __init__(
self,
n8n_base_url: str,
timeout: float = 10.0,
):
self.base_url = n8n_base_url
self.timeout = timeout
self._client = httpx.AsyncClient(timeout=timeout)
async def execute(
self,
tool_name: str,
tool_input: dict,
call_context: dict,
) -> Any:
"""
Execute a tool via n8n webhook.
Args:
tool_name: Name of the tool (maps to webhook path)
tool_input: Input parameters from LLM
call_context: Current call context
Returns:
Result from n8n workflow
"""
webhook_url = f"{self.base_url}/webhook/{tool_name}"
payload = {
"input": tool_input,
"context": {
"call_id": call_context.get("call_id"),
"tenant_id": call_context.get("tenant_id"),
"caller_phone": call_context.get("caller_phone"),
"timestamp": time.time(),
},
}
response = await self._client.post(
webhook_url,
json=payload,
headers={
"Content-Type": "application/json",
},
)
response.raise_for_status()
result = response.json()
return result.get("data", result)
async def close(self) -> None:
"""Close the client."""
await self._client.aclose()
12. Conversation State Management \
12.1 State Structure \
# pipeline/state/models.py
from dataclasses import dataclass, field
from typing import Optional, Any
from enum import Enum
from datetime import datetime
class ConversationPhase(Enum):
GREETING = "greeting"
DISCOVERY = "discovery"
RESOLUTION = "resolution"
CLOSING = "closing"
@dataclass
class ConversationState:
"""Complete state of a conversation."""
call_id: str
tenant_id: str
agent_id: str
# Call metadata
caller_phone: str
called_number: str
direction: str
started_at: datetime
# Conversation tracking
turn_count: int = 0
current_phase: ConversationPhase = ConversationPhase.GREETING
detected_intent: Optional[str] = None
# Context accumulation
collected_info: dict = field(default_factory=dict)
pending_actions: list = field(default_factory=list)
completed_actions: list = field(default_factory=list)
# State flags
is_on_hold: bool = False
is_muted: bool = False
transfer_pending: bool = False
# Performance tracking
total_latency_ms: float = 0
average_latency_ms: float = 0
interrupt_count: int = 0
@dataclass
class TurnState:
"""State for a single conversation turn."""
turn_id: str
started_at: float
# Input
transcript: str = ""
transcript_confidence: float = 0.0
# Processing
llm_response: str = ""
tool_calls: list = field(default_factory=list)
tool_results: list = field(default_factory=list)
# Output
spoken_response: str = ""
was_interrupted: bool = False
# Timing
stt_latency_ms: float = 0
llm_latency_ms: float = 0
tts_latency_ms: float = 0
total_latency_ms: float = 0
12.2 State Manager \
# pipeline/state/manager.py
import json
import time
from datetime import datetime
from typing import Optional
from redis.asyncio import Redis
from .models import ConversationPhase, ConversationState, TurnState

class ConversationStateManager:
    """
    Manage conversation state in Redis.
    Provides fast access to conversation state
    with automatic expiration and recovery.
    """
    STATE_TTL = 3600  # 1 hour

    def __init__(self, redis: Redis):
        self.redis = redis

    async def get_state(self, call_id: str) -> Optional[ConversationState]:
        """Get conversation state."""
        key = f"call:{call_id}:state"
        data = await self.redis.get(key)
        if data:
            state_dict = json.loads(data)
            # Restore fields serialized as strings back to native types
            state_dict["started_at"] = datetime.fromisoformat(state_dict["started_at"])
            state_dict["current_phase"] = ConversationPhase(state_dict["current_phase"])
            return ConversationState(**state_dict)
        return None
async def save_state(self, state: ConversationState) -> None:
"""Save conversation state."""
key = f"call:{state.call_id}:state"
state_dict = {
"call_id": state.call_id,
"tenant_id": state.tenant_id,
"agent_id": state.agent_id,
"caller_phone": state.caller_phone,
"called_number": state.called_number,
"direction": state.direction,
"started_at": state.started_at.isoformat(),
"turn_count": state.turn_count,
"current_phase": state.current_phase.value,
"detected_intent": state.detected_intent,
"collected_info": state.collected_info,
"pending_actions": state.pending_actions,
"completed_actions": state.completed_actions,
"is_on_hold": state.is_on_hold,
"is_muted": state.is_muted,
"transfer_pending": state.transfer_pending,
"total_latency_ms": state.total_latency_ms,
"average_latency_ms": state.average_latency_ms,
"interrupt_count": state.interrupt_count,
}
await self.redis.setex(
key,
self.STATE_TTL,
json.dumps(state_dict),
)
async def update_turn(
self,
call_id: str,
turn: TurnState,
) -> None:
"""Update state after a turn."""
state = await self.get_state(call_id)
if not state:
return
state.turn_count += 1
state.total_latency_ms += turn.total_latency_ms
state.average_latency_ms = state.total_latency_ms / state.turn_count
if turn.was_interrupted:
state.interrupt_count += 1
# Update collected info from tool results
for result in turn.tool_results:
if result.get("collected_data"):
state.collected_info.update(result["collected_data"])
await self.save_state(state)
async def get_history(self, call_id: str) -> list[dict]:
"""Get conversation history."""
key = f"call:{call_id}:history"
data = await self.redis.get(key)
if data:
return json.loads(data)
return []
async def add_to_history(
self,
call_id: str,
role: str,
content: str,
) -> None:
"""Add a message to history."""
key = f"call:{call_id}:history"
history = await self.get_history(call_id)
history.append({
"role": role,
"content": content,
"timestamp": time.time(),
})
        # Keep the last 100 messages (roughly 50 user/assistant turns)
        history = history[-100:]
await self.redis.setex(
key,
self.STATE_TTL,
json.dumps(history),
)
async def cleanup(self, call_id: str) -> None:
"""Clean up state after call ends."""
keys = [
f"call:{call_id}:state",
f"call:{call_id}:history",
f"call:{call_id}:webrtc",
f"call:{call_id}:context",
]
for key in keys:
await self.redis.delete(key)
13. Error Handling and Fallbacks \
13.1 Error Categories \
# pipeline/errors.py
from enum import Enum
class ErrorCategory(Enum):
RECOVERABLE = "recoverable" # Can retry or fallback
DEGRADED = "degraded" # Partial functionality
FATAL = "fatal" # Must end call
class PipelineError(Exception):
"""Base pipeline error."""
category: ErrorCategory = ErrorCategory.RECOVERABLE
user_message: str = "I apologize, I encountered an issue."
class STTError(PipelineError):
"""Speech-to-text error."""
user_message = "I'm having trouble hearing you. Could you please repeat that?"
class LLMError(PipelineError):
"""LLM processing error."""
user_message = "I need a moment to think. Could you repeat your question?"
class TTSError(PipelineError):
"""Text-to-speech error."""
category = ErrorCategory.DEGRADED
user_message = "" # Can't speak this error!
class ToolError(PipelineError):
"""Tool execution error."""
user_message = "I wasn't able to complete that action. Let me try another way."
class FatalError(PipelineError):
"""Unrecoverable error."""
category = ErrorCategory.FATAL
user_message = "I apologize, I'm experiencing technical difficulties. Please call back or hold for an agent."
13.2 Error Handler \
# pipeline/error_handler.py
class PipelineErrorHandler:
"""
Handle errors in the voice pipeline.
Provides graceful degradation and user-friendly
error messages for voice context.
"""
def __init__(
self,
tts_manager: TTSManager,
call_control: "GoToCallControl",
state_manager: ConversationStateManager,
):
self.tts = tts_manager
self.call_control = call_control
self.state = state_manager
self._error_count = 0
self._max_errors = 5
async def handle_error(
self,
error: Exception,
call_id: str,
context: str = "",
) -> bool:
"""
Handle a pipeline error.
Args:
error: The error that occurred
call_id: Current call ID
context: Additional context
Returns:
True if recovered, False if fatal
"""
self._error_count += 1
logger.error(
f"Pipeline error in {context}: {error}",
exc_info=True,
extra={"call_id": call_id},
)
# Check if too many errors
if self._error_count >= self._max_errors:
await self._handle_fatal(call_id)
return False
# Handle by error type
if isinstance(error, PipelineError):
return await self._handle_pipeline_error(error, call_id)
else:
return await self._handle_unexpected_error(error, call_id)
async def _handle_pipeline_error(
self,
error: PipelineError,
call_id: str,
) -> bool:
"""Handle a known pipeline error."""
if error.category == ErrorCategory.FATAL:
await self._handle_fatal(call_id)
return False
elif error.category == ErrorCategory.DEGRADED:
# Log but continue
logger.warning(f"Degraded operation: {error}")
return True
else: # RECOVERABLE
# Speak the error message
if error.user_message:
await self.tts.queue_sentence(error.user_message)
return True
async def _handle_unexpected_error(
self,
error: Exception,
call_id: str,
) -> bool:
"""Handle an unexpected error."""
# Generic recovery attempt
await self.tts.queue_sentence(
"I apologize, I encountered an issue. Let me try again."
)
return True
async def _handle_fatal(self, call_id: str) -> None:
"""Handle a fatal error - transfer to agent."""
logger.error(f"Fatal error, transferring call {call_id}")
try:
# Inform user
await self.tts.queue_sentence(
"I apologize, I'm experiencing technical difficulties. "
"Let me connect you with someone who can help."
)
# Wait for message to play
await asyncio.sleep(3)
# Transfer to agent
await self.call_control.blind_transfer(
call_id=call_id,
dial_string="ext:1000", # Overflow/error queue
)
except Exception as e:
logger.error(f"Failed to transfer on fatal error: {e}")
# Last resort - hang up
await self.call_control.hangup(call_id)
def reset_error_count(self) -> None:
"""Reset error count (e.g., after successful turn)."""
self._error_count = 0
13.3 Fallback Hierarchy \
┌─────────────────────────────────────────────────────────────────────────────┐
│ FALLBACK HIERARCHY │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ COMPONENT PRIMARY FALLBACK 1 FALLBACK 2 │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ STT Deepgram Local Whisper Transfer │
│ Nova-2 (on-device) to agent │
│ │
│ LLM Claude Claude Cached │
│ Sonnet Haiku responses │
│ │
│ TTS Chatterbox Resemble.ai Pre-recorded │
│ (self-hosted) (cloud) phrases │
│ │
│ Tools Primary Retry with Apologize │
│ webhook backoff & skip │
│ │
│ Call Control GoToConnect Retry Transfer │
│ API (3 attempts) queue │
│ │
│ │
│ DECISION LOGIC: │
│ 1. Try primary with timeout │
│ 2. If fail, increment failure counter │
│ 3. If counter > threshold, switch to fallback │
│ 4. Periodically health-check primary to restore │
│ 5. If all fallbacks fail, transfer to human │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
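The decision logic in the box above can be sketched as a small failover wrapper. `FailoverChain` is a hypothetical name for illustration, not a class in the pipeline; it shows the failure counter and fallback switch under the stated thresholds (health-checking the primary for restoration is omitted):

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")

class FailoverChain:
    """Sketch of the decision logic: try the primary with a timeout,
    count failures, and skip straight to fallbacks past a threshold."""

    def __init__(self, threshold: int = 3, timeout_s: float = 2.0):
        self.threshold = threshold
        self.timeout_s = timeout_s
        self.failures = 0  # Consecutive primary failures

    async def call(
        self,
        primary: Callable[[], Awaitable[T]],
        *fallbacks: Callable[[], Awaitable[T]],
    ) -> T:
        providers = [primary, *fallbacks]
        if self.failures >= self.threshold:
            providers = list(fallbacks)  # Primary is considered down
        last_error: Optional[Exception] = None
        for provider in providers:
            try:
                result = await asyncio.wait_for(provider(), timeout=self.timeout_s)
                if provider is primary:
                    self.failures = 0  # Primary recovered
                return result
            except Exception as exc:
                if provider is primary:
                    self.failures += 1
                last_error = exc
        if last_error is not None:
            raise last_error  # All providers failed: caller transfers to a human
        raise RuntimeError("no providers configured")
```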
14. Performance Optimization \
14.1 Optimization Techniques \
| Technique | Component | Impact | Implementation |
|---|---|---|---|
| Connection pooling | All APIs | -30ms | Reuse HTTP connections |
| Streaming | STT, LLM, TTS | -300ms | Process incrementally |
| Sentence buffering | TTS | -100ms | Start TTS before response complete |
| Speculative execution | STT→LLM | -150ms | Start LLM on interim transcript |
| Pre-warming | All | -50ms | Keep connections hot |
| Edge caching | TTS | -100ms | Cache common phrases |
| Parallel processing | Tools | Variable | Execute tools concurrently |
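Speculative execution is the least obvious row in the table: the LLM request starts on the interim transcript and its result is kept only if the final transcript matches. A hedged sketch under that assumption; the function and parameter names are ours, not the pipeline's:

```python
import asyncio
from typing import Awaitable, Callable

async def speculative_generate(
    llm_call: Callable[[str], Awaitable[str]],
    interim_transcript: str,
    final_transcript: str,
) -> str:
    """Start the LLM on the interim transcript; keep the in-flight result
    only if the final transcript turns out to be the same utterance."""
    task = asyncio.create_task(llm_call(interim_transcript))
    if final_transcript.strip().lower() == interim_transcript.strip().lower():
        return await task  # Speculation paid off; response already in flight
    # Transcript changed: discard the speculative call and regenerate
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return await llm_call(final_transcript)
```

When the interim guess holds (the common case for short utterances), the response generation overlaps with endpointing, which is where the ~150 ms saving comes from.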
### 14.2 Connection Management
```python
# pipeline/optimization/connections.py
import httpx
from typing import Dict


class ConnectionPool:
    """
    Manage persistent connections to external services.
    """

    def __init__(self):
        self._clients: Dict[str, httpx.AsyncClient] = {}

    async def get_client(
        self,
        service: str,
        base_url: str,
        **kwargs,
    ) -> httpx.AsyncClient:
        """Get or create a persistent client."""
        if service not in self._clients:
            self._clients[service] = httpx.AsyncClient(
                base_url=base_url,
                http2=True,  # HTTP/2 multiplexing (requires the httpx[http2] extra)
                limits=httpx.Limits(
                    max_keepalive_connections=10,
                    max_connections=20,
                ),
                timeout=httpx.Timeout(30.0, connect=5.0),
                **kwargs,
            )
        return self._clients[service]

    async def close_all(self) -> None:
        """Close all clients."""
        for client in self._clients.values():
            await client.aclose()
        self._clients.clear()
```
### 14.3 Phrase Caching
```python
# pipeline/optimization/phrase_cache.py
import hashlib
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class PhraseCache:
    """
    Cache synthesized audio for common phrases.

    Pre-generates and caches audio for frequently
    used phrases to reduce TTS latency.
    """

    COMMON_PHRASES = [
        "Hello, thank you for calling.",
        "How can I help you today?",
        "Let me check that for you.",
        "One moment please.",
        "Is there anything else I can help you with?",
        "Thank you for calling. Goodbye.",
        "I apologize, I didn't catch that. Could you please repeat?",
        "I'm having trouble understanding. Let me connect you with someone.",
    ]

    def __init__(
        self,
        redis: "Redis",
        tts: "ChatterboxTTS",
    ):
        self.redis = redis
        self.tts = tts

    async def warm_cache(self, voice_id: str) -> None:
        """Pre-generate audio for common phrases."""
        for phrase in self.COMMON_PHRASES:
            key = self._cache_key(phrase, voice_id)

            # Skip phrases that are already cached
            if await self.redis.exists(key):
                continue

            # Generate and cache
            audio = await self.tts.synthesize(phrase, voice_id)
            await self.redis.setex(
                key,
                86400,  # 24 hours
                audio,
            )

        logger.info(f"Warmed phrase cache for voice {voice_id}")

    async def get(
        self,
        phrase: str,
        voice_id: str = "default",
    ) -> Optional[bytes]:
        """Get cached audio for a phrase."""
        key = self._cache_key(phrase, voice_id)
        return await self.redis.get(key)

    async def get_similar(
        self,
        phrase: str,
        voice_id: str = "default",
    ) -> Optional[bytes]:
        """Find a similar cached phrase."""
        normalized = phrase.lower().strip()
        for cached_phrase in self.COMMON_PHRASES:
            if self._is_similar(normalized, cached_phrase.lower()):
                return await self.get(cached_phrase, voice_id)
        return None

    def _cache_key(self, phrase: str, voice_id: str) -> str:
        """Generate cache key for phrase."""
        phrase_hash = hashlib.md5(phrase.encode()).hexdigest()[:8]
        return f"tts:cache:{voice_id}:{phrase_hash}"

    def _is_similar(self, a: str, b: str) -> bool:
        """Check if two phrases are similar enough."""
        # Simple word overlap check
        words_a = set(a.split())
        words_b = set(b.split())
        overlap = len(words_a & words_b)
        total = len(words_a | words_b)
        return overlap / total > 0.7 if total > 0 else False
```
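The `_is_similar` check is a Jaccard similarity over word sets. Pulled out as a standalone sketch for clarity (the function name is illustrative):

```python
def jaccard_similar(a: str, b: str, threshold: float = 0.7) -> bool:
    """True when the word sets of two phrases overlap by more than `threshold`."""
    words_a, words_b = set(a.lower().split()), set(b.lower().split())
    union = words_a | words_b
    if not union:
        return False
    # |A ∩ B| / |A ∪ B| — the Jaccard index over word sets
    return len(words_a & words_b) / len(union) > threshold
```

Note that punctuation survives `lower().strip()`, so "please" and "please." count as different words; stripping punctuation during normalization would let near-identical phrases hit the cache more often.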
## 15. Monitoring and Debugging

### 15.1 Metrics
```python
# pipeline/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge

# Latency metrics
PIPELINE_LATENCY = Histogram(
    "voice_pipeline_latency_ms",
    "End-to-end pipeline latency",
    ["call_id", "phase"],
    buckets=[100, 200, 300, 500, 750, 1000, 1500, 2000, 3000, 5000],
)

STT_LATENCY = Histogram(
    "voice_stt_latency_ms",
    "Speech-to-text latency",
    buckets=[50, 100, 150, 200, 300, 500],
)

LLM_TTFB = Histogram(
    "voice_llm_ttfb_ms",
    "LLM time to first token",
    buckets=[100, 200, 300, 500, 750, 1000, 1500],
)

TTS_TTFB = Histogram(
    "voice_tts_ttfb_ms",
    "TTS time to first byte",
    buckets=[50, 100, 150, 200, 300, 500],
)

# Counter metrics
TURNS_TOTAL = Counter(
    "voice_turns_total",
    "Total conversation turns",
    ["tenant_id", "agent_id"],
)

INTERRUPTIONS_TOTAL = Counter(
    "voice_interruptions_total",
    "Total user interruptions",
    ["tenant_id"],
)

ERRORS_TOTAL = Counter(
    "voice_errors_total",
    "Pipeline errors",
    ["component", "error_type"],
)

TOOL_CALLS_TOTAL = Counter(
    "voice_tool_calls_total",
    "Tool call executions",
    ["tool_name", "success"],
)

# Gauge metrics
ACTIVE_CALLS = Gauge(
    "voice_active_calls",
    "Currently active calls",
    ["tenant_id"],
)

PIPELINE_STATE = Gauge(
    "voice_pipeline_state",
    "Current pipeline state",
    ["call_id", "state"],
)
```
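These histograms are typically fed by timing each phase. A minimal stdlib sketch of that wrapper pattern — in the real pipeline, `record` would be something like `lambda phase, ms: PIPELINE_LATENCY.labels(call_id, phase).observe(ms)`:

```python
import time
from contextlib import contextmanager


@contextmanager
def observe_ms(record, phase: str):
    """Time a pipeline phase and report elapsed milliseconds to `record`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        record(phase, (time.perf_counter() - start) * 1000.0)


# For the sketch, collect samples in a list instead of Prometheus
samples = []
with observe_ms(lambda phase, ms: samples.append((phase, ms)), "stt"):
    time.sleep(0.01)  # Stand-in for an STT round trip
```

The `finally` clause matters: a phase that raises still records its latency, so error turns do not vanish from the histograms.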
### 15.2 Logging
```python
# pipeline/monitoring/logging.py
import logging
from typing import Optional

import structlog


def configure_logging() -> None:
    """Configure structured logging for the pipeline."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )


class PipelineLogger:
    """
    Structured logger for voice pipeline events.
    """

    def __init__(self, call_id: str, tenant_id: str):
        self.logger = structlog.get_logger()
        self.call_id = call_id
        self.tenant_id = tenant_id

    def turn_started(self, turn_id: str) -> None:
        """Log turn start."""
        self.logger.info(
            "turn_started",
            call_id=self.call_id,
            tenant_id=self.tenant_id,
            turn_id=turn_id,
        )

    def transcript_received(
        self,
        turn_id: str,
        transcript: str,
        is_final: bool,
        latency_ms: float,
    ) -> None:
        """Log transcript."""
        self.logger.info(
            "transcript_received",
            call_id=self.call_id,
            turn_id=turn_id,
            transcript=transcript[:100],  # Truncate to keep log lines bounded
            is_final=is_final,
            latency_ms=latency_ms,
        )

    def llm_response(
        self,
        turn_id: str,
        response_length: int,
        ttfb_ms: float,
        total_ms: float,
        has_tool_calls: bool,
    ) -> None:
        """Log LLM response."""
        self.logger.info(
            "llm_response",
            call_id=self.call_id,
            turn_id=turn_id,
            response_length=response_length,
            ttfb_ms=ttfb_ms,
            total_ms=total_ms,
            has_tool_calls=has_tool_calls,
        )

    def tool_executed(
        self,
        turn_id: str,
        tool_name: str,
        success: bool,
        execution_ms: float,
    ) -> None:
        """Log tool execution."""
        self.logger.info(
            "tool_executed",
            call_id=self.call_id,
            turn_id=turn_id,
            tool_name=tool_name,
            success=success,
            execution_ms=execution_ms,
        )

    def interruption_detected(
        self,
        turn_id: str,
        spoken_duration_ms: float,
    ) -> None:
        """Log interruption."""
        self.logger.info(
            "interruption_detected",
            call_id=self.call_id,
            turn_id=turn_id,
            spoken_duration_ms=spoken_duration_ms,
        )

    def error(
        self,
        component: str,
        error: Exception,
        context: Optional[dict] = None,
    ) -> None:
        """Log error."""
        self.logger.error(
            "pipeline_error",
            call_id=self.call_id,
            component=component,
            error=str(error),
            error_type=type(error).__name__,
            context=context or {},
            exc_info=True,
        )
```
### 15.3 Debug Tools
```python
# pipeline/monitoring/debug.py
import json
from typing import Optional


class PipelineDebugger:
    """
    Debug tools for voice pipeline development.
    """

    def __init__(self, redis: "Redis"):
        self.redis = redis

    async def capture_turn(
        self,
        call_id: str,
        turn_id: str,
        data: dict,
    ) -> None:
        """Capture turn data for debugging."""
        key = f"debug:{call_id}:{turn_id}"
        await self.redis.setex(
            key,
            3600,  # 1 hour
            json.dumps(data),
        )

    async def get_turn_capture(
        self,
        call_id: str,
        turn_id: str,
    ) -> Optional[dict]:
        """Retrieve captured turn data."""
        key = f"debug:{call_id}:{turn_id}"
        data = await self.redis.get(key)
        return json.loads(data) if data else None

    async def replay_turn(
        self,
        call_id: str,
        turn_id: str,
    ) -> dict:
        """Replay a captured turn for debugging."""
        capture = await self.get_turn_capture(call_id, turn_id)
        if not capture:
            raise ValueError(f"No capture found for {call_id}:{turn_id}")

        # Re-run through pipeline components
        results = {
            "original": capture,
            "replay": {},
        }

        # Replaying STT would require the captured audio
        # Replay LLM
        if capture.get("transcript"):
            # ... replay logic
            pass

        return results
```
## Appendix A: Configuration Reference {#appendix-a:-configuration-reference}
```yaml
# config/pipeline.yaml
pipeline:
  # VAD Configuration
  vad:
    model: "silero"
    threshold: 0.5
    min_speech_ms: 250
    min_silence_ms: 300
    window_size_ms: 100

  # STT Configuration
  stt:
    provider: "deepgram"
    model: "nova-2"
    language: "en-US"
    punctuate: true
    interim_results: true
    utterance_end_ms: 1000

  # LLM Configuration
  llm:
    provider: "anthropic"
    model: "claude-sonnet-4-20250514"
    max_tokens: 1024
    temperature: 0.7
    timeout_s: 30

  # TTS Configuration
  tts:
    provider: "chatterbox"
    endpoint: "${CHATTERBOX_URL}"
    sample_rate: 24000
    default_voice: "professional_female"

  # Latency Targets
  latency:
    e2e_target_ms: 1000
    ttfb_target_ms: 500
    stt_target_ms: 200
    llm_ttfb_target_ms: 300
    tts_ttfb_target_ms: 200

  # Interruption
  interruption:
    vad_threshold: 0.7
    min_duration_ms: 150
    backchannel_max_ms: 500

  # Error Handling
  errors:
    max_retries: 3
    max_errors_per_call: 5
    fallback_to_agent: true
```
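The `${CHATTERBOX_URL}` value implies environment-variable substitution before the YAML is parsed. A minimal sketch of that step — the function name is an assumption, not the project's actual loader:

```python
import os
import re

# Matches ${VAR_NAME} placeholders in the raw config text
_ENV_VAR = re.compile(r"\$\{([A-Z0-9_]+)\}")


def substitute_env(raw: str) -> str:
    """Replace ${VAR} placeholders with environment values, failing loudly on missing ones."""
    def repl(m: re.Match) -> str:
        name = m.group(1)
        value = os.environ.get(name)
        if value is None:
            raise KeyError(f"Missing environment variable: {name}")
        return value

    return _ENV_VAR.sub(repl, raw)
```

Run over the file contents before handing them to the YAML parser; raising on a missing variable surfaces misconfiguration at startup rather than mid-call.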
## Appendix B: Sequence Diagrams {#appendix-b:-sequence-diagrams}
### B.1 Normal Turn Flow {#b.1-normal-turn-flow}
```
User           VAD           STT           LLM           TTS      Speaker
  │             │             │             │             │             │
  │──audio─────▶│             │             │             │             │
  │             │──speech────▶│             │             │             │
  │             │  detected   │             │             │             │
  │──audio─────▶│─────────────│──audio─────▶│             │             │
  │             │             │             │             │             │
  │             │             │◀──interim───│             │             │
  │──silence───▶│             │             │             │             │
  │             │──endpoint──▶│             │             │             │
  │             │             │◀──final─────│             │             │
  │             │             │             │             │             │
  │             │             │─────────────│──context───▶│             │
  │             │             │             │             │             │
  │             │             │             │◀─tokens─────│             │
  │             │             │             │             │             │
  │             │             │             │──sentence──▶│             │
  │             │             │             │             │──audio─────▶│
  │             │             │             │◀─tokens─────│             │
  │             │             │             │──sentence──▶│             │
  │             │             │             │             │──audio─────▶│
  │             │             │             │             │             │
```
## Document History
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2026-01-16 | Claude | Initial document |
End of Document