From 1a18fe89222f65c049c281548ca8953018c595ca Mon Sep 17 00:00:00 2001 From: Geoffrey Goodman Date: Mon, 23 Jul 2018 12:11:04 -0400 Subject: [PATCH 1/2] Defer piping of options.payload until socket connection Fixes #222. --- lib/index.js | 17 ++++++++++++++++- test/index.js | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/lib/index.js b/lib/index.js index 1c34c24..893ace9 100755 --- a/lib/index.js +++ b/lib/index.js @@ -314,7 +314,7 @@ internals.Client.prototype._request = function (method, url, options, relay, _tr stream = options.payload.pipe(collector); } - stream.pipe(req); + internals.deferPipeUntilSocketConnects(req, stream); return req; } @@ -328,6 +328,21 @@ internals.Client.prototype._request = function (method, url, options, relay, _tr }; +internals.deferPipeUntilSocketConnects = function (req, stream) { + + const onSocket = (socket) => { + + socket.on('connect', onSocketConnect); + }; + const onSocketConnect = () => { + + stream.pipe(req); + }; + + req.on('socket', onSocket); +}; + + internals.redirectMethod = function (code, method, options) { switch (code) { diff --git a/test/index.js b/test/index.js index b884f92..523d789 100755 --- a/test/index.js +++ b/test/index.js @@ -1121,6 +1121,47 @@ describe('read()', () => { expect(err.isBoom).to.equal(true); }); + it('will not pipe the stream if no socket can be established', async () => { + + const agent = new internals.SlowAgent(); + const stream = new Stream.Readable({ + read() { + + piped = true; + this.push(null); + } + }); + const onPiped = () => { + + piped = true; + }; + let piped = false; + + stream.on('pipe', onPiped); + + const promiseA = Wreck.request('post', 'http://localhost:0', { + agent, + payload: stream + }); + + await expect(promiseA).to.reject(Error, /Unable to obtain socket/); + expect(piped).to.equal(false); + + const handler = (req, res) => { + + res.writeHead(200); + res.end(internals.payload); + }; + + const server = await internals.server(handler); + const res = await Wreck.request('post', 'http://localhost:' + server.address().port, { + payload: stream + }); + expect(res.statusCode).to.equal(200); + expect(piped).to.equal(true); + server.close(); + }); + it('times out when stream read takes too long', async () => { const TestStream = function () { @@ -2020,6 +2061,14 @@ internals.https = function (handler) { }; +internals.SlowAgent = class SlowAgent extends Http.Agent { + createConnection(options, cb) { + + setTimeout(cb, 200, new Error('Unable to obtain socket')); + } +}; + + internals.wait = function (timeout) { return new Promise((resolve) => setTimeout(resolve, timeout)); From 0122d11df8a302ae7b326db2b8a590b52fb8f02f Mon Sep 17 00:00:00 2001 From: Geoffrey Goodman Date: Mon, 23 Jul 2018 12:11:43 -0400 Subject: [PATCH 2/2] Adjust tests so that they do not rely on localhost:80 being unbound --- test/index.js | 40 +++++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/test/index.js b/test/index.js index 523d789..53b6ce9 100755 --- a/test/index.js +++ b/test/index.js @@ -958,57 +958,57 @@ describe('options.baseUrl', () => { it('uses baseUrl option without trailing slash and uri is prefixed with a slash', async () => { - const promise = Wreck.request('get', '/foo', { baseUrl: 'http://localhost' }); + const promise = Wreck.request('get', '/foo', { baseUrl: 'http://localhost:0' }); await expect(promise).to.reject(); - expect(promise.req._headers.host).to.equal('localhost'); + expect(promise.req._headers.host).to.equal('localhost:0'); expect(promise.req.path).to.equal('/foo'); }); it('uses baseUrl option with trailing slash and uri is prefixed without a slash', async () => { - const promise = Wreck.request('get', 'foo', { baseUrl: 'http://localhost/' }); + const promise = Wreck.request('get', 'foo', { baseUrl: 'http://localhost:0/' }); await expect(promise).to.reject(); - expect(promise.req._headers.host).to.equal('localhost'); + expect(promise.req._headers.host).to.equal('localhost:0'); expect(promise.req.path).to.equal('/foo'); }); it('uses baseUrl option without trailing slash and uri is prefixed without a slash', async () => { - const promise = Wreck.request('get', 'foo', { baseUrl: 'http://localhost' }); + const promise = Wreck.request('get', 'foo', { baseUrl: 'http://localhost:0' }); await expect(promise).to.reject(); - expect(promise.req._headers.host).to.equal('localhost'); + expect(promise.req._headers.host).to.equal('localhost:0'); expect(promise.req.path).to.equal('/foo'); }); it('uses baseUrl option when uri is an empty string', async () => { - const promise = Wreck.request('get', '', { baseUrl: 'http://localhost' }); + const promise = Wreck.request('get', '', { baseUrl: 'http://localhost:0' }); await expect(promise).to.reject(); - expect(promise.req._headers.host).to.equal('localhost'); + expect(promise.req._headers.host).to.equal('localhost:0'); expect(promise.req.path).to.equal('/'); }); it('uses baseUrl option with a path', async () => { - const promise = Wreck.request('get', '/bar', { baseUrl: 'http://localhost/foo' }); + const promise = Wreck.request('get', '/bar', { baseUrl: 'http://localhost:0/foo' }); await expect(promise).to.reject(); - expect(promise.req._headers.host).to.equal('localhost'); + expect(promise.req._headers.host).to.equal('localhost:0'); expect(promise.req.path).to.equal('/foo/bar'); }); it('uses baseUrl option with a path and removes extra slashes', async () => { - const promise = Wreck.request('get', '/bar', { baseUrl: 'http://localhost/foo/' }); + const promise = Wreck.request('get', '/bar', { baseUrl: 'http://localhost:0/foo/' }); await expect(promise).to.reject(); - expect(promise.req._headers.host).to.equal('localhost'); + expect(promise.req._headers.host).to.equal('localhost:0'); expect(promise.req.path).to.equal('/foo/bar'); }); it('uses baseUrl option with a url that has a querystring', async () => { - const promise = Wreck.request('get', '/bar?test=hello', { baseUrl: 'http://localhost/foo' }); + const promise = Wreck.request('get', '/bar?test=hello', { baseUrl: 'http://localhost:0/foo' }); await expect(promise).to.reject(); - expect(promise.req._headers.host).to.equal('localhost'); + expect(promise.req._headers.host).to.equal('localhost:0'); expect(promise.req.path).to.equal('/foo/bar?test=hello'); }); }); @@ -1800,7 +1800,7 @@ describe('Events', () => { once = true; }); - await expect(wreck.get('http://127.0.0.1', { timeout: 10 })).to.reject(); + await expect(wreck.get('http://localhost:0', { timeout: 10 })).to.reject(); expect(once).to.be.true(); }); @@ -1818,8 +1818,8 @@ describe('Events', () => { const wreck = Wreck.defaults({ events: true }); wreck.events.on('response', handler); - await expect(wreck.get('http://127.0.0.1', { timeout: 10 })).to.reject(); - await expect(wreck.get('http://127.0.0.1', { timeout: 10 })).to.reject(); + await expect(wreck.get('http://localhost:0', { timeout: 10 })).to.reject(); + await expect(wreck.get('http://localhost:0', { timeout: 10 })).to.reject(); expect(count).to.equal(2); }); @@ -2014,6 +2014,12 @@ internals.server = function (handler, socket) { req.pipe(res); }; } + else if (handler === 'fail') { + handler = (req, res) => { + + res.socket.destroy(); + }; + } else if (handler === 'ok') { handler = (req, res) => {