From a83c976a2b822d3cd5b060099b59f3fdb03416fb Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 20 Dec 2019 15:49:24 +0100 Subject: [PATCH] stream: reset flowing state if no 'readable' or 'data' listeners If we don't have any 'readable' or 'data' listeners and we are not about to resume. Then reset flowing state to initial null state. PR-URL: https://github.com/nodejs/node/pull/31036 Fixes: https://github.com/nodejs/node/issues/24474 Reviewed-By: Luigi Pinca Reviewed-By: Matteo Collina Reviewed-By: Rich Trott --- lib/_stream_readable.js | 26 +++++++++++++++---- test/parallel/test-stream-readable-data.js | 19 ++++++++++++++ .../test-stream-readable-pause-and-resume.js | 18 +++++++++++++ 3 files changed, 58 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-stream-readable-data.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 5fbf0e030a6862..00b6622b789980 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -28,6 +28,7 @@ const { ObjectDefineProperty, ObjectSetPrototypeOf, SymbolAsyncIterator, + Symbol } = primordials; module.exports = Readable; @@ -51,6 +52,8 @@ const { ERR_STREAM_UNSHIFT_AFTER_END_EVENT } = require('internal/errors').codes; +const kPaused = Symbol('kPaused'); + // Lazy loaded to improve the startup performance. let StringDecoder; let createReadableStreamAsyncIterator; @@ -127,7 +130,7 @@ function ReadableState(options, stream, isDuplex) { this.emittedReadable = false; this.readableListening = false; this.resumeScheduled = false; - this.paused = true; + this[kPaused] = null; // Should close be emitted on destroy. Defaults to true. this.emitClose = !options || options.emitClose !== false; @@ -159,6 +162,16 @@ function ReadableState(options, stream, isDuplex) { } } +// Legacy property for `paused` +ObjectDefineProperty(ReadableState.prototype, 'paused', { + get() { + return this[kPaused] !== false; + }, + set(value) { + this[kPaused] = !!value; + } +}); + function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); @@ -348,7 +361,8 @@ function chunkInvalid(state, chunk) { Readable.prototype.isPaused = function() { - return this._readableState.flowing === false; + const state = this._readableState; + return state[kPaused] === true || state.flowing === false; }; // Backwards compatibility. @@ -947,7 +961,7 @@ function updateReadableListening(self) { const state = self._readableState; state.readableListening = self.listenerCount('readable') > 0; - if (state.resumeScheduled && !state.paused) { + if (state.resumeScheduled && state[kPaused] === false) { // Flowing needs to be set to true now, otherwise // the upcoming resume will not flow. state.flowing = true; @@ -955,6 +969,8 @@ function updateReadableListening(self) { // Crude way to check if we should resume } else if (self.listenerCount('data') > 0) { self.resume(); + } else if (!state.readableListening) { + state.flowing = null; } } @@ -975,7 +991,7 @@ Readable.prototype.resume = function() { state.flowing = !state.readableListening; resume(this, state); } - state.paused = false; + state[kPaused] = false; return this; }; @@ -1006,7 +1022,7 @@ Readable.prototype.pause = function() { this._readableState.flowing = false; this.emit('pause'); } - this._readableState.paused = true; + this._readableState[kPaused] = true; return this; }; diff --git a/test/parallel/test-stream-readable-data.js b/test/parallel/test-stream-readable-data.js new file mode 100644 index 00000000000000..277adddde63584 --- /dev/null +++ b/test/parallel/test-stream-readable-data.js @@ -0,0 +1,19 @@ +'use strict'; +const common = require('../common'); + +const { Readable } = require('stream'); + +const readable = new Readable({ + read() {} +}); + +function read() {} + +readable.setEncoding('utf8'); +readable.on('readable', read); +readable.removeListener('readable', read); + +process.nextTick(function() { + readable.on('data', common.mustCall()); + readable.push('hello'); +}); diff --git a/test/parallel/test-stream-readable-pause-and-resume.js b/test/parallel/test-stream-readable-pause-and-resume.js index 4d7d860a6373d4..294ef2c35d4608 100644 --- a/test/parallel/test-stream-readable-pause-and-resume.js +++ b/test/parallel/test-stream-readable-pause-and-resume.js @@ -1,6 +1,7 @@ 'use strict'; const common = require('../common'); +const assert = require('assert'); const { Readable } = require('stream'); let ticks = 18; @@ -38,3 +39,20 @@ function readAndPause() { rs.on('data', ondata); } + +{ + const readable = new Readable({ + read() {} + }); + + function read() {} + + readable.setEncoding('utf8'); + readable.on('readable', read); + readable.removeListener('readable', read); + readable.pause(); + + process.nextTick(function() { + assert(readable.isPaused()); + }); +}