From 310311aed134d747bf7ba31a3bb034d8d0899606 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 2 Nov 2023 20:17:37 +1100 Subject: [PATCH] Pass negotiated protocol directly to connection handler --- protocols/dcutr/src/handler/relayed.rs | 19 +- protocols/gossipsub/src/handler.rs | 13 +- protocols/identify/src/handler.rs | 16 +- protocols/kad/src/handler.rs | 19 +- protocols/perf/src/client/handler.rs | 12 +- protocols/perf/src/server/handler.rs | 9 +- protocols/ping/src/handler.rs | 11 +- protocols/relay/src/behaviour/handler.rs | 11 +- protocols/relay/src/priv_client/handler.rs | 11 +- protocols/request-response/src/handler.rs | 19 +- swarm/src/behaviour/toggle.rs | 47 ++- swarm/src/connection.rs | 93 +++-- swarm/src/dummy.rs | 10 +- swarm/src/handler.rs | 46 +-- swarm/src/handler/either.rs | 210 +++++----- swarm/src/handler/map_in.rs | 6 +- swarm/src/handler/map_out.rs | 6 +- swarm/src/handler/multi.rs | 451 --------------------- swarm/src/handler/one_shot.rs | 12 +- swarm/src/handler/pending.rs | 11 +- swarm/src/handler/select.rs | 292 ++++++------- swarm/src/upgrade/denied.rs | 2 +- swarm/tests/connection_close.rs | 7 +- 23 files changed, 412 insertions(+), 921 deletions(-) delete mode 100644 swarm/src/handler/multi.rs diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 17de6280819..e8494645478 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -92,10 +92,7 @@ impl Handler { fn on_fully_negotiated_inbound( &mut self, - FullyNegotiatedInbound { - protocol: output, .. - }: FullyNegotiatedInbound< - ::InboundProtocol, + FullyNegotiatedInbound { stream: output, .. }: FullyNegotiatedInbound< ::InboundOpenInfo, >, ) { @@ -122,9 +119,7 @@ impl Handler { fn on_fully_negotiated_outbound( &mut self, - FullyNegotiatedOutbound { - protocol: stream, .. - }: FullyNegotiatedOutbound< + FullyNegotiatedOutbound { stream: stream, .. }: FullyNegotiatedOutbound< ::OutboundProtocol, ::OutboundOpenInfo, >, @@ -150,8 +145,7 @@ impl Handler { fn on_listen_upgrade_error( &mut self, ListenUpgradeError { error, .. }: ListenUpgradeError< - ::InboundOpenInfo, - ::InboundProtocol, + <::InboundProtocol as UpgradeInfo>::Info, >, ) { void::unreachable(error.into_inner()); @@ -290,12 +284,7 @@ impl ConnectionHandler for Handler { fn on_connection_event( &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, + event: ConnectionEvent, ) { match event { ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 747a5072911..397f5f01b57 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -192,7 +192,9 @@ impl EnabledHandler { fn on_fully_negotiated_outbound( &mut self, - FullyNegotiatedOutbound { protocol, .. }: FullyNegotiatedOutbound< + FullyNegotiatedOutbound { + stream: protocol, .. + }: FullyNegotiatedOutbound< ::OutboundProtocol, ::OutboundOpenInfo, >, @@ -455,12 +457,7 @@ impl ConnectionHandler for Handler { fn on_connection_event( &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, + event: ConnectionEvent, ) { match self { Handler::Enabled(handler) => { @@ -492,7 +489,7 @@ impl ConnectionHandler for Handler { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol, + stream: protocol, .. }) => match protocol { Either::Left(protocol) => handler.on_fully_negotiated_inbound(protocol), diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 2d7312b0641..9186bdfc414 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -143,10 +143,7 @@ impl Handler { fn on_fully_negotiated_inbound( &mut self, - FullyNegotiatedInbound { - protocol: output, .. - }: FullyNegotiatedInbound< - ::InboundProtocol, + FullyNegotiatedInbound { stream: output, .. }: FullyNegotiatedInbound< ::InboundOpenInfo, >, ) { @@ -180,9 +177,7 @@ impl Handler { fn on_fully_negotiated_outbound( &mut self, - FullyNegotiatedOutbound { - protocol: output, .. - }: FullyNegotiatedOutbound< + FullyNegotiatedOutbound { stream: output, .. }: FullyNegotiatedOutbound< ::OutboundProtocol, ::OutboundOpenInfo, >, @@ -373,12 +368,7 @@ impl ConnectionHandler for Handler { fn on_connection_event( &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, + event: ConnectionEvent, ) { match event { ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index c0b957d55b8..90efa0d80de 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -492,7 +492,10 @@ impl Handler { fn on_fully_negotiated_outbound( &mut self, - FullyNegotiatedOutbound { protocol, info: () }: FullyNegotiatedOutbound< + FullyNegotiatedOutbound { + stream: protocol, + info: (), + }: FullyNegotiatedOutbound< ::OutboundProtocol, ::OutboundOpenInfo, >, @@ -519,10 +522,9 @@ impl Handler { fn on_fully_negotiated_inbound( &mut self, - FullyNegotiatedInbound { protocol, .. }: FullyNegotiatedInbound< - ::InboundProtocol, - ::InboundOpenInfo, - >, + FullyNegotiatedInbound { + stream: protocol, .. + }: FullyNegotiatedInbound<::InboundOpenInfo>, ) { // If `self.allow_listening` is false, then we produced a `DeniedUpgrade` and `protocol` // is a `Void`. @@ -755,12 +757,7 @@ impl ConnectionHandler for Handler { fn on_connection_event( &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, + event: ConnectionEvent, ) { match event { ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index d1d46cfaf48..08e5ca99331 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -106,19 +106,15 @@ impl ConnectionHandler for Handler { fn on_connection_event( &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, + event: ConnectionEvent, ) { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol, .. + stream: protocol, + .. }) => void::unreachable(protocol), ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol, + stream: protocol, info: (), }) => { let Command { id, params } = self diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index 6681179fe1f..4674280fd39 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -78,16 +78,11 @@ impl ConnectionHandler for Handler { fn on_connection_event( &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, + event: ConnectionEvent, ) { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol, + stream: protocol, info: _, }) => { if self diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index b75a5114825..f23611baceb 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -24,7 +24,7 @@ use futures::prelude::*; use futures_timer::Delay; use libp2p_identity::PeerId; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, UpgradeInfo, }; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ReadyUpgrade, Stream, StreamUpgradeError, @@ -330,22 +330,23 @@ impl ConnectionHandler for Handler { fn on_connection_event( &mut self, event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, + ::Info, + ::Info, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, ) { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol: mut stream, + mut stream, + protocol, .. }) => { stream.ignore_for_keep_alive(); self.inbound = Some(protocol::recv_ping(stream).boxed()); } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol: mut stream, + stream: mut stream, .. }) => { stream.ignore_for_keep_alive(); diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index 0c21728713a..98af444f601 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -873,22 +873,17 @@ impl ConnectionHandler for Handler { fn on_connection_event( &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, + event: ConnectionEvent, ) { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol: stream, + stream: stream, .. }) => { self.on_fully_negotiated_inbound(stream); } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol: stream, + stream: stream, .. }) => { self.on_fully_negotiated_outbound(stream); diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 67dcb6bccf0..eea022ec344 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -474,16 +474,11 @@ impl ConnectionHandler for Handler { fn on_connection_event( &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, + event: ConnectionEvent, ) { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol: stream, + stream: stream, .. }) => { if self @@ -495,7 +490,7 @@ impl ConnectionHandler for Handler { } } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol: stream, + stream: stream, .. }) => { let pending_request = self.pending_requests.pop_front().expect( diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index f4f5bf96c6c..934b0536565 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -123,12 +123,9 @@ where fn on_fully_negotiated_inbound( &mut self, FullyNegotiatedInbound { - protocol: (mut stream, protocol), + stream: (mut stream, protocol), info: (), - }: FullyNegotiatedInbound< - ::InboundProtocol, - ::InboundOpenInfo, - >, + }: FullyNegotiatedInbound<::InboundOpenInfo>, ) { let mut codec = self.codec.clone(); let request_id = self.next_inbound_request_id(); @@ -171,7 +168,7 @@ where fn on_fully_negotiated_outbound( &mut self, FullyNegotiatedOutbound { - protocol: (mut stream, protocol), + stream: (mut stream, protocol), info: (), }: FullyNegotiatedOutbound< ::OutboundProtocol, @@ -247,8 +244,7 @@ where fn on_listen_upgrade_error( &mut self, ListenUpgradeError { error, .. }: ListenUpgradeError< - ::InboundOpenInfo, - ::InboundProtocol, + <::InboundProtocol as UpgradeInfo>::Info, >, ) { void::unreachable(error) @@ -463,12 +459,7 @@ where fn on_connection_event( &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, + event: ConnectionEvent, ) { match event { ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index cf5cc5c2155..970e0e5b937 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -23,13 +23,13 @@ use crate::connection::ConnectionId; use crate::handler::{ AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, SubstreamProtocol, + UpgradeInfo, }; use crate::upgrade::DeniedUpgrade; use crate::{ ConnectionDenied, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use either::Either; -use futures::future; use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; use std::{task::Context, task::Poll}; @@ -198,16 +198,17 @@ where fn on_fully_negotiated_inbound( &mut self, FullyNegotiatedInbound { - protocol: out, + protocol, + stream, info, }: FullyNegotiatedInbound< - ::InboundProtocol, + <::InboundProtocol as UpgradeInfo>::Info, ::InboundOpenInfo, >, ) { - let out = match out { - future::Either::Left(out) => out, - future::Either::Right(v) => void::unreachable(v), + let protocol = match protocol { + Either::Left(out) => out, + Either::Right(v) => todo!("should be void but doesn't implement `AsRef`"), }; if let Either::Left(info) = info { @@ -216,8 +217,9 @@ where .expect("Can't receive an inbound substream if disabled; QED") .on_connection_event(ConnectionEvent::FullyNegotiatedInbound( FullyNegotiatedInbound { - protocol: out, + stream, info, + protocol, }, )); } else { @@ -229,7 +231,6 @@ where &mut self, ListenUpgradeError { info, error: err }: ListenUpgradeError< ::InboundOpenInfo, - ::InboundProtocol, >, ) { let (inner, info) = match (self.inner.as_mut(), info) { @@ -245,16 +246,16 @@ where `on_listen_upgrade_error` in disabled state.", ), }; - - let err = match err { - Either::Left(e) => e, - Either::Right(v) => void::unreachable(v), - }; - - inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - info, - error: err, - })); + // + // let err = match err { + // Either::Left(e) => e, + // Either::Right(v) => void::unreachable(v), + // }; + // + // inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { + // info, + // error: err, + // })); } } @@ -316,8 +317,8 @@ where fn on_connection_event( &mut self, event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, + ::Info, + ::Info, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, @@ -327,15 +328,17 @@ where self.on_fully_negotiated_inbound(fully_negotiated_inbound) } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol: out, + protocol: protocol, info, + stream, }) => self .inner .as_mut() .expect("Can't receive an outbound substream if disabled; QED") .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound( FullyNegotiatedOutbound { - protocol: out, + protocol, + stream, info, }, )), diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 368884ca002..355ea822c9f 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -59,6 +59,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::Waker; use std::time::Duration; use std::{fmt, io, mem, pin::Pin, task::Context, task::Poll}; +use void::Void; static NEXT_CONNECTION_ID: AtomicUsize = AtomicUsize::new(1); @@ -117,21 +118,11 @@ where /// The underlying handler. handler: THandler, /// Futures that upgrade incoming substreams. - negotiating_in: FuturesUnordered< - StreamUpgrade< - THandler::InboundOpenInfo, - ::Output, - ::Error, - >, - >, + negotiating_in: + FuturesUnordered>, /// Futures that upgrade outgoing substreams. - negotiating_out: FuturesUnordered< - StreamUpgrade< - THandler::OutboundOpenInfo, - ::Output, - ::Error, - >, - >, + negotiating_out: + FuturesUnordered>, /// The currently planned connection & handler shutdown. shutdown: Shutdown, /// The substream upgrade protocol override, if any. @@ -315,9 +306,13 @@ where // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams. match negotiating_out.poll_next_unpin(cx) { Poll::Pending | Poll::Ready(None) => {} - Poll::Ready(Some((info, Ok(protocol)))) => { + Poll::Ready(Some((info, Ok((protocol, stream))))) => { handler.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound( - FullyNegotiatedOutbound { protocol, info }, + FullyNegotiatedOutbound { + stream, + protocol, + info, + }, )); continue; } @@ -333,9 +328,13 @@ where // make any more progress, poll the negotiating inbound streams. match negotiating_in.poll_next_unpin(cx) { Poll::Pending | Poll::Ready(None) => {} - Poll::Ready(Some((info, Ok(protocol)))) => { + Poll::Ready(Some((info, Ok((protocol, stream))))) => { handler.on_connection_event(ConnectionEvent::FullyNegotiatedInbound( - FullyNegotiatedInbound { protocol, info }, + FullyNegotiatedInbound { + stream, + protocol, + info, + }, )); continue; } @@ -522,24 +521,24 @@ impl<'a> IncomingInfo<'a> { } } -struct StreamUpgrade { +struct StreamUpgrade { user_data: Option, timeout: Delay, - upgrade: BoxFuture<'static, Result>>, + upgrade: BoxFuture<'static, Result<(Upgrade::Info, Stream), StreamUpgradeError>>, } -impl StreamUpgrade { - fn new_outbound( +impl StreamUpgrade +where + Upgrade: UpgradeInfo, +{ + fn new_outbound( substream: SubstreamBox, user_data: UserData, timeout: Delay, upgrade: Upgrade, version_override: Option, counter: ActiveStreamCounter, - ) -> Self - where - Upgrade: OutboundUpgrade, - { + ) -> Self { let effective_version = match version_override { Some(version_override) if version_override != upgrade::Version::default() => { log::debug!( @@ -566,26 +565,23 @@ impl StreamUpgrade { .await .map_err(to_stream_upgrade_error)?; - let output = upgrade - .upgrade_outbound(Stream::new(stream, counter), info) - .await - .map_err(StreamUpgradeError::Apply)?; + let stream = Stream::new(stream, counter); - Ok(output) + Ok((info, stream)) }), } } } -impl StreamUpgrade { - fn new_inbound( +impl StreamUpgrade +where + Upgrade: UpgradeInfo, +{ + fn new_inbound( substream: SubstreamBox, protocol: SubstreamProtocol, counter: ActiveStreamCounter, - ) -> Self - where - Upgrade: InboundUpgrade, - { + ) -> Self { let timeout = *protocol.timeout(); let (upgrade, open_info) = protocol.into_upgrade(); let protocols = upgrade.protocol_info().into_iter_send(); @@ -599,12 +595,9 @@ impl StreamUpgrade { .await .map_err(to_stream_upgrade_error)?; - let output = upgrade - .upgrade_inbound(Stream::new(stream, counter), info) - .await - .map_err(StreamUpgradeError::Apply)?; + let stream = Stream::new(stream, counter); - Ok(output) + Ok((info, stream)) }), } } @@ -620,10 +613,16 @@ fn to_stream_upgrade_error(e: NegotiationError) -> StreamUpgradeError { } } -impl Unpin for StreamUpgrade {} +impl Unpin for StreamUpgrade where P: UpgradeInfo {} -impl Future for StreamUpgrade { - type Output = (UserData, Result>); +impl Future for StreamUpgrade +where + P: UpgradeInfo, +{ + type Output = ( + UserData, + Result<(P::Info, Stream), StreamUpgradeError>, + ); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.timeout.poll_unpin(cx) { @@ -1163,11 +1162,11 @@ mod tests { ) { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol, + stream: protocol, .. }) => void::unreachable(protocol), ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol, + stream: protocol, .. }) => void::unreachable(protocol), ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 0a7f3a5e51c..6a32e761e99 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -1,7 +1,7 @@ use crate::behaviour::{FromSwarm, NetworkBehaviour, ToSwarm}; use crate::connection::ConnectionId; use crate::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, UpgradeInfo, }; use crate::upgrade::DeniedUpgrade; use crate::{ @@ -111,8 +111,8 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { fn on_connection_event( &mut self, event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, + ::Info, + ::Info, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, @@ -120,10 +120,10 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { protocol, .. - }) => void::unreachable(protocol), + }) => todo!("should be void but doesn't implement `AsRef`"), ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol, .. - }) => void::unreachable(protocol), + }) => todo!("should be void but doesn't implement `AsRef`"), ConnectionEvent::DialUpgradeError(DialUpgradeError { info: _, error }) => match error { StreamUpgradeError::Timeout => unreachable!(), StreamUpgradeError::Apply(e) => void::unreachable(e), diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 1a13aac2155..e52f645504f 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -41,7 +41,6 @@ pub mod either; mod map_in; mod map_out; -pub mod multi; mod one_shot; mod pending; mod select; @@ -53,7 +52,7 @@ pub use one_shot::{OneShotHandler, OneShotHandlerConfig}; pub use pending::PendingConnectionHandler; pub use select::ConnectionHandlerSelect; -use crate::StreamProtocol; +use crate::{Stream, StreamProtocol}; use ::either::Either; use libp2p_core::Multiaddr; use once_cell::sync::Lazy; @@ -63,6 +62,7 @@ use std::collections::hash_set::{Difference, Intersection}; use std::collections::HashSet; use std::iter::Peekable; use std::{error, fmt, io, task::Context, task::Poll, time::Duration}; +use void::Void; /// A handler for a set of protocols used on a connection with a remote. /// @@ -105,9 +105,9 @@ pub trait ConnectionHandler: Send + 'static { /// The type of errors returned by [`ConnectionHandler::poll`]. type Error: error::Error + fmt::Debug + Send + 'static; /// The inbound upgrade for the protocol(s) used by the handler. - type InboundProtocol: InboundUpgrade; + type InboundProtocol: UpgradeInfo; /// The outbound upgrade for the protocol(s) used by the handler. - type OutboundProtocol: OutboundUpgrade; + type OutboundProtocol: UpgradeInfo; /// The type of additional information returned from `listen_protocol`. type InboundOpenInfo: Send + 'static; /// The type of additional information passed to an `OutboundSubstreamRequest`. @@ -204,8 +204,8 @@ pub trait ConnectionHandler: Send + 'static { fn on_connection_event( &mut self, event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, + ::Info, + ::Info, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, @@ -214,7 +214,7 @@ pub trait ConnectionHandler: Send + 'static { /// Enumeration with the list of the possible stream events /// to pass to [`on_connection_event`](ConnectionHandler::on_connection_event). -pub enum ConnectionEvent<'a, IP: InboundUpgrade, OP: OutboundUpgrade, IOI, OOI> { +pub enum ConnectionEvent<'a, IP, OP, IOI, OOI> { /// Informs the handler about the output of a successful upgrade on a new inbound substream. FullyNegotiatedInbound(FullyNegotiatedInbound), /// Informs the handler about the output of a successful upgrade on a new outbound stream. @@ -222,9 +222,9 @@ pub enum ConnectionEvent<'a, IP: InboundUpgrade, OP: OutboundUpgrade, IOI, OOI> /// Informs the handler about a change in the address of the remote. AddressChange(AddressChange<'a>), /// Informs the handler that upgrading an outbound substream to the given protocol has failed. - DialUpgradeError(DialUpgradeError), + DialUpgradeError(DialUpgradeError), /// Informs the handler that upgrading an inbound substream to the given protocol has failed. - ListenUpgradeError(ListenUpgradeError), + ListenUpgradeError(ListenUpgradeError), /// The local [`ConnectionHandler`] added or removed support for one or more protocols. LocalProtocolsChange(ProtocolsChange<'a>), /// The remote [`ConnectionHandler`] now supports a different set of protocols. @@ -233,12 +233,8 @@ pub enum ConnectionEvent<'a, IP: InboundUpgrade, OP: OutboundUpgrade, IOI, OOI> impl<'a, IP, OP, IOI, OOI> fmt::Debug for ConnectionEvent<'a, IP, OP, IOI, OOI> where - IP: InboundUpgrade + fmt::Debug, - IP::Output: fmt::Debug, - IP::Error: fmt::Debug, - OP: OutboundUpgrade + fmt::Debug, - OP::Output: fmt::Debug, - OP::Error: fmt::Debug, + IP: fmt::Debug, + OP: fmt::Debug, IOI: fmt::Debug, OOI: fmt::Debug, { @@ -267,7 +263,7 @@ where } } -impl<'a, IP: InboundUpgrade, OP: OutboundUpgrade, IOI, OOI> ConnectionEvent<'a, IP, OP, IOI, OOI> { +impl<'a, IP, OP, IOI, OOI> ConnectionEvent<'a, IP, OP, IOI, OOI> { /// Whether the event concerns an outbound stream. pub fn is_outbound(&self) -> bool { match self { @@ -306,8 +302,9 @@ impl<'a, IP: InboundUpgrade, OP: OutboundUpgrade, IOI, OOI> ConnectionEvent<'a, /// [`ConnectionHandler`] implementation to stop a malicious remote node to open and keep alive /// an excessive amount of inbound substreams. #[derive(Debug)] -pub struct FullyNegotiatedInbound { - pub protocol: IP::Output, +pub struct FullyNegotiatedInbound { + pub stream: Stream, + pub protocol: P, pub info: IOI, } @@ -316,8 +313,9 @@ pub struct FullyNegotiatedInbound { /// The `protocol` field is the information that was previously passed to /// [`ConnectionHandlerEvent::OutboundSubstreamRequest`]. #[derive(Debug)] -pub struct FullyNegotiatedOutbound { - pub protocol: OP::Output, +pub struct FullyNegotiatedOutbound { + pub stream: Stream, + pub protocol: P, pub info: OOI, } @@ -446,17 +444,17 @@ impl<'a> Iterator for ProtocolsRemoved<'a> { /// [`ConnectionEvent`] variant that informs the handler /// that upgrading an outbound substream to the given protocol has failed. #[derive(Debug)] -pub struct DialUpgradeError { +pub struct DialUpgradeError { pub info: OOI, - pub error: StreamUpgradeError, + pub error: StreamUpgradeError, } /// [`ConnectionEvent`] variant that informs the handler /// that upgrading an inbound substream to the given protocol has failed. #[derive(Debug)] -pub struct ListenUpgradeError { +pub struct ListenUpgradeError { pub info: IOI, - pub error: IP::Error, + pub error: Void, } /// Configuration of inbound or outbound substream protocol(s) diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index 2c42498b895..2e906b7bfe8 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -20,7 +20,7 @@ use crate::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, FullyNegotiatedInbound, - InboundUpgrade, ListenUpgradeError, SubstreamProtocol, + InboundUpgrade, ListenUpgradeError, SubstreamProtocol, UpgradeInfo, }; use either::Either; use futures::future; @@ -34,37 +34,36 @@ where pub(crate) fn transpose( self, ) -> Either, FullyNegotiatedInbound> { - match self { - FullyNegotiatedInbound { - protocol: future::Either::Left(protocol), - info: Either::Left(info), - } => Either::Left(FullyNegotiatedInbound { protocol, info }), - FullyNegotiatedInbound { - protocol: future::Either::Right(protocol), - info: Either::Right(info), - } => Either::Right(FullyNegotiatedInbound { protocol, info }), - _ => unreachable!(), - } + // match self { + // FullyNegotiatedInbound { + // protocol: future::Either::Left(protocol), + // info: Either::Left(info), + // } => Either::Left(FullyNegotiatedInbound { protocol, info }), + // FullyNegotiatedInbound { + // protocol: future::Either::Right(protocol), + // info: Either::Right(info), + // } => Either::Right(FullyNegotiatedInbound { protocol, info }), + // _ => unreachable!(), + // } + todo!() } } -impl ListenUpgradeError, Either> -where - RIP: InboundUpgrade, - LIP: InboundUpgrade, -{ - fn transpose(self) -> Either, ListenUpgradeError> { - match self { - ListenUpgradeError { - error: Either::Left(error), - info: Either::Left(info), - } => Either::Left(ListenUpgradeError { error, info }), - ListenUpgradeError { - error: Either::Right(error), - info: Either::Right(info), - } => Either::Right(ListenUpgradeError { error, info }), - _ => unreachable!(), - } +impl ListenUpgradeError> { + fn transpose(self) -> Either, ListenUpgradeError> { + // match self { + // ListenUpgradeError { + // error: Either::Left(error), + // info: Either::Left(info), + // } => Either::Left(ListenUpgradeError { error, info }), + // ListenUpgradeError { + // error: Either::Right(error), + // info: Either::Right(info), + // } => Either::Right(ListenUpgradeError { error, info }), + // _ => unreachable!(), + // } + + todo!() } } @@ -141,85 +140,86 @@ where fn on_connection_event( &mut self, event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, + ::Info, + ::Info, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, ) { - match event { - ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { - match (fully_negotiated_inbound.transpose(), self) { - (Either::Left(fully_negotiated_inbound), Either::Left(handler)) => handler - .on_connection_event(ConnectionEvent::FullyNegotiatedInbound( - fully_negotiated_inbound, - )), - (Either::Right(fully_negotiated_inbound), Either::Right(handler)) => handler - .on_connection_event(ConnectionEvent::FullyNegotiatedInbound( - fully_negotiated_inbound, - )), - _ => unreachable!(), - } - } - ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { - match (fully_negotiated_outbound.transpose(), self) { - (Either::Left(fully_negotiated_outbound), Either::Left(handler)) => handler - .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound( - fully_negotiated_outbound, - )), - (Either::Right(fully_negotiated_outbound), Either::Right(handler)) => handler - .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound( - fully_negotiated_outbound, - )), - _ => unreachable!(), - } - } - ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { - match (dial_upgrade_error.transpose(), self) { - (Either::Left(dial_upgrade_error), Either::Left(handler)) => handler - .on_connection_event(ConnectionEvent::DialUpgradeError(dial_upgrade_error)), - (Either::Right(dial_upgrade_error), Either::Right(handler)) => handler - .on_connection_event(ConnectionEvent::DialUpgradeError(dial_upgrade_error)), - _ => unreachable!(), - } - } - ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { - match (listen_upgrade_error.transpose(), self) { - (Either::Left(listen_upgrade_error), Either::Left(handler)) => handler - .on_connection_event(ConnectionEvent::ListenUpgradeError( - listen_upgrade_error, - )), - (Either::Right(listen_upgrade_error), Either::Right(handler)) => handler - .on_connection_event(ConnectionEvent::ListenUpgradeError( - listen_upgrade_error, - )), - _ => unreachable!(), - } - } - ConnectionEvent::AddressChange(address_change) => match self { - Either::Left(handler) => { - handler.on_connection_event(ConnectionEvent::AddressChange(address_change)) - } - Either::Right(handler) => { - handler.on_connection_event(ConnectionEvent::AddressChange(address_change)) - } - }, - ConnectionEvent::LocalProtocolsChange(supported_protocols) => match self { - Either::Left(handler) => handler.on_connection_event( - ConnectionEvent::LocalProtocolsChange(supported_protocols), - ), - Either::Right(handler) => handler.on_connection_event( - ConnectionEvent::LocalProtocolsChange(supported_protocols), - ), - }, - ConnectionEvent::RemoteProtocolsChange(supported_protocols) => match self { - Either::Left(handler) => handler.on_connection_event( - ConnectionEvent::RemoteProtocolsChange(supported_protocols), - ), - Either::Right(handler) => handler.on_connection_event( - ConnectionEvent::RemoteProtocolsChange(supported_protocols), - ), - }, - } + todo!() + // match event { + // ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + // match (fully_negotiated_inbound.transpose(), self) { + // (Either::Left(fully_negotiated_inbound), Either::Left(handler)) => handler + // .on_connection_event(ConnectionEvent::FullyNegotiatedInbound( + // fully_negotiated_inbound, + // )), + // (Either::Right(fully_negotiated_inbound), Either::Right(handler)) => handler + // .on_connection_event(ConnectionEvent::FullyNegotiatedInbound( + // fully_negotiated_inbound, + // )), + // _ => unreachable!(), + // } + // } + // ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + // match (fully_negotiated_outbound.transpose(), self) { + // (Either::Left(fully_negotiated_outbound), Either::Left(handler)) => handler + // .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound( + // fully_negotiated_outbound, + // )), + // (Either::Right(fully_negotiated_outbound), Either::Right(handler)) => handler + // .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound( + // fully_negotiated_outbound, + // )), + // _ => unreachable!(), + // } + // } + // ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + // match (dial_upgrade_error.transpose(), self) { + // (Either::Left(dial_upgrade_error), Either::Left(handler)) => handler + // .on_connection_event(ConnectionEvent::DialUpgradeError(dial_upgrade_error)), + // (Either::Right(dial_upgrade_error), Either::Right(handler)) => handler + // .on_connection_event(ConnectionEvent::DialUpgradeError(dial_upgrade_error)), + // _ => unreachable!(), + // } + // } + // ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { + // match (listen_upgrade_error.transpose(), self) { + // (Either::Left(listen_upgrade_error), Either::Left(handler)) => handler + // .on_connection_event(ConnectionEvent::ListenUpgradeError( + // listen_upgrade_error, + // )), + // (Either::Right(listen_upgrade_error), Either::Right(handler)) => handler + // .on_connection_event(ConnectionEvent::ListenUpgradeError( + // listen_upgrade_error, + // )), + // _ => unreachable!(), + // } + // } + // ConnectionEvent::AddressChange(address_change) => match self { + // Either::Left(handler) => { + // handler.on_connection_event(ConnectionEvent::AddressChange(address_change)) + // } + // Either::Right(handler) => { + // handler.on_connection_event(ConnectionEvent::AddressChange(address_change)) + // } + // }, + // ConnectionEvent::LocalProtocolsChange(supported_protocols) => match self { + // Either::Left(handler) => handler.on_connection_event( + // ConnectionEvent::LocalProtocolsChange(supported_protocols), + // ), + // Either::Right(handler) => handler.on_connection_event( + // ConnectionEvent::LocalProtocolsChange(supported_protocols), + // ), + // }, + // ConnectionEvent::RemoteProtocolsChange(supported_protocols) => match self { + // Either::Left(handler) => handler.on_connection_event( + // ConnectionEvent::RemoteProtocolsChange(supported_protocols), + // ), + // Either::Right(handler) => handler.on_connection_event( + // ConnectionEvent::RemoteProtocolsChange(supported_protocols), + // ), + // }, + // } } } diff --git a/swarm/src/handler/map_in.rs b/swarm/src/handler/map_in.rs index e3458eb5451..eb7d43c097e 100644 --- a/swarm/src/handler/map_in.rs +++ b/swarm/src/handler/map_in.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, UpgradeInfo, }; use std::{fmt::Debug, marker::PhantomData, task::Context, task::Poll}; @@ -89,8 +89,8 @@ where fn on_connection_event( &mut self, event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, + ::Info, + ::Info, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, diff --git a/swarm/src/handler/map_out.rs b/swarm/src/handler/map_out.rs index cc06a4c50c8..28329532c2d 100644 --- a/swarm/src/handler/map_out.rs +++ b/swarm/src/handler/map_out.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, UpgradeInfo, }; use std::fmt::Debug; use std::task::{Context, Poll}; @@ -93,8 +93,8 @@ where fn on_connection_event( &mut self, event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, + ::Info, + ::Info, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs deleted file mode 100644 index ee7cb88f67d..00000000000 --- a/swarm/src/handler/multi.rs +++ /dev/null @@ -1,451 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! A [`ConnectionHandler`] implementation that combines multiple other [`ConnectionHandler`]s -//! indexed by some key. - -use crate::handler::{ - AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, SubstreamProtocol, -}; -use crate::upgrade::{InboundUpgrade, IntoIteratorSend, OutboundUpgrade, UpgradeInfo}; -use crate::Stream; -use futures::{future::BoxFuture, prelude::*}; -use rand::Rng; -use std::{ - cmp, - collections::{HashMap, HashSet}, - error, - fmt::{self, Debug}, - hash::Hash, - iter::{self, FromIterator}, - task::{Context, Poll}, - time::Duration, -}; - -/// A [`ConnectionHandler`] for multiple [`ConnectionHandler`]s of the same type. -#[derive(Clone)] -pub struct MultiHandler { - handlers: HashMap, -} - -impl fmt::Debug for MultiHandler -where - K: fmt::Debug + Eq + Hash, - H: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MultiHandler") - .field("handlers", &self.handlers) - .finish() - } -} - -impl MultiHandler -where - K: Clone + Debug + Hash + Eq + Send + 'static, - H: ConnectionHandler, -{ - /// Create and populate a `MultiHandler` from the given handler iterator. - /// - /// It is an error for any two protocols handlers to share the same protocol name. - pub fn try_from_iter(iter: I) -> Result - where - I: IntoIterator, - { - let m = MultiHandler { - handlers: HashMap::from_iter(iter), - }; - uniq_proto_names( - m.handlers - .values() - .map(|h| h.listen_protocol().into_upgrade().0), - )?; - Ok(m) - } - - fn on_listen_upgrade_error( - &mut self, - ListenUpgradeError { - error: (key, error), - mut info, - }: ListenUpgradeError< - ::InboundOpenInfo, - ::InboundProtocol, - >, - ) { - if let Some(h) = self.handlers.get_mut(&key) { - if let Some(i) = info.take(&key) { - h.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - info: i, - error, - })); - } - } - } -} - -impl ConnectionHandler for MultiHandler -where - K: Clone + Debug + Hash + Eq + Send + 'static, - H: ConnectionHandler, - H::InboundProtocol: InboundUpgrade, - H::OutboundProtocol: OutboundUpgrade, -{ - type FromBehaviour = (K, ::FromBehaviour); - type ToBehaviour = (K, ::ToBehaviour); - type Error = ::Error; - type InboundProtocol = Upgrade::InboundProtocol>; - type OutboundProtocol = ::OutboundProtocol; - type InboundOpenInfo = Info::InboundOpenInfo>; - type OutboundOpenInfo = (K, ::OutboundOpenInfo); - - fn listen_protocol(&self) -> SubstreamProtocol { - let (upgrade, info, timeout) = self - .handlers - .iter() - .map(|(key, handler)| { - let proto = handler.listen_protocol(); - let timeout = *proto.timeout(); - let (upgrade, info) = proto.into_upgrade(); - (key.clone(), (upgrade, info, timeout)) - }) - .fold( - (Upgrade::new(), Info::new(), Duration::from_secs(0)), - |(mut upg, mut inf, mut timeout), (k, (u, i, t))| { - upg.upgrades.push((k.clone(), u)); - inf.infos.push((k, i)); - timeout = cmp::max(timeout, t); - (upg, inf, timeout) - }, - ); - SubstreamProtocol::new(upgrade, info).with_timeout(timeout) - } - - fn on_connection_event( - &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, - ) { - match event { - ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol, - info: (key, arg), - }) => { - if let Some(h) = self.handlers.get_mut(&key) { - h.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound( - FullyNegotiatedOutbound { - protocol, - info: arg, - }, - )); - } else { - log::error!("FullyNegotiatedOutbound: no handler for key") - } - } - ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol: (key, arg), - mut info, - }) => { - if let Some(h) = self.handlers.get_mut(&key) { - if let Some(i) = info.take(&key) { - h.on_connection_event(ConnectionEvent::FullyNegotiatedInbound( - FullyNegotiatedInbound { - protocol: arg, - info: i, - }, - )); - } - } else { - log::error!("FullyNegotiatedInbound: no handler for key") - } - } - ConnectionEvent::AddressChange(AddressChange { new_address }) => { - for h in self.handlers.values_mut() { - h.on_connection_event(ConnectionEvent::AddressChange(AddressChange { - new_address, - })); - } - } - ConnectionEvent::DialUpgradeError(DialUpgradeError { - info: (key, arg), - error, - }) => { - if let Some(h) = self.handlers.get_mut(&key) { - h.on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError { - info: arg, - error, - })); - } else { - log::error!("DialUpgradeError: no handler for protocol") - } - } - ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { - self.on_listen_upgrade_error(listen_upgrade_error) - } - ConnectionEvent::LocalProtocolsChange(supported_protocols) => { - for h in self.handlers.values_mut() { - h.on_connection_event(ConnectionEvent::LocalProtocolsChange( - supported_protocols.clone(), - )); - } - } - ConnectionEvent::RemoteProtocolsChange(supported_protocols) => { - for h in self.handlers.values_mut() { - h.on_connection_event(ConnectionEvent::RemoteProtocolsChange( - supported_protocols.clone(), - )); - } - } - } - } - - fn on_behaviour_event(&mut self, (key, event): Self::FromBehaviour) { - if let Some(h) = self.handlers.get_mut(&key) { - h.on_behaviour_event(event) - } else { - log::error!("on_behaviour_event: no handler for key") - } - } - - fn connection_keep_alive(&self) -> bool { - self.handlers - .values() - .map(|h| h.connection_keep_alive()) - .max() - .unwrap_or(false) - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, - > { - // Calling `gen_range(0, 0)` (see below) would panic, so we have return early to avoid - // that situation. - if self.handlers.is_empty() { - return Poll::Pending; - } - - // Not always polling handlers in the same order should give anyone the chance to make progress. - let pos = rand::thread_rng().gen_range(0..self.handlers.len()); - - for (k, h) in self.handlers.iter_mut().skip(pos) { - if let Poll::Ready(e) = h.poll(cx) { - let e = e - .map_outbound_open_info(|i| (k.clone(), i)) - .map_custom(|p| (k.clone(), p)); - return Poll::Ready(e); - } - } - - for (k, h) in self.handlers.iter_mut().take(pos) { - if let Poll::Ready(e) = h.poll(cx) { - let e = e - .map_outbound_open_info(|i| (k.clone(), i)) - .map_custom(|p| (k.clone(), p)); - return Poll::Ready(e); - } - } - - Poll::Pending - } -} - -/// Split [`MultiHandler`] into parts. -impl IntoIterator for MultiHandler { - type Item = ::Item; - type IntoIter = std::collections::hash_map::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.handlers.into_iter() - } -} - -/// Index and protocol name pair used as `UpgradeInfo::Info`. -#[derive(Debug, Clone)] -pub struct IndexedProtoName(usize, H); - -impl> AsRef for IndexedProtoName { - fn as_ref(&self) -> &str { - self.1.as_ref() - } -} - -/// The aggregated `InboundOpenInfo`s of supported inbound substream protocols. -#[derive(Clone)] -pub struct Info { - infos: Vec<(K, I)>, -} - -impl Info { - fn new() -> Self { - Info { infos: Vec::new() } - } - - pub fn take(&mut self, k: &K) -> Option { - if let Some(p) = self.infos.iter().position(|(key, _)| key == k) { - return Some(self.infos.remove(p).1); - } - None - } -} - -/// Inbound and outbound upgrade for all [`ConnectionHandler`]s. -#[derive(Clone)] -pub struct Upgrade { - upgrades: Vec<(K, H)>, -} - -impl Upgrade { - fn new() -> Self { - Upgrade { - upgrades: Vec::new(), - } - } -} - -impl fmt::Debug for Upgrade -where - K: fmt::Debug + Eq + Hash, - H: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Upgrade") - .field("upgrades", &self.upgrades) - .finish() - } -} - -impl UpgradeInfo for Upgrade -where - H: UpgradeInfo, - K: Send + 'static, -{ - type Info = IndexedProtoName; - type InfoIter = std::vec::IntoIter; - - fn protocol_info(&self) -> Self::InfoIter { - self.upgrades - .iter() - .enumerate() - .flat_map(|(i, (_, h))| iter::repeat(i).zip(h.protocol_info().into_iter_send())) - .map(|(i, h)| IndexedProtoName(i, h)) - .collect::>() - .into_iter() - } -} - -impl InboundUpgrade for Upgrade -where - H: InboundUpgrade, - K: Send + 'static, -{ - type Output = (K, ::Output); - type Error = (K, ::Error); - type Future = BoxFuture<'static, Result>; - - fn upgrade_inbound(mut self, resource: Stream, info: Self::Info) -> Self::Future { - let IndexedProtoName(index, info) = info; - let (key, upgrade) = self.upgrades.remove(index); - upgrade - .upgrade_inbound(resource, info) - .map(move |out| match out { - Ok(o) => Ok((key, o)), - Err(e) => Err((key, e)), - }) - .boxed() - } -} - -impl OutboundUpgrade for Upgrade -where - H: OutboundUpgrade, - K: Send + 'static, -{ - type Output = (K, ::Output); - type Error = (K, ::Error); - type Future = BoxFuture<'static, Result>; - - fn upgrade_outbound(mut self, resource: Stream, info: Self::Info) -> Self::Future { - let IndexedProtoName(index, info) = info; - let (key, upgrade) = self.upgrades.remove(index); - upgrade - .upgrade_outbound(resource, info) - .map(move |out| match out { - Ok(o) => Ok((key, o)), - Err(e) => Err((key, e)), - }) - .boxed() - } -} - -/// Check that no two protocol names are equal. -fn uniq_proto_names(iter: I) -> Result<(), DuplicateProtonameError> -where - I: Iterator, - T: UpgradeInfo, -{ - let mut set = HashSet::new(); - for infos in iter { - for i in infos.protocol_info().into_iter_send() { - let v = Vec::from(i.as_ref()); - if set.contains(&v) { - return Err(DuplicateProtonameError(v)); - } else { - set.insert(v); - } - } - } - Ok(()) -} - -/// It is an error if two handlers share the same protocol name. -#[derive(Debug, Clone)] -pub struct DuplicateProtonameError(Vec); - -impl DuplicateProtonameError { - /// The protocol name bytes that occured in more than one handler. - pub fn protocol_name(&self) -> &[u8] { - &self.0 - } -} - -impl fmt::Display for DuplicateProtonameError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if let Ok(s) = std::str::from_utf8(&self.0) { - write!(f, "duplicate protocol name: {s}") - } else { - write!(f, "duplicate protocol name: {:?}", self.0) - } - } -} - -impl error::Error for DuplicateProtonameError {} diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index 910c5ff52f5..bfba55e8dda 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -20,7 +20,7 @@ use crate::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, SubstreamProtocol, + FullyNegotiatedInbound, FullyNegotiatedOutbound, SubstreamProtocol, UpgradeInfo, }; use crate::upgrade::{InboundUpgrade, OutboundUpgrade}; use crate::StreamUpgradeError; @@ -167,8 +167,8 @@ where fn on_connection_event( &mut self, event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, + ::Info, + ::Info, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, @@ -178,17 +178,17 @@ where protocol: out, .. }) => { - self.events_out.push(Ok(out.into())); + self.events_out.push(Ok(todo!())); } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol: out, .. }) => { self.dial_negotiated -= 1; - self.events_out.push(Ok(out.into())); + self.events_out.push(Ok(todo!())); } ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { - self.events_out.push(Err(error)); + self.events_out.push(Err(todo!())); } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) diff --git a/swarm/src/handler/pending.rs b/swarm/src/handler/pending.rs index 49acb65ec9e..386e459c140 100644 --- a/swarm/src/handler/pending.rs +++ b/swarm/src/handler/pending.rs @@ -21,7 +21,7 @@ use crate::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, FullyNegotiatedInbound, - FullyNegotiatedOutbound, SubstreamProtocol, + FullyNegotiatedOutbound, SubstreamProtocol, UpgradeInfo, }; use crate::upgrade::PendingUpgrade; use crate::StreamProtocol; @@ -74,8 +74,8 @@ impl ConnectionHandler for PendingConnectionHandler { fn on_connection_event( &mut self, event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, + ::Info, + ::Info, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, @@ -83,12 +83,13 @@ impl ConnectionHandler for PendingConnectionHandler { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { protocol, .. - }) => void::unreachable(protocol), + }) => todo!("should be void but doesn't implement `AsRef`"), ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol, info: _info, + stream, }) => { - void::unreachable(protocol); + todo!("should be void but doesn't implement `AsRef`"); #[allow(unreachable_code, clippy::used_underscore_binding)] { void::unreachable(_info); diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 83b56201773..40ccd82ad16 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -21,7 +21,7 @@ use crate::handler::{ AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, InboundUpgrade, ListenUpgradeError, - OutboundUpgrade, StreamUpgradeError, SubstreamProtocol, + OutboundUpgrade, StreamUpgradeError, SubstreamProtocol, UpgradeInfo, }; use crate::upgrade::SelectUpgrade; use either::Either; @@ -58,17 +58,18 @@ where pub(crate) fn transpose( self, ) -> Either, FullyNegotiatedOutbound> { - match self { - FullyNegotiatedOutbound { - protocol: future::Either::Left(protocol), - info: Either::Left(info), - } => Either::Left(FullyNegotiatedOutbound { protocol, info }), - FullyNegotiatedOutbound { - protocol: future::Either::Right(protocol), - info: Either::Right(info), - } => Either::Right(FullyNegotiatedOutbound { protocol, info }), - _ => panic!("wrong API usage: the protocol doesn't match the upgrade info"), - } + // match self { + // FullyNegotiatedOutbound { + // protocol: future::Either::Left(protocol), + // info: Either::Left(info), + // } => Either::Left(FullyNegotiatedOutbound { protocol, info }), + // FullyNegotiatedOutbound { + // protocol: future::Either::Right(protocol), + // info: Either::Right(info), + // } => Either::Right(FullyNegotiatedOutbound { protocol, info }), + // _ => panic!("wrong API usage: the protocol doesn't match the upgrade info"), + // } + todo!() } } @@ -80,59 +81,58 @@ where pub(crate) fn transpose( self, ) -> Either, FullyNegotiatedInbound> { - match self { - FullyNegotiatedInbound { - protocol: future::Either::Left(protocol), - info: (i1, _i2), - } => Either::Left(FullyNegotiatedInbound { protocol, info: i1 }), - FullyNegotiatedInbound { - protocol: future::Either::Right(protocol), - info: (_i1, i2), - } => Either::Right(FullyNegotiatedInbound { protocol, info: i2 }), - } + // match self { + // FullyNegotiatedInbound { + // protocol: future::Either::Left(protocol), + // info: (i1, _i2), + // } => Either::Left(FullyNegotiatedInbound { protocol, info: i1 }), + // FullyNegotiatedInbound { + // protocol: future::Either::Right(protocol), + // info: (_i1, i2), + // } => Either::Right(FullyNegotiatedInbound { protocol, info: i2 }), + // } + todo!() } } -impl DialUpgradeError, Either> +impl DialUpgradeError> where - S1OP: OutboundUpgrade, - S2OP: OutboundUpgrade, S1OOI: Send + 'static, S2OOI: Send + 'static, { - pub(crate) fn transpose( - self, - ) -> Either, DialUpgradeError> { - match self { - DialUpgradeError { - info: Either::Left(info), - error: StreamUpgradeError::Apply(Either::Left(err)), - } => Either::Left(DialUpgradeError { - info, - error: StreamUpgradeError::Apply(err), - }), - DialUpgradeError { - info: Either::Right(info), - error: StreamUpgradeError::Apply(Either::Right(err)), - } => Either::Right(DialUpgradeError { - info, - error: StreamUpgradeError::Apply(err), - }), - DialUpgradeError { - info: Either::Left(info), - error: e, - } => Either::Left(DialUpgradeError { - info, - error: e.map_upgrade_err(|_| panic!("already handled above")), - }), - DialUpgradeError { - info: Either::Right(info), - error: e, - } => Either::Right(DialUpgradeError { - info, - error: e.map_upgrade_err(|_| panic!("already handled above")), - }), - } + pub(crate) fn transpose(self) -> Either, DialUpgradeError> { + // match self { + // DialUpgradeError { + // info: Either::Left(info), + // error: StreamUpgradeError::Apply(Either::Left(err)), + // } => Either::Left(DialUpgradeError { + // info, + // error: StreamUpgradeError::Apply(err), + // }), + // DialUpgradeError { + // info: Either::Right(info), + // error: StreamUpgradeError::Apply(Either::Right(err)), + // } => Either::Right(DialUpgradeError { + // info, + // error: StreamUpgradeError::Apply(err), + // }), + // DialUpgradeError { + // info: Either::Left(info), + // error: e, + // } => Either::Left(DialUpgradeError { + // info, + // error: e.map_upgrade_err(|_| panic!("already handled above")), + // }), + // DialUpgradeError { + // info: Either::Right(info), + // error: e, + // } => Either::Right(DialUpgradeError { + // info, + // error: e.map_upgrade_err(|_| panic!("already handled above")), + // }), + // } + + todo!() } } @@ -146,27 +146,26 @@ where ListenUpgradeError { info: (i1, i2), error, - }: ListenUpgradeError< - ::InboundOpenInfo, - ::InboundProtocol, - >, + }: ListenUpgradeError<::InboundOpenInfo>, ) { - match error { - Either::Left(error) => { - self.proto1 - .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - info: i1, - error, - })); - } - Either::Right(error) => { - self.proto2 - .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - info: i2, - error, - })); - } - } + // match error { + // Either::Left(error) => { + // self.proto1 + // .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { + // info: i1, + // error, + // })); + // } + // Either::Right(error) => { + // self.proto2 + // .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { + // info: i2, + // error, + // })); + // } + // } + + todo!() } } @@ -265,77 +264,78 @@ where fn on_connection_event( &mut self, event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, + ::Info, + ::Info, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, ) { - match event { - ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { - match fully_negotiated_outbound.transpose() { - Either::Left(f) => self - .proto1 - .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(f)), - Either::Right(f) => self - .proto2 - .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(f)), - } - } - ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { - match fully_negotiated_inbound.transpose() { - Either::Left(f) => self - .proto1 - .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(f)), - Either::Right(f) => self - .proto2 - .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(f)), - } - } - ConnectionEvent::AddressChange(address) => { - self.proto1 - .on_connection_event(ConnectionEvent::AddressChange(AddressChange { - new_address: address.new_address, - })); - - self.proto2 - .on_connection_event(ConnectionEvent::AddressChange(AddressChange { - new_address: address.new_address, - })); - } - ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { - match dial_upgrade_error.transpose() { - Either::Left(err) => self - .proto1 - .on_connection_event(ConnectionEvent::DialUpgradeError(err)), - Either::Right(err) => self - .proto2 - .on_connection_event(ConnectionEvent::DialUpgradeError(err)), - } - } - ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { - self.on_listen_upgrade_error(listen_upgrade_error) - } - ConnectionEvent::LocalProtocolsChange(supported_protocols) => { - self.proto1 - .on_connection_event(ConnectionEvent::LocalProtocolsChange( - supported_protocols.clone(), - )); - self.proto2 - .on_connection_event(ConnectionEvent::LocalProtocolsChange( - supported_protocols, - )); - } - ConnectionEvent::RemoteProtocolsChange(supported_protocols) => { - self.proto1 - .on_connection_event(ConnectionEvent::RemoteProtocolsChange( - supported_protocols.clone(), - )); - self.proto2 - .on_connection_event(ConnectionEvent::RemoteProtocolsChange( - supported_protocols, - )); - } - } + // match event { + // ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + // match fully_negotiated_outbound.transpose() { + // Either::Left(f) => self + // .proto1 + // .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(f)), + // Either::Right(f) => self + // .proto2 + // .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(f)), + // } + // } + // ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + // match fully_negotiated_inbound.transpose() { + // Either::Left(f) => self + // .proto1 + // .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(f)), + // Either::Right(f) => self + // .proto2 + // .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(f)), + // } + // } + // ConnectionEvent::AddressChange(address) => { + // self.proto1 + // .on_connection_event(ConnectionEvent::AddressChange(AddressChange { + // new_address: address.new_address, + // })); + // + // self.proto2 + // .on_connection_event(ConnectionEvent::AddressChange(AddressChange { + // new_address: address.new_address, + // })); + // } + // ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + // match dial_upgrade_error.transpose() { + // Either::Left(err) => self + // .proto1 + // .on_connection_event(ConnectionEvent::DialUpgradeError(err)), + // Either::Right(err) => self + // .proto2 + // .on_connection_event(ConnectionEvent::DialUpgradeError(err)), + // } + // } + // ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { + // self.on_listen_upgrade_error(listen_upgrade_error) + // } + // ConnectionEvent::LocalProtocolsChange(supported_protocols) => { + // self.proto1 + // .on_connection_event(ConnectionEvent::LocalProtocolsChange( + // supported_protocols.clone(), + // )); + // self.proto2 + // .on_connection_event(ConnectionEvent::LocalProtocolsChange( + // supported_protocols, + // )); + // } + // ConnectionEvent::RemoteProtocolsChange(supported_protocols) => { + // self.proto1 + // .on_connection_event(ConnectionEvent::RemoteProtocolsChange( + // supported_protocols.clone(), + // )); + // self.proto2 + // .on_connection_event(ConnectionEvent::RemoteProtocolsChange( + // supported_protocols, + // )); + // } + // } + todo!() } } diff --git a/swarm/src/upgrade/denied.rs b/swarm/src/upgrade/denied.rs index 1efc2c89f95..55f4d2253c7 100644 --- a/swarm/src/upgrade/denied.rs +++ b/swarm/src/upgrade/denied.rs @@ -30,7 +30,7 @@ use void::Void; pub struct DeniedUpgrade; impl UpgradeInfo for DeniedUpgrade { - type Info = StreamProtocol; + type Info = StreamProtocol; // This should be `Void` but it doesn't implement `AsRef<&str>`. type InfoIter = iter::Empty; fn protocol_info(&self) -> Self::InfoIter { diff --git a/swarm/tests/connection_close.rs b/swarm/tests/connection_close.rs index bf5f423166d..0b2825eeb83 100644 --- a/swarm/tests/connection_close.rs +++ b/swarm/tests/connection_close.rs @@ -140,12 +140,7 @@ impl ConnectionHandler for HandlerWithState { fn on_connection_event( &mut self, - _: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, + _: ConnectionEvent, ) { } }