Skip to content

Commit

Permalink
stream: implement finished() for ReadableStream and WritableStream
Browse files Browse the repository at this point in the history
Refs: #39316
PR-URL: #46205
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Darshan Sen <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
debadree25 committed Jan 23, 2023
1 parent 1e11247 commit 7902a99
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 9 deletions.
25 changes: 20 additions & 5 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@ const {
validateBoolean
} = require('internal/validators');

const { Promise } = primordials;
const { Promise, PromisePrototypeThen } = primordials;

const {
isClosed,
isReadable,
isReadableNodeStream,
isReadableStream,
isReadableFinished,
isReadableErrored,
isWritable,
isWritableNodeStream,
isWritableStream,
isWritableFinished,
isWritableErrored,
isNodeStream,
willEmitClose: _willEmitClose,
kIsClosedPromise,
} = require('internal/streams/utils');

function isRequest(stream) {
Expand All @@ -58,14 +61,17 @@ function eos(stream, options, callback) {

callback = once(callback);

const readable = options.readable ?? isReadableNodeStream(stream);
const writable = options.writable ?? isWritableNodeStream(stream);
if (isReadableStream(stream) || isWritableStream(stream)) {
return eosWeb(stream, options, callback);
}

if (!isNodeStream(stream)) {
// TODO: Webstreams.
throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream);
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
}

const readable = options.readable ?? isReadableNodeStream(stream);
const writable = options.writable ?? isWritableNodeStream(stream);

const wState = stream._writableState;
const rState = stream._readableState;

Expand Down Expand Up @@ -255,6 +261,15 @@ function eos(stream, options, callback) {
return cleanup;
}

function eosWeb(stream, opts, callback) {
PromisePrototypeThen(
stream[kIsClosedPromise].promise,
() => process.nextTick(() => callback.call(stream)),
(err) => process.nextTick(() => callback.call(stream, err)),
);
return nop;
}

function finished(stream, opts) {
let autoCleanup = false;
if (opts === null) {
Expand Down
25 changes: 25 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ const {
Symbol,
SymbolAsyncIterator,
SymbolIterator,
SymbolFor,
} = primordials;

const kDestroyed = Symbol('kDestroyed');
const kIsErrored = Symbol('kIsErrored');
const kIsReadable = Symbol('kIsReadable');
const kIsDisturbed = Symbol('kIsDisturbed');

const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');

function isReadableNodeStream(obj, strict = false) {
return !!(
obj &&
Expand Down Expand Up @@ -55,6 +58,25 @@ function isNodeStream(obj) {
);
}

function isReadableStream(obj) {
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.pipeThrough === 'function' &&
typeof obj.getReader === 'function' &&
typeof obj.cancel === 'function'
);
}

function isWritableStream(obj) {
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.getWriter === 'function' &&
typeof obj.abort === 'function'
);
}

function isIterable(obj, isAsync) {
if (obj == null) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
Expand Down Expand Up @@ -269,18 +291,21 @@ module.exports = {
kIsErrored,
isReadable,
kIsReadable,
kIsClosedPromise,
isClosed,
isDestroyed,
isDuplexNodeStream,
isFinished,
isIterable,
isReadableNodeStream,
isReadableStream,
isReadableEnded,
isReadableFinished,
isReadableErrored,
isNodeStream,
isWritable,
isWritableNodeStream,
isWritableStream,
isWritableEnded,
isWritableFinished,
isWritableErrored,
Expand Down
14 changes: 11 additions & 3 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ const {
kIsDisturbed,
kIsErrored,
kIsReadable,
kIsClosedPromise,
} = require('internal/streams/utils');

const {
Expand Down Expand Up @@ -231,9 +232,11 @@ class ReadableStream {
port1: undefined,
port2: undefined,
promise: undefined,
}
},
};

this[kIsClosedPromise] = createDeferredPromise();

// The spec requires handling of the strategy first
// here. Specifically, if getting the size and
// highWaterMark from the strategy fail, that has
Expand Down Expand Up @@ -625,8 +628,9 @@ function TransferredReadableStream() {
writable: undefined,
port: undefined,
promise: undefined,
}
},
};
this[kIsClosedPromise] = createDeferredPromise();
},
[], ReadableStream));
}
Expand Down Expand Up @@ -1195,8 +1199,9 @@ function createTeeReadableStream(start, pull, cancel) {
writable: undefined,
port: undefined,
promise: undefined,
}
},
};
this[kIsClosedPromise] = createDeferredPromise();
setupReadableStreamDefaultControllerFromSource(
this,
ObjectCreate(null, {
Expand Down Expand Up @@ -1869,6 +1874,7 @@ function readableStreamCancel(stream, reason) {
function readableStreamClose(stream) {
assert(stream[kState].state === 'readable');
stream[kState].state = 'closed';
stream[kIsClosedPromise].resolve();

const {
reader,
Expand All @@ -1890,6 +1896,8 @@ function readableStreamError(stream, error) {
assert(stream[kState].state === 'readable');
stream[kState].state = 'errored';
stream[kState].storedError = error;
stream[kIsClosedPromise].reject(error);
setPromiseHandled(stream[kIsClosedPromise].promise);

const {
reader
Expand Down
14 changes: 13 additions & 1 deletion lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ const {
kState,
} = require('internal/webstreams/util');

const {
kIsClosedPromise,
} = require('internal/streams/utils');

const {
AbortController,
} = require('internal/abort_controller');
Expand Down Expand Up @@ -175,9 +179,11 @@ class WritableStream {
port1: undefined,
port2: undefined,
promise: undefined,
}
},
};

this[kIsClosedPromise] = createDeferredPromise();

const size = extractSizeAlgorithm(strategy?.size);
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);

Expand Down Expand Up @@ -347,6 +353,7 @@ function TransferredWritableStream() {
readable: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
},
[], WritableStream));
}
Expand Down Expand Up @@ -726,6 +733,10 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
resolve: undefined,
};
}

stream[kIsClosedPromise].reject(stream[kState]?.storedError);
setPromiseHandled(stream[kIsClosedPromise].promise);

const {
writer,
} = stream[kState];
Expand Down Expand Up @@ -839,6 +850,7 @@ function writableStreamFinishInFlightClose(stream) {
stream[kState].state = 'closed';
if (stream[kState].writer !== undefined)
stream[kState].writer[kState].close.resolve?.();
stream[kIsClosedPromise].resolve?.();
assert(stream[kState].pendingAbortRequest.abort.promise === undefined);
assert(stream[kState].storedError === undefined);
}
Expand Down
Loading

0 comments on commit 7902a99

Please sign in to comment.