Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async-std support, second try #1345

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
members = ["quinn", "quinn-proto", "quinn-udp", "bench", "perf", "fuzz"]
default-members = ["quinn", "quinn-proto", "quinn-udp", "bench", "perf"]
members = ["quinn", "quinn-proto", "quinn-udp", "bench", "perf", "fuzz", "quinn-runtime"]
default-members = ["quinn", "quinn-proto", "quinn-udp", "bench", "perf", "quinn-runtime"]

[profile.bench]
debug = true
Expand Down
5 changes: 3 additions & 2 deletions bench/src/bin/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
};

use anyhow::{Context, Result};
use quinn::runtime::TokioRuntime;
use structopt::StructOpt;
use tokio::sync::Semaphore;
use tracing::{info, trace};
Expand Down Expand Up @@ -59,7 +60,7 @@ fn main() {
server_thread.join().expect("server thread");
}

async fn server(mut incoming: quinn::Incoming, opt: Opt) -> Result<()> {
async fn server(mut incoming: quinn::Incoming<TokioRuntime>, opt: Opt) -> Result<()> {
let mut server_tasks = Vec::new();

// Handle only the expected amount of clients
Expand Down Expand Up @@ -174,7 +175,7 @@ async fn client(
}

async fn handle_client_stream(
connection: Arc<quinn::Connection>,
connection: Arc<quinn::Connection<TokioRuntime>>,
upload_size: usize,
read_unordered: bool,
) -> Result<(TransferResult, TransferResult)> {
Expand Down
18 changes: 14 additions & 4 deletions bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use anyhow::{Context, Result};
use bytes::Bytes;
use quinn::runtime::TokioRuntime;
use rustls::RootCertStore;
use structopt::StructOpt;
use tokio::runtime::{Builder, Runtime};
Expand All @@ -30,7 +31,7 @@ pub fn server_endpoint(
cert: rustls::Certificate,
key: rustls::PrivateKey,
opt: &Opt,
) -> (SocketAddr, quinn::Incoming) {
) -> (SocketAddr, quinn::Incoming<TokioRuntime>) {
let cert_chain = vec![cert];
let mut server_config = quinn::ServerConfig::with_single_cert(cert_chain, key).unwrap();
server_config.transport = Arc::new(transport_config(opt));
Expand All @@ -53,7 +54,10 @@ pub async fn connect_client(
server_addr: SocketAddr,
server_cert: rustls::Certificate,
opt: Opt,
) -> Result<(quinn::Endpoint, quinn::Connection)> {
) -> Result<(
quinn::Endpoint<TokioRuntime>,
quinn::Connection<TokioRuntime>,
)> {
let endpoint =
quinn::Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0)).unwrap();

Expand All @@ -80,7 +84,10 @@ pub async fn connect_client(
Ok((endpoint, connection))
}

pub async fn drain_stream(stream: &mut quinn::RecvStream, read_unordered: bool) -> Result<usize> {
pub async fn drain_stream(
stream: &mut quinn::RecvStream<TokioRuntime>,
read_unordered: bool,
) -> Result<usize> {
let mut read = 0;

if read_unordered {
Expand Down Expand Up @@ -109,7 +116,10 @@ pub async fn drain_stream(stream: &mut quinn::RecvStream, read_unordered: bool)
Ok(read)
}

pub async fn send_data_on_stream(stream: &mut quinn::SendStream, stream_size: usize) -> Result<()> {
pub async fn send_data_on_stream(
stream: &mut quinn::SendStream<TokioRuntime>,
stream_size: usize,
) -> Result<()> {
const DATA: &[u8] = &[0xAB; 1024 * 1024];
let bytes_data = Bytes::from_static(DATA);

Expand Down
17 changes: 9 additions & 8 deletions perf/src/bin/perf_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use anyhow::{Context, Result};
use bytes::Bytes;
use quinn::runtime::TokioRuntime;
use structopt::StructOpt;
use tokio::sync::Semaphore;
use tracing::{debug, error, info};
Expand Down Expand Up @@ -195,7 +196,7 @@ async fn run(opt: Opt) -> Result<()> {
}

async fn drain_stream(
mut stream: quinn::RecvStream,
mut stream: quinn::RecvStream<TokioRuntime>,
download: u64,
stream_stats: OpenStreamStats,
) -> Result<()> {
Expand Down Expand Up @@ -234,7 +235,7 @@ async fn drain_stream(
}

async fn drive_uni(
connection: quinn::Connection,
connection: quinn::Connection<TokioRuntime>,
acceptor: UniAcceptor,
stream_stats: OpenStreamStats,
concurrency: u64,
Expand All @@ -261,7 +262,7 @@ async fn drive_uni(
}

async fn request_uni(
send: quinn::SendStream,
send: quinn::SendStream<TokioRuntime>,
acceptor: UniAcceptor,
upload: u64,
download: u64,
Expand All @@ -280,7 +281,7 @@ async fn request_uni(
}

async fn request(
mut send: quinn::SendStream,
mut send: quinn::SendStream<TokioRuntime>,
mut upload: u64,
download: u64,
stream_stats: OpenStreamStats,
Expand Down Expand Up @@ -311,7 +312,7 @@ async fn request(
}

async fn drive_bi(
connection: quinn::Connection,
connection: quinn::Connection<TokioRuntime>,
stream_stats: OpenStreamStats,
concurrency: u64,
upload: u64,
Expand All @@ -336,8 +337,8 @@ async fn drive_bi(
}

async fn request_bi(
send: quinn::SendStream,
recv: quinn::RecvStream,
send: quinn::SendStream<TokioRuntime>,
recv: quinn::RecvStream<TokioRuntime>,
upload: u64,
download: u64,
stream_stats: OpenStreamStats,
Expand All @@ -348,7 +349,7 @@ async fn request_bi(
}

#[derive(Clone)]
struct UniAcceptor(Arc<tokio::sync::Mutex<quinn::IncomingUniStreams>>);
struct UniAcceptor(Arc<tokio::sync::Mutex<quinn::IncomingUniStreams<TokioRuntime>>>);

struct SkipServerVerification;

Expand Down
27 changes: 17 additions & 10 deletions perf/src/bin/perf_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{fs, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use anyhow::{Context, Result};
use bytes::Bytes;
use quinn::runtime::TokioRuntime;
use structopt::StructOpt;
use tracing::{debug, error, info};

Expand Down Expand Up @@ -97,7 +98,7 @@ async fn run(opt: Opt) -> Result<()> {
Ok(())
}

async fn handle(handshake: quinn::Connecting, opt: Arc<Opt>) -> Result<()> {
async fn handle(handshake: quinn::Connecting<TokioRuntime>, opt: Arc<Opt>) -> Result<()> {
let quinn::NewConnection {
uni_streams,
bi_streams,
Expand All @@ -113,7 +114,7 @@ async fn handle(handshake: quinn::Connecting, opt: Arc<Opt>) -> Result<()> {
Ok(())
}

async fn conn_stats(connection: quinn::Connection, opt: Arc<Opt>) -> Result<()> {
async fn conn_stats(connection: quinn::Connection<TokioRuntime>, opt: Arc<Opt>) -> Result<()> {
if opt.conn_stats {
loop {
tokio::time::sleep(Duration::from_secs(2)).await;
Expand All @@ -125,8 +126,8 @@ async fn conn_stats(connection: quinn::Connection, opt: Arc<Opt>) -> Result<()>
}

async fn drive_uni(
connection: quinn::Connection,
mut streams: quinn::IncomingUniStreams,
connection: quinn::Connection<TokioRuntime>,
mut streams: quinn::IncomingUniStreams<TokioRuntime>,
) -> Result<()> {
while let Some(stream) = streams.next().await {
let stream = stream?;
Expand All @@ -140,14 +141,17 @@ async fn drive_uni(
Ok(())
}

async fn handle_uni(connection: quinn::Connection, stream: quinn::RecvStream) -> Result<()> {
async fn handle_uni(
connection: quinn::Connection<TokioRuntime>,
stream: quinn::RecvStream<TokioRuntime>,
) -> Result<()> {
let bytes = read_req(stream).await?;
let response = connection.open_uni().await?;
respond(bytes, response).await?;
Ok(())
}

async fn drive_bi(mut streams: quinn::IncomingBiStreams) -> Result<()> {
async fn drive_bi(mut streams: quinn::IncomingBiStreams<TokioRuntime>) -> Result<()> {
while let Some(stream) = streams.next().await {
let (send, recv) = stream?;
tokio::spawn(async move {
Expand All @@ -159,13 +163,16 @@ async fn drive_bi(mut streams: quinn::IncomingBiStreams) -> Result<()> {
Ok(())
}

async fn handle_bi(send: quinn::SendStream, recv: quinn::RecvStream) -> Result<()> {
async fn handle_bi(
send: quinn::SendStream<TokioRuntime>,
recv: quinn::RecvStream<TokioRuntime>,
) -> Result<()> {
let bytes = read_req(recv).await?;
respond(bytes, send).await?;
Ok(())
}

async fn read_req(mut stream: quinn::RecvStream) -> Result<u64> {
async fn read_req(mut stream: quinn::RecvStream<TokioRuntime>) -> Result<u64> {
let mut buf = [0; 8];
stream
.read_exact(&mut buf)
Expand All @@ -177,7 +184,7 @@ async fn read_req(mut stream: quinn::RecvStream) -> Result<u64> {
Ok(n)
}

async fn drain_stream(mut stream: quinn::RecvStream) -> Result<()> {
async fn drain_stream(mut stream: quinn::RecvStream<TokioRuntime>) -> Result<()> {
#[rustfmt::skip]
let mut bufs = [
Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
Expand All @@ -194,7 +201,7 @@ async fn drain_stream(mut stream: quinn::RecvStream) -> Result<()> {
Ok(())
}

async fn respond(mut bytes: u64, mut stream: quinn::SendStream) -> Result<()> {
async fn respond(mut bytes: u64, mut stream: quinn::SendStream<TokioRuntime>) -> Result<()> {
const DATA: [u8; 1024 * 1024] = [42; 1024 * 1024];

while bytes > 0 {
Expand Down
13 changes: 11 additions & 2 deletions perf/src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use hdrhistogram::Histogram;
use quinn::runtime::Runtime;
use quinn::StreamId;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -132,7 +133,11 @@ impl Stats {
pub struct OpenStreamStats(Arc<Mutex<Vec<Arc<StreamStats>>>>);

impl OpenStreamStats {
pub fn new_sender(&self, stream: &quinn::SendStream, upload_size: u64) -> Arc<StreamStats> {
pub fn new_sender<RT: Runtime>(
&self,
stream: &quinn::SendStream<RT>,
upload_size: u64,
) -> Arc<StreamStats> {
let send_stream_stats = StreamStats {
id: stream.id(),
request_size: upload_size,
Expand All @@ -147,7 +152,11 @@ impl OpenStreamStats {
send_stream_stats
}

pub fn new_receiver(&self, stream: &quinn::RecvStream, download_size: u64) -> Arc<StreamStats> {
pub fn new_receiver<RT: Runtime>(
&self,
stream: &quinn::RecvStream<RT>,
download_size: u64,
) -> Arc<StreamStats> {
let recv_stream_stats = StreamStats {
id: stream.id(),
request_size: download_size,
Expand Down
21 changes: 21 additions & 0 deletions quinn-runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "quinn-runtime"
yu-re-ka marked this conversation as resolved.
Show resolved Hide resolved
version = "0.1.0"
license = "MIT OR Apache-2.0"
repository = "https://github.com/quinn-rs/quinn"
description = "Abstractions for writing async-runtime-independent code"
keywords = ["quic"]
categories = ["network-programming", "asynchronous"]
workspace = ".."
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.0.1", features = ["net", "time"], optional = true }
async-io = { version = "1.6", optional = true }
async-std = { version = "1.11", optional = true }

[features]
runtime-tokio = [ "tokio" ]
runtime-async-std = [ "async-io", "async-std" ]
Loading