Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v18.x backport] stream: streams and webstreams interop related changes #46314

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2579,6 +2579,9 @@ further errors except from `_destroy()` may be emitted as `'error'`.
<!-- YAML
added: v10.0.0
changes:
- version: v18.14.0
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was release in #46396 hence added the version here

pr-url: https://github.com/nodejs/node/pull/46205
description: Added support for `ReadableStream` and `WritableStream`.
- version: v15.11.0
pr-url: https://github.com/nodejs/node/pull/37354
description: The `signal` option was added.
Expand All @@ -2598,7 +2601,9 @@ changes:
finished before the call to `finished(stream, cb)`.
-->

* `stream` {Stream} A readable and/or writable stream.
* `stream` {Stream|ReadableStream|WritableStream}

A readable and/or writable stream/webstream.

* `options` {Object}
* `error` {boolean} If set to `false`, then a call to `emit('error', err)` is
Expand Down Expand Up @@ -3022,10 +3027,16 @@ added: v17.0.0

<!-- YAML
added: v16.8.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/46190
description: The `src` argument can now be a `ReadableStream` or
`WritableStream`.
-->

* `src` {Stream|Blob|ArrayBuffer|string|Iterable|AsyncIterable|
AsyncGeneratorFunction|AsyncFunction|Promise|Object}
AsyncGeneratorFunction|AsyncFunction|Promise|Object|
ReadableStream|WritableStream}

A utility method for creating duplex streams.

Expand All @@ -3045,6 +3056,8 @@ A utility method for creating duplex streams.
`writable` into `Stream` and then combines them into `Duplex` where the
`Duplex` will write to the `writable` and read from the `readable`.
* `Promise` converts into readable `Duplex`. Value `null` is ignored.
* `ReadableStream` converts into readable `Duplex`.
* `WritableStream` converts into writable `Duplex`.
* Returns: {stream.Duplex}

If an `Iterable` object containing promises is passed as an argument,
Expand Down
30 changes: 15 additions & 15 deletions lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const {
isReadableNodeStream,
isWritableNodeStream,
isDuplexNodeStream,
isReadableStream,
isWritableStream,
} = require('internal/streams/utils');
const eos = require('internal/streams/end-of-stream');
const {
Expand All @@ -20,6 +22,7 @@ const {
const { destroyer } = require('internal/streams/destroy');
const Duplex = require('internal/streams/duplex');
const Readable = require('internal/streams/readable');
const Writable = require('internal/streams/writable');
const { createDeferredPromise } = require('internal/util');
const from = require('internal/streams/from');

Expand Down Expand Up @@ -71,15 +74,13 @@ module.exports = function duplexify(body, name) {
return _duplexify({ writable: false, readable: false });
}

// TODO: Webstreams
// if (isReadableStream(body)) {
// return _duplexify({ readable: Readable.fromWeb(body) });
// }
if (isReadableStream(body)) {
return _duplexify({ readable: Readable.fromWeb(body) });
}

// TODO: Webstreams
// if (isWritableStream(body)) {
// return _duplexify({ writable: Writable.fromWeb(body) });
// }
if (isWritableStream(body)) {
return _duplexify({ writable: Writable.fromWeb(body) });
}

if (typeof body === 'function') {
const { value, write, final, destroy } = fromAsyncGen(body);
Expand Down Expand Up @@ -146,13 +147,12 @@ module.exports = function duplexify(body, name) {
});
}

// TODO: Webstreams.
// if (
// isReadableStream(body?.readable) &&
// isWritableStream(body?.writable)
// ) {
// return Duplexify.fromWeb(body);
// }
if (
isReadableStream(body?.readable) &&
isWritableStream(body?.writable)
) {
return Duplexify.fromWeb(body);
}

if (
typeof body?.writable === 'object' ||
Expand Down
102 changes: 102 additions & 0 deletions test/parallel/test-stream-duplex-from.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const common = require('../common');
const assert = require('assert');
const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
const { ReadableStream, WritableStream } = require('stream/web');
const { Blob } = require('buffer');

{
Expand Down Expand Up @@ -299,3 +300,104 @@ const { Blob } = require('buffer');
assert.strictEqual(res, 'foobar');
})).on('close', common.mustCall());
}

function makeATestReadableStream(value) {
return new ReadableStream({
start(controller) {
controller.enqueue(value);
controller.close();
}
});
}

function makeATestWritableStream(writeFunc) {
return new WritableStream({
write(chunk) {
writeFunc(chunk);
}
});
}

{
const d = Duplex.from({
readable: makeATestReadableStream('foo'),
});
assert.strictEqual(d.readable, true);
assert.strictEqual(d.writable, false);

d.on('data', common.mustCall((data) => {
assert.strictEqual(data.toString(), 'foo');
}));

d.on('end', common.mustCall(() => {
assert.strictEqual(d.readable, false);
}));
}

{
const d = Duplex.from(makeATestReadableStream('foo'));

assert.strictEqual(d.readable, true);
assert.strictEqual(d.writable, false);

d.on('data', common.mustCall((data) => {
assert.strictEqual(data.toString(), 'foo');
}));

d.on('end', common.mustCall(() => {
assert.strictEqual(d.readable, false);
}));
}

{
let ret = '';
const d = Duplex.from({
writable: makeATestWritableStream((chunk) => ret += chunk),
});

assert.strictEqual(d.readable, false);
assert.strictEqual(d.writable, true);

d.end('foo');
d.on('finish', common.mustCall(() => {
assert.strictEqual(ret, 'foo');
assert.strictEqual(d.writable, false);
}));
}

{
let ret = '';
const d = Duplex.from(makeATestWritableStream((chunk) => ret += chunk));

assert.strictEqual(d.readable, false);
assert.strictEqual(d.writable, true);

d.end('foo');
d.on('finish', common.mustCall(() => {
assert.strictEqual(ret, 'foo');
assert.strictEqual(d.writable, false);
}));
}

{
let ret = '';
const d = Duplex.from({
readable: makeATestReadableStream('foo'),
writable: makeATestWritableStream((chunk) => ret += chunk),
});

d.end('bar');

d.on('data', common.mustCall((data) => {
assert.strictEqual(data.toString(), 'foo');
}));

d.on('end', common.mustCall(() => {
assert.strictEqual(d.readable, false);
}));

d.on('finish', common.mustCall(() => {
assert.strictEqual(ret, 'bar');
assert.strictEqual(d.writable, false);
}));
}