Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tender-falcons-think.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@signalwire/webrtc': patch
---

refactored the early invites implementation to make it more stable.
188 changes: 52 additions & 136 deletions packages/webrtc/src/RTCPeer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
} from './utils'
import { watchRTCPeerMediaPackets } from './utils/watchRTCPeerMediaPackets'
import { connectionPoolManager } from './connectionPoolManager'

const RESUME_TIMEOUT = 12_000

export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
Expand All @@ -28,18 +29,15 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
public instance: RTCPeerConnection

private _iceTimeout: any
private _iceGatheringTimeout: any
private _negotiating = false
private _processingRemoteSDP = false
private _restartingIce = false
private _watchMediaPacketsTimer: ReturnType<typeof setTimeout>
private _connectionStateTimer: ReturnType<typeof setTimeout>
private _resumeTimer?: ReturnType<typeof setTimeout>
private _mediaWatcher: ReturnType<typeof watchRTCPeerMediaPackets>
private _candidatesSnapshot: RTCIceCandidate[] = []
private _allCandidates: RTCIceCandidate[] = []
private _processingLocalSDP = false
private _waitNegotiation: Promise<void> = Promise.resolve()
private _waitNegotiationCompleter: () => void
/**
* Both of these properties are used to have granular
* control over when to `resolve` and when `reject` the
Expand Down Expand Up @@ -202,11 +200,9 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
private _negotiationCompleted(error?: unknown) {
if (!error) {
this._resolveStartMethod()
this._waitNegotiationCompleter?.()
this._pendingNegotiationPromise?.resolve()
} else {
this._rejectStartMethod(error)
this._waitNegotiationCompleter?.()
this._pendingNegotiationPromise?.reject(error)
}
}
Expand Down Expand Up @@ -455,6 +451,7 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
return this.logger.warn('Skip twice onnegotiationneeded!')
}
this._negotiating = true

try {
/**
* additionalDevice and screenShare are `sendonly`
Expand Down Expand Up @@ -507,11 +504,6 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
}

this.logger.info('iceGatheringState', this.instance.iceGatheringState)
if (this.instance.iceGatheringState === 'gathering') {
this._iceTimeout = setTimeout(() => {
this._onIceTimeout()
}, this.options.maxIceGatheringTimeout)
}
} catch (error) {
this.logger.error(`Error creating ${this.type}:`, error)
this._negotiationCompleted(error)
Expand Down Expand Up @@ -921,20 +913,13 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
}

try {
const isAllowedToSendLocalSDP = await this._isAllowedToSendLocalSDP()
if (!isAllowedToSendLocalSDP) {
this.logger.info('Skipping onLocalSDPReady due to early invite')
this._processingLocalSDP = false
return
}

this._waitNegotiation = new Promise((resolve) => {
this._waitNegotiationCompleter = resolve
})

await this.call.onLocalSDPReady(this)
this._processingLocalSDP = false
if (this.isAnswer) {
this.logger.debug('Setting negotiating false for inbound calls')
this._negotiating = false
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the inbound case, the negotiation ends after we end the verto.answer

this._restartingIce = false
this.resetNeedResume()
this._negotiationCompleted()
}
} catch (error) {
Expand All @@ -943,25 +928,6 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
}
}

/**
* Waits for the pending negotiation promise to resolve
* and checks if the current signaling state allows to send a local SDP.
* This is used to prevent sending an offer when the signaling state is not appropriate.
* or when still waiting for a previous negotiation to complete.
*/
private async _isAllowedToSendLocalSDP() {
await this._waitNegotiation

// Check if signalingState have the right state to sand an offer
return (
(this.type === 'offer' &&
['have-local-offer', 'have-local-pranswer'].includes(
this.instance.signalingState
)) ||
(this.type === 'answer' && this.instance.signalingState === 'stable')
)
}

private _sdpIsValid() {
if (this.localSdp && this.hasIceServers) {
return sdpHasValidCandidates(this.localSdp)
Expand All @@ -977,6 +943,8 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
}

private _onIceTimeout() {
this.instance.removeEventListener('icecandidate', this._onIce)

if (this._sdpIsValid()) {
this._sdpReady()
return
Expand All @@ -1002,94 +970,29 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
}

private _onIce(event: RTCPeerConnectionIceEvent) {
/**
* Clear _iceTimeout on each single candidate
*/
// Clear _iceTimeout on each single candidate
if (this._iceTimeout) {
clearTimeout(this._iceTimeout)
}

// Add new _newTimeout for next candidate
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the primary change introduced by the new "Early Invite" implementation, before the implementation forced to send of the SDP after the first non-host candidate was gathered. Now, as long as we are still gathering new candidates "fast enough," we wait. This leads to a more reliable approach while still allowing for an "early invite" in cases of slow candidate gathering.

this._iceTimeout = setTimeout(() => {
this._onIceTimeout()
}, this.options.iceGatheringTimeout)

/**
* Following spec: no candidate means the gathering is completed.
*/
if (!event.candidate) {
this.instance.removeEventListener('icecandidate', this._onIce)
// not call _sdpReady if an early invite has been sent
if (this._candidatesSnapshot.length > 0) {
this.logger.debug('No more candidates, calling _sdpReady')
this._sdpReady()
}
return
}

// Store all candidates
this._allCandidates.push(event.candidate)

this.logger.debug('RTCPeer Candidate:', event.candidate)
if (event.candidate.type === 'host') {
/**
* With `host` candidate set timeout to
* maxIceGatheringTimeout and then invoke
* _onIceTimeout to check if the SDP is valid
*/
this._iceTimeout = setTimeout(() => {
this.instance.removeEventListener('icecandidate', this._onIce)
this._onIceTimeout()
}, this.options.maxIceGatheringTimeout)
} else {
/**
* With non-HOST candidate (srflx, prflx or relay), check if we have
* candidates for all media sections to support early invite
*/
if (this.instance.localDescription?.sdp) {
if (sdpHasValidCandidates(this.instance.localDescription.sdp)) {
// Take a snapshot of candidates at this point
if (this._candidatesSnapshot.length === 0 && this.type === 'offer') {
this._candidatesSnapshot = [...this._allCandidates]
this.logger.info(
'SDP has candidates for all media sections, calling _sdpReady for early invite'
)
setTimeout(() => this._sdpReady(), 0) // Defer to allow any pending operations to complete
}
} else {
this.logger.info(
'SDP does not have candidates for all media sections, waiting for more candidates'
)
this.logger.debug(this.instance.localDescription?.sdp)
}
}
}
}

private _retryWithMoreCandidates() {
// Check if we have better candidates now than when we first sent SDP
const hasMoreCandidates = this._hasMoreCandidates()
this.logger.debug('No more candidates, calling _sdpReady')
this._sdpReady()

if (hasMoreCandidates && this.instance.connectionState !== 'connected') {
this.logger.info(
'More candidates found after ICE gathering complete, triggering renegotiation'
)
// Reset negotiation state to allow new negotiation
this._negotiating = false
this._candidatesSnapshot = []
this._allCandidates = []

// set the SDP type to 'offer' since the client is initiating a new negotiation
this.type = 'offer'
// Start negotiation with force=true
if (this.instance.signalingState === 'stable') {
this.startNegotiation(true)
} else {
this.logger.warn(
'Signaling state is not stable, cannot start negotiation immediately'
)
this.restartIce()
}
return
}
}

private _hasMoreCandidates(): boolean {
return this._allCandidates.length > this._candidatesSnapshot.length
this.logger.debug('RTCPeer Candidate:', event.candidate)
}

private _setLocalDescription(localDescription: RTCSessionDescriptionInit) {
Expand All @@ -1115,12 +1018,6 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
googleStartBitrate
)
}
// this.logger.debug(
// 'LOCAL SDP \n',
// `Type: ${localDescription.type}`,
// '\n\n',
// localDescription.sdp
// )
return this.instance.setLocalDescription(localDescription)
}

Expand Down Expand Up @@ -1161,9 +1058,14 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
case 'stable':
// Workaround to skip nested negotiations
// Chrome bug: https://bugs.chromium.org/p/chromium/issues/detail?id=740501
this._negotiating = false
this._restartingIce = false
this.resetNeedResume()

if (this.isOffer) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Being stable is not enough to consider the negotiating done for the inbound case.

// only when it's an offer that means the negotiation is done
this.logger.debug('Setting negotiating false for outbound calls')
this._negotiating = false
this._restartingIce = false
this.resetNeedResume()
}

if (this.instance.connectionState === 'connected') {
// An ice restart won't change the connectionState so we emit the same event in here
Expand Down Expand Up @@ -1194,14 +1096,6 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
// case 'new':
// break
case 'connecting':
this._connectionStateTimer = setTimeout(() => {
this.logger.warn('connectionState timed out')
if (this._hasMoreCandidates()) {
this._retryWithMoreCandidates()
} else {
this.restartIceWithRelayOnly()
}
}, this.options.maxConnectionStateTimeout)
break
case 'connected':
this.clearConnectionStateTimer()
Expand Down Expand Up @@ -1230,14 +1124,31 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {

this.instance.addEventListener('icegatheringstatechange', () => {
this.logger.debug('iceGatheringState:', this.instance.iceGatheringState)
if (this.instance.iceGatheringState === 'complete') {
this.logger.debug('ICE gathering complete')
void this._sdpReady()
switch (this.instance.iceGatheringState) {
case 'gathering':
this._iceGatheringTimeout = setTimeout(() => {
this._onIceTimeout()
}, this.options.maxIceGatheringTimeout)
break
case 'complete':
this.clearIceGatheringTimer()

// start connectionState timer after the gathering is complete
this._connectionStateTimer = setTimeout(() => {
this.logger.warn('connectionState timed out')
this.restartIceWithRelayOnly()
}, this.options.maxConnectionStateTimeout)

this.logger.debug('ICE gathering complete')
void this._sdpReady()
break
}
})

// this.instance.addEventListener('icecandidateerror', (event) => {
// this.logger.warn('IceCandidate Error:', event)
// this.clearTimers()
// this._forceNegotiation()
// })

this.instance.addEventListener('track', (event: RTCTrackEvent) => {
Expand Down Expand Up @@ -1265,9 +1176,14 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
private clearTimers() {
this.clearResumeTimer()
this.clearWatchMediaPacketsTimer()
this.clearIceGatheringTimer()
this.clearConnectionStateTimer()
}

clearIceGatheringTimer() {
clearTimeout(this._iceGatheringTimeout)
}

private clearConnectionStateTimer() {
clearTimeout(this._connectionStateTimer)
}
Expand Down
6 changes: 1 addition & 5 deletions packages/webrtc/src/RTCPeerConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ export class RTCPeerConnectionManager {
return null
}


/**
* Clean up the manager and all connections
*/
Expand Down Expand Up @@ -185,10 +184,7 @@ export class RTCPeerConnectionManager {
}

this.logger.debug(`Pooled connection ${id} created successfully`)
this.logger.debug(
`ICE candidates gathered for connection ${id}:`,
pc.localDescription?.sdp
)

return pooledConnection
} catch (error) {
this.logger.error('Failed to create pooled connection:', error)
Expand Down
4 changes: 2 additions & 2 deletions packages/webrtc/src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ export const DEFAULT_CALL_OPTIONS: ConnectionOptions = {
userVariables: {},
requestTimeout: 10 * 1000,
autoApplyMediaParams: true,
iceGatheringTimeout: 2 * 1000,
iceGatheringTimeout: 3 * 100,
maxIceGatheringTimeout: 5 * 1000,
maxConnectionStateTimeout: 3 * 1000,
maxConnectionStateTimeout: 15 * 1000,
watchMediaPackets: true,
watchMediaPacketsTimeout: 2 * 1000,
}
Loading