Skip to content

Commit

Permalink
fix(dcutr): exchange address candidates
Browse files Browse the repository at this point in the history
In a serious of refactorings, we seem to have introduced a bug where we where exchanged the _external_ addresses of our node as part of `libp2p-dcutr`. This is ironically quite pointless. If we have external addresses, then there is no need for hole-punching (i.e. DCUtR).

Instead of gathering external addresses, we use an LRU cache to store our observed addresses. Repeatedly observed addresses will be tried first which should increase the success rate of a hole-punch.

Resolves: #4496.

Pull-Request: #4624.
  • Loading branch information
thomaseizinger authored Oct 26, 2023
1 parent 5b1cc3b commit fc6efaf
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 48 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions examples/dcutr/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
info: identify::Info { observed_addr, .. },
..
})) => {
info!("Relay told us our public address: {:?}", observed_addr);
swarm.add_external_address(observed_addr);
info!("Relay told us our observed address: {observed_addr}");
learned_observed_addr = true;
}
event => panic!("{event:?}"),
Expand Down
1 change: 0 additions & 1 deletion hole-punching-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ async fn client_connect_to_relay(
..
})) => {
log::info!("Relay told us our public address: {observed_addr}");
swarm.add_external_address(observed_addr);
break;
}
SwarmEvent::ConnectionEstablished { connection_id, .. }
Expand Down
4 changes: 4 additions & 0 deletions protocols/dcutr/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

[PR 4558]: https://github.com/libp2p/rust-libp2p/pull/4558

- Exchange address _candidates_ instead of external addresses in `CONNECT`.
If hole-punching wasn't working properly for you until now, this might be the reason why.
See [PR 4624](https://github.com/libp2p/rust-libp2p/pull/4624).

## 0.10.0

- Raise MSRV to 1.65.
Expand Down
1 change: 1 addition & 0 deletions protocols/dcutr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ quick-protobuf = "0.8"
quick-protobuf-codec = { workspace = true }
thiserror = "1.0"
void = "1"
lru = "0.11.1"

[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm};
use libp2p_swarm::dial_opts::{self, DialOpts};
use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, THandler, THandlerOutEvent,
};
use libp2p_swarm::{
ExternalAddresses, NetworkBehaviour, NotifyHandler, StreamUpgradeError, THandlerInEvent,
ToSwarm,
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, NewExternalAddrCandidate, THandler,
THandlerOutEvent,
};
use libp2p_swarm::{NetworkBehaviour, NotifyHandler, StreamUpgradeError, THandlerInEvent, ToSwarm};
use lru::LruCache;
use std::collections::{HashMap, HashSet, VecDeque};
use std::num::NonZeroUsize;
use std::task::{Context, Poll};
use thiserror::Error;
use void::Void;
Expand Down Expand Up @@ -79,9 +79,7 @@ pub struct Behaviour {
/// All direct (non-relayed) connections.
direct_connections: HashMap<PeerId, HashSet<ConnectionId>>,

external_addresses: ExternalAddresses,

local_peer_id: PeerId,
address_candidates: Candidates,

direct_to_relayed_connections: HashMap<ConnectionId, ConnectionId>,

Expand All @@ -95,20 +93,14 @@ impl Behaviour {
Behaviour {
queued_events: Default::default(),
direct_connections: Default::default(),
external_addresses: Default::default(),
local_peer_id,
address_candidates: Candidates::new(local_peer_id),
direct_to_relayed_connections: Default::default(),
outgoing_direct_connection_attempts: Default::default(),
}
}

fn observed_addresses(&self) -> Vec<Multiaddr> {
self.external_addresses
.iter()
.filter(|a| !a.iter().any(|p| p == Protocol::P2pCircuit))
.cloned()
.map(|a| a.with(Protocol::P2p(self.local_peer_id)))
.collect()
self.address_candidates.iter().cloned().collect()
}

fn on_dial_failure(
Expand Down Expand Up @@ -359,13 +351,14 @@ impl NetworkBehaviour for Behaviour {
}

fn on_swarm_event(&mut self, event: FromSwarm) {
self.external_addresses.on_swarm_event(&event);

match event {
FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed)
}
FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => {
self.address_candidates.add(addr.clone());
}
FromSwarm::AddressChange(_)
| FromSwarm::ConnectionEstablished(_)
| FromSwarm::ListenFailure(_)
Expand All @@ -374,13 +367,48 @@ impl NetworkBehaviour for Behaviour {
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrExpired(_)
| FromSwarm::ExternalAddrConfirmed(_) => {}
}
}
}

/// Stores our address candidates.
///
/// We use an [`LruCache`] to favor addresses that are reported more often.
/// When attempting a hole-punch, we will try more frequent addresses first.
/// Most of these addresses will come from observations by other nodes (via e.g. the identify protocol).
/// More common observations mean a more likely stable port-mapping and thus a higher chance of a successful hole-punch.
struct Candidates {
inner: LruCache<Multiaddr, ()>,
me: PeerId,
}

impl Candidates {
fn new(me: PeerId) -> Self {
Self {
inner: LruCache::new(NonZeroUsize::new(20).expect("20 > 0")),
me,
}
}

fn add(&mut self, mut address: Multiaddr) {
if is_relayed(&address) {
return;
}

if address.iter().last() != Some(Protocol::P2p(self.me)) {
address.push(Protocol::P2p(self.me));
}

self.inner.push(address, ());
}

fn iter(&self) -> impl Iterator<Item = &Multiaddr> {
self.inner.iter().map(|(a, _)| a)
}
}

fn is_relayed(addr: &Multiaddr) -> bool {
addr.iter().any(|p| p == Protocol::P2pCircuit)
}
2 changes: 1 addition & 1 deletion protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

//! [`ConnectionHandler`] handling relayed connection potentially upgraded to a direct connection.
use crate::behaviour_impl::MAX_NUMBER_OF_UPGRADE_ATTEMPTS;
use crate::behaviour::MAX_NUMBER_OF_UPGRADE_ATTEMPTS;
use crate::protocol;
use either::Either;
use futures::future;
Expand Down
6 changes: 2 additions & 4 deletions protocols/dcutr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod behaviour_impl; // TODO: Rename back `behaviour` once deprecation symbols are removed.
mod behaviour;
mod handler;
mod protocol;

Expand All @@ -33,9 +33,7 @@ mod proto {
pub(crate) use self::holepunch::pb::{mod_HolePunch::*, HolePunch};
}

pub use behaviour_impl::Behaviour;
pub use behaviour_impl::Error;
pub use behaviour_impl::Event;
pub use behaviour::{Behaviour, Error, Event};
pub use protocol::PROTOCOL_NAME;
pub mod inbound {
pub use crate::protocol::inbound::UpgradeError;
Expand Down
72 changes: 52 additions & 20 deletions protocols/dcutr/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use libp2p_core::multiaddr::{Multiaddr, Protocol};
use libp2p_core::transport::upgrade::Version;
use libp2p_core::transport::{MemoryTransport, Transport};
use libp2p_dcutr as dcutr;
use libp2p_identify as identify;
use libp2p_identity as identity;
use libp2p_identity::PeerId;
use libp2p_plaintext as plaintext;
Expand All @@ -38,10 +39,19 @@ async fn connect() {
let mut dst = build_client();
let mut src = build_client();

// Have all swarms listen on a local memory address.
let (relay_addr, _) = relay.listen().await;
let (dst_addr, _) = dst.listen().await;
src.listen().await;
// Have all swarms listen on a local TCP address.
let (memory_addr, relay_addr) = relay.listen().await;
relay.remove_external_address(&memory_addr);
relay.add_external_address(relay_addr.clone());

let (dst_mem_addr, dst_tcp_addr) = dst.listen().await;
let (src_mem_addr, _) = src.listen().await;

dst.remove_external_address(&dst_mem_addr);
src.remove_external_address(&src_mem_addr);

assert!(src.external_addresses().next().is_none());
assert!(dst.external_addresses().next().is_none());

let relay_peer_id = *relay.local_peer_id();
let dst_peer_id = *dst.local_peer_id();
Expand Down Expand Up @@ -80,11 +90,12 @@ async fn connect() {
break;
}
}
ClientEvent::Identify(_) => {}
other => panic!("Unexpected event: {other:?}."),
}
}

let dst_addr = dst_addr.with(Protocol::P2p(dst_peer_id));
let dst_addr = dst_tcp_addr.with(Protocol::P2p(dst_peer_id));

let established_conn_id = src
.wait(move |e| match e {
Expand All @@ -109,20 +120,33 @@ async fn connect() {
assert_eq!(established_conn_id, reported_conn_id);
}

fn build_relay() -> Swarm<relay::Behaviour> {
fn build_relay() -> Swarm<Relay> {
Swarm::new_ephemeral(|identity| {
let local_peer_id = identity.public().to_peer_id();

relay::Behaviour::new(
local_peer_id,
relay::Config {
reservation_duration: Duration::from_secs(2),
..Default::default()
},
)
Relay {
relay: relay::Behaviour::new(
local_peer_id,
relay::Config {
reservation_duration: Duration::from_secs(2),
..Default::default()
},
),
identify: identify::Behaviour::new(identify::Config::new(
"/relay".to_owned(),
identity.public(),
)),
}
})
}

#[derive(NetworkBehaviour)]
#[behaviour(prelude = "libp2p_swarm::derive_prelude")]
struct Relay {
relay: relay::Behaviour,
identify: identify::Behaviour,
}

fn build_client() -> Swarm<Client> {
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = local_key.public().to_peer_id();
Expand All @@ -142,6 +166,10 @@ fn build_client() -> Swarm<Client> {
Client {
relay: behaviour,
dcutr: dcutr::Behaviour::new(local_peer_id),
identify: identify::Behaviour::new(identify::Config::new(
"/client".to_owned(),
local_key.public(),
)),
},
local_peer_id,
Config::with_async_std_executor(),
Expand All @@ -153,6 +181,7 @@ fn build_client() -> Swarm<Client> {
struct Client {
relay: relay::client::Behaviour,
dcutr: dcutr::Behaviour,
identify: identify::Behaviour,
}

async fn wait_for_reservation(
Expand All @@ -163,14 +192,16 @@ async fn wait_for_reservation(
) {
let mut new_listen_addr_for_relayed_addr = false;
let mut reservation_req_accepted = false;
let mut addr_observed = false;

loop {
if new_listen_addr_for_relayed_addr && reservation_req_accepted && addr_observed {
break;
}

match client.next_swarm_event().await {
SwarmEvent::NewListenAddr { address, .. } if address != client_addr => {}
SwarmEvent::NewListenAddr { address, .. } if address == client_addr => {
new_listen_addr_for_relayed_addr = true;
if reservation_req_accepted {
break;
}
}
SwarmEvent::Behaviour(ClientEvent::Relay(
relay::client::Event::ReservationReqAccepted {
Expand All @@ -180,15 +211,16 @@ async fn wait_for_reservation(
},
)) if relay_peer_id == peer_id && renewal == is_renewal => {
reservation_req_accepted = true;
if new_listen_addr_for_relayed_addr {
break;
}
}
SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} if peer_id == relay_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
SwarmEvent::Behaviour(ClientEvent::Identify(identify::Event::Received { .. })) => {
addr_observed = true;
}
SwarmEvent::Behaviour(ClientEvent::Identify(_)) => {}
e => panic!("{e:?}"),
}
}
Expand Down

0 comments on commit fc6efaf

Please sign in to comment.