Skip to content

Commit

Permalink
reuse processes using a process pool
Browse files Browse the repository at this point in the history
  • Loading branch information
fabiosantoscode committed Nov 3, 2018
1 parent 96ca197 commit 5f73f22
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 96 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ insert_final_newline = true
indent_style = space
indent_size = 2

[{*.js,test/index.sh}]
[{*.js}]
indent_size = 4
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ install: npm install
before_script: greenkeeper-lockfile-update
after_script: greenkeeper-lockfile-upload
script: npm run test:ci
cache:
directories:
- node_modules
90 changes: 54 additions & 36 deletions src/main/mocha.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { fork } from 'child_process';
import * as assert from 'assert';
import * as CircularJSON from 'circular-json';
import * as debug from 'debug';
import * as Mocha from 'mocha';
import { resolve as pathResolve } from 'path';

import ProcessPool from './process-pool';
import RunnerMain from './runner';
import TaskManager from './task-manager';
import {
removeDebugArgs,
subprocessParseReviver,
} from './util';

import { DEBUG_SUBPROCESS, SUITE_OWN_OPTIONS } from '../config';
import { SUITE_OWN_OPTIONS } from '../config';
import {
IRetriedTest,
ISubprocessOutputMessage,
Expand All @@ -24,6 +24,7 @@ import {
const debugLog = debug('mocha-parallel-tests');

export default class MochaWrapper extends Mocha {
private pool = new ProcessPool();
private isTypescriptRunMode = false;
private maxParallel: number | undefined;
private requires: string[] = [];
Expand All @@ -50,6 +51,7 @@ export default class MochaWrapper extends Mocha {

setMaxParallel(maxParallel: number) {
this.maxParallel = maxParallel;
this.pool.setMaxParallel(maxParallel);
}

enableExitMode() {
Expand Down Expand Up @@ -136,6 +138,7 @@ export default class MochaWrapper extends Mocha {
};

runner.emitFinishEvents(done);
this.pool.destroyAll();
});

return runner;
Expand All @@ -162,89 +165,90 @@ export default class MochaWrapper extends Mocha {
}

private spawnTestProcess(file: string): Promise<ISubprocessResult> {
return new Promise((resolve) => {
const nodeFlags: string[] = [];
const extension = this.isTypescriptRunMode ? 'ts' : 'js';
const runnerPath = pathResolve(__dirname, `../subprocess/runner.${extension}`);
return new Promise<ISubprocessResult>(async (resolve) => {
const resolvedFilePath = pathResolve(file);

const forkArgs: string[] = ['--test', resolvedFilePath];
const testOptions: {[key: string]: any} = { test: resolvedFilePath };

for (const option of SUITE_OWN_OPTIONS) {
const propValue = this.suite[option]();
// bail is undefined by default, we need to somehow pass its value to the subprocess
forkArgs.push(`--${option}`, propValue === undefined ? false : propValue);
testOptions[option] = propValue === undefined ? false : propValue;
}

for (const requirePath of this.requires) {
forkArgs.push('--require', requirePath);
testOptions.require = requirePath;
}

for (const compilerPath of this.compilers) {
forkArgs.push('--compilers', compilerPath);
}
testOptions.compilers = this.compilers || [];

if (this.options.delay) {
forkArgs.push('--delay');
testOptions.delay = true;
}

if (this.options.grep) {
forkArgs.push('--grep', this.options.grep.toString());
testOptions.grep = this.options.grep.toString();
}

if (this.exitImmediately) {
forkArgs.push('--exit');
testOptions.exit = true;
}

if (this.options.fullStackTrace) {
forkArgs.push('--full-trace');
testOptions.fullStackTrace = true;
}

const test = fork(runnerPath, forkArgs, {
// otherwise `--inspect-brk` and other params will be passed to subprocess
execArgv: process.execArgv.filter(removeDebugArgs),
stdio: ['ipc'],
});

if (this.isTypescriptRunMode) {
nodeFlags.push('--require', 'ts-node/register');
let test;
try {
test = await this.pool.getOrCreate(this.isTypescriptRunMode);
} catch (e) {
throw e;
}

debugLog('Process spawned. You can run it manually with this command:');
debugLog(`node ${nodeFlags.join(' ')} ${runnerPath} ${forkArgs.concat([DEBUG_SUBPROCESS.argument]).join(' ')}`);
test.send(JSON.stringify({ type: 'start', testOptions }));

const events: Array<ISubprocessOutputMessage | ISubprocessRunnerMessage> = [];
let syncedSubprocessData: ISubprocessSyncedData | undefined;
const startedAt = Date.now();

test.on('message', function onMessageHandler({ event, data }) {
function onMessageHandler({ event, data }) {
if (event === 'sync') {
syncedSubprocessData = data;
} else if (event === 'end') {
clean();
resolve({
code: data.code || 0,
events,
execTime: Date.now() - startedAt,
file,
syncedSubprocessData,
});
} else {
assert(event);
events.push({
data,
event,
type: 'runner',
});
}
});
}

test.stdout.on('data', function onStdoutData(data: Buffer) {
function onStdoutData(data: Buffer) {
events.push({
data,
event: undefined,
type: 'stdout',
});
});
}

test.stderr.on('data', function onStderrData(data: Buffer) {
function onStderrData(data: Buffer) {
events.push({
data,
event: undefined,
type: 'stderr',
});
});

test.on('close', (code) => {
}
function onClose(code) {
debugLog(`Process for ${file} exited with code ${code}`);

resolve({
Expand All @@ -254,7 +258,21 @@ export default class MochaWrapper extends Mocha {
file,
syncedSubprocessData,
});
});
}

test.on('message', onMessageHandler);
test.stdout.on('data', onStdoutData);
test.stderr.on('data', onStderrData);
test.on('close', onClose);

function clean() {
test.removeListener('message', onMessageHandler);
test.stdout.removeListener('data', onStdoutData);
test.stderr.removeListener('data', onStderrData);
test.removeListener('close', onClose);
test.destroy();
return null;
}
});
}
}
70 changes: 70 additions & 0 deletions src/main/process-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { ChildProcess, fork } from 'child_process';
import * as os from 'os';
import { resolve as pathResolve } from 'path';
import { removeDebugArgs } from './util';

interface IMochaProcess extends ChildProcess {
destroy: () => void;
}

export default class ProcessPool {
private maxParallel: number;
private waitingList: Array<(process: IMochaProcess) => void> = [];
private unusedProcesses: IMochaProcess[] = [];
private processes: IMochaProcess[] = [];

constructor() {
this.maxParallel = os.cpus().length;
}

setMaxParallel(n) {
this.maxParallel = n;
}

async getOrCreate(isTypescriptRunMode): Promise<IMochaProcess> {
const extension = isTypescriptRunMode ? 'ts' : 'js';
const runnerPath = pathResolve(__dirname, `../subprocess/runner.${extension}`);

const lastUnusedProcess = this.unusedProcesses.pop();
if (lastUnusedProcess) {
return lastUnusedProcess;
}

if (this.processes.length >= this.maxParallel) {
const process: IMochaProcess = await new Promise<IMochaProcess>((resolve) => {
this.waitingList.push((proc: IMochaProcess) => {
resolve(proc);
});
});
return process;
}
return this.create(runnerPath, {
// otherwise `--inspect-brk` and other params will be passed to subprocess
execArgv: process.execArgv.filter(removeDebugArgs),
stdio: ['ipc'],
});
}

create(runnerPath, opt) {
const process = fork(runnerPath, [], opt) as IMochaProcess;

this.processes.push(process);

process.destroy = () => {
const nextOnWaitingList = this.waitingList.pop();
if (nextOnWaitingList) {
nextOnWaitingList(process);
} else {
this.unusedProcesses.push(process);
}
};

return process;
}

destroyAll() {
this.processes.forEach((proc) => {
proc.kill();
});
}
}
Loading

0 comments on commit 5f73f22

Please sign in to comment.