-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
io: wake pending writers on DuplexStream close #3756
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. This seems reasonable to me.
The CI failure in valgrind doesn't appear to be your fault. |
tokio/tests/io_mem_stream.rs
Outdated
// wait for drop first | ||
t2.await.unwrap(); | ||
// then try and join writer task | ||
t1.await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These comments seem to imply that the order is important, but changing the order cannot change the test's behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't the await necessary to ensure that t2 is scheduled to run before t1? or is that not necessary in tests/wrong in general? otherwise this would just test the existing behaviour of error on starting a new write on a closed stream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you tokio::spawn
something, it starts running immediately. The returned JoinHandle
is only useful for waiting for the task — Tokio takes care of running it regardless of whether you use the JoinHandle
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be clear, the test does currently not guarantee any ordering between t1
and t2
whatsoever. They may run in any order or even at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, I see, thanks. Would it be worth it then to guarantee the order somehow? In my testing this would always reliably always trigger the bug, but both cases are supposed to behave the same on the writer side anyway. If we decide to leave it, I'd just remove the comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want t2
to run first, then t1
, then just don't spawn them and put them in that order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But the write_all blocks, so I need to await it in a spawned task? Anyway, for me it reproduces the issue just fine and shows the fix working, so I've just removed the comments for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's fine like this as well.
Performing a write on a closed DuplexStream (i.e. other end is dropped) results in an Err(BrokenPipe). However, if there is a writer waiting to be awoken from a buffer-full condition, it would previously be ignored, and thus stuck in suspended state, as no further reads could ever be made. Split the Pipe::close routine into close_read and close_write, and perform both in case one side of the DuplexStream is dropped. close_read will notify any writers to wake up, which will then cause them to see the updated is_closed flag and return an Err(BrokenPipe) immediately. Test case 'disconnect_reader' is added to test the fixed behaviour, it would previously get stuck indefinitely. The 'max_write_size' test needs to be updated, as otherwise the read side is dropped immediately after performing the read_exact, which now resulted in the second write being awoken and failing with a BrokenPipe error.
See this PR for more info: tokio-rs/tokio#3756 As a workaround use a pair of connected unix sockets - this obviously incurs some overhead, albeit not measureable on my machine. Once tokio includes the fix we can go back to a DuplexStream for performance and simplicity. Signed-off-by: Stefan Reiter <[email protected]>
Performing a write on a closed DuplexStream (i.e. other end is dropped) results in an Err(BrokenPipe). However, if there is a writer waiting to be awoken from a buffer-full condition, it would previously be ignored, and thus stuck in suspended state, as no further reads could ever be made.
We encountered this when trying to write data from a tokio task via the duplex stream to a hyper Body::wrap_stream (via
ReaderStream
) for a server application. If the client aborted the download prematurely, the socket would be closed and the error propagated up to the duplex, but if there was a pending write (very likely, as we read from a fast source over potentially slow network) it would get stuck and never woken up, thus the tokio task would run indefinitely and cleanup routines wouldn't.This PR fixes the issue by waking pending writers on a DuplexStream on Drop of the other side. This works for our use case, and also satisfies the reproducer, included as a test case (
disconnect_reader
). I've also updated the documentation with a paragraph about the behaviour of the DuplexStream when one side is closed.Note that this requires modifications to an existing test case, so I'm not sure if this would qualify as a "breaking change" - AFAICT that test case was most likely slightly broken beforehand too, as it somewhat relied on timing between drop and awake.