Skip to content

Commit

Permalink
stream: add abort signal for ReadableStream and WritableStream
Browse files Browse the repository at this point in the history
Refs: #39316
PR-URL: #46273
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
debadree25 authored Feb 17, 2023
1 parent a2bbe5f commit 96c720e
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 12 deletions.
42 changes: 40 additions & 2 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -3228,17 +3228,24 @@ readable.getReader().read().then((result) => {

<!-- YAML
added: v15.4.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/46273
description: Added support for `ReadableStream` and
`WritableStream`.
-->

* `signal` {AbortSignal} A signal representing possible cancellation
* `stream` {Stream} a stream to attach a signal to
* `stream` {Stream|ReadableStream|WritableStream}

A stream to attach a signal to.

Attaches an AbortSignal to a readable or writeable stream. This lets code
control stream destruction using an `AbortController`.

Calling `abort` on the `AbortController` corresponding to the passed
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
on the stream.
on the stream, and `controller.error(new AbortError())` for webstreams.

```js
const fs = require('node:fs');
Expand Down Expand Up @@ -3276,6 +3283,37 @@ const stream = addAbortSignal(
})();
```

Or using an `AbortSignal` with a ReadableStream:

```js
const controller = new AbortController();
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.enqueue('world');
controller.close();
},
});

addAbortSignal(controller.signal, rs);

finished(rs, (err) => {
if (err) {
if (err.name === 'AbortError') {
// The operation was cancelled
}
}
});

const reader = rs.getReader();

reader.read().then(({ value, done }) => {
console.log(value); // hello
console.log(done); // false
controller.abort();
});
```

## API for stream implementers

<!--type=misc-->
Expand Down
25 changes: 16 additions & 9 deletions lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ const {
codes,
} = require('internal/errors');

const {
isNodeStream,
isWebStream,
kControllerErrorFunction,
} = require('internal/streams/utils');

const eos = require('internal/streams/end-of-stream');
const { ERR_INVALID_ARG_TYPE } = codes;

Expand All @@ -18,24 +24,25 @@ const validateAbortSignal = (signal, name) => {
}
};

function isNodeStream(obj) {
return !!(obj && typeof obj.pipe === 'function');
}

module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal');
if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
if (!isNodeStream(stream) && !isWebStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
}
return module.exports.addAbortSignalNoValidate(signal, stream);
};

module.exports.addAbortSignalNoValidate = function(signal, stream) {
if (typeof signal !== 'object' || !('aborted' in signal)) {
return stream;
}
const onAbort = () => {
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
};
const onAbort = isNodeStream(stream) ?
() => {
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
} :
() => {
stream[kControllerErrorFunction](new AbortError(undefined, { cause: signal.reason }));
};
if (signal.aborted) {
onAbort();
} else {
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const kIsReadable = Symbol('kIsReadable');
const kIsDisturbed = Symbol('kIsDisturbed');

const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction');

function isReadableNodeStream(obj, strict = false) {
return !!(
Expand Down Expand Up @@ -305,6 +306,7 @@ module.exports = {
isReadable,
kIsReadable,
kIsClosedPromise,
kControllerErrorFunction,
isClosed,
isDestroyed,
isDuplexNodeStream,
Expand Down
4 changes: 3 additions & 1 deletion lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const {
kIsErrored,
kIsReadable,
kIsClosedPromise,
kControllerErrorFunction,
} = require('internal/streams/utils');

const {
Expand Down Expand Up @@ -260,6 +261,7 @@ class ReadableStream {
};

this[kIsClosedPromise] = createDeferredPromise();
this[kControllerErrorFunction] = () => {};

// The spec requires handling of the strategy first
// here. Specifically, if getting the size and
Expand Down Expand Up @@ -1891,7 +1893,6 @@ function readableStreamClose(stream) {
assert(stream[kState].state === 'readable');
stream[kState].state = 'closed';
stream[kIsClosedPromise].resolve();

const {
reader,
} = stream[kState];
Expand Down Expand Up @@ -2330,6 +2331,7 @@ function setupReadableStreamDefaultController(
stream,
};
stream[kState].controller = controller;
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);

const startResult = startAlgorithm();

Expand Down
4 changes: 4 additions & 0 deletions lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const {

const {
kIsClosedPromise,
kControllerErrorFunction,
} = require('internal/streams/utils');

const {
Expand Down Expand Up @@ -199,6 +200,7 @@ class WritableStream {
};

this[kIsClosedPromise] = createDeferredPromise();
this[kControllerErrorFunction] = () => {};

const size = extractSizeAlgorithm(strategy?.size);
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);
Expand Down Expand Up @@ -370,6 +372,7 @@ function TransferredWritableStream() {
},
};
this[kIsClosedPromise] = createDeferredPromise();
this[kControllerErrorFunction] = () => {};
},
[], WritableStream));
}
Expand Down Expand Up @@ -1282,6 +1285,7 @@ function setupWritableStreamDefaultController(
writeAlgorithm,
};
stream[kState].controller = controller;
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);

writableStreamUpdateBackpressure(
stream,
Expand Down
168 changes: 168 additions & 0 deletions test/parallel/test-webstreams-abort-controller.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
'use strict';

const common = require('../common');
const { finished, addAbortSignal } = require('stream');
const { ReadableStream, WritableStream } = require('stream/web');
const assert = require('assert');

function createTestReadableStream() {
return new ReadableStream({
start(controller) {
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
controller.close();
}
});
}

function createTestWritableStream(values) {
return new WritableStream({
write(chunk) {
values.push(chunk);
}
});
}

{
const rs = createTestReadableStream();

const reader = rs.getReader();

const ac = new AbortController();

addAbortSignal(ac.signal, rs);

finished(rs, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.rejects(reader.read(), /AbortError/).then(common.mustCall());
assert.rejects(reader.closed, /AbortError/).then(common.mustCall());
}));

reader.read().then(common.mustCall((result) => {
assert.strictEqual(result.value, 'a');
ac.abort();
}));
}

{
const rs = createTestReadableStream();

const ac = new AbortController();

addAbortSignal(ac.signal, rs);

assert.rejects((async () => {
for await (const chunk of rs) {
if (chunk === 'b') {
ac.abort();
}
}
})(), /AbortError/).then(common.mustCall());
}

{
const rs1 = createTestReadableStream();

const rs2 = createTestReadableStream();

const ac = new AbortController();

addAbortSignal(ac.signal, rs1);
addAbortSignal(ac.signal, rs2);

const reader1 = rs1.getReader();
const reader2 = rs2.getReader();

finished(rs1, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.rejects(reader1.read(), /AbortError/).then(common.mustCall());
assert.rejects(reader1.closed, /AbortError/).then(common.mustCall());
}));

finished(rs2, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.rejects(reader2.read(), /AbortError/).then(common.mustCall());
assert.rejects(reader2.closed, /AbortError/).then(common.mustCall());
}));

ac.abort();
}

{
const rs = createTestReadableStream();

const { 0: rs1, 1: rs2 } = rs.tee();

const ac = new AbortController();

addAbortSignal(ac.signal, rs);

const reader1 = rs1.getReader();
const reader2 = rs2.getReader();

finished(rs1, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.rejects(reader1.read(), /AbortError/).then(common.mustCall());
assert.rejects(reader1.closed, /AbortError/).then(common.mustCall());
}));

finished(rs2, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.rejects(reader2.read(), /AbortError/).then(common.mustCall());
assert.rejects(reader2.closed, /AbortError/).then(common.mustCall());
}));

ac.abort();
}

{
const values = [];
const ws = createTestWritableStream(values);

const ac = new AbortController();

addAbortSignal(ac.signal, ws);

const writer = ws.getWriter();

finished(ws, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.deepStrictEqual(values, ['a']);
assert.rejects(writer.write('b'), /AbortError/).then(common.mustCall());
assert.rejects(writer.closed, /AbortError/).then(common.mustCall());
}));

writer.write('a').then(() => {
ac.abort();
});
}

{
const values = [];

const ws1 = createTestWritableStream(values);
const ws2 = createTestWritableStream(values);

const ac = new AbortController();

addAbortSignal(ac.signal, ws1);
addAbortSignal(ac.signal, ws2);

const writer1 = ws1.getWriter();
const writer2 = ws2.getWriter();

finished(ws1, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.rejects(writer1.write('a'), /AbortError/).then(common.mustCall());
assert.rejects(writer1.closed, /AbortError/).then(common.mustCall());
}));

finished(ws2, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.rejects(writer2.write('a'), /AbortError/).then(common.mustCall());
assert.rejects(writer2.closed, /AbortError/).then(common.mustCall());
}));

ac.abort();
}

0 comments on commit 96c720e

Please sign in to comment.