Skip to content

Commit 9ab7f71

Browse files
committed
add WorkerPool wrapper, fix --grep
- created a wrapper around the `workerpool` module to help decouple and give us a place to do serialization before invoking the worker's `run` method - do not warn about dubious max worker count (`jobs` option) since it's not something we can be sure is an actual problem (make it debug output instead) - adds [serialize-javascript](https://npm.im/serialize-javascript) module - allows easier transmission of non-JSON-compatible objects over IPC, e.g., a `RegExp` (for `--grep`) - requires use of `eval()` to deserialize. I'm not too worried about this, but I think I need to play with it more - this avoids more custom serialization code, but is not especially helpful when serializing `Test`, `Suite` and `Hook` instances - in the integration test helper code, if we provide an _absolute path_, do not make a guess about where the fixture is when running `runMocha` or `runMochaJSON`. this makes globs easier to use
1 parent 336226d commit 9ab7f71

File tree

14 files changed

+431
-172
lines changed

14 files changed

+431
-172
lines changed

.eslintrc.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ overrides:
3131
- 'lib/worker.js'
3232
- 'lib/reporters/buffered.js'
3333
- 'lib/serializer.js'
34+
- 'lib/pool.js'
3435
- 'test/reporters/buffered.spec.js'
3536
parserOptions:
3637
ecmaVersion: 2018

karma.conf.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ module.exports = config => {
4141
.ignore('./lib/reporters/buffered.js')
4242
.ignore('./lib/serializer.js')
4343
.ignore('./lib/worker.js')
44+
.ignore('./lib/pool.js')
4445
.on('bundled', (err, content) => {
4546
if (err) {
4647
throw err;

lib/buffered-runner.js

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
'use strict';
22

33
const allSettled = require('promise.allsettled');
4-
const os = require('os');
54
const Runner = require('./runner');
65
const {EVENT_RUN_BEGIN, EVENT_RUN_END} = Runner.constants;
76
const debug = require('debug')('mocha:parallel:buffered-runner');
8-
const workerpool = require('workerpool');
9-
const {deserialize} = require('./serializer');
10-
const WORKER_PATH = require.resolve('./worker.js');
7+
const {WorkerPool} = require('./pool');
118
const {setInterval, clearInterval} = global;
12-
const {createMap, warn} = require('./utils');
9+
const {createMap} = require('./utils');
10+
11+
/**
12+
* Outputs a debug statement with worker stats
13+
* @param {WorkerPool} pool - Worker pool
14+
*/
1315
const debugStats = pool => {
1416
const {totalWorkers, busyWorkers, idleWorkers, pendingTasks} = pool.stats();
1517
debug(
@@ -108,36 +110,15 @@ class BufferedRunner extends Runner {
108110
let pool;
109111

110112
try {
111-
const cpuCount = os.cpus().length;
112-
const maxJobs = cpuCount - 1;
113-
const jobs = Math.max(1, Math.min(options.jobs || maxJobs, maxJobs));
114-
if (maxJobs < 2) {
115-
warn(
116-
`(Mocha) not enough CPU cores available (${cpuCount}) to run multiple jobs; avoid --parallel on this machine`
117-
);
118-
} else if (options.jobs && options.jobs > maxJobs) {
119-
warn(
120-
`(Mocha) ${options.jobs} concurrent jobs requested, but only enough cores available for ${maxJobs}`
121-
);
122-
}
123-
debug(
124-
'run(): starting worker pool of size %d, using node args: %s',
125-
jobs,
126-
process.execArgv.join(' ')
127-
);
128-
pool = workerpool.pool(WORKER_PATH, {
129-
workerType: 'process',
130-
maxWorkers: jobs,
131-
forkOpts: {execArgv: process.execArgv}
132-
});
113+
pool = WorkerPool.create({maxWorkers: options.jobs});
133114

134115
sigIntListener = async () => {
135116
if (this._state !== ABORTING) {
136117
debug('run(): caught a SIGINT');
137118
this._state = ABORTING;
138119

139120
try {
140-
debug('run(): shutting down %d (max) workers', jobs);
121+
debug('run(): force-terminating worker pool');
141122
await pool.terminate(true);
142123
} catch (err) {
143124
console.error(
@@ -155,10 +136,6 @@ class BufferedRunner extends Runner {
155136

156137
process.once('SIGINT', sigIntListener);
157138

158-
// the "pool proxy" object is essentially just syntactic sugar to call a
159-
// worker's procedure as one would a regular function.
160-
const poolProxy = await pool.proxy();
161-
162139
debugInterval = setInterval(
163140
() => debugStats(pool),
164141
DEBUG_STATS_INTERVAL
@@ -174,12 +151,11 @@ class BufferedRunner extends Runner {
174151
files.map(async file => {
175152
debug('run(): enqueueing test file %s', file);
176153
try {
177-
const result = await poolProxy.run(file, options);
154+
const {failureCount, events} = await pool.run(file, options);
178155
if (this._state === BAILED) {
179156
// short-circuit after a graceful bail
180157
return;
181158
}
182-
const {failureCount, events} = deserialize(result);
183159
debug(
184160
'run(): completed run of file %s; %d failures / %d events',
185161
file,

lib/pool.js

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
'use strict';
2+
3+
const serializeJavascript = require('serialize-javascript');
4+
const workerpool = require('workerpool');
5+
const {deserialize} = require('./serializer');
6+
const debug = require('debug')('mocha:parallel:pool');
7+
const {cpus} = require('os');
8+
const {createInvalidArgumentTypeError} = require('./errors');
9+
10+
const WORKER_PATH = require.resolve('./worker.js');
11+
12+
/**
13+
* A mapping of Mocha `Options` objects to serialized values.
14+
*
15+
* This is helpful because we tend to same the same options over and over
16+
* over IPC.
17+
* @type {WeakMap<Options,string>}
18+
*/
19+
let optionsCache = new WeakMap();
20+
21+
/**
22+
* Count of CPU cores
23+
*/
24+
const CPU_COUNT = cpus().length;
25+
26+
/**
27+
* Default max number of workers.
28+
*
29+
* We are already using one core for the main process, so assume only _n - 1_ are left.
30+
*
31+
* This is a reasonable default, but YMMV.
32+
*/
33+
const DEFAULT_MAX_WORKERS = CPU_COUNT - 1;
34+
35+
/**
36+
* These options are passed into the [workerpool](https://npm.im/workerpool) module.
37+
* @type {Partial<WorkerPoolOptions>}
38+
*/
39+
const WORKER_POOL_DEFAULT_OPTS = {
40+
// use child processes, not worker threads!
41+
workerType: 'process',
42+
// ensure the same flags sent to `node` for this `mocha` invocation are passed
43+
// along to children
44+
forkOpts: {execArgv: process.execArgv},
45+
maxWorkers: DEFAULT_MAX_WORKERS
46+
};
47+
48+
/**
49+
* A wrapper around a third-party worker pool implementation.
50+
*/
51+
class WorkerPool {
52+
constructor(opts = WORKER_POOL_DEFAULT_OPTS) {
53+
const maxWorkers = Math.max(1, opts.maxWorkers);
54+
55+
if (maxWorkers < 2) {
56+
debug(
57+
'not enough CPU cores available (%d) to run multiple jobs; avoid --parallel on this machine',
58+
CPU_COUNT
59+
);
60+
} else if (maxWorkers >= CPU_COUNT) {
61+
debug(
62+
'%d concurrent job(s) requested, but only %d core(s) available',
63+
maxWorkers,
64+
CPU_COUNT
65+
);
66+
}
67+
debug(
68+
'run(): starting worker pool of max size %d, using node args: %s',
69+
maxWorkers,
70+
process.execArgv.join(' ')
71+
);
72+
73+
this.options = Object.assign({}, opts, {maxWorkers});
74+
this._pool = workerpool.pool(WORKER_PATH, this.options);
75+
}
76+
77+
/**
78+
* Terminates all workers in the pool.
79+
* @param {boolean} [force] - Whether to force-kill workers. By default, lets workers finish their current task before termination.
80+
* @private
81+
* @returns {Promise<void>}
82+
*/
83+
async terminate(force = false) {
84+
return this._pool.terminate(force);
85+
}
86+
87+
/**
88+
* Adds a test file run to the worker pool queue for execution by a worker process.
89+
*
90+
* Handles serialization/deserialization.
91+
*
92+
* @param {string} filepath - Filepath of test
93+
* @param {Options} [options] - Options for Mocha instance
94+
* @private
95+
* @returns {Promise<SerializedWorkerResult>}
96+
*/
97+
async run(filepath, options = {}) {
98+
if (!filepath || typeof filepath !== 'string') {
99+
throw createInvalidArgumentTypeError(
100+
'Expected a non-empty filepath',
101+
'filepath',
102+
'string'
103+
);
104+
}
105+
const serializedOptions = WorkerPool.serializeOptions(options);
106+
const result = await this._pool.exec('run', [filepath, serializedOptions]);
107+
return deserialize(result);
108+
}
109+
110+
/**
111+
* Returns stats about the state of the worker processes in the pool.
112+
*
113+
* Used for debugging.
114+
*
115+
* @private
116+
*/
117+
stats() {
118+
return this._pool.stats();
119+
}
120+
121+
/**
122+
* Instantiates a {@link WorkerPool}.
123+
*/
124+
static create(...args) {
125+
return new WorkerPool(...args);
126+
}
127+
128+
/**
129+
* Given Mocha options object `opts`, serialize into a format suitable for
130+
* transmission over IPC.
131+
*
132+
* @param {Options} [opts] - Mocha options
133+
* @private
134+
* @returns {string} Serialized options
135+
*/
136+
static serializeOptions(opts = {}) {
137+
if (!optionsCache.has(opts)) {
138+
const serialized = serializeJavascript(opts, {
139+
unsafe: true, // this means we don't care about XSS
140+
ignoreFunction: true // do not serialize functions
141+
});
142+
optionsCache.set(opts, serialized);
143+
debug(
144+
'serializeOptions(): serialized options %O to: %s',
145+
opts,
146+
serialized
147+
);
148+
}
149+
return optionsCache.get(opts);
150+
}
151+
152+
/**
153+
* Resets internal cache of serialized options objects.
154+
*
155+
* For testing/debugging
156+
* @private
157+
*/
158+
static resetOptionsCache() {
159+
optionsCache = new WeakMap();
160+
}
161+
}
162+
163+
exports.WorkerPool = WorkerPool;

lib/test.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ Test.prototype.clone = function() {
7272
};
7373

7474
/**
75-
* Returns an object suitable for IPC.
75+
* Returns an minimal object suitable for transmission over IPC.
7676
* Functions are represented by keys beginning with `$$`.
7777
* @returns {Object}
7878
*/
@@ -93,6 +93,7 @@ Test.prototype.serialize = function serialize() {
9393
speed: this.speed,
9494
state: this.state,
9595
title: this.title,
96-
type: this.type
96+
type: this.type,
97+
file: this.file
9798
};
9899
};

lib/worker.js

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
'use strict';
22

3-
const {createInvalidArgumentTypeError} = require('./errors');
3+
const {
4+
createInvalidArgumentTypeError,
5+
createInvalidArgumentValueError
6+
} = require('./errors');
47
const workerpool = require('workerpool');
58
const Mocha = require('./mocha');
6-
const {
7-
handleRequires,
8-
validatePlugin,
9-
loadRootHooks
10-
} = require('./cli/run-helpers');
9+
const {handleRequires, validatePlugin} = require('./cli/run-helpers');
1110
const debug = require('debug')(`mocha:parallel:worker:${process.pid}`);
1211
const {serialize} = require('./serializer');
1312
const {setInterval, clearInterval} = global;
@@ -34,20 +33,23 @@ if (workerpool.isMainThread) {
3433
* @param {Options} argv - Command-line options
3534
*/
3635
let bootstrap = async argv => {
37-
const rawRootHooks = handleRequires(argv.require);
38-
rootHooks = await loadRootHooks(rawRootHooks);
36+
handleRequires(argv.require);
37+
// const rawRootHooks = handleRequires(argv.require);
38+
// rootHooks = await loadRootHooks(rawRootHooks);
3939
validatePlugin(argv, 'ui', Mocha.interfaces);
4040
bootstrap = () => {};
41+
debug('bootstrap(): finished with args: %O', argv);
4142
};
4243

4344
/**
4445
* Runs a single test file in a worker thread.
4546
* @param {string} filepath - Filepath of test file
46-
* @param {Options} [argv] - Parsed command-line options object
47+
* @param {string} [serializedOptions] - **Serialized** options. This string will be eval'd!
48+
* @see https://npm.im/serialize-javascript
4749
* @returns {Promise<{failures: number, events: BufferedEvent[]}>} - Test
4850
* failure count and list of events.
4951
*/
50-
async function run(filepath, argv = {ui: 'bdd'}) {
52+
async function run(filepath, serializedOptions = '{}') {
5153
if (!filepath) {
5254
throw createInvalidArgumentTypeError(
5355
'Expected a non-empty "filepath" argument',
@@ -58,7 +60,27 @@ async function run(filepath, argv = {ui: 'bdd'}) {
5860

5961
debug('run(): running test file %s', filepath);
6062

61-
const opts = Object.assign(argv, {
63+
if (typeof serializedOptions !== 'string') {
64+
throw createInvalidArgumentTypeError(
65+
'run() expects second parameter to be a string which was serialized by the `serialize-javascript` module',
66+
'serializedOptions',
67+
'string'
68+
);
69+
}
70+
let argv;
71+
try {
72+
// eslint-disable-next-line no-eval
73+
argv = eval('(' + serializedOptions + ')');
74+
} catch (err) {
75+
throw createInvalidArgumentValueError(
76+
'run() was unable to deserialize the options',
77+
'serializedOptions',
78+
serializedOptions
79+
);
80+
}
81+
82+
debug('run(): deserialized options to %O', argv);
83+
const opts = Object.assign({ui: 'bdd'}, argv, {
6284
// workers only use the `Buffered` reporter.
6385
reporter: BUFFERED_REPORTER_PATH,
6486
// if this was true, it would cause infinite recursion.

package-lock.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package-scripts.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ function test(testName, mochaParams) {
3232
module.exports = {
3333
scripts: {
3434
build: {
35-
script: `browserify -e browser-entry.js --plugin ./scripts/dedefine --ignore './lib/cli/*.js' --ignore "./lib/esm-utils.js" --ignore 'chokidar' --ignore 'fs' --ignore 'glob' --ignore 'path' --ignore 'supports-color' --ignore './lib/buffered-runner.js' --ignore './lib/serializer.js' --ignore './lib/reporters/buffered.js' --ignore './lib/worker.js' -o mocha.js`,
35+
script: `browserify -e browser-entry.js --plugin ./scripts/dedefine --ignore './lib/cli/*.js' --ignore "./lib/esm-utils.js" --ignore 'chokidar' --ignore 'fs' --ignore 'glob' --ignore 'path' --ignore 'supports-color' --ignore './lib/buffered-runner.js' --ignore './lib/serializer.js' --ignore './lib/reporters/buffered.js' --ignore './lib/worker.js' --ignore './lib/pool.js' -o mocha.js`,
3636
description: 'Build browser bundle'
3737
},
3838
lint: {

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
"ms": "2.1.2",
6868
"object.assign": "4.1.0",
6969
"promise.allsettled": "1.0.2",
70+
"serialize-javascript": "3.0.0",
7071
"strip-json-comments": "3.0.1",
7172
"supports-color": "7.1.0",
7273
"which": "2.0.2",
@@ -164,7 +165,8 @@
164165
"./lib/serializer.js": false,
165166
"./lib/reporters/buffered.js": false,
166167
"./lib/buffered-reporter.js": false,
167-
"./lib/worker.js": false
168+
"./lib/worker.js": false,
169+
"./lib/pool.js": false
168170
},
169171
"prettier": {
170172
"singleQuote": true,

0 commit comments

Comments
 (0)