diff --git a/doc/api/stream.md b/doc/api/stream.md index 5a9a44d5c0fb75..ac9ea0afd8f331 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2698,6 +2698,9 @@ const cleanup = finished(rs, (err) => { -* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]} -* `source` {Stream|Iterable|AsyncIterable|Function} +* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]| + ReadableStream\[]|WritableStream\[]|TransformStream\[]} +* `source` {Stream|Iterable|AsyncIterable|Function|ReadableStream} * Returns: {Iterable|AsyncIterable} -* `...transforms` {Stream|Function} +* `...transforms` {Stream|Function|TransformStream} * `source` {AsyncIterable} * Returns: {AsyncIterable} -* `destination` {Stream|Function} +* `destination` {Stream|Function|WritableStream} * `source` {AsyncIterable} * Returns: {AsyncIterable|Promise} * `callback` {Function} Called when the pipeline is fully done. diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index b8a756330536c5..44c0e06ee30557 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -35,6 +35,9 @@ const { isReadable, isReadableNodeStream, isNodeStream, + isTransformStream, + isWebStream, + isReadableStream, } = require('internal/streams/utils'); const { AbortController } = require('internal/abort_controller'); @@ -88,7 +91,7 @@ async function* fromReadable(val) { yield* Readable.prototype[SymbolAsyncIterator].call(val); } -async function pump(iterable, writable, finish, { end }) { +async function pumpToNode(iterable, writable, finish, { end }) { let error; let onresolve = null; @@ -147,6 +150,35 @@ async function pump(iterable, writable, finish, { end }) { } } +async function pumpToWeb(readable, writable, finish, { end }) { + if (isTransformStream(writable)) { + writable = writable.writable; + } + // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure + const writer = writable.getWriter(); + try { + for await (const chunk of readable) { + await writer.ready; + writer.write(chunk).catch(() => {}); + } + + await writer.ready; + + if (end) { + await writer.close(); + } + + finish(); + } catch (err) { + try { + await writer.abort(err); + finish(err); + } catch (err) { + finish(err); + } + } +} + function pipeline(...streams) { return pipelineImpl(streams, once(popCallback(streams))); } @@ -259,7 +291,11 @@ function pipelineImpl(streams, callback, opts) { ret = Duplex.from(stream); } } else if (typeof stream === 'function') { - ret = makeAsyncIterable(ret); + if (isTransformStream(ret)) { + ret = makeAsyncIterable(ret?.readable); + } else { + ret = makeAsyncIterable(ret); + } ret = stream(ret, { signal }); if (reading) { @@ -303,7 +339,11 @@ function pipelineImpl(streams, callback, opts) { ); } else if (isIterable(ret, true)) { finishCount++; - pump(ret, pt, finish, { end }); + pumpToNode(ret, pt, finish, { end }); + } else if (isReadableStream(ret) || isTransformStream(ret)) { + const toRead = ret.readable || ret; + finishCount++; + pumpToNode(toRead, pt, finish, { end }); } else { throw new ERR_INVALID_RETURN_VALUE( 'AsyncIterable or Promise', 'destination', ret); @@ -324,12 +364,30 @@ function pipelineImpl(streams, callback, opts) { if (isReadable(stream) && isLastStream) { lastStreamCleanup.push(cleanup); } + } else if (isTransformStream(ret) || isReadableStream(ret)) { + const toRead = ret.readable || ret; + finishCount++; + pumpToNode(toRead, stream, finish, { end }); } else if (isIterable(ret)) { finishCount++; - pump(ret, stream, finish, { end }); + pumpToNode(ret, stream, finish, { end }); + } else { + throw new ERR_INVALID_ARG_TYPE( + 'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret); + } + ret = stream; + } else if (isWebStream(stream)) { + if (isReadableNodeStream(ret)) { + finishCount++; + pumpToWeb(makeAsyncIterable(ret), stream, finish, { end }); + } else if (isReadableStream(ret) || isIterable(ret)) { + finishCount++; + pumpToWeb(ret, stream, finish, { end }); + } else if (isTransformStream(ret)) { + pumpToWeb(ret.readable, stream, finish, { end }); } else { throw new ERR_INVALID_ARG_TYPE( - 'val', ['Readable', 'Iterable', 'AsyncIterable'], ret); + 'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret); } ret = stream; } else { diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 9d08af6f31a280..74faca5fe9bb2a 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -77,6 +77,19 @@ function isWritableStream(obj) { ); } +function isTransformStream(obj) { + return !!( + obj && + !isNodeStream(obj) && + typeof obj.readable === 'object' && + typeof obj.writable === 'object' + ); +} + +function isWebStream(obj) { + return isReadableStream(obj) || isWritableStream(obj) || isTransformStream(obj); +} + function isIterable(obj, isAsync) { if (obj == null) return false; if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; @@ -303,6 +316,7 @@ module.exports = { isReadableFinished, isReadableErrored, isNodeStream, + isWebStream, isWritable, isWritableNodeStream, isWritableStream, @@ -312,4 +326,5 @@ module.exports = { isServerRequest, isServerResponse, willEmitClose, + isTransformStream, }; diff --git a/lib/stream/promises.js b/lib/stream/promises.js index 512012860f4a7a..a8b65d62b0961b 100644 --- a/lib/stream/promises.js +++ b/lib/stream/promises.js @@ -8,6 +8,7 @@ const { const { isIterable, isNodeStream, + isWebStream, } = require('internal/streams/utils'); const { pipelineImpl: pl } = require('internal/streams/pipeline'); @@ -21,7 +22,7 @@ function pipeline(...streams) { let end; const lastArg = streams[streams.length - 1]; if (lastArg && typeof lastArg === 'object' && - !isNodeStream(lastArg) && !isIterable(lastArg)) { + !isNodeStream(lastArg) && !isIterable(lastArg) && !isWebStream(lastArg)) { const options = ArrayPrototypePop(streams); signal = options.signal; end = options.end; diff --git a/test/parallel/test-webstreams-pipeline.js b/test/parallel/test-webstreams-pipeline.js new file mode 100644 index 00000000000000..46bdf8718ea97a --- /dev/null +++ b/test/parallel/test-webstreams-pipeline.js @@ -0,0 +1,412 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { Readable, Writable, Transform, pipeline } = require('stream'); +const { pipeline: pipelinePromise } = require('stream/promises'); +const { ReadableStream, WritableStream, TransformStream } = require('stream/web'); +const http = require('http'); + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + const ws = new WritableStream({ + write(chunk) { + values.push(chunk); + } + }); + + pipeline(rs, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write() { } + }); + + pipeline(rs, ws, common.mustCall((err) => { + assert.strictEqual(err?.message, 'kaboom'); + })); + + c.error(new Error('kaboom')); +} + +{ + let c; + const values = []; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ts = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk?.toString().toUpperCase()); + } + }); + + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipeline(rs, ts, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['HELLO', 'WORLD']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + function makeTransformStream() { + return new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk?.toString()); + } + }); + } + + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write() { } + }); + + pipeline(rs, + makeTransformStream(), + makeTransformStream(), + makeTransformStream(), + makeTransformStream(), + ws, + common.mustCall((err) => { + assert.strictEqual(err?.message, 'kaboom'); + })); + + c.error(new Error('kaboom')); +} + +{ + const values = []; + + const r = new Readable({ + read() { } + }); + + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipeline(r, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['helloworld']); + })); + + r.push('hello'); + r.push('world'); + r.push(null); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const w = new Writable({ + write(chunk, encoding, callback) { + values.push(chunk?.toString()); + callback(); + } + }); + + pipeline(rs, w, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + const t = new Transform({ + transform(chunk, encoding, callback) { + callback(null, chunk?.toString().toUpperCase()); + } + }); + + pipeline(rs, t, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['HELLOWORLD']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + const server = http.createServer((req, res) => { + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('hello'); + controller.enqueue('world'); + controller.close(); + } + }); + pipeline(rs, res, common.mustSucceed(() => {})); + }); + + server.listen(0, common.mustCall(() => { + const req = http.request({ + port: server.address().port + }); + req.end(); + const values = []; + req.on('response', (res) => { + res.on('data', (chunk) => { + values.push(chunk?.toString()); + }); + res.on('end', common.mustCall(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + server.close(); + })); + }); + })); +} + +{ + const values = []; + const server = http.createServer((req, res) => { + const ts = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk?.toString().toUpperCase()); + } + }); + pipeline(req, ts, res, common.mustSucceed()); + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port, + method: 'POST', + }); + + + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('hello'); + controller.close(); + } + }); + + pipeline(rs, req, common.mustSucceed()); + + req.on('response', (res) => { + res.on('data', (chunk) => { + values.push(chunk?.toString()); + } + ); + res.on('end', common.mustCall(() => { + assert.deepStrictEqual(values, ['HELLO']); + server.close(); + })); + }); + }); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipelinePromise(rs, ws).then(common.mustCall(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write() { } + }); + + pipelinePromise(rs, ws).then(common.mustNotCall()).catch(common.mustCall((err) => { + assert.strictEqual(err?.message, 'kaboom'); + })); + + c.error(new Error('kaboom')); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + pipeline(rs, async function(source) { + for await (const chunk of source) { + values.push(chunk?.toString()); + } + }, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + const rs = new ReadableStream({ + start() {} + }); + + pipeline(rs, async function(source) { + throw new Error('kaboom'); + }, (err) => { + assert.strictEqual(err?.message, 'kaboom'); + }); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ts = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk?.toString().toUpperCase()); + } + }); + + pipeline(rs, ts, async function(source) { + for await (const chunk of source) { + values.push(chunk?.toString()); + } + }, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['HELLO', 'WORLD']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipeline(rs, async function* (source) { + for await (const chunk of source) { + yield chunk?.toString().toUpperCase(); + } + }, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['HELLO', 'WORLD']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write(chunk) { } + }, { highWaterMark: 0 }); + + pipeline(rs, ws, common.mustNotCall()); + + for (let i = 0; i < 10; i++) { + c.enqueue(`${i}`); + } + c.close(); +}