From 79aca371d164be5b042c49e3a8899568f4b42151 Mon Sep 17 00:00:00 2001 From: Arsenii Ronzhyn <33022971+arsenron@users.noreply.github.com> Date: Fri, 11 Aug 2023 15:14:37 +0300 Subject: [PATCH] fix(quic): add support for reusing an existing socket for local dialing Tracked in #4259. Now if a listener supports loopback interfaces, when a remote address is also a loopback address, we reuse an existing listener. Pull-Request: #4304. --- Cargo.lock | 2 +- Cargo.toml | 2 +- transports/quic/CHANGELOG.md | 7 ++++++ transports/quic/Cargo.toml | 2 +- transports/quic/src/transport.rs | 28 +++++++++++++++------ transports/quic/tests/smoke.rs | 43 ++++++++++++++++++++++++++++++++ 6 files changed, 74 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f5026f1afc..979bc43d8d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3058,7 +3058,7 @@ dependencies = [ [[package]] name = "libp2p-quic" -version = "0.9.1-alpha" +version = "0.9.2-alpha" dependencies = [ "async-std", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 6b4bd5ebe09..fb503a12e61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ libp2p-perf = { version = "0.2.0", path = "protocols/perf" } libp2p-ping = { version = "0.43.0", path = "protocols/ping" } libp2p-plaintext = { version = "0.40.0", path = "transports/plaintext" } libp2p-pnet = { version = "0.23.0", path = "transports/pnet" } -libp2p-quic = { version = "0.9.1-alpha", path = "transports/quic" } +libp2p-quic = { version = "0.9.2-alpha", path = "transports/quic" } libp2p-relay = { version = "0.16.1", path = "protocols/relay" } libp2p-rendezvous = { version = "0.13.0", path = "protocols/rendezvous" } libp2p-request-response = { version = "0.25.1", path = "protocols/request-response" } diff --git a/transports/quic/CHANGELOG.md b/transports/quic/CHANGELOG.md index 5fc4ccc1cdf..66a6dec3a2e 100644 --- a/transports/quic/CHANGELOG.md +++ b/transports/quic/CHANGELOG.md @@ -1,3 +1,10 @@ +## 0.9.2-alpha + +- Add support for reusing an existing socket when dialing localhost address. + See [PR 4304]. + +[PR 4304]: https://github.com/libp2p/rust-libp2p/pull/4304 + ## 0.9.1-alpha - Allow listening on ipv4 and ipv6 separately. diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index 67f131aeb31..9dba8c692c5 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libp2p-quic" -version = "0.9.1-alpha" +version = "0.9.2-alpha" authors = ["Parity Technologies "] edition = "2021" rust-version = { workspace = true } diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 7e0a4a812f7..9f025cd63fc 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -39,7 +39,7 @@ use libp2p_core::{ use libp2p_identity::PeerId; use socket2::{Domain, Socket, Type}; use std::collections::hash_map::{DefaultHasher, Entry}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, UdpSocket}; use std::time::Duration; @@ -155,9 +155,16 @@ impl GenTransport

{ if l.is_closed { return false; } - let listen_addr = l.socket_addr(); - SocketFamily::is_same(&listen_addr.ip(), &socket_addr.ip()) - && listen_addr.ip().is_loopback() == socket_addr.ip().is_loopback() + SocketFamily::is_same(&l.socket_addr().ip(), &socket_addr.ip()) + }) + .filter(|l| { + if socket_addr.ip().is_loopback() { + l.listening_addresses + .iter() + .any(|ip_addr| ip_addr.is_loopback()) + } else { + true + } }) .collect(); match listeners.len() { @@ -428,6 +435,8 @@ struct Listener { /// The stream must be awaken after it has been closed to deliver the last event. close_listener_waker: Option, + + listening_addresses: HashSet, } impl Listener

{ @@ -440,12 +449,14 @@ impl Listener

{ ) -> Result { let if_watcher; let pending_event; + let mut listening_addresses = HashSet::new(); let local_addr = socket.local_addr()?; if local_addr.ip().is_unspecified() { if_watcher = Some(P::new_if_watcher()?); pending_event = None; } else { if_watcher = None; + listening_addresses.insert(local_addr.ip()); let ma = socketaddr_to_multiaddr(&local_addr, version); pending_event = Some(TransportEvent::NewAddress { listener_id, @@ -467,6 +478,7 @@ impl Listener

{ is_closed: false, pending_event, close_listener_waker: None, + listening_addresses, }) } @@ -513,7 +525,8 @@ impl Listener

{ if let Some(listen_addr) = ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { - log::debug!("New listen address: {}", listen_addr); + log::debug!("New listen address: {listen_addr}"); + self.listening_addresses.insert(inet.addr()); return Poll::Ready(TransportEvent::NewAddress { listener_id: self.listener_id, listen_addr, @@ -524,7 +537,8 @@ impl Listener

{ if let Some(listen_addr) = ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { - log::debug!("Expired listen address: {}", listen_addr); + log::debug!("Expired listen address: {listen_addr}"); + self.listening_addresses.remove(&inet.addr()); return Poll::Ready(TransportEvent::AddressExpired { listener_id: self.listener_id, listen_addr, @@ -730,7 +744,7 @@ fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, version: ProtocolVersion) - #[cfg(test)] #[cfg(any(feature = "async-std", feature = "tokio"))] -mod test { +mod tests { use futures::future::poll_fn; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index 93adfa68013..a0c43b9ebdd 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -414,6 +414,49 @@ async fn write_after_peer_dropped_stream() { stream_b.close().await.expect("Close failed."); } +/// - A listens on 0.0.0.0:0 +/// - B listens on 127.0.0.1:0 +/// - A dials B +/// - Source port of A at B is the A's listen port +#[cfg(feature = "tokio")] +#[tokio::test] +async fn test_local_listener_reuse() { + let (_, mut a_transport) = create_default_transport::(); + let (_, mut b_transport) = create_default_transport::(); + + a_transport + .listen_on( + ListenerId::next(), + "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(), + ) + .unwrap(); + + // wait until a listener reports a loopback address + let a_listen_addr = 'outer: loop { + let ev = a_transport.next().await.unwrap(); + let listen_addr = ev.into_new_address().unwrap(); + for proto in listen_addr.iter() { + if let Protocol::Ip4(ip4) = proto { + if ip4.is_loopback() { + break 'outer listen_addr; + } + } + } + }; + // If we do not poll until the end, `NewAddress` events may be `Ready` and `connect` function + // below will panic due to an unexpected event. + poll_fn(|cx| { + let mut pinned = Pin::new(&mut a_transport); + while pinned.as_mut().poll(cx).is_ready() {} + Poll::Ready(()) + }) + .await; + + let b_addr = start_listening(&mut b_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; + let (_, send_back_addr, _) = connect(&mut b_transport, &mut a_transport, b_addr).await.0; + assert_eq!(send_back_addr, a_listen_addr); +} + async fn smoke() { let _ = env_logger::try_init();