Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Named futures + replace NewConnection with Connection methods #1357

Merged
merged 5 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions bench/src/bin/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,17 @@ async fn server(mut incoming: quinn::Incoming, opt: Opt) -> Result<()> {
// Handle only the expected amount of clients
for _ in 0..opt.clients {
let handshake = incoming.next().await.unwrap();
let quinn::NewConnection {
mut bi_streams,
connection,
..
} = handshake.await.context("handshake failed")?;
let connection = handshake.await.context("handshake failed")?;

server_tasks.push(tokio::spawn(async move {
loop {
let (mut send_stream, mut recv_stream) = match bi_streams.next().await {
None => break,
Some(Err(quinn::ConnectionError::ApplicationClosed(_))) => break,
Some(Err(e)) => {
let (mut send_stream, mut recv_stream) = match connection.accept_bi().await {
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
Err(e) => {
eprintln!("accepting stream failed: {:?}", e);
break;
}
Some(Ok(stream)) => stream,
Ok(stream) => stream,
};
trace!("stream established");

Expand Down
2 changes: 1 addition & 1 deletion bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub async fn connect_client(
let mut client_config = quinn::ClientConfig::new(Arc::new(crypto));
client_config.transport_config(Arc::new(transport_config(&opt)));

let quinn::NewConnection { connection, .. } = endpoint
let connection = endpoint
.connect_with(client_config, server_addr, "localhost")
.unwrap()
.await
Expand Down
27 changes: 5 additions & 22 deletions perf/src/bin/perf_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,24 +119,17 @@ async fn run(opt: Opt) -> Result<()> {

let stream_stats = OpenStreamStats::default();

let quinn::NewConnection {
connection,
uni_streams,
..
} = endpoint
let connection = endpoint
.connect_with(cfg, addr, host_name)?
.await
.context("connecting")?;

info!("established");

let acceptor = UniAcceptor(Arc::new(tokio::sync::Mutex::new(uni_streams)));

let drive_fut = async {
tokio::try_join!(
drive_uni(
connection.clone(),
acceptor,
stream_stats.clone(),
opt.uni_requests,
opt.upload_size,
Expand Down Expand Up @@ -236,7 +229,6 @@ async fn drain_stream(

async fn drive_uni(
connection: quinn::Connection,
acceptor: UniAcceptor,
stream_stats: OpenStreamStats,
concurrency: u64,
upload: u64,
Expand All @@ -247,12 +239,12 @@ async fn drive_uni(
loop {
let permit = sem.clone().acquire_owned().await.unwrap();
let send = connection.open_uni().await?;
let acceptor = acceptor.clone();
let stream_stats = stream_stats.clone();

debug!("sending request on {}", send.id());
let connection = connection.clone();
tokio::spawn(async move {
if let Err(e) = request_uni(send, acceptor, upload, download, stream_stats).await {
if let Err(e) = request_uni(send, connection, upload, download, stream_stats).await {
error!("sending request failed: {:#}", e);
}

Expand All @@ -263,19 +255,13 @@ async fn drive_uni(

async fn request_uni(
send: quinn::SendStream,
acceptor: UniAcceptor,
conn: quinn::Connection,
upload: u64,
download: u64,
stream_stats: OpenStreamStats,
) -> Result<()> {
request(send, upload, download, stream_stats.clone()).await?;
let recv = {
let mut guard = acceptor.0.lock().await;
guard
.next()
.await
.ok_or_else(|| anyhow::anyhow!("End of stream"))
}??;
let recv = conn.accept_uni().await?;
drain_stream(recv, download, stream_stats).await?;
Ok(())
}
Expand Down Expand Up @@ -348,9 +334,6 @@ async fn request_bi(
Ok(())
}

#[derive(Clone)]
struct UniAcceptor(Arc<tokio::sync::Mutex<quinn::IncomingUniStreams>>);

struct SkipServerVerification;

impl SkipServerVerification {
Expand Down
24 changes: 7 additions & 17 deletions perf/src/bin/perf_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,11 @@ async fn run(opt: Opt) -> Result<()> {
}

async fn handle(handshake: quinn::Connecting, opt: Arc<Opt>) -> Result<()> {
let quinn::NewConnection {
uni_streams,
bi_streams,
connection,
..
} = handshake.await.context("handshake failed")?;
let connection = handshake.await.context("handshake failed")?;
debug!("{} connected", connection.remote_address());
tokio::try_join!(
drive_uni(connection.clone(), uni_streams),
drive_bi(bi_streams),
drive_uni(connection.clone()),
drive_bi(connection.clone()),
conn_stats(connection, opt)
)?;
Ok(())
Expand All @@ -129,12 +124,8 @@ async fn conn_stats(connection: quinn::Connection, opt: Arc<Opt>) -> Result<()>
Ok(())
}

async fn drive_uni(
connection: quinn::Connection,
mut streams: quinn::IncomingUniStreams,
) -> Result<()> {
while let Some(stream) = streams.next().await {
let stream = stream?;
async fn drive_uni(connection: quinn::Connection) -> Result<()> {
while let Ok(stream) = connection.accept_uni().await {
let connection = connection.clone();
tokio::spawn(async move {
if let Err(e) = handle_uni(connection, stream).await {
Expand All @@ -152,9 +143,8 @@ async fn handle_uni(connection: quinn::Connection, stream: quinn::RecvStream) ->
Ok(())
}

async fn drive_bi(mut streams: quinn::IncomingBiStreams) -> Result<()> {
while let Some(stream) = streams.next().await {
let (send, recv) = stream?;
async fn drive_bi(connection: quinn::Connection) -> Result<()> {
while let Ok((send, recv)) = connection.accept_bi().await {
tokio::spawn(async move {
if let Err(e) = handle_bi(send, recv).await {
error!("request failed: {:#}", e);
Expand Down
1 change: 1 addition & 0 deletions quinn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ futures-io = { version = "0.3.19", optional = true }
# Implements futures::Stream for async streams such as `Incoming`
futures-core = { version = "0.3.19", optional = true }
rustc-hash = "1.1"
pin-project-lite = "0.2"
proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.8", default-features = false }
rustls = { version = "0.20.3", default-features = false, features = ["quic"], optional = true }
thiserror = "1.0.21"
Expand Down
8 changes: 3 additions & 5 deletions quinn/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,14 @@ impl Context {
};
let handle = runtime.spawn(
async move {
let quinn::NewConnection {
mut uni_streams, ..
} = incoming
let connection = incoming
.next()
.await
.expect("accept")
.await
.expect("connect");

while let Some(Ok(mut stream)) = uni_streams.next().await {
while let Ok(mut stream) = connection.accept_uni().await {
tokio::spawn(async move {
while stream
.read_chunk(usize::MAX, false)
Expand All @@ -142,7 +140,7 @@ impl Context {
let _guard = runtime.enter();
Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0)).unwrap()
};
let quinn::NewConnection { connection, .. } = runtime
let connection = runtime
.block_on(async {
endpoint
.connect_with(self.client_config.clone(), server_addr, "localhost")
Expand Down
5 changes: 1 addition & 4 deletions quinn/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,11 @@ async fn run(options: Opt) -> Result<()> {
.ok_or_else(|| anyhow!("no hostname specified"))?;

eprintln!("connecting to {} at {}", host, remote);
let new_conn = endpoint
let conn = endpoint
.connect(remote, host)?
.await
.map_err(|e| anyhow!("failed to connect: {}", e))?;
eprintln!("connected at {:?}", start.elapsed());
let quinn::NewConnection {
connection: conn, ..
} = new_conn;
let (mut send, recv) = conn
.open_bi()
.await
Expand Down
12 changes: 4 additions & 8 deletions quinn/examples/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// accept a single connection
tokio::spawn(async move {
let incoming_conn = incoming.next().await.unwrap();
let new_conn = incoming_conn.await.unwrap();
let conn = incoming_conn.await.unwrap();
println!(
"[server] connection accepted: addr={}",
new_conn.connection.remote_address()
conn.remote_address()
);
// Dropping all handles associated with a connection implicitly closes it
});

let endpoint = make_client_endpoint("0.0.0.0:0".parse().unwrap(), &[&server_cert])?;
// connect to server
let quinn::NewConnection {
connection,
mut uni_streams,
..
} = endpoint
let connection = endpoint
.connect(server_addr, "localhost")
.unwrap()
.await
.unwrap();
println!("[client] connected: addr={}", connection.remote_address());

// Waiting for a stream will complete with an error when the server closes the connection
let _ = uni_streams.next().await;
let _ = connection.accept_uni().await;

// Give the server has a chance to clean up
endpoint.wait_idle().await;
Expand Down
6 changes: 3 additions & 3 deletions quinn/examples/insecure_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ async fn run_server(addr: SocketAddr) {
let (mut incoming, _server_cert) = make_server_endpoint(addr).unwrap();
// accept a single connection
let incoming_conn = incoming.next().await.unwrap();
let new_conn = incoming_conn.await.unwrap();
let conn = incoming_conn.await.unwrap();
println!(
"[server] connection accepted: addr={}",
new_conn.connection.remote_address()
conn.remote_address()
);
}

Expand All @@ -36,7 +36,7 @@ async fn run_client(server_addr: SocketAddr) -> Result<(), Box<dyn Error>> {
endpoint.set_default_client_config(client_cfg);

// connect to server
let quinn::NewConnection { connection, .. } = endpoint
let connection = endpoint
.connect(server_addr, "localhost")
.unwrap()
.await
Expand Down
10 changes: 3 additions & 7 deletions quinn/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,7 @@ async fn run(options: Opt) -> Result<()> {
}

async fn handle_connection(root: Arc<Path>, conn: quinn::Connecting) -> Result<()> {
let quinn::NewConnection {
connection,
mut bi_streams,
..
} = conn.await?;
let connection = conn.await?;
let span = info_span!(
"connection",
remote = %connection.remote_address(),
Expand All @@ -180,7 +176,8 @@ async fn handle_connection(root: Arc<Path>, conn: quinn::Connecting) -> Result<(
info!("established");

// Each stream initiated by the client constitutes a new request.
while let Some(stream) = bi_streams.next().await {
loop {
let stream = connection.accept_bi().await;
let stream = match stream {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
info!("connection closed");
Expand All @@ -201,7 +198,6 @@ async fn handle_connection(root: Arc<Path>, conn: quinn::Connecting) -> Result<(
.instrument(info_span!("request")),
);
}
Ok(())
}
.instrument(span)
.await?;
Expand Down
4 changes: 2 additions & 2 deletions quinn/examples/single_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn run_server(addr: SocketAddr) -> Result<Vec<u8>, Box<dyn Error>> {
let (mut incoming, server_cert) = make_server_endpoint(addr)?;
// accept a single connection
tokio::spawn(async move {
let quinn::NewConnection { connection, .. } = incoming.next().await.unwrap().await.unwrap();
let connection = incoming.next().await.unwrap().await.unwrap();
println!(
"[server] incoming connection: addr={}",
connection.remote_address()
Expand All @@ -54,6 +54,6 @@ fn run_server(addr: SocketAddr) -> Result<Vec<u8>, Box<dyn Error>> {
/// Attempt QUIC connection with the given server address.
async fn run_client(endpoint: &Endpoint, server_addr: SocketAddr) {
let connect = endpoint.connect(server_addr, "localhost").unwrap();
let quinn::NewConnection { connection, .. } = connect.await.unwrap();
let connection = connect.await.unwrap();
println!("[client] connected: addr={}", connection.remote_address());
}
Loading