Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
Fix: Multiple pipes to the same stream were broken
Browse files Browse the repository at this point in the history
When creating multiple .pipe()s to the same destination stream, the
first source to end would close the destination, breaking all remaining
pipes. This patch fixes the problem by keeping track of all open
pipes, so that we only call end on destinations that have no more
sources piping to them.

closes #929
  • Loading branch information
felixge authored and ry committed Apr 14, 2011
1 parent 8417870 commit 6c5b31b
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 9 deletions.
29 changes: 21 additions & 8 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ function Stream() {
util.inherits(Stream, events.EventEmitter);
exports.Stream = Stream;

var pipes = [];

Stream.prototype.pipe = function(dest, options) {
var source = this;

pipes.push(dest);

function ondata(chunk) {
if (dest.writable) {
if (false === dest.write(chunk)) source.pause();
Expand All @@ -52,10 +56,18 @@ Stream.prototype.pipe = function(dest, options) {

if (!options || options.end !== false) {
function onend() {
var index = pipes.indexOf(dest);
pipes.splice(index, 1);

if (pipes.indexOf(dest) > -1) {
return;
}

dest.end();
}

source.on('end', onend);
source.on('close', onend);
}

/*
Expand All @@ -73,34 +85,35 @@ Stream.prototype.pipe = function(dest, options) {
source.emit('resume');
};
}

var onpause = function() {
source.pause();
}

dest.on('pause', onpause);

var onresume = function() {
if (source.readable) source.resume();
};

dest.on('resume', onresume);

var cleanup = function () {
source.removeListener('data', ondata);
dest.removeListener('drain', ondrain);
source.removeListener('end', onend);

source.removeListener('close', onend);

dest.removeListener('pause', onpause);
dest.removeListener('resume', onresume);

source.removeListener('end', cleanup);
source.removeListener('close', cleanup);

dest.removeListener('end', cleanup);
dest.removeListener('close', cleanup);
}

source.on('end', cleanup);
source.on('close', cleanup);

Expand Down
21 changes: 20 additions & 1 deletion test/simple/test-stream-pipe-cleanup.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ var util = require('util');

function Writable () {
this.writable = true;
this.endCalls = 0;
stream.Stream.call(this);
}
util.inherits(Writable, stream.Stream);
Writable.prototype.end = function () {}
Writable.prototype.end = function () {
this.endCalls++;
}

function Readable () {
this.readable = true;
Expand All @@ -56,13 +59,29 @@ for (i = 0; i < limit; i++) {
r.emit('end')
}
assert.equal(0, r.listeners('end').length);
assert.equal(limit, w.endCalls);

w.endCalls = 0;

for (i = 0; i < limit; i++) {
r = new Readable()
r.pipe(w)
r.emit('close')
}
assert.equal(0, r.listeners('close').length);
assert.equal(limit, w.endCalls);

w.endCalls = 0;

var r2;
r = new Readable()
r2 = new Readable();

r.pipe(w)
r2.pipe(w)
r.emit('close')
r2.emit('close')
assert.equal(1, w.endCalls);

r = new Readable();

Expand Down

12 comments on commit 6c5b31b

@mikeal
Copy link

@mikeal mikeal commented on 6c5b31b Apr 23, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how the hell would you ever use this?

you're piping two readable streams to the same writable stream and you aren't insuring any kind of ordering. is there any use case for this?

@mikeal
Copy link

@mikeal mikeal commented on 6c5b31b Apr 23, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, why is pipes a module level singleton? shouldn't it be scoped to the stream? this seems quite broken.

@mikeal
Copy link

@mikeal mikeal commented on 6c5b31b Apr 23, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh shit

i'm pretty sure this patch makes all pipes in node wait until all pipes have finished, globally, before ending any one of the streams. the rest are left open.

@mikeal
Copy link

@mikeal mikeal commented on 6c5b31b Apr 23, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i read this wrong, it's not doing what i thought it was, freak out is done.

still wondering why pipes is global and not scoped to a single source? also don't know what this would be used for.

@jonseymour
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mikeal,

I think your concern about the singleton is fair enough - ending a pipe should not have O(n) performance characteristics.

That said, I can't see the concern about having to wait for all pipes to finish. A destination that is attached to only one pipe will be ended immediately, since the early return won't happen. Perhaps you can articulate the mechanism that leads to the issue you are concerned about?

@jonseymour
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments, crossed in the ether - see you have corrected yourself now.

@isaacs
Copy link

@isaacs isaacs commented on 6c5b31b Apr 23, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if it was a simple counter on the dest stream object, it'd be O(1). The array-as-reference-counter is bulletproof plating where aluminum foil will suffice.

@isaacs
Copy link

@isaacs isaacs commented on 6c5b31b Apr 23, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's really interesting about this patch is that the "pipes" static array is exactly the type of use-case where you want a WeakMap.

@felixge
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me try to explain. For some reason, Ryan squashed 3 of my patches into 1 with this commit. You can find the original commits here: https://github.com/felixge/node/commits/pipe. Besides the fact that I'll never catch up to @isaacs in terms commit counts this way, it really created more confusion here than necessary.

Anyway, I share your concerns about the patch for piping multiple streams into another stream. There is some method to my madness so:

also don't know what this would be used for.

The use case I had in mind was combining standard output and standard error of a child process. I don't know if there are other common use cases like this.

still wondering why pipes is global and not scoped to a single source?

It's not global, it's a local of the module. Anyway, I agree that adding a property on the readable stream would allow for a more efficient implementation. However, I discarded that idea because it would require us to officially extend the writable stream interface, or to attach new properties to an object outside of it's constructor (only ok for separately defined mixins in my book). Either way, it would require the writable stream to know that it's being piped to, something that shouldn't really be necessary as it only concerns the pipe implementation.

So if you guys see merit in any of the solutions I discarded, please go ahead. It would also not be the end of the world for me if we remove this feature,. However, I suspect the performance impact of this to not extend into a measurable value for any practical application using pipe().

@mikeal
Copy link

@mikeal mikeal commented on 6c5b31b Apr 23, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, this leaks on error since we only splice out of that array on end.

@mikeal
Copy link

@mikeal mikeal commented on 6c5b31b Apr 23, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not global, it's a local of the module.

the module is in core which means there is only ever going to be one of them for the entire node process, it's effectively global.

I can see the use case now, thanks felix, i don't know if it's a big enough use case to have in the reference implementation as it seems to be costly and prone to leaking (because we don't have WeakMaps). there are all kinds of cases where someone is going to have to handle their own pipe when it falls out of line with 99% of what people want to do.

good to know that this was multiple patches, i wondered if this was the case when i saw that additional source close cleanup which is great, if we back out this multi pipe code we should make sure that code stays.

@felixge
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the module is in core which means there is only ever going to be one of them for the entire node process, it's effectively global.

Einstein would call it a hidden variable : ).

also, this leaks on error since we only splice out of that array on end.

Yes, that's a valid issue.

there are all kinds of cases where someone is going to have to handle their own pipe when it falls out of line with 99% of what people want to do.

Well, if we don't handle this use case we are turning pipe() into a leaky abstraction.

if we back out this multi pipe code we should make sure that code stays.

Considering that this is probably not urgently broken right now, how about discussing this over beer next week at JSConf?

Please sign in to comment.