Skip to content

Commit

Permalink
Introduce Connection::{accept_{uni, bi}, read_datagram}
Browse files Browse the repository at this point in the history
Provides a more natural alternative to the streams in NewConnection
  • Loading branch information
Ralith committed May 12, 2022
1 parent 4fb1de8 commit 1775281
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 2 deletions.
116 changes: 116 additions & 0 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use udp::UdpState;

use crate::{
mutex::Mutex,
notify,
notify::NotifyOwned,
poll_fn,
recv_stream::RecvStream,
send_stream::{SendStream, WriteError},
Expand Down Expand Up @@ -364,6 +366,30 @@ impl Connection {
}
}

/// Accept the next incoming uni-directional stream
pub fn accept_uni(&self) -> AcceptUni {
AcceptUni {
conn: Some(self.0.clone()),
notify: self.0.lock("accept_uni").stream_incoming[Dir::Uni as usize].wait(),
}
}

/// Accept the next incoming bidirectional stream
pub fn accept_bi(&self) -> AcceptBi {
AcceptBi {
conn: Some(self.0.clone()),
notify: self.0.lock("accept_bi").stream_incoming[Dir::Bi as usize].wait(),
}
}

/// Receive an application datagram
pub fn read_datagram(&self) -> ReadDatagram {
ReadDatagram {
conn: self.0.clone(),
notify: self.0.lock("read_datagram").datagrams.wait(),
}
}

/// Close the connection immediately.
///
/// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. Delivery
Expand Down Expand Up @@ -654,6 +680,86 @@ impl futures_core::Stream for IncomingBiStreams {
}
}

/// Future produced by [`Connection::accept_uni`]
pub struct AcceptUni {
conn: Option<ConnectionRef>,
notify: notify::Waiter,
}

impl Future for AcceptUni {
type Output = Result<RecvStream, ConnectionError>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let (conn, id, is_0rtt) =
ready!(poll_accept(ctx, &mut this.conn, &mut this.notify, Dir::Uni))?;
Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
}
}

/// Future produced by [`Connection::accept_bi`]
pub struct AcceptBi {
conn: Option<ConnectionRef>,
notify: notify::Waiter,
}

impl Future for AcceptBi {
type Output = Result<(SendStream, RecvStream), ConnectionError>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let (conn, id, is_0rtt) =
ready!(poll_accept(ctx, &mut this.conn, &mut this.notify, Dir::Bi))?;
Poll::Ready(Ok((
SendStream::new(conn.clone(), id, is_0rtt),
RecvStream::new(conn, id, is_0rtt),
)))
}
}

fn poll_accept(
ctx: &mut Context<'_>,
conn_storage: &mut Option<ConnectionRef>,
notify: &mut notify::Waiter,
dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
let mut conn = conn_storage.as_ref().unwrap().lock("poll_accept");
if let Some(id) = conn.inner.streams().accept(dir) {
let is_0rtt = conn.inner.is_handshaking();
conn.wake(); // To send additional stream ID credit
drop(conn); // Release the borrow so it can be passed to `RecvStream`
let conn = conn_storage.take().expect("polled after completion");
Poll::Ready(Ok((conn, id, is_0rtt)))
} else if let Some(ref e) = conn.error {
Poll::Ready(Err(e.clone()))
} else {
// `conn` lock ensures we don't race with readiness
notify.register(ctx);
Poll::Pending
}
}

/// Future produced by [`Connection::read_datagram`]
pub struct ReadDatagram {
conn: ConnectionRef,
notify: notify::Waiter,
}

impl Future for ReadDatagram {
type Output = Result<Bytes, ConnectionError>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let mut conn = this.conn.lock("ReadDatagram::poll");
if let Some(x) = conn.inner.datagrams().recv() {
Poll::Ready(Ok(x))
} else if let Some(ref e) = conn.error {
Poll::Ready(Err(e.clone()))
} else {
// `conn` lock ensures we don't race with readiness
this.notify.register(ctx);
Poll::Pending
}
}
}

/// Stream of unordered, unreliable datagrams sent by the peer
#[derive(Debug)]
pub struct Datagrams(ConnectionRef);
Expand Down Expand Up @@ -716,8 +822,10 @@ impl ConnectionRef {
blocked_readers: FxHashMap::default(),
stream_opening: [Arc::new(Notify::new()), Arc::new(Notify::new())],
incoming_uni_streams_reader: None,
stream_incoming: [NotifyOwned::new(), NotifyOwned::new()],
incoming_bi_streams_reader: None,
datagram_reader: None,
datagrams: NotifyOwned::new(),
finishing: FxHashMap::default(),
stopped: FxHashMap::default(),
error: None,
Expand Down Expand Up @@ -777,7 +885,9 @@ pub struct ConnectionInner {
stream_opening: [Arc<Notify>; 2],
incoming_uni_streams_reader: Option<Waker>,
incoming_bi_streams_reader: Option<Waker>,
stream_incoming: [NotifyOwned; 2],
datagram_reader: Option<Waker>,
datagrams: NotifyOwned,
pub(crate) finishing: FxHashMap<StreamId, oneshot::Sender<Option<WriteError>>>,
pub(crate) stopped: FxHashMap<StreamId, Waker>,
/// Always set to Some before the connection becomes drained
Expand Down Expand Up @@ -877,16 +987,19 @@ impl ConnectionInner {
}
}
Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
self.stream_incoming[Dir::Uni as usize].notify_all();
if let Some(x) = self.incoming_uni_streams_reader.take() {
x.wake();
}
}
Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
self.stream_incoming[Dir::Bi as usize].notify_all();
if let Some(x) = self.incoming_bi_streams_reader.take() {
x.wake();
}
}
DatagramReceived => {
self.datagrams.notify_all();
if let Some(x) = self.datagram_reader.take() {
x.wake();
}
Expand Down Expand Up @@ -991,12 +1104,15 @@ impl ConnectionInner {
}
self.stream_opening[Dir::Uni as usize].notify_waiters();
self.stream_opening[Dir::Bi as usize].notify_waiters();
self.stream_incoming[Dir::Uni as usize].notify_all();
self.stream_incoming[Dir::Bi as usize].notify_all();
if let Some(x) = self.incoming_uni_streams_reader.take() {
x.wake();
}
if let Some(x) = self.incoming_bi_streams_reader.take() {
x.wake();
}
self.datagrams.notify_all();
if let Some(x) = self.datagram_reader.take() {
x.wake();
}
Expand Down
4 changes: 2 additions & 2 deletions quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ pub use proto::{
};

pub use crate::connection::{
Connecting, Connection, Datagrams, IncomingBiStreams, IncomingUniStreams, NewConnection,
SendDatagramError, UnknownStream, ZeroRttAccepted,
AcceptBi, AcceptUni, Connecting, Connection, Datagrams, IncomingBiStreams, IncomingUniStreams,
NewConnection, ReadDatagram, SendDatagramError, UnknownStream, ZeroRttAccepted,
};
pub use crate::endpoint::{Endpoint, Incoming};
pub use crate::recv_stream::{ReadError, ReadExactError, ReadToEndError, RecvStream};
Expand Down

0 comments on commit 1775281

Please sign in to comment.