Voice by aiConnected — WebRTC Bridge Technical Design \
Document Information \
| Field | Value |
|---|---|
| Document ID | ARCH-004 |
| Version | 1.0 |
| Last Updated | 2026-01-16 |
| Status | Draft |
| Owner | Engineering |
| Dependencies | ARCH-001, ARCH-002, ARCH-003 |
Table of Contents \
- 1. Introduction
  - 1.1 Purpose
  - 1.2 Scope
  - 1.3 Design Goals
  - 1.4 Key Terminology
- 2. Architecture Overview
  - 2.1 High-Level Architecture
  - 2.2 Component Responsibilities
  - 2.3 Data Flow Summary
  - 2.4 Threading Model
- 3. aiortc Fundamentals
  - 3.1 aiortc Overview
  - 3.2 Core Classes
  - 3.3 RTCPeerConnection Lifecycle
  - 3.4 Basic aiortc Setup
  - 3.5 Custom Audio Tracks
- 4. SDP Exchange
  - 4.1 SDP Overview
  - 4.2 SDP Structure
  - 4.3 SDP Offer/Answer Flow with GoToConnect
  - 4.4 SDP Negotiator Implementation
  - 4.5 SDP Examples
- 5. ICE Candidate Handling
  - 5.1 ICE Overview
  - 5.2 ICE Connection Process
  - 5.3 ICE Manager Implementation
  - 5.4 Trickle ICE Flow
- 6. Audio Frame Processing
  - 6.1 Audio Frame Fundamentals
  - 6.2 PyAV AudioFrame
  - 6.3 Audio Resampling
  - 6.4 Audio Format Conversion
- 7. Codec Management
  - 7.1 Supported Codecs
  - 7.2 Codec Handler Implementation
  - 7.3 Codec Negotiation Strategy
  - 7.4 Packet Loss Concealment
- 8. GoToConnect Integration
  - 8.1 GoTo WebRTC Flow
  - 8.2 GoTo Connection Handler
- 9. LiveKit Integration
  - 9.1 LiveKit Overview
  - 9.2 LiveKit Room Architecture
  - 9.3 LiveKit Connection Handler
- 10. Bidirectional Bridge
  - 10.1 Bridge Architecture
  - 10.2 Audio Bridge Implementation
  - 10.3 Audio Forking for STT
- 11. Connection Lifecycle Management
  - 11.1 Complete Call Lifecycle
  - 11.2 Lifecycle Manager Implementation
  - 11.3 Graceful Shutdown
- 12. Error Handling and Recovery
  - 12.1 Error Categories
  - 12.2 Error Handler Implementation
  - 12.3 ICE Restart Manager
- 13. Performance Optimization
  - 13.1 Performance Targets
  - 13.2 Audio Buffer Optimization
  - 13.3 Connection Pooling
- 14. Monitoring and Debugging
  - 14.1 Metrics Collection
  - 14.2 Structured Logging
  - 14.3 Debug Tools
- 15. Testing Strategy
  - 15.1 Test Categories
  - 15.2 Unit Test Examples
  - 15.3 Integration Test Examples
  - 15.4 Load Test Framework
- 16. Appendix
  - 16.1 SDP Reference
  - 16.2 Audio Format Reference
- Document Revision History
1. Introduction \
1.1 Purpose \
This document provides the technical design for the WebRTC Bridge component in Voice by aiConnected. The WebRTC Bridge is the critical infrastructure that connects telephone calls from GoToConnect to our AI processing pipeline via LiveKit. The bridge must handle real-time audio under strict latency requirements while managing two different WebRTC implementations, codec negotiation, and bidirectional audio streaming.
1.2 Scope \
This document covers:
- aiortc library implementation for WebRTC
- SDP offer/answer exchange with GoToConnect
- ICE candidate handling and connectivity
- Audio frame capture and injection
- Codec negotiation and transcoding
- LiveKit room integration
- Bidirectional audio bridging
- Error handling and recovery
This document does not cover:
- GoToConnect API authentication (see ARCH-002)
- Voice pipeline processing (see ARCH-003)
- LiveKit server deployment
1.3 Design Goals \
| Goal | Target | Priority |
|---|---|---|
| Audio latency | < 50ms bridge overhead | Critical |
| Connection setup | < 2 seconds | Critical |
| Audio quality | No degradation | High |
| Reliability | 99.9% call completion | High |
| Scalability | 1000 concurrent calls | Medium |
| Resource efficiency | < 50MB RAM per call | Medium |
1.4 Key Terminology \
| Term | Definition |
|---|---|
| WebRTC | Web Real-Time Communication - protocol for real-time audio/video |
| SDP | Session Description Protocol - describes media sessions |
| ICE | Interactive Connectivity Establishment - NAT traversal |
| STUN | Session Traversal Utilities for NAT - discover public IP |
| TURN | Traversal Using Relays around NAT - relay server |
| RTP | Real-time Transport Protocol - media transport |
| RTCP | RTP Control Protocol - quality feedback |
| aiortc | Python asyncio WebRTC implementation |
| LiveKit | Open-source WebRTC SFU for scalable real-time apps |
2. Architecture Overview \
2.1 High-Level Architecture \
┌─────────────────────────────────────────────────────────────────────────────┐
│ WEBRTC BRIDGE ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PSTN NETWORK │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ GoToConnect │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ SIP/PSTN │───▶│ Media │───▶│ WebRTC │ │ │
│ │ │ Gateway │ │ Server │ │ Endpoint │ │ │
│ │ └──────────────┘ └──────────────┘ └──────┬───────┘ │ │
│ │ │ │ │
│ └──────────────────────────────────────────────────┼───────────────────┘ │
│ │ │
│ WebRTC (DTLS-SRTP) │
│ │ │
│ ┌──────────────────────────────────────────────────┼───────────────────┐ │
│ │ WEBRTC BRIDGE │ │ │
│ │ │ │ │
│ │ ┌──────────────────────────────────────────────▼─────────────┐ │ │
│ │ │ GoTo Connection │ │ │
│ │ │ │ │ │
│ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │
│ │ │ │ SDP │ │ ICE │ │ Audio │ │ │ │
│ │ │ │ Negotiator │ │ Agent │ │ Track │ │ │ │
│ │ │ └─────────────┘ └─────────────┘ └──────┬──────┘ │ │ │
│ │ │ │ │ │ │
│ │ └───────────────────────────────────────────┼────────────────┘ │ │
│ │ │ │ │
│ │ ┌───────────────────────────────────────────▼────────────────┐ │ │
│ │ │ AUDIO BRIDGE │ │ │
│ │ │ │ │ │
│ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │
│ │ │ │ Decoder │ │ Resampler │ │ Encoder │ │ │ │
│ │ │ │ (Opus/G711)│ │ (48k↔16k) │ │ (Opus) │ │ │ │
│ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │
│ │ │ │ │ │
│ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │
│ │ │ │ Bidirectional Buffer │ │ │ │
│ │ │ │ GoTo → Pipeline Pipeline → GoTo │ │ │ │
│ │ │ └─────────────────────────────────────────────────────┘ │ │ │
│ │ │ │ │ │
│ │ └───────────────────────────────────────────┬────────────────┘ │ │
│ │ │ │ │
│ │ ┌───────────────────────────────────────────▼────────────────┐ │ │
│ │ │ LiveKit Connection │ │ │
│ │ │ │ │ │
│ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │
│ │ │ │ Room │ │ Audio │ │ Audio │ │ │ │
│ │ │ │ Client │ │ Source │ │ Sink │ │ │ │
│ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ WebRTC (DTLS-SRTP) │
│ │ │
│ ┌──────────────────────────────────────────────────┼───────────────────┐ │
│ │ LiveKit SFU │ │ │
│ │ ▼ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Room │◀──▶│ Selective │◀──▶│ Agent │ │ │
│ │ │ Manager │ │ Forwarder │ │ Worker │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
2.2 Component Responsibilities \
| Component | Responsibility |
|---|---|
| GoTo Connection | Manage WebRTC session with GoToConnect |
| SDP Negotiator | Handle offer/answer exchange |
| ICE Agent | Manage connectivity establishment |
| Audio Track | Receive/send RTP audio packets |
| Audio Bridge | Convert and route audio between endpoints |
| Decoder | Decode incoming audio (Opus/G.711) |
| Resampler | Convert sample rates (48kHz ↔ 16kHz) |
| Encoder | Encode outgoing audio (Opus) |
| LiveKit Connection | Manage connection to LiveKit room |
| Room Client | Join/leave LiveKit rooms |
| Audio Source | Publish audio to LiveKit |
| Audio Sink | Subscribe to audio from LiveKit |
2.3 Data Flow Summary \
┌─────────────────────────────────────────────────────────────────────────────┐
│ AUDIO DATA FLOW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ INBOUND (Caller → AI): │
│ ───────────────────── │
│ │
│ GoToConnect Bridge LiveKit │
│ │ │ │ │
│ │ RTP/Opus or │ │ │
│ │ RTP/PCMU │ │ │
│ │──────────────────▶ │ │ │
│ │ │ decode │ │
│ │ │ ──────▶ PCM 48kHz │ │
│ │ │ │ │ │
│ │ │ resample │ │
│ │ │ ──────▶ PCM 16kHz │ │
│ │ │ │ │ │
│ │ │ │──────────────▶│ (to STT) │
│ │ │ │ │ │
│ │ │ encode │ │
│ │ │ ──────▶ Opus 48kHz │ │
│ │ │ │ │ │
│ │ │ │──────────────▶│ (to Room) │
│ │ │ │ │
│ │
│ OUTBOUND (AI → Caller): │
│ ────────────────────── │
│ │
│ GoToConnect Bridge LiveKit │
│ │ │ │ │
│ │ │ │◀──────────────│ (from TTS) │
│ │ │ │ │ │
│ │ │ PCM 24kHz │ │
│ │ │ │ │ │
│ │ │ resample │ │
│ │ │ ──────▶ PCM 48kHz │ │
│ │ │ │ │ │
│ │ │ encode │ │
│ │ │ ──────▶ Opus/PCMU │ │
│ │ │ │ │ │
│ │ ◀───────────────────────────── │ │
│ │ RTP │ │ │
│ │ │ │ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
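The 48 kHz → 16 kHz leg of the inbound path can be sketched with plain numpy. This is a naive 3:1 decimation for illustration only; the real bridge would use a proper anti-aliased resampler (e.g. PyAV's AudioResampler, covered in Section 6):

```python
import numpy as np

def downsample_48k_to_16k(pcm: np.ndarray) -> np.ndarray:
    """Naive 3:1 decimation by averaging groups of three samples.
    Illustration only — production code needs anti-alias filtering."""
    usable = len(pcm) - (len(pcm) % 3)
    return pcm[:usable].reshape(-1, 3).mean(axis=1).astype(np.int16)

frame_48k = np.zeros(960, dtype=np.int16)   # one 20 ms frame at 48 kHz
frame_16k = downsample_48k_to_16k(frame_48k)
print(len(frame_16k))  # 320 samples = 20 ms at 16 kHz
```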
2.4 Threading Model \
┌─────────────────────────────────────────────────────────────────────────────┐
│ THREADING MODEL │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ MAIN ASYNCIO EVENT LOOP │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ WebRTC │ │ LiveKit │ │ Control │ │ │
│ │ │ Signaling │ │ Signaling │ │ Logic │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ AIORTC MEDIA THREAD │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ RTP │ │ Codec │ │ RTCP │ │ │
│ │ │ Processing │ │ Encode/ │ │ Processing │ │ │
│ │ │ │ │ Decode │ │ │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ AUDIO PROCESSING THREAD │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Resampling │ │ Buffer │ │ Format │ │ │
│ │ │ │ │ Management │ │ Conversion │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ COMMUNICATION: │
│ • asyncio.Queue for cross-thread audio transfer │
│ • Thread-safe buffers for frame handoff │
│ • Events for synchronization │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
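The cross-thread handoff listed above can be sketched with the stdlib alone: a worker thread (standing in for the media thread) schedules frames onto an asyncio.Queue owned by the event loop via run_coroutine_threadsafe. Names here are illustrative:

```python
import asyncio
import threading

def media_thread(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue) -> None:
    """Runs outside the event loop, like the aiortc media thread."""
    for seq in range(3):
        # Queue operations must happen on the loop's thread; this schedules
        # the coroutine there and blocks this thread until it completes.
        fut = asyncio.run_coroutine_threadsafe(queue.put(f"frame-{seq}"), loop)
        fut.result()

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    worker = threading.Thread(
        target=media_thread, args=(asyncio.get_running_loop(), queue)
    )
    worker.start()
    frames = [await queue.get() for _ in range(3)]
    worker.join()
    return frames

frames = asyncio.run(main())
print(frames)  # ['frame-0', 'frame-1', 'frame-2']
```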
3. aiortc Fundamentals \
3.1 aiortc Overview \
aiortc is a Python implementation of WebRTC and ORTC (Object Real-Time Communications). It provides:
- Full WebRTC stack in pure Python
- asyncio-based for non-blocking I/O
- Support for audio/video/data channels
- Built-in codecs (Opus, VP8, H.264)
3.2 Core Classes \
# bridge/webrtc/core.py
from aiortc import (
    RTCPeerConnection,
    RTCSessionDescription,
    RTCIceCandidate,
    RTCConfiguration,
    RTCIceServer,
    MediaStreamTrack,
)
from aiortc.contrib.media import MediaPlayer, MediaRecorder
from aiortc.mediastreams import AudioStreamTrack
import av

# Key aiortc classes we use:
# RTCPeerConnection     - Main WebRTC connection object
# RTCSessionDescription - SDP offer/answer
# RTCIceCandidate       - ICE candidate for connectivity
# RTCConfiguration      - STUN/TURN server config
# MediaStreamTrack      - Base class for audio/video tracks
# AudioStreamTrack      - Audio-specific track implementation
3.3 RTCPeerConnection Lifecycle \
┌─────────────────────────────────────────────────────────────────────────────┐
│ RTCPeerConnection STATE MACHINE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ │
│ │ new │ │
│ └────┬────┘ │
│ │ │
│ createOffer() or │
│ setRemoteDescription() │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ connecting │ │
│ └───────┬───────┘ │
│ │ │
│ ICE + DTLS complete │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ connected │◀──────────┐ │
│ └───────┬───────┘ │ │
│ │ ICE restart │
│ ICE disconnected │ │
│ │ │ │
│ ▼ │ │
│ ┌────────────────┐ │ │
│ │ disconnected │───────────┘ │
│ └────────┬───────┘ │
│ │ │
│ ICE failed or │
│ close() called │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ closed │ │
│ └───────────────┘ │
│ │
│ SIGNALING STATES: │
│ • stable - No offer/answer in progress │
│ • have-local-offer - Local offer created, awaiting answer │
│ • have-remote-offer - Remote offer received, need to answer │
│ • have-local-pranswer - Local provisional answer │
│ • have-remote-pranswer - Remote provisional answer │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
3.4 Basic aiortc Setup \
# bridge/webrtc/connection.py
import asyncio
import logging
from dataclasses import dataclass
from typing import Optional, Callable, List

from aiortc import (
    RTCPeerConnection,
    RTCSessionDescription,
    RTCIceCandidate,
    RTCConfiguration,
    RTCIceServer,
    MediaStreamTrack,
)

logger = logging.getLogger(__name__)


@dataclass
class WebRTCConfig:
    """Configuration for WebRTC connection."""
    stun_servers: List[str] = None
    turn_servers: List[dict] = None
    ice_transport_policy: str = "all"  # "all" or "relay"
    bundle_policy: str = "max-bundle"

    def __post_init__(self):
        if self.stun_servers is None:
            self.stun_servers = [
                "stun:stun.l.google.com:19302",
                "stun:stun1.l.google.com:19302",
            ]
class WebRTCConnection:
    """
    Wrapper around aiortc RTCPeerConnection.

    Provides a simplified interface for managing
    WebRTC connections with proper lifecycle handling.
    """

    def __init__(
        self,
        config: WebRTCConfig = None,
        call_id: str = None,
    ):
        self.config = config or WebRTCConfig()
        self.call_id = call_id
        self._pc: Optional[RTCPeerConnection] = None
        self._local_tracks: List[MediaStreamTrack] = []
        self._remote_tracks: List[MediaStreamTrack] = []

        # Event handlers
        self.on_track: Optional[Callable] = None
        self.on_ice_candidate: Optional[Callable] = None
        self.on_connection_state_change: Optional[Callable] = None
        self.on_ice_connection_state_change: Optional[Callable] = None

    async def initialize(self) -> None:
        """Initialize the peer connection."""
        ice_servers = self._build_ice_servers()
        # aiortc's RTCConfiguration accepts only iceServers; the transport
        # and bundle policies in WebRTCConfig are applied by our own logic.
        rtc_config = RTCConfiguration(iceServers=ice_servers)

        self._pc = RTCPeerConnection(configuration=rtc_config)

        # Set up event handlers
        # (NB: aiortc does not currently emit trickle "icecandidate" events;
        # candidates are gathered during setLocalDescription.)
        self._pc.on("track", self._handle_track)
        self._pc.on("icecandidate", self._handle_ice_candidate)
        self._pc.on("connectionstatechange", self._handle_connection_state_change)
        self._pc.on("iceconnectionstatechange", self._handle_ice_connection_state_change)

        logger.info(f"[{self.call_id}] WebRTC connection initialized")
    def _build_ice_servers(self) -> List[RTCIceServer]:
        """Build ICE server configuration."""
        servers = []

        # Add STUN servers
        for url in self.config.stun_servers:
            servers.append(RTCIceServer(urls=[url]))

        # Add TURN servers
        if self.config.turn_servers:
            for turn in self.config.turn_servers:
                servers.append(RTCIceServer(
                    urls=[turn["url"]],
                    username=turn.get("username"),
                    credential=turn.get("credential"),
                ))

        return servers

    async def add_track(self, track: MediaStreamTrack) -> None:
        """Add a local track to the connection."""
        if self._pc:
            self._pc.addTrack(track)
            self._local_tracks.append(track)
            logger.debug(f"[{self.call_id}] Added track: {track.kind}")

    async def create_offer(self) -> str:
        """Create an SDP offer."""
        if not self._pc:
            raise RuntimeError("Connection not initialized")

        offer = await self._pc.createOffer()
        await self._pc.setLocalDescription(offer)

        logger.debug(f"[{self.call_id}] Created offer")
        return self._pc.localDescription.sdp

    async def create_answer(self) -> str:
        """Create an SDP answer."""
        if not self._pc:
            raise RuntimeError("Connection not initialized")

        answer = await self._pc.createAnswer()
        await self._pc.setLocalDescription(answer)

        logger.debug(f"[{self.call_id}] Created answer")
        return self._pc.localDescription.sdp

    async def set_remote_description(
        self,
        sdp: str,
        sdp_type: str,
    ) -> None:
        """Set the remote SDP description."""
        if not self._pc:
            raise RuntimeError("Connection not initialized")

        description = RTCSessionDescription(sdp=sdp, type=sdp_type)
        await self._pc.setRemoteDescription(description)

        logger.debug(f"[{self.call_id}] Set remote description: {sdp_type}")
    async def add_ice_candidate(
        self,
        candidate: str,
        sdp_mid: str,
        sdp_mline_index: int,
    ) -> None:
        """Add a remote ICE candidate."""
        if not self._pc:
            raise RuntimeError("Connection not initialized")

        # aiortc's RTCIceCandidate takes parsed fields, not the raw
        # candidate string; candidate_from_sdp does the parsing.
        from aiortc.sdp import candidate_from_sdp

        ice_candidate = candidate_from_sdp(candidate.replace("candidate:", "", 1))
        ice_candidate.sdpMid = sdp_mid
        ice_candidate.sdpMLineIndex = sdp_mline_index
        await self._pc.addIceCandidate(ice_candidate)

        logger.debug(f"[{self.call_id}] Added ICE candidate")
    async def close(self) -> None:
        """Close the connection."""
        if self._pc:
            await self._pc.close()
            self._pc = None

        # Stop local tracks
        for track in self._local_tracks:
            track.stop()

        self._local_tracks.clear()
        self._remote_tracks.clear()

        logger.info(f"[{self.call_id}] WebRTC connection closed")

    # Event handlers

    def _handle_track(self, track: MediaStreamTrack) -> None:
        """Handle incoming track."""
        logger.info(f"[{self.call_id}] Received track: {track.kind}")
        self._remote_tracks.append(track)

        if self.on_track:
            asyncio.create_task(self.on_track(track))

    def _handle_ice_candidate(self, candidate: RTCIceCandidate) -> None:
        """Handle local ICE candidate."""
        if candidate and self.on_ice_candidate:
            asyncio.create_task(self.on_ice_candidate(candidate))

    def _handle_connection_state_change(self) -> None:
        """Handle connection state change."""
        state = self._pc.connectionState if self._pc else "closed"
        logger.info(f"[{self.call_id}] Connection state: {state}")

        if self.on_connection_state_change:
            asyncio.create_task(self.on_connection_state_change(state))

    def _handle_ice_connection_state_change(self) -> None:
        """Handle ICE connection state change."""
        state = self._pc.iceConnectionState if self._pc else "closed"
        logger.info(f"[{self.call_id}] ICE state: {state}")

        if self.on_ice_connection_state_change:
            asyncio.create_task(self.on_ice_connection_state_change(state))

    # Properties

    @property
    def connection_state(self) -> str:
        """Get current connection state."""
        return self._pc.connectionState if self._pc else "closed"

    @property
    def ice_connection_state(self) -> str:
        """Get current ICE connection state."""
        return self._pc.iceConnectionState if self._pc else "closed"

    @property
    def signaling_state(self) -> str:
        """Get current signaling state."""
        return self._pc.signalingState if self._pc else "closed"
3.5 Custom Audio Tracks \
# bridge/webrtc/tracks.py
import asyncio
import fractions
import time
from typing import Callable, Optional

import numpy as np
from aiortc import MediaStreamTrack
from av import AudioFrame


class AudioTrackSink(MediaStreamTrack):
    """
    Audio track that receives frames from a WebRTC peer.

    Captures incoming audio and makes it available
    for processing by the voice pipeline.
    """

    kind = "audio"

    def __init__(
        self,
        track: MediaStreamTrack,
        on_frame: Optional[Callable] = None,
    ):
        super().__init__()
        self._track = track
        self.on_frame = on_frame
        self._running = True

    async def recv(self) -> AudioFrame:
        """Receive and process audio frames."""
        frame = await self._track.recv()

        if self.on_frame and self._running:
            await self.on_frame(frame)

        return frame

    def stop(self) -> None:
        """Stop receiving frames."""
        self._running = False
        super().stop()
class AudioTrackSource(MediaStreamTrack):
    """
    Audio track that generates frames for a WebRTC peer.

    Receives audio from the voice pipeline and sends
    it to the remote peer.
    """

    kind = "audio"

    def __init__(
        self,
        sample_rate: int = 48000,
        channels: int = 1,
        samples_per_frame: int = 960,  # 20ms at 48kHz
    ):
        super().__init__()
        self.sample_rate = sample_rate
        self.channels = channels
        self.samples_per_frame = samples_per_frame

        # Frame timing
        self._frame_duration = samples_per_frame / sample_rate
        self._start_time: Optional[float] = None
        self._frame_count = 0

        # Audio buffer
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=50)

        # Silence frame for when buffer is empty
        self._silence = np.zeros(
            (samples_per_frame, channels),
            dtype=np.int16,
        )

    async def recv(self) -> AudioFrame:
        """Generate the next audio frame."""
        # Initialize timing on first frame
        if self._start_time is None:
            self._start_time = time.time()

        # Calculate expected time for this frame
        expected_time = self._start_time + (self._frame_count * self._frame_duration)

        # Wait until it's time to send this frame
        now = time.time()
        if now < expected_time:
            await asyncio.sleep(expected_time - now)

        # Get audio data from queue or use silence
        try:
            audio_data = self._queue.get_nowait()
        except asyncio.QueueEmpty:
            audio_data = self._silence

        # Create AudioFrame
        frame = AudioFrame(
            format="s16",
            layout="mono" if self.channels == 1 else "stereo",
            samples=self.samples_per_frame,
        )

        # Set frame data
        frame.planes[0].update(audio_data.tobytes())
        frame.sample_rate = self.sample_rate
        frame.pts = self._frame_count * self.samples_per_frame
        frame.time_base = fractions.Fraction(1, self.sample_rate)

        self._frame_count += 1
        return frame

    async def push_audio(self, audio_data: np.ndarray) -> None:
        """
        Push audio data to be sent.

        Args:
            audio_data: Audio samples as numpy array (int16)
        """
        try:
            self._queue.put_nowait(audio_data)
        except asyncio.QueueFull:
            # Drop oldest frame
            try:
                self._queue.get_nowait()
                self._queue.put_nowait(audio_data)
            except asyncio.QueueEmpty:
                pass

    def clear_buffer(self) -> None:
        """Clear the audio buffer."""
        while not self._queue.empty():
            try:
                self._queue.get_nowait()
            except asyncio.QueueEmpty:
                break

    def stop(self) -> None:
        """Stop the track."""
        self.clear_buffer()
        super().stop()
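The drop-oldest overflow policy in push_audio keeps end-to-end latency bounded instead of letting the buffer grow without limit. In isolation the policy behaves like this (standalone sketch; the helper name is illustrative):

```python
import asyncio

def push_drop_oldest(queue: asyncio.Queue, item) -> bool:
    """Enqueue item; on overflow, evict the oldest entry first.
    Returns True if an old frame was dropped."""
    try:
        queue.put_nowait(item)
        return False
    except asyncio.QueueFull:
        try:
            queue.get_nowait()  # drop oldest to stay near real time
        except asyncio.QueueEmpty:
            pass
        queue.put_nowait(item)
        return True

q: asyncio.Queue = asyncio.Queue(maxsize=2)
assert push_drop_oldest(q, "a") is False
assert push_drop_oldest(q, "b") is False
assert push_drop_oldest(q, "c") is True   # "a" was evicted
print(q.get_nowait(), q.get_nowait())     # b c
```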
4. SDP Exchange \
4.1 SDP Overview \
Session Description Protocol (SDP) describes multimedia sessions. In WebRTC, SDP is used to negotiate:
- Media types (audio, video, data)
- Codecs and their parameters
- Transport information
- Security parameters (DTLS fingerprint)
4.2 SDP Structure \
┌─────────────────────────────────────────────────────────────────────────────┐
│ SDP STRUCTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ SESSION LEVEL (v=, o=, s=, t=) │
│ ├── Protocol version │
│ ├── Origin (session ID, username) │
│ ├── Session name │
│ └── Timing (start/stop time) │
│ │
│ MEDIA LEVEL (m=, c=, a=) │
│ ├── Media type (audio/video) │
│ ├── Port number │
│ ├── Protocol (UDP/TLS/RTP/SAVPF) │
│ ├── Format list (payload types) │
│ ├── Connection info (IP address) │
│ └── Attributes │
│ ├── rtpmap (codec mapping) │
│ ├── fmtp (format parameters) │
│ ├── ice-ufrag, ice-pwd (ICE credentials) │
│ ├── fingerprint (DTLS) │
│ ├── setup (DTLS role) │
│ ├── mid (media ID) │
│ ├── sendrecv/sendonly/recvonly │
│ └── candidate (ICE candidates) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
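The session/media split above is what drives any SDP parser: everything before the first m= line is session-level, and each m= line opens a new media section. A minimal illustration with a toy SDP:

```python
# Toy SDP: four session-level lines followed by one audio media section.
SDP = (
    "v=0\r\n"
    "o=- 1 1 IN IP4 0.0.0.0\r\n"
    "s=-\r\n"
    "t=0 0\r\n"
    "m=audio 9 UDP/TLS/RTP/SAVPF 111\r\n"
    "a=mid:0\r\n"
)

lines = SDP.strip().split("\r\n")
# Index of the first media line; session-level lines precede it
first_m = next(i for i, line in enumerate(lines) if line.startswith("m="))
session_level, media_level = lines[:first_m], lines[first_m:]
print(session_level)  # ['v=0', 'o=- 1 1 IN IP4 0.0.0.0', 's=-', 't=0 0']
print(media_level)    # ['m=audio 9 UDP/TLS/RTP/SAVPF 111', 'a=mid:0']
```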
4.3 SDP Offer/Answer Flow with GoToConnect \
┌─────────────────────────────────────────────────────────────────────────────┐
│ SDP OFFER/ANSWER FLOW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ INBOUND CALL (GoTo initiates): │
│ ───────────────────────────── │
│ │
│ GoToConnect Bridge │
│ │ │ │
│ │ 1. Webhook: call.ringing │ │
│ │────────────────────────────▶ │
│ │ │ │
│ │ 2. POST /calls/{id}/answer│ │
│ │◀──────────────────────────── │
│ │ │ │
│ │ 3. SDP Offer │ │
│ │────────────────────────────▶ │
│ │ │ 4. Parse offer │
│ │ │ 5. Create answer │
│ │ │ │
│ │ 6. SDP Answer │ │
│ │◀──────────────────────────── │
│ │ │ │
│ │ 7. ICE Candidates │ │
│ │◀───────────────────────────▶ (trickle ICE) │
│ │ │ │
│ │ 8. Media flows │ │
│ │◀═══════════════════════════▶ │
│ │ │ │
│ │
│ OUTBOUND CALL (Bridge initiates): │
│ ───────────────────────────────── │
│ │
│ GoToConnect Bridge │
│ │ │ │
│ │ 1. POST /calls │ │
│ │ (with SDP offer) │ │
│ │◀──────────────────────────── │
│ │ │ │
│ │ 2. 200 OK with SDP answer │ │
│ │────────────────────────────▶ │
│ │ │ │
│ │ 3. ICE Candidates │ │
│ │◀───────────────────────────▶ │
│ │ │ │
│ │ 4. Media flows │ │
│ │◀═══════════════════════════▶ │
│ │ │ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
4.4 SDP Negotiator Implementation \
# bridge/webrtc/sdp_negotiator.py
import re
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional


class SDPType(Enum):
    OFFER = "offer"
    ANSWER = "answer"
    PRANSWER = "pranswer"


@dataclass
class CodecInfo:
    """Information about a codec in SDP."""
    payload_type: int
    name: str
    clock_rate: int
    channels: int = 1
    fmtp: Optional[str] = None


@dataclass
class MediaDescription:
    """Parsed media section from SDP."""
    media_type: str  # audio, video
    port: int
    protocol: str
    formats: List[int]
    codecs: List[CodecInfo] = field(default_factory=list)
    direction: str = "sendrecv"
    ice_ufrag: Optional[str] = None
    ice_pwd: Optional[str] = None
    fingerprint: Optional[str] = None
    setup: Optional[str] = None
    mid: Optional[str] = None
    candidates: List[str] = field(default_factory=list)


@dataclass
class ParsedSDP:
    """Fully parsed SDP."""
    version: int
    origin: str
    session_name: str
    timing: str
    media: List[MediaDescription] = field(default_factory=list)
    ice_ufrag: Optional[str] = None
    ice_pwd: Optional[str] = None
    fingerprint: Optional[str] = None
    groups: List[str] = field(default_factory=list)
class SDPNegotiator:
    """
    Handles SDP parsing, modification, and generation.

    Manages codec negotiation and ensures compatibility
    between GoToConnect and our WebRTC implementation.
    """

    # Preferred codecs in order
    PREFERRED_AUDIO_CODECS = [
        ("opus", 48000, 2),  # Opus stereo
        ("PCMU", 8000, 1),   # G.711 μ-law
        ("PCMA", 8000, 1),   # G.711 A-law
    ]

    def __init__(self):
        self._local_ice_ufrag: Optional[str] = None
        self._local_ice_pwd: Optional[str] = None

    def parse_sdp(self, sdp: str) -> ParsedSDP:
        """
        Parse an SDP string into structured data.

        Args:
            sdp: Raw SDP string

        Returns:
            Parsed SDP structure
        """
        lines = sdp.strip().split('\r\n')
        if len(lines) == 1:
            # Fall back to bare LF line endings
            lines = sdp.strip().split('\n')

        parsed = ParsedSDP(
            version=0,
            origin="",
            session_name="",
            timing="",
        )
        current_media: Optional[MediaDescription] = None

        for line in lines:
            if not line or '=' not in line:
                continue

            key, value = line[0], line[2:]

            # Session-level attributes
            if key == 'v':
                parsed.version = int(value)
            elif key == 'o':
                parsed.origin = value
            elif key == 's':
                parsed.session_name = value
            elif key == 't':
                parsed.timing = value
            elif key == 'm':
                # New media section
                if current_media:
                    parsed.media.append(current_media)
                current_media = self._parse_media_line(value)
            elif key == 'a' and current_media:
                # Media-level attribute
                self._parse_attribute(current_media, value)
            elif key == 'a':
                # Session-level attribute
                self._parse_session_attribute(parsed, value)

        if current_media:
            parsed.media.append(current_media)

        return parsed

    def _parse_media_line(self, value: str) -> MediaDescription:
        """Parse m= line."""
        parts = value.split()
        media_type = parts[0]
        port = int(parts[1])
        protocol = parts[2]
        formats = [int(f) for f in parts[3:]]

        return MediaDescription(
            media_type=media_type,
            port=port,
            protocol=protocol,
            formats=formats,
        )
    def _parse_attribute(
        self,
        media: MediaDescription,
        value: str,
    ) -> None:
        """Parse media-level attribute."""
        if value.startswith("rtpmap:"):
            codec = self._parse_rtpmap(value[7:])
            if codec:
                media.codecs.append(codec)
        elif value.startswith("fmtp:"):
            self._parse_fmtp(media, value[5:])
        elif value.startswith("ice-ufrag:"):
            media.ice_ufrag = value[10:]
        elif value.startswith("ice-pwd:"):
            media.ice_pwd = value[8:]
        elif value.startswith("fingerprint:"):
            media.fingerprint = value[12:]
        elif value.startswith("setup:"):
            media.setup = value[6:]
        elif value.startswith("mid:"):
            media.mid = value[4:]
        elif value.startswith("candidate:"):
            media.candidates.append(value)
        elif value in ("sendrecv", "sendonly", "recvonly", "inactive"):
            media.direction = value

    def _parse_session_attribute(
        self,
        parsed: ParsedSDP,
        value: str,
    ) -> None:
        """Parse session-level attribute."""
        if value.startswith("ice-ufrag:"):
            parsed.ice_ufrag = value[10:]
        elif value.startswith("ice-pwd:"):
            parsed.ice_pwd = value[8:]
        elif value.startswith("fingerprint:"):
            parsed.fingerprint = value[12:]
        elif value.startswith("group:"):
            parsed.groups.append(value[6:])

    def _parse_rtpmap(self, value: str) -> Optional[CodecInfo]:
        """Parse rtpmap attribute."""
        match = re.match(r'(\d+)\s+(\w+)/(\d+)(?:/(\d+))?', value)
        if match:
            return CodecInfo(
                payload_type=int(match.group(1)),
                name=match.group(2),
                clock_rate=int(match.group(3)),
                channels=int(match.group(4)) if match.group(4) else 1,
            )
        return None

    def _parse_fmtp(
        self,
        media: MediaDescription,
        value: str,
    ) -> None:
        """Parse fmtp attribute and attach to codec."""
        parts = value.split(' ', 1)
        if len(parts) == 2:
            payload_type = int(parts[0])
            for codec in media.codecs:
                if codec.payload_type == payload_type:
                    codec.fmtp = parts[1]
                    break
    def negotiate_codecs(
        self,
        offered: List[CodecInfo],
    ) -> List[CodecInfo]:
        """
        Negotiate codecs from an offer.

        Returns codecs in our preferred order that are
        also supported by the remote peer.
        """
        negotiated = []

        for pref_name, pref_rate, _pref_channels in self.PREFERRED_AUDIO_CODECS:
            for offered_codec in offered:
                if (offered_codec.name.lower() == pref_name.lower() and
                        offered_codec.clock_rate == pref_rate):
                    negotiated.append(offered_codec)
                    break

        return negotiated

    def generate_answer(
        self,
        offer: ParsedSDP,
        local_fingerprint: str,
        local_ice_ufrag: str,
        local_ice_pwd: str,
    ) -> str:
        """
        Generate an SDP answer for an offer.

        Args:
            offer: Parsed SDP offer
            local_fingerprint: Our DTLS fingerprint
            local_ice_ufrag: Our ICE username fragment
            local_ice_pwd: Our ICE password

        Returns:
            SDP answer string
        """
        lines = [
            "v=0",
            f"o=- {int(time.time())} 1 IN IP4 0.0.0.0",
            "s=-",
            "t=0 0",
        ]

        # Session-level attributes
        if offer.groups:
            for group in offer.groups:
                lines.append(f"a=group:{group}")

        lines.append("a=msid-semantic: WMS *")

        # Generate media sections
        for media in offer.media:
            if media.media_type == "audio":
                media_lines = self._generate_audio_answer(
                    media,
                    local_fingerprint,
                    local_ice_ufrag,
                    local_ice_pwd,
                )
                lines.extend(media_lines)

        return '\r\n'.join(lines) + '\r\n'
    def _generate_audio_answer(
        self,
        offer_media: MediaDescription,
        fingerprint: str,
        ice_ufrag: str,
        ice_pwd: str,
    ) -> List[str]:
        """Generate audio media section for answer."""
        # Negotiate codecs
        negotiated = self.negotiate_codecs(offer_media.codecs)
        if not negotiated:
            raise ValueError("No compatible audio codecs found")

        # Build format list
        formats = [str(c.payload_type) for c in negotiated]

        lines = [
            f"m=audio 9 UDP/TLS/RTP/SAVPF {' '.join(formats)}",
            "c=IN IP4 0.0.0.0",
        ]

        # ICE credentials
        lines.append(f"a=ice-ufrag:{ice_ufrag}")
        lines.append(f"a=ice-pwd:{ice_pwd}")

        # DTLS
        lines.append(f"a=fingerprint:{fingerprint}")

        # Setup role (we're answering, so passive or active based on offer)
        if offer_media.setup == "actpass":
            lines.append("a=setup:active")
        elif offer_media.setup == "active":
            lines.append("a=setup:passive")
        else:
            lines.append("a=setup:active")

        # Media ID
        if offer_media.mid:
            lines.append(f"a=mid:{offer_media.mid}")

        # Direction (mirror the offer)
        lines.append(f"a={offer_media.direction}")

        # RTP/RTCP
        lines.append("a=rtcp-mux")

        # Codec descriptions
        for codec in negotiated:
            if codec.channels > 1:
                lines.append(
                    f"a=rtpmap:{codec.payload_type} "
                    f"{codec.name}/{codec.clock_rate}/{codec.channels}"
                )
            else:
                lines.append(
                    f"a=rtpmap:{codec.payload_type} "
                    f"{codec.name}/{codec.clock_rate}"
                )

            if codec.fmtp:
                lines.append(f"a=fmtp:{codec.payload_type} {codec.fmtp}")

        return lines
    def generate_offer(
        self,
        local_fingerprint: str,
        local_ice_ufrag: str,
        local_ice_pwd: str,
    ) -> str:
        """
        Generate an SDP offer for outbound calls.

        Args:
            local_fingerprint: Our DTLS fingerprint
            local_ice_ufrag: Our ICE username fragment
            local_ice_pwd: Our ICE password

        Returns:
            SDP offer string
        """
        lines = [
            "v=0",
            f"o=- {int(time.time())} 1 IN IP4 0.0.0.0",
            "s=-",
            "t=0 0",
            "a=group:BUNDLE 0",
            "a=msid-semantic: WMS *",
            # Audio media section
            "m=audio 9 UDP/TLS/RTP/SAVPF 111 0 8",
            "c=IN IP4 0.0.0.0",
            f"a=ice-ufrag:{local_ice_ufrag}",
            f"a=ice-pwd:{local_ice_pwd}",
            f"a=fingerprint:{local_fingerprint}",
            "a=setup:actpass",
            "a=mid:0",
            "a=sendrecv",
            "a=rtcp-mux",
            # Opus
            "a=rtpmap:111 opus/48000/2",
            "a=fmtp:111 minptime=10;useinbandfec=1",
            # G.711 μ-law
            "a=rtpmap:0 PCMU/8000",
            # G.711 A-law
            "a=rtpmap:8 PCMA/8000",
        ]

        return '\r\n'.join(lines) + '\r\n'
class SDPModifier:
"""
Utilities for modifying existing SDP.
"""
@staticmethod
def add_candidate(sdp: str, candidate: str, mid: str) -> str:
    """Insert an ICE candidate into the media section with the given mid."""
    lines = sdp.split('\r\n')
    result = []
    in_target_media = False
    inserted = False
    for line in lines:
        # A new m= line closes the target section: emit the candidate first
        if in_target_media and not inserted and line.startswith("m="):
            result.append(f"a={candidate}")
            inserted = True
            in_target_media = False
        result.append(line)
        if line == f"a=mid:{mid}":
            in_target_media = True
    if in_target_media and not inserted:
        # Target section runs to the end; keep any trailing empty element
        # produced by splitting a CRLF-terminated blob
        if result and result[-1] == "":
            result.insert(-1, f"a={candidate}")
        else:
            result.append(f"a={candidate}")
    return '\r\n'.join(result)
@staticmethod
def set_direction(sdp: str, direction: str) -> str:
"""Set media direction in SDP."""
directions = ["sendrecv", "sendonly", "recvonly", "inactive"]
for d in directions:
sdp = sdp.replace(f"a={d}", f"a={direction}")
return sdp
@staticmethod
def remove_video(sdp: str) -> str:
"""Remove video media from SDP."""
lines = sdp.split('\r\n')
result = []
skip_until_next_media = False
for line in lines:
if line.startswith("m=video"):
skip_until_next_media = True
elif line.startswith("m=") and skip_until_next_media:
skip_until_next_media = False
result.append(line)
elif not skip_until_next_media:
result.append(line)
return '\r\n'.join(result)
4.5 SDP Examples \
# Example GoToConnect SDP Offer
v=0
o=- 7058047285456728323 2 IN IP4 127.0.0.1
s=-
t=0 0
a=group:BUNDLE 0
a=msid-semantic: WMS stream
m=audio 9 UDP/TLS/RTP/SAVPF 111 0 8
c=IN IP4 0.0.0.0
a=rtcp:9 IN IP4 0.0.0.0
a=ice-ufrag:abc123
a=ice-pwd:supersecretpassword12345678
a=fingerprint:sha-256 AB:CD:EF:12:34:56:78:90...
a=setup:actpass
a=mid:0
a=sendrecv
a=rtcp-mux
a=rtpmap:111 opus/48000/2
a=fmtp:111 minptime=10;useinbandfec=1
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000
a=candidate:1 1 UDP 2122262783 192.168.1.100 54321 typ host
a=candidate:2 1 UDP 1686052863 203.0.113.50 54322 typ srflx raddr 192.168.1.100 rport 54321
# Example Bridge SDP Answer
v=0
o=- 1705408234 1 IN IP4 0.0.0.0
s=-
t=0 0
a=group:BUNDLE 0
a=msid-semantic: WMS *
m=audio 9 UDP/TLS/RTP/SAVPF 111 0
c=IN IP4 0.0.0.0
a=ice-ufrag:xyz789
a=ice-pwd:ouricecredentials12345678
a=fingerprint:sha-256 12:34:56:78:90:AB:CD:EF...
a=setup:active
a=mid:0
a=sendrecv
a=rtcp-mux
a=rtpmap:111 opus/48000/2
a=fmtp:111 minptime=10;useinbandfec=1
a=rtpmap:0 PCMU/8000
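The answer above can be checked mechanically against the offer. A minimal sketch — the `parse_sdp_attributes` helper is illustrative, not part of the bridge code:

```python
def parse_sdp_attributes(sdp: str) -> dict:
    """Collect `a=key:value` attribute lines into a dict (first value wins)."""
    attrs = {}
    for line in sdp.strip().splitlines():
        if line.startswith("a=") and ":" in line:
            key, _, value = line[2:].partition(":")
            attrs.setdefault(key, value)
    return attrs

offer = "\r\n".join([
    "v=0", "o=- 1 2 IN IP4 127.0.0.1", "s=-", "t=0 0",
    "m=audio 9 UDP/TLS/RTP/SAVPF 111 0 8",
    "a=ice-ufrag:abc123", "a=setup:actpass", "a=mid:0", "a=sendrecv",
])
attrs = parse_sdp_attributes(offer)
# An actpass offer lets the answerer pick its DTLS role; we choose active
answer_setup = "active" if attrs["setup"] == "actpass" else "passive"
```

The same check applies to `mid` and `ice-ufrag`, which the answer must echo or pair correctly.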
5. ICE Candidate Handling \
5.1 ICE Overview \
Interactive Connectivity Establishment (ICE) finds the best path for media between peers:
┌─────────────────────────────────────────────────────────────────────────────┐
│ ICE CANDIDATE TYPES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ TYPE PRIORITY DESCRIPTION │
│ ──────────────────────────────────────────────────────────────────── │
│ │
│ host Highest Direct local IP address │
│ • 192.168.1.100:54321 │
│ • Best latency, works on same network │
│ │
│ srflx Medium Server Reflexive (STUN-discovered public IP) │
│ • 203.0.113.50:54322 │
│ • Works across most NATs │
│ │
│ prflx Medium Peer Reflexive (discovered during ICE) │
│ • Dynamic discovery │
│ • Found during connectivity checks │
│ │
│ relay Lowest TURN relay server │
│ • turn.example.com:443 │
│ • Always works, but adds latency │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
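The priority ordering in the table follows the RFC 8445 formula. A quick sketch of the computation, using the type preference values recommended in §5.1.2.2:

```python
# RFC 8445 recommended type preferences: host > prflx > srflx > relay
TYPE_PREFERENCE = {"host": 126, "prflx": 110, "srflx": 100, "relay": 0}

def candidate_priority(cand_type: str, local_pref: int = 65535, component: int = 1) -> int:
    """priority = 2^24 * type_pref + 2^8 * local_pref + (256 - component_id)."""
    return (TYPE_PREFERENCE[cand_type] << 24) | (local_pref << 8) | (256 - component)

host = candidate_priority("host")    # matches the large priorities on host candidates
srflx = candidate_priority("srflx")
relay = candidate_priority("relay")
```

Because the three fields occupy disjoint bit ranges, candidate type always dominates the ordering, which is exactly the ranking shown above.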
5.2 ICE Connection Process \
┌─────────────────────────────────────────────────────────────────────────────┐
│ ICE CONNECTION PROCESS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ GATHERING PHASE: │
│ ───────────────── │
│ │
│ 1. Collect host candidates (local interfaces) │
│ 2. Query STUN servers for srflx candidates │
│ 3. Allocate TURN relay candidates (if configured) │
│ 4. Send candidates to remote peer (trickle ICE) │
│ │
│ CONNECTIVITY CHECKING: │
│ ────────────────────── │
│ │
│ For each local/remote candidate pair: │
│ 1. Send STUN binding request │
│ 2. Await response │
│ 3. Calculate round-trip time │
│ 4. Mark pair as succeeded/failed │
│ │
│ NOMINATION: │
│ ─────────── │
│ │
│ 1. Rank successful pairs by priority │
│ 2. Controlling agent nominates best pair │
│ 3. Both sides switch to nominated pair │
│ │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ new │───────▶│gathering│───────▶│checking │ │
│ └─────────┘ └─────────┘ └────┬────┘ │
│ │ │
│ ┌───────────────────────┼───────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌─────────┐ │
│ │ connected │ │disconnected│ │ failed │ │
│ └───────────┘ └───────────┘ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
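The state machine above can be captured as a small transition table. A simplified sketch that encodes only the transitions drawn in the diagram (real agents can also re-enter `checking` after an ICE restart):

```python
# Transitions shown in the diagram; restart paths omitted for brevity
ALLOWED_TRANSITIONS = {
    "new": {"gathering"},
    "gathering": {"checking"},
    "checking": {"connected", "disconnected", "failed"},
}

def is_valid_transition(old: str, new: str) -> bool:
    """True if the diagram permits moving from `old` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(old, set())
```

A table like this is handy in tests for catching out-of-order signaling events.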
5.3 ICE Manager Implementation \
# bridge/webrtc/ice_manager.py
import asyncio
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, List, Optional

from aiortc.sdp import candidate_from_sdp

logger = logging.getLogger(__name__)
class ICEGatheringState(Enum):
NEW = "new"
GATHERING = "gathering"
COMPLETE = "complete"
class ICEConnectionState(Enum):
NEW = "new"
CHECKING = "checking"
CONNECTED = "connected"
COMPLETED = "completed"
DISCONNECTED = "disconnected"
FAILED = "failed"
CLOSED = "closed"
@dataclass
class ICECandidate:
"""Parsed ICE candidate."""
foundation: str
component: int
protocol: str
priority: int
ip: str
port: int
type: str # host, srflx, prflx, relay
related_address: Optional[str] = None
related_port: Optional[int] = None
tcp_type: Optional[str] = None
@classmethod
def parse(cls, candidate_str: str) -> "ICECandidate":
"""Parse ICE candidate string."""
# candidate:foundation component protocol priority ip port typ type [extensions]
parts = candidate_str.split()
if parts[0].startswith("candidate:"):
parts[0] = parts[0][10:]
candidate = cls(
foundation=parts[0],
component=int(parts[1]),
protocol=parts[2].upper(),
priority=int(parts[3]),
ip=parts[4],
port=int(parts[5]),
type=parts[7],
)
# Parse extensions
i = 8
while i < len(parts) - 1:
if parts[i] == "raddr":
candidate.related_address = parts[i + 1]
elif parts[i] == "rport":
candidate.related_port = int(parts[i + 1])
elif parts[i] == "tcptype":
candidate.tcp_type = parts[i + 1]
i += 2
return candidate
def to_string(self) -> str:
"""Convert to SDP candidate string."""
s = (
f"candidate:{self.foundation} {self.component} "
f"{self.protocol} {self.priority} "
f"{self.ip} {self.port} typ {self.type}"
)
if self.related_address:
s += f" raddr {self.related_address}"
if self.related_port:
s += f" rport {self.related_port}"
if self.tcp_type:
s += f" tcptype {self.tcp_type}"
return s
class ICEManager:
"""
Manages ICE candidate gathering and connectivity.
Handles trickle ICE with GoToConnect, ensuring
candidates are exchanged efficiently.
"""
def __init__(
self,
call_id: str,
goto_client: "GoToCallControlClient",
):
self.call_id = call_id
self.goto_client = goto_client
# State
self._gathering_state = ICEGatheringState.NEW
self._connection_state = ICEConnectionState.NEW
# Candidates
self._local_candidates: List[ICECandidate] = []
self._remote_candidates: List[ICECandidate] = []
self._pending_remote: List[dict] = []
# Event handlers
self.on_gathering_state_change: Optional[Callable] = None
self.on_connection_state_change: Optional[Callable] = None
self.on_candidate: Optional[Callable] = None
# Candidate buffering for trickle ICE
self._candidate_buffer: List[dict] = []
self._buffer_timer: Optional[asyncio.Task] = None
self._buffer_delay = 0.1 # 100ms batching
async def handle_local_candidate(
self,
candidate: "RTCIceCandidate",
) -> None:
"""
Handle a locally gathered ICE candidate.
Buffers candidates briefly to batch them for
more efficient signaling.
"""
if candidate is None:
# Gathering complete
await self._flush_candidates()
self._gathering_state = ICEGatheringState.COMPLETE
if self.on_gathering_state_change:
await self.on_gathering_state_change(self._gathering_state)
return
parsed = ICECandidate.parse(candidate.candidate)
self._local_candidates.append(parsed)
# Buffer candidate
self._candidate_buffer.append({
"candidate": candidate.candidate,
"sdpMid": candidate.sdpMid,
"sdpMLineIndex": candidate.sdpMLineIndex,
})
# Start/reset buffer timer
if self._buffer_timer:
self._buffer_timer.cancel()
self._buffer_timer = asyncio.create_task(
self._flush_candidates_after_delay()
)
logger.debug(f"[{self.call_id}] Local candidate: {parsed.type} {parsed.ip}:{parsed.port}")
async def _flush_candidates_after_delay(self) -> None:
"""Flush buffered candidates after delay."""
await asyncio.sleep(self._buffer_delay)
await self._flush_candidates()
async def _flush_candidates(self) -> None:
"""Send buffered candidates to GoToConnect."""
if not self._candidate_buffer:
return
candidates = self._candidate_buffer.copy()
self._candidate_buffer.clear()
try:
for candidate in candidates:
await self.goto_client.send_ice_candidate(
call_id=self.call_id,
candidate=candidate["candidate"],
sdp_mid=candidate["sdpMid"],
sdp_mline_index=candidate["sdpMLineIndex"],
)
logger.info(f"[{self.call_id}] Sent {len(candidates)} ICE candidates")
except Exception as e:
logger.error(f"[{self.call_id}] Failed to send ICE candidates: {e}")
async def handle_remote_candidate(
self,
candidate_data: dict,
pc: "RTCPeerConnection",
) -> None:
"""
Handle a remote ICE candidate from GoToConnect.
Args:
candidate_data: Candidate data from event
pc: PeerConnection to add candidate to
"""
candidate_str = candidate_data.get("candidate")
if not candidate_str:
# End of candidates
logger.info(f"[{self.call_id}] Remote ICE gathering complete")
return
try:
parsed = ICECandidate.parse(candidate_str)
self._remote_candidates.append(parsed)
# Add to peer connection. aiortc builds RTCIceCandidate from
# parsed fields rather than the raw string, so use
# candidate_from_sdp (aiortc.sdp) on the part after the
# "candidate:" prefix.
ice_candidate = candidate_from_sdp(candidate_str.split(":", 1)[1])
ice_candidate.sdpMid = candidate_data.get("sdpMid", "0")
ice_candidate.sdpMLineIndex = candidate_data.get("sdpMLineIndex", 0)
await pc.addIceCandidate(ice_candidate)
logger.debug(
f"[{self.call_id}] Remote candidate: "
f"{parsed.type} {parsed.ip}:{parsed.port}"
)
except Exception as e:
logger.error(f"[{self.call_id}] Failed to add remote candidate: {e}")
def update_connection_state(self, state: str) -> None:
"""Update ICE connection state."""
try:
self._connection_state = ICEConnectionState(state)
except ValueError:
self._connection_state = ICEConnectionState.NEW
logger.info(f"[{self.call_id}] ICE connection state: {state}")
if self.on_connection_state_change:
asyncio.create_task(
self.on_connection_state_change(self._connection_state)
)
@property
def is_connected(self) -> bool:
"""Check if ICE is connected."""
return self._connection_state in (
ICEConnectionState.CONNECTED,
ICEConnectionState.COMPLETED,
)
@property
def local_candidates(self) -> List[ICECandidate]:
"""Get gathered local candidates."""
return self._local_candidates.copy()
@property
def remote_candidates(self) -> List[ICECandidate]:
"""Get received remote candidates."""
return self._remote_candidates.copy()
class ICERestartManager:
"""
Handles ICE restart scenarios.
ICE restart is needed when connectivity is lost
but the call should continue.
"""
def __init__(
self,
ice_manager: ICEManager,
pc: "RTCPeerConnection",
):
self.ice_manager = ice_manager
self.pc = pc
self._restart_count = 0
self._max_restarts = 3
self._restart_cooldown = 5.0 # seconds
self._last_restart: Optional[float] = None
async def check_and_restart(self) -> bool:
"""
Check if ICE restart is needed and perform it.
Returns:
True if restart was performed
"""
if not self._should_restart():
return False
if self._restart_count >= self._max_restarts:
logger.warning(
f"[{self.ice_manager.call_id}] "
f"Max ICE restarts ({self._max_restarts}) reached"
)
return False
# Check cooldown
if self._last_restart:
elapsed = time.time() - self._last_restart
if elapsed < self._restart_cooldown:
return False
await self._perform_restart()
return True
def _should_restart(self) -> bool:
"""Determine if ICE restart is needed."""
state = self.ice_manager._connection_state
return state in (
ICEConnectionState.DISCONNECTED,
ICEConnectionState.FAILED,
)
async def _perform_restart(self) -> None:
"""Perform ICE restart."""
logger.info(f"[{self.ice_manager.call_id}] Performing ICE restart")
self._restart_count += 1
self._last_restart = time.time()
# Create a new offer for the restart. Note that aiortc's
# createOffer() takes no options, so a full ICE restart may
# require rebuilding the RTCPeerConnection rather than passing
# an iceRestart flag as in the browser API.
offer = await self.pc.createOffer()
await self.pc.setLocalDescription(offer)
# Send to GoToConnect
# (Implementation depends on GoTo API)
logger.info(
f"[{self.ice_manager.call_id}] "
f"ICE restart initiated (attempt {self._restart_count})"
)
5.4 Trickle ICE Flow \
┌─────────────────────────────────────────────────────────────────────────────┐
│ TRICKLE ICE FLOW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Bridge GoToConnect │
│ │ │ │
│ │ SDP Offer (no candidates) │ │
│ │────────────────────────────────────▶│ │
│ │ │ │
│ │ SDP Answer (no candidates) │ │
│ │◀────────────────────────────────────│ │
│ │ │ │
│ │ │ Start gathering │
│ │ Start gathering │ │
│ │ │ │
│ │ candidate (host) │ │
│ │────────────────────────────────────▶│ │
│ │ │ │
│ │ candidate (host) │ │
│ │◀────────────────────────────────────│ │
│ │ │ │
│ │ candidate (srflx) │ │
│ │────────────────────────────────────▶│ │
│ │ │ │
│ │ candidate (srflx) │ │
│ │◀────────────────────────────────────│ │
│ │ │ │
│ │ end-of-candidates │ │
│ │────────────────────────────────────▶│ │
│ │ │ │
│ │ end-of-candidates │ │
│ │◀────────────────────────────────────│ │
│ │ │ │
│ │ ═══════ ICE Connected ═══════════ │ │
│ │ │ │
│ │ ═══════ Media Flowing ═══════════ │ │
│ │ │ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
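The 100 ms candidate batching used by `ICEManager` can be isolated as a small debounce pattern. A sketch under the assumption that the signaling send is a plain callable — `send_batch` below stands in for the GoTo API call:

```python
import asyncio
from typing import Callable, List, Optional

class CandidateBatcher:
    """Debounce sketch of the trickle-ICE batching described above."""

    def __init__(self, send_batch: Callable[[List[str]], None], delay: float = 0.1):
        self._send_batch = send_batch
        self._delay = delay
        self._buffer: List[str] = []
        self._timer: Optional[asyncio.Task] = None

    def add(self, candidate: str) -> None:
        # Each new candidate resets the timer, so bursts go out together
        self._buffer.append(candidate)
        if self._timer:
            self._timer.cancel()
        self._timer = asyncio.create_task(self._flush_later())

    async def _flush_later(self) -> None:
        await asyncio.sleep(self._delay)
        batch, self._buffer = self._buffer, []
        if batch:
            self._send_batch(batch)

async def demo() -> List[List[str]]:
    sent: List[List[str]] = []
    batcher = CandidateBatcher(sent.append, delay=0.05)
    batcher.add("candidate:1 1 UDP 2122262783 192.168.1.100 54321 typ host")
    batcher.add("candidate:2 1 UDP 1686052863 203.0.113.50 54322 typ srflx")
    await asyncio.sleep(0.15)  # let the debounce window expire
    return sent

batches = asyncio.run(demo())  # both candidates arrive in a single batch
```

Batching trades a small amount of setup latency for fewer signaling round trips, which matters when gathering produces several candidates within a few milliseconds.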
6. Audio Frame Processing \
6.1 Audio Frame Fundamentals \
WebRTC audio is transmitted as discrete frames:
┌─────────────────────────────────────────────────────────────────────────────┐
│ AUDIO FRAME STRUCTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ FRAME PARAMETERS: │
│ ───────────────── │
│ │
│ Sample Rate: 8000 Hz (G.711) / 48000 Hz (Opus) │
│ Bit Depth: 16-bit signed integer (PCM) │
│ Channels: 1 (mono) or 2 (stereo) │
│ Frame Duration: 10ms, 20ms, 40ms, or 60ms │
│ │
│ SAMPLES PER FRAME: │
│ ────────────────── │
│ │
│ │ Duration │ 8kHz │ 16kHz │ 48kHz │ │
│ │──────────│───────│───────│───────│ │
│ │ 10ms │ 80 │ 160 │ 480 │ │
│ │ 20ms │ 160 │ 320 │ 960 │ ◀── Most common │
│ │ 40ms │ 320 │ 640 │ 1920 │ │
│ │ 60ms │ 480 │ 960 │ 2880 │ │
│ │
│ FRAME SIZE IN BYTES (16-bit mono): │
│ ────────────────────────────────── │
│ │
│ │ Duration │ 8kHz │ 16kHz │ 48kHz │ │
│ │──────────│───────│───────│───────│ │
│ │ 10ms │ 160 │ 320 │ 960 │ │
│ │ 20ms │ 320 │ 640 │ 1920 │ │
│ │ 40ms │ 640 │ 1280 │ 3840 │ │
│ │ 60ms │ 960 │ 1920 │ 5760 │ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
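Both tables reduce to simple arithmetic; a sketch of the conversions:

```python
def frame_samples(rate_hz: int, duration_ms: int) -> int:
    """Samples per frame, e.g. 48000 Hz x 20 ms -> 960."""
    return rate_hz * duration_ms // 1000

def frame_bytes(rate_hz: int, duration_ms: int,
                channels: int = 1, bytes_per_sample: int = 2) -> int:
    """Frame size for 16-bit PCM, e.g. 8000 Hz x 20 ms mono -> 320 bytes."""
    return frame_samples(rate_hz, duration_ms) * channels * bytes_per_sample
```

These two helpers reproduce every cell in the tables above, which makes them useful as invariants in buffer-sizing code.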
6.2 PyAV AudioFrame \
aiortc uses PyAV (FFmpeg bindings) for audio processing:
# bridge/audio/frames.py
import asyncio
import fractions

import av
import numpy as np
class AudioFrameProcessor:
"""
Utilities for working with PyAV AudioFrames.
Handles conversion between AudioFrame and numpy arrays,
as well as common audio manipulations.
"""
@staticmethod
def frame_to_numpy(frame: av.AudioFrame) -> np.ndarray:
"""
Convert AudioFrame to numpy array.
Args:
frame: PyAV AudioFrame
Returns:
numpy array of shape (samples, channels)
"""
# Get raw data from frame
# frame.to_ndarray() returns shape (channels, samples)
data = frame.to_ndarray()
# Transpose to (samples, channels)
if data.ndim == 2:
data = data.T
return data
@staticmethod
def numpy_to_frame(
data: np.ndarray,
sample_rate: int,
pts: int = 0,
format: str = "s16",
layout: str = "mono",
) -> av.AudioFrame:
"""
Convert numpy array to AudioFrame.
Args:
data: numpy array (samples,) or (samples, channels)
sample_rate: Sample rate in Hz
pts: Presentation timestamp
format: Audio format (s16, s32, flt, etc.)
layout: Channel layout (mono, stereo)
Returns:
PyAV AudioFrame
"""
# Ensure 2D array
if data.ndim == 1:
data = data.reshape(-1, 1)
samples = data.shape[0]
channels = data.shape[1]
# Determine layout
if channels == 1:
layout = "mono"
elif channels == 2:
layout = "stereo"
# Create frame
frame = av.AudioFrame(
format=format,
layout=layout,
samples=samples,
)
# Set frame data
# PyAV expects (channels, samples) for plane data
frame.planes[0].update(data.T.tobytes())
frame.sample_rate = sample_rate
frame.pts = pts
frame.time_base = fractions.Fraction(1, sample_rate)
return frame
@staticmethod
def get_frame_info(frame: av.AudioFrame) -> dict:
"""Get information about an audio frame."""
return {
"samples": frame.samples,
"sample_rate": frame.sample_rate,
"channels": len(frame.layout.channels),
"format": frame.format.name,
"pts": frame.pts,
"duration_ms": (frame.samples / frame.sample_rate) * 1000,
"size_bytes": sum(len(p) for p in frame.planes),
}
@staticmethod
def normalize_frame(
frame: av.AudioFrame,
target_format: str = "s16",
target_layout: str = "mono",
target_rate: int = None,
) -> av.AudioFrame:
"""
Normalize frame to target format.
Args:
frame: Input frame
target_format: Target sample format
target_layout: Target channel layout
target_rate: Target sample rate (None to keep original)
Returns:
Normalized frame
"""
resampler = av.AudioResampler(
format=target_format,
layout=target_layout,
rate=target_rate or frame.sample_rate,
)
return resampler.resample(frame)[0]
class AudioBuffer:
"""
Ring buffer for audio frames.
Provides smooth audio flow by buffering frames
and handling timing variations.
"""
def __init__(
self,
max_duration_ms: float = 500,
sample_rate: int = 48000,
channels: int = 1,
):
self.sample_rate = sample_rate
self.channels = channels
# Calculate buffer size
max_samples = int(sample_rate * max_duration_ms / 1000)
self._buffer = np.zeros((max_samples, channels), dtype=np.int16)
self._write_pos = 0
self._read_pos = 0
self._available = 0
self._lock = asyncio.Lock()
async def write(self, data: np.ndarray) -> int:
"""
Write audio data to buffer.
Args:
data: Audio samples (samples, channels)
Returns:
Number of samples written
"""
async with self._lock:
samples = data.shape[0]
buffer_size = self._buffer.shape[0]
# Check available space
space = buffer_size - self._available
if samples > space:
# Buffer full - drop oldest data
drop = samples - space
self._read_pos = (self._read_pos + drop) % buffer_size
self._available -= drop
# Write data
end_pos = self._write_pos + samples
if end_pos <= buffer_size:
self._buffer[self._write_pos:end_pos] = data
else:
# Wrap around
first_part = buffer_size - self._write_pos
self._buffer[self._write_pos:] = data[:first_part]
self._buffer[:end_pos - buffer_size] = data[first_part:]
self._write_pos = end_pos % buffer_size
self._available += samples
return samples
async def read(self, samples: int) -> np.ndarray:
"""
Read audio data from buffer.
Args:
samples: Number of samples to read
Returns:
Audio data or silence if not enough available
"""
async with self._lock:
buffer_size = self._buffer.shape[0]
if self._available < samples:
# Not enough data - return silence
return np.zeros((samples, self.channels), dtype=np.int16)
# Read data
end_pos = self._read_pos + samples
if end_pos <= buffer_size:
data = self._buffer[self._read_pos:end_pos].copy()
else:
# Wrap around
first_part = buffer_size - self._read_pos
data = np.concatenate([
self._buffer[self._read_pos:],
self._buffer[:end_pos - buffer_size],
])
self._read_pos = end_pos % buffer_size
self._available -= samples
return data
@property
def available_samples(self) -> int:
"""Number of samples available to read."""
return self._available
@property
def available_ms(self) -> float:
"""Duration of audio available in milliseconds."""
return (self._available / self.sample_rate) * 1000
def clear(self) -> None:
"""Clear the buffer."""
self._write_pos = 0
self._read_pos = 0
self._available = 0
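The wrap-around logic in `AudioBuffer` is easiest to see in a stripped-down synchronous form. A sketch (mono only, no locking, and without the drop-oldest handling of the full class):

```python
import numpy as np

class MiniRing:
    """Synchronous sketch of the AudioBuffer wrap-around indexing above."""

    def __init__(self, capacity: int):
        self._buf = np.zeros(capacity, dtype=np.int16)
        self._w = self._r = self._avail = 0

    def write(self, data: np.ndarray) -> None:
        n, cap = len(data), len(self._buf)
        end = self._w + n
        if end <= cap:
            self._buf[self._w:end] = data
        else:
            split = cap - self._w          # split the write at the end
            self._buf[self._w:] = data[:split]
            self._buf[:end - cap] = data[split:]
        self._w = end % cap
        self._avail += n

    def read(self, n: int) -> np.ndarray:
        if self._avail < n:
            return np.zeros(n, dtype=np.int16)  # underrun -> silence
        end, cap = self._r + n, len(self._buf)
        if end <= cap:
            out = self._buf[self._r:end].copy()
        else:
            out = np.concatenate([self._buf[self._r:], self._buf[:end - cap]])
        self._r = end % cap
        self._avail -= n
        return out

ring = MiniRing(8)
ring.write(np.arange(6, dtype=np.int16))             # fills positions 0..5
head = ring.read(4)                                  # -> [0, 1, 2, 3]
ring.write(np.array([6, 7, 8, 9], dtype=np.int16))   # wraps past the end
tail = ring.read(6)                                  # -> [4, 5, 6, 7, 8, 9]
```

Returning silence on underrun (rather than blocking) is the same choice the full `AudioBuffer` makes: real-time playout cannot wait for late packets.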
6.3 Audio Resampling \
# bridge/audio/resampler.py
import numpy as np
from typing import Optional
import av
class AudioResampler:
"""
High-quality audio resampling.
Converts between different sample rates while
maintaining audio quality using PyAV's resampler.
"""
def __init__(
self,
input_rate: int,
output_rate: int,
input_channels: int = 1,
output_channels: int = 1,
input_format: str = "s16",
output_format: str = "s16",
):
self.input_rate = input_rate
self.output_rate = output_rate
self.input_channels = input_channels
self.output_channels = output_channels
self.input_format = input_format
self.output_format = output_format
# Determine layouts
in_layout = "mono" if input_channels == 1 else "stereo"
out_layout = "mono" if output_channels == 1 else "stereo"
# Create resampler
self._resampler = av.AudioResampler(
format=output_format,
layout=out_layout,
rate=output_rate,
)
# For numpy-based fallback
self._ratio = output_rate / input_rate
def resample_frame(self, frame: av.AudioFrame) -> av.AudioFrame:
"""
Resample an AudioFrame.
Args:
frame: Input frame at input_rate
Returns:
Resampled frame at output_rate
"""
frames = self._resampler.resample(frame)
if frames:
return frames[0]
return None
def resample_numpy(self, data: np.ndarray) -> np.ndarray:
"""
Resample numpy audio data.
Args:
data: Input samples (samples,) or (samples, channels)
Returns:
Resampled samples
"""
if self.input_rate == self.output_rate:
return data
# Ensure 2D
if data.ndim == 1:
data = data.reshape(-1, 1)
input_samples = data.shape[0]
output_samples = int(input_samples * self._ratio)
# Simple linear interpolation
# For production, consider using scipy.signal.resample
indices = np.linspace(0, input_samples - 1, output_samples)
output = np.zeros((output_samples, data.shape[1]), dtype=data.dtype)
for ch in range(data.shape[1]):
output[:, ch] = np.interp(
indices,
np.arange(input_samples),
data[:, ch],
).astype(data.dtype)
return output
def flush(self) -> Optional[av.AudioFrame]:
"""Flush any remaining samples from resampler."""
frames = self._resampler.resample(None)
if frames:
return frames[0]
return None
class MultiRateResampler:
"""
Manages multiple resamplers for common conversions.
Pre-creates resamplers for frequently used
rate conversions to reduce allocation overhead.
"""
COMMON_RATES = [8000, 16000, 24000, 48000]
def __init__(self):
self._resamplers: dict = {}
def get_resampler(
self,
input_rate: int,
output_rate: int,
channels: int = 1,
) -> AudioResampler:
"""Get or create a resampler for the given conversion."""
key = (input_rate, output_rate, channels)
if key not in self._resamplers:
self._resamplers[key] = AudioResampler(
input_rate=input_rate,
output_rate=output_rate,
input_channels=channels,
output_channels=channels,
)
return self._resamplers[key]
def resample(
self,
data: np.ndarray,
input_rate: int,
output_rate: int,
) -> np.ndarray:
"""
Resample audio data.
Args:
data: Input samples
input_rate: Input sample rate
output_rate: Output sample rate
Returns:
Resampled samples
"""
if input_rate == output_rate:
return data
channels = data.shape[1] if data.ndim == 2 else 1
resampler = self.get_resampler(input_rate, output_rate, channels)
return resampler.resample_numpy(data)
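As a quick end-to-end check of the interpolation path, here is the same `np.interp` approach applied to the common 8 kHz → 16 kHz telephony-to-STT conversion (mono only, same method as `resample_numpy` above):

```python
import numpy as np

def resample_linear(data: np.ndarray, in_rate: int, out_rate: int) -> np.ndarray:
    """Mono linear-interpolation resampler."""
    if in_rate == out_rate:
        return data
    n_out = int(len(data) * out_rate / in_rate)
    idx = np.linspace(0, len(data) - 1, n_out)
    return np.interp(idx, np.arange(len(data)), data).astype(data.dtype)

frame = np.arange(160, dtype=np.int16)            # one 20 ms G.711 frame at 8 kHz
upsampled = resample_linear(frame, 8000, 16000)   # 320 samples for a 16 kHz pipeline
```

Doubling the rate doubles the sample count while preserving the signal's shape, which is all a voice pipeline needs from this fallback path.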
6.4 Audio Format Conversion \
# bridge/audio/converter.py
import numpy as np
from typing import Union
class AudioConverter:
"""
Convert between audio formats.
Handles conversion between different bit depths,
channel counts, and numeric representations.
"""
@staticmethod
def int16_to_float32(data: np.ndarray) -> np.ndarray:
"""Convert int16 to float32 (-1.0 to 1.0)."""
return data.astype(np.float32) / 32768.0
@staticmethod
def float32_to_int16(data: np.ndarray) -> np.ndarray:
"""Convert float32 (-1.0 to 1.0) to int16."""
return (np.clip(data, -1.0, 1.0) * 32767).astype(np.int16)
@staticmethod
def stereo_to_mono(data: np.ndarray) -> np.ndarray:
"""Convert stereo to mono by averaging channels."""
if data.ndim == 1:
return data
if data.shape[1] == 1:
return data[:, 0]
# Average channels
return ((data[:, 0].astype(np.int32) + data[:, 1]) // 2).astype(data.dtype)
@staticmethod
def mono_to_stereo(data: np.ndarray) -> np.ndarray:
"""Convert mono to stereo by duplicating channel."""
if data.ndim == 1:
data = data.reshape(-1, 1)
if data.shape[1] == 2:
return data
return np.column_stack([data[:, 0], data[:, 0]])
@staticmethod
def ulaw_to_linear(data: np.ndarray) -> np.ndarray:
    """
    Convert μ-law encoded data to linear PCM.
    G.711 μ-law is common in telephony.
    """
    BIAS = 0x84
    # Undo the bit inversion applied at encode time
    data = (~data.astype(np.uint8)) & 0xFF
    sign = data & 0x80
    exponent = ((data >> 4) & 0x07).astype(np.int32)
    mantissa = (data & 0x0F).astype(np.int32)
    # Decode: magnitude = ((mantissa << 3) + BIAS) << exponent, minus BIAS
    magnitude = (((mantissa << 3) + BIAS) << exponent) - BIAS
    # np.where keeps this vectorized; a Python conditional on an
    # array would raise "truth value is ambiguous"
    linear = np.where(sign != 0, -magnitude, magnitude)
    return linear.astype(np.int16)
@staticmethod
def linear_to_ulaw(data: np.ndarray) -> np.ndarray:
    """
    Convert linear PCM to μ-law encoded data.
    """
    BIAS = 0x84
    CLIP = 32635  # clip in the 16-bit domain (G.711 reference value)
    # Widen so abs() and the bias cannot overflow int16
    data = data.astype(np.int32)
    # Get sign
    sign = (data < 0).astype(np.uint8) << 7
    # Biased magnitude
    magnitude = np.clip(np.abs(data), 0, CLIP) + BIAS
    # Segment number: position of the leading 1 bit above bit 7
    exponent = np.clip(np.floor(np.log2(magnitude)).astype(np.int32) - 7, 0, 7)
    mantissa = ((magnitude >> (exponent + 3)) & 0x0F).astype(np.uint8)
    # Combine and invert (μ-law transmits the complement)
    ulaw = sign | (exponent.astype(np.uint8) << 4) | mantissa
    return (~ulaw & 0xFF).astype(np.uint8)
@staticmethod
def alaw_to_linear(data: np.ndarray) -> np.ndarray:
    """
    Convert A-law encoded data to linear PCM.
    G.711 A-law is common in European telephony.
    """
    # A-law transmits with alternate bits inverted
    data = data.astype(np.uint8) ^ 0x55
    sign = data & 0x80
    exponent = ((data >> 4) & 0x07).astype(np.int32)
    mantissa = (data & 0x0F).astype(np.int32)
    # Vectorized branch on the segment; np.maximum avoids a negative
    # shift in the branch that np.where discards
    magnitude = np.where(
        exponent == 0,
        (mantissa << 4) + 8,
        ((mantissa << 4) + 0x108) << np.maximum(exponent - 1, 0),
    )
    linear = np.where(sign != 0, -magnitude, magnitude)
    return linear.astype(np.int16)
@staticmethod
def linear_to_alaw(data: np.ndarray) -> np.ndarray:
    """Convert linear PCM to A-law encoded data."""
    # Widen so abs(-32768) does not overflow int16
    data = data.astype(np.int32)
    sign = (data < 0).astype(np.uint8) << 7
    data = np.abs(data)
    # Segment number from the position of the leading bit
    exponent = np.zeros_like(data, dtype=np.int32)
    for i in range(7, 0, -1):
        mask = data >= (0x100 << (i - 1))
        exponent = np.where(mask & (exponent == 0), i, exponent)
    # Vectorized branch (a Python conditional on an array would raise)
    mantissa = np.where(
        exponent == 0,
        (data >> 4) & 0x0F,
        (data >> (exponent + 3)) & 0x0F,
    ).astype(np.uint8)
    alaw = sign | (exponent.astype(np.uint8) << 4) | mantissa
    return (alaw ^ 0x55).astype(np.uint8)
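One detail worth calling out in `stereo_to_mono` is the widening to `int32` before averaging: summing two `int16` channels directly can overflow the type. A self-contained check of the mono ⇄ stereo pair:

```python
import numpy as np

def mono_to_stereo(x: np.ndarray) -> np.ndarray:
    """Duplicate a mono channel into two columns."""
    return np.column_stack([x, x])

def stereo_to_mono(s: np.ndarray) -> np.ndarray:
    # Widen before summing: int16 + int16 can exceed the int16 range
    return ((s[:, 0].astype(np.int32) + s[:, 1]) // 2).astype(np.int16)

pcm = np.array([-32768, -1, 0, 1, 32767], dtype=np.int16)
roundtrip = stereo_to_mono(mono_to_stereo(pcm))  # identical to pcm
```

Because identical channels sum to an even value, the round trip is exact even at the extremes of the int16 range.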
7. Codec Management \
7.1 Supported Codecs \
┌─────────────────────────────────────────────────────────────────────────────┐
│ SUPPORTED AUDIO CODECS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ CODEC PT RATE CH BITRATE LATENCY USE CASE │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ Opus 111 48000 2 6-510 kbps 2.5-60ms ◀── Preferred │
│ ├── Low latency mode: 2.5ms frames │
│ ├── Built-in FEC for packet loss │
│ ├── VBR adapts to content │
│ └── Excellent for voice & music │
│ │
│ PCMU 0 8000 1 64 kbps 0.125ms G.711 μ-law │
│ ├── Universal telephony support │
│ ├── No compression delay │
│ └── Higher bandwidth but simpler │
│ │
│ PCMA 8 8000 1 64 kbps 0.125ms G.711 A-law │
│ ├── European telephony standard │
│ └── Same characteristics as PCMU │
│ │
│ G722 9 8000* 1 64 kbps 1.5ms Wideband telephony │
│ ├── *Actually 16kHz audio │
│ └── Better quality than G.711 │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
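The bitrate column for G.711 follows directly from the sample format: 8000 samples/s × 8 bits = 64 kbps, i.e. one payload byte per sample. A sketch of the per-packet arithmetic:

```python
def g711_payload_bytes(ptime_ms: int) -> int:
    """G.711 payload size: one byte per 8 kHz sample (64 kbps)."""
    return 8000 * ptime_ms // 1000

def packets_per_second(ptime_ms: int) -> int:
    """RTP packet rate for a given packetization time."""
    return 1000 // ptime_ms

payload = g711_payload_bytes(20)   # bytes per 20 ms packet
rate = packets_per_second(20)      # packets per second
```

The same arithmetic explains why larger ptime values reduce packet-header overhead at the cost of added latency.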
7.2 Codec Handler Implementation \
# bridge/audio/codecs.py
from abc import ABC, abstractmethod
from typing import Optional

import av
import numpy as np

from bridge.audio.converter import AudioConverter
from bridge.audio.resampler import AudioResampler
class AudioCodec(ABC):
"""Base class for audio codecs."""
@property
@abstractmethod
def name(self) -> str:
"""Codec name."""
pass
@property
@abstractmethod
def sample_rate(self) -> int:
"""Native sample rate."""
pass
@property
@abstractmethod
def channels(self) -> int:
"""Number of channels."""
pass
@property
@abstractmethod
def frame_size(self) -> int:
"""Samples per frame."""
pass
@abstractmethod
def encode(self, pcm: np.ndarray) -> bytes:
"""Encode PCM to codec format."""
pass
@abstractmethod
def decode(self, data: bytes) -> np.ndarray:
"""Decode codec format to PCM."""
pass
class OpusCodec(AudioCodec):
"""
Opus codec implementation using PyAV.
Opus is the preferred codec for WebRTC with
excellent quality at low bitrates.
"""
def __init__(
self,
sample_rate: int = 48000,
channels: int = 2,
bitrate: int = 32000,
frame_duration_ms: int = 20,
):
self._sample_rate = sample_rate
self._channels = channels
self._bitrate = bitrate
self._frame_duration_ms = frame_duration_ms
self._frame_size = int(sample_rate * frame_duration_ms / 1000)
# Create encoder. FFmpeg's production Opus encoder is "libopus";
# the built-in "opus" encoder is experimental, so prefer libopus
# when the build includes it.
self._encoder = av.CodecContext.create("libopus", "w")
self._encoder.sample_rate = sample_rate
self._encoder.layout = "stereo" if channels == 2 else "mono"
self._encoder.bit_rate = bitrate
self._encoder.format = av.AudioFormat("s16")
self._encoder.open()
# Create decoder
self._decoder = av.CodecContext.create("opus", "r")
self._decoder.sample_rate = sample_rate
self._decoder.channels = channels
self._decoder.open()
@property
def name(self) -> str:
return "opus"
@property
def sample_rate(self) -> int:
return self._sample_rate
@property
def channels(self) -> int:
return self._channels
@property
def frame_size(self) -> int:
return self._frame_size
def encode(self, pcm: np.ndarray) -> bytes:
"""Encode PCM to Opus."""
# Create AudioFrame
frame = av.AudioFrame(
format="s16",
layout="stereo" if self._channels == 2 else "mono",
samples=pcm.shape[0],
)
frame.planes[0].update(pcm.tobytes())
frame.sample_rate = self._sample_rate
# Encode
packets = self._encoder.encode(frame)
if packets:
return bytes(packets[0])
return b""
def decode(self, data: bytes) -> np.ndarray:
    """Decode Opus to PCM."""
    # Create packet
    packet = av.Packet(data)
    # Decode
    frames = self._decoder.decode(packet)
    if frames:
        pcm = frames[0].to_ndarray().T
        # FFmpeg's Opus decoder typically emits float samples;
        # rescale to int16 instead of truncating toward zero
        if pcm.dtype.kind == "f":
            pcm = np.clip(pcm, -1.0, 1.0) * 32767
        return pcm.astype(np.int16)
    return np.zeros((self._frame_size, self._channels), dtype=np.int16)
class G711Codec(AudioCodec):
"""
G.711 codec implementation (μ-law and A-law).
G.711 is universal in telephony with zero
compression delay.
"""
def __init__(
self,
law: str = "ulaw", # "ulaw" or "alaw"
frame_duration_ms: int = 20,
):
self._law = law
self._frame_duration_ms = frame_duration_ms
self._frame_size = int(8000 * frame_duration_ms / 1000)
@property
def name(self) -> str:
return f"PCM{self._law[0].upper()}"
@property
def sample_rate(self) -> int:
return 8000
@property
def channels(self) -> int:
return 1
@property
def frame_size(self) -> int:
return self._frame_size
def encode(self, pcm: np.ndarray) -> bytes:
"""Encode PCM to G.711."""
# Ensure mono
if pcm.ndim == 2 and pcm.shape[1] > 1:
pcm = AudioConverter.stereo_to_mono(pcm)
if self._law == "ulaw":
encoded = AudioConverter.linear_to_ulaw(pcm)
else:
encoded = AudioConverter.linear_to_alaw(pcm)
return encoded.tobytes()
def decode(self, data: bytes) -> np.ndarray:
"""Decode G.711 to PCM."""
encoded = np.frombuffer(data, dtype=np.uint8)
if self._law == "ulaw":
return AudioConverter.ulaw_to_linear(encoded)
else:
return AudioConverter.alaw_to_linear(encoded)
class CodecManager:
"""
Manages codec instances and selection.
Provides codec negotiation and transcoding
between different codecs.
"""
def __init__(self):
self._codecs: dict[str, AudioCodec] = {}
# Pre-create common codecs
self._codecs["opus"] = OpusCodec()
self._codecs["PCMU"] = G711Codec(law="ulaw")
self._codecs["PCMA"] = G711Codec(law="alaw")
def get_codec(self, name: str) -> Optional[AudioCodec]:
"""Get a codec by name."""
return self._codecs.get(name.upper()) or self._codecs.get(name.lower())
def transcode(
self,
data: bytes,
from_codec: str,
to_codec: str,
) -> bytes:
"""
Transcode audio between codecs.
Args:
data: Encoded audio data
from_codec: Source codec name
to_codec: Target codec name
Returns:
Transcoded audio data
"""
if from_codec == to_codec:
return data
source = self.get_codec(from_codec)
target = self.get_codec(to_codec)
if not source or not target:
raise ValueError(f"Unknown codec: {from_codec} or {to_codec}")
# Decode to PCM
pcm = source.decode(data)
# Resample if needed
if source.sample_rate != target.sample_rate:
resampler = AudioResampler(
input_rate=source.sample_rate,
output_rate=target.sample_rate,
input_channels=source.channels,
output_channels=target.channels,
)
pcm = resampler.resample_numpy(pcm)
# Encode to target
return target.encode(pcm)
def decode_to_pcm(
self,
data: bytes,
codec_name: str,
target_rate: int = 16000,
) -> np.ndarray:
"""
Decode and resample to target PCM format.
This is the common path for incoming audio
destined for the voice pipeline.
"""
codec = self.get_codec(codec_name)
if not codec:
raise ValueError(f"Unknown codec: {codec_name}")
# Decode
pcm = codec.decode(data)
# Resample if needed
if codec.sample_rate != target_rate:
resampler = AudioResampler(
input_rate=codec.sample_rate,
output_rate=target_rate,
input_channels=codec.channels,
output_channels=1, # Always mono for STT
)
pcm = resampler.resample_numpy(pcm)
# Ensure mono
if pcm.ndim == 2 and pcm.shape[1] > 1:
pcm = AudioConverter.stereo_to_mono(pcm)
return pcm
def encode_from_pcm(
self,
pcm: np.ndarray,
source_rate: int,
codec_name: str,
) -> bytes:
"""
Encode PCM to target codec format.
This is the common path for outgoing audio
from the voice pipeline.
"""
codec = self.get_codec(codec_name)
if not codec:
raise ValueError(f"Unknown codec: {codec_name}")
# Resample if needed
if source_rate != codec.sample_rate:
resampler = AudioResampler(
input_rate=source_rate,
output_rate=codec.sample_rate,
input_channels=1,
output_channels=codec.channels,
)
pcm = resampler.resample_numpy(pcm)
# Convert to stereo if needed
if codec.channels == 2 and (pcm.ndim == 1 or pcm.shape[1] == 1):
pcm = AudioConverter.mono_to_stereo(pcm)
return codec.encode(pcm)
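The `AudioConverter.linear_to_ulaw`/`ulaw_to_linear` helpers used above are project-internal. As a self-contained illustration of the companding math that G.711 μ-law performs (the continuous approximation, not the exact segmented table from the ITU-T standard), the following sketch round-trips int16 PCM through 8-bit μ-law:

```python
import numpy as np

MU = 255.0  # mu-law compression parameter from ITU-T G.711

def linear_to_ulaw(pcm: np.ndarray) -> np.ndarray:
    """Compand int16 PCM to 8-bit mu-law (continuous approximation)."""
    x = pcm.astype(np.float64) / 32768.0
    y = np.sign(x) * np.log1p(MU * np.abs(x)) / np.log1p(MU)
    return np.round((y + 1.0) / 2.0 * 255.0).astype(np.uint8)

def ulaw_to_linear(encoded: np.ndarray) -> np.ndarray:
    """Expand 8-bit mu-law back to int16 PCM."""
    y = encoded.astype(np.float64) / 255.0 * 2.0 - 1.0
    x = np.sign(y) * (np.power(1.0 + MU, np.abs(y)) - 1.0) / MU
    return np.round(x * 32768.0).clip(-32768, 32767).astype(np.int16)
```

The logarithmic step size is why G.711 keeps acceptable speech quality in only 8 bits per sample: quantization error grows with signal level, roughly tracking the ear's sensitivity.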
7.3 Codec Negotiation Strategy \
┌─────────────────────────────────────────────────────────────────────────────┐
│ CODEC NEGOTIATION STRATEGY │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PREFERENCE ORDER: │
│ ───────────────── │
│ │
│ 1. Opus/48000/2 - Best quality, lowest bandwidth │
│ 2. Opus/48000/1 - Opus mono │
│ 3. PCMU/8000/1 - Universal fallback │
│ 4. PCMA/8000/1 - European fallback │
│ │
│ SELECTION LOGIC: │
│ ───────────────── │
│ │
│ def select_codec(offered_codecs): │
│ for preferred in PREFERENCE_ORDER: │
│ if preferred in offered_codecs: │
│ return preferred │
│ raise NoCompatibleCodec() │
│ │
│ FALLBACK SCENARIOS: │
│ ─────────────────── │
│ │
│ Scenario 1: GoTo offers Opus + G.711 │
│ → Select Opus (best quality) │
│ │
│ Scenario 2: GoTo offers only G.711 │
│ → Select PCMU (universal) │
│ │
│ Scenario 3: GoTo offers unknown codec │
│ → Reject call or request re-offer with known codecs │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
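The selection logic sketched in the box above can be written out directly. `PREFERENCE_ORDER` and `NoCompatibleCodec` are the names the diagram assumes, not existing bridge symbols:

```python
# Codec strings as "name/clock-rate/channels", most preferred first
PREFERENCE_ORDER = ["opus/48000/2", "opus/48000/1", "PCMU/8000/1", "PCMA/8000/1"]

class NoCompatibleCodec(Exception):
    """Raised when the remote offer contains no codec we support."""

def select_codec(offered_codecs: list[str]) -> str:
    """Return the most-preferred codec present in the remote offer."""
    offered = {c.lower() for c in offered_codecs}
    for preferred in PREFERENCE_ORDER:
        if preferred.lower() in offered:
            return preferred
    raise NoCompatibleCodec(f"offered: {offered_codecs}")
```

With Scenario 1 (Opus plus G.711 offered) this returns `opus/48000/2`; Scenario 3 surfaces as `NoCompatibleCodec`, which the bridge maps to rejecting the call or requesting a re-offer.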
7.4 Packet Loss Concealment \
# bridge/audio/plc.py
import numpy as np
from typing import Optional
class PacketLossConcealer:
"""
Conceal packet loss in audio streams.
When packets are lost, generate replacement
audio based on previous packets.
"""
def __init__(
self,
sample_rate: int = 48000,
frame_size: int = 960,
max_consecutive_loss: int = 5,
):
self.sample_rate = sample_rate
self.frame_size = frame_size
self.max_consecutive_loss = max_consecutive_loss
# Store last good frame
self._last_frame: Optional[np.ndarray] = None
self._consecutive_lost = 0
def process(
self,
frame: Optional[np.ndarray],
is_lost: bool = False,
) -> np.ndarray:
"""
Process a frame, concealing if lost.
Args:
frame: Audio frame or None if lost
is_lost: Whether this frame was lost
Returns:
Original or concealed frame
"""
if is_lost or frame is None:
return self._conceal()
self._last_frame = frame.copy()
self._consecutive_lost = 0
return frame
def _conceal(self) -> np.ndarray:
"""Generate concealment audio."""
self._consecutive_lost += 1
if self._last_frame is None:
# No reference - return silence
return np.zeros(self.frame_size, dtype=np.int16)
if self._consecutive_lost > self.max_consecutive_loss:
# Too many lost - fade out from the attenuated level
# (starting the fade from full scale would make loss N+1 louder than loss N)
fade = max(0.0, 1.0 - (self._consecutive_lost - self.max_consecutive_loss) * 0.2)
attenuation = (0.9 ** self.max_consecutive_loss) * fade
return (self._last_frame * attenuation).astype(np.int16)
# Repeat last frame with slight attenuation
attenuation = 0.9 ** self._consecutive_lost
return (self._last_frame * attenuation).astype(np.int16)
def reset(self) -> None:
"""Reset concealer state."""
self._last_frame = None
self._consecutive_lost = 0
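The repeat-and-attenuate schedule is easy to sanity-check: each concealed frame is scaled by 0.9 relative to the previous one, so the output level decays geometrically toward silence rather than looping the last frame at full volume.

```python
import numpy as np

def concealment_gain(consecutive_lost: int) -> float:
    # Gain applied when repeating the last good frame (0.9 ** n, as above)
    return 0.9 ** consecutive_lost

last_frame = np.full(960, 10000, dtype=np.int16)
levels = [
    int(np.max(np.abs((last_frame * concealment_gain(n)).astype(np.int16))))
    for n in range(1, 6)
]
print(levels)  # strictly decreasing: 9000, 8100, 7290, 6561, 5904
```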
8. GoToConnect Integration \
8.1 GoTo WebRTC Flow \
┌─────────────────────────────────────────────────────────────────────────────┐
│ GOTOCONNECT WEBRTC FLOW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ INBOUND CALL: │
│ ───────────── │
│ │
│ 1. Caller dials phone number │
│ 2. GoTo receives call on DID │
│ 3. GoTo sends webhook: call.ringing │
│ 4. Bridge answers via API: POST /calls/{id}/answer │
│ 5. GoTo returns SDP offer in response │
│ 6. Bridge creates aiortc PeerConnection │
│ 7. Bridge sets remote description (offer) │
│ 8. Bridge creates answer │
│ 9. Bridge sends answer via API │
│ 10. ICE candidates exchanged via events │
│ 11. DTLS handshake completes │
│ 12. SRTP media flows │
│ │
│ OUTBOUND CALL: │
│ ────────────── │
│ │
│ 1. Bridge creates aiortc PeerConnection │
│ 2. Bridge adds local audio track │
│ 3. Bridge creates SDP offer │
│ 4. Bridge initiates call: POST /calls with offer │
│ 5. GoTo processes call, returns answer │
│ 6. Bridge sets remote description (answer) │
│ 7. ICE candidates exchanged │
│ 8. Media flows when callee answers │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
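Steps 4-9 of the inbound flow reduce to a short async sequence. The stub classes below are hypothetical stand-ins for the GoTo call-control client and the aiortc peer-connection wrapper, included only to make the ordering concrete:

```python
import asyncio

class StubGoToClient:
    """Hypothetical stand-in for the GoTo call-control API client."""
    async def answer_call(self, call_id: str) -> dict:
        # Steps 4-5: answering the call returns the SDP offer
        return {"sdp": "v=0\r\n... (offer)"}
    async def send_answer(self, call_id: str, sdp: str) -> None:
        # Step 9: post our SDP answer back via the API
        self.sent_sdp = sdp

class StubPeerConnection:
    """Hypothetical stand-in for the aiortc RTCPeerConnection wrapper."""
    async def set_remote_description(self, sdp: str, sdp_type: str) -> None:
        self.remote = (sdp_type, sdp)   # step 7
    async def create_answer(self) -> str:
        return "v=0\r\n... (answer)"    # step 8

async def answer_inbound(call_id: str, goto, pc) -> str:
    offer = (await goto.answer_call(call_id))["sdp"]
    await pc.set_remote_description(offer, "offer")
    answer = await pc.create_answer()
    await goto.send_answer(call_id, answer)
    return answer
```

After this exchange, trickle ICE and the DTLS handshake (steps 10-11) run over the event channel before SRTP media flows.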
8.2 GoTo Connection Handler \
# bridge/goto/connection_handler.py
import asyncio
import logging
from dataclasses import dataclass
from typing import Optional, Callable
from enum import Enum
from bridge.webrtc.connection import WebRTCConnection, WebRTCConfig
from bridge.webrtc.sdp_negotiator import SDPNegotiator, ParsedSDP
from bridge.webrtc.ice_manager import ICEManager
from bridge.webrtc.tracks import AudioTrackSource, AudioTrackSink
from bridge.audio.codecs import CodecManager
logger = logging.getLogger(__name__)
class GoToCallState(Enum):
INITIALIZING = "initializing"
OFFERING = "offering"
ANSWERING = "answering"
CONNECTING = "connecting"
CONNECTED = "connected"
DISCONNECTED = "disconnected"
FAILED = "failed"
ENDED = "ended"
@dataclass
class GoToCallInfo:
"""Information about a GoToConnect call."""
call_id: str
external_call_id: str
direction: str # inbound, outbound
caller_number: str
callee_number: str
line_id: str
started_at: float
class GoToConnectionHandler:
"""
Manages WebRTC connection to GoToConnect.
Handles the complete lifecycle of a call including
signaling, media setup, and teardown.
"""
def __init__(
self,
call_info: GoToCallInfo,
goto_client: "GoToCallControlClient",
event_listener: "GoToEventListener",
webrtc_config: Optional[WebRTCConfig] = None,
):
self.call_info = call_info
self.goto_client = goto_client
self.event_listener = event_listener
# WebRTC components
self._webrtc = WebRTCConnection(
config=webrtc_config,
call_id=call_info.call_id,
)
self._sdp_negotiator = SDPNegotiator()
self._ice_manager = ICEManager(
call_id=call_info.call_id,
goto_client=goto_client,
)
self._codec_manager = CodecManager()
# Audio tracks
self._local_audio: Optional[AudioTrackSource] = None
self._remote_audio: Optional[AudioTrackSink] = None
# State
self._state = GoToCallState.INITIALIZING
self._negotiated_codec: Optional[str] = None
self._parsed_remote_sdp: Optional[ParsedSDP] = None
# Callbacks
self.on_state_change: Optional[Callable] = None
self.on_audio_frame: Optional[Callable] = None
self.on_connected: Optional[Callable] = None
self.on_disconnected: Optional[Callable] = None
async def initialize(self) -> None:
"""Initialize the connection handler."""
await self._webrtc.initialize()
# Set up WebRTC event handlers
self._webrtc.on_track = self._handle_remote_track
self._webrtc.on_ice_candidate = self._handle_local_ice_candidate
self._webrtc.on_connection_state_change = self._handle_connection_state
# Set up ICE manager handlers
self._ice_manager.on_connection_state_change = self._handle_ice_state
# Subscribe to GoTo events for this call
await self.event_listener.subscribe_call_events(
call_id=self.call_info.external_call_id,
on_ice_candidate=self._handle_remote_ice_candidate,
)
# Create local audio track
self._local_audio = AudioTrackSource(
sample_rate=48000,
channels=1,
samples_per_frame=960,
)
await self._webrtc.add_track(self._local_audio)
self._set_state(GoToCallState.INITIALIZING)
logger.info(f"[{self.call_info.call_id}] GoTo connection initialized")
async def handle_inbound_call(self, sdp_offer: str) -> str:
"""
Handle an inbound call from GoToConnect.
Args:
sdp_offer: SDP offer from GoToConnect
Returns:
SDP answer to send back
"""
self._set_state(GoToCallState.ANSWERING)
# Parse the offer
self._parsed_remote_sdp = self._sdp_negotiator.parse_sdp(sdp_offer)
# Find audio codecs
audio_media = next(
(m for m in self._parsed_remote_sdp.media if m.media_type == "audio"),
None,
)
if not audio_media:
raise ValueError("No audio in SDP offer")
# Negotiate codec
negotiated = self._sdp_negotiator.negotiate_codecs(audio_media.codecs)
if not negotiated:
raise ValueError("No compatible audio codecs")
self._negotiated_codec = negotiated[0].name
logger.info(f"[{self.call_info.call_id}] Negotiated codec: {self._negotiated_codec}")
# Set remote description
await self._webrtc.set_remote_description(sdp_offer, "offer")
# Create answer
answer_sdp = await self._webrtc.create_answer()
self._set_state(GoToCallState.CONNECTING)
return answer_sdp
async def initiate_outbound_call(
self,
target: str,
) -> None:
"""
Initiate an outbound call via GoToConnect.
Args:
target: Dial string (phone number or extension)
"""
self._set_state(GoToCallState.OFFERING)
# Create offer
offer_sdp = await self._webrtc.create_offer()
# Initiate call via GoTo API
response = await self.goto_client.create_call(
line_id=self.call_info.line_id,
dial_string=f"tel:{target}",
sdp_offer=offer_sdp,
)
# Update call info with external ID
self.call_info.external_call_id = response["callId"]
# Handle answer
answer_sdp = response.get("sdp")
if answer_sdp:
await self._handle_answer(answer_sdp)
async def _handle_answer(self, sdp_answer: str) -> None:
"""Handle SDP answer from GoToConnect."""
self._parsed_remote_sdp = self._sdp_negotiator.parse_sdp(sdp_answer)
# Find negotiated codec
audio_media = next(
(m for m in self._parsed_remote_sdp.media if m.media_type == "audio"),
None,
)
if audio_media and audio_media.codecs:
self._negotiated_codec = audio_media.codecs[0].name
await self._webrtc.set_remote_description(sdp_answer, "answer")
self._set_state(GoToCallState.CONNECTING)
async def _handle_remote_track(self, track: "MediaStreamTrack") -> None:
"""Handle incoming remote audio track."""
if track.kind != "audio":
return
logger.info(f"[{self.call_info.call_id}] Remote audio track received")
# Wrap track in sink
self._remote_audio = AudioTrackSink(
track=track,
on_frame=self._handle_audio_frame,
)
# Start receiving frames
asyncio.create_task(self._receive_audio_loop())
async def _receive_audio_loop(self) -> None:
"""Continuously receive audio frames from remote track."""
while self._state in (GoToCallState.CONNECTING, GoToCallState.CONNECTED):
try:
frame = await self._remote_audio.recv()
if self.on_audio_frame:
# Convert frame to numpy and decode if needed
await self.on_audio_frame(frame)
except Exception as e:
if "Connection" in str(e):
break
logger.error(f"[{self.call_info.call_id}] Error receiving audio: {e}")
async def _handle_audio_frame(self, frame: "AudioFrame") -> None:
"""Process incoming audio frame."""
if self.on_audio_frame:
await self.on_audio_frame(frame)
async def _handle_local_ice_candidate(
self,
candidate: "RTCIceCandidate",
) -> None:
"""Handle locally gathered ICE candidate."""
await self._ice_manager.handle_local_candidate(candidate)
async def _handle_remote_ice_candidate(
self,
candidate_data: dict,
) -> None:
"""Handle remote ICE candidate from GoToConnect."""
await self._ice_manager.handle_remote_candidate(
candidate_data,
self._webrtc._pc,
)
async def _handle_connection_state(self, state: str) -> None:
"""Handle WebRTC connection state change."""
if state == "connected":
self._set_state(GoToCallState.CONNECTED)
if self.on_connected:
await self.on_connected()
elif state == "disconnected":
self._set_state(GoToCallState.DISCONNECTED)
elif state == "failed":
self._set_state(GoToCallState.FAILED)
if self.on_disconnected:
await self.on_disconnected()
elif state == "closed":
self._set_state(GoToCallState.ENDED)
async def _handle_ice_state(self, state: "ICEConnectionState") -> None:
"""Handle ICE connection state change."""
self._ice_manager.update_connection_state(state.value)
async def send_audio(self, audio_data: "np.ndarray") -> None:
"""
Send audio to GoToConnect.
Args:
audio_data: PCM audio samples to send
"""
if self._local_audio and self._state == GoToCallState.CONNECTED:
await self._local_audio.push_audio(audio_data)
async def hold(self) -> None:
"""Put the call on hold."""
await self.goto_client.hold_call(self.call_info.external_call_id)
async def resume(self) -> None:
"""Resume the call from hold."""
await self.goto_client.resume_call(self.call_info.external_call_id)
async def hangup(self) -> None:
"""End the call."""
self._set_state(GoToCallState.ENDED)
try:
await self.goto_client.hangup_call(self.call_info.external_call_id)
except Exception as e:
logger.warning(f"[{self.call_info.call_id}] Hangup error: {e}")
await self.close()
async def close(self) -> None:
"""Close the connection and clean up resources."""
# Stop audio tracks
if self._local_audio:
self._local_audio.stop()
if self._remote_audio:
self._remote_audio.stop()
# Close WebRTC connection
await self._webrtc.close()
# Unsubscribe from events
await self.event_listener.unsubscribe_call_events(
self.call_info.external_call_id
)
logger.info(f"[{self.call_info.call_id}] GoTo connection closed")
def _set_state(self, state: GoToCallState) -> None:
"""Update call state."""
old_state = self._state
self._state = state
logger.info(
f"[{self.call_info.call_id}] State: {old_state.value} → {state.value}"
)
if self.on_state_change:
asyncio.create_task(self.on_state_change(old_state, state))
@property
def state(self) -> GoToCallState:
"""Current call state."""
return self._state
@property
def is_connected(self) -> bool:
"""Whether call is connected."""
return self._state == GoToCallState.CONNECTED
@property
def negotiated_codec(self) -> Optional[str]:
"""The negotiated audio codec."""
return self._negotiated_codec
9. LiveKit Integration \
9.1 LiveKit Overview \
LiveKit is an open-source WebRTC SFU (Selective Forwarding Unit) that provides:
- Scalable real-time audio/video
- Room-based architecture
- Server-side SDKs
- Low-latency routing
9.2 LiveKit Room Architecture \
┌─────────────────────────────────────────────────────────────────────────────┐
│ LIVEKIT ROOM ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ LiveKit Server │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ Room: call_{call_id} │ │ │
│ │ │ │ │ │
│ │ │ PARTICIPANTS: │ │ │
│ │ │ │ │ │
│ │ │ ┌─────────────────┐ ┌─────────────────┐ │ │ │
│ │ │ │ bridge_{id} │ │ agent_{id} │ │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ │ Tracks: │ │ Tracks: │ │ │ │
│ │ │ │ • caller_audio │◀─────▶│ • agent_audio │ │ │ │
│ │ │ │ (publish) │ │ (publish) │ │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ │ Subscriptions: │ │ Subscriptions: │ │ │ │
│ │ │ │ • agent_audio │ │ • caller_audio │ │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ └─────────────────┘ └─────────────────┘ │ │ │
│ │ │ │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
│ AUDIO FLOW: │
│ │
│ Caller → GoTo → Bridge → [caller_audio] → Agent Worker │
│ │
│ Agent Worker → [agent_audio] → Bridge → GoTo → Caller │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
9.3 LiveKit Connection Handler \
# bridge/livekit/connection_handler.py
import asyncio
import logging
from dataclasses import dataclass
from typing import Optional, Callable, AsyncIterator
from livekit import rtc, api
import numpy as np
logger = logging.getLogger(__name__)
@dataclass
class LiveKitConfig:
"""Configuration for LiveKit connection."""
url: str
api_key: str
api_secret: str
room_prefix: str = "call_"
@dataclass
class LiveKitRoomInfo:
"""Information about a LiveKit room."""
room_name: str
participant_identity: str
participant_name: str
class LiveKitConnectionHandler:
"""
Manages connection to LiveKit for voice pipeline integration.
Handles room creation, audio publishing, and subscribing
to agent audio.
"""
def __init__(
self,
config: LiveKitConfig,
call_id: str,
):
self.config = config
self.call_id = call_id
# Room info
self.room_info = LiveKitRoomInfo(
room_name=f"{config.room_prefix}{call_id}",
participant_identity=f"bridge_{call_id}",
participant_name="WebRTC Bridge",
)
# LiveKit components
self._room: Optional[rtc.Room] = None
self._api = api.LiveKitAPI(
url=config.url,
api_key=config.api_key,
api_secret=config.api_secret,
)
# Audio tracks
self._local_source: Optional[rtc.AudioSource] = None
self._local_track: Optional[rtc.LocalAudioTrack] = None
self._remote_tracks: dict[str, rtc.RemoteAudioTrack] = {}
# Callbacks
self.on_agent_audio: Optional[Callable] = None
self.on_connected: Optional[Callable] = None
self.on_disconnected: Optional[Callable] = None
# State
self._connected = False
self._audio_task: Optional[asyncio.Task] = None
async def connect(self) -> None:
"""Connect to LiveKit and join the room."""
# Create room if it doesn't exist
try:
await self._api.room.create_room(
api.CreateRoomRequest(name=self.room_info.room_name)
)
except Exception:
# Room may already exist
pass
# Generate access token
token = api.AccessToken(
self.config.api_key,
self.config.api_secret,
)
token.with_identity(self.room_info.participant_identity)
token.with_name(self.room_info.participant_name)
token.with_grants(api.VideoGrants(
room_join=True,
room=self.room_info.room_name,
can_publish=True,
can_subscribe=True,
))
# Create room
self._room = rtc.Room()
# Set up event handlers
self._room.on("participant_connected", self._on_participant_connected)
self._room.on("participant_disconnected", self._on_participant_disconnected)
self._room.on("track_subscribed", self._on_track_subscribed)
self._room.on("track_unsubscribed", self._on_track_unsubscribed)
self._room.on("disconnected", self._on_disconnected)
# Connect to room
await self._room.connect(
self.config.url,
token.to_jwt(),
)
self._connected = True
logger.info(f"[{self.call_id}] Connected to LiveKit room: {self.room_info.room_name}")
# Create and publish local audio track
await self._setup_local_audio()
if self.on_connected:
await self.on_connected()
async def _setup_local_audio(self) -> None:
"""Set up local audio track for publishing caller audio."""
# Create audio source
self._local_source = rtc.AudioSource(
sample_rate=48000,
num_channels=1,
)
# Create track from source
self._local_track = rtc.LocalAudioTrack.create_audio_track(
"caller_audio",
self._local_source,
)
# Publish track
options = rtc.TrackPublishOptions(
source=rtc.TrackSource.SOURCE_MICROPHONE,
)
await self._room.local_participant.publish_track(
self._local_track,
options,
)
logger.info(f"[{self.call_id}] Published caller audio track")
async def publish_audio(self, audio_data: np.ndarray) -> None:
"""
Publish audio data to LiveKit.
Args:
audio_data: PCM audio samples (int16, 48kHz, mono)
"""
if not self._local_source or not self._connected:
return
# Create audio frame
frame = rtc.AudioFrame(
data=audio_data.tobytes(),
sample_rate=48000,
num_channels=1,
samples_per_channel=len(audio_data),
)
# Capture frame to source
await self._local_source.capture_frame(frame)
async def _on_participant_connected(
self,
participant: rtc.RemoteParticipant,
) -> None:
"""Handle new participant joining."""
logger.info(
f"[{self.call_id}] Participant connected: {participant.identity}"
)
async def _on_participant_disconnected(
self,
participant: rtc.RemoteParticipant,
) -> None:
"""Handle participant leaving."""
logger.info(
f"[{self.call_id}] Participant disconnected: {participant.identity}"
)
async def _on_track_subscribed(
self,
track: rtc.Track,
publication: rtc.RemoteTrackPublication,
participant: rtc.RemoteParticipant,
) -> None:
"""Handle subscribing to a remote track."""
if track.kind != rtc.TrackKind.KIND_AUDIO:
return
logger.info(
f"[{self.call_id}] Subscribed to audio track from {participant.identity}"
)
# Store track
self._remote_tracks[participant.identity] = track
# Start receiving audio
if self.on_agent_audio:
self._audio_task = asyncio.create_task(
self._receive_audio(track, participant.identity)
)
async def _receive_audio(
self,
track: rtc.RemoteAudioTrack,
participant_id: str,
) -> None:
"""Receive audio frames from a track."""
audio_stream = rtc.AudioStream(track)
async for frame_event in audio_stream:
frame = frame_event.frame
if self.on_agent_audio:
# Convert to numpy
audio_data = np.frombuffer(
frame.data,
dtype=np.int16,
)
await self.on_agent_audio(audio_data, participant_id)
async def _on_track_unsubscribed(
self,
track: rtc.Track,
publication: rtc.RemoteTrackPublication,
participant: rtc.RemoteParticipant,
) -> None:
"""Handle unsubscribing from a remote track."""
if participant.identity in self._remote_tracks:
del self._remote_tracks[participant.identity]
async def _on_disconnected(self) -> None:
"""Handle disconnection from room."""
self._connected = False
logger.warning(f"[{self.call_id}] Disconnected from LiveKit")
if self.on_disconnected:
await self.on_disconnected()
async def disconnect(self) -> None:
"""Disconnect from LiveKit room."""
if self._audio_task:
self._audio_task.cancel()
try:
await self._audio_task
except asyncio.CancelledError:
pass
if self._room:
await self._room.disconnect()
self._room = None
self._connected = False
logger.info(f"[{self.call_id}] Disconnected from LiveKit")
async def delete_room(self) -> None:
"""Delete the LiveKit room after call ends."""
try:
await self._api.room.delete_room(
api.DeleteRoomRequest(room=self.room_info.room_name)
)
logger.info(f"[{self.call_id}] Deleted LiveKit room")
except Exception as e:
logger.warning(f"[{self.call_id}] Failed to delete room: {e}")
@property
def is_connected(self) -> bool:
"""Whether connected to LiveKit."""
return self._connected
class LiveKitTokenGenerator:
"""
Generate LiveKit access tokens.
Provides tokens for different participant types.
"""
def __init__(self, api_key: str, api_secret: str):
self.api_key = api_key
self.api_secret = api_secret
def generate_bridge_token(
self,
room_name: str,
call_id: str,
) -> str:
"""Generate token for WebRTC bridge."""
token = api.AccessToken(self.api_key, self.api_secret)
token.with_identity(f"bridge_{call_id}")
token.with_name("WebRTC Bridge")
token.with_grants(api.VideoGrants(
room_join=True,
room=room_name,
can_publish=True,
can_subscribe=True,
))
return token.to_jwt()
def generate_agent_token(
self,
room_name: str,
agent_id: str,
) -> str:
"""Generate token for agent worker."""
token = api.AccessToken(self.api_key, self.api_secret)
token.with_identity(f"agent_{agent_id}")
token.with_name("AI Agent")
token.with_grants(api.VideoGrants(
room_join=True,
room=room_name,
can_publish=True,
can_subscribe=True,
))
return token.to_jwt()
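`api.AccessToken.to_jwt()` produces a standard HS256 JWT. For illustration only (the claim names follow LiveKit's documented token format but should be treated as assumptions; use the official SDK in production), the signing step can be sketched with the stdlib:

```python
import base64
import hashlib
import hmac
import json
import time

def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_livekit_token(api_key: str, api_secret: str, identity: str, room: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {
        "iss": api_key,                 # API key identifies the issuer
        "sub": identity,                # participant identity
        "exp": int(time.time()) + 3600, # token lifetime
        "video": {                      # LiveKit grants live under "video"
            "roomJoin": True,
            "room": room,
            "canPublish": True,
            "canSubscribe": True,
        },
    }
    signing_input = (
        _b64url(json.dumps(header, separators=(",", ":")).encode())
        + "."
        + _b64url(json.dumps(payload, separators=(",", ":")).encode())
    )
    sig = hmac.new(api_secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + _b64url(sig)
```

Because the token is self-contained, the bridge can mint one per participant without a round trip to the LiveKit server.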
10. Bidirectional Bridge \
10.1 Bridge Architecture \
┌─────────────────────────────────────────────────────────────────────────────┐
│ BIDIRECTIONAL BRIDGE ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────┐ │
│ │ AudioBridge │ │
│ │ │ │
│ ┌──────────────────────┤ ┌───────────┐ ├──────────────────────┐ │
│ │ │ │ Router │ │ │ │
│ │ GoToConnection │ └─────┬─────┘ │ LiveKitConnection │ │
│ │ │ │ │ │ │
│ │ ┌───────────────┐ │ ┌─────┴─────┐ │ ┌───────────────┐ │ │
│ │ │ Remote Track │───┼──▶│ Inbound │───┼──▶│ Audio Source │ │ │
│ │ │ (from caller) │ │ │ Pipeline │ │ │ (to LiveKit) │ │ │
│ │ └───────────────┘ │ └───────────┘ │ └───────────────┘ │ │
│ │ │ │ │ │
│ │ ┌───────────────┐ │ ┌───────────┐ │ ┌───────────────┐ │ │
│ │ │ Local Track │◀──┼───│ Outbound │◀──┼───│ Audio Sink │ │ │
│ │ │ (to caller) │ │ │ Pipeline │ │ │ (from Agent) │ │ │
│ │ └───────────────┘ │ └───────────┘ │ └───────────────┘ │ │
│ │ │ │ │ │
│ └──────────────────────┤ ├──────────────────────┘ │
│ │ │ │
│ └───────────────────┘ │
│ │
│ INBOUND PIPELINE (Caller → Agent): │
│ ────────────────────────────────── │
│ 1. Receive RTP from GoTo │
│ 2. Decode (Opus/G.711 → PCM) │
│ 3. Resample (48kHz → 16kHz for STT, 48kHz for LiveKit) │
│ 4. Fork: Send to LiveKit AND to STT │
│ │
│ OUTBOUND PIPELINE (Agent → Caller): │
│ ─────────────────────────────────── │
│ 1. Receive from LiveKit (TTS output) │
│ 2. Resample (24kHz → 48kHz) │
│ 3. Encode (PCM → negotiated codec) │
│ 4. Send RTP to GoTo │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
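The sample-rate hops in both pipelines keep frame boundaries aligned because a 20 ms frame contains an integer number of samples at every rate involved. A quick sketch of the arithmetic (naive decimation for the integer 48 kHz → 16 kHz ratio; the production path uses the filtered `AudioResampler`):

```python
import numpy as np

def frame_samples(rate_hz: int, duration_ms: int = 20) -> int:
    """Samples per 20 ms frame: 48 kHz -> 960, 16 kHz -> 320, 24 kHz -> 480."""
    return rate_hz * duration_ms // 1000

def decimate_48k_to_16k(pcm_48k: np.ndarray) -> np.ndarray:
    # 48000 / 16000 == 3, so keeping every third sample preserves the 20 ms
    # framing; a real resampler low-pass filters first to avoid aliasing.
    return pcm_48k[::3]

frame = np.zeros(frame_samples(48000), dtype=np.int16)
print(len(decimate_48k_to_16k(frame)))  # 320
```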
10.2 Audio Bridge Implementation \
# bridge/audio_bridge.py
import asyncio
import logging
from dataclasses import dataclass
from typing import Optional, Callable
import numpy as np
from bridge.goto.connection_handler import GoToConnectionHandler, GoToCallInfo
from bridge.livekit.connection_handler import LiveKitConnectionHandler, LiveKitConfig
from bridge.audio.resampler import MultiRateResampler
from bridge.audio.codecs import CodecManager
from bridge.audio.buffer import AudioBuffer
logger = logging.getLogger(__name__)
@dataclass
class BridgeConfig:
"""Configuration for the audio bridge."""
# Sample rates
goto_sample_rate: int = 48000
livekit_sample_rate: int = 48000
stt_sample_rate: int = 16000
tts_sample_rate: int = 24000
# Buffer sizes
inbound_buffer_ms: int = 100
outbound_buffer_ms: int = 100
# Frame sizes
frame_duration_ms: int = 20
class AudioBridge:
"""
Bridges audio between GoToConnect and LiveKit.
Handles all audio routing, format conversion,
and synchronization between the two endpoints.
"""
def __init__(
self,
call_id: str,
call_info: GoToCallInfo,
goto_client: "GoToCallControlClient",
event_listener: "GoToEventListener",
livekit_config: LiveKitConfig,
bridge_config: Optional[BridgeConfig] = None,
):
self.call_id = call_id
self.config = bridge_config or BridgeConfig()
# Connection handlers
self._goto = GoToConnectionHandler(
call_info=call_info,
goto_client=goto_client,
event_listener=event_listener,
)
self._livekit = LiveKitConnectionHandler(
config=livekit_config,
call_id=call_id,
)
# Audio processing
self._resampler = MultiRateResampler()
self._codec_manager = CodecManager()
# Buffers
self._inbound_buffer = AudioBuffer(
max_duration_ms=self.config.inbound_buffer_ms,
sample_rate=self.config.stt_sample_rate,
)
self._outbound_buffer = AudioBuffer(
max_duration_ms=self.config.outbound_buffer_ms,
sample_rate=self.config.goto_sample_rate,
)
# Callbacks for voice pipeline
self.on_caller_audio: Optional[Callable] = None # For STT
# State
self._running = False
self._outbound_task: Optional[asyncio.Task] = None
async def initialize(self) -> None:
"""Initialize both connections."""
# Initialize GoTo connection
await self._goto.initialize()
self._goto.on_audio_frame = self._handle_goto_audio
self._goto.on_connected = self._on_goto_connected
# Connect to LiveKit
await self._livekit.connect()
self._livekit.on_agent_audio = self._handle_agent_audio
logger.info(f"[{self.call_id}] Audio bridge initialized")
async def handle_inbound_call(self, sdp_offer: str) -> str:
"""
Handle an inbound call.
Args:
sdp_offer: SDP offer from GoToConnect
Returns:
SDP answer
"""
return await self._goto.handle_inbound_call(sdp_offer)
async def start(self) -> None:
"""Start audio bridging."""
self._running = True
# Start outbound audio task
self._outbound_task = asyncio.create_task(
self._outbound_audio_loop()
)
logger.info(f"[{self.call_id}] Audio bridge started")
async def stop(self) -> None:
"""Stop audio bridging."""
self._running = False
if self._outbound_task:
self._outbound_task.cancel()
try:
await self._outbound_task
except asyncio.CancelledError:
pass
# Disconnect both ends
await self._goto.close()
await self._livekit.disconnect()
await self._livekit.delete_room()
logger.info(f"[{self.call_id}] Audio bridge stopped")
async def _on_goto_connected(self) -> None:
"""Called when GoTo connection is established."""
await self.start()
async def _handle_goto_audio(self, frame: "AudioFrame") -> None:
"""
Handle audio from GoToConnect (caller).
Routes to:
1. LiveKit (for agent to hear)
2. Voice pipeline (for STT)
"""
if not self._running:
return
# Convert frame to numpy
from bridge.audio.frames import AudioFrameProcessor
pcm_48k = AudioFrameProcessor.frame_to_numpy(frame)
# Ensure mono
if pcm_48k.ndim == 2 and pcm_48k.shape[1] > 1:
from bridge.audio.converter import AudioConverter
pcm_48k = AudioConverter.stereo_to_mono(pcm_48k)
# Route to LiveKit (48kHz)
await self._livekit.publish_audio(pcm_48k)
# Resample for STT (16kHz)
pcm_16k = self._resampler.resample(
pcm_48k,
input_rate=self.config.goto_sample_rate,
output_rate=self.config.stt_sample_rate,
)
# Send to voice pipeline
if self.on_caller_audio:
await self.on_caller_audio(pcm_16k)
async def _handle_agent_audio(
self,
audio_data: np.ndarray,
participant_id: str,
) -> None:
"""
Handle audio from agent (LiveKit).
Routes to GoToConnect (to caller).
"""
if not self._running:
return
# Audio arrives at 48kHz from LiveKit
# May need to resample if TTS outputs different rate
# Buffer for smooth playback
await self._outbound_buffer.write(audio_data.reshape(-1, 1))
async def _outbound_audio_loop(self) -> None:
"""
Send buffered audio to GoToConnect.
Runs continuously to ensure smooth audio delivery.
"""
frame_samples = int(
self.config.goto_sample_rate *
self.config.frame_duration_ms / 1000
)
frame_interval = self.config.frame_duration_ms / 1000
while self._running:
try:
# Read frame from buffer
audio = await self._outbound_buffer.read(frame_samples)
# Ensure correct shape
if audio.ndim == 2:
audio = audio[:, 0]
# Send to GoTo
await self._goto.send_audio(audio)
# Pace ourselves
await asyncio.sleep(frame_interval * 0.9)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"[{self.call_id}] Outbound audio error: {e}")
await asyncio.sleep(0.01)
async def send_tts_audio(self, audio_data: np.ndarray, sample_rate: int) -> None:
"""
Send TTS audio to the caller.
This is called by the voice pipeline when TTS
generates audio to be played to the caller.
Args:
audio_data: PCM audio from TTS
sample_rate: Sample rate of TTS audio
"""
# Resample if needed
if sample_rate != self.config.goto_sample_rate:
audio_data = self._resampler.resample(
audio_data,
input_rate=sample_rate,
output_rate=self.config.goto_sample_rate,
)
# Add to outbound buffer
await self._outbound_buffer.write(audio_data.reshape(-1, 1))
def clear_outbound_audio(self) -> None:
"""Clear outbound audio buffer (for interruption handling)."""
self._outbound_buffer.clear()
@property
def is_connected(self) -> bool:
"""Whether both connections are active."""
return self._goto.is_connected and self._livekit.is_connected
class BridgeManager:
"""
Manages multiple audio bridges for concurrent calls.
"""
def __init__(
self,
goto_client: "GoToCallControlClient",
event_listener: "GoToEventListener",
livekit_config: LiveKitConfig,
):
self.goto_client = goto_client
self.event_listener = event_listener
self.livekit_config = livekit_config
self._bridges: dict[str, AudioBridge] = {}
self._lock = asyncio.Lock()
async def create_bridge(
self,
call_id: str,
call_info: GoToCallInfo,
) -> AudioBridge:
"""Create a new audio bridge for a call."""
async with self._lock:
if call_id in self._bridges:
raise ValueError(f"Bridge already exists for call {call_id}")
bridge = AudioBridge(
call_id=call_id,
call_info=call_info,
goto_client=self.goto_client,
event_listener=self.event_listener,
livekit_config=self.livekit_config,
)
await bridge.initialize()
self._bridges[call_id] = bridge
logger.info(f"Created bridge for call {call_id}")
return bridge
async def get_bridge(self, call_id: str) -> Optional[AudioBridge]:
"""Get an existing bridge."""
return self._bridges.get(call_id)
async def remove_bridge(self, call_id: str) -> None:
"""Remove and clean up a bridge."""
async with self._lock:
bridge = self._bridges.pop(call_id, None)
if bridge:
await bridge.stop()
logger.info(f"Removed bridge for call {call_id}")
@property
def active_bridges(self) -> int:
"""Number of active bridges."""
return len(self._bridges)
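The manager above is a locked registry keyed by call ID. The pattern can be exercised in isolation with a stand-in bridge object; `FakeBridge` and `MiniBridgeManager` below are purely illustrative, not part of the bridge codebase:

```python
import asyncio
from typing import Optional


class FakeBridge:
    """Stand-in for AudioBridge: records whether stop() was called."""
    def __init__(self, call_id: str):
        self.call_id = call_id
        self.stopped = False

    async def stop(self) -> None:
        self.stopped = True


class MiniBridgeManager:
    """Minimal locked registry mirroring BridgeManager's structure."""
    def __init__(self):
        self._bridges: dict[str, FakeBridge] = {}
        self._lock = asyncio.Lock()

    async def create(self, call_id: str) -> FakeBridge:
        async with self._lock:
            if call_id in self._bridges:
                raise ValueError(f"Bridge already exists for call {call_id}")
            bridge = FakeBridge(call_id)
            self._bridges[call_id] = bridge
            return bridge

    async def remove(self, call_id: str) -> Optional[FakeBridge]:
        async with self._lock:
            bridge = self._bridges.pop(call_id, None)
            if bridge:
                await bridge.stop()  # cleanup always happens under the lock
            return bridge


async def demo_registry() -> tuple:
    mgr = MiniBridgeManager()
    await mgr.create("call-1")
    await mgr.create("call-2")
    removed = await mgr.remove("call-1")
    return len(mgr._bridges), removed.stopped
```

Holding the lock across `create`/`remove` (but not `get`) is the same trade-off the real manager makes: lookups stay cheap while mutation stays race-free.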
10.3 Audio Forking for STT \
# bridge/audio_fork.py
import asyncio
from typing import Callable, List
import numpy as np
class AudioFork:
"""
Fork audio to multiple destinations.
Allows the same audio stream to be sent to
multiple consumers (e.g., LiveKit and STT).
"""
def __init__(self):
self._destinations: List[Callable] = []
def add_destination(self, callback: Callable) -> None:
"""Add a destination for audio."""
self._destinations.append(callback)
def remove_destination(self, callback: Callable) -> None:
"""Remove a destination."""
if callback in self._destinations:
self._destinations.remove(callback)
async def send(self, audio_data: np.ndarray) -> None:
"""Send audio to all destinations."""
tasks = [
asyncio.create_task(dest(audio_data.copy()))
for dest in self._destinations
]
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
class MultiRateAudioFork:
"""
Fork audio to destinations at different sample rates.
Maintains separate resamplers for each destination
to avoid repeated resampling.
"""
    def __init__(self, source_rate: int):
        self.source_rate = source_rate
        self._destinations: dict[int, List[Callable]] = {}
        # MultiRateResampler comes from the resampling utilities (Section 6.3)
        self._resampler = MultiRateResampler()
def add_destination(
self,
callback: Callable,
target_rate: int,
) -> None:
"""Add a destination at a specific sample rate."""
if target_rate not in self._destinations:
self._destinations[target_rate] = []
self._destinations[target_rate].append(callback)
async def send(self, audio_data: np.ndarray) -> None:
"""Send audio to all destinations at their target rates."""
tasks = []
for target_rate, callbacks in self._destinations.items():
# Resample if needed
if target_rate != self.source_rate:
resampled = self._resampler.resample(
audio_data,
self.source_rate,
target_rate,
)
else:
resampled = audio_data
# Send to all callbacks at this rate
for callback in callbacks:
tasks.append(
asyncio.create_task(callback(resampled.copy()))
)
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
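As a sanity check, the fan-out behaviour of `send` can be demonstrated with plain strings instead of numpy frames. `MiniFork` below is a stripped-down sketch of `AudioFork` (the per-destination `copy()` detail is elided):

```python
import asyncio
from typing import Callable, List


class MiniFork:
    """Minimal fork: dispatch one payload to every registered coroutine."""
    def __init__(self):
        self._destinations: List[Callable] = []

    def add_destination(self, callback: Callable) -> None:
        self._destinations.append(callback)

    async def send(self, payload) -> None:
        tasks = [asyncio.create_task(dest(payload)) for dest in self._destinations]
        if tasks:
            # return_exceptions=True: one failing consumer never blocks the others
            await asyncio.gather(*tasks, return_exceptions=True)


async def demo_fork() -> tuple:
    livekit_frames: list = []
    stt_frames: list = []

    async def to_livekit(frame):
        livekit_frames.append(frame)

    async def to_stt(frame):
        stt_frames.append(frame)

    fork = MiniFork()
    fork.add_destination(to_livekit)
    fork.add_destination(to_stt)
    await fork.send("frame-0")
    await fork.send("frame-1")
    return livekit_frames, stt_frames
```

Both consumers receive every frame in order; a slow or failing STT consumer only delays the frame it is currently handling, not the LiveKit path.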
11. Connection Lifecycle Management \
11.1 Complete Call Lifecycle \
┌─────────────────────────────────────────────────────────────────────────────┐
│ COMPLETE CALL LIFECYCLE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ PHASE 1: SETUP │ │
│ │ │ │
│ │ 1. Webhook received: call.ringing │ │
│ │ 2. Create Bridge instance │ │
│ │ 3. Initialize GoTo WebRTC peer │ │
│ │ 4. Initialize LiveKit connection │ │
│ │ 5. Answer call via GoTo API │ │
│ │ 6. Receive SDP offer from GoTo │ │
│ │ │ │
│ │ Duration: ~500ms │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ PHASE 2: NEGOTIATION │ │
│ │ │ │
│ │ 7. Parse SDP offer, extract codecs │ │
│ │ 8. Select preferred codec (Opus > G.711) │ │
│ │ 9. Generate SDP answer │ │
│ │ 10. Send answer to GoTo │ │
│ │ 11. Begin ICE candidate exchange │ │
│ │ │ │
│ │ Duration: ~300ms │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ PHASE 3: CONNECTION │ │
│ │ │ │
│ │ 12. ICE connectivity checks │ │
│ │ 13. DTLS handshake │ │
│ │ 14. SRTP session established │ │
│ │ 15. Connection state → CONNECTED │ │
│ │ 16. Join LiveKit room │ │
│ │ 17. Publish caller audio track │ │
│ │ 18. Subscribe to agent audio track │ │
│ │ │ │
│ │ Duration: ~500-2000ms │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ PHASE 4: ACTIVE CALL │ │
│ │ │ │
│ │ 19. Bidirectional audio streaming │ │
│ │ 20. Continuous health monitoring │ │
│ │ 21. Handle hold/resume if needed │ │
│ │ 22. Handle network transitions (ICE restart) │ │
│ │ │ │
│ │ Duration: Call duration (seconds to hours) │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ PHASE 5: TEARDOWN │ │
│ │ │ │
│ │ 23. Call ended (hangup, timeout, error) │ │
│ │ 24. Stop audio processing │ │
│ │ 25. Leave LiveKit room │ │
│ │ 26. Close GoTo WebRTC connection │ │
│ │ 27. Delete LiveKit room │ │
│ │ 28. Clean up resources │ │
│ │ 29. Log final metrics │ │
│ │ │ │
│ │ Duration: ~200ms │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
11.2 Lifecycle Manager Implementation \
# bridge/lifecycle/manager.py
import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional, Callable, Dict, Any
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class BridgePhase(Enum):
"""Bridge lifecycle phases."""
CREATED = "created"
INITIALIZING = "initializing"
NEGOTIATING = "negotiating"
CONNECTING = "connecting"
CONNECTED = "connected"
ACTIVE = "active"
DISCONNECTING = "disconnecting"
TERMINATED = "terminated"
FAILED = "failed"
@dataclass
class LifecycleMetrics:
"""Metrics collected during bridge lifecycle."""
created_at: float = 0.0
initialized_at: float = 0.0
negotiation_started_at: float = 0.0
negotiation_completed_at: float = 0.0
connected_at: float = 0.0
first_audio_at: float = 0.0
terminated_at: float = 0.0
total_audio_frames_received: int = 0
total_audio_frames_sent: int = 0
total_bytes_received: int = 0
total_bytes_sent: int = 0
ice_candidates_sent: int = 0
ice_candidates_received: int = 0
reconnection_attempts: int = 0
@property
def time_to_connect(self) -> Optional[float]:
"""Time from creation to connected state."""
if self.connected_at and self.created_at:
return self.connected_at - self.created_at
return None
@property
def time_to_first_audio(self) -> Optional[float]:
"""Time from creation to first audio frame."""
if self.first_audio_at and self.created_at:
return self.first_audio_at - self.created_at
return None
@property
def call_duration(self) -> Optional[float]:
"""Total call duration."""
if self.terminated_at and self.connected_at:
return self.terminated_at - self.connected_at
return None
class BridgeLifecycleManager:
"""
Manages the complete lifecycle of a WebRTC bridge.
Coordinates initialization, connection, active state,
and teardown across all bridge components.
"""
def __init__(
self,
call_id: str,
bridge: "AudioBridge",
):
self.call_id = call_id
self.bridge = bridge
# State
self._phase = BridgePhase.CREATED
self._phase_lock = asyncio.Lock()
# Metrics
self.metrics = LifecycleMetrics(created_at=time.time())
# Callbacks
self._on_phase_change: Optional[Callable] = None
self._on_error: Optional[Callable] = None
# Timeouts
self._phase_timeouts: Dict[BridgePhase, float] = {
BridgePhase.INITIALIZING: 5.0,
BridgePhase.NEGOTIATING: 10.0,
BridgePhase.CONNECTING: 30.0,
}
# Timeout task
self._timeout_task: Optional[asyncio.Task] = None
async def initialize(self) -> None:
"""Initialize the bridge."""
await self._transition_to(BridgePhase.INITIALIZING)
try:
await self.bridge.initialize()
self.metrics.initialized_at = time.time()
logger.info(f"[{self.call_id}] Bridge initialized")
except Exception as e:
logger.error(f"[{self.call_id}] Initialization failed: {e}")
await self._transition_to(BridgePhase.FAILED)
raise
async def start_negotiation(self, sdp_offer: str) -> str:
"""Start SDP negotiation."""
await self._transition_to(BridgePhase.NEGOTIATING)
self.metrics.negotiation_started_at = time.time()
try:
answer = await self.bridge.handle_inbound_call(sdp_offer)
self.metrics.negotiation_completed_at = time.time()
await self._transition_to(BridgePhase.CONNECTING)
logger.info(f"[{self.call_id}] Negotiation completed")
return answer
except Exception as e:
logger.error(f"[{self.call_id}] Negotiation failed: {e}")
await self._transition_to(BridgePhase.FAILED)
raise
async def on_connected(self) -> None:
"""Called when connection is established."""
await self._transition_to(BridgePhase.CONNECTED)
self.metrics.connected_at = time.time()
# Start active call processing
await self.bridge.start()
await self._transition_to(BridgePhase.ACTIVE)
logger.info(
f"[{self.call_id}] Connected in "
f"{self.metrics.time_to_connect:.2f}s"
)
async def on_first_audio(self) -> None:
"""Called when first audio frame is received."""
if self.metrics.first_audio_at == 0:
self.metrics.first_audio_at = time.time()
logger.info(
f"[{self.call_id}] First audio in "
f"{self.metrics.time_to_first_audio:.2f}s"
)
async def terminate(self, reason: str = "normal") -> None:
"""Terminate the bridge."""
if self._phase in (BridgePhase.TERMINATED, BridgePhase.FAILED):
return
await self._transition_to(BridgePhase.DISCONNECTING)
try:
await self.bridge.stop()
except Exception as e:
logger.warning(f"[{self.call_id}] Error during stop: {e}")
self.metrics.terminated_at = time.time()
await self._transition_to(BridgePhase.TERMINATED)
        duration = self.metrics.call_duration or 0.0  # None if never connected
        logger.info(
            f"[{self.call_id}] Terminated ({reason}), "
            f"duration: {duration:.1f}s"
        )
async def _transition_to(self, new_phase: BridgePhase) -> None:
"""Transition to a new phase."""
async with self._phase_lock:
old_phase = self._phase
self._phase = new_phase
# Cancel existing timeout
if self._timeout_task:
self._timeout_task.cancel()
self._timeout_task = None
# Set new timeout if applicable
if new_phase in self._phase_timeouts:
timeout = self._phase_timeouts[new_phase]
self._timeout_task = asyncio.create_task(
self._phase_timeout(new_phase, timeout)
)
logger.debug(
f"[{self.call_id}] Phase: {old_phase.value} → {new_phase.value}"
)
if self._on_phase_change:
await self._on_phase_change(old_phase, new_phase)
async def _phase_timeout(self, phase: BridgePhase, timeout: float) -> None:
"""Handle phase timeout."""
try:
await asyncio.sleep(timeout)
# Check if still in this phase
if self._phase == phase:
logger.error(
f"[{self.call_id}] Timeout in phase {phase.value} "
f"after {timeout}s"
)
await self._transition_to(BridgePhase.FAILED)
if self._on_error:
await self._on_error(f"Timeout in {phase.value}")
except asyncio.CancelledError:
pass
@property
def phase(self) -> BridgePhase:
"""Current lifecycle phase."""
return self._phase
@property
def is_active(self) -> bool:
"""Whether bridge is in active call state."""
return self._phase == BridgePhase.ACTIVE
def on_phase_change(self, callback: Callable) -> None:
"""Set phase change callback."""
self._on_phase_change = callback
def on_error(self, callback: Callable) -> None:
"""Set error callback."""
self._on_error = callback
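The phase timeouts are the safety net of the lifecycle manager: a phase that is not exited before its deadline transitions to FAILED. The cancel-on-transition pattern can be shown in miniature (timings shortened for illustration; `MiniPhaseGuard` is a sketch, not the real manager):

```python
import asyncio
from typing import Optional


class MiniPhaseGuard:
    """Arms a deadline on entering a phase; cancels it on leaving."""
    def __init__(self):
        self.phase = "created"
        self.failed = False
        self._timeout_task: Optional[asyncio.Task] = None

    def enter(self, phase: str, timeout: float) -> None:
        if self._timeout_task:
            self._timeout_task.cancel()  # leaving the old phase disarms its deadline
        self.phase = phase
        self._timeout_task = asyncio.create_task(self._deadline(phase, timeout))

    async def _deadline(self, phase: str, timeout: float) -> None:
        try:
            await asyncio.sleep(timeout)
            if self.phase == phase:  # still stuck in the same phase
                self.failed = True
        except asyncio.CancelledError:
            pass


async def demo_guard() -> tuple:
    guard = MiniPhaseGuard()
    guard.enter("negotiating", timeout=0.05)
    guard.enter("connecting", timeout=10.0)  # advancing in time cancels the deadline
    await asyncio.sleep(0.1)
    survived = not guard.failed

    guard.enter("connecting", timeout=0.05)
    await asyncio.sleep(0.1)  # deadline expires while stuck
    return survived, guard.failed
```

Comparing `self.phase == phase` inside the deadline handles the race where a transition lands just as the timer fires, mirroring the `if self._phase == phase` check in `_phase_timeout`.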
11.3 Graceful Shutdown \
# bridge/lifecycle/shutdown.py
import asyncio
import signal
from typing import Set
import logging
logger = logging.getLogger(__name__)
class GracefulShutdown:
"""
Manages graceful shutdown of all active bridges.
Ensures calls are properly terminated when the
service is stopped.
"""
def __init__(self, bridge_manager: "BridgeManager"):
self.bridge_manager = bridge_manager
self._shutdown_event = asyncio.Event()
self._active_shutdowns: Set[asyncio.Task] = set()
def setup_signal_handlers(self) -> None:
"""Set up signal handlers for graceful shutdown."""
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
lambda s=sig: asyncio.create_task(self._handle_signal(s))
)
async def _handle_signal(self, sig: signal.Signals) -> None:
"""Handle shutdown signal."""
logger.info(f"Received signal {sig.name}, initiating graceful shutdown")
self._shutdown_event.set()
await self.shutdown_all_bridges()
async def shutdown_all_bridges(
self,
timeout: float = 30.0,
reason: str = "service_shutdown",
) -> None:
"""
        Shut down all active bridges gracefully.
Args:
timeout: Maximum time to wait for shutdown
reason: Reason for shutdown (logged)
"""
bridges = list(self.bridge_manager._bridges.values())
if not bridges:
logger.info("No active bridges to shutdown")
return
logger.info(f"Shutting down {len(bridges)} active bridges")
# Create shutdown tasks
tasks = [
asyncio.create_task(self._shutdown_bridge(bridge, reason))
for bridge in bridges
]
self._active_shutdowns.update(tasks)
# Wait with timeout
try:
await asyncio.wait_for(
asyncio.gather(*tasks, return_exceptions=True),
timeout=timeout,
)
logger.info("All bridges shutdown gracefully")
except asyncio.TimeoutError:
logger.warning(
f"Shutdown timeout after {timeout}s, "
f"forcing remaining bridges"
)
# Force stop remaining
for task in tasks:
if not task.done():
task.cancel()
self._active_shutdowns.clear()
async def _shutdown_bridge(
self,
bridge: "AudioBridge",
reason: str,
) -> None:
"""Shutdown a single bridge."""
try:
# Play goodbye message if possible
# await bridge.play_announcement("call_ending")
await bridge.stop()
logger.debug(f"Bridge {bridge.call_id} shutdown complete")
except Exception as e:
logger.error(f"Error shutting down bridge {bridge.call_id}: {e}")
@property
def is_shutting_down(self) -> bool:
"""Whether shutdown is in progress."""
return self._shutdown_event.is_set()
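The `wait_for` + `gather` + cancel pattern above can be seen in isolation: one fast teardown completes within the deadline, one hung teardown is force-cancelled after it. Timings are shortened for illustration; this is a sketch of the control flow, not the production shutdown path:

```python
import asyncio


async def demo_shutdown() -> tuple:
    fast_done = False

    async def fast_teardown():
        nonlocal fast_done
        await asyncio.sleep(0.01)
        fast_done = True

    async def hung_teardown():
        await asyncio.sleep(60)  # simulates a teardown that never finishes

    tasks = [asyncio.create_task(fast_teardown()),
             asyncio.create_task(hung_teardown())]
    try:
        # Wait for all teardowns, but never longer than the deadline
        await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True),
                               timeout=0.1)
    except asyncio.TimeoutError:
        for task in tasks:
            if not task.done():
                task.cancel()  # force the stragglers, as shutdown_all_bridges does
    # Drain the tasks so cancellation completes cleanly
    await asyncio.gather(*tasks, return_exceptions=True)
    return fast_done, tasks[1].cancelled()
```

Note that `wait_for` already cancels the inner `gather` on timeout, which propagates to unfinished children; the explicit cancel loop is belt-and-braces, matching the production code.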
12. Error Handling and Recovery \
12.1 Error Categories \
┌─────────────────────────────────────────────────────────────────────────────┐
│ ERROR CATEGORIES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ CATEGORY EXAMPLES RECOVERY ACTION │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ TRANSIENT • Packet loss • Automatic retry │
│ • Brief network hiccup • Buffer through it │
│ • Temporary high latency • Wait and continue │
│ │
│ RECOVERABLE • ICE disconnection • ICE restart │
│ • LiveKit reconnection • Reconnect + resume │
│ • Codec negotiation fail • Fallback codec │
│ │
│ TERMINAL • GoTo call ended • Clean shutdown │
│ • Authentication expired • Terminate + notify │
│ • Fatal connection error • Log + cleanup │
│ │
│ INFRASTRUCTURE • Memory exhaustion • Alert + graceful │
│ • CPU overload • Shed load │
│ • Disk full • Emergency cleanup │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
12.2 Error Handler Implementation \
# bridge/errors/handler.py
import asyncio
import traceback
from dataclasses import dataclass
from typing import Optional, Callable, Any
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class ErrorSeverity(Enum):
"""Error severity levels."""
DEBUG = "debug"
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
class ErrorCategory(Enum):
"""Error categories for handling decisions."""
TRANSIENT = "transient"
RECOVERABLE = "recoverable"
TERMINAL = "terminal"
INFRASTRUCTURE = "infrastructure"
@dataclass
class BridgeError:
"""Structured bridge error."""
code: str
message: str
category: ErrorCategory
severity: ErrorSeverity
details: Optional[dict] = None
cause: Optional[Exception] = None
def __str__(self) -> str:
return f"[{self.code}] {self.message}"
# Error code definitions
class ErrorCodes:
# Connection errors (1xxx)
ICE_FAILED = "1001"
ICE_DISCONNECTED = "1002"
DTLS_FAILED = "1003"
CONNECTION_TIMEOUT = "1004"
# Signaling errors (2xxx)
SDP_PARSE_ERROR = "2001"
SDP_NEGOTIATION_FAILED = "2002"
NO_COMPATIBLE_CODECS = "2003"
# Media errors (3xxx)
AUDIO_TRACK_FAILED = "3001"
CODEC_ERROR = "3002"
BUFFER_OVERFLOW = "3003"
# GoTo errors (4xxx)
GOTO_API_ERROR = "4001"
GOTO_CALL_ENDED = "4002"
GOTO_AUTH_EXPIRED = "4003"
# LiveKit errors (5xxx)
LIVEKIT_CONNECTION_FAILED = "5001"
LIVEKIT_ROOM_ERROR = "5002"
LIVEKIT_TRACK_ERROR = "5003"
# System errors (9xxx)
INTERNAL_ERROR = "9001"
RESOURCE_EXHAUSTED = "9002"
SHUTDOWN_REQUESTED = "9003"
class ErrorHandler:
"""
Centralized error handling for the bridge.
Routes errors to appropriate recovery mechanisms
based on category and severity.
"""
def __init__(
self,
call_id: str,
lifecycle_manager: "BridgeLifecycleManager",
):
self.call_id = call_id
self.lifecycle_manager = lifecycle_manager
# Recovery callbacks
self._recovery_handlers: dict[ErrorCategory, Callable] = {}
# Error history for pattern detection
self._error_history: list[BridgeError] = []
self._max_history = 100
# Circuit breaker state
self._consecutive_errors = 0
self._circuit_open = False
async def handle_error(self, error: BridgeError) -> bool:
"""
Handle an error and attempt recovery.
Returns:
True if recovered, False if terminal
"""
# Log the error
self._log_error(error)
# Record in history
self._error_history.append(error)
if len(self._error_history) > self._max_history:
self._error_history.pop(0)
# Check circuit breaker
if self._circuit_open:
logger.warning(f"[{self.call_id}] Circuit breaker open, skipping recovery")
return False
# Update consecutive error count
self._consecutive_errors += 1
if self._consecutive_errors >= 5:
logger.error(f"[{self.call_id}] Too many consecutive errors, opening circuit")
self._circuit_open = True
return False
# Route to appropriate handler
try:
if error.category == ErrorCategory.TRANSIENT:
return await self._handle_transient(error)
elif error.category == ErrorCategory.RECOVERABLE:
return await self._handle_recoverable(error)
elif error.category == ErrorCategory.TERMINAL:
return await self._handle_terminal(error)
elif error.category == ErrorCategory.INFRASTRUCTURE:
return await self._handle_infrastructure(error)
else:
return False
except Exception as e:
logger.error(f"[{self.call_id}] Error during recovery: {e}")
return False
def clear_error_state(self) -> None:
"""Clear error state after successful operation."""
self._consecutive_errors = 0
self._circuit_open = False
async def _handle_transient(self, error: BridgeError) -> bool:
"""Handle transient errors."""
# Usually just log and continue
logger.debug(f"[{self.call_id}] Transient error, continuing: {error}")
return True
async def _handle_recoverable(self, error: BridgeError) -> bool:
"""Handle recoverable errors."""
handler = self._recovery_handlers.get(error.category)
if handler:
return await handler(error)
# Default recovery based on error code
if error.code == ErrorCodes.ICE_DISCONNECTED:
return await self._attempt_ice_restart()
elif error.code == ErrorCodes.LIVEKIT_CONNECTION_FAILED:
return await self._attempt_livekit_reconnect()
elif error.code == ErrorCodes.NO_COMPATIBLE_CODECS:
return await self._attempt_codec_fallback()
return False
async def _handle_terminal(self, error: BridgeError) -> bool:
"""Handle terminal errors."""
logger.error(f"[{self.call_id}] Terminal error: {error}")
# Terminate the bridge
await self.lifecycle_manager.terminate(reason=str(error))
return False
async def _handle_infrastructure(self, error: BridgeError) -> bool:
"""Handle infrastructure errors."""
logger.critical(f"[{self.call_id}] Infrastructure error: {error}")
# Alert operations
# await alert_ops(error)
return False
async def _attempt_ice_restart(self) -> bool:
"""Attempt ICE restart."""
logger.info(f"[{self.call_id}] Attempting ICE restart")
# Implementation would trigger ICE restart
# through the bridge's WebRTC peer
return True # Optimistically return true
async def _attempt_livekit_reconnect(self) -> bool:
"""Attempt LiveKit reconnection."""
logger.info(f"[{self.call_id}] Attempting LiveKit reconnect")
# Implementation would reconnect to LiveKit
return True
async def _attempt_codec_fallback(self) -> bool:
"""Attempt codec fallback."""
logger.info(f"[{self.call_id}] Attempting codec fallback")
# Implementation would renegotiate with fallback codec
return False # Usually requires re-negotiation
def _log_error(self, error: BridgeError) -> None:
"""Log error with appropriate level."""
log_message = (
f"[{self.call_id}] {error.category.value.upper()} ERROR: "
f"{error.code} - {error.message}"
)
if error.details:
log_message += f" | Details: {error.details}"
if error.severity == ErrorSeverity.DEBUG:
logger.debug(log_message)
elif error.severity == ErrorSeverity.INFO:
logger.info(log_message)
elif error.severity == ErrorSeverity.WARNING:
logger.warning(log_message)
elif error.severity == ErrorSeverity.ERROR:
logger.error(log_message)
elif error.severity == ErrorSeverity.CRITICAL:
logger.critical(log_message)
if error.cause:
logger.critical(
f"[{self.call_id}] Traceback:\n"
f"{''.join(traceback.format_exception(type(error.cause), error.cause, error.cause.__traceback__))}"
)
def register_recovery_handler(
self,
category: ErrorCategory,
handler: Callable,
) -> None:
"""Register a custom recovery handler."""
self._recovery_handlers[category] = handler
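The circuit-breaker logic in `handle_error` is worth isolating: after five consecutive unrecovered errors the handler stops attempting recovery until a success clears the state. A minimal, self-contained version of just that mechanism:

```python
class MiniCircuitBreaker:
    """Opens after max_consecutive failures; a success resets it."""
    def __init__(self, max_consecutive: int = 5):
        self.max_consecutive = max_consecutive
        self._consecutive = 0
        self.open = False

    def record_error(self) -> bool:
        """Returns True if recovery may still be attempted."""
        if self.open:
            return False  # circuit open: skip recovery entirely
        self._consecutive += 1
        if self._consecutive >= self.max_consecutive:
            self.open = True  # too many in a row: stop trying
            return False
        return True

    def record_success(self) -> None:
        """Mirror of ErrorHandler.clear_error_state()."""
        self._consecutive = 0
        self.open = False


breaker = MiniCircuitBreaker()
attempts = [breaker.record_error() for _ in range(6)]
# first four errors still allow recovery; the fifth opens the circuit
```

The important detail is that the counter tracks *consecutive* failures: any successful operation must call `clear_error_state()` (here `record_success()`), otherwise a long call accumulating sporadic transient errors would eventually trip the breaker.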
12.3 ICE Restart Manager \
# bridge/errors/ice_restart.py
import asyncio
import time
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class ICERestartManager:
"""
Manages ICE restart procedures.
    Handles automatic ICE restarts when connectivity
    is lost but the call should continue.
"""
def __init__(
self,
goto_peer: "GoToWebRTCPeer",
max_restarts: int = 3,
cooldown_seconds: float = 5.0,
):
self.goto_peer = goto_peer
self.max_restarts = max_restarts
self.cooldown_seconds = cooldown_seconds
self._restart_count = 0
self._last_restart_time: Optional[float] = None
self._restart_in_progress = False
async def should_restart(self) -> bool:
"""Check if ICE restart should be attempted."""
if self._restart_in_progress:
return False
if self._restart_count >= self.max_restarts:
logger.warning("Maximum ICE restarts reached")
return False
if self._last_restart_time:
elapsed = time.time() - self._last_restart_time
if elapsed < self.cooldown_seconds:
logger.debug(f"ICE restart cooldown ({elapsed:.1f}s)")
return False
return True
async def perform_restart(self) -> bool:
"""
Perform ICE restart.
Returns:
True if restart succeeded
"""
if not await self.should_restart():
return False
self._restart_in_progress = True
self._restart_count += 1
self._last_restart_time = time.time()
logger.info(f"Performing ICE restart ({self._restart_count}/{self.max_restarts})")
try:
# Create new offer with ICE restart flag
# This is specific to how GoTo handles ICE restart
# In aiortc, you'd create a new offer:
# offer = await self.goto_peer._pc.createOffer()
# Then signal it to GoTo
# Wait for new ICE gathering
await self.goto_peer.wait_for_ice_gathering(timeout=10.0)
logger.info("ICE restart completed successfully")
return True
except Exception as e:
logger.error(f"ICE restart failed: {e}")
return False
finally:
self._restart_in_progress = False
def reset(self) -> None:
"""Reset restart counter."""
self._restart_count = 0
self._last_restart_time = None
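The gating rules (a hard cap on attempts plus a cooldown between them) can be exercised without any WebRTC machinery by passing timestamps in explicitly. `MiniRestartGate` below is an illustrative distillation of `should_restart`:

```python
from typing import Optional


class MiniRestartGate:
    """Restart gating only: max attempts plus a cooldown between attempts."""
    def __init__(self, max_restarts: int = 3, cooldown: float = 5.0):
        self.max_restarts = max_restarts
        self.cooldown = cooldown
        self._count = 0
        self._last: Optional[float] = None

    def try_restart(self, now: float) -> bool:
        if self._count >= self.max_restarts:
            return False  # cap reached: give up on this call
        if self._last is not None and now - self._last < self.cooldown:
            return False  # still cooling down from the previous attempt
        self._count += 1
        self._last = now
        return True


gate = MiniRestartGate(max_restarts=3, cooldown=5.0)
results = [gate.try_restart(t) for t in (0.0, 2.0, 6.0, 12.0, 20.0)]
# t=0 ok, t=2 inside cooldown, t=6 ok, t=12 ok (third and last), t=20 over the cap
```

Injecting the clock makes the policy trivially testable; the production class reads `time.time()` directly, which is why its tests would need patching.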
13. Performance Optimization \
13.1 Performance Targets \
┌─────────────────────────────────────────────────────────────────────────────┐
│ PERFORMANCE TARGETS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ METRIC TARGET ACCEPTABLE CRITICAL │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ End-to-end latency < 150ms < 300ms > 500ms │
│ Bridge processing < 5ms < 10ms > 20ms │
│ Frame jitter < 10ms < 20ms > 50ms │
│ │
│ CPU per call < 2% < 5% > 10% │
│ Memory per call < 50MB < 100MB > 200MB │
│ │
│ Connection setup < 2s < 5s > 10s │
│ ICE gathering < 1s < 3s > 5s │
│ │
│ Packet loss tolerance < 5% < 10% > 20% │
│ Audio quality (MOS) > 4.0 > 3.5 < 3.0 │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
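These targets translate directly into a per-frame budget: with 20 ms frames the bridge handles 50 frames per second per direction, so the < 5 ms processing target means spending at most a quarter of each frame interval on bridge work. A quick sanity calculation, assuming 20 ms packetization on both the 8 kHz GoTo leg and the 48 kHz LiveKit leg:

```python
FRAME_MS = 20  # typical Opus / G.711 packetization interval


def frame_samples(sample_rate_hz: int, frame_ms: int = FRAME_MS) -> int:
    """Samples carried by one frame at a given rate."""
    return sample_rate_hz * frame_ms // 1000


def real_time_factor(processing_ms: float, frame_ms: int = FRAME_MS) -> float:
    """Fraction of each frame interval consumed by processing.
    Must stay well below 1.0 or the bridge falls behind the call."""
    return processing_ms / frame_ms


goto_samples = frame_samples(8_000)      # samples per 20 ms frame at 8 kHz
livekit_samples = frame_samples(48_000)  # samples per 20 ms frame at 48 kHz
rtf_target = real_time_factor(5.0)       # real-time factor at the 5 ms target
```

Keeping the real-time factor near 0.25 leaves headroom for GC pauses and scheduling jitter, which is what separates the "target" column from "acceptable" in practice.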
13.2 Audio Buffer Optimization \
# bridge/performance/buffer.py
import numpy as np
from collections import deque
from typing import Optional
import asyncio
class OptimizedAudioBuffer:
"""
High-performance circular audio buffer.
Uses numpy for efficient memory operations
and avoids Python object overhead.
"""
def __init__(
self,
max_duration_ms: int = 500,
sample_rate: int = 48000,
channels: int = 1,
):
self.sample_rate = sample_rate
self.channels = channels
# Pre-allocate buffer
max_samples = int(sample_rate * max_duration_ms / 1000)
self._buffer = np.zeros((max_samples, channels), dtype=np.int16)
# Ring buffer pointers
self._write_pos = 0
self._read_pos = 0
self._available = 0
# Lock for thread safety
self._lock = asyncio.Lock()
async def write(self, data: np.ndarray) -> int:
"""
Write audio data to buffer.
Args:
data: Audio samples (samples, channels)
Returns:
Number of samples written
"""
async with self._lock:
samples = len(data)
buffer_size = len(self._buffer)
# Check for overflow
available_space = buffer_size - self._available
if samples > available_space:
# Overwrite oldest data
overflow = samples - available_space
self._read_pos = (self._read_pos + overflow) % buffer_size
self._available -= overflow
# Write data (may wrap around)
            end_pos = (self._write_pos + samples) % buffer_size
            if self._write_pos + samples <= buffer_size:
                # No wrap (also handles an exact fit at the buffer end)
                self._buffer[self._write_pos:self._write_pos + samples] = data
else:
# Wrap around
first_part = buffer_size - self._write_pos
self._buffer[self._write_pos:] = data[:first_part]
self._buffer[:end_pos] = data[first_part:]
self._write_pos = end_pos
self._available += samples
return samples
async def read(self, samples: int) -> np.ndarray:
"""
Read audio data from buffer.
Args:
samples: Number of samples to read
Returns:
Audio data (samples, channels)
"""
async with self._lock:
if self._available < samples:
# Not enough data, return what we have + silence
actual = self._available
silence_needed = samples - actual
if actual == 0:
return np.zeros((samples, self.channels), dtype=np.int16)
data = self._read_internal(actual)
silence = np.zeros((silence_needed, self.channels), dtype=np.int16)
return np.vstack([data, silence])
return self._read_internal(samples)
def _read_internal(self, samples: int) -> np.ndarray:
"""Internal read without lock."""
buffer_size = len(self._buffer)
        end_pos = (self._read_pos + samples) % buffer_size
        if self._read_pos + samples <= buffer_size:
            # No wrap (also handles an exact fit at the buffer end)
            data = self._buffer[self._read_pos:self._read_pos + samples].copy()
else:
# Wrap around
first_part = buffer_size - self._read_pos
data = np.vstack([
self._buffer[self._read_pos:],
self._buffer[:end_pos]
])
self._read_pos = end_pos
self._available -= samples
return data
@property
def available_samples(self) -> int:
"""Number of samples available."""
return self._available
@property
def available_ms(self) -> float:
"""Duration available in milliseconds."""
return (self._available / self.sample_rate) * 1000
def clear(self) -> None:
"""Clear the buffer."""
self._write_pos = 0
self._read_pos = 0
self._available = 0
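The wrap-around pointer arithmetic is the subtle part of the buffer above. It can be checked with a pure-Python ring (lists instead of numpy) that must preserve FIFO order across the wrap point; `MiniRing` is a didactic sketch, not a performance-equivalent replacement:

```python
class MiniRing:
    """Fixed-capacity ring buffer demonstrating the pointer arithmetic."""
    def __init__(self, capacity: int):
        self._buf = [0] * capacity
        self._write = 0
        self._read = 0
        self._available = 0

    def write(self, data: list) -> None:
        cap = len(self._buf)
        # On overflow, advance the read pointer past the oldest samples,
        # as OptimizedAudioBuffer does
        overflow = max(0, len(data) - (cap - self._available))
        self._read = (self._read + overflow) % cap
        self._available -= overflow
        for sample in data:
            self._buf[self._write] = sample
            self._write = (self._write + 1) % cap
        self._available += len(data)

    def read(self, n: int) -> list:
        cap = len(self._buf)
        n = min(n, self._available)
        out = []
        for _ in range(n):
            out.append(self._buf[self._read])
            self._read = (self._read + 1) % cap
        self._available -= n
        return out


ring = MiniRing(capacity=8)
ring.write([1, 2, 3, 4, 5, 6])
first = ring.read(4)
ring.write([7, 8, 9, 10])  # this write wraps past the end of the buffer
rest = ring.read(6)        # order must be preserved across the wrap
```

The numpy version trades the per-sample loop for two slice assignments, which is where its performance advantage comes from; the pointer invariants are identical.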
13.3 Connection Pooling \
# bridge/performance/pool.py
import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)

@dataclass
class PooledConnection:
    """A pooled WebRTC-related connection."""
    connection: Any
    created_at: float
    last_used: float
    in_use: bool = False
class LiveKitConnectionPool:
"""
Pool of pre-warmed LiveKit connections.
Reduces connection setup time by maintaining
ready-to-use connections.
"""
def __init__(
self,
config: "LiveKitConfig",
min_connections: int = 2,
max_connections: int = 10,
max_idle_seconds: float = 300,
):
self.config = config
self.min_connections = min_connections
self.max_connections = max_connections
self.max_idle_seconds = max_idle_seconds
        self._pool: Dict[int, PooledConnection] = {}  # keyed by id(connection)
self._lock = asyncio.Lock()
self._maintenance_task: Optional[asyncio.Task] = None
async def start(self) -> None:
"""Start the pool and warm up connections."""
# Pre-create minimum connections
for _ in range(self.min_connections):
await self._create_connection()
# Start maintenance task
self._maintenance_task = asyncio.create_task(
self._maintenance_loop()
)
async def stop(self) -> None:
"""Stop the pool and close all connections."""
if self._maintenance_task:
self._maintenance_task.cancel()
async with self._lock:
for conn_id, pooled in self._pool.items():
try:
await pooled.connection.disconnect()
except Exception:
pass
self._pool.clear()
async def acquire(self) -> "LiveKitConnectionHandler":
"""Acquire a connection from the pool."""
async with self._lock:
# Find available connection
for conn_id, pooled in self._pool.items():
if not pooled.in_use:
pooled.in_use = True
pooled.last_used = time.time()
return pooled.connection
# Create new if under limit
if len(self._pool) < self.max_connections:
conn = await self._create_connection()
conn_id = id(conn)
self._pool[conn_id].in_use = True
return conn
# Pool exhausted
raise RuntimeError("Connection pool exhausted")
async def release(self, connection: "LiveKitConnectionHandler") -> None:
"""Release a connection back to the pool."""
async with self._lock:
conn_id = id(connection)
if conn_id in self._pool:
self._pool[conn_id].in_use = False
self._pool[conn_id].last_used = time.time()
async def _create_connection(self) -> "LiveKitConnectionHandler":
"""Create a new pooled connection."""
from bridge.livekit.connection_handler import LiveKitConnectionHandler
conn = LiveKitConnectionHandler(
config=self.config,
call_id=f"pool_{len(self._pool)}",
)
# Pre-initialize but don't join room yet
self._pool[id(conn)] = PooledConnection(
connection=conn,
created_at=time.time(),
last_used=time.time(),
)
return conn
async def _maintenance_loop(self) -> None:
"""Periodic pool maintenance."""
while True:
try:
await asyncio.sleep(60)
await self._cleanup_idle()
await self._ensure_minimum()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Pool maintenance error: {e}")
async def _cleanup_idle(self) -> None:
"""Remove idle connections beyond minimum."""
async with self._lock:
now = time.time()
to_remove = []
            for conn_id, pooled in self._pool.items():
                if (not pooled.in_use and
                        now - pooled.last_used > self.max_idle_seconds and
                        len(self._pool) - len(to_remove) > self.min_connections):
                    to_remove.append(conn_id)
for conn_id in to_remove:
pooled = self._pool.pop(conn_id)
try:
await pooled.connection.disconnect()
except Exception:
pass
async def _ensure_minimum(self) -> None:
"""Ensure minimum connections are available."""
async with self._lock:
available = sum(1 for p in self._pool.values() if not p.in_use)
while available < self.min_connections:
await self._create_connection()
available += 1
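The acquire/release accounting can be demonstrated with dummy connections standing in for LiveKit handlers. `MiniPool` below mirrors the pool's bookkeeping only; warm-up, maintenance, and real connections are omitted:

```python
import asyncio


class MiniPool:
    """Minimal acquire/release pool mirroring LiveKitConnectionPool's accounting."""
    def __init__(self, max_connections: int = 2):
        self.max_connections = max_connections
        self._pool: dict = {}
        self._lock = asyncio.Lock()

    async def acquire(self) -> object:
        async with self._lock:
            # Prefer an idle pooled connection
            for entry in self._pool.values():
                if not entry["in_use"]:
                    entry["in_use"] = True
                    return entry["conn"]
            # Grow the pool if under the limit
            if len(self._pool) < self.max_connections:
                conn = object()  # stand-in for a pre-warmed connection
                self._pool[id(conn)] = {"conn": conn, "in_use": True}
                return conn
            raise RuntimeError("Connection pool exhausted")

    async def release(self, conn: object) -> None:
        async with self._lock:
            self._pool[id(conn)]["in_use"] = False


async def demo_pool() -> tuple:
    pool = MiniPool(max_connections=2)
    a = await pool.acquire()
    await pool.acquire()
    exhausted = False
    try:
        await pool.acquire()  # third acquire exceeds the limit
    except RuntimeError:
        exhausted = True
    await pool.release(a)
    reused = (await pool.acquire()) is a  # released connection is handed back out
    return exhausted, reused
```

Keying by `id(connection)` works because the pool itself keeps the object alive; if entries could be evicted while callers still hold references, a stable connection ID would be safer.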
14. Monitoring and Debugging \
14.1 Metrics Collection \
# bridge/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time
# Connection metrics
CONNECTIONS_TOTAL = Counter(
'webrtc_bridge_connections_total',
'Total WebRTC connections',
['direction', 'result'] # inbound/outbound, success/failed
)
CONNECTIONS_ACTIVE = Gauge(
'webrtc_bridge_connections_active',
'Currently active connections'
)
CONNECTION_DURATION = Histogram(
'webrtc_bridge_connection_duration_seconds',
'Connection duration',
buckets=[1, 5, 30, 60, 300, 600, 1800, 3600]
)
# Latency metrics
AUDIO_LATENCY = Histogram(
'webrtc_bridge_audio_latency_ms',
'Audio processing latency',
['direction'], # inbound/outbound
buckets=[1, 2, 5, 10, 20, 50, 100, 200]
)
ICE_GATHERING_TIME = Histogram(
'webrtc_bridge_ice_gathering_seconds',
'ICE gathering duration',
buckets=[0.1, 0.5, 1, 2, 5, 10]
)
CONNECTION_SETUP_TIME = Histogram(
'webrtc_bridge_connection_setup_seconds',
'Time to establish connection',
buckets=[0.5, 1, 2, 3, 5, 10, 30]
)
# Audio metrics
AUDIO_FRAMES_PROCESSED = Counter(
'webrtc_bridge_audio_frames_total',
'Audio frames processed',
['direction']
)
AUDIO_BUFFER_LEVEL = Gauge(
'webrtc_bridge_audio_buffer_ms',
'Current audio buffer level',
['direction']
)
PACKET_LOSS_RATE = Gauge(
'webrtc_bridge_packet_loss_rate',
'Current packet loss rate',
['peer'] # goto/livekit
)
# Error metrics
ERRORS_TOTAL = Counter(
'webrtc_bridge_errors_total',
'Total errors',
['category', 'code']
)
ICE_RESTARTS = Counter(
'webrtc_bridge_ice_restarts_total',
'ICE restart attempts',
['result']
)
class BridgeMetricsCollector:
"""Collects and exposes bridge metrics."""
def __init__(self, call_id: str):
self.call_id = call_id
self._start_time = time.time()
def record_connection_start(self, direction: str) -> None:
"""Record connection attempt."""
CONNECTIONS_ACTIVE.inc()
def record_connection_established(self, direction: str) -> None:
"""Record successful connection."""
CONNECTIONS_TOTAL.labels(direction=direction, result='success').inc()
CONNECTION_SETUP_TIME.observe(time.time() - self._start_time)
def record_connection_failed(self, direction: str) -> None:
"""Record failed connection."""
CONNECTIONS_TOTAL.labels(direction=direction, result='failed').inc()
CONNECTIONS_ACTIVE.dec()
def record_connection_ended(self) -> None:
"""Record connection end."""
CONNECTIONS_ACTIVE.dec()
CONNECTION_DURATION.observe(time.time() - self._start_time)
def record_audio_latency(self, direction: str, latency_ms: float) -> None:
"""Record audio processing latency."""
AUDIO_LATENCY.labels(direction=direction).observe(latency_ms)
def record_audio_frame(self, direction: str) -> None:
"""Record processed audio frame."""
AUDIO_FRAMES_PROCESSED.labels(direction=direction).inc()
def update_buffer_level(self, direction: str, level_ms: float) -> None:
"""Update audio buffer level."""
AUDIO_BUFFER_LEVEL.labels(direction=direction).set(level_ms)
def record_error(self, category: str, code: str) -> None:
"""Record an error."""
ERRORS_TOTAL.labels(category=category, code=code).inc()
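The bucket boundaries chosen above determine what the latency histograms can resolve. As a pure-stdlib illustration of how Prometheus-style cumulative buckets assign observations (this is what `prometheus_client` does internally; the helper name is ours):

```python
import bisect

# Same boundaries as AUDIO_LATENCY above; the implicit final bucket is +Inf.
LATENCY_BUCKETS = [1, 2, 5, 10, 20, 50, 100, 200]

def cumulative_bucket_counts(observations, buckets=LATENCY_BUCKETS):
    """Prometheus-style cumulative counts: counts[i] = number of observations
    <= buckets[i], with a trailing +Inf bucket that counts everything."""
    counts = [0] * (len(buckets) + 1)
    for value in observations:
        # Index of the first bucket whose upper bound is >= value ('le' semantics).
        counts[bisect.bisect_left(buckets, value)] += 1
    for i in range(1, len(counts)):  # convert per-bucket counts to cumulative
        counts[i] += counts[i - 1]
    return counts
```

A 15 ms observation, for example, lands in the `le=20` bucket and every bucket above it, which is why quantile estimates degrade when observations cluster near a bucket boundary.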
14.2 Structured Logging \
# bridge/monitoring/logging.py
import structlog
from typing import Any, Dict
def configure_logging() -> None:
"""Configure structured logging for the bridge."""
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
def get_bridge_logger(call_id: str) -> structlog.BoundLogger:
"""Get a logger bound to a specific call."""
return structlog.get_logger().bind(call_id=call_id)
class BridgeLogContext:
    """Context manager for bridge logging."""

    # Log only every Nth audio frame; at 50 frames/second per direction,
    # unsampled frame logs would flood the output.
    AUDIO_FRAME_LOG_INTERVAL = 50

    def __init__(self, call_id: str):
        self.call_id = call_id
        self.logger = get_bridge_logger(call_id)
        self._context: Dict[str, Any] = {}
        self._frame_count = 0

    def bind(self, **kwargs) -> "BridgeLogContext":
        """Add context to logger."""
        self._context.update(kwargs)
        self.logger = self.logger.bind(**kwargs)
        return self

    def info(self, message: str, **kwargs) -> None:
        self.logger.info(message, **kwargs)

    def debug(self, message: str, **kwargs) -> None:
        self.logger.debug(message, **kwargs)

    def warning(self, message: str, **kwargs) -> None:
        self.logger.warning(message, **kwargs)

    def error(self, message: str, **kwargs) -> None:
        self.logger.error(message, **kwargs)

    def log_audio_frame(
        self,
        direction: str,
        samples: int,
        latency_ms: float,
    ) -> None:
        """Log audio frame processing (sampled: every Nth frame)."""
        self._frame_count += 1
        if self._frame_count % self.AUDIO_FRAME_LOG_INTERVAL != 1:
            return
        self.logger.debug(
            "audio_frame",
            direction=direction,
            samples=samples,
            latency_ms=round(latency_ms, 2),
        )

    def log_ice_event(self, event: str, **details) -> None:
        """Log ICE-related event."""
        self.logger.info(
            "ice_event",
            event=event,
            **details,
        )

    def log_connection_state(self, state: str, **details) -> None:
        """Log connection state change."""
        self.logger.info(
            "connection_state_change",
            state=state,
            **details,
        )
14.3 Debug Tools \
# bridge/monitoring/debug.py
import json
import time
from dataclasses import dataclass, asdict
from typing import Any, Dict, Optional
@dataclass
class BridgeSnapshot:
"""Point-in-time snapshot of bridge state."""
call_id: str
timestamp: float
# Connection state
goto_connection_state: str
goto_ice_state: str
livekit_connected: bool
# Audio state
inbound_buffer_ms: float
outbound_buffer_ms: float
frames_received: int
frames_sent: int
# Quality metrics
estimated_latency_ms: float
packet_loss_percent: float
jitter_ms: float
# Error state
consecutive_errors: int
last_error: Optional[str]
class BridgeDebugger:
"""
Debug tools for inspecting bridge state.
Provides real-time inspection and diagnostic
capabilities for troubleshooting.
"""
def __init__(self, bridge: "AudioBridge"):
self.bridge = bridge
self._snapshots: list[BridgeSnapshot] = []
self._max_snapshots = 100
self._recording = False
def take_snapshot(self) -> BridgeSnapshot:
"""Take a snapshot of current state."""
snapshot = BridgeSnapshot(
call_id=self.bridge.call_id,
timestamp=time.time(),
goto_connection_state=self.bridge._goto.connection_state.value,
goto_ice_state=self.bridge._goto.ice_connection_state.value,
livekit_connected=self.bridge._livekit.is_connected,
inbound_buffer_ms=self.bridge._inbound_buffer.available_ms,
outbound_buffer_ms=self.bridge._outbound_buffer.available_ms,
frames_received=self.bridge.lifecycle_manager.metrics.total_audio_frames_received,
frames_sent=self.bridge.lifecycle_manager.metrics.total_audio_frames_sent,
estimated_latency_ms=0, # Would calculate from actual measurements
packet_loss_percent=0,
jitter_ms=0,
consecutive_errors=0,
last_error=None,
)
if self._recording:
self._snapshots.append(snapshot)
if len(self._snapshots) > self._max_snapshots:
self._snapshots.pop(0)
return snapshot
def start_recording(self) -> None:
"""Start recording snapshots."""
self._recording = True
self._snapshots.clear()
def stop_recording(self) -> list[BridgeSnapshot]:
"""Stop recording and return snapshots."""
self._recording = False
return self._snapshots.copy()
def dump_state(self) -> str:
"""Dump current state as JSON."""
snapshot = self.take_snapshot()
return json.dumps(asdict(snapshot), indent=2)
async def run_diagnostics(self) -> Dict[str, Any]:
"""Run diagnostic checks."""
results = {
"call_id": self.bridge.call_id,
"checks": {}
}
# Check GoTo connection
results["checks"]["goto_connected"] = (
self.bridge._goto.is_connected
)
# Check LiveKit connection
results["checks"]["livekit_connected"] = (
self.bridge._livekit.is_connected
)
# Check audio flow
results["checks"]["audio_flowing"] = (
self.bridge.lifecycle_manager.metrics.total_audio_frames_received > 0
)
# Check buffer health
inbound_ms = self.bridge._inbound_buffer.available_ms
results["checks"]["buffer_healthy"] = 20 <= inbound_ms <= 200
results["buffer_level_ms"] = inbound_ms
# Overall health
results["healthy"] = all(results["checks"].values())
return results
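`take_snapshot` is most useful when driven on a fixed interval while reproducing an issue. A minimal driver loop, shown against a stub stand-in for `BridgeDebugger` (the real one needs a live `AudioBridge`; the stub and loop below are illustrative only):

```python
import asyncio

class StubDebugger:
    """Stand-in for BridgeDebugger, just enough to drive the recording loop."""
    def __init__(self):
        self.snapshots = []
        self._recording = False

    def start_recording(self):
        self._recording = True
        self.snapshots.clear()

    def take_snapshot(self):
        snap = {"n": len(self.snapshots)}  # real code builds a BridgeSnapshot
        if self._recording:
            self.snapshots.append(snap)
        return snap

    def stop_recording(self):
        self._recording = False
        return list(self.snapshots)

async def record_snapshots(debugger, interval_s=0.01, count=5):
    """Take `count` snapshots spaced `interval_s` apart, then return them."""
    debugger.start_recording()
    for _ in range(count):
        debugger.take_snapshot()
        await asyncio.sleep(interval_s)
    return debugger.stop_recording()

snaps = asyncio.run(record_snapshots(StubDebugger()))
```

In production this loop would run as a background task started when a call is flagged for diagnosis and cancelled when the call ends.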
15. Testing Strategy \
15.1 Test Categories \
┌─────────────────────────────────────────────────────────────────────────────┐
│ TEST CATEGORIES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ UNIT TESTS │
│ ─────────── │
│ • Audio buffer operations │
│ • Codec encoding/decoding │
│ • SDP parsing and generation │
│ • ICE candidate parsing │
│ • Resampling accuracy │
│ │
│ INTEGRATION TESTS │
│ ───────────────── │
│ • aiortc peer connection establishment │
│ • LiveKit room join/leave │
│ • Audio track publish/subscribe │
│ • Full bridge audio flow │
│ │
│ END-TO-END TESTS │
│ ──────────────── │
│ • Complete call flow with mock GoTo │
│ • Real GoTo sandbox calls │
│ • Multi-call concurrency │
│ • Long-duration stability │
│ │
│ PERFORMANCE TESTS │
│ ───────────────── │
│ • Latency benchmarks │
│ • Throughput limits │
│ • Memory usage under load │
│ • CPU profiling │
│ │
│ CHAOS TESTS │
│ ──────────── │
│ • Network partition simulation │
│ • Packet loss injection │
│ • High latency simulation │
│ • Resource exhaustion │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
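For the chaos-test category, packet loss injection can be as simple as a frame-dropping wrapper placed between the receive callback and the bridge. A hypothetical helper, seeded so that chaos runs are reproducible:

```python
import random

def drop_frames(frames, loss_rate, seed=1234):
    """Simulate packet loss by randomly dropping frames.

    loss_rate is the probability each frame is dropped; a fixed seed makes
    the 'random' loss pattern identical on every test run.
    """
    rng = random.Random(seed)
    return [f for f in frames if rng.random() >= loss_rate]
```

Feeding the surviving frames through the bridge then exercises the PLC and jitter-buffer paths under a known, repeatable loss profile.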
15.2 Unit Test Examples \
# tests/unit/test_audio_buffer.py
import pytest
import numpy as np
import asyncio
from bridge.performance.buffer import OptimizedAudioBuffer
@pytest.fixture
def audio_buffer():
return OptimizedAudioBuffer(
max_duration_ms=100,
sample_rate=48000,
channels=1,
)
@pytest.mark.asyncio
async def test_write_and_read(audio_buffer):
"""Test basic write and read operations."""
# Write 20ms of audio (960 samples at 48kHz)
samples = np.random.randint(-32768, 32768, (960, 1), dtype=np.int16)  # high bound is exclusive
written = await audio_buffer.write(samples)
assert written == 960
assert audio_buffer.available_samples == 960
# Read back
read_samples = await audio_buffer.read(960)
assert read_samples.shape == (960, 1)
np.testing.assert_array_equal(read_samples, samples)
@pytest.mark.asyncio
async def test_buffer_overflow(audio_buffer):
"""Test buffer overflow handling."""
# Buffer holds 100ms = 4800 samples
# Write 150ms worth
samples = np.random.randint(-32768, 32768, (7200, 1), dtype=np.int16)
written = await audio_buffer.write(samples)
assert written == 7200
# Should only have last 100ms
assert audio_buffer.available_samples == 4800
@pytest.mark.asyncio
async def test_read_underflow(audio_buffer):
"""Test reading more than available."""
# Write 10ms
samples = np.random.randint(-32768, 32768, (480, 1), dtype=np.int16)
await audio_buffer.write(samples)
# Request 20ms
read_samples = await audio_buffer.read(960)
# Should get data + silence
assert read_samples.shape == (960, 1)
np.testing.assert_array_equal(read_samples[:480], samples)
np.testing.assert_array_equal(read_samples[480:], np.zeros((480, 1)))
# tests/unit/test_sdp_negotiator.py
import pytest
from bridge.webrtc.sdp_negotiator import SDPNegotiator, ParsedSDP
@pytest.fixture
def negotiator():
return SDPNegotiator()
def test_parse_sdp_offer(negotiator):
"""Test parsing a typical SDP offer."""
sdp = """v=0
o=- 123456 1 IN IP4 0.0.0.0
s=-
t=0 0
a=group:BUNDLE 0
m=audio 9 UDP/TLS/RTP/SAVPF 111 0 8
c=IN IP4 0.0.0.0
a=mid:0
a=ice-ufrag:test123
a=ice-pwd:testpassword12345678
a=rtpmap:111 opus/48000/2
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000
a=sendrecv
"""
parsed = negotiator.parse_sdp(sdp)
assert parsed.version == 0
assert len(parsed.media) == 1
assert parsed.media[0].media_type == "audio"
assert len(parsed.media[0].codecs) == 3
assert parsed.media[0].ice_ufrag == "test123"
def test_negotiate_codecs_prefers_opus(negotiator):
"""Test that Opus is preferred when available."""
from bridge.webrtc.sdp_negotiator import CodecInfo
offered = [
CodecInfo(0, "PCMU", 8000, 1),
CodecInfo(111, "opus", 48000, 2),
CodecInfo(8, "PCMA", 8000, 1),
]
negotiated = negotiator.negotiate_codecs(offered)
assert len(negotiated) == 3
assert negotiated[0].name == "opus" # First preference
def test_generate_answer(negotiator):
"""Test SDP answer generation."""
# Create minimal parsed offer
from bridge.webrtc.sdp_negotiator import ParsedSDP, MediaDescription, CodecInfo
offer = ParsedSDP(
version=0,
origin="",
session_name="",
timing="0 0",
media=[
MediaDescription(
media_type="audio",
port=9,
protocol="UDP/TLS/RTP/SAVPF",
formats=[111],
codecs=[CodecInfo(111, "opus", 48000, 2)],
ice_ufrag="remote",
ice_pwd="remotepassword",
setup="actpass",
mid="0",
)
],
)
answer = negotiator.generate_answer(
offer,
local_fingerprint="sha-256 AA:BB:CC",
local_ice_ufrag="local",
local_ice_pwd="localpassword",
)
assert "v=0" in answer
assert "a=ice-ufrag:local" in answer
assert "opus" in answer
15.3 Integration Test Examples \
# tests/integration/test_bridge_audio_flow.py
import pytest
import asyncio
import numpy as np
from unittest.mock import AsyncMock, MagicMock

import pytest_asyncio

# Async fixtures need pytest-asyncio's fixture decorator
# (a plain @pytest.fixture only works with asyncio_mode = "auto").
@pytest_asyncio.fixture
async def mock_bridge():
"""Create a bridge with mocked external connections."""
from bridge.audio_bridge import AudioBridge, BridgeConfig
from bridge.goto.connection_handler import GoToCallInfo
# Create bridge with mocked dependencies
bridge = AudioBridge(
call_id="test_call_001",
call_info=GoToCallInfo(
call_id="test_call_001",
external_call_id="goto_123",
direction="inbound",
caller_number="+15551234567",
callee_number="+15559876543",
line_id="line_001",
started_at=0,
),
goto_client=AsyncMock(),
event_listener=AsyncMock(),
livekit_config=MagicMock(),
)
# Mock the internal connections
bridge._goto = AsyncMock()
bridge._goto.is_connected = True
bridge._livekit = AsyncMock()
bridge._livekit.is_connected = True
return bridge
@pytest.mark.asyncio
async def test_inbound_audio_flow(mock_bridge):
"""Test audio flowing from GoTo to LiveKit."""
received_audio = []
# Capture audio sent to LiveKit
async def capture_audio(data):
received_audio.append(data)
mock_bridge._livekit.publish_audio = capture_audio
# Simulate incoming audio from GoTo
test_audio = np.random.randint(-1000, 1000, 960, dtype=np.int16)
# Create mock frame
mock_frame = MagicMock()
mock_frame.to_ndarray.return_value = test_audio.reshape(1, -1)
mock_frame.sample_rate = 48000
# Process the frame
await mock_bridge._handle_goto_audio(mock_frame)
# Verify audio was forwarded
assert len(received_audio) > 0
@pytest.mark.asyncio
async def test_outbound_audio_flow(mock_bridge):
"""Test audio flowing from LiveKit to GoTo."""
# Start the bridge
mock_bridge._running = True
# Add audio to outbound buffer
test_audio = np.random.randint(-1000, 1000, 960, dtype=np.int16)
await mock_bridge._outbound_buffer.write(test_audio.reshape(-1, 1))
# Verify buffer has audio
assert mock_bridge._outbound_buffer.available_samples == 960
15.4 Load Test Framework \
# tests/load/bridge_load_test.py
import asyncio
import time
from dataclasses import dataclass
from typing import List
import statistics
@dataclass
class LoadTestResult:
"""Results from a load test run."""
total_calls: int
successful_calls: int
failed_calls: int
avg_setup_time_ms: float
p95_setup_time_ms: float
max_concurrent: int
duration_seconds: float
class BridgeLoadTester:
"""Load testing framework for the WebRTC bridge."""
def __init__(
self,
bridge_manager: "BridgeManager",
concurrent_calls: int = 10,
call_duration_seconds: float = 30,
total_calls: int = 100,
):
self.bridge_manager = bridge_manager
self.concurrent_calls = concurrent_calls
self.call_duration_seconds = call_duration_seconds
self.total_calls = total_calls
self._setup_times: List[float] = []
self._successful = 0
self._failed = 0
self._max_concurrent = 0
async def run(self) -> LoadTestResult:
"""Run the load test."""
start_time = time.time()
# Create semaphore for concurrency control
semaphore = asyncio.Semaphore(self.concurrent_calls)
# Create all call tasks
tasks = [
self._simulate_call(i, semaphore)
for i in range(self.total_calls)
]
# Run all tasks
await asyncio.gather(*tasks, return_exceptions=True)
duration = time.time() - start_time
return LoadTestResult(
total_calls=self.total_calls,
successful_calls=self._successful,
failed_calls=self._failed,
avg_setup_time_ms=statistics.mean(self._setup_times) if self._setup_times else 0,
p95_setup_time_ms=self._percentile(self._setup_times, 95),
max_concurrent=self._max_concurrent,
duration_seconds=duration,
)
async def _simulate_call(
self,
call_index: int,
semaphore: asyncio.Semaphore,
) -> None:
"""Simulate a single call."""
async with semaphore:
call_id = f"load_test_{call_index}"
# Track concurrency
current = self.bridge_manager.active_bridges
self._max_concurrent = max(self._max_concurrent, current + 1)
try:
setup_start = time.time()
# Create bridge
bridge = await self.bridge_manager.create_bridge(
call_id=call_id,
call_info=self._create_test_call_info(call_index),
)
setup_time = (time.time() - setup_start) * 1000
self._setup_times.append(setup_time)
# Simulate call duration
await asyncio.sleep(self.call_duration_seconds)
self._successful += 1
except Exception:
    self._failed += 1
finally:
# Clean up
await self.bridge_manager.remove_bridge(call_id)
def _create_test_call_info(self, index: int) -> "GoToCallInfo":
"""Create test call info."""
from bridge.goto.connection_handler import GoToCallInfo
return GoToCallInfo(
call_id=f"load_test_{index}",
external_call_id=f"goto_load_{index}",
direction="inbound",
caller_number=f"+1555000{index:04d}",
callee_number="+15559999999",
line_id="test_line",
started_at=time.time(),
)
@staticmethod
def _percentile(data: List[float], percentile: int) -> float:
"""Calculate percentile."""
if not data:
return 0
sorted_data = sorted(data)
index = int(len(sorted_data) * percentile / 100)
return sorted_data[min(index, len(sorted_data) - 1)]
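The concurrency-limiting core of the load tester is the `asyncio.Semaphore` pattern; stripped of bridge specifics it reduces to:

```python
import asyncio

async def run_limited(n_tasks: int = 50, limit: int = 10) -> int:
    """Run n_tasks with at most `limit` in flight; return the observed peak."""
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def one_call() -> None:
        nonlocal active, peak
        async with sem:  # blocks until a slot is free
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.001)  # stand-in for call duration
            active -= 1

    await asyncio.gather(*(one_call() for _ in range(n_tasks)))
    return peak

peak = asyncio.run(run_limited())
```

Because each simulated call holds the semaphore for its whole duration, total test time is roughly `total_calls / concurrent_calls * call_duration_seconds`, which is worth budgeting for before setting `total_calls=100` with 30-second calls.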
16. Appendix \
16.1 SDP Reference \
┌─────────────────────────────────────────────────────────────────────────────┐
│ SDP QUICK REFERENCE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ SESSION DESCRIPTION: │
│ v=0 Protocol version │
│ o=- 123 1 IN IP4 0.0.0.0 Origin (session ID, version) │
│ s=- Session name │
│ t=0 0 Timing (start/end) │
│ │
│ MEDIA DESCRIPTION: │
│ m=audio 9 UDP/TLS/RTP/SAVPF 111 0 │
│ └─────┘ └┘ └─────────────┘ └────┘ │
│ type port protocol formats │
│ │
│ COMMON ATTRIBUTES: │
│ a=rtpmap:111 opus/48000/2 Codec mapping │
│ a=fmtp:111 useinbandfec=1 Format parameters │
│ a=ice-ufrag:xxxx ICE credentials │
│ a=ice-pwd:yyyy ICE password │
│ a=fingerprint:sha-256 AA:BB DTLS fingerprint │
│ a=setup:actpass DTLS role │
│ a=mid:0 Media ID for BUNDLE │
│ a=sendrecv Direction │
│ a=rtcp-mux Multiplex RTP/RTCP │
│ │
│ ICE CANDIDATE: │
│ a=candidate:1 1 UDP 2122260223 192.168.1.1 54321 typ host │
│ └─┘ └┘ └─┘ └────────┘ └─────────┘ └───┘ └─────┘ │
│ found comp proto priority IP port type │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
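The `priority` field in the candidate line is not arbitrary: ICE (RFC 8445 §5.1.2.1) derives it from the candidate type preference, a local preference, and the component ID. A quick sketch:

```python
def ice_priority(type_pref: int, local_pref: int, component_id: int) -> int:
    """priority = (2^24)*type_pref + (2^8)*local_pref + (256 - component_id)

    Typical type preferences: host = 126, server-reflexive = 100, relay = 0.
    The fields occupy disjoint bit ranges, so shifts and OR implement the sum.
    """
    return (type_pref << 24) | (local_pref << 8) | (256 - component_id)

# Host UDP candidate, RTP component (1), maximum local preference:
ice_priority(126, 65535, 1)  # 2130706431
```

Higher priority means the pair is tried earlier during connectivity checks, which is why host candidates are preferred over relayed ones.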
16.2 Audio Format Reference \
┌─────────────────────────────────────────────────────────────────────────────┐
│ AUDIO FORMAT REFERENCE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ FORMAT SAMPLE RATE CHANNELS BITS BITRATE │
│ ───────────────────────────────────────────────────────────── │
│ Opus 48000 Hz 2 16 6-510 kbps (VBR) │
│ PCMU (G.711) 8000 Hz 1 8 64 kbps │
│ PCMA (G.711) 8000 Hz 1 8 64 kbps │
│ PCM (raw) 16000 Hz 1 16 256 kbps │
│ │
│ FRAME SIZES (20ms): │
│ • 48kHz stereo: 1920 samples (3840 bytes) │
│ • 48kHz mono: 960 samples (1920 bytes) │
│ • 16kHz mono: 320 samples (640 bytes) │
│ • 8kHz mono: 160 samples (320 bytes) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
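The 20 ms frame sizes in the table all follow from `samples = rate × (frame_ms / 1000) × channels` with two bytes per 16-bit sample; a small helper makes the arithmetic explicit (function name is ours):

```python
def frame_size(sample_rate_hz: int, channels: int,
               frame_ms: int = 20, bytes_per_sample: int = 2) -> tuple[int, int]:
    """Return (total interleaved samples, bytes) for one PCM frame.

    Defaults assume 16-bit PCM (2 bytes/sample) and 20 ms frames.
    """
    samples = sample_rate_hz * frame_ms // 1000 * channels
    return samples, samples * bytes_per_sample

frame_size(48000, 2)  # (1920, 3840), matching the first row of the table
```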
Document Revision History \
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2026-01-16 | Engineering | Initial draft |