diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 5fbf0e030a6862..00b6622b789980 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -28,6 +28,7 @@ const { ObjectDefineProperty, ObjectSetPrototypeOf, SymbolAsyncIterator, + Symbol } = primordials; module.exports = Readable; @@ -51,6 +52,8 @@ const { ERR_STREAM_UNSHIFT_AFTER_END_EVENT } = require('internal/errors').codes; +const kPaused = Symbol('kPaused'); + // Lazy loaded to improve the startup performance. let StringDecoder; let createReadableStreamAsyncIterator; @@ -127,7 +130,7 @@ function ReadableState(options, stream, isDuplex) { this.emittedReadable = false; this.readableListening = false; this.resumeScheduled = false; - this.paused = true; + this[kPaused] = null; // Should close be emitted on destroy. Defaults to true. this.emitClose = !options || options.emitClose !== false; @@ -159,6 +162,16 @@ function ReadableState(options, stream, isDuplex) { } } +// Legacy property for `paused` +ObjectDefineProperty(ReadableState.prototype, 'paused', { + get() { + return this[kPaused] !== false; + }, + set(value) { + this[kPaused] = !!value; + } +}); + function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); @@ -348,7 +361,8 @@ function chunkInvalid(state, chunk) { Readable.prototype.isPaused = function() { - return this._readableState.flowing === false; + const state = this._readableState; + return state[kPaused] === true || state.flowing === false; }; // Backwards compatibility. @@ -947,7 +961,7 @@ function updateReadableListening(self) { const state = self._readableState; state.readableListening = self.listenerCount('readable') > 0; - if (state.resumeScheduled && !state.paused) { + if (state.resumeScheduled && state[kPaused] === false) { // Flowing needs to be set to true now, otherwise // the upcoming resume will not flow. state.flowing = true; @@ -955,6 +969,8 @@ function updateReadableListening(self) { // Crude way to check if we should resume } else if (self.listenerCount('data') > 0) { self.resume(); + } else if (!state.readableListening) { + state.flowing = null; } } @@ -975,7 +991,7 @@ Readable.prototype.resume = function() { state.flowing = !state.readableListening; resume(this, state); } - state.paused = false; + state[kPaused] = false; return this; }; @@ -1006,7 +1022,7 @@ Readable.prototype.pause = function() { this._readableState.flowing = false; this.emit('pause'); } - this._readableState.paused = true; + this._readableState[kPaused] = true; return this; }; diff --git a/test/parallel/test-stream-readable-data.js b/test/parallel/test-stream-readable-data.js new file mode 100644 index 00000000000000..277adddde63584 --- /dev/null +++ b/test/parallel/test-stream-readable-data.js @@ -0,0 +1,19 @@ +'use strict'; +const common = require('../common'); + +const { Readable } = require('stream'); + +const readable = new Readable({ + read() {} +}); + +function read() {} + +readable.setEncoding('utf8'); +readable.on('readable', read); +readable.removeListener('readable', read); + +process.nextTick(function() { + readable.on('data', common.mustCall()); + readable.push('hello'); +}); diff --git a/test/parallel/test-stream-readable-pause-and-resume.js b/test/parallel/test-stream-readable-pause-and-resume.js index 4d7d860a6373d4..294ef2c35d4608 100644 --- a/test/parallel/test-stream-readable-pause-and-resume.js +++ b/test/parallel/test-stream-readable-pause-and-resume.js @@ -1,6 +1,7 @@ 'use strict'; const common = require('../common'); +const assert = require('assert'); const { Readable } = require('stream'); let ticks = 18; @@ -38,3 +39,20 @@ function readAndPause() { rs.on('data', ondata); } + +{ + const readable = new Readable({ + read() {} + }); + + function read() {} + + readable.setEncoding('utf8'); + readable.on('readable', read); + readable.removeListener('readable', read); + readable.pause(); + + process.nextTick(function() { + assert(readable.isPaused()); + }); +}