Skip to content

feature: sequential listeners #1709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 75 additions & 22 deletions lib/http-proxy/passes/ws-incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,50 @@ var http = require('http'),
https = require('https'),
common = require('../common');

function isPromise(x) { return !!x && typeof x.then === 'function'; }

async function runAsyncListenersSequentially(emitter, event, args, timeoutMs) {
const listeners = emitter ? emitter.listeners(event) : [];
for (const fn of listeners) {
if (fn.length > args.length) {
await new Promise((resolve, reject) => {
let settled = false;
let timer = null;
const done = (err) => {
if (timer) clearTimeout(timer);
if (!settled) {
settled = true;
err ? reject(err) : resolve();
}
};
try {
if (timeoutMs) {
timer = setTimeout(() => done(new Error(event + ' hook timeout')), timeoutMs);
}
const ret = fn.call(emitter, ...args, done);
if (isPromise(ret)) {
ret.then(() => done(), done);
}
} catch (e) {
done(e);
}
});
} else {
const ret = fn.call(emitter, ...args);
if (isPromise(ret)) {
if (timeoutMs) {
await Promise.race([
ret,
new Promise((_, rej) => setTimeout(() => rej(new Error(event + ' hook timeout')), timeoutMs))
]);
} else {
await ret;
}
}
}
}
}

/*!
* Array of passes.
*
Expand All @@ -15,18 +59,16 @@ var http = require('http'),
*
*/


module.exports = {
/**
* WebSocket requests must have the `GET` method and
* the `upgrade:websocket` header
*
* @param {ClientRequest} Req Request object
* @param {Socket} Websocket
* @param {Socket} Websocket
*
* @api private
*/

checkMethodAndHeader : function checkMethodAndHeader(req, socket) {
if (req.method !== 'GET' || !req.headers.upgrade) {
socket.destroy();
Expand All @@ -43,12 +85,11 @@ module.exports = {
* Sets `x-forwarded-*` headers if specified in config.
*
* @param {ClientRequest} Req Request object
* @param {Socket} Websocket
* @param {Socket} Websocket
* @param {Object} Options Config object passed to the proxy
*
* @api private
*/

XHeaders : function XHeaders(req, socket, options) {
if(!options.xfwd) return;

Expand All @@ -60,9 +101,9 @@ module.exports = {

['for', 'port', 'proto'].forEach(function(header) {
req.headers['x-forwarded-' + header] =
(req.headers['x-forwarded-' + header] || '') +
(req.headers['x-forwarded-' + header] ? ',' : '') +
values[header];
(req.headers['x-forwarded-' + header] || '') +
(req.headers['x-forwarded-' + header] ? ',' : '') +
values[header];
});
},

Expand All @@ -71,7 +112,7 @@ module.exports = {
* send the Switching Protocols request and pipe the sockets.
*
* @param {ClientRequest} Req Request object
* @param {Socket} Websocket
* @param {Socket} Websocket
* @param {Object} Options Config object passed to the proxy
*
* @api private
Expand All @@ -92,21 +133,17 @@ module.exports = {
}
return head;
}, [line])
.join('\r\n') + '\r\n\r\n';
}
.join('\r\n') + '\r\n\r\n';
};

common.setupSocket(socket);

if (head && head.length) socket.unshift(head);


var proxyReq = (common.isSSL.test(options.target.protocol) ? https : http).request(
common.setupOutgoing(options.ssl || {}, options, req)
common.setupOutgoing(options.ssl || {}, options, req)
);

// Enable developers to modify the proxyReq before headers are sent
if (server) { server.emit('proxyReqWs', proxyReq, req, socket, options, head); }

// Error Handler
proxyReq.on('error', onOutgoingError);
proxyReq.on('response', function (res) {
Expand All @@ -122,7 +159,7 @@ module.exports = {

// Allow us to listen when the websocket has completed
proxySocket.on('end', function () {
server.emit('close', proxyRes, proxySocket, proxyHead);
server && server.emit('close', proxyRes, proxySocket, proxyHead);
});

// The pipe below will end proxySocket if socket closes cleanly, but not
Expand All @@ -144,19 +181,35 @@ module.exports = {

proxySocket.pipe(socket).pipe(proxySocket);

server.emit('open', proxySocket);
server.emit('proxySocket', proxySocket); //DEPRECATED.
server && server.emit('open', proxySocket);
server && server.emit('proxySocket', proxySocket); // DEPRECATED.
});

return proxyReq.end(); // XXX: CHECK IF THIS IS THIS CORRECT
const hookTimeout = options && options.proxyReqWsTimeout; // optional in ms

(async () => {
try {
if (server && typeof server.listenerCount === 'function' && server.listenerCount('proxyReqWs') > 0) {
await runAsyncListenersSequentially(
server,
'proxyReqWs',
[proxyReq, req, socket, options, head],
hookTimeout
);
}
proxyReq.end(); // nach Abschluss aller Hooks
} catch (err) {
onOutgoingError(err);
}
})();

function onOutgoingError(err) {
if (clb) {
clb(err, req, socket);
} else {
} else if (server && typeof server.emit === 'function') {
server.emit('error', err, req, socket);
}
socket.end();
try { socket.end(); } catch (_) {}
}
}
};