Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,29 @@ function isFunction (f) {
return 'function' === typeof f
}

function maxDelay(fn, delay) {
if(!delay) return fn
return function (a, cb) {
var timer = setTimeout(function () {
fn(new Error('pull-reader: read exceeded timeout'), cb)
}, delay)
fn(a, function (err, value) {
clearTimeout(timer)
cb(err, value)
})

}

}

module.exports = function (timeout) {

var queue = [], read, readTimed, reading = false
var state = State(), ended, streaming, abort

function maxDelay(fn, delay) {
return function (a, cb) {
var timer
if (delay) {
timer = setTimeout(function () {
fn(new Error('pull-reader: read exceeded timeout'), cb)
}, delay)
}
fn(a, function (err, value) {
if (delay) clearTimeout(timer)
if (err === true && state.read < state.total) {
return cb(new Error('attempted to read '+state.wants+' of '+state.total+' bytes'))
}
cb(err, value)
})
}
}

function drain () {
while (queue.length) {
if(null == queue[0].length && state.has(1)) {
Expand Down Expand Up @@ -88,6 +91,9 @@ module.exports = function (timeout) {
}

reader.read = function (len, _timeout, cb) {
if(isInteger(len)) {
state.wants += len
}
if(isFunction(_timeout))
cb = _timeout, _timeout = timeout
if(isFunction(cb)) {
Expand Down Expand Up @@ -115,8 +121,3 @@ module.exports = function (timeout) {

return reader
}





16 changes: 10 additions & 6 deletions state.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@ module.exports = function () {

return {
length: length,
total: 0,
data: this,
wants: 0,
read: 0,
add: function (data) {
if(!Buffer.isBuffer(data))
throw new Error('data must be a buffer, was: ' + JSON.stringify(data))
this.length = length = length + data.length
this.total += data.length
buffers.push(data)
return this
},
has: function (n) {
if(null == n) return length > 0
return length >= n
},
get: function (n) {
_get: function (n) {
var _length
if(n == null || n === length) {
length = 0
Expand Down Expand Up @@ -62,12 +66,12 @@ module.exports = function () {
}
else
throw new Error('could not get ' + n + ' bytes')
},
get: function(n) {
var b = this._get(n)
this.read += b.length
return b
}
}

}





34 changes: 32 additions & 2 deletions test/read.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ tape('if streaming, the stream should abort', function (t) {

tape('abort stream once in streaming mode', function (t) {

var reader = Reader(), err = new Error('intended')
var reader = Reader()

pull(Hang(), reader)

Expand Down Expand Up @@ -232,7 +232,6 @@ tape('timeout does not apply to the rest of the stream', function (t) {
pull(
reader.read(),
pull.collect(function (err, ary) {
console.log(err)
t.notOk(err)
t.equal(Buffer.concat(ary).toString(), 'hello world')
t.end()
Expand All @@ -241,6 +240,37 @@ tape('timeout does not apply to the rest of the stream', function (t) {
})


tape('overreading results in an error', function (t) {
var corruptedBytes = crypto.randomBytes(10)

pull(
pull.values([corruptedBytes]),
reader = Reader(20e3)
)

reader.read(11, function(_err) {
t.ok(_err)
t.equal(_err.message, 'attempted to read 11 of 10 bytes')
t.end()
})
})


tape('overreading with multiple reads results in an error', function (t) {
var corruptedBytes = crypto.randomBytes(10)

pull(
pull.values([corruptedBytes]),
reader = Reader()
)

reader.read(1, function(err) {
t.notOk(err)
reader.read(100, function(_err) {
t.ok(_err)
t.equal(_err.message, 'attempted to read 101 of 10 bytes')
t.end()
})
})
})