Skip to content

Commit

Permalink
Revert Connection::open_{uni, bi} to named 'static futures
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed May 12, 2022
1 parent d19ffc3 commit 5b2e266
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 39 deletions.
110 changes: 72 additions & 38 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use bytes::Bytes;
use proto::{ConnectionError, ConnectionHandle, ConnectionStats, Dir, StreamEvent, StreamId};
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, Notify};
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep_until, Instant as TokioInstant, Sleep};
use tracing::debug_span;
use udp::UdpState;
Expand Down Expand Up @@ -324,45 +324,22 @@ impl Connection {
/// Streams are cheap and instantaneous to open unless blocked by flow control. As a
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub async fn open_uni(&self) -> Result<SendStream, ConnectionError> {
let (id, is_0rtt) = self.open(Dir::Uni).await?;
Ok(SendStream::new(self.0.clone(), id, is_0rtt))
pub fn open_uni(&self) -> OpenUni {
OpenUni {
conn: Some(self.0.clone()),
notify: self.0.lock("open_uni").stream_opening[Dir::Uni as usize].wait(),
}
}

/// Initiate a new outgoing bidirectional stream.
///
/// Streams are cheap and instantaneous to open unless blocked by flow control. As a
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
let (id, is_0rtt) = self.open(Dir::Bi).await?;
Ok((
SendStream::new(self.0.clone(), id, is_0rtt),
RecvStream::new(self.0.clone(), id, is_0rtt),
))
}

async fn open(&self, dir: Dir) -> Result<(StreamId, bool), ConnectionError> {
loop {
let opening;
{
let mut conn = self.0.lock("open");
if let Some(ref e) = conn.error {
return Err(e.clone());
}
if let Some(id) = conn.inner.streams().open(dir) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
return Ok((id, is_0rtt));
}
// Clone the `Arc<Notify>` so we can wait on the underlying `Notify` without holding
// the lock. Store it in the outer scope to ensure it outlives the lock guard.
opening = conn.stream_opening[dir as usize].clone();
// Construct the future while the lock is held to ensure we can't miss a wakeup if
// the `Notify` is signaled immediately after we release the lock. `await` it after
// the lock guard is out of scope.
opening.notified()
}
.await
pub fn open_bi(&self) -> OpenBi {
OpenBi {
conn: Some(self.0.clone()),
notify: self.0.lock("open_bi").stream_opening[Dir::Bi as usize].wait(),
}
}

Expand Down Expand Up @@ -590,6 +567,63 @@ impl Clone for Connection {
}
}

/// Future produced by [`Connection::open_uni`]
pub struct OpenUni {
conn: Option<ConnectionRef>,
notify: notify::Waiter,
}

impl Future for OpenUni {
type Output = Result<SendStream, ConnectionError>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let (conn, id, is_0rtt) =
ready!(poll_open(ctx, &mut this.conn, &mut this.notify, Dir::Uni)?);
Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
}
}

/// Future produced by [`Connection::open_bi`]
pub struct OpenBi {
conn: Option<ConnectionRef>,
notify: notify::Waiter,
}

impl Future for OpenBi {
type Output = Result<(SendStream, RecvStream), ConnectionError>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let (conn, id, is_0rtt) =
ready!(poll_open(ctx, &mut this.conn, &mut this.notify, Dir::Bi)?);

Poll::Ready(Ok((
SendStream::new(conn.clone(), id, is_0rtt),
RecvStream::new(conn, id, is_0rtt),
)))
}
}

fn poll_open(
ctx: &mut Context<'_>,
conn_storage: &mut Option<ConnectionRef>,
notify: &mut notify::Waiter,
dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
let mut conn = conn_storage.as_ref().unwrap().lock("poll_open");
if let Some(ref e) = conn.error {
Poll::Ready(Err(e.clone()))
} else if let Some(id) = conn.inner.streams().open(dir) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
drop(conn); // Release the borrow so it can be passed to `RecvStream`
let conn = conn_storage.take().expect("polled after completion");
Poll::Ready(Ok((conn, id, is_0rtt)))
} else {
// `conn` lock ensures we don't race with readiness
notify.register(ctx);
Poll::Pending
}
}

/// A stream of unidirectional QUIC streams initiated by a remote peer.
///
/// Incoming streams are *always* opened in the same order that the peer created them, but data can
Expand Down Expand Up @@ -824,7 +858,7 @@ impl ConnectionRef {
endpoint_events,
blocked_writers: FxHashMap::default(),
blocked_readers: FxHashMap::default(),
stream_opening: [Arc::new(Notify::new()), Arc::new(Notify::new())],
stream_opening: [NotifyOwned::new(), NotifyOwned::new()],
incoming_uni_streams_reader: None,
stream_incoming: [NotifyOwned::new(), NotifyOwned::new()],
incoming_bi_streams_reader: None,
Expand Down Expand Up @@ -886,7 +920,7 @@ pub struct ConnectionInner {
endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
stream_opening: [Arc<Notify>; 2],
stream_opening: [NotifyOwned; 2],
incoming_uni_streams_reader: Option<Waker>,
incoming_bi_streams_reader: Option<Waker>,
stream_incoming: [NotifyOwned; 2],
Expand Down Expand Up @@ -1014,7 +1048,7 @@ impl ConnectionInner {
}
}
Stream(StreamEvent::Available { dir }) => {
self.stream_opening[dir as usize].notify_one();
self.stream_opening[dir as usize].notify_all();
}
Stream(StreamEvent::Finished { id }) => {
if let Some(finishing) = self.finishing.remove(&id) {
Expand Down Expand Up @@ -1106,8 +1140,8 @@ impl ConnectionInner {
for (_, reader) in self.blocked_readers.drain() {
reader.wake()
}
self.stream_opening[Dir::Uni as usize].notify_waiters();
self.stream_opening[Dir::Bi as usize].notify_waiters();
self.stream_opening[Dir::Uni as usize].notify_all();
self.stream_opening[Dir::Bi as usize].notify_all();
self.stream_incoming[Dir::Uni as usize].notify_all();
self.stream_incoming[Dir::Bi as usize].notify_all();
if let Some(x) = self.incoming_uni_streams_reader.take() {
Expand Down
3 changes: 2 additions & 1 deletion quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ pub use proto::{

pub use crate::connection::{
AcceptBi, AcceptUni, Connecting, Connection, Datagrams, IncomingBiStreams, IncomingUniStreams,
NewConnection, ReadDatagram, SendDatagramError, UnknownStream, ZeroRttAccepted,
NewConnection, OpenBi, OpenUni, ReadDatagram, SendDatagramError, UnknownStream,
ZeroRttAccepted,
};
pub use crate::endpoint::{Endpoint, Incoming};
pub use crate::recv_stream::{ReadError, ReadExactError, ReadToEndError, RecvStream};
Expand Down

0 comments on commit 5b2e266

Please sign in to comment.