Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions packages/transport-webrtc/src/private-to-private/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ export interface ReadCandidatesOptions extends AbortOptions, LoggerOptions, Prog
}

export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream: MessageStream<Message, Stream>, options: ReadCandidatesOptions): Promise<void> => {
try {
const connectedPromise = Promise.withResolvers<void>()
resolveOnConnected(pc, connectedPromise)
const connectedPromise = Promise.withResolvers<void>()
resolveOnConnected(pc, connectedPromise)

try {
// read candidates until we are connected or we reach the end of the stream
while (true) {
// if we connect, stop trying to read from the stream
Expand Down Expand Up @@ -66,10 +66,19 @@ export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream
}
}
} catch (err) {
options.log.error('%s error parsing ICE candidate - %e', options.direction, err)

if (options.signal?.aborted === true && pc.connectionState !== 'connected') {
throw err
options.log.error('%s error reading ICE candidates - %e', options.direction, err)

// If the peer connection is not connected, the error may still be
// recoverable — the signaling stream can close just before the
// connectionstatechange event fires. Wait for the connected promise to
// settle: if the PC connects we can ignore the stream error; if it fails
// or was never going to connect, re-throw.
if (pc.connectionState !== 'connected') {
try {
await connectedPromise.promise
} catch {
throw err
}
}
}
}
Expand Down
118 changes: 118 additions & 0 deletions packages/transport-webrtc/test/maconn.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
/* eslint-disable @typescript-eslint/no-unused-expressions */

import { ConnectionFailedError } from '@libp2p/interface'
import { defaultLogger } from '@libp2p/logger'
import { pbStream, streamPair } from '@libp2p/utils'
import { multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import { stubObject } from 'sinon-ts'
import { Message } from '../src/private-to-private/pb/message.js'
import { readCandidatesUntilConnected } from '../src/private-to-private/util.js'
import { toMultiaddrConnection } from '../src/rtcpeerconnection-to-conn.ts'
import { RTCPeerConnection } from '../src/webrtc/index.js'
import type { CounterGroup } from '@libp2p/interface'
Expand Down Expand Up @@ -34,3 +38,117 @@ describe('Multiaddr Connection', () => {
expect(metrics.increment.calledWith({ close: true })).to.be.true
})
})

describe('readCandidatesUntilConnected', () => {
it('throws ConnectionFailedError when the peer connection enters failed state', async () => {
const [localStream, remoteStream] = await streamPair()

const pc: any = {
connectionState: 'checking' as RTCPeerConnectionState,
onconnectionstatechange: null as any
}

// Schedule ICE failure after a short delay
setTimeout(() => {
pc.connectionState = 'failed'
pc.onconnectionstatechange?.(new Event('connectionstatechange'))
}, 50)

const messageStream = pbStream(localStream).pb(Message)

await expect(
readCandidatesUntilConnected(pc, messageStream, {
direction: 'initiator',
signal: AbortSignal.timeout(5000),
log: defaultLogger().forComponent('test:webrtc')
})
).to.eventually.be.rejectedWith(ConnectionFailedError)

await remoteStream.close()
})

it('does not fail when peer connection is temporarily disconnected then recovers to connected', async () => {
const [localStream, remoteStream] = await streamPair()

const pc: any = {
connectionState: 'checking' as RTCPeerConnectionState,
onconnectionstatechange: null as any
}

// ICE briefly goes disconnected at t=30ms, then recovers at t=60ms
setTimeout(() => {
pc.connectionState = 'disconnected'
pc.onconnectionstatechange?.(new Event('connectionstatechange'))
}, 30)

setTimeout(() => {
pc.connectionState = 'connected'
pc.onconnectionstatechange?.(new Event('connectionstatechange'))
}, 60)

const messageStream = pbStream(localStream).pb(Message)

await expect(
readCandidatesUntilConnected(pc, messageStream, {
direction: 'recipient',
signal: AbortSignal.timeout(5000),
log: defaultLogger().forComponent('test:webrtc')
})
).to.eventually.be.undefined

await remoteStream.close()
})

it('throws ConnectionFailedError when peer connection goes disconnected then failed', async () => {
const [localStream, remoteStream] = await streamPair()

const pc: any = {
connectionState: 'checking' as RTCPeerConnectionState,
onconnectionstatechange: null as any
}

// ICE goes disconnected at t=30ms, then fails at t=60ms
setTimeout(() => {
pc.connectionState = 'disconnected'
pc.onconnectionstatechange?.(new Event('connectionstatechange'))
}, 30)

setTimeout(() => {
pc.connectionState = 'failed'
pc.onconnectionstatechange?.(new Event('connectionstatechange'))
}, 60)

const messageStream = pbStream(localStream).pb(Message)

await expect(
readCandidatesUntilConnected(pc, messageStream, {
direction: 'recipient',
signal: AbortSignal.timeout(5000),
log: defaultLogger().forComponent('test:webrtc')
})
).to.eventually.be.rejectedWith(ConnectionFailedError)

await remoteStream.close()
})

it('returns without error when peer connection reaches connected state', async () => {
const [localStream, remoteStream] = await streamPair()

const pc: any = {
connectionState: 'connected' as RTCPeerConnectionState,
onconnectionstatechange: null as any
}

const messageStream = pbStream(localStream).pb(Message)

void remoteStream.close()

await expect(
readCandidatesUntilConnected(pc, messageStream, {
direction: 'initiator',
signal: AbortSignal.timeout(5000),
log: defaultLogger().forComponent('test:webrtc')
})
).to.eventually.be.undefined
})
})
Loading