diff --git a/lib/internal/main/worker_thread.js b/lib/internal/main/worker_thread.js index 62a4bb1b7dd77e..94d0e613e8ce38 100644 --- a/lib/internal/main/worker_thread.js +++ b/lib/internal/main/worker_thread.js @@ -10,30 +10,118 @@ const { } = require('internal/bootstrap/pre_execution'); const { - getEnvMessagePort, - threadId + threadId, + getEnvMessagePort } = internalBinding('worker'); const { - createMessageHandler, - createWorkerFatalExeception -} = require('internal/process/worker_thread_only'); + messageTypes: { + // Messages that may be received by workers + LOAD_SCRIPT, + // Messages that may be posted from workers + UP_AND_RUNNING, + ERROR_MESSAGE, + COULD_NOT_SERIALIZE_ERROR, + // Messages that may be either received or posted + STDIO_PAYLOAD, + STDIO_WANTS_MORE_DATA, + }, + kStdioWantsMoreDataCallback +} = require('internal/worker/io'); +const { + fatalException: originalFatalException +} = require('internal/process/execution'); + +const publicWorker = require('worker_threads'); const debug = require('util').debuglog('worker'); -debug(`[${threadId}] is setting up worker child environment`); -function prepareUserCodeExecution() { - initializeClusterIPC(); - initializeESMLoader(); - loadPreloadModules(); -} +debug(`[${threadId}] is setting up worker child environment`); // Set up the message port and start listening const port = getEnvMessagePort(); -port.on('message', createMessageHandler(port, prepareUserCodeExecution)); -port.start(); + +port.on('message', (message) => { + if (message.type === LOAD_SCRIPT) { + const { + filename, + doEval, + workerData, + publicPort, + manifestSrc, + manifestURL, + hasStdin + } = message; + if (manifestSrc) { + require('internal/process/policy').setup(manifestSrc, manifestURL); + } + initializeClusterIPC(); + initializeESMLoader(); + loadPreloadModules(); + publicWorker.parentPort = publicPort; + publicWorker.workerData = workerData; + + if (!hasStdin) + process.stdin.push(null); + + debug(`[${threadId}] starts worker script ${filename} ` + + `(eval = ${eval}) at cwd = ${process.cwd()}`); + port.unref(); + port.postMessage({ type: UP_AND_RUNNING }); + if (doEval) { + const { evalScript } = require('internal/process/execution'); + evalScript('[worker eval]', filename); + } else { + process.argv[1] = filename; // script filename + require('module').runMain(); + } + return; + } else if (message.type === STDIO_PAYLOAD) { + const { stream, chunk, encoding } = message; + process[stream].push(chunk, encoding); + return; + } else if (message.type === STDIO_WANTS_MORE_DATA) { + const { stream } = message; + process[stream][kStdioWantsMoreDataCallback](); + return; + } + + require('assert').fail(`Unknown worker message type ${message.type}`); +}); // Overwrite fatalException -process._fatalException = createWorkerFatalExeception(port); +process._fatalException = (error) => { + debug(`[${threadId}] gets fatal exception`); + let caught = false; + try { + caught = originalFatalException.call(this, error); + } catch (e) { + error = e; + } + debug(`[${threadId}] fatal exception caught = ${caught}`); + + if (!caught) { + let serialized; + try { + const { serializeError } = require('internal/error-serdes'); + serialized = serializeError(error); + } catch {} + debug(`[${threadId}] fatal exception serialized = ${!!serialized}`); + if (serialized) + port.postMessage({ + type: ERROR_MESSAGE, + error: serialized + }); + else + port.postMessage({ type: COULD_NOT_SERIALIZE_ERROR }); + + const { clearAsyncIdStack } = require('internal/async_hooks'); + clearAsyncIdStack(); + + process.exit(); + } +}; markBootstrapComplete(); + +port.start(); diff --git a/lib/internal/process/worker_thread_only.js b/lib/internal/process/worker_thread_only.js index 2171d2b586a4ca..f05d5e932bebf2 100644 --- a/lib/internal/process/worker_thread_only.js +++ b/lib/internal/process/worker_thread_only.js @@ -3,13 +3,10 @@ // This file contains process bootstrappers that can only be // run in the worker thread. const { - getEnvMessagePort, - threadId + getEnvMessagePort } = internalBinding('worker'); const { - messageTypes, - kStdioWantsMoreDataCallback, kWaitingStreams, ReadableWorkerStdio, WritableWorkerStdio @@ -18,15 +15,6 @@ const { const { codes: { ERR_WORKER_UNSUPPORTED_OPERATION } } = require('internal/errors'); - -let debuglog; -function debug(...args) { - if (!debuglog) { - debuglog = require('util').debuglog('worker'); - } - return debuglog(...args); -} - const workerStdio = {}; function initializeWorkerStdio() { @@ -43,97 +31,6 @@ function initializeWorkerStdio() { }; } -function createMessageHandler(port, prepareUserCodeExecution) { - const publicWorker = require('worker_threads'); - - return function(message) { - if (message.type === messageTypes.LOAD_SCRIPT) { - const { - filename, - doEval, - workerData, - publicPort, - manifestSrc, - manifestURL, - hasStdin - } = message; - if (manifestSrc) { - require('internal/process/policy').setup(manifestSrc, manifestURL); - } - prepareUserCodeExecution(); - publicWorker.parentPort = publicPort; - publicWorker.workerData = workerData; - - if (!hasStdin) - workerStdio.stdin.push(null); - - debug(`[${threadId}] starts worker script ${filename} ` + - `(eval = ${eval}) at cwd = ${process.cwd()}`); - port.unref(); - port.postMessage({ type: messageTypes.UP_AND_RUNNING }); - if (doEval) { - const { evalScript } = require('internal/process/execution'); - evalScript('[worker eval]', filename); - } else { - process.argv[1] = filename; // script filename - require('module').runMain(); - } - return; - } else if (message.type === messageTypes.STDIO_PAYLOAD) { - const { stream, chunk, encoding } = message; - workerStdio[stream].push(chunk, encoding); - return; - } else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) { - const { stream } = message; - workerStdio[stream][kStdioWantsMoreDataCallback](); - return; - } - - require('assert').fail(`Unknown worker message type ${message.type}`); - }; -} - -// XXX(joyeecheung): this has to be returned as an anonymous function -// wrapped in a closure, see the comment of the original -// process._fatalException in lib/internal/process/execution.js -function createWorkerFatalExeception(port) { - const { - fatalException: originalFatalException - } = require('internal/process/execution'); - - return (error) => { - debug(`[${threadId}] gets fatal exception`); - let caught = false; - try { - caught = originalFatalException.call(this, error); - } catch (e) { - error = e; - } - debug(`[${threadId}] fatal exception caught = ${caught}`); - - if (!caught) { - let serialized; - try { - const { serializeError } = require('internal/error-serdes'); - serialized = serializeError(error); - } catch {} - debug(`[${threadId}] fatal exception serialized = ${!!serialized}`); - if (serialized) - port.postMessage({ - type: messageTypes.ERROR_MESSAGE, - error: serialized - }); - else - port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR }); - - const { clearAsyncIdStack } = require('internal/async_hooks'); - clearAsyncIdStack(); - - process.exit(); - } - }; -} - // The execution of this function itself should not cause any side effects. function wrapProcessMethods(binding) { function umask(mask) { @@ -150,7 +47,5 @@ function wrapProcessMethods(binding) { module.exports = { initializeWorkerStdio, - createMessageHandler, - createWorkerFatalExeception, wrapProcessMethods };