|  | 
|  | 1 | +// test the speed of .pipe() with JSStream wrapping for PassThrough streams | 
|  | 2 | +'use strict'; | 
|  | 3 | + | 
|  | 4 | +const common = require('../common.js'); | 
|  | 5 | +const { PassThrough } = require('stream'); | 
|  | 6 | + | 
|  | 7 | +const bench = common.createBenchmark(main, { | 
|  | 8 | +  len: [102400, 1024 * 1024 * 16], | 
|  | 9 | +  type: ['utf', 'asc', 'buf'], | 
|  | 10 | +  dur: [5], | 
|  | 11 | +}, { | 
|  | 12 | +  flags: ['--expose-internals'] | 
|  | 13 | +}); | 
|  | 14 | + | 
|  | 15 | +var dur; | 
|  | 16 | +var len; | 
|  | 17 | +var type; | 
|  | 18 | +var chunk; | 
|  | 19 | +var encoding; | 
|  | 20 | +var JSStreamWrap;  // Can only require internals inside main(). | 
|  | 21 | + | 
|  | 22 | +function main(conf) { | 
|  | 23 | +  dur = +conf.dur; | 
|  | 24 | +  len = +conf.len; | 
|  | 25 | +  type = conf.type; | 
|  | 26 | +  JSStreamWrap = require('internal/wrap_js_stream'); | 
|  | 27 | + | 
|  | 28 | +  switch (type) { | 
|  | 29 | +    case 'buf': | 
|  | 30 | +      chunk = Buffer.alloc(len, 'x'); | 
|  | 31 | +      break; | 
|  | 32 | +    case 'utf': | 
|  | 33 | +      encoding = 'utf8'; | 
|  | 34 | +      chunk = 'ü'.repeat(len / 2); | 
|  | 35 | +      break; | 
|  | 36 | +    case 'asc': | 
|  | 37 | +      encoding = 'ascii'; | 
|  | 38 | +      chunk = 'x'.repeat(len); | 
|  | 39 | +      break; | 
|  | 40 | +    default: | 
|  | 41 | +      throw new Error(`invalid type: ${type}`); | 
|  | 42 | +  } | 
|  | 43 | + | 
|  | 44 | +  doBenchmark(); | 
|  | 45 | +} | 
|  | 46 | + | 
|  | 47 | +function Writer() { | 
|  | 48 | +  this.received = 0; | 
|  | 49 | +  this.writable = true; | 
|  | 50 | +} | 
|  | 51 | + | 
|  | 52 | +Writer.prototype.write = function(chunk, encoding, cb) { | 
|  | 53 | +  this.received += chunk.length; | 
|  | 54 | + | 
|  | 55 | +  if (typeof encoding === 'function') | 
|  | 56 | +    encoding(); | 
|  | 57 | +  else if (typeof cb === 'function') | 
|  | 58 | +    cb(); | 
|  | 59 | + | 
|  | 60 | +  return true; | 
|  | 61 | +}; | 
|  | 62 | + | 
|  | 63 | +// doesn't matter, never emits anything. | 
|  | 64 | +Writer.prototype.on = function() {}; | 
|  | 65 | +Writer.prototype.once = function() {}; | 
|  | 66 | +Writer.prototype.emit = function() {}; | 
|  | 67 | +Writer.prototype.prependListener = function() {}; | 
|  | 68 | + | 
|  | 69 | + | 
|  | 70 | +function flow() { | 
|  | 71 | +  const dest = this.dest; | 
|  | 72 | +  const res = dest.write(chunk, encoding); | 
|  | 73 | +  if (!res) | 
|  | 74 | +    dest.once('drain', this.flow); | 
|  | 75 | +  else | 
|  | 76 | +    process.nextTick(this.flow); | 
|  | 77 | +} | 
|  | 78 | + | 
|  | 79 | +function Reader() { | 
|  | 80 | +  this.flow = flow.bind(this); | 
|  | 81 | +  this.readable = true; | 
|  | 82 | +} | 
|  | 83 | + | 
|  | 84 | +Reader.prototype.pipe = function(dest) { | 
|  | 85 | +  this.dest = dest; | 
|  | 86 | +  this.flow(); | 
|  | 87 | +  return dest; | 
|  | 88 | +}; | 
|  | 89 | + | 
|  | 90 | + | 
|  | 91 | +function doBenchmark() { | 
|  | 92 | +  const reader = new Reader(); | 
|  | 93 | +  const writer = new Writer(); | 
|  | 94 | + | 
|  | 95 | +  // the actual benchmark. | 
|  | 96 | +  const fakeSocket = new JSStreamWrap(new PassThrough()); | 
|  | 97 | +  bench.start(); | 
|  | 98 | +  reader.pipe(fakeSocket); | 
|  | 99 | +  fakeSocket.pipe(writer); | 
|  | 100 | + | 
|  | 101 | +  setTimeout(function() { | 
|  | 102 | +    const bytes = writer.received; | 
|  | 103 | +    const gbits = (bytes * 8) / (1024 * 1024 * 1024); | 
|  | 104 | +    bench.end(gbits); | 
|  | 105 | +    process.exit(0); | 
|  | 106 | +  }, dur * 1000); | 
|  | 107 | +} | 
0 commit comments