Voice by aiConnected — GoToConnect Integration Specification \
Document Information \
| Field | Value |
|---|---|
| Document ID | ARCH-002 |
| Version | 1.0 |
| Last Updated | 2026-01-16 |
| Status | Draft |
| Owner | Engineering |
| Dependencies | ARCH-001 (System Architecture Overview) |
Table of Contents \
1. Introduction: 1.1 Purpose, 1.2 Scope, 1.3 Prerequisites, 1.4 API Versions
2. GoToConnect Platform Overview: 2.1 Architecture Context, 2.2 Key Concepts (2.2.1 Lines and Extensions, 2.2.2 Users and Accounts, 2.2.3 Web Calls, 2.2.4 Call Events), 2.3 Our Integration Points
3. Authentication and Authorization: 3.1 OAuth 2.0 Overview, 3.2 OAuth Application Setup (3.2.1 Register Application, 3.2.2 Create Service User), 3.3 Token Management (3.3.1 Token Request, 3.3.2 Token Response Structure), 3.4 Required Scopes, 3.5 Authentication Error Handling
4. WebRTC Integration: 4.1 WebRTC Overview, 4.2 Web Calls API (4.2.1 API Base Configuration, 4.2.2 Initiate Outbound Call, 4.2.3 Answer Inbound Call), 4.3 SDP Exchange (4.3.1 SDP Offer Structure (from GoToConnect), 4.3.2 SDP Answer Generation, 4.3.3 Codec Preferences), 4.4 ICE Candidate Exchange (4.4.1 Trickle ICE, 4.4.2 Handling Remote ICE Candidates), 4.5 Audio Stream Handling (4.5.1 Receiving Audio from GoToConnect, 4.5.2 Sending Audio to GoToConnect)
5. Call Control API: 5.1 Call Control Operations (5.1.1 Hold Call, 5.1.2 Resume Call, 5.1.3 Mute/Unmute, 5.1.4 Send DTMF, 5.1.5 Hang Up), 5.2 Transfer Operations (5.2.1 Blind Transfer, 5.2.2 Warm Transfer (Attended Transfer), 5.2.3 Conference (Merge Calls)), 5.3 Call Control State Machine, 5.4 Call Control Client
6. Event Subscriptions: 6.1 Event System Overview, 6.2 Create Notification Channel, 6.3 Subscribe to Events, 6.4 WebSocket Connection, 6.5 Event Types (6.5.1 call.ringing, 6.5.2 call.connected, 6.5.3 call.ended, 6.5.4 call.held, 6.5.5 call.resumed, 6.5.6 call.transferred, 6.5.7 call.ice_candidate, 6.5.8 call.dtmf), 6.6 Event Handler Implementation, 6.7 Subscription Management
7. Phone Number Management: 7.1 Lines API (7.1.1 List Lines, 7.1.2 Get Line Details), 7.2 Phone Number to Line Mapping
8. Error Handling: 8.1 Error Categories, 8.2 Error Response Handling, 8.3 Retry Logic, 8.4 WebSocket Reconnection
9. Rate Limits and Quotas: 9.1 GoToConnect Rate Limits, 9.2 Rate Limit Handling, 9.3 Quota Monitoring
10. Security Considerations: 10.1 Credential Storage, 10.2 Token Security, 10.3 WebSocket Security, 10.4 Audit Logging
11. Testing Strategy: 11.1 Mock Server, 11.2 Integration Tests, 11.3 Unit Tests
12. Implementation Guide: 12.1 Setup Checklist, 12.2 Configuration Template, 12.3 Service Initialization
13. Troubleshooting: 13.1 Common Issues (Authentication Failures, WebRTC Issues, Event Subscription Issues), 13.2 Diagnostic Commands, 13.3 Debug Logging
14. API Reference Summary: 14.1 Authentication API, 14.2 Web Calls API, 14.3 Call Events API, 14.4 Users/Lines API
- Appendix A: SDP Templates (A.1 Minimal SDP Offer, A.2 Full SDP Offer (GoToConnect))
- Appendix B: Event Schemas (B.1 Common Event Structure, B.2 Event Type Reference)
- Document History
1. Introduction \
1.1 Purpose \
This document specifies how Voice by aiConnected integrates with GoToConnect’s telephony platform. It covers authentication, WebRTC session management, call control operations, and real-time event handling.
GoToConnect serves as our PSTN gateway, providing:
- Inbound and outbound call connectivity
- WebRTC-based audio transport
- Call control operations (transfer, hold, merge)
- Real-time call event notifications
1.2 Scope \
This document covers:
- OAuth 2.0 authentication flow with GoToConnect
- WebRTC session establishment and management
- Complete call control API mapping
- WebSocket-based event subscription system
- Error handling and recovery patterns
- Security and compliance considerations
Out of scope (covered in other documents):
- Internal service architecture (see ARCH-001)
- LiveKit integration (see ARCH-003)
- Voice pipeline implementation (see ARCH-004)
1.3 Prerequisites \
Before implementing this integration, ensure you have:
- GoToConnect account with API access enabled
- OAuth application registered in GoTo Developer Portal
- Understanding of WebRTC fundamentals
- Familiarity with OAuth 2.0 flows
1.4 API Versions \
| API | Version | Base URL |
|---|---|---|
| Authentication | v2 | https://authentication.logmeininc.com |
| Web Calls | v1 | https://api.goto.com/web-calls/v1 |
| Call Events | v1 | https://api.goto.com/call-events/v1 |
| Users | v1 | https://api.goto.com/users/v1 |
| Lines | v1 | https://api.goto.com/lines/v1 |
2. GoToConnect Platform Overview \
2.1 Architecture Context \
┌─────────────────────────────────────────────────────────────────────────────┐
│ GOTOCONNECT INTEGRATION CONTEXT │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PSTN Network │
│ │ │
│ │ Phone calls │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ GOTOCONNECT CLOUD │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ PBX │ │ WebRTC │ │ Event │ │ │
│ │ │ (Jive) │ │ Gateway │ │ System │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ │ │ │ │ │ │
│ │ │ │ │ │ │
│ │ ┌──────▼─────────────────▼─────────────────▼──────┐ │ │
│ │ │ API LAYER │ │ │
│ │ │ │ │ │
│ │ │ • REST APIs (call control, users, lines) │ │ │
│ │ │ • WebRTC signaling (SDP exchange) │ │ │
│ │ │ • WebSocket (real-time events) │ │ │
│ │ │ │ │ │
│ │ └──────────────────────┬───────────────────────────┘ │ │
│ │ │ │ │
│ └─────────────────────────┼────────────────────────────────────────────┘ │
│ │ │
│ │ HTTPS / WSS │
│ │ │
│ ┌─────────────────────────▼────────────────────────────────────────────┐ │
│ │ VOICE BY AICONNECTED │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ OAuth │ │ WebRTC │ │ Event │ │ │
│ │ │ Manager │ │ Bridge │ │ Subscriber │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
2.2 Key Concepts \
2.2.1 Lines and Extensions \
In GoToConnect, a line represents a phone endpoint:
- Each line has a unique extension number
- Lines can have one or more associated phone numbers (DIDs)
- Our AI agents will be assigned to specific lines
2.2.2 Users and Accounts \
- Account: The top-level organizational entity
- User: An individual with login credentials
- Service User: A programmatic user for API access (what we use)
2.2.3 Web Calls \
GoToConnect’s Web Calls API enables browser-based calling:
- Provides WebRTC signaling endpoints
- Manages call state via REST
- Supports full call control (hold, transfer, merge)
2.2.4 Call Events \
Real-time notifications delivered via WebSocket:
- Call state changes (ringing, connected, ended)
- DTMF tones
- Recording status
- Error conditions
2.3 Our Integration Points \
| Integration Point | Purpose | Protocol |
|---|---|---|
| OAuth API | Authentication | HTTPS |
| Web Calls API | Call control, WebRTC signaling | HTTPS |
| Call Events API | Subscription management | HTTPS |
| Event WebSocket | Real-time notifications | WSS |
| Users API | User/line lookup | HTTPS |
| Lines API | Line configuration | HTTPS |
3. Authentication and Authorization \
3.1 OAuth 2.0 Overview \
GoToConnect uses OAuth 2.0 for API authentication. For server-to-server integration, we authenticate a Service User with the Resource Owner Password Credentials grant (`grant_type=password`) and renew access with the refresh token.
┌─────────────────────────────────────────────────────────────────────────────┐
│ OAUTH 2.0 PASSWORD GRANT FLOW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Voice by │ │ GoTo │ │
│ │ aiConnected │ │ Auth API │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ │ 1. POST /oauth/token │ │
│ │ grant_type=password │ │
│ │ username={service_user} │ │
│ │ password={service_password} │ │
│ │ client_id={client_id} │ │
│ │────────────────────────────────────────────▶│ │
│ │ │ │
│ │ 2. 200 OK │ │
│ │ { │ │
│ │ "access_token": "eyJ...", │ │
│ │ "token_type": "Bearer", │ │
│ │ "expires_in": 3600, │ │
│ │ "refresh_token": "abc...", │ │
│ │ "account_key": "123...", │ │
│ │ "organizer_key": "456..." │ │
│ │ } │ │
│ │◀────────────────────────────────────────────│ │
│ │ │ │
│ │ 3. API Request │ │
│ │ Authorization: Bearer eyJ... │ │
│ │────────────────────────────────────────────▶│ │
│ │ │ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
3.2 OAuth Application Setup \
3.2.1 Register Application \
- Navigate to GoTo Developer Portal
- Create a new OAuth application
- Configure the following settings:
application:
  name: "Voice by aiConnected"
  description: "AI Voice Agent Platform"
  redirect_uris:
    - "https://api.aiconnected.io/oauth/callback" # Not used for the password grant
  scopes:
    # Required scopes
    - "webrtc.v1.write" # WebRTC call management
    - "call-events.v1.notifications.manage" # Event subscriptions
    - "call-control.v1.calls.write" # Call control operations
    - "users.v1.lines.read" # Read line information
    - "users.v1.users.read" # Read user information
  grant_types:
    - "password" # For service user authentication
    - "refresh_token" # For token refresh
3.2.2 Create Service User \
In the GoToConnect Admin Portal:
- Create a dedicated user for API access
- Assign appropriate permissions:
  - Make and receive calls
  - Access to required lines/extensions
  - API access enabled
- Store credentials securely:
service_user:
  username: "aiconnected-service@yourdomain.com"
  password: "${GOTO_SERVICE_PASSWORD}" # Stored in secrets manager
3.3 Token Management \
3.3.1 Token Request \
# integrations/gotoconnect/auth.py
import httpx
from datetime import datetime, timedelta
from typing import Optional
import asyncio
class GoToAuthManager:
"""Manages OAuth tokens for GoToConnect API access."""
AUTH_URL = "https://authentication.logmeininc.com/oauth/token"
def __init__(
self,
client_id: str,
username: str,
password: str
):
self.client_id = client_id
self.username = username
self.password = password
self._access_token: Optional[str] = None
self._refresh_token: Optional[str] = None
self._expires_at: Optional[datetime] = None
self._account_key: Optional[str] = None
self._organizer_key: Optional[str] = None
self._lock = asyncio.Lock()
async def get_access_token(self) -> str:
"""Get a valid access token, refreshing if necessary."""
async with self._lock:
if self._is_token_valid():
return self._access_token
if self._refresh_token:
try:
await self._refresh_access_token()
return self._access_token
except Exception:
pass # Fall through to full auth
await self._authenticate()
return self._access_token
def _is_token_valid(self) -> bool:
"""Check if current token is valid with buffer."""
if not self._access_token or not self._expires_at:
return False
# Refresh 5 minutes before expiry
return datetime.utcnow() < (self._expires_at - timedelta(minutes=5))
async def _authenticate(self) -> None:
"""Perform full authentication with username/password."""
async with httpx.AsyncClient() as client:
response = await client.post(
self.AUTH_URL,
data={
"grant_type": "password",
"username": self.username,
"password": self.password,
"client_id": self.client_id,
},
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
}
)
response.raise_for_status()
self._process_token_response(response.json())
async def _refresh_access_token(self) -> None:
"""Refresh the access token using refresh token."""
async with httpx.AsyncClient() as client:
response = await client.post(
self.AUTH_URL,
data={
"grant_type": "refresh_token",
"refresh_token": self._refresh_token,
"client_id": self.client_id,
},
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
}
)
response.raise_for_status()
self._process_token_response(response.json())
def _process_token_response(self, data: dict) -> None:
"""Process and store token response."""
self._access_token = data["access_token"]
self._refresh_token = data.get("refresh_token")
self._account_key = data.get("account_key")
self._organizer_key = data.get("organizer_key")
expires_in = data.get("expires_in", 3600)
self._expires_at = datetime.utcnow() + timedelta(seconds=expires_in)
@property
def account_key(self) -> Optional[str]:
"""Get the account key from authentication."""
return self._account_key
@property
def organizer_key(self) -> Optional[str]:
"""Get the organizer key (user key) from authentication."""
return self._organizer_key
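The five-minute refresh buffer used by `_is_token_valid` can be isolated as a pure function, which makes the expiry rule easy to test without network access. The sketch below is illustrative only: `token_is_valid` and `REFRESH_BUFFER` are hypothetical names that are not part of the integration code. It uses timezone-aware datetimes because `datetime.utcnow()` is deprecated as of Python 3.12.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical constant mirroring the 5-minute buffer used by the auth manager.
REFRESH_BUFFER = timedelta(minutes=5)


def token_is_valid(
    expires_at: Optional[datetime],
    now: Optional[datetime] = None,
    buffer: timedelta = REFRESH_BUFFER,
) -> bool:
    """Return True while the token is still outside the refresh buffer."""
    if expires_at is None:
        return False
    now = now or datetime.now(timezone.utc)
    return now < expires_at - buffer


issued = datetime(2026, 1, 16, 10, 0, tzinfo=timezone.utc)
expires = issued + timedelta(seconds=3600)

# 30 minutes after issue the token is usable; 56 minutes after issue it
# falls inside the buffer and should be refreshed.
fresh = token_is_valid(expires, now=issued + timedelta(minutes=30))
stale = token_is_valid(expires, now=issued + timedelta(minutes=56))
```

Passing `now` explicitly keeps the check deterministic in unit tests.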
3.3.2 Token Response Structure \
{
"access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "Bearer",
"expires_in": 3600,
"refresh_token": "abc123def456...",
"scope": "webrtc.v1.write call-events.v1.notifications.manage ...",
"account_key": "1234567890",
"organizer_key": "9876543210",
"principal": "aiconnected-service@yourdomain.com"
}
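One way to work with the response above is to parse it into a typed object once, rather than reading dictionary keys throughout the code. The following is a minimal sketch: `TokenResponse` and `parse_token_response` are hypothetical names, and the space-separated `scope` format is assumed from the sample response.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class TokenResponse:
    """Typed view of the token response fields shown above."""
    access_token: str
    token_type: str
    expires_in: int
    refresh_token: Optional[str]
    scopes: FrozenSet[str]
    account_key: Optional[str]
    organizer_key: Optional[str]
    principal: Optional[str]


def parse_token_response(data: dict) -> TokenResponse:
    """Build a TokenResponse; only access_token is treated as required."""
    return TokenResponse(
        access_token=data["access_token"],  # KeyError if the field is absent
        token_type=data.get("token_type", "Bearer"),
        expires_in=int(data.get("expires_in", 3600)),
        refresh_token=data.get("refresh_token"),
        # Assumption: scopes arrive as one space-separated string.
        scopes=frozenset(data.get("scope", "").split()),
        account_key=data.get("account_key"),
        organizer_key=data.get("organizer_key"),
        principal=data.get("principal"),
    )


sample = {
    "access_token": "eyJ-example",
    "token_type": "Bearer",
    "expires_in": 3600,
    "refresh_token": "abc123",
    "scope": "webrtc.v1.write call-events.v1.notifications.manage",
    "account_key": "1234567890",
    "organizer_key": "9876543210",
    "principal": "aiconnected-service@yourdomain.com",
}
token = parse_token_response(sample)
```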
3.4 Required Scopes \
| Scope | Purpose | Required For |
|---|---|---|
| `webrtc.v1.write` | Create and manage WebRTC calls | All call operations |
| `call-events.v1.notifications.manage` | Subscribe to call events | Real-time notifications |
| `call-control.v1.calls.write` | Transfer, hold, merge calls | Call control features |
| `users.v1.lines.read` | Read line/extension info | Line lookup |
| `users.v1.users.read` | Read user info | User validation |
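A startup check can compare the scopes granted in the token response against the table above and fail early when one is missing. This is a sketch under the assumption that the `scope` field is a space-separated string; `REQUIRED_SCOPES` and `missing_scopes` are hypothetical names.

```python
# Scopes copied from the Required Scopes table.
REQUIRED_SCOPES = frozenset({
    "webrtc.v1.write",
    "call-events.v1.notifications.manage",
    "call-control.v1.calls.write",
    "users.v1.lines.read",
    "users.v1.users.read",
})


def missing_scopes(granted_scope: str) -> frozenset:
    """Return the required scopes absent from a space-separated scope string."""
    return REQUIRED_SCOPES - frozenset(granted_scope.split())


granted = "webrtc.v1.write users.v1.lines.read users.v1.users.read"
missing = missing_scopes(granted)
```

A non-empty result is a natural trigger for the `InsufficientScopesError` defined in section 3.5.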
3.5 Authentication Error Handling \
class GoToAuthError(Exception):
"""Base exception for GoTo authentication errors."""
pass
class InvalidCredentialsError(GoToAuthError):
"""Raised when credentials are invalid."""
pass
class TokenExpiredError(GoToAuthError):
"""Raised when token has expired and refresh failed."""
pass
class InsufficientScopesError(GoToAuthError):
"""Raised when token lacks required scopes."""
pass
# Error handling in auth manager
async def _authenticate(self) -> None:
try:
async with httpx.AsyncClient() as client:
response = await client.post(self.AUTH_URL, ...)
if response.status_code == 400:
error_data = response.json()
if error_data.get("error") == "invalid_grant":
raise InvalidCredentialsError(
"Invalid username or password"
)
raise GoToAuthError(f"Auth failed: {error_data}")
if response.status_code == 401:
raise InvalidCredentialsError("Authentication failed")
response.raise_for_status()
self._process_token_response(response.json())
except httpx.HTTPError as e:
raise GoToAuthError(f"HTTP error during authentication: {e}")
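The status-code branching above can also be written as a pure mapping from response to exception, which is simpler to unit test than a method that performs the HTTP call. The sketch below redefines two of the exception classes so that it is self-contained; `auth_error_for` is a hypothetical helper, and the catch-all branch for other 4xx/5xx codes is an assumption rather than documented GoTo behavior.

```python
from typing import Optional


class GoToAuthError(Exception):
    """Base exception for GoTo authentication errors."""


class InvalidCredentialsError(GoToAuthError):
    """Raised when credentials are invalid."""


def auth_error_for(status_code: int, body: Optional[dict]) -> Optional[GoToAuthError]:
    """Map an auth response to an exception, or None when it succeeded."""
    body = body or {}
    if status_code == 400:
        if body.get("error") == "invalid_grant":
            return InvalidCredentialsError("Invalid username or password")
        return GoToAuthError(f"Auth failed: {body}")
    if status_code == 401:
        return InvalidCredentialsError("Authentication failed")
    if status_code >= 400:
        # Assumption: any other error status is reported generically.
        return GoToAuthError(f"Unexpected auth status: {status_code}")
    return None


bad_grant = auth_error_for(400, {"error": "invalid_grant"})
ok = auth_error_for(200, {"access_token": "eyJ-example"})
```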
4. WebRTC Integration \
4.1 WebRTC Overview \
GoToConnect provides WebRTC endpoints for browser-based calling. Our WebRTC Bridge uses these to establish audio connections with callers.
┌─────────────────────────────────────────────────────────────────────────────┐
│ WEBRTC CALL FLOW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PSTN Caller GoToConnect WebRTC Bridge LiveKit │
│ │ │ │ │ │
│ │ 1. Inbound call │ │ │ │
│ │───────────────────▶│ │ │ │
│ │ │ │ │ │
│ │ │ 2. Event: call.ringing │ │
│ │ │─────────────────────▶│ │ │
│ │ │ │ │ │
│ │ │ 3. POST /calls/{id}/answer │ │
│ │ │◀─────────────────────│ │ │
│ │ │ │ │ │
│ │ │ 4. SDP Offer │ │ │
│ │ │─────────────────────▶│ │ │
│ │ │ │ │ │
│ │ │ │ 5. Create room │ │
│ │ │ │────────────────▶│ │
│ │ │ │ │ │
│ │ │ 6. SDP Answer │ │ │
│ │ │◀─────────────────────│ │ │
│ │ │ │ │ │
│ │ │ 7. ICE Candidates │ │ │
│ │ │◀────────────────────▶│ │ │
│ │ │ │ │ │
│ │ 8. Audio stream │ 9. Audio stream │ 10. Audio │ │
│ │◀──────────────────▶│◀────────────────────▶│◀───────────────▶│ │
│ │ │ │ │ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
4.2 Web Calls API \
4.2.1 API Base Configuration \
# integrations/gotoconnect/config.py
WEB_CALLS_BASE_URL = "https://api.goto.com/web-calls/v1"
# Endpoints
ENDPOINTS = {
"calls": f"{WEB_CALLS_BASE_URL}/calls",
"call": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}",
"answer": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/answer",
"hangup": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/hangup",
"hold": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/hold",
"resume": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/resume",
"mute": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/mute",
"unmute": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/unmute",
"dtmf": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/dtmf",
"blind_transfer": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/blind-transfer",
"warm_transfer": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/warm-transfer",
"merge": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/merge",
"ice_candidates": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/ice-candidates",
}
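The doubled braces in the endpoint templates matter: inside an f-string, `{{call_id}}` is emitted as a literal `{call_id}`, which a later `.format(call_id=...)` call fills in. The short sketch below shows both steps on a reduced copy of the table. The `endpoint_url` helper and the URL-quoting of the call ID are illustrative additions, not part of the configuration module.

```python
from urllib.parse import quote

WEB_CALLS_BASE_URL = "https://api.goto.com/web-calls/v1"

# Reduced copy of the endpoint table: the f-string resolves the base URL
# now and leaves a literal {call_id} placeholder for later.
ENDPOINTS = {
    "calls": f"{WEB_CALLS_BASE_URL}/calls",
    "answer": f"{WEB_CALLS_BASE_URL}/calls/{{call_id}}/answer",
}


def endpoint_url(name: str, call_id: str = "") -> str:
    """Fill in the call ID, percent-encoding it so it cannot alter the path."""
    return ENDPOINTS[name].format(call_id=quote(call_id, safe=""))


template = ENDPOINTS["answer"]
resolved = endpoint_url("answer", "call_xyz789")
```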
4.2.2 Initiate Outbound Call \
async def initiate_call(
self,
dial_string: str,
caller_id: str,
line_id: str
) -> dict:
"""
Initiate an outbound call.
Args:
dial_string: Number to call (e.g., "tel:+15551234567")
caller_id: Caller ID to display
line_id: Line/extension to call from
Returns:
Call object with ID and initial SDP offer
"""
token = await self.auth_manager.get_access_token()
async with httpx.AsyncClient() as client:
response = await client.post(
ENDPOINTS["calls"],
json={
"dialString": dial_string,
"callerId": caller_id,
"lineId": line_id,
},
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
)
response.raise_for_status()
return response.json()
POST /web-calls/v1/calls
{
"dialString": "tel:+15551234567",
"callerId": "+15559876543",
"lineId": "line_abc123"
}
{
"callId": "call_xyz789",
"state": "dialing",
"direction": "outbound",
"from": "+15559876543",
"to": "+15551234567",
"lineId": "line_abc123",
"sdpOffer": "v=0\r\no=- 123456789 2 IN IP4 127.0.0.1\r\n...",
"createdAt": "2026-01-16T10:30:00Z"
}
4.2.3 Answer Inbound Call \
async def answer_call(
self,
call_id: str,
sdp_answer: str
) -> dict:
"""
Answer an inbound call with SDP answer.
Args:
call_id: The call to answer
sdp_answer: Our SDP answer
Returns:
Updated call object
"""
token = await self.auth_manager.get_access_token()
endpoint = ENDPOINTS["answer"].format(call_id=call_id)
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json={
"sdpAnswer": sdp_answer,
},
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
)
response.raise_for_status()
return response.json()
POST /web-calls/v1/calls/{call_id}/answer
{
"sdpAnswer": "v=0\r\no=- 987654321 2 IN IP4 127.0.0.1\r\n..."
}
{
"callId": "call_xyz789",
"state": "connected",
"direction": "inbound",
"from": "+15551234567",
"to": "+15559876543",
"lineId": "line_abc123",
"answeredAt": "2026-01-16T10:30:05Z"
}
4.3 SDP Exchange \
4.3.1 SDP Offer Structure (from GoToConnect) \
v=0
o=- 1234567890 2 IN IP4 0.0.0.0
s=-
t=0 0
a=group:BUNDLE audio
a=msid-semantic: WMS
m=audio 9 UDP/TLS/RTP/SAVPF 111 103 104 9 0 8 106 105 13 110 112 113 126
c=IN IP4 0.0.0.0
a=rtcp:9 IN IP4 0.0.0.0
a=ice-ufrag:abcd
a=ice-pwd:efghijklmnopqrstuvwxyz
a=ice-options:trickle
a=fingerprint:sha-256 AA:BB:CC:DD:EE:FF:...
a=setup:actpass
a=mid:audio
a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level
a=sendrecv
a=rtcp-mux
a=rtpmap:111 opus/48000/2
a=rtcp-fb:111 transport-cc
a=fmtp:111 minptime=10;useinbandfec=1
a=rtpmap:103 ISAC/16000
a=rtpmap:104 ISAC/32000
a=rtpmap:9 G722/8000
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000
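When debugging negotiation problems it helps to list which codecs an offer actually advertises. The sketch below extracts the `a=rtpmap` lines from an SDP string using only the standard library. `parse_rtpmap` is a hypothetical helper intended for logging and diagnostics; it is not a substitute for the SDP handling that aiortc performs.

```python
import re

# a=rtpmap:<payload type> <encoding name>/<clock rate>[/<channels>]
RTPMAP_RE = re.compile(r"^a=rtpmap:(\d+) ([^/\s]+)/(\d+)(?:/(\d+))?$")


def parse_rtpmap(sdp: str) -> dict[int, tuple[str, int, int]]:
    """Map payload type to (codec name, clock rate, channels)."""
    codecs = {}
    for line in sdp.splitlines():  # splitlines() handles both \r\n and \n
        match = RTPMAP_RE.match(line.strip())
        if match:
            pt, name, rate, channels = match.groups()
            # The channel count is optional and defaults to 1.
            codecs[int(pt)] = (name, int(rate), int(channels or 1))
    return codecs


offer = (
    "v=0\r\n"
    "m=audio 9 UDP/TLS/RTP/SAVPF 111 9 0 8\r\n"
    "a=rtpmap:111 opus/48000/2\r\n"
    "a=rtpmap:9 G722/8000\r\n"
    "a=rtpmap:0 PCMU/8000\r\n"
    "a=rtpmap:8 PCMA/8000\r\n"
)
codecs = parse_rtpmap(offer)
```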
4.3.2 SDP Answer Generation \
from aiortc import (
    RTCConfiguration,
    RTCIceServer,
    RTCPeerConnection,
    RTCSessionDescription,
)

async def create_sdp_answer(sdp_offer: str) -> tuple[RTCPeerConnection, str]:
    """
    Create an SDP answer for a GoToConnect offer.
    Args:
        sdp_offer: The SDP offer from GoToConnect
    Returns:
        Tuple of (peer_connection, sdp_answer)
    """
    # Create peer connection with our configuration.
    # aiortc expects an RTCConfiguration object, not a plain dict.
    pc = RTCPeerConnection(configuration=RTCConfiguration(
        iceServers=[RTCIceServer(urls="stun:stun.l.google.com:19302")]
    ))
    # Set the remote description (offer from GoToConnect)
    offer = RTCSessionDescription(sdp=sdp_offer, type="offer")
    await pc.setRemoteDescription(offer)
    # Create and set local description (our answer)
    answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)
    return pc, pc.localDescription.sdp
4.3.3 Codec Preferences \
GoToConnect supports multiple audio codecs. We prefer Opus for quality:
CODEC_PREFERENCES = [
{
"name": "opus",
"clockRate": 48000,
"channels": 2,
"priority": 1,
"params": {
"minptime": "10",
"useinbandfec": "1",
}
},
{
"name": "PCMU", # G.711 μ-law (fallback)
"clockRate": 8000,
"channels": 1,
"priority": 2,
},
{
"name": "PCMA", # G.711 A-law (fallback)
"clockRate": 8000,
"channels": 1,
"priority": 3,
},
]
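The `priority` field implies a simple selection rule: take the highest-priority codec that the remote side also offers. A minimal sketch of that rule follows, using a trimmed copy of the preference list. `select_codec` is a hypothetical helper, and the case-insensitive name matching is an assumption.

```python
from typing import Iterable, Optional

# Trimmed copy of the preference list (lower priority number wins).
CODEC_PREFERENCES = [
    {"name": "opus", "clockRate": 48000, "channels": 2, "priority": 1},
    {"name": "PCMU", "clockRate": 8000, "channels": 1, "priority": 2},
    {"name": "PCMA", "clockRate": 8000, "channels": 1, "priority": 3},
]


def select_codec(
    offered_names: Iterable[str],
    preferences: list = CODEC_PREFERENCES,
) -> Optional[dict]:
    """Return the highest-priority preference that appears in the offer."""
    offered = {name.lower() for name in offered_names}
    for pref in sorted(preferences, key=lambda p: p["priority"]):
        if pref["name"].lower() in offered:
            return pref
    return None  # no codec in common


best = select_codec(["G722", "PCMU", "opus"])
fallback = select_codec(["G722", "PCMA", "PCMU"])
```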
4.4 ICE Candidate Exchange \
4.4.1 Trickle ICE \
GoToConnect supports trickle ICE, allowing candidates to be exchanged incrementally:
async def send_ice_candidate(
self,
call_id: str,
candidate: dict
) -> None:
"""
Send an ICE candidate to GoToConnect.
Args:
call_id: The call ID
candidate: ICE candidate object
"""
token = await self.auth_manager.get_access_token()
endpoint = ENDPOINTS["ice_candidates"].format(call_id=call_id)
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json={
"candidate": candidate["candidate"],
"sdpMid": candidate["sdpMid"],
"sdpMLineIndex": candidate["sdpMLineIndex"],
},
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
)
response.raise_for_status()
POST /web-calls/v1/calls/{call_id}/ice-candidates
{
"candidate": "candidate:1 1 UDP 2130706431 192.168.1.100 54321 typ host",
"sdpMid": "audio",
"sdpMLineIndex": 0
}
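The `candidate` string packs several fields into one line. The sketch below splits the mandatory fields out of it for logging or filtering, for example to see whether only `host` candidates are being gathered. `parse_candidate` is a hypothetical helper that ignores optional trailing attributes such as `raddr` and `rport`.

```python
def parse_candidate(line: str) -> dict:
    """Split an ICE candidate line into its mandatory fields."""
    prefix = "candidate:"
    if not line.startswith(prefix):
        raise ValueError(f"not a candidate line: {line!r}")
    parts = line[len(prefix):].split()
    # foundation component protocol priority ip port "typ" type [extensions]
    if len(parts) < 8 or parts[6] != "typ":
        raise ValueError(f"malformed candidate: {line!r}")
    return {
        "foundation": parts[0],
        "component": int(parts[1]),
        "protocol": parts[2].lower(),
        "priority": int(parts[3]),
        "ip": parts[4],
        "port": int(parts[5]),
        "type": parts[7],
    }


parsed = parse_candidate(
    "candidate:1 1 UDP 2130706431 192.168.1.100 54321 typ host"
)
```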
4.4.2 Handling Remote ICE Candidates \
ICE candidates from GoToConnect arrive via WebSocket events:
from aiortc import RTCPeerConnection
from aiortc.sdp import candidate_from_sdp

async def handle_ice_candidate_event(
    self,
    event: dict,
    peer_connection: RTCPeerConnection
) -> None:
    """
    Handle an ICE candidate event from GoToConnect.
    Args:
        event: The ICE candidate event
        peer_connection: Our RTCPeerConnection
    """
    candidate_data = event.get("candidate")
    if candidate_data:
        # aiortc's RTCIceCandidate takes parsed fields, so parse the SDP
        # candidate string (without its "candidate:" prefix) first
        sdp_line = candidate_data["candidate"].split(":", 1)[1]
        candidate = candidate_from_sdp(sdp_line)
        candidate.sdpMid = candidate_data["sdpMid"]
        candidate.sdpMLineIndex = candidate_data["sdpMLineIndex"]
        await peer_connection.addIceCandidate(candidate)
4.5 Audio Stream Handling \
4.5.1 Receiving Audio from GoToConnect \
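import logging

from aiortc import MediaStreamTrack, RTCPeerConnection
from aiortc.mediastreams import MediaStreamError
from livekit import rtc
from livekit.rtc import Room

logger = logging.getLogger(__name__)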
class GoToAudioReceiver:
"""Receives audio from GoToConnect and forwards to LiveKit."""
def __init__(
self,
peer_connection: RTCPeerConnection,
livekit_room: Room
):
self.pc = peer_connection
self.livekit_room = livekit_room
# Set up track handler
self.pc.on("track", self._on_track)
async def _on_track(self, track: MediaStreamTrack) -> None:
"""Handle incoming audio track from GoToConnect."""
if track.kind != "audio":
return
logger.info(f"Received audio track from GoToConnect: {track.id}")
# Create a track forwarder to LiveKit
forwarder = AudioTrackForwarder(track, self.livekit_room)
await forwarder.start()
class AudioTrackForwarder:
"""Forwards audio frames from GoToConnect to LiveKit."""
def __init__(
self,
source_track: MediaStreamTrack,
livekit_room: Room
):
self.source_track = source_track
self.livekit_room = livekit_room
self._running = False
async def start(self) -> None:
"""Start forwarding audio frames."""
self._running = True
# Create LiveKit audio source
audio_source = rtc.AudioSource(
sample_rate=48000,
num_channels=1
)
# Publish track to LiveKit
track = rtc.LocalAudioTrack.create_audio_track(
"caller-audio",
audio_source
)
await self.livekit_room.local_participant.publish_track(track)
# Forward frames
while self._running:
try:
frame = await self.source_track.recv()
# Convert frame format if needed
audio_frame = self._convert_frame(frame)
# Push to LiveKit
await audio_source.capture_frame(audio_frame)
except MediaStreamError:
logger.info("Audio track ended")
break
def _convert_frame(self, frame) -> rtc.AudioFrame:
"""Convert aiortc frame to LiveKit frame."""
# Frame conversion logic
return rtc.AudioFrame(
data=frame.to_ndarray().tobytes(),
sample_rate=frame.sample_rate,
num_channels=frame.layout.channels,
samples_per_channel=frame.samples,
)
async def stop(self) -> None:
"""Stop forwarding."""
self._running = False
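The receiver configures the LiveKit `AudioSource` with one channel, while `_convert_frame` passes through whatever channel count the decoded frame carries. If the decoded frames turn out to be interleaved 16-bit stereo (an assumption to verify against the negotiated codec), they would need to be downmixed first. The following is a standard-library sketch of that step; `downmix_stereo_to_mono` is a hypothetical helper, and a production path would more likely use NumPy or a resampler.

```python
from array import array


def downmix_stereo_to_mono(pcm: bytes) -> bytes:
    """Average interleaved L/R int16 samples into a single mono channel."""
    samples = array("h")  # signed 16-bit, native byte order
    samples.frombytes(pcm)
    if len(samples) % 2:
        raise ValueError("stereo PCM must contain an even number of samples")
    mono = array(
        "h",
        ((samples[i] + samples[i + 1]) // 2 for i in range(0, len(samples), 2)),
    )
    return mono.tobytes()


stereo = array("h", [100, 200, -50, -150]).tobytes()
mono = array("h")
mono.frombytes(downmix_stereo_to_mono(stereo))
```

Averaging two int16 values always fits in int16, so no clipping is needed.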
4.5.2 Sending Audio to GoToConnect \
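import asyncio
import fractions

from aiortc import MediaStreamTrack, RTCPeerConnection
from av import AudioFrame as AVAudioFrame
from livekit import rtc
from livekit.rtc import Room

class GoToAudioSender:
    """Sends audio from LiveKit to GoToConnect."""
    def __init__(
        self,
        peer_connection: RTCPeerConnection,
        livekit_room: Room
    ):
        self.pc = peer_connection
        self.livekit_room = livekit_room
        self._audio_track = None
        self._forward_tasks = set()  # keeps references to running tasks
    async def start(self) -> None:
        """Start sending audio to GoToConnect."""
        # Create an audio track for GoToConnect
        self._audio_track = AudioStreamTrack()
        # Add track to peer connection
        self.pc.addTrack(self._audio_track)
        # Subscribe to AI agent's audio in LiveKit. LiveKit event handlers
        # must be plain functions, so forwarding runs in a separate task.
        @self.livekit_room.on("track_subscribed")
        def on_track_subscribed(
            track: rtc.Track,
            publication: rtc.TrackPublication,
            participant: rtc.RemoteParticipant
        ):
            if track.kind == rtc.TrackKind.KIND_AUDIO:
                task = asyncio.create_task(self._forward(track))
                self._forward_tasks.add(task)
                task.add_done_callback(self._forward_tasks.discard)
    async def _forward(self, track: rtc.Track) -> None:
        """Forward AI audio to GoToConnect."""
        # rtc.AudioStream yields events that wrap each decoded frame
        async for event in rtc.AudioStream(track):
            await self._audio_track.send_frame(event.frame)

class AudioStreamTrack(MediaStreamTrack):
    """Custom audio track for sending to GoToConnect."""
    kind = "audio"
    def __init__(self):
        super().__init__()
        self._queue = asyncio.Queue()
        self._pts = 0  # running presentation timestamp, in samples
    async def send_frame(self, frame: rtc.AudioFrame) -> None:
        """Queue a frame for sending."""
        await self._queue.put(frame)
    async def recv(self):
        """Receive next frame to send."""
        frame = await self._queue.get()
        return self._convert_to_aiortc_frame(frame)
    def _convert_to_aiortc_frame(self, frame: rtc.AudioFrame):
        """Convert LiveKit frame to aiortc frame (assumes mono s16 audio)."""
        av_frame = AVAudioFrame(
            format="s16",
            layout="mono",
            samples=frame.samples_per_channel
        )
        av_frame.sample_rate = frame.sample_rate
        av_frame.planes[0].update(bytes(frame.data))
        # aiortc derives RTP timestamps from pts and time_base
        av_frame.pts = self._pts
        av_frame.time_base = fractions.Fraction(1, frame.sample_rate)
        self._pts += frame.samples_per_channel
        return av_frame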
5. Call Control API \
5.1 Call Control Operations \
5.1.1 Hold Call \
Place the remote party on hold (they hear hold music).
async def hold_call(self, call_id: str) -> dict:
"""
Place a call on hold.
Args:
call_id: The call to hold
Returns:
Updated call object
"""
token = await self.auth_manager.get_access_token()
endpoint = ENDPOINTS["hold"].format(call_id=call_id)
async with httpx.AsyncClient() as client:
response = await client.put(
endpoint,
headers={
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
)
response.raise_for_status()
return response.json()
PUT /web-calls/v1/calls/{call_id}/hold
{
"callId": "call_xyz789",
"state": "held",
"holdStartedAt": "2026-01-16T10:35:00Z"
}
5.1.2 Resume Call \
Resume a held call.
async def resume_call(self, call_id: str) -> dict:
"""
Resume a held call.
Args:
call_id: The call to resume
Returns:
Updated call object
"""
token = await self.auth_manager.get_access_token()
endpoint = ENDPOINTS["resume"].format(call_id=call_id)
async with httpx.AsyncClient() as client:
response = await client.put(
endpoint,
headers={
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
)
response.raise_for_status()
return response.json()
PUT /web-calls/v1/calls/{call_id}/resume
5.1.3 Mute/Unmute \
Control the microphone (our audio to the caller).
async def mute_call(self, call_id: str) -> dict:
"""Mute our audio (caller won't hear us)."""
token = await self.auth_manager.get_access_token()
endpoint = ENDPOINTS["mute"].format(call_id=call_id)
async with httpx.AsyncClient() as client:
response = await client.put(
endpoint,
headers={"Authorization": f"Bearer {token}"}
)
response.raise_for_status()
return response.json()
async def unmute_call(self, call_id: str) -> dict:
"""Unmute our audio."""
token = await self.auth_manager.get_access_token()
endpoint = ENDPOINTS["unmute"].format(call_id=call_id)
async with httpx.AsyncClient() as client:
response = await client.put(
endpoint,
headers={"Authorization": f"Bearer {token}"}
)
response.raise_for_status()
return response.json()
5.1.4 Send DTMF \
Send touch-tone digits.
async def send_dtmf(
self,
call_id: str,
digits: str,
duration_ms: int = 100,
gap_ms: int = 50
) -> dict:
"""
Send DTMF tones.
Args:
call_id: The call ID
digits: Digits to send (0-9, *, #)
duration_ms: Duration of each tone
gap_ms: Gap between tones
Returns:
Confirmation response
"""
token = await self.auth_manager.get_access_token()
endpoint = ENDPOINTS["dtmf"].format(call_id=call_id)
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json={
"digits": digits,
"durationMs": duration_ms,
"gapMs": gap_ms,
},
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
)
response.raise_for_status()
return response.json()
POST /web-calls/v1/calls/{call_id}/dtmf
{
"digits": "1234#",
"durationMs": 100,
"gapMs": 50
}
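Validating the digit string before the request avoids a round trip for input the API would presumably reject, and the playout time tells the caller how long to wait before sending more input. The sketch below assumes that gaps occur only between tones. Both helper names are hypothetical.

```python
VALID_DTMF = frozenset("0123456789*#")


def validate_dtmf(digits: str) -> str:
    """Return digits unchanged, or raise ValueError on any other character."""
    if not digits:
        raise ValueError("digits must not be empty")
    invalid = sorted(set(digits) - VALID_DTMF)
    if invalid:
        raise ValueError(f"invalid DTMF characters: {invalid}")
    return digits


def dtmf_playout_ms(digits: str, duration_ms: int = 100, gap_ms: int = 50) -> int:
    """Total playout time: one tone per digit plus a gap between tones."""
    count = len(validate_dtmf(digits))
    return count * duration_ms + (count - 1) * gap_ms


# "1234#" is 5 tones and 4 gaps: 5 * 100 + 4 * 50 = 700 ms
total_ms = dtmf_playout_ms("1234#")
```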
5.1.5 Hang Up \
End the call.
async def hangup_call(
self,
call_id: str,
reason: str = "normal"
) -> dict:
"""
End a call.
Args:
call_id: The call to end
reason: Reason for ending (normal, busy, rejected)
Returns:
Final call object
"""
token = await self.auth_manager.get_access_token()
endpoint = ENDPOINTS["hangup"].format(call_id=call_id)
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json={"reason": reason},
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
)
response.raise_for_status()
return response.json()
5.2 Transfer Operations \
5.2.1 Blind Transfer \
Transfer the call immediately without consulting the target.
async def blind_transfer(
self,
call_id: str,
dial_string: str
) -> dict:
"""
Perform a blind (cold) transfer.
The caller is immediately connected to the transfer target.
Our connection is terminated.
Args:
call_id: The call to transfer
dial_string: Target (e.g., "ext:1001" or "tel:+15551234567")
Returns:
Transfer result
"""
token = await self.auth_manager.get_access_token()
endpoint = ENDPOINTS["blind_transfer"].format(call_id=call_id)
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json={
"dialString": dial_string,
},
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
)
response.raise_for_status()
return response.json()
POST /web-calls/v1/calls/{call_id}/blind-transfer
{
"dialString": "ext:1001"
}
{
"callId": "call_xyz789",
"state": "transferred",
"transferType": "blind",
"transferTarget": "ext:1001",
"transferredAt": "2026-01-16T10:40:00Z"
}
**Dial String Formats:**

| Format | Example | Description |
|---|---|---|
| `ext:{extension}` | `ext:1001` | Internal extension |
| `tel:{number}` | `tel:+15551234567` | External phone number |
| `sip:{uri}` | `sip:user@domain.com` | SIP URI |
| `voicemail:{extension}` | `voicemail:1001` | Extension's voicemail |
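Because a malformed transfer target only surfaces after the caller has been handed off, it can be worth validating dial strings locally first. The sketch below checks the four dial string schemes used in this section. `parse_dial_string` is a hypothetical helper, and the E.164 check for `tel:` and the digits-only check for extensions are assumptions rather than documented GoToConnect rules.

```python
import re

DIAL_SCHEMES = frozenset({"ext", "tel", "sip", "voicemail"})
# Assumption: external numbers are E.164 ("+" followed by up to 15 digits).
E164_RE = re.compile(r"^\+[1-9]\d{1,14}$")


def parse_dial_string(dial_string: str) -> tuple[str, str]:
    """Split a dial string into (scheme, target), validating both parts."""
    scheme, sep, target = dial_string.partition(":")
    if not sep or scheme not in DIAL_SCHEMES or not target:
        raise ValueError(f"unsupported dial string: {dial_string!r}")
    if scheme == "tel" and not E164_RE.match(target):
        raise ValueError(f"not an E.164 number: {target!r}")
    if scheme in ("ext", "voicemail") and not target.isdigit():
        raise ValueError(f"extension must be numeric: {target!r}")
    return scheme, target


internal = parse_dial_string("ext:1001")
external = parse_dial_string("tel:+15551234567")
sip = parse_dial_string("sip:user@domain.com")
```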
5.2.2 Warm Transfer (Attended Transfer) \
┌─────────────────────────────────────────────────────────────────────────────┐
│ WARM TRANSFER FLOW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Caller AI Agent Human Agent GoToConnect │
│ │ │ │ │ │
│ │ 1. Talking │ │ │ │
│ │◀──────────────────▶│ │ │ │
│ │ │ │ │ │
│ │ │ 2. Hold caller │ │ │
│ │ (hold music) │────────────────────────────────────────▶│ │
│ │◀─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │ │ │
│ │ │ │ │ │
│ │ │ 3. Call agent │ │ │
│ │ │────────────────────────────────────────▶│ │
│ │ │ │◀───────────────────│ │
│ │ │ │ │ │
│ │ │ 4. Brief agent │ │ │
│ │ │◀───────────────────▶│ │ │
│ │ │ "Customer John, │ │ │
│ │ │ billing issue" │ │ │
│ │ │ │ │ │
│ │ │ 5. Complete transfer │ │
│ │ │────────────────────────────────────────▶│ │
│ │ │ │ │ │
│ │ 6. Connected to agent │ │ │
│ │◀────────────────────────────────────────▶│ │ │
│ │ │ │ │ │
│ │ │ 7. AI disconnected │ │ │
│ │ │◀────────────────────────────────────────│ │
│ │ │ │ │ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
async def warm_transfer(
self,
call_id: str,
dial_string: str,
announcement: str = None
) -> dict:
"""
Perform a warm (attended) transfer.
This is a multi-step process:
1. Place caller on hold
2. Call the transfer target
3. Brief the target (optional)
4. Complete the transfer
Args:
call_id: The original call ID
dial_string: Target to transfer to
announcement: Message to speak to target before transfer
Returns:
Transfer result
"""
# Step 1: Hold the original caller
await self.hold_call(call_id)
# Step 2: Initiate call to transfer target
consult_call = await self.initiate_call(
dial_string=dial_string,
caller_id=self._get_caller_id(call_id),
line_id=self._get_line_id(call_id),
)
consult_call_id = consult_call["callId"]
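# Step 3: Wait for the target to answer before completing the transfer.
# This example omits the wait; in practice, block here until a call.connected
# event arrives for consult_call_id (see Section 6), then speak the optional
# announcement to the target.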
# Step 4: Complete the transfer
token = await self.auth_manager.get_access_token()
endpoint = ENDPOINTS["warm_transfer"].format(call_id=call_id)
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json={
"referId": consult_call_id,
},
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
)
response.raise_for_status()
return response.json()
POST /web-calls/v1/calls/{call_id}/warm-transfer
{
"referId": "call_consult_abc"
}
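Step 3 of `warm_transfer` (waiting for the target to answer) is left as a comment in the code. One way it might be implemented is sketched below, assuming the `call.connected` event handler can reach a shared object. `AnswerWaiter` and its methods are hypothetical names introduced for illustration.

```python
import asyncio


class AnswerWaiter:
    """Hypothetical helper: lets a coroutine wait until a call is answered."""

    def __init__(self) -> None:
        self._events: dict[str, asyncio.Event] = {}

    def _event_for(self, call_id: str) -> asyncio.Event:
        # Create the event on first use so notify and wait can happen in any order.
        return self._events.setdefault(call_id, asyncio.Event())

    def notify_connected(self, call_id: str) -> None:
        """Call this from the call.connected event handler."""
        self._event_for(call_id).set()

    async def wait_for_answer(self, call_id: str, timeout: float = 30.0) -> bool:
        """Return True if the call was answered within `timeout` seconds."""
        try:
            await asyncio.wait_for(self._event_for(call_id).wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False
        finally:
            # Drop the entry so finished calls do not accumulate.
            self._events.pop(call_id, None)
```

If `wait_for_answer` returns `False`, a reasonable fallback is to hang up the consult call and resume the original caller rather than complete the transfer.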
5.2.3 Conference (Merge Calls) \
Add a third party to an existing call.
async def merge_calls(
self,
call_id: str,
other_call_id: str
) -> dict:
"""
Merge two calls into a conference.
Both parties will be connected together with us.
Args:
call_id: First call ID
other_call_id: Second call ID to merge
Returns:
Merged call result
"""
token = await self.auth_manager.get_access_token()
endpoint = ENDPOINTS["merge"].format(call_id=call_id)
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json={
"referId": other_call_id,
},
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
)
response.raise_for_status()
return response.json()
POST /web-calls/v1/calls/{call_id}/merge
{
"referId": "call_supervisor_xyz"
}
5.3 Call Control State Machine \
┌─────────────────────────────────────────────────────────────────────────────┐
│ CALL CONTROL STATE MACHINE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────┐ │ │
│ │ │ INITIAL │ │ │
│ │ └────┬────┘ │ │
│ │ │ │ │
│ │ ┌──────────────┼──────────────┐ │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ ┌──────────┐ ┌─────────┐ ┌────────┐ │ │
│ │ │ DIALING  │ │ RINGING │ │OFFERING│ │ │
│ │ │(outbound)│ │(inbound)│ │(inbound│ │ │
│ │ └────┬─────┘ └────┬────┘ │ WebRTC)│ │ │
│ │ │ │ └───┬────┘ │ │
│ │ │ │ │ │ │
│ │ └───────────┼─────────────┘ │ │
│ │ │ │ │
│ │ │ answer │ │
│ │ ▼ │ │
│ │ ┌───────────┐ │ │
│ resume │ ┌─────▶│ CONNECTED │◀─────┐ │ │
│ │ │ │ └─────┬─────┘ │ │ │
│ │ │ │ │ │ │ │
│ │ │ │ ┌───────┴───────┐ │ │ │
│ │ │ │ │ │ │ │ │
│ │ │ │ ▼ ▼ │ │ │
│ │ │ ┌──────────┐ ┌──────────┐│ │ │
│ └─────────┼──▶│ HELD │ │ MUTED ││ │ │
│ │ └──────────┘ └──────────┘│ │ │
│ │ │ │ │
│ │ │ unmute │ │
│ │ │ │ │
│ │ │ │
│ │ From any state: │ │
│ │ │ │
│ │ hangup transfer │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ ┌─────────┐ ┌────────────┐ │ │
│ │ │ ENDED │ │TRANSFERRED │ │ │
│ │ └─────────┘ └────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
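The transitions in the diagram can be encoded as a lookup table so that invalid operations are rejected before an API call is made. This is a sketch under two assumptions: `ENDED` and `TRANSFERRED` are treated as terminal states, and `can_transition` is a hypothetical helper name. The enum repeats the `CallState` values used by the call control client so that the example runs on its own.

```python
from enum import Enum


class CallState(Enum):
    INITIAL = "initial"
    DIALING = "dialing"
    RINGING = "ringing"
    OFFERING = "offering"
    CONNECTED = "connected"
    HELD = "held"
    MUTED = "muted"
    TRANSFERRED = "transferred"
    ENDED = "ended"


# Transitions read off the diagram (answer, hold/resume, mute/unmute).
_TRANSITIONS = {
    CallState.INITIAL: {CallState.DIALING, CallState.RINGING, CallState.OFFERING},
    CallState.DIALING: {CallState.CONNECTED},
    CallState.RINGING: {CallState.CONNECTED},
    CallState.OFFERING: {CallState.CONNECTED},
    CallState.CONNECTED: {CallState.HELD, CallState.MUTED},
    CallState.HELD: {CallState.CONNECTED},
    CallState.MUTED: {CallState.CONNECTED},
}
# Assumption: once a call has ended or been transferred, nothing follows.
_TERMINAL = {CallState.ENDED, CallState.TRANSFERRED}


def can_transition(current: CallState, target: CallState) -> bool:
    """Return True if the diagram allows moving from `current` to `target`."""
    if current in _TERMINAL:
        return False
    if target in _TERMINAL:
        # hangup and transfer are allowed "from any state" in the diagram.
        return True
    return target in _TRANSITIONS.get(current, set())
```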
5.4 Call Control Client \
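Complete client implementation:
# integrations/gotoconnect/call_control.py
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import httpx
# Exception types raised by _handle_error (defined in Section 8.1)
from .exceptions import (
GoToAPIError,
GoToAuthError,
GoToConflictError,
GoToNotFoundError,
GoToPermissionError,
GoToRateLimitError,
)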
class CallState(Enum):
INITIAL = "initial"
DIALING = "dialing"
RINGING = "ringing"
OFFERING = "offering"
CONNECTED = "connected"
HELD = "held"
MUTED = "muted"
TRANSFERRED = "transferred"
ENDED = "ended"
@dataclass
class Call:
"""Represents a GoToConnect call."""
call_id: str
state: CallState
direction: str
from_number: str
to_number: str
line_id: str
created_at: str
answered_at: Optional[str] = None
ended_at: Optional[str] = None
sdp_offer: Optional[str] = None
sdp_answer: Optional[str] = None
class GoToCallControlClient:
"""
Client for GoToConnect Call Control operations.
Provides methods for all call control operations including
hold, transfer, merge, and DTMF.
"""
BASE_URL = "https://api.goto.com/web-calls/v1"
def __init__(self, auth_manager: GoToAuthManager):
self.auth_manager = auth_manager
async def _request(
self,
method: str,
endpoint: str,
json: dict = None
) -> dict:
"""Make an authenticated request to GoToConnect."""
token = await self.auth_manager.get_access_token()
async with httpx.AsyncClient() as client:
response = await client.request(
method=method,
url=f"{self.BASE_URL}{endpoint}",
json=json,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
},
timeout=30.0,
)
if response.status_code >= 400:
await self._handle_error(response)
if response.status_code == 204:
return {}
return response.json()
async def _handle_error(self, response: httpx.Response) -> None:
"""Handle API error responses."""
try:
error_data = response.json()
except Exception:
error_data = {"message": response.text}
status = response.status_code
if status == 401:
raise GoToAuthError("Authentication failed")
elif status == 403:
raise GoToPermissionError(
f"Permission denied: {error_data.get('message')}"
)
elif status == 404:
raise GoToNotFoundError(
f"Resource not found: {error_data.get('message')}"
)
elif status == 409:
raise GoToConflictError(
f"Conflict: {error_data.get('message')}"
)
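elif status == 429:
# Pass the server's Retry-After hint through when it is given in seconds
retry_after = response.headers.get("Retry-After")
raise GoToRateLimitError(
f"Rate limit exceeded: {error_data.get('message')}",
retry_after=int(retry_after) if retry_after and retry_after.isdigit() else None,
)
else:
raise GoToAPIError(
f"API error {status}: {error_data.get('message')}",
status_code=status,
)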
# Call initiation
async def initiate_call(
self,
dial_string: str,
caller_id: str,
line_id: str
) -> Call:
"""Initiate an outbound call."""
data = await self._request(
"POST",
"/calls",
json={
"dialString": dial_string,
"callerId": caller_id,
"lineId": line_id,
}
)
return self._parse_call(data)
async def answer_call(
self,
call_id: str,
sdp_answer: str
) -> Call:
"""Answer an inbound call."""
data = await self._request(
"POST",
f"/calls/{call_id}/answer",
json={"sdpAnswer": sdp_answer}
)
return self._parse_call(data)
# Call control
async def hangup(self, call_id: str) -> Call:
"""End a call."""
data = await self._request("POST", f"/calls/{call_id}/hangup")
return self._parse_call(data)
async def hold(self, call_id: str) -> Call:
"""Place call on hold."""
data = await self._request("PUT", f"/calls/{call_id}/hold")
return self._parse_call(data)
async def resume(self, call_id: str) -> Call:
"""Resume held call."""
data = await self._request("PUT", f"/calls/{call_id}/resume")
return self._parse_call(data)
async def mute(self, call_id: str) -> Call:
"""Mute our audio."""
data = await self._request("PUT", f"/calls/{call_id}/mute")
return self._parse_call(data)
async def unmute(self, call_id: str) -> Call:
"""Unmute our audio."""
data = await self._request("PUT", f"/calls/{call_id}/unmute")
return self._parse_call(data)
async def send_dtmf(
self,
call_id: str,
digits: str,
duration_ms: int = 100,
gap_ms: int = 50
) -> dict:
"""Send DTMF tones."""
return await self._request(
"POST",
f"/calls/{call_id}/dtmf",
json={
"digits": digits,
"durationMs": duration_ms,
"gapMs": gap_ms,
}
)
# Transfers
async def blind_transfer(
self,
call_id: str,
dial_string: str
) -> Call:
"""Perform blind transfer."""
data = await self._request(
"POST",
f"/calls/{call_id}/blind-transfer",
json={"dialString": dial_string}
)
return self._parse_call(data)
async def warm_transfer(
self,
call_id: str,
refer_id: str
) -> Call:
"""Complete warm transfer."""
data = await self._request(
"POST",
f"/calls/{call_id}/warm-transfer",
json={"referId": refer_id}
)
return self._parse_call(data)
async def merge(
self,
call_id: str,
refer_id: str
) -> Call:
"""Merge calls into conference."""
data = await self._request(
"POST",
f"/calls/{call_id}/merge",
json={"referId": refer_id}
)
return self._parse_call(data)
# ICE candidates
async def send_ice_candidate(
self,
call_id: str,
candidate: str,
sdp_mid: str,
sdp_m_line_index: int
) -> None:
"""Send ICE candidate."""
await self._request(
"POST",
f"/calls/{call_id}/ice-candidates",
json={
"candidate": candidate,
"sdpMid": sdp_mid,
"sdpMLineIndex": sdp_m_line_index,
}
)
# Helpers
def _parse_call(self, data: dict) -> Call:
"""Parse API response into Call object."""
return Call(
call_id=data["callId"],
state=CallState(data["state"]),
direction=data.get("direction", "unknown"),
from_number=data.get("from", ""),
to_number=data.get("to", ""),
line_id=data.get("lineId", ""),
created_at=data.get("createdAt", ""),
answered_at=data.get("answeredAt"),
ended_at=data.get("endedAt"),
sdp_offer=data.get("sdpOffer"),
sdp_answer=data.get("sdpAnswer"),
)
6. Event Subscriptions \
6.1 Event System Overview \
GoToConnect provides real-time events via WebSocket. Events notify us of call state changes, so we can react to them as they happen.
┌─────────────────────────────────────────────────────────────────────────────┐
│ EVENT SUBSCRIPTION FLOW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Voice by │ │ GoToConnect │ │
│ │ aiConnected │ │ │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ │ 1. POST /notifications/channels │ │
│ │ Create channel for events │ │
│ │────────────────────────────────────────────▶│ │
│ │ │ │
│ │ 2. Response: channel_id, websocket_url │ │
│ │◀────────────────────────────────────────────│ │
│ │ │ │
│ │ 3. POST /notifications/subscriptions │ │
│ │ Subscribe to call events │ │
│ │────────────────────────────────────────────▶│ │
│ │ │ │
│ │ 4. Connect WebSocket │ │
│ │═══════════════════════════════════════════▶│ │
│ │ │ │
│ │ 5. Events flow via WebSocket │ │
│ │◀══════════════════════════════════════════▶│ │
│ │ { "event": "call.ringing", ... } │ │
│ │ { "event": "call.connected", ... } │ │
│ │ { "event": "call.ended", ... } │ │
│ │ │ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
6.2 Create Notification Channel \
# integrations/gotoconnect/events.py
from typing import Optional
import httpx
class GoToEventManager:
"""Manages GoToConnect event subscriptions."""
EVENTS_BASE_URL = "https://api.goto.com/call-events/v1"
def __init__(self, auth_manager: GoToAuthManager):
self.auth_manager = auth_manager
self._channel_id: Optional[str] = None
self._websocket_url: Optional[str] = None
self._subscriptions: dict[str, str] = {}
async def create_channel(self) -> tuple[str, str]:
"""
Create a notification channel.
Returns:
Tuple of (channel_id, websocket_url)
"""
token = await self.auth_manager.get_access_token()
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.EVENTS_BASE_URL}/notifications/channels",
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
)
response.raise_for_status()
data = response.json()
self._channel_id = data["channelId"]
self._websocket_url = data["websocketUrl"]
return self._channel_id, self._websocket_url
POST /call-events/v1/notifications/channels
{
"channelId": "ch_abc123def456",
"websocketUrl": "wss://realtime.goto.com/v1/notifications?channelId=ch_abc123def456",
"expiresAt": "2026-01-16T11:30:00Z"
}
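The channel response carries an `expiresAt` timestamp, which can drive the timing of a refresh. The sketch below assumes the timestamp is always ISO 8601 in UTC, as in the example response. The function names and the five-minute safety margin are illustrative choices, not values specified by GoToConnect.

```python
from datetime import datetime, timezone


def parse_expires_at(value: str) -> datetime:
    """Parse an ISO 8601 timestamp such as '2026-01-16T11:30:00Z'."""
    # datetime.fromisoformat() only accepts a trailing 'Z' from Python 3.11,
    # so normalise it to an explicit UTC offset first.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def seconds_until_refresh(
    expires_at: str,
    now: datetime,
    margin_s: float = 300.0,  # assumed headroom before expiry
) -> float:
    """Seconds to wait before refreshing, never less than zero."""
    remaining = (parse_expires_at(expires_at) - now).total_seconds()
    return max(0.0, remaining - margin_s)
```

A background task could sleep for `seconds_until_refresh(...)` and then create a new channel and resubscribe.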
6.3 Subscribe to Events \
async def subscribe_to_calls(
self,
line_ids: list[str] = None,
event_types: list[str] = None
) -> str:
"""
Subscribe to call events.
Args:
line_ids: Specific lines to subscribe to (None = all)
event_types: Specific events (None = all)
Returns:
Subscription ID
"""
if not self._channel_id:
await self.create_channel()
token = await self.auth_manager.get_access_token()
# Default to all call events
if event_types is None:
event_types = [
"call.ringing",
"call.connected",
"call.ended",
"call.held",
"call.resumed",
"call.transferred",
"call.dtmf",
"call.recording",
"call.ice_candidate",
]
subscription_request = {
"channelId": self._channel_id,
"eventTypes": event_types,
}
if line_ids:
subscription_request["lineIds"] = line_ids
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.EVENTS_BASE_URL}/notifications/subscriptions",
json=subscription_request,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
)
response.raise_for_status()
data = response.json()
subscription_id = data["subscriptionId"]
self._subscriptions[subscription_id] = data
return subscription_id
POST /call-events/v1/notifications/subscriptions
{
"channelId": "ch_abc123def456",
"eventTypes": [
"call.ringing",
"call.connected",
"call.ended",
"call.held",
"call.resumed",
"call.transferred"
],
"lineIds": ["line_001", "line_002"]
}
{
"subscriptionId": "sub_xyz789",
"channelId": "ch_abc123def456",
"eventTypes": ["call.ringing", "call.connected", ...],
"lineIds": ["line_001", "line_002"],
"createdAt": "2026-01-16T10:30:00Z"
}
6.4 WebSocket Connection \
import asyncio
import json
import logging
from typing import Callable, Awaitable
import websockets
logger = logging.getLogger(__name__)
EventHandler = Callable[[dict], Awaitable[None]]
class GoToEventListener:
"""
WebSocket listener for GoToConnect events.
"""
def __init__(
self,
websocket_url: str,
auth_manager: GoToAuthManager
):
self.websocket_url = websocket_url
self.auth_manager = auth_manager
self._handlers: dict[str, list[EventHandler]] = {}
self._running = False
self._ws = None
def on(self, event_type: str, handler: EventHandler) -> None:
"""Register an event handler."""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
async def start(self) -> None:
"""Start listening for events."""
self._running = True
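while self._running:
try:
# Fetch a fresh token on every (re)connect so an expired token is never reused
token = await self.auth_manager.get_access_token()
# Add auth token to WebSocket URL (the URL already carries ?channelId=...)
ws_url = f"{self.websocket_url}&token={token}"
async with websockets.connect(ws_url) as ws: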
self._ws = ws
await self._listen(ws)
except websockets.ConnectionClosed:
if self._running:
logger.warning("WebSocket disconnected, reconnecting...")
await asyncio.sleep(1)
except Exception as e:
logger.error(f"WebSocket error: {e}")
if self._running:
await asyncio.sleep(5)
async def _listen(self, ws) -> None:
"""Listen for events on the WebSocket."""
async for message in ws:
try:
event = json.loads(message)
await self._dispatch(event)
except json.JSONDecodeError:
logger.warning(f"Invalid JSON: {message}")
except Exception as e:
logger.error(f"Error handling event: {e}")
async def _dispatch(self, event: dict) -> None:
"""Dispatch event to registered handlers."""
event_type = event.get("type") or event.get("event")
if not event_type:
logger.warning(f"Event without type: {event}")
return
# Build a new list so wildcard handlers are not appended to the registered list
handlers = self._handlers.get(event_type, []) + self._handlers.get("*", [])
for handler in handlers:
try:
await handler(event)
except Exception as e:
logger.error(f"Handler error for {event_type}: {e}")
async def stop(self) -> None:
"""Stop listening."""
self._running = False
if self._ws:
await self._ws.close()
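The listener appends the token with `&token=...`, which only works when the WebSocket URL already has a query string. A more defensive variant is sketched below using the standard library. `with_query_param` is a hypothetical helper, and passing the token as a query parameter follows the listener code above rather than a confirmed GoToConnect requirement.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_query_param(url: str, key: str, value: str) -> str:
    """Return `url` with key=value added to its query string."""
    parts = urlsplit(url)
    # Keep existing parameters (such as channelId) and append the new one.
    query = parse_qsl(parts.query, keep_blank_values=True)
    query.append((key, value))
    # urlencode() also percent-encodes characters that are unsafe in a URL.
    return urlunsplit(parts._replace(query=urlencode(query)))
```

With this helper, the connection URL could be built as `with_query_param(self.websocket_url, "token", token)` regardless of whether the server-provided URL contains a `?`.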
6.5 Event Types \
6.5.1 call.ringing \
Fired when an inbound call arrives.
{
"type": "call.ringing",
"timestamp": "2026-01-16T10:30:00.123Z",
"data": {
"callId": "call_xyz789",
"direction": "inbound",
"from": "+15551234567",
"to": "+15559876543",
"lineId": "line_abc123",
"sdpOffer": "v=0\r\no=- 123456789 2 IN IP4 127.0.0.1\r\n..."
}
}
6.5.2 call.connected \
Fired when a call is answered.
{
"type": "call.connected",
"timestamp": "2026-01-16T10:30:05.456Z",
"data": {
"callId": "call_xyz789",
"direction": "inbound",
"from": "+15551234567",
"to": "+15559876543",
"lineId": "line_abc123",
"answeredAt": "2026-01-16T10:30:05.456Z"
}
}
6.5.3 call.ended \
Fired when a call ends.
{
"type": "call.ended",
"timestamp": "2026-01-16T10:35:30.789Z",
"data": {
"callId": "call_xyz789",
"direction": "inbound",
"from": "+15551234567",
"to": "+15559876543",
"lineId": "line_abc123",
"durationSeconds": 325,
"endReason": "caller_hangup",
"endedAt": "2026-01-16T10:35:30.789Z"
}
}
**End Reasons:**

| `endReason` | Description |
|---|---|
| `caller_hangup` | Remote party hung up |
| `agent_hangup` | We hung up |
| `transfer` | Call was transferred |
| `timeout` | Call timed out |
| `error` | Call failed due to error |
| `busy` | Remote party was busy |
| `no_answer` | Remote party didn’t answer |
| `rejected` | Call was rejected |
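The end reasons can be modelled as an enum so that handlers do not compare raw strings. The sketch below lists the documented values and adds an `UNKNOWN` member as a fallback. That member and the `parse` method are assumptions made for illustration, since the source does not say whether other values can occur.

```python
from enum import Enum


class EndReason(Enum):
    CALLER_HANGUP = "caller_hangup"
    AGENT_HANGUP = "agent_hangup"
    TRANSFER = "transfer"
    TIMEOUT = "timeout"
    ERROR = "error"
    BUSY = "busy"
    NO_ANSWER = "no_answer"
    REJECTED = "rejected"
    UNKNOWN = "unknown"  # Assumption: fallback for values not listed above

    @classmethod
    def parse(cls, value: object) -> "EndReason":
        """Map a raw endReason value to a member, tolerating new values."""
        try:
            return cls(value)
        except ValueError:
            return cls.UNKNOWN
```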
6.5.4 call.held \
Fired when a call is placed on hold.
{
"type": "call.held",
"timestamp": "2026-01-16T10:32:00.000Z",
"data": {
"callId": "call_xyz789",
"holdStartedAt": "2026-01-16T10:32:00.000Z"
}
}
6.5.5 call.resumed \
Fired when a held call is resumed.
{
"type": "call.resumed",
"timestamp": "2026-01-16T10:33:00.000Z",
"data": {
"callId": "call_xyz789",
"holdDurationSeconds": 60
}
}
6.5.6 call.transferred \
Fired when a call is transferred.
{
"type": "call.transferred",
"timestamp": "2026-01-16T10:34:00.000Z",
"data": {
"callId": "call_xyz789",
"transferType": "blind",
"transferTarget": "ext:1001",
"transferredAt": "2026-01-16T10:34:00.000Z"
}
}
6.5.7 call.ice_candidate \
Fired when a new ICE candidate is available.
{
"type": "call.ice_candidate",
"timestamp": "2026-01-16T10:30:01.000Z",
"data": {
"callId": "call_xyz789",
"candidate": {
"candidate": "candidate:1 1 UDP 2130706431 192.168.1.100 54321 typ host",
"sdpMid": "audio",
"sdpMLineIndex": 0
}
}
}
6.5.8 call.dtmf \
Fired when DTMF tones are received.
{
"type": "call.dtmf",
"timestamp": "2026-01-16T10:31:00.000Z",
"data": {
"callId": "call_xyz789",
"digit": "5",
"durationMs": 120
}
}
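All of the event payloads above share the same envelope: `type`, `timestamp`, and a `data` object containing `callId`. A small typed wrapper can make handlers less error-prone. The following is a sketch; `CallEvent` and `parse_call_event` are hypothetical names, and it assumes every event follows the envelope shown in the examples.

```python
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class CallEvent:
    type: str
    timestamp: datetime
    call_id: str
    data: dict


def parse_call_event(raw: str) -> CallEvent:
    """Parse a raw WebSocket message into a CallEvent.

    Raises KeyError if a required envelope field is missing.
    """
    payload = json.loads(raw)
    data = payload["data"]
    return CallEvent(
        type=payload["type"],
        # Replace the trailing 'Z' so fromisoformat() works before Python 3.11.
        timestamp=datetime.fromisoformat(payload["timestamp"].replace("Z", "+00:00")),
        call_id=data["callId"],
        data=data,
    )
```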
6.6 Event Handler Implementation \
class CallEventHandler:
"""Handles GoToConnect call events."""
def __init__(
self,
event_listener: GoToEventListener,
call_manager: CallManager,
redis_client: Redis
):
self.event_listener = event_listener
self.call_manager = call_manager
self.redis = redis_client
# Register handlers
self._register_handlers()
def _register_handlers(self) -> None:
"""Register event handlers."""
self.event_listener.on("call.ringing", self._on_call_ringing)
self.event_listener.on("call.connected", self._on_call_connected)
self.event_listener.on("call.ended", self._on_call_ended)
self.event_listener.on("call.held", self._on_call_held)
self.event_listener.on("call.resumed", self._on_call_resumed)
self.event_listener.on("call.transferred", self._on_call_transferred)
self.event_listener.on("call.ice_candidate", self._on_ice_candidate)
self.event_listener.on("call.dtmf", self._on_dtmf)
async def _on_call_ringing(self, event: dict) -> None:
"""Handle incoming call."""
data = event["data"]
call_id = data["callId"]
logger.info(f"Incoming call: {call_id} from {data['from']}")
# Look up tenant by phone number
tenant = await self._lookup_tenant_by_number(data["to"])
if not tenant:
logger.warning(f"No tenant for number {data['to']}")
return
# Create call record
call = await self.call_manager.create_call(
external_call_id=call_id,
tenant_id=tenant.id,
direction="inbound",
from_number=data["from"],
to_number=data["to"],
line_id=data["lineId"],
)
# Store SDP offer for WebRTC bridge
await self.redis.hset(
f"call:{call.id}:webrtc",
mapping={
"sdp_offer": data["sdpOffer"],
"state": "ringing",
}
)
# Publish event for WebRTC bridge
await self.redis.publish(
f"call:{call.id}:events",
json.dumps({
"event": "call.ringing",
"call_id": str(call.id),
"external_call_id": call_id,
"tenant_id": str(tenant.id),
"sdp_offer": data["sdpOffer"],
})
)
async def _on_call_connected(self, event: dict) -> None:
"""Handle call connected."""
data = event["data"]
call_id = data["callId"]
logger.info(f"Call connected: {call_id}")
# Update call record
call = await self.call_manager.get_by_external_id(call_id)
if call:
await self.call_manager.update_call(
call.id,
status="connected",
answered_at=data["answeredAt"],
)
# Notify agent service to join
await self.redis.publish(
f"call:{call.id}:events",
json.dumps({
"event": "call.connected",
"call_id": str(call.id),
})
)
async def _on_call_ended(self, event: dict) -> None:
"""Handle call ended."""
data = event["data"]
call_id = data["callId"]
logger.info(f"Call ended: {call_id}, reason: {data['endReason']}")
call = await self.call_manager.get_by_external_id(call_id)
if call:
await self.call_manager.update_call(
call.id,
status="ended",
ended_at=data["endedAt"],
duration_seconds=data["durationSeconds"],
end_reason=data["endReason"],
)
# Notify all services
await self.redis.publish(
f"call:{call.id}:events",
json.dumps({
"event": "call.ended",
"call_id": str(call.id),
"duration_seconds": data["durationSeconds"],
"end_reason": data["endReason"],
})
)
# Clean up Redis state
await self._cleanup_call_state(call.id)
async def _on_call_held(self, event: dict) -> None:
"""Handle call held."""
data = event["data"]
call_id = data["callId"]
call = await self.call_manager.get_by_external_id(call_id)
if call:
await self.redis.publish(
f"call:{call.id}:events",
json.dumps({
"event": "call.held",
"call_id": str(call.id),
})
)
async def _on_call_resumed(self, event: dict) -> None:
"""Handle call resumed."""
data = event["data"]
call_id = data["callId"]
call = await self.call_manager.get_by_external_id(call_id)
if call:
await self.redis.publish(
f"call:{call.id}:events",
json.dumps({
"event": "call.resumed",
"call_id": str(call.id),
})
)
async def _on_call_transferred(self, event: dict) -> None:
"""Handle call transferred."""
data = event["data"]
call_id = data["callId"]
call = await self.call_manager.get_by_external_id(call_id)
if call:
await self.call_manager.update_call(
call.id,
status="transferred",
end_reason=f"transferred_to_{data['transferTarget']}",
)
async def _on_ice_candidate(self, event: dict) -> None:
"""Handle ICE candidate."""
data = event["data"]
call_id = data["callId"]
call = await self.call_manager.get_by_external_id(call_id)
if call:
await self.redis.publish(
f"call:{call.id}:events",
json.dumps({
"event": "call.ice_candidate",
"call_id": str(call.id),
"candidate": data["candidate"],
})
)
async def _on_dtmf(self, event: dict) -> None:
"""Handle DTMF tone."""
data = event["data"]
call_id = data["callId"]
call = await self.call_manager.get_by_external_id(call_id)
if call:
await self.redis.publish(
f"call:{call.id}:events",
json.dumps({
"event": "call.dtmf",
"call_id": str(call.id),
"digit": data["digit"],
})
)
async def _lookup_tenant_by_number(self, phone_number: str) -> Optional[Tenant]:
"""Look up tenant by phone number."""
# Implementation depends on your data model
pass
async def _cleanup_call_state(self, call_id: str) -> None:
"""Clean up Redis state for ended call."""
keys_to_delete = [
f"call:{call_id}:state",
f"call:{call_id}:webrtc",
f"call:{call_id}:context",
]
# Delete all keys in a single round trip
await self.redis.delete(*keys_to_delete)
6.7 Subscription Management \
async def refresh_subscriptions(self) -> None:
"""Refresh event subscriptions before they expire."""
# Channels expire after a period of inactivity
# Refresh by creating new channel and resubscribing
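old_channel_id = self._channel_id
# Create new channel
await self.create_channel()
# Snapshot the old subscriptions, then clear them so entries tied to the
# old channel do not accumulate and get resubscribed on every refresh
old_subscriptions = list(self._subscriptions.values())
self._subscriptions.clear()
# Resubscribe with same parameters
for sub_data in old_subscriptions: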
await self.subscribe_to_calls(
line_ids=sub_data.get("lineIds"),
event_types=sub_data.get("eventTypes"),
)
logger.info(f"Refreshed subscriptions: {old_channel_id} -> {self._channel_id}")
async def unsubscribe(self, subscription_id: str) -> None:
"""Remove a subscription."""
token = await self.auth_manager.get_access_token()
async with httpx.AsyncClient() as client:
response = await client.delete(
f"{self.EVENTS_BASE_URL}/notifications/subscriptions/{subscription_id}",
headers={"Authorization": f"Bearer {token}"}
)
response.raise_for_status()
# pop() avoids a KeyError if the subscription is not tracked locally
self._subscriptions.pop(subscription_id, None)
7. Phone Number Management \
7.1 Lines API \
7.1.1 List Lines \
async def list_lines(self) -> list[dict]:
"""
List all lines/extensions for the account.
Returns:
List of line objects
"""
token = await self.auth_manager.get_access_token()
account_key = self.auth_manager.account_key
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.goto.com/users/v1/accounts/{account_key}/lines",
headers={
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
)
response.raise_for_status()
return response.json()["items"]
{
"items": [
{
"lineId": "line_abc123",
"extension": "1001",
"displayName": "AI Agent 1",
"type": "user",
"status": "active",
"phoneNumbers": [
{
"number": "+15559876543",
"type": "direct"
}
]
},
{
"lineId": "line_def456",
"extension": "1002",
"displayName": "AI Agent 2",
"type": "user",
"status": "active",
"phoneNumbers": [
{
"number": "+15551112222",
"type": "direct"
}
]
}
]
}
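Given a response in this shape, a lookup from phone number to line can be built in one pass. This is a minimal sketch over the `items` structure shown above; `index_lines_by_number` is a hypothetical helper name.

```python
def index_lines_by_number(items: list[dict]) -> dict[str, str]:
    """Map each phone number to the lineId of the line that owns it."""
    index: dict[str, str] = {}
    for line in items:
        # Lines without direct numbers simply contribute nothing.
        for entry in line.get("phoneNumbers", []):
            index[entry["number"]] = line["lineId"]
    return index
```

For the example response, the resulting mapping has two entries: `+15559876543` maps to `line_abc123` and `+15551112222` maps to `line_def456`.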
7.1.2 Get Line Details \
async def get_line(self, line_id: str) -> dict:
"""Get details for a specific line."""
token = await self.auth_manager.get_access_token()
account_key = self.auth_manager.account_key
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.goto.com/users/v1/accounts/{account_key}/lines/{line_id}",
headers={
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
)
response.raise_for_status()
return response.json()
7.2 Phone Number to Line Mapping \
We need to map incoming phone numbers to our tenant/agent configuration:
class PhoneNumberRegistry:
"""Maps phone numbers to tenants and agents."""
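def __init__(self, db: AsyncSession, cache: Redis, goto_client):
self.db = db
self.cache = cache
# Client exposing get_line(); register_number() uses it to verify the line
self.goto_client = goto_client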
async def get_config_for_number(
self,
phone_number: str
) -> Optional[PhoneNumberConfig]:
"""
Get configuration for a phone number.
Args:
phone_number: E.164 format phone number
Returns:
Configuration including tenant, agent, line info
"""
# Check cache first
cache_key = f"phone_number:{phone_number}"
cached = await self.cache.get(cache_key)
if cached:
return PhoneNumberConfig.parse_raw(cached)
# Query database
result = await self.db.execute(
select(PhoneNumber)
.options(
joinedload(PhoneNumber.tenant),
joinedload(PhoneNumber.agent),
)
.where(PhoneNumber.number == phone_number)
.where(PhoneNumber.status == "active")
)
phone_number_record = result.scalar_one_or_none()
if not phone_number_record:
return None
config = PhoneNumberConfig(
phone_number=phone_number,
tenant_id=phone_number_record.tenant_id,
agent_id=phone_number_record.agent_id,
line_id=phone_number_record.line_id,
greeting_enabled=phone_number_record.greeting_enabled,
)
# Cache for 5 minutes
await self.cache.setex(
cache_key,
300,
config.json()
)
return config
async def register_number(
self,
phone_number: str,
tenant_id: str,
agent_id: str,
line_id: str
) -> PhoneNumber:
"""Register a phone number for a tenant."""
# Verify the line exists and has this number
goto_line = await self.goto_client.get_line(line_id)
has_number = any(
pn["number"] == phone_number
for pn in goto_line.get("phoneNumbers", [])
)
if not has_number:
raise ValueError(
f"Line {line_id} does not have number {phone_number}"
)
# Create database record
phone_number_record = PhoneNumber(
number=phone_number,
tenant_id=tenant_id,
agent_id=agent_id,
line_id=line_id,
status="active",
)
self.db.add(phone_number_record)
await self.db.commit()
# Invalidate cache
await self.cache.delete(f"phone_number:{phone_number}")
return phone_number_record
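`get_config_for_number` expects an E.164 number and builds its cache key from the raw string, so a differently formatted number would miss both the cache and the database row. A format check at the boundary is sketched below. `is_e164` is a hypothetical helper, and the pattern covers only the general E.164 shape (a plus sign, a non-zero leading digit, at most 15 digits in total), not country-specific rules.

```python
import re

# '+' followed by 2 to 15 digits, the first of which is not zero.
_E164 = re.compile(r"\+[1-9]\d{1,14}")


def is_e164(number: str) -> bool:
    """Return True if `number` has the general E.164 shape."""
    return _E164.fullmatch(number) is not None
```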
8. Error Handling \
8.1 Error Categories \
# integrations/gotoconnect/exceptions.py
class GoToError(Exception):
"""Base exception for GoToConnect errors."""
pass
class GoToAuthError(GoToError):
"""Authentication/authorization errors."""
pass
class GoToAPIError(GoToError):
"""General API errors."""
def __init__(self, message: str, status_code: int = None, error_code: str = None):
super().__init__(message)
self.status_code = status_code
self.error_code = error_code
class GoToNotFoundError(GoToAPIError):
"""Resource not found (404)."""
pass
class GoToConflictError(GoToAPIError):
"""Conflict error (409) - e.g., call already ended."""
pass
class GoToRateLimitError(GoToAPIError):
"""Rate limit exceeded (429)."""
def __init__(self, message: str, retry_after: int = None):
super().__init__(message, status_code=429)
self.retry_after = retry_after
class GoToPermissionError(GoToAPIError):
"""Permission denied (403)."""
pass
class GoToWebRTCError(GoToError):
"""WebRTC-specific errors."""
pass
class GoToCallError(GoToError):
"""Call operation errors."""
pass
8.2 Error Response Handling \
async def _handle_api_response(
self,
response: httpx.Response,
operation: str
) -> dict:
"""
Handle API response with appropriate error handling.
Args:
response: HTTP response
operation: Description of the operation
Returns:
Response JSON data
Raises:
Appropriate GoToError subclass
"""
status = response.status_code
# Success
if 200 <= status < 300:
if status == 204:
return {}
return response.json()
# Parse error response
try:
error_data = response.json()
error_message = error_data.get("message", response.text)
error_code = error_data.get("code")
except Exception:
error_message = response.text
error_code = None
# Map to specific exceptions
if status == 400:
raise GoToAPIError(
f"Bad request for {operation}: {error_message}",
status_code=status,
error_code=error_code,
)
elif status == 401:
raise GoToAuthError(f"Authentication failed for {operation}")
elif status == 403:
raise GoToPermissionError(
f"Permission denied for {operation}: {error_message}"
)
elif status == 404:
raise GoToNotFoundError(
f"Not found for {operation}: {error_message}"
)
elif status == 409:
raise GoToConflictError(
f"Conflict for {operation}: {error_message}"
)
elif status == 429:
retry_after = response.headers.get("Retry-After")
raise GoToRateLimitError(
f"Rate limit exceeded for {operation}",
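# Retry-After may also be an HTTP-date; only the seconds form is parsed here
retry_after=int(retry_after) if retry_after and retry_after.isdigit() else None,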
)
else:
raise GoToAPIError(
f"API error for {operation}: {error_message}",
status_code=status,
error_code=error_code,
)
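The HTTP `Retry-After` header can carry either a number of seconds or an HTTP-date, and a bare `int()` conversion fails on the second form. A parser that accepts both is sketched below using only the standard library. `parse_retry_after` is a hypothetical helper, and whether GoToConnect ever sends the date form is not stated in the source.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(
    value: Optional[str],
    now: Optional[datetime] = None,
) -> Optional[int]:
    """Return the delay in whole seconds, or None if it cannot be determined."""
    if not value:
        return None
    value = value.strip()
    if value.isdigit():
        return int(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Neither seconds nor a parseable date.
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means the caller may retry immediately.
    return max(0, int((when - now).total_seconds()))
```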
8.3 Retry Logic \
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
class GoToClientWithRetry(GoToCallControlClient):
"""GoToConnect client with automatic retry for transient errors."""
@retry(
retry=retry_if_exception_type((
httpx.TimeoutException,
httpx.NetworkError,
GoToRateLimitError,
)),
wait=wait_exponential(multiplier=1, min=1, max=60),
stop=stop_after_attempt(5),
before_sleep=lambda retry_state: logger.warning(
f"Retrying {retry_state.fn.__name__}, attempt {retry_state.attempt_number}"
),
)
async def make_call_with_retry(
self,
dial_string: str,
caller_id: str,
line_id: str
) -> dict:
"""Make a call with automatic retry."""
return await self.initiate_call(dial_string, caller_id, line_id)
async def make_call_safe(
self,
dial_string: str,
caller_id: str,
line_id: str
) -> Optional[dict]:
"""Make a call, returning None on failure."""
try:
return await self.make_call_with_retry(dial_string, caller_id, line_id)
except Exception as e:
logger.error(f"Failed to make call after retries: {e}")
return None
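The decorator above retries `GoToRateLimitError` on a fixed exponential schedule and does not consult the `retry_after` value carried by the exception. A standard-library variant that prefers the server's hint is sketched below. `RateLimited` is a stand-in for `GoToRateLimitError`, `retry_rate_limited` is a hypothetical helper, and the backoff formula is an illustrative choice rather than a reproduction of tenacity's schedule.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Stand-in for GoToRateLimitError, carrying an optional retry_after."""

    def __init__(self, retry_after: Optional[int] = None):
        super().__init__("rate limited")
        self.retry_after = retry_after


async def retry_rate_limited(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run `operation`, retrying on RateLimited up to `attempts` times."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except RateLimited as exc:
            if attempt == attempts:
                raise
            backoff = min(base_delay * 2 ** (attempt - 1), max_delay)
            # Prefer the server-provided delay when there is one.
            delay = exc.retry_after if exc.retry_after is not None else backoff
            await sleep(delay)
    raise AssertionError("unreachable")
```

The `sleep` parameter exists so that tests can substitute a fake and run without real delays.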
8.4 WebSocket Reconnection \
import asyncio
import json

import websockets


class ResilientEventListener:
    """Event listener with automatic reconnection."""

    def __init__(self, event_manager: GoToEventManager):
        self.event_manager = event_manager
        self._reconnect_delay = 1  # Start with 1 second
        self._max_reconnect_delay = 300  # Max 5 minutes
        self._running = False

    async def start(self) -> None:
        """Start with automatic reconnection."""
        self._running = True
        while self._running:
            try:
                # Returns only when the server closes the connection cleanly
                await self._connect_and_listen()
                logger.info("WebSocket closed by server")
            except websockets.ConnectionClosed as e:
                logger.warning(f"WebSocket closed: {e.code} {e.reason}")
            except Exception as e:
                logger.error(f"WebSocket error: {e}")
            # Back off before every new attempt, including after a clean close
            if self._running:
                await self._reconnect()

    async def _connect_and_listen(self) -> None:
        """Connect and listen for events."""
        # Ensure we have a valid channel
        if not self.event_manager._channel_id:
            await self.event_manager.create_channel()
        # Connect WebSocket (assumes ws_url already carries a query string)
        ws_url = self.event_manager._websocket_url
        token = await self.event_manager.auth_manager.get_access_token()
        async with websockets.connect(
            f"{ws_url}&token={token}",
            ping_interval=30,
            ping_timeout=10,
        ) as ws:
            logger.info("WebSocket connected")
            # Reset the backoff only once the connection is established
            self._reconnect_delay = 1
            async for message in ws:
                await self._handle_message(message)

    async def _reconnect(self) -> None:
        """Reconnect with exponential backoff."""
        logger.info(f"Reconnecting in {self._reconnect_delay}s...")
        await asyncio.sleep(self._reconnect_delay)
        # Exponential backoff
        self._reconnect_delay = min(
            self._reconnect_delay * 2,
            self._max_reconnect_delay
        )
        # Refresh channel and subscriptions
        try:
            await self.event_manager.refresh_subscriptions()
        except Exception as e:
            logger.error(f"Failed to refresh subscriptions: {e}")

    async def _handle_message(self, message: str) -> None:
        """Handle incoming message."""
        try:
            event = json.loads(message)
            await self.event_manager._dispatch(event)
        except Exception as e:
            logger.error(f"Error handling message: {e}")
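The reconnection delay doubles after each attempt and is capped at the configured maximum. As a minimal sketch of that schedule, the hypothetical helper `backoff_delays` below (not part of the listener) yields the same sequence. The optional `jitter` parameter is an assumption added for illustration: it randomly shortens each delay so that many clients do not reconnect at the same instant.

```python
import random
from typing import Iterator, Optional


def backoff_delays(
    initial: float = 1.0,
    maximum: float = 300.0,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield reconnect delays that double each time, capped at `maximum`.

    `jitter` is the fraction (0.0 to 1.0) of each delay that may be
    randomly subtracted. With jitter=0 the schedule is deterministic.
    """
    rng = rng or random.Random()
    delay = initial
    while True:
        # Subtract up to `jitter * delay` seconds from the nominal delay.
        yield delay - rng.uniform(0, jitter * delay)
        delay = min(delay * 2, maximum)
```

With the defaults, the first ten delays are 1, 2, 4, 8, 16, 32, 64, 128, 256 and 300 seconds, so the cap is reached on the tenth attempt.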
9. Rate Limits and Quotas \
9.1 GoToConnect Rate Limits \
| Endpoint Category | Limit | Window |
|---|---|---|
| Authentication | 10 | per minute |
| Call Control | 100 | per minute |
| Call Events | 100 | per minute |
| Lines/Users | 60 | per minute |
9.2 Rate Limit Handling \
import asyncio
import logging
from collections import defaultdict
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


class RateLimiter:
    """Sliding-window rate limiter for the GoToConnect API."""

    def __init__(self):
        # Timestamps of recent requests, per endpoint category
        self._buckets: dict[str, list[datetime]] = defaultdict(list)
        self._limits = {
            "auth": (10, 60),  # 10 per minute
            "call_control": (100, 60),
            "call_events": (100, 60),
            "users": (60, 60),
        }
        self._lock = asyncio.Lock()

    async def acquire(self, category: str) -> None:
        """
        Acquire permission to make a request.
        Blocks if rate limit would be exceeded.
        """
        async with self._lock:
            limit, window = self._limits.get(category, (100, 60))
            now = datetime.utcnow()
            cutoff = now - timedelta(seconds=window)
            # Drop timestamps that have left the window
            self._buckets[category] = [
                t for t in self._buckets[category]
                if t > cutoff
            ]
            # Check if we can proceed
            if len(self._buckets[category]) >= limit:
                # Wait until the oldest request leaves the window
                oldest = self._buckets[category][0]
                wait_time = (oldest + timedelta(seconds=window) - now).total_seconds()
                if wait_time > 0:
                    logger.warning(
                        f"Rate limit for {category}, waiting {wait_time:.1f}s"
                    )
                    # Note: sleeping under the lock also delays other categories
                    await asyncio.sleep(wait_time)
                self._buckets[category].pop(0)
            # Record this request at the time it is actually sent
            self._buckets[category].append(datetime.utcnow())


# Usage in client
class RateLimitedGoToClient:
    def __init__(self, auth_manager: GoToAuthManager):
        self.auth_manager = auth_manager
        self.rate_limiter = RateLimiter()

    async def initiate_call(self, *args, **kwargs) -> dict:
        await self.rate_limiter.acquire("call_control")
        # _initiate_call performs the actual HTTP request (not shown here)
        return await self._initiate_call(*args, **kwargs)
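The limiter above stores one timestamp per request and counts those that fall inside the window. For comparison, the sketch below shows a classic token bucket, which keeps only a token count and a last-refill time. The class name `TokenBucket`, its parameters and the injectable `clock` are hypothetical and used for illustration only. The method is non-blocking, so the caller decides whether to wait or reject.

```python
import time
from typing import Callable


class TokenBucket:
    """Token bucket holding up to `capacity` tokens, refilled at `rate` per second."""

    def __init__(
        self,
        capacity: int,
        rate: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.capacity = capacity
        self.rate = rate
        self._clock = clock
        self._tokens = float(capacity)  # start full, allowing an initial burst
        self._updated = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; return False instead of blocking."""
        now = self._clock()
        # Refill for the elapsed time, never exceeding capacity.
        elapsed = now - self._updated
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._updated = now
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False
```

Under these assumptions, the call control limit of 100 requests per minute would correspond to `TokenBucket(capacity=100, rate=100 / 60)`. A token bucket allows short bursts up to `capacity`, whereas the timestamp-based limiter enforces the count strictly over any 60-second window.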
9.3 Quota Monitoring \
class QuotaMonitor:
    """Monitor API usage and quotas."""

    def __init__(self, metrics_client):
        self.metrics = metrics_client
        self._usage = defaultdict(int)

    def record_request(self, category: str) -> None:
        """Record an API request."""
        self._usage[category] += 1
        self.metrics.increment(
            "goto_api_requests_total",
            tags={"category": category}
        )

    def record_rate_limit(self, category: str) -> None:
        """Record a rate limit hit."""
        self.metrics.increment(
            "goto_rate_limits_total",
            tags={"category": category}
        )

    def get_usage_report(self) -> dict:
        """Get current usage statistics."""
        return dict(self._usage)
10. Security Considerations \
10.1 Credential Storage \
# Credentials should be stored in environment variables or a secrets manager
environment:
  GOTO_CLIENT_ID: "${GOTO_CLIENT_ID}"
  GOTO_SERVICE_USERNAME: "${GOTO_SERVICE_USERNAME}"
  GOTO_SERVICE_PASSWORD: "${GOTO_SERVICE_PASSWORD}"
# Never log credentials
logging:
  filters:
    - pattern: "password"
      replacement: "[REDACTED]"
    - pattern: "access_token"
      replacement: "[TOKEN]"
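How the filter configuration is applied depends on the logging stack in use. As one possible sketch for Python's standard `logging` module, the hypothetical `RedactingFilter` below masks the value that follows a sensitive key. The regular expression and the two replacement strings mirror the patterns listed above; everything else is an assumption made for illustration.

```python
import logging
import re

# Replacement text per sensitive key, mirroring the filter configuration.
_REPLACEMENTS = {"password": "[REDACTED]", "access_token": "[TOKEN]"}

# Matches key=value and "key": "value" forms; group 1 is everything before the value.
_SENSITIVE = re.compile(
    r'("?(password|access_token)"?\s*[:=]\s*"?)[^\s",}]+',
    re.IGNORECASE,
)


class RedactingFilter(logging.Filter):
    """Mask credential values before a record reaches any handler output."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Render the message first so that %-style arguments are covered too.
        message = record.getMessage()
        record.msg = _SENSITIVE.sub(
            lambda m: m.group(1) + _REPLACEMENTS[m.group(2).lower()],
            message,
        )
        record.args = None
        return True  # keep the record; only its text changes
```

The filter can be attached with `handler.addFilter(RedactingFilter())`. Pattern-based redaction is a safety net only: it cannot catch a secret that is logged without its key name.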
10.2 Token Security \
class SecureTokenManager:
    """Secure handling of OAuth tokens."""

    def __init__(self, encryption_key: bytes):
        from cryptography.fernet import Fernet
        self._fernet = Fernet(encryption_key)

    def encrypt_token(self, token: str) -> bytes:
        """Encrypt a token for storage."""
        return self._fernet.encrypt(token.encode())

    def decrypt_token(self, encrypted: bytes) -> str:
        """Decrypt a stored token."""
        return self._fernet.decrypt(encrypted).decode()
10.3 WebSocket Security \
# Always use WSS (WebSocket Secure)
# Validate message origins
# Implement message signing if needed
from datetime import datetime, timedelta, timezone


async def validate_event(self, event: dict) -> bool:
    """Validate an incoming event."""
    # Check required fields
    required = ["type", "timestamp", "data"]
    if not all(k in event for k in required):
        logger.warning(f"Invalid event structure: {event}")
        return False
    # Check timestamp is recent (prevent replay attacks)
    try:
        timestamp = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    except (AttributeError, ValueError):
        logger.warning(f"Invalid event timestamp: {event['timestamp']!r}")
        return False
    if timestamp.tzinfo is None:
        # Treat a timestamp without an offset as UTC
        timestamp = timestamp.replace(tzinfo=timezone.utc)
    age = datetime.now(timezone.utc) - timestamp
    if age > timedelta(minutes=5):
        logger.warning(f"Stale event: {age}")
        return False
    return True
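The comment "Implement message signing if needed" can be made concrete with an HMAC. This specification does not state that GoToConnect signs its events, so the sketch below is aimed at events relayed between our own services, where both sides share a secret. The function names `sign_event` and `verify_event` and the canonical JSON encoding are assumptions made for illustration.

```python
import hashlib
import hmac
import json


def sign_event(event: dict, secret: bytes) -> str:
    """Return a hex HMAC-SHA256 over a canonical JSON encoding of the event."""
    # Sorted keys and fixed separators make the encoding deterministic.
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":")).encode()
    return hmac.new(secret, canonical, hashlib.sha256).hexdigest()


def verify_event(event: dict, signature: str, secret: bytes) -> bool:
    """Check a signature using a constant-time comparison."""
    return hmac.compare_digest(sign_event(event, secret), signature)
```

Combined with the timestamp check, a valid signature means the event was produced by a holder of the secret, and the five-minute freshness window limits how long a captured message can be replayed.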
10.4 Audit Logging \
import logging
from datetime import datetime
from typing import Optional


class GoToAuditLogger:
    """Audit logging for GoToConnect operations."""

    def __init__(self, logger):
        self.logger = logger

    def log_call_operation(
        self,
        operation: str,
        call_id: str,
        tenant_id: str,
        details: Optional[dict] = None
    ) -> None:
        """Log a call control operation."""
        self.logger.info(
            "goto_operation",
            extra={
                "operation": operation,
                "call_id": call_id,
                "tenant_id": tenant_id,
                "details": details or {},
                "timestamp": datetime.utcnow().isoformat(),
            }
        )

    def log_auth_event(
        self,
        event: str,
        success: bool,
        details: Optional[dict] = None
    ) -> None:
        """Log an authentication event."""
        # Failed authentication is logged at WARNING so it stands out
        level = logging.INFO if success else logging.WARNING
        self.logger.log(
            level,
            "goto_auth",
            extra={
                "event": event,
                "success": success,
                "details": details or {},
                "timestamp": datetime.utcnow().isoformat(),
            }
        )
11. Testing Strategy \
11.1 Mock Server \
# tests/mocks/goto_mock_server.py
from datetime import datetime
from typing import Dict

from fastapi import FastAPI, HTTPException

app = FastAPI()

# In-memory state
calls: Dict[str, dict] = {}
channels: Dict[str, dict] = {}


@app.post("/oauth/token")
async def mock_token():
    return {
        "access_token": "mock_token_12345",
        "token_type": "Bearer",
        "expires_in": 3600,
        "refresh_token": "mock_refresh_12345",
        "account_key": "mock_account",
        "organizer_key": "mock_organizer",
    }


@app.post("/web-calls/v1/calls")
async def mock_create_call(request: dict):
    call_id = f"call_{len(calls) + 1}"
    call = {
        "callId": call_id,
        "state": "dialing",
        "direction": "outbound",
        "from": request.get("callerId"),
        "to": request.get("dialString"),
        "lineId": request.get("lineId"),
        "sdpOffer": "v=0\r\nmock sdp offer\r\n",
        "createdAt": datetime.utcnow().isoformat() + "Z",
    }
    calls[call_id] = call
    return call


@app.post("/web-calls/v1/calls/{call_id}/answer")
async def mock_answer_call(call_id: str, request: dict):
    if call_id not in calls:
        raise HTTPException(404, "Call not found")
    calls[call_id]["state"] = "connected"
    calls[call_id]["sdpAnswer"] = request.get("sdpAnswer")
    calls[call_id]["answeredAt"] = datetime.utcnow().isoformat() + "Z"
    return calls[call_id]


@app.post("/web-calls/v1/calls/{call_id}/hangup")
async def mock_hangup(call_id: str):
    if call_id not in calls:
        raise HTTPException(404, "Call not found")
    calls[call_id]["state"] = "ended"
    calls[call_id]["endedAt"] = datetime.utcnow().isoformat() + "Z"
    return calls[call_id]

# Add more mock endpoints as needed...
11.2 Integration Tests \
# tests/integration/test_goto_integration.py
import pytest
import pytest_asyncio

from integrations.gotoconnect.auth import GoToAuthManager
from integrations.gotoconnect.call_control import GoToCallControlClient
from integrations.gotoconnect.events import GoToEventManager
# CallState must also be imported from wherever the call model is defined.


@pytest_asyncio.fixture
async def goto_client():
    """Create a GoTo client for testing."""
    auth_manager = GoToAuthManager(
        client_id="test_client",
        username="test_user",
        password="test_pass",
    )
    return GoToCallControlClient(auth_manager)


@pytest.mark.integration
@pytest.mark.asyncio
async def test_call_lifecycle(goto_client):
    """Test complete call lifecycle."""
    # Initiate call
    call = await goto_client.initiate_call(
        dial_string="tel:+15551234567",
        caller_id="+15559876543",
        line_id="line_test",
    )
    assert call.state == CallState.DIALING
    # Answer (simulated)
    call = await goto_client.answer_call(
        call.call_id,
        sdp_answer="v=0\r\ntest answer\r\n",
    )
    assert call.state == CallState.CONNECTED
    # Hold
    call = await goto_client.hold(call.call_id)
    assert call.state == CallState.HELD
    # Resume
    call = await goto_client.resume(call.call_id)
    assert call.state == CallState.CONNECTED
    # Hangup
    call = await goto_client.hangup(call.call_id)
    assert call.state == CallState.ENDED


@pytest.mark.integration
@pytest.mark.asyncio
async def test_blind_transfer(goto_client):
    """Test blind transfer operation."""
    # Create active call first
    call = await goto_client.initiate_call(
        dial_string="tel:+15551234567",
        caller_id="+15559876543",
        line_id="line_test",
    )
    # Transfer
    result = await goto_client.blind_transfer(
        call.call_id,
        dial_string="ext:1001",
    )
    assert result.state == CallState.TRANSFERRED


@pytest.mark.integration
@pytest.mark.asyncio
async def test_event_subscription(goto_client):
    """Test event subscription flow."""
    event_manager = GoToEventManager(goto_client.auth_manager)
    # Create channel
    channel_id, ws_url = await event_manager.create_channel()
    assert channel_id is not None
    assert ws_url.startswith("wss://")
    # Subscribe
    sub_id = await event_manager.subscribe_to_calls(
        event_types=["call.ringing", "call.connected"],
    )
    assert sub_id is not None
    # Cleanup
    await event_manager.unsubscribe(sub_id)
11.3 Unit Tests \
# tests/unit/test_goto_auth.py
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, patch

import pytest

from integrations.gotoconnect.auth import GoToAuthManager


@pytest.mark.asyncio
async def test_token_caching():
    """Test that tokens are cached."""
    auth = GoToAuthManager(
        client_id="test",
        username="test",
        password="test",
    )
    with patch.object(auth, '_authenticate', new_callable=AsyncMock) as mock_auth:
        mock_auth.return_value = None
        auth._access_token = "cached_token"
        auth._expires_at = datetime.utcnow() + timedelta(hours=1)
        token = await auth.get_access_token()
        assert token == "cached_token"
        mock_auth.assert_not_called()


@pytest.mark.asyncio
async def test_token_refresh_on_expiry():
    """Test that expired tokens trigger refresh."""
    auth = GoToAuthManager(
        client_id="test",
        username="test",
        password="test",
    )
    auth._access_token = "old_token"
    auth._refresh_token = "refresh_token"
    auth._expires_at = datetime.utcnow() - timedelta(hours=1)  # Expired

    def fake_refresh(*args, **kwargs):
        # Simulate a successful refresh. The new token is installed only when
        # the refresh is actually invoked, so the token is still expired
        # at the moment get_access_token() is called.
        auth._access_token = "new_token"
        auth._expires_at = datetime.utcnow() + timedelta(hours=1)

    with patch.object(auth, '_refresh_access_token', new_callable=AsyncMock) as mock_refresh:
        mock_refresh.side_effect = fake_refresh
        token = await auth.get_access_token()
        mock_refresh.assert_called_once()
        assert token == "new_token"
12. Implementation Guide \
12.1 Setup Checklist \
## GoToConnect Integration Setup
### Prerequisites
- [ ] GoTo Developer Portal account created
- [ ] OAuth application registered
- [ ] Service user created in GoToConnect Admin
- [ ] Required scopes granted to application
### Configuration
- [ ] Client ID stored in environment
- [ ] Service user credentials stored securely
- [ ] Line IDs mapped to tenants
- [ ] Phone numbers registered in database
### Integration
- [ ] Auth manager implemented and tested
- [ ] Call control client implemented
- [ ] Event subscription system implemented
- [ ] WebRTC bridge connected
### Monitoring
- [ ] API request metrics configured
- [ ] Error alerting set up
- [ ] Audit logging enabled
- [ ] Rate limit monitoring in place
### Testing
- [ ] Unit tests passing
- [ ] Integration tests passing
- [ ] End-to-end call test successful
- [ ] Transfer test successful
12.2 Configuration Template \
# config/gotoconnect.yaml
gotoconnect:
  # Authentication
  auth:
    token_url: "https://authentication.logmeininc.com/oauth/token"
    client_id: "${GOTO_CLIENT_ID}"
    service_user: "${GOTO_SERVICE_USERNAME}"
    service_password: "${GOTO_SERVICE_PASSWORD}"
  # API endpoints
  api:
    web_calls_base: "https://api.goto.com/web-calls/v1"
    call_events_base: "https://api.goto.com/call-events/v1"
    users_base: "https://api.goto.com/users/v1"
  # WebRTC settings
  webrtc:
    ice_servers:
      - urls: "stun:stun.l.google.com:19302"
    codec_preference:
      - opus
      - PCMU
      - PCMA
  # Event subscription
  events:
    event_types:
      - "call.ringing"
      - "call.connected"
      - "call.ended"
      - "call.held"
      - "call.resumed"
      - "call.transferred"
      - "call.dtmf"
      - "call.ice_candidate"
    reconnect_delay_initial: 1  # seconds
    reconnect_delay_max: 300  # seconds
  # Timeouts (seconds)
  timeouts:
    api_request: 30
    websocket_ping: 30
    call_setup: 30
  # Rate limiting
  rate_limits:
    call_control_per_minute: 100
    auth_per_minute: 10
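The template uses `${NAME}` placeholders for secrets. If the configuration loader leaves those placeholders untouched, they have to be resolved after loading. The following is a minimal sketch of that step, assuming the YAML has already been parsed into nested dicts and lists. The helper name `expand_placeholders` is hypothetical.

```python
import os
import re

# Matches ${NAME} where NAME is an upper-case environment variable name.
_PLACEHOLDER = re.compile(r"\$\{([A-Z0-9_]+)\}")


def expand_placeholders(value, env=None):
    """Recursively replace ${NAME} in strings; fail loudly if NAME is unset."""
    env = os.environ if env is None else env
    if isinstance(value, dict):
        return {k: expand_placeholders(v, env) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_placeholders(v, env) for v in value]
    if isinstance(value, str):
        def lookup(match):
            name = match.group(1)
            if name not in env:
                raise KeyError(f"Missing environment variable: {name}")
            return env[name]
        return _PLACEHOLDER.sub(lookup, value)
    return value  # numbers, booleans and None pass through unchanged
```

Raising on a missing variable, rather than substituting an empty string, surfaces a misconfigured deployment at startup instead of at the first authentication attempt.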
12.3 Service Initialization \
# services/webrtc_bridge/main.py
import asyncio
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI

from integrations.gotoconnect.auth import GoToAuthManager
from integrations.gotoconnect.call_control import GoToCallControlClient
from integrations.gotoconnect.events import GoToEventManager, GoToEventListener
from config import settings

logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app):
    """Application lifespan manager."""
    # Initialize GoToConnect integration
    auth_manager = GoToAuthManager(
        client_id=settings.goto_client_id,
        username=settings.goto_service_username,
        password=settings.goto_service_password,
    )
    # Pre-authenticate so that bad credentials fail at startup
    await auth_manager.get_access_token()
    logger.info("GoToConnect authentication successful")
    # Initialize call control client
    call_client = GoToCallControlClient(auth_manager)
    # Initialize event manager
    event_manager = GoToEventManager(auth_manager)
    await event_manager.create_channel()
    await event_manager.subscribe_to_calls()
    logger.info("GoToConnect event subscription active")
    # Start event listener
    event_listener = GoToEventListener(
        event_manager._websocket_url,
        auth_manager,
    )
    listener_task = asyncio.create_task(event_listener.start())
    # Store in app state
    app.state.goto_auth = auth_manager
    app.state.goto_calls = call_client
    app.state.goto_events = event_manager
    yield
    # Cleanup
    await event_listener.stop()
    listener_task.cancel()
    logger.info("GoToConnect integration shut down")


app = FastAPI(lifespan=lifespan)
13. Troubleshooting \
13.1 Common Issues \
Authentication Failures \
| Symptom | Possible Cause | Solution |
|---|---|---|
| 401 on token request | Invalid credentials | Verify client ID, username, password |
| 403 after authentication | Missing scopes | Check OAuth app scope configuration |
| Token expires immediately | Clock skew | Sync server time with NTP |
| Refresh token fails | Token revoked | Re-authenticate with password |
WebRTC Issues \
| Symptom | Possible Cause | Solution |
|---|---|---|
| No audio | ICE failure | Check firewall, TURN server |
| One-way audio | SDP mismatch | Verify codec compatibility |
| Audio drops | Network instability | Implement reconnection logic |
| Echo/feedback | Audio routing | Check duplex settings |
Event Subscription Issues \
| Symptom | Possible Cause | Solution |
|---|---|---|
| No events received | WebSocket disconnected | Check reconnection logic |
| Duplicate events | Multiple subscriptions | Deduplicate by event ID |
| Missing events | Subscription expired | Refresh subscriptions |
| Events delayed | Network latency | Monitor WebSocket health |
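The "Deduplicate by event ID" remedy can be sketched as a bounded set of recently seen keys. The common event structure in Appendix B shows no dedicated ID field, so the sketch below assumes a key built from `type`, `timestamp` and `data.callId`. If the real payload carries an event ID, that should be used instead. The class name `EventDeduplicator` is hypothetical.

```python
from collections import OrderedDict
from typing import Hashable


class EventDeduplicator:
    """Remember the most recent event keys and flag repeats."""

    def __init__(self, max_size: int = 10_000) -> None:
        self._seen: "OrderedDict[Hashable, None]" = OrderedDict()
        self._max_size = max_size

    @staticmethod
    def key_for(event: dict) -> Hashable:
        # Assumption: no dedicated event ID, so combine common fields.
        return (
            event.get("type"),
            event.get("timestamp"),
            event.get("data", {}).get("callId"),
        )

    def is_duplicate(self, event: dict) -> bool:
        key = self.key_for(event)
        if key in self._seen:
            self._seen.move_to_end(key)  # keep recently repeated keys longer
            return True
        self._seen[key] = None
        if len(self._seen) > self._max_size:
            self._seen.popitem(last=False)  # evict the oldest key
        return False
```

A handler would call `is_duplicate(event)` before dispatching and drop the event when it returns `True`. The size bound keeps memory constant on a long-lived connection.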
13.2 Diagnostic Commands \
async def diagnose_goto_connection() -> dict:
    """Run diagnostics on GoToConnect integration."""
    results = {
        "auth": None,
        "api": None,
        "websocket": None,
        "lines": None,
    }
    # Test authentication
    try:
        token = await auth_manager.get_access_token()
        results["auth"] = {"status": "ok", "token_length": len(token)}
    except Exception as e:
        results["auth"] = {"status": "error", "error": str(e)}
    # Test API connectivity
    try:
        lines = await call_client.list_lines()
        results["api"] = {"status": "ok", "line_count": len(lines)}
    except Exception as e:
        results["api"] = {"status": "error", "error": str(e)}
    # Test WebSocket
    try:
        channel_id, ws_url = await event_manager.create_channel()
        results["websocket"] = {
            "status": "ok",
            "channel_id": channel_id,
        }
    except Exception as e:
        results["websocket"] = {"status": "error", "error": str(e)}
    # List available lines
    results["lines"] = lines if results["api"]["status"] == "ok" else []
    return results
13.3 Debug Logging \
# Enable debug logging for GoToConnect integration
import logging
# Set specific loggers to DEBUG
logging.getLogger("integrations.gotoconnect").setLevel(logging.DEBUG)
logging.getLogger("httpx").setLevel(logging.DEBUG)
logging.getLogger("websockets").setLevel(logging.DEBUG)
# Example debug output
# DEBUG:integrations.gotoconnect.auth:Requesting new access token
# DEBUG:httpx:POST https://authentication.logmeininc.com/oauth/token
# DEBUG:httpx:Response: 200 OK
# DEBUG:integrations.gotoconnect.auth:Token expires at 2026-01-16T11:30:00Z
# DEBUG:integrations.gotoconnect.events:WebSocket connected to wss://realtime.goto.com/...
# DEBUG:integrations.gotoconnect.events:Received event: call.ringing
14. API Reference Summary \
14.1 Authentication API \
| Method | Endpoint | Description |
|---|---|---|
| POST | /oauth/token | Obtain access token |
14.2 Web Calls API \
| Method | Endpoint | Description |
|---|---|---|
| POST | /calls | Initiate outbound call |
| POST | /calls/{id}/answer | Answer inbound call |
| POST | /calls/{id}/hangup | End call |
| PUT | /calls/{id}/hold | Place on hold |
| PUT | /calls/{id}/resume | Resume from hold |
| PUT | /calls/{id}/mute | Mute microphone |
| PUT | /calls/{id}/unmute | Unmute microphone |
| POST | /calls/{id}/dtmf | Send DTMF tones |
| POST | /calls/{id}/blind-transfer | Blind transfer |
| POST | /calls/{id}/warm-transfer | Warm transfer |
| POST | /calls/{id}/merge | Merge into conference |
| POST | /calls/{id}/ice-candidates | Send ICE candidate |
14.3 Call Events API \
| Method | Endpoint | Description |
|---|---|---|
| POST | /notifications/channels | Create event channel |
| POST | /notifications/subscriptions | Subscribe to events |
| DELETE | /notifications/subscriptions/{id} | Unsubscribe |
14.4 Users/Lines API \
| Method | Endpoint | Description |
|---|---|---|
| GET | /accounts/{id}/lines | List lines |
| GET | /accounts/{id}/lines/{lineId} | Get line details |
| GET | /accounts/{id}/users | List users |
## Appendix A: SDP Templates {#appendix-a:-sdp-templates}
### A.1 Minimal SDP Offer {#a.1-minimal-sdp-offer}
v=0
o=- 1234567890 2 IN IP4 0.0.0.0
s=-
t=0 0
a=group:BUNDLE audio
m=audio 9 UDP/TLS/RTP/SAVPF 111
c=IN IP4 0.0.0.0
a=rtcp:9 IN IP4 0.0.0.0
a=ice-ufrag:xxxx
a=ice-pwd:xxxxxxxxxxxxxxxxxxxxxxxx
a=fingerprint:sha-256 XX:XX:XX:...
a=setup:actpass
a=mid:audio
a=sendrecv
a=rtcp-mux
a=rtpmap:111 opus/48000/2
a=fmtp:111 minptime=10;useinbandfec=1
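To check codec compatibility against a template like this one, the `a=rtpmap` lines can be read directly. The sketch below is a small illustration, not a full SDP parser. The function name `parse_codecs` is hypothetical, and the channel count defaults to 1 when the line omits it.

```python
import re

# a=rtpmap:<payload type> <encoding name>/<clock rate>[/<channels>]
_RTPMAP = re.compile(r"^a=rtpmap:(\d+) ([^/\s]+)/(\d+)(?:/(\d+))?$")


def parse_codecs(sdp: str) -> dict:
    """Map payload type to (codec name, clock rate, channels)."""
    codecs = {}
    for line in sdp.splitlines():  # handles both \r\n and \n line endings
        match = _RTPMAP.match(line.strip())
        if match:
            payload, name, rate, channels = match.groups()
            codecs[int(payload)] = (name, int(rate), int(channels or 1))
    return codecs
```

For the minimal offer above, this yields a single entry mapping payload type 111 to `("opus", 48000, 2)`.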
### A.2 Full SDP Offer (GoToConnect)
See Section 4.3.1 for complete example.
## Appendix B: Event Schemas {#appendix-b:-event-schemas}
### B.1 Common Event Structure {#b.1-common-event-structure}
{
  "type": "string",
  "timestamp": "ISO 8601 datetime",
  "data": {
    "callId": "string",
    // Additional fields per event type
  }
}
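As a sketch of how this common structure could be turned into a typed object, the hypothetical `CallEvent` dataclass and `parse_event` function below validate the three top-level fields and `data.callId`. Fields specific to an event type stay in the `data` dict.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class CallEvent:
    type: str
    timestamp: datetime
    call_id: str
    data: dict


def parse_event(raw: dict) -> CallEvent:
    """Validate the common fields and return a typed event."""
    missing = [k for k in ("type", "timestamp", "data") if k not in raw]
    if missing:
        raise ValueError(f"Event missing fields: {missing}")
    if "callId" not in raw["data"]:
        raise ValueError("Event data missing callId")
    # Replace a trailing "Z" because fromisoformat() rejects it before Python 3.11.
    timestamp = datetime.fromisoformat(raw["timestamp"].replace("Z", "+00:00"))
    return CallEvent(raw["type"], timestamp, raw["data"]["callId"], raw["data"])
```

Parsing once at the WebSocket boundary means handlers receive a timezone-aware `timestamp` and can rely on `call_id` being present.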
### B.2 Event Type Reference
See Section 6.5 for complete event schemas.
Document History \
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2026-01-16 | Claude | Initial document |
End of Document