diff --git a/bench/src/bin/bulk.rs b/bench/src/bin/bulk.rs index c0f28ff001..add99ee6ea 100644 --- a/bench/src/bin/bulk.rs +++ b/bench/src/bin/bulk.rs @@ -65,22 +65,17 @@ async fn server(mut incoming: quinn::Incoming, opt: Opt) -> Result<()> { // Handle only the expected amount of clients for _ in 0..opt.clients { let handshake = incoming.next().await.unwrap(); - let quinn::NewConnection { - mut bi_streams, - connection, - .. - } = handshake.await.context("handshake failed")?; + let connection = handshake.await.context("handshake failed")?; server_tasks.push(tokio::spawn(async move { loop { - let (mut send_stream, mut recv_stream) = match bi_streams.next().await { - None => break, - Some(Err(quinn::ConnectionError::ApplicationClosed(_))) => break, - Some(Err(e)) => { + let (mut send_stream, mut recv_stream) = match connection.accept_bi().await { + Err(quinn::ConnectionError::ApplicationClosed(_)) => break, + Err(e) => { eprintln!("accepting stream failed: {:?}", e); break; } - Some(Ok(stream)) => stream, + Ok(stream) => stream, }; trace!("stream established"); diff --git a/bench/src/lib.rs b/bench/src/lib.rs index f4356a74ca..10006724c1 100644 --- a/bench/src/lib.rs +++ b/bench/src/lib.rs @@ -70,7 +70,7 @@ pub async fn connect_client( let mut client_config = quinn::ClientConfig::new(Arc::new(crypto)); client_config.transport_config(Arc::new(transport_config(&opt))); - let quinn::NewConnection { connection, .. } = endpoint + let connection = endpoint .connect_with(client_config, server_addr, "localhost") .unwrap() .await diff --git a/perf/src/bin/perf_client.rs b/perf/src/bin/perf_client.rs index 6c7dbed62f..f0fd477e78 100644 --- a/perf/src/bin/perf_client.rs +++ b/perf/src/bin/perf_client.rs @@ -119,24 +119,17 @@ async fn run(opt: Opt) -> Result<()> { let stream_stats = OpenStreamStats::default(); - let quinn::NewConnection { - connection, - uni_streams, - .. - } = endpoint + let connection = endpoint .connect_with(cfg, addr, host_name)? .await .context("connecting")?; info!("established"); - let acceptor = UniAcceptor(Arc::new(tokio::sync::Mutex::new(uni_streams))); - let drive_fut = async { tokio::try_join!( drive_uni( connection.clone(), - acceptor, stream_stats.clone(), opt.uni_requests, opt.upload_size, @@ -236,7 +229,6 @@ async fn drain_stream( async fn drive_uni( connection: quinn::Connection, - acceptor: UniAcceptor, stream_stats: OpenStreamStats, concurrency: u64, upload: u64, @@ -247,12 +239,12 @@ async fn drive_uni( loop { let permit = sem.clone().acquire_owned().await.unwrap(); let send = connection.open_uni().await?; - let acceptor = acceptor.clone(); let stream_stats = stream_stats.clone(); debug!("sending request on {}", send.id()); + let connection = connection.clone(); tokio::spawn(async move { - if let Err(e) = request_uni(send, acceptor, upload, download, stream_stats).await { + if let Err(e) = request_uni(send, connection, upload, download, stream_stats).await { error!("sending request failed: {:#}", e); } @@ -263,19 +255,13 @@ async fn drive_uni( async fn request_uni( send: quinn::SendStream, - acceptor: UniAcceptor, + conn: quinn::Connection, upload: u64, download: u64, stream_stats: OpenStreamStats, ) -> Result<()> { request(send, upload, download, stream_stats.clone()).await?; - let recv = { - let mut guard = acceptor.0.lock().await; - guard - .next() - .await - .ok_or_else(|| anyhow::anyhow!("End of stream")) - }??; + let recv = conn.accept_uni().await?; drain_stream(recv, download, stream_stats).await?; Ok(()) } @@ -348,9 +334,6 @@ async fn request_bi( Ok(()) } -#[derive(Clone)] -struct UniAcceptor(Arc>); - struct SkipServerVerification; impl SkipServerVerification { diff --git a/perf/src/bin/perf_server.rs b/perf/src/bin/perf_server.rs index 502f59906f..391f2297a0 100644 --- a/perf/src/bin/perf_server.rs +++ b/perf/src/bin/perf_server.rs @@ -103,16 +103,11 @@ async fn run(opt: Opt) -> Result<()> { } async fn handle(handshake: quinn::Connecting, opt: Arc) -> Result<()> { - let quinn::NewConnection { - uni_streams, - bi_streams, - connection, - .. - } = handshake.await.context("handshake failed")?; + let connection = handshake.await.context("handshake failed")?; debug!("{} connected", connection.remote_address()); tokio::try_join!( - drive_uni(connection.clone(), uni_streams), - drive_bi(bi_streams), + drive_uni(connection.clone()), + drive_bi(connection.clone()), conn_stats(connection, opt) )?; Ok(()) @@ -129,12 +124,8 @@ async fn conn_stats(connection: quinn::Connection, opt: Arc) -> Result<()> Ok(()) } -async fn drive_uni( - connection: quinn::Connection, - mut streams: quinn::IncomingUniStreams, -) -> Result<()> { - while let Some(stream) = streams.next().await { - let stream = stream?; +async fn drive_uni(connection: quinn::Connection) -> Result<()> { + while let Ok(stream) = connection.accept_uni().await { let connection = connection.clone(); tokio::spawn(async move { if let Err(e) = handle_uni(connection, stream).await { @@ -152,9 +143,8 @@ async fn handle_uni(connection: quinn::Connection, stream: quinn::RecvStream) -> Ok(()) } -async fn drive_bi(mut streams: quinn::IncomingBiStreams) -> Result<()> { - while let Some(stream) = streams.next().await { - let (send, recv) = stream?; +async fn drive_bi(connection: quinn::Connection) -> Result<()> { + while let Ok((send, recv)) = connection.accept_bi().await { tokio::spawn(async move { if let Err(e) = handle_bi(send, recv).await { error!("request failed: {:#}", e); diff --git a/quinn/benches/bench.rs b/quinn/benches/bench.rs index 0ccfef9249..6405980476 100644 --- a/quinn/benches/bench.rs +++ b/quinn/benches/bench.rs @@ -106,16 +106,14 @@ impl Context { }; let handle = runtime.spawn( async move { - let quinn::NewConnection { - mut uni_streams, .. - } = incoming + let connection = incoming .next() .await .expect("accept") .await .expect("connect"); - while let Some(Ok(mut stream)) = uni_streams.next().await { + while let Ok(mut stream) = connection.accept_uni().await { tokio::spawn(async move { while stream .read_chunk(usize::MAX, false) @@ -142,7 +140,7 @@ impl Context { let _guard = runtime.enter(); Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0)).unwrap() }; - let quinn::NewConnection { connection, .. } = runtime + let connection = runtime .block_on(async { endpoint .connect_with(self.client_config.clone(), server_addr, "localhost") diff --git a/quinn/examples/client.rs b/quinn/examples/client.rs index aa4d5fe8c9..3d358cae83 100644 --- a/quinn/examples/client.rs +++ b/quinn/examples/client.rs @@ -108,14 +108,11 @@ async fn run(options: Opt) -> Result<()> { .ok_or_else(|| anyhow!("no hostname specified"))?; eprintln!("connecting to {} at {}", host, remote); - let new_conn = endpoint + let conn = endpoint .connect(remote, host)? .await .map_err(|e| anyhow!("failed to connect: {}", e))?; eprintln!("connected at {:?}", start.elapsed()); - let quinn::NewConnection { - connection: conn, .. - } = new_conn; let (mut send, recv) = conn .open_bi() .await diff --git a/quinn/examples/connection.rs b/quinn/examples/connection.rs index be7f8b0550..3d0be31db6 100644 --- a/quinn/examples/connection.rs +++ b/quinn/examples/connection.rs @@ -12,21 +12,17 @@ async fn main() -> Result<(), Box> { // accept a single connection tokio::spawn(async move { let incoming_conn = incoming.next().await.unwrap(); - let new_conn = incoming_conn.await.unwrap(); + let conn = incoming_conn.await.unwrap(); println!( "[server] connection accepted: addr={}", - new_conn.connection.remote_address() + conn.remote_address() ); // Dropping all handles associated with a connection implicitly closes it }); let endpoint = make_client_endpoint("0.0.0.0:0".parse().unwrap(), &[&server_cert])?; // connect to server - let quinn::NewConnection { - connection, - mut uni_streams, - .. - } = endpoint + let connection = endpoint .connect(server_addr, "localhost") .unwrap() .await @@ -34,7 +30,7 @@ async fn main() -> Result<(), Box> { println!("[client] connected: addr={}", connection.remote_address()); // Waiting for a stream will complete with an error when the server closes the connection - let _ = uni_streams.next().await; + let _ = connection.accept_uni().await; // Give the server has a chance to clean up endpoint.wait_idle().await; diff --git a/quinn/examples/insecure_connection.rs b/quinn/examples/insecure_connection.rs index ce15e94ac7..4dcec3bd6c 100644 --- a/quinn/examples/insecure_connection.rs +++ b/quinn/examples/insecure_connection.rs @@ -23,10 +23,10 @@ async fn run_server(addr: SocketAddr) { let (mut incoming, _server_cert) = make_server_endpoint(addr).unwrap(); // accept a single connection let incoming_conn = incoming.next().await.unwrap(); - let new_conn = incoming_conn.await.unwrap(); + let conn = incoming_conn.await.unwrap(); println!( "[server] connection accepted: addr={}", - new_conn.connection.remote_address() + conn.remote_address() ); } @@ -36,7 +36,7 @@ async fn run_client(server_addr: SocketAddr) -> Result<(), Box> { endpoint.set_default_client_config(client_cfg); // connect to server - let quinn::NewConnection { connection, .. } = endpoint + let connection = endpoint .connect(server_addr, "localhost") .unwrap() .await diff --git a/quinn/examples/server.rs b/quinn/examples/server.rs index e0ef9caaa6..de48bcba65 100644 --- a/quinn/examples/server.rs +++ b/quinn/examples/server.rs @@ -161,11 +161,7 @@ async fn run(options: Opt) -> Result<()> { } async fn handle_connection(root: Arc, conn: quinn::Connecting) -> Result<()> { - let quinn::NewConnection { - connection, - mut bi_streams, - .. - } = conn.await?; + let connection = conn.await?; let span = info_span!( "connection", remote = %connection.remote_address(), @@ -180,7 +176,8 @@ async fn handle_connection(root: Arc, conn: quinn::Connecting) -> Result<( info!("established"); // Each stream initiated by the client constitutes a new request. - while let Some(stream) = bi_streams.next().await { + loop { + let stream = connection.accept_bi().await; let stream = match stream { Err(quinn::ConnectionError::ApplicationClosed { .. }) => { info!("connection closed"); @@ -201,7 +198,6 @@ async fn handle_connection(root: Arc, conn: quinn::Connecting) -> Result<( .instrument(info_span!("request")), ); } - Ok(()) } .instrument(span) .await?; diff --git a/quinn/examples/single_socket.rs b/quinn/examples/single_socket.rs index dff7a0fc78..bb8c25c95f 100644 --- a/quinn/examples/single_socket.rs +++ b/quinn/examples/single_socket.rs @@ -41,7 +41,7 @@ fn run_server(addr: SocketAddr) -> Result, Box> { let (mut incoming, server_cert) = make_server_endpoint(addr)?; // accept a single connection tokio::spawn(async move { - let quinn::NewConnection { connection, .. } = incoming.next().await.unwrap().await.unwrap(); + let connection = incoming.next().await.unwrap().await.unwrap(); println!( "[server] incoming connection: addr={}", connection.remote_address() @@ -54,6 +54,6 @@ fn run_server(addr: SocketAddr) -> Result, Box> { /// Attempt QUIC connection with the given server address. async fn run_client(endpoint: &Endpoint, server_addr: SocketAddr) { let connect = endpoint.connect(server_addr, "localhost").unwrap(); - let quinn::NewConnection { connection, .. } = connect.await.unwrap(); + let connection = connect.await.unwrap(); println!("[client] connected: addr={}", connection.remote_address()); } diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 72558cd502..e8c0714640 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -2,7 +2,6 @@ use std::{ any::Any, fmt, future::Future, - mem, net::{IpAddr, SocketAddr}, pin::Pin, sync::Arc, @@ -22,7 +21,6 @@ use udp::UdpState; use crate::{ mutex::Mutex, - poll_fn, recv_stream::RecvStream, send_stream::{SendStream, WriteError}, ConnectionEvent, EndpointEvent, VarInt, @@ -94,7 +92,7 @@ impl Connecting { /// ticket is found, `self` is returned unmodified. /// /// For incoming connections, a 0.5-RTT connection will always be successfully constructed. - pub fn into_0rtt(mut self) -> Result<(NewConnection, ZeroRttAccepted), Self> { + pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> { // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll // have to release it explicitly before returning `self` by value. let conn = (self.conn.as_mut().unwrap()).state.lock("into_0rtt"); @@ -104,7 +102,7 @@ impl Connecting { if is_ok { let conn = self.conn.take().unwrap(); - Ok((NewConnection::new(conn), ZeroRttAccepted(self.connected))) + Ok((Connection(conn), ZeroRttAccepted(self.connected))) } else { Err(self) } @@ -160,14 +158,14 @@ impl Connecting { } impl Future for Connecting { - type Output = Result; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { Pin::new(&mut self.connected).poll(cx).map(|_| { let conn = self.conn.take().unwrap(); let inner = conn.state.lock("connecting"); if inner.connected { drop(inner); - Ok(NewConnection::new(conn)) + Ok(Connection(conn)) } else { Err(inner .error @@ -202,59 +200,6 @@ impl Future for ZeroRttAccepted { } } -/// Components of a newly established connection -/// -/// All fields of this struct, in addition to any other handles constructed later, must be dropped -/// for a connection to be implicitly closed. If the `NewConnection` is stored in a long-lived -/// variable, moving individual fields won't cause remaining unused fields to be dropped, even with -/// pattern-matching. The easiest way to ensure unused fields are dropped is to pattern-match on the -/// variable wrapped in brackets, which forces the entire `NewConnection` to be moved out of the -/// variable and into a temporary, ensuring all unused fields are dropped at the end of the -/// statement: -/// -#[cfg_attr( - feature = "rustls", - doc = "```rust -# use quinn::NewConnection; -# fn dummy(new_connection: NewConnection) { -let NewConnection { connection, .. } = { new_connection }; -# } -```" -)] -/// -/// You can also explicitly invoke [`Connection::close()`] at any time. -/// -/// [`Connection::close()`]: crate::Connection::close -#[derive(Debug)] -#[non_exhaustive] -pub struct NewConnection { - /// Handle for interacting with the connection - pub connection: Connection, - /// Unidirectional streams initiated by the peer, in the order they were opened - /// - /// Note that data for separate streams may be delivered in any order. In other words, reading - /// from streams in the order they're opened is not optimal. See [`IncomingUniStreams`] for - /// details. - /// - /// [`IncomingUniStreams`]: crate::IncomingUniStreams - pub uni_streams: IncomingUniStreams, - /// Bidirectional streams initiated by the peer, in the order they were opened - pub bi_streams: IncomingBiStreams, - /// Unordered, unreliable datagrams sent by the peer - pub datagrams: Datagrams, -} - -impl NewConnection { - fn new(conn: ConnectionRef) -> Self { - Self { - connection: Connection(conn.clone()), - uni_streams: IncomingUniStreams(conn.clone()), - bi_streams: IncomingBiStreams(conn.clone()), - datagrams: Datagrams(conn), - } - } -} - /// A future that drives protocol logic for a connection /// /// This future handles the protocol logic for a single connection, routing events from the @@ -668,96 +613,6 @@ fn poll_open<'a>( } } -/// A stream of unidirectional QUIC streams initiated by a remote peer. -/// -/// Incoming streams are *always* opened in the same order that the peer created them, but data can -/// be delivered to open streams in any order. This allows meaning to be assigned to the sequence in -/// which streams are opened. For example, a file transfer protocol might designate the first stream -/// the client opens as a "control" stream, using all others for exchanging file data. -/// -/// Processing streams in the order they're opened will produce head-of-line blocking. For best -/// performance, an application should be prepared to fully process later streams before any data is -/// received on earlier streams. -#[derive(Debug)] -pub struct IncomingUniStreams(ConnectionRef); - -impl IncomingUniStreams { - /// Fetch the next incoming unidirectional stream - pub async fn next(&mut self) -> Option> { - poll_fn(move |cx| self.poll(cx)).await - } - - fn poll(&mut self, cx: &mut Context) -> Poll>> { - let mut conn = self.0.state.lock("IncomingUniStreams::poll"); - if let Some(x) = conn.inner.streams().accept(Dir::Uni) { - conn.wake(); // To send additional stream ID credit - mem::drop(conn); // Release the lock so clone can take it - Poll::Ready(Some(Ok(RecvStream::new(self.0.clone(), x, false)))) - } else if let Some(ConnectionError::LocallyClosed) = conn.error { - Poll::Ready(None) - } else if let Some(ref e) = conn.error { - Poll::Ready(Some(Err(e.clone()))) - } else { - conn.incoming_uni_streams_reader = Some(cx.waker().clone()); - Poll::Pending - } - } -} - -#[cfg(feature = "futures-core")] -impl futures_core::Stream for IncomingUniStreams { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.poll(cx) - } -} - -/// A stream of bidirectional QUIC streams initiated by a remote peer. -/// -/// See `IncomingUniStreams` for information about incoming streams in general. -#[derive(Debug)] -pub struct IncomingBiStreams(ConnectionRef); - -impl IncomingBiStreams { - /// Fetch the next incoming unidirectional stream - pub async fn next(&mut self) -> Option> { - poll_fn(move |cx| self.poll(cx)).await - } - - fn poll( - &mut self, - cx: &mut Context, - ) -> Poll>> { - let mut conn = self.0.state.lock("IncomingBiStreams::poll"); - if let Some(x) = conn.inner.streams().accept(Dir::Bi) { - let is_0rtt = conn.inner.is_handshaking(); - conn.wake(); // To send additional stream ID credit - mem::drop(conn); // Release the lock so clone can take it - Poll::Ready(Some(Ok(( - SendStream::new(self.0.clone(), x, is_0rtt), - RecvStream::new(self.0.clone(), x, is_0rtt), - )))) - } else if let Some(ConnectionError::LocallyClosed) = conn.error { - Poll::Ready(None) - } else if let Some(ref e) = conn.error { - Poll::Ready(Some(Err(e.clone()))) - } else { - conn.incoming_bi_streams_reader = Some(cx.waker().clone()); - Poll::Pending - } - } -} - -#[cfg(feature = "futures-core")] -impl futures_core::Stream for IncomingBiStreams { - type Item = Result<(SendStream, RecvStream), ConnectionError>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.poll(cx) - } -} - pin_project! { /// Future produced by [`Connection::accept_uni`] pub struct AcceptUni<'a> { @@ -851,40 +706,6 @@ impl Future for ReadDatagram<'_> { } } -/// Stream of unordered, unreliable datagrams sent by the peer -#[derive(Debug)] -pub struct Datagrams(ConnectionRef); - -impl Datagrams { - /// Fetch the next application datagram from the peer - pub async fn next(&mut self) -> Option> { - poll_fn(move |cx| self.poll(cx)).await - } - - fn poll(&mut self, cx: &mut Context) -> Poll>> { - let mut conn = self.0.state.lock("Datagrams::poll_next"); - if let Some(x) = conn.inner.datagrams().recv() { - Poll::Ready(Some(Ok(x))) - } else if let Some(ConnectionError::LocallyClosed) = conn.error { - Poll::Ready(None) - } else if let Some(ref e) = conn.error { - Poll::Ready(Some(Err(e.clone()))) - } else { - conn.datagram_reader = Some(cx.waker().clone()); - Poll::Pending - } - } -} - -#[cfg(feature = "futures-core")] -impl futures_core::Stream for Datagrams { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.poll(cx) - } -} - #[derive(Debug)] pub struct ConnectionRef(Arc); @@ -914,9 +735,6 @@ impl ConnectionRef { endpoint_events, blocked_writers: FxHashMap::default(), blocked_readers: FxHashMap::default(), - incoming_uni_streams_reader: None, - incoming_bi_streams_reader: None, - datagram_reader: None, finishing: FxHashMap::default(), stopped: FxHashMap::default(), error: None, @@ -995,9 +813,6 @@ pub(crate) struct State { endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>, pub(crate) blocked_writers: FxHashMap, pub(crate) blocked_readers: FxHashMap, - incoming_uni_streams_reader: Option, - incoming_bi_streams_reader: Option, - datagram_reader: Option, pub(crate) finishing: FxHashMap>>, pub(crate) stopped: FxHashMap, /// Always set to Some before the connection becomes drained @@ -1103,21 +918,12 @@ impl State { } Stream(StreamEvent::Opened { dir: Dir::Uni }) => { shared.stream_incoming[Dir::Uni as usize].notify_waiters(); - if let Some(x) = self.incoming_uni_streams_reader.take() { - x.wake(); - } } Stream(StreamEvent::Opened { dir: Dir::Bi }) => { shared.stream_incoming[Dir::Bi as usize].notify_waiters(); - if let Some(x) = self.incoming_bi_streams_reader.take() { - x.wake(); - } } DatagramReceived => { shared.datagrams.notify_waiters(); - if let Some(x) = self.datagram_reader.take() { - x.wake(); - } } Stream(StreamEvent::Readable { id }) => { if let Some(reader) = self.blocked_readers.remove(&id) { @@ -1222,16 +1028,7 @@ impl State { shared.stream_opening[Dir::Bi as usize].notify_waiters(); shared.stream_incoming[Dir::Uni as usize].notify_waiters(); shared.stream_incoming[Dir::Bi as usize].notify_waiters(); - if let Some(x) = self.incoming_uni_streams_reader.take() { - x.wake(); - } - if let Some(x) = self.incoming_bi_streams_reader.take() { - x.wake(); - } shared.datagrams.notify_waiters(); - if let Some(x) = self.datagram_reader.take() { - x.wake(); - } for (_, x) in self.finishing.drain() { let _ = x.send(Some(WriteError::ConnectionLost(reason.clone()))); } diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index e4bd4986be..40a51d967e 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -70,9 +70,8 @@ pub use proto::{ }; pub use crate::connection::{ - AcceptBi, AcceptUni, Connecting, Connection, Datagrams, IncomingBiStreams, IncomingUniStreams, - NewConnection, OpenBi, OpenUni, ReadDatagram, SendDatagramError, UnknownStream, - ZeroRttAccepted, + AcceptBi, AcceptUni, Connecting, Connection, OpenBi, OpenUni, ReadDatagram, SendDatagramError, + UnknownStream, ZeroRttAccepted, }; pub use crate::endpoint::{Endpoint, Incoming}; pub use crate::recv_stream::{ReadError, ReadExactError, ReadToEndError, RecvStream}; diff --git a/quinn/src/tests.rs b/quinn/src/tests.rs index 94bfc23f9e..34bdfec1f2 100644 --- a/quinn/src/tests.rs +++ b/quinn/src/tests.rs @@ -19,9 +19,7 @@ use tracing::{info, info_span}; use tracing_futures::Instrument as _; use tracing_subscriber::EnvFilter; -use super::{ - ClientConfig, Endpoint, Incoming, NewConnection, RecvStream, SendStream, TransportConfig, -}; +use super::{ClientConfig, Endpoint, Incoming, RecvStream, SendStream, TransportConfig}; #[test] fn handshake_timeout() { @@ -130,23 +128,18 @@ fn read_after_close() { .expect("endpoint") .await .expect("connection"); - let mut s = new_conn.connection.open_uni().await.unwrap(); + let mut s = new_conn.open_uni().await.unwrap(); s.write_all(MSG).await.unwrap(); s.finish().await.unwrap(); }); runtime.block_on(async move { - let mut new_conn = endpoint + let new_conn = endpoint .connect(endpoint.local_addr().unwrap(), "localhost") .unwrap() .await .expect("connect"); tokio::time::sleep_until(Instant::now() + Duration::from_millis(100)).await; - let stream = new_conn - .uni_streams - .next() - .await - .expect("incoming streams") - .expect("missing stream"); + let stream = new_conn.accept_uni().await.expect("incoming streams"); let msg = stream .read_to_end(usize::max_value()) .await @@ -178,12 +171,10 @@ fn export_keying_material() { .expect("connection"); let mut i_buf = [0u8; 64]; incoming_conn - .connection .export_keying_material(&mut i_buf, b"asdf", b"qwer") .unwrap(); let mut o_buf = [0u8; 64]; outgoing_conn - .connection .export_keying_material(&mut o_buf, b"asdf", b"qwer") .unwrap(); assert_eq!(&i_buf[..], &o_buf[..]); @@ -201,8 +192,7 @@ async fn accept_after_close() { .connect(endpoint.local_addr().unwrap(), "localhost") .unwrap() .await - .expect("connect") - .connection; + .expect("connect"); let mut s = sender.open_uni().await.unwrap(); s.write_all(MSG).await.unwrap(); s.finish().await.unwrap(); @@ -212,7 +202,7 @@ async fn accept_after_close() { tokio::time::sleep(Duration::from_millis(100)).await; // Despite the connection having closed, we should be able to accept it... - let mut receiver = incoming + let receiver = incoming .next() .await .expect("endpoint") @@ -220,12 +210,7 @@ async fn accept_after_close() { .expect("connection"); // ...and read what was sent. - let stream = receiver - .uni_streams - .next() - .await - .expect("incoming streams") - .expect("missing stream"); + let stream = receiver.accept_uni().await.expect("incoming streams"); let msg = stream .read_to_end(usize::max_value()) .await @@ -233,7 +218,7 @@ async fn accept_after_close() { assert_eq!(msg, MSG); // But it's still definitely closed. - assert!(receiver.connection.open_uni().await.is_err()); + assert!(receiver.open_uni().await.is_err()); } /// Construct an endpoint suitable for connecting to itself @@ -265,13 +250,10 @@ async fn zero_rtt() { tokio::spawn(async move { for _ in 0..2 { let incoming = incoming.next().await.unwrap(); - let NewConnection { - mut uni_streams, - connection, - .. - } = incoming.into_0rtt().unwrap_or_else(|_| unreachable!()).0; + let connection = incoming.into_0rtt().unwrap_or_else(|_| unreachable!()).0; + let c = connection.clone(); tokio::spawn(async move { - while let Some(Ok(x)) = uni_streams.next().await { + while let Ok(x) = c.accept_uni().await { let msg = x.read_to_end(usize::max_value()).await.unwrap(); assert_eq!(msg, MSG); } @@ -282,9 +264,7 @@ async fn zero_rtt() { } }); - let NewConnection { - mut uni_streams, .. - } = endpoint + let connection = endpoint .connect(endpoint.local_addr().unwrap(), "localhost") .unwrap() .into_0rtt() @@ -296,11 +276,7 @@ async fn zero_rtt() { tokio::spawn(async move { // Buy time for the driver to process the server's NewSessionTicket tokio::time::sleep_until(Instant::now() + Duration::from_millis(100)).await; - let stream = uni_streams - .next() - .await - .expect("incoming streams") - .expect("missing stream"); + let stream = connection.accept_uni().await.expect("incoming streams"); let msg = stream .read_to_end(usize::max_value()) .await @@ -311,30 +287,20 @@ async fn zero_rtt() { info!("initial connection complete"); - let ( - NewConnection { - connection, - mut uni_streams, - .. - }, - zero_rtt, - ) = endpoint + let (connection, zero_rtt) = endpoint .connect(endpoint.local_addr().unwrap(), "localhost") .unwrap() .into_0rtt() .unwrap_or_else(|_| panic!("missing 0-RTT keys")); // Send something ASAP to use 0-RTT + let c = connection.clone(); tokio::spawn(async move { - let mut s = connection.open_uni().await.expect("0-RTT open uni"); + let mut s = c.open_uni().await.expect("0-RTT open uni"); s.write_all(MSG).await.expect("0-RTT write"); s.finish().await.expect("0-RTT finish"); }); - let stream = uni_streams - .next() - .await - .expect("incoming streams") - .expect("missing stream"); + let stream = connection.accept_uni().await.expect("incoming streams"); let msg = stream .read_to_end(usize::max_value()) .await @@ -342,7 +308,7 @@ async fn zero_rtt() { assert_eq!(msg, MSG); assert!(zero_rtt.await); - drop(uni_streams); + drop(connection); endpoint.wait_idle().await; } @@ -494,9 +460,9 @@ fn run_echo(args: EchoArgs) { assert_eq!(None, incoming.local_ip()); } - let mut new_conn = incoming.instrument(info_span!("server")).await.unwrap(); + let new_conn = incoming.instrument(info_span!("server")).await.unwrap(); tokio::spawn(async move { - while let Some(Ok(stream)) = new_conn.bi_streams.next().await { + while let Ok(stream) = new_conn.accept_bi().await { tokio::spawn(echo(stream)); } }); @@ -517,7 +483,7 @@ fn run_echo(args: EchoArgs) { for i in 0..args.nr_streams { println!("Opening stream {}", i); - let (mut send, recv) = new_conn.connection.open_bi().await.expect("stream open"); + let (mut send, recv) = new_conn.open_bi().await.expect("stream open"); let msg = gen_data(args.stream_size, SEED); let send_task = async { @@ -530,7 +496,7 @@ fn run_echo(args: EchoArgs) { assert_eq!(data[..], msg[..], "Data mismatch"); } - new_conn.connection.close(0u32.into(), b"done"); + new_conn.close(0u32.into(), b"done"); client.wait_idle().await; }); handle @@ -653,7 +619,7 @@ async fn rebind_recv() { let connected_send = Arc::new(tokio::sync::Notify::new()); let connected_recv = connected_send.clone(); let server = tokio::spawn(async move { - let NewConnection { connection, .. } = incoming.next().await.unwrap().await.unwrap(); + let connection = incoming.next().await.unwrap().await.unwrap(); info!("got conn"); connected_send.notify_one(); write_recv.notified().await; @@ -662,9 +628,7 @@ async fn rebind_recv() { stream.finish().await.unwrap(); }); - let NewConnection { - mut uni_streams, .. - } = client + let connection = client .connect(server_addr, "localhost") .unwrap() .await @@ -676,7 +640,7 @@ async fn rebind_recv() { .unwrap(); info!("rebound"); write_send.notify_one(); - let stream = uni_streams.next().await.unwrap().unwrap(); + let stream = connection.accept_uni().await.unwrap(); assert_eq!(stream.read_to_end(MSG.len()).await.unwrap(), MSG); server.await.unwrap(); } diff --git a/quinn/tests/many_connections.rs b/quinn/tests/many_connections.rs index f52dffb408..5e5b434b73 100644 --- a/quinn/tests/many_connections.rs +++ b/quinn/tests/many_connections.rs @@ -39,14 +39,12 @@ fn connect_n_nodes_to_1_and_send_1mb_data() { let shared2 = shared.clone(); let read_incoming_data = async move { for _ in 0..expected_messages { - let new_conn = incoming_conns.next().await.unwrap().await.unwrap(); - let conn = new_conn.connection; - let mut uni_streams = new_conn.uni_streams; + let conn = incoming_conns.next().await.unwrap().await.unwrap(); let shared = shared2.clone(); let task = async move { - while let Some(stream) = uni_streams.next().await { - read_from_peer(stream?).await?; + while let Ok(stream) = conn.accept_uni().await { + read_from_peer(stream).await?; conn.close(0u32.into(), &[]); } Ok(()) @@ -69,8 +67,8 @@ fn connect_n_nodes_to_1_and_send_1mb_data() { .connect_with(client_cfg.clone(), listener_addr, "localhost") .unwrap(); let task = async move { - let new_conn = connecting.await.map_err(WriteError::ConnectionLost)?; - write_to_peer(new_conn.connection, data).await?; + let conn = connecting.await.map_err(WriteError::ConnectionLost)?; + write_to_peer(conn, data).await?; Ok(()) }; runtime.spawn(async move {