diff --git a/Cargo.lock b/Cargo.lock index 61095bb524e..72b8ffed191 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2520,6 +2520,7 @@ dependencies = [ "libp2p-tcp", "libp2p-yamux", "log", + "lru 0.11.1", "quick-protobuf", "quick-protobuf-codec", "rand 0.8.5", @@ -2632,7 +2633,7 @@ dependencies = [ "libp2p-swarm", "libp2p-swarm-test", "log", - "lru", + "lru 0.12.0", "quick-protobuf", "quick-protobuf-codec", "smallvec", @@ -3412,6 +3413,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "lru" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21" +dependencies = [ + "hashbrown 0.14.0", +] + [[package]] name = "lru" version = "0.12.0" diff --git a/examples/dcutr/src/main.rs b/examples/dcutr/src/main.rs index f9ddb1e2ef1..6a87e351e02 100644 --- a/examples/dcutr/src/main.rs +++ b/examples/dcutr/src/main.rs @@ -156,8 +156,7 @@ async fn main() -> Result<(), Box> { info: identify::Info { observed_addr, .. }, .. })) => { - info!("Relay told us our public address: {:?}", observed_addr); - swarm.add_external_address(observed_addr); + info!("Relay told us our observed address: {observed_addr}"); learned_observed_addr = true; } event => panic!("{event:?}"), diff --git a/hole-punching-tests/src/main.rs b/hole-punching-tests/src/main.rs index 4bd7312df4b..5e86ecf24b8 100644 --- a/hole-punching-tests/src/main.rs +++ b/hole-punching-tests/src/main.rs @@ -211,7 +211,6 @@ async fn client_connect_to_relay( .. })) => { log::info!("Relay told us our public address: {observed_addr}"); - swarm.add_external_address(observed_addr); break; } SwarmEvent::ConnectionEstablished { connection_id, .. } diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index fc9fddb9fe0..179db86dff2 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -5,6 +5,10 @@ [PR 4558]: https://github.com/libp2p/rust-libp2p/pull/4558 +- Exchange address _candidates_ instead of external addresses in `CONNECT`. + If hole-punching wasn't working properly for you until now, this might be the reason why. + See [PR 4624](https://github.com/libp2p/rust-libp2p/pull/4624). + ## 0.10.0 - Raise MSRV to 1.65. diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml index 737d8aede70..ce038b4b5b7 100644 --- a/protocols/dcutr/Cargo.toml +++ b/protocols/dcutr/Cargo.toml @@ -24,6 +24,7 @@ quick-protobuf = "0.8" quick-protobuf-codec = { workspace = true } thiserror = "1.0" void = "1" +lru = "0.11.1" [dev-dependencies] async-std = { version = "1.12.0", features = ["attributes"] } diff --git a/protocols/dcutr/src/behaviour_impl.rs b/protocols/dcutr/src/behaviour.rs similarity index 88% rename from protocols/dcutr/src/behaviour_impl.rs rename to protocols/dcutr/src/behaviour.rs index e7ecdd3c6ad..72b30421346 100644 --- a/protocols/dcutr/src/behaviour_impl.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -29,13 +29,13 @@ use libp2p_identity::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm}; use libp2p_swarm::dial_opts::{self, DialOpts}; use libp2p_swarm::{ - dummy, ConnectionDenied, ConnectionHandler, ConnectionId, THandler, THandlerOutEvent, -}; -use libp2p_swarm::{ - ExternalAddresses, NetworkBehaviour, NotifyHandler, StreamUpgradeError, THandlerInEvent, - ToSwarm, + dummy, ConnectionDenied, ConnectionHandler, ConnectionId, NewExternalAddrCandidate, THandler, + THandlerOutEvent, }; +use libp2p_swarm::{NetworkBehaviour, NotifyHandler, StreamUpgradeError, THandlerInEvent, ToSwarm}; +use lru::LruCache; use std::collections::{HashMap, HashSet, VecDeque}; +use std::num::NonZeroUsize; use std::task::{Context, Poll}; use thiserror::Error; use void::Void; @@ -79,9 +79,7 @@ pub struct Behaviour { /// All direct (non-relayed) connections. direct_connections: HashMap>, - external_addresses: ExternalAddresses, - - local_peer_id: PeerId, + address_candidates: Candidates, direct_to_relayed_connections: HashMap, @@ -95,20 +93,14 @@ impl Behaviour { Behaviour { queued_events: Default::default(), direct_connections: Default::default(), - external_addresses: Default::default(), - local_peer_id, + address_candidates: Candidates::new(local_peer_id), direct_to_relayed_connections: Default::default(), outgoing_direct_connection_attempts: Default::default(), } } fn observed_addresses(&self) -> Vec { - self.external_addresses - .iter() - .filter(|a| !a.iter().any(|p| p == Protocol::P2pCircuit)) - .cloned() - .map(|a| a.with(Protocol::P2p(self.local_peer_id))) - .collect() + self.address_candidates.iter().cloned().collect() } fn on_dial_failure( @@ -359,13 +351,14 @@ impl NetworkBehaviour for Behaviour { } fn on_swarm_event(&mut self, event: FromSwarm) { - self.external_addresses.on_swarm_event(&event); - match event { FromSwarm::ConnectionClosed(connection_closed) => { self.on_connection_closed(connection_closed) } FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure), + FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => { + self.address_candidates.add(addr.clone()); + } FromSwarm::AddressChange(_) | FromSwarm::ConnectionEstablished(_) | FromSwarm::ListenFailure(_) @@ -374,13 +367,48 @@ impl NetworkBehaviour for Behaviour { | FromSwarm::ExpiredListenAddr(_) | FromSwarm::ListenerError(_) | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddrCandidate(_) | FromSwarm::ExternalAddrExpired(_) | FromSwarm::ExternalAddrConfirmed(_) => {} } } } +/// Stores our address candidates. +/// +/// We use an [`LruCache`] to favor addresses that are reported more often. +/// When attempting a hole-punch, we will try more frequent addresses first. +/// Most of these addresses will come from observations by other nodes (via e.g. the identify protocol). +/// More common observations mean a more likely stable port-mapping and thus a higher chance of a successful hole-punch. +struct Candidates { + inner: LruCache, + me: PeerId, +} + +impl Candidates { + fn new(me: PeerId) -> Self { + Self { + inner: LruCache::new(NonZeroUsize::new(20).expect("20 > 0")), + me, + } + } + + fn add(&mut self, mut address: Multiaddr) { + if is_relayed(&address) { + return; + } + + if address.iter().last() != Some(Protocol::P2p(self.me)) { + address.push(Protocol::P2p(self.me)); + } + + self.inner.push(address, ()); + } + + fn iter(&self) -> impl Iterator { + self.inner.iter().map(|(a, _)| a) + } +} + fn is_relayed(addr: &Multiaddr) -> bool { addr.iter().any(|p| p == Protocol::P2pCircuit) } diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 3b23a1a2a6b..23ab9f4ae5a 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -20,7 +20,7 @@ //! [`ConnectionHandler`] handling relayed connection potentially upgraded to a direct connection. -use crate::behaviour_impl::MAX_NUMBER_OF_UPGRADE_ATTEMPTS; +use crate::behaviour::MAX_NUMBER_OF_UPGRADE_ATTEMPTS; use crate::protocol; use either::Either; use futures::future; diff --git a/protocols/dcutr/src/lib.rs b/protocols/dcutr/src/lib.rs index 6001c9144e7..389365f94c5 100644 --- a/protocols/dcutr/src/lib.rs +++ b/protocols/dcutr/src/lib.rs @@ -23,7 +23,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -mod behaviour_impl; // TODO: Rename back `behaviour` once deprecation symbols are removed. +mod behaviour; mod handler; mod protocol; @@ -33,9 +33,7 @@ mod proto { pub(crate) use self::holepunch::pb::{mod_HolePunch::*, HolePunch}; } -pub use behaviour_impl::Behaviour; -pub use behaviour_impl::Error; -pub use behaviour_impl::Event; +pub use behaviour::{Behaviour, Error, Event}; pub use protocol::PROTOCOL_NAME; pub mod inbound { pub use crate::protocol::inbound::UpgradeError; diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 7c732f90173..6078b101fa2 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -22,6 +22,7 @@ use libp2p_core::multiaddr::{Multiaddr, Protocol}; use libp2p_core::transport::upgrade::Version; use libp2p_core::transport::{MemoryTransport, Transport}; use libp2p_dcutr as dcutr; +use libp2p_identify as identify; use libp2p_identity as identity; use libp2p_identity::PeerId; use libp2p_plaintext as plaintext; @@ -38,10 +39,19 @@ async fn connect() { let mut dst = build_client(); let mut src = build_client(); - // Have all swarms listen on a local memory address. - let (relay_addr, _) = relay.listen().await; - let (dst_addr, _) = dst.listen().await; - src.listen().await; + // Have all swarms listen on a local TCP address. + let (memory_addr, relay_addr) = relay.listen().await; + relay.remove_external_address(&memory_addr); + relay.add_external_address(relay_addr.clone()); + + let (dst_mem_addr, dst_tcp_addr) = dst.listen().await; + let (src_mem_addr, _) = src.listen().await; + + dst.remove_external_address(&dst_mem_addr); + src.remove_external_address(&src_mem_addr); + + assert!(src.external_addresses().next().is_none()); + assert!(dst.external_addresses().next().is_none()); let relay_peer_id = *relay.local_peer_id(); let dst_peer_id = *dst.local_peer_id(); @@ -80,11 +90,12 @@ async fn connect() { break; } } + ClientEvent::Identify(_) => {} other => panic!("Unexpected event: {other:?}."), } } - let dst_addr = dst_addr.with(Protocol::P2p(dst_peer_id)); + let dst_addr = dst_tcp_addr.with(Protocol::P2p(dst_peer_id)); let established_conn_id = src .wait(move |e| match e { @@ -109,20 +120,33 @@ async fn connect() { assert_eq!(established_conn_id, reported_conn_id); } -fn build_relay() -> Swarm { +fn build_relay() -> Swarm { Swarm::new_ephemeral(|identity| { let local_peer_id = identity.public().to_peer_id(); - relay::Behaviour::new( - local_peer_id, - relay::Config { - reservation_duration: Duration::from_secs(2), - ..Default::default() - }, - ) + Relay { + relay: relay::Behaviour::new( + local_peer_id, + relay::Config { + reservation_duration: Duration::from_secs(2), + ..Default::default() + }, + ), + identify: identify::Behaviour::new(identify::Config::new( + "/relay".to_owned(), + identity.public(), + )), + } }) } +#[derive(NetworkBehaviour)] +#[behaviour(prelude = "libp2p_swarm::derive_prelude")] +struct Relay { + relay: relay::Behaviour, + identify: identify::Behaviour, +} + fn build_client() -> Swarm { let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = local_key.public().to_peer_id(); @@ -142,6 +166,10 @@ fn build_client() -> Swarm { Client { relay: behaviour, dcutr: dcutr::Behaviour::new(local_peer_id), + identify: identify::Behaviour::new(identify::Config::new( + "/client".to_owned(), + local_key.public(), + )), }, local_peer_id, Config::with_async_std_executor(), @@ -153,6 +181,7 @@ fn build_client() -> Swarm { struct Client { relay: relay::client::Behaviour, dcutr: dcutr::Behaviour, + identify: identify::Behaviour, } async fn wait_for_reservation( @@ -163,14 +192,16 @@ async fn wait_for_reservation( ) { let mut new_listen_addr_for_relayed_addr = false; let mut reservation_req_accepted = false; + let mut addr_observed = false; + loop { + if new_listen_addr_for_relayed_addr && reservation_req_accepted && addr_observed { + break; + } + match client.next_swarm_event().await { - SwarmEvent::NewListenAddr { address, .. } if address != client_addr => {} SwarmEvent::NewListenAddr { address, .. } if address == client_addr => { new_listen_addr_for_relayed_addr = true; - if reservation_req_accepted { - break; - } } SwarmEvent::Behaviour(ClientEvent::Relay( relay::client::Event::ReservationReqAccepted { @@ -180,15 +211,16 @@ async fn wait_for_reservation( }, )) if relay_peer_id == peer_id && renewal == is_renewal => { reservation_req_accepted = true; - if new_listen_addr_for_relayed_addr { - break; - } } SwarmEvent::Dialing { peer_id: Some(peer_id), .. } if peer_id == relay_peer_id => {} SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} + SwarmEvent::Behaviour(ClientEvent::Identify(identify::Event::Received { .. })) => { + addr_observed = true; + } + SwarmEvent::Behaviour(ClientEvent::Identify(_)) => {} e => panic!("{e:?}"), } }