Skip to content

Commit

Permalink
async-std support
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyuyureka committed Apr 17, 2022
1 parent 7248b12 commit bd4ce30
Show file tree
Hide file tree
Showing 29 changed files with 1,045 additions and 215 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
members = ["quinn", "quinn-proto", "quinn-udp", "bench", "perf", "fuzz"]
default-members = ["quinn", "quinn-proto", "quinn-udp", "bench", "perf"]
members = ["quinn", "quinn-proto", "quinn-udp", "bench", "perf", "fuzz", "quinn-runtime"]
default-members = ["quinn", "quinn-proto", "quinn-udp", "bench", "perf", "quinn-runtime"]

[profile.bench]
debug = true
Expand Down
5 changes: 3 additions & 2 deletions bench/src/bin/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
};

use anyhow::{Context, Result};
use quinn::runtime::TokioRuntime;
use structopt::StructOpt;
use tokio::sync::Semaphore;
use tracing::{info, trace};
Expand Down Expand Up @@ -59,7 +60,7 @@ fn main() {
server_thread.join().expect("server thread");
}

async fn server(mut incoming: quinn::Incoming, opt: Opt) -> Result<()> {
async fn server(mut incoming: quinn::Incoming<TokioRuntime>, opt: Opt) -> Result<()> {
let mut server_tasks = Vec::new();

// Handle only the expected amount of clients
Expand Down Expand Up @@ -174,7 +175,7 @@ async fn client(
}

async fn handle_client_stream(
connection: Arc<quinn::Connection>,
connection: Arc<quinn::Connection<TokioRuntime>>,
upload_size: usize,
read_unordered: bool,
) -> Result<(TransferResult, TransferResult)> {
Expand Down
18 changes: 14 additions & 4 deletions bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use anyhow::{Context, Result};
use bytes::Bytes;
use quinn::runtime::TokioRuntime;
use rustls::RootCertStore;
use structopt::StructOpt;
use tokio::runtime::{Builder, Runtime};
Expand All @@ -30,7 +31,7 @@ pub fn server_endpoint(
cert: rustls::Certificate,
key: rustls::PrivateKey,
opt: &Opt,
) -> (SocketAddr, quinn::Incoming) {
) -> (SocketAddr, quinn::Incoming<TokioRuntime>) {
let cert_chain = vec![cert];
let mut server_config = quinn::ServerConfig::with_single_cert(cert_chain, key).unwrap();
server_config.transport = Arc::new(transport_config(opt));
Expand All @@ -53,7 +54,10 @@ pub async fn connect_client(
server_addr: SocketAddr,
server_cert: rustls::Certificate,
opt: Opt,
) -> Result<(quinn::Endpoint, quinn::Connection)> {
) -> Result<(
quinn::Endpoint<TokioRuntime>,
quinn::Connection<TokioRuntime>,
)> {
let endpoint =
quinn::Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0)).unwrap();

Expand All @@ -80,7 +84,10 @@ pub async fn connect_client(
Ok((endpoint, connection))
}

pub async fn drain_stream(stream: &mut quinn::RecvStream, read_unordered: bool) -> Result<usize> {
pub async fn drain_stream(
stream: &mut quinn::RecvStream<TokioRuntime>,
read_unordered: bool,
) -> Result<usize> {
let mut read = 0;

if read_unordered {
Expand Down Expand Up @@ -109,7 +116,10 @@ pub async fn drain_stream(stream: &mut quinn::RecvStream, read_unordered: bool)
Ok(read)
}

pub async fn send_data_on_stream(stream: &mut quinn::SendStream, stream_size: usize) -> Result<()> {
pub async fn send_data_on_stream(
stream: &mut quinn::SendStream<TokioRuntime>,
stream_size: usize,
) -> Result<()> {
const DATA: &[u8] = &[0xAB; 1024 * 1024];
let bytes_data = Bytes::from_static(DATA);

Expand Down
17 changes: 9 additions & 8 deletions perf/src/bin/perf_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use anyhow::{Context, Result};
use bytes::Bytes;
use quinn::runtime::TokioRuntime;
use structopt::StructOpt;
use tokio::sync::Semaphore;
use tracing::{debug, error, info};
Expand Down Expand Up @@ -195,7 +196,7 @@ async fn run(opt: Opt) -> Result<()> {
}

async fn drain_stream(
mut stream: quinn::RecvStream,
mut stream: quinn::RecvStream<TokioRuntime>,
download: u64,
stream_stats: OpenStreamStats,
) -> Result<()> {
Expand Down Expand Up @@ -234,7 +235,7 @@ async fn drain_stream(
}

async fn drive_uni(
connection: quinn::Connection,
connection: quinn::Connection<TokioRuntime>,
acceptor: UniAcceptor,
stream_stats: OpenStreamStats,
concurrency: u64,
Expand All @@ -261,7 +262,7 @@ async fn drive_uni(
}

async fn request_uni(
send: quinn::SendStream,
send: quinn::SendStream<TokioRuntime>,
acceptor: UniAcceptor,
upload: u64,
download: u64,
Expand All @@ -280,7 +281,7 @@ async fn request_uni(
}

async fn request(
mut send: quinn::SendStream,
mut send: quinn::SendStream<TokioRuntime>,
mut upload: u64,
download: u64,
stream_stats: OpenStreamStats,
Expand Down Expand Up @@ -311,7 +312,7 @@ async fn request(
}

async fn drive_bi(
connection: quinn::Connection,
connection: quinn::Connection<TokioRuntime>,
stream_stats: OpenStreamStats,
concurrency: u64,
upload: u64,
Expand All @@ -336,8 +337,8 @@ async fn drive_bi(
}

async fn request_bi(
send: quinn::SendStream,
recv: quinn::RecvStream,
send: quinn::SendStream<TokioRuntime>,
recv: quinn::RecvStream<TokioRuntime>,
upload: u64,
download: u64,
stream_stats: OpenStreamStats,
Expand All @@ -348,7 +349,7 @@ async fn request_bi(
}

#[derive(Clone)]
struct UniAcceptor(Arc<tokio::sync::Mutex<quinn::IncomingUniStreams>>);
struct UniAcceptor(Arc<tokio::sync::Mutex<quinn::IncomingUniStreams<TokioRuntime>>>);

struct SkipServerVerification;

Expand Down
27 changes: 17 additions & 10 deletions perf/src/bin/perf_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{fs, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use anyhow::{Context, Result};
use bytes::Bytes;
use quinn::runtime::TokioRuntime;
use structopt::StructOpt;
use tracing::{debug, error, info};

Expand Down Expand Up @@ -97,7 +98,7 @@ async fn run(opt: Opt) -> Result<()> {
Ok(())
}

async fn handle(handshake: quinn::Connecting, opt: Arc<Opt>) -> Result<()> {
async fn handle(handshake: quinn::Connecting<TokioRuntime>, opt: Arc<Opt>) -> Result<()> {
let quinn::NewConnection {
uni_streams,
bi_streams,
Expand All @@ -113,7 +114,7 @@ async fn handle(handshake: quinn::Connecting, opt: Arc<Opt>) -> Result<()> {
Ok(())
}

async fn conn_stats(connection: quinn::Connection, opt: Arc<Opt>) -> Result<()> {
async fn conn_stats(connection: quinn::Connection<TokioRuntime>, opt: Arc<Opt>) -> Result<()> {
if opt.conn_stats {
loop {
tokio::time::sleep(Duration::from_secs(2)).await;
Expand All @@ -125,8 +126,8 @@ async fn conn_stats(connection: quinn::Connection, opt: Arc<Opt>) -> Result<()>
}

async fn drive_uni(
connection: quinn::Connection,
mut streams: quinn::IncomingUniStreams,
connection: quinn::Connection<TokioRuntime>,
mut streams: quinn::IncomingUniStreams<TokioRuntime>,
) -> Result<()> {
while let Some(stream) = streams.next().await {
let stream = stream?;
Expand All @@ -140,14 +141,17 @@ async fn drive_uni(
Ok(())
}

async fn handle_uni(connection: quinn::Connection, stream: quinn::RecvStream) -> Result<()> {
async fn handle_uni(
connection: quinn::Connection<TokioRuntime>,
stream: quinn::RecvStream<TokioRuntime>,
) -> Result<()> {
let bytes = read_req(stream).await?;
let response = connection.open_uni().await?;
respond(bytes, response).await?;
Ok(())
}

async fn drive_bi(mut streams: quinn::IncomingBiStreams) -> Result<()> {
async fn drive_bi(mut streams: quinn::IncomingBiStreams<TokioRuntime>) -> Result<()> {
while let Some(stream) = streams.next().await {
let (send, recv) = stream?;
tokio::spawn(async move {
Expand All @@ -159,13 +163,16 @@ async fn drive_bi(mut streams: quinn::IncomingBiStreams) -> Result<()> {
Ok(())
}

async fn handle_bi(send: quinn::SendStream, recv: quinn::RecvStream) -> Result<()> {
async fn handle_bi(
send: quinn::SendStream<TokioRuntime>,
recv: quinn::RecvStream<TokioRuntime>,
) -> Result<()> {
let bytes = read_req(recv).await?;
respond(bytes, send).await?;
Ok(())
}

async fn read_req(mut stream: quinn::RecvStream) -> Result<u64> {
async fn read_req(mut stream: quinn::RecvStream<TokioRuntime>) -> Result<u64> {
let mut buf = [0; 8];
stream
.read_exact(&mut buf)
Expand All @@ -177,7 +184,7 @@ async fn read_req(mut stream: quinn::RecvStream) -> Result<u64> {
Ok(n)
}

async fn drain_stream(mut stream: quinn::RecvStream) -> Result<()> {
async fn drain_stream(mut stream: quinn::RecvStream<TokioRuntime>) -> Result<()> {
#[rustfmt::skip]
let mut bufs = [
Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
Expand All @@ -194,7 +201,7 @@ async fn drain_stream(mut stream: quinn::RecvStream) -> Result<()> {
Ok(())
}

async fn respond(mut bytes: u64, mut stream: quinn::SendStream) -> Result<()> {
async fn respond(mut bytes: u64, mut stream: quinn::SendStream<TokioRuntime>) -> Result<()> {
const DATA: [u8; 1024 * 1024] = [42; 1024 * 1024];

while bytes > 0 {
Expand Down
13 changes: 11 additions & 2 deletions perf/src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use hdrhistogram::Histogram;
use quinn::runtime::Runtime;
use quinn::StreamId;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -132,7 +133,11 @@ impl Stats {
pub struct OpenStreamStats(Arc<Mutex<Vec<Arc<StreamStats>>>>);

impl OpenStreamStats {
pub fn new_sender(&self, stream: &quinn::SendStream, upload_size: u64) -> Arc<StreamStats> {
pub fn new_sender<RT: Runtime>(
&self,
stream: &quinn::SendStream<RT>,
upload_size: u64,
) -> Arc<StreamStats> {
let send_stream_stats = StreamStats {
id: stream.id(),
request_size: upload_size,
Expand All @@ -147,7 +152,11 @@ impl OpenStreamStats {
send_stream_stats
}

pub fn new_receiver(&self, stream: &quinn::RecvStream, download_size: u64) -> Arc<StreamStats> {
pub fn new_receiver<RT: Runtime>(
&self,
stream: &quinn::RecvStream<RT>,
download_size: u64,
) -> Arc<StreamStats> {
let recv_stream_stats = StreamStats {
id: stream.id(),
request_size: download_size,
Expand Down
21 changes: 21 additions & 0 deletions quinn-runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "quinn-runtime"
version = "0.1.0"
license = "MIT OR Apache-2.0"
repository = "https://github.com/quinn-rs/quinn"
description = "Abstractions for writing async-runtime-independent code"
keywords = ["quic"]
categories = ["network-programming", "asynchronous"]
workspace = ".."
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.0.1", features = ["net", "time"], optional = true }
async-io = { version = "1.6", optional = true }
async-std = { version = "1.11", optional = true }

[features]
runtime-tokio = [ "tokio" ]
runtime-async-std = [ "async-io", "async-std" ]
Loading

0 comments on commit bd4ce30

Please sign in to comment.