Skip to content

Commit

Permalink
stream: add AbortSignal support to finished
Browse files Browse the repository at this point in the history
Add AbortSignal support to stream.finished
  • Loading branch information
Nitzan Uziely authored and Linkgoron committed Feb 17, 2021
1 parent d345ac9 commit 44bd8c6
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 3 deletions.
9 changes: 8 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1578,6 +1578,9 @@ further errors except from `_destroy()` may be emitted as `'error'`.
<!-- YAML
added: v10.0.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/37354
description: The `signal` option was added.
- version: v14.0.0
pr-url: https://github.com/nodejs/node/pull/32158
description: The `finished(stream, cb)` will wait for the `'close'` event
Expand All @@ -1604,10 +1607,14 @@ changes:
* `writable` {boolean} When set to `false`, the callback will be called when
the stream ends even though the stream might still be writable.
**Default:** `true`.
* `signal` {AbortSignal} allows aborting the wait for the stream finish. The
underlying stream will *not* be aborted if the signal is aborted. The
callback will get called with an `AbortError`. All registered
listeners added by this function will also be removed.
* `callback` {Function} A callback function that takes an optional error
argument.
* Returns: {Function} A cleanup function which removes all registered
listeners.
listeners added by this function.

A function to get notified when a stream is no longer readable, writable
or has experienced an error or a premature close event.
Expand Down
32 changes: 30 additions & 2 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@
const {
FunctionPrototype,
FunctionPrototypeCall,
ReflectApply,
} = primordials;
const {
AbortError,
codes,
} = require('internal/errors');
const {
ERR_STREAM_PREMATURE_CLOSE
} = require('internal/errors').codes;
} = codes;
const { once } = require('internal/util');
const {
validateAbortSignal,
validateFunction,
validateObject,
} = require('internal/validators');
Expand Down Expand Up @@ -76,6 +82,7 @@ function eos(stream, options, callback) {
validateObject(options, 'options');
}
validateFunction(callback, 'callback');
validateAbortSignal(options.signal, 'options.signal');

callback = once(callback);

Expand Down Expand Up @@ -199,7 +206,7 @@ function eos(stream, options, callback) {
});
}

return function() {
const cleanup = () => {
callback = nop;
stream.removeListener('aborted', onclose);
stream.removeListener('complete', onfinish);
Expand All @@ -213,6 +220,27 @@ function eos(stream, options, callback) {
stream.removeListener('error', onerror);
stream.removeListener('close', onclose);
};

if (options.signal && !closed) {
const abort = () => {
// Keep it because cleanup removes it.
const endCallback = callback;
cleanup();
FunctionPrototypeCall(endCallback, stream, new AbortError());
};
if (options.signal.aborted) {
process.nextTick(abort);
} else {
const originalCallback = callback;
callback = once((...args) => {
options.signal.removeEventListener('abort', abort);
ReflectApply(originalCallback, stream, args);
});
options.signal.addEventListener('abort', abort);
}
}

return cleanup;
}

module.exports = eos;
77 changes: 77 additions & 0 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,83 @@ const http = require('http');
run();
}

{
// Check pre-cancelled
const signal = new EventTarget();
signal.aborted = true;

const rs = Readable.from((function* () {})());
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
}

{
// Check cancelled before the stream ends sync.
const ac = new AbortController();
const { signal } = ac;

const rs = Readable.from((function* () {})());
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));

ac.abort();
}

{
// Check cancelled before the stream ends async.
const ac = new AbortController();
const { signal } = ac;

const rs = Readable.from((function* () {})());
setTimeout(() => ac.abort(), 1);
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
}

{
// Check cancelled after doesn't throw.
const ac = new AbortController();
const { signal } = ac;

const rs = Readable.from((function* () {
yield 5;
setImmediate(() => ac.abort());
})());
rs.resume();
finished(rs, { signal }, common.mustSucceed());
}

{
// Promisified abort works
const finishedPromise = promisify(finished);
async function run() {
const ac = new AbortController();
const { signal } = ac;
const rs = Readable.from((function* () {})());
setImmediate(() => ac.abort());
await finishedPromise(rs, { signal });
}

assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}

{
// Promisified pre-aborted works
const finishedPromise = promisify(finished);
async function run() {
const signal = new EventTarget();
signal.aborted = true;
const rs = Readable.from((function* () {})());
await finishedPromise(rs, { signal });
}

assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}


{
const rs = fs.createReadStream('file-does-not-exist');

Expand Down

0 comments on commit 44bd8c6

Please sign in to comment.