Skip to content

Commit

Permalink
stream: add suport for abort signal in finished() for webstreams
Browse files Browse the repository at this point in the history
Refs: #46205
PR-URL: #46403
Refs: #37354
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
  • Loading branch information
debadree25 authored Feb 2, 2023
1 parent bd09205 commit ebcc711
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 3 deletions.
29 changes: 26 additions & 3 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,34 @@ function eos(stream, options, callback) {
return cleanup;
}

function eosWeb(stream, opts, callback) {
function eosWeb(stream, options, callback) {
let isAborted = false;
let abort = nop;
if (options.signal) {
abort = () => {
isAborted = true;
callback.call(stream, new AbortError(undefined, { cause: options.signal.reason }));
};
if (options.signal.aborted) {
process.nextTick(abort);
} else {
const originalCallback = callback;
callback = once((...args) => {
options.signal.removeEventListener('abort', abort);
originalCallback.apply(stream, args);
});
options.signal.addEventListener('abort', abort);
}
}
const resolverFn = (...args) => {
if (!isAborted) {
process.nextTick(() => callback.apply(stream, args));
}
};
PromisePrototypeThen(
stream[kIsClosedPromise].promise,
() => process.nextTick(() => callback.call(stream)),
(err) => process.nextTick(() => callback.call(stream, err)),
resolverFn,
resolverFn
);
return nop;
}
Expand Down
90 changes: 90 additions & 0 deletions test/parallel/test-webstreams-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,93 @@ const { finished: finishedPromise } = require('stream/promises');
assert.strictEqual(err?.message, 'asd');
});
}

{
// Check pre-cancelled
const signal = new EventTarget();
signal.aborted = true;

const rs = new ReadableStream({
start() {}
});
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
}

{
// Check cancelled before the stream ends sync.
const ac = new AbortController();
const { signal } = ac;

const rs = new ReadableStream({
start() {}
});
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));

ac.abort();
}

{
// Check cancelled before the stream ends async.
const ac = new AbortController();
const { signal } = ac;

const rs = new ReadableStream({
start() {}
});
setTimeout(() => ac.abort(), 1);
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
}

{
// Check cancelled after doesn't throw.
const ac = new AbortController();
const { signal } = ac;

const rs = new ReadableStream({
start(controller) {
controller.enqueue('asd');
controller.close();
}
});
finished(rs, { signal }, common.mustSucceed());

rs.getReader().read().then(common.mustCall((chunk) => {
assert.strictEqual(chunk.value, 'asd');
setImmediate(() => ac.abort());
}));
}

{
// Promisified abort works
async function run() {
const ac = new AbortController();
const { signal } = ac;
const rs = new ReadableStream({
start() {}
});
setImmediate(() => ac.abort());
await finishedPromise(rs, { signal });
}

assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}

{
// Promisified pre-aborted works
async function run() {
const signal = new EventTarget();
signal.aborted = true;
const rs = new ReadableStream({
start() {}
});
await finishedPromise(rs, { signal });
}

assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}

0 comments on commit ebcc711

Please sign in to comment.