Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions packages/utils/src/stream-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export function byteStream <T extends MessageStream> (stream: T, opts?: ByteStre
const maxBufferSize = opts?.maxBufferSize ?? DEFAULT_MAX_BUFFER_SIZE
const readBuffer = new Uint8ArrayList()

let hasBytes = Promise.withResolvers<void>()
let hasBytes: PromiseWithResolvers<void> | undefined
let unwrapped = false

if (!isValid(stream)) {
Expand All @@ -129,24 +129,24 @@ export function byteStream <T extends MessageStream> (stream: T, opts?: ByteStre
if (readBuffer.byteLength > maxBufferSize) {
const readBufferSize = readBuffer.byteLength
readBuffer.consume(readBuffer.byteLength)
hasBytes.reject(new Error(`Read buffer overflow - ${readBufferSize} > ${maxBufferSize}`))
hasBytes?.reject(new Error(`Read buffer overflow - ${readBufferSize} > ${maxBufferSize}`))
}

hasBytes.resolve()
hasBytes?.resolve()
}
stream.addEventListener('message', byteStreamOnMessageListener)

const byteStreamOnCloseListener = (evt: StreamCloseEvent): void => {
if (evt.error != null) {
hasBytes.reject(evt.error)
hasBytes?.reject(evt.error)
} else {
hasBytes.resolve()
hasBytes?.resolve()
}
}
stream.addEventListener('close', byteStreamOnCloseListener)

const byteStreamOnRemoteCloseWrite = (): void => {
hasBytes.resolve()
hasBytes?.resolve()
}
stream.addEventListener('remoteCloseWrite', byteStreamOnRemoteCloseWrite)

Expand All @@ -171,6 +171,7 @@ export function byteStream <T extends MessageStream> (stream: T, opts?: ByteStre
}

const bytesToRead = options?.bytes ?? 1
hasBytes = Promise.withResolvers<void>()

while (true) {
if (readBuffer.byteLength >= bytesToRead) {
Expand Down
Loading