Skip to content

Commit

Permalink
async-std support
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyuyureka committed May 4, 2022
1 parent 7ef7a5c commit c12d09c
Show file tree
Hide file tree
Showing 19 changed files with 441 additions and 65 deletions.
3 changes: 2 additions & 1 deletion perf/src/bin/perf_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use anyhow::{Context, Result};
use bytes::Bytes;
use quinn::TokioRuntime;
use structopt::StructOpt;
use tokio::sync::Semaphore;
use tracing::{debug, error, info};
Expand Down Expand Up @@ -103,7 +104,7 @@ async fn run(opt: Opt) -> Result<()> {

let socket = bind_socket(bind_addr, opt.send_buffer_size, opt.recv_buffer_size)?;

let (endpoint, _) = quinn::Endpoint::new(Default::default(), None, socket)?;
let (endpoint, _) = quinn::Endpoint::new(Default::default(), None, socket, TokioRuntime)?;

let mut crypto = rustls::ClientConfig::builder()
.with_cipher_suites(perf::PERF_CIPHER_SUITES)
Expand Down
11 changes: 8 additions & 3 deletions perf/src/bin/perf_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{fs, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use anyhow::{Context, Result};
use bytes::Bytes;
use quinn::TokioRuntime;
use structopt::StructOpt;
use tracing::{debug, error, info};

Expand Down Expand Up @@ -77,9 +78,13 @@ async fn run(opt: Opt) -> Result<()> {

let socket = bind_socket(opt.listen, opt.send_buffer_size, opt.recv_buffer_size)?;

let (endpoint, mut incoming) =
quinn::Endpoint::new(Default::default(), Some(server_config), socket)
.context("creating endpoint")?;
let (endpoint, mut incoming) = quinn::Endpoint::new(
Default::default(),
Some(server_config),
socket,
TokioRuntime,
)
.context("creating endpoint")?;

info!("listening on {}", endpoint.local_addr().unwrap());

Expand Down
7 changes: 6 additions & 1 deletion quinn-udp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ libc = "0.2.69"
proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.8", default-features = false }
socket2 = "0.4"
tracing = "0.1.10"
tokio = { version = "1.0.1", features = ["net"] }
tokio = { version = "1.0.1", features = [ "net" ], optional = true }
async-io = { version = "1.6", optional = true }

[features]
runtime-tokio = [ "tokio" ]
runtime-async-std = [ "async-io" ]
52 changes: 40 additions & 12 deletions quinn-udp/src/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::{
time::Instant,
};

use crate::runtime::AsyncWrappedUdpSocket;
use proto::Transmit;
use tokio::io::ReadBuf;

use super::{log_sendmsg_error, RecvMeta, UdpState, IO_ERROR_LOG_INTERVAL};

Expand All @@ -16,16 +16,15 @@ use super::{log_sendmsg_error, RecvMeta, UdpState, IO_ERROR_LOG_INTERVAL};
/// platforms.
#[derive(Debug)]
pub struct UdpSocket {
io: tokio::net::UdpSocket,
io: Box<dyn AsyncWrappedUdpSocket>,
last_send_error: Instant,
}

impl UdpSocket {
pub fn from_std(socket: std::net::UdpSocket) -> io::Result<UdpSocket> {
socket.set_nonblocking(true)?;
pub fn new(socket: Box<dyn AsyncWrappedUdpSocket>) -> io::Result<UdpSocket> {
let now = Instant::now();
Ok(UdpSocket {
io: tokio::net::UdpSocket::from_std(socket)?,
io: socket,
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
})
}
Expand All @@ -38,10 +37,28 @@ impl UdpSocket {
) -> Poll<Result<usize, io::Error>> {
let mut sent = 0;
for transmit in transmits {
match self
.io
.poll_send_to(cx, &transmit.contents, transmit.destination)
{
let io_res = loop {
let poll_res = self.io.poll_write_ready(cx);

break match poll_res {
Poll::Ready(Ok(())) => Poll::Ready(
match self
.io
.try_send_to(&transmit.contents, transmit.destination)
{
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_write_ready(cx);
continue; // try again
}
res => res,
},
),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
};
};

match io_res {
Poll::Ready(Ok(_)) => {
sent += 1;
}
Expand Down Expand Up @@ -75,10 +92,21 @@ impl UdpSocket {
meta: &mut [RecvMeta],
) -> Poll<io::Result<usize>> {
debug_assert!(!bufs.is_empty());
let mut buf = ReadBuf::new(&mut bufs[0]);
let addr = ready!(self.io.poll_recv_from(cx, &mut buf))?;

let (len, addr) = loop {
ready!(self.io.poll_read_ready(cx))?;

break match self.io.try_recv_from(&mut bufs[0]) {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_read_ready(cx);
continue; // try again
}
res => res?,
};
};

meta[0] = RecvMeta {
len: buf.filled().len(),
len,
addr,
ecn: None,
dst_ip: None,
Expand Down
2 changes: 2 additions & 0 deletions quinn-udp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ mod imp;
#[path = "fallback.rs"]
mod imp;

pub mod runtime;

pub use imp::UdpSocket;

/// Number of UDP packets to send/receive at a time
Expand Down
51 changes: 51 additions & 0 deletions quinn-udp/src/runtime/async_std_runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use super::{AsyncWrappedUdpSocket, Runtime};
use async_io::Async;
use std::io;
use std::task::{Context, Poll};

impl AsyncWrappedUdpSocket for Async<std::net::UdpSocket> {
fn poll_read_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
Async::poll_readable(self, cx)
}

fn poll_write_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
Async::poll_writable(self, cx)
}

fn clear_read_ready(&self, _cx: &mut Context) {
// async-std doesn't need this
}

fn clear_write_ready(&self, _cx: &mut Context) {
// async-std doesn't need this
}

fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, std::net::SocketAddr)> {
self.get_ref().recv_from(buf)
}

fn try_send_to(&self, buf: &[u8], target: std::net::SocketAddr) -> io::Result<usize> {
self.get_ref().send_to(buf, target)
}

fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
self.get_ref().local_addr()
}

#[cfg(unix)]
fn get_ref(&self) -> &std::net::UdpSocket {
Async::get_ref(self)
}
}

#[derive(Clone, Debug)]
pub struct AsyncStdRuntime;

impl Runtime for AsyncStdRuntime {
fn wrap_udp_socket(
&self,
t: std::net::UdpSocket,
) -> io::Result<Box<dyn AsyncWrappedUdpSocket>> {
Ok(Box::new(Async::new(t)?))
}
}
39 changes: 39 additions & 0 deletions quinn-udp/src/runtime/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#[cfg(feature = "runtime-tokio")]
mod tokio_runtime;
#[cfg(feature = "runtime-tokio")]
pub use tokio_runtime::*;

#[cfg(feature = "runtime-async-std")]
mod async_std_runtime;
#[cfg(feature = "runtime-async-std")]
pub use async_std_runtime::*;

use std::fmt::Debug;
use std::io;
use std::task::{Context, Poll};

pub trait AsyncWrappedUdpSocket: Send + Debug {
fn poll_read_ready(&self, cx: &mut Context) -> Poll<io::Result<()>>;

fn poll_write_ready(&self, cx: &mut Context) -> Poll<io::Result<()>>;

fn clear_read_ready(&self, cx: &mut Context);

fn clear_write_ready(&self, cx: &mut Context);

fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, std::net::SocketAddr)>;

fn try_send_to(&self, buf: &[u8], target: std::net::SocketAddr) -> io::Result<usize>;

fn local_addr(&self) -> io::Result<std::net::SocketAddr>;

// On Unix we expect to be able to access the underlying std UdpSocket
// to be able to implement more advanced features
#[cfg(unix)]
fn get_ref(&self) -> &std::net::UdpSocket;
}

pub trait Runtime: Send + Sync + Debug + 'static {
fn wrap_udp_socket(&self, t: std::net::UdpSocket)
-> io::Result<Box<dyn AsyncWrappedUdpSocket>>;
}
99 changes: 99 additions & 0 deletions quinn-udp/src/runtime/tokio_runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use super::{AsyncWrappedUdpSocket, Runtime};
use std::io;
use std::task::{Context, Poll};
#[cfg(unix)]
use tokio::io::unix::AsyncFd;

#[cfg(unix)]
impl AsyncWrappedUdpSocket for AsyncFd<std::net::UdpSocket> {
fn poll_read_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
AsyncFd::poll_read_ready(self, cx).map(|x| x.map(|_| ()))
}

fn poll_write_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
AsyncFd::poll_write_ready(self, cx).map(|x| x.map(|_| ()))
}

fn clear_read_ready(&self, cx: &mut Context) {
match self.poll_read_ready(cx) {
Poll::Pending => {}
Poll::Ready(Err(_)) => {}
Poll::Ready(Ok(mut guard)) => guard.clear_ready(),
}
}

fn clear_write_ready(&self, cx: &mut Context) {
match self.poll_write_ready(cx) {
Poll::Pending => {}
Poll::Ready(Err(_)) => {}
Poll::Ready(Ok(mut guard)) => guard.clear_ready(),
}
}

fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, std::net::SocketAddr)> {
self.get_ref().recv_from(buf)
}

fn try_send_to(&self, buf: &[u8], target: std::net::SocketAddr) -> io::Result<usize> {
self.get_ref().send_to(buf, target)
}

fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
self.get_ref().local_addr()
}

fn get_ref(&self) -> &std::net::UdpSocket {
AsyncFd::get_ref(self)
}
}

#[cfg(not(unix))]
impl AsyncWrappedUdpSocket for tokio::net::UdpSocket {
fn poll_read_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
tokio::net::UdpSocket::poll_recv_ready(self, cx)
}

fn poll_write_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
tokio::net::UdpSocket::poll_send_ready(self, cx)
}

fn clear_read_ready(&self, _cx: &mut Context) {
// not necessary because tokio::net::UdpSocket::try_recv_from already uses try_io
}

fn clear_write_ready(&self, _cx: &mut Context) {
// not necessary because tokio::net::UdpSocket::try_send_from already uses try_io
}

fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, std::net::SocketAddr)> {
tokio::net::UdpSocket::try_recv_from(self, buf)
}

fn try_send_to(&self, buf: &[u8], target: std::net::SocketAddr) -> io::Result<usize> {
tokio::net::UdpSocket::try_send_to(self, buf, target)
}

fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
tokio::net::UdpSocket::local_addr(self)
}
}

#[derive(Debug, Clone)]
pub struct TokioRuntime;

impl Runtime for TokioRuntime {
fn wrap_udp_socket(
&self,
t: std::net::UdpSocket,
) -> io::Result<Box<dyn AsyncWrappedUdpSocket>> {
t.set_nonblocking(true)?;
#[cfg(unix)]
{
Ok(Box::new(AsyncFd::new(t)?))
}
#[cfg(not(unix))]
{
Ok(Box::new(tokio::net::UdpSocket::from_std(t)?))
}
}
}
Loading

0 comments on commit c12d09c

Please sign in to comment.