Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Borrow self in read_to_end, rather than consuming #1502

Merged
merged 2 commits into from
Mar 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quinn/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "quinn"
version = "0.9.3"
version = "0.10.0"
license = "MIT OR Apache-2.0"
repository = "https://github.com/quinn-rs/quinn"
description = "Versatile QUIC transport protocol implementation"
Expand Down
2 changes: 1 addition & 1 deletion quinn/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ async fn run(options: Opt) -> Result<()> {
.await
.map_err(|e| anyhow!("failed to connect: {}", e))?;
eprintln!("connected at {:?}", start.elapsed());
let (mut send, recv) = conn
let (mut send, mut recv) = conn
.open_bi()
.await
.map_err(|e| anyhow!("failed to open stream: {}", e))?;
Expand Down
2 changes: 1 addition & 1 deletion quinn/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ async fn handle_connection(root: Arc<Path>, conn: quinn::Connecting) -> Result<(

async fn handle_request(
root: Arc<Path>,
(mut send, recv): (quinn::SendStream, quinn::RecvStream),
(mut send, mut recv): (quinn::SendStream, quinn::RecvStream),
) -> Result<()> {
let req = recv
.read_to_end(64 * 1024)
Expand Down
25 changes: 13 additions & 12 deletions quinn/src/recv_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ use crate::{
/// // In the receiving task
/// let mut buf = [0u8; 10];
/// let data = recv_stream.read_exact(&mut buf).await?;
/// recv_stream.read_to_end(0).await?;
/// if recv_stream.read_to_end(0).await.is_err() {
/// // Discard unexpected data and notify the peer to stop sending it
/// let _ = recv_stream.stop(0u8.into());
/// }
/// # Ok(())
/// # }
/// ```
///
/// Note that in this example the receiver is dropped because [`RecvStream::read_to_end`]
/// takes ownership. This results in the implicit call to `stop(0)` if the stream was not
/// finished, interrupting any further attempts to send data. Crucially the `stop` call
/// only happens after it attempted to read the entire stream.
/// An alternative approach, used in HTTP/3, is to specify a particular error code used with `stop`
/// that indicates graceful receiver-initiated stream shutdown, rather than a true error condition.
///
/// [`RecvStream::read_chunk`] could be used instead which does not take ownership and
/// allows using an explit call to [`RecvStream::stop`] with a custom error code.
Expand Down Expand Up @@ -221,15 +222,15 @@ impl RecvStream {

/// Convenience method to read all remaining data into a buffer
///
/// The returned future fails with [`ReadToEndError::TooLong`] if it's longer than `size_limit`
/// bytes. Uses unordered reads to be more efficient than using `AsyncRead` would allow.
/// `size_limit` should be set to limit worst-case memory use.
/// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding
/// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would
/// allow. `size_limit` should be set to limit worst-case memory use.
///
/// If unordered reads have already been made, the resulting buffer may have gaps containing
/// arbitrary data.
///
/// [`ReadToEndError::TooLong`]: crate::ReadToEndError::TooLong
pub async fn read_to_end(self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
ReadToEnd {
stream: self,
size_limit,
Expand Down Expand Up @@ -357,15 +358,15 @@ impl<T> From<(Option<T>, Option<proto::ReadError>)> for ReadStatus<T> {
///
/// [`RecvStream::read_to_end()`]: crate::RecvStream::read_to_end
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
struct ReadToEnd {
stream: RecvStream,
struct ReadToEnd<'a> {
stream: &'a mut RecvStream,
read: Vec<(Bytes, u64)>,
start: u64,
end: u64,
size_limit: usize,
}

impl Future for ReadToEnd {
impl Future for ReadToEnd<'_> {
type Output = Result<Vec<u8>, ReadToEndError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
Expand Down
14 changes: 7 additions & 7 deletions quinn/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ fn read_after_close() {
.await
.expect("connect");
tokio::time::sleep_until(Instant::now() + Duration::from_millis(100)).await;
let stream = new_conn.accept_uni().await.expect("incoming streams");
let mut stream = new_conn.accept_uni().await.expect("incoming streams");
let msg = stream
.read_to_end(usize::max_value())
.await
Expand Down Expand Up @@ -211,7 +211,7 @@ async fn accept_after_close() {
.expect("connection");

// ...and read what was sent.
let stream = receiver.accept_uni().await.expect("incoming streams");
let mut stream = receiver.accept_uni().await.expect("incoming streams");
let msg = stream
.read_to_end(usize::max_value())
.await
Expand Down Expand Up @@ -262,7 +262,7 @@ async fn zero_rtt() {
let connection = incoming.into_0rtt().unwrap_or_else(|_| unreachable!()).0;
let c = connection.clone();
tokio::spawn(async move {
while let Ok(x) = c.accept_uni().await {
while let Ok(mut x) = c.accept_uni().await {
let msg = x.read_to_end(usize::max_value()).await.unwrap();
assert_eq!(msg, MSG);
}
Expand All @@ -285,7 +285,7 @@ async fn zero_rtt() {
tokio::spawn(async move {
// Buy time for the driver to process the server's NewSessionTicket
tokio::time::sleep_until(Instant::now() + Duration::from_millis(100)).await;
let stream = connection.accept_uni().await.expect("incoming streams");
let mut stream = connection.accept_uni().await.expect("incoming streams");
let msg = stream
.read_to_end(usize::max_value())
.await
Expand All @@ -309,7 +309,7 @@ async fn zero_rtt() {
s.finish().await.expect("0-RTT finish");
});

let stream = connection.accept_uni().await.expect("incoming streams");
let mut stream = connection.accept_uni().await.expect("incoming streams");
let msg = stream
.read_to_end(usize::max_value())
.await
Expand Down Expand Up @@ -493,7 +493,7 @@ fn run_echo(args: EchoArgs) {

for i in 0..args.nr_streams {
println!("Opening stream {i}");
let (mut send, recv) = new_conn.open_bi().await.expect("stream open");
let (mut send, mut recv) = new_conn.open_bi().await.expect("stream open");
let msg = gen_data(args.stream_size, SEED);

let send_task = async {
Expand Down Expand Up @@ -650,7 +650,7 @@ async fn rebind_recv() {
.unwrap();
info!("rebound");
write_send.notify_one();
let stream = connection.accept_uni().await.unwrap();
let mut stream = connection.accept_uni().await.unwrap();
assert_eq!(stream.read_to_end(MSG.len()).await.unwrap(), MSG);
server.await.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion quinn/tests/many_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ fn connect_n_nodes_to_1_and_send_1mb_data() {
}
}

async fn read_from_peer(stream: quinn::RecvStream) -> Result<(), quinn::ConnectionError> {
async fn read_from_peer(mut stream: quinn::RecvStream) -> Result<(), quinn::ConnectionError> {
let crc = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
match stream.read_to_end(1024 * 1024 * 5).await {
Ok(data) => {
Expand Down