diff --git a/package.json b/package.json index 83f3d902..3abaa152 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "ipfs-bitswap", "version": "0.7.1", "description": "Node.js implementation of the Bitswap data exchange protocol used by IPFS", - "main": "lib/index.js", + "main": "src/index.js", "jsnext:main": "src/index.js", "scripts": { "test": "aegir-test", @@ -55,12 +55,12 @@ "debug": "^2.2.0", "heap": "^0.2.6", "ipfs-block": "^0.3.0", + "lodash.debounce": "^4.0.8", "lodash.isequalwith": "^4.4.0", "lodash.isundefined": "^3.0.1", "multihashes": "^0.2.2", "protocol-buffers": "^3.1.6", "pull-defer": "^0.2.2", - "pull-generate": "^2.2.0", "pull-length-prefixed": "^1.2.0", "pull-paramap": "^1.1.6", "pull-pushable": "^2.0.1", @@ -74,4 +74,4 @@ "greenkeeperio-bot ", "npmcdn-to-unpkg-bot " ] -} \ No newline at end of file +} diff --git a/src/decision/engine.js b/src/decision/engine.js index 545a3b4b..ddb16269 100644 --- a/src/decision/engine.js +++ b/src/decision/engine.js @@ -3,7 +3,8 @@ const debug = require('debug') const mh = require('multihashes') const pull = require('pull-stream') -const generate = require('pull-generate') +const whilst = require('async/whilst') +const debounce = require('lodash.debounce') const log = debug('bitswap:engine') log.error = debug('bitswap:engine:error') @@ -26,6 +27,8 @@ module.exports = class Engine { this.peerRequestQueue = new PeerRequestQueue() this._running = false + + this._outbox = debounce(this._outboxExec.bind(this), 100) } _sendBlock (env, cb) { @@ -42,21 +45,23 @@ module.exports = class Engine { }) } - _outbox () { - if (!this._running) return + _outboxExec () { + let nextTask + log('outbox') - const doIt = (cb) => pull( - generate(null, (state, cb) => { - log('generating', this._running) + whilst( + () => { if (!this._running) { - return cb(true) + return } - const nextTask = this.peerRequestQueue.pop() + nextTask = this.peerRequestQueue.pop() + log('check', this._running && nextTask) + return Boolean(nextTask) + }, + (next) => { + log('generating') log('got task', nextTask) - if (!nextTask) { - return cb(true) - } pull( this.blockstore.getStream(nextTask.entry.key), @@ -65,31 +70,20 @@ module.exports = class Engine { const block = blocks[0] if (err || !block) { nextTask.done() - return cb(null, false) + return next() } - cb(null, { + this._sendBlock({ peer: nextTask.target, block: block, - sent: () => { + sent () { nextTask.done() } - }) + }, next) }) ) - }), - pull.filter(Boolean), - pull.asyncMap(this._sendBlock.bind(this)), - pull.onEnd(cb) + } ) - - if (!this._timer) { - this._timer = setTimeout(() => { - doIt(() => { - this._timer = null - }) - }, 50) - } } wantlistForPeer (peerId) { diff --git a/src/index.js b/src/index.js index 32df2149..eceea92a 100644 --- a/src/index.js +++ b/src/index.js @@ -78,7 +78,6 @@ module.exports = class Bitwap { } _handleReceivedBlock (peerId, block, cb) { - log('handling block', block) series([ (cb) => this._updateReceiveCounters(block, (err) => { if (err) { @@ -87,7 +86,7 @@ module.exports = class Bitwap { return cb() } - log('got block from %s', peerId.toB58String(), block.data.toString()) + log('got block from %s', peerId.toB58String(), block.data.length) cb() }), (cb) => this.put(block, (err) => { @@ -150,6 +149,7 @@ module.exports = class Bitwap { } getStream (keys) { + log('getStream', keys.length) if (!Array.isArray(keys)) { return this._getStreamSingle(keys) } @@ -167,6 +167,7 @@ module.exports = class Bitwap { } _getStreamSingle (key) { + log('getStreamSingle', mh.toB58String(key)) const unwantListeners = {} const blockListeners = {} const unwantEvent = (key) => `unwant:${key}` @@ -197,6 +198,7 @@ module.exports = class Bitwap { } blockListeners[keyS] = (block) => { + log('received block', keyS) this.wm.cancelWants([block.key]) cleanupListener(key) d.resolve(pull.values([block])) @@ -211,6 +213,7 @@ module.exports = class Bitwap { return d.resolve(pull.error(err)) } if (exists) { + log('already have block', mh.toB58String(key)) return d.resolve(this.blockstore.getStream(key)) } diff --git a/src/network/index.js b/src/network/index.js index da974c9f..9d0b88fe 100644 --- a/src/network/index.js +++ b/src/network/index.js @@ -3,6 +3,7 @@ const debug = require('debug') const lp = require('pull-length-prefixed') const pull = require('pull-stream') +const pushable = require('pull-pushable') const Message = require('../message') const cs = require('../constants') @@ -15,6 +16,7 @@ module.exports = class Network { this.libp2p = libp2p this.peerBook = peerBook this.bitswap = bitswap + this.conns = {} // increase event listener max this.libp2p.swarm.setMaxListeners(cs.maxListeners) @@ -29,7 +31,6 @@ module.exports = class Network { this.libp2p.handle(PROTOCOL_IDENTIFIER, this._onConnection) this.libp2p.swarm.on('peer-mux-established', this._onPeerMux) - this.libp2p.swarm.on('peer-mux-closed', this._onPeerMuxClosed) // All existing connections are like new ones for us @@ -47,7 +48,7 @@ module.exports = class Network { } _onConnection (conn) { - log('incomming new bitswap connection') + log('got connection') pull( conn, lp.decode(), @@ -62,10 +63,12 @@ module.exports = class Network { if (err) { return this.bitswap._receiveError(err) } + log('data from', peerInfo.id.toB58String()) this.bitswap._receiveMessage(peerInfo.id, msg) }) }), pull.onEnd((err) => { + log('ending connection') if (err) { return this.bitswap._receiveError(err) } @@ -106,17 +109,33 @@ module.exports = class Network { return cb(err) } - this.libp2p.dialByPeerInfo(peerInfo, PROTOCOL_IDENTIFIER, (err, conn) => { - log('dialed %s', peerInfo.id.toB58String(), err) - if (err) { - return cb(err) - } - pull( - pull.values([msg.toProto()]), - lp.encode(), - conn - ) + if (this.conns[peerInfo]) { + log('connection exists') + this.conns[peerInfo].push(msg.toProto()) cb() - }) + } else { + log('dialByPeerInfo') + this.libp2p.dialByPeerInfo(peerInfo, PROTOCOL_IDENTIFIER, (err, conn) => { + log('dialed %s', peerInfo.id.toB58String(), err) + if (err) { + return cb(err) + } + + this.conns[peerInfo] = pushable() + this.conns[peerInfo].push(msg.toProto()) + + pull( + this.conns[peerInfo], + lp.encode(), + conn, + pull.onEnd((err) => { + this.conns[peerInfo].end() + this.conns[peerInfo] = null + }) + ) + + cb() + }) + } } } diff --git a/src/wantmanager/index.js b/src/wantmanager/index.js index ebce56fe..96c5d617 100644 --- a/src/wantmanager/index.js +++ b/src/wantmanager/index.js @@ -128,24 +128,23 @@ module.exports = class Wantmanager { } run () { - // TODO: is this needed? if so enable it - // // resend entirew wantlist every so often - // const es = [] - // for (let e of this.wl.entries()) { - // es.push(new Message.Entry(e.key, e.priority)) - // } - - // this.peers.forEach((p) => { - // p.addEntries(es, true) - // }) - // timer.start() - // } - // } + this.timer = setInterval(() => { + // resend entirew wantlist every so often + const fullwantlist = new Message(true) + for (let entry of this.wl.entries()) { + fullwantlist.addEntry(entry[1].key, entry[1].priority) + } + + this.peers.forEach((p) => { + p.addMessage(fullwantlist) + }) + }, 10 * 1000) } stop () { for (let mq of this.peers.values()) { this.disconnected(mq.p) } + clearInterval(this.timer) } } diff --git a/src/wantmanager/msg-queue.js b/src/wantmanager/msg-queue.js index aa61954d..30259425 100644 --- a/src/wantmanager/msg-queue.js +++ b/src/wantmanager/msg-queue.js @@ -1,8 +1,7 @@ 'use strict' +const queue = require('async/queue') const debug = require('debug') -const pull = require('pull-stream') -const pushable = require('pull-pushable') const Message = require('../message') @@ -15,10 +14,14 @@ module.exports = class MsgQueue { this.network = network this.refcnt = 1 - this.queue = pushable() + this.queue = queue(this.doWork.bind(this), 1) + this.queue.pause() } addMessage (msg) { + if (msg.empty) { + return + } log('addMessage: %s', this.p.toB58String(), msg) this.queue.push(msg) } @@ -43,12 +46,13 @@ module.exports = class MsgQueue { this.network.connectTo(this.p, (err) => { if (err) { log.error('cant connect to peer %s: %s', this.p.toB58String(), err.message) - return cb() + return cb(err) } log('sending message', wlm) this.network.sendMessage(this.p, wlm, (err) => { if (err) { log.error('send error: %s', err.message) + return cb(err) } cb() }) @@ -57,20 +61,11 @@ module.exports = class MsgQueue { run () { log('starting queue') - - pull( - this.queue, - pull.asyncMap(this.doWork.bind(this)), - pull.onEnd((err) => { - if (err) { - log.error('error processing message queue', err) - } - this.queue = pushable() - }) - ) + this.queue.resume() } stop () { - this.queue.end() + log('killing queue') + this.queue.kill() } } diff --git a/test/wantmanager/index.spec.js b/test/wantmanager/index.spec.js index cd761479..da329e11 100644 --- a/test/wantmanager/index.spec.js +++ b/test/wantmanager/index.spec.js @@ -3,6 +3,7 @@ const expect = require('chai').expect const PeerId = require('peer-id') +const series = require('run-series') const cs = require('../../src/constants') const Message = require('../../src/message') @@ -46,14 +47,21 @@ describe('Wantmanager', () => { wm.connected(peer1) wm.connected(peer2) - setTimeout(() => { - wm.cancelWants([new Buffer('world')]) - setTimeout(() => { + series([ + (cb) => setTimeout(cb, 100), + (cb) => { + wm.cancelWants([new Buffer('world')]) + cb() + }, + (cb) => setTimeout(cb, 100), + (cb) => { wm.wantBlocks([new Buffer('foo')]) - - wm.disconnected(peer1) - wm.disconnected(peer2) - }, 100) - }, 100) + cb() + }, + (cb) => setTimeout(cb, 100) + ], () => { + wm.disconnected(peer1) + wm.disconnected(peer2) + }) }) })