From ca855be5fb626ab0aa35ab6bcde3fb8541cb5143 Mon Sep 17 00:00:00 2001 From: MrBBot Date: Fri, 23 Dec 2022 11:45:43 +0000 Subject: [PATCH 1/2] stream: allow transfer of readable byte streams Updates the `ReadableStream` constructor to mark byte streams as transferable. When transferred, byte streams become regular streams. Refs: https://github.com/nodejs/node/pull/39062 Refs: https://streams.spec.whatwg.org/#rs-transfer --- lib/internal/webstreams/readablestream.js | 17 +++--- .../test-whatwg-webstreams-transfer.js | 55 +++++++++++++++++++ 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 5344785b90cd3e..e7bcde32a992dc 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -251,17 +251,16 @@ class ReadableStream { this, source, extractHighWaterMark(highWaterMark, 0)); - return; + } else { + if (type !== undefined) + throw new ERR_INVALID_ARG_VALUE('source.type', type); + setupReadableStreamDefaultControllerFromSource( + this, + source, + extractHighWaterMark(highWaterMark, 1), + extractSizeAlgorithm(size)); } - if (type !== undefined) - throw new ERR_INVALID_ARG_VALUE('source.type', type); - setupReadableStreamDefaultControllerFromSource( - this, - source, - extractHighWaterMark(highWaterMark, 1), - extractSizeAlgorithm(size)); - // eslint-disable-next-line no-constructor-return return makeTransferable(this); } diff --git a/test/parallel/test-whatwg-webstreams-transfer.js b/test/parallel/test-whatwg-webstreams-transfer.js index 1923cca13bcc6f..f947953bda61f5 100644 --- a/test/parallel/test-whatwg-webstreams-transfer.js +++ b/test/parallel/test-whatwg-webstreams-transfer.js @@ -15,6 +15,7 @@ const { const { isReadableStream, + isReadableByteStreamController, } = require('internal/webstreams/readablestream'); const { @@ -25,6 +26,10 @@ const { isTransformStream, } = require('internal/webstreams/transformstream'); +const { + kState, +} = require('internal/webstreams/util'); + const { makeTransferable, kClone, @@ -107,6 +112,56 @@ const theData = 'hello'; assert(readable.locked); } +{ + const { port1, port2 } = new MessageChannel(); + port1.onmessageerror = common.mustNotCall(); + port2.onmessageerror = common.mustNotCall(); + + // This test repeats the test above, but with a readable byte stream. + // Note transferring a readable byte stream results in a regular + // value-oriented stream on the other side: + // https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable + + const theByteData = new Uint8Array([1, 2, 3]); + + const readable = new ReadableStream({ + type: "bytes", + start: common.mustCall((controller) => { + // `enqueue` will detach its argument's buffer, so clone first + controller.enqueue(theByteData.slice()); + controller.close(); + }), + }); + assert(isReadableByteStreamController(readable[kState].controller)); + + port2.onmessage = common.mustCall(({ data }) => { + assert(isReadableStream(data)); + assert(!isReadableByteStreamController(data[kState].controller)); + + const reader = data.getReader(); + reader.read().then(common.mustCall((chunk) => { + assert.deepStrictEqual(chunk, { done: false, value: theByteData }); + })); + + port2.close(); + }); + + port1.onmessage = common.mustCall(({ data }) => { + assert(isReadableStream(data)); + assert(!isReadableByteStreamController(data[kState].controller)); + assert(!data.locked); + port1.postMessage(data, [data]); + assert(data.locked); + }); + + assert.throws(() => port2.postMessage(readable), { + code: 'ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST', + }); + + port2.postMessage(readable, [readable]); + assert(readable.locked); +} + { const { port1, port2 } = new MessageChannel(); port1.onmessageerror = common.mustNotCall(); From 8e6988fa8e0ea686e6b55d81b8932336c9eed3cb Mon Sep 17 00:00:00 2001 From: MrBBot Date: Fri, 23 Dec 2022 12:06:38 +0000 Subject: [PATCH 2/2] fixup! stream: allow transfer of readable byte streams --- test/parallel/test-whatwg-webstreams-transfer.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-whatwg-webstreams-transfer.js b/test/parallel/test-whatwg-webstreams-transfer.js index f947953bda61f5..6abc2fe2a87f91 100644 --- a/test/parallel/test-whatwg-webstreams-transfer.js +++ b/test/parallel/test-whatwg-webstreams-transfer.js @@ -125,7 +125,7 @@ const theData = 'hello'; const theByteData = new Uint8Array([1, 2, 3]); const readable = new ReadableStream({ - type: "bytes", + type: 'bytes', start: common.mustCall((controller) => { // `enqueue` will detach its argument's buffer, so clone first controller.enqueue(theByteData.slice());