Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/_stream_passthrough.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ function PassThrough(options) {
Transform.call(this, options);
}

PassThrough.prototype._transform = function(chunk, encoding, cb) {
PassThrough.prototype._transform = passThrough_transform;
function passThrough_transform(chunk, encoding, cb) {
cb(null, chunk);
};
}
42 changes: 21 additions & 21 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ function Readable(options) {
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
Readable.prototype.push = function readablePush(chunk, encoding) {
var state = this._readableState;

if (!state.objectMode && typeof chunk === 'string') {
Expand All @@ -135,12 +135,12 @@ Readable.prototype.push = function(chunk, encoding) {
};

// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function(chunk) {
Readable.prototype.unshift = function readableUnshift(chunk) {
var state = this._readableState;
return readableAddChunk(this, state, chunk, '', true);
};

Readable.prototype.isPaused = function() {
Readable.prototype.isPaused = function readableIsPaused() {
return this._readableState.flowing === false;
};

Expand Down Expand Up @@ -213,7 +213,7 @@ function needMoreData(state) {
}

// backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
Readable.prototype.setEncoding = function readableSetEncoding(enc) {
if (!StringDecoder)
StringDecoder = require('string_decoder').StringDecoder;
this._readableState.decoder = new StringDecoder(enc);
Expand Down Expand Up @@ -268,7 +268,7 @@ function howMuchToRead(n, state) {
}

// you can override either this method, or the async _read(n) below.
Readable.prototype.read = function(n) {
Readable.prototype.read = function readableRead(n) {
debug('read', n);
n = parseInt(n, 10);
var state = this._readableState;
Expand Down Expand Up @@ -466,11 +466,11 @@ function maybeReadMore_(stream, state) {
// call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function(n) {
Readable.prototype._read = function readable_read(n) {
this.emit('error', new Error('_read() is not implemented'));
};

Readable.prototype.pipe = function(dest, pipeOpts) {
Readable.prototype.pipe = function readablePipe(dest, pipeOpts) {
var src = this;
var state = this._readableState;

Expand Down Expand Up @@ -613,7 +613,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
};

function pipeOnDrain(src) {
return function() {
return function onPipeOnDrain() {
var state = src._readableState;
debug('pipeOnDrain', state.awaitDrain);
if (state.awaitDrain)
Expand All @@ -626,7 +626,7 @@ function pipeOnDrain(src) {
}


Readable.prototype.unpipe = function(dest) {
Readable.prototype.unpipe = function readableUnpipe(dest) {
var state = this._readableState;

// if we're not piping anywhere, then do nothing.
Expand Down Expand Up @@ -683,7 +683,7 @@ Readable.prototype.unpipe = function(dest) {

// set up data events if they are asked for
// Ensure readable listeners eventually get something
Readable.prototype.on = function(ev, fn) {
Readable.prototype.on = function readableOn(ev, fn) {
const res = Stream.prototype.on.call(this, ev, fn);

if (ev === 'data') {
Expand Down Expand Up @@ -714,7 +714,7 @@ function nReadingNextTick(self) {

// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function() {
Readable.prototype.resume = function readableResume() {
var state = this._readableState;
if (!state.flowing) {
debug('resume');
Expand Down Expand Up @@ -745,7 +745,7 @@ function resume_(stream, state) {
stream.read(0);
}

Readable.prototype.pause = function() {
Readable.prototype.pause = function readablePause() {
debug('call pause flowing=%j', this._readableState.flowing);
if (false !== this._readableState.flowing) {
debug('pause');
Expand All @@ -764,12 +764,12 @@ function flow(stream) {
// wrap an old-style stream as the async data source.
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
Readable.prototype.wrap = function(stream) {
Readable.prototype.wrap = function readableWrap(stream) {
var state = this._readableState;
var paused = false;

var self = this;
stream.on('end', function() {
stream.on('end', function onEnd() {
debug('wrapped end');
if (state.decoder && !state.ended) {
var chunk = state.decoder.end();
Expand All @@ -780,7 +780,7 @@ Readable.prototype.wrap = function(stream) {
self.push(null);
});

stream.on('data', function(chunk) {
stream.on('data', function onData(chunk) {
debug('wrapped data');
if (state.decoder)
chunk = state.decoder.write(chunk);
Expand All @@ -802,8 +802,8 @@ Readable.prototype.wrap = function(stream) {
// important when wrapping filters and duplexes.
for (var i in stream) {
if (this[i] === undefined && typeof stream[i] === 'function') {
this[i] = function(method) {
return function() {
this[i] = function proxyMethods(method) {
return function onProxyMethods() {
return stream[method].apply(stream, arguments);
};
}(i);
Expand All @@ -812,13 +812,13 @@ Readable.prototype.wrap = function(stream) {

// proxy certain important events.
const events = ['error', 'close', 'destroy', 'pause', 'resume'];
events.forEach(function(ev) {
stream.on(ev, self.emit.bind(self, ev));
});
for (var ev = 0; ev < events.length; ev++) {
stream.on(events[ev], self.emit.bind(self, events[ev]));
}

// when we try to consume some more bytes, simply unpause the
// underlying stream.
self._read = function(n) {
self._read = function _read(n) {
debug('wrapped _read', n);
if (paused) {
paused = false;
Expand Down
14 changes: 7 additions & 7 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ util.inherits(Transform, Duplex);


function TransformState(stream) {
this.afterTransform = function(er, data) {
this.afterTransform = function _afterTransform(er, data) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add the _ prefix?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to avoid conflict with afterTransform that is called on next line.

return afterTransform(stream, er, data);
};

Expand Down Expand Up @@ -113,17 +113,17 @@ function Transform(options) {
}

// When the writable side finishes, then flush out anything remaining.
this.once('prefinish', function() {
this.once('prefinish', function prefinish() {
if (typeof this._flush === 'function')
this._flush(function(er, data) {
this._flush(function _flush(er, data) {
done(stream, er, data);
});
else
done(stream);
});
}

Transform.prototype.push = function(chunk, encoding) {
Transform.prototype.push = function push(chunk, encoding) {
this._transformState.needTransform = false;
return Duplex.prototype.push.call(this, chunk, encoding);
};
Expand All @@ -138,11 +138,11 @@ Transform.prototype.push = function(chunk, encoding) {
// Call `cb(err)` when you are done with this chunk. If you pass
// an error, then that'll put the hurt on the whole operation. If you
// never call cb(), then you'll never get another chunk.
Transform.prototype._transform = function(chunk, encoding, cb) {
Transform.prototype._transform = function _transform(chunk, encoding, cb) {
throw new Error('_transform() is not implemented');
};

Transform.prototype._write = function(chunk, encoding, cb) {
Transform.prototype._write = function _write(chunk, encoding, cb) {
var ts = this._transformState;
ts.writecb = cb;
ts.writechunk = chunk;
Expand All @@ -159,7 +159,7 @@ Transform.prototype._write = function(chunk, encoding, cb) {
// Doesn't matter what the args are here.
// _transform does all the work.
// That we got here means that the readable side wants more data.
Transform.prototype._read = function(n) {
Transform.prototype._read = function _read(n) {
var ts = this._transformState;

if (ts.writechunk !== null && ts.writecb && !ts.transforming) {
Expand Down
36 changes: 20 additions & 16 deletions lib/_stream_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,26 @@ function StreamWrap(stream) {
this._list = null;

const self = this;
handle.close = function(cb) {
handle.close = function handleClose(cb) {
debug('close');
self.doClose(cb);
};
handle.isAlive = function() {
handle.isAlive = function handleIsAlive() {
return self.isAlive();
};
handle.isClosing = function() {
handle.isClosing = function handleIsClosing() {
return self.isClosing();
};
handle.onreadstart = function() {
handle.onreadstart = function handleOnreadstart() {
return self.readStart();
};
handle.onreadstop = function() {
handle.onreadstop = function handleOnreadstop() {
return self.readStop();
};
handle.onshutdown = function(req) {
handle.onshutdown = function handleOnshutdown(req) {
return self.doShutdown(req);
};
handle.onwrite = function(req, bufs) {
handle.onwrite = function handleOnwrite(req, bufs) {
return self.doWrite(req, bufs);
};

Expand All @@ -57,7 +57,7 @@ function StreamWrap(stream) {
if (self._handle)
self._handle.readBuffer(chunk);
});
this.stream.once('end', function onend() {
this.stream.once('end', function onEnd() {
debug('end');
if (self._handle)
self._handle.emitEOF();
Expand Down Expand Up @@ -96,9 +96,9 @@ StreamWrap.prototype.doShutdown = function doShutdown(req) {
const handle = this._handle;
const item = this._enqueue('shutdown', req);

this.stream.end(function() {
this.stream.end(function streamEnd() {
// Ensure that write was dispatched
setImmediate(function() {
setImmediate(function setImmediateDoShutdown() {
if (!self._dequeue(item))
return;

Expand All @@ -118,9 +118,11 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
const item = self._enqueue('write', req);

self.stream.cork();
bufs.forEach(function(buf) {
self.stream.write(buf, done);
});
for (var buf in bufs) {
if (bufs.hasOwnProperty(buf)) {
self.stream.write(buf, done);
}
}
self.stream.uncork();

function done(err) {
Expand All @@ -131,7 +133,7 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
pending = 0;

// Ensure that write was dispatched
setImmediate(function() {
setImmediate(function setImmediateDoWrite() {
// Do not invoke callback twice
if (!self._dequeue(item))
return;
Expand Down Expand Up @@ -204,7 +206,7 @@ StreamWrap.prototype.doClose = function doClose(cb) {
const self = this;
const handle = self._handle;

setImmediate(function() {
function setImmediateDoClose() {
while (self._list !== null) {
const item = self._list;
const req = item.req;
Expand All @@ -222,5 +224,7 @@ StreamWrap.prototype.doClose = function doClose(cb) {
// Should be already set by net.js
assert(self._handle === null);
cb();
});
}

setImmediate(setImmediateDoClose);
};
18 changes: 9 additions & 9 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ WritableState.prototype.getBuffer = function getBuffer() {
};

Object.defineProperty(WritableState.prototype, 'buffer', {
get: internalUtil.deprecate(function() {
get: internalUtil.deprecate(function getBuffer() {
return this.getBuffer();
}, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' +
'instead.', 'DEP0003')
Expand All @@ -134,15 +134,15 @@ var realHasInstance;
if (typeof Symbol === 'function' && Symbol.hasInstance) {
realHasInstance = Function.prototype[Symbol.hasInstance];
Object.defineProperty(Writable, Symbol.hasInstance, {
value: function(object) {
value: function writableHasInstance(object) {
if (realHasInstance.call(this, object))
return true;

return object && object._writableState instanceof WritableState;
}
});
} else {
realHasInstance = function(object) {
realHasInstance = function _hasInstancePolyfill(object) {
return object instanceof this;
};
}
Expand Down Expand Up @@ -177,7 +177,7 @@ function Writable(options) {
}

// Otherwise people can pipe Writable streams, which is just wrong.
Writable.prototype.pipe = function() {
Writable.prototype.pipe = function writablePipe() {
this.emit('error', new Error('Cannot pipe, not readable'));
};

Expand Down Expand Up @@ -211,7 +211,7 @@ function validChunk(stream, state, chunk, cb) {
return valid;
}

Writable.prototype.write = function(chunk, encoding, cb) {
Writable.prototype.write = function writableWrite(chunk, encoding, cb) {
var state = this._writableState;
var ret = false;
var isBuf = (chunk instanceof Buffer);
Expand Down Expand Up @@ -239,13 +239,13 @@ Writable.prototype.write = function(chunk, encoding, cb) {
return ret;
};

Writable.prototype.cork = function() {
Writable.prototype.cork = function writableCork() {
var state = this._writableState;

state.corked++;
};

Writable.prototype.uncork = function() {
Writable.prototype.uncork = function writableUncork() {
var state = this._writableState;

if (state.corked) {
Expand Down Expand Up @@ -452,13 +452,13 @@ function clearBuffer(stream, state) {
state.bufferProcessing = false;
}

Writable.prototype._write = function(chunk, encoding, cb) {
Writable.prototype._write = function writable_write(chunk, encoding, cb) {
cb(new Error('_write() is not implemented'));
};

Writable.prototype._writev = null;

Writable.prototype.end = function(chunk, encoding, cb) {
Writable.prototype.end = function writableEnd(chunk, encoding, cb) {
var state = this._writableState;

if (typeof chunk === 'function') {
Expand Down