From caed1fe2c717ba1688a4eb0549284cddba8c9ea6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 14 Feb 2023 14:09:29 +1300 Subject: [PATCH] refactor(swarm)!: remove `handler` from `NetworkBehaviourAction::Dial` (#3328) We create the `ConnectionId` for the new connection as part of `DialOpts`. This allows `NetworkBehaviour`s to accurately track state regarding their own dial attempts. This patch is the main enabler of https://github.com/libp2p/rust-libp2p/pull/3254. Removing the `handler` field will allow us to deprecate the `NetworkBehaviour::new_handler` function in favor of four new ones that give more control over the connection lifecycle. --- core/src/muxing/boxed.rs | 6 + protocols/autonat/src/behaviour.rs | 19 +- protocols/autonat/src/behaviour/as_server.rs | 3 +- protocols/dcutr/src/behaviour_impl.rs | 171 +++++---- protocols/dcutr/src/handler.rs | 50 +-- protocols/dcutr/src/handler/direct.rs | 19 +- protocols/dcutr/src/handler/relayed.rs | 10 +- protocols/floodsub/src/layer.rs | 15 +- protocols/gossipsub/src/behaviour.rs | 14 +- protocols/gossipsub/src/behaviour/tests.rs | 10 +- protocols/identify/src/behaviour.rs | 8 +- protocols/kad/src/behaviour.rs | 19 +- protocols/mdns/src/behaviour.rs | 4 +- protocols/ping/src/lib.rs | 4 +- protocols/relay/src/behaviour.rs | 13 +- protocols/relay/src/priv_client.rs | 74 ++-- protocols/rendezvous/src/client.rs | 21 +- protocols/rendezvous/src/server.rs | 12 +- protocols/request-response/src/lib.rs | 16 +- swarm-derive/src/lib.rs | 163 +++------ swarm/CHANGELOG.md | 36 ++ swarm/src/behaviour.rs | 349 +++---------------- swarm/src/behaviour/either.rs | 29 +- swarm/src/behaviour/toggle.rs | 12 +- swarm/src/connection/pool.rs | 201 +++++------ swarm/src/dial_opts.rs | 13 + swarm/src/dummy.rs | 4 +- swarm/src/keep_alive.rs | 3 +- swarm/src/lib.rs | 233 ++++++------- swarm/src/test.rs | 12 +- swarm/tests/swarm_derive.rs | 6 +- 31 files changed, 583 insertions(+), 966 deletions(-) diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 99f7a87c6a5..e909fb9fbf1 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -13,6 +13,12 @@ pub struct StreamMuxerBox { inner: Pin + Send>>, } +impl fmt::Debug for StreamMuxerBox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StreamMuxerBox").finish_non_exhaustive() + } +} + /// Abstract type for asynchronous reading and writing. /// /// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead` diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index aae4beb5811..9ffbaefdfa9 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -38,7 +38,7 @@ use libp2p_swarm::{ ExpiredListenAddr, FromSwarm, }, ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, - PollParameters, THandlerOutEvent, + PollParameters, THandlerInEvent, THandlerOutEvent, }; use std::{ collections::{HashMap, VecDeque}, @@ -208,10 +208,7 @@ pub struct Behaviour { last_probe: Option, pending_actions: VecDeque< - NetworkBehaviourAction< - ::OutEvent, - ::ConnectionHandler, - >, + NetworkBehaviourAction<::OutEvent, THandlerInEvent>, >, probe_id: ProbeId, @@ -389,14 +386,14 @@ impl Behaviour { &mut self, DialFailure { peer_id, - handler, + connection_id, error, - }: DialFailure<::ConnectionHandler>, + }: DialFailure, ) { self.inner .on_swarm_event(FromSwarm::DialFailure(DialFailure { peer_id, - handler, + connection_id, error, })); if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) { @@ -560,10 +557,8 @@ impl NetworkBehaviour for Behaviour { } } -type Action = NetworkBehaviourAction< - ::OutEvent, - ::ConnectionHandler, ->; +type Action = + NetworkBehaviourAction<::OutEvent, THandlerInEvent>; // Trait implemented for `AsClient` and `AsServer` to handle events from the inner [`request_response::Behaviour`] Protocol. trait HandleInnerEvent { diff --git a/protocols/autonat/src/behaviour/as_server.rs b/protocols/autonat/src/behaviour/as_server.rs index 7a8c9f9c4b8..ec38678e4f5 100644 --- a/protocols/autonat/src/behaviour/as_server.rs +++ b/protocols/autonat/src/behaviour/as_server.rs @@ -29,7 +29,7 @@ use libp2p_request_response::{ }; use libp2p_swarm::{ dial_opts::{DialOpts, PeerCondition}, - ConnectionId, DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters, + ConnectionId, DialError, NetworkBehaviourAction, PollParameters, }; use std::{ collections::{HashMap, HashSet, VecDeque}, @@ -138,7 +138,6 @@ impl<'a> HandleInnerEvent for AsServer<'a> { ) .addresses(addrs) .build(), - handler: self.inner.new_handler(), }, ]) } diff --git a/protocols/dcutr/src/behaviour_impl.rs b/protocols/dcutr/src/behaviour_impl.rs index acfc4cb2985..a494b411e2d 100644 --- a/protocols/dcutr/src/behaviour_impl.rs +++ b/protocols/dcutr/src/behaviour_impl.rs @@ -29,12 +29,13 @@ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailu use libp2p_swarm::dial_opts::{self, DialOpts}; use libp2p_swarm::{ ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, + NotifyHandler, PollParameters, THandlerInEvent, }; use libp2p_swarm::{ConnectionId, THandlerOutEvent}; use std::collections::{HashMap, HashSet, VecDeque}; use std::task::{Context, Poll}; use thiserror::Error; +use void::Void; const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3; @@ -68,7 +69,7 @@ pub enum Error { pub struct Behaviour { /// Queue of actions to return when polled. - queued_events: VecDeque>, + queued_events: VecDeque>>, /// All direct (non-relayed) connections. direct_connections: HashMap>, @@ -76,6 +77,12 @@ pub struct Behaviour { external_addresses: ExternalAddresses, local_peer_id: PeerId, + + direct_to_relayed_connections: HashMap, + + /// Indexed by the [`ConnectionId`] of the relayed connection and + /// the [`PeerId`] we are trying to establish a direct connection to. + outgoing_direct_connection_attempts: HashMap<(ConnectionId, PeerId), u8>, } impl Behaviour { @@ -85,6 +92,8 @@ impl Behaviour { direct_connections: Default::default(), external_addresses: Default::default(), local_peer_id, + direct_to_relayed_connections: Default::default(), + outgoing_direct_connection_attempts: Default::default(), } } @@ -148,40 +157,57 @@ impl Behaviour { fn on_dial_failure( &mut self, DialFailure { - peer_id, handler, .. - }: DialFailure<::ConnectionHandler>, + peer_id, + connection_id: failed_direct_connection, + .. + }: DialFailure, ) { - if let handler::Prototype::DirectConnection { - relayed_connection_id, - role: handler::Role::Initiator { attempt }, - } = handler + let peer_id = if let Some(peer_id) = peer_id { + peer_id + } else { + return; + }; + + let relayed_connection_id = if let Some(relayed_connection_id) = self + .direct_to_relayed_connections + .get(&failed_direct_connection) { - let peer_id = peer_id.expect("Peer of `Prototype::DirectConnection` is always known."); - if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - handler: NotifyHandler::One(relayed_connection_id), - peer_id, - event: Either::Left(handler::relayed::Command::Connect { - attempt: attempt + 1, - obs_addrs: self.observed_addreses(), - }), - }) - } else { - self.queued_events.extend([ - NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::One(relayed_connection_id), - event: Either::Left( - handler::relayed::Command::UpgradeFinishedDontKeepAlive, - ), - }, - NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { - remote_peer_id: peer_id, - error: Error::Dial, + *relayed_connection_id + } else { + return; + }; + + let attempt = if let Some(attempt) = self + .outgoing_direct_connection_attempts + .get(&(relayed_connection_id, peer_id)) + { + *attempt + } else { + return; + }; + + if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { + self.queued_events + .push_back(NetworkBehaviourAction::NotifyHandler { + handler: NotifyHandler::One(relayed_connection_id), + peer_id, + event: Either::Left(handler::relayed::Command::Connect { + attempt: attempt + 1, + obs_addrs: self.observed_addreses(), }), - ]); - } + }) + } else { + self.queued_events.extend([ + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::One(relayed_connection_id), + event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive), + }, + NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + remote_peer_id: peer_id, + error: Error::Dial, + }), + ]); } } @@ -215,15 +241,26 @@ impl NetworkBehaviour for Behaviour { type OutEvent = Event; fn new_handler(&mut self) -> Self::ConnectionHandler { - handler::Prototype::UnknownConnection + handler::Prototype } fn on_connection_handler_event( &mut self, event_source: PeerId, - connection: ConnectionId, + connection_id: ConnectionId, handler_event: THandlerOutEvent, ) { + let relayed_connection_id = match handler_event.as_ref() { + Either::Left(_) => connection_id, + Either::Right(_) => match self.direct_to_relayed_connections.get(&connection_id) { + None => { + // If the connection ID is unknown to us, it means we didn't create it so ignore any event coming from it. + return; + } + Some(relayed_connection_id) => *relayed_connection_id, + }, + }; + match handler_event { Either::Left(handler::relayed::Event::InboundConnectRequest { inbound_connect, @@ -231,7 +268,7 @@ impl NetworkBehaviour for Behaviour { }) => { self.queued_events.extend([ NetworkBehaviourAction::NotifyHandler { - handler: NotifyHandler::One(connection), + handler: NotifyHandler::One(relayed_connection_id), peer_id: event_source, event: Either::Left(handler::relayed::Command::AcceptInboundConnect { inbound_connect, @@ -256,16 +293,17 @@ impl NetworkBehaviour for Behaviour { )); } Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => { - self.queued_events.push_back(NetworkBehaviourAction::Dial { - opts: DialOpts::peer_id(event_source) - .addresses(remote_addrs) - .condition(dial_opts::PeerCondition::Always) - .build(), - handler: handler::Prototype::DirectConnection { - relayed_connection_id: connection, - role: handler::Role::Listener, - }, - }); + let opts = DialOpts::peer_id(event_source) + .addresses(remote_addrs) + .condition(dial_opts::PeerCondition::Always) + .build(); + + let maybe_direct_connection_id = opts.connection_id(); + + self.direct_to_relayed_connections + .insert(maybe_direct_connection_id, relayed_connection_id); + self.queued_events + .push_back(NetworkBehaviourAction::Dial { opts }); } Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => { self.queued_events @@ -276,27 +314,25 @@ impl NetworkBehaviour for Behaviour { }, )); } - Either::Left(handler::relayed::Event::OutboundConnectNegotiated { - remote_addrs, - attempt, - }) => { - self.queued_events.push_back(NetworkBehaviourAction::Dial { - opts: DialOpts::peer_id(event_source) - .condition(dial_opts::PeerCondition::Always) - .addresses(remote_addrs) - .override_role() - .build(), - handler: handler::Prototype::DirectConnection { - relayed_connection_id: connection, - role: handler::Role::Initiator { attempt }, - }, - }); + Either::Left(handler::relayed::Event::OutboundConnectNegotiated { remote_addrs }) => { + let opts = DialOpts::peer_id(event_source) + .condition(dial_opts::PeerCondition::Always) + .addresses(remote_addrs) + .override_role() + .build(); + + let maybe_direct_connection_id = opts.connection_id(); + + self.direct_to_relayed_connections + .insert(maybe_direct_connection_id, relayed_connection_id); + *self + .outgoing_direct_connection_attempts + .entry((relayed_connection_id, event_source)) + .or_default() += 1; + self.queued_events + .push_back(NetworkBehaviourAction::Dial { opts }); } - Either::Right(Either::Left( - handler::direct::Event::DirectConnectionUpgradeSucceeded { - relayed_connection_id, - }, - )) => { + Either::Right(handler::direct::Event::DirectConnectionEstablished) => { self.queued_events.extend([ NetworkBehaviourAction::NotifyHandler { peer_id: event_source, @@ -312,7 +348,6 @@ impl NetworkBehaviour for Behaviour { ), ]); } - Either::Right(Either::Right(event)) => void::unreachable(event), }; } @@ -320,7 +355,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { if let Some(event) = self.queued_events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/dcutr/src/handler.rs b/protocols/dcutr/src/handler.rs index 84d1c8d8611..01062415da6 100644 --- a/protocols/dcutr/src/handler.rs +++ b/protocols/dcutr/src/handler.rs @@ -20,61 +20,27 @@ use crate::protocol; use either::Either; -use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::{ConnectedPoint, PeerId}; -use libp2p_swarm::dummy; use libp2p_swarm::handler::SendWrapper; -use libp2p_swarm::{ConnectionHandler, ConnectionId, IntoConnectionHandler}; +use libp2p_swarm::{ConnectionHandler, IntoConnectionHandler}; pub mod direct; pub mod relayed; -pub enum Prototype { - DirectConnection { - role: Role, - relayed_connection_id: ConnectionId, - }, - UnknownConnection, -} - -pub enum Role { - Initiator { attempt: u8 }, - Listener, -} +pub struct Prototype; impl IntoConnectionHandler for Prototype { - type Handler = Either>; + type Handler = Either; fn into_handler(self, _remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - match self { - Self::UnknownConnection => { - if endpoint.is_relayed() { - Either::Left(relayed::Handler::new(endpoint.clone())) - } else { - Either::Right(Either::Right(dummy::ConnectionHandler)) - } - } - Self::DirectConnection { - relayed_connection_id, - .. - } => { - assert!( - !endpoint.is_relayed(), - "`Prototype::DirectConnection` is never created for relayed connection." - ); - Either::Right(Either::Left(direct::Handler::new(relayed_connection_id))) - } + if endpoint.is_relayed() { + Either::Left(relayed::Handler::new(endpoint.clone())) + } else { + Either::Right(direct::Handler::default()) // This is a direct connection. What we don't know is whether it is the one we created or another one that happened accidentally. } } fn inbound_protocol(&self) -> ::InboundProtocol { - match self { - Prototype::UnknownConnection => { - Either::Left(SendWrapper(Either::Left(protocol::inbound::Upgrade {}))) - } - Prototype::DirectConnection { .. } => { - Either::Left(SendWrapper(Either::Right(DeniedUpgrade))) - } - } + Either::Left(SendWrapper(Either::Left(protocol::inbound::Upgrade {}))) } } diff --git a/protocols/dcutr/src/handler/direct.rs b/protocols/dcutr/src/handler/direct.rs index b979c8211d7..aab212483eb 100644 --- a/protocols/dcutr/src/handler/direct.rs +++ b/protocols/dcutr/src/handler/direct.rs @@ -23,7 +23,7 @@ use libp2p_core::upgrade::DeniedUpgrade; use libp2p_swarm::handler::ConnectionEvent; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, ConnectionId, KeepAlive, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol, }; use std::task::{Context, Poll}; @@ -31,23 +31,14 @@ use void::Void; #[derive(Debug)] pub enum Event { - DirectConnectionUpgradeSucceeded { relayed_connection_id: ConnectionId }, + DirectConnectionEstablished, } +#[derive(Default)] pub struct Handler { - relayed_connection_id: ConnectionId, reported: bool, } -impl Handler { - pub(crate) fn new(relayed_connection_id: ConnectionId) -> Self { - Self { - reported: false, - relayed_connection_id, - } - } -} - impl ConnectionHandler for Handler { type InEvent = void::Void; type OutEvent = Event; @@ -81,9 +72,7 @@ impl ConnectionHandler for Handler { if !self.reported { self.reported = true; return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::DirectConnectionUpgradeSucceeded { - relayed_connection_id: self.relayed_connection_id, - }, + Event::DirectConnectionEstablished, )); } Poll::Pending diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 0b487c3039e..aefaaeec933 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -92,7 +92,6 @@ pub enum Event { }, OutboundConnectNegotiated { remote_addrs: Vec, - attempt: u8, }, } @@ -118,13 +117,9 @@ impl fmt::Debug for Event { .debug_struct("Event::OutboundNegotiationFailed") .field("error", error) .finish(), - Event::OutboundConnectNegotiated { - remote_addrs, - attempt, - } => f + Event::OutboundConnectNegotiated { remote_addrs } => f .debug_struct("Event::OutboundConnectNegotiated") .field("remote_addrs", remote_addrs) - .field("attempt", attempt) .finish(), } } @@ -195,7 +190,7 @@ impl Handler { &mut self, FullyNegotiatedOutbound { protocol: protocol::outbound::Connect { obs_addrs }, - info: attempt, + .. }: FullyNegotiatedOutbound< ::OutboundProtocol, ::OutboundOpenInfo, @@ -208,7 +203,6 @@ impl Handler { self.queued_events.push_back(ConnectionHandlerEvent::Custom( Event::OutboundConnectNegotiated { remote_addrs: obs_addrs, - attempt, }, )); } diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index a82fd0cbf4b..d0d77b5afb9 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -30,7 +30,7 @@ use libp2p_core::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; use libp2p_swarm::{ dial_opts::DialOpts, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - OneShotHandler, PollParameters, THandlerOutEvent, + OneShotHandler, PollParameters, THandlerInEvent, THandlerOutEvent, }; use log::warn; use smallvec::SmallVec; @@ -41,12 +41,7 @@ use std::{collections::VecDeque, iter}; /// Network behaviour that handles the floodsub protocol. pub struct Floodsub { /// Events that need to be yielded to the outside when polling. - events: VecDeque< - NetworkBehaviourAction< - FloodsubEvent, - OneShotHandler, - >, - >, + events: VecDeque>, config: FloodsubConfig, @@ -107,10 +102,8 @@ impl Floodsub { } if self.target_peers.insert(peer_id) { - let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(peer_id).build(), - handler, }); } } @@ -330,10 +323,8 @@ impl Floodsub { // We can be disconnected by the remote in case of inactivity for example, so we always // try to reconnect. if self.target_peers.contains(&peer_id) { - let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(peer_id).build(), - handler, }); } } @@ -470,7 +461,7 @@ impl NetworkBehaviour for Floodsub { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index eccecd3a498..f1b7adbdd0b 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -42,7 +42,7 @@ use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm}, dial_opts::DialOpts, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, - THandlerOutEvent, + THandlerInEvent, THandlerOutEvent, }; use wasm_timer::Instant; @@ -215,7 +215,7 @@ pub struct Behaviour { config: Config, /// Events that need to be yielded to the outside when polling. - events: VecDeque>, + events: VecDeque>, /// Pools non-urgent control messages between heartbeats. control_pool: HashMap>, @@ -1130,10 +1130,8 @@ where if !self.peer_topics.contains_key(peer_id) { // Connect to peer debug!("Connecting to explicit peer {:?}", peer_id); - let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(*peer_id).build(), - handler, }); } } @@ -1631,10 +1629,8 @@ where self.px_peers.insert(peer_id); // dial peer - let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(peer_id).build(), - handler, }); } } @@ -3431,7 +3427,7 @@ where &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } @@ -3480,7 +3476,7 @@ fn peer_added_to_mesh( new_topics: Vec<&TopicHash>, mesh: &HashMap>, known_topics: Option<&BTreeSet>, - events: &mut VecDeque>, + events: &mut VecDeque>, connections: &HashMap, ) { // Ensure there is an active connection @@ -3521,7 +3517,7 @@ fn peer_removed_from_mesh( old_topic: &TopicHash, mesh: &HashMap>, known_topics: Option<&BTreeSet>, - events: &mut VecDeque>, + events: &mut VecDeque>, connections: &HashMap, ) { // Ensure there is an active connection diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index cc922e4756b..2f06da61a5e 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -1395,7 +1395,7 @@ fn test_explicit_peer_gets_connected() { .events .iter() .filter(|e| match e { - NetworkBehaviourAction::Dial { opts, handler: _ } => opts.get_peer_id() == Some(peer), + NetworkBehaviourAction::Dial { opts } => opts.get_peer_id() == Some(peer), _ => false, }) .count(); @@ -1436,8 +1436,7 @@ fn test_explicit_peer_reconnects() { gs.events .iter() .filter(|e| match e { - NetworkBehaviourAction::Dial { opts, handler: _ } => - opts.get_peer_id() == Some(*peer), + NetworkBehaviourAction::Dial { opts } => opts.get_peer_id() == Some(*peer), _ => false, }) .count(), @@ -1452,8 +1451,7 @@ fn test_explicit_peer_reconnects() { gs.events .iter() .filter(|e| match e { - NetworkBehaviourAction::Dial { opts, handler: _ } => - opts.get_peer_id() == Some(*peer), + NetworkBehaviourAction::Dial { opts } => opts.get_peer_id() == Some(*peer), _ => false, }) .count() @@ -1833,7 +1831,7 @@ fn test_connect_to_px_peers_on_handle_prune() { .events .iter() .filter_map(|e| match e { - NetworkBehaviourAction::Dial { opts, handler: _ } => opts.get_peer_id(), + NetworkBehaviourAction::Dial { opts } => opts.get_peer_id(), _ => None, }) .collect(); diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 4e96d175de2..01cf0c8b39c 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -26,7 +26,7 @@ use libp2p_swarm::{ dial_opts::DialOpts, AddressScore, ConnectionHandlerUpgrErr, ConnectionId, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, - THandlerOutEvent, + THandlerInEvent, THandlerOutEvent, }; use lru::LruCache; use std::num::NonZeroUsize; @@ -53,7 +53,7 @@ pub struct Behaviour { /// with current information about the local peer. requests: Vec, /// Pending events to be emitted when polled. - events: VecDeque>, + events: VecDeque>, /// The addresses of all peers that we have discovered. discovered_peers: PeerCache, @@ -198,10 +198,8 @@ impl Behaviour { if !self.requests.contains(&request) { self.requests.push(request); - let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(p).build(), - handler, }); } } @@ -309,7 +307,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index ab51102f513..43fd0fde912 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -46,7 +46,7 @@ use libp2p_swarm::behaviour::{ use libp2p_swarm::{ dial_opts::{self, DialOpts}, ConnectionId, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour, - NetworkBehaviourAction, NotifyHandler, PollParameters, THandlerOutEvent, + NetworkBehaviourAction, NotifyHandler, PollParameters, THandlerInEvent, THandlerOutEvent, }; use log::{debug, info, warn}; use smallvec::SmallVec; @@ -101,7 +101,7 @@ pub struct Kademlia { connection_idle_timeout: Duration, /// Queued events to return when the behaviour is being polled. - queued_events: VecDeque>>, + queued_events: VecDeque>>, listen_addresses: ListenAddresses, @@ -571,10 +571,8 @@ where RoutingUpdate::Failed } kbucket::InsertResult::Pending { disconnected } => { - let handler = self.new_handler(); self.queued_events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(disconnected.into_preimage()).build(), - handler, }); RoutingUpdate::Pending } @@ -1221,11 +1219,9 @@ where // // Only try dialing peer if not currently connected. if !self.connected_peers.contains(disconnected.preimage()) { - let handler = self.new_handler(); self.queued_events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(disconnected.into_preimage()) .build(), - handler, }) } } @@ -1917,12 +1913,7 @@ where } } - fn on_dial_failure( - &mut self, - DialFailure { peer_id, error, .. }: DialFailure< - ::ConnectionHandler, - >, - ) { + fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) { let peer_id = match peer_id { Some(id) => id, // Not interested in dial failures to unknown peers. @@ -2292,7 +2283,7 @@ where &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { let now = Instant::now(); // Calculate the available capacity for queries triggered by background jobs. @@ -2391,10 +2382,8 @@ where }); } else if &peer_id != self.kbuckets.local_key().preimage() { query.inner.pending_rpcs.push((peer_id, event)); - let handler = self.new_handler(); self.queued_events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(peer_id).build(), - handler, }); } } diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 163927b2bda..472e1101d55 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -31,7 +31,7 @@ use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::{ dummy, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, PollParameters, - THandlerOutEvent, + THandlerInEvent, THandlerOutEvent, }; use smallvec::SmallVec; use std::collections::hash_map::{Entry, HashMap}; @@ -223,7 +223,7 @@ where &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { // Poll ifwatch. while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) { match event { diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index da865f22f12..9da00949463 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -50,7 +50,7 @@ pub use handler::{Config, Failure, Success}; use libp2p_core::PeerId; use libp2p_swarm::{ behaviour::FromSwarm, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, PollParameters, - THandlerOutEvent, + THandlerInEvent, THandlerOutEvent, }; use std::{ collections::VecDeque, @@ -137,7 +137,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { if let Some(e) = self.events.pop_back() { let Event { result, peer } = &e; diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index 6919626d978..c94274e7d12 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -32,13 +32,14 @@ use libp2p_core::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm}; use libp2p_swarm::{ ConnectionHandlerUpgrErr, ConnectionId, ExternalAddresses, NetworkBehaviour, - NetworkBehaviourAction, NotifyHandler, PollParameters, THandlerOutEvent, + NetworkBehaviourAction, NotifyHandler, PollParameters, THandlerInEvent, THandlerOutEvent, }; use std::collections::{hash_map, HashMap, HashSet, VecDeque}; use std::num::NonZeroU32; use std::ops::Add; use std::task::{Context, Poll}; use std::time::Duration; +use void::Void; /// Configuration for the relay [`Behaviour`]. /// @@ -642,7 +643,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { if let Some(action) = self.queued_actions.pop_front() { return Poll::Ready(action.build(self.local_peer_id, &self.external_addresses)); } @@ -745,7 +746,7 @@ impl Add for CircuitId { /// before being returned in [`Behaviour::poll`]. #[allow(clippy::large_enum_variant)] enum Action { - Done(NetworkBehaviourAction), + Done(NetworkBehaviourAction>), AcceptReservationPrototype { inbound_reservation_req: inbound_hop::ReservationReq, handler: NotifyHandler, @@ -753,8 +754,8 @@ enum Action { }, } -impl From> for Action { - fn from(action: NetworkBehaviourAction) -> Self { +impl From>> for Action { + fn from(action: NetworkBehaviourAction>) -> Self { Self::Done(action) } } @@ -764,7 +765,7 @@ impl Action { self, local_peer_id: PeerId, external_addresses: &ExternalAddresses, - ) -> NetworkBehaviourAction { + ) -> NetworkBehaviourAction> { match self { Action::Done(action) => action, Action::AcceptReservationPrototype { diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 9695a22d30c..7d3f3de826f 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -36,8 +36,8 @@ use libp2p_core::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::{ - ConnectionHandlerUpgrErr, ConnectionId, NegotiatedSubstream, NetworkBehaviour, - NetworkBehaviourAction, NotifyHandler, PollParameters, THandlerOutEvent, + ConnectionHandlerUpgrErr, ConnectionId, DialFailure, NegotiatedSubstream, NetworkBehaviour, + NetworkBehaviourAction, NotifyHandler, PollParameters, THandlerInEvent, THandlerOutEvent, }; use std::collections::{hash_map, HashMap, VecDeque}; use std::io::{Error, ErrorKind, IoSlice}; @@ -45,6 +45,7 @@ use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; use transport::Transport; +use void::Void; /// The events produced by the client `Behaviour`. #[derive(Debug)] @@ -99,7 +100,9 @@ pub struct Behaviour { directly_connected_peers: HashMap>, /// Queue of actions to return when polled. - queued_actions: VecDeque>, + queued_actions: VecDeque>>, + + pending_handler_commands: HashMap, } /// Create a new client relay [`Behaviour`] with it's corresponding [`Transport`]. @@ -110,6 +113,7 @@ pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) { from_transport, directly_connected_peers: Default::default(), queued_actions: Default::default(), + pending_handler_commands: Default::default(), }; (transport, behaviour) } @@ -173,12 +177,23 @@ impl NetworkBehaviour for Behaviour { .or_default() .push(connection_id); } + + if let Some(event) = self.pending_handler_commands.remove(&connection_id) { + self.queued_actions + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::One(connection_id), + event: Either::Left(event), + }) + } } FromSwarm::ConnectionClosed(connection_closed) => { self.on_connection_closed(connection_closed) } + FromSwarm::DialFailure(DialFailure { connection_id, .. }) => { + self.pending_handler_commands.remove(&connection_id); + } FromSwarm::AddressChange(_) - | FromSwarm::DialFailure(_) | FromSwarm::ListenFailure(_) | FromSwarm::NewListener(_) | FromSwarm::NewListenAddr(_) @@ -242,16 +257,16 @@ impl NetworkBehaviour for Behaviour { }; self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent(event)); + .push_back(NetworkBehaviourAction::GenerateEvent(event)) } fn poll( &mut self, cx: &mut Context<'_>, _poll_parameters: &mut impl PollParameters, - ) -> Poll> { - if let Some(event) = self.queued_actions.pop_front() { - return Poll::Ready(event); + ) -> Poll>> { + if let Some(action) = self.queued_actions.pop_front() { + return Poll::Ready(action); } let action = match ready!(self.from_transport.poll_next_unpin(cx)) { @@ -271,17 +286,15 @@ impl NetworkBehaviour for Behaviour { event: Either::Left(handler::In::Reserve { to_listener }), }, None => { - let handler = handler::Prototype::new( - self.local_peer_id, - Some(handler::In::Reserve { to_listener }), - ); - NetworkBehaviourAction::Dial { - opts: DialOpts::peer_id(relay_peer_id) - .addresses(vec![relay_addr]) - .extend_addresses_through_behaviour() - .build(), - handler, - } + let opts = DialOpts::peer_id(relay_peer_id) + .addresses(vec![relay_addr]) + .extend_addresses_through_behaviour() + .build(); + let relayed_connection_id = opts.connection_id(); + + self.pending_handler_commands + .insert(relayed_connection_id, handler::In::Reserve { to_listener }); + NetworkBehaviourAction::Dial { opts } } } } @@ -306,20 +319,21 @@ impl NetworkBehaviour for Behaviour { }), }, None => { - let handler = handler::Prototype::new( - self.local_peer_id, - Some(handler::In::EstablishCircuit { + let opts = DialOpts::peer_id(relay_peer_id) + .addresses(vec![relay_addr]) + .extend_addresses_through_behaviour() + .build(); + let connection_id = opts.connection_id(); + + self.pending_handler_commands.insert( + connection_id, + handler::In::EstablishCircuit { send_back, dst_peer_id, - }), + }, ); - NetworkBehaviourAction::Dial { - opts: DialOpts::peer_id(relay_peer_id) - .addresses(vec![relay_addr]) - .extend_addresses_through_behaviour() - .build(), - handler, - } + + NetworkBehaviourAction::Dial { opts } } } } diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index 23229cd12dd..ea722e7a7d7 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -22,7 +22,7 @@ use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, use crate::handler; use crate::handler::outbound; use crate::handler::outbound::OpenInfo; -use crate::substream_handler::SubstreamConnectionHandler; +use crate::substream_handler::{InEvent, SubstreamConnectionHandler}; use futures::future::BoxFuture; use futures::future::FutureExt; use futures::stream::FuturesUnordered; @@ -34,19 +34,15 @@ use libp2p_core::{Multiaddr, PeerId, PeerRecord}; use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::{ CloseConnection, ConnectionId, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, THandlerOutEvent, + NotifyHandler, PollParameters, THandlerInEvent, THandlerOutEvent, }; use std::collections::{HashMap, VecDeque}; use std::iter::FromIterator; use std::task::{Context, Poll}; +use void::Void; pub struct Behaviour { - events: VecDeque< - NetworkBehaviourAction< - Event, - SubstreamConnectionHandler, - >, - >, + events: VecDeque>>, keypair: Keypair, pending_register_requests: Vec<(Namespace, PeerId, Option)>, @@ -219,7 +215,7 @@ impl NetworkBehaviour for Behaviour { &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } @@ -293,12 +289,7 @@ fn handle_outbound_event( peer_id: PeerId, discovered_peers: &mut HashMap<(PeerId, Namespace), Vec>, expiring_registrations: &mut FuturesUnordered>, -) -> Vec< - NetworkBehaviourAction< - Event, - SubstreamConnectionHandler, - >, -> { +) -> Vec>> { match event { outbound::OutEvent::Registered { namespace, ttl } => { vec![NetworkBehaviourAction::GenerateEvent(Event::Registered { diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs index d7e42652a2d..c38b8e760f3 100644 --- a/protocols/rendezvous/src/server.rs +++ b/protocols/rendezvous/src/server.rs @@ -20,7 +20,7 @@ use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl}; use crate::handler::inbound; -use crate::substream_handler::{InboundSubstreamId, SubstreamConnectionHandler}; +use crate::substream_handler::{InEvent, InboundSubstreamId, SubstreamConnectionHandler}; use crate::{handler, MAX_TTL, MIN_TTL}; use bimap::BiMap; use futures::future::BoxFuture; @@ -31,7 +31,7 @@ use libp2p_core::PeerId; use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::{ CloseConnection, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - PollParameters, THandlerOutEvent, + PollParameters, THandlerInEvent, THandlerOutEvent, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::iter::FromIterator; @@ -40,9 +40,7 @@ use std::time::Duration; use void::Void; pub struct Behaviour { - events: VecDeque< - NetworkBehaviourAction>, - >, + events: VecDeque>>, registrations: Registrations, } @@ -148,7 +146,7 @@ impl NetworkBehaviour for Behaviour { &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { if let Poll::Ready(ExpiredRegistration(registration)) = self.registrations.poll(cx) { return Poll::Ready(NetworkBehaviourAction::GenerateEvent( Event::RegistrationExpired(registration), @@ -186,7 +184,7 @@ fn handle_inbound_event( connection: ConnectionId, id: InboundSubstreamId, registrations: &mut Registrations, -) -> Vec>> { +) -> Vec>> { match event { // bad registration inbound::OutEvent::RegistrationRequested(registration) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 760a17120c9..2986bf5191f 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -75,7 +75,7 @@ use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}, dial_opts::DialOpts, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, - THandlerOutEvent, + THandlerInEvent, THandlerOutEvent, }; use smallvec::SmallVec; use std::{ @@ -349,8 +349,9 @@ where /// The protocol codec for reading and writing requests and responses. codec: TCodec, /// Pending events to return from `poll`. - pending_events: - VecDeque, Handler>>, + pending_events: VecDeque< + NetworkBehaviourAction, RequestProtocol>, + >, /// The currently connected peers, their pending outbound and inbound responses and their known, /// reachable addresses, if any. connected: HashMap>, @@ -417,10 +418,8 @@ where }; if let Some(request) = self.try_send_request(peer, request) { - let handler = self.new_handler(); self.pending_events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(*peer).build(), - handler, }); self.pending_outbound_requests .entry(*peer) @@ -696,10 +695,7 @@ where } } - fn on_dial_failure( - &mut self, - DialFailure { peer_id, .. }: DialFailure<::ConnectionHandler>, - ) { + fn on_dial_failure(&mut self, DialFailure { peer_id, .. }: DialFailure) { if let Some(peer) = peer_id { // If there are pending outgoing requests when a dial failure occurs, // it is implied that we are not connected to the peer, since pending @@ -931,7 +927,7 @@ where &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(ev); } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index d4bd3b39f9b..5c31aff18de 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -74,6 +74,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let expired_external_addr = quote! { #prelude_path::ExpiredExternalAddr }; let listener_error = quote! { #prelude_path::ListenerError }; let listener_closed = quote! { #prelude_path::ListenerClosed }; + let t_handler_in_event = quote! { #prelude_path::THandlerInEvent }; // Build the generics. let impl_generics = { @@ -325,87 +326,51 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { // Build the list of statements to put in the body of `on_swarm_event()` // for the `FromSwarm::DialFailure` variant. - let on_dial_failure_stmts = { - data_struct - .fields - .iter() - .enumerate() - // The outmost handler belongs to the last behaviour. - .rev() - .enumerate() - .map(|(enum_n, (field_n, field))| { - let handler = if field_n == 0 { - // Given that the iterator is reversed, this is the innermost handler only. - quote! { let handler = handlers } - } else { - quote! { - let (handlers, handler) = handlers.into_inner() - } - }; - - let inject = match field.ident { - Some(ref i) => quote! { - self.#i.on_swarm_event(#from_swarm::DialFailure(#dial_failure { - peer_id, - handler, - error, - })); - }, - None => quote! { - self.#enum_n.on_swarm_event(#from_swarm::DialFailure(#dial_failure { - peer_id, - handler, - error, - })); - }, - }; - quote! { - #handler; - #inject; - } - }) - }; + let on_dial_failure_stmts = data_struct + .fields + .iter() + .enumerate() + .map(|(enum_n, field)| match field.ident { + Some(ref i) => quote! { + self.#i.on_swarm_event(#from_swarm::DialFailure(#dial_failure { + peer_id, + connection_id, + error, + })); + }, + None => quote! { + self.#enum_n.on_swarm_event(#from_swarm::DialFailure(#dial_failure { + peer_id, + connection_id, + error, + })); + }, + }); // Build the list of statements to put in the body of `on_swarm_event()` // for the `FromSwarm::ListenFailure` variant. - let on_listen_failure_stmts = - { - data_struct.fields.iter().enumerate().rev().enumerate().map( - |(enum_n, (field_n, field))| { - let handler = if field_n == 0 { - quote! { let handler = handlers } - } else { - quote! { - let (handlers, handler) = handlers.into_inner() - } - }; - - let inject = match field.ident { - Some(ref i) => quote! { - self.#i.on_swarm_event(#from_swarm::ListenFailure(#listen_failure { - local_addr, - send_back_addr, - error, - handler, - })); - }, - None => quote! { - self.#enum_n.on_swarm_event(#from_swarm::ListenFailure(#listen_failure { - local_addr, - send_back_addr, - error, - handler, - })); - }, - }; - - quote! { - #handler; - #inject; - } - }, - ) - }; + let on_listen_failure_stmts = data_struct + .fields + .iter() + .enumerate() + .map(|(enum_n, field)| match field.ident { + Some(ref i) => quote! { + self.#i.on_swarm_event(#from_swarm::ListenFailure(#listen_failure { + local_addr, + send_back_addr, + connection_id, + error + })); + }, + None => quote! { + self.#enum_n.on_swarm_event(#from_swarm::ListenFailure(#listen_failure { + local_addr, + send_back_addr, + connection_id, + error + })); + }, + }); // Build the list of statements to put in the body of `on_swarm_event()` // for the `FromSwarm::NewListener` variant. @@ -650,38 +615,6 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { wrapped_event = quote!{ #either_ident::Left(#wrapped_event) }; } - // `Dial` provides a handler of the specific behaviour triggering the - // event. Though in order for the final handler to be able to handle - // protocols of all behaviours, the provided handler needs to be - // combined with handlers of all other behaviours. - let provided_handler_and_new_handlers = { - let mut out_handler = None; - - for (f_n, f) in data_struct.fields.iter().enumerate() { - let f_name = match f.ident { - Some(ref i) => quote! { self.#i }, - None => quote! { self.#f_n }, - }; - - let builder = if field_n == f_n { - // The behaviour that triggered the event. Thus, instead of - // creating a new handler, use the provided handler. - quote! { provided_handler } - } else { - quote! { #f_name.new_handler() } - }; - - match out_handler { - Some(h) => { - out_handler = Some(quote! { #into_connection_handler::select(#h, #builder) }) - } - ref mut h @ None => *h = Some(builder), - } - } - - out_handler.unwrap_or(quote! {()}) // TODO: See test `empty`. - }; - let generate_event_match_arm = { // If the `NetworkBehaviour`'s `OutEvent` is generated by the derive macro, wrap the sub // `NetworkBehaviour` `OutEvent` in the variant of the generated `OutEvent`. If the @@ -708,8 +641,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { quote!{ match #trait_to_impl::poll(&mut self.#field, cx, poll_params) { #generate_event_match_arm - std::task::Poll::Ready(#network_behaviour_action::Dial { opts, handler: provided_handler }) => { - return std::task::Poll::Ready(#network_behaviour_action::Dial { opts, handler: #provided_handler_and_new_handlers }); + std::task::Poll::Ready(#network_behaviour_action::Dial { opts }) => { + return std::task::Poll::Ready(#network_behaviour_action::Dial { opts }); } std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { peer_id, handler, event }) => { return std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { @@ -767,7 +700,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { } } - fn poll(&mut self, cx: &mut std::task::Context, poll_params: &mut impl #poll_parameters) -> std::task::Poll<#network_behaviour_action> { + fn poll(&mut self, cx: &mut std::task::Context, poll_params: &mut impl #poll_parameters) -> std::task::Poll<#network_behaviour_action>> { use #prelude_path::futures::*; #(#poll_stmts)* std::task::Poll::Pending @@ -785,10 +718,10 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #connection_closed { peer_id, connection_id, endpoint, handler: handlers, remaining_established }) => { #(#on_connection_closed_stmts)* } #from_swarm::DialFailure( - #dial_failure { peer_id, handler: handlers, error }) + #dial_failure { peer_id, connection_id, error }) => { #(#on_dial_failure_stmts)* } #from_swarm::ListenFailure( - #listen_failure { local_addr, send_back_addr, handler: handlers, error }) + #listen_failure { local_addr, send_back_addr, connection_id, error }) => { #(#on_listen_failure_stmts)* } #from_swarm::NewListener( #new_listener { listener_id }) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 765cfdefbc2..2c6f116571e 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,5 +1,38 @@ # 0.42.0 [unreleased] +- Remove `handler` field from `NetworkBehaviourAction::Dial`. + Instead of constructing the handler early, you can now access the `ConnectionId` of the future connection on `DialOpts`. + `ConnectionId`s are `Copy` and will be used throughout the entire lifetime of the connection to report events. + This allows you to send events to a very specific connection, much like you previously could directly set state in the handler. + + Removing the `handler` field also reduces the type parameters of `NetworkBehaviourAction` from three to two. + The third one used to be defaulted to the `InEvent` of the `ConnectionHandler`. + You now have to manually specify that where you previously had to specify the `ConnectionHandler`. + This very likely will trigger **convoluted compile errors** about traits not being implemented. + + Within `NetworkBehaviourAction::poll`, the easiest way to migrate is to do this (in the example of `libp2p-floodsub`): + ```diff + --- a/protocols/floodsub/src/layer.rs + +++ b/protocols/floodsub/src/layer.rs + @@ -472,7 +465,7 @@ impl NetworkBehaviour for Floodsub { + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + - ) -> Poll> { + + ) -> Poll>> { + ``` + + In other words: + + |Search|Replace| + |---|---| + |`NetworkBehaviourAction`|`NetworkBehaviourAction>`| + + If you reference `NetworkBehaviourAction` somewhere else as well, + you may have to fill in the type of `ConnectionHandler::InEvent` manually as the 2nd parameter. + + See [PR 3328]. + - Update to `libp2p-core` `v0.39.0`. - Removed deprecated Swarm constructors. For transition notes see [0.41.0](#0.41.0). See [PR 3170]. @@ -36,6 +69,8 @@ - Introduce `ListenError` and use it within `SwarmEvent::IncomingConnectionError`. See [PR 3375]. +- Remove `ConnectionId::new`. Manually creating `ConnectionId`s is now unsupported. See [PR 3327]. + [PR 3364]: https://github.com/libp2p/rust-libp2p/pull/3364 [PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170 [PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134 @@ -43,6 +78,7 @@ [PR 3264]: https://github.com/libp2p/rust-libp2p/pull/3264 [PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272 [PR 3327]: https://github.com/libp2p/rust-libp2p/pull/3327 +[PR 3328]: https://github.com/libp2p/rust-libp2p/pull/3328 [PR 3188]: https://github.com/libp2p/rust-libp2p/pull/3188 [PR 3377]: https://github.com/libp2p/rust-libp2p/pull/3377 [PR 3373]: https://github.com/libp2p/rust-libp2p/pull/3373 diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 3545f9700ea..9200ed730d1 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -28,15 +28,13 @@ pub use listen_addresses::ListenAddresses; use crate::connection::ConnectionId; use crate::dial_opts::DialOpts; -use crate::handler::{ConnectionHandler, IntoConnectionHandler}; -use crate::{AddressRecord, AddressScore, DialError, ListenError, THandlerOutEvent}; +use crate::handler::IntoConnectionHandler; +use crate::{ + AddressRecord, AddressScore, DialError, ListenError, THandlerInEvent, THandlerOutEvent, +}; use libp2p_core::{transport::ListenerId, ConnectedPoint, Multiaddr, PeerId}; use std::{task::Context, task::Poll}; -/// Custom event that can be received by the [`ConnectionHandler`]. -pub(crate) type THandlerInEvent = - <::Handler as ConnectionHandler>::InEvent; - /// A [`NetworkBehaviour`] defines the behaviour of the local node on the network. /// /// In contrast to [`Transport`](libp2p_core::Transport) which defines **how** to send bytes on the @@ -126,7 +124,7 @@ pub trait NetworkBehaviour: 'static { /// Event generated by the `NetworkBehaviour` and that the swarm will report back. type OutEvent: Send + 'static; - /// Creates a new [`ConnectionHandler`] for a connection with a peer. + /// Creates a new [`ConnectionHandler`](crate::ConnectionHandler) for a connection with a peer. /// /// Every time an incoming connection is opened, and every time another [`NetworkBehaviour`] /// emitted a dial request, this method is called. @@ -158,8 +156,8 @@ pub trait NetworkBehaviour: 'static { /// Informs the behaviour about an event from the [`Swarm`](crate::Swarm). fn on_swarm_event(&mut self, event: FromSwarm); - /// Informs the behaviour about an event generated by the [`ConnectionHandler`] dedicated to the - /// peer identified by `peer_id`. for the behaviour. + /// Informs the behaviour about an event generated by the [`ConnectionHandler`](crate::ConnectionHandler) + /// dedicated to the peer identified by `peer_id`. for the behaviour. /// /// The [`PeerId`] is guaranteed to be in a connected state. In other words, /// [`FromSwarm::ConnectionEstablished`] has previously been received with this [`PeerId`]. @@ -178,7 +176,7 @@ pub trait NetworkBehaviour: 'static { &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll>; + ) -> Poll>>; } /// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to. @@ -224,16 +222,8 @@ pub trait PollParameters { /// in whose context it is executing. /// /// [`Swarm`]: super::Swarm -// -// Note: `TInEvent` is needed to be able to implement -// [`NetworkBehaviourAction::map_in`], mapping the handler `InEvent` leaving the -// handler itself untouched. #[derive(Debug)] -pub enum NetworkBehaviourAction< - TOutEvent, - THandler: IntoConnectionHandler, - TInEvent = THandlerInEvent, -> { +pub enum NetworkBehaviourAction { /// Instructs the `Swarm` to return an event when it is being polled. GenerateEvent(TOutEvent), @@ -242,195 +232,28 @@ pub enum NetworkBehaviourAction< /// On success, [`NetworkBehaviour::on_swarm_event`] with `ConnectionEstablished` is invoked. /// On failure, [`NetworkBehaviour::on_swarm_event`] with `DialFailure` is invoked. /// - /// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure - /// and connection closing. Thus it can be used to carry state, which otherwise would have to be - /// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer - /// can be included in the handler, and thus directly send on connection success or extracted by - /// the [`NetworkBehaviour`] on connection failure. - /// - /// # Example carrying state in the handler - /// - /// ```rust - /// # use futures::executor::block_on; - /// # use futures::stream::StreamExt; - /// # use libp2p_core::identity; - /// # use libp2p_core::transport::{MemoryTransport, Transport}; - /// # use libp2p_core::upgrade::{self, DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; - /// # use libp2p_core::PeerId; - /// # use libp2p_plaintext::PlainText2Config; - /// # use libp2p_swarm::{ - /// # ConnectionId, DialError, IntoConnectionHandler, KeepAlive, NegotiatedSubstream, - /// # FromSwarm, DialFailure, THandlerOutEvent, - /// # NetworkBehaviour, NetworkBehaviourAction, PollParameters, ConnectionHandler, - /// # ConnectionHandlerEvent, ConnectionHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent, - /// # }; - /// # use libp2p_swarm::handler::ConnectionEvent; - /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition}; - /// # use libp2p_yamux as yamux; - /// # use std::collections::VecDeque; - /// # use std::task::{Context, Poll}; - /// # use void::Void; - /// # - /// # let local_key = identity::Keypair::generate_ed25519(); - /// # let local_public_key = local_key.public(); - /// # let local_peer_id = PeerId::from(local_public_key.clone()); - /// # - /// # let transport = MemoryTransport::default() - /// # .upgrade(upgrade::Version::V1) - /// # .authenticate(PlainText2Config { local_public_key }) - /// # .multiplex(yamux::YamuxConfig::default()) - /// # .boxed(); - /// # - /// # let mut swarm = Swarm::with_threadpool_executor(transport, MyBehaviour::default(), local_peer_id); - /// # - /// // Super precious message that we should better not lose. - /// let message = PreciousMessage("My precious message".to_string()); - /// - /// // Unfortunately this peer is offline, thus sending our message to it will fail. - /// let offline_peer = PeerId::random(); - /// - /// // Let's send it anyways. We should get it back in case connecting to the peer fails. - /// swarm.behaviour_mut().send(offline_peer, message); - /// - /// block_on(async { - /// // As expected, sending failed. But great news, we got our message back. - /// matches!( - /// swarm.next().await.expect("Infinite stream"), - /// SwarmEvent::Behaviour(PreciousMessage(_)) - /// ); - /// }); - /// - /// #[derive(Default)] - /// struct MyBehaviour { - /// outbox_to_swarm: VecDeque>, - /// } - /// - /// impl MyBehaviour { - /// fn send(&mut self, peer_id: PeerId, msg: PreciousMessage) { - /// self.outbox_to_swarm - /// .push_back(NetworkBehaviourAction::Dial { - /// opts: DialOpts::peer_id(peer_id) - /// .condition(PeerCondition::Always) - /// .build(), - /// handler: MyHandler { message: Some(msg) }, - /// }); - /// } - /// } - /// # - /// impl NetworkBehaviour for MyBehaviour { - /// # type ConnectionHandler = MyHandler; - /// # type OutEvent = PreciousMessage; - /// # - /// # fn new_handler(&mut self) -> Self::ConnectionHandler { - /// # MyHandler { message: None } - /// # } - /// # - /// # - /// # fn on_connection_handler_event( - /// # &mut self, - /// # _: PeerId, - /// # _: ConnectionId, - /// # _: THandlerOutEvent, - /// # ) { - /// # unreachable!(); - /// # } - /// # - /// fn on_swarm_event( - /// &mut self, - /// event: FromSwarm, - /// ) { - /// // As expected, sending the message failed. But lucky us, we got the handler back, thus - /// // the precious message is not lost and we can return it back to the user. - /// if let FromSwarm::DialFailure(DialFailure { handler, .. }) = event { - /// let msg = handler.message.unwrap(); - /// self.outbox_to_swarm - /// .push_back(NetworkBehaviourAction::GenerateEvent(msg)) - /// } - /// } - /// # - /// # fn poll( - /// # &mut self, - /// # _: &mut Context<'_>, - /// # _: &mut impl PollParameters, - /// # ) -> Poll> { - /// # if let Some(action) = self.outbox_to_swarm.pop_front() { - /// # return Poll::Ready(action); - /// # } - /// # Poll::Pending - /// # } - /// } - /// - /// # struct MyHandler { - /// # message: Option, - /// # } - /// # - /// # impl ConnectionHandler for MyHandler { - /// # type InEvent = Void; - /// # type OutEvent = Void; - /// # type Error = Void; - /// # type InboundProtocol = DeniedUpgrade; - /// # type OutboundProtocol = DeniedUpgrade; - /// # type InboundOpenInfo = (); - /// # type OutboundOpenInfo = Void; - /// # - /// # fn listen_protocol( - /// # &self, - /// # ) -> SubstreamProtocol { - /// # SubstreamProtocol::new(DeniedUpgrade, ()) - /// # } - /// # - /// # fn on_behaviour_event(&mut self, _event: Self::InEvent) {} - /// # - /// # fn on_connection_event( - /// # &mut self, - /// # event: ConnectionEvent< - /// # Self::InboundProtocol, - /// # Self::OutboundProtocol, - /// # Self::InboundOpenInfo, - /// # Self::OutboundOpenInfo, - /// # >, - /// # ) {} - /// # - /// # fn connection_keep_alive(&self) -> KeepAlive { - /// # KeepAlive::Yes - /// # } - /// # - /// # fn poll( - /// # &mut self, - /// # _: &mut Context<'_>, - /// # ) -> Poll< - /// # ConnectionHandlerEvent< - /// # Self::OutboundProtocol, - /// # Self::OutboundOpenInfo, - /// # Self::OutEvent, - /// # Self::Error, - /// # >, - /// # > { - /// # todo!("If `Self::message.is_some()` send the message to the remote.") - /// # } - /// # } - /// # #[derive(Debug, PartialEq, Eq)] - /// # struct PreciousMessage(String); - /// ``` - Dial { opts: DialOpts, handler: THandler }, + /// [`DialOpts`] provides access to the [`ConnectionId`] via [`DialOpts::connection_id`]. + /// This [`ConnectionId`] will be used throughout the connection's lifecycle to associate events with it. + /// This allows a [`NetworkBehaviour`] to identify a connection that resulted out of its own dial request. + Dial { opts: DialOpts }, /// Instructs the `Swarm` to send an event to the handler dedicated to a /// connection with a peer. /// /// If the `Swarm` is connected to the peer, the message is delivered to the - /// [`ConnectionHandler`] instance identified by the peer ID and connection ID. + /// [`ConnectionHandler`](crate::ConnectionHandler) instance identified by the peer ID and connection ID. /// /// If the specified connection no longer exists, the event is silently dropped. /// /// Typically the connection ID given is the same as the one passed to /// [`NetworkBehaviour::on_connection_handler_event`], i.e. whenever the behaviour wishes to /// respond to a request on the same connection (and possibly the same - /// substream, as per the implementation of [`ConnectionHandler`]). + /// substream, as per the implementation of [`ConnectionHandler`](crate::ConnectionHandler)). /// /// Note that even if the peer is currently connected, connections can get closed /// at any time and thus the event may not reach a handler. NotifyHandler { - /// The peer for whom a [`ConnectionHandler`] should be notified. + /// The peer for whom a [`ConnectionHandler`](crate::ConnectionHandler) should be notified. peer_id: PeerId, /// The options w.r.t. which connection handler to notify of the event. handler: NotifyHandler, @@ -459,10 +282,10 @@ pub enum NetworkBehaviourAction< /// /// Note: Closing a connection via /// [`NetworkBehaviourAction::CloseConnection`] does not inform the - /// corresponding [`ConnectionHandler`]. - /// Closing a connection via a [`ConnectionHandler`] can be done - /// either in a collaborative manner across [`ConnectionHandler`]s - /// with [`ConnectionHandler::connection_keep_alive`] or directly with + /// corresponding [`ConnectionHandler`](crate::ConnectionHandler). + /// Closing a connection via a [`ConnectionHandler`](crate::ConnectionHandler) can be done + /// either in a collaborative manner across [`ConnectionHandler`](crate::ConnectionHandler)s + /// with [`ConnectionHandler::connection_keep_alive`](crate::ConnectionHandler::connection_keep_alive) or directly with /// [`ConnectionHandlerEvent::Close`](crate::ConnectionHandlerEvent::Close). CloseConnection { /// The peer to disconnect. @@ -472,19 +295,15 @@ pub enum NetworkBehaviourAction< }, } -impl - NetworkBehaviourAction -{ +impl NetworkBehaviourAction { /// Map the handler event. pub fn map_in( self, f: impl FnOnce(TInEventOld) -> TInEventNew, - ) -> NetworkBehaviourAction { + ) -> NetworkBehaviourAction { match self { NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e), - NetworkBehaviourAction::Dial { opts, handler } => { - NetworkBehaviourAction::Dial { opts, handler } - } + NetworkBehaviourAction::Dial { opts } => NetworkBehaviourAction::Dial { opts }, NetworkBehaviourAction::NotifyHandler { peer_id, handler, @@ -508,57 +327,15 @@ impl } } -impl NetworkBehaviourAction { +impl NetworkBehaviourAction { /// Map the event the swarm will return. - pub fn map_out(self, f: impl FnOnce(TOutEvent) -> E) -> NetworkBehaviourAction { - match self { - NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(f(e)), - NetworkBehaviourAction::Dial { opts, handler } => { - NetworkBehaviourAction::Dial { opts, handler } - } - NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - } => NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - }, - NetworkBehaviourAction::ReportObservedAddr { address, score } => { - NetworkBehaviourAction::ReportObservedAddr { address, score } - } - NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - } => NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }, - } - } -} - -impl NetworkBehaviourAction -where - THandlerOld: IntoConnectionHandler, - ::Handler: ConnectionHandler, -{ - /// Map the handler. - pub fn map_handler( + pub fn map_out( self, - f: impl FnOnce(THandlerOld) -> THandlerNew, - ) -> NetworkBehaviourAction - where - THandlerNew: IntoConnectionHandler, - ::Handler: ConnectionHandler, - { + f: impl FnOnce(TOutEvent) -> E, + ) -> NetworkBehaviourAction { match self { - NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e), - NetworkBehaviourAction::Dial { opts, handler } => NetworkBehaviourAction::Dial { - opts, - handler: f(handler), - }, + NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(f(e)), + NetworkBehaviourAction::Dial { opts } => NetworkBehaviourAction::Dial { opts }, NetworkBehaviourAction::NotifyHandler { peer_id, handler, @@ -582,50 +359,6 @@ where } } -impl NetworkBehaviourAction -where - THandlerOld: IntoConnectionHandler, - ::Handler: ConnectionHandler, -{ - /// Map the handler and handler event. - pub fn map_handler_and_in( - self, - f_handler: impl FnOnce(THandlerOld) -> THandlerNew, - f_in_event: impl FnOnce(TInEventOld) -> TInEventNew, - ) -> NetworkBehaviourAction - where - THandlerNew: IntoConnectionHandler, - ::Handler: ConnectionHandler, - { - match self { - NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e), - NetworkBehaviourAction::Dial { opts, handler } => NetworkBehaviourAction::Dial { - opts, - handler: f_handler(handler), - }, - NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - } => NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event: f_in_event(event), - }, - NetworkBehaviourAction::ReportObservedAddr { address, score } => { - NetworkBehaviourAction::ReportObservedAddr { address, score } - } - NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - } => NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }, - } - } -} - /// The options w.r.t. which connection handler to notify of an event. #[derive(Debug, Clone)] pub enum NotifyHandler { @@ -661,13 +394,13 @@ pub enum FromSwarm<'a, Handler: IntoConnectionHandler> { AddressChange(AddressChange<'a>), /// Informs the behaviour that the dial to a known /// or unknown node failed. - DialFailure(DialFailure<'a, Handler>), + DialFailure(DialFailure<'a>), /// Informs the behaviour that an error /// happened on an incoming connection during its initial handshake. /// /// This can include, for example, an error during the handshake of the encryption layer, or the /// connection unexpectedly closed. - ListenFailure(ListenFailure<'a, Handler>), + ListenFailure(ListenFailure<'a>), /// Informs the behaviour that a new listener was created. NewListener(NewListener), /// Informs the behaviour that we have started listening on a new multiaddr. @@ -722,10 +455,10 @@ pub struct AddressChange<'a> { /// [`FromSwarm`] variant that informs the behaviour that the dial to a known /// or unknown node failed. #[derive(Clone, Copy)] -pub struct DialFailure<'a, Handler> { +pub struct DialFailure<'a> { pub peer_id: Option, - pub handler: Handler, pub error: &'a DialError, + pub connection_id: ConnectionId, } /// [`FromSwarm`] variant that informs the behaviour that an error @@ -734,11 +467,11 @@ pub struct DialFailure<'a, Handler> { /// This can include, for example, an error during the handshake of the encryption layer, or the /// connection unexpectedly closed. #[derive(Clone, Copy)] -pub struct ListenFailure<'a, Handler> { +pub struct ListenFailure<'a> { pub local_addr: &'a Multiaddr, pub send_back_addr: &'a Multiaddr, pub error: &'a ListenError, - pub handler: Handler, + pub connection_id: ConnectionId, } /// [`FromSwarm`] variant that informs the behaviour that a new listener was created. @@ -794,7 +527,6 @@ pub struct ExpiredExternalAddr<'a> { impl<'a, Handler: IntoConnectionHandler> FromSwarm<'a, Handler> { fn map_handler( self, - map_into_handler: impl FnOnce(Handler) -> NewHandler, map_handler: impl FnOnce( ::Handler, ) -> ::Handler, @@ -802,13 +534,12 @@ impl<'a, Handler: IntoConnectionHandler> FromSwarm<'a, Handler> { where NewHandler: IntoConnectionHandler, { - self.maybe_map_handler(|h| Some(map_into_handler(h)), |h| Some(map_handler(h))) + self.maybe_map_handler(|h| Some(map_handler(h))) .expect("To return Some as all closures return Some.") } fn maybe_map_handler( self, - map_into_handler: impl FnOnce(Handler) -> Option, map_handler: impl FnOnce( ::Handler, ) -> Option<::Handler>, @@ -856,23 +587,23 @@ impl<'a, Handler: IntoConnectionHandler> FromSwarm<'a, Handler> { })), FromSwarm::DialFailure(DialFailure { peer_id, - handler, error, + connection_id, }) => Some(FromSwarm::DialFailure(DialFailure { peer_id, - handler: map_into_handler(handler)?, error, + connection_id, })), FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, + connection_id, error, - handler, }) => Some(FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, + connection_id, error, - handler: map_into_handler(handler)?, })), FromSwarm::NewListener(NewListener { listener_id }) => { Some(FromSwarm::NewListener(NewListener { listener_id })) diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs index fb463661d74..f77a3e02cd7 100644 --- a/swarm/src/behaviour/either.rs +++ b/swarm/src/behaviour/either.rs @@ -21,6 +21,7 @@ use crate::behaviour::{self, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use crate::connection::ConnectionId; use crate::handler::either::IntoEitherHandler; +use crate::THandlerInEvent; use crate::THandlerOutEvent; use either::Either; use libp2p_core::{Multiaddr, PeerId}; @@ -51,20 +52,14 @@ where fn on_swarm_event(&mut self, event: behaviour::FromSwarm) { match self { - Either::Left(b) => b.on_swarm_event(event.map_handler( - |h| h.unwrap_left(), - |h| match h { - Either::Left(h) => h, - Either::Right(_) => unreachable!(), - }, - )), - Either::Right(b) => b.on_swarm_event(event.map_handler( - |h| h.unwrap_right(), - |h| match h { - Either::Right(h) => h, - Either::Left(_) => unreachable!(), - }, - )), + Either::Left(b) => b.on_swarm_event(event.map_handler(|h| match h { + Either::Left(h) => h, + Either::Right(_) => unreachable!(), + })), + Either::Right(b) => b.on_swarm_event(event.map_handler(|h| match h { + Either::Right(h) => h, + Either::Left(_) => unreachable!(), + })), } } @@ -89,14 +84,14 @@ where &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { let event = match self { Either::Left(behaviour) => futures::ready!(behaviour.poll(cx, params)) .map_out(Either::Left) - .map_handler_and_in(IntoEitherHandler::Left, Either::Left), + .map_in(Either::Left), Either::Right(behaviour) => futures::ready!(behaviour.poll(cx, params)) .map_out(Either::Right) - .map_handler_and_in(IntoEitherHandler::Right, Either::Right), + .map_in(Either::Right), }; Poll::Ready(event) diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index b35765da080..1546a16bc86 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -26,7 +26,9 @@ use crate::handler::{ IntoConnectionHandler, KeepAlive, ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; -use crate::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, THandlerOutEvent}; +use crate::{ + NetworkBehaviour, NetworkBehaviourAction, PollParameters, THandlerInEvent, THandlerOutEvent, +}; use either::Either; use futures::future; use libp2p_core::{upgrade::DeniedUpgrade, ConnectedPoint, Multiaddr, PeerId}; @@ -84,7 +86,7 @@ where fn on_swarm_event(&mut self, event: FromSwarm) { if let Some(behaviour) = &mut self.inner { - if let Some(event) = event.maybe_map_handler(|h| h.inner, |h| h.inner) { + if let Some(event) = event.maybe_map_handler(|h| h.inner) { behaviour.on_swarm_event(event); } } @@ -105,11 +107,9 @@ where &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { if let Some(inner) = self.inner.as_mut() { - inner.poll(cx, params).map(|action| { - action.map_handler(|h| ToggleIntoConnectionHandler { inner: Some(h) }) - }) + inner.poll(cx, params) } else { Poll::Pending } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 548ece04d9a..ed132accc3e 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -21,7 +21,6 @@ use crate::connection::{Connection, ConnectionId, PendingPoint}; use crate::{ - behaviour::THandlerInEvent, connection::{ Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, @@ -72,7 +71,9 @@ impl ExecSwitch { } } - fn spawn(&mut self, task: BoxFuture<'static, ()>) { + fn spawn(&mut self, task: impl Future + Send + 'static) { + let task = task.boxed(); + match self { Self::Executor(executor) => executor.exec(task), Self::LocalSpawn(local) => local.push(task), @@ -100,7 +101,7 @@ where >, /// The pending connections that are currently being negotiated. - pending: HashMap>, + pending: HashMap, /// Size of the task command buffer (per task). task_command_buffer_size: usize, @@ -187,11 +188,9 @@ impl EstablishedConnection { } } -struct PendingConnection { +struct PendingConnection { /// [`PeerId`] of the remote peer. peer_id: Option, - /// Handler to handle connection once no longer pending but established. - handler: THandler, endpoint: PendingPoint, /// When dropped, notifies the task which then knows to terminate. abort_notifier: Option>, @@ -199,7 +198,7 @@ struct PendingConnection { accepted_at: Instant, } -impl PendingConnection { +impl PendingConnection { fn is_for_same_remote_as(&self, other: PeerId) -> bool { self.peer_id.map_or(false, |peer| peer == other) } @@ -228,10 +227,7 @@ pub enum PoolEvent { id: ConnectionId, peer_id: PeerId, endpoint: ConnectedPoint, - /// List of other connections to the same peer. - /// - /// Note: Does not include the connection reported through this event. - other_established_connection_ids: Vec, + connection: StreamMuxerBox, /// [`Some`] when the new connection is an outgoing connection. /// Addresses are dialed in parallel. Contains the addresses and errors /// of dial attempts that failed before the one successful dial. @@ -269,8 +265,6 @@ pub enum PoolEvent { id: ConnectionId, /// The error that occurred. error: PendingOutboundConnectionError, - /// The handler that was supposed to handle the connection. - handler: THandler, /// The (expected) peer of the failed connection. peer: Option, }, @@ -285,8 +279,6 @@ pub enum PoolEvent { local_addr: Multiaddr, /// The error that occurred. error: PendingInboundConnectionError, - /// The handler that was supposed to handle the connection. - handler: THandler, }, /// A node has produced an event. @@ -346,7 +338,11 @@ where pub fn get_established( &mut self, id: ConnectionId, - ) -> Option<&mut EstablishedConnection>> { + ) -> Option< + &mut EstablishedConnection< + <::Handler as ConnectionHandler>::InEvent, + >, + > { self.established .values_mut() .find_map(|connections| connections.get_mut(&id)) @@ -410,15 +406,6 @@ where self.established.keys() } - fn spawn(&mut self, task: BoxFuture<'static, ()>) { - self.executor.spawn(task) - } -} - -impl Pool -where - THandler: IntoConnectionHandler, -{ /// Adds a pending outgoing connection to the pool in the form of a `Future` /// that establishes and negotiates the connection. /// @@ -436,32 +423,26 @@ where >, >, peer: Option, - handler: THandler, role_override: Endpoint, dial_concurrency_factor_override: Option, - ) -> Result { - if let Err(limit) = self.counters.check_max_pending_outgoing() { - return Err((limit, handler)); - }; + connection_id: ConnectionId, + ) -> Result<(), ConnectionLimit> { + self.counters.check_max_pending_outgoing()?; let dial = ConcurrentDial::new( dials, dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor), ); - let connection_id = ConnectionId::next(); - let (abort_notifier, abort_receiver) = oneshot::channel(); - self.spawn( - task::new_for_pending_outgoing_connection( + self.executor + .spawn(task::new_for_pending_outgoing_connection( connection_id, dial, abort_receiver, self.pending_connection_events_tx.clone(), - ) - .boxed(), - ); + )); let endpoint = PendingPoint::Dialer { role_override }; @@ -470,13 +451,13 @@ where connection_id, PendingConnection { peer_id: peer, - handler, endpoint, abort_notifier: Some(abort_notifier), accepted_at: Instant::now(), }, ); - Ok(connection_id) + + Ok(()) } /// Adds a pending incoming connection to the pool in the form of a @@ -487,44 +468,85 @@ where pub fn add_incoming( &mut self, future: TFut, - handler: THandler, info: IncomingInfo<'_>, - ) -> Result + connection_id: ConnectionId, + ) -> Result<(), ConnectionLimit> where TFut: Future> + Send + 'static, { let endpoint = info.create_connected_point(); - if let Err(limit) = self.counters.check_max_pending_incoming() { - return Err((limit, handler)); - } - - let connection_id = ConnectionId::next(); + self.counters.check_max_pending_incoming()?; let (abort_notifier, abort_receiver) = oneshot::channel(); - self.spawn( - task::new_for_pending_incoming_connection( + self.executor + .spawn(task::new_for_pending_incoming_connection( connection_id, future, abort_receiver, self.pending_connection_events_tx.clone(), - ) - .boxed(), - ); + )); self.counters.inc_pending_incoming(); self.pending.insert( connection_id, PendingConnection { peer_id: None, - handler, endpoint: endpoint.into(), abort_notifier: Some(abort_notifier), accepted_at: Instant::now(), }, ); - Ok(connection_id) + Ok(()) + } + + pub fn spawn_connection( + &mut self, + id: ConnectionId, + obtained_peer_id: PeerId, + endpoint: &ConnectedPoint, + muxer: StreamMuxerBox, + handler: ::Handler, + ) { + let conns = self.established.entry(obtained_peer_id).or_default(); + self.counters.inc_established(endpoint); + + let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size); + let (event_sender, event_receiver) = mpsc::channel(self.per_connection_event_buffer_size); + + conns.insert( + id, + EstablishedConnection { + endpoint: endpoint.clone(), + sender: command_sender, + }, + ); + self.established_connection_events.push(event_receiver); + if let Some(waker) = self.no_established_connections_waker.take() { + waker.wake(); + } + + let connection = Connection::new( + muxer, + handler, + self.substream_upgrade_protocol_override, + self.max_negotiating_inbound_streams, + ); + + self.executor.spawn(task::new_for_established_connection( + id, + obtained_peer_id, + connection, + command_receiver, + event_sender, + )) + } + + pub fn close_connection(&mut self, muxer: StreamMuxerBox) { + self.executor.spawn(async move { + let _ = muxer.close().await; + }); } /// Polls the connection pool for events. @@ -614,7 +636,6 @@ where } => { let PendingConnection { peer_id: expected_peer_id, - handler, endpoint, abort_notifier: _, accepted_at, @@ -695,20 +716,17 @@ where }); if let Err(error) = error { - self.spawn( - poll_fn(move |cx| { - if let Err(e) = ready!(muxer.poll_close_unpin(cx)) { - log::debug!( - "Failed to close connection {:?} to peer {}: {:?}", - id, - obtained_peer_id, - e - ); - } - Poll::Ready(()) - }) - .boxed(), - ); + self.executor.spawn(poll_fn(move |cx| { + if let Err(e) = ready!(muxer.poll_close_unpin(cx)) { + log::debug!( + "Failed to close connection {:?} to peer {}: {:?}", + id, + obtained_peer_id, + e + ); + } + Poll::Ready(()) + })); match endpoint { ConnectedPoint::Dialer { .. } => { @@ -716,7 +734,6 @@ where id, error: error .map(|t| vec![(endpoint.get_remote_address().clone(), t)]), - handler, peer: expected_peer_id.or(Some(obtained_peer_id)), }) } @@ -727,7 +744,6 @@ where return Poll::Ready(PoolEvent::PendingInboundConnectionError { id, error, - handler, send_back_addr, local_addr, }) @@ -735,51 +751,13 @@ where }; } - // Add the connection to the pool. - let conns = self.established.entry(obtained_peer_id).or_default(); - let other_established_connection_ids = conns.keys().cloned().collect(); - self.counters.inc_established(&endpoint); - - let (command_sender, command_receiver) = - mpsc::channel(self.task_command_buffer_size); - let (event_sender, event_receiver) = - mpsc::channel(self.per_connection_event_buffer_size); - - conns.insert( - id, - EstablishedConnection { - endpoint: endpoint.clone(), - sender: command_sender, - }, - ); - self.established_connection_events.push(event_receiver); - if let Some(waker) = self.no_established_connections_waker.take() { - waker.wake(); - } - - let connection = Connection::new( - muxer, - handler.into_handler(&obtained_peer_id, &endpoint), - self.substream_upgrade_protocol_override, - self.max_negotiating_inbound_streams, - ); - - self.spawn( - task::new_for_established_connection( - id, - obtained_peer_id, - connection, - command_receiver, - event_sender, - ) - .boxed(), - ); let established_in = accepted_at.elapsed(); + return Poll::Ready(PoolEvent::ConnectionEstablished { peer_id: obtained_peer_id, endpoint, id, - other_established_connection_ids, + connection: muxer, concurrent_dial_errors, established_in, }); @@ -787,7 +765,6 @@ where task::PendingConnectionEvent::PendingFailed { id, error } => { if let Some(PendingConnection { peer_id, - handler, endpoint, abort_notifier: _, accepted_at: _, // Ignoring the time it took for the connection to fail. @@ -800,7 +777,6 @@ where return Poll::Ready(PoolEvent::PendingOutboundConnectionError { id, error, - handler, peer: peer_id, }); } @@ -814,7 +790,6 @@ where return Poll::Ready(PoolEvent::PendingInboundConnectionError { id, error, - handler, send_back_addr, local_addr, }); diff --git a/swarm/src/dial_opts.rs b/swarm/src/dial_opts.rs index 77425f00d36..0d4ce9eb8f0 100644 --- a/swarm/src/dial_opts.rs +++ b/swarm/src/dial_opts.rs @@ -19,6 +19,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::ConnectionId; use libp2p_core::connection::Endpoint; use libp2p_core::multiaddr::Protocol; use libp2p_core::multihash::Multihash; @@ -43,6 +44,7 @@ pub struct DialOpts { extend_addresses_through_behaviour: bool, role_override: Endpoint, dial_concurrency_factor_override: Option, + connection_id: ConnectionId, } impl DialOpts { @@ -83,6 +85,14 @@ impl DialOpts { self.peer_id } + /// Get the [`ConnectionId`] of this dial attempt. + /// + /// All future events of this dial will be associated with this ID. + /// See [`DialFailure`](crate::DialFailure) and [`ConnectionEstablished`](crate::behaviour::ConnectionEstablished). + pub fn connection_id(&self) -> ConnectionId { + self.connection_id + } + /// Retrieves the [`PeerId`] from the [`DialOpts`] if specified or otherwise tries to parse it /// from the multihash in the `/p2p` part of the address, if present. /// @@ -207,6 +217,7 @@ impl WithPeerId { extend_addresses_through_behaviour: true, role_override: self.role_override, dial_concurrency_factor_override: self.dial_concurrency_factor_override, + connection_id: ConnectionId::next(), } } } @@ -262,6 +273,7 @@ impl WithPeerIdWithAddresses { extend_addresses_through_behaviour: self.extend_addresses_through_behaviour, role_override: self.role_override, dial_concurrency_factor_override: self.dial_concurrency_factor_override, + connection_id: ConnectionId::next(), } } } @@ -305,6 +317,7 @@ impl WithoutPeerIdWithAddress { extend_addresses_through_behaviour: false, role_override: self.role_override, dial_concurrency_factor_override: None, + connection_id: ConnectionId::next(), } } } diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 017d3afb687..2a7c1a9481f 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -5,7 +5,7 @@ use crate::handler::{ }; use crate::{ ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol, - THandlerOutEvent, + THandlerInEvent, THandlerOutEvent, }; use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::PeerId; @@ -37,7 +37,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { Poll::Pending } diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index 448e63638c6..f645224937b 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -4,6 +4,7 @@ use crate::handler::{ ConnectionEvent, ConnectionHandlerEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, }; +use crate::THandlerInEvent; use crate::THandlerOutEvent; use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::PeerId; @@ -40,7 +41,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { Poll::Pending } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index bdef527e677..f2cacec67ff 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -92,6 +92,7 @@ pub mod derive_prelude { pub use crate::NetworkBehaviour; pub use crate::NetworkBehaviourAction; pub use crate::PollParameters; + pub use crate::THandlerInEvent; pub use either::Either; pub use futures::prelude as futures; pub use libp2p_core::transport::ListenerId; @@ -162,7 +163,7 @@ type THandler = ::ConnectionHandler; /// Custom event that can be received by the [`ConnectionHandler`] of the /// [`NetworkBehaviour`]. -type THandlerInEvent = +pub type THandlerInEvent = < as IntoConnectionHandler>::Handler as ConnectionHandler>::InEvent; /// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`]. @@ -326,12 +327,6 @@ where /// List of nodes for which we deny any incoming connection. banned_peers: HashSet, - /// Connections for which we withhold any reporting. These belong to banned peers. - /// - /// Note: Connections to a peer that are established at the time of banning that peer - /// are not added here. Instead they are simply closed. - banned_peer_connections: HashSet, - /// Pending event to be delivered to connection handlers /// (or dropped if the peer disconnected) before the `behaviour` /// can be polled again. @@ -502,19 +497,13 @@ where /// swarm.dial("/ip6/::1/tcp/12345".parse::().unwrap()); /// ``` pub fn dial(&mut self, opts: impl Into) -> Result<(), DialError> { - let handler = self.behaviour.new_handler(); - self.dial_with_handler(opts.into(), handler) - } + let dial_opts = opts.into(); - fn dial_with_handler( - &mut self, - dial_opts: DialOpts, - handler: ::ConnectionHandler, - ) -> Result<(), DialError> { let peer_id = dial_opts .get_or_parse_peer_id() .map_err(DialError::InvalidPeerId)?; let condition = dial_opts.peer_condition(); + let connection_id = dial_opts.connection_id(); let should_dial = match (condition, peer_id) { (PeerCondition::Always, _) => true, @@ -530,8 +519,8 @@ where self.behaviour .on_swarm_event(FromSwarm::DialFailure(DialFailure { peer_id, - handler, error: &e, + connection_id, })); return Err(e); @@ -544,8 +533,8 @@ where self.behaviour .on_swarm_event(FromSwarm::DialFailure(DialFailure { peer_id: Some(peer_id), - handler, error: &error, + connection_id, })); return Err(error); @@ -572,8 +561,8 @@ where self.behaviour .on_swarm_event(FromSwarm::DialFailure(DialFailure { peer_id, - handler, error: &error, + connection_id, })); return Err(error); }; @@ -607,18 +596,18 @@ where match self.pool.add_outgoing( dials, peer_id, - handler, dial_opts.role_override(), dial_opts.dial_concurrency_override(), + connection_id, ) { - Ok(_connection_id) => Ok(()), - Err((connection_limit, handler)) => { + Ok(()) => Ok(()), + Err(connection_limit) => { let error = DialError::ConnectionLimit(connection_limit); self.behaviour .on_swarm_event(FromSwarm::DialFailure(DialFailure { peer_id, - handler, error: &error, + connection_id, })); Err(error) @@ -762,65 +751,68 @@ where peer_id, id, endpoint, - other_established_connection_ids, + connection, concurrent_dial_errors, established_in, } => { if self.banned_peers.contains(&peer_id) { - // Mark the connection for the banned peer as banned, thus withholding any - // future events from the connection to the behaviour. - self.banned_peer_connections.insert(id); - self.pool.disconnect(peer_id); + self.pool.close_connection(connection); return Some(SwarmEvent::BannedPeer { peer_id, endpoint }); - } else { - let num_established = NonZeroU32::new( - u32::try_from(other_established_connection_ids.len() + 1).unwrap(), - ) - .expect("n + 1 is always non-zero; qed"); - let non_banned_established = other_established_connection_ids - .into_iter() - .filter(|conn_id| !self.banned_peer_connections.contains(conn_id)) - .count(); + } - log::debug!( - "Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}", + let handler = self + .behaviour + .new_handler() + .into_handler(&peer_id, &endpoint); + + let other_established_connection_ids = self + .pool + .iter_established_connections_of_peer(&peer_id) + .collect::>(); + let num_established = NonZeroU32::new( + u32::try_from(other_established_connection_ids.len() + 1).unwrap(), + ) + .expect("n + 1 is always non-zero; qed"); + + self.pool + .spawn_connection(id, peer_id, &endpoint, connection, handler); + + log::debug!( + "Connection established: {:?} {:?}; Total (peer): {}.", + peer_id, + endpoint, + num_established, + ); + let failed_addresses = concurrent_dial_errors + .as_ref() + .map(|es| { + es.iter() + .map(|(a, _)| a) + .cloned() + .collect::>() + }) + .unwrap_or_default(); + self.behaviour + .on_swarm_event(FromSwarm::ConnectionEstablished( + behaviour::ConnectionEstablished { peer_id, - endpoint, - num_established, - non_banned_established + 1, - ); - let failed_addresses = concurrent_dial_errors - .as_ref() - .map(|es| { - es.iter() - .map(|(a, _)| a) - .cloned() - .collect::>() - }) - .unwrap_or_default(); - self.behaviour - .on_swarm_event(FromSwarm::ConnectionEstablished( - behaviour::ConnectionEstablished { - peer_id, - connection_id: id, - endpoint: &endpoint, - failed_addresses: &failed_addresses, - other_established: non_banned_established, - }, - )); - return Some(SwarmEvent::ConnectionEstablished { - peer_id, - num_established, - endpoint, - concurrent_dial_errors, - established_in, - }); - } + connection_id: id, + endpoint: &endpoint, + failed_addresses: &failed_addresses, + other_established: other_established_connection_ids.len(), + }, + )); + return Some(SwarmEvent::ConnectionEstablished { + peer_id, + num_established, + endpoint, + concurrent_dial_errors, + established_in, + }); } PoolEvent::PendingOutboundConnectionError { - id: _, + id: connection_id, error, - handler, peer, } => { let error = error.into(); @@ -828,8 +820,8 @@ where self.behaviour .on_swarm_event(FromSwarm::DialFailure(DialFailure { peer_id: peer, - handler, error: &error, + connection_id, })); if let Some(peer) = peer { @@ -844,11 +836,10 @@ where }); } PoolEvent::PendingInboundConnectionError { - id: _, + id, send_back_addr, local_addr, error, - handler, } => { let error = error.into(); @@ -858,7 +849,7 @@ where local_addr: &local_addr, send_back_addr: &send_back_addr, error: &error, - handler, + connection_id: id, })); return Some(SwarmEvent::IncomingConnectionError { local_addr, @@ -892,21 +883,15 @@ where let endpoint = connected.endpoint; let num_established = u32::try_from(remaining_established_connection_ids.len()).unwrap(); - let conn_was_reported = !self.banned_peer_connections.remove(&id); - if conn_was_reported { - let remaining_non_banned = remaining_established_connection_ids - .into_iter() - .filter(|conn_id| !self.banned_peer_connections.contains(conn_id)) - .count(); - self.behaviour - .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id: id, - endpoint: &endpoint, - handler, - remaining_established: remaining_non_banned, - })); - } + + self.behaviour + .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id: id, + endpoint: &endpoint, + handler, + remaining_established: num_established as usize, + })); return Some(SwarmEvent::ConnectionClosed { peer_id, endpoint, @@ -915,12 +900,8 @@ where }); } PoolEvent::ConnectionEvent { peer_id, id, event } => { - if self.banned_peer_connections.contains(&id) { - log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id); - } else { - self.behaviour - .on_connection_handler_event(peer_id, id, event); - } + self.behaviour + .on_connection_handler_event(peer_id, id, event); } PoolEvent::AddressChange { peer_id, @@ -928,15 +909,13 @@ where new_endpoint, old_endpoint, } => { - if !self.banned_peer_connections.contains(&id) { - self.behaviour - .on_swarm_event(FromSwarm::AddressChange(AddressChange { - peer_id, - connection_id: id, - old: &old_endpoint, - new: &new_endpoint, - })); - } + self.behaviour + .on_swarm_event(FromSwarm::AddressChange(AddressChange { + peer_id, + connection_id: id, + old: &old_endpoint, + new: &new_endpoint, + })); } } @@ -957,30 +936,30 @@ where local_addr, send_back_addr, } => { - let handler = self.behaviour.new_handler(); + let connection_id = ConnectionId::next(); + match self.pool.add_incoming( upgrade, - handler, IncomingInfo { local_addr: &local_addr, send_back_addr: &send_back_addr, }, + connection_id, ) { - Ok(_connection_id) => { + Ok(()) => { return Some(SwarmEvent::IncomingConnection { local_addr, send_back_addr, }); } - Err((connection_limit, handler)) => { + Err(connection_limit) => { let error = ListenError::ConnectionLimit(connection_limit); - self.behaviour .on_swarm_event(FromSwarm::ListenFailure(ListenFailure { local_addr: &local_addr, send_back_addr: &send_back_addr, error: &error, - handler, + connection_id, })); log::warn!("Incoming connection rejected: {:?}", connection_limit); } @@ -1063,15 +1042,15 @@ where fn handle_behaviour_event( &mut self, - event: NetworkBehaviourAction, + event: NetworkBehaviourAction>, ) -> Option>> { match event { NetworkBehaviourAction::GenerateEvent(event) => { return Some(SwarmEvent::Behaviour(event)) } - NetworkBehaviourAction::Dial { opts, handler } => { + NetworkBehaviourAction::Dial { opts } => { let peer_id = opts.get_or_parse_peer_id(); - if let Ok(()) = self.dial_with_handler(opts, handler) { + if let Ok(()) = self.dial(opts) { if let Ok(Some(peer_id)) = peer_id { return Some(SwarmEvent::Dialing(peer_id)); } @@ -1571,7 +1550,6 @@ where listened_addrs: HashMap::new(), external_addrs: Addresses::default(), banned_peers: HashSet::new(), - banned_peer_connections: HashSet::new(), pending_event: None, } } @@ -1972,24 +1950,22 @@ mod tests { { // Setup to test that new connections of banned peers are not reported. swarm1.dial(addr2.clone()).unwrap(); - s1_expected_conns += 1; stage = Stage::BannedDial; } } Stage::BannedDial => { - if swarm2.network_info().num_peers() == 1 { - // The banned connection was established. Check that it was not reported to - // the behaviour of the banning swarm. - assert_eq!( - swarm2.behaviour.on_connection_established.len(), s2_expected_conns, - "No additional closed connections should be reported for the banned peer" - ); + // The banned connection was established. Check that it was not reported to + // the behaviour of the banning swarm. + assert_eq!( + swarm2.behaviour.on_connection_established.len(), + s2_expected_conns, + "No additional closed connections should be reported for the banned peer" + ); - // Setup to test that the banned connection is not reported upon closing - // even if the peer is unbanned. - swarm2.unban_peer_id(swarm1_id); - stage = Stage::Unbanned; - } + // Setup to test that the banned connection is not reported upon closing + // even if the peer is unbanned. + swarm2.unban_peer_id(swarm1_id); + stage = Stage::Unbanned; } Stage::Unbanned => { if swarm2.network_info().num_peers() == 0 { @@ -1998,7 +1974,6 @@ mod tests { swarm2.behaviour.on_connection_closed.len(), s2_expected_conns, "No additional closed connections should be reported for the banned peer" ); - assert!(swarm2.banned_peer_connections.is_empty()); // Setup to test that a ban lifted does not affect future connections. for _ in 0..num_connections { diff --git a/swarm/src/test.rs b/swarm/src/test.rs index 104f505afd2..9c8a780fd0c 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -24,7 +24,7 @@ use crate::behaviour::{ }; use crate::{ ConnectionHandler, ConnectionId, IntoConnectionHandler, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, THandlerOutEvent, + NetworkBehaviourAction, PollParameters, THandlerInEvent, THandlerOutEvent, }; use libp2p_core::{multiaddr::Multiaddr, transport::ListenerId, ConnectedPoint, PeerId}; use std::collections::HashMap; @@ -45,7 +45,7 @@ where /// The next action to return from `poll`. /// /// An action is only returned once. - pub next_action: Option>, + pub next_action: Option>, } impl MockBehaviour @@ -82,7 +82,7 @@ where &mut self, _: &mut Context, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { self.next_action.take().map_or(Poll::Pending, Poll::Ready) } @@ -387,14 +387,14 @@ where } FromSwarm::DialFailure(DialFailure { peer_id, - handler, + connection_id, error, }) => { self.on_dial_failure.push(peer_id); self.inner .on_swarm_event(FromSwarm::DialFailure(DialFailure { peer_id, - handler, + connection_id, error, })); } @@ -478,7 +478,7 @@ where &mut self, cx: &mut Context, args: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { self.poll += 1; self.inner.poll(cx, args) } diff --git a/swarm/tests/swarm_derive.rs b/swarm/tests/swarm_derive.rs index 48eda80454a..87e9ce764f8 100644 --- a/swarm/tests/swarm_derive.rs +++ b/swarm/tests/swarm_derive.rs @@ -21,7 +21,9 @@ use futures::StreamExt; use libp2p_identify as identify; use libp2p_ping as ping; -use libp2p_swarm::{behaviour::FromSwarm, dummy, NetworkBehaviour, SwarmEvent, THandlerOutEvent}; +use libp2p_swarm::{ + behaviour::FromSwarm, dummy, NetworkBehaviour, SwarmEvent, THandlerInEvent, THandlerOutEvent, +}; use std::fmt::Debug; /// Small utility to check that a type implements `NetworkBehaviour`. @@ -436,7 +438,7 @@ fn custom_out_event_no_type_parameters() { &mut self, _ctx: &mut Context, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { Poll::Pending }