diff --git a/doc/api/errors.md b/doc/api/errors.md
index be67744f28799f..3680d742c547fd 100644
--- a/doc/api/errors.md
+++ b/doc/api/errors.md
@@ -1323,6 +1323,13 @@ but not provided in the `transferList` for that call.
An [ES6 module][] could not be resolved.
+
+### ERR_MISSING_PLATFORM_FOR_WORKER
+
+The V8 platform used by this instance of Node.js does not support creating
+Workers. This is caused by lack of embedder support for Workers. In particular,
+this error will not occur with standard builds of Node.js.
+
### ERR_MODULE_RESOLUTION_LEGACY
@@ -1723,6 +1730,22 @@ The fulfilled value of a linking promise is not a `vm.Module` object.
The current module's status does not allow for this operation. The specific
meaning of the error depends on the specific function.
+
+### ERR_WORKER_NEED_ABSOLUTE_PATH
+
+The path for the main script of a worker is not an absolute path.
+
+
+### ERR_WORKER_UNSERIALIZABLE_ERROR
+
+All attempts at serializing an uncaught exception from a worker thread failed.
+
+
+### ERR_WORKER_UNSUPPORTED_EXTENSION
+
+The pathname used for the main script of a worker has an
+unknown file extension.
+
### ERR_ZLIB_INITIALIZATION_FAILED
diff --git a/doc/api/process.md b/doc/api/process.md
index 8205efbd780300..0a9c52a44c3e23 100644
--- a/doc/api/process.md
+++ b/doc/api/process.md
@@ -410,6 +410,8 @@ added: v0.7.0
The `process.abort()` method causes the Node.js process to exit immediately and
generate a core file.
+This feature is not available in [`Worker`][] threads.
+
## process.arch
+
+* {boolean}
+
+Is `true` if this code is not running inside of a [`Worker`][] thread.
+
+## worker.parentPort
+
+
+* {null|MessagePort}
+
+If this thread was spawned as a [`Worker`][], this will be a [`MessagePort`][]
+allowing communication with the parent thread. Messages sent using
+`parentPort.postMessage()` will be available in the parent thread
+using `worker.on('message')`, and messages sent from the parent thread
+using `worker.postMessage()` will be available in this thread using
+`parentPort.on('message')`.
+
+## worker.threadId
+
+
+* {integer}
+
+An integer identifier for the current thread. On the corresponding worker object
+(if there is any), it is available as [`worker.threadId`][].
+
+## worker.workerData
+
+
+An arbitrary JavaScript value that contains a clone of the data passed
+to this thread’s `Worker` constructor.
+
## Class: MessageChannel
+
+The `Worker` class represents an independent JavaScript execution thread.
+Most Node.js APIs are available inside of it.
+
+Notable differences inside a Worker environment are:
+
+- The [`process.stdin`][], [`process.stdout`][] and [`process.stderr`][]
+ properties are set to `null`.
+- The [`require('worker').isMainThread`][] property is set to `false`.
+- The [`require('worker').parentPort`][] message port is available,
+- [`process.exit()`][] does not stop the whole program, just the single thread,
+ and [`process.abort()`][] is not available.
+- [`process.chdir()`][] and `process` methods that set group or user ids
+ are not available.
+- [`process.env`][] is a read-only reference to the environment variables.
+- [`process.title`][] cannot be modified.
+- Signals will not be delivered through [`process.on('...')`][Signals events].
+- Execution may stop at any point as a result of [`worker.terminate()`][]
+ being invoked.
+- IPC channels from parent processes are not accessible.
+
+Currently, the following differences also exist until they are addressed:
+
+- The [`inspector`][] module is not available yet.
+- Native addons are not supported yet.
+
+Creating `Worker` instances inside of other `Worker`s is possible.
+
+Like [Web Workers][] and the [`cluster` module][], two-way communication can be
+achieved through inter-thread message passing. Internally, a `Worker` has a
+built-in pair of [`MessagePort`][]s that are already associated with each other
+when the `Worker` is created. While the `MessagePort` object on the parent side
+is not directly exposed, its functionalities are exposed through
+[`worker.postMessage()`][] and the [`worker.on('message')`][] event
+on the `Worker` object for the parent thread.
+
+To create custom messaging channels (which is encouraged over using the default
+global channel because it facilitates separation of concerns), users can create
+a `MessageChannel` object on either thread and pass one of the
+`MessagePort`s on that `MessageChannel` to the other thread through a
+pre-existing channel, such as the global one.
+
+See [`port.postMessage()`][] for more information on how messages are passed,
+and what kind of JavaScript values can be successfully transported through
+the thread barrier.
+
+For example:
+
+```js
+const assert = require('assert');
+const { Worker, MessageChannel, MessagePort, isMainThread } = require('worker');
+if (isMainThread) {
+ const worker = new Worker(__filename);
+ const subChannel = new MessageChannel();
+ worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
+ subChannel.port2.on('message', (value) => {
+ console.log('received:', value);
+ });
+} else {
+ require('worker').once('workerMessage', (value) => {
+ assert(value.hereIsYourPort instanceof MessagePort);
+ value.hereIsYourPort.postMessage('the worker is sending this');
+ value.hereIsYourPort.close();
+ });
+}
+```
+
+### new Worker(filename, options)
+
+* `filename` {string} The absolute path to the Worker’s main script.
+ If `options.eval` is true, this is a string containing JavaScript code rather
+ than a path.
+* `options` {Object}
+ * `eval` {boolean} If true, interpret the first argument to the constructor
+ as a script that is executed once the worker is online.
+ * `data` {any} Any JavaScript value that will be cloned and made
+ available as [`require('worker').workerData`][]. The cloning will occur as
+ described in the [HTML structured clone algorithm][], and an error will be
+ thrown if the object cannot be cloned (e.g. because it contains
+ `function`s).
+
+### Event: 'error'
+
+
+* `err` {Error}
+
+The `'error'` event is emitted if the worker thread throws an uncaught
+exception. In that case, the worker will be terminated.
+
+### Event: 'exit'
+
+
+* `exitCode` {integer}
+
+The `'exit'` event is emitted once the worker has stopped. If the worker
+exited by calling [`process.exit()`][], the `exitCode` parameter will be the
+passed exit code. If the worker was terminated, the `exitCode` parameter will
+be `1`.
+
+### Event: 'message'
+
+
+* `value` {any} The transmitted value
+
+The `'message'` event is emitted when the worker thread has invoked
+[`require('worker').postMessage()`][]. See the [`port.on('message')`][] event
+for more details.
+
+### Event: 'online'
+
+
+The `'online'` event is emitted when the worker thread has started executing
+JavaScript code.
+
+### worker.postMessage(value[, transferList])
+
+
+* `value` {any}
+* `transferList` {Object[]}
+
+Send a message to the worker that will be received via
+[`require('worker').on('workerMessage')`][]. See [`port.postMessage()`][] for
+more details.
+
+### worker.ref()
+
+
+Opposite of `unref()`, calling `ref()` on a previously `unref()`ed worker will
+*not* let the program exit if it's the only active handle left (the default
+behavior). If the worker is `ref()`ed, calling `ref()` again will have
+no effect.
+
+### worker.terminate([callback])
+
+
+* `callback` {Function}
+
+Stop all JavaScript execution in the worker thread as soon as possible.
+`callback` is an optional function that is invoked once this operation is known
+to have completed.
+
+**Warning**: Currently, not all code in the internals of Node.js is prepared to
+expect termination at arbitrary points in time and may crash if it encounters
+that condition. Consequently, you should currently only call `.terminate()` if
+it is known that the Worker thread is not accessing Node.js core modules other
+than what is exposed in the `worker` module.
+
+### worker.threadId
+
+
+* {integer}
+
+An integer identifier for the referenced thread. Inside the worker thread,
+it is available as [`require('worker').threadId`][].
+
+### worker.unref()
+
+
+Calling `unref()` on a worker will allow the thread to exit if this is the only
+active handle in the event system. If the worker is already `unref()`ed calling
+`unref()` again will have no effect.
+
[`Buffer`]: buffer.html
-[child processes]: child_process.html
[`EventEmitter`]: events.html
[`MessagePort`]: #worker_class_messageport
[`port.postMessage()`]: #worker_port_postmessage_value_transferlist
+[`Worker`]: #worker_class_worker
+[`worker.terminate()`]: #worker_worker_terminate_callback
+[`worker.postMessage()`]: #worker_worker_postmessage_value_transferlist_1
+[`worker.on('message')`]: #worker_event_message_1
+[`worker.threadId`]: #worker_worker_threadid_1
+[`port.on('message')`]: #worker_event_message
+[`process.exit()`]: process.html#process_process_exit_code
+[`process.abort()`]: process.html#process_process_abort
+[`process.chdir()`]: process.html#process_process_chdir_directory
+[`process.env`]: process.html#process_process_env
+[`process.stdin`]: process.html#process_process_stdin
+[`process.stderr`]: process.html#process_process_stderr
+[`process.stdout`]: process.html#process_process_stdout
+[`process.title`]: process.html#process_process_title
+[`require('worker').workerData`]: #worker_worker_workerdata
+[`require('worker').on('workerMessage')`]: #worker_event_workermessage
+[`require('worker').postMessage()`]: #worker_worker_postmessage_value_transferlist
+[`require('worker').isMainThread`]: #worker_worker_ismainthread
+[`require('worker').threadId`]: #worker_worker_threadid
+[`cluster` module]: cluster.html
+[`inspector`]: inspector.html
[v8.serdes]: v8.html#v8_serialization_api
[`SharedArrayBuffer`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer
+[Signals events]: process.html#process_signal_events
[`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
+[child processes]: child_process.html
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
+[Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API
diff --git a/lib/inspector.js b/lib/inspector.js
index 3285c1040a7132..f4ec71fd6c2105 100644
--- a/lib/inspector.js
+++ b/lib/inspector.js
@@ -12,7 +12,7 @@ const {
const util = require('util');
const { Connection, open, url } = process.binding('inspector');
-if (!Connection)
+if (!Connection || !require('internal/worker').isMainThread)
throw new ERR_INSPECTOR_NOT_AVAILABLE();
const connectionSymbol = Symbol('connectionProperty');
diff --git a/lib/internal/bootstrap/node.js b/lib/internal/bootstrap/node.js
index 6477c2d8282f43..4817ec110a99e5 100644
--- a/lib/internal/bootstrap/node.js
+++ b/lib/internal/bootstrap/node.js
@@ -24,6 +24,7 @@
_shouldAbortOnUncaughtToggle },
{ internalBinding, NativeModule }) {
const exceptionHandlerState = { captureFn: null };
+ const isMainThread = internalBinding('worker').threadId === 0;
function startup() {
const EventEmitter = NativeModule.require('events');
@@ -100,7 +101,9 @@
NativeModule.require('internal/inspector_async_hook').setup();
}
- _process.setupChannel();
+ if (isMainThread)
+ _process.setupChannel();
+
_process.setupRawDebug(_rawDebug);
const browserGlobals = !process._noBrowserGlobals;
@@ -175,8 +178,11 @@
// are running from a script and running the REPL - but there are a few
// others like the debugger or running --eval arguments. Here we decide
// which mode we run in.
-
- if (NativeModule.exists('_third_party_main')) {
+ if (internalBinding('worker').getEnvMessagePort() !== undefined) {
+ // This means we are in a Worker context, and any script execution
+ // will be directed by the worker module.
+ NativeModule.require('internal/worker').setupChild(evalScript);
+ } else if (NativeModule.exists('_third_party_main')) {
// To allow people to extend Node in different ways, this hook allows
// one to drop a file lib/_third_party_main.js into the build
// directory which will be executed instead of Node's normal loading.
@@ -542,7 +548,7 @@
return `process.binding('inspector').callAndPauseOnStart(${fn}, {})`;
}
- function evalScript(name) {
+ function evalScript(name, body = wrapForBreakOnFirstLine(process._eval)) {
const CJSModule = NativeModule.require('internal/modules/cjs/loader');
const path = NativeModule.require('path');
const cwd = tryGetCwd(path);
@@ -550,7 +556,6 @@
const module = new CJSModule(name);
module.filename = path.join(cwd, name);
module.paths = CJSModule._nodeModulePaths(cwd);
- const body = wrapForBreakOnFirstLine(process._eval);
const script = `global.__filename = ${JSON.stringify(name)};\n` +
'global.exports = exports;\n' +
'global.module = module;\n' +
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index b9db40abd3b4c9..1fb90b37f88642 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -851,4 +851,9 @@ E('ERR_VM_MODULE_NOT_LINKED',
E('ERR_VM_MODULE_NOT_MODULE',
'Provided module is not an instance of Module', Error);
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
+E('ERR_WORKER_NEED_ABSOLUTE_PATH',
+ 'The worker script filename must be an absolute path. Received "%s"',
+ TypeError);
+E('ERR_WORKER_UNSERIALIZABLE_ERROR',
+ 'Serializing an uncaught exception failed', Error);
E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error);
diff --git a/lib/internal/process.js b/lib/internal/process.js
index bcefca316a236e..45d9c77b32007f 100644
--- a/lib/internal/process.js
+++ b/lib/internal/process.js
@@ -16,6 +16,7 @@ const util = require('util');
const constants = process.binding('constants').os.signals;
const assert = require('assert').strict;
const { deprecate } = require('internal/util');
+const { isMainThread } = require('internal/worker');
process.assert = deprecate(
function(x, msg) {
@@ -186,6 +187,11 @@ function setupKillAndExit() {
function setupSignalHandlers() {
+ if (!isMainThread) {
+ // Worker threads don't receive signals.
+ return;
+ }
+
const signalWraps = Object.create(null);
let Signal;
diff --git a/lib/internal/process/methods.js b/lib/internal/process/methods.js
index 209702ed7b9b76..77ceeafef41122 100644
--- a/lib/internal/process/methods.js
+++ b/lib/internal/process/methods.js
@@ -1,9 +1,17 @@
'use strict';
+const {
+ isMainThread
+} = require('internal/worker');
+
function setupProcessMethods(_chdir, _cpuUsage, _hrtime, _memoryUsage,
_rawDebug, _umask, _initgroups, _setegid,
_seteuid, _setgid, _setuid, _setgroups) {
// Non-POSIX platforms like Windows don't have certain methods.
+ // Workers also lack these methods since they change process-global state.
+ if (!isMainThread)
+ return;
+
if (_setgid !== undefined) {
setupPosixMethods(_initgroups, _setegid, _seteuid,
_setgid, _setuid, _setgroups);
diff --git a/lib/internal/process/stdio.js b/lib/internal/process/stdio.js
index eaba4dfca13a47..76e6ab85140535 100644
--- a/lib/internal/process/stdio.js
+++ b/lib/internal/process/stdio.js
@@ -6,6 +6,7 @@ const {
ERR_UNKNOWN_STDIN_TYPE,
ERR_UNKNOWN_STREAM_TYPE
} = require('internal/errors').codes;
+const { isMainThread } = require('internal/worker');
exports.setup = setupStdio;
@@ -16,6 +17,8 @@ function setupStdio() {
function getStdout() {
if (stdout) return stdout;
+ if (!isMainThread)
+ return new (require('stream').Writable)({ write(b, e, cb) { cb(); } });
stdout = createWritableStdioStream(1);
stdout.destroySoon = stdout.destroy;
stdout._destroy = function(er, cb) {
@@ -31,6 +34,8 @@ function setupStdio() {
function getStderr() {
if (stderr) return stderr;
+ if (!isMainThread)
+ return new (require('stream').Writable)({ write(b, e, cb) { cb(); } });
stderr = createWritableStdioStream(2);
stderr.destroySoon = stderr.destroy;
stderr._destroy = function(er, cb) {
@@ -46,6 +51,8 @@ function setupStdio() {
function getStdin() {
if (stdin) return stdin;
+ if (!isMainThread)
+ return new (require('stream').Readable)({ read() { this.push(null); } });
const tty_wrap = process.binding('tty_wrap');
const fd = 0;
diff --git a/lib/internal/util/inspector.js b/lib/internal/util/inspector.js
index 634d3302333584..3dd73415ded862 100644
--- a/lib/internal/util/inspector.js
+++ b/lib/internal/util/inspector.js
@@ -1,6 +1,8 @@
'use strict';
-const hasInspector = process.config.variables.v8_enable_inspector === 1;
+// TODO(addaleax): Figure out how to integrate the inspector with workers.
+const hasInspector = process.config.variables.v8_enable_inspector === 1 &&
+ require('internal/worker').isMainThread;
const inspector = hasInspector ? require('inspector') : undefined;
let session;
diff --git a/lib/internal/worker.js b/lib/internal/worker.js
index 73f7525aa73cc2..c982478b9334e8 100644
--- a/lib/internal/worker.js
+++ b/lib/internal/worker.js
@@ -1,24 +1,49 @@
'use strict';
+const Buffer = require('buffer').Buffer;
const EventEmitter = require('events');
+const assert = require('assert');
+const path = require('path');
const util = require('util');
+const {
+ ERR_INVALID_ARG_TYPE,
+ ERR_WORKER_NEED_ABSOLUTE_PATH,
+ ERR_WORKER_UNSERIALIZABLE_ERROR
+} = require('internal/errors').codes;
const { internalBinding } = require('internal/bootstrap/loaders');
const { MessagePort, MessageChannel } = internalBinding('messaging');
const { handle_onclose } = internalBinding('symbols');
+const { clearAsyncIdStack } = require('internal/async_hooks');
util.inherits(MessagePort, EventEmitter);
+const {
+ Worker: WorkerImpl,
+ getEnvMessagePort,
+ threadId
+} = internalBinding('worker');
+
+const isMainThread = threadId === 0;
+
const kOnMessageListener = Symbol('kOnMessageListener');
+const kHandle = Symbol('kHandle');
+const kPort = Symbol('kPort');
+const kPublicPort = Symbol('kPublicPort');
+const kDispose = Symbol('kDispose');
+const kOnExit = Symbol('kOnExit');
+const kOnMessage = Symbol('kOnMessage');
+const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
+const kOnErrorMessage = Symbol('kOnErrorMessage');
const debug = util.debuglog('worker');
-// A MessagePort consists of a handle (that wraps around an
+// A communication channel consisting of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
- debug('received message', payload);
+ debug(`[${threadId}] received message`, payload);
// Emit the deserialized object to userland.
this.emit('message', payload);
};
@@ -79,6 +104,9 @@ MessagePort.prototype.close = function(cb) {
originalClose.call(this);
};
+const drainMessagePort = MessagePort.prototype.drain;
+delete MessagePort.prototype.drain;
+
function setupPortReferencing(port, eventEmitter, eventName) {
// Keep track of whether there are any workerMessage listeners:
// If there are some, ref() the channel so it keeps the event loop alive.
@@ -99,7 +127,194 @@ function setupPortReferencing(port, eventEmitter, eventName) {
});
}
+
+class Worker extends EventEmitter {
+ constructor(filename, options = {}) {
+ super();
+ debug(`[${threadId}] create new worker`, filename, options);
+ if (typeof filename !== 'string') {
+ throw new ERR_INVALID_ARG_TYPE('filename', 'string', filename);
+ }
+
+ if (!options.eval && !path.isAbsolute(filename)) {
+ throw new ERR_WORKER_NEED_ABSOLUTE_PATH(filename);
+ }
+
+ // Set up the C++ handle for the worker, as well as some internal wiring.
+ this[kHandle] = new WorkerImpl();
+ this[kHandle].onexit = (code) => this[kOnExit](code);
+ this[kPort] = this[kHandle].messagePort;
+ this[kPort].on('message', (data) => this[kOnMessage](data));
+ this[kPort].start();
+ this[kPort].unref();
+ debug(`[${threadId}] created Worker with ID ${this.threadId}`);
+
+ const { port1, port2 } = new MessageChannel();
+ this[kPublicPort] = port1;
+ this[kPublicPort].on('message', (message) => this.emit('message', message));
+ setupPortReferencing(this[kPublicPort], this, 'message');
+ this[kPort].postMessage({
+ type: 'loadScript',
+ filename,
+ doEval: !!options.eval,
+ workerData: options.workerData,
+ publicPort: port2
+ }, [port2]);
+ // Actually start the new thread now that everything is in place.
+ this[kHandle].startThread();
+ }
+
+ [kOnExit](code) {
+ debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
+ drainMessagePort.call(this[kPublicPort]);
+ this[kDispose]();
+ this.emit('exit', code);
+ this.removeAllListeners();
+ }
+
+ [kOnCouldNotSerializeErr]() {
+ this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR());
+ }
+
+ [kOnErrorMessage](serialized) {
+ // This is what is called for uncaught exceptions.
+ const error = deserializeError(serialized);
+ this.emit('error', error);
+ }
+
+ [kOnMessage](message) {
+ switch (message.type) {
+ case 'upAndRunning':
+ return this.emit('online');
+ case 'couldNotSerializeError':
+ return this[kOnCouldNotSerializeErr]();
+ case 'errorMessage':
+ return this[kOnErrorMessage](message.error);
+ }
+
+ assert.fail(`Unknown worker message type ${message.type}`);
+ }
+
+ [kDispose]() {
+ this[kHandle].onexit = null;
+ this[kHandle] = null;
+ this[kPort] = null;
+ this[kPublicPort] = null;
+ }
+
+ postMessage(...args) {
+ this[kPublicPort].postMessage(...args);
+ }
+
+ terminate(callback) {
+ if (this[kHandle] === null) return;
+
+ debug(`[${threadId}] terminates Worker with ID ${this.threadId}`);
+
+ if (typeof callback !== 'undefined')
+ this.once('exit', (exitCode) => callback(null, exitCode));
+
+ this[kHandle].stopThread();
+ }
+
+ ref() {
+ if (this[kHandle] === null) return;
+
+ this[kHandle].ref();
+ this[kPublicPort].ref();
+ }
+
+ unref() {
+ if (this[kHandle] === null) return;
+
+ this[kHandle].unref();
+ this[kPublicPort].unref();
+ }
+
+ get threadId() {
+ if (this[kHandle] === null) return -1;
+
+ return this[kHandle].threadId;
+ }
+}
+
+let originalFatalException;
+
+function setupChild(evalScript) {
+ // Called during bootstrap to set up worker script execution.
+ debug(`[${threadId}] is setting up worker child environment`);
+ const port = getEnvMessagePort();
+
+ const publicWorker = require('worker');
+
+ port.on('message', (message) => {
+ if (message.type === 'loadScript') {
+ const { filename, doEval, workerData, publicPort } = message;
+ publicWorker.parentPort = publicPort;
+ setupPortReferencing(publicPort, publicPort, 'message');
+ publicWorker.workerData = workerData;
+ debug(`[${threadId}] starts worker script ${filename} ` +
+ `(eval = ${eval}) at cwd = ${process.cwd()}`);
+ port.unref();
+ port.postMessage({ type: 'upAndRunning' });
+ if (doEval) {
+ evalScript('[worker eval]', filename);
+ } else {
+ process.argv[1] = filename; // script filename
+ require('module').runMain();
+ }
+ return;
+ }
+
+ assert.fail(`Unknown worker message type ${message.type}`);
+ });
+
+ port.start();
+
+ originalFatalException = process._fatalException;
+ process._fatalException = fatalException;
+
+ function fatalException(error) {
+ debug(`[${threadId}] gets fatal exception`);
+ let caught = false;
+ try {
+ caught = originalFatalException.call(this, error);
+ } catch (e) {
+ error = e;
+ }
+ debug(`[${threadId}] fatal exception caught = ${caught}`);
+
+ if (!caught) {
+ let serialized;
+ try {
+ serialized = serializeError(error);
+ } catch {}
+ debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
+ if (serialized)
+ port.postMessage({ type: 'errorMessage', error: serialized });
+ else
+ port.postMessage({ type: 'couldNotSerializeError' });
+ clearAsyncIdStack();
+ }
+ }
+}
+
+// TODO(addaleax): These can be improved a lot.
+function serializeError(error) {
+ return Buffer.from(util.inspect(error), 'utf8');
+}
+
+function deserializeError(error) {
+ return Buffer.from(error.buffer,
+ error.byteOffset,
+ error.byteLength).toString('utf8');
+}
+
module.exports = {
MessagePort,
- MessageChannel
+ MessageChannel,
+ threadId,
+ Worker,
+ setupChild,
+ isMainThread
};
diff --git a/lib/worker.js b/lib/worker.js
index d67fb4efe40a33..0609650cd5731d 100644
--- a/lib/worker.js
+++ b/lib/worker.js
@@ -1,5 +1,18 @@
'use strict';
-const { MessagePort, MessageChannel } = require('internal/worker');
+const {
+ isMainThread,
+ MessagePort,
+ MessageChannel,
+ threadId,
+ Worker
+} = require('internal/worker');
-module.exports = { MessagePort, MessageChannel };
+module.exports = {
+ isMainThread,
+ MessagePort,
+ MessageChannel,
+ threadId,
+ Worker,
+ parentPort: null
+};
diff --git a/node.gyp b/node.gyp
index 823dda45b03280..c7ba2432827132 100644
--- a/node.gyp
+++ b/node.gyp
@@ -348,6 +348,7 @@
'src/node_v8.cc',
'src/node_stat_watcher.cc',
'src/node_watchdog.cc',
+ 'src/node_worker.cc',
'src/node_zlib.cc',
'src/node_i18n.cc',
'src/pipe_wrap.cc',
@@ -406,6 +407,7 @@
'src/node_wrap.h',
'src/node_revert.h',
'src/node_i18n.h',
+ 'src/node_worker.h',
'src/pipe_wrap.h',
'src/tty_wrap.h',
'src/tcp_wrap.h',
diff --git a/src/async_wrap.h b/src/async_wrap.h
index cf269a4c1f5e1e..b2f96477b490e0 100644
--- a/src/async_wrap.h
+++ b/src/async_wrap.h
@@ -67,6 +67,7 @@ namespace node {
V(TTYWRAP) \
V(UDPSENDWRAP) \
V(UDPWRAP) \
+ V(WORKER) \
V(WRITEWRAP) \
V(ZLIB)
diff --git a/src/base_object-inl.h b/src/base_object-inl.h
index 3bd854639b2c6d..06a29223973c5d 100644
--- a/src/base_object-inl.h
+++ b/src/base_object-inl.h
@@ -65,6 +65,14 @@ v8::Local BaseObject::object() {
return PersistentToLocal(env_->isolate(), persistent_handle_);
}
+v8::Local BaseObject::object(v8::Isolate* isolate) {
+ v8::Local handle = object();
+#ifdef DEBUG
+ CHECK_EQ(handle->CreationContext()->GetIsolate(), isolate);
+ CHECK_EQ(env_->isolate(), isolate);
+#endif
+ return handle;
+}
Environment* BaseObject::env() const {
return env_;
diff --git a/src/base_object.h b/src/base_object.h
index e0b60843401681..38291d598feb1c 100644
--- a/src/base_object.h
+++ b/src/base_object.h
@@ -43,6 +43,10 @@ class BaseObject {
// persistent.IsEmpty() is true.
inline v8::Local object();
+ // Same as the above, except it additionally verifies that this object
+ // is associated with the passed Isolate in debug mode.
+ inline v8::Local object(v8::Isolate* isolate);
+
inline Persistent& persistent();
inline Environment* env() const;
diff --git a/src/bootstrapper.cc b/src/bootstrapper.cc
index 35c7c4dc696ebd..f9db02562d9c8a 100644
--- a/src/bootstrapper.cc
+++ b/src/bootstrapper.cc
@@ -114,12 +114,14 @@ void SetupBootstrapObject(Environment* env,
BOOTSTRAP_METHOD(_umask, Umask);
#if defined(__POSIX__) && !defined(__ANDROID__) && !defined(__CloudABI__)
- BOOTSTRAP_METHOD(_initgroups, InitGroups);
- BOOTSTRAP_METHOD(_setegid, SetEGid);
- BOOTSTRAP_METHOD(_seteuid, SetEUid);
- BOOTSTRAP_METHOD(_setgid, SetGid);
- BOOTSTRAP_METHOD(_setuid, SetUid);
- BOOTSTRAP_METHOD(_setgroups, SetGroups);
+ if (env->is_main_thread()) {
+ BOOTSTRAP_METHOD(_initgroups, InitGroups);
+ BOOTSTRAP_METHOD(_setegid, SetEGid);
+ BOOTSTRAP_METHOD(_seteuid, SetEUid);
+ BOOTSTRAP_METHOD(_setgid, SetGid);
+ BOOTSTRAP_METHOD(_setuid, SetUid);
+ BOOTSTRAP_METHOD(_setgroups, SetGroups);
+ }
#endif // __POSIX__ && !defined(__ANDROID__) && !defined(__CloudABI__)
Local should_abort_on_uncaught_toggle =
diff --git a/src/callback_scope.cc b/src/callback_scope.cc
index 9eac7beb038a26..23e6d5b0632f2c 100644
--- a/src/callback_scope.cc
+++ b/src/callback_scope.cc
@@ -79,6 +79,11 @@ void InternalCallbackScope::Close() {
closed_ = true;
HandleScope handle_scope(env_->isolate());
+ if (!env_->can_call_into_js()) return;
+ if (failed_ && !env_->is_main_thread() && env_->is_stopping_worker()) {
+ env_->async_hooks()->clear_async_id_stack();
+ }
+
if (pushed_ids_)
env_->async_hooks()->pop_async_id(async_context_.async_id);
diff --git a/src/env-inl.h b/src/env-inl.h
index 50328bd77c1a89..eeb419b4a0fad2 100644
--- a/src/env-inl.h
+++ b/src/env-inl.h
@@ -582,13 +582,42 @@ void Environment::SetUnrefImmediate(native_immediate_callback cb,
}
inline bool Environment::can_call_into_js() const {
- return can_call_into_js_;
+ return can_call_into_js_ && (is_main_thread() || !is_stopping_worker());
}
inline void Environment::set_can_call_into_js(bool can_call_into_js) {
can_call_into_js_ = can_call_into_js;
}
+inline bool Environment::is_main_thread() const {
+ return thread_id_ == 0;
+}
+
+inline double Environment::thread_id() const {
+ return thread_id_;
+}
+
+inline void Environment::set_thread_id(double id) {
+ thread_id_ = id;
+}
+
+inline worker::Worker* Environment::worker_context() const {
+ return worker_context_;
+}
+
+inline void Environment::set_worker_context(worker::Worker* context) {
+ CHECK_EQ(worker_context_, nullptr); // Should be set only once.
+ worker_context_ = context;
+}
+
+inline void Environment::add_sub_worker_context(worker::Worker* context) {
+ sub_worker_contexts_.insert(context);
+}
+
+inline void Environment::remove_sub_worker_context(worker::Worker* context) {
+ sub_worker_contexts_.erase(context);
+}
+
inline performance::performance_state* Environment::performance_state() {
return performance_state_.get();
}
diff --git a/src/env.cc b/src/env.cc
index 090b43968bf665..8df59d1546dbdd 100644
--- a/src/env.cc
+++ b/src/env.cc
@@ -4,6 +4,7 @@
#include "node_buffer.h"
#include "node_platform.h"
#include "node_file.h"
+#include "node_worker.h"
#include "tracing/agent.h"
#include
@@ -25,6 +26,7 @@ using v8::StackTrace;
using v8::String;
using v8::Symbol;
using v8::Value;
+using worker::Worker;
IsolateData::IsolateData(Isolate* isolate,
uv_loop_t* event_loop,
@@ -444,7 +446,9 @@ void Environment::RunAndClearNativeImmediates() {
if (it->refed_)
ref_count++;
if (UNLIKELY(try_catch.HasCaught())) {
- FatalException(isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(isolate(), try_catch);
+
// Bail out, remove the already executed callbacks from list
// and set up a new TryCatch for the other pending callbacks.
std::move_backward(it, list.end(), list.begin() + (list.end() - it));
@@ -632,4 +636,25 @@ void Environment::AsyncHooks::grow_async_ids_stack() {
uv_key_t Environment::thread_local_env = {};
+void Environment::Exit(int exit_code) {
+ if (is_main_thread())
+ exit(exit_code);
+ else
+ worker_context_->Exit(exit_code);
+}
+
+void Environment::stop_sub_worker_contexts() {
+ while (!sub_worker_contexts_.empty()) {
+ Worker* w = *sub_worker_contexts_.begin();
+ remove_sub_worker_context(w);
+ w->Exit(1);
+ w->JoinThread();
+ }
+}
+
+bool Environment::is_stopping_worker() const {
+ CHECK(!is_main_thread());
+ return worker_context_->is_stopped();
+}
+
} // namespace node
diff --git a/src/env.h b/src/env.h
index cdb592732a4264..cf6873e5fe7c6a 100644
--- a/src/env.h
+++ b/src/env.h
@@ -55,6 +55,10 @@ namespace performance {
class performance_state;
}
+namespace worker {
+class Worker;
+}
+
namespace loader {
class ModuleWrap;
@@ -193,7 +197,10 @@ struct PackageConfig {
V(mac_string, "mac") \
V(main_string, "main") \
V(max_buffer_string, "maxBuffer") \
+ V(max_semi_space_size_string, "maxSemiSpaceSize") \
+ V(max_old_space_size_string, "maxOldSpaceSize") \
V(message_string, "message") \
+ V(message_port_string, "messagePort") \
V(message_port_constructor_string, "MessagePort") \
V(minttl_string, "minttl") \
V(modulus_string, "modulus") \
@@ -280,6 +287,7 @@ struct PackageConfig {
V(subject_string, "subject") \
V(subjectaltname_string, "subjectaltname") \
V(syscall_string, "syscall") \
+ V(thread_id_string, "threadId") \
V(ticketkeycallback_string, "onticketkeycallback") \
V(timeout_string, "timeout") \
V(tls_ticket_string, "tlsTicket") \
@@ -328,6 +336,7 @@ struct PackageConfig {
V(http2stream_constructor_template, v8::ObjectTemplate) \
V(immediate_callback_function, v8::Function) \
V(inspector_console_api_object, v8::Object) \
+ V(message_port, v8::Object) \
V(message_port_constructor_template, v8::FunctionTemplate) \
V(pbkdf2_constructor_template, v8::ObjectTemplate) \
V(pipe_constructor_template, v8::FunctionTemplate) \
@@ -601,6 +610,7 @@ class Environment {
void RegisterHandleCleanups();
void CleanupHandles();
+ void Exit(int code);
// Register clean-up cb to be called on environment destruction.
inline void RegisterHandleCleanup(uv_handle_t* handle,
@@ -714,6 +724,18 @@ class Environment {
inline bool can_call_into_js() const;
inline void set_can_call_into_js(bool can_call_into_js);
+ // TODO(addaleax): This should be inline.
+ bool is_stopping_worker() const;
+
+ inline bool is_main_thread() const;
+ inline double thread_id() const;
+ inline void set_thread_id(double id);
+ inline worker::Worker* worker_context() const;
+ inline void set_worker_context(worker::Worker* context);
+ inline void add_sub_worker_context(worker::Worker* context);
+ inline void remove_sub_worker_context(worker::Worker* context);
+ void stop_sub_worker_contexts();
+
inline void ThrowError(const char* errmsg);
inline void ThrowTypeError(const char* errmsg);
inline void ThrowRangeError(const char* errmsg);
@@ -855,12 +877,15 @@ class Environment {
std::vector destroy_async_id_list_;
AliasedBuffer should_abort_on_uncaught_toggle_;
-
int should_not_abort_scope_counter_ = 0;
std::unique_ptr performance_state_;
std::unordered_map performance_marks_;
+
bool can_call_into_js_ = true;
+ double thread_id_ = 0;
+ std::unordered_set sub_worker_contexts_;
+
#if HAVE_INSPECTOR
std::unique_ptr inspector_agent_;
@@ -893,6 +918,8 @@ class Environment {
std::vector>
file_handle_read_wrap_freelist_;
+ worker::Worker* worker_context_ = nullptr;
+
struct ExitCallback {
void (*cb_)(void* arg);
void* arg_;
diff --git a/src/js_stream.cc b/src/js_stream.cc
index c766c322e3017a..e562a62f3d1bb2 100644
--- a/src/js_stream.cc
+++ b/src/js_stream.cc
@@ -44,7 +44,8 @@ bool JSStream::IsClosing() {
TryCatch try_catch(env()->isolate());
Local value;
if (!MakeCallback(env()->isclosing_string(), 0, nullptr).ToLocal(&value)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
return true;
}
return value->IsTrue();
@@ -59,7 +60,8 @@ int JSStream::ReadStart() {
int value_int = UV_EPROTO;
if (!MakeCallback(env()->onreadstart_string(), 0, nullptr).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
@@ -73,7 +75,8 @@ int JSStream::ReadStop() {
int value_int = UV_EPROTO;
if (!MakeCallback(env()->onreadstop_string(), 0, nullptr).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
@@ -94,7 +97,8 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
arraysize(argv),
argv).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
@@ -128,7 +132,8 @@ int JSStream::DoWrite(WriteWrap* w,
arraysize(argv),
argv).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
diff --git a/src/node.cc b/src/node.cc
index 281d441a26a86c..f257fde148925e 100644
--- a/src/node.cc
+++ b/src/node.cc
@@ -1016,9 +1016,9 @@ void AppendExceptionLine(Environment* env,
}
-static void ReportException(Environment* env,
- Local er,
- Local message) {
+void ReportException(Environment* env,
+ Local er,
+ Local message) {
CHECK(!er.IsEmpty());
HandleScope scope(env->isolate());
@@ -1105,9 +1105,9 @@ static void ReportException(Environment* env, const TryCatch& try_catch) {
// Executes a str within the current v8 context.
-static Local ExecuteString(Environment* env,
- Local source,
- Local filename) {
+static MaybeLocal ExecuteString(Environment* env,
+ Local source,
+ Local filename) {
EscapableHandleScope scope(env->isolate());
TryCatch try_catch(env->isolate());
@@ -1120,13 +1120,19 @@ static Local ExecuteString(Environment* env,
v8::Script::Compile(env->context(), source, &origin);
if (script.IsEmpty()) {
ReportException(env, try_catch);
- exit(3);
+ env->Exit(3);
+ return MaybeLocal();
}
MaybeLocal result = script.ToLocalChecked()->Run(env->context());
if (result.IsEmpty()) {
+ if (try_catch.HasTerminated()) {
+ env->isolate()->CancelTerminateExecution();
+ return MaybeLocal();
+ }
ReportException(env, try_catch);
- exit(4);
+ env->Exit(4);
+ return MaybeLocal();
}
return scope.Escape(result.ToLocalChecked());
@@ -1225,6 +1231,7 @@ static void Abort(const FunctionCallbackInfo& args) {
void Chdir(const FunctionCallbackInfo& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
if (args.Length() != 1 || !args[0]->IsString()) {
return env->ThrowTypeError("Bad argument.");
@@ -1424,6 +1431,7 @@ static void GetEGid(const FunctionCallbackInfo& args) {
void SetGid(const FunctionCallbackInfo& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
if (!args[0]->IsUint32() && !args[0]->IsString()) {
return env->ThrowTypeError("setgid argument must be a number or a string");
@@ -1443,6 +1451,7 @@ void SetGid(const FunctionCallbackInfo& args) {
void SetEGid(const FunctionCallbackInfo& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
if (!args[0]->IsUint32() && !args[0]->IsString()) {
return env->ThrowTypeError("setegid argument must be a number or string");
@@ -1462,6 +1471,7 @@ void SetEGid(const FunctionCallbackInfo& args) {
void SetUid(const FunctionCallbackInfo& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
if (!args[0]->IsUint32() && !args[0]->IsString()) {
return env->ThrowTypeError("setuid argument must be a number or a string");
@@ -1481,6 +1491,7 @@ void SetUid(const FunctionCallbackInfo& args) {
void SetEUid(const FunctionCallbackInfo& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
if (!args[0]->IsUint32() && !args[0]->IsString()) {
return env->ThrowTypeError("seteuid argument must be a number or string");
@@ -1639,9 +1650,10 @@ static void WaitForInspectorDisconnect(Environment* env) {
static void Exit(const FunctionCallbackInfo& args) {
- WaitForInspectorDisconnect(Environment::GetCurrent(args));
+ Environment* env = Environment::GetCurrent(args);
+ WaitForInspectorDisconnect(env);
v8_platform.StopTracingAgent();
- exit(args[0]->Int32Value());
+ env->Exit(args[0]->Int32Value());
}
@@ -2050,6 +2062,9 @@ void FatalException(Isolate* isolate,
Local caught =
fatal_exception_function->Call(process_object, 1, &error);
+ if (fatal_try_catch.HasTerminated())
+ return;
+
if (fatal_try_catch.HasCaught()) {
// The fatal exception function threw, so we must exit
ReportException(env, fatal_try_catch);
@@ -2063,6 +2078,12 @@ void FatalException(Isolate* isolate,
void FatalException(Isolate* isolate, const TryCatch& try_catch) {
+ // If we try to print out a termination exception, we'd just get 'null',
+ // so just crashing here with that information seems like a better idea,
+ // and in particular it seems like we should handle terminations at the call
+ // site for this function rather than by printing them out somewhere.
+ CHECK(!try_catch.HasTerminated());
+
HandleScope scope(isolate);
if (!try_catch.IsVerbose()) {
FatalException(isolate, try_catch.Exception(), try_catch.Message());
@@ -2584,11 +2605,12 @@ void SetupProcessObject(Environment* env,
Local