From 49375980ee14a48b57a1f62963b5b712851e18c7 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 8 Jan 2023 02:47:22 +0530 Subject: [PATCH] stream: add webstreams to duplex Refs: https://github.com/nodejs/node/pull/39519 Refs: https://github.com/nodejs/node/pull/39519/commits/6d06b1c78465bde5b45915ddf7428df411c04fb2 --- lib/internal/streams/duplexify.js | 38 ++++--- test/parallel/test-stream-duplex-from.js | 128 +++++++++++++++++++++++ 2 files changed, 151 insertions(+), 15 deletions(-) diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index bfcd51233ac786..4ef72102ebcef8 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -20,6 +20,7 @@ const { const { destroyer } = require('internal/streams/destroy'); const Duplex = require('internal/streams/duplex'); const Readable = require('internal/streams/readable'); +const Writable = require('internal/streams/writable'); const { createDeferredPromise } = require('internal/util'); const from = require('internal/streams/from'); @@ -32,6 +33,16 @@ const { FunctionPrototypeCall } = primordials; +const { + isBrandCheck, +} = require('internal/webstreams/util'); +const console = require('console'); + +const isReadableStream = + isBrandCheck('ReadableStream'); +const isWritableStream = + isBrandCheck('WritableStream'); + // This is needed for pre node 17. class Duplexify extends Duplex { constructor(options) { @@ -71,15 +82,13 @@ module.exports = function duplexify(body, name) { return _duplexify({ writable: false, readable: false }); } - // TODO: Webstreams - // if (isReadableStream(body)) { - // return _duplexify({ readable: Readable.fromWeb(body) }); - // } + if (isReadableStream(body)) { + return _duplexify({ readable: Readable.fromWeb(body) }); + } - // TODO: Webstreams - // if (isWritableStream(body)) { - // return _duplexify({ writable: Writable.fromWeb(body) }); - // } + if (isWritableStream(body)) { + return _duplexify({ writable: Writable.fromWeb(body) }); + } if (typeof body === 'function') { const { value, write, final, destroy } = fromAsyncGen(body); @@ -146,13 +155,12 @@ module.exports = function duplexify(body, name) { }); } - // TODO: Webstreams. - // if ( - // isReadableStream(body?.readable) && - // isWritableStream(body?.writable) - // ) { - // return Duplexify.fromWeb(body); - // } + if ( + isReadableStream(body?.readable) && + isWritableStream(body?.writable) + ) { + return Duplexify.fromWeb(body); + } if ( typeof body?.writable === 'object' || diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index c3f3dd756b2e66..21c540c40c8447 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -3,6 +3,7 @@ const common = require('../common'); const assert = require('assert'); const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream'); +const { ReadableStream, WritableStream } = require('stream/web'); const { Blob } = require('buffer'); { @@ -299,3 +300,130 @@ const { Blob } = require('buffer'); assert.strictEqual(res, 'foobar'); })).on('close', common.mustCall()); } + +{ + const d = Duplex.from({ + readable: new ReadableStream({ + start(controller) { + controller.enqueue('foo'); + controller.close(); + }, + }), + }); + assert.strictEqual(d.readable, true); + assert.strictEqual(d.writable, false); + d.once( + 'readable', + common.mustCall(() => { + assert.strictEqual(d.read().toString(), 'foo'); + }) + ); + // assert.strictEqual(d.readable, false); +} + +{ + const d = Duplex.from( + new ReadableStream({ + start(controller) { + controller.enqueue('foo'); + controller.close(); + }, + }) + ); + assert.strictEqual(d.readable, true); + assert.strictEqual(d.writable, false); + d.once( + 'readable', + common.mustCall(() => { + assert.strictEqual(d.read().toString(), 'foo'); + }) + ); + // d.once( + // 'end', + // common.mustCall(() => { + // assert.strictEqual(d.readable, false); + // }) + // ); +} + +{ + let str = ''; + const d = Duplex.from({ + writable: new WritableStream({ + write(chunk) { + str += chunk; + }, + }), + }); + assert.strictEqual(d.readable, false); + assert.strictEqual(d.writable, true); + d.end('foo'); + d.on( + 'finish', + common.mustCall(() => { + assert.strictEqual(d.writable, false); + assert.strictEqual(str, 'foo'); + }) + ); +} + +{ + let str = ''; + const d = Duplex.from( + new WritableStream({ + write(chunk) { + str += chunk; + }, + }) + ); + assert.strictEqual(d.readable, false); + assert.strictEqual(d.writable, true); + d.end('foo'); + d.on( + 'finish', + common.mustCall(() => { + assert.strictEqual(d.writable, false); + assert.strictEqual(str, 'foo'); + }) + ); +} + +{ + let ret = ''; + const d = Duplex.from({ + readable: new Readable({ + read() { + this.push('foo'); + this.push(null); + }, + }), + writable: new Writable({ + write(chunk, encoding, callback) { + ret += chunk; + callback(); + }, + }), + }); + assert.strictEqual(d.readable, true); + assert.strictEqual(d.writable, true); + d.once( + 'readable', + common.mustCall(function () { + assert.strictEqual(d.read().toString(), 'foo'); + }) + ); + d.once( + 'end', + common.mustCall(function () { + assert.strictEqual(d.readable, false); + }) + ); + d.end('asd'); + d.once( + 'finish', + common.mustCall(function () { + assert.strictEqual(d.writable, false); + assert.strictEqual(ret, 'asd'); + }) + ); +}