diff --git a/iroh-gossip/Cargo.toml b/iroh-gossip/Cargo.toml index 9e4b329478..de0c0d2b5b 100644 --- a/iroh-gossip/Cargo.toml +++ b/iroh-gossip/Cargo.toml @@ -32,7 +32,7 @@ iroh-base = { version = "0.16.0", path = "../iroh-base" } # net dependencies (optional) futures-lite = { version = "2.3", optional = true } -iroh-net = { path = "../iroh-net", version = "0.16.0", optional = true, default-features = false } +iroh-net = { path = "../iroh-net", version = "0.16.0", optional = true, default-features = false, features = ["test-utils"] } tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "macros", "net", "fs"] } tokio-util = { version = "0.7.8", optional = true, features = ["codec"] } genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] } diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index e415e68c69..97f554cda7 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -653,6 +653,7 @@ fn decode_peer_data(peer_data: &PeerData) -> anyhow::Result { mod test { use std::time::Duration; + use iroh_net::key::SecretKey; use iroh_net::relay::{RelayMap, RelayMode}; use tokio::spawn; use tokio::time::timeout; @@ -661,10 +662,15 @@ mod test { use super::*; - async fn create_endpoint(relay_map: RelayMap) -> anyhow::Result { + async fn create_endpoint( + rng: &mut rand_chacha::ChaCha12Rng, + relay_map: RelayMap, + ) -> anyhow::Result { Endpoint::builder() + .secret_key(SecretKey::generate_with_rng(rng)) .alpns(vec![GOSSIP_ALPN.to_vec()]) .relay_mode(RelayMode::Custom(relay_map)) + .insecure_skip_relay_cert_verify(true) .bind(0) .await } @@ -688,16 +694,15 @@ mod test { } #[tokio::test] - #[ignore = "flaky"] async fn gossip_net_smoke() { + let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1); let _guard = iroh_test::logging::setup(); - let (relay_map, relay_url, cleanup) = util::run_relay_and_stun([127, 0, 0, 1].into()) - .await - .unwrap(); + let (relay_map, relay_url, _guard) = + iroh_net::test_utils::run_relay_server().await.unwrap(); - let ep1 = create_endpoint(relay_map.clone()).await.unwrap(); - let ep2 = create_endpoint(relay_map.clone()).await.unwrap(); - let ep3 = create_endpoint(relay_map.clone()).await.unwrap(); + let ep1 = create_endpoint(&mut rng, relay_map.clone()).await.unwrap(); + let ep2 = create_endpoint(&mut rng, relay_map.clone()).await.unwrap(); + let ep3 = create_endpoint(&mut rng, relay_map.clone()).await.unwrap(); let addr1 = AddrInfo { relay_url: Some(relay_url.clone()), direct_addresses: Default::default(), @@ -722,8 +727,8 @@ mod test { let cancel = CancellationToken::new(); let tasks = [ spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())), - spawn(endpoint_loop(ep2.clone(), go3.clone(), cancel.clone())), - spawn(endpoint_loop(ep3.clone(), go2.clone(), cancel.clone())), + spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())), + spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())), ]; debug!("----- adding peers ----- "); @@ -739,7 +744,7 @@ mod test { go2.join(topic, vec![pi1]).await.unwrap().await.unwrap(); go3.join(topic, vec![pi1]).await.unwrap().await.unwrap(); - let len = 10; + let len = 2; // subscribe nodes 2 and 3 to the topic let mut stream2 = go2.subscribe(topic).await.unwrap(); @@ -814,124 +819,5 @@ mod test { .unwrap() .unwrap(); } - drop(cleanup); - } - - // This is copied from iroh-net/src/hp/magicsock/conn.rs - // TODO: Move into a public test_utils module in iroh-net? - mod util { - use std::net::{IpAddr, SocketAddr}; - - use anyhow::Result; - use iroh_net::{ - key::SecretKey, - relay::{RelayMap, RelayUrl}, - stun::{is, parse_binding_request, response}, - }; - use tokio::sync::oneshot; - use tracing::{debug, info, trace}; - - /// A drop guard to clean up test infrastructure. - /// - /// After dropping the test infrastructure will asynchronously shutdown and release its - /// resources. - // Nightly sees the sender as dead code currently, but we only rely on Drop of the - // sender. - #[derive(Debug)] - #[allow(dead_code)] - pub(crate) struct CleanupDropGuard(pub(crate) oneshot::Sender<()>); - - /// Runs a relay server with STUN enabled suitable for tests. - /// - /// The returned `Url` is the url of the relay server in the returned [`RelayMap`], it - /// is always `Some` as that is how the [`Endpoint::connect`] API expects it. - /// - /// [`Endpoint::connect`]: crate::endpoint::Endpoint - pub(crate) async fn run_relay_and_stun( - stun_ip: IpAddr, - ) -> Result<(RelayMap, RelayUrl, CleanupDropGuard)> { - let server_key = SecretKey::generate(); - let server = iroh_net::relay::http::ServerBuilder::new("127.0.0.1:0".parse().unwrap()) - .secret_key(Some(server_key)) - .tls_config(None) - .spawn() - .await?; - - let http_addr = server.addr(); - info!("relay listening on {:?}", http_addr); - - let (stun_addr, stun_drop_guard) = serve(stun_ip).await?; - let relay_url: RelayUrl = format!("http://localhost:{}", http_addr.port()) - .parse() - .unwrap(); - let m = RelayMap::default_from_node(relay_url.clone(), stun_addr.port()); - - let (tx, rx) = oneshot::channel(); - tokio::spawn(async move { - let _stun_cleanup = stun_drop_guard; // move into this closure - - // Wait until we're dropped or receive a message. - rx.await.ok(); - server.shutdown().await; - }); - - Ok((m, relay_url, CleanupDropGuard(tx))) - } - - /// Sets up a simple STUN server. - async fn serve(ip: IpAddr) -> Result<(SocketAddr, CleanupDropGuard)> { - let pc = tokio::net::UdpSocket::bind((ip, 0)).await?; - let mut addr = pc.local_addr()?; - match addr.ip() { - IpAddr::V4(ip) => { - if ip.octets() == [0, 0, 0, 0] { - addr.set_ip("127.0.0.1".parse().unwrap()); - } - } - _ => unreachable!("using ipv4"), - } - - info!("STUN listening on {}", addr); - let (s, r) = oneshot::channel(); - tokio::task::spawn(async move { - run_stun(pc, r).await; - }); - - Ok((addr, CleanupDropGuard(s))) - } - - async fn run_stun(pc: tokio::net::UdpSocket, mut done: oneshot::Receiver<()>) { - let mut buf = vec![0u8; 64 << 10]; - loop { - trace!("read loop"); - tokio::select! { - _ = &mut done => { - debug!("shutting down"); - break; - } - res = pc.recv_from(&mut buf) => match res { - Ok((n, addr)) => { - trace!("read packet {}bytes from {}", n, addr); - let pkt = &buf[..n]; - if !is(pkt) { - debug!("received non STUN pkt"); - continue; - } - if let Ok(txid) = parse_binding_request(pkt) { - debug!("received binding request"); - - let res = response(txid, addr); - if let Err(err) = pc.send_to(&res, addr).await { - eprintln!("STUN server write failed: {:?}", err); - } - } - } - Err(err) => { - eprintln!("failed to read: {:?}", err); - } - } - } - } - } } }