Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core)!: make ConnectionIds globally unique #3327

Merged
merged 16 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,17 @@ where
}
};

#[allow(deprecated)]
gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id: peer,
connection_id: ConnectionId::new(0),
connection_id: ConnectionId::DUMMY,
endpoint: &endpoint,
failed_addresses: &[],
other_established: 0, // first connection
}));
if let Some(kind) = kind {
gs.on_connection_handler_event(peer, ConnectionId::new(1), HandlerEvent::PeerKind(kind));
#[allow(deprecated)]
gs.on_connection_handler_event(peer, ConnectionId::DUMMY, HandlerEvent::PeerKind(kind));
}
if explicit {
gs.add_explicit_peer(&peer);
Expand Down Expand Up @@ -577,9 +579,10 @@ fn test_join() {
for _ in 0..3 {
let random_peer = PeerId::random();
// inform the behaviour of a new peer
#[allow(deprecated)]
gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id: random_peer,
connection_id: ConnectionId::new(1),
connection_id: ConnectionId::DUMMY,
endpoint: &ConnectedPoint::Dialer {
address: "/ip4/127.0.0.1".parse::<Multiaddr>().unwrap(),
role_override: Endpoint::Dialer,
Expand Down Expand Up @@ -959,7 +962,10 @@ fn test_get_random_peers() {
*p,
PeerConnections {
kind: PeerKind::Gossipsubv1_1,
connections: vec![ConnectionId::new(1)],
connections: vec![
#[allow(deprecated)]
ConnectionId::DUMMY,
],
},
)
})
Expand Down Expand Up @@ -3009,7 +3015,8 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {
//receive from p1
gs.on_connection_handler_event(
p1,
ConnectionId::new(0),
#[allow(deprecated)]
ConnectionId::DUMMY,
HandlerEvent::Message {
rpc: GossipsubRpc {
messages: vec![raw_message1],
Expand All @@ -3035,7 +3042,8 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {
//receive from p2
gs.on_connection_handler_event(
p2,
ConnectionId::new(0),
#[allow(deprecated)]
ConnectionId::DUMMY,
HandlerEvent::Message {
rpc: GossipsubRpc {
messages: vec![raw_message3],
Expand Down Expand Up @@ -3647,7 +3655,8 @@ fn test_scoring_p4_invalid_signature() {

gs.on_connection_handler_event(
peers[0],
ConnectionId::new(0),
#[allow(deprecated)]
ConnectionId::DUMMY,
HandlerEvent::Message {
rpc: GossipsubRpc {
messages: vec![],
Expand Down Expand Up @@ -4128,10 +4137,11 @@ fn test_scoring_p6() {
}

//add additional connection for 3 others with addr
#[allow(deprecated)]
for id in others.iter().take(3) {
gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id: *id,
connection_id: ConnectionId::new(0),
connection_id: ConnectionId::DUMMY,
endpoint: &ConnectedPoint::Dialer {
address: addr.clone(),
role_override: Endpoint::Dialer,
Expand All @@ -4150,9 +4160,10 @@ fn test_scoring_p6() {

//add additional connection for 3 of the peers to addr2
for peer in peers.iter().take(3) {
#[allow(deprecated)]
gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id: *peer,
connection_id: ConnectionId::new(0),
connection_id: ConnectionId::DUMMY,
endpoint: &ConnectedPoint::Dialer {
address: addr2.clone(),
role_override: Endpoint::Dialer,
Expand Down Expand Up @@ -4180,9 +4191,10 @@ fn test_scoring_p6() {
);

//two times same ip doesn't count twice
#[allow(deprecated)]
gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id: peers[0],
connection_id: ConnectionId::new(0),
connection_id: ConnectionId::DUMMY,
endpoint: &ConnectedPoint::Dialer {
address: addr,
role_override: Endpoint::Dialer,
Expand Down Expand Up @@ -5194,7 +5206,8 @@ fn test_subscribe_and_graft_with_negative_score() {

let (mut gs2, _, _) = inject_nodes1().create_network();

let connection_id = ConnectionId::new(0);
#[allow(deprecated)]
let connection_id = ConnectionId::DUMMY;

let topic = Topic::new("test");

Expand Down
3 changes: 2 additions & 1 deletion protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,8 @@ fn network_behaviour_on_address_change() {
let local_peer_id = PeerId::random();

let remote_peer_id = PeerId::random();
let connection_id = ConnectionId::new(1);
#[allow(deprecated)]
let connection_id = ConnectionId::DUMMY;
let old_address: Multiaddr = Protocol::Memory(1).into();
let new_address: Multiaddr = Protocol::Memory(2).into();

Expand Down
3 changes: 3 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
- Remove type parameter from `PendingOutboundConnectionError` and `PendingInboundConnectionError`.
These two types are always used with `std::io::Error`. See [PR 3272].

- Remove `ConnectionId::new`. Manually creating `ConnectionId`s is now unsupported. See [PR 3327].

[PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170
[PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134
[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153
[PR 3264]: https://github.com/libp2p/rust-libp2p/pull/3264
[PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272
[PR 3327]: https://github.com/libp2p/rust-libp2p/pull/3327

# 0.41.1

Expand Down
28 changes: 14 additions & 14 deletions swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,30 +45,30 @@ use libp2p_core::upgrade::{InboundUpgradeApply, OutboundUpgradeApply};
use libp2p_core::{upgrade, UpgradeError};
use libp2p_core::{Endpoint, PeerId};
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Waker;
use std::time::Duration;
use std::{fmt, io, mem, pin::Pin, task::Context, task::Poll};

static NEXT_CONNECTION_ID: AtomicUsize = AtomicUsize::new(1);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We start IDs at 1 so they can never clash with the DUMMY one below.


/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConnectionId(usize);

impl ConnectionId {
/// Creates a `ConnectionId` from a non-negative integer.
/// A "dummy" [`ConnectionId`].
///
/// This is primarily useful for creating connection IDs
/// in test environments. There is in general no guarantee
/// that all connection IDs are based on non-negative integers.
pub fn new(id: usize) -> Self {
Self(id)
}
}

impl std::ops::Add<usize> for ConnectionId {
type Output = Self;

fn add(self, other: usize) -> Self {
Self(self.0 + other)
/// Really, you should not use this, not even for testing but it is here if you need it.
#[deprecated(
since = "0.42.0",
note = "Don't use this, it will be removed at a later stage again."
)]
pub const DUMMY: ConnectionId = ConnectionId(0);
Comment on lines +60 to +67
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not very happy about this but I propose to accept this technical debt to be able to move forward with the connection management PR.

Long-term, we would need to re-write the gossipsub and kademlia tests to not manually call lifecycle functions and thus only have Swarm construct ConnectionIds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@libp2p/rust-libp2p-maintainers Please voice your opinion on this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it would be nice if this was possible. Other than that I don't see a better alternative for here.


/// Returns the next available [`ConnectionId`].
pub(crate) fn next() -> Self {
Self(NEXT_CONNECTION_ID.fetch_add(1, Ordering::SeqCst))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very elegant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Credits go to @rkuhn !

}
}

Expand Down
15 changes: 2 additions & 13 deletions swarm/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ where
/// The pending connections that are currently being negotiated.
pending: HashMap<ConnectionId, PendingConnection<THandler>>,

/// Next available identifier for a new connection / task.
next_connection_id: ConnectionId,

/// Size of the task command buffer (per task).
task_command_buffer_size: usize,

Expand Down Expand Up @@ -326,7 +323,6 @@ where
counters: ConnectionCounters::new(limits),
established: Default::default(),
pending: Default::default(),
next_connection_id: ConnectionId::new(0),
task_command_buffer_size: config.task_command_buffer_size,
dial_concurrency_factor: config.dial_concurrency_factor,
substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
Expand Down Expand Up @@ -412,13 +408,6 @@ where
self.established.keys()
}

fn next_connection_id(&mut self) -> ConnectionId {
let connection_id = self.next_connection_id;
self.next_connection_id = self.next_connection_id + 1;

connection_id
}

fn spawn(&mut self, task: BoxFuture<'static, ()>) {
self.executor.spawn(task)
}
Expand Down Expand Up @@ -458,7 +447,7 @@ where
dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor),
);

let connection_id = self.next_connection_id();
let connection_id = ConnectionId::next();

let (abort_notifier, abort_receiver) = oneshot::channel();

Expand Down Expand Up @@ -508,7 +497,7 @@ where
return Err((limit, handler));
}

let connection_id = self.next_connection_id();
let connection_id = ConnectionId::next();

let (abort_notifier, abort_receiver) = oneshot::channel();

Expand Down