Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm): allow behaviours to share addresses of peers #4371

Merged
merged 28 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1026947
swarm: Add NewExternalAddrOfPeer
StemCll Nov 28, 2023
c6a323c
deprecate request-response add_address
StemCll Nov 29, 2023
03fd927
swarm: add peer addresses
StemCll Nov 29, 2023
6110643
Move peer cache to peer addresses
StemCll Dec 10, 2023
3f227c7
Add pop function to peer address
StemCll Dec 10, 2023
959a131
Add CR changes
StemCll Dec 11, 2023
c306434
Remove public get_mut from peer addresses
StemCll Dec 14, 2023
6619f0d
identify: emit when identified new peer; add changelog entries and v…
StemCll Dec 17, 2023
1bda774
Revert version changes
StemCll Dec 25, 2023
32076ae
Extend swarm changelog entry; fix peer_addresses put logic
StemCll Dec 26, 2023
2277bd6
Add additional test for remove
StemCll Dec 26, 2023
8e1dc1e
Add connection test to request response
StemCll Dec 27, 2023
056d51e
Rever toml file format changes
StemCll Dec 27, 2023
b8f8bc5
Documentation fixes
thomaseizinger Dec 27, 2023
f5f162c
Update protocols/identify/CHANGELOG.md
thomaseizinger Dec 27, 2023
ff81f51
Revert autonat version bump
StemCll Dec 27, 2023
496c185
Rename put to add in PeerAddresses
StemCll Jan 1, 2024
db18158
Don't filter new external addresses
StemCll Jan 1, 2024
7fa87fb
Fix clippy warn
StemCll Jan 1, 2024
af0a91c
Merge branch 'master' into feat/swarm/report_remote_address
StemCll Jan 2, 2024
41b533e
Merge branch 'master' into feat/swarm/report_remote_address
StemCll Jan 17, 2024
31723f2
Review suggestions resolved
StemCll Jan 18, 2024
5059d51
Merge branch 'master' into feat/swarm/report_remote_address
StemCll Jan 20, 2024
da6294b
Re-work `PeerAddresses` to be limited in how many addresses we store
thomaseizinger Jan 21, 2024
6fae501
Allow new event in dctur test
thomaseizinger Jan 21, 2024
1866abd
Fix clippy warnings
thomaseizinger Jan 21, 2024
b1bedf7
Merge branch 'master' into feat/swarm/report_remote_address
thomaseizinger Jan 24, 2024
49d53d1
Merge branch 'master' into feat/swarm/report_remote_address
mergify[bot] Jan 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ libp2p-dcutr = { version = "0.11.0", path = "protocols/dcutr" }
libp2p-dns = { version = "0.41.1", path = "transports/dns" }
libp2p-floodsub = { version = "0.44.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.46.1", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.44.1", path = "protocols/identify" }
libp2p-identify = { version = "0.44.2", path = "protocols/identify" }
libp2p-identity = { version = "0.2.8" }
libp2p-kad = { version = "0.45.3", path = "protocols/kad" }
libp2p-mdns = { version = "0.45.1", path = "protocols/mdns" }
Expand All @@ -99,11 +99,11 @@ libp2p-pnet = { version = "0.24.0", path = "transports/pnet" }
libp2p-quic = { version = "0.10.2", path = "transports/quic" }
libp2p-relay = { version = "0.17.1", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.26.1", path = "protocols/request-response" }
libp2p-request-response = { version = "0.26.2", path = "protocols/request-response" }
libp2p-server = { version = "0.12.5", path = "misc/server" }
libp2p-stream = { version = "0.1.0-alpha", path = "protocols/stream" }
libp2p-swarm = { version = "0.44.1", path = "swarm" }
libp2p-swarm-derive = { version = "=0.34.2", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
libp2p-swarm = { version = "0.44.2", path = "swarm" }
libp2p-swarm-derive = { version = "=0.34.3", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
libp2p-swarm-test = { version = "0.3.0", path = "swarm-test" }
libp2p-tcp = { version = "0.41.0", path = "transports/tcp" }
libp2p-tls = { version = "0.3.0", path = "transports/tls" }
Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ name = "libp2p-autonat"
edition = "2021"
rust-version = { workspace = true }
description = "NAT and firewall detection for libp2p"
version = "0.12.0"
authors = ["David Craven <[email protected]>", "Elena Frank <[email protected]>"]
version = "0.12.0"
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["peer-to-peer", "libp2p", "networking"]
Expand Down
1 change: 1 addition & 0 deletions protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ impl Behaviour {
pub fn add_server(&mut self, peer: PeerId, address: Option<Multiaddr>) {
self.servers.insert(peer);
if let Some(addr) = address {
#[allow(deprecated)]
StemCll marked this conversation as resolved.
Show resolved Hide resolved
self.inner.add_address(&peer, addr);
}
}
Expand Down
1 change: 1 addition & 0 deletions protocols/dcutr/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ async fn wait_for_reservation(
SwarmEvent::ExternalAddrConfirmed { address } if !is_renewal => {
assert_eq!(address, client_addr);
}
SwarmEvent::NewExternalAddrOfPeer { .. } => {}
e => panic!("{e:?}"),
}
}
Expand Down
7 changes: 7 additions & 0 deletions protocols/identify/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 0.44.2

- Emit `ToSwarm::NewExternalAddrOfPeer` for all external addresses of remote peers.
For this work, the address cache must be enabled via `identify::Config::with_cache_size`.
The default is 0, i.e. disabled.
See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371).

## 0.44.1

- Ensure `Multiaddr` handled and returned by `Behaviour` are `/p2p` terminated.
Expand Down
2 changes: 1 addition & 1 deletion protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-identify"
edition = "2021"
rust-version = { workspace = true }
description = "Nodes identifcation protocol for libp2p"
version = "0.44.1"
version = "0.44.2"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
72 changes: 31 additions & 41 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ use libp2p_identity::PublicKey;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::{
ConnectionDenied, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour,
NotifyHandler, StreamUpgradeError, THandlerInEvent, ToSwarm,
NotifyHandler, PeerAddresses, StreamUpgradeError, THandlerInEvent, ToSwarm,
};
use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
use lru::LruCache;

use std::collections::hash_map::Entry;
use std::num::NonZeroUsize;
use std::{
collections::{HashMap, HashSet, VecDeque},
iter::FromIterator,
task::Context,
task::Poll,
time::Duration,
Expand Down Expand Up @@ -200,9 +199,9 @@ impl Behaviour {
.or_default()
.insert(conn, addr);

if let Some(entry) = self.discovered_peers.get_mut(&peer_id) {
if let Some(cache) = self.discovered_peers.0.as_mut() {
for addr in failed_addresses {
entry.remove(addr);
cache.remove(&peer_id, addr);
}
}
}
Expand Down Expand Up @@ -268,13 +267,23 @@ impl NetworkBehaviour for Behaviour {
info.listen_addrs
.retain(|addr| multiaddr_matches_peer_id(addr, &peer_id));

// Replace existing addresses to prevent other peer from filling up our memory.
self.discovered_peers
.put(peer_id, info.listen_addrs.iter().cloned());

let observed = info.observed_addr.clone();
self.events
.push_back(ToSwarm::GenerateEvent(Event::Received { peer_id, info }));
.push_back(ToSwarm::GenerateEvent(Event::Received {
peer_id,
info: info.clone(),
}));

if let Some(ref mut discovered_peers) = self.discovered_peers.0 {
for address in &info.listen_addrs {
if discovered_peers.add(peer_id, address.clone()) {
self.events.push_back(ToSwarm::NewExternalAddrOfPeer {
peer_id,
address: address.clone(),
});
}
}
}

match self.our_observed_addresses.entry(id) {
Entry::Vacant(not_yet_observed) => {
Expand Down Expand Up @@ -387,11 +396,11 @@ impl NetworkBehaviour for Behaviour {
self.our_observed_addresses.remove(&connection_id);
}
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let Some(entry) = peer_id.and_then(|id| self.discovered_peers.get_mut(&id)) {
if let DialError::Transport(errors) = error {
for (addr, _error) in errors {
entry.remove(addr);
}
if let (Some(peer_id), Some(cache), DialError::Transport(errors)) =
(peer_id, self.discovered_peers.0.as_mut(), error)
{
for (addr, _error) in errors {
cache.remove(&peer_id, addr);
}
}
}
Expand Down Expand Up @@ -445,42 +454,23 @@ fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
true
}

struct PeerCache(Option<LruCache<PeerId, HashSet<Multiaddr>>>);
struct PeerCache(Option<PeerAddresses>);

impl PeerCache {
fn disabled() -> Self {
Self(None)
}

fn enabled(size: NonZeroUsize) -> Self {
Self(Some(LruCache::new(size)))
}

fn get_mut(&mut self, peer: &PeerId) -> Option<&mut HashSet<Multiaddr>> {
self.0.as_mut()?.get_mut(peer)
}

fn put(&mut self, peer: PeerId, addresses: impl Iterator<Item = Multiaddr>) {
let cache = match self.0.as_mut() {
None => return,
Some(cache) => cache,
};
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

let addresses = addresses.filter_map(|a| a.with_p2p(peer).ok());
cache.put(peer, HashSet::from_iter(addresses));
Self(Some(PeerAddresses::new(size)))
}

fn get(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
let cache = match self.0.as_mut() {
None => return Vec::new(),
Some(cache) => cache,
};

cache
.get(peer)
.cloned()
.map(Vec::from_iter)
.unwrap_or_default()
if let Some(cache) = self.0.as_mut() {
cache.get(peer).collect()
} else {
Vec::new()
}
}
}

Expand Down
74 changes: 73 additions & 1 deletion protocols/identify/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn only_emits_address_candidate_once_per_connection() {
async_std::task::spawn(swarm2.loop_on_next());

let swarm_events = futures::stream::poll_fn(|cx| swarm1.poll_next_unpin(cx))
.take(5)
.take(8)
.collect::<Vec<_>>()
.await;

Expand Down Expand Up @@ -156,6 +156,78 @@ async fn only_emits_address_candidate_once_per_connection() {
);
}

#[async_std::test]
async fn emits_unique_listen_addresses() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();

let mut swarm1 = Swarm::new_ephemeral(|identity| {
identify::Behaviour::new(
identify::Config::new("a".to_string(), identity.public())
.with_agent_version("b".to_string())
.with_interval(Duration::from_secs(1))
.with_cache_size(10),
)
});
let mut swarm2 = Swarm::new_ephemeral(|identity| {
identify::Behaviour::new(
identify::Config::new("c".to_string(), identity.public())
.with_agent_version("d".to_string()),
)
});

let (swarm2_mem_listen_addr, swarm2_tcp_listen_addr) =
swarm2.listen().with_memory_addr_external().await;
let swarm2_peer_id = *swarm2.local_peer_id();
swarm1.connect(&mut swarm2).await;

async_std::task::spawn(swarm2.loop_on_next());

let swarm_events = futures::stream::poll_fn(|cx| swarm1.poll_next_unpin(cx))
.take(8)
.collect::<Vec<_>>()
.await;

let infos = swarm_events
.iter()
.filter_map(|e| match e {
SwarmEvent::Behaviour(identify::Event::Received { info, .. }) => Some(info.clone()),
_ => None,
})
.collect::<Vec<_>>();

assert!(
infos.len() > 1,
"should exchange identify payload more than once"
);

let listen_addrs = infos
.iter()
.map(|i| i.listen_addrs.clone())
.collect::<Vec<_>>();

for addrs in listen_addrs {
assert_eq!(addrs.len(), 2);
assert!(addrs.contains(&swarm2_mem_listen_addr));
assert!(addrs.contains(&swarm2_tcp_listen_addr));
}

let reported_addrs = swarm_events
.iter()
.filter_map(|e| match e {
SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
Some((*peer_id, address.clone()))
}
_ => None,
})
.collect::<Vec<_>>();

assert_eq!(reported_addrs.len(), 2, "To have two addresses of remote");
assert!(reported_addrs.contains(&(swarm2_peer_id, swarm2_mem_listen_addr)));
assert!(reported_addrs.contains(&(swarm2_peer_id, swarm2_tcp_listen_addr)));
}

#[async_std::test]
async fn identify_push() {
let _ = tracing_subscriber::fmt()
Expand Down
5 changes: 5 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.26.2

- Deprecate `Behaviour::add_address` in favor of `Swarm::add_peer_address`.
See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371).

## 0.26.1

- Derive `PartialOrd` and `Ord` for `{Out,In}boundRequestId`.
Expand Down
4 changes: 2 additions & 2 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-request-response"
edition = "2021"
rust-version = { workspace = true }
description = "Generic Request/Response Protocols"
version = "0.26.1"
version = "0.26.2"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down Expand Up @@ -40,7 +40,7 @@ libp2p-yamux = { workspace = true }
rand = "0.8"
libp2p-swarm-test = { path = "../../swarm-test" }
futures_ringbuf = "0.4.0"
serde = { version = "1.0", features = ["derive"]}
serde = { version = "1.0", features = ["derive"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

# Passing arguments to the docsrs builder in order to properly document cfg's.
Expand Down
Loading