Skip to content

Commit

Permalink
refactor(swarm)!: remove handler from NetworkBehaviourAction::Dial (
Browse files Browse the repository at this point in the history
#3328)

We create the `ConnectionId` for the new connection as part of `DialOpts`. This allows `NetworkBehaviour`s to accurately track state regarding their own dial attempts.

This patch is the main enabler of #3254. Removing the `handler` field will allow us to deprecate the `NetworkBehaviour::new_handler` function in favor of four new ones that give more control over the connection lifecycle.
  • Loading branch information
thomaseizinger authored Feb 14, 2023
1 parent 9247cfa commit caed1fe
Show file tree
Hide file tree
Showing 31 changed files with 583 additions and 966 deletions.
6 changes: 6 additions & 0 deletions core/src/muxing/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ pub struct StreamMuxerBox {
inner: Pin<Box<dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send>>,
}

impl fmt::Debug for StreamMuxerBox {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StreamMuxerBox").finish_non_exhaustive()
}
}

/// Abstract type for asynchronous reading and writing.
///
/// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead`
Expand Down
19 changes: 7 additions & 12 deletions protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use libp2p_swarm::{
ExpiredListenAddr, FromSwarm,
},
ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction,
PollParameters, THandlerOutEvent,
PollParameters, THandlerInEvent, THandlerOutEvent,
};
use std::{
collections::{HashMap, VecDeque},
Expand Down Expand Up @@ -208,10 +208,7 @@ pub struct Behaviour {
last_probe: Option<Instant>,

pending_actions: VecDeque<
NetworkBehaviourAction<
<Self as NetworkBehaviour>::OutEvent,
<Self as NetworkBehaviour>::ConnectionHandler,
>,
NetworkBehaviourAction<<Self as NetworkBehaviour>::OutEvent, THandlerInEvent<Self>>,
>,

probe_id: ProbeId,
Expand Down Expand Up @@ -389,14 +386,14 @@ impl Behaviour {
&mut self,
DialFailure {
peer_id,
handler,
connection_id,
error,
}: DialFailure<<Self as NetworkBehaviour>::ConnectionHandler>,
}: DialFailure,
) {
self.inner
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler,
connection_id,
error,
}));
if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) {
Expand Down Expand Up @@ -560,10 +557,8 @@ impl NetworkBehaviour for Behaviour {
}
}

type Action = NetworkBehaviourAction<
<Behaviour as NetworkBehaviour>::OutEvent,
<Behaviour as NetworkBehaviour>::ConnectionHandler,
>;
type Action =
NetworkBehaviourAction<<Behaviour as NetworkBehaviour>::OutEvent, THandlerInEvent<Behaviour>>;

// Trait implemented for `AsClient` and `AsServer` to handle events from the inner [`request_response::Behaviour`] Protocol.
trait HandleInnerEvent {
Expand Down
3 changes: 1 addition & 2 deletions protocols/autonat/src/behaviour/as_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use libp2p_request_response::{
};
use libp2p_swarm::{
dial_opts::{DialOpts, PeerCondition},
ConnectionId, DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
ConnectionId, DialError, NetworkBehaviourAction, PollParameters,
};
use std::{
collections::{HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -138,7 +138,6 @@ impl<'a> HandleInnerEvent for AsServer<'a> {
)
.addresses(addrs)
.build(),
handler: self.inner.new_handler(),
},
])
}
Expand Down
171 changes: 103 additions & 68 deletions protocols/dcutr/src/behaviour_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailu
use libp2p_swarm::dial_opts::{self, DialOpts};
use libp2p_swarm::{
ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters,
NotifyHandler, PollParameters, THandlerInEvent,
};
use libp2p_swarm::{ConnectionId, THandlerOutEvent};
use std::collections::{HashMap, HashSet, VecDeque};
use std::task::{Context, Poll};
use thiserror::Error;
use void::Void;

const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3;

Expand Down Expand Up @@ -68,14 +69,20 @@ pub enum Error {

pub struct Behaviour {
/// Queue of actions to return when polled.
queued_events: VecDeque<NetworkBehaviourAction<Event, handler::Prototype>>,
queued_events: VecDeque<NetworkBehaviourAction<Event, Either<handler::relayed::Command, Void>>>,

/// All direct (non-relayed) connections.
direct_connections: HashMap<PeerId, HashSet<ConnectionId>>,

external_addresses: ExternalAddresses,

local_peer_id: PeerId,

direct_to_relayed_connections: HashMap<ConnectionId, ConnectionId>,

/// Indexed by the [`ConnectionId`] of the relayed connection and
/// the [`PeerId`] we are trying to establish a direct connection to.
outgoing_direct_connection_attempts: HashMap<(ConnectionId, PeerId), u8>,
}

impl Behaviour {
Expand All @@ -85,6 +92,8 @@ impl Behaviour {
direct_connections: Default::default(),
external_addresses: Default::default(),
local_peer_id,
direct_to_relayed_connections: Default::default(),
outgoing_direct_connection_attempts: Default::default(),
}
}

Expand Down Expand Up @@ -148,40 +157,57 @@ impl Behaviour {
fn on_dial_failure(
&mut self,
DialFailure {
peer_id, handler, ..
}: DialFailure<<Self as NetworkBehaviour>::ConnectionHandler>,
peer_id,
connection_id: failed_direct_connection,
..
}: DialFailure,
) {
if let handler::Prototype::DirectConnection {
relayed_connection_id,
role: handler::Role::Initiator { attempt },
} = handler
let peer_id = if let Some(peer_id) = peer_id {
peer_id
} else {
return;
};

let relayed_connection_id = if let Some(relayed_connection_id) = self
.direct_to_relayed_connections
.get(&failed_direct_connection)
{
let peer_id = peer_id.expect("Peer of `Prototype::DirectConnection` is always known.");
if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
self.queued_events
.push_back(NetworkBehaviourAction::NotifyHandler {
handler: NotifyHandler::One(relayed_connection_id),
peer_id,
event: Either::Left(handler::relayed::Command::Connect {
attempt: attempt + 1,
obs_addrs: self.observed_addreses(),
}),
})
} else {
self.queued_events.extend([
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left(
handler::relayed::Command::UpgradeFinishedDontKeepAlive,
),
},
NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed {
remote_peer_id: peer_id,
error: Error::Dial,
*relayed_connection_id
} else {
return;
};

let attempt = if let Some(attempt) = self
.outgoing_direct_connection_attempts
.get(&(relayed_connection_id, peer_id))
{
*attempt
} else {
return;
};

if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
self.queued_events
.push_back(NetworkBehaviourAction::NotifyHandler {
handler: NotifyHandler::One(relayed_connection_id),
peer_id,
event: Either::Left(handler::relayed::Command::Connect {
attempt: attempt + 1,
obs_addrs: self.observed_addreses(),
}),
]);
}
})
} else {
self.queued_events.extend([
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive),
},
NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed {
remote_peer_id: peer_id,
error: Error::Dial,
}),
]);
}
}

Expand Down Expand Up @@ -215,23 +241,34 @@ impl NetworkBehaviour for Behaviour {
type OutEvent = Event;

fn new_handler(&mut self) -> Self::ConnectionHandler {
handler::Prototype::UnknownConnection
handler::Prototype
}

fn on_connection_handler_event(
&mut self,
event_source: PeerId,
connection: ConnectionId,
connection_id: ConnectionId,
handler_event: THandlerOutEvent<Self>,
) {
let relayed_connection_id = match handler_event.as_ref() {
Either::Left(_) => connection_id,
Either::Right(_) => match self.direct_to_relayed_connections.get(&connection_id) {
None => {
// If the connection ID is unknown to us, it means we didn't create it so ignore any event coming from it.
return;
}
Some(relayed_connection_id) => *relayed_connection_id,
},
};

match handler_event {
Either::Left(handler::relayed::Event::InboundConnectRequest {
inbound_connect,
remote_addr,
}) => {
self.queued_events.extend([
NetworkBehaviourAction::NotifyHandler {
handler: NotifyHandler::One(connection),
handler: NotifyHandler::One(relayed_connection_id),
peer_id: event_source,
event: Either::Left(handler::relayed::Command::AcceptInboundConnect {
inbound_connect,
Expand All @@ -256,16 +293,17 @@ impl NetworkBehaviour for Behaviour {
));
}
Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => {
self.queued_events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(event_source)
.addresses(remote_addrs)
.condition(dial_opts::PeerCondition::Always)
.build(),
handler: handler::Prototype::DirectConnection {
relayed_connection_id: connection,
role: handler::Role::Listener,
},
});
let opts = DialOpts::peer_id(event_source)
.addresses(remote_addrs)
.condition(dial_opts::PeerCondition::Always)
.build();

let maybe_direct_connection_id = opts.connection_id();

self.direct_to_relayed_connections
.insert(maybe_direct_connection_id, relayed_connection_id);
self.queued_events
.push_back(NetworkBehaviourAction::Dial { opts });
}
Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => {
self.queued_events
Expand All @@ -276,27 +314,25 @@ impl NetworkBehaviour for Behaviour {
},
));
}
Either::Left(handler::relayed::Event::OutboundConnectNegotiated {
remote_addrs,
attempt,
}) => {
self.queued_events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(event_source)
.condition(dial_opts::PeerCondition::Always)
.addresses(remote_addrs)
.override_role()
.build(),
handler: handler::Prototype::DirectConnection {
relayed_connection_id: connection,
role: handler::Role::Initiator { attempt },
},
});
Either::Left(handler::relayed::Event::OutboundConnectNegotiated { remote_addrs }) => {
let opts = DialOpts::peer_id(event_source)
.condition(dial_opts::PeerCondition::Always)
.addresses(remote_addrs)
.override_role()
.build();

let maybe_direct_connection_id = opts.connection_id();

self.direct_to_relayed_connections
.insert(maybe_direct_connection_id, relayed_connection_id);
*self
.outgoing_direct_connection_attempts
.entry((relayed_connection_id, event_source))
.or_default() += 1;
self.queued_events
.push_back(NetworkBehaviourAction::Dial { opts });
}
Either::Right(Either::Left(
handler::direct::Event::DirectConnectionUpgradeSucceeded {
relayed_connection_id,
},
)) => {
Either::Right(handler::direct::Event::DirectConnectionEstablished) => {
self.queued_events.extend([
NetworkBehaviourAction::NotifyHandler {
peer_id: event_source,
Expand All @@ -312,15 +348,14 @@ impl NetworkBehaviour for Behaviour {
),
]);
}
Either::Right(Either::Right(event)) => void::unreachable(event),
};
}

fn poll(
&mut self,
_cx: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event);
}
Expand Down
Loading

0 comments on commit caed1fe

Please sign in to comment.