Skip to content

Commit

Permalink
stream: add pipeline() for webstreams
Browse files Browse the repository at this point in the history
Refs: #39316
PR-URL: #46307
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
  • Loading branch information
debadree25 authored Feb 2, 2023
1 parent ebcc711 commit 23effb2
Show file tree
Hide file tree
Showing 5 changed files with 500 additions and 10 deletions.
12 changes: 8 additions & 4 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2698,6 +2698,9 @@ const cleanup = finished(rs, (err) => {
<!-- YAML
added: v10.0.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/46307
description: Added support for webstreams.
- version: v18.0.0
pr-url: https://github.com/nodejs/node/pull/41678
description: Passing an invalid callback to the `callback` argument
Expand All @@ -2714,13 +2717,14 @@ changes:
description: Add support for async generators.
-->

* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
* `source` {Stream|Iterable|AsyncIterable|Function}
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
ReadableStream\[]|WritableStream\[]|TransformStream\[]}
* `source` {Stream|Iterable|AsyncIterable|Function|ReadableStream}
* Returns: {Iterable|AsyncIterable}
* `...transforms` {Stream|Function}
* `...transforms` {Stream|Function|TransformStream}
* `source` {AsyncIterable}
* Returns: {AsyncIterable}
* `destination` {Stream|Function}
* `destination` {Stream|Function|WritableStream}
* `source` {AsyncIterable}
* Returns: {AsyncIterable|Promise}
* `callback` {Function} Called when the pipeline is fully done.
Expand Down
68 changes: 63 additions & 5 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const {
isReadable,
isReadableNodeStream,
isNodeStream,
isTransformStream,
isWebStream,
isReadableStream,
} = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller');

Expand Down Expand Up @@ -88,7 +91,7 @@ async function* fromReadable(val) {
yield* Readable.prototype[SymbolAsyncIterator].call(val);
}

async function pump(iterable, writable, finish, { end }) {
async function pumpToNode(iterable, writable, finish, { end }) {
let error;
let onresolve = null;

Expand Down Expand Up @@ -147,6 +150,35 @@ async function pump(iterable, writable, finish, { end }) {
}
}

async function pumpToWeb(readable, writable, finish, { end }) {
if (isTransformStream(writable)) {
writable = writable.writable;
}
// https://streams.spec.whatwg.org/#example-manual-write-with-backpressure
const writer = writable.getWriter();
try {
for await (const chunk of readable) {
await writer.ready;
writer.write(chunk).catch(() => {});
}

await writer.ready;

if (end) {
await writer.close();
}

finish();
} catch (err) {
try {
await writer.abort(err);
finish(err);
} catch (err) {
finish(err);
}
}
}

function pipeline(...streams) {
return pipelineImpl(streams, once(popCallback(streams)));
}
Expand Down Expand Up @@ -259,7 +291,11 @@ function pipelineImpl(streams, callback, opts) {
ret = Duplex.from(stream);
}
} else if (typeof stream === 'function') {
ret = makeAsyncIterable(ret);
if (isTransformStream(ret)) {
ret = makeAsyncIterable(ret?.readable);
} else {
ret = makeAsyncIterable(ret);
}
ret = stream(ret, { signal });

if (reading) {
Expand Down Expand Up @@ -303,7 +339,11 @@ function pipelineImpl(streams, callback, opts) {
);
} else if (isIterable(ret, true)) {
finishCount++;
pump(ret, pt, finish, { end });
pumpToNode(ret, pt, finish, { end });
} else if (isReadableStream(ret) || isTransformStream(ret)) {
const toRead = ret.readable || ret;
finishCount++;
pumpToNode(toRead, pt, finish, { end });
} else {
throw new ERR_INVALID_RETURN_VALUE(
'AsyncIterable or Promise', 'destination', ret);
Expand All @@ -324,12 +364,30 @@ function pipelineImpl(streams, callback, opts) {
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
} else if (isTransformStream(ret) || isReadableStream(ret)) {
const toRead = ret.readable || ret;
finishCount++;
pumpToNode(toRead, stream, finish, { end });
} else if (isIterable(ret)) {
finishCount++;
pump(ret, stream, finish, { end });
pumpToNode(ret, stream, finish, { end });
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
}
ret = stream;
} else if (isWebStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount++;
pumpToWeb(makeAsyncIterable(ret), stream, finish, { end });
} else if (isReadableStream(ret) || isIterable(ret)) {
finishCount++;
pumpToWeb(ret, stream, finish, { end });
} else if (isTransformStream(ret)) {
pumpToWeb(ret.readable, stream, finish, { end });
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable'], ret);
'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
}
ret = stream;
} else {
Expand Down
15 changes: 15 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ function isWritableStream(obj) {
);
}

function isTransformStream(obj) {
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.readable === 'object' &&
typeof obj.writable === 'object'
);
}

function isWebStream(obj) {
return isReadableStream(obj) || isWritableStream(obj) || isTransformStream(obj);
}

function isIterable(obj, isAsync) {
if (obj == null) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
Expand Down Expand Up @@ -303,6 +316,7 @@ module.exports = {
isReadableFinished,
isReadableErrored,
isNodeStream,
isWebStream,
isWritable,
isWritableNodeStream,
isWritableStream,
Expand All @@ -312,4 +326,5 @@ module.exports = {
isServerRequest,
isServerResponse,
willEmitClose,
isTransformStream,
};
3 changes: 2 additions & 1 deletion lib/stream/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const {
const {
isIterable,
isNodeStream,
isWebStream,
} = require('internal/streams/utils');

const { pipelineImpl: pl } = require('internal/streams/pipeline');
Expand All @@ -21,7 +22,7 @@ function pipeline(...streams) {
let end;
const lastArg = streams[streams.length - 1];
if (lastArg && typeof lastArg === 'object' &&
!isNodeStream(lastArg) && !isIterable(lastArg)) {
!isNodeStream(lastArg) && !isIterable(lastArg) && !isWebStream(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
end = options.end;
Expand Down
Loading

0 comments on commit 23effb2

Please sign in to comment.