Skip to content

Commit

Permalink
stream: provide a _construct endpoint for async stream initialization.
Browse files Browse the repository at this point in the history
Some streams need to first asynchronously create a resources before
it can perform any work. Currently this is implemented in the different
stream implementations which both makes things more difficult and
error prone. Provide a standardized way of achieving this.
  • Loading branch information
ronag committed Sep 16, 2019
1 parent 17d87d5 commit 3ce4d2c
Show file tree
Hide file tree
Showing 10 changed files with 906 additions and 119 deletions.
174 changes: 172 additions & 2 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,15 @@ added: v12.3.0

Getter for the property `objectMode` of a given `Writable` stream.

##### writable.writableReady
<!-- YAML
added: REPLACEME
-->

* {boolean}

Is set to `true` immediately before the [`'ready'`][] event is emitted.

##### writable.write(chunk[, encoding][, callback])
<!-- YAML
added: v0.9.4
Expand Down Expand Up @@ -1169,6 +1178,15 @@ added: v12.3.0

Getter for the property `objectMode` of a given `Readable` stream.

##### readable.readableReady
<!-- YAML
added: REPLACEME
-->

* {boolean}

Is set to `true` immediately before the [`'ready'`][] event is emitted.

##### readable.resume()
<!-- YAML
added: v0.9.4
Expand Down Expand Up @@ -1622,8 +1640,8 @@ on the type of stream being created, as detailed in the chart below:
| Use-case | Class | Method(s) to implement |
| -------- | ----- | ---------------------- |
| Reading only | [`Readable`] | <code>[_read()][stream-_read]</code> |
| Writing only | [`Writable`] | <code>[_write()][stream-_write]</code>, <code>[_writev()][stream-_writev]</code>, <code>[_final()][stream-_final]</code> |
| Reading and writing | [`Duplex`] | <code>[_read()][stream-_read]</code>, <code>[_write()][stream-_write]</code>, <code>[_writev()][stream-_writev]</code>, <code>[_final()][stream-_final]</code> |
| Writing only | [`Writable`] | <code>[_write()][stream-_write]</code>, <code>[_writev()][stream-_writev]</code>, <code>[_final()][stream-_final]</code>, <code>[_construct()][stream-_construct]</code> |
| Reading and writing | [`Duplex`] | <code>[_read()][stream-_read]</code>, <code>[_write()][stream-_write]</code>, <code>[_writev()][stream-_writev]</code>, <code>[_final()][stream-_final]</code>, <code>[_construct()][stream-_construct]</code> |
| Operate on written data, then read the result | [`Transform`] | <code>[_transform()][stream-_transform]</code>, <code>[_flush()][stream-_flush]</code>, <code>[_final()][stream-_final]</code> |

The implementation code for a stream should *never* call the "public" methods
Expand All @@ -1645,8 +1663,14 @@ objects and passing appropriate methods as constructor options.
const { Writable } = require('stream');

const myWritable = new Writable({
construct(options, callback) {
// Initialize state and load resources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Free resources...
}
});
```
Expand Down Expand Up @@ -1700,6 +1724,8 @@ changes:
[`stream._destroy()`][writable-_destroy] method.
* `final` {Function} Implementation for the
[`stream._final()`][stream-_final] method.
* `construct` {Function} Implementation for the
[`stream._construct()`][stream-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `false`.

Expand Down Expand Up @@ -1745,6 +1771,57 @@ const myWritable = new Writable({
});
```

#### writable.\_construct(options, callback)
<!-- YAML
added: REPLACEME
-->

* `options` {Object} Options passed to constructor.
* `callback` {Function} Call this function (optionally with an error
argument) when finished writing any remaining data.

The `_construct()` method **must not** be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Writable`
class methods only.

This optional function will be called by the stream constructor,
delaying the `'ready'` event until `callback` is called. This is useful to
initalize state or asynchronously initialize resources before the stream
can be used.

```js
const { Writable } = require('stream');
const fs = require('fs');

class WriteStream extends Writable {
constructor(filename) {
super({ filename, autoDestroy: true });
}
_construct({ filename }, callback) {
this.filename = filename;
this.fd = null;
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
```

#### writable.\_write(chunk, encoding, callback)

* `chunk` {Buffer|string|any} The `Buffer` to be written, converted from the
Expand Down Expand Up @@ -1958,6 +2035,8 @@ changes:
method.
* `destroy` {Function} Implementation for the
[`stream._destroy()`][readable-_destroy] method.
* `construct` {Function} Implementation for the
[`stream._construct()`][readable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `false`.

Expand Down Expand Up @@ -2000,6 +2079,64 @@ const myReadable = new Readable({
});
```

#### readable.\_construct(options, callback)
<!-- YAML
added: REPLACEME
-->

* `options` {Object} Options passed to constructor.
* `callback` {Function} Call this function (optionally with an error
argument) when finished writing any remaining data.

The `_construct()` method **must not** be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Writable`
class methods only.

This optional function will be called by the stream constructor,
delaying the `'ready'` event until `callback` is called. This is useful to
initalize state or asynchronously initialize resources before the stream
can be used.

```js
const { Readable } = require('stream');
const fs = require('fs');

class ReadStream extends Readable {
constructor(filename) {
super({ autoDestroy: true, filename });
}
_construct({ filename }, callback) {
this.filename = filename;
this.fd = null;
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf.slice(bytesRead) : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
```

#### readable.\_read(size)
<!-- YAML
added: v0.9.4
Expand Down Expand Up @@ -2255,6 +2392,38 @@ const myDuplex = new Duplex({
});
```

When using pipeline:

```js
const { Duplex } = require('stream');
const fs = require('fs');

stream.pipeline(
fs.createReadStream('object.json')
.setEncoding('utf-8'),
new Duplex({
construct(options, callback) {
this.data = '';
callback();
},
transform(chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
} catch (err) {
callback(err);
}
}
}),
fs.createWriteStream('valid-object.json')
);
```

#### An Example Duplex Stream

The following illustrates a simple example of a `Duplex` stream that wraps a
Expand Down Expand Up @@ -2748,6 +2917,7 @@ contain multi-byte characters.
[object-mode]: #stream_object_mode
[readable-_destroy]: #stream_readable_destroy_err_callback
[readable-destroy]: #stream_readable_destroy_error
[stream-_construct]: #stream_writable_construct_options_callback
[stream-_final]: #stream_writable_final_callback
[stream-_flush]: #stream_transform_flush_callback
[stream-_read]: #stream_readable_read_size_1
Expand Down
10 changes: 10 additions & 0 deletions lib/_stream_duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', {
}
});

Object.defineProperty(Duplex.prototype, 'writableReady', {
// Making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._writableState && this._writableState.status === 2;
}
});

Object.defineProperty(Duplex.prototype, 'writableBuffer', {
// Making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
Expand Down
37 changes: 34 additions & 3 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ function ReadableState(options, stream, isDuplex) {
this.endEmitted = false;
this.reading = false;

// Stream is still being constructed and no operations
// can take place until construction finished.
this.ready = false;

// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
this.pending = true;

// Callback to continue destruction after constrution
// has finished or failed.
this.destroyCallback = null;

// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
Expand Down Expand Up @@ -178,9 +190,22 @@ function Readable(options) {

if (typeof options.destroy === 'function')
this._destroy = options.destroy;

if (typeof options.construct === 'function')
this._construct = options.construct;
}

Stream.call(this);

if (typeof this._construct === 'function') {
this.once('ready', function() {
maybeReadMore(this, this._readableState);
});
destroyImpl.construct(this, options);
} else {
this._readableState.pending = false;
this._readableState.ready = true;
}
}

Object.defineProperty(Readable.prototype, 'destroyed', {
Expand Down Expand Up @@ -489,9 +514,9 @@ Readable.prototype.read = function(n) {

// However, if we've ended, then there's no point, and if we're already
// reading, then it's unnecessary.
if (state.ended || state.reading) {
if (state.ended || state.reading || state.pending) {
doRead = false;
debug('reading or ended', doRead);
debug('reading, ended or pending', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
Expand Down Expand Up @@ -643,7 +668,7 @@ function maybeReadMore_(stream, state) {
// called push() with new data. In this case we skip performing more
// read()s. The execution ends in this method again after the _read() ends
// up calling push() with more data.
while (!state.reading && !state.ended &&
while (!state.reading && !state.ended && !state.destroyed &&
(state.length < state.highWaterMark ||
(state.flowing && state.length === 0))) {
const len = state.length;
Expand Down Expand Up @@ -1077,6 +1102,12 @@ Readable.prototype[Symbol.asyncIterator] = function() {
return createReadableStreamAsyncIterator(this);
};

Object.defineProperty(Readable.prototype, 'readableReady', {
get() {
return this._readableState.ready;
}
});

Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
// Making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
Expand Down
Loading

0 comments on commit 3ce4d2c

Please sign in to comment.