diff --git a/benchmarks/_util/index.js b/benchmarks/_util/index.js index 75f903530ca..4b14d843c35 100644 --- a/benchmarks/_util/index.js +++ b/benchmarks/_util/index.js @@ -50,4 +50,20 @@ function printResults (results) { return console.table(rows) } -module.exports = { makeParallelRequests, printResults } +/** + * @param {number} num + * @returns {string} + */ +function formatBytes (num) { + if (!Number.isFinite(num)) { + throw new Error('invalid number') + } + + const prefixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB'] + + const idx = Math.min(Math.floor(Math.log(num) / Math.log(1024)), prefixes.length - 1) + + return `${(num / Math.pow(1024, idx)).toFixed(2)}${prefixes[idx]}` +} + +module.exports = { makeParallelRequests, printResults, formatBytes } diff --git a/benchmarks/_util/runner.js b/benchmarks/_util/runner.js new file mode 100644 index 00000000000..8bc72f167e6 --- /dev/null +++ b/benchmarks/_util/runner.js @@ -0,0 +1,141 @@ +// @ts-check + +'use strict' + +class Info { + /** @type {string} */ + #name + /** @type {bigint} */ + #current + /** @type {bigint} */ + #finish + /** @type {(...args: any[]) => any} */ + #callback + /** @type {boolean} */ + #finalized = false + + /** + * @param {string} name + * @param {(...args: any[]) => any} callback + */ + constructor (name, callback) { + this.#name = name + this.#callback = callback + } + + get name () { + return this.#name + } + + start () { + if (this.#finalized) { + throw new TypeError('called after finished.') + } + this.#current = process.hrtime.bigint() + } + + end () { + if (this.#finalized) { + throw new TypeError('called after finished.') + } + this.#finish = process.hrtime.bigint() + this.#finalized = true + this.#callback() + } + + diff () { + return Number(this.#finish - this.#current) + } +} + +/** + * @typedef BenchMarkHandler + * @type {(ev: { name: string; start(): void; end(): void; }) => any} + */ + +/** + * @param {Record} experiments + * @param {{ minSamples?: number; maxSamples?: number }} [options] + * @returns {Promise<{ name: string; average: number; samples: number; fn: BenchMarkHandler; iterationPerSecond: number; min: number; max: number }[]>} + */ +async function bench (experiments, options = {}) { + const names = Object.keys(experiments) + + /** @type {{ name: string; average: number; samples: number; fn: BenchMarkHandler; iterationPerSecond: number; min: number; max: number }[]} */ + const results = [] + + async function waitMaybePromiseLike (p) { + if ( + (typeof p === 'object' || typeof p === 'function') && + p !== null && + typeof p.then === 'function' + ) { + await p + } + } + + for (let i = 0; i < names.length; ++i) { + const name = names[i] + const fn = experiments[name] + const samples = [] + + for (let i = 0; i < 8; ++i) { + // warmup + await new Promise((resolve, reject) => { + const info = new Info(name, resolve) + + try { + const p = fn(info) + + waitMaybePromiseLike(p).catch((err) => reject(err)) + } catch (err) { + reject(err) + } + }) + } + + let timing = 0 + const minSamples = options.minSamples ?? 128 + + for (let j = 0; (j < minSamples || timing < 800_000_000) && (typeof options.maxSamples === 'number' ? options.maxSamples > j : true); ++j) { + let resolve = (value) => {} + let reject = (reason) => {} + const promise = new Promise( + (_resolve, _reject) => { resolve = _resolve; reject = _reject } + ) + + const info = new Info(name, resolve) + + try { + const p = fn(info) + + await waitMaybePromiseLike(p) + } catch (err) { + reject(err) + } + + await promise + + samples.push({ time: info.diff() }) + + timing += info.diff() + } + + const average = + samples.map((v) => v.time).reduce((a, b) => a + b, 0) / samples.length + + results.push({ + name: names[i], + average, + samples: samples.length, + fn, + iterationPerSecond: 1e9 / average, + min: samples.reduce((a, acc) => Math.min(a, acc.time), samples[0].time), + max: samples.reduce((a, acc) => Math.max(a, acc.time), samples[0].time) + }) + } + + return results +} + +module.exports = { bench } diff --git a/benchmarks/package.json b/benchmarks/package.json index 371e2d24ff4..4885423c96a 100644 --- a/benchmarks/package.json +++ b/benchmarks/package.json @@ -21,6 +21,7 @@ "node-fetch": "^3.3.2", "request": "^2.88.2", "superagent": "^10.0.0", - "wait-on": "^8.0.0" + "wait-on": "^8.0.0", + "uWebSockets.js": "uNetworking/uWebSockets.js#v20.52.0" } } diff --git a/benchmarks/websocket-benchmark.mjs b/benchmarks/websocket-benchmark.mjs new file mode 100644 index 00000000000..a590a66d415 --- /dev/null +++ b/benchmarks/websocket-benchmark.mjs @@ -0,0 +1,208 @@ +// @ts-check + +import { bench } from './_util/runner.js' +import { formatBytes } from './_util/index.js' +import { WebSocket, WebSocketStream } from '../index.js' +import { WebSocket as WsWebSocket } from 'ws' + +/** + * @type {Record import('./_util/runner.js').BenchMarkHandler; connect: (url: string) => Promise; binaries: (string | Uint8Array)[] }>} + */ +const experiments = {} +/** + * @type {Record} + */ +const experimentsInfo = {} + +/** + * @type {any[]} + */ +const connections = [] + +const binary = Buffer.alloc(256 * 1024, '_') +const binaries = [binary, binary.subarray(0, 256 * 1024).toString('utf-8')] + +experiments['undici'] = { + fn: (ws, binary) => { + if (!(ws instanceof WebSocket)) { + throw new Error("'undici' websocket are expected.") + } + + return (ev) => { + ws.addEventListener( + 'message', + () => { + ev.end() + }, + { once: true } + ) + + ev.start() + ws.send(binary) + } + }, + + connect: async (url) => { + const ws = new WebSocket(url) + + await /** @type {Promise} */ ( + new Promise((resolve, reject) => { + function onOpen () { + resolve() + ws.removeEventListener('open', onOpen) + ws.removeEventListener('error', onError) + } + function onError (err) { + reject(err) + ws.removeEventListener('open', onOpen) + ws.removeEventListener('error', onError) + } + ws.addEventListener('open', onOpen) + ws.addEventListener('error', onError) + }) + ) + + // avoid create blob + ws.binaryType = 'arraybuffer' + + return ws + }, + + binaries +} + +experiments['undici - stream'] = { + fn: (ws, binary) => { + /** @type {ReadableStreamDefaultReader} */ + const reader = ws.reader + /** @type {WritableStreamDefaultWriter} */ + const writer = ws.writer + + return async (ev) => { + ev.start() + await writer.write(binary) + await reader.read() + ev.end() + } + }, + + connect: async (url) => { + const ws = new WebSocketStream(url) + + const { readable, writable } = await ws.opened + const reader = readable.getReader() + const writer = writable.getWriter() + + // @ts-ignore + return { reader, writer, close: () => ws.close() } + }, + + binaries +} + +experiments['ws'] = { + fn: (ws, binary) => { + if (!(ws instanceof WsWebSocket)) { + throw new Error("'ws' websocket are expected.") + } + + return (ev) => { + ws.once('message', () => { + ev.end() + }) + ev.start() + ws.send(binary) + } + }, + + connect: async (url) => { + const ws = new WsWebSocket(url, { maxPayload: 1024 * 1024 * 1024 }) + + await /** @type {Promise} */ ( + new Promise((resolve, reject) => { + function onOpen () { + resolve() + ws.off('open', onOpen) + ws.off('error', onError) + } + function onError (err) { + reject(err) + ws.off('open', onOpen) + ws.off('error', onError) + } + ws.on('open', onOpen) + ws.on('error', onError) + }) + ) + + ws.binaryType = 'arraybuffer' + + return ws + }, + + binaries +} + +async function init () { + /** @type {Record} */ + const round = {} + + const keys = Object.keys(experiments) + + for (let i = 0; i < keys.length; ++i) { + const name = keys[i] + + const { fn, connect, binaries } = experiments[name] + + const ws = await connect('ws://localhost:8080') + + const needShowBytes = binaries.length !== 2 || typeof binaries[0] === typeof binaries[1] + for (let i = 0; i < binaries.length; ++i) { + const binary = binaries[i] + const bytes = Buffer.byteLength(binary) + + const binaryType = typeof binary === 'string' ? 'string' : 'binary' + const roundName = needShowBytes + ? `${name} [${formatBytes(bytes)} (${binaryType})]` + : `${name} [${binaryType}]` + + round[roundName] = fn(ws, binary) + experimentsInfo[roundName] = { bytes, binaryType } + } + + connections.push(ws) + } + + return round +} + +init() + .then((round) => bench(round, { + minSamples: 2048 + })) + .then((results) => { + print(results) + + for (const ws of connections) { + ws.close() + } + }, (err) => { + process.nextTick((err) => { + throw err + }, err) + }) + +/** + * @param {{ name: string; average: number; iterationPerSecond: number; }[]} results + */ +function print (results) { + for (const { name, average, iterationPerSecond } of results) { + const { bytes } = experimentsInfo[name] + + console.log( + `${name}: transferred ${formatBytes((bytes / average) * 1e9)} Bytes/s (${iterationPerSecond.toFixed(4)} per/sec)` + ) + } +} + +export {} diff --git a/benchmarks/websocket-echo-server.mjs b/benchmarks/websocket-echo-server.mjs new file mode 100644 index 00000000000..130c49fd0d0 --- /dev/null +++ b/benchmarks/websocket-echo-server.mjs @@ -0,0 +1,46 @@ +import { Worker, isMainThread, parentPort, threadId } from 'node:worker_threads' +import { cpus } from 'node:os' +import url from 'node:url' +import uws from 'uWebSockets.js' + +const __filename = url.fileURLToPath(import.meta.url) + +const app = uws.App() + +if (isMainThread) { + for (let i = cpus().length - 1; i >= 0; --i) { + new Worker(__filename).on('message', (workerAppDescriptor) => { + app.addChildAppDescriptor(workerAppDescriptor) + }) + } +} else { + app + .ws('/*', { + compression: uws.DISABLED, + maxPayloadLength: 1024 * 1024 * 1024, + maxBackpressure: 1 * 1024 * 1024, + idleTimeout: 60, + message: (ws, message, isBinary) => { + /* Here we echo the message back, using compression if available */ + const ok = ws.send(message, isBinary) // eslint-disable-line + } + }) + .get('/*', (res, req) => { + /* It does Http as well */ + res + .writeStatus('200 OK') + .end('Hello there!') + }) + + parentPort.postMessage(app.getDescriptor()) +} + +app.listen(8080, (listenSocket) => { + if (listenSocket) { + if (threadId === 0) { + console.log('Listening to port 8080') + } else { + console.log(`Listening to port 8080 from thread ${threadId}`) + } + } +}) diff --git a/benchmarks/websocket/generate-mask.mjs b/benchmarks/websocket/generate-mask.mjs index 032f05d8b99..61bc5495daf 100644 --- a/benchmarks/websocket/generate-mask.mjs +++ b/benchmarks/websocket/generate-mask.mjs @@ -1,20 +1,8 @@ -import { randomFillSync, randomBytes } from 'node:crypto' -import { bench, group, run } from 'mitata' +import { randomBytes } from 'node:crypto' +import { bench, summary, run } from 'mitata' +import { generateMask } from '../../lib/web/websocket/frame.js' -const BUFFER_SIZE = 16384 - -const buf = Buffer.allocUnsafe(BUFFER_SIZE) -let bufIdx = BUFFER_SIZE - -function generateMask () { - if (bufIdx === BUFFER_SIZE) { - bufIdx = 0 - randomFillSync(buf, 0, BUFFER_SIZE) - } - return [buf[bufIdx++], buf[bufIdx++], buf[bufIdx++], buf[bufIdx++]] -} - -group('generate', () => { +summary(() => { bench('generateMask', () => generateMask()) bench('crypto.randomBytes(4)', () => randomBytes(4)) }) diff --git a/lib/web/websocket/frame.js b/lib/web/websocket/frame.js index e773b33e1a6..68f31ebab9f 100644 --- a/lib/web/websocket/frame.js +++ b/lib/web/websocket/frame.js @@ -134,5 +134,6 @@ class WebsocketFrameSend { } module.exports = { - WebsocketFrameSend + WebsocketFrameSend, + generateMask // for benchmark }