From 269103a0e5e30cc217bde1660087e87dfc722b8a Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Wed, 13 Mar 2019 22:26:18 +0100 Subject: [PATCH] stream: fix regression introduced in #26059 In #26059, we introduced a bug that caused 'readable' to be nextTicked on EOF of a ReadableStream. This breaks the dicer module on CITGM. That change was partially reverted to still fix the bug in #25810 and not break dicer. See: https://github.com/nodejs/node/pull/26059 Fixes: https://github.com/nodejs/node/issues/25810 PR-URL: https://github.com/nodejs/node/pull/26643 Reviewed-By: Luigi Pinca Reviewed-By: James M Snell --- lib/_stream_readable.js | 22 ++++++++++++++----- ...eam-readable-emit-readable-short-stream.js | 6 ++--- .../test-stream-readable-emittedReadable.js | 13 +---------- .../test-stream-readable-needReadable.js | 2 +- ...est-stream-readable-reading-readingMore.js | 2 +- .../test-stream2-httpclient-response-end.js | 2 +- test/parallel/test-stream2-transform.js | 14 ++++-------- 7 files changed, 28 insertions(+), 33 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 968c7b258cc89a..fe77c441861478 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -512,12 +512,24 @@ function onEofChunk(stream, state) { } } state.ended = true; - state.needReadable = false; - // We are not protecting if emittedReadable = true, - // so 'readable' gets scheduled anyway. - state.emittedReadable = true; - process.nextTick(emitReadable_, stream); + if (state.sync) { + // If we are sync, wait until next tick to emit the data. + // Otherwise we risk emitting data in the flow() + // the readable code triggers during a read() call + emitReadable(stream); + } else { + // Emit 'readable' now to make sure it gets picked up. + state.needReadable = false; + state.emittedReadable = true; + // We have to emit readable now that we are EOF. Modules + // in the ecosystem (e.g. dicer) rely on this event being sync. + if (state.ended) { + emitReadable_(stream); + } else { + process.nextTick(emitReadable_, stream); + } + } } // Don't emit readable right away in sync mode, because this can trigger diff --git a/test/parallel/test-stream-readable-emit-readable-short-stream.js b/test/parallel/test-stream-readable-emit-readable-short-stream.js index 2f4f43baf5a848..d8b84bfbe71d6e 100644 --- a/test/parallel/test-stream-readable-emit-readable-short-stream.js +++ b/test/parallel/test-stream-readable-emit-readable-short-stream.js @@ -54,7 +54,7 @@ const assert = require('assert'); break; assert.strictEqual(chunk.toString(), 'content'); } - }, 2)); + })); } { @@ -78,7 +78,7 @@ const assert = require('assert'); break; assert.strictEqual(chunk.toString(), 'content'); } - }, 2)); + })); } { @@ -94,7 +94,7 @@ const assert = require('assert'); break; assert.strictEqual(chunk.toString(), 'content'); } - }, 2)); + })); t.push('content'); t.push(null); diff --git a/test/parallel/test-stream-readable-emittedReadable.js b/test/parallel/test-stream-readable-emittedReadable.js index 6580c36303e11e..ba613f9e9ff19d 100644 --- a/test/parallel/test-stream-readable-emittedReadable.js +++ b/test/parallel/test-stream-readable-emittedReadable.js @@ -43,23 +43,12 @@ const noRead = new Readable({ read: () => {} }); -noRead.once('readable', common.mustCall(() => { +noRead.on('readable', common.mustCall(() => { // emittedReadable should be true when the readable event is emitted assert.strictEqual(noRead._readableState.emittedReadable, true); noRead.read(0); // emittedReadable is not reset during read(0) assert.strictEqual(noRead._readableState.emittedReadable, true); - - noRead.on('readable', common.mustCall(() => { - // The second 'readable' is emitted because we are ending - - // emittedReadable should be true when the readable event is emitted - assert.strictEqual(noRead._readableState.emittedReadable, false); - noRead.read(0); - // emittedReadable is not reset during read(0) - assert.strictEqual(noRead._readableState.emittedReadable, false); - - })); })); noRead.push('foo'); diff --git a/test/parallel/test-stream-readable-needReadable.js b/test/parallel/test-stream-readable-needReadable.js index 54618b5e8ab14c..c4bc90bb19d3e2 100644 --- a/test/parallel/test-stream-readable-needReadable.js +++ b/test/parallel/test-stream-readable-needReadable.js @@ -14,7 +14,7 @@ readable.on('readable', common.mustCall(() => { // When the readable event fires, needReadable is reset. assert.strictEqual(readable._readableState.needReadable, false); readable.read(); -}, 2)); +})); // If a readable listener is attached, then a readable event is needed. assert.strictEqual(readable._readableState.needReadable, true); diff --git a/test/parallel/test-stream-readable-reading-readingMore.js b/test/parallel/test-stream-readable-reading-readingMore.js index e72159d1c9be94..e57a808286619d 100644 --- a/test/parallel/test-stream-readable-reading-readingMore.js +++ b/test/parallel/test-stream-readable-reading-readingMore.js @@ -31,7 +31,7 @@ const Readable = require('stream').Readable; assert.strictEqual(state.reading, false); } - const expectedReadingMore = [true, false, false]; + const expectedReadingMore = [true, true, false]; readable.on('readable', common.mustCall(() => { // There is only one readingMore scheduled from on('data'), // after which everything is governed by the .read() call diff --git a/test/parallel/test-stream2-httpclient-response-end.js b/test/parallel/test-stream2-httpclient-response-end.js index 8b2920668cd703..73667eb3dd2e92 100644 --- a/test/parallel/test-stream2-httpclient-response-end.js +++ b/test/parallel/test-stream2-httpclient-response-end.js @@ -15,7 +15,7 @@ const server = http.createServer(function(req, res) { while ((chunk = res.read()) !== null) { data += chunk; } - }, 2)); + })); res.on('end', common.mustCall(function() { console.log('end event'); assert.strictEqual(msg, data); diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index 2590d5192fe103..b27b4116f3c84e 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -321,16 +321,10 @@ const Transform = require('_stream_transform'); pt.end(); - // The next readable is emitted on the next tick. - assert.strictEqual(emits, 0); - - process.on('nextTick', function() { - assert.strictEqual(emits, 1); - assert.strictEqual(pt.read(5).toString(), 'l'); - assert.strictEqual(pt.read(5), null); - - assert.strictEqual(emits, 1); - }); + assert.strictEqual(emits, 1); + assert.strictEqual(pt.read(5).toString(), 'l'); + assert.strictEqual(pt.read(5), null); + assert.strictEqual(emits, 1); } {