-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add SimplexStream
structure
#6589
Conversation
Adding a stream with one direction of communication seems reasonable, but I don't love the name |
My reasoning for not using But if you think it is negligible I will change it back to the |
A |
Sorry, this PR has gotten lost. I'm happy to take it up again if you're still interested. |
I would be glad to continue working on this feature (and possibly extending it into other structures, as @ robgjansen mentioned) . However, I am still uncertain whether we agree to name it |
We can go with SimplexStream. Sorry for not replying earlier. |
tokio/src/io/util/mem.rs
Outdated
pub fn simplex(max_buf_size: usize) -> SimplexStream { | ||
SimplexStream { | ||
buffer: BytesMut::new(), | ||
is_closed: false, | ||
max_buf_size, | ||
read_waker: None, | ||
write_waker: None, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This provides a single object that is both the reader and writer, but in practice I think people will want those to be two different objects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with the previous comment. In my case I would like to use these stream objects as drop-in replacements for TcpStream
and related types.
A TcpStream
is specific to one "node", i.e., the client has a TcpStream
, and the server has a TcpStream
. For testing code that uses a TcpStream
, we can replace with DuplexStream
s
let (client_rw, server_rw) = tokio::io::duplex(65_536);
Here we have both the client-side and server-side objects, and both are Read+Write. Thus, code written to generically handle IO on a TcpStream
will function identically when replaced with the DuplexStream
.
Now in my use case, each TcpStream
is split into a reader (OwnedReadHalf
) and a writer (OwnedWriteHalf
),
let (read_half, write_half) = tcp_stream.into_split();
and my code operates generically on the Read and on the Write objects. To replicate that, I would need the SimplexStream
to work similarly:
let (client_read_half, server_write_half) = tokio::io::simplex(65_536);
And then you could stack two of these together to get functionality similar to the split of a TcpStream
let (client_read_half, server_write_half) = tokio::io::simplex(65_536);
let (server_read_half, client_write_half) = tokio::io::simplex(65_536);
Then I could pass client_read_half
/ client_write_half
to my client-side code, and server_read_half
/ server_write_half
to my server-side code.
There is a bit of a question about the API. Do we want tokio::io::simplex
to return a SimplexStream
that is Read+Write, and then support SimplexStream::into_split
that returns a SimplexStreamReadHalf
and SimplexStreamWriteHalf
? Or should tokio::io::simplex
directly return the tuple (SimplexStreamReadHalf, SimplexStreamWriteHalf)
as in my examples above?
(Also, I'm not sure if it makes sense to mix DuplexStreams and SimplexStreams or not, it sorta depends on how the simplex API looks.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, we should not reinvent the wheel and try to reuse already existing APIs and facilities. It is true that for most of the use cases, we should rather return a pair of reader and writer instead of a single structure, that is a great point. Also, most of the channel-like structures already do this.
Furthermore, since SimplexStream
already implements AsyncRead
and AsyncWrite
, we can use already existing split(…)
to create reader and writer half (where possibly the split reader and writer half can be unsplit
).
What do you think of it now?
I am not sure whether it makes sense to add more tests to test the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think returning an already split SimplexStream
makes sense as an API.
ab0b514
to
f1a10bc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could make new
public, but then I'd call it new_unsplit
or similar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. Just one more minor request below.
tokio/src/io/util/mem.rs
Outdated
/// A unidirectional IO over a piece of memory. | ||
/// A unidirectional pipe to read and write bytes in memory. | ||
/// | ||
/// Data can be written to the pipe, and reading will return that data. | ||
/// # Example | ||
/// | ||
/// ``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be nice to include a link to the simplex
function that acts as the primary constructor.
Currently it looks like this: https://deploy-preview-6589--tokio-rs.netlify.app/tokio/io/struct.simplexstream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I extended the documentation with links & rebased the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
7cddcfa
to
006bbba
Compare
tokio/src/io/util/mem.rs
Outdated
/// A unidirectional pipe to read and write bytes in memory. It can be constructed | ||
/// by [`simplex`] function which will create a pair of reader and writer or by | ||
/// calling [`SimplexStream::new_unsplit`] that will create a handle for both | ||
/// reading and writing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Everything on the first markdown line is visible in the module docs.
Please move everything after the first period into a new paragraph.
/// A unidirectional pipe to read and write bytes in memory. It can be constructed | |
/// by [`simplex`] function which will create a pair of reader and writer or by | |
/// calling [`SimplexStream::new_unsplit`] that will create a handle for both | |
/// reading and writing. | |
/// A unidirectional pipe to read and write bytes in memory. | |
/// | |
/// It can be constructed by [`simplex`] function which will create a pair of | |
/// reader and writer or by calling [`SimplexStream::new_unsplit`] that will | |
/// create a handle for both reading and writing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ops, should be fixed.
Up until this patch we had the `DuplexStream` which was backed by two underlying pipes. This patch makes public this underlying structure and renames it from the `Pipe` to the `SimplexStream` to provent name confusion and keep simililiarity with the already existing duplex.
Thanks! |
Bumps tokio from 1.39.3 to 1.40.0. Release notes Sourced from tokio's releases. Tokio v1.40.0 1.40.0 (August 30th, 2024) Added io: add util::SimplexStream (#6589) process: stabilize Command::process_group (#6731) sync: add {TrySendError,SendTimeoutError}::into_inner (#6755) task: add JoinSet::join_all (#6784) Added (unstable) runtime: add Builder::{on_task_spawn, on_task_terminate} (#6742) Changed io: use vectored io for write_all_buf when possible (#6724) runtime: prevent niche-optimization to avoid triggering miri (#6744) sync: mark mpsc types as UnwindSafe (#6783) sync,time: make Sleep and BatchSemaphore instrumentation explicit roots (#6727) task: use NonZeroU64 for task::Id (#6733) task: include panic message when printing JoinError (#6753) task: add #[must_use] to JoinHandle::abort_handle (#6762) time: eliminate timer wheel allocations (#6779) Documented docs: clarify that [build] section doesn't go in Cargo.toml (#6728) io: clarify zero remaining capacity case (#6790) macros: improve documentation for select! (#6774) sync: document mpsc channel allocation behavior (#6773) #6589: tokio-rs/tokio#6589 #6724: tokio-rs/tokio#6724 #6727: tokio-rs/tokio#6727 #6728: tokio-rs/tokio#6728 #6731: tokio-rs/tokio#6731 #6733: tokio-rs/tokio#6733 #6742: tokio-rs/tokio#6742 #6744: tokio-rs/tokio#6744 #6753: tokio-rs/tokio#6753 #6755: tokio-rs/tokio#6755 #6762: tokio-rs/tokio#6762 #6773: tokio-rs/tokio#6773 #6774: tokio-rs/tokio#6774 #6779: tokio-rs/tokio#6779 #6783: tokio-rs/tokio#6783 #6784: tokio-rs/tokio#6784 #6790: tokio-rs/tokio#6790 Commits ea6d652 chore: prepare Tokio v1.40.0 (#6806) 11f66f4 chore: replace ready! with std::task::ready! (#6804) 479a56a time: eliminate timer wheel allocations (#6779) b37f0de runtime: implement initial set of task hooks (#6742) c9fad08 codec: fix typo in the docs for Encoder::Error (#6800) cc70a21 task: add join_all method to JoinSet (#6784) 1ac8dff task: add AbortOnDropHandle type (#6786) ff3f2a8 io: add SimplexStream (#6589) 5b9a290 io: clarify zero remaining capacity case (#6790) 70569bd task: fix typo in TaskTracker docs (#6792) Additional commits viewable in compare view Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase. Dependabot commands and options You can trigger Dependabot actions by commenting on this PR: @dependabot rebase will rebase this PR @dependabot recreate will recreate this PR, overwriting any edits that have been made to it @dependabot merge will merge this PR after your CI passes on it @dependabot squash and merge will squash and merge this PR after your CI passes on it @dependabot cancel merge will cancel a previously requested merge and block automerging @dependabot reopen will reopen this PR if it is closed @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually @dependabot show <dependency name> ignore conditions will show all of the ignore conditions of the specified dependency @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself) @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself) @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
I notice that dropping a |
I agree that it should not hang if one of the halves is dropped. However, I don't think that it will be as easy as adding |
Please open an issue for this. |
I think that it is unfortunate that the underlying structure backing the
DuplexStream
is not exposed. There are cases where a single direction will suffice, for example, when I have only a single writer and single reader, thus not requiring bidirectionality.In this patch, I renamed the
Pipe
to theSimplexStream
for coherence with the already existingDuplexStream
and made it public