From d05c0a08f40afe7dd7b07a54bab87b7566fc220e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=88=9A?= Date: Sat, 20 Jan 2018 11:06:26 +0800 Subject: [PATCH] stream: cleanup() when unpiping all streams. This PR makes sure the object emitted as the 'unpipe' event in the destination stream is not shared between destination, as it would be muted. Refs: https://github.com/nodejs/node/pull/12746 PR-URL: https://github.com/nodejs/node/pull/18266 Reviewed-By: Luigi Pinca Reviewed-By: Anatoli Papirovski Reviewed-By: Matteo Collina Reviewed-By: James M Snell Reviewed-By: Ruben Bridgewater Reviewed-By: Anna Henningsen --- lib/_stream_readable.js | 2 +- .../test-stream-pipe-unpipe-streams.js | 54 +++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 500071203b0f3b..17ff233ca9d3d5 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -754,7 +754,7 @@ Readable.prototype.unpipe = function(dest) { state.flowing = false; for (var i = 0; i < len; i++) - dests[i].emit('unpipe', this, unpipeInfo); + dests[i].emit('unpipe', this, { hasUnpiped: false }); return this; } diff --git a/test/parallel/test-stream-pipe-unpipe-streams.js b/test/parallel/test-stream-pipe-unpipe-streams.js index cd29b66365ae0f..49e02bea9cb695 100644 --- a/test/parallel/test-stream-pipe-unpipe-streams.js +++ b/test/parallel/test-stream-pipe-unpipe-streams.js @@ -31,3 +31,57 @@ source.unpipe(dest2); source.unpipe(dest1); assert.strictEqual(source._readableState.pipes, null); + +{ + // test `cleanup()` if we unpipe all streams. + const source = Readable({ read: () => {} }); + const dest1 = Writable({ write: () => {} }); + const dest2 = Writable({ write: () => {} }); + + let destCount = 0; + const srcCheckEventNames = ['end', 'data']; + const destCheckEventNames = ['close', 'finish', 'drain', 'error', 'unpipe']; + + const checkSrcCleanup = common.mustCall(() => { + assert.strictEqual(source._readableState.pipes, null); + assert.strictEqual(source._readableState.pipesCount, 0); + assert.strictEqual(source._readableState.flowing, false); + + srcCheckEventNames.forEach((eventName) => { + assert.strictEqual( + source.listenerCount(eventName), 0, + `source's '${eventName}' event listeners not removed` + ); + }); + }); + + function checkDestCleanup(dest) { + const currentDestId = ++destCount; + source.pipe(dest); + + const unpipeChecker = common.mustCall(() => { + assert.deepStrictEqual( + dest.listeners('unpipe'), [unpipeChecker], + `destination{${currentDestId}} should have a 'unpipe' event ` + + 'listener which is `unpipeChecker`' + ); + dest.removeListener('unpipe', unpipeChecker); + destCheckEventNames.forEach((eventName) => { + assert.strictEqual( + dest.listenerCount(eventName), 0, + `destination{${currentDestId}}'s '${eventName}' event ` + + 'listeners not removed' + ); + }); + + if (--destCount === 0) + checkSrcCleanup(); + }); + + dest.on('unpipe', unpipeChecker); + } + + checkDestCleanup(dest1); + checkDestCleanup(dest2); + source.unpipe(); +}