From 6893bca8896ff70e8a238ae832440c3b6162ef74 Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Tue, 28 Feb 2023 16:50:26 -0800 Subject: [PATCH 1/2] Borrow `self` in read_to_end, rather than consuming Allows `RecvStream::stop` to be called with custom error codes if `ReadToEndError::TooLong` is encountered. --- quinn/examples/client.rs | 2 +- quinn/examples/server.rs | 2 +- quinn/src/recv_stream.rs | 25 +++++++++++++------------ quinn/src/tests.rs | 14 +++++++------- quinn/tests/many_connections.rs | 2 +- 5 files changed, 23 insertions(+), 22 deletions(-) diff --git a/quinn/examples/client.rs b/quinn/examples/client.rs index 0cbc4dca5..16f90b7b2 100644 --- a/quinn/examples/client.rs +++ b/quinn/examples/client.rs @@ -113,7 +113,7 @@ async fn run(options: Opt) -> Result<()> { .await .map_err(|e| anyhow!("failed to connect: {}", e))?; eprintln!("connected at {:?}", start.elapsed()); - let (mut send, recv) = conn + let (mut send, mut recv) = conn .open_bi() .await .map_err(|e| anyhow!("failed to open stream: {}", e))?; diff --git a/quinn/examples/server.rs b/quinn/examples/server.rs index 849fcdd18..3ffc9d463 100644 --- a/quinn/examples/server.rs +++ b/quinn/examples/server.rs @@ -205,7 +205,7 @@ async fn handle_connection(root: Arc, conn: quinn::Connecting) -> Result<( async fn handle_request( root: Arc, - (mut send, recv): (quinn::SendStream, quinn::RecvStream), + (mut send, mut recv): (quinn::SendStream, quinn::RecvStream), ) -> Result<()> { let req = recv .read_to_end(64 * 1024) diff --git a/quinn/src/recv_stream.rs b/quinn/src/recv_stream.rs index 1ea18b14d..d433a18e8 100644 --- a/quinn/src/recv_stream.rs +++ b/quinn/src/recv_stream.rs @@ -51,15 +51,16 @@ use crate::{ /// // In the receiving task /// let mut buf = [0u8; 10]; /// let data = recv_stream.read_exact(&mut buf).await?; -/// recv_stream.read_to_end(0).await?; +/// if recv_stream.read_to_end(0).await.is_err() { +/// // Discard unexpected data and notify the peer to stop sending it +/// let _ = recv_stream.stop(0u8.into()); +/// } /// # Ok(()) /// # } /// ``` /// -/// Note that in this example the receiver is dropped because [`RecvStream::read_to_end`] -/// takes ownership. This results in the implicit call to `stop(0)` if the stream was not -/// finished, interrupting any further attempts to send data. Crucially the `stop` call -/// only happens after it attempted to read the entire stream. +/// An alternative approach, used in HTTP/3, is to specify a particular error code used with `stop` +/// that indicates graceful receiver-initiated stream shutdown, rather than a true error condition. /// /// [`RecvStream::read_chunk`] could be used instead which does not take ownership and /// allows using an explit call to [`RecvStream::stop`] with a custom error code. @@ -221,15 +222,15 @@ impl RecvStream { /// Convenience method to read all remaining data into a buffer /// - /// The returned future fails with [`ReadToEndError::TooLong`] if it's longer than `size_limit` - /// bytes. Uses unordered reads to be more efficient than using `AsyncRead` would allow. - /// `size_limit` should be set to limit worst-case memory use. + /// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding + /// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would + /// allow. `size_limit` should be set to limit worst-case memory use. /// /// If unordered reads have already been made, the resulting buffer may have gaps containing /// arbitrary data. /// /// [`ReadToEndError::TooLong`]: crate::ReadToEndError::TooLong - pub async fn read_to_end(self, size_limit: usize) -> Result, ReadToEndError> { + pub async fn read_to_end(&mut self, size_limit: usize) -> Result, ReadToEndError> { ReadToEnd { stream: self, size_limit, @@ -357,15 +358,15 @@ impl From<(Option, Option)> for ReadStatus { /// /// [`RecvStream::read_to_end()`]: crate::RecvStream::read_to_end #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] -struct ReadToEnd { - stream: RecvStream, +struct ReadToEnd<'a> { + stream: &'a mut RecvStream, read: Vec<(Bytes, u64)>, start: u64, end: u64, size_limit: usize, } -impl Future for ReadToEnd { +impl Future for ReadToEnd<'_> { type Output = Result, ReadToEndError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { loop { diff --git a/quinn/src/tests.rs b/quinn/src/tests.rs index 92c998f4d..6c93d6731 100644 --- a/quinn/src/tests.rs +++ b/quinn/src/tests.rs @@ -140,7 +140,7 @@ fn read_after_close() { .await .expect("connect"); tokio::time::sleep_until(Instant::now() + Duration::from_millis(100)).await; - let stream = new_conn.accept_uni().await.expect("incoming streams"); + let mut stream = new_conn.accept_uni().await.expect("incoming streams"); let msg = stream .read_to_end(usize::max_value()) .await @@ -211,7 +211,7 @@ async fn accept_after_close() { .expect("connection"); // ...and read what was sent. - let stream = receiver.accept_uni().await.expect("incoming streams"); + let mut stream = receiver.accept_uni().await.expect("incoming streams"); let msg = stream .read_to_end(usize::max_value()) .await @@ -262,7 +262,7 @@ async fn zero_rtt() { let connection = incoming.into_0rtt().unwrap_or_else(|_| unreachable!()).0; let c = connection.clone(); tokio::spawn(async move { - while let Ok(x) = c.accept_uni().await { + while let Ok(mut x) = c.accept_uni().await { let msg = x.read_to_end(usize::max_value()).await.unwrap(); assert_eq!(msg, MSG); } @@ -285,7 +285,7 @@ async fn zero_rtt() { tokio::spawn(async move { // Buy time for the driver to process the server's NewSessionTicket tokio::time::sleep_until(Instant::now() + Duration::from_millis(100)).await; - let stream = connection.accept_uni().await.expect("incoming streams"); + let mut stream = connection.accept_uni().await.expect("incoming streams"); let msg = stream .read_to_end(usize::max_value()) .await @@ -309,7 +309,7 @@ async fn zero_rtt() { s.finish().await.expect("0-RTT finish"); }); - let stream = connection.accept_uni().await.expect("incoming streams"); + let mut stream = connection.accept_uni().await.expect("incoming streams"); let msg = stream .read_to_end(usize::max_value()) .await @@ -493,7 +493,7 @@ fn run_echo(args: EchoArgs) { for i in 0..args.nr_streams { println!("Opening stream {i}"); - let (mut send, recv) = new_conn.open_bi().await.expect("stream open"); + let (mut send, mut recv) = new_conn.open_bi().await.expect("stream open"); let msg = gen_data(args.stream_size, SEED); let send_task = async { @@ -650,7 +650,7 @@ async fn rebind_recv() { .unwrap(); info!("rebound"); write_send.notify_one(); - let stream = connection.accept_uni().await.unwrap(); + let mut stream = connection.accept_uni().await.unwrap(); assert_eq!(stream.read_to_end(MSG.len()).await.unwrap(), MSG); server.await.unwrap(); } diff --git a/quinn/tests/many_connections.rs b/quinn/tests/many_connections.rs index e7accb335..0f9b0677a 100644 --- a/quinn/tests/many_connections.rs +++ b/quinn/tests/many_connections.rs @@ -91,7 +91,7 @@ fn connect_n_nodes_to_1_and_send_1mb_data() { } } -async fn read_from_peer(stream: quinn::RecvStream) -> Result<(), quinn::ConnectionError> { +async fn read_from_peer(mut stream: quinn::RecvStream) -> Result<(), quinn::ConnectionError> { let crc = crc::Crc::::new(&crc::CRC_32_ISO_HDLC); match stream.read_to_end(1024 * 1024 * 5).await { Ok(data) => { From 7e1d8de3f3770dea27aede3e16b3aea1c2bf9973 Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Sat, 4 Mar 2023 15:27:32 -0800 Subject: [PATCH 2/2] Bump version --- quinn/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quinn/Cargo.toml b/quinn/Cargo.toml index 877b72bfb..a43f49325 100644 --- a/quinn/Cargo.toml +++ b/quinn/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "quinn" -version = "0.9.3" +version = "0.10.0" license = "MIT OR Apache-2.0" repository = "https://github.com/quinn-rs/quinn" description = "Versatile QUIC transport protocol implementation"