Skip to content

Commit

Permalink
stream: add AbortSignal support to finished
Browse files Browse the repository at this point in the history
Add AbortSignal support to stream.finished
  • Loading branch information
Nitzan Uziely committed Feb 13, 2021
1 parent 88d9268 commit caa5a1b
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 1 deletion.
6 changes: 6 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1578,6 +1578,9 @@ further errors except from `_destroy()` may be emitted as `'error'`.
<!-- YAML
added: v10.0.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/37354
description: The `signal` option was added.
- version: v14.0.0
pr-url: https://github.com/nodejs/node/pull/32158
description: The `finished(stream, cb)` will wait for the `'close'` event
Expand All @@ -1604,6 +1607,9 @@ changes:
* `writable` {boolean} When set to `false`, the callback will be called when
the stream ends even though the stream might still be writable.
**Default**: `true`.
* `signal` {AbortSignal} allows aborting the wait for the stream finish. The
underlying stream will *not* be aborted if the signal is aborted. The
callback will get called with an `AbortError`.
* `callback` {Function} A callback function that takes an optional error
argument.
* Returns: {Function} A cleanup function which removes all registered
Expand Down
23 changes: 22 additions & 1 deletion lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@
const {
FunctionPrototype,
FunctionPrototypeCall,
ReflectApply,
} = primordials;
const {
codes,
AbortError,
} = require('internal/errors');
const {
ERR_STREAM_PREMATURE_CLOSE
} = require('internal/errors').codes;
} = codes;
const { once } = require('internal/util');
const {
validateFunction,
validateObject,
validateAbortSignal,
} = require('internal/validators');

function isSocket(stream) {
Expand Down Expand Up @@ -76,6 +82,7 @@ function eos(stream, options, callback) {
validateObject(options, 'options');
}
validateFunction(callback, 'callback');
validateAbortSignal(options.signal, 'options.signal');

callback = once(callback);

Expand Down Expand Up @@ -199,6 +206,20 @@ function eos(stream, options, callback) {
});
}

if (options.signal && !closed) {
const abort = () => callback(new AbortError());
if (options.signal.aborted) {
process.nextTick(abort);
} else {
const originalCallback = callback;
callback = once((...args) => {
options.signal.removeEventListener('abort', abort);
ReflectApply(originalCallback, null, args);
});
options.signal.addEventListener('abort', abort);
}
}

return function() {
callback = nop;
stream.removeListener('aborted', onclose);
Expand Down
83 changes: 83 additions & 0 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,89 @@ const http = require('http');
run();
}

{
// Check pre-cancelled
const signal = new EventTarget();
signal.aborted = true;

const rs = Readable.from((function* () {})());
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
}

{
// Check cancelled before the stream ends sync.
const ac = new AbortController();
const { signal } = ac;

const rs = Readable.from((function* () {})());
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));

ac.abort();
}

{
// Check cancelled before the stream ends async.
const ac = new AbortController();
const { signal } = ac;

const rs = Readable.from((function* () {})());
setTimeout(() => ac.abort(), 1);
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
}

{
// Check cancelled after doesn't throw.
const ac = new AbortController();
const { signal } = ac;

const rs = Readable.from((function* () {
yield 5;
setImmediate(() => ac.abort());
})());
rs.resume();
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err, undefined);
}));
}

{
// Promisified abort works
const finishedPromise = promisify(finished);
async function run() {
const ac = new AbortController();
const { signal } = ac;
const rs = Readable.from((function* () {})());
setImmediate(() => ac.abort());
await finishedPromise(rs, { signal });
}

run().catch(common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
}

{
// Promisified pre-aborted works
const finishedPromise = promisify(finished);
async function run() {
const signal = new EventTarget();
signal.aborted = true;
const rs = Readable.from((function* () {})());
await finishedPromise(rs, { signal });
}

run().catch(common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
}


{
const rs = fs.createReadStream('file-does-not-exist');

Expand Down

0 comments on commit caa5a1b

Please sign in to comment.