jazelle/bin/cluster.js (103 lines of code) (raw):
// @flow
/*
This file is an entry point for multi-cpu coordination. See commands/batch.js or commands/each.js
*/
const {isMaster, fork} = require('cluster');
const {cpus, tmpdir} = require('os');
const {resolve} = require('path');
const {createWriteStream} = require('fs');
const {parse} = require('../utils/parse-argv.js');
const {getManifest} = require('../utils/get-manifest.js');
const {read, write, remove, exists} = require('../utils/node-helpers.js');
const {groupByDepsets} = require('../utils/group-by-depsets.js');
const {install} = require('../commands/install.js');
const {executeProjectCommand} = require('../utils/execute-project-command.js');
const {getLocalDependencies} = require('../utils/get-local-dependencies.js');
const {setupSymlinks} = require('../utils/setup-symlinks.js');
const {root, plan, index, cores, log} = parse(process.argv.slice(2));
run();
async function run() {
if (isMaster) await runMaster();
else await runWorker();
}
async function runMaster() {
const groups = JSON.parse(plan);
const group = groups[index];
const errors = [];
const availableCores = cores ? parseInt(cores, 10) : cpus().length - 1 || 1;
const {projects} = await getManifest({root});
const metas = await Promise.all(
projects.map(async dir => ({
meta: JSON.parse(await read(`${root}/${dir}/package.json`, 'utf8')),
dir: `${root}/${dir}`,
depth: 0,
}))
);
const payload = groupByDepsets({root, metas, group});
for (const data of payload) {
if (data.length > 0) {
const requiredCores = Math.min(availableCores, data.length);
const workers = [...Array(requiredCores)].map(() => fork(process.env));
await install({
root,
cwd: `${root}/${data[0].dir}`,
frozenLockfile: true,
conservative: true,
});
// setup symlinks
const map = new Map();
await Promise.all(
data.slice(1).map(async item => {
const deps = await getLocalDependencies({
dirs: projects.map(dir => `${root}/${dir}`),
target: resolve(root, item.dir),
});
for (const dep of deps) {
map.set(dep.dir, dep);
}
})
);
await setupSymlinks({root, deps: [...map.values()]});
try {
await Promise.all(
workers.map(async worker => {
while (data.length > 0) {
await new Promise(async (resolve, reject) => {
const command = data.shift();
const log = `${tmpdir()}/${Math.random() * 1e17}`;
if (worker.state === 'dead') worker = fork(process.env);
worker.send({command, log});
worker.once('exit', async () => {
// 3) ...then we collect the error from each worker...
if (await exists(log)) {
const stderr = await read(log, 'utf8');
errors.push({command, stderr});
await remove(log);
}
resolve();
});
});
}
})
);
} finally {
// 4) ...then kill the workers again (because otherwise it may stay up as zombies for whatever reason)...
for (const worker of workers) worker.kill();
}
}
}
// 5) ...finally write the full error log to the master log file that was passed by batch-test-group.js
if (errors.length > 0) await write(log, JSON.stringify(errors));
}
async function runWorker() {
const payload = await new Promise(resolve => {
process.once('message', resolve);
});
if (!payload) return;
const {command, log} = payload;
const {dir, action, args} = command;
const cwd = `${root}/${dir}`;
const stream = createWriteStream(log);
await new Promise(resolve => stream.on('open', resolve));
const stdio = ['inherit', 'inherit', stream];
try {
// 1) if the command fails, throw from this block...
await executeProjectCommand({root, cwd, command: action, args, stdio});
if (await exists(log)) await remove(log); // if command succeeds, don't log errors
process.exit(0);
} catch (e) {
// 2) ...which exits the worker process w/ a log file in disk...
stream.write(`\n${e.stack}`);
process.exit(1);
}
}