From 22ca334090519498a0f60402c240fcf941d2f102 Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Tue, 9 Jul 2024 09:16:04 +0200 Subject: [PATCH] worker: add postMessageToThread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-URL: https://github.com/nodejs/node/pull/53682 Reviewed-By: Matteo Collina Reviewed-By: James M Snell Reviewed-By: Benjamin Gruenbaum Reviewed-By: Gerhard Stöbich Reviewed-By: Marco Ippolito --- doc/api/errors.md | 49 ++++ doc/api/process.md | 13 + doc/api/worker_threads.md | 116 +++++++++ lib/internal/errors.js | 4 + lib/internal/main/worker_thread.js | 4 + lib/internal/worker.js | 15 +- lib/internal/worker/messaging.js | 242 ++++++++++++++++++ lib/worker_threads.js | 5 + test/parallel/test-bootstrap-modules.js | 1 + test/parallel/test-http-multiple-headers.js | 1 - .../test-worker-messaging-errors-handler.js | 34 +++ .../test-worker-messaging-errors-invalid.js | 48 ++++ .../test-worker-messaging-errors-timeout.js | 38 +++ test/parallel/test-worker-messaging.js | 112 ++++++++ 14 files changed, 677 insertions(+), 5 deletions(-) create mode 100644 lib/internal/worker/messaging.js create mode 100644 test/parallel/test-worker-messaging-errors-handler.js create mode 100644 test/parallel/test-worker-messaging-errors-invalid.js create mode 100644 test/parallel/test-worker-messaging-errors-timeout.js create mode 100644 test/parallel/test-worker-messaging.js diff --git a/doc/api/errors.md b/doc/api/errors.md index fd09eb77af2f2b..8300d66c467e57 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -3105,6 +3105,54 @@ The `Worker` instance terminated because it reached its memory limit. The path for the main script of a worker is neither an absolute path nor a relative path starting with `./` or `../`. + + +### `ERR_WORKER_MESSAGING_ERRORED` + + + +> Stability: 1.1 - Active development + +The destination thread threw an error while processing a message sent via [`postMessageToThread()`][]. + + + +### `ERR_WORKER_MESSAGING_FAILED` + + + +> Stability: 1.1 - Active development + +The thread requested in [`postMessageToThread()`][] is invalid or has no `workerMessage` listener. + + + +### `ERR_WORKER_MESSAGING_SAME_THREAD` + + + +> Stability: 1.1 - Active development + +The thread id requested in [`postMessageToThread()`][] is the current thread id. + + + +### `ERR_WORKER_MESSAGING_TIMEOUT` + + + +> Stability: 1.1 - Active development + +Sending a message via [`postMessageToThread()`][] timed out. + ### `ERR_WORKER_UNSERIALIZABLE_ERROR` @@ -4017,6 +4065,7 @@ An error occurred trying to allocate memory. This should never happen. [`new URLSearchParams(iterable)`]: url.md#new-urlsearchparamsiterable [`package.json`]: packages.md#nodejs-packagejson-field-definitions [`postMessage()`]: worker_threads.md#portpostmessagevalue-transferlist +[`postMessageToThread()`]: worker_threads.md#workerpostmessagetothreadthreadid-value-transferlist-timeout [`process.on('exit')`]: process.md#event-exit [`process.send()`]: process.md#processsendmessage-sendhandle-options-callback [`process.setUncaughtExceptionCaptureCallback()`]: process.md#processsetuncaughtexceptioncapturecallbackfn diff --git a/doc/api/process.md b/doc/api/process.md index 3f4a411e440151..5eb4eef0745c66 100644 --- a/doc/api/process.md +++ b/doc/api/process.md @@ -327,6 +327,18 @@ possible to record such errors in an error log, either periodically (which is likely best for long-running application) or upon process exit (which is likely most convenient for scripts). +### Event: `'workerMessage'` + + + +* `value` {any} A value transmitted using [`postMessageToThread()`][]. +* `source` {number} The transmitting worker thread ID or `0` for the main thread. + +The `'workerMessage'` event is emitted for any incoming message send by the other +party by using [`postMessageToThread()`][]. + ### Event: `'uncaughtException'` + +> Stability: 1.1 - Active development + +* `destination` {number} The target thread ID. If the thread ID is invalid, a + [`ERR_WORKER_MESSAGING_FAILED`][] error will be thrown. If the target thread ID is the current thread ID, + a [`ERR_WORKER_MESSAGING_SAME_THREAD`][] error will be thrown. +* `value` {any} The value to send. +* `transferList` {Object\[]} If one or more `MessagePort`-like objects are passed in `value`, + a `transferList` is required for those items or [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`][] is thrown. + See [`port.postMessage()`][] for more information. +* `timeout` {number} Time to wait for the message to be delivered in milliseconds. + By default it's `undefined`, which means wait forever. If the operation times out, + a [`ERR_WORKER_MESSAGING_TIMEOUT`][] error is thrown. +* Returns: {Promise} A promise which is fulfilled if the message was successfully processed by destination thread. + +Sends a value to another worker, identified by its thread ID. + +If the target thread has no listener for the `workerMessage` event, then the operation will throw +a [`ERR_WORKER_MESSAGING_FAILED`][] error. + +If the target thread threw an error while processing the `workerMessage` event, then the operation will throw +a [`ERR_WORKER_MESSAGING_ERRORED`][] error. + +This method should be used when the target thread is not the direct +parent or child of the current thread. +If the two threads are parent-children, use the [`require('node:worker_threads').parentPort.postMessage()`][] +and the [`worker.postMessage()`][] to let the threads communicate. + +The example below shows the use of of `postMessageToThread`: it creates 10 nested threads, +the last one will try to communicate with the main thread. + +```mjs +import { fileURLToPath } from 'node:url'; +import { once } from 'node:events'; +import process from 'node:process'; +import { + isMainThread, + postMessageToThread, + threadId, + workerData, + Worker, +} from 'node:worker_threads'; + +const channel = new BroadcastChannel('sync'); +const level = workerData?.level ?? 0; + +if (level < 10) { + const worker = new Worker(fileURLToPath(import.meta.url), { + workerData: { level: level + 1 }, + }); +} + +if (level === 0) { + process.on('workerMessage', (value, source) => { + console.log(`${source} -> ${threadId}:`, value); + postMessageToThread(source, { message: 'pong' }); + }); +} else if (level === 10) { + process.on('workerMessage', (value, source) => { + console.log(`${source} -> ${threadId}:`, value); + channel.postMessage('done'); + channel.close(); + }); + + await postMessageToThread(0, { message: 'ping' }); +} + +channel.onmessage = channel.close; +``` + +```cjs +const { once } = require('node:events'); +const { + isMainThread, + postMessageToThread, + threadId, + workerData, + Worker, +} = require('node:worker_threads'); + +const channel = new BroadcastChannel('sync'); +const level = workerData?.level ?? 0; + +if (level < 10) { + const worker = new Worker(__filename, { + workerData: { level: level + 1 }, + }); +} + +if (level === 0) { + process.on('workerMessage', (value, source) => { + console.log(`${source} -> ${threadId}:`, value); + postMessageToThread(source, { message: 'pong' }); + }); +} else if (level === 10) { + process.on('workerMessage', (value, source) => { + console.log(`${source} -> ${threadId}:`, value); + channel.postMessage('done'); + channel.close(); + }); + + postMessageToThread(0, { message: 'ping' }); +} + +channel.onmessage = channel.close; +``` + ## `worker.receiveMessageOnPort(port)`