Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: enable usage of webstreams on compose() #46675

Merged
merged 12 commits into from
Feb 27, 2023
3 changes: 2 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2802,7 +2802,8 @@ added: v16.9.0

> Stability: 1 - `stream.compose` is experimental.

* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
ReadableStream\[]|WritableStream\[]|TransformStream\[]}
* Returns: {stream.Duplex}

Combines two or more streams into a `Duplex` stream that writes to the
Expand Down
176 changes: 143 additions & 33 deletions lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ const {
isNodeStream,
isReadable,
isWritable,
isWebStream,
isTransformStream,
isWritableStream,
isReadableStream,
} = require('internal/streams/utils');
const {
AbortError,
Expand All @@ -15,6 +19,7 @@ const {
ERR_MISSING_ARGS,
},
} = require('internal/errors');
const eos = require('internal/streams/end-of-stream');

module.exports = function compose(...streams) {
if (streams.length === 0) {
Expand All @@ -37,18 +42,32 @@ module.exports = function compose(...streams) {
}

for (let n = 0; n < streams.length; ++n) {
if (!isNodeStream(streams[n])) {
if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
// TODO(ronag): Add checks for non streams.
continue;
}
if (n < streams.length - 1 && !isReadable(streams[n])) {
if (
n < streams.length - 1 &&
!(
isReadable(streams[n]) ||
isReadableStream(streams[n]) ||
isTransformStream(streams[n])
)
) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
'must be readable'
);
}
if (n > 0 && !isWritable(streams[n])) {
if (
n > 0 &&
!(
isWritable(streams[n]) ||
isWritableStream(streams[n]) ||
isTransformStream(streams[n])
)
) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
Expand All @@ -57,9 +76,8 @@ module.exports = function compose(...streams) {
}
}

let ondrain;
let onfinish;
let onreadable;
let writableEndDestructor;
let readableEndDestructor;
let onclose;
let d;

Expand All @@ -79,8 +97,16 @@ module.exports = function compose(...streams) {
const head = streams[0];
const tail = pipeline(streams, onfinished);

const writable = !!isWritable(head);
const readable = !!isReadable(tail);
const writable = !!(
isWritable(head) ||
isWritableStream(head) ||
isTransformStream(head)
);
const readable = !!(
isReadable(tail) ||
isReadableStream(tail) ||
isTransformStream(tail)
);

// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
Expand All @@ -94,15 +120,53 @@ module.exports = function compose(...streams) {
});

if (writable) {
d._write = function(chunk, encoding, callback) {
writableEndDestructor = makeWritableEnd(d, head, tail);
}

if (readable) {
readableEndDestructor = makeReadableEnd(d, head, tail);
}

d._destroy = function(err, callback) {
if (!err && onclose !== null) {
err = new AbortError();
}

if (readableEndDestructor) {
readableEndDestructor();
}

if (writableEndDestructor) {
writableEndDestructor();
}

if (onclose === null) {
callback(err);
} else {
onclose = callback;
if (isNodeStream(tail)) {
destroyer(tail, err);
Copy link
Member Author

@debadree25 debadree25 Feb 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some help is needed here, how could we destroy webstreams here? or should we even?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on the question? (We can destroy web streams the question is what scenario do you specifically mean)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the pipeline encounters an error, it would call d.destroy, which in turn would destroy the last stream in the series the tail stream should the same happen for webstreams too I think we could do writableStream.abort() here.

Actually, i am a little confused why destroying the last stream is necessary 😅

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one question remain wdyt @benjamingr ?

}
}
};

return d;
};

function makeWritableEnd(duplex, head, tail) {
let ondrain;
let onfinish;

if (isNodeStream(head)) {
duplex._write = function(chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};

d._final = function(callback) {
duplex._final = function(callback) {
head.end();
onfinish = callback;
};
Expand All @@ -114,17 +178,61 @@ module.exports = function compose(...streams) {
cb();
}
});
} else if (isWebStream(head)) {
const writable = isTransformStream(head) ? head.writable : head;
const writer = writable.getWriter();

duplex._write = async function(chunk, encoding, callback) {
try {
await writer.ready;
writer.write(chunk).catch(() => {});
callback();
} catch (err) {
callback(err);
}
};

duplex._final = async function(callback) {
try {
await writer.ready;
writer.close().catch(() => {});
onfinish = callback;
} catch (err) {
callback(err);
}
};
}

if (isNodeStream(tail)) {
tail.on('finish', function() {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
} else if (isWebStream(tail)) {
const readable = isTransformStream(tail) ? tail.readable : tail;
eos(readable, () => {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
}

if (readable) {
function destructor() {
ondrain = null;
onfinish = null;
}

return destructor;
}

function makeReadableEnd(duplex, head, tail) {
let onreadable;
if (isNodeStream(tail)) {
tail.on('readable', function() {
if (onreadable) {
const cb = onreadable;
Expand All @@ -134,41 +242,43 @@ module.exports = function compose(...streams) {
});

tail.on('end', function() {
d.push(null);
duplex.push(null);
});

d._read = function() {
duplex._read = function() {
while (true) {
const buf = tail.read();

if (buf === null) {
onreadable = d._read;
onreadable = duplex._read;
return;
}

if (!d.push(buf)) {
if (!duplex.push(buf)) {
return;
}
}
};
}
} else if (isWebStream(tail)) {
const readable = isTransformStream(tail) ? tail.readable : tail;
const reader = readable.getReader();
duplex._read = async function() {
while (true) {
const { value, done } = await reader.read();
if (done) {
duplex.push(null);
return;
}

d._destroy = function(err, callback) {
if (!err && onclose !== null) {
err = new AbortError();
}
if (!duplex.push(value)) {
return;
}
}
};
}

function destructor() {
onreadable = null;
ondrain = null;
onfinish = null;

if (onclose === null) {
callback(err);
} else {
onclose = callback;
destroyer(tail, err);
}
};
}

return d;
};
return destructor;
}
3 changes: 2 additions & 1 deletion lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ function pipelineImpl(streams, callback, opts) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
} else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
ret = stream;
} else {
ret = Duplex.from(stream);
Expand Down Expand Up @@ -384,6 +384,7 @@ function pipelineImpl(streams, callback, opts) {
finishCount++;
pumpToWeb(ret, stream, finish, { end });
} else if (isTransformStream(ret)) {
finishCount++;
pumpToWeb(ret.readable, stream, finish, { end });
} else {
throw new ERR_INVALID_ARG_TYPE(
Expand Down
Loading