From 9ec64a0c79d3a4e6ef846b022cadbfd9eee06ffa Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Feb 2024 11:03:21 +0100 Subject: [PATCH 1/3] refactor: avoid http2 dynamic dispatch in socket handlers Steps towards more clean separation between h1 and h2. Refs: https://github.com/nodejs/undici/pull/2816 --- lib/core/symbols.js | 3 +- lib/dispatcher/client.js | 143 ++++++++++++++++++++++----------------- 2 files changed, 82 insertions(+), 64 deletions(-) diff --git a/lib/core/symbols.js b/lib/core/symbols.js index 68d8566fac0..d3edb65dda6 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -59,5 +59,6 @@ module.exports = { kHTTP2CopyHeaders: Symbol('http2 copy headers'), kHTTPConnVersion: Symbol('http connection version'), kRetryHandlerDefaultRetry: Symbol('retry agent default retry'), - kConstruct: Symbol('constructable') + kConstruct: Symbol('constructable'), + kListeners: Symbol('listeners') } diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index cb3feabb37b..2e05169d7ce 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -72,6 +72,7 @@ const { kLocalAddress, kMaxResponseSize, kHTTPConnVersion, + kListeners, // HTTP2 kHost, kHTTP2Session, @@ -111,6 +112,20 @@ const FastBuffer = Buffer[Symbol.species] const kClosedResolve = Symbol('kClosedResolve') +function addListener (obj, name, listener) { + const listeners = (obj[kListeners] ??= []) + listeners.push([name, listener]) + obj.on(name, listener) + return obj +} + +function removeAllListeners (obj) { + for (const [name, listener] of obj[kListeners] ?? []) { + obj.removeListener(name, listener) + } + obj[kListeners] = null +} + /** * @type {import('../../types/client.js').default} */ @@ -803,11 +818,8 @@ class Parser { socket[kClient] = null socket[kError] = null - socket - .removeListener('error', onSocketError) - .removeListener('readable', onSocketReadable) - .removeListener('end', onSocketEnd) - .removeListener('close', onSocketClose) + + removeAllListeners(socket) client[kSocket] = null client[kHTTP2Session] = null @@ -1050,33 +1062,6 @@ function onParserTimeout (parser) { } } -function onSocketReadable () { - const { [kParser]: parser } = this - if (parser) { - parser.readMore() - } -} - -function onSocketError (err) { - const { [kClient]: client, [kParser]: parser } = this - - assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') - - if (client[kHTTPConnVersion] !== 'h2') { - // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded - // to the user. - if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so for as a valid response. - parser.onMessageComplete() - return - } - } - - this[kError] = err - - onError(this[kClient], err) -} - function onError (client, err) { if ( client[kRunning] === 0 && @@ -1097,32 +1082,8 @@ function onError (client, err) { } } -function onSocketEnd () { - const { [kParser]: parser, [kClient]: client } = this - - if (client[kHTTPConnVersion] !== 'h2') { - if (parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so far as a valid response. - parser.onMessageComplete() - return - } - } - - util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) -} - function onSocketClose () { - const { [kClient]: client, [kParser]: parser } = this - - if (client[kHTTPConnVersion] === 'h1' && parser) { - if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so far as a valid response. - parser.onMessageComplete() - } - - this[kParser].destroy() - this[kParser] = null - } + const { [kClient]: client } = this const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) @@ -1241,6 +1202,18 @@ async function connect (client) { client[kHTTP2Session] = session socket[kHTTP2Session] = session + + addListener(socket, 'error', function (err) { + assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') + + this[kError] = err + + onError(this[kClient], err) + }) + addListener(socket, 'end', function () { + util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) + }) + addListener(socket, 'close', onSocketClose) } else { if (!llhttpInstance) { llhttpInstance = await llhttpPromise @@ -1252,6 +1225,56 @@ async function connect (client) { socket[kReset] = false socket[kBlocking] = false socket[kParser] = new Parser(client, socket, llhttpInstance) + + addListener(socket, 'error', function (err) { + const { [kParser]: parser } = this + + assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') + + // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded + // to the user. + if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so for as a valid response. + parser.onMessageComplete() + return + } + + this[kError] = err + + onError(this[kClient], err) + }) + addListener(socket, 'readable', function () { + const { [kParser]: parser } = this + if (parser) { + parser.readMore() + } + }) + addListener(socket, 'end', function () { + const { [kParser]: parser } = this + + if (parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so far as a valid response. + parser.onMessageComplete() + return + } + + util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) + }) + addListener(socket, 'close', function () { + const { [kParser]: parser } = this + + if (parser) { + if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so far as a valid response. + parser.onMessageComplete() + } + + this[kParser].destroy() + this[kParser] = null + } + + onSocketClose.call(this) + }) } socket[kCounter] = 0 @@ -1259,12 +1282,6 @@ async function connect (client) { socket[kClient] = client socket[kError] = null - socket - .on('error', onSocketError) - .on('readable', onSocketReadable) - .on('end', onSocketEnd) - .on('close', onSocketClose) - client[kSocket] = socket if (channels.connected.hasSubscribers) { From 33795485da7bc0af8cd3a169bf23fc8f58067b45 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Feb 2024 11:15:23 +0100 Subject: [PATCH 2/3] refactor: split h1 and h2 connect into own methods --- lib/dispatcher/client.js | 207 ++++++++++++++++++++------------------- 1 file changed, 108 insertions(+), 99 deletions(-) diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index 2e05169d7ce..dbc2cf7d036 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -291,7 +291,7 @@ class Client extends DispatcherBase { this[kMaxRequests] = maxRequestsPerClient this[kClosedResolve] = null this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1 - this[kHTTPConnVersion] = 'h1' + this[kHTTPConnVersion] = null // HTTP/2 this[kHTTP2Session] = null @@ -1176,107 +1176,14 @@ async function connect (client) { assert(socket) - const isH2 = socket.alpnProtocol === 'h2' - if (isH2) { - if (!h2ExperimentalWarned) { - h2ExperimentalWarned = true - process.emitWarning('H2 support is experimental, expect them to change at any time.', { - code: 'UNDICI-H2' - }) - } - - const session = http2.connect(client[kUrl], { - createConnection: () => socket, - peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams - }) - - client[kHTTPConnVersion] = 'h2' - session[kClient] = client - session[kSocket] = socket - session.on('error', onHttp2SessionError) - session.on('frameError', onHttp2FrameError) - session.on('end', onHttp2SessionEnd) - session.on('goaway', onHTTP2GoAway) - session.on('close', onSocketClose) - session.unref() - - client[kHTTP2Session] = session - socket[kHTTP2Session] = session - - addListener(socket, 'error', function (err) { - assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') - - this[kError] = err - - onError(this[kClient], err) - }) - addListener(socket, 'end', function () { - util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) - }) - addListener(socket, 'close', onSocketClose) + if (socket.alpnProtocol === 'h2') { + await connectH2(client, socket) } else { - if (!llhttpInstance) { - llhttpInstance = await llhttpPromise - llhttpPromise = null - } - - socket[kNoRef] = false - socket[kWriting] = false - socket[kReset] = false - socket[kBlocking] = false - socket[kParser] = new Parser(client, socket, llhttpInstance) - - addListener(socket, 'error', function (err) { - const { [kParser]: parser } = this - - assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') - - // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded - // to the user. - if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so for as a valid response. - parser.onMessageComplete() - return - } - - this[kError] = err - - onError(this[kClient], err) - }) - addListener(socket, 'readable', function () { - const { [kParser]: parser } = this - if (parser) { - parser.readMore() - } - }) - addListener(socket, 'end', function () { - const { [kParser]: parser } = this - - if (parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so far as a valid response. - parser.onMessageComplete() - return - } - - util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) - }) - addListener(socket, 'close', function () { - const { [kParser]: parser } = this - - if (parser) { - if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so far as a valid response. - parser.onMessageComplete() - } - - this[kParser].destroy() - this[kParser] = null - } - - onSocketClose.call(this) - }) + await connectH1(client, socket) } + addListener(socket, 'close', onSocketClose) + socket[kCounter] = 0 socket[kMaxRequests] = client[kMaxRequests] socket[kClient] = client @@ -2358,4 +2265,106 @@ function errorRequest (client, request, err) { } } +async function connectH1 (client, socket) { + client[kHTTPConnVersion] = 'h1' + + if (!llhttpInstance) { + llhttpInstance = await llhttpPromise + llhttpPromise = null + } + + socket[kNoRef] = false + socket[kWriting] = false + socket[kReset] = false + socket[kBlocking] = false + socket[kParser] = new Parser(client, socket, llhttpInstance) + + addListener(socket, 'error', function (err) { + const { [kParser]: parser } = this + + assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') + + // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded + // to the user. + if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so for as a valid response. + parser.onMessageComplete() + return + } + + this[kError] = err + + onError(this[kClient], err) + }) + addListener(socket, 'readable', function () { + const { [kParser]: parser } = this + if (parser) { + parser.readMore() + } + }) + addListener(socket, 'end', function () { + const { [kParser]: parser } = this + + if (parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so far as a valid response. + parser.onMessageComplete() + return + } + + util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) + }) + addListener(socket, 'close', function () { + const { [kParser]: parser } = this + + if (parser) { + if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so far as a valid response. + parser.onMessageComplete() + } + + this[kParser].destroy() + this[kParser] = null + } + }) +} + +async function connectH2 (client, socket) { + client[kHTTPConnVersion] = 'h2' + + if (!h2ExperimentalWarned) { + h2ExperimentalWarned = true + process.emitWarning('H2 support is experimental, expect them to change at any time.', { + code: 'UNDICI-H2' + }) + } + + const session = http2.connect(client[kUrl], { + createConnection: () => socket, + peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams + }) + + session[kClient] = client + session[kSocket] = socket + session.on('error', onHttp2SessionError) + session.on('frameError', onHttp2FrameError) + session.on('end', onHttp2SessionEnd) + session.on('goaway', onHTTP2GoAway) + session.on('close', onSocketClose) + session.unref() + + client[kHTTP2Session] = session + socket[kHTTP2Session] = session + + addListener(socket, 'error', function (err) { + assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') + + this[kError] = err + + onError(this[kClient], err) + }) + addListener(socket, 'end', function () { + util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) + }) +} + module.exports = Client From b23d1f7bb5d94ccd85c15d5cdd53221dc01ebadb Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Feb 2024 11:33:03 +0100 Subject: [PATCH 3/3] refactor: separate writeH1 and writeH2 --- lib/dispatcher/client.js | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index dbc2cf7d036..b4ca2f85428 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -1399,10 +1399,15 @@ function shouldSendContentLength (method) { function write (client, request) { if (client[kHTTPConnVersion] === 'h2') { - writeH2(client, client[kHTTP2Session], request) - return + // TODO (fix): Why does this not return the value + // from writeH2. + writeH2(client, request) + } else { + return writeH1(client, request) } +} +function writeH1 (client, request) { const { method, path, host, upgrade, blocking, reset } = request let { body, headers, contentLength } = request @@ -1580,7 +1585,8 @@ function write (client, request) { return true } -function writeH2 (client, session, request) { +function writeH2 (client, request) { + const session = client[kHTTP2Session] const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request let headers