Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,38 @@ pull(
)
```

## API

### `encode([opts])`

- `opts: Object`, optional
- `fixed: false`:
- `bytes: 4`: If `fixed` is `true` this is the amount of bytes used for the prefix.

By default all messages will be prefixed with a varint. If you want to use a fixed length prefix you can specify this through the `opts`.

Returns a pull-stream through.

### `decode([opts])`

- `opts: Object`, optional
- `fixed: false`:
- `bytes: 4`: If `fixed` is `true` this is the amount of bytes used for the prefix.

By default all messages will be prefixed with a varint. If you want to use a fixed length prefix you can specify this through the `opts`.


Returns a pull-stream through.

### `decodeFromReader(reader, [opts], cb)`

- `reader: [pull-reader](https://github.com/dominictarr/pull-reader)`
- `opts: Object`, optional. Same as for `decode`.
- `cb: Function`: Callback called with `(err, message)`.

This uses a [pull-reader](https://github.com/dominictarr/pull-reader) instance to reade and decode a single message. Useful when using [pull-handshake](https://github.com/pull-stream/pull-handshake) with length prefixed messages.


## Contribute

PRs and issues gladly accepted! Check out the [issues](//github.com/dignifiedquire/pull-length-prefixed/issues).
Expand Down
66 changes: 51 additions & 15 deletions src/decode.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ exports.decodeFromReader = decodeFromReader
const MSB = 0x80
const isEndByte = (byte) => !(byte & MSB)

function decode () {
function decode (opts) {
let reader = new Reader()
let p = pushable((err) => {
reader.abort(err)
Expand All @@ -20,7 +20,7 @@ function decode () {
return (read) => {
reader(read)
function next () {
decodeFromReader(reader, (err, msg) => {
decodeFromReader(reader, opts, (err, msg) => {
if (err) return p.end(err)

p.push(msg)
Expand All @@ -33,7 +33,36 @@ function decode () {
}
}

function decodeFromReader (reader, cb) {
function decodeFromReader (reader, opts, cb) {
if (typeof opts === 'function') {
cb = opts
opts = {}
}

opts = Object.assign({
fixed: false,
bytes: 4
}, opts || {})

if (opts.fixed) {
readFixedMessage(reader, opts.bytes, cb)
} else {
readVarintMessage(reader, cb)
}
}

function readFixedMessage (reader, byteLength, cb) {
reader.read(byteLength, (err, bytes) => {
if (err) {
return cb(err)
}

const msgSize = bytes.readInt32BE(0)
readMessage(reader, msgSize, cb)
})
}

function readVarintMessage (reader, cb) {
let rawMsgSize = []
if (rawMsgSize.length === 0) readByte()

Expand All @@ -48,22 +77,29 @@ function decodeFromReader (reader, cb) {

if (byte && !isEndByte(byte[0])) {
readByte()
} else {
readMessage()
return
}
})
}

function readMessage () {
const msgSize = varint.decode(Buffer.concat(rawMsgSize))
reader.read(msgSize, (err, msg) => {
if (err) {
return cb(err)
}
const msgSize = varint.decode(Buffer.concat(rawMsgSize))
readMessage(reader, msgSize, (err, msg) => {
if (err) {
return cb(err)
}

rawMsgSize = []
rawMsgSize = []

cb(null, msg)
cb(null, msg)
})
})
}
}

function readMessage (reader, size, cb) {
reader.read(size, (err, msg) => {
if (err) {
return cb(err)
}

cb(null, msg)
})
}
43 changes: 31 additions & 12 deletions src/encode.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
'use strict'

const varint = require('varint')
const Buffer = require('safe-buffer').Buffer

module.exports = encode

function encode () {
const poolSize = 10 * 1024
let pool = Buffer.alloc(poolSize)
const poolSize = 10 * 1024

function encode (opts) {
opts = Object.assign({
fixed: false,
bytes: 4
}, opts || {})

// Only needed for varint
const varint = require('varint')
let pool = opts.fixed ? null : createPool()
let used = 0

let ended = false

return (read) => (end, cb) => {
Expand All @@ -24,18 +32,29 @@ function encode () {
return cb(ended)
}

varint.encode(data.length, pool, used)
used += varint.encode.bytes
let encodedLength
if (opts.fixed) {
encodedLength = Buffer.alloc(opts.bytes)
encodedLength.writeInt32BE(data.length, 0)
} else {
varint.encode(data.length, pool, used)
used += varint.encode.bytes
encodedLength = pool.slice(used - varint.encode.bytes, used)

if (pool.length - used < 100) {
pool = createPool()
used = 0
}
}

cb(null, Buffer.concat([
pool.slice(used - varint.encode.bytes, used),
encodedLength,
data
]))

if (pool.length - used < 100) {
pool = Buffer.alloc(poolSize)
used = 0
}
})
}
}

function createPool () {
return Buffer.alloc(poolSize)
}
51 changes: 51 additions & 0 deletions test/fixed.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/* eslint-env mocha */
'use strict'

const pull = require('pull-stream')
const expect = require('chai').expect

const lp = require('../src')

describe('pull-length-prefixed', () => {
it('basics', (done) => {
const input = [
new Buffer('hello '),
new Buffer('world')
]

pull(
pull.values(input),
lp.encode({fixed: true}),
pull.collect((err, encoded) => {
if (err) throw err

expect(
encoded
).to.be.eql([
Buffer.concat([
new Buffer('00000006', 'hex'),
new Buffer('hello ')
]),
Buffer.concat([
new Buffer('00000005', 'hex'),
new Buffer('world')
])
])

pull(
pull.values(encoded),
lp.decode({fixed: true}),
pull.collect((err, output) => {
if (err) throw err
expect(
input
).to.be.eql(
output
)
done()
})
)
})
)
})
})