Skip to main content
Normalized for Mintlify from knowledge-base/aiconnected-apps-and-modules/modules/aiConnected-voice/GoToConnect-integration-spec.mdx.

Voice by aiConnected — GoToConnect Integration Specification \

Document Information \

FieldValue
Document IDARCH-002
Version1.0
Last Updated2026-01-16
StatusDraft
OwnerEngineering
DependenciesARCH-001 (System Architecture Overview)

Table of Contents \

Voice by aiConnected — GoToConnect Integration Specification Document Information Table of Contents 1. Introduction 1.1 Purpose 1.2 Scope 1.3 Prerequisites 1.4 API Versions 2. GoToConnect Platform Overview 2.1 Architecture Context 2.2 Key Concepts 2.2.1 Lines and Extensions 2.2.2 Users and Accounts 2.2.3 Web Calls 2.2.4 Call Events 2.3 Our Integration Points 3. Authentication and Authorization 3.1 OAuth 2.0 Overview 3.2 OAuth Application Setup 3.2.1 Register Application 3.2.2 Create Service User 3.3 Token Management 3.3.1 Token Request 3.3.2 Token Response Structure 3.4 Required Scopes 3.5 Authentication Error Handling 4. WebRTC Integration 4.1 WebRTC Overview 4.2 Web Calls API 4.2.1 API Base Configuration 4.2.2 Initiate Outbound Call 4.2.3 Answer Inbound Call 4.3 SDP Exchange 4.3.1 SDP Offer Structure (from GoToConnect) 4.3.2 SDP Answer Generation 4.3.3 Codec Preferences 4.4 ICE Candidate Exchange 4.4.1 Trickle ICE 4.4.2 Handling Remote ICE Candidates 4.5 Audio Stream Handling 4.5.1 Receiving Audio from GoToConnect 4.5.2 Sending Audio to GoToConnect 5. Call Control API 5.1 Call Control Operations 5.1.1 Hold Call 5.1.2 Resume Call 5.1.3 Mute/Unmute 5.1.4 Send DTMF 5.1.5 Hang Up 5.2 Transfer Operations 5.2.1 Blind Transfer 5.2.2 Warm Transfer (Attended Transfer) 5.2.3 Conference (Merge Calls) 5.3 Call Control State Machine 5.4 Call Control Client 6. Event Subscriptions 6.1 Event System Overview 6.2 Create Notification Channel 6.3 Subscribe to Events 6.4 WebSocket Connection 6.5 Event Types 6.5.1 call.ringing 6.5.2 call.connected 6.5.3 call.ended 6.5.4 call.held 6.5.5 call.resumed 6.5.6 call.transferred 6.5.7 call.ice_candidate 6.5.8 call.dtmf 6.6 Event Handler Implementation 6.7 Subscription Management 7. Phone Number Management 7.1 Lines API 7.1.1 List Lines 7.1.2 Get Line Details 7.2 Phone Number to Line Mapping 8. Error Handling 8.1 Error Categories 8.2 Error Response Handling 8.3 Retry Logic 8.4 WebSocket Reconnection 9. Rate Limits and Quotas 9.1 GoToConnect Rate Limits 9.2 Rate Limit Handling 9.3 Quota Monitoring 10. Security Considerations 10.1 Credential Storage 10.2 Token Security 10.3 WebSocket Security 10.4 Audit Logging 11. Testing Strategy 11.1 Mock Server 11.2 Integration Tests 11.3 Unit Tests 12. Implementation Guide 12.1 Setup Checklist 12.2 Configuration Template 12.3 Service Initialization 13. Troubleshooting 13.1 Common Issues Authentication Failures WebRTC Issues Event Subscription Issues 13.2 Diagnostic Commands 13.3 Debug Logging 14. API Reference Summary 14.1 Authentication API 14.2 Web Calls API 14.3 Call Events API 14.4 Users/Lines API Appendix A: SDP Templates A.1 Minimal SDP Offer A.2 Full SDP Offer (GoToConnect) Appendix B: Event Schemas B.1 Common Event Structure B.2 Event Type Reference Document History

  1. Introduction \

1.1 Purpose \

This document provides a comprehensive specification for integrating Voice by aiConnected with GoToConnect’s telephony platform. It covers authentication, WebRTC session management, call control operations, and real-time event handling. GoToConnect serves as our PSTN gateway, providing:
  • Inbound and outbound call connectivity
  • WebRTC-based audio transport
  • Call control operations (transfer, hold, merge)
  • Real-time call event notifications

1.2 Scope \

This document covers:
  • OAuth 2.0 authentication flow with GoToConnect
  • WebRTC session establishment and management
  • Complete call control API mapping
  • WebSocket-based event subscription system
  • Error handling and recovery patterns
  • Security and compliance considerations
This document does not cover:
  • Internal service architecture (see ARCH-001)
  • LiveKit integration (see ARCH-003)
  • Voice pipeline implementation (see ARCH-004)

1.3 Prerequisites \

Before implementing this integration, ensure you have:
  • GoToConnect account with API access enabled
  • OAuth application registered in GoTo Developer Portal
  • Understanding of WebRTC fundamentals
  • Familiarity with OAuth 2.0 flows

1.4 API Versions \

APIVersionBase URL
Authenticationv2https://authentication.logmeininc.com
Web Callsv1https://api.goto.com/web-calls/v1
Call Eventsv1https://api.goto.com/call-events/v1
Usersv1https://api.goto.com/users/v1
Linesv1https://api.goto.com/lines/v1

  1. GoToConnect Platform Overview \

2.1 Architecture Context \

┌─────────────────────────────────────────────────────────────────────────────┐
│                    GOTOCONNECT INTEGRATION CONTEXT                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                              PSTN Network                                   │
│                                   │                                         │
│                                   │ Phone calls                             │
│                                   ▼                                         │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                        GOTOCONNECT CLOUD                             │   │
│  │                                                                      │   │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐               │   │
│  │  │     PBX      │  │   WebRTC     │  │    Event     │               │   │
│  │  │   (Jive)     │  │   Gateway    │  │    System    │               │   │
│  │  └──────────────┘  └──────────────┘  └──────────────┘               │   │
│  │         │                 │                 │                        │   │
│  │         │                 │                 │                        │   │
│  │  ┌──────▼─────────────────▼─────────────────▼──────┐                │   │
│  │  │                    API LAYER                     │                │   │
│  │  │                                                  │                │   │
│  │  │  • REST APIs (call control, users, lines)       │                │   │
│  │  │  • WebRTC signaling (SDP exchange)              │                │   │
│  │  │  • WebSocket (real-time events)                 │                │   │
│  │  │                                                  │                │   │
│  │  └──────────────────────┬───────────────────────────┘                │   │
│  │                         │                                            │   │
│  └─────────────────────────┼────────────────────────────────────────────┘   │
│                            │                                                │
│                            │ HTTPS / WSS                                    │
│                            │                                                │
│  ┌─────────────────────────▼────────────────────────────────────────────┐   │
│  │                    VOICE BY AICONNECTED                              │   │
│  │                                                                      │   │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐               │   │
│  │  │    OAuth     │  │   WebRTC     │  │    Event     │               │   │
│  │  │   Manager    │  │   Bridge     │  │  Subscriber  │               │   │
│  │  └──────────────┘  └──────────────┘  └──────────────┘               │   │
│  │                                                                      │   │
│  └──────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

2.2 Key Concepts \

2.2.1 Lines and Extensions \

In GoToConnect, a line represents a phone endpoint:
  • Each line has a unique extension number
  • Lines can have one or more associated phone numbers (DIDs)
  • Our AI agents will be assigned to specific lines

2.2.2 Users and Accounts \

  • Account: The top-level organizational entity
  • User: An individual with login credentials
  • Service User: A programmatic user for API access (what we use)

2.2.3 Web Calls \

GoToConnect’s Web Calls API enables browser-based calling:
  • Provides WebRTC signaling endpoints
  • Manages call state via REST
  • Supports full call control (hold, transfer, merge)

2.2.4 Call Events \

Real-time notifications delivered via WebSocket:
  • Call state changes (ringing, connected, ended)
  • DTMF tones
  • Recording status
  • Error conditions

2.3 Our Integration Points \

Integration PointPurposeProtocol
OAuth APIAuthenticationHTTPS
Web Calls APICall control, WebRTC signalingHTTPS
Call Events APISubscription managementHTTPS
Event WebSocketReal-time notificationsWSS
Users APIUser/line lookupHTTPS
Lines APILine configurationHTTPS

  1. Authentication and Authorization \

3.1 OAuth 2.0 Overview \

GoToConnect uses OAuth 2.0 for API authentication. For server-to-server integration, we use the Client Credentials flow with a Service User.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      OAUTH 2.0 CLIENT CREDENTIALS FLOW                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌──────────────┐                              ┌──────────────┐             │
│  │   Voice by   │                              │     GoTo     │             │
│  │ aiConnected  │                              │   Auth API   │             │
│  └──────┬───────┘                              └──────┬───────┘             │
│         │                                             │                     │
│         │  1. POST /oauth/token                       │                     │
│         │     grant_type=password                     │                     │
│         │     username={service_user}                 │                     │
│         │     password={service_password}             │                     │
│         │     client_id={client_id}                   │                     │
│         │────────────────────────────────────────────▶│                     │
│         │                                             │                     │
│         │  2. 200 OK                                  │                     │
│         │     {                                       │                     │
│         │       "access_token": "eyJ...",             │                     │
│         │       "token_type": "Bearer",               │                     │
│         │       "expires_in": 3600,                   │                     │
│         │       "refresh_token": "abc...",            │                     │
│         │       "account_key": "123...",              │                     │
│         │       "organizer_key": "456..."             │                     │
│         │     }                                       │                     │
│         │◀────────────────────────────────────────────│                     │
│         │                                             │                     │
│         │  3. API Request                             │                     │
│         │     Authorization: Bearer eyJ...            │                     │
│         │────────────────────────────────────────────▶│                     │
│         │                                             │                     │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

3.2 OAuth Application Setup \

3.2.1 Register Application \

  1. Navigate to GoTo Developer Portal
  2. Create a new OAuth application
  3. Configure the following settings:
application:
  name: "Voice by aiConnected"
  description: "AI Voice Agent Platform"
  redirect_uris:
    - "https://api.aiconnected.io/oauth/callback"  # Not used for client credentials
  
  scopes:
    # Required scopes
    - "webrtc.v1.write"                    # WebRTC call management
    - "call-events.v1.notifications.manage" # Event subscriptions
    - "call-control.v1.calls.write"        # Call control operations
    - "users.v1.lines.read"                # Read line information
    - "users.v1.users.read"                # Read user information
    
  grant_types:
    - "password"           # For service user authentication
    - "refresh_token"      # For token refresh

3.2.2 Create Service User \

In GoToConnect Admin Portal:
  1. Create a dedicated user for API access
  2. Assign appropriate permissions:
    • Make and receive calls
    • Access to required lines/extensions
    • API access enabled
  3. Store credentials securely:
service_user:
  username: "aiconnected-service@yourdomain.com"
  password: "${GOTO_SERVICE_PASSWORD}"  # Stored in secrets manager

3.3 Token Management \

3.3.1 Token Request \

# integrations/gotoconnect/auth.py

import httpx
from datetime import datetime, timedelta
from typing import Optional
import asyncio

class GoToAuthManager:
    """Manages OAuth tokens for GoToConnect API access."""
    
    AUTH_URL = "https://authentication.logmeininc.com/oauth/token"
    
    def __init__(
        self,
        client_id: str,
        username: str,
        password: str
    ):
        self.client_id = client_id
        self.username = username
        self.password = password
        
        self._access_token: Optional[str] = None
        self._refresh_token: Optional[str] = None
        self._expires_at: Optional[datetime] = None
        self._account_key: Optional[str] = None
        self._organizer_key: Optional[str] = None
        self._lock = asyncio.Lock()
    
    async def get_access_token(self) -> str:
        """Get a valid access token, refreshing if necessary."""
        async with self._lock:
            if self._is_token_valid():
                return self._access_token
            
            if self._refresh_token:
                try:
                    await self._refresh_access_token()
                    return self._access_token
                except Exception:
                    pass  # Fall through to full auth
            
            await self._authenticate()
            return self._access_token
    
    def _is_token_valid(self) -> bool:
        """Check if current token is valid with buffer."""
        if not self._access_token or not self._expires_at:
            return False
        # Refresh 5 minutes before expiry
        return datetime.utcnow() < (self._expires_at - timedelta(minutes=5))
    
    async def _authenticate(self) -> None:
        """Perform full authentication with username/password."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.AUTH_URL,
                data={
                    "grant_type": "password",
                    "username": self.username,
                    "password": self.password,
                    "client_id": self.client_id,
                },
                headers={
                    "Content-Type": "application/x-www-form-urlencoded",
                    "Accept": "application/json",
                }
            )
            response.raise_for_status()
            self._process_token_response(response.json())
    
    async def _refresh_access_token(self) -> None:
        """Refresh the access token using refresh token."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.AUTH_URL,
                data={
                    "grant_type": "refresh_token",
                    "refresh_token": self._refresh_token,
                    "client_id": self.client_id,
                },
                headers={
                    "Content-Type": "application/x-www-form-urlencoded",
                    "Accept": "application/json",
                }
            )
            response.raise_for_status()
            self._process_token_response(response.json())
    
    def _process_token_response(self, data: dict) -> None:
        """Process and store token response."""
        self._access_token = data["access_token"]
        self._refresh_token = data.get("refresh_token")
        self._account_key = data.get("account_key")
        self._organizer_key = data.get("organizer_key")
        
        expires_in = data.get("expires_in", 3600)
        self._expires_at = datetime.utcnow() + timedelta(seconds=expires_in)
    
    @property
    def account_key(self) -> Optional[str]:
        """Get the account key from authentication."""
        return self._account_key
    
    @property
    def organizer_key(self) -> Optional[str]:
        """Get the organizer key (user key) from authentication."""
        return self._organizer_key

3.3.2 Token Response Structure \

{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "Bearer",
  "expires_in": 3600,
  "refresh_token": "abc123def456...",
  "scope": "webrtc.v1.write call-events.v1.notifications.manage ...",
  "account_key": "1234567890",
  "organizer_key": "9876543210",
  "principal": "aiconnected-service@yourdomain.com"
}

3.4 Required Scopes \

ScopePurposeRequired For
webrtc.v1.writeCreate and manage WebRTC callsAll call operations
call-events.v1.notifications.manageSubscribe to call eventsReal-time notifications
call-control.v1.calls.writeTransfer, hold, merge callsCall control features
users.v1.lines.readRead line/extension infoLine lookup
users.v1.users.readRead user infoUser validation

3.5 Authentication Error Handling \

class GoToAuthError(Exception):
    """Base exception for GoTo authentication errors."""
    pass

class InvalidCredentialsError(GoToAuthError):
    """Raised when credentials are invalid."""
    pass

class TokenExpiredError(GoToAuthError):
    """Raised when token has expired and refresh failed."""
    pass

class InsufficientScopesError(GoToAuthError):
    """Raised when token lacks required scopes."""
    pass

# Error handling in auth manager
async def _authenticate(self) -> None:
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(self.AUTH_URL, ...)
            
            if response.status_code == 400:
                error_data = response.json()
                if error_data.get("error") == "invalid_grant":
                    raise InvalidCredentialsError(
                        "Invalid username or password"
                    )
                raise GoToAuthError(f"Auth failed: {error_data}")
            
            if response.status_code == 401:
                raise InvalidCredentialsError("Authentication failed")
            
            response.raise_for_status()
            self._process_token_response(response.json())
            
    except httpx.HTTPError as e:
        raise GoToAuthError(f"HTTP error during authentication: {e}")

  1. WebRTC Integration \

4.1 WebRTC Overview \

GoToConnect provides WebRTC endpoints for browser-based calling. Our WebRTC Bridge uses these to establish audio connections with callers.
┌─────────────────────────────────────────────────────────────────────────────┐
│                        WEBRTC CALL FLOW                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   PSTN Caller          GoToConnect           WebRTC Bridge        LiveKit   │
│        │                    │                      │                 │      │
│        │  1. Inbound call   │                      │                 │      │
│        │───────────────────▶│                      │                 │      │
│        │                    │                      │                 │      │
│        │                    │  2. Event: call.ringing               │      │
│        │                    │─────────────────────▶│                 │      │
│        │                    │                      │                 │      │
│        │                    │  3. POST /calls/{id}/answer            │      │
│        │                    │◀─────────────────────│                 │      │
│        │                    │                      │                 │      │
│        │                    │  4. SDP Offer        │                 │      │
│        │                    │─────────────────────▶│                 │      │
│        │                    │                      │                 │      │
│        │                    │                      │  5. Create room │      │
│        │                    │                      │────────────────▶│      │
│        │                    │                      │                 │      │
│        │                    │  6. SDP Answer       │                 │      │
│        │                    │◀─────────────────────│                 │      │
│        │                    │                      │                 │      │
│        │                    │  7. ICE Candidates   │                 │      │
│        │                    │◀────────────────────▶│                 │      │
│        │                    │                      │                 │      │
│        │  8. Audio stream   │  9. Audio stream     │  10. Audio      │      │
│        │◀──────────────────▶│◀────────────────────▶│◀───────────────▶│      │
│        │                    │                      │                 │      │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

4.2 Web Calls API \

4.2.1 API Base Configuration \

# integrations/gotoconnect/config.py

WEB_CALLS_BASE_URL = "https://api.goto.com/web-calls/v1"

# Endpoints
ENDPOINTS = {
    "calls": f"{WEB_CALLS_BASE_URL}/calls",
    "call": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}",
    "answer": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/answer",
    "hangup": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/hangup",
    "hold": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/hold",
    "resume": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/resume",
    "mute": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/mute",
    "unmute": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/unmute",
    "dtmf": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/dtmf",
    "blind_transfer": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/blind-transfer",
    "warm_transfer": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/warm-transfer",
    "merge": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/merge",
    "ice_candidates": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/ice-candidates",
}

4.2.2 Initiate Outbound Call \

async def initiate_call(
    self,
    dial_string: str,
    caller_id: str,
    line_id: str
) -> dict:
    """
    Initiate an outbound call.
    
    Args:
        dial_string: Number to call (e.g., "tel:+15551234567")
        caller_id: Caller ID to display
        line_id: Line/extension to call from
    
    Returns:
        Call object with ID and initial SDP offer
    """
    token = await self.auth_manager.get_access_token()
    
    async with httpx.AsyncClient() as client:
        response = await client.post(
            ENDPOINTS["calls"],
            json={
                "dialString": dial_string,
                "callerId": caller_id,
                "lineId": line_id,
            },
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
                "Accept": "application/json",
            }
        )
        response.raise_for_status()
        return response.json()
Request:
POST /web-calls/v1/calls
{
  "dialString": "tel:+15551234567",
  "callerId": "+15559876543",
  "lineId": "line_abc123"
}
Response:
{
  "callId": "call_xyz789",
  "state": "dialing",
  "direction": "outbound",
  "from": "+15559876543",
  "to": "+15551234567",
  "lineId": "line_abc123",
  "sdpOffer": "v=0\r\no=- 123456789 2 IN IP4 127.0.0.1\r\n...",
  "createdAt": "2026-01-16T10:30:00Z"
}

4.2.3 Answer Inbound Call \

async def answer_call(
    self,
    call_id: str,
    sdp_answer: str
) -> dict:
    """
    Answer an inbound call with SDP answer.
    
    Args:
        call_id: The call to answer
        sdp_answer: Our SDP answer
    
    Returns:
        Updated call object
    """
    token = await self.auth_manager.get_access_token()
    
    endpoint = ENDPOINTS["answer"].format(call_id=call_id)
    
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json={
                "sdpAnswer": sdp_answer,
            },
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
                "Accept": "application/json",
            }
        )
        response.raise_for_status()
        return response.json()
Request:
POST /web-calls/v1/calls/{call_id}/answer
{
  "sdpAnswer": "v=0\r\no=- 987654321 2 IN IP4 127.0.0.1\r\n..."
}
Response:
{
  "callId": "call_xyz789",
  "state": "connected",
  "direction": "inbound",
  "from": "+15551234567",
  "to": "+15559876543",
  "lineId": "line_abc123",
  "answeredAt": "2026-01-16T10:30:05Z"
}

4.3 SDP Exchange \

4.3.1 SDP Offer Structure (from GoToConnect) \

v=0
o=- 1234567890 2 IN IP4 0.0.0.0
s=-
t=0 0
a=group:BUNDLE audio
a=msid-semantic: WMS
m=audio 9 UDP/TLS/RTP/SAVPF 111 103 104 9 0 8 106 105 13 110 112 113 126
c=IN IP4 0.0.0.0
a=rtcp:9 IN IP4 0.0.0.0
a=ice-ufrag:abcd
a=ice-pwd:efghijklmnopqrstuvwxyz
a=ice-options:trickle
a=fingerprint:sha-256 AA:BB:CC:DD:EE:FF:...
a=setup:actpass
a=mid:audio
a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level
a=sendrecv
a=rtcp-mux
a=rtpmap:111 opus/48000/2
a=rtcp-fb:111 transport-cc
a=fmtp:111 minptime=10;useinbandfec=1
a=rtpmap:103 ISAC/16000
a=rtpmap:104 ISAC/32000
a=rtpmap:9 G722/8000
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000

4.3.2 SDP Answer Generation \

from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer, MediaRecorder

async def create_sdp_answer(sdp_offer: str) -> tuple[RTCPeerConnection, str]:
    """
    Create an SDP answer for a GoToConnect offer.
    
    Args:
        sdp_offer: The SDP offer from GoToConnect
    
    Returns:
        Tuple of (peer_connection, sdp_answer)
    """
    # Create peer connection with our configuration
    pc = RTCPeerConnection(configuration={
        "iceServers": [
            {"urls": "stun:stun.l.google.com:19302"},
        ]
    })
    
    # Set the remote description (offer from GoToConnect)
    offer = RTCSessionDescription(sdp=sdp_offer, type="offer")
    await pc.setRemoteDescription(offer)
    
    # Create and set local description (our answer)
    answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)
    
    return pc, pc.localDescription.sdp

4.3.3 Codec Preferences \

GoToConnect supports multiple audio codecs. We prefer Opus for quality:
CODEC_PREFERENCES = [
    {
        "name": "opus",
        "clockRate": 48000,
        "channels": 2,
        "priority": 1,
        "params": {
            "minptime": "10",
            "useinbandfec": "1",
        }
    },
    {
        "name": "PCMU",  # G.711 μ-law (fallback)
        "clockRate": 8000,
        "channels": 1,
        "priority": 2,
    },
    {
        "name": "PCMA",  # G.711 A-law (fallback)
        "clockRate": 8000,
        "channels": 1,
        "priority": 3,
    },
]

4.4 ICE Candidate Exchange \

4.4.1 Trickle ICE \

GoToConnect supports trickle ICE, allowing candidates to be exchanged incrementally:
async def send_ice_candidate(
    self,
    call_id: str,
    candidate: dict
) -> None:
    """
    Send an ICE candidate to GoToConnect.
    
    Args:
        call_id: The call ID
        candidate: ICE candidate object
    """
    token = await self.auth_manager.get_access_token()
    endpoint = ENDPOINTS["ice_candidates"].format(call_id=call_id)
    
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json={
                "candidate": candidate["candidate"],
                "sdpMid": candidate["sdpMid"],
                "sdpMLineIndex": candidate["sdpMLineIndex"],
            },
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            }
        )
        response.raise_for_status()
Request:
POST /web-calls/v1/calls/{call_id}/ice-candidates
{
  "candidate": "candidate:1 1 UDP 2130706431 192.168.1.100 54321 typ host",
  "sdpMid": "audio",
  "sdpMLineIndex": 0
}

4.4.2 Handling Remote ICE Candidates \

ICE candidates from GoToConnect arrive via WebSocket events:
async def handle_ice_candidate_event(
    self,
    event: dict,
    peer_connection: RTCPeerConnection
) -> None:
    """
    Handle an ICE candidate event from GoToConnect.
    
    Args:
        event: The ICE candidate event
        peer_connection: Our RTCPeerConnection
    """
    candidate_data = event.get("candidate")
    
    if candidate_data:
        from aiortc import RTCIceCandidate
        
        candidate = RTCIceCandidate(
            candidate=candidate_data["candidate"],
            sdpMid=candidate_data["sdpMid"],
            sdpMLineIndex=candidate_data["sdpMLineIndex"],
        )
        
        await peer_connection.addIceCandidate(candidate)

4.5 Audio Stream Handling \

4.5.1 Receiving Audio from GoToConnect \

from aiortc import MediaStreamTrack

class GoToAudioReceiver:
    """Receives audio from GoToConnect and forwards to LiveKit."""
    
    def __init__(
        self,
        peer_connection: RTCPeerConnection,
        livekit_room: Room
    ):
        self.pc = peer_connection
        self.livekit_room = livekit_room
        
        # Set up track handler
        self.pc.on("track", self._on_track)
    
    async def _on_track(self, track: MediaStreamTrack) -> None:
        """Handle incoming audio track from GoToConnect."""
        if track.kind != "audio":
            return
        
        logger.info(f"Received audio track from GoToConnect: {track.id}")
        
        # Create a track forwarder to LiveKit
        forwarder = AudioTrackForwarder(track, self.livekit_room)
        await forwarder.start()
    

class AudioTrackForwarder:
    """Forwards audio frames from GoToConnect to LiveKit."""
    
    def __init__(
        self,
        source_track: MediaStreamTrack,
        livekit_room: Room
    ):
        self.source_track = source_track
        self.livekit_room = livekit_room
        self._running = False
    
    async def start(self) -> None:
        """Start forwarding audio frames."""
        self._running = True
        
        # Create LiveKit audio source
        audio_source = rtc.AudioSource(
            sample_rate=48000,
            num_channels=1
        )
        
        # Publish track to LiveKit
        track = rtc.LocalAudioTrack.create_audio_track(
            "caller-audio",
            audio_source
        )
        
        await self.livekit_room.local_participant.publish_track(track)
        
        # Forward frames
        while self._running:
            try:
                frame = await self.source_track.recv()
                
                # Convert frame format if needed
                audio_frame = self._convert_frame(frame)
                
                # Push to LiveKit
                await audio_source.capture_frame(audio_frame)
                
            except MediaStreamError:
                logger.info("Audio track ended")
                break
    
    def _convert_frame(self, frame) -> rtc.AudioFrame:
        """Convert aiortc frame to LiveKit frame."""
        # Frame conversion logic
        return rtc.AudioFrame(
            data=frame.to_ndarray().tobytes(),
            sample_rate=frame.sample_rate,
            num_channels=frame.layout.channels,
            samples_per_channel=frame.samples,
        )
    
    async def stop(self) -> None:
        """Stop forwarding."""
        self._running = False

4.5.2 Sending Audio to GoToConnect \

class GoToAudioSender:
    """Sends audio from LiveKit to GoToConnect."""
    
    def __init__(
        self,
        peer_connection: RTCPeerConnection,
        livekit_room: Room
    ):
        self.pc = peer_connection
        self.livekit_room = livekit_room
        self._audio_track = None
    
    async def start(self) -> None:
        """Start sending audio to GoToConnect."""
        # Create an audio track for GoToConnect
        self._audio_track = AudioStreamTrack()
        
        # Add track to peer connection
        self.pc.addTrack(self._audio_track)
        
        # Subscribe to AI agent's audio in LiveKit
        @self.livekit_room.on("track_subscribed")
        async def on_track_subscribed(
            track: rtc.Track,
            publication: rtc.TrackPublication,
            participant: rtc.RemoteParticipant
        ):
            if track.kind == rtc.TrackKind.KIND_AUDIO:
                # Forward AI audio to GoToConnect
                async for frame in track:
                    await self._audio_track.send_frame(frame)


class AudioStreamTrack(MediaStreamTrack):
    """Custom audio track for sending to GoToConnect."""
    
    kind = "audio"
    
    def __init__(self):
        super().__init__()
        self._queue = asyncio.Queue()
    
    async def send_frame(self, frame: rtc.AudioFrame) -> None:
        """Queue a frame for sending."""
        await self._queue.put(frame)
    
    async def recv(self):
        """Receive next frame to send."""
        frame = await self._queue.get()
        return self._convert_to_aiortc_frame(frame)
    
    def _convert_to_aiortc_frame(self, frame: rtc.AudioFrame):
        """Convert LiveKit frame to aiortc frame."""
        from av import AudioFrame as AVAudioFrame
        
        av_frame = AVAudioFrame(
            format="s16",
            layout="mono",
            samples=frame.samples_per_channel
        )
        av_frame.sample_rate = frame.sample_rate
        av_frame.planes[0].update(frame.data)
        
        return av_frame

  1. Call Control API \

5.1 Call Control Operations \

5.1.1 Hold Call \

Place the remote party on hold (they hear hold music).
async def hold_call(self, call_id: str) -> dict:
    """
    Place a call on hold.
    
    Args:
        call_id: The call to hold
    
    Returns:
        Updated call object
    """
    token = await self.auth_manager.get_access_token()
    endpoint = ENDPOINTS["hold"].format(call_id=call_id)
    
    async with httpx.AsyncClient() as client:
        response = await client.put(
            endpoint,
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/json",
            }
        )
        response.raise_for_status()
        return response.json()
Request:
PUT /web-calls/v1/calls/{call_id}/hold
Response:
{
  "callId": "call_xyz789",
  "state": "held",
  "holdStartedAt": "2026-01-16T10:35:00Z"
}

5.1.2 Resume Call \

Resume a held call.
async def resume_call(self, call_id: str) -> dict:
    """
    Resume a held call.
    
    Args:
        call_id: The call to resume
    
    Returns:
        Updated call object
    """
    token = await self.auth_manager.get_access_token()
    endpoint = ENDPOINTS["resume"].format(call_id=call_id)
    
    async with httpx.AsyncClient() as client:
        response = await client.put(
            endpoint,
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/json",
            }
        )
        response.raise_for_status()
        return response.json()
Request:
PUT /web-calls/v1/calls/{call_id}/resume

5.1.3 Mute/Unmute \

Control the microphone (our audio to the caller).
async def mute_call(self, call_id: str) -> dict:
    """Mute our audio (caller won't hear us)."""
    token = await self.auth_manager.get_access_token()
    endpoint = ENDPOINTS["mute"].format(call_id=call_id)
    
    async with httpx.AsyncClient() as client:
        response = await client.put(
            endpoint,
            headers={"Authorization": f"Bearer {token}"}
        )
        response.raise_for_status()
        return response.json()

async def unmute_call(self, call_id: str) -> dict:
    """Unmute our audio."""
    token = await self.auth_manager.get_access_token()
    endpoint = ENDPOINTS["unmute"].format(call_id=call_id)
    
    async with httpx.AsyncClient() as client:
        response = await client.put(
            endpoint,
            headers={"Authorization": f"Bearer {token}"}
        )
        response.raise_for_status()
        return response.json()

5.1.4 Send DTMF \

Send touch-tone digits.
async def send_dtmf(
    self,
    call_id: str,
    digits: str,
    duration_ms: int = 100,
    gap_ms: int = 50
) -> dict:
    """
    Send DTMF tones.
    
    Args:
        call_id: The call ID
        digits: Digits to send (0-9, *, #)
        duration_ms: Duration of each tone
        gap_ms: Gap between tones
    
    Returns:
        Confirmation response
    """
    token = await self.auth_manager.get_access_token()
    endpoint = ENDPOINTS["dtmf"].format(call_id=call_id)
    
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json={
                "digits": digits,
                "durationMs": duration_ms,
                "gapMs": gap_ms,
            },
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            }
        )
        response.raise_for_status()
        return response.json()
Request:
POST /web-calls/v1/calls/{call_id}/dtmf
{
  "digits": "1234#",
  "durationMs": 100,
  "gapMs": 50
}

5.1.5 Hang Up \

End the call.
async def hangup_call(
    self,
    call_id: str,
    reason: str = "normal"
) -> dict:
    """
    End a call.
    
    Args:
        call_id: The call to end
        reason: Reason for ending (normal, busy, rejected)
    
    Returns:
        Final call object
    """
    token = await self.auth_manager.get_access_token()
    endpoint = ENDPOINTS["hangup"].format(call_id=call_id)
    
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json={"reason": reason},
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            }
        )
        response.raise_for_status()
        return response.json()

5.2 Transfer Operations \

5.2.1 Blind Transfer \

Transfer the call immediately without consulting the target.
async def blind_transfer(
    self,
    call_id: str,
    dial_string: str
) -> dict:
    """
    Perform a blind (cold) transfer.
    
    The caller is immediately connected to the transfer target.
    Our connection is terminated.
    
    Args:
        call_id: The call to transfer
        dial_string: Target (e.g., "ext:1001" or "tel:+15551234567")
    
    Returns:
        Transfer result
    """
    token = await self.auth_manager.get_access_token()
    endpoint = ENDPOINTS["blind_transfer"].format(call_id=call_id)
    
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json={
                "dialString": dial_string,
            },
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            }
        )
        response.raise_for_status()
        return response.json()
Request:
POST /web-calls/v1/calls/{call_id}/blind-transfer
{
  "dialString": "ext:1001"
}
Response:
{
  "callId": "call_xyz789",
  "state": "transferred",
  "transferType": "blind",
  "transferTarget": "ext:1001",
  "transferredAt": "2026-01-16T10:40:00Z"
}
**Dial String Formats:** | Format | Example | Description | |--------|---------|-------------| | `ext:{extension}` | `ext:1001` | Internal extension | | `tel:{number}` | `tel:+15551234567` | External phone number | | `sip:{uri}` | `sip:user@domain.com` | SIP URI | | `voicemail:{extension}` | `voicemail:1001` | Extension's voicemail |

#### 5.2.2 Warm Transfer (Attended Transfer) {#5.2.2-warm-transfer-(attended-transfer)}

┌─────────────────────────────────────────────────────────────────────────────┐
│                        WARM TRANSFER FLOW                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   Caller              AI Agent            Human Agent          GoToConnect  │
│     │                    │                     │                    │       │
│     │  1. Talking        │                     │                    │       │
│     │◀──────────────────▶│                     │                    │       │
│     │                    │                     │                    │       │
│     │                    │  2. Hold caller     │                    │       │
│     │  (hold music)      │────────────────────────────────────────▶│       │
│     │◀─ ─ ─ ─ ─ ─ ─ ─ ─ ─│                     │                    │       │
│     │                    │                     │                    │       │
│     │                    │  3. Call agent      │                    │       │
│     │                    │────────────────────────────────────────▶│       │
│     │                    │                     │◀───────────────────│       │
│     │                    │                     │                    │       │
│     │                    │  4. Brief agent     │                    │       │
│     │                    │◀───────────────────▶│                    │       │
│     │                    │  "Customer John,    │                    │       │
│     │                    │   billing issue"    │                    │       │
│     │                    │                     │                    │       │
│     │                    │  5. Complete transfer                    │       │
│     │                    │────────────────────────────────────────▶│       │
│     │                    │                     │                    │       │
│     │  6. Connected to agent                   │                    │       │
│     │◀────────────────────────────────────────▶│                    │       │
│     │                    │                     │                    │       │
│     │                    │  7. AI disconnected │                    │       │
│     │                    │◀────────────────────────────────────────│       │
│     │                    │                     │                    │       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
async def warm_transfer(
    self,
    call_id: str,
    dial_string: str,
    announcement: str = None
) -> dict:
    """
    Perform a warm (attended) transfer.
    
    This is a multi-step process:
    1. Place caller on hold
    2. Call the transfer target
    3. Brief the target (optional)
    4. Complete the transfer
    
    Args:
        call_id: The original call ID
        dial_string: Target to transfer to
        announcement: Message to speak to target before transfer
    
    Returns:
        Transfer result
    """
    # Step 1: Hold the original caller
    await self.hold_call(call_id)
    
    # Step 2: Initiate call to transfer target
    consult_call = await self.initiate_call(
        dial_string=dial_string,
        caller_id=self._get_caller_id(call_id),
        line_id=self._get_line_id(call_id),
    )
    
    consult_call_id = consult_call["callId"]
    
    # Step 3: Wait for target to answer
    # (This would be handled via events in practice)
    
    # Step 4: Complete the transfer
    token = await self.auth_manager.get_access_token()
    endpoint = ENDPOINTS["warm_transfer"].format(call_id=call_id)
    
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json={
                "referId": consult_call_id,
            },
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            }
        )
        response.raise_for_status()
        return response.json()
Request:
POST /web-calls/v1/calls/{call_id}/warm-transfer
{
  "referId": "call_consult_abc"
}

5.2.3 Conference (Merge Calls) \

Add a third party to an existing call.
async def merge_calls(
    self,
    call_id: str,
    other_call_id: str
) -> dict:
    """
    Merge two calls into a conference.
    
    Both parties will be connected together with us.
    
    Args:
        call_id: First call ID
        other_call_id: Second call ID to merge
    
    Returns:
        Merged call result
    """
    token = await self.auth_manager.get_access_token()
    endpoint = ENDPOINTS["merge"].format(call_id=call_id)
    
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json={
                "referId": other_call_id,
            },
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            }
        )
        response.raise_for_status()
        return response.json()
Request:
POST /web-calls/v1/calls/{call_id}/merge
{
  "referId": "call_supervisor_xyz"
}

5.3 Call Control State Machine \

┌─────────────────────────────────────────────────────────────────────────────┐
│                     CALL CONTROL STATE MACHINE                              │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                                                                             │
│                    ┌─────────────────────────────────────────┐             │
│                    │                                         │             │
│                    │              ┌─────────┐                │             │
│                    │              │ INITIAL │                │             │
│                    │              └────┬────┘                │             │
│                    │                   │                     │             │
│                    │    ┌──────────────┼──────────────┐      │             │
│                    │    │              │              │      │             │
│                    │    ▼              ▼              ▼      │             │
│                    │ ┌──────┐    ┌─────────┐    ┌────────┐   │             │
│                    │ │DIALING    │ RINGING │    │OFFERING│   │             │
│                    │ │(outbound) │(inbound)│    │(inbound│   │             │
│                    │ └────┬─┘    └────┬────┘    │ WebRTC)│   │             │
│                    │      │           │         └───┬────┘   │             │
│                    │      │           │             │        │             │
│                    │      └───────────┼─────────────┘        │             │
│                    │                  │                      │             │
│                    │                  │ answer               │             │
│                    │                  ▼                      │             │
│                    │            ┌───────────┐                │             │
│       resume       │     ┌─────▶│ CONNECTED │◀─────┐        │             │
│          │         │     │      └─────┬─────┘      │        │             │
│          │         │     │            │            │        │             │
│          │         │     │    ┌───────┴───────┐    │        │             │
│          │         │     │    │               │    │        │             │
│          │         │     │    ▼               ▼    │        │             │
│          │         │   ┌──────────┐     ┌──────────┐│        │             │
│          └─────────┼──▶│   HELD   │     │  MUTED   ││        │             │
│                    │   └──────────┘     └──────────┘│        │             │
│                    │                                │        │             │
│                    │                                │ unmute │             │
│                    │                                │        │             │
│                    │                                         │             │
│                    │    From any state:                      │             │
│                    │                                         │             │
│                    │         hangup      transfer            │             │
│                    │            │            │                │             │
│                    │            ▼            ▼                │             │
│                    │      ┌─────────┐  ┌────────────┐        │             │
│                    │      │  ENDED  │  │TRANSFERRED │        │             │
│                    │      └─────────┘  └────────────┘        │             │
│                    │                                         │             │
│                    └─────────────────────────────────────────┘             │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

5.4 Call Control Client \

Complete client implementation:
# integrations/gotoconnect/call_control.py

from dataclasses import dataclass
from enum import Enum
from typing import Optional
import httpx

class CallState(Enum):
    INITIAL = "initial"
    DIALING = "dialing"
    RINGING = "ringing"
    OFFERING = "offering"
    CONNECTED = "connected"
    HELD = "held"
    MUTED = "muted"
    TRANSFERRED = "transferred"
    ENDED = "ended"

@dataclass
class Call:
    """Represents a GoToConnect call."""
    call_id: str
    state: CallState
    direction: str
    from_number: str
    to_number: str
    line_id: str
    created_at: str
    answered_at: Optional[str] = None
    ended_at: Optional[str] = None
    sdp_offer: Optional[str] = None
    sdp_answer: Optional[str] = None

class GoToCallControlClient:
    """
    Client for GoToConnect Call Control operations.
    
    Provides methods for all call control operations including
    hold, transfer, merge, and DTMF.
    """
    
    BASE_URL = "https://api.goto.com/web-calls/v1"
    
    def __init__(self, auth_manager: GoToAuthManager):
        self.auth_manager = auth_manager
    
    async def _request(
        self,
        method: str,
        endpoint: str,
        json: dict = None
    ) -> dict:
        """Make an authenticated request to GoToConnect."""
        token = await self.auth_manager.get_access_token()
        
        async with httpx.AsyncClient() as client:
            response = await client.request(
                method=method,
                url=f"{self.BASE_URL}{endpoint}",
                json=json,
                headers={
                    "Authorization": f"Bearer {token}",
                    "Content-Type": "application/json",
                    "Accept": "application/json",
                },
                timeout=30.0,
            )
            
            if response.status_code >= 400:
                await self._handle_error(response)
            
            if response.status_code == 204:
                return {}
            
            return response.json()
    
    async def _handle_error(self, response: httpx.Response) -> None:
        """Handle API error responses."""
        try:
            error_data = response.json()
        except Exception:
            error_data = {"message": response.text}
        
        status = response.status_code
        
        if status == 401:
            raise GoToAuthError("Authentication failed")
        elif status == 403:
            raise GoToPermissionError(
                f"Permission denied: {error_data.get('message')}"
            )
        elif status == 404:
            raise GoToNotFoundError(
                f"Resource not found: {error_data.get('message')}"
            )
        elif status == 409:
            raise GoToConflictError(
                f"Conflict: {error_data.get('message')}"
            )
        elif status == 429:
            raise GoToRateLimitError(
                f"Rate limit exceeded: {error_data.get('message')}"
            )
        else:
            raise GoToAPIError(
                f"API error {status}: {error_data.get('message')}"
            )
    
    # Call initiation
    async def initiate_call(
        self,
        dial_string: str,
        caller_id: str,
        line_id: str
    ) -> Call:
        """Initiate an outbound call."""
        data = await self._request(
            "POST",
            "/calls",
            json={
                "dialString": dial_string,
                "callerId": caller_id,
                "lineId": line_id,
            }
        )
        return self._parse_call(data)
    
    async def answer_call(
        self,
        call_id: str,
        sdp_answer: str
    ) -> Call:
        """Answer an inbound call."""
        data = await self._request(
            "POST",
            f"/calls/{call_id}/answer",
            json={"sdpAnswer": sdp_answer}
        )
        return self._parse_call(data)
    
    # Call control
    async def hangup(self, call_id: str) -> Call:
        """End a call."""
        data = await self._request("POST", f"/calls/{call_id}/hangup")
        return self._parse_call(data)
    
    async def hold(self, call_id: str) -> Call:
        """Place call on hold."""
        data = await self._request("PUT", f"/calls/{call_id}/hold")
        return self._parse_call(data)
    
    async def resume(self, call_id: str) -> Call:
        """Resume held call."""
        data = await self._request("PUT", f"/calls/{call_id}/resume")
        return self._parse_call(data)
    
    async def mute(self, call_id: str) -> Call:
        """Mute our audio."""
        data = await self._request("PUT", f"/calls/{call_id}/mute")
        return self._parse_call(data)
    
    async def unmute(self, call_id: str) -> Call:
        """Unmute our audio."""
        data = await self._request("PUT", f"/calls/{call_id}/unmute")
        return self._parse_call(data)
    
    async def send_dtmf(
        self,
        call_id: str,
        digits: str,
        duration_ms: int = 100,
        gap_ms: int = 50
    ) -> dict:
        """Send DTMF tones."""
        return await self._request(
            "POST",
            f"/calls/{call_id}/dtmf",
            json={
                "digits": digits,
                "durationMs": duration_ms,
                "gapMs": gap_ms,
            }
        )
    
    # Transfers
    async def blind_transfer(
        self,
        call_id: str,
        dial_string: str
    ) -> Call:
        """Perform blind transfer."""
        data = await self._request(
            "POST",
            f"/calls/{call_id}/blind-transfer",
            json={"dialString": dial_string}
        )
        return self._parse_call(data)
    
    async def warm_transfer(
        self,
        call_id: str,
        refer_id: str
    ) -> Call:
        """Complete warm transfer."""
        data = await self._request(
            "POST",
            f"/calls/{call_id}/warm-transfer",
            json={"referId": refer_id}
        )
        return self._parse_call(data)
    
    async def merge(
        self,
        call_id: str,
        refer_id: str
    ) -> Call:
        """Merge calls into conference."""
        data = await self._request(
            "POST",
            f"/calls/{call_id}/merge",
            json={"referId": refer_id}
        )
        return self._parse_call(data)
    
    # ICE candidates
    async def send_ice_candidate(
        self,
        call_id: str,
        candidate: str,
        sdp_mid: str,
        sdp_m_line_index: int
    ) -> None:
        """Send ICE candidate."""
        await self._request(
            "POST",
            f"/calls/{call_id}/ice-candidates",
            json={
                "candidate": candidate,
                "sdpMid": sdp_mid,
                "sdpMLineIndex": sdp_m_line_index,
            }
        )
    
    # Helpers
    def _parse_call(self, data: dict) -> Call:
        """Parse API response into Call object."""
        return Call(
            call_id=data["callId"],
            state=CallState(data["state"]),
            direction=data.get("direction", "unknown"),
            from_number=data.get("from", ""),
            to_number=data.get("to", ""),
            line_id=data.get("lineId", ""),
            created_at=data.get("createdAt", ""),
            answered_at=data.get("answeredAt"),
            ended_at=data.get("endedAt"),
            sdp_offer=data.get("sdpOffer"),
            sdp_answer=data.get("sdpAnswer"),
        )

  1. Event Subscriptions \

6.1 Event System Overview \

GoToConnect provides real-time events via WebSocket. Events notify us of call state changes, allowing reactive handling.
┌─────────────────────────────────────────────────────────────────────────────┐
│                         EVENT SUBSCRIPTION FLOW                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌──────────────┐                              ┌──────────────┐             │
│  │   Voice by   │                              │  GoToConnect │             │
│  │ aiConnected  │                              │              │             │
│  └──────┬───────┘                              └──────┬───────┘             │
│         │                                             │                     │
│         │  1. POST /notifications/channels            │                     │
│         │     Create channel for events               │                     │
│         │────────────────────────────────────────────▶│                     │
│         │                                             │                     │
│         │  2. Response: channel_id, websocket_url     │                     │
│         │◀────────────────────────────────────────────│                     │
│         │                                             │                     │
│         │  3. POST /notifications/subscriptions       │                     │
│         │     Subscribe to call events                │                     │
│         │────────────────────────────────────────────▶│                     │
│         │                                             │                     │
│         │  4. Connect WebSocket                       │                     │
│         │═══════════════════════════════════════════▶│                     │
│         │                                             │                     │
│         │  5. Events flow via WebSocket               │                     │
│         │◀══════════════════════════════════════════▶│                     │
│         │     { "event": "call.ringing", ... }        │                     │
│         │     { "event": "call.connected", ... }      │                     │
│         │     { "event": "call.ended", ... }          │                     │
│         │                                             │                     │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

6.2 Create Notification Channel \

# integrations/gotoconnect/events.py

class GoToEventManager:
    """Manages GoToConnect event subscriptions."""
    
    EVENTS_BASE_URL = "https://api.goto.com/call-events/v1"
    
    def __init__(self, auth_manager: GoToAuthManager):
        self.auth_manager = auth_manager
        self._channel_id: Optional[str] = None
        self._websocket_url: Optional[str] = None
        self._subscriptions: dict[str, str] = {}
    
    async def create_channel(self) -> tuple[str, str]:
        """
        Create a notification channel.
        
        Returns:
            Tuple of (channel_id, websocket_url)
        """
        token = await self.auth_manager.get_access_token()
        
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.EVENTS_BASE_URL}/notifications/channels",
                headers={
                    "Authorization": f"Bearer {token}",
                    "Content-Type": "application/json",
                }
            )
            response.raise_for_status()
            data = response.json()
            
            self._channel_id = data["channelId"]
            self._websocket_url = data["websocketUrl"]
            
            return self._channel_id, self._websocket_url
Request:
POST /call-events/v1/notifications/channels
Response:
{
  "channelId": "ch_abc123def456",
  "websocketUrl": "wss://realtime.goto.com/v1/notifications?channelId=ch_abc123def456",
  "expiresAt": "2026-01-16T11:30:00Z"
}

6.3 Subscribe to Events \

async def subscribe_to_calls(
    self,
    line_ids: list[str] = None,
    event_types: list[str] = None
) -> str:
    """
    Subscribe to call events.
    
    Args:
        line_ids: Specific lines to subscribe to (None = all)
        event_types: Specific events (None = all)
    
    Returns:
        Subscription ID
    """
    if not self._channel_id:
        await self.create_channel()
    
    token = await self.auth_manager.get_access_token()
    
    # Default to all call events
    if event_types is None:
        event_types = [
            "call.ringing",
            "call.connected",
            "call.ended",
            "call.held",
            "call.resumed",
            "call.transferred",
            "call.dtmf",
            "call.recording",
            "call.ice_candidate",
        ]
    
    subscription_request = {
        "channelId": self._channel_id,
        "eventTypes": event_types,
    }
    
    if line_ids:
        subscription_request["lineIds"] = line_ids
    
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{self.EVENTS_BASE_URL}/notifications/subscriptions",
            json=subscription_request,
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            }
        )
        response.raise_for_status()
        data = response.json()
        
        subscription_id = data["subscriptionId"]
        self._subscriptions[subscription_id] = data
        
        return subscription_id
Request:
POST /call-events/v1/notifications/subscriptions
{
  "channelId": "ch_abc123def456",
  "eventTypes": [
    "call.ringing",
    "call.connected",
    "call.ended",
    "call.held",
    "call.resumed",
    "call.transferred"
  ],
  "lineIds": ["line_001", "line_002"]
}
Response:
{
  "subscriptionId": "sub_xyz789",
  "channelId": "ch_abc123def456",
  "eventTypes": ["call.ringing", "call.connected", ...],
  "lineIds": ["line_001", "line_002"],
  "createdAt": "2026-01-16T10:30:00Z"
}

6.4 WebSocket Connection \

import websockets
import json
from typing import Callable, Awaitable

EventHandler = Callable[[dict], Awaitable[None]]

class GoToEventListener:
    """
    WebSocket listener for GoToConnect events.
    """
    
    def __init__(
        self,
        websocket_url: str,
        auth_manager: GoToAuthManager
    ):
        self.websocket_url = websocket_url
        self.auth_manager = auth_manager
        self._handlers: dict[str, list[EventHandler]] = {}
        self._running = False
        self._ws = None
    
    def on(self, event_type: str, handler: EventHandler) -> None:
        """Register an event handler."""
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)
    
    async def start(self) -> None:
        """Start listening for events."""
        self._running = True
        
        token = await self.auth_manager.get_access_token()
        
        # Add auth token to WebSocket URL
        ws_url = f"{self.websocket_url}&token={token}"
        
        while self._running:
            try:
                async with websockets.connect(ws_url) as ws:
                    self._ws = ws
                    await self._listen(ws)
            except websockets.ConnectionClosed:
                if self._running:
                    logger.warning("WebSocket disconnected, reconnecting...")
                    await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"WebSocket error: {e}")
                if self._running:
                    await asyncio.sleep(5)
    
    async def _listen(self, ws) -> None:
        """Listen for events on the WebSocket."""
        async for message in ws:
            try:
                event = json.loads(message)
                await self._dispatch(event)
            except json.JSONDecodeError:
                logger.warning(f"Invalid JSON: {message}")
            except Exception as e:
                logger.error(f"Error handling event: {e}")
    
    async def _dispatch(self, event: dict) -> None:
        """Dispatch event to registered handlers."""
        event_type = event.get("type") or event.get("event")
        
        if not event_type:
            logger.warning(f"Event without type: {event}")
            return
        
        handlers = self._handlers.get(event_type, [])
        handlers += self._handlers.get("*", [])  # Wildcard handlers
        
        for handler in handlers:
            try:
                await handler(event)
            except Exception as e:
                logger.error(f"Handler error for {event_type}: {e}")
    
    async def stop(self) -> None:
        """Stop listening."""
        self._running = False
        if self._ws:
            await self._ws.close()

6.5 Event Types \

6.5.1 call.ringing \

Fired when an inbound call arrives.
{
  "type": "call.ringing",
  "timestamp": "2026-01-16T10:30:00.123Z",
  "data": {
    "callId": "call_xyz789",
    "direction": "inbound",
    "from": "+15551234567",
    "to": "+15559876543",
    "lineId": "line_abc123",
    "sdpOffer": "v=0\r\no=- 123456789 2 IN IP4 127.0.0.1\r\n..."
  }
}

6.5.2 call.connected \

Fired when a call is answered.
{
  "type": "call.connected",
  "timestamp": "2026-01-16T10:30:05.456Z",
  "data": {
    "callId": "call_xyz789",
    "direction": "inbound",
    "from": "+15551234567",
    "to": "+15559876543",
    "lineId": "line_abc123",
    "answeredAt": "2026-01-16T10:30:05.456Z"
  }
}

6.5.3 call.ended \

Fired when a call ends.
{
  "type": "call.ended",
  "timestamp": "2026-01-16T10:35:30.789Z",
  "data": {
    "callId": "call_xyz789",
    "direction": "inbound",
    "from": "+15551234567",
    "to": "+15559876543",
    "lineId": "line_abc123",
    "durationSeconds": 325,
    "endReason": "caller_hangup",
    "endedAt": "2026-01-16T10:35:30.789Z"
  }
}
End Reasons: | Reason | Description | |--------|-------------| | caller_hangup | Remote party hung up | | agent_hangup | We hung up | | transfer | Call was transferred | | timeout | Call timed out | | error | Call failed due to error | | busy | Remote party was busy | | no_answer | Remote party didn’t answer | | rejected | Call was rejected |

6.5.4 call.held \

Fired when a call is placed on hold.
{
  "type": "call.held",
  "timestamp": "2026-01-16T10:32:00.000Z",
  "data": {
    "callId": "call_xyz789",
    "holdStartedAt": "2026-01-16T10:32:00.000Z"
  }
}

6.5.5 call.resumed \

Fired when a held call is resumed.
{
  "type": "call.resumed",
  "timestamp": "2026-01-16T10:33:00.000Z",
  "data": {
    "callId": "call_xyz789",
    "holdDurationSeconds": 60
  }
}

6.5.6 call.transferred \

Fired when a call is transferred.
{
  "type": "call.transferred",
  "timestamp": "2026-01-16T10:34:00.000Z",
  "data": {
    "callId": "call_xyz789",
    "transferType": "blind",
    "transferTarget": "ext:1001",
    "transferredAt": "2026-01-16T10:34:00.000Z"
  }
}

6.5.7 call.ice_candidate \

Fired when a new ICE candidate is available.
{
  "type": "call.ice_candidate",
  "timestamp": "2026-01-16T10:30:01.000Z",
  "data": {
    "callId": "call_xyz789",
    "candidate": {
      "candidate": "candidate:1 1 UDP 2130706431 192.168.1.100 54321 typ host",
      "sdpMid": "audio",
      "sdpMLineIndex": 0
    }
  }
}

6.5.8 call.dtmf \

Fired when DTMF tones are received.
{
  "type": "call.dtmf",
  "timestamp": "2026-01-16T10:31:00.000Z",
  "data": {
    "callId": "call_xyz789",
    "digit": "5",
    "durationMs": 120
  }
}

6.6 Event Handler Implementation \

class CallEventHandler:
    """Handles GoToConnect call events."""
    
    def __init__(
        self,
        event_listener: GoToEventListener,
        call_manager: CallManager,
        redis_client: Redis
    ):
        self.event_listener = event_listener
        self.call_manager = call_manager
        self.redis = redis_client
        
        # Register handlers
        self._register_handlers()
    
    def _register_handlers(self) -> None:
        """Register event handlers."""
        self.event_listener.on("call.ringing", self._on_call_ringing)
        self.event_listener.on("call.connected", self._on_call_connected)
        self.event_listener.on("call.ended", self._on_call_ended)
        self.event_listener.on("call.held", self._on_call_held)
        self.event_listener.on("call.resumed", self._on_call_resumed)
        self.event_listener.on("call.transferred", self._on_call_transferred)
        self.event_listener.on("call.ice_candidate", self._on_ice_candidate)
        self.event_listener.on("call.dtmf", self._on_dtmf)
    
    async def _on_call_ringing(self, event: dict) -> None:
        """Handle incoming call."""
        data = event["data"]
        call_id = data["callId"]
        
        logger.info(f"Incoming call: {call_id} from {data['from']}")
        
        # Look up tenant by phone number
        tenant = await self._lookup_tenant_by_number(data["to"])
        if not tenant:
            logger.warning(f"No tenant for number {data['to']}")
            return
        
        # Create call record
        call = await self.call_manager.create_call(
            external_call_id=call_id,
            tenant_id=tenant.id,
            direction="inbound",
            from_number=data["from"],
            to_number=data["to"],
            line_id=data["lineId"],
        )
        
        # Store SDP offer for WebRTC bridge
        await self.redis.hset(
            f"call:{call.id}:webrtc",
            mapping={
                "sdp_offer": data["sdpOffer"],
                "state": "ringing",
            }
        )
        
        # Publish event for WebRTC bridge
        await self.redis.publish(
            f"call:{call.id}:events",
            json.dumps({
                "event": "call.ringing",
                "call_id": str(call.id),
                "external_call_id": call_id,
                "tenant_id": str(tenant.id),
                "sdp_offer": data["sdpOffer"],
            })
        )
    
    async def _on_call_connected(self, event: dict) -> None:
        """Handle call connected."""
        data = event["data"]
        call_id = data["callId"]
        
        logger.info(f"Call connected: {call_id}")
        
        # Update call record
        call = await self.call_manager.get_by_external_id(call_id)
        if call:
            await self.call_manager.update_call(
                call.id,
                status="connected",
                answered_at=data["answeredAt"],
            )
            
            # Notify agent service to join
            await self.redis.publish(
                f"call:{call.id}:events",
                json.dumps({
                    "event": "call.connected",
                    "call_id": str(call.id),
                })
            )
    
    async def _on_call_ended(self, event: dict) -> None:
        """Handle call ended."""
        data = event["data"]
        call_id = data["callId"]
        
        logger.info(f"Call ended: {call_id}, reason: {data['endReason']}")
        
        call = await self.call_manager.get_by_external_id(call_id)
        if call:
            await self.call_manager.update_call(
                call.id,
                status="ended",
                ended_at=data["endedAt"],
                duration_seconds=data["durationSeconds"],
                end_reason=data["endReason"],
            )
            
            # Notify all services
            await self.redis.publish(
                f"call:{call.id}:events",
                json.dumps({
                    "event": "call.ended",
                    "call_id": str(call.id),
                    "duration_seconds": data["durationSeconds"],
                    "end_reason": data["endReason"],
                })
            )
            
            # Clean up Redis state
            await self._cleanup_call_state(call.id)
    
    async def _on_call_held(self, event: dict) -> None:
        """Handle call held."""
        data = event["data"]
        call_id = data["callId"]
        
        call = await self.call_manager.get_by_external_id(call_id)
        if call:
            await self.redis.publish(
                f"call:{call.id}:events",
                json.dumps({
                    "event": "call.held",
                    "call_id": str(call.id),
                })
            )
    
    async def _on_call_resumed(self, event: dict) -> None:
        """Handle call resumed."""
        data = event["data"]
        call_id = data["callId"]
        
        call = await self.call_manager.get_by_external_id(call_id)
        if call:
            await self.redis.publish(
                f"call:{call.id}:events",
                json.dumps({
                    "event": "call.resumed",
                    "call_id": str(call.id),
                })
            )
    
    async def _on_call_transferred(self, event: dict) -> None:
        """Handle call transferred."""
        data = event["data"]
        call_id = data["callId"]
        
        call = await self.call_manager.get_by_external_id(call_id)
        if call:
            await self.call_manager.update_call(
                call.id,
                status="transferred",
                end_reason=f"transferred_to_{data['transferTarget']}",
            )
    
    async def _on_ice_candidate(self, event: dict) -> None:
        """Handle ICE candidate."""
        data = event["data"]
        call_id = data["callId"]
        
        call = await self.call_manager.get_by_external_id(call_id)
        if call:
            await self.redis.publish(
                f"call:{call.id}:events",
                json.dumps({
                    "event": "call.ice_candidate",
                    "call_id": str(call.id),
                    "candidate": data["candidate"],
                })
            )
    
    async def _on_dtmf(self, event: dict) -> None:
        """Handle DTMF tone."""
        data = event["data"]
        call_id = data["callId"]
        
        call = await self.call_manager.get_by_external_id(call_id)
        if call:
            await self.redis.publish(
                f"call:{call.id}:events",
                json.dumps({
                    "event": "call.dtmf",
                    "call_id": str(call.id),
                    "digit": data["digit"],
                })
            )
    
    async def _lookup_tenant_by_number(self, phone_number: str) -> Optional[Tenant]:
        """Look up tenant by phone number."""
        # Implementation depends on your data model
        pass
    
    async def _cleanup_call_state(self, call_id: str) -> None:
        """Clean up Redis state for ended call."""
        keys_to_delete = [
            f"call:{call_id}:state",
            f"call:{call_id}:webrtc",
            f"call:{call_id}:context",
        ]
        for key in keys_to_delete:
            await self.redis.delete(key)

6.7 Subscription Management \

async def refresh_subscriptions(self) -> None:
    """Refresh event subscriptions before they expire."""
    # Channels expire after a period of inactivity
    # Refresh by creating new channel and resubscribing
    
    old_channel_id = self._channel_id
    
    # Create new channel
    await self.create_channel()
    
    # Resubscribe with same parameters
    for sub_id, sub_data in list(self._subscriptions.items()):
        await self.subscribe_to_calls(
            line_ids=sub_data.get("lineIds"),
            event_types=sub_data.get("eventTypes"),
        )
    
    logger.info(f"Refreshed subscriptions: {old_channel_id} -> {self._channel_id}")

async def unsubscribe(self, subscription_id: str) -> None:
    """Remove a subscription."""
    token = await self.auth_manager.get_access_token()
    
    async with httpx.AsyncClient() as client:
        response = await client.delete(
            f"{self.EVENTS_BASE_URL}/notifications/subscriptions/{subscription_id}",
            headers={"Authorization": f"Bearer {token}"}
        )
        response.raise_for_status()
        
        del self._subscriptions[subscription_id]

  1. Phone Number Management \

7.1 Lines API \

7.1.1 List Lines \

async def list_lines(self) -> list[dict]:
    """
    List all lines/extensions for the account.
    
    Returns:
        List of line objects
    """
    token = await self.auth_manager.get_access_token()
    account_key = self.auth_manager.account_key
    
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.goto.com/users/v1/accounts/{account_key}/lines",
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/json",
            }
        )
        response.raise_for_status()
        return response.json()["items"]
Response:
{
  "items": [
    {
      "lineId": "line_abc123",
      "extension": "1001",
      "displayName": "AI Agent 1",
      "type": "user",
      "status": "active",
      "phoneNumbers": [
        {
          "number": "+15559876543",
          "type": "direct"
        }
      ]
    },
    {
      "lineId": "line_def456",
      "extension": "1002",
      "displayName": "AI Agent 2",
      "type": "user",
      "status": "active",
      "phoneNumbers": [
        {
          "number": "+15551112222",
          "type": "direct"
        }
      ]
    }
  ]
}

7.1.2 Get Line Details \

async def get_line(self, line_id: str) -> dict:
    """Get details for a specific line."""
    token = await self.auth_manager.get_access_token()
    account_key = self.auth_manager.account_key
    
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.goto.com/users/v1/accounts/{account_key}/lines/{line_id}",
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/json",
            }
        )
        response.raise_for_status()
        return response.json()

7.2 Phone Number to Line Mapping \

We need to map incoming phone numbers to our tenant/agent configuration:
class PhoneNumberRegistry:
    """Maps phone numbers to tenants and agents."""
    
    def __init__(self, db: AsyncSession, cache: Redis):
        self.db = db
        self.cache = cache
    
    async def get_config_for_number(
        self,
        phone_number: str
    ) -> Optional[PhoneNumberConfig]:
        """
        Get configuration for a phone number.
        
        Args:
            phone_number: E.164 format phone number
        
        Returns:
            Configuration including tenant, agent, line info
        """
        # Check cache first
        cache_key = f"phone_number:{phone_number}"
        cached = await self.cache.get(cache_key)
        if cached:
            return PhoneNumberConfig.parse_raw(cached)
        
        # Query database
        result = await self.db.execute(
            select(PhoneNumber)
            .options(
                joinedload(PhoneNumber.tenant),
                joinedload(PhoneNumber.agent),
            )
            .where(PhoneNumber.number == phone_number)
            .where(PhoneNumber.status == "active")
        )
        phone_number_record = result.scalar_one_or_none()
        
        if not phone_number_record:
            return None
        
        config = PhoneNumberConfig(
            phone_number=phone_number,
            tenant_id=phone_number_record.tenant_id,
            agent_id=phone_number_record.agent_id,
            line_id=phone_number_record.line_id,
            greeting_enabled=phone_number_record.greeting_enabled,
        )
        
        # Cache for 5 minutes
        await self.cache.setex(
            cache_key,
            300,
            config.json()
        )
        
        return config
    
    async def register_number(
        self,
        phone_number: str,
        tenant_id: str,
        agent_id: str,
        line_id: str
    ) -> PhoneNumber:
        """Register a phone number for a tenant."""
        # Verify the line exists and has this number
        goto_line = await self.goto_client.get_line(line_id)
        
        has_number = any(
            pn["number"] == phone_number
            for pn in goto_line.get("phoneNumbers", [])
        )
        
        if not has_number:
            raise ValueError(
                f"Line {line_id} does not have number {phone_number}"
            )
        
        # Create database record
        phone_number_record = PhoneNumber(
            number=phone_number,
            tenant_id=tenant_id,
            agent_id=agent_id,
            line_id=line_id,
            status="active",
        )
        
        self.db.add(phone_number_record)
        await self.db.commit()
        
        # Invalidate cache
        await self.cache.delete(f"phone_number:{phone_number}")
        
        return phone_number_record

  1. Error Handling \

8.1 Error Categories \

# integrations/gotoconnect/exceptions.py

class GoToError(Exception):
    """Base exception for GoToConnect errors."""
    pass

class GoToAuthError(GoToError):
    """Authentication/authorization errors."""
    pass

class GoToAPIError(GoToError):
    """General API errors."""
    def __init__(self, message: str, status_code: int = None, error_code: str = None):
        super().__init__(message)
        self.status_code = status_code
        self.error_code = error_code

class GoToNotFoundError(GoToAPIError):
    """Resource not found (404)."""
    pass

class GoToConflictError(GoToAPIError):
    """Conflict error (409) - e.g., call already ended."""
    pass

class GoToRateLimitError(GoToAPIError):
    """Rate limit exceeded (429)."""
    def __init__(self, message: str, retry_after: int = None):
        super().__init__(message, status_code=429)
        self.retry_after = retry_after

class GoToPermissionError(GoToAPIError):
    """Permission denied (403)."""
    pass

class GoToWebRTCError(GoToError):
    """WebRTC-specific errors."""
    pass

class GoToCallError(GoToError):
    """Call operation errors."""
    pass

8.2 Error Response Handling \

async def _handle_api_response(
    self,
    response: httpx.Response,
    operation: str
) -> dict:
    """
    Handle API response with appropriate error handling.
    
    Args:
        response: HTTP response
        operation: Description of the operation
    
    Returns:
        Response JSON data
    
    Raises:
        Appropriate GoToError subclass
    """
    status = response.status_code
    
    # Success
    if 200 <= status < 300:
        if status == 204:
            return {}
        return response.json()
    
    # Parse error response
    try:
        error_data = response.json()
        error_message = error_data.get("message", response.text)
        error_code = error_data.get("code")
    except Exception:
        error_message = response.text
        error_code = None
    
    # Map to specific exceptions
    if status == 400:
        raise GoToAPIError(
            f"Bad request for {operation}: {error_message}",
            status_code=status,
            error_code=error_code,
        )
    
    elif status == 401:
        raise GoToAuthError(f"Authentication failed for {operation}")
    
    elif status == 403:
        raise GoToPermissionError(
            f"Permission denied for {operation}: {error_message}"
        )
    
    elif status == 404:
        raise GoToNotFoundError(
            f"Not found for {operation}: {error_message}"
        )
    
    elif status == 409:
        raise GoToConflictError(
            f"Conflict for {operation}: {error_message}"
        )
    
    elif status == 429:
        retry_after = response.headers.get("Retry-After")
        raise GoToRateLimitError(
            f"Rate limit exceeded for {operation}",
            retry_after=int(retry_after) if retry_after else None,
        )
    
    else:
        raise GoToAPIError(
            f"API error for {operation}: {error_message}",
            status_code=status,
            error_code=error_code,
        )

8.3 Retry Logic \

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)

class GoToClientWithRetry:
    """GoToConnect client with automatic retry for transient errors."""
    
    @retry(
        retry=retry_if_exception_type((
            httpx.TimeoutException,
            httpx.NetworkError,
            GoToRateLimitError,
        )),
        wait=wait_exponential(multiplier=1, min=1, max=60),
        stop=stop_after_attempt(5),
        before_sleep=lambda retry_state: logger.warning(
            f"Retrying {retry_state.fn.__name__}, attempt {retry_state.attempt_number}"
        ),
    )
    async def make_call_with_retry(
        self,
        dial_string: str,
        caller_id: str,
        line_id: str
    ) -> dict:
        """Make a call with automatic retry."""
        return await self.initiate_call(dial_string, caller_id, line_id)
    
    async def make_call_safe(
        self,
        dial_string: str,
        caller_id: str,
        line_id: str
    ) -> Optional[dict]:
        """Make a call, returning None on failure."""
        try:
            return await self.make_call_with_retry(dial_string, caller_id, line_id)
        except Exception as e:
            logger.error(f"Failed to make call after retries: {e}")
            return None

8.4 WebSocket Reconnection \

class ResilientEventListener:
    """Event listener with automatic reconnection."""
    
    def __init__(self, event_manager: GoToEventManager):
        self.event_manager = event_manager
        self._reconnect_delay = 1  # Start with 1 second
        self._max_reconnect_delay = 300  # Max 5 minutes
        self._running = False
    
    async def start(self) -> None:
        """Start with automatic reconnection."""
        self._running = True
        
        while self._running:
            try:
                await self._connect_and_listen()
                # Reset delay on successful connection
                self._reconnect_delay = 1
            except websockets.ConnectionClosed as e:
                logger.warning(f"WebSocket closed: {e.code} {e.reason}")
                if self._running:
                    await self._reconnect()
            except Exception as e:
                logger.error(f"WebSocket error: {e}")
                if self._running:
                    await self._reconnect()
    
    async def _connect_and_listen(self) -> None:
        """Connect and listen for events."""
        # Ensure we have a valid channel
        if not self.event_manager._channel_id:
            await self.event_manager.create_channel()
        
        # Connect WebSocket
        ws_url = self.event_manager._websocket_url
        token = await self.event_manager.auth_manager.get_access_token()
        
        async with websockets.connect(
            f"{ws_url}&token={token}",
            ping_interval=30,
            ping_timeout=10,
        ) as ws:
            logger.info("WebSocket connected")
            async for message in ws:
                await self._handle_message(message)
    
    async def _reconnect(self) -> None:
        """Reconnect with exponential backoff."""
        logger.info(f"Reconnecting in {self._reconnect_delay}s...")
        await asyncio.sleep(self._reconnect_delay)
        
        # Exponential backoff
        self._reconnect_delay = min(
            self._reconnect_delay * 2,
            self._max_reconnect_delay
        )
        
        # Refresh channel and subscriptions
        try:
            await self.event_manager.refresh_subscriptions()
        except Exception as e:
            logger.error(f"Failed to refresh subscriptions: {e}")
    
    async def _handle_message(self, message: str) -> None:
        """Handle incoming message."""
        try:
            event = json.loads(message)
            await self.event_manager._dispatch(event)
        except Exception as e:
            logger.error(f"Error handling message: {e}")

  1. Rate Limits and Quotas \

9.1 GoToConnect Rate Limits \

Endpoint CategoryLimitWindow
Authentication10per minute
Call Control100per minute
Call Events100per minute
Lines/Users60per minute

9.2 Rate Limit Handling \

import asyncio
from collections import defaultdict
from datetime import datetime, timedelta

class RateLimiter:
    """Token bucket rate limiter for GoToConnect API."""
    
    def __init__(self):
        self._buckets: dict[str, list[datetime]] = defaultdict(list)
        self._limits = {
            "auth": (10, 60),       # 10 per minute
            "call_control": (100, 60),
            "call_events": (100, 60),
            "users": (60, 60),
        }
        self._lock = asyncio.Lock()
    
    async def acquire(self, category: str) -> None:
        """
        Acquire permission to make a request.
        
        Blocks if rate limit would be exceeded.
        """
        async with self._lock:
            limit, window = self._limits.get(category, (100, 60))
            now = datetime.utcnow()
            cutoff = now - timedelta(seconds=window)
            
            # Clean old entries
            self._buckets[category] = [
                t for t in self._buckets[category]
                if t > cutoff
            ]
            
            # Check if we can proceed
            if len(self._buckets[category]) >= limit:
                # Calculate wait time
                oldest = self._buckets[category][0]
                wait_time = (oldest + timedelta(seconds=window) - now).total_seconds()
                if wait_time > 0:
                    logger.warning(
                        f"Rate limit for {category}, waiting {wait_time:.1f}s"
                    )
                    await asyncio.sleep(wait_time)
            
            # Record this request
            self._buckets[category].append(now)

# Usage in client
class RateLimitedGoToClient:
    def __init__(self, auth_manager: GoToAuthManager):
        self.auth_manager = auth_manager
        self.rate_limiter = RateLimiter()
    
    async def initiate_call(self, *args, **kwargs) -> dict:
        await self.rate_limiter.acquire("call_control")
        return await self._initiate_call(*args, **kwargs)

9.3 Quota Monitoring \

class QuotaMonitor:
    """Monitor API usage and quotas."""
    
    def __init__(self, metrics_client):
        self.metrics = metrics_client
        self._usage = defaultdict(int)
    
    def record_request(self, category: str) -> None:
        """Record an API request."""
        self._usage[category] += 1
        self.metrics.increment(
            "goto_api_requests_total",
            tags={"category": category}
        )
    
    def record_rate_limit(self, category: str) -> None:
        """Record a rate limit hit."""
        self.metrics.increment(
            "goto_rate_limits_total",
            tags={"category": category}
        )
    
    def get_usage_report(self) -> dict:
        """Get current usage statistics."""
        return dict(self._usage)

  1. Security Considerations \

10.1 Credential Storage \

# Credentials should be stored in environment variables or secrets manager
environment:
  GOTO_CLIENT_ID: "${GOTO_CLIENT_ID}"
  GOTO_SERVICE_USERNAME: "${GOTO_SERVICE_USERNAME}"
  GOTO_SERVICE_PASSWORD: "${GOTO_SERVICE_PASSWORD}"

# Never log credentials
logging:
  filters:
    - pattern: "password"
      replacement: "[REDACTED]"
    - pattern: "access_token"
      replacement: "[TOKEN]"

10.2 Token Security \

class SecureTokenManager:
    """Secure handling of OAuth tokens."""
    
    def __init__(self, encryption_key: bytes):
        from cryptography.fernet import Fernet
        self._fernet = Fernet(encryption_key)
    
    def encrypt_token(self, token: str) -> bytes:
        """Encrypt a token for storage."""
        return self._fernet.encrypt(token.encode())
    
    def decrypt_token(self, encrypted: bytes) -> str:
        """Decrypt a stored token."""
        return self._fernet.decrypt(encrypted).decode()

10.3 WebSocket Security \

# Always use WSS (WebSocket Secure)
# Validate message origins
# Implement message signing if needed

async def validate_event(self, event: dict) -> bool:
    """Validate an incoming event."""
    # Check required fields
    required = ["type", "timestamp", "data"]
    if not all(k in event for k in required):
        logger.warning(f"Invalid event structure: {event}")
        return False
    
    # Check timestamp is recent (prevent replay attacks)
    timestamp = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    age = datetime.now(timezone.utc) - timestamp
    if age > timedelta(minutes=5):
        logger.warning(f"Stale event: {age}")
        return False
    
    return True

10.4 Audit Logging \

class GoToAuditLogger:
    """Audit logging for GoToConnect operations."""
    
    def __init__(self, logger):
        self.logger = logger
    
    def log_call_operation(
        self,
        operation: str,
        call_id: str,
        tenant_id: str,
        details: dict = None
    ) -> None:
        """Log a call control operation."""
        self.logger.info(
            "goto_operation",
            extra={
                "operation": operation,
                "call_id": call_id,
                "tenant_id": tenant_id,
                "details": details or {},
                "timestamp": datetime.utcnow().isoformat(),
            }
        )
    
    def log_auth_event(
        self,
        event: str,
        success: bool,
        details: dict = None
    ) -> None:
        """Log an authentication event."""
        level = logging.INFO if success else logging.WARNING
        self.logger.log(
            level,
            "goto_auth",
            extra={
                "event": event,
                "success": success,
                "details": details or {},
                "timestamp": datetime.utcnow().isoformat(),
            }
        )

  1. Testing Strategy \

11.1 Mock Server \

# tests/mocks/goto_mock_server.py

from fastapi import FastAPI, HTTPException
from typing import Dict

app = FastAPI()

# In-memory state
calls: Dict[str, dict] = {}
channels: Dict[str, dict] = {}

@app.post("/oauth/token")
async def mock_token():
    return {
        "access_token": "mock_token_12345",
        "token_type": "Bearer",
        "expires_in": 3600,
        "refresh_token": "mock_refresh_12345",
        "account_key": "mock_account",
        "organizer_key": "mock_organizer",
    }

@app.post("/web-calls/v1/calls")
async def mock_create_call(request: dict):
    call_id = f"call_{len(calls) + 1}"
    call = {
        "callId": call_id,
        "state": "dialing",
        "direction": "outbound",
        "from": request.get("callerId"),
        "to": request.get("dialString"),
        "lineId": request.get("lineId"),
        "sdpOffer": "v=0\r\nmock sdp offer\r\n",
        "createdAt": datetime.utcnow().isoformat() + "Z",
    }
    calls[call_id] = call
    return call

@app.post("/web-calls/v1/calls/{call_id}/answer")
async def mock_answer_call(call_id: str, request: dict):
    if call_id not in calls:
        raise HTTPException(404, "Call not found")
    
    calls[call_id]["state"] = "connected"
    calls[call_id]["sdpAnswer"] = request.get("sdpAnswer")
    calls[call_id]["answeredAt"] = datetime.utcnow().isoformat() + "Z"
    
    return calls[call_id]

@app.post("/web-calls/v1/calls/{call_id}/hangup")
async def mock_hangup(call_id: str):
    if call_id not in calls:
        raise HTTPException(404, "Call not found")
    
    calls[call_id]["state"] = "ended"
    calls[call_id]["endedAt"] = datetime.utcnow().isoformat() + "Z"
    
    return calls[call_id]

# Add more mock endpoints as needed...

11.2 Integration Tests \

# tests/integration/test_goto_integration.py

import pytest
from httpx import AsyncClient

@pytest.fixture
async def goto_client():
    """Create a GoTo client for testing."""
    auth_manager = GoToAuthManager(
        client_id="test_client",
        username="test_user",
        password="test_pass",
    )
    return GoToCallControlClient(auth_manager)

@pytest.mark.integration
async def test_call_lifecycle(goto_client):
    """Test complete call lifecycle."""
    # Initiate call
    call = await goto_client.initiate_call(
        dial_string="tel:+15551234567",
        caller_id="+15559876543",
        line_id="line_test",
    )
    assert call.state == CallState.DIALING
    
    # Answer (simulated)
    call = await goto_client.answer_call(
        call.call_id,
        sdp_answer="v=0\r\ntest answer\r\n",
    )
    assert call.state == CallState.CONNECTED
    
    # Hold
    call = await goto_client.hold(call.call_id)
    assert call.state == CallState.HELD
    
    # Resume
    call = await goto_client.resume(call.call_id)
    assert call.state == CallState.CONNECTED
    
    # Hangup
    call = await goto_client.hangup(call.call_id)
    assert call.state == CallState.ENDED

@pytest.mark.integration
async def test_blind_transfer(goto_client):
    """Test blind transfer operation."""
    # Create active call first
    call = await goto_client.initiate_call(
        dial_string="tel:+15551234567",
        caller_id="+15559876543",
        line_id="line_test",
    )
    
    # Transfer
    result = await goto_client.blind_transfer(
        call.call_id,
        dial_string="ext:1001",
    )
    
    assert result.state == CallState.TRANSFERRED

@pytest.mark.integration
async def test_event_subscription(goto_client):
    """Test event subscription flow."""
    event_manager = GoToEventManager(goto_client.auth_manager)
    
    # Create channel
    channel_id, ws_url = await event_manager.create_channel()
    assert channel_id is not None
    assert ws_url.startswith("wss://")
    
    # Subscribe
    sub_id = await event_manager.subscribe_to_calls(
        event_types=["call.ringing", "call.connected"],
    )
    assert sub_id is not None
    
    # Cleanup
    await event_manager.unsubscribe(sub_id)

11.3 Unit Tests \

# tests/unit/test_goto_auth.py

import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_token_caching():
    """Test that tokens are cached."""
    auth = GoToAuthManager(
        client_id="test",
        username="test",
        password="test",
    )
    
    with patch.object(auth, '_authenticate', new_callable=AsyncMock) as mock_auth:
        mock_auth.return_value = None
        auth._access_token = "cached_token"
        auth._expires_at = datetime.utcnow() + timedelta(hours=1)
        
        token = await auth.get_access_token()
        
        assert token == "cached_token"
        mock_auth.assert_not_called()

@pytest.mark.asyncio
async def test_token_refresh_on_expiry():
    """Test that expired tokens trigger refresh."""
    auth = GoToAuthManager(
        client_id="test",
        username="test",
        password="test",
    )
    
    auth._access_token = "old_token"
    auth._refresh_token = "refresh_token"
    auth._expires_at = datetime.utcnow() - timedelta(hours=1)  # Expired
    
    with patch.object(auth, '_refresh_access_token', new_callable=AsyncMock) as mock_refresh:
        mock_refresh.return_value = None
        auth._access_token = "new_token"
        auth._expires_at = datetime.utcnow() + timedelta(hours=1)
        
        token = await auth.get_access_token()
        
        mock_refresh.assert_called_once()

  1. Implementation Guide \

12.1 Setup Checklist \

## GoToConnect Integration Setup

### Prerequisites
- [ ] GoTo Developer Portal account created
- [ ] OAuth application registered
- [ ] Service user created in GoToConnect Admin
- [ ] Required scopes granted to application

### Configuration
- [ ] Client ID stored in environment
- [ ] Service user credentials stored securely
- [ ] Line IDs mapped to tenants
- [ ] Phone numbers registered in database

### Integration
- [ ] Auth manager implemented and tested
- [ ] Call control client implemented
- [ ] Event subscription system implemented
- [ ] WebRTC bridge connected

### Monitoring
- [ ] API request metrics configured
- [ ] Error alerting set up
- [ ] Audit logging enabled
- [ ] Rate limit monitoring in place

### Testing
- [ ] Unit tests passing
- [ ] Integration tests passing
- [ ] End-to-end call test successful
- [ ] Transfer test successful

12.2 Configuration Template \

# config/gotoconnect.yaml

gotoconnect:
  # Authentication
  auth:
    token_url: "https://authentication.logmeininc.com/oauth/token"
    client_id: "${GOTO_CLIENT_ID}"
    service_user: "${GOTO_SERVICE_USERNAME}"
    service_password: "${GOTO_SERVICE_PASSWORD}"
  
  # API endpoints
  api:
    web_calls_base: "https://api.goto.com/web-calls/v1"
    call_events_base: "https://api.goto.com/call-events/v1"
    users_base: "https://api.goto.com/users/v1"
  
  # WebRTC settings
  webrtc:
    ice_servers:
      - urls: "stun:stun.l.google.com:19302"
    codec_preference:
      - opus
      - PCMU
      - PCMA
  
  # Event subscription
  events:
    event_types:
      - "call.ringing"
      - "call.connected"
      - "call.ended"
      - "call.held"
      - "call.resumed"
      - "call.transferred"
      - "call.dtmf"
      - "call.ice_candidate"
    reconnect_delay_initial: 1
    reconnect_delay_max: 300
  
  # Timeouts
  timeouts:
    api_request: 30
    websocket_ping: 30
    call_setup: 30
  
  # Rate limiting
  rate_limits:
    call_control_per_minute: 100
    auth_per_minute: 10

12.3 Service Initialization \

# services/webrtc_bridge/main.py

import asyncio
from contextlib import asynccontextmanager

from integrations.gotoconnect.auth import GoToAuthManager
from integrations.gotoconnect.call_control import GoToCallControlClient
from integrations.gotoconnect.events import GoToEventManager, GoToEventListener
from config import settings

@asynccontextmanager
async def lifespan(app):
    """Application lifespan manager."""
    
    # Initialize GoToConnect integration
    auth_manager = GoToAuthManager(
        client_id=settings.goto_client_id,
        username=settings.goto_service_username,
        password=settings.goto_service_password,
    )
    
    # Pre-authenticate
    await auth_manager.get_access_token()
    logger.info("GoToConnect authentication successful")
    
    # Initialize call control client
    call_client = GoToCallControlClient(auth_manager)
    
    # Initialize event manager
    event_manager = GoToEventManager(auth_manager)
    await event_manager.create_channel()
    await event_manager.subscribe_to_calls()
    logger.info("GoToConnect event subscription active")
    
    # Start event listener
    event_listener = GoToEventListener(
        event_manager._websocket_url,
        auth_manager,
    )
    
    listener_task = asyncio.create_task(event_listener.start())
    
    # Store in app state
    app.state.goto_auth = auth_manager
    app.state.goto_calls = call_client
    app.state.goto_events = event_manager
    
    yield
    
    # Cleanup
    await event_listener.stop()
    listener_task.cancel()
    logger.info("GoToConnect integration shut down")

app = FastAPI(lifespan=lifespan)

  1. Troubleshooting \

13.1 Common Issues \

Authentication Failures \

SymptomPossible CauseSolution
401 on token requestInvalid credentialsVerify client ID, username, password
403 after authenticationMissing scopesCheck OAuth app scope configuration
Token expires immediatelyClock skewSync server time with NTP
Refresh token failsToken revokedRe-authenticate with password

WebRTC Issues \

SymptomPossible CauseSolution
No audioICE failureCheck firewall, TURN server
One-way audioSDP mismatchVerify codec compatibility
Audio dropsNetwork instabilityImplement reconnection logic
Echo/feedbackAudio routingCheck duplex settings

Event Subscription Issues \

SymptomPossible CauseSolution
No events receivedWebSocket disconnectedCheck reconnection logic
Duplicate eventsMultiple subscriptionsDeduplicate by event ID
Missing eventsSubscription expiredRefresh subscriptions
Events delayedNetwork latencyMonitor WebSocket health

13.2 Diagnostic Commands \

async def diagnose_goto_connection() -> dict:
    """Run diagnostics on GoToConnect integration."""
    results = {
        "auth": None,
        "api": None,
        "websocket": None,
        "lines": None,
    }
    
    # Test authentication
    try:
        token = await auth_manager.get_access_token()
        results["auth"] = {"status": "ok", "token_length": len(token)}
    except Exception as e:
        results["auth"] = {"status": "error", "error": str(e)}
    
    # Test API connectivity
    try:
        lines = await call_client.list_lines()
        results["api"] = {"status": "ok", "line_count": len(lines)}
    except Exception as e:
        results["api"] = {"status": "error", "error": str(e)}
    
    # Test WebSocket
    try:
        channel_id, ws_url = await event_manager.create_channel()
        results["websocket"] = {
            "status": "ok",
            "channel_id": channel_id,
        }
    except Exception as e:
        results["websocket"] = {"status": "error", "error": str(e)}
    
    # List available lines
    results["lines"] = lines if results["api"]["status"] == "ok" else []
    
    return results

13.3 Debug Logging \

# Enable debug logging for GoToConnect integration
import logging

# Set specific loggers to DEBUG
logging.getLogger("integrations.gotoconnect").setLevel(logging.DEBUG)
logging.getLogger("httpx").setLevel(logging.DEBUG)
logging.getLogger("websockets").setLevel(logging.DEBUG)

# Example debug output
# DEBUG:integrations.gotoconnect.auth:Requesting new access token
# DEBUG:httpx:POST https://authentication.logmeininc.com/oauth/token
# DEBUG:httpx:Response: 200 OK
# DEBUG:integrations.gotoconnect.auth:Token expires at 2026-01-16T11:30:00Z
# DEBUG:integrations.gotoconnect.events:WebSocket connected to wss://realtime.goto.com/...
# DEBUG:integrations.gotoconnect.events:Received event: call.ringing

  1. API Reference Summary \

14.1 Authentication API \

MethodEndpointDescription
POST/oauth/tokenObtain access token

14.2 Web Calls API \

MethodEndpointDescription
POST/callsInitiate outbound call
POST/calls/{id}/answerAnswer inbound call
POST/calls/{id}/hangupEnd call
PUT/calls/{id}/holdPlace on hold
PUT/calls/{id}/resumeResume from hold
PUT/calls/{id}/muteMute microphone
PUT/calls/{id}/unmuteUnmute microphone
POST/calls/{id}/dtmfSend DTMF tones
POST/calls/{id}/blind-transferBlind transfer
POST/calls/{id}/warm-transferWarm transfer
POST/calls/{id}/mergeMerge into conference
POST/calls/{id}/ice-candidatesSend ICE candidate

14.3 Call Events API \

MethodEndpointDescription
POST/notifications/channelsCreate event channel
POST/notifications/subscriptionsSubscribe to events
DELETE/notifications/subscriptions/{id}Unsubscribe

14.4 Users/Lines API \

MethodEndpointDescription
GET/accounts/{id}/linesList lines
GET/accounts/{id}/lines/{lineId}Get line details
GET/accounts/{id}/usersList users

## Appendix A: SDP Templates {#appendix-a:-sdp-templates}

### A.1 Minimal SDP Offer {#a.1-minimal-sdp-offer}

v=0
o=- 1234567890 2 IN IP4 0.0.0.0
s=-
t=0 0
a=group:BUNDLE audio
m=audio 9 UDP/TLS/RTP/SAVPF 111
c=IN IP4 0.0.0.0
a=rtcp:9 IN IP4 0.0.0.0
a=ice-ufrag:xxxx
a=ice-pwd:xxxxxxxxxxxxxxxxxxxxxxxx
a=fingerprint:sha-256 XX:XX:XX:...
a=setup:actpass
a=mid:audio
a=sendrecv
a=rtcp-mux
a=rtpmap:111 opus/48000/2
a=fmtp:111 minptime=10;useinbandfec=1

A.2 Full SDP Offer (GoToConnect) \

See Section 4.3.1 for complete example.
## Appendix B: Event Schemas {#appendix-b:-event-schemas}

### B.1 Common Event Structure {#b.1-common-event-structure}

{
  "type": "string",
  "timestamp": "ISO 8601 datetime",
  "data": {
    "callId": "string",
    // Additional fields per event type
  }
}

B.2 Event Type Reference \

See Section 6.5 for complete event schemas.

Document History \

VersionDateAuthorChanges
1.02026-01-16ClaudeInitial document

End of Document
Last modified on April 17, 2026