diff --git a/Cargo.lock b/Cargo.lock index 023a441b94..9ed7453567 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -200,7 +200,7 @@ checksum = "7378575ff571966e99a744addeff0bff98b8ada0dedf1956d59e634db95eaac1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", "synstructure 0.13.1", ] @@ -223,18 +223,18 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] name = "async-trait" -version = "0.1.80" +version = "0.1.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" +checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -328,7 +328,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -547,9 +547,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.95" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b" +checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" [[package]] name = "cfg-if" @@ -570,9 +570,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.38" +version = "0.4.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" dependencies = [ "android-tzdata", "iana-time-zone", @@ -652,7 +652,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -984,7 +984,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -1008,7 +1008,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -1019,7 +1019,7 @@ checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" dependencies = [ "darling_core", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -1105,7 +1105,7 @@ checksum = "5fe87ce4529967e0ba1dcf8450bab64d97dfd5010a6256187ffe2e43e6f0e049" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -1135,7 +1135,7 @@ checksum = "2bba3e9872d7c58ce7ef0fcf1844fcc3e23ef2a58377b50df35dd98e42a5726e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", "unicode-xid", ] @@ -1218,7 +1218,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -1363,7 +1363,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -1376,7 +1376,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -1396,7 +1396,7 @@ checksum = "5c785274071b1b420972453b306eeca06acf4633829db4223b58a2a8c5953bc4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -1607,7 +1607,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + 
"syn 2.0.53", ] [[package]] @@ -2702,7 +2702,7 @@ dependencies = [ [[package]] name = "iroh-net-bench" -version = "0.15.0" +version = "0.13.0" dependencies = [ "anyhow", "bytes", @@ -2710,6 +2710,9 @@ dependencies = [ "hdrhistogram", "iroh-net", "quinn", + "rcgen 0.11.3", + "rustls", + "socket2", "tokio", "tracing", "tracing-subscriber", @@ -3249,7 +3252,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -3415,11 +3418,11 @@ checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" [[package]] name = "pem" -version = "3.0.4" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e459365e590736a54c3fa561947c84837534b8e9af6fc5bf781307e82658fae" +checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310" dependencies = [ - "base64 0.22.0", + "base64 0.21.7", "serde", ] @@ -3469,7 +3472,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -3500,7 +3503,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -3606,7 +3609,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -3817,7 +3820,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -3842,9 +3845,9 @@ dependencies = [ [[package]] name = "quanta" -version = "0.12.3" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +checksum = "9ca0b7bac0b97248c40bb77288fc52029cf1459c0461ea1b05ee32ccf011de2c" dependencies = [ "crossbeam-utils", "libc", @@ -4127,7 +4130,7 @@ checksum = "5fddb4f8d99b0a2ebafc65a87a69a7b9875e4b1ae1f00db265d300ef7f28bccc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -4610,14 +4613,14 @@ checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] name = "serde_json" -version = "1.0.116" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813" +checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ "itoa", "ryu", @@ -4691,7 +4694,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -4942,7 +4945,7 @@ dependencies = [ "proc-macro2", "quote", "struct_iterable_internal", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -4960,7 +4963,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta-derive", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -4971,7 +4974,7 @@ checksum = "a60bcaff7397072dca0017d1db428e30d5002e00b6847703e2e42005c95fbe00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -5002,7 +5005,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -5015,7 +5018,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -5077,9 +5080,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.60" +version = "2.0.53" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3" +checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032" dependencies = [ "proc-macro2", "quote", @@ -5129,7 +5132,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -5188,7 +5191,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -5222,7 +5225,7 @@ checksum = "d1cd413b5d558b4c5bf3680e324a6fa5014e7b7c067a51e69dbdf47eb7148b66" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -5320,7 +5323,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -5545,7 +5548,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -5792,7 +5795,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", "wasm-bindgen-shared", ] @@ -5826,7 +5829,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5992,7 +5995,7 @@ checksum = "12168c33176773b86799be25e2a2ba07c7aab9968b37541f1094dbd7a60c8946" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -6003,7 +6006,7 @@ checksum = "f6fc35f58ecd95a9b71c4f2329b911016e6bec66b3f2e6a4aad86bd2e99e2f9b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -6014,7 +6017,7 @@ checksum = "9d8dc32e0095a7eeccebd0e3f09e9509365ecb3fc6ac4d6f5f14a3f6392942d1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -6025,7 +6028,7 @@ checksum = "08990546bf4edef8f431fa6326e032865f27138718c587dc21bc0265bbcb57cc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] @@ -6305,7 +6308,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.53", ] [[package]] diff --git a/iroh-net/bench/Cargo.toml b/iroh-net/bench/Cargo.toml index 2c7ac57914..f1ce242862 100644 --- a/iroh-net/bench/Cargo.toml +++ b/iroh-net/bench/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iroh-net-bench" -version = "0.15.0" +version = "0.13.0" edition = "2021" license = "MIT OR Apache-2.0" publish = false @@ -11,7 +11,10 @@ bytes = "1" hdrhistogram = { version = "7.2", default-features = false } iroh-net = { path = ".." 
} quinn = "0.10" +rcgen = "0.11.1" +rustls = { version = "0.21.0", default-features = false, features = ["quic"] } clap = { version = "4", features = ["derive"] } tokio = { version = "1.0.1", features = ["rt", "sync"] } tracing = "0.1" tracing-subscriber = { version = "0.3.0", default-features = false, features = ["env-filter", "fmt", "ansi", "time", "local-time"] } +socket2 = "0.5" \ No newline at end of file diff --git a/iroh-net/bench/src/bin/bulk.rs b/iroh-net/bench/src/bin/bulk.rs index a62dc01a55..cd7bc2cc17 100644 --- a/iroh-net/bench/src/bin/bulk.rs +++ b/iroh-net/bench/src/bin/bulk.rs @@ -1,35 +1,44 @@ -use std::{ - sync::{Arc, Mutex}, - time::Instant, -}; - -use anyhow::{Context, Result}; +use anyhow::Result; use clap::Parser; -use iroh_net::{MagicEndpoint, NodeAddr}; -use tokio::sync::Semaphore; -use tracing::{info, trace}; use iroh_net_bench::{ - configure_tracing_subscriber, connect_client, drain_stream, rt, send_data_on_stream, - server_endpoint, - stats::{Stats, TransferResult}, - Opt, + configure_tracing_subscriber, iroh, rt, Commands, quinn, s2n, }; fn main() { - let opt = Opt::parse(); + let cmd = Commands::parse(); configure_tracing_subscriber(); + match cmd { + Commands::Iroh(opt) => { + if let Err(e) = run_iroh(opt) { + eprintln!("failed: {e:#}"); + } + } + Commands::Quinn(opt) => { + if let Err(e) = run_quinn(opt) { + eprintln!("failed: {e:#}"); + } + } + Commands::S2n(opt) => { + if let Err(e) = run_s2n(opt) { + eprintln!("failed: {e:#}"); + } + } + } +} + +pub fn run_iroh(opt: iroh::Opt) -> Result<()> { let server_span = tracing::error_span!("server"); let runtime = rt(); let (server_addr, endpoint) = { let _guard = server_span.enter(); - server_endpoint(&runtime, &opt) + iroh::server_endpoint(&runtime, &opt) }; let server_thread = std::thread::spawn(move || { let _guard = server_span.entered(); - if let Err(e) = runtime.block_on(server(endpoint, opt)) { + if let Err(e) = runtime.block_on(iroh::server(endpoint, opt)) { eprintln!("server failed: {e:#}"); } }); @@ -40,7 +49,7 @@ fn main() { handles.push(std::thread::spawn(move || { let _guard = tracing::error_span!("client", id).entered(); let runtime = rt(); - match runtime.block_on(client(server_addr, opt)) { + match runtime.block_on(iroh::client(server_addr, opt)) { Ok(stats) => Ok(stats), Err(e) => { eprintln!("client failed: {e:#}"); @@ -59,153 +68,54 @@ fn main() { } server_thread.join().expect("server thread"); -} - -async fn server(endpoint: MagicEndpoint, opt: Opt) -> Result<()> { - let mut server_tasks = Vec::new(); - - // Handle only the expected amount of clients - for _ in 0..opt.clients { - let handshake = endpoint.accept().await.unwrap(); - let connection = handshake.await.context("handshake failed")?; - - server_tasks.push(tokio::spawn(async move { - loop { - let (mut send_stream, mut recv_stream) = match connection.accept_bi().await { - Err(quinn::ConnectionError::ApplicationClosed(_)) => break, - Err(e) => { - eprintln!("accepting stream failed: {e:?}"); - break; - } - Ok(stream) => stream, - }; - trace!("stream established"); - - tokio::spawn(async move { - drain_stream(&mut recv_stream, opt.read_unordered).await?; - send_data_on_stream(&mut send_stream, opt.download_size).await?; - Ok::<_, anyhow::Error>(()) - }); - } - - if opt.stats { - println!("\nServer connection stats:\n{:#?}", connection.stats()); - } - })); - } - - // Await all the tasks. 
We have to do this to prevent the runtime getting dropped - // and all server tasks to be cancelled - for handle in server_tasks { - if let Err(e) = handle.await { - eprintln!("Server task error: {e:?}"); - }; - } Ok(()) } -async fn client(server_addr: NodeAddr, opt: Opt) -> Result { - let (endpoint, connection) = connect_client(server_addr, opt).await?; - - let start = Instant::now(); - - let connection = Arc::new(connection); - - let mut stats = ClientStats::default(); - let mut first_error = None; - - let sem = Arc::new(Semaphore::new(opt.max_streams)); - let results = Arc::new(Mutex::new(Vec::new())); - for _ in 0..opt.streams { - let permit = sem.clone().acquire_owned().await.unwrap(); - let results = results.clone(); - let connection = connection.clone(); - tokio::spawn(async move { - let result = - handle_client_stream(connection, opt.upload_size, opt.read_unordered).await; - info!("stream finished: {:?}", result); - results.lock().unwrap().push(result); - drop(permit); - }); - } +pub fn run_quinn(opt: quinn::Opt) -> Result<()> { + let server_span = tracing::error_span!("server"); + let runtime = rt(); + let (server_addr, endpoint) = { + let _guard = server_span.enter(); + quinn::server_endpoint(&runtime, &opt) + }; - // Wait for remaining streams to finish - let _ = sem.acquire_many(opt.max_streams as u32).await.unwrap(); + let server_thread = std::thread::spawn(move || { + let _guard = server_span.entered(); + if let Err(e) = runtime.block_on(quinn::server(endpoint, opt)) { + eprintln!("server failed: {e:#}"); + } + }); - for result in results.lock().unwrap().drain(..) { - match result { - Ok((upload_result, download_result)) => { - stats.upload_stats.stream_finished(upload_result); - stats.download_stats.stream_finished(download_result); - } - Err(e) => { - if first_error.is_none() { - first_error = Some(e); + let mut handles = Vec::new(); + for id in 0..opt.clients { + let server_addr = server_addr.clone(); + handles.push(std::thread::spawn(move || { + let _guard = tracing::error_span!("client", id).entered(); + let runtime = rt(); + match runtime.block_on(quinn::client(server_addr, opt)) { + Ok(stats) => Ok(stats), + Err(e) => { + eprintln!("client failed: {e:#}"); + Err(e) } } - } - } - - stats.upload_stats.total_duration = start.elapsed(); - stats.download_stats.total_duration = start.elapsed(); - - // Explicit close of the connection, since handles can still be around due - // to `Arc`ing them - connection.close(0u32.into(), b"Benchmark done"); - - endpoint.close(0u32.into(), b"").await?; - - if opt.stats { - println!("\nClient connection stats:\n{:#?}", connection.stats()); + })); } - match first_error { - None => Ok(stats), - Some(e) => Err(e), + for (id, handle) in handles.into_iter().enumerate() { + // We print all stats at the end of the test sequentially to avoid + // them being garbled due to being printed concurrently + if let Ok(stats) = handle.join().expect("client thread") { + stats.print(id); + } } -} - -async fn handle_client_stream( - connection: Arc, - upload_size: u64, - read_unordered: bool, -) -> Result<(TransferResult, TransferResult)> { - let start = Instant::now(); - - let (mut send_stream, mut recv_stream) = connection - .open_bi() - .await - .context("failed to open stream")?; - - send_data_on_stream(&mut send_stream, upload_size).await?; - let upload_result = TransferResult::new(start.elapsed(), upload_size); - - let start = Instant::now(); - let size = drain_stream(&mut recv_stream, read_unordered).await?; - let download_result = 
TransferResult::new(start.elapsed(), size as u64); - - Ok((upload_result, download_result)) -} + server_thread.join().expect("server thread"); -#[derive(Default)] -struct ClientStats { - upload_stats: Stats, - download_stats: Stats, + Ok(()) } -impl ClientStats { - pub fn print(&self, client_id: usize) { - println!(); - println!("Client {client_id} stats:"); - - if self.upload_stats.total_size != 0 { - self.upload_stats.print("upload"); - } - - if self.download_stats.total_size != 0 { - self.download_stats.print("download"); - } - } -} +pub fn run_s2n(_opt: s2n::Opt) -> Result<()> { + Ok(()) +} \ No newline at end of file diff --git a/iroh-net/bench/src/iroh.rs b/iroh-net/bench/src/iroh.rs new file mode 100644 index 0000000000..56f6840453 --- /dev/null +++ b/iroh-net/bench/src/iroh.rs @@ -0,0 +1,218 @@ +use std::{net::SocketAddr, sync::{Arc, Mutex}, time::Instant}; + +use anyhow::{Context, Result}; +use clap::Parser; +use iroh_net::{relay::RelayMode, MagicEndpoint, NodeAddr}; +use tokio::sync::Semaphore; +use tracing::{info, trace}; + +use crate::{drain_stream, handle_client_stream, parse_byte_size, send_data_on_stream, stats::TransferResult, ClientStats}; + +pub const ALPN: &[u8] = b"n0/iroh-net-bench/0"; + +#[derive(Parser, Debug, Clone, Copy)] +#[clap(name = "iroh")] +pub struct Opt { + /// The total number of clients which should be created + #[clap(long = "clients", short = 'c', default_value = "1")] + pub clients: usize, + /// The total number of streams which should be created + #[clap(long = "streams", short = 'n', default_value = "1")] + pub streams: usize, + /// The amount of concurrent streams which should be used + #[clap(long = "max_streams", short = 'm', default_value = "1")] + pub max_streams: usize, + /// Number of bytes to transmit from server to client + /// + /// This can use SI prefixes for sizes. E.g. 1M will transfer 1MiB, 10G + /// will transfer 10GiB. + #[clap(long, default_value = "1G", value_parser = parse_byte_size)] + pub download_size: u64, + /// Number of bytes to transmit from client to server + /// + /// This can use SI prefixes for sizes. E.g. 1M will transfer 1MiB, 10G + /// will transfer 10GiB. 
+    #[clap(long, default_value = "0", value_parser = parse_byte_size)] +    pub upload_size: u64, +    /// Show connection stats at the end of the benchmark +    #[clap(long = "stats")] +    pub stats: bool, +    /// Whether to use the unordered read API +    #[clap(long = "unordered")] +    pub read_unordered: bool, +    /// Starting guess for maximum UDP payload size +    #[clap(long, default_value = "1200")] +    pub initial_mtu: u16, +} + +/// Creates a server endpoint which runs on the given runtime +pub fn server_endpoint(rt: &tokio::runtime::Runtime, opt: &Opt) -> (NodeAddr, MagicEndpoint) { +    let _guard = rt.enter(); +    rt.block_on(async move { +        let ep = MagicEndpoint::builder() +            .alpns(vec![ALPN.to_vec()]) +            .relay_mode(RelayMode::Disabled) +            .transport_config(transport_config(opt)) +            .bind(0) +            .await +            .unwrap(); +        let addr = ep.local_addr(); +        let addr = SocketAddr::new("127.0.0.1".parse().unwrap(), addr.0.port()); +        let addr = NodeAddr::new(ep.node_id()).with_direct_addresses([addr]); +        (addr, ep) +    }) +} + +/// Create a client endpoint and client connection +pub async fn connect_client( +    server_addr: NodeAddr, +    opt: Opt, +) -> Result<(MagicEndpoint, quinn::Connection)> { +    let endpoint = MagicEndpoint::builder() +        .alpns(vec![ALPN.to_vec()]) +        .relay_mode(RelayMode::Disabled) +        .transport_config(transport_config(&opt)) +        .bind(0) +        .await +        .unwrap(); + +    // TODO: We don't support passing client transport config currently +    // let mut client_config = quinn::ClientConfig::new(Arc::new(crypto)); +    // client_config.transport_config(Arc::new(transport_config(&opt))); + +    let connection = endpoint +        .connect(server_addr, ALPN) +        .await +        .context("unable to connect")?; +    trace!("connected"); + +    Ok((endpoint, connection)) +} + +pub fn transport_config(opt: &Opt) -> quinn::TransportConfig { +    // High stream windows are chosen because the amount of concurrent streams +    // is configurable as a parameter. +    let mut config = quinn::TransportConfig::default(); +    config.max_concurrent_uni_streams(opt.max_streams.try_into().unwrap()); +    config.initial_mtu(opt.initial_mtu); + +    // TODO: reenable when we upgrade quinn version +    // let mut acks = quinn::AckFrequencyConfig::default(); +    // acks.ack_eliciting_threshold(10u32.into()); +    // config.ack_frequency_config(Some(acks)); + +    config +} + +pub async fn server(endpoint: MagicEndpoint, opt: Opt) -> Result<()> { +    let mut server_tasks = Vec::new(); + +    // Handle only the expected amount of clients +    for _ in 0..opt.clients { +        let handshake = endpoint.accept().await.unwrap(); +        let connection = handshake.await.context("handshake failed")?; + +        server_tasks.push(tokio::spawn(async move { +            loop { +                let (mut send_stream, mut recv_stream) = match connection.accept_bi().await { +                    Err(quinn::ConnectionError::ApplicationClosed(_)) => break, +                    Err(e) => { +                        eprintln!("accepting stream failed: {e:?}"); +                        break; +                    } +                    Ok(stream) => stream, +                }; +                trace!("stream established"); + +                tokio::spawn(async move { +                    drain_stream(&mut recv_stream, opt.read_unordered).await?; +                    send_data_on_stream(&mut send_stream, opt.download_size).await?; +                    Ok::<_, anyhow::Error>(()) +                }); +            } + +            if opt.stats { +                println!("\nServer connection stats:\n{:#?}", connection.stats()); +            } +        })); +    } + +    // Await all the tasks.
We have to do this to prevent the runtime getting dropped +    // and all server tasks to be cancelled +    for handle in server_tasks { +        if let Err(e) = handle.await { +            eprintln!("Server task error: {e:?}"); +        }; +    } + +    Ok(()) +} + +pub async fn client(server_addr: NodeAddr, opt: Opt) -> Result<ClientStats> { +    let (endpoint, connection) = connect_client(server_addr, opt).await?; + +    let start = Instant::now(); + +    let connection = Arc::new(connection); + +    let mut stats = ClientStats::default(); +    let mut first_error = None; + +    let sem = Arc::new(Semaphore::new(opt.max_streams)); +    let results = Arc::new(Mutex::new(Vec::new())); +    for _ in 0..opt.streams { +        let permit = sem.clone().acquire_owned().await.unwrap(); +        let results = results.clone(); +        let connection = connection.clone(); +        tokio::spawn(async move { +            let result = +                handle_client_stream(connection, opt.upload_size, opt.read_unordered).await; +            info!("stream finished: {:?}", result); +            results.lock().unwrap().push(result); +            drop(permit); +        }); +    } + +    // Wait for remaining streams to finish +    let _ = sem.acquire_many(opt.max_streams as u32).await.unwrap(); + +    for result in results.lock().unwrap().drain(..) { +        match result { +            Ok((upload_result, download_result)) => { +                stats.upload_stats.stream_finished(upload_result); +                stats.download_stats.stream_finished(download_result); +            } +            Err(e) => { +                if first_error.is_none() { +                    first_error = Some(e); +                } +            } +        } +    } + +    stats.upload_stats.total_duration = start.elapsed(); +    stats.download_stats.total_duration = start.elapsed(); + +    // Explicit close of the connection, since handles can still be around due +    // to `Arc`ing them +    connection.close(0u32.into(), b"Benchmark done"); + +    endpoint.close(0u32.into(), b"").await?; + +    if opt.stats { +        println!("\nClient connection stats:\n{:#?}", connection.stats()); +    } + +    match first_error { +        None => Ok(stats), +        Some(e) => Err(e), +    } +} diff --git a/iroh-net/bench/src/lib.rs b/iroh-net/bench/src/lib.rs index 09adfb4d97..43c8fd54f9 100644 --- a/iroh-net/bench/src/lib.rs +++ b/iroh-net/bench/src/lib.rs @@ -1,15 +1,26 @@ -use std::{net::SocketAddr, num::ParseIntError, str::FromStr}; +use std::{net::SocketAddr, num::ParseIntError, str::FromStr, sync::Arc, time::Instant}; use anyhow::{Context, Result}; use bytes::Bytes; use clap::Parser; -use iroh_net::{relay::RelayMode, MagicEndpoint, NodeAddr}; +use ::quinn::{Connection, RecvStream, SendStream}; +use socket2::{Domain, Protocol, Socket, Type}; +use stats::{Stats, TransferResult}; use tokio::runtime::{Builder, Runtime}; -use tracing::trace; +use tracing::warn; +pub mod iroh; +pub mod quinn; +pub mod s2n; pub mod stats; -pub const ALPN: &[u8] = b"n0/iroh-net-bench/0"; +#[derive(Parser, Debug, Clone, Copy)] +#[clap(name = "bulk")] +pub enum Commands { +    Iroh(iroh::Opt), +    Quinn(quinn::Opt), +    S2n(s2n::Opt), +} pub fn configure_tracing_subscriber() { tracing::subscriber::set_global_default( @@ -20,51 +31,95 @@ pub fn configure_tracing_subscriber() { .unwrap(); } -/// Creates a server endpoint which runs on the given runtime -pub fn server_endpoint(rt: &tokio::runtime::Runtime, opt: &Opt) -> (NodeAddr, MagicEndpoint) { -    let _guard = rt.enter(); -    rt.block_on(async move { -        let ep = MagicEndpoint::builder() -            .alpns(vec![ALPN.to_vec()]) -            .relay_mode(RelayMode::Disabled) -            .transport_config(transport_config(opt)) -            .bind(0) -            .await -            .unwrap(); -        let addr = ep.local_addr(); -        let addr = SocketAddr::new("127.0.0.1".parse().unwrap(), addr.0.port()); -        let addr =
NodeAddr::new(ep.node_id()).with_direct_addresses([addr]); -        (addr, ep) -    }) +pub fn rt() -> Runtime { +    Builder::new_current_thread().enable_all().build().unwrap() } -/// Create a client endpoint and client connection -pub async fn connect_client( -    server_addr: NodeAddr, -    opt: Opt, -) -> Result<(MagicEndpoint, quinn::Connection)> { -    let endpoint = MagicEndpoint::builder() -        .alpns(vec![ALPN.to_vec()]) -        .relay_mode(RelayMode::Disabled) -        .transport_config(transport_config(&opt)) -        .bind(0) -        .await -        .unwrap(); +fn parse_byte_size(s: &str) -> Result<u64, ParseIntError> { +    let s = s.trim(); -    // TODO: We don't support passing client transport config currently -    // let mut client_config = quinn::ClientConfig::new(Arc::new(crypto)); -    // client_config.transport_config(Arc::new(transport_config(&opt))); +    let multiplier = match s.chars().last() { +        Some('T') => 1024 * 1024 * 1024 * 1024, +        Some('G') => 1024 * 1024 * 1024, +        Some('M') => 1024 * 1024, +        Some('k') => 1024, +        _ => 1, +    }; -    let connection = endpoint -        .connect(server_addr, ALPN) -        .await -        .context("unable to connect")?; -    trace!("connected"); +    let s = if multiplier != 1 { +        &s[..s.len() - 1] +    } else { +        s +    }; + +    let base: u64 = u64::from_str(s)?; + +    Ok(base * multiplier) +} - Ok((endpoint, connection)) +#[derive(Default)] +pub struct ClientStats { upload_stats: Stats, download_stats: Stats, } -pub async fn drain_stream(stream: &mut quinn::RecvStream, read_unordered: bool) -> Result<usize> { +impl ClientStats { +    pub fn print(&self, client_id: usize) { +        println!(); +        println!("Client {client_id} stats:"); + +        if self.upload_stats.total_size != 0 { +            self.upload_stats.print("upload"); +        } + +        if self.download_stats.total_size != 0 { +            self.download_stats.print("download"); +        } +    } +} + +pub fn bind_socket( +    addr: SocketAddr, +    send_buffer_size: usize, +    recv_buffer_size: usize, +) -> Result<std::net::UdpSocket> { +    let socket = Socket::new(Domain::for_address(addr), Type::DGRAM, Some(Protocol::UDP)) +        .context("create socket")?; + +    if addr.is_ipv6() { +        socket.set_only_v6(false).context("set_only_v6")?; +    } + +    socket +        .bind(&socket2::SockAddr::from(addr)) +        .context("binding endpoint")?; +    socket +        .set_send_buffer_size(send_buffer_size) +        .context("send buffer size")?; +    socket +        .set_recv_buffer_size(recv_buffer_size) +        .context("recv buffer size")?; + +    let buf_size = socket.send_buffer_size().context("send buffer size")?; +    if buf_size < send_buffer_size { +        warn!( +            "Unable to set desired send buffer size. Desired: {}, Actual: {}", +            send_buffer_size, buf_size +        ); +    } + +    let buf_size = socket.recv_buffer_size().context("recv buffer size")?; +    if buf_size < recv_buffer_size { +        warn!( +            "Unable to set desired recv buffer size.
Desired: {}, Actual: {}", +            recv_buffer_size, buf_size +        ); +    } + +    Ok(socket.into()) +} + +pub async fn drain_stream(stream: &mut RecvStream, read_unordered: bool) -> Result<usize> { +    let mut read = 0; if read_unordered { @@ -93,7 +148,7 @@ pub async fn drain_stream(stream: &mut quinn::RecvStream, read_unordered: bool) Ok(read) } -pub async fn send_data_on_stream(stream: &mut quinn::SendStream, stream_size: u64) -> Result<()> { +pub async fn send_data_on_stream(stream: &mut SendStream, stream_size: u64) -> Result<()> { const DATA: &[u8] = &[0xAB; 1024 * 1024]; let bytes_data = Bytes::from_static(DATA); @@ -119,78 +174,26 @@ pub async fn send_data_on_stream(stream: &mut quinn::SendStream, stream_size: u6 Ok(()) } -pub fn rt() -> Runtime { -    Builder::new_current_thread().enable_all().build().unwrap() -} - -pub fn transport_config(opt: &Opt) -> quinn::TransportConfig { -    // High stream windows are chosen because the amount of concurrent streams -    // is configurable as a parameter. -    let mut config = quinn::TransportConfig::default(); -    config.max_concurrent_uni_streams(opt.max_streams.try_into().unwrap()); -    config.initial_mtu(opt.initial_mtu); -    // TODO: reenable when we upgrade quinn version -    // let mut acks = quinn::AckFrequencyConfig::default(); -    // acks.ack_eliciting_threshold(10u32.into()); -    // config.ack_frequency_config(Some(acks)); +async fn handle_client_stream( +    connection: Arc<Connection>, +    upload_size: u64, +    read_unordered: bool, +) -> Result<(TransferResult, TransferResult)> { +    let start = Instant::now(); -    config -} - -#[derive(Parser, Debug, Clone, Copy)] -#[clap(name = "bulk")] -pub struct Opt { -    /// The total number of clients which should be created -    #[clap(long = "clients", short = 'c', default_value = "1")] -    pub clients: usize, -    /// The total number of streams which should be created -    #[clap(long = "streams", short = 'n', default_value = "1")] -    pub streams: usize, -    /// The amount of concurrent streams which should be used -    #[clap(long = "max_streams", short = 'm', default_value = "1")] -    pub max_streams: usize, -    /// Number of bytes to transmit from server to client -    /// -    /// This can use SI prefixes for sizes. E.g. 1M will transfer 1MiB, 10G -    /// will transfer 10GiB. -    #[clap(long, default_value = "1G", value_parser = parse_byte_size)] -    pub download_size: u64, -    /// Number of bytes to transmit from client to server -    /// -    /// This can use SI prefixes for sizes. E.g. 1M will transfer 1MiB, 10G -    /// will transfer 10GiB.
-    #[clap(long, default_value = "0", value_parser = parse_byte_size)] -    pub upload_size: u64, -    /// Show connection stats the at the end of the benchmark -    #[clap(long = "stats")] -    pub stats: bool, -    /// Whether to use the unordered read API -    #[clap(long = "unordered")] -    pub read_unordered: bool, -    /// Starting guess for maximum UDP payload size -    #[clap(long, default_value = "1200")] -    pub initial_mtu: u16, -} - -fn parse_byte_size(s: &str) -> Result<u64, ParseIntError> { -    let s = s.trim(); +    let (mut send_stream, mut recv_stream) = connection +        .open_bi() +        .await +        .context("failed to open stream")?; -    let multiplier = match s.chars().last() { -        Some('T') => 1024 * 1024 * 1024 * 1024, -        Some('G') => 1024 * 1024 * 1024, -        Some('M') => 1024 * 1024, -        Some('k') => 1024, -        _ => 1, -    }; +    send_data_on_stream(&mut send_stream, upload_size).await?; -    let s = if multiplier != 1 { -        &s[..s.len() - 1] -    } else { -        s -    }; +    let upload_result = TransferResult::new(start.elapsed(), upload_size); -    let base: u64 = u64::from_str(s)?; +    let start = Instant::now(); +    let size = drain_stream(&mut recv_stream, read_unordered).await?; +    let download_result = TransferResult::new(start.elapsed(), size as u64); -    Ok(base * multiplier) -} +    Ok((upload_result, download_result)) +} \ No newline at end of file diff --git a/iroh-net/bench/src/quinn.rs b/iroh-net/bench/src/quinn.rs new file mode 100644 index 0000000000..79a9ae6d0a --- /dev/null +++ b/iroh-net/bench/src/quinn.rs @@ -0,0 +1,301 @@ +use std::{net::SocketAddr, sync::{Arc, Mutex}, time::Instant}; + +use anyhow::{Context, Result}; +use clap::Parser; +use quinn::{Connection, Endpoint, TokioRuntime}; +use tokio::sync::Semaphore; +use tracing::{info, trace}; + +use crate::{bind_socket, drain_stream, handle_client_stream, parse_byte_size, send_data_on_stream, ClientStats}; + +pub const ALPN: &[u8] = b"n0/quinn-bench/0"; + +#[derive(Parser, Debug, Clone, Copy)] +#[clap(name = "quinn")] +pub struct Opt { +    /// The total number of clients which should be created +    #[clap(long = "clients", short = 'c', default_value = "1")] +    pub clients: usize, +    /// The total number of streams which should be created +    #[clap(long = "streams", short = 'n', default_value = "1")] +    pub streams: usize, +    /// The amount of concurrent streams which should be used +    #[clap(long = "max_streams", short = 'm', default_value = "1")] +    pub max_streams: usize, +    /// Number of bytes to transmit from server to client +    /// +    /// This can use SI prefixes for sizes. E.g. 1M will transfer 1MiB, 10G +    /// will transfer 10GiB. +    #[clap(long, default_value = "1G", value_parser = parse_byte_size)] +    pub download_size: u64, +    /// Number of bytes to transmit from client to server +    /// +    /// This can use SI prefixes for sizes. E.g. 1M will transfer 1MiB, 10G +    /// will transfer 10GiB.
+    #[clap(long, default_value = "0", value_parser = parse_byte_size)] +    pub upload_size: u64, +    /// Show connection stats at the end of the benchmark +    #[clap(long = "stats")] +    pub stats: bool, +    /// Whether to use the unordered read API +    #[clap(long = "unordered")] +    pub read_unordered: bool, +    /// Starting guess for maximum UDP payload size +    #[clap(long, default_value = "1200")] +    pub initial_mtu: u16, + +    /// Send buffer size in bytes +    #[clap(long, default_value = "2097152")] +    send_buffer_size: usize, +    /// Receive buffer size in bytes +    #[clap(long, default_value = "2097152")] +    recv_buffer_size: usize, +} + +/// Creates a server endpoint which runs on the given runtime +pub fn server_endpoint(rt: &tokio::runtime::Runtime, opt: &Opt) -> (SocketAddr, quinn::Endpoint) { +    let (key, cert) = { +        let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap(); +        ( +            rustls::PrivateKey(cert.serialize_private_key_der()), +            vec![rustls::Certificate(cert.serialize_der().unwrap())], +        ) +    }; + +    let mut crypto = rustls::ServerConfig::builder() +        .with_cipher_suites(PERF_CIPHER_SUITES) +        .with_safe_default_kx_groups() +        .with_protocol_versions(&[&rustls::version::TLS13]) +        .unwrap() +        .with_no_client_auth() +        .with_single_cert(cert, key) +        .unwrap(); +    crypto.alpn_protocols = vec![ALPN.to_vec()]; + +    let transport = transport_config(opt); + +    let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(crypto)); +    server_config.transport_config(Arc::new(transport)); + +    let addr = SocketAddr::new("127.0.0.1".parse().unwrap(), 0); + +    let socket = bind_socket(addr, opt.send_buffer_size, opt.recv_buffer_size).unwrap(); + +    let _guard = rt.enter(); +    rt.block_on(async move { +        let ep = quinn::Endpoint::new( +            Default::default(), +            Some(server_config), +            socket, +            Arc::new(TokioRuntime), +        ).unwrap(); +        let addr = ep.local_addr().unwrap(); +        (addr, ep) +    }) +} + +pub fn transport_config(opt: &Opt) -> quinn::TransportConfig { +    // High stream windows are chosen because the amount of concurrent streams +    // is configurable as a parameter. +    let mut config = quinn::TransportConfig::default(); +    config.max_concurrent_uni_streams(opt.max_streams.try_into().unwrap()); +    config.initial_mtu(opt.initial_mtu); +    config +} + +pub static PERF_CIPHER_SUITES: &[rustls::SupportedCipherSuite] = &[ +    rustls::cipher_suite::TLS13_AES_128_GCM_SHA256, +    rustls::cipher_suite::TLS13_AES_256_GCM_SHA384, +    rustls::cipher_suite::TLS13_CHACHA20_POLY1305_SHA256, +]; + +pub async fn server(endpoint: quinn::Endpoint, opt: Opt) -> Result<()> { +    let mut server_tasks = Vec::new(); + +    // Handle only the expected amount of clients +    for _ in 0..opt.clients { +        let handshake = endpoint.accept().await.unwrap(); +        let connection = handshake.await.context("handshake failed")?; + +        server_tasks.push(tokio::spawn(async move { +            loop { +                let (mut send_stream, mut recv_stream) = match connection.accept_bi().await { +                    Err(quinn::ConnectionError::ApplicationClosed(_)) => break, +                    Err(e) => { +                        eprintln!("accepting stream failed: {e:?}"); +                        break; +                    } +                    Ok(stream) => stream, +                }; +                trace!("stream established"); + +                tokio::spawn(async move { +                    drain_stream(&mut recv_stream, opt.read_unordered).await?; +                    send_data_on_stream(&mut send_stream, opt.download_size).await?; +                    Ok::<_, anyhow::Error>(()) +                }); +            } + +            if opt.stats { +                println!("\nServer connection stats:\n{:#?}", connection.stats()); +            } +        })); +    } + +    // Await all the tasks.
We have to do this to prevent the runtime getting dropped +    // and all server tasks to be cancelled +    for handle in server_tasks { +        if let Err(e) = handle.await { +            eprintln!("Server task error: {e:?}"); +        }; +    } + +    Ok(()) +} + + +pub async fn client(server_addr: SocketAddr, opt: Opt) -> Result<ClientStats> { +    let (endpoint, connection) = connect_client(server_addr, opt).await?; + +    let start = Instant::now(); + +    let connection = Arc::new(connection); + +    let mut stats = ClientStats::default(); +    let mut first_error = None; + +    let sem = Arc::new(Semaphore::new(opt.max_streams)); +    let results = Arc::new(Mutex::new(Vec::new())); +    for _ in 0..opt.streams { +        let permit = sem.clone().acquire_owned().await.unwrap(); +        let results = results.clone(); +        let connection = connection.clone(); +        tokio::spawn(async move { +            let result = +                handle_client_stream(connection, opt.upload_size, opt.read_unordered).await; +            info!("stream finished: {:?}", result); +            results.lock().unwrap().push(result); +            drop(permit); +        }); +    } + +    // Wait for remaining streams to finish +    let _ = sem.acquire_many(opt.max_streams as u32).await.unwrap(); + +    for result in results.lock().unwrap().drain(..) { +        match result { +            Ok((upload_result, download_result)) => { +                stats.upload_stats.stream_finished(upload_result); +                stats.download_stats.stream_finished(download_result); +            } +            Err(e) => { +                if first_error.is_none() { +                    first_error = Some(e); +                } +            } +        } +    } + +    stats.upload_stats.total_duration = start.elapsed(); +    stats.download_stats.total_duration = start.elapsed(); + +    // Explicit close of the connection, since handles can still be around due +    // to `Arc`ing them +    connection.close(0u32.into(), b"Benchmark done"); + +    endpoint.close(0u32.into(), b""); + +    if opt.stats { +        println!("\nClient connection stats:\n{:#?}", connection.stats()); +    } + +    match first_error { +        None => Ok(stats), +        Some(e) => Err(e), +    } +} + +/// Create a client endpoint and client connection +pub async fn connect_client( +    // rt: &tokio::runtime::Runtime, +    server_addr: SocketAddr, +    opt: Opt, +) -> Result<(Endpoint, Connection)> { +    // let endpoint = MagicEndpoint::builder() +    //     .alpns(vec![ALPN.to_vec()]) +    //     .relay_mode(RelayMode::Disabled) +    //     .transport_config(transport_config(&opt)) +    //     .bind(0) +    //     .await +    //     .unwrap(); + +    // // TODO: We don't support passing client transport config currently +    // // let mut client_config = quinn::ClientConfig::new(Arc::new(crypto)); +    // // client_config.transport_config(Arc::new(transport_config(&opt))); + +    // let connection = endpoint +    //     .connect(server_addr, ALPN) +    //     .await +    //     .context("unable to connect")?; +    // trace!("connected"); + + +    let mut crypto = rustls::ClientConfig::builder() +        .with_cipher_suites(PERF_CIPHER_SUITES) +        .with_safe_default_kx_groups() +        .with_protocol_versions(&[&rustls::version::TLS13]) +        .unwrap() +        .with_custom_certificate_verifier(SkipServerVerification::new()) +        .with_no_client_auth(); +    crypto.alpn_protocols = vec![ALPN.to_vec()]; + +    let transport = transport_config(&opt); + +    let mut config = quinn::ClientConfig::new(Arc::new(crypto)); +    config.transport_config(Arc::new(transport)); + +    let addr = SocketAddr::new("127.0.0.1".parse().unwrap(), 0); + +    let socket = bind_socket(addr, opt.send_buffer_size, opt.recv_buffer_size).unwrap(); + +    // let _guard = rt.enter(); +    // rt.block_on(async move { +    let ep = quinn::Endpoint::new( +        Default::default(), +        None, +        socket, +        Arc::new(TokioRuntime), +    ).unwrap(); +    // let addr = ep.local_addr().unwrap(); +    // (addr,
ep) +    let connection = ep +        .connect_with(config, server_addr, "local")? +        .await +        .context("connecting").unwrap(); +    Ok((ep, connection)) +    // }) + +    // Ok((endpoint, connection)) +} + +struct SkipServerVerification; + +impl SkipServerVerification { +    fn new() -> Arc<Self> { +        Arc::new(Self) +    } +} + +impl rustls::client::ServerCertVerifier for SkipServerVerification { +    fn verify_server_cert( +        &self, +        _end_entity: &rustls::Certificate, +        _intermediates: &[rustls::Certificate], +        _server_name: &rustls::ServerName, +        _scts: &mut dyn Iterator<Item = &[u8]>, +        _ocsp_response: &[u8], +        _now: std::time::SystemTime, +    ) -> Result<rustls::client::ServerCertVerified, rustls::Error> { +        Ok(rustls::client::ServerCertVerified::assertion()) +    } +} \ No newline at end of file diff --git a/iroh-net/bench/src/s2n.rs b/iroh-net/bench/src/s2n.rs new file mode 100644 index 0000000000..b30cfd83fa --- /dev/null +++ b/iroh-net/bench/src/s2n.rs @@ -0,0 +1,6 @@ +use clap::Parser; + +#[derive(Parser, Debug, Clone, Copy)] +#[clap(name = "s2n")] +pub struct Opt { +} \ No newline at end of file
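Reviewer note, not part of the patch above: all three subcommands share parse_byte_size from iroh-net/bench/src/lib.rs. The accepted suffixes (k, M, G, T, with only lowercase k recognized) are binary 1024-based multipliers, which matches the "1M will transfer 1MiB" wording in the option docs, and the same behaviour therefore applies to the --download-size and --upload-size flags of every subcommand. A small test module along the following lines could be appended to lib.rs to pin that down; it is a sketch only and is not included in the diff, with test names and values chosen for illustration:

#[cfg(test)]
mod tests {
    use super::parse_byte_size;

    #[test]
    fn parses_plain_and_suffixed_sizes() {
        // No suffix: the string is a plain byte count.
        assert_eq!(parse_byte_size("100").unwrap(), 100);
        // Suffixes map to binary multipliers in parse_byte_size's match arms.
        assert_eq!(parse_byte_size("512k").unwrap(), 512 * 1024);
        assert_eq!(parse_byte_size("2M").unwrap(), 2 * 1024 * 1024);
        assert_eq!(parse_byte_size("1G").unwrap(), 1024 * 1024 * 1024);
        assert_eq!(parse_byte_size("1T").unwrap(), 1024u64.pow(4));
        // Surrounding whitespace is trimmed before parsing.
        assert_eq!(parse_byte_size(" 1G ").unwrap(), 1024 * 1024 * 1024);
    }

    #[test]
    fn rejects_unknown_suffixes() {
        // Anything else falls through to u64::from_str and fails there.
        assert!(parse_byte_size("1g").is_err());
        assert!(parse_byte_size("10GiB").is_err());
    }
}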