Skip to main content
Normalized for Mintlify from knowledge-base/aiconnected-apps-and-modules/modules/aiConnected-voice/webrtc-bridge-technical-design.mdx.

Voice by aiConnected — WebRTC Bridge Technical Design \

Document Information \

FieldValue
Document IDARCH-004
Version1.0
Last Updated2026-01-16
StatusDraft
OwnerEngineering
DependenciesARCH-001, ARCH-002, ARCH-003

Table of Contents \

Voice by aiConnected — WebRTC Bridge Technical Design Document Information Table of Contents 1. Introduction 1.1 Purpose 1.2 Scope 1.3 Design Goals 1.4 Key Terminology 2. Architecture Overview 2.1 High-Level Architecture 2.2 Component Responsibilities 2.3 Data Flow Summary 2.4 Threading Model 3. aiortc Fundamentals 3.1 aiortc Overview 3.2 Core Classes 3.3 RTCPeerConnection Lifecycle 3.4 Basic aiortc Setup 3.5 Custom Audio Tracks 4. SDP Exchange 4.1 SDP Overview 4.2 SDP Structure 4.3 SDP Offer/Answer Flow with GoToConnect 4.4 SDP Negotiator Implementation 4.5 SDP Examples 5. ICE Candidate Handling 5.1 ICE Overview 5.2 ICE Connection Process 5.3 ICE Manager Implementation 5.4 Trickle ICE Flow 6. Audio Frame Processing 6.1 Audio Frame Fundamentals 6.2 PyAV AudioFrame 6.3 Audio Resampling 6.4 Audio Format Conversion 7. Codec Management 7.1 Supported Codecs 7.2 Codec Handler Implementation 7.3 Codec Negotiation Strategy 7.4 Packet Loss Concealment 8. GoToConnect Integration 8.1 GoTo WebRTC Flow 8.2 GoTo Connection Handler 9. LiveKit Integration 9.1 LiveKit Overview 9.2 LiveKit Room Architecture 9.3 LiveKit Connection Handler 10. Bidirectional Bridge 10.1 Bridge Architecture 10.2 Audio Bridge Implementation 10.3 Audio Forking for STT 11. Connection Lifecycle Management 11.1 Complete Call Lifecycle 11.2 Lifecycle Manager Implementation 11.3 Graceful Shutdown 12. Error Handling and Recovery 12.1 Error Categories 12.2 Error Handler Implementation 12.3 ICE Restart Manager 13. Performance Optimization 13.1 Performance Targets 13.2 Audio Buffer Optimization 13.3 Connection Pooling 14. Monitoring and Debugging 14.1 Metrics Collection 14.2 Structured Logging 14.3 Debug Tools 15. Testing Strategy 15.1 Test Categories 15.2 Unit Test Examples 15.3 Integration Test Examples 15.4 Load Test Framework 16. Appendix 16.1 SDP Reference 16.2 Audio Format Reference Document Revision History

  1. Introduction \

1.1 Purpose \

This document provides the technical design for the WebRTC Bridge component in Voice by aiConnected. The WebRTC Bridge is the critical infrastructure that connects telephone calls from GoToConnect to our AI processing pipeline via LiveKit. The bridge must handle real-time audio with strict latency requirements while managing the complexity of two different WebRTC implementations, codec negotiation, and bidirectional audio streaming.

1.2 Scope \

This document covers:
  • aiortc library implementation for WebRTC
  • SDP offer/answer exchange with GoToConnect
  • ICE candidate handling and connectivity
  • Audio frame capture and injection
  • Codec negotiation and transcoding
  • LiveKit room integration
  • Bidirectional audio bridging
  • Error handling and recovery
This document does not cover:
  • GoToConnect API authentication (see ARCH-002)
  • Voice pipeline processing (see ARCH-003)
  • LiveKit server deployment

1.3 Design Goals \

GoalTargetPriority
Audio latency< 50ms bridge overheadCritical
Connection setup< 2 secondsCritical
Audio qualityNo degradationHigh
Reliability99.9% call completionHigh
Scalability1000 concurrent callsMedium
Resource efficiency< 50MB RAM per callMedium

1.4 Key Terminology \

TermDefinition
WebRTCWeb Real-Time Communication - protocol for real-time audio/video
SDPSession Description Protocol - describes media sessions
ICEInteractive Connectivity Establishment - NAT traversal
STUNSession Traversal Utilities for NAT - discover public IP
TURNTraversal Using Relays around NAT - relay server
RTPReal-time Transport Protocol - media transport
RTCPRTP Control Protocol - quality feedback
aiortcPython asyncio WebRTC implementation
LiveKitOpen-source WebRTC SFU for scalable real-time apps

  1. Architecture Overview \

2.1 High-Level Architecture \

┌─────────────────────────────────────────────────────────────────────────────┐
│                      WEBRTC BRIDGE ARCHITECTURE                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                              PSTN NETWORK                                   │
│                                   │                                         │
│                                   ▼                                         │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                        GoToConnect                                   │   │
│   │                                                                      │   │
│   │  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │   │
│   │  │   SIP/PSTN   │───▶│   Media      │───▶│   WebRTC     │          │   │
│   │  │   Gateway    │    │   Server     │    │   Endpoint   │          │   │
│   │  └──────────────┘    └──────────────┘    └──────┬───────┘          │   │
│   │                                                  │                   │   │
│   └──────────────────────────────────────────────────┼───────────────────┘   │
│                                                      │                       │
│                                          WebRTC (DTLS-SRTP)                 │
│                                                      │                       │
│   ┌──────────────────────────────────────────────────┼───────────────────┐   │
│   │                     WEBRTC BRIDGE                │                   │   │
│   │                                                  │                   │   │
│   │   ┌──────────────────────────────────────────────▼─────────────┐    │   │
│   │   │                   GoTo Connection                           │    │   │
│   │   │                                                             │    │   │
│   │   │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        │    │   │
│   │   │  │    SDP      │  │    ICE      │  │   Audio     │        │    │   │
│   │   │  │  Negotiator │  │   Agent     │  │   Track     │        │    │   │
│   │   │  └─────────────┘  └─────────────┘  └──────┬──────┘        │    │   │
│   │   │                                           │                │    │   │
│   │   └───────────────────────────────────────────┼────────────────┘    │   │
│   │                                               │                      │   │
│   │   ┌───────────────────────────────────────────▼────────────────┐    │   │
│   │   │                   AUDIO BRIDGE                              │    │   │
│   │   │                                                             │    │   │
│   │   │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        │    │   │
│   │   │  │   Decoder   │  │  Resampler  │  │   Encoder   │        │    │   │
│   │   │  │  (Opus/G711)│  │  (48k↔16k)  │  │  (Opus)     │        │    │   │
│   │   │  └─────────────┘  └─────────────┘  └─────────────┘        │    │   │
│   │   │                                                             │    │   │
│   │   │  ┌─────────────────────────────────────────────────────┐   │    │   │
│   │   │  │              Bidirectional Buffer                    │   │    │   │
│   │   │  │         GoTo → Pipeline    Pipeline → GoTo           │   │    │   │
│   │   │  └─────────────────────────────────────────────────────┘   │    │   │
│   │   │                                                             │    │   │
│   │   └───────────────────────────────────────────┬────────────────┘    │   │
│   │                                               │                      │   │
│   │   ┌───────────────────────────────────────────▼────────────────┐    │   │
│   │   │                  LiveKit Connection                         │    │   │
│   │   │                                                             │    │   │
│   │   │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        │    │   │
│   │   │  │    Room     │  │   Audio     │  │   Audio     │        │    │   │
│   │   │  │   Client    │  │   Source    │  │   Sink      │        │    │   │
│   │   │  └─────────────┘  └─────────────┘  └─────────────┘        │    │   │
│   │   │                                                             │    │   │
│   │   └─────────────────────────────────────────────────────────────┘    │   │
│   │                                                                      │   │
│   └──────────────────────────────────────────────────────────────────────┘   │
│                                                      │                       │
│                                          WebRTC (DTLS-SRTP)                 │
│                                                      │                       │
│   ┌──────────────────────────────────────────────────┼───────────────────┐   │
│   │                      LiveKit SFU                 │                   │   │
│   │                                                  ▼                   │   │
│   │  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │   │
│   │  │    Room      │◀──▶│   Selective  │◀──▶│   Agent      │          │   │
│   │  │   Manager    │    │   Forwarder  │    │   Worker     │          │   │
│   │  └──────────────┘    └──────────────┘    └──────────────┘          │   │
│   │                                                                      │   │
│   └──────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

2.2 Component Responsibilities \

ComponentResponsibility
GoTo ConnectionManage WebRTC session with GoToConnect
SDP NegotiatorHandle offer/answer exchange
ICE AgentManage connectivity establishment
Audio TrackReceive/send RTP audio packets
Audio BridgeConvert and route audio between endpoints
DecoderDecode incoming audio (Opus/G.711)
ResamplerConvert sample rates (48kHz ↔ 16kHz)
EncoderEncode outgoing audio (Opus)
LiveKit ConnectionManage connection to LiveKit room
Room ClientJoin/leave LiveKit rooms
Audio SourcePublish audio to LiveKit
Audio SinkSubscribe to audio from LiveKit

2.3 Data Flow Summary \

┌─────────────────────────────────────────────────────────────────────────────┐
│                         AUDIO DATA FLOW                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   INBOUND (Caller → AI):                                                    │
│   ─────────────────────                                                     │
│                                                                             │
│   GoToConnect           Bridge                    LiveKit                   │
│       │                    │                         │                      │
│       │   RTP/Opus or      │                         │                      │
│       │   RTP/PCMU         │                         │                      │
│       │──────────────────▶ │                         │                      │
│       │                    │  decode                 │                      │
│       │                    │  ──────▶ PCM 48kHz      │                      │
│       │                    │         │               │                      │
│       │                    │  resample               │                      │
│       │                    │  ──────▶ PCM 16kHz      │                      │
│       │                    │         │               │                      │
│       │                    │         │──────────────▶│ (to STT)             │
│       │                    │         │               │                      │
│       │                    │  encode                 │                      │
│       │                    │  ──────▶ Opus 48kHz     │                      │
│       │                    │         │               │                      │
│       │                    │         │──────────────▶│ (to Room)            │
│       │                    │                         │                      │
│                                                                             │
│   OUTBOUND (AI → Caller):                                                   │
│   ──────────────────────                                                    │
│                                                                             │
│   GoToConnect           Bridge                    LiveKit                   │
│       │                    │                         │                      │
│       │                    │         │◀──────────────│ (from TTS)           │
│       │                    │         │               │                      │
│       │                    │  PCM 24kHz              │                      │
│       │                    │         │               │                      │
│       │                    │  resample               │                      │
│       │                    │  ──────▶ PCM 48kHz      │                      │
│       │                    │         │               │                      │
│       │                    │  encode                 │                      │
│       │                    │  ──────▶ Opus/PCMU      │                      │
│       │                    │         │               │                      │
│       │ ◀─────────────────────────────               │                      │
│       │   RTP                        │               │                      │
│       │                              │               │                      │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

2.4 Threading Model \

┌─────────────────────────────────────────────────────────────────────────────┐
│                         THREADING MODEL                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                      MAIN ASYNCIO EVENT LOOP                        │   │
│   │                                                                     │   │
│   │   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐               │   │
│   │   │  WebRTC     │  │  LiveKit    │  │  Control    │               │   │
│   │   │  Signaling  │  │  Signaling  │  │  Logic      │               │   │
│   │   └─────────────┘  └─────────────┘  └─────────────┘               │   │
│   │                                                                     │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                      AIORTC MEDIA THREAD                            │   │
│   │                                                                     │   │
│   │   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐               │   │
│   │   │  RTP        │  │  Codec      │  │  RTCP       │               │   │
│   │   │  Processing │  │  Encode/    │  │  Processing │               │   │
│   │   │             │  │  Decode     │  │             │               │   │
│   │   └─────────────┘  └─────────────┘  └─────────────┘               │   │
│   │                                                                     │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                      AUDIO PROCESSING THREAD                        │   │
│   │                                                                     │   │
│   │   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐               │   │
│   │   │  Resampling │  │  Buffer     │  │  Format     │               │   │
│   │   │             │  │  Management │  │  Conversion │               │   │
│   │   └─────────────┘  └─────────────┘  └─────────────┘               │   │
│   │                                                                     │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
│   COMMUNICATION:                                                            │
│   • asyncio.Queue for cross-thread audio transfer                          │
│   • Thread-safe buffers for frame handoff                                  │
│   • Events for synchronization                                             │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

  1. aiortc Fundamentals \

3.1 aiortc Overview \

aiortc is a Python implementation of WebRTC and ORTC (Object Real-Time Communications). It provides:
  • Full WebRTC stack in pure Python
  • asyncio-based for non-blocking I/O
  • Support for audio/video/data channels
  • Built-in codecs (Opus, VP8, H.264)

3.2 Core Classes \

# bridge/webrtc/core.py

from aiortc import (
    RTCPeerConnection,
    RTCSessionDescription,
    RTCIceCandidate,
    RTCConfiguration,
    RTCIceServer,
    MediaStreamTrack,
)
from aiortc.contrib.media import MediaPlayer, MediaRecorder
from aiortc.mediastreams import AudioStreamTrack
import av

# Key aiortc classes we use:

# RTCPeerConnection - Main WebRTC connection object
# RTCSessionDescription - SDP offer/answer
# RTCIceCandidate - ICE candidate for connectivity
# RTCConfiguration - STUN/TURN server config
# MediaStreamTrack - Base class for audio/video tracks
# AudioStreamTrack - Audio-specific track implementation

3.3 RTCPeerConnection Lifecycle \

┌─────────────────────────────────────────────────────────────────────────────┐
│                    RTCPeerConnection STATE MACHINE                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                              ┌─────────┐                                    │
│                              │   new   │                                    │
│                              └────┬────┘                                    │
│                                   │                                         │
│                          createOffer() or                                   │
│                          setRemoteDescription()                             │
│                                   │                                         │
│                                   ▼                                         │
│                           ┌───────────────┐                                 │
│                           │  connecting   │                                 │
│                           └───────┬───────┘                                 │
│                                   │                                         │
│                          ICE + DTLS complete                                │
│                                   │                                         │
│                                   ▼                                         │
│                           ┌───────────────┐                                 │
│                           │   connected   │◀──────────┐                    │
│                           └───────┬───────┘           │                    │
│                                   │              ICE restart               │
│                          ICE disconnected             │                    │
│                                   │                   │                    │
│                                   ▼                   │                    │
│                          ┌────────────────┐           │                    │
│                          │  disconnected  │───────────┘                    │
│                          └────────┬───────┘                                │
│                                   │                                         │
│                          ICE failed or                                      │
│                          close() called                                     │
│                                   │                                         │
│                                   ▼                                         │
│                           ┌───────────────┐                                 │
│                           │    closed     │                                 │
│                           └───────────────┘                                 │
│                                                                             │
│   SIGNALING STATES:                                                         │
│   • stable - No offer/answer in progress                                   │
│   • have-local-offer - Local offer created, awaiting answer                │
│   • have-remote-offer - Remote offer received, need to answer              │
│   • have-local-pranswer - Local provisional answer                         │
│   • have-remote-pranswer - Remote provisional answer                       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

3.4 Basic aiortc Setup \

# bridge/webrtc/connection.py

import asyncio
from aiortc import (
    RTCPeerConnection,
    RTCConfiguration,
    RTCIceServer,
)
from dataclasses import dataclass
from typing import Optional, Callable, List

@dataclass
class WebRTCConfig:
    """Configuration for WebRTC connection."""
    stun_servers: List[str] = None
    turn_servers: List[dict] = None
    ice_transport_policy: str = "all"  # "all" or "relay"
    bundle_policy: str = "max-bundle"
    
    def __post_init__(self):
        if self.stun_servers is None:
            self.stun_servers = [
                "stun:stun.l.google.com:19302",
                "stun:stun1.l.google.com:19302",
            ]

class WebRTCConnection:
    """
    Wrapper around aiortc RTCPeerConnection.
    
    Provides a simplified interface for managing
    WebRTC connections with proper lifecycle handling.
    """
    
    def __init__(
        self,
        config: WebRTCConfig = None,
        call_id: str = None,
    ):
        self.config = config or WebRTCConfig()
        self.call_id = call_id
        
        self._pc: Optional[RTCPeerConnection] = None
        self._local_tracks: List[MediaStreamTrack] = []
        self._remote_tracks: List[MediaStreamTrack] = []
        
        # Event handlers
        self.on_track: Optional[Callable] = None
        self.on_ice_candidate: Optional[Callable] = None
        self.on_connection_state_change: Optional[Callable] = None
        self.on_ice_connection_state_change: Optional[Callable] = None
    
    async def initialize(self) -> None:
        """Initialize the peer connection."""
        ice_servers = self._build_ice_servers()
        
        rtc_config = RTCConfiguration(
            iceServers=ice_servers,
            iceTransportPolicy=self.config.ice_transport_policy,
            bundlePolicy=self.config.bundle_policy,
        )
        
        self._pc = RTCPeerConnection(configuration=rtc_config)
        
        # Set up event handlers
        self._pc.on("track", self._handle_track)
        self._pc.on("icecandidate", self._handle_ice_candidate)
        self._pc.on("connectionstatechange", self._handle_connection_state_change)
        self._pc.on("iceconnectionstatechange", self._handle_ice_connection_state_change)
        
        logger.info(f"[{self.call_id}] WebRTC connection initialized")
    
    def _build_ice_servers(self) -> List[RTCIceServer]:
        """Build ICE server configuration."""
        servers = []
        
        # Add STUN servers
        for url in self.config.stun_servers:
            servers.append(RTCIceServer(urls=[url]))
        
        # Add TURN servers
        if self.config.turn_servers:
            for turn in self.config.turn_servers:
                servers.append(RTCIceServer(
                    urls=[turn["url"]],
                    username=turn.get("username"),
                    credential=turn.get("credential"),
                ))
        
        return servers
    
    async def add_track(self, track: MediaStreamTrack) -> None:
        """Add a local track to the connection."""
        if self._pc:
            self._pc.addTrack(track)
            self._local_tracks.append(track)
            logger.debug(f"[{self.call_id}] Added track: {track.kind}")
    
    async def create_offer(self) -> str:
        """Create an SDP offer."""
        if not self._pc:
            raise RuntimeError("Connection not initialized")
        
        offer = await self._pc.createOffer()
        await self._pc.setLocalDescription(offer)
        
        logger.debug(f"[{self.call_id}] Created offer")
        return self._pc.localDescription.sdp
    
    async def create_answer(self) -> str:
        """Create an SDP answer."""
        if not self._pc:
            raise RuntimeError("Connection not initialized")
        
        answer = await self._pc.createAnswer()
        await self._pc.setLocalDescription(answer)
        
        logger.debug(f"[{self.call_id}] Created answer")
        return self._pc.localDescription.sdp
    
    async def set_remote_description(
        self,
        sdp: str,
        sdp_type: str,
    ) -> None:
        """Set the remote SDP description."""
        if not self._pc:
            raise RuntimeError("Connection not initialized")
        
        description = RTCSessionDescription(sdp=sdp, type=sdp_type)
        await self._pc.setRemoteDescription(description)
        
        logger.debug(f"[{self.call_id}] Set remote description: {sdp_type}")
    
    async def add_ice_candidate(
        self,
        candidate: str,
        sdp_mid: str,
        sdp_mline_index: int,
    ) -> None:
        """Add a remote ICE candidate."""
        if not self._pc:
            raise RuntimeError("Connection not initialized")
        
        ice_candidate = RTCIceCandidate(
            candidate=candidate,
            sdpMid=sdp_mid,
            sdpMLineIndex=sdp_mline_index,
        )
        
        await self._pc.addIceCandidate(ice_candidate)
        logger.debug(f"[{self.call_id}] Added ICE candidate")
    
    async def close(self) -> None:
        """Close the connection."""
        if self._pc:
            await self._pc.close()
            self._pc = None
        
        # Stop local tracks
        for track in self._local_tracks:
            track.stop()
        
        self._local_tracks.clear()
        self._remote_tracks.clear()
        
        logger.info(f"[{self.call_id}] WebRTC connection closed")
    
    # Event handlers
    
    def _handle_track(self, track: MediaStreamTrack) -> None:
        """Handle incoming track."""
        logger.info(f"[{self.call_id}] Received track: {track.kind}")
        self._remote_tracks.append(track)
        
        if self.on_track:
            asyncio.create_task(self.on_track(track))
    
    def _handle_ice_candidate(self, candidate: RTCIceCandidate) -> None:
        """Handle local ICE candidate."""
        if candidate and self.on_ice_candidate:
            asyncio.create_task(self.on_ice_candidate(candidate))
    
    def _handle_connection_state_change(self) -> None:
        """Handle connection state change."""
        state = self._pc.connectionState if self._pc else "closed"
        logger.info(f"[{self.call_id}] Connection state: {state}")
        
        if self.on_connection_state_change:
            asyncio.create_task(self.on_connection_state_change(state))
    
    def _handle_ice_connection_state_change(self) -> None:
        """Handle ICE connection state change."""
        state = self._pc.iceConnectionState if self._pc else "closed"
        logger.info(f"[{self.call_id}] ICE state: {state}")
        
        if self.on_ice_connection_state_change:
            asyncio.create_task(self.on_ice_connection_state_change(state))
    
    # Properties
    
    @property
    def connection_state(self) -> str:
        """Get current connection state."""
        return self._pc.connectionState if self._pc else "closed"
    
    @property
    def ice_connection_state(self) -> str:
        """Get current ICE connection state."""
        return self._pc.iceConnectionState if self._pc else "closed"
    
    @property
    def signaling_state(self) -> str:
        """Get current signaling state."""
        return self._pc.signalingState if self._pc else "closed"

3.5 Custom Audio Tracks \

# bridge/webrtc/tracks.py

import asyncio
import fractions
import time
from typing import Optional
from aiortc import MediaStreamTrack
from av import AudioFrame
import numpy as np

class AudioTrackSink(MediaStreamTrack):
    """
    Audio track that receives frames from a WebRTC peer.
    
    Captures incoming audio and makes it available
    for processing by the voice pipeline.
    """
    
    kind = "audio"
    
    def __init__(
        self,
        track: MediaStreamTrack,
        on_frame: callable = None,
    ):
        super().__init__()
        self._track = track
        self.on_frame = on_frame
        self._running = True
    
    async def recv(self) -> AudioFrame:
        """Receive and process audio frames."""
        frame = await self._track.recv()
        
        if self.on_frame and self._running:
            await self.on_frame(frame)
        
        return frame
    
    def stop(self) -> None:
        """Stop receiving frames."""
        self._running = False
        super().stop()


class AudioTrackSource(MediaStreamTrack):
    """
    Audio track that generates frames for a WebRTC peer.
    
    Receives audio from the voice pipeline and sends
    it to the remote peer.
    """
    
    kind = "audio"
    
    def __init__(
        self,
        sample_rate: int = 48000,
        channels: int = 1,
        samples_per_frame: int = 960,  # 20ms at 48kHz
    ):
        super().__init__()
        
        self.sample_rate = sample_rate
        self.channels = channels
        self.samples_per_frame = samples_per_frame
        
        # Frame timing
        self._frame_duration = samples_per_frame / sample_rate
        self._start_time: Optional[float] = None
        self._frame_count = 0
        
        # Audio buffer
        self._queue: asyncio.Queue[np.ndarray] = asyncio.Queue(maxsize=50)
        
        # Silence frame for when buffer is empty
        self._silence = np.zeros(
            (samples_per_frame, channels),
            dtype=np.int16,
        )
    
    async def recv(self) -> AudioFrame:
        """Generate the next audio frame."""
        # Initialize timing on first frame
        if self._start_time is None:
            self._start_time = time.time()
        
        # Calculate expected time for this frame
        expected_time = self._start_time + (self._frame_count * self._frame_duration)
        
        # Wait until it's time to send this frame
        now = time.time()
        if now < expected_time:
            await asyncio.sleep(expected_time - now)
        
        # Get audio data from queue or use silence
        try:
            audio_data = self._queue.get_nowait()
        except asyncio.QueueEmpty:
            audio_data = self._silence
        
        # Create AudioFrame
        frame = AudioFrame(
            format="s16",
            layout="mono" if self.channels == 1 else "stereo",
            samples=self.samples_per_frame,
        )
        
        # Set frame data
        frame.planes[0].update(audio_data.tobytes())
        frame.sample_rate = self.sample_rate
        frame.pts = self._frame_count * self.samples_per_frame
        frame.time_base = fractions.Fraction(1, self.sample_rate)
        
        self._frame_count += 1
        
        return frame
    
    async def push_audio(self, audio_data: np.ndarray) -> None:
        """
        Push audio data to be sent.
        
        Args:
            audio_data: Audio samples as numpy array (int16)
        """
        try:
            self._queue.put_nowait(audio_data)
        except asyncio.QueueFull:
            # Drop oldest frame
            try:
                self._queue.get_nowait()
                self._queue.put_nowait(audio_data)
            except asyncio.QueueEmpty:
                pass
    
    def clear_buffer(self) -> None:
        """Clear the audio buffer."""
        while not self._queue.empty():
            try:
                self._queue.get_nowait()
            except asyncio.QueueEmpty:
                break
    
    def stop(self) -> None:
        """Stop the track."""
        self.clear_buffer()
        super().stop()

  1. SDP Exchange \

4.1 SDP Overview \

Session Description Protocol (SDP) describes multimedia sessions. In WebRTC, SDP is used to negotiate:
  • Media types (audio, video, data)
  • Codecs and their parameters
  • Transport information
  • Security parameters (DTLS fingerprint)

4.2 SDP Structure \

┌─────────────────────────────────────────────────────────────────────────────┐
│                           SDP STRUCTURE                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   SESSION LEVEL (v=, o=, s=, t=)                                           │
│   ├── Protocol version                                                      │
│   ├── Origin (session ID, username)                                        │
│   ├── Session name                                                          │
│   └── Timing (start/stop time)                                             │
│                                                                             │
│   MEDIA LEVEL (m=, c=, a=)                                                 │
│   ├── Media type (audio/video)                                             │
│   ├── Port number                                                           │
│   ├── Protocol (UDP/TLS/RTP/SAVPF)                                         │
│   ├── Format list (payload types)                                          │
│   ├── Connection info (IP address)                                         │
│   └── Attributes                                                            │
│       ├── rtpmap (codec mapping)                                           │
│       ├── fmtp (format parameters)                                         │
│       ├── ice-ufrag, ice-pwd (ICE credentials)                            │
│       ├── fingerprint (DTLS)                                               │
│       ├── setup (DTLS role)                                                │
│       ├── mid (media ID)                                                   │
│       ├── sendrecv/sendonly/recvonly                                       │
│       └── candidate (ICE candidates)                                       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

4.3 SDP Offer/Answer Flow with GoToConnect \

┌─────────────────────────────────────────────────────────────────────────────┐
│                    SDP OFFER/ANSWER FLOW                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   INBOUND CALL (GoTo initiates):                                           │
│   ─────────────────────────────                                             │
│                                                                             │
│   GoToConnect                     Bridge                                    │
│        │                            │                                       │
│        │  1. Webhook: call.ringing  │                                       │
│        │────────────────────────────▶                                       │
│        │                            │                                       │
│        │  2. POST /calls/{id}/answer│                                       │
│        │◀────────────────────────────                                       │
│        │                            │                                       │
│        │  3. SDP Offer              │                                       │
│        │────────────────────────────▶                                       │
│        │                            │  4. Parse offer                       │
│        │                            │  5. Create answer                     │
│        │                            │                                       │
│        │  6. SDP Answer             │                                       │
│        │◀────────────────────────────                                       │
│        │                            │                                       │
│        │  7. ICE Candidates         │                                       │
│        │◀───────────────────────────▶  (trickle ICE)                       │
│        │                            │                                       │
│        │  8. Media flows            │                                       │
│        │◀═══════════════════════════▶                                       │
│        │                            │                                       │
│                                                                             │
│   OUTBOUND CALL (Bridge initiates):                                         │
│   ─────────────────────────────────                                         │
│                                                                             │
│   GoToConnect                     Bridge                                    │
│        │                            │                                       │
│        │  1. POST /calls            │                                       │
│        │     (with SDP offer)       │                                       │
│        │◀────────────────────────────                                       │
│        │                            │                                       │
│        │  2. 200 OK with SDP answer │                                       │
│        │────────────────────────────▶                                       │
│        │                            │                                       │
│        │  3. ICE Candidates         │                                       │
│        │◀───────────────────────────▶                                       │
│        │                            │                                       │
│        │  4. Media flows            │                                       │
│        │◀═══════════════════════════▶                                       │
│        │                            │                                       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

4.4 SDP Negotiator Implementation \

# bridge/webrtc/sdp_negotiator.py

import re
from dataclasses import dataclass, field
from typing import List, Optional, Dict
from enum import Enum

class SDPType(Enum):
    OFFER = "offer"
    ANSWER = "answer"
    PRANSWER = "pranswer"

@dataclass
class CodecInfo:
    """Information about a codec in SDP."""
    payload_type: int
    name: str
    clock_rate: int
    channels: int = 1
    fmtp: Optional[str] = None

@dataclass
class MediaDescription:
    """Parsed media section from SDP."""
    media_type: str  # audio, video
    port: int
    protocol: str
    formats: List[int]
    codecs: List[CodecInfo] = field(default_factory=list)
    direction: str = "sendrecv"
    ice_ufrag: Optional[str] = None
    ice_pwd: Optional[str] = None
    fingerprint: Optional[str] = None
    setup: Optional[str] = None
    mid: Optional[str] = None
    candidates: List[str] = field(default_factory=list)

@dataclass
class ParsedSDP:
    """Fully parsed SDP."""
    version: int
    origin: str
    session_name: str
    timing: str
    media: List[MediaDescription] = field(default_factory=list)
    ice_ufrag: Optional[str] = None
    ice_pwd: Optional[str] = None
    fingerprint: Optional[str] = None
    groups: List[str] = field(default_factory=list)

class SDPNegotiator:
    """
    Handles SDP parsing, modification, and generation.
    
    Manages codec negotiation and ensures compatibility
    between GoToConnect and our WebRTC implementation.
    """
    
    # Preferred codecs in order
    PREFERRED_AUDIO_CODECS = [
        ("opus", 48000, 2),    # Opus stereo
        ("PCMU", 8000, 1),     # G.711 μ-law
        ("PCMA", 8000, 1),     # G.711 A-law
    ]
    
    def __init__(self):
        self._local_ice_ufrag: Optional[str] = None
        self._local_ice_pwd: Optional[str] = None
    
    def parse_sdp(self, sdp: str) -> ParsedSDP:
        """
        Parse an SDP string into structured data.
        
        Args:
            sdp: Raw SDP string
        
        Returns:
            Parsed SDP structure
        """
        lines = sdp.strip().split('\r\n')
        if not lines[0].startswith('v='):
            lines = sdp.strip().split('\n')
        
        parsed = ParsedSDP(
            version=0,
            origin="",
            session_name="",
            timing="",
        )
        
        current_media: Optional[MediaDescription] = None
        
        for line in lines:
            if not line or '=' not in line:
                continue
            
            key, value = line[0], line[2:]
            
            # Session-level attributes
            if key == 'v':
                parsed.version = int(value)
            elif key == 'o':
                parsed.origin = value
            elif key == 's':
                parsed.session_name = value
            elif key == 't':
                parsed.timing = value
            elif key == 'm':
                # New media section
                if current_media:
                    parsed.media.append(current_media)
                current_media = self._parse_media_line(value)
            elif key == 'a' and current_media:
                # Media-level attribute
                self._parse_attribute(current_media, value)
            elif key == 'a':
                # Session-level attribute
                self._parse_session_attribute(parsed, value)
        
        if current_media:
            parsed.media.append(current_media)
        
        return parsed
    
    def _parse_media_line(self, value: str) -> MediaDescription:
        """Parse m= line."""
        parts = value.split()
        media_type = parts[0]
        port = int(parts[1])
        protocol = parts[2]
        formats = [int(f) for f in parts[3:]]
        
        return MediaDescription(
            media_type=media_type,
            port=port,
            protocol=protocol,
            formats=formats,
        )
    
    def _parse_attribute(
        self,
        media: MediaDescription,
        value: str,
    ) -> None:
        """Parse media-level attribute."""
        if value.startswith("rtpmap:"):
            codec = self._parse_rtpmap(value[7:])
            if codec:
                media.codecs.append(codec)
        elif value.startswith("fmtp:"):
            self._parse_fmtp(media, value[5:])
        elif value.startswith("ice-ufrag:"):
            media.ice_ufrag = value[10:]
        elif value.startswith("ice-pwd:"):
            media.ice_pwd = value[8:]
        elif value.startswith("fingerprint:"):
            media.fingerprint = value[12:]
        elif value.startswith("setup:"):
            media.setup = value[6:]
        elif value.startswith("mid:"):
            media.mid = value[4:]
        elif value.startswith("candidate:"):
            media.candidates.append(value)
        elif value in ("sendrecv", "sendonly", "recvonly", "inactive"):
            media.direction = value
    
    def _parse_session_attribute(
        self,
        parsed: ParsedSDP,
        value: str,
    ) -> None:
        """Parse session-level attribute."""
        if value.startswith("ice-ufrag:"):
            parsed.ice_ufrag = value[10:]
        elif value.startswith("ice-pwd:"):
            parsed.ice_pwd = value[8:]
        elif value.startswith("fingerprint:"):
            parsed.fingerprint = value[12:]
        elif value.startswith("group:"):
            parsed.groups.append(value[6:])
    
    def _parse_rtpmap(self, value: str) -> Optional[CodecInfo]:
        """Parse rtpmap attribute."""
        match = re.match(r'(\d+)\s+(\w+)/(\d+)(?:/(\d+))?', value)
        if match:
            return CodecInfo(
                payload_type=int(match.group(1)),
                name=match.group(2),
                clock_rate=int(match.group(3)),
                channels=int(match.group(4)) if match.group(4) else 1,
            )
        return None
    
    def _parse_fmtp(
        self,
        media: MediaDescription,
        value: str,
    ) -> None:
        """Parse fmtp attribute and attach to codec."""
        parts = value.split(' ', 1)
        if len(parts) == 2:
            payload_type = int(parts[0])
            for codec in media.codecs:
                if codec.payload_type == payload_type:
                    codec.fmtp = parts[1]
                    break
    
    def negotiate_codecs(
        self,
        offered: List[CodecInfo],
    ) -> List[CodecInfo]:
        """
        Negotiate codecs from an offer.
        
        Returns codecs in our preferred order that are
        also supported by the remote peer.
        """
        negotiated = []
        
        for pref_name, pref_rate, pref_channels in self.PREFERRED_AUDIO_CODECS:
            for offered_codec in offered:
                if (offered_codec.name.lower() == pref_name.lower() and
                    offered_codec.clock_rate == pref_rate):
                    negotiated.append(offered_codec)
                    break
        
        return negotiated
    
    def generate_answer(
        self,
        offer: ParsedSDP,
        local_fingerprint: str,
        local_ice_ufrag: str,
        local_ice_pwd: str,
    ) -> str:
        """
        Generate an SDP answer for an offer.
        
        Args:
            offer: Parsed SDP offer
            local_fingerprint: Our DTLS fingerprint
            local_ice_ufrag: Our ICE username fragment
            local_ice_pwd: Our ICE password
        
        Returns:
            SDP answer string
        """
        lines = [
            "v=0",
            f"o=- {int(time.time())} 1 IN IP4 0.0.0.0",
            "s=-",
            "t=0 0",
        ]
        
        # Session-level attributes
        if offer.groups:
            for group in offer.groups:
                lines.append(f"a=group:{group}")
        
        lines.append("a=msid-semantic: WMS *")
        
        # Generate media sections
        for media in offer.media:
            if media.media_type == "audio":
                media_lines = self._generate_audio_answer(
                    media,
                    local_fingerprint,
                    local_ice_ufrag,
                    local_ice_pwd,
                )
                lines.extend(media_lines)
        
        return '\r\n'.join(lines) + '\r\n'
    
    def _generate_audio_answer(
        self,
        offer_media: MediaDescription,
        fingerprint: str,
        ice_ufrag: str,
        ice_pwd: str,
    ) -> List[str]:
        """Generate audio media section for answer."""
        # Negotiate codecs
        negotiated = self.negotiate_codecs(offer_media.codecs)
        
        if not negotiated:
            raise ValueError("No compatible audio codecs found")
        
        # Build format list
        formats = [str(c.payload_type) for c in negotiated]
        
        lines = [
            f"m=audio 9 UDP/TLS/RTP/SAVPF {' '.join(formats)}",
            "c=IN IP4 0.0.0.0",
        ]
        
        # ICE credentials
        lines.append(f"a=ice-ufrag:{ice_ufrag}")
        lines.append(f"a=ice-pwd:{ice_pwd}")
        
        # DTLS
        lines.append(f"a=fingerprint:{fingerprint}")
        
        # Setup role (we're answering, so passive or active based on offer)
        if offer_media.setup == "actpass":
            lines.append("a=setup:active")
        elif offer_media.setup == "active":
            lines.append("a=setup:passive")
        else:
            lines.append("a=setup:active")
        
        # Media ID
        if offer_media.mid:
            lines.append(f"a=mid:{offer_media.mid}")
        
        # Direction (mirror the offer)
        lines.append(f"a={offer_media.direction}")
        
        # RTP/RTCP
        lines.append("a=rtcp-mux")
        
        # Codec descriptions
        for codec in negotiated:
            if codec.channels > 1:
                lines.append(
                    f"a=rtpmap:{codec.payload_type} "
                    f"{codec.name}/{codec.clock_rate}/{codec.channels}"
                )
            else:
                lines.append(
                    f"a=rtpmap:{codec.payload_type} "
                    f"{codec.name}/{codec.clock_rate}"
                )
            
            if codec.fmtp:
                lines.append(f"a=fmtp:{codec.payload_type} {codec.fmtp}")
        
        return lines
    
    def generate_offer(
        self,
        local_fingerprint: str,
        local_ice_ufrag: str,
        local_ice_pwd: str,
    ) -> str:
        """
        Generate an SDP offer for outbound calls.
        
        Args:
            local_fingerprint: Our DTLS fingerprint
            local_ice_ufrag: Our ICE username fragment
            local_ice_pwd: Our ICE password
        
        Returns:
            SDP offer string
        """
        lines = [
            "v=0",
            f"o=- {int(time.time())} 1 IN IP4 0.0.0.0",
            "s=-",
            "t=0 0",
            "a=group:BUNDLE 0",
            "a=msid-semantic: WMS *",
            
            # Audio media section
            "m=audio 9 UDP/TLS/RTP/SAVPF 111 0 8",
            "c=IN IP4 0.0.0.0",
            f"a=ice-ufrag:{local_ice_ufrag}",
            f"a=ice-pwd:{local_ice_pwd}",
            f"a=fingerprint:{local_fingerprint}",
            "a=setup:actpass",
            "a=mid:0",
            "a=sendrecv",
            "a=rtcp-mux",
            
            # Opus
            "a=rtpmap:111 opus/48000/2",
            "a=fmtp:111 minptime=10;useinbandfec=1",
            
            # G.711 μ-law
            "a=rtpmap:0 PCMU/8000",
            
            # G.711 A-law
            "a=rtpmap:8 PCMA/8000",
        ]
        
        return '\r\n'.join(lines) + '\r\n'


class SDPModifier:
    """
    Utilities for modifying existing SDP.
    """
    
    @staticmethod
    def add_candidate(sdp: str, candidate: str, mid: str) -> str:
        """Add an ICE candidate to SDP."""
        lines = sdp.split('\r\n')
        result = []
        in_target_media = False
        
        for line in lines:
            result.append(line)
            
            if line.startswith(f"a=mid:{mid}"):
                in_target_media = True
            elif line.startswith("m=") and in_target_media:
                in_target_media = False
            elif in_target_media and line.startswith("a=") and not line.startswith("a=candidate"):
                # Insert candidate before other attributes
                pass
        
        # Add candidate at end of media section
        result.insert(-1, f"a={candidate}")
        
        return '\r\n'.join(result)
    
    @staticmethod
    def set_direction(sdp: str, direction: str) -> str:
        """Set media direction in SDP."""
        directions = ["sendrecv", "sendonly", "recvonly", "inactive"]
        
        for d in directions:
            sdp = sdp.replace(f"a={d}", f"a={direction}")
        
        return sdp
    
    @staticmethod
    def remove_video(sdp: str) -> str:
        """Remove video media from SDP."""
        lines = sdp.split('\r\n')
        result = []
        skip_until_next_media = False
        
        for line in lines:
            if line.startswith("m=video"):
                skip_until_next_media = True
            elif line.startswith("m=") and skip_until_next_media:
                skip_until_next_media = False
                result.append(line)
            elif not skip_until_next_media:
                result.append(line)
        
        return '\r\n'.join(result)

4.5 SDP Examples \

# Example GoToConnect SDP Offer

v=0
o=- 7058047285456728323 2 IN IP4 127.0.0.1
s=-
t=0 0
a=group:BUNDLE 0
a=msid-semantic: WMS stream
m=audio 9 UDP/TLS/RTP/SAVPF 111 0 8
c=IN IP4 0.0.0.0
a=rtcp:9 IN IP4 0.0.0.0
a=ice-ufrag:abc123
a=ice-pwd:supersecretpassword12345678
a=fingerprint:sha-256 AB:CD:EF:12:34:56:78:90...
a=setup:actpass
a=mid:0
a=sendrecv
a=rtcp-mux
a=rtpmap:111 opus/48000/2
a=fmtp:111 minptime=10;useinbandfec=1
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000
a=candidate:1 1 UDP 2122262783 192.168.1.100 54321 typ host
a=candidate:2 1 UDP 1686052863 203.0.113.50 54322 typ srflx raddr 192.168.1.100 rport 54321
# Example Bridge SDP Answer

v=0
o=- 1705408234 1 IN IP4 0.0.0.0
s=-
t=0 0
a=group:BUNDLE 0
a=msid-semantic: WMS *
m=audio 9 UDP/TLS/RTP/SAVPF 111 0
c=IN IP4 0.0.0.0
a=ice-ufrag:xyz789
a=ice-pwd:ouricecredentials12345678
a=fingerprint:sha-256 12:34:56:78:90:AB:CD:EF...
a=setup:active
a=mid:0
a=sendrecv
a=rtcp-mux
a=rtpmap:111 opus/48000/2
a=fmtp:111 minptime=10;useinbandfec=1
a=rtpmap:0 PCMU/8000

  1. ICE Candidate Handling \

5.1 ICE Overview \

Interactive Connectivity Establishment (ICE) finds the best path for media between peers:
┌─────────────────────────────────────────────────────────────────────────────┐
│                         ICE CANDIDATE TYPES                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   TYPE        PRIORITY    DESCRIPTION                                       │
│   ────────────────────────────────────────────────────────────────────     │
│                                                                             │
│   host        Highest     Direct local IP address                           │
│                           • 192.168.1.100:54321                            │
│                           • Best latency, works on same network            │
│                                                                             │
│   srflx       Medium      Server Reflexive (STUN-discovered public IP)     │
│                           • 203.0.113.50:54322                             │
│                           • Works across most NATs                          │
│                                                                             │
│   prflx       Medium      Peer Reflexive (discovered during ICE)           │
│                           • Dynamic discovery                               │
│                           • Found during connectivity checks               │
│                                                                             │
│   relay       Lowest      TURN relay server                                 │
│                           • turn.example.com:443                           │
│                           • Always works, but adds latency                 │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

5.2 ICE Connection Process \

┌─────────────────────────────────────────────────────────────────────────────┐
│                       ICE CONNECTION PROCESS                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   GATHERING PHASE:                                                          │
│   ─────────────────                                                         │
│                                                                             │
│   1. Collect host candidates (local interfaces)                            │
│   2. Query STUN servers for srflx candidates                               │
│   3. Allocate TURN relay candidates (if configured)                        │
│   4. Send candidates to remote peer (trickle ICE)                          │
│                                                                             │
│   CONNECTIVITY CHECKING:                                                    │
│   ──────────────────────                                                    │
│                                                                             │
│   For each local/remote candidate pair:                                     │
│   1. Send STUN binding request                                             │
│   2. Await response                                                         │
│   3. Calculate round-trip time                                             │
│   4. Mark pair as succeeded/failed                                         │
│                                                                             │
│   NOMINATION:                                                               │
│   ───────────                                                               │
│                                                                             │
│   1. Rank successful pairs by priority                                     │
│   2. Controlling agent nominates best pair                                 │
│   3. Both sides switch to nominated pair                                   │
│                                                                             │
│                                                                             │
│   ┌─────────┐        ┌─────────┐        ┌─────────┐                       │
│   │  new    │───────▶│gathering│───────▶│checking │                       │
│   └─────────┘        └─────────┘        └────┬────┘                       │
│                                              │                             │
│                      ┌───────────────────────┼───────────────────────┐    │
│                      │                       │                       │    │
│                      ▼                       ▼                       ▼    │
│               ┌───────────┐          ┌───────────┐          ┌─────────┐  │
│               │ connected │          │disconnected│         │ failed  │  │
│               └───────────┘          └───────────┘          └─────────┘  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

5.3 ICE Manager Implementation \

# bridge/webrtc/ice_manager.py

import asyncio
from dataclasses import dataclass
from typing import Optional, List, Callable
from enum import Enum

class ICEGatheringState(Enum):
    NEW = "new"
    GATHERING = "gathering"
    COMPLETE = "complete"

class ICEConnectionState(Enum):
    NEW = "new"
    CHECKING = "checking"
    CONNECTED = "connected"
    COMPLETED = "completed"
    DISCONNECTED = "disconnected"
    FAILED = "failed"
    CLOSED = "closed"

@dataclass
class ICECandidate:
    """Parsed ICE candidate."""
    foundation: str
    component: int
    protocol: str
    priority: int
    ip: str
    port: int
    type: str  # host, srflx, prflx, relay
    related_address: Optional[str] = None
    related_port: Optional[int] = None
    tcp_type: Optional[str] = None
    
    @classmethod
    def parse(cls, candidate_str: str) -> "ICECandidate":
        """Parse ICE candidate string."""
        # candidate:foundation component protocol priority ip port typ type [extensions]
        parts = candidate_str.split()
        
        if parts[0].startswith("candidate:"):
            parts[0] = parts[0][10:]
        
        candidate = cls(
            foundation=parts[0],
            component=int(parts[1]),
            protocol=parts[2].upper(),
            priority=int(parts[3]),
            ip=parts[4],
            port=int(parts[5]),
            type=parts[7],
        )
        
        # Parse extensions
        i = 8
        while i < len(parts) - 1:
            if parts[i] == "raddr":
                candidate.related_address = parts[i + 1]
            elif parts[i] == "rport":
                candidate.related_port = int(parts[i + 1])
            elif parts[i] == "tcptype":
                candidate.tcp_type = parts[i + 1]
            i += 2
        
        return candidate
    
    def to_string(self) -> str:
        """Convert to SDP candidate string."""
        s = (
            f"candidate:{self.foundation} {self.component} "
            f"{self.protocol} {self.priority} "
            f"{self.ip} {self.port} typ {self.type}"
        )
        
        if self.related_address:
            s += f" raddr {self.related_address}"
        if self.related_port:
            s += f" rport {self.related_port}"
        if self.tcp_type:
            s += f" tcptype {self.tcp_type}"
        
        return s


class ICEManager:
    """
    Manages ICE candidate gathering and connectivity.
    
    Handles trickle ICE with GoToConnect, ensuring
    candidates are exchanged efficiently.
    """
    
    def __init__(
        self,
        call_id: str,
        goto_client: "GoToCallControlClient",
    ):
        self.call_id = call_id
        self.goto_client = goto_client
        
        # State
        self._gathering_state = ICEGatheringState.NEW
        self._connection_state = ICEConnectionState.NEW
        
        # Candidates
        self._local_candidates: List[ICECandidate] = []
        self._remote_candidates: List[ICECandidate] = []
        self._pending_remote: List[dict] = []
        
        # Event handlers
        self.on_gathering_state_change: Optional[Callable] = None
        self.on_connection_state_change: Optional[Callable] = None
        self.on_candidate: Optional[Callable] = None
        
        # Candidate buffering for trickle ICE
        self._candidate_buffer: List[dict] = []
        self._buffer_timer: Optional[asyncio.Task] = None
        self._buffer_delay = 0.1  # 100ms batching
    
    async def handle_local_candidate(
        self,
        candidate: "RTCIceCandidate",
    ) -> None:
        """
        Handle a locally gathered ICE candidate.
        
        Buffers candidates briefly to batch them for
        more efficient signaling.
        """
        if candidate is None:
            # Gathering complete
            await self._flush_candidates()
            self._gathering_state = ICEGatheringState.COMPLETE
            if self.on_gathering_state_change:
                await self.on_gathering_state_change(self._gathering_state)
            return
        
        parsed = ICECandidate.parse(candidate.candidate)
        self._local_candidates.append(parsed)
        
        # Buffer candidate
        self._candidate_buffer.append({
            "candidate": candidate.candidate,
            "sdpMid": candidate.sdpMid,
            "sdpMLineIndex": candidate.sdpMLineIndex,
        })
        
        # Start/reset buffer timer
        if self._buffer_timer:
            self._buffer_timer.cancel()
        
        self._buffer_timer = asyncio.create_task(
            self._flush_candidates_after_delay()
        )
        
        logger.debug(f"[{self.call_id}] Local candidate: {parsed.type} {parsed.ip}:{parsed.port}")
    
    async def _flush_candidates_after_delay(self) -> None:
        """Flush buffered candidates after delay."""
        await asyncio.sleep(self._buffer_delay)
        await self._flush_candidates()
    
    async def _flush_candidates(self) -> None:
        """Send buffered candidates to GoToConnect."""
        if not self._candidate_buffer:
            return
        
        candidates = self._candidate_buffer.copy()
        self._candidate_buffer.clear()
        
        try:
            for candidate in candidates:
                await self.goto_client.send_ice_candidate(
                    call_id=self.call_id,
                    candidate=candidate["candidate"],
                    sdp_mid=candidate["sdpMid"],
                    sdp_mline_index=candidate["sdpMLineIndex"],
                )
            
            logger.info(f"[{self.call_id}] Sent {len(candidates)} ICE candidates")
            
        except Exception as e:
            logger.error(f"[{self.call_id}] Failed to send ICE candidates: {e}")
    
    async def handle_remote_candidate(
        self,
        candidate_data: dict,
        pc: "RTCPeerConnection",
    ) -> None:
        """
        Handle a remote ICE candidate from GoToConnect.
        
        Args:
            candidate_data: Candidate data from event
            pc: PeerConnection to add candidate to
        """
        candidate_str = candidate_data.get("candidate")
        
        if not candidate_str:
            # End of candidates
            logger.info(f"[{self.call_id}] Remote ICE gathering complete")
            return
        
        try:
            parsed = ICECandidate.parse(candidate_str)
            self._remote_candidates.append(parsed)
            
            # Add to peer connection
            ice_candidate = RTCIceCandidate(
                candidate=candidate_str,
                sdpMid=candidate_data.get("sdpMid", "0"),
                sdpMLineIndex=candidate_data.get("sdpMLineIndex", 0),
            )
            
            await pc.addIceCandidate(ice_candidate)
            
            logger.debug(
                f"[{self.call_id}] Remote candidate: "
                f"{parsed.type} {parsed.ip}:{parsed.port}"
            )
            
        except Exception as e:
            logger.error(f"[{self.call_id}] Failed to add remote candidate: {e}")
    
    def update_connection_state(self, state: str) -> None:
        """Update ICE connection state."""
        try:
            self._connection_state = ICEConnectionState(state)
        except ValueError:
            self._connection_state = ICEConnectionState.NEW
        
        logger.info(f"[{self.call_id}] ICE connection state: {state}")
        
        if self.on_connection_state_change:
            asyncio.create_task(
                self.on_connection_state_change(self._connection_state)
            )
    
    @property
    def is_connected(self) -> bool:
        """Check if ICE is connected."""
        return self._connection_state in (
            ICEConnectionState.CONNECTED,
            ICEConnectionState.COMPLETED,
        )
    
    @property
    def local_candidates(self) -> List[ICECandidate]:
        """Get gathered local candidates."""
        return self._local_candidates.copy()
    
    @property
    def remote_candidates(self) -> List[ICECandidate]:
        """Get received remote candidates."""
        return self._remote_candidates.copy()


class ICERestartManager:
    """
    Handles ICE restart scenarios.
    
    ICE restart is needed when connectivity is lost
    but the call should continue.
    """
    
    def __init__(
        self,
        ice_manager: ICEManager,
        pc: "RTCPeerConnection",
    ):
        self.ice_manager = ice_manager
        self.pc = pc
        
        self._restart_count = 0
        self._max_restarts = 3
        self._restart_cooldown = 5.0  # seconds
        self._last_restart: Optional[float] = None
    
    async def check_and_restart(self) -> bool:
        """
        Check if ICE restart is needed and perform it.
        
        Returns:
            True if restart was performed
        """
        if not self._should_restart():
            return False
        
        if self._restart_count >= self._max_restarts:
            logger.warning(
                f"[{self.ice_manager.call_id}] "
                f"Max ICE restarts ({self._max_restarts}) reached"
            )
            return False
        
        # Check cooldown
        if self._last_restart:
            elapsed = time.time() - self._last_restart
            if elapsed < self._restart_cooldown:
                return False
        
        await self._perform_restart()
        return True
    
    def _should_restart(self) -> bool:
        """Determine if ICE restart is needed."""
        state = self.ice_manager._connection_state
        return state in (
            ICEConnectionState.DISCONNECTED,
            ICEConnectionState.FAILED,
        )
    
    async def _perform_restart(self) -> None:
        """Perform ICE restart."""
        logger.info(f"[{self.ice_manager.call_id}] Performing ICE restart")
        
        self._restart_count += 1
        self._last_restart = time.time()
        
        # Create new offer with ice-restart
        offer = await self.pc.createOffer(iceRestart=True)
        await self.pc.setLocalDescription(offer)
        
        # Send to GoToConnect
        # (Implementation depends on GoTo API)
        
        logger.info(
            f"[{self.ice_manager.call_id}] "
            f"ICE restart initiated (attempt {self._restart_count})"
        )

5.4 Trickle ICE Flow \

┌─────────────────────────────────────────────────────────────────────────────┐
│                        TRICKLE ICE FLOW                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   Bridge                              GoToConnect                           │
│      │                                     │                                │
│      │  SDP Offer (no candidates)          │                                │
│      │────────────────────────────────────▶│                                │
│      │                                     │                                │
│      │  SDP Answer (no candidates)         │                                │
│      │◀────────────────────────────────────│                                │
│      │                                     │                                │
│      │                                     │  Start gathering               │
│      │  Start gathering                    │                                │
│      │                                     │                                │
│      │  candidate (host)                   │                                │
│      │────────────────────────────────────▶│                                │
│      │                                     │                                │
│      │                    candidate (host) │                                │
│      │◀────────────────────────────────────│                                │
│      │                                     │                                │
│      │  candidate (srflx)                  │                                │
│      │────────────────────────────────────▶│                                │
│      │                                     │                                │
│      │                   candidate (srflx) │                                │
│      │◀────────────────────────────────────│                                │
│      │                                     │                                │
│      │  end-of-candidates                  │                                │
│      │────────────────────────────────────▶│                                │
│      │                                     │                                │
│      │               end-of-candidates     │                                │
│      │◀────────────────────────────────────│                                │
│      │                                     │                                │
│      │  ═══════ ICE Connected ═══════════  │                                │
│      │                                     │                                │
│      │  ═══════ Media Flowing ═══════════  │                                │
│      │                                     │                                │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

  1. Audio Frame Processing \

6.1 Audio Frame Fundamentals \

WebRTC audio is transmitted as discrete frames:
┌─────────────────────────────────────────────────────────────────────────────┐
│                       AUDIO FRAME STRUCTURE                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   FRAME PARAMETERS:                                                         │
│   ─────────────────                                                         │
│                                                                             │
│   Sample Rate:     8000 Hz (G.711) / 48000 Hz (Opus)                       │
│   Bit Depth:       16-bit signed integer (PCM)                             │
│   Channels:        1 (mono) or 2 (stereo)                                  │
│   Frame Duration:  10ms, 20ms, 40ms, or 60ms                               │
│                                                                             │
│   SAMPLES PER FRAME:                                                        │
│   ──────────────────                                                        │
│                                                                             │
│   │ Duration │ 8kHz  │ 16kHz │ 48kHz │                                     │
│   │──────────│───────│───────│───────│                                     │
│   │ 10ms     │ 80    │ 160   │ 480   │                                     │
│   │ 20ms     │ 160   │ 320   │ 960   │  ◀── Most common                   │
│   │ 40ms     │ 320   │ 640   │ 1920  │                                     │
│   │ 60ms     │ 480   │ 960   │ 2880  │                                     │
│                                                                             │
│   FRAME SIZE IN BYTES (16-bit mono):                                        │
│   ──────────────────────────────────                                        │
│                                                                             │
│   │ Duration │ 8kHz  │ 16kHz │ 48kHz │                                     │
│   │──────────│───────│───────│───────│                                     │
│   │ 10ms     │ 160   │ 320   │ 960   │                                     │
│   │ 20ms     │ 320   │ 640   │ 1920  │                                     │
│   │ 40ms     │ 640   │ 1280  │ 3840  │                                     │
│   │ 60ms     │ 960   │ 1920  │ 5760  │                                     │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

6.2 PyAV AudioFrame \

aiortc uses PyAV (FFmpeg bindings) for audio processing:
# bridge/audio/frames.py

import av
import numpy as np
from typing import Tuple
import fractions

class AudioFrameProcessor:
    """
    Utilities for working with PyAV AudioFrames.
    
    Handles conversion between AudioFrame and numpy arrays,
    as well as common audio manipulations.
    """
    
    @staticmethod
    def frame_to_numpy(frame: av.AudioFrame) -> np.ndarray:
        """
        Convert AudioFrame to numpy array.
        
        Args:
            frame: PyAV AudioFrame
        
        Returns:
            numpy array of shape (samples, channels)
        """
        # Get raw data from frame
        # frame.to_ndarray() returns shape (channels, samples)
        data = frame.to_ndarray()
        
        # Transpose to (samples, channels)
        if data.ndim == 2:
            data = data.T
        
        return data
    
    @staticmethod
    def numpy_to_frame(
        data: np.ndarray,
        sample_rate: int,
        pts: int = 0,
        format: str = "s16",
        layout: str = "mono",
    ) -> av.AudioFrame:
        """
        Convert numpy array to AudioFrame.
        
        Args:
            data: numpy array (samples,) or (samples, channels)
            sample_rate: Sample rate in Hz
            pts: Presentation timestamp
            format: Audio format (s16, s32, flt, etc.)
            layout: Channel layout (mono, stereo)
        
        Returns:
            PyAV AudioFrame
        """
        # Ensure 2D array
        if data.ndim == 1:
            data = data.reshape(-1, 1)
        
        samples = data.shape[0]
        channels = data.shape[1]
        
        # Determine layout
        if channels == 1:
            layout = "mono"
        elif channels == 2:
            layout = "stereo"
        
        # Create frame
        frame = av.AudioFrame(
            format=format,
            layout=layout,
            samples=samples,
        )
        
        # Set frame data
        # PyAV expects (channels, samples) for plane data
        frame.planes[0].update(data.T.tobytes())
        
        frame.sample_rate = sample_rate
        frame.pts = pts
        frame.time_base = fractions.Fraction(1, sample_rate)
        
        return frame
    
    @staticmethod
    def get_frame_info(frame: av.AudioFrame) -> dict:
        """Get information about an audio frame."""
        return {
            "samples": frame.samples,
            "sample_rate": frame.sample_rate,
            "channels": len(frame.layout.channels),
            "format": frame.format.name,
            "pts": frame.pts,
            "duration_ms": (frame.samples / frame.sample_rate) * 1000,
            "size_bytes": sum(len(p) for p in frame.planes),
        }
    
    @staticmethod
    def normalize_frame(
        frame: av.AudioFrame,
        target_format: str = "s16",
        target_layout: str = "mono",
        target_rate: int = None,
    ) -> av.AudioFrame:
        """
        Normalize frame to target format.
        
        Args:
            frame: Input frame
            target_format: Target sample format
            target_layout: Target channel layout
            target_rate: Target sample rate (None to keep original)
        
        Returns:
            Normalized frame
        """
        resampler = av.AudioResampler(
            format=target_format,
            layout=target_layout,
            rate=target_rate or frame.sample_rate,
        )
        
        return resampler.resample(frame)[0]


class AudioBuffer:
    """
    Ring buffer for audio frames.
    
    Provides smooth audio flow by buffering frames
    and handling timing variations.
    """
    
    def __init__(
        self,
        max_duration_ms: float = 500,
        sample_rate: int = 48000,
        channels: int = 1,
    ):
        self.sample_rate = sample_rate
        self.channels = channels
        
        # Calculate buffer size
        max_samples = int(sample_rate * max_duration_ms / 1000)
        self._buffer = np.zeros((max_samples, channels), dtype=np.int16)
        
        self._write_pos = 0
        self._read_pos = 0
        self._available = 0
        self._lock = asyncio.Lock()
    
    async def write(self, data: np.ndarray) -> int:
        """
        Write audio data to buffer.
        
        Args:
            data: Audio samples (samples, channels)
        
        Returns:
            Number of samples written
        """
        async with self._lock:
            samples = data.shape[0]
            buffer_size = self._buffer.shape[0]
            
            # Check available space
            space = buffer_size - self._available
            if samples > space:
                # Buffer full - drop oldest data
                drop = samples - space
                self._read_pos = (self._read_pos + drop) % buffer_size
                self._available -= drop
            
            # Write data
            end_pos = self._write_pos + samples
            
            if end_pos <= buffer_size:
                self._buffer[self._write_pos:end_pos] = data
            else:
                # Wrap around
                first_part = buffer_size - self._write_pos
                self._buffer[self._write_pos:] = data[:first_part]
                self._buffer[:end_pos - buffer_size] = data[first_part:]
            
            self._write_pos = end_pos % buffer_size
            self._available += samples
            
            return samples
    
    async def read(self, samples: int) -> np.ndarray:
        """
        Read audio data from buffer.
        
        Args:
            samples: Number of samples to read
        
        Returns:
            Audio data or silence if not enough available
        """
        async with self._lock:
            buffer_size = self._buffer.shape[0]
            
            if self._available < samples:
                # Not enough data - return silence
                return np.zeros((samples, self.channels), dtype=np.int16)
            
            # Read data
            end_pos = self._read_pos + samples
            
            if end_pos <= buffer_size:
                data = self._buffer[self._read_pos:end_pos].copy()
            else:
                # Wrap around
                first_part = buffer_size - self._read_pos
                data = np.concatenate([
                    self._buffer[self._read_pos:],
                    self._buffer[:end_pos - buffer_size],
                ])
            
            self._read_pos = end_pos % buffer_size
            self._available -= samples
            
            return data
    
    @property
    def available_samples(self) -> int:
        """Number of samples available to read."""
        return self._available
    
    @property
    def available_ms(self) -> float:
        """Duration of audio available in milliseconds."""
        return (self._available / self.sample_rate) * 1000
    
    def clear(self) -> None:
        """Clear the buffer."""
        self._write_pos = 0
        self._read_pos = 0
        self._available = 0

6.3 Audio Resampling \

# bridge/audio/resampler.py

import numpy as np
from typing import Optional
import av

class AudioResampler:
    """
    High-quality audio resampling.
    
    Converts between different sample rates while
    maintaining audio quality using PyAV's resampler.
    """
    
    def __init__(
        self,
        input_rate: int,
        output_rate: int,
        input_channels: int = 1,
        output_channels: int = 1,
        input_format: str = "s16",
        output_format: str = "s16",
    ):
        self.input_rate = input_rate
        self.output_rate = output_rate
        self.input_channels = input_channels
        self.output_channels = output_channels
        self.input_format = input_format
        self.output_format = output_format
        
        # Determine layouts
        in_layout = "mono" if input_channels == 1 else "stereo"
        out_layout = "mono" if output_channels == 1 else "stereo"
        
        # Create resampler
        self._resampler = av.AudioResampler(
            format=output_format,
            layout=out_layout,
            rate=output_rate,
        )
        
        # For numpy-based fallback
        self._ratio = output_rate / input_rate
    
    def resample_frame(self, frame: av.AudioFrame) -> av.AudioFrame:
        """
        Resample an AudioFrame.
        
        Args:
            frame: Input frame at input_rate
        
        Returns:
            Resampled frame at output_rate
        """
        frames = self._resampler.resample(frame)
        if frames:
            return frames[0]
        return None
    
    def resample_numpy(self, data: np.ndarray) -> np.ndarray:
        """
        Resample numpy audio data.
        
        Args:
            data: Input samples (samples,) or (samples, channels)
        
        Returns:
            Resampled samples
        """
        if self.input_rate == self.output_rate:
            return data
        
        # Ensure 2D
        if data.ndim == 1:
            data = data.reshape(-1, 1)
        
        input_samples = data.shape[0]
        output_samples = int(input_samples * self._ratio)
        
        # Simple linear interpolation
        # For production, consider using scipy.signal.resample
        indices = np.linspace(0, input_samples - 1, output_samples)
        
        output = np.zeros((output_samples, data.shape[1]), dtype=data.dtype)
        
        for ch in range(data.shape[1]):
            output[:, ch] = np.interp(
                indices,
                np.arange(input_samples),
                data[:, ch],
            ).astype(data.dtype)
        
        return output
    
    def flush(self) -> Optional[av.AudioFrame]:
        """Flush any remaining samples from resampler."""
        frames = self._resampler.resample(None)
        if frames:
            return frames[0]
        return None


class MultiRateResampler:
    """
    Manages multiple resamplers for common conversions.
    
    Pre-creates resamplers for frequently used
    rate conversions to reduce allocation overhead.
    """
    
    COMMON_RATES = [8000, 16000, 24000, 48000]
    
    def __init__(self):
        self._resamplers: dict = {}
    
    def get_resampler(
        self,
        input_rate: int,
        output_rate: int,
        channels: int = 1,
    ) -> AudioResampler:
        """Get or create a resampler for the given conversion."""
        key = (input_rate, output_rate, channels)
        
        if key not in self._resamplers:
            self._resamplers[key] = AudioResampler(
                input_rate=input_rate,
                output_rate=output_rate,
                input_channels=channels,
                output_channels=channels,
            )
        
        return self._resamplers[key]
    
    def resample(
        self,
        data: np.ndarray,
        input_rate: int,
        output_rate: int,
    ) -> np.ndarray:
        """
        Resample audio data.
        
        Args:
            data: Input samples
            input_rate: Input sample rate
            output_rate: Output sample rate
        
        Returns:
            Resampled samples
        """
        if input_rate == output_rate:
            return data
        
        channels = data.shape[1] if data.ndim == 2 else 1
        resampler = self.get_resampler(input_rate, output_rate, channels)
        
        return resampler.resample_numpy(data)

6.4 Audio Format Conversion \

# bridge/audio/converter.py

import numpy as np
from typing import Union

class AudioConverter:
    """
    Convert between audio formats.
    
    Handles conversion between different bit depths,
    channel counts, and numeric representations.
    """
    
    @staticmethod
    def int16_to_float32(data: np.ndarray) -> np.ndarray:
        """Convert int16 to float32 (-1.0 to 1.0)."""
        return data.astype(np.float32) / 32768.0
    
    @staticmethod
    def float32_to_int16(data: np.ndarray) -> np.ndarray:
        """Convert float32 (-1.0 to 1.0) to int16."""
        return (np.clip(data, -1.0, 1.0) * 32767).astype(np.int16)
    
    @staticmethod
    def stereo_to_mono(data: np.ndarray) -> np.ndarray:
        """Convert stereo to mono by averaging channels."""
        if data.ndim == 1:
            return data
        if data.shape[1] == 1:
            return data[:, 0]
        
        # Average channels
        return ((data[:, 0].astype(np.int32) + data[:, 1]) // 2).astype(data.dtype)
    
    @staticmethod
    def mono_to_stereo(data: np.ndarray) -> np.ndarray:
        """Convert mono to stereo by duplicating channel."""
        if data.ndim == 1:
            data = data.reshape(-1, 1)
        if data.shape[1] == 2:
            return data
        
        return np.column_stack([data[:, 0], data[:, 0]])
    
    @staticmethod
    def ulaw_to_linear(data: np.ndarray) -> np.ndarray:
        """
        Convert μ-law encoded data to linear PCM.
        
        G.711 μ-law is common in telephony.
        """
        BIAS = 0x84
        CLIP = 8159
        
        # Ensure uint8
        data = data.astype(np.uint8)
        
        # Invert all bits
        data = ~data
        
        sign = data & 0x80
        exponent = (data >> 4) & 0x07
        mantissa = data & 0x0F
        
        # Decode
        linear = (mantissa << 4) + BIAS
        linear = linear << (exponent - 1) if exponent > 0 else linear >> 1
        
        # Apply sign
        linear = np.where(sign != 0, -linear, linear)
        
        return linear.astype(np.int16)
    
    @staticmethod
    def linear_to_ulaw(data: np.ndarray) -> np.ndarray:
        """
        Convert linear PCM to μ-law encoded data.
        """
        BIAS = 0x84
        CLIP = 8159
        MAX = 0x7F
        
        # Ensure int16
        data = data.astype(np.int16)
        
        # Get sign
        sign = (data < 0).astype(np.uint8) << 7
        
        # Get absolute value with bias
        data = np.abs(data)
        data = np.clip(data, 0, CLIP)
        data = data + BIAS
        
        # Find exponent and mantissa
        exponent = np.floor(np.log2(data)).astype(np.uint8)
        exponent = np.clip(exponent, 0, 7)
        
        mantissa = (data >> (exponent + 3)) & 0x0F
        
        # Combine
        ulaw = sign | (exponent << 4) | mantissa
        
        # Invert
        return (~ulaw & 0xFF).astype(np.uint8)
    
    @staticmethod
    def alaw_to_linear(data: np.ndarray) -> np.ndarray:
        """
        Convert A-law encoded data to linear PCM.
        
        G.711 A-law is common in European telephony.
        """
        # Implementation similar to μ-law
        data = data.astype(np.uint8) ^ 0x55
        
        sign = data & 0x80
        exponent = (data >> 4) & 0x07
        mantissa = data & 0x0F
        
        if exponent == 0:
            linear = (mantissa << 4) + 8
        else:
            linear = ((mantissa << 4) + 0x108) << (exponent - 1)
        
        linear = np.where(sign != 0, -linear, linear)
        
        return linear.astype(np.int16)
    
    @staticmethod
    def linear_to_alaw(data: np.ndarray) -> np.ndarray:
        """Convert linear PCM to A-law encoded data."""
        # Implementation similar to μ-law
        data = data.astype(np.int16)
        
        sign = (data < 0).astype(np.uint8) << 7
        data = np.abs(data)
        
        # Find exponent
        exponent = np.zeros_like(data, dtype=np.uint8)
        for i in range(7, 0, -1):
            mask = data >= (0x100 << (i - 1))
            exponent = np.where(mask & (exponent == 0), i, exponent)
        
        if exponent == 0:
            mantissa = (data >> 4) & 0x0F
        else:
            mantissa = (data >> (exponent + 3)) & 0x0F
        
        alaw = sign | (exponent << 4) | mantissa
        
        return (alaw ^ 0x55).astype(np.uint8)

  1. Codec Management \

7.1 Supported Codecs \

┌─────────────────────────────────────────────────────────────────────────────┐
│                        SUPPORTED AUDIO CODECS                               │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   CODEC    PT    RATE     CH   BITRATE      LATENCY    USE CASE            │
│   ─────────────────────────────────────────────────────────────────────    │
│                                                                             │
│   Opus     111   48000    2    6-510 kbps   2.5-60ms   ◀── Preferred       │
│            ├── Low latency mode: 2.5ms frames                              │
│            ├── Built-in FEC for packet loss                                │
│            ├── VBR adapts to content                                       │
│            └── Excellent for voice & music                                  │
│                                                                             │
│   PCMU     0     8000     1    64 kbps      0.125ms    G.711 μ-law         │
│            ├── Universal telephony support                                  │
│            ├── No compression delay                                        │
│            └── Higher bandwidth but simpler                                │
│                                                                             │
│   PCMA     8     8000     1    64 kbps      0.125ms    G.711 A-law         │
│            ├── European telephony standard                                  │
│            └── Same characteristics as PCMU                                │
│                                                                             │
│   G722     9     8000*    1    64 kbps      1.5ms      Wideband telephony  │
│            ├── *Actually 16kHz audio                                       │
│            └── Better quality than G.711                                   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

7.2 Codec Handler Implementation \

# bridge/audio/codecs.py

from abc import ABC, abstractmethod
from typing import Optional
import numpy as np
import av

class AudioCodec(ABC):
    """Base class for audio codecs."""
    
    @property
    @abstractmethod
    def name(self) -> str:
        """Codec name."""
        pass
    
    @property
    @abstractmethod
    def sample_rate(self) -> int:
        """Native sample rate."""
        pass
    
    @property
    @abstractmethod
    def channels(self) -> int:
        """Number of channels."""
        pass
    
    @property
    @abstractmethod
    def frame_size(self) -> int:
        """Samples per frame."""
        pass
    
    @abstractmethod
    def encode(self, pcm: np.ndarray) -> bytes:
        """Encode PCM to codec format."""
        pass
    
    @abstractmethod
    def decode(self, data: bytes) -> np.ndarray:
        """Decode codec format to PCM."""
        pass


class OpusCodec(AudioCodec):
    """
    Opus codec implementation using PyAV.
    
    Opus is the preferred codec for WebRTC with
    excellent quality at low bitrates.
    """
    
    def __init__(
        self,
        sample_rate: int = 48000,
        channels: int = 2,
        bitrate: int = 32000,
        frame_duration_ms: int = 20,
    ):
        self._sample_rate = sample_rate
        self._channels = channels
        self._bitrate = bitrate
        self._frame_duration_ms = frame_duration_ms
        self._frame_size = int(sample_rate * frame_duration_ms / 1000)
        
        # Create encoder
        self._encoder = av.CodecContext.create("opus", "w")
        self._encoder.sample_rate = sample_rate
        self._encoder.channels = channels
        self._encoder.bit_rate = bitrate
        self._encoder.format = av.AudioFormat("s16")
        self._encoder.layout = "stereo" if channels == 2 else "mono"
        self._encoder.open()
        
        # Create decoder
        self._decoder = av.CodecContext.create("opus", "r")
        self._decoder.sample_rate = sample_rate
        self._decoder.channels = channels
        self._decoder.open()
    
    @property
    def name(self) -> str:
        return "opus"
    
    @property
    def sample_rate(self) -> int:
        return self._sample_rate
    
    @property
    def channels(self) -> int:
        return self._channels
    
    @property
    def frame_size(self) -> int:
        return self._frame_size
    
    def encode(self, pcm: np.ndarray) -> bytes:
        """Encode PCM to Opus."""
        # Create AudioFrame
        frame = av.AudioFrame(
            format="s16",
            layout="stereo" if self._channels == 2 else "mono",
            samples=pcm.shape[0],
        )
        frame.planes[0].update(pcm.tobytes())
        frame.sample_rate = self._sample_rate
        
        # Encode
        packets = self._encoder.encode(frame)
        
        if packets:
            return bytes(packets[0])
        return b""
    
    def decode(self, data: bytes) -> np.ndarray:
        """Decode Opus to PCM."""
        # Create packet
        packet = av.Packet(data)
        
        # Decode
        frames = self._decoder.decode(packet)
        
        if frames:
            return frames[0].to_ndarray().T.astype(np.int16)
        
        return np.zeros((self._frame_size, self._channels), dtype=np.int16)


class G711Codec(AudioCodec):
    """
    G.711 codec implementation (μ-law and A-law).
    
    G.711 is universal in telephony with zero
    compression delay.
    """
    
    def __init__(
        self,
        law: str = "ulaw",  # "ulaw" or "alaw"
        frame_duration_ms: int = 20,
    ):
        self._law = law
        self._frame_duration_ms = frame_duration_ms
        self._frame_size = int(8000 * frame_duration_ms / 1000)
    
    @property
    def name(self) -> str:
        return f"PCM{self._law[0].upper()}"
    
    @property
    def sample_rate(self) -> int:
        return 8000
    
    @property
    def channels(self) -> int:
        return 1
    
    @property
    def frame_size(self) -> int:
        return self._frame_size
    
    def encode(self, pcm: np.ndarray) -> bytes:
        """Encode PCM to G.711."""
        # Ensure mono
        if pcm.ndim == 2 and pcm.shape[1] > 1:
            pcm = AudioConverter.stereo_to_mono(pcm)
        
        if self._law == "ulaw":
            encoded = AudioConverter.linear_to_ulaw(pcm)
        else:
            encoded = AudioConverter.linear_to_alaw(pcm)
        
        return encoded.tobytes()
    
    def decode(self, data: bytes) -> np.ndarray:
        """Decode G.711 to PCM."""
        encoded = np.frombuffer(data, dtype=np.uint8)
        
        if self._law == "ulaw":
            return AudioConverter.ulaw_to_linear(encoded)
        else:
            return AudioConverter.alaw_to_linear(encoded)


class CodecManager:
    """
    Manages codec instances and selection.
    
    Provides codec negotiation and transcoding
    between different codecs.
    """
    
    def __init__(self):
        self._codecs: dict[str, AudioCodec] = {}
        
        # Pre-create common codecs
        self._codecs["opus"] = OpusCodec()
        self._codecs["PCMU"] = G711Codec(law="ulaw")
        self._codecs["PCMA"] = G711Codec(law="alaw")
    
    def get_codec(self, name: str) -> Optional[AudioCodec]:
        """Get a codec by name."""
        return self._codecs.get(name.upper()) or self._codecs.get(name.lower())
    
    def transcode(
        self,
        data: bytes,
        from_codec: str,
        to_codec: str,
    ) -> bytes:
        """
        Transcode audio between codecs.
        
        Args:
            data: Encoded audio data
            from_codec: Source codec name
            to_codec: Target codec name
        
        Returns:
            Transcoded audio data
        """
        if from_codec == to_codec:
            return data
        
        source = self.get_codec(from_codec)
        target = self.get_codec(to_codec)
        
        if not source or not target:
            raise ValueError(f"Unknown codec: {from_codec} or {to_codec}")
        
        # Decode to PCM
        pcm = source.decode(data)
        
        # Resample if needed
        if source.sample_rate != target.sample_rate:
            resampler = AudioResampler(
                input_rate=source.sample_rate,
                output_rate=target.sample_rate,
                input_channels=source.channels,
                output_channels=target.channels,
            )
            pcm = resampler.resample_numpy(pcm)
        
        # Encode to target
        return target.encode(pcm)
    
    def decode_to_pcm(
        self,
        data: bytes,
        codec_name: str,
        target_rate: int = 16000,
    ) -> np.ndarray:
        """
        Decode and resample to target PCM format.
        
        This is the common path for incoming audio
        destined for the voice pipeline.
        """
        codec = self.get_codec(codec_name)
        if not codec:
            raise ValueError(f"Unknown codec: {codec_name}")
        
        # Decode
        pcm = codec.decode(data)
        
        # Resample if needed
        if codec.sample_rate != target_rate:
            resampler = AudioResampler(
                input_rate=codec.sample_rate,
                output_rate=target_rate,
                input_channels=codec.channels,
                output_channels=1,  # Always mono for STT
            )
            pcm = resampler.resample_numpy(pcm)
        
        # Ensure mono
        if pcm.ndim == 2 and pcm.shape[1] > 1:
            pcm = AudioConverter.stereo_to_mono(pcm)
        
        return pcm
    
    def encode_from_pcm(
        self,
        pcm: np.ndarray,
        source_rate: int,
        codec_name: str,
    ) -> bytes:
        """
        Encode PCM to target codec format.
        
        This is the common path for outgoing audio
        from the voice pipeline.
        """
        codec = self.get_codec(codec_name)
        if not codec:
            raise ValueError(f"Unknown codec: {codec_name}")
        
        # Resample if needed
        if source_rate != codec.sample_rate:
            resampler = AudioResampler(
                input_rate=source_rate,
                output_rate=codec.sample_rate,
                input_channels=1,
                output_channels=codec.channels,
            )
            pcm = resampler.resample_numpy(pcm)
        
        # Convert to stereo if needed
        if codec.channels == 2 and (pcm.ndim == 1 or pcm.shape[1] == 1):
            pcm = AudioConverter.mono_to_stereo(pcm)
        
        return codec.encode(pcm)

7.3 Codec Negotiation Strategy \

┌─────────────────────────────────────────────────────────────────────────────┐
│                     CODEC NEGOTIATION STRATEGY                              │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   PREFERENCE ORDER:                                                         │
│   ─────────────────                                                         │
│                                                                             │
│   1. Opus/48000/2   - Best quality, lowest bandwidth                       │
│   2. Opus/48000/1   - Opus mono                                            │
│   3. PCMU/8000/1    - Universal fallback                                   │
│   4. PCMA/8000/1    - European fallback                                    │
│                                                                             │
│   SELECTION LOGIC:                                                          │
│   ─────────────────                                                         │
│                                                                             │
│   def select_codec(offered_codecs):                                        │
│       for preferred in PREFERENCE_ORDER:                                   │
│           if preferred in offered_codecs:                                  │
│               return preferred                                              │
│       raise NoCompatibleCodec()                                            │
│                                                                             │
│   FALLBACK SCENARIOS:                                                       │
│   ───────────────────                                                       │
│                                                                             │
│   Scenario 1: GoTo offers Opus + G.711                                     │
│   → Select Opus (best quality)                                             │
│                                                                             │
│   Scenario 2: GoTo offers only G.711                                       │
│   → Select PCMU (universal)                                                │
│                                                                             │
│   Scenario 3: GoTo offers unknown codec                                    │
│   → Reject call or request re-offer with known codecs                      │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

7.4 Packet Loss Concealment \

# bridge/audio/plc.py

import numpy as np
from typing import Optional

class PacketLossConcealer:
    """
    Conceal packet loss in audio streams.
    
    When packets are lost, generate replacement
    audio based on previous packets.
    """
    
    def __init__(
        self,
        sample_rate: int = 48000,
        frame_size: int = 960,
        max_consecutive_loss: int = 5,
    ):
        self.sample_rate = sample_rate
        self.frame_size = frame_size
        self.max_consecutive_loss = max_consecutive_loss
        
        # Store last good frame
        self._last_frame: Optional[np.ndarray] = None
        self._consecutive_lost = 0
    
    def process(
        self,
        frame: Optional[np.ndarray],
        is_lost: bool = False,
    ) -> np.ndarray:
        """
        Process a frame, concealing if lost.
        
        Args:
            frame: Audio frame or None if lost
            is_lost: Whether this frame was lost
        
        Returns:
            Original or concealed frame
        """
        if is_lost or frame is None:
            return self._conceal()
        
        self._last_frame = frame.copy()
        self._consecutive_lost = 0
        return frame
    
    def _conceal(self) -> np.ndarray:
        """Generate concealment audio."""
        self._consecutive_lost += 1
        
        if self._last_frame is None:
            # No reference - return silence
            return np.zeros(self.frame_size, dtype=np.int16)
        
        if self._consecutive_lost > self.max_consecutive_loss:
            # Too many lost - fade to silence
            fade_factor = max(0, 1 - (self._consecutive_lost - self.max_consecutive_loss) * 0.2)
            return (self._last_frame * fade_factor).astype(np.int16)
        
        # Repeat last frame with slight attenuation
        attenuation = 0.9 ** self._consecutive_lost
        return (self._last_frame * attenuation).astype(np.int16)
    
    def reset(self) -> None:
        """Reset concealer state."""
        self._last_frame = None
        self._consecutive_lost = 0

  1. GoToConnect Integration \

8.1 GoTo WebRTC Flow \

┌─────────────────────────────────────────────────────────────────────────────┐
│                      GOTOCONNECT WEBRTC FLOW                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   INBOUND CALL:                                                             │
│   ─────────────                                                             │
│                                                                             │
│   1. Caller dials phone number                                             │
│   2. GoTo receives call on DID                                             │
│   3. GoTo sends webhook: call.ringing                                      │
│   4. Bridge answers via API: POST /calls/{id}/answer                       │
│   5. GoTo returns SDP offer in response                                    │
│   6. Bridge creates aiortc PeerConnection                                  │
│   7. Bridge sets remote description (offer)                                │
│   8. Bridge creates answer                                                  │
│   9. Bridge sends answer via API                                           │
│   10. ICE candidates exchanged via events                                  │
│   11. DTLS handshake completes                                             │
│   12. SRTP media flows                                                     │
│                                                                             │
│   OUTBOUND CALL:                                                            │
│   ──────────────                                                            │
│                                                                             │
│   1. Bridge creates aiortc PeerConnection                                  │
│   2. Bridge adds local audio track                                         │
│   3. Bridge creates SDP offer                                              │
│   4. Bridge initiates call: POST /calls with offer                         │
│   5. GoTo processes call, returns answer                                   │
│   6. Bridge sets remote description (answer)                               │
│   7. ICE candidates exchanged                                              │
│   8. Media flows when callee answers                                       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

8.2 GoTo Connection Handler \

# bridge/goto/connection_handler.py

import asyncio
from dataclasses import dataclass
from typing import Optional, Callable
from enum import Enum

from bridge.webrtc.connection import WebRTCConnection, WebRTCConfig
from bridge.webrtc.sdp_negotiator import SDPNegotiator, ParsedSDP
from bridge.webrtc.ice_manager import ICEManager
from bridge.webrtc.tracks import AudioTrackSource, AudioTrackSink
from bridge.audio.codecs import CodecManager

class GoToCallState(Enum):
    INITIALIZING = "initializing"
    OFFERING = "offering"
    ANSWERING = "answering"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    DISCONNECTED = "disconnected"
    FAILED = "failed"
    ENDED = "ended"

@dataclass
class GoToCallInfo:
    """Information about a GoToConnect call."""
    call_id: str
    external_call_id: str
    direction: str  # inbound, outbound
    caller_number: str
    callee_number: str
    line_id: str
    started_at: float

class GoToConnectionHandler:
    """
    Manages WebRTC connection to GoToConnect.
    
    Handles the complete lifecycle of a call including
    signaling, media setup, and teardown.
    """
    
    def __init__(
        self,
        call_info: GoToCallInfo,
        goto_client: "GoToCallControlClient",
        event_listener: "GoToEventListener",
        webrtc_config: WebRTCConfig = None,
    ):
        self.call_info = call_info
        self.goto_client = goto_client
        self.event_listener = event_listener
        
        # WebRTC components
        self._webrtc = WebRTCConnection(
            config=webrtc_config,
            call_id=call_info.call_id,
        )
        self._sdp_negotiator = SDPNegotiator()
        self._ice_manager = ICEManager(
            call_id=call_info.call_id,
            goto_client=goto_client,
        )
        self._codec_manager = CodecManager()
        
        # Audio tracks
        self._local_audio: Optional[AudioTrackSource] = None
        self._remote_audio: Optional[AudioTrackSink] = None
        
        # State
        self._state = GoToCallState.INITIALIZING
        self._negotiated_codec: Optional[str] = None
        self._parsed_remote_sdp: Optional[ParsedSDP] = None
        
        # Callbacks
        self.on_state_change: Optional[Callable] = None
        self.on_audio_frame: Optional[Callable] = None
        self.on_connected: Optional[Callable] = None
        self.on_disconnected: Optional[Callable] = None
    
    async def initialize(self) -> None:
        """Initialize the connection handler."""
        await self._webrtc.initialize()
        
        # Set up WebRTC event handlers
        self._webrtc.on_track = self._handle_remote_track
        self._webrtc.on_ice_candidate = self._handle_local_ice_candidate
        self._webrtc.on_connection_state_change = self._handle_connection_state
        
        # Set up ICE manager handlers
        self._ice_manager.on_connection_state_change = self._handle_ice_state
        
        # Subscribe to GoTo events for this call
        await self.event_listener.subscribe_call_events(
            call_id=self.call_info.external_call_id,
            on_ice_candidate=self._handle_remote_ice_candidate,
        )
        
        # Create local audio track
        self._local_audio = AudioTrackSource(
            sample_rate=48000,
            channels=1,
            samples_per_frame=960,
        )
        await self._webrtc.add_track(self._local_audio)
        
        self._set_state(GoToCallState.INITIALIZING)
        logger.info(f"[{self.call_info.call_id}] GoTo connection initialized")
    
    async def handle_inbound_call(self, sdp_offer: str) -> str:
        """
        Handle an inbound call from GoToConnect.
        
        Args:
            sdp_offer: SDP offer from GoToConnect
        
        Returns:
            SDP answer to send back
        """
        self._set_state(GoToCallState.ANSWERING)
        
        # Parse the offer
        self._parsed_remote_sdp = self._sdp_negotiator.parse_sdp(sdp_offer)
        
        # Find audio codecs
        audio_media = next(
            (m for m in self._parsed_remote_sdp.media if m.media_type == "audio"),
            None,
        )
        
        if not audio_media:
            raise ValueError("No audio in SDP offer")
        
        # Negotiate codec
        negotiated = self._sdp_negotiator.negotiate_codecs(audio_media.codecs)
        if not negotiated:
            raise ValueError("No compatible audio codecs")
        
        self._negotiated_codec = negotiated[0].name
        logger.info(f"[{self.call_info.call_id}] Negotiated codec: {self._negotiated_codec}")
        
        # Set remote description
        await self._webrtc.set_remote_description(sdp_offer, "offer")
        
        # Create answer
        answer_sdp = await self._webrtc.create_answer()
        
        self._set_state(GoToCallState.CONNECTING)
        
        return answer_sdp
    
    async def initiate_outbound_call(
        self,
        target: str,
    ) -> None:
        """
        Initiate an outbound call via GoToConnect.
        
        Args:
            target: Dial string (phone number or extension)
        """
        self._set_state(GoToCallState.OFFERING)
        
        # Create offer
        offer_sdp = await self._webrtc.create_offer()
        
        # Initiate call via GoTo API
        response = await self.goto_client.create_call(
            line_id=self.call_info.line_id,
            dial_string=f"tel:{target}",
            sdp_offer=offer_sdp,
        )
        
        # Update call info with external ID
        self.call_info.external_call_id = response["callId"]
        
        # Handle answer
        answer_sdp = response.get("sdp")
        if answer_sdp:
            await self._handle_answer(answer_sdp)
    
    async def _handle_answer(self, sdp_answer: str) -> None:
        """Handle SDP answer from GoToConnect."""
        self._parsed_remote_sdp = self._sdp_negotiator.parse_sdp(sdp_answer)
        
        # Find negotiated codec
        audio_media = next(
            (m for m in self._parsed_remote_sdp.media if m.media_type == "audio"),
            None,
        )
        
        if audio_media and audio_media.codecs:
            self._negotiated_codec = audio_media.codecs[0].name
        
        await self._webrtc.set_remote_description(sdp_answer, "answer")
        
        self._set_state(GoToCallState.CONNECTING)
    
    async def _handle_remote_track(self, track: "MediaStreamTrack") -> None:
        """Handle incoming remote audio track."""
        if track.kind != "audio":
            return
        
        logger.info(f"[{self.call_info.call_id}] Remote audio track received")
        
        # Wrap track in sink
        self._remote_audio = AudioTrackSink(
            track=track,
            on_frame=self._handle_audio_frame,
        )
        
        # Start receiving frames
        asyncio.create_task(self._receive_audio_loop())
    
    async def _receive_audio_loop(self) -> None:
        """Continuously receive audio frames from remote track."""
        while self._state in (GoToCallState.CONNECTING, GoToCallState.CONNECTED):
            try:
                frame = await self._remote_audio.recv()
                
                if self.on_audio_frame:
                    # Convert frame to numpy and decode if needed
                    await self.on_audio_frame(frame)
                    
            except Exception as e:
                if "Connection" in str(e):
                    break
                logger.error(f"[{self.call_info.call_id}] Error receiving audio: {e}")
    
    async def _handle_audio_frame(self, frame: "AudioFrame") -> None:
        """Process incoming audio frame."""
        if self.on_audio_frame:
            await self.on_audio_frame(frame)
    
    async def _handle_local_ice_candidate(
        self,
        candidate: "RTCIceCandidate",
    ) -> None:
        """Handle locally gathered ICE candidate."""
        await self._ice_manager.handle_local_candidate(candidate)
    
    async def _handle_remote_ice_candidate(
        self,
        candidate_data: dict,
    ) -> None:
        """Handle remote ICE candidate from GoToConnect."""
        await self._ice_manager.handle_remote_candidate(
            candidate_data,
            self._webrtc._pc,
        )
    
    async def _handle_connection_state(self, state: str) -> None:
        """Handle WebRTC connection state change."""
        if state == "connected":
            self._set_state(GoToCallState.CONNECTED)
            if self.on_connected:
                await self.on_connected()
        elif state == "disconnected":
            self._set_state(GoToCallState.DISCONNECTED)
        elif state == "failed":
            self._set_state(GoToCallState.FAILED)
            if self.on_disconnected:
                await self.on_disconnected()
        elif state == "closed":
            self._set_state(GoToCallState.ENDED)
    
    async def _handle_ice_state(self, state: "ICEConnectionState") -> None:
        """Handle ICE connection state change."""
        self._ice_manager.update_connection_state(state.value)
    
    async def send_audio(self, audio_data: "np.ndarray") -> None:
        """
        Send audio to GoToConnect.
        
        Args:
            audio_data: PCM audio samples to send
        """
        if self._local_audio and self._state == GoToCallState.CONNECTED:
            await self._local_audio.push_audio(audio_data)
    
    async def hold(self) -> None:
        """Put the call on hold."""
        await self.goto_client.hold_call(self.call_info.external_call_id)
    
    async def resume(self) -> None:
        """Resume the call from hold."""
        await self.goto_client.resume_call(self.call_info.external_call_id)
    
    async def hangup(self) -> None:
        """End the call."""
        self._set_state(GoToCallState.ENDED)
        
        try:
            await self.goto_client.hangup_call(self.call_info.external_call_id)
        except Exception as e:
            logger.warning(f"[{self.call_info.call_id}] Hangup error: {e}")
        
        await self.close()
    
    async def close(self) -> None:
        """Close the connection and clean up resources."""
        # Stop audio tracks
        if self._local_audio:
            self._local_audio.stop()
        if self._remote_audio:
            self._remote_audio.stop()
        
        # Close WebRTC connection
        await self._webrtc.close()
        
        # Unsubscribe from events
        await self.event_listener.unsubscribe_call_events(
            self.call_info.external_call_id
        )
        
        logger.info(f"[{self.call_info.call_id}] GoTo connection closed")
    
    def _set_state(self, state: GoToCallState) -> None:
        """Update call state."""
        old_state = self._state
        self._state = state
        
        logger.info(
            f"[{self.call_info.call_id}] State: {old_state.value}{state.value}"
        )
        
        if self.on_state_change:
            asyncio.create_task(self.on_state_change(old_state, state))
    
    @property
    def state(self) -> GoToCallState:
        """Current call state."""
        return self._state
    
    @property
    def is_connected(self) -> bool:
        """Whether call is connected."""
        return self._state == GoToCallState.CONNECTED
    
    @property
    def negotiated_codec(self) -> Optional[str]:
        """The negotiated audio codec."""
        return self._negotiated_codec

  1. LiveKit Integration \

9.1 LiveKit Overview \

LiveKit is an open-source WebRTC SFU (Selective Forwarding Unit) that provides:
  • Scalable real-time audio/video
  • Room-based architecture
  • Server-side SDKs
  • Low-latency routing

9.2 LiveKit Room Architecture \

┌─────────────────────────────────────────────────────────────────────────────┐
│                      LIVEKIT ROOM ARCHITECTURE                              │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                        LiveKit Server                                │   │
│   │                                                                      │   │
│   │   ┌──────────────────────────────────────────────────────────────┐  │   │
│   │   │                    Room: call_{call_id}                       │  │   │
│   │   │                                                               │  │   │
│   │   │   PARTICIPANTS:                                               │  │   │
│   │   │                                                               │  │   │
│   │   │   ┌─────────────────┐       ┌─────────────────┐              │  │   │
│   │   │   │  bridge_{id}    │       │  agent_{id}     │              │  │   │
│   │   │   │                 │       │                 │              │  │   │
│   │   │   │  Tracks:        │       │  Tracks:        │              │  │   │
│   │   │   │  • caller_audio │◀─────▶│  • agent_audio  │              │  │   │
│   │   │   │    (publish)    │       │    (publish)    │              │  │   │
│   │   │   │                 │       │                 │              │  │   │
│   │   │   │  Subscriptions: │       │  Subscriptions: │              │  │   │
│   │   │   │  • agent_audio  │       │  • caller_audio │              │  │   │
│   │   │   │                 │       │                 │              │  │   │
│   │   │   └─────────────────┘       └─────────────────┘              │  │   │
│   │   │                                                               │  │   │
│   │   └──────────────────────────────────────────────────────────────┘  │   │
│   │                                                                      │   │
│   └──────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
│   AUDIO FLOW:                                                               │
│                                                                             │
│   Caller → GoTo → Bridge → [caller_audio] → Agent Worker                   │
│                                                                             │
│   Agent Worker → [agent_audio] → Bridge → GoTo → Caller                    │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

9.3 LiveKit Connection Handler \

# bridge/livekit/connection_handler.py

import asyncio
from dataclasses import dataclass
from typing import Optional, Callable, AsyncIterator
from livekit import rtc, api
import numpy as np

@dataclass
class LiveKitConfig:
    """Configuration for LiveKit connection."""
    url: str
    api_key: str
    api_secret: str
    room_prefix: str = "call_"

@dataclass
class LiveKitRoomInfo:
    """Information about a LiveKit room."""
    room_name: str
    participant_identity: str
    participant_name: str

class LiveKitConnectionHandler:
    """
    Manages connection to LiveKit for voice pipeline integration.
    
    Handles room creation, audio publishing, and subscribing
    to agent audio.
    """
    
    def __init__(
        self,
        config: LiveKitConfig,
        call_id: str,
    ):
        self.config = config
        self.call_id = call_id
        
        # Room info
        self.room_info = LiveKitRoomInfo(
            room_name=f"{config.room_prefix}{call_id}",
            participant_identity=f"bridge_{call_id}",
            participant_name="WebRTC Bridge",
        )
        
        # LiveKit components
        self._room: Optional[rtc.Room] = None
        self._api = api.LiveKitAPI(
            url=config.url,
            api_key=config.api_key,
            api_secret=config.api_secret,
        )
        
        # Audio tracks
        self._local_source: Optional[rtc.AudioSource] = None
        self._local_track: Optional[rtc.LocalAudioTrack] = None
        self._remote_tracks: dict[str, rtc.RemoteAudioTrack] = {}
        
        # Callbacks
        self.on_agent_audio: Optional[Callable] = None
        self.on_connected: Optional[Callable] = None
        self.on_disconnected: Optional[Callable] = None
        
        # State
        self._connected = False
        self._audio_task: Optional[asyncio.Task] = None
    
    async def connect(self) -> None:
        """Connect to LiveKit and join the room."""
        # Create room if it doesn't exist
        try:
            await self._api.room.create_room(
                api.CreateRoomRequest(name=self.room_info.room_name)
            )
        except Exception:
            # Room may already exist
            pass
        
        # Generate access token
        token = api.AccessToken(
            self.config.api_key,
            self.config.api_secret,
        )
        token.with_identity(self.room_info.participant_identity)
        token.with_name(self.room_info.participant_name)
        token.with_grants(api.VideoGrants(
            room_join=True,
            room=self.room_info.room_name,
            can_publish=True,
            can_subscribe=True,
        ))
        
        # Create room
        self._room = rtc.Room()
        
        # Set up event handlers
        self._room.on("participant_connected", self._on_participant_connected)
        self._room.on("participant_disconnected", self._on_participant_disconnected)
        self._room.on("track_subscribed", self._on_track_subscribed)
        self._room.on("track_unsubscribed", self._on_track_unsubscribed)
        self._room.on("disconnected", self._on_disconnected)
        
        # Connect to room
        await self._room.connect(
            self.config.url,
            token.to_jwt(),
        )
        
        self._connected = True
        logger.info(f"[{self.call_id}] Connected to LiveKit room: {self.room_info.room_name}")
        
        # Create and publish local audio track
        await self._setup_local_audio()
        
        if self.on_connected:
            await self.on_connected()
    
    async def _setup_local_audio(self) -> None:
        """Set up local audio track for publishing caller audio."""
        # Create audio source
        self._local_source = rtc.AudioSource(
            sample_rate=48000,
            num_channels=1,
        )
        
        # Create track from source
        self._local_track = rtc.LocalAudioTrack.create_audio_track(
            "caller_audio",
            self._local_source,
        )
        
        # Publish track
        options = rtc.TrackPublishOptions(
            source=rtc.TrackSource.SOURCE_MICROPHONE,
        )
        
        await self._room.local_participant.publish_track(
            self._local_track,
            options,
        )
        
        logger.info(f"[{self.call_id}] Published caller audio track")
    
    async def publish_audio(self, audio_data: np.ndarray) -> None:
        """
        Publish audio data to LiveKit.
        
        Args:
            audio_data: PCM audio samples (int16, 48kHz, mono)
        """
        if not self._local_source or not self._connected:
            return
        
        # Create audio frame
        frame = rtc.AudioFrame(
            data=audio_data.tobytes(),
            sample_rate=48000,
            num_channels=1,
            samples_per_channel=len(audio_data),
        )
        
        # Capture frame to source
        await self._local_source.capture_frame(frame)
    
    async def _on_participant_connected(
        self,
        participant: rtc.RemoteParticipant,
    ) -> None:
        """Handle new participant joining."""
        logger.info(
            f"[{self.call_id}] Participant connected: {participant.identity}"
        )
    
    async def _on_participant_disconnected(
        self,
        participant: rtc.RemoteParticipant,
    ) -> None:
        """Handle participant leaving."""
        logger.info(
            f"[{self.call_id}] Participant disconnected: {participant.identity}"
        )
    
    async def _on_track_subscribed(
        self,
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ) -> None:
        """Handle subscribing to a remote track."""
        if track.kind != rtc.TrackKind.KIND_AUDIO:
            return
        
        logger.info(
            f"[{self.call_id}] Subscribed to audio track from {participant.identity}"
        )
        
        # Store track
        self._remote_tracks[participant.identity] = track
        
        # Start receiving audio
        if self.on_agent_audio:
            self._audio_task = asyncio.create_task(
                self._receive_audio(track, participant.identity)
            )
    
    async def _receive_audio(
        self,
        track: rtc.RemoteAudioTrack,
        participant_id: str,
    ) -> None:
        """Receive audio frames from a track."""
        audio_stream = rtc.AudioStream(track)
        
        async for frame_event in audio_stream:
            frame = frame_event.frame
            
            if self.on_agent_audio:
                # Convert to numpy
                audio_data = np.frombuffer(
                    frame.data,
                    dtype=np.int16,
                )
                
                await self.on_agent_audio(audio_data, participant_id)
    
    async def _on_track_unsubscribed(
        self,
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ) -> None:
        """Handle unsubscribing from a remote track."""
        if participant.identity in self._remote_tracks:
            del self._remote_tracks[participant.identity]
    
    async def _on_disconnected(self) -> None:
        """Handle disconnection from room."""
        self._connected = False
        logger.warning(f"[{self.call_id}] Disconnected from LiveKit")
        
        if self.on_disconnected:
            await self.on_disconnected()
    
    async def disconnect(self) -> None:
        """Disconnect from LiveKit room."""
        if self._audio_task:
            self._audio_task.cancel()
            try:
                await self._audio_task
            except asyncio.CancelledError:
                pass
        
        if self._room:
            await self._room.disconnect()
            self._room = None
        
        self._connected = False
        logger.info(f"[{self.call_id}] Disconnected from LiveKit")
    
    async def delete_room(self) -> None:
        """Delete the LiveKit room after call ends."""
        try:
            await self._api.room.delete_room(
                api.DeleteRoomRequest(room=self.room_info.room_name)
            )
            logger.info(f"[{self.call_id}] Deleted LiveKit room")
        except Exception as e:
            logger.warning(f"[{self.call_id}] Failed to delete room: {e}")
    
    @property
    def is_connected(self) -> bool:
        """Whether connected to LiveKit."""
        return self._connected


class LiveKitTokenGenerator:
    """
    Generate LiveKit access tokens.
    
    Provides tokens for different participant types.
    """
    
    def __init__(self, api_key: str, api_secret: str):
        self.api_key = api_key
        self.api_secret = api_secret
    
    def generate_bridge_token(
        self,
        room_name: str,
        call_id: str,
    ) -> str:
        """Generate token for WebRTC bridge."""
        token = api.AccessToken(self.api_key, self.api_secret)
        token.with_identity(f"bridge_{call_id}")
        token.with_name("WebRTC Bridge")
        token.with_grants(api.VideoGrants(
            room_join=True,
            room=room_name,
            can_publish=True,
            can_subscribe=True,
        ))
        return token.to_jwt()
    
    def generate_agent_token(
        self,
        room_name: str,
        agent_id: str,
    ) -> str:
        """Generate token for agent worker."""
        token = api.AccessToken(self.api_key, self.api_secret)
        token.with_identity(f"agent_{agent_id}")
        token.with_name("AI Agent")
        token.with_grants(api.VideoGrants(
            room_join=True,
            room=room_name,
            can_publish=True,
            can_subscribe=True,
        ))
        return token.to_jwt()

  1. Bidirectional Bridge \

10.1 Bridge Architecture \

┌─────────────────────────────────────────────────────────────────────────────┐
│                    BIDIRECTIONAL BRIDGE ARCHITECTURE                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                          ┌───────────────────┐                              │
│                          │   AudioBridge     │                              │
│                          │                   │                              │
│   ┌──────────────────────┤   ┌───────────┐   ├──────────────────────┐      │
│   │                      │   │  Router   │   │                      │      │
│   │  GoToConnection      │   └─────┬─────┘   │   LiveKitConnection  │      │
│   │                      │         │         │                      │      │
│   │  ┌───────────────┐   │   ┌─────┴─────┐   │   ┌───────────────┐  │      │
│   │  │ Remote Track  │───┼──▶│  Inbound  │───┼──▶│ Audio Source  │  │      │
│   │  │ (from caller) │   │   │  Pipeline │   │   │ (to LiveKit)  │  │      │
│   │  └───────────────┘   │   └───────────┘   │   └───────────────┘  │      │
│   │                      │                   │                      │      │
│   │  ┌───────────────┐   │   ┌───────────┐   │   ┌───────────────┐  │      │
│   │  │ Local Track   │◀──┼───│  Outbound │◀──┼───│ Audio Sink    │  │      │
│   │  │ (to caller)   │   │   │  Pipeline │   │   │ (from Agent)  │  │      │
│   │  └───────────────┘   │   └───────────┘   │   └───────────────┘  │      │
│   │                      │                   │                      │      │
│   └──────────────────────┤                   ├──────────────────────┘      │
│                          │                   │                              │
│                          └───────────────────┘                              │
│                                                                             │
│   INBOUND PIPELINE (Caller → Agent):                                       │
│   ──────────────────────────────────                                        │
│   1. Receive RTP from GoTo                                                 │
│   2. Decode (Opus/G.711 → PCM)                                            │
│   3. Resample (48kHz → 16kHz for STT, 48kHz for LiveKit)                  │
│   4. Fork: Send to LiveKit AND to STT                                      │
│                                                                             │
│   OUTBOUND PIPELINE (Agent → Caller):                                      │
│   ───────────────────────────────────                                       │
│   1. Receive from LiveKit (TTS output)                                     │
│   2. Resample (24kHz → 48kHz)                                              │
│   3. Encode (PCM → negotiated codec)                                       │
│   4. Send RTP to GoTo                                                      │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

10.2 Audio Bridge Implementation \

# bridge/audio_bridge.py

import asyncio
from dataclasses import dataclass
from typing import Optional, Callable
import numpy as np

from bridge.goto.connection_handler import GoToConnectionHandler, GoToCallInfo
from bridge.livekit.connection_handler import LiveKitConnectionHandler, LiveKitConfig
from bridge.audio.resampler import MultiRateResampler
from bridge.audio.codecs import CodecManager
from bridge.audio.buffer import AudioBuffer

@dataclass
class BridgeConfig:
    """Configuration for the audio bridge."""
    # Sample rates
    goto_sample_rate: int = 48000
    livekit_sample_rate: int = 48000
    stt_sample_rate: int = 16000
    tts_sample_rate: int = 24000
    
    # Buffer sizes
    inbound_buffer_ms: int = 100
    outbound_buffer_ms: int = 100
    
    # Frame sizes
    frame_duration_ms: int = 20

class AudioBridge:
    """
    Bridges audio between GoToConnect and LiveKit.
    
    Handles all audio routing, format conversion,
    and synchronization between the two endpoints.
    """
    
    def __init__(
        self,
        call_id: str,
        call_info: GoToCallInfo,
        goto_client: "GoToCallControlClient",
        event_listener: "GoToEventListener",
        livekit_config: LiveKitConfig,
        bridge_config: BridgeConfig = None,
    ):
        self.call_id = call_id
        self.config = bridge_config or BridgeConfig()
        
        # Connection handlers
        self._goto = GoToConnectionHandler(
            call_info=call_info,
            goto_client=goto_client,
            event_listener=event_listener,
        )
        self._livekit = LiveKitConnectionHandler(
            config=livekit_config,
            call_id=call_id,
        )
        
        # Audio processing
        self._resampler = MultiRateResampler()
        self._codec_manager = CodecManager()
        
        # Buffers
        self._inbound_buffer = AudioBuffer(
            max_duration_ms=self.config.inbound_buffer_ms,
            sample_rate=self.config.stt_sample_rate,
        )
        self._outbound_buffer = AudioBuffer(
            max_duration_ms=self.config.outbound_buffer_ms,
            sample_rate=self.config.goto_sample_rate,
        )
        
        # Callbacks for voice pipeline
        self.on_caller_audio: Optional[Callable] = None  # For STT
        
        # State
        self._running = False
        self._outbound_task: Optional[asyncio.Task] = None
    
    async def initialize(self) -> None:
        """Initialize both connections."""
        # Initialize GoTo connection
        await self._goto.initialize()
        self._goto.on_audio_frame = self._handle_goto_audio
        self._goto.on_connected = self._on_goto_connected
        
        # Connect to LiveKit
        await self._livekit.connect()
        self._livekit.on_agent_audio = self._handle_agent_audio
        
        logger.info(f"[{self.call_id}] Audio bridge initialized")
    
    async def handle_inbound_call(self, sdp_offer: str) -> str:
        """
        Handle an inbound call.
        
        Args:
            sdp_offer: SDP offer from GoToConnect
        
        Returns:
            SDP answer
        """
        return await self._goto.handle_inbound_call(sdp_offer)
    
    async def start(self) -> None:
        """Start audio bridging."""
        self._running = True
        
        # Start outbound audio task
        self._outbound_task = asyncio.create_task(
            self._outbound_audio_loop()
        )
        
        logger.info(f"[{self.call_id}] Audio bridge started")
    
    async def stop(self) -> None:
        """Stop audio bridging."""
        self._running = False
        
        if self._outbound_task:
            self._outbound_task.cancel()
            try:
                await self._outbound_task
            except asyncio.CancelledError:
                pass
        
        # Disconnect both ends
        await self._goto.close()
        await self._livekit.disconnect()
        await self._livekit.delete_room()
        
        logger.info(f"[{self.call_id}] Audio bridge stopped")
    
    async def _on_goto_connected(self) -> None:
        """Called when GoTo connection is established."""
        await self.start()
    
    async def _handle_goto_audio(self, frame: "AudioFrame") -> None:
        """
        Handle audio from GoToConnect (caller).
        
        Routes to:
        1. LiveKit (for agent to hear)
        2. Voice pipeline (for STT)
        """
        if not self._running:
            return
        
        # Convert frame to numpy
        from bridge.audio.frames import AudioFrameProcessor
        pcm_48k = AudioFrameProcessor.frame_to_numpy(frame)
        
        # Ensure mono
        if pcm_48k.ndim == 2 and pcm_48k.shape[1] > 1:
            from bridge.audio.converter import AudioConverter
            pcm_48k = AudioConverter.stereo_to_mono(pcm_48k)
        
        # Route to LiveKit (48kHz)
        await self._livekit.publish_audio(pcm_48k)
        
        # Resample for STT (16kHz)
        pcm_16k = self._resampler.resample(
            pcm_48k,
            input_rate=self.config.goto_sample_rate,
            output_rate=self.config.stt_sample_rate,
        )
        
        # Send to voice pipeline
        if self.on_caller_audio:
            await self.on_caller_audio(pcm_16k)
    
    async def _handle_agent_audio(
        self,
        audio_data: np.ndarray,
        participant_id: str,
    ) -> None:
        """
        Handle audio from agent (LiveKit).
        
        Routes to GoToConnect (to caller).
        """
        if not self._running:
            return
        
        # Audio arrives at 48kHz from LiveKit
        # May need to resample if TTS outputs different rate
        
        # Buffer for smooth playback
        await self._outbound_buffer.write(audio_data.reshape(-1, 1))
    
    async def _outbound_audio_loop(self) -> None:
        """
        Send buffered audio to GoToConnect.
        
        Runs continuously to ensure smooth audio delivery.
        """
        frame_samples = int(
            self.config.goto_sample_rate * 
            self.config.frame_duration_ms / 1000
        )
        frame_interval = self.config.frame_duration_ms / 1000
        
        while self._running:
            try:
                # Read frame from buffer
                audio = await self._outbound_buffer.read(frame_samples)
                
                # Ensure correct shape
                if audio.ndim == 2:
                    audio = audio[:, 0]
                
                # Send to GoTo
                await self._goto.send_audio(audio)
                
                # Pace ourselves
                await asyncio.sleep(frame_interval * 0.9)
                
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"[{self.call_id}] Outbound audio error: {e}")
                await asyncio.sleep(0.01)
    
    async def send_tts_audio(self, audio_data: np.ndarray, sample_rate: int) -> None:
        """
        Send TTS audio to the caller.
        
        This is called by the voice pipeline when TTS
        generates audio to be played to the caller.
        
        Args:
            audio_data: PCM audio from TTS
            sample_rate: Sample rate of TTS audio
        """
        # Resample if needed
        if sample_rate != self.config.goto_sample_rate:
            audio_data = self._resampler.resample(
                audio_data,
                input_rate=sample_rate,
                output_rate=self.config.goto_sample_rate,
            )
        
        # Add to outbound buffer
        await self._outbound_buffer.write(audio_data.reshape(-1, 1))
    
    def clear_outbound_audio(self) -> None:
        """Clear outbound audio buffer (for interruption handling)."""
        self._outbound_buffer.clear()
    
    @property
    def is_connected(self) -> bool:
        """Whether both connections are active."""
        return self._goto.is_connected and self._livekit.is_connected


class BridgeManager:
    """
    Manages multiple audio bridges for concurrent calls.
    """
    
    def __init__(
        self,
        goto_client: "GoToCallControlClient",
        event_listener: "GoToEventListener",
        livekit_config: LiveKitConfig,
    ):
        self.goto_client = goto_client
        self.event_listener = event_listener
        self.livekit_config = livekit_config
        
        self._bridges: dict[str, AudioBridge] = {}
        self._lock = asyncio.Lock()
    
    async def create_bridge(
        self,
        call_id: str,
        call_info: GoToCallInfo,
    ) -> AudioBridge:
        """Create a new audio bridge for a call."""
        async with self._lock:
            if call_id in self._bridges:
                raise ValueError(f"Bridge already exists for call {call_id}")
            
            bridge = AudioBridge(
                call_id=call_id,
                call_info=call_info,
                goto_client=self.goto_client,
                event_listener=self.event_listener,
                livekit_config=self.livekit_config,
            )
            
            await bridge.initialize()
            
            self._bridges[call_id] = bridge
            
            logger.info(f"Created bridge for call {call_id}")
            
            return bridge
    
    async def get_bridge(self, call_id: str) -> Optional[AudioBridge]:
        """Get an existing bridge."""
        return self._bridges.get(call_id)
    
    async def remove_bridge(self, call_id: str) -> None:
        """Remove and clean up a bridge."""
        async with self._lock:
            bridge = self._bridges.pop(call_id, None)
            if bridge:
                await bridge.stop()
                logger.info(f"Removed bridge for call {call_id}")
    
    @property
    def active_bridges(self) -> int:
        """Number of active bridges."""
        return len(self._bridges)

10.3 Audio Forking for STT \

# bridge/audio_fork.py

import asyncio
from typing import Callable, List
import numpy as np

class AudioFork:
    """
    Fork audio to multiple destinations.
    
    Allows the same audio stream to be sent to
    multiple consumers (e.g., LiveKit and STT).
    """
    
    def __init__(self):
        self._destinations: List[Callable] = []
    
    def add_destination(self, callback: Callable) -> None:
        """Add a destination for audio."""
        self._destinations.append(callback)
    
    def remove_destination(self, callback: Callable) -> None:
        """Remove a destination."""
        if callback in self._destinations:
            self._destinations.remove(callback)
    
    async def send(self, audio_data: np.ndarray) -> None:
        """Send audio to all destinations."""
        tasks = [
            asyncio.create_task(dest(audio_data.copy()))
            for dest in self._destinations
        ]
        
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)


class MultiRateAudioFork:
    """
    Fork audio to destinations at different sample rates.
    
    Maintains separate resamplers for each destination
    to avoid repeated resampling.
    """
    
    def __init__(self, source_rate: int):
        self.source_rate = source_rate
        self._destinations: dict[int, List[Callable]] = {}
        self._resampler = MultiRateResampler()
    
    def add_destination(
        self,
        callback: Callable,
        target_rate: int,
    ) -> None:
        """Add a destination at a specific sample rate."""
        if target_rate not in self._destinations:
            self._destinations[target_rate] = []
        self._destinations[target_rate].append(callback)
    
    async def send(self, audio_data: np.ndarray) -> None:
        """Send audio to all destinations at their target rates."""
        tasks = []
        
        for target_rate, callbacks in self._destinations.items():
            # Resample if needed
            if target_rate != self.source_rate:
                resampled = self._resampler.resample(
                    audio_data,
                    self.source_rate,
                    target_rate,
                )
            else:
                resampled = audio_data
            
            # Send to all callbacks at this rate
            for callback in callbacks:
                tasks.append(
                    asyncio.create_task(callback(resampled.copy()))
                )
        
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

  1. Connection Lifecycle Management \

11.1 Complete Call Lifecycle \

┌─────────────────────────────────────────────────────────────────────────────┐
│                      COMPLETE CALL LIFECYCLE                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                        PHASE 1: SETUP                                │   │
│   │                                                                      │   │
│   │   1. Webhook received: call.ringing                                 │   │
│   │   2. Create Bridge instance                                         │   │
│   │   3. Initialize GoTo WebRTC peer                                    │   │
│   │   4. Initialize LiveKit connection                                  │   │
│   │   5. Answer call via GoTo API                                       │   │
│   │   6. Receive SDP offer from GoTo                                    │   │
│   │                                                                      │   │
│   │   Duration: ~500ms                                                   │   │
│   │                                                                      │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                       │                                     │
│                                       ▼                                     │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                     PHASE 2: NEGOTIATION                             │   │
│   │                                                                      │   │
│   │   7. Parse SDP offer, extract codecs                                │   │
│   │   8. Select preferred codec (Opus > G.711)                          │   │
│   │   9. Generate SDP answer                                            │   │
│   │   10. Send answer to GoTo                                           │   │
│   │   11. Begin ICE candidate exchange                                  │   │
│   │                                                                      │   │
│   │   Duration: ~300ms                                                   │   │
│   │                                                                      │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                       │                                     │
│                                       ▼                                     │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                     PHASE 3: CONNECTION                              │   │
│   │                                                                      │   │
│   │   12. ICE connectivity checks                                       │   │
│   │   13. DTLS handshake                                                │   │
│   │   14. SRTP session established                                      │   │
│   │   15. Connection state → CONNECTED                                  │   │
│   │   16. Join LiveKit room                                             │   │
│   │   17. Publish caller audio track                                    │   │
│   │   18. Subscribe to agent audio track                                │   │
│   │                                                                      │   │
│   │   Duration: ~500-2000ms                                              │   │
│   │                                                                      │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                       │                                     │
│                                       ▼                                     │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                      PHASE 4: ACTIVE CALL                            │   │
│   │                                                                      │   │
│   │   19. Bidirectional audio streaming                                 │   │
│   │   20. Continuous health monitoring                                  │   │
│   │   21. Handle hold/resume if needed                                  │   │
│   │   22. Handle network transitions (ICE restart)                      │   │
│   │                                                                      │   │
│   │   Duration: Call duration (seconds to hours)                        │   │
│   │                                                                      │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                       │                                     │
│                                       ▼                                     │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                       PHASE 5: TEARDOWN                              │   │
│   │                                                                      │   │
│   │   23. Call ended (hangup, timeout, error)                           │   │
│   │   24. Stop audio processing                                         │   │
│   │   25. Leave LiveKit room                                            │   │
│   │   26. Close GoTo WebRTC connection                                  │   │
│   │   27. Delete LiveKit room                                           │   │
│   │   28. Clean up resources                                            │   │
│   │   29. Log final metrics                                             │   │
│   │                                                                      │   │
│   │   Duration: ~200ms                                                   │   │
│   │                                                                      │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

11.2 Lifecycle Manager Implementation \

# bridge/lifecycle/manager.py

import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional, Callable, Dict, Any
from enum import Enum
import logging

logger = logging.getLogger(__name__)

class BridgePhase(Enum):
    """Bridge lifecycle phases."""
    CREATED = "created"
    INITIALIZING = "initializing"
    NEGOTIATING = "negotiating"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    ACTIVE = "active"
    DISCONNECTING = "disconnecting"
    TERMINATED = "terminated"
    FAILED = "failed"

@dataclass
class LifecycleMetrics:
    """Metrics collected during bridge lifecycle."""
    created_at: float = 0.0
    initialized_at: float = 0.0
    negotiation_started_at: float = 0.0
    negotiation_completed_at: float = 0.0
    connected_at: float = 0.0
    first_audio_at: float = 0.0
    terminated_at: float = 0.0
    
    total_audio_frames_received: int = 0
    total_audio_frames_sent: int = 0
    total_bytes_received: int = 0
    total_bytes_sent: int = 0
    
    ice_candidates_sent: int = 0
    ice_candidates_received: int = 0
    
    reconnection_attempts: int = 0
    
    @property
    def time_to_connect(self) -> Optional[float]:
        """Time from creation to connected state."""
        if self.connected_at and self.created_at:
            return self.connected_at - self.created_at
        return None
    
    @property
    def time_to_first_audio(self) -> Optional[float]:
        """Time from creation to first audio frame."""
        if self.first_audio_at and self.created_at:
            return self.first_audio_at - self.created_at
        return None
    
    @property
    def call_duration(self) -> Optional[float]:
        """Total call duration."""
        if self.terminated_at and self.connected_at:
            return self.terminated_at - self.connected_at
        return None

class BridgeLifecycleManager:
    """
    Manages the complete lifecycle of a WebRTC bridge.
    
    Coordinates initialization, connection, active state,
    and teardown across all bridge components.
    """
    
    def __init__(
        self,
        call_id: str,
        bridge: "AudioBridge",
    ):
        self.call_id = call_id
        self.bridge = bridge
        
        # State
        self._phase = BridgePhase.CREATED
        self._phase_lock = asyncio.Lock()
        
        # Metrics
        self.metrics = LifecycleMetrics(created_at=time.time())
        
        # Callbacks
        self._on_phase_change: Optional[Callable] = None
        self._on_error: Optional[Callable] = None
        
        # Timeouts
        self._phase_timeouts: Dict[BridgePhase, float] = {
            BridgePhase.INITIALIZING: 5.0,
            BridgePhase.NEGOTIATING: 10.0,
            BridgePhase.CONNECTING: 30.0,
        }
        
        # Timeout task
        self._timeout_task: Optional[asyncio.Task] = None
    
    async def initialize(self) -> None:
        """Initialize the bridge."""
        await self._transition_to(BridgePhase.INITIALIZING)
        
        try:
            await self.bridge.initialize()
            self.metrics.initialized_at = time.time()
            
            logger.info(f"[{self.call_id}] Bridge initialized")
            
        except Exception as e:
            logger.error(f"[{self.call_id}] Initialization failed: {e}")
            await self._transition_to(BridgePhase.FAILED)
            raise
    
    async def start_negotiation(self, sdp_offer: str) -> str:
        """Start SDP negotiation."""
        await self._transition_to(BridgePhase.NEGOTIATING)
        self.metrics.negotiation_started_at = time.time()
        
        try:
            answer = await self.bridge.handle_inbound_call(sdp_offer)
            self.metrics.negotiation_completed_at = time.time()
            
            await self._transition_to(BridgePhase.CONNECTING)
            
            logger.info(f"[{self.call_id}] Negotiation completed")
            return answer
            
        except Exception as e:
            logger.error(f"[{self.call_id}] Negotiation failed: {e}")
            await self._transition_to(BridgePhase.FAILED)
            raise
    
    async def on_connected(self) -> None:
        """Called when connection is established."""
        await self._transition_to(BridgePhase.CONNECTED)
        self.metrics.connected_at = time.time()
        
        # Start active call processing
        await self.bridge.start()
        await self._transition_to(BridgePhase.ACTIVE)
        
        logger.info(
            f"[{self.call_id}] Connected in "
            f"{self.metrics.time_to_connect:.2f}s"
        )
    
    async def on_first_audio(self) -> None:
        """Called when first audio frame is received."""
        if self.metrics.first_audio_at == 0:
            self.metrics.first_audio_at = time.time()
            
            logger.info(
                f"[{self.call_id}] First audio in "
                f"{self.metrics.time_to_first_audio:.2f}s"
            )
    
    async def terminate(self, reason: str = "normal") -> None:
        """Terminate the bridge."""
        if self._phase in (BridgePhase.TERMINATED, BridgePhase.FAILED):
            return
        
        await self._transition_to(BridgePhase.DISCONNECTING)
        
        try:
            await self.bridge.stop()
        except Exception as e:
            logger.warning(f"[{self.call_id}] Error during stop: {e}")
        
        self.metrics.terminated_at = time.time()
        await self._transition_to(BridgePhase.TERMINATED)
        
        logger.info(
            f"[{self.call_id}] Terminated ({reason}), "
            f"duration: {self.metrics.call_duration:.1f}s"
        )
    
    async def _transition_to(self, new_phase: BridgePhase) -> None:
        """Transition to a new phase."""
        async with self._phase_lock:
            old_phase = self._phase
            self._phase = new_phase
            
            # Cancel existing timeout
            if self._timeout_task:
                self._timeout_task.cancel()
                self._timeout_task = None
            
            # Set new timeout if applicable
            if new_phase in self._phase_timeouts:
                timeout = self._phase_timeouts[new_phase]
                self._timeout_task = asyncio.create_task(
                    self._phase_timeout(new_phase, timeout)
                )
            
            logger.debug(
                f"[{self.call_id}] Phase: {old_phase.value}{new_phase.value}"
            )
            
            if self._on_phase_change:
                await self._on_phase_change(old_phase, new_phase)
    
    async def _phase_timeout(self, phase: BridgePhase, timeout: float) -> None:
        """Handle phase timeout."""
        try:
            await asyncio.sleep(timeout)
            
            # Check if still in this phase
            if self._phase == phase:
                logger.error(
                    f"[{self.call_id}] Timeout in phase {phase.value} "
                    f"after {timeout}s"
                )
                await self._transition_to(BridgePhase.FAILED)
                
                if self._on_error:
                    await self._on_error(f"Timeout in {phase.value}")
                    
        except asyncio.CancelledError:
            pass
    
    @property
    def phase(self) -> BridgePhase:
        """Current lifecycle phase."""
        return self._phase
    
    @property
    def is_active(self) -> bool:
        """Whether bridge is in active call state."""
        return self._phase == BridgePhase.ACTIVE
    
    def on_phase_change(self, callback: Callable) -> None:
        """Set phase change callback."""
        self._on_phase_change = callback
    
    def on_error(self, callback: Callable) -> None:
        """Set error callback."""
        self._on_error = callback

11.3 Graceful Shutdown \

# bridge/lifecycle/shutdown.py

import asyncio
import signal
from typing import Set
import logging

logger = logging.getLogger(__name__)

class GracefulShutdown:
    """
    Manages graceful shutdown of all active bridges.
    
    Ensures calls are properly terminated when the
    service is stopped.
    """
    
    def __init__(self, bridge_manager: "BridgeManager"):
        self.bridge_manager = bridge_manager
        self._shutdown_event = asyncio.Event()
        self._active_shutdowns: Set[asyncio.Task] = set()
    
    def setup_signal_handlers(self) -> None:
        """Set up signal handlers for graceful shutdown."""
        loop = asyncio.get_event_loop()
        
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(
                sig,
                lambda s=sig: asyncio.create_task(self._handle_signal(s))
            )
    
    async def _handle_signal(self, sig: signal.Signals) -> None:
        """Handle shutdown signal."""
        logger.info(f"Received signal {sig.name}, initiating graceful shutdown")
        
        self._shutdown_event.set()
        await self.shutdown_all_bridges()
    
    async def shutdown_all_bridges(
        self,
        timeout: float = 30.0,
        reason: str = "service_shutdown",
    ) -> None:
        """
        Shutdown all active bridges gracefully.
        
        Args:
            timeout: Maximum time to wait for shutdown
            reason: Reason for shutdown (logged)
        """
        bridges = list(self.bridge_manager._bridges.values())
        
        if not bridges:
            logger.info("No active bridges to shutdown")
            return
        
        logger.info(f"Shutting down {len(bridges)} active bridges")
        
        # Create shutdown tasks
        tasks = [
            asyncio.create_task(self._shutdown_bridge(bridge, reason))
            for bridge in bridges
        ]
        self._active_shutdowns.update(tasks)
        
        # Wait with timeout
        try:
            await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=timeout,
            )
            logger.info("All bridges shutdown gracefully")
            
        except asyncio.TimeoutError:
            logger.warning(
                f"Shutdown timeout after {timeout}s, "
                f"forcing remaining bridges"
            )
            
            # Force stop remaining
            for task in tasks:
                if not task.done():
                    task.cancel()
        
        self._active_shutdowns.clear()
    
    async def _shutdown_bridge(
        self,
        bridge: "AudioBridge",
        reason: str,
    ) -> None:
        """Shutdown a single bridge."""
        try:
            # Play goodbye message if possible
            # await bridge.play_announcement("call_ending")
            
            await bridge.stop()
            logger.debug(f"Bridge {bridge.call_id} shutdown complete")
            
        except Exception as e:
            logger.error(f"Error shutting down bridge {bridge.call_id}: {e}")
    
    @property
    def is_shutting_down(self) -> bool:
        """Whether shutdown is in progress."""
        return self._shutdown_event.is_set()

  1. Error Handling and Recovery \

12.1 Error Categories \

┌─────────────────────────────────────────────────────────────────────────────┐
│                         ERROR CATEGORIES                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   CATEGORY              EXAMPLES                    RECOVERY ACTION         │
│   ─────────────────────────────────────────────────────────────────────    │
│                                                                             │
│   TRANSIENT             • Packet loss               • Automatic retry       │
│                         • Brief network hiccup      • Buffer through it     │
│                         • Temporary high latency    • Wait and continue     │
│                                                                             │
│   RECOVERABLE           • ICE disconnection         • ICE restart           │
│                         • LiveKit reconnection      • Reconnect + resume    │
│                         • Codec negotiation fail    • Fallback codec        │
│                                                                             │
│   TERMINAL              • GoTo call ended           • Clean shutdown        │
│                         • Authentication expired    • Terminate + notify    │
│                         • Fatal connection error    • Log + cleanup         │
│                                                                             │
│   INFRASTRUCTURE        • Memory exhaustion         • Alert + graceful      │
│                         • CPU overload              • Shed load             │
│                         • Disk full                 • Emergency cleanup     │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

12.2 Error Handler Implementation \

# bridge/errors/handler.py

import asyncio
import traceback
from dataclasses import dataclass
from typing import Optional, Callable, Any
from enum import Enum
import logging

logger = logging.getLogger(__name__)

class ErrorSeverity(Enum):
    """Error severity levels."""
    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class ErrorCategory(Enum):
    """Error categories for handling decisions."""
    TRANSIENT = "transient"
    RECOVERABLE = "recoverable"
    TERMINAL = "terminal"
    INFRASTRUCTURE = "infrastructure"

@dataclass
class BridgeError:
    """Structured bridge error."""
    code: str
    message: str
    category: ErrorCategory
    severity: ErrorSeverity
    details: Optional[dict] = None
    cause: Optional[Exception] = None
    
    def __str__(self) -> str:
        return f"[{self.code}] {self.message}"

# Error code definitions
class ErrorCodes:
    # Connection errors (1xxx)
    ICE_FAILED = "1001"
    ICE_DISCONNECTED = "1002"
    DTLS_FAILED = "1003"
    CONNECTION_TIMEOUT = "1004"
    
    # Signaling errors (2xxx)
    SDP_PARSE_ERROR = "2001"
    SDP_NEGOTIATION_FAILED = "2002"
    NO_COMPATIBLE_CODECS = "2003"
    
    # Media errors (3xxx)
    AUDIO_TRACK_FAILED = "3001"
    CODEC_ERROR = "3002"
    BUFFER_OVERFLOW = "3003"
    
    # GoTo errors (4xxx)
    GOTO_API_ERROR = "4001"
    GOTO_CALL_ENDED = "4002"
    GOTO_AUTH_EXPIRED = "4003"
    
    # LiveKit errors (5xxx)
    LIVEKIT_CONNECTION_FAILED = "5001"
    LIVEKIT_ROOM_ERROR = "5002"
    LIVEKIT_TRACK_ERROR = "5003"
    
    # System errors (9xxx)
    INTERNAL_ERROR = "9001"
    RESOURCE_EXHAUSTED = "9002"
    SHUTDOWN_REQUESTED = "9003"

class ErrorHandler:
    """
    Centralized error handling for the bridge.
    
    Routes errors to appropriate recovery mechanisms
    based on category and severity.
    """
    
    def __init__(
        self,
        call_id: str,
        lifecycle_manager: "BridgeLifecycleManager",
    ):
        self.call_id = call_id
        self.lifecycle_manager = lifecycle_manager
        
        # Recovery callbacks
        self._recovery_handlers: dict[ErrorCategory, Callable] = {}
        
        # Error history for pattern detection
        self._error_history: list[BridgeError] = []
        self._max_history = 100
        
        # Circuit breaker state
        self._consecutive_errors = 0
        self._circuit_open = False
    
    async def handle_error(self, error: BridgeError) -> bool:
        """
        Handle an error and attempt recovery.
        
        Returns:
            True if recovered, False if terminal
        """
        # Log the error
        self._log_error(error)
        
        # Record in history
        self._error_history.append(error)
        if len(self._error_history) > self._max_history:
            self._error_history.pop(0)
        
        # Check circuit breaker
        if self._circuit_open:
            logger.warning(f"[{self.call_id}] Circuit breaker open, skipping recovery")
            return False
        
        # Update consecutive error count
        self._consecutive_errors += 1
        
        if self._consecutive_errors >= 5:
            logger.error(f"[{self.call_id}] Too many consecutive errors, opening circuit")
            self._circuit_open = True
            return False
        
        # Route to appropriate handler
        try:
            if error.category == ErrorCategory.TRANSIENT:
                return await self._handle_transient(error)
            elif error.category == ErrorCategory.RECOVERABLE:
                return await self._handle_recoverable(error)
            elif error.category == ErrorCategory.TERMINAL:
                return await self._handle_terminal(error)
            elif error.category == ErrorCategory.INFRASTRUCTURE:
                return await self._handle_infrastructure(error)
            else:
                return False
                
        except Exception as e:
            logger.error(f"[{self.call_id}] Error during recovery: {e}")
            return False
    
    def clear_error_state(self) -> None:
        """Clear error state after successful operation."""
        self._consecutive_errors = 0
        self._circuit_open = False
    
    async def _handle_transient(self, error: BridgeError) -> bool:
        """Handle transient errors."""
        # Usually just log and continue
        logger.debug(f"[{self.call_id}] Transient error, continuing: {error}")
        return True
    
    async def _handle_recoverable(self, error: BridgeError) -> bool:
        """Handle recoverable errors."""
        handler = self._recovery_handlers.get(error.category)
        
        if handler:
            return await handler(error)
        
        # Default recovery based on error code
        if error.code == ErrorCodes.ICE_DISCONNECTED:
            return await self._attempt_ice_restart()
        elif error.code == ErrorCodes.LIVEKIT_CONNECTION_FAILED:
            return await self._attempt_livekit_reconnect()
        elif error.code == ErrorCodes.NO_COMPATIBLE_CODECS:
            return await self._attempt_codec_fallback()
        
        return False
    
    async def _handle_terminal(self, error: BridgeError) -> bool:
        """Handle terminal errors."""
        logger.error(f"[{self.call_id}] Terminal error: {error}")
        
        # Terminate the bridge
        await self.lifecycle_manager.terminate(reason=str(error))
        
        return False
    
    async def _handle_infrastructure(self, error: BridgeError) -> bool:
        """Handle infrastructure errors."""
        logger.critical(f"[{self.call_id}] Infrastructure error: {error}")
        
        # Alert operations
        # await alert_ops(error)
        
        return False
    
    async def _attempt_ice_restart(self) -> bool:
        """Attempt ICE restart."""
        logger.info(f"[{self.call_id}] Attempting ICE restart")
        
        # Implementation would trigger ICE restart
        # through the bridge's WebRTC peer
        
        return True  # Optimistically return true
    
    async def _attempt_livekit_reconnect(self) -> bool:
        """Attempt LiveKit reconnection."""
        logger.info(f"[{self.call_id}] Attempting LiveKit reconnect")
        
        # Implementation would reconnect to LiveKit
        
        return True
    
    async def _attempt_codec_fallback(self) -> bool:
        """Attempt codec fallback."""
        logger.info(f"[{self.call_id}] Attempting codec fallback")
        
        # Implementation would renegotiate with fallback codec
        
        return False  # Usually requires re-negotiation
    
    def _log_error(self, error: BridgeError) -> None:
        """Log error with appropriate level."""
        log_message = (
            f"[{self.call_id}] {error.category.value.upper()} ERROR: "
            f"{error.code} - {error.message}"
        )
        
        if error.details:
            log_message += f" | Details: {error.details}"
        
        if error.severity == ErrorSeverity.DEBUG:
            logger.debug(log_message)
        elif error.severity == ErrorSeverity.INFO:
            logger.info(log_message)
        elif error.severity == ErrorSeverity.WARNING:
            logger.warning(log_message)
        elif error.severity == ErrorSeverity.ERROR:
            logger.error(log_message)
        elif error.severity == ErrorSeverity.CRITICAL:
            logger.critical(log_message)
            
            if error.cause:
                logger.critical(
                    f"[{self.call_id}] Traceback:\n"
                    f"{''.join(traceback.format_exception(type(error.cause), error.cause, error.cause.__traceback__))}"
                )
    
    def register_recovery_handler(
        self,
        category: ErrorCategory,
        handler: Callable,
    ) -> None:
        """Register a custom recovery handler."""
        self._recovery_handlers[category] = handler

12.3 ICE Restart Manager \

# bridge/errors/ice_restart.py

import asyncio
import time
from typing import Optional
import logging

logger = logging.getLogger(__name__)

class ICERestartManager:
    """
    Manages ICE restart procedures.
    
    Handles automatic ICE restarts when connectivity
    is lost but call should continue.
    """
    
    def __init__(
        self,
        goto_peer: "GoToWebRTCPeer",
        max_restarts: int = 3,
        cooldown_seconds: float = 5.0,
    ):
        self.goto_peer = goto_peer
        self.max_restarts = max_restarts
        self.cooldown_seconds = cooldown_seconds
        
        self._restart_count = 0
        self._last_restart_time: Optional[float] = None
        self._restart_in_progress = False
    
    async def should_restart(self) -> bool:
        """Check if ICE restart should be attempted."""
        if self._restart_in_progress:
            return False
        
        if self._restart_count >= self.max_restarts:
            logger.warning("Maximum ICE restarts reached")
            return False
        
        if self._last_restart_time:
            elapsed = time.time() - self._last_restart_time
            if elapsed < self.cooldown_seconds:
                logger.debug(f"ICE restart cooldown ({elapsed:.1f}s)")
                return False
        
        return True
    
    async def perform_restart(self) -> bool:
        """
        Perform ICE restart.
        
        Returns:
            True if restart succeeded
        """
        if not await self.should_restart():
            return False
        
        self._restart_in_progress = True
        self._restart_count += 1
        self._last_restart_time = time.time()
        
        logger.info(f"Performing ICE restart ({self._restart_count}/{self.max_restarts})")
        
        try:
            # Create new offer with ICE restart flag
            # This is specific to how GoTo handles ICE restart
            
            # In aiortc, you'd create a new offer:
            # offer = await self.goto_peer._pc.createOffer()
            # Then signal it to GoTo
            
            # Wait for new ICE gathering
            await self.goto_peer.wait_for_ice_gathering(timeout=10.0)
            
            logger.info("ICE restart completed successfully")
            return True
            
        except Exception as e:
            logger.error(f"ICE restart failed: {e}")
            return False
            
        finally:
            self._restart_in_progress = False
    
    def reset(self) -> None:
        """Reset restart counter."""
        self._restart_count = 0
        self._last_restart_time = None

  1. Performance Optimization \

13.1 Performance Targets \

┌─────────────────────────────────────────────────────────────────────────────┐
│                      PERFORMANCE TARGETS                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   METRIC                    TARGET          ACCEPTABLE       CRITICAL       │
│   ─────────────────────────────────────────────────────────────────────    │
│                                                                             │
│   End-to-end latency        < 150ms         < 300ms          > 500ms       │
│   Bridge processing         < 5ms           < 10ms           > 20ms        │
│   Frame jitter              < 10ms          < 20ms           > 50ms        │
│                                                                             │
│   CPU per call              < 2%            < 5%             > 10%         │
│   Memory per call           < 50MB          < 100MB          > 200MB       │
│                                                                             │
│   Connection setup          < 2s            < 5s             > 10s         │
│   ICE gathering             < 1s            < 3s             > 5s          │
│                                                                             │
│   Packet loss tolerance     < 5%            < 10%            > 20%         │
│   Audio quality (MOS)       > 4.0           > 3.5            < 3.0         │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

13.2 Audio Buffer Optimization \

# bridge/performance/buffer.py

import numpy as np
from collections import deque
from typing import Optional
import asyncio

class OptimizedAudioBuffer:
    """
    High-performance circular audio buffer.
    
    Uses numpy for efficient memory operations
    and avoids Python object overhead.
    """
    
    def __init__(
        self,
        max_duration_ms: int = 500,
        sample_rate: int = 48000,
        channels: int = 1,
    ):
        self.sample_rate = sample_rate
        self.channels = channels
        
        # Pre-allocate buffer
        max_samples = int(sample_rate * max_duration_ms / 1000)
        self._buffer = np.zeros((max_samples, channels), dtype=np.int16)
        
        # Ring buffer pointers
        self._write_pos = 0
        self._read_pos = 0
        self._available = 0
        
        # Lock for thread safety
        self._lock = asyncio.Lock()
    
    async def write(self, data: np.ndarray) -> int:
        """
        Write audio data to buffer.
        
        Args:
            data: Audio samples (samples, channels)
        
        Returns:
            Number of samples written
        """
        async with self._lock:
            samples = len(data)
            buffer_size = len(self._buffer)
            
            # Check for overflow
            available_space = buffer_size - self._available
            if samples > available_space:
                # Overwrite oldest data
                overflow = samples - available_space
                self._read_pos = (self._read_pos + overflow) % buffer_size
                self._available -= overflow
            
            # Write data (may wrap around)
            end_pos = (self._write_pos + samples) % buffer_size
            
            if end_pos > self._write_pos:
                # No wrap
                self._buffer[self._write_pos:end_pos] = data
            else:
                # Wrap around
                first_part = buffer_size - self._write_pos
                self._buffer[self._write_pos:] = data[:first_part]
                self._buffer[:end_pos] = data[first_part:]
            
            self._write_pos = end_pos
            self._available += samples
            
            return samples
    
    async def read(self, samples: int) -> np.ndarray:
        """
        Read audio data from buffer.
        
        Args:
            samples: Number of samples to read
        
        Returns:
            Audio data (samples, channels)
        """
        async with self._lock:
            if self._available < samples:
                # Not enough data, return what we have + silence
                actual = self._available
                silence_needed = samples - actual
                
                if actual == 0:
                    return np.zeros((samples, self.channels), dtype=np.int16)
                
                data = self._read_internal(actual)
                silence = np.zeros((silence_needed, self.channels), dtype=np.int16)
                return np.vstack([data, silence])
            
            return self._read_internal(samples)
    
    def _read_internal(self, samples: int) -> np.ndarray:
        """Internal read without lock."""
        buffer_size = len(self._buffer)
        end_pos = (self._read_pos + samples) % buffer_size
        
        if end_pos > self._read_pos:
            # No wrap
            data = self._buffer[self._read_pos:end_pos].copy()
        else:
            # Wrap around
            first_part = buffer_size - self._read_pos
            data = np.vstack([
                self._buffer[self._read_pos:],
                self._buffer[:end_pos]
            ])
        
        self._read_pos = end_pos
        self._available -= samples
        
        return data
    
    @property
    def available_samples(self) -> int:
        """Number of samples available."""
        return self._available
    
    @property
    def available_ms(self) -> float:
        """Duration available in milliseconds."""
        return (self._available / self.sample_rate) * 1000
    
    def clear(self) -> None:
        """Clear the buffer."""
        self._write_pos = 0
        self._read_pos = 0
        self._available = 0

13.3 Connection Pooling \

# bridge/performance/pool.py

import asyncio
from typing import Dict, Optional
from dataclasses import dataclass
import time

@dataclass
class PooledConnection:
    """A pooled WebRTC-related connection."""
    connection: any
    created_at: float
    last_used: float
    in_use: bool = False

class LiveKitConnectionPool:
    """
    Pool of pre-warmed LiveKit connections.
    
    Reduces connection setup time by maintaining
    ready-to-use connections.
    """
    
    def __init__(
        self,
        config: "LiveKitConfig",
        min_connections: int = 2,
        max_connections: int = 10,
        max_idle_seconds: float = 300,
    ):
        self.config = config
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.max_idle_seconds = max_idle_seconds
        
        self._pool: Dict[str, PooledConnection] = {}
        self._lock = asyncio.Lock()
        self._maintenance_task: Optional[asyncio.Task] = None
    
    async def start(self) -> None:
        """Start the pool and warm up connections."""
        # Pre-create minimum connections
        for _ in range(self.min_connections):
            await self._create_connection()
        
        # Start maintenance task
        self._maintenance_task = asyncio.create_task(
            self._maintenance_loop()
        )
    
    async def stop(self) -> None:
        """Stop the pool and close all connections."""
        if self._maintenance_task:
            self._maintenance_task.cancel()
        
        async with self._lock:
            for conn_id, pooled in self._pool.items():
                try:
                    await pooled.connection.disconnect()
                except Exception:
                    pass
            self._pool.clear()
    
    async def acquire(self) -> "LiveKitConnectionHandler":
        """Acquire a connection from the pool."""
        async with self._lock:
            # Find available connection
            for conn_id, pooled in self._pool.items():
                if not pooled.in_use:
                    pooled.in_use = True
                    pooled.last_used = time.time()
                    return pooled.connection
            
            # Create new if under limit
            if len(self._pool) < self.max_connections:
                conn = await self._create_connection()
                conn_id = id(conn)
                self._pool[conn_id].in_use = True
                return conn
            
            # Pool exhausted
            raise RuntimeError("Connection pool exhausted")
    
    async def release(self, connection: "LiveKitConnectionHandler") -> None:
        """Release a connection back to the pool."""
        async with self._lock:
            conn_id = id(connection)
            if conn_id in self._pool:
                self._pool[conn_id].in_use = False
                self._pool[conn_id].last_used = time.time()
    
    async def _create_connection(self) -> "LiveKitConnectionHandler":
        """Create a new pooled connection."""
        from bridge.livekit.connection_handler import LiveKitConnectionHandler
        
        conn = LiveKitConnectionHandler(
            config=self.config,
            call_id=f"pool_{len(self._pool)}",
        )
        # Pre-initialize but don't join room yet
        
        self._pool[id(conn)] = PooledConnection(
            connection=conn,
            created_at=time.time(),
            last_used=time.time(),
        )
        
        return conn
    
    async def _maintenance_loop(self) -> None:
        """Periodic pool maintenance."""
        while True:
            try:
                await asyncio.sleep(60)
                await self._cleanup_idle()
                await self._ensure_minimum()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Pool maintenance error: {e}")
    
    async def _cleanup_idle(self) -> None:
        """Remove idle connections beyond minimum."""
        async with self._lock:
            now = time.time()
            to_remove = []
            
            for conn_id, pooled in self._pool.items():
                if (not pooled.in_use and 
                    now - pooled.last_used > self.max_idle_seconds and
                    len(self._pool) > self.min_connections):
                    to_remove.append(conn_id)
            
            for conn_id in to_remove:
                pooled = self._pool.pop(conn_id)
                try:
                    await pooled.connection.disconnect()
                except Exception:
                    pass
    
    async def _ensure_minimum(self) -> None:
        """Ensure minimum connections are available."""
        async with self._lock:
            available = sum(1 for p in self._pool.values() if not p.in_use)
            
            while available < self.min_connections:
                await self._create_connection()
                available += 1

  1. Monitoring and Debugging \

14.1 Metrics Collection \

# bridge/monitoring/metrics.py

from prometheus_client import Counter, Histogram, Gauge
import time

# Connection metrics
CONNECTIONS_TOTAL = Counter(
    'webrtc_bridge_connections_total',
    'Total WebRTC connections',
    ['direction', 'result']  # inbound/outbound, success/failed
)

CONNECTIONS_ACTIVE = Gauge(
    'webrtc_bridge_connections_active',
    'Currently active connections'
)

CONNECTION_DURATION = Histogram(
    'webrtc_bridge_connection_duration_seconds',
    'Connection duration',
    buckets=[1, 5, 30, 60, 300, 600, 1800, 3600]
)

# Latency metrics
AUDIO_LATENCY = Histogram(
    'webrtc_bridge_audio_latency_ms',
    'Audio processing latency',
    ['direction'],  # inbound/outbound
    buckets=[1, 2, 5, 10, 20, 50, 100, 200]
)

ICE_GATHERING_TIME = Histogram(
    'webrtc_bridge_ice_gathering_seconds',
    'ICE gathering duration',
    buckets=[0.1, 0.5, 1, 2, 5, 10]
)

CONNECTION_SETUP_TIME = Histogram(
    'webrtc_bridge_connection_setup_seconds',
    'Time to establish connection',
    buckets=[0.5, 1, 2, 3, 5, 10, 30]
)

# Audio metrics
AUDIO_FRAMES_PROCESSED = Counter(
    'webrtc_bridge_audio_frames_total',
    'Audio frames processed',
    ['direction']
)

AUDIO_BUFFER_LEVEL = Gauge(
    'webrtc_bridge_audio_buffer_ms',
    'Current audio buffer level',
    ['direction']
)

PACKET_LOSS_RATE = Gauge(
    'webrtc_bridge_packet_loss_rate',
    'Current packet loss rate',
    ['peer']  # goto/livekit
)

# Error metrics
ERRORS_TOTAL = Counter(
    'webrtc_bridge_errors_total',
    'Total errors',
    ['category', 'code']
)

ICE_RESTARTS = Counter(
    'webrtc_bridge_ice_restarts_total',
    'ICE restart attempts',
    ['result']
)

class BridgeMetricsCollector:
    """Collects and exposes bridge metrics."""
    
    def __init__(self, call_id: str):
        self.call_id = call_id
        self._start_time = time.time()
    
    def record_connection_start(self, direction: str) -> None:
        """Record connection attempt."""
        CONNECTIONS_ACTIVE.inc()
    
    def record_connection_established(self, direction: str) -> None:
        """Record successful connection."""
        CONNECTIONS_TOTAL.labels(direction=direction, result='success').inc()
        CONNECTION_SETUP_TIME.observe(time.time() - self._start_time)
    
    def record_connection_failed(self, direction: str) -> None:
        """Record failed connection."""
        CONNECTIONS_TOTAL.labels(direction=direction, result='failed').inc()
        CONNECTIONS_ACTIVE.dec()
    
    def record_connection_ended(self) -> None:
        """Record connection end."""
        CONNECTIONS_ACTIVE.dec()
        CONNECTION_DURATION.observe(time.time() - self._start_time)
    
    def record_audio_latency(self, direction: str, latency_ms: float) -> None:
        """Record audio processing latency."""
        AUDIO_LATENCY.labels(direction=direction).observe(latency_ms)
    
    def record_audio_frame(self, direction: str) -> None:
        """Record processed audio frame."""
        AUDIO_FRAMES_PROCESSED.labels(direction=direction).inc()
    
    def update_buffer_level(self, direction: str, level_ms: float) -> None:
        """Update audio buffer level."""
        AUDIO_BUFFER_LEVEL.labels(direction=direction).set(level_ms)
    
    def record_error(self, category: str, code: str) -> None:
        """Record an error."""
        ERRORS_TOTAL.labels(category=category, code=code).inc()

14.2 Structured Logging \

# bridge/monitoring/logging.py

import structlog
from typing import Any, Dict

def configure_logging() -> None:
    """Configure structured logging for the bridge."""
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.processors.JSONRenderer()
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

def get_bridge_logger(call_id: str) -> structlog.BoundLogger:
    """Get a logger bound to a specific call."""
    return structlog.get_logger().bind(call_id=call_id)

class BridgeLogContext:
    """Context manager for bridge logging."""
    
    def __init__(self, call_id: str):
        self.call_id = call_id
        self.logger = get_bridge_logger(call_id)
        self._context: Dict[str, Any] = {}
    
    def bind(self, **kwargs) -> "BridgeLogContext":
        """Add context to logger."""
        self._context.update(kwargs)
        self.logger = self.logger.bind(**kwargs)
        return self
    
    def info(self, message: str, **kwargs) -> None:
        self.logger.info(message, **kwargs)
    
    def debug(self, message: str, **kwargs) -> None:
        self.logger.debug(message, **kwargs)
    
    def warning(self, message: str, **kwargs) -> None:
        self.logger.warning(message, **kwargs)
    
    def error(self, message: str, **kwargs) -> None:
        self.logger.error(message, **kwargs)
    
    def log_audio_frame(
        self,
        direction: str,
        samples: int,
        latency_ms: float,
    ) -> None:
        """Log audio frame processing (sampled)."""
        self.logger.debug(
            "audio_frame",
            direction=direction,
            samples=samples,
            latency_ms=round(latency_ms, 2),
        )
    
    def log_ice_event(self, event: str, **details) -> None:
        """Log ICE-related event."""
        self.logger.info(
            "ice_event",
            event=event,
            **details
        )
    
    def log_connection_state(self, state: str, **details) -> None:
        """Log connection state change."""
        self.logger.info(
            "connection_state_change",
            state=state,
            **details
        )

14.3 Debug Tools \

# bridge/monitoring/debug.py

import asyncio
from typing import Dict, Any, Optional
from dataclasses import dataclass, asdict
import json

@dataclass
class BridgeSnapshot:
    """Point-in-time snapshot of bridge state."""
    call_id: str
    timestamp: float
    
    # Connection state
    goto_connection_state: str
    goto_ice_state: str
    livekit_connected: bool
    
    # Audio state
    inbound_buffer_ms: float
    outbound_buffer_ms: float
    frames_received: int
    frames_sent: int
    
    # Quality metrics
    estimated_latency_ms: float
    packet_loss_percent: float
    jitter_ms: float
    
    # Error state
    consecutive_errors: int
    last_error: Optional[str]

class BridgeDebugger:
    """
    Debug tools for inspecting bridge state.
    
    Provides real-time inspection and diagnostic
    capabilities for troubleshooting.
    """
    
    def __init__(self, bridge: "AudioBridge"):
        self.bridge = bridge
        self._snapshots: list[BridgeSnapshot] = []
        self._max_snapshots = 100
        self._recording = False
    
    def take_snapshot(self) -> BridgeSnapshot:
        """Take a snapshot of current state."""
        snapshot = BridgeSnapshot(
            call_id=self.bridge.call_id,
            timestamp=time.time(),
            goto_connection_state=self.bridge._goto.connection_state.value,
            goto_ice_state=self.bridge._goto.ice_connection_state.value,
            livekit_connected=self.bridge._livekit.is_connected,
            inbound_buffer_ms=self.bridge._inbound_buffer.available_ms,
            outbound_buffer_ms=self.bridge._outbound_buffer.available_ms,
            frames_received=self.bridge.lifecycle_manager.metrics.total_audio_frames_received,
            frames_sent=self.bridge.lifecycle_manager.metrics.total_audio_frames_sent,
            estimated_latency_ms=0,  # Would calculate from actual measurements
            packet_loss_percent=0,
            jitter_ms=0,
            consecutive_errors=0,
            last_error=None,
        )
        
        if self._recording:
            self._snapshots.append(snapshot)
            if len(self._snapshots) > self._max_snapshots:
                self._snapshots.pop(0)
        
        return snapshot
    
    def start_recording(self) -> None:
        """Start recording snapshots."""
        self._recording = True
        self._snapshots.clear()
    
    def stop_recording(self) -> list[BridgeSnapshot]:
        """Stop recording and return snapshots."""
        self._recording = False
        return self._snapshots.copy()
    
    def dump_state(self) -> str:
        """Dump current state as JSON."""
        snapshot = self.take_snapshot()
        return json.dumps(asdict(snapshot), indent=2)
    
    async def run_diagnostics(self) -> Dict[str, Any]:
        """Run diagnostic checks."""
        results = {
            "call_id": self.bridge.call_id,
            "checks": {}
        }
        
        # Check GoTo connection
        results["checks"]["goto_connected"] = (
            self.bridge._goto.is_connected
        )
        
        # Check LiveKit connection
        results["checks"]["livekit_connected"] = (
            self.bridge._livekit.is_connected
        )
        
        # Check audio flow
        results["checks"]["audio_flowing"] = (
            self.bridge.lifecycle_manager.metrics.total_audio_frames_received > 0
        )
        
        # Check buffer health
        inbound_ms = self.bridge._inbound_buffer.available_ms
        results["checks"]["buffer_healthy"] = 20 <= inbound_ms <= 200
        results["buffer_level_ms"] = inbound_ms
        
        # Overall health
        results["healthy"] = all(results["checks"].values())
        
        return results

  1. Testing Strategy \

15.1 Test Categories \

┌─────────────────────────────────────────────────────────────────────────────┐
│                         TEST CATEGORIES                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   UNIT TESTS                                                                │
│   ───────────                                                               │
│   • Audio buffer operations                                                 │
│   • Codec encoding/decoding                                                 │
│   • SDP parsing and generation                                              │
│   • ICE candidate parsing                                                   │
│   • Resampling accuracy                                                     │
│                                                                             │
│   INTEGRATION TESTS                                                         │
│   ─────────────────                                                         │
│   • aiortc peer connection establishment                                    │
│   • LiveKit room join/leave                                                 │
│   • Audio track publish/subscribe                                           │
│   • Full bridge audio flow                                                  │
│                                                                             │
│   END-TO-END TESTS                                                          │
│   ────────────────                                                          │
│   • Complete call flow with mock GoTo                                       │
│   • Real GoTo sandbox calls                                                 │
│   • Multi-call concurrency                                                  │
│   • Long-duration stability                                                 │
│                                                                             │
│   PERFORMANCE TESTS                                                         │
│   ─────────────────                                                         │
│   • Latency benchmarks                                                      │
│   • Throughput limits                                                       │
│   • Memory usage under load                                                 │
│   • CPU profiling                                                           │
│                                                                             │
│   CHAOS TESTS                                                               │
│   ────────────                                                              │
│   • Network partition simulation                                            │
│   • Packet loss injection                                                   │
│   • High latency simulation                                                 │
│   • Resource exhaustion                                                     │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

15.2 Unit Test Examples \

# tests/unit/test_audio_buffer.py

import pytest
import numpy as np
import asyncio
from bridge.performance.buffer import OptimizedAudioBuffer

@pytest.fixture
def audio_buffer():
    return OptimizedAudioBuffer(
        max_duration_ms=100,
        sample_rate=48000,
        channels=1,
    )

@pytest.mark.asyncio
async def test_write_and_read(audio_buffer):
    """Test basic write and read operations."""
    # Write 20ms of audio (960 samples at 48kHz)
    samples = np.random.randint(-32768, 32767, (960, 1), dtype=np.int16)
    
    written = await audio_buffer.write(samples)
    assert written == 960
    assert audio_buffer.available_samples == 960
    
    # Read back
    read_samples = await audio_buffer.read(960)
    assert read_samples.shape == (960, 1)
    np.testing.assert_array_equal(read_samples, samples)

@pytest.mark.asyncio
async def test_buffer_overflow(audio_buffer):
    """Test buffer overflow handling."""
    # Buffer holds 100ms = 4800 samples
    # Write 150ms worth
    samples = np.random.randint(-32768, 32767, (7200, 1), dtype=np.int16)
    
    written = await audio_buffer.write(samples)
    assert written == 7200
    
    # Should only have last 100ms
    assert audio_buffer.available_samples == 4800

@pytest.mark.asyncio
async def test_read_underflow(audio_buffer):
    """Test reading more than available."""
    # Write 10ms
    samples = np.random.randint(-32768, 32767, (480, 1), dtype=np.int16)
    await audio_buffer.write(samples)
    
    # Request 20ms
    read_samples = await audio_buffer.read(960)
    
    # Should get data + silence
    assert read_samples.shape == (960, 1)
    np.testing.assert_array_equal(read_samples[:480], samples)
    np.testing.assert_array_equal(read_samples[480:], np.zeros((480, 1)))


# tests/unit/test_sdp_negotiator.py

import pytest
from bridge.webrtc.sdp_negotiator import SDPNegotiator, ParsedSDP

@pytest.fixture
def negotiator():
    return SDPNegotiator()

def test_parse_sdp_offer(negotiator):
    """Test parsing a typical SDP offer."""
    sdp = """v=0
o=- 123456 1 IN IP4 0.0.0.0
s=-
t=0 0
a=group:BUNDLE 0
m=audio 9 UDP/TLS/RTP/SAVPF 111 0 8
c=IN IP4 0.0.0.0
a=mid:0
a=ice-ufrag:test123
a=ice-pwd:testpassword12345678
a=rtpmap:111 opus/48000/2
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000
a=sendrecv
"""
    
    parsed = negotiator.parse_sdp(sdp)
    
    assert parsed.version == 0
    assert len(parsed.media) == 1
    assert parsed.media[0].media_type == "audio"
    assert len(parsed.media[0].codecs) == 3
    assert parsed.media[0].ice_ufrag == "test123"

def test_negotiate_codecs_prefers_opus(negotiator):
    """Test that Opus is preferred when available."""
    from bridge.webrtc.sdp_negotiator import CodecInfo
    
    offered = [
        CodecInfo(0, "PCMU", 8000, 1),
        CodecInfo(111, "opus", 48000, 2),
        CodecInfo(8, "PCMA", 8000, 1),
    ]
    
    negotiated = negotiator.negotiate_codecs(offered)
    
    assert len(negotiated) == 3
    assert negotiated[0].name == "opus"  # First preference

def test_generate_answer(negotiator):
    """Test SDP answer generation."""
    # Create minimal parsed offer
    from bridge.webrtc.sdp_negotiator import ParsedSDP, MediaDescription, CodecInfo
    
    offer = ParsedSDP(
        version=0,
        origin="",
        session_name="",
        timing="0 0",
        media=[
            MediaDescription(
                media_type="audio",
                port=9,
                protocol="UDP/TLS/RTP/SAVPF",
                formats=[111],
                codecs=[CodecInfo(111, "opus", 48000, 2)],
                ice_ufrag="remote",
                ice_pwd="remotepassword",
                setup="actpass",
                mid="0",
            )
        ],
    )
    
    answer = negotiator.generate_answer(
        offer,
        local_fingerprint="sha-256 AA:BB:CC",
        local_ice_ufrag="local",
        local_ice_pwd="localpassword",
    )
    
    assert "v=0" in answer
    assert "a=ice-ufrag:local" in answer
    assert "opus" in answer

15.3 Integration Test Examples \

# tests/integration/test_bridge_audio_flow.py

import pytest
import asyncio
import numpy as np
from unittest.mock import AsyncMock, MagicMock

@pytest.fixture
async def mock_bridge():
    """Create a bridge with mocked external connections."""
    from bridge.audio_bridge import AudioBridge, BridgeConfig
    from bridge.goto.connection_handler import GoToCallInfo
    
    # Create bridge with mocked dependencies
    bridge = AudioBridge(
        call_id="test_call_001",
        call_info=GoToCallInfo(
            call_id="test_call_001",
            external_call_id="goto_123",
            direction="inbound",
            caller_number="+15551234567",
            callee_number="+15559876543",
            line_id="line_001",
            started_at=0,
        ),
        goto_client=AsyncMock(),
        event_listener=AsyncMock(),
        livekit_config=MagicMock(),
    )
    
    # Mock the internal connections
    bridge._goto = AsyncMock()
    bridge._goto.is_connected = True
    bridge._livekit = AsyncMock()
    bridge._livekit.is_connected = True
    
    return bridge

@pytest.mark.asyncio
async def test_inbound_audio_flow(mock_bridge):
    """Test audio flowing from GoTo to LiveKit."""
    received_audio = []
    
    # Capture audio sent to LiveKit
    async def capture_audio(data):
        received_audio.append(data)
    
    mock_bridge._livekit.publish_audio = capture_audio
    
    # Simulate incoming audio from GoTo
    test_audio = np.random.randint(-1000, 1000, 960, dtype=np.int16)
    
    # Create mock frame
    mock_frame = MagicMock()
    mock_frame.to_ndarray.return_value = test_audio.reshape(1, -1)
    mock_frame.sample_rate = 48000
    
    # Process the frame
    await mock_bridge._handle_goto_audio(mock_frame)
    
    # Verify audio was forwarded
    assert len(received_audio) > 0

@pytest.mark.asyncio
async def test_outbound_audio_flow(mock_bridge):
    """Test audio flowing from LiveKit to GoTo."""
    # Start the bridge
    mock_bridge._running = True
    
    # Add audio to outbound buffer
    test_audio = np.random.randint(-1000, 1000, 960, dtype=np.int16)
    await mock_bridge._outbound_buffer.write(test_audio.reshape(-1, 1))
    
    # Verify buffer has audio
    assert mock_bridge._outbound_buffer.available_samples == 960

15.4 Load Test Framework \

# tests/load/bridge_load_test.py

import asyncio
import time
from dataclasses import dataclass
from typing import List
import statistics

@dataclass
class LoadTestResult:
    """Results from a load test run."""
    total_calls: int
    successful_calls: int
    failed_calls: int
    avg_setup_time_ms: float
    p95_setup_time_ms: float
    max_concurrent: int
    duration_seconds: float

class BridgeLoadTester:
    """Load testing framework for the WebRTC bridge."""
    
    def __init__(
        self,
        bridge_manager: "BridgeManager",
        concurrent_calls: int = 10,
        call_duration_seconds: float = 30,
        total_calls: int = 100,
    ):
        self.bridge_manager = bridge_manager
        self.concurrent_calls = concurrent_calls
        self.call_duration_seconds = call_duration_seconds
        self.total_calls = total_calls
        
        self._setup_times: List[float] = []
        self._successful = 0
        self._failed = 0
        self._max_concurrent = 0
    
    async def run(self) -> LoadTestResult:
        """Run the load test."""
        start_time = time.time()
        
        # Create semaphore for concurrency control
        semaphore = asyncio.Semaphore(self.concurrent_calls)
        
        # Create all call tasks
        tasks = [
            self._simulate_call(i, semaphore)
            for i in range(self.total_calls)
        ]
        
        # Run all tasks
        await asyncio.gather(*tasks, return_exceptions=True)
        
        duration = time.time() - start_time
        
        return LoadTestResult(
            total_calls=self.total_calls,
            successful_calls=self._successful,
            failed_calls=self._failed,
            avg_setup_time_ms=statistics.mean(self._setup_times) if self._setup_times else 0,
            p95_setup_time_ms=self._percentile(self._setup_times, 95),
            max_concurrent=self._max_concurrent,
            duration_seconds=duration,
        )
    
    async def _simulate_call(
        self,
        call_index: int,
        semaphore: asyncio.Semaphore,
    ) -> None:
        """Simulate a single call."""
        async with semaphore:
            call_id = f"load_test_{call_index}"
            
            # Track concurrency
            current = self.bridge_manager.active_bridges
            self._max_concurrent = max(self._max_concurrent, current + 1)
            
            try:
                setup_start = time.time()
                
                # Create bridge
                bridge = await self.bridge_manager.create_bridge(
                    call_id=call_id,
                    call_info=self._create_test_call_info(call_index),
                )
                
                setup_time = (time.time() - setup_start) * 1000
                self._setup_times.append(setup_time)
                
                # Simulate call duration
                await asyncio.sleep(self.call_duration_seconds)
                
                self._successful += 1
                
            except Exception as e:
                self._failed += 1
                
            finally:
                # Clean up
                await self.bridge_manager.remove_bridge(call_id)
    
    def _create_test_call_info(self, index: int) -> "GoToCallInfo":
        """Create test call info."""
        from bridge.goto.connection_handler import GoToCallInfo
        
        return GoToCallInfo(
            call_id=f"load_test_{index}",
            external_call_id=f"goto_load_{index}",
            direction="inbound",
            caller_number=f"+1555000{index:04d}",
            callee_number="+15559999999",
            line_id="test_line",
            started_at=time.time(),
        )
    
    @staticmethod
    def _percentile(data: List[float], percentile: int) -> float:
        """Calculate percentile."""
        if not data:
            return 0
        sorted_data = sorted(data)
        index = int(len(sorted_data) * percentile / 100)
        return sorted_data[min(index, len(sorted_data) - 1)]

  1. Appendix \

16.1 SDP Reference \

┌─────────────────────────────────────────────────────────────────────────────┐
│                         SDP QUICK REFERENCE                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   SESSION DESCRIPTION:                                                      │
│   v=0                           Protocol version                            │
│   o=- 123 1 IN IP4 0.0.0.0     Origin (session ID, version)                │
│   s=-                           Session name                                │
│   t=0 0                         Timing (start/end)                          │
│                                                                             │
│   MEDIA DESCRIPTION:                                                        │
│   m=audio 9 UDP/TLS/RTP/SAVPF 111 0                                        │
│     └─────┘ └┘ └─────────────┘ └────┘                                      │
│      type  port   protocol     formats                                      │
│                                                                             │
│   COMMON ATTRIBUTES:                                                        │
│   a=rtpmap:111 opus/48000/2    Codec mapping                               │
│   a=fmtp:111 useinbandfec=1    Format parameters                           │
│   a=ice-ufrag:xxxx             ICE credentials                             │
│   a=ice-pwd:yyyy               ICE password                                │
│   a=fingerprint:sha-256 AA:BB  DTLS fingerprint                            │
│   a=setup:actpass              DTLS role                                   │
│   a=mid:0                      Media ID for BUNDLE                         │
│   a=sendrecv                   Direction                                   │
│   a=rtcp-mux                   Multiplex RTP/RTCP                          │
│                                                                             │
│   ICE CANDIDATE:                                                            │
│   a=candidate:1 1 UDP 2122260223 192.168.1.1 54321 typ host               │
│              └─┘ └┘ └─┘ └────────┘ └─────────┘ └───┘ └─────┘               │
│           found comp proto priority    IP      port   type                  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

16.2 Audio Format Reference \

┌─────────────────────────────────────────────────────────────────────────────┐
│                       AUDIO FORMAT REFERENCE                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   FORMAT       SAMPLE RATE    CHANNELS    BITS    BITRATE                  │
│   ─────────────────────────────────────────────────────────────            │
│   Opus         48000 Hz       2           16      6-510 kbps (VBR)         │
│   PCMU (G.711) 8000 Hz        1           8       64 kbps                  │
│   PCMA (G.711) 8000 Hz        1           8       64 kbps                  │
│   PCM (raw)    16000 Hz       1           16      256 kbps                 │
│                                                                             │
│   FRAME SIZES (20ms):                                                       │
│   • 48kHz stereo: 1920 samples (3840 bytes)                                │
│   • 48kHz mono:   960 samples (1920 bytes)                                 │
│   • 16kHz mono:   320 samples (640 bytes)                                  │
│   • 8kHz mono:    160 samples (320 bytes)                                  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Document Revision History \

VersionDateAuthorChanges
1.02026-01-16EngineeringInitial draft
Last modified on April 18, 2026