diff --git a/core/src/transport.rs b/core/src/transport.rs index 2541734586a..b621e7531ea 100644 --- a/core/src/transport.rs +++ b/core/src/transport.rs @@ -25,14 +25,13 @@ //! any desired protocols. The rest of the module defines combinators for //! modifying a transport through composition with other transports or protocol upgrades. -use crate::ConnectedPoint; +use crate::{ConnectedPoint, ConnectionInfo, muxing::{StreamMuxer, StreamMuxerBox}}; use futures::prelude::*; use multiaddr::Multiaddr; use std::{error::Error, fmt}; use std::time::Duration; pub mod and_then; -pub mod boxed; pub mod choice; pub mod dummy; pub mod map; @@ -41,8 +40,10 @@ pub mod memory; pub mod timeout; pub mod upgrade; +mod boxed; mod optional; +pub use self::boxed::Boxed; pub use self::choice::OrTransport; pub use self::memory::MemoryTransport; pub use self::optional::OptionalTransport; @@ -128,14 +129,24 @@ pub trait Transport { where Self: Sized; - /// Turns the transport into an abstract boxed (i.e. heap-allocated) transport. - fn boxed(self) -> boxed::Boxed - where Self: Sized + Clone + Send + Sync + 'static, - Self::Dial: Send + 'static, - Self::Listener: Send + 'static, - Self::ListenerUpgrade: Send + 'static, + /// Boxes an authenticated, multiplexed transport, including the + /// `StreamMuxer` and transport errors. + fn boxed(self) -> boxed::Boxed<(I, StreamMuxerBox), std::io::Error> + where + Self: Transport + Sized + Clone + Send + Sync + 'static, + Self::Dial: Send + 'static, + Self::Listener: Send + 'static, + Self::ListenerUpgrade: Send + 'static, + Self::Error: Send + Sync, + I: ConnectionInfo, + M: StreamMuxer + Send + Sync + 'static, + M::Substream: Send + 'static, + M::OutboundSubstream: Send + 'static + { - boxed::boxed(self) + boxed::boxed( + self.map(|(i, m), _| (i, StreamMuxerBox::new(m))) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))) } /// Applies a function on the connections created by the transport. diff --git a/core/src/transport/boxed.rs b/core/src/transport/boxed.rs index f0d52b121e4..12d4d28f8a7 100644 --- a/core/src/transport/boxed.rs +++ b/core/src/transport/boxed.rs @@ -24,7 +24,6 @@ use multiaddr::Multiaddr; use std::{error, fmt, pin::Pin, sync::Arc}; /// See the `Transport::boxed` method. -#[inline] pub fn boxed(transport: T) -> Boxed where T: Transport + Clone + Send + Sync + 'static, @@ -37,9 +36,14 @@ where } } -pub type Dial = Pin> + Send>>; -pub type Listener = Pin, E>, E>> + Send>>; -pub type ListenerUpgrade = Pin> + Send>>; +/// See the `Transport::boxed` method. +pub struct Boxed { + inner: Arc + Send + Sync>, +} + +type Dial = Pin> + Send>>; +type Listener = Pin, E>, E>> + Send>>; +type ListenerUpgrade = Pin> + Send>>; trait Abstract { fn listen_on(&self, addr: Multiaddr) -> Result, TransportError>; @@ -68,11 +72,6 @@ where } } -/// See the `Transport::boxed` method. -pub struct Boxed { - inner: Arc + Send + Sync>, -} - impl fmt::Debug for Boxed { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "BoxedTransport") diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 94792898e09..f0a2eb36fc1 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -36,22 +36,11 @@ use libp2p_core::{ use libp2p_noise as noise; use rand::Rng; use rand::seq::SliceRandom; -use std::{io, error::Error, fmt, task::Poll}; +use std::{io, task::Poll}; use util::TestHandler; type TestNetwork = Network; -type TestTransport = transport::boxed::Boxed<(PeerId, StreamMuxerBox), BoxError>; - -#[derive(Debug)] -struct BoxError(Box); - -impl Error for BoxError {} - -impl fmt::Display for BoxError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Transport error: {}", self.0) - } -} +type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox), io::Error>; fn new_network(cfg: NetworkConfig) -> TestNetwork { let local_key = identity::Keypair::generate_ed25519(); @@ -61,13 +50,11 @@ fn new_network(cfg: NetworkConfig) -> TestNetwork { .upgrade(upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(libp2p_mplex::MplexConfig::new()) - .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))) .and_then(|(peer, mplex), _| { // Gracefully close the connection to allow protocol // negotiation to complete. util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) }) - .map_err(|e| BoxError(Box::new(e))) .boxed(); TestNetwork::new(transport, local_public_key.into(), cfg) } diff --git a/misc/multistream-select/CHANGELOG.md b/misc/multistream-select/CHANGELOG.md index 7eb9593d353..112e4bda38c 100644 --- a/misc/multistream-select/CHANGELOG.md +++ b/misc/multistream-select/CHANGELOG.md @@ -1,5 +1,8 @@ # 0.8.3 [unreleased] +- Fix a regression resulting in a panic with the `V1Lazy` protocol. + [PR 1783](https://github.com/libp2p/rust-libp2p/pull/1783). + - Fix a potential deadlock during protocol negotiation due to a missing flush, potentially resulting in sporadic protocol upgrade timeouts. diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index b9553550213..83ecae280c0 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -19,6 +19,10 @@ unsigned-varint = "0.5" [dev-dependencies] async-std = "1.6.2" +env_logger = "*" +libp2p-core = { path = "../../core" } +libp2p-mplex = { path = "../../muxers/mplex" } +libp2p-plaintext = { path = "../../protocols/plaintext" } quickcheck = "0.9.0" rand = "0.7.2" rw-stream-sink = "0.2.1" diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs index bfc1c9d176f..a5c00cbcab5 100644 --- a/misc/multistream-select/src/negotiated.rs +++ b/misc/multistream-select/src/negotiated.rs @@ -127,6 +127,7 @@ impl Negotiated { if let Message::Header(v) = &msg { if *v == version { + *this.state = State::Expecting { io, protocol, version }; continue } } diff --git a/misc/multistream-select/tests/transport.rs b/misc/multistream-select/tests/transport.rs new file mode 100644 index 00000000000..c8695f8cc6e --- /dev/null +++ b/misc/multistream-select/tests/transport.rs @@ -0,0 +1,134 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_core::{ + connection::{ConnectionHandler, ConnectionHandlerEvent, Substream, SubstreamEndpoint}, + identity, + muxing::StreamMuxerBox, + upgrade, + multiaddr::Protocol, + Multiaddr, + Network, + network::{NetworkEvent, NetworkConfig}, + PeerId, + Transport, + transport::{self, MemoryTransport} +}; +use libp2p_mplex::MplexConfig; +use libp2p_plaintext::PlainText2Config; +use futures::{channel::oneshot, ready, prelude::*}; +use rand::random; +use std::{io, task::{Context, Poll}}; + +type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox), io::Error>; +type TestNetwork = Network; + +fn mk_transport(up: upgrade::Version) -> (PeerId, TestTransport) { + let keys = identity::Keypair::generate_ed25519(); + let id = keys.public().into_peer_id(); + (id, MemoryTransport::default() + .upgrade(up) + .authenticate(PlainText2Config { local_public_key: keys.public() }) + .multiplex(MplexConfig::default()) + .boxed()) +} + +/// Tests the transport upgrade process with all supported +/// upgrade protocol versions. +#[test] +fn transport_upgrade() { + let _ = env_logger::try_init(); + + fn run(up: upgrade::Version) { + let (dialer_id, dialer_transport) = mk_transport(up); + let (listener_id, listener_transport) = mk_transport(up); + + let listen_addr = Multiaddr::from(Protocol::Memory(random::())); + + let mut dialer = TestNetwork::new(dialer_transport, dialer_id, NetworkConfig::default()); + let mut listener = TestNetwork::new(listener_transport, listener_id, NetworkConfig::default()); + + listener.listen_on(listen_addr).unwrap(); + let (addr_sender, addr_receiver) = oneshot::channel(); + + let client = async move { + let addr = addr_receiver.await.unwrap(); + dialer.dial(&addr, TestHandler()).unwrap(); + futures::future::poll_fn(move |cx| { + loop { + match ready!(dialer.poll(cx)) { + NetworkEvent::ConnectionEstablished { .. } => { + return Poll::Ready(()) + } + _ => {} + } + } + }).await + }; + + let mut addr_sender = Some(addr_sender); + let server = futures::future::poll_fn(move |cx| { + loop { + match ready!(listener.poll(cx)) { + NetworkEvent::NewListenerAddress { listen_addr, .. } => { + addr_sender.take().unwrap().send(listen_addr).unwrap(); + } + NetworkEvent::IncomingConnection { connection, .. } => { + listener.accept(connection, TestHandler()).unwrap(); + } + NetworkEvent::ConnectionEstablished { .. } => { + return Poll::Ready(()) + } + _ => {} + } + } + }); + + async_std::task::block_on(future::select(Box::pin(server), Box::pin(client))); + } + + run(upgrade::Version::V1); + run(upgrade::Version::V1Lazy); +} + +struct TestHandler(); + +impl ConnectionHandler for TestHandler { + type InEvent = (); + type OutEvent = (); + type Error = io::Error; + type Substream = Substream; + type OutboundOpenInfo = (); + + fn inject_substream(&mut self, _: Self::Substream, _: SubstreamEndpoint) + {} + + fn inject_event(&mut self, _: Self::InEvent) + {} + + fn inject_address_change(&mut self, _: &Multiaddr) + {} + + fn poll(&mut self, _: &mut Context<'_>) + -> Poll, Self::Error>> + { + Poll::Pending + } +} diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index 288496220ed..57df14ade41 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -23,14 +23,13 @@ use log::debug; use quickcheck::{QuickCheck, TestResult}; use rand::{random, seq::SliceRandom, SeedableRng}; use std::{ - io::Error, pin::Pin, task::{Context, Poll}, time::Duration, }; use libp2p_core::{ - identity, multiaddr::Protocol, muxing::StreamMuxerBox, transport::MemoryTransport, upgrade, + identity, multiaddr::Protocol, transport::MemoryTransport, upgrade, Multiaddr, Transport, }; use libp2p_gossipsub::{ @@ -151,10 +150,7 @@ fn build_node() -> (Multiaddr, Swarm) { .authenticate(PlainText2Config { local_public_key: public_key.clone(), }) - .multiplex(yamux::Config::default()) - .map(|(p, m), _| (p, StreamMuxerBox::new(m))) - .map_err(|e| -> Error { panic!("Failed to create transport: {:?}", e) }) - .boxed(); + .multiplex(yamux::Config::default()); let peer_id = public_key.clone().into_peer_id(); diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index a1258c2919e..77e3ba53a9b 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -38,7 +38,6 @@ use libp2p_core::{ identity, transport::MemoryTransport, multiaddr::{Protocol, Multiaddr, multiaddr}, - muxing::StreamMuxerBox, upgrade }; use libp2p_noise as noise; @@ -46,7 +45,7 @@ use libp2p_swarm::Swarm; use libp2p_yamux as yamux; use quickcheck::*; use rand::{Rng, random, thread_rng, rngs::StdRng, SeedableRng}; -use std::{collections::{HashSet, HashMap}, time::Duration, io, num::NonZeroUsize, u64}; +use std::{collections::{HashSet, HashMap}, time::Duration, num::NonZeroUsize, u64}; use multihash::{wrap, Code, Multihash}; type TestSwarm = Swarm>; @@ -62,10 +61,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) { let transport = MemoryTransport::default() .upgrade(upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(yamux::Config::default()) - .map(|(p, m), _| (p, StreamMuxerBox::new(m))) - .map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); }) - .boxed(); + .multiplex(yamux::Config::default()); let local_id = local_public_key.clone().into_peer_id(); let store = MemoryStore::new(local_id.clone()); diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 6539a102ea3..3273d211c8e 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -25,7 +25,7 @@ use libp2p_core::{ PeerId, identity, muxing::StreamMuxerBox, - transport::{Transport, boxed::Boxed}, + transport::{self, Transport}, upgrade }; use libp2p_mplex as mplex; @@ -196,7 +196,7 @@ fn max_failures() { fn mk_transport(muxer: MuxerChoice) -> ( PeerId, - Boxed< + transport::Boxed< (PeerId, StreamMuxerBox), io::Error > @@ -204,8 +204,7 @@ fn mk_transport(muxer: MuxerChoice) -> ( let id_keys = identity::Keypair::generate_ed25519(); let peer_id = id_keys.public().into_peer_id(); let noise_keys = noise::Keypair::::new().into_authentic(&id_keys).unwrap(); - - let transport = TcpConfig::new() + (peer_id, TcpConfig::new() .nodelay(true) .upgrade(upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) @@ -215,11 +214,7 @@ fn mk_transport(muxer: MuxerChoice) -> ( MuxerChoice::Mplex => upgrade::EitherUpgrade::B(mplex::MplexConfig::default()), }) - .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) - .boxed(); - - (peer_id, transport) + .boxed()) } #[derive(Debug, Copy, Clone)] diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 3125a9175bc..6c001d47cfc 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -26,7 +26,7 @@ use libp2p_core::{ PeerId, identity, muxing::StreamMuxerBox, - transport::{Transport, boxed::Boxed}, + transport::{self, Transport}, upgrade::{self, read_one, write_one} }; use libp2p_noise::{NoiseConfig, X25519Spec, Keypair}; @@ -213,19 +213,16 @@ fn ping_protocol_throttled() { let () = async_std::task::block_on(peer2); } -fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox), io::Error>) { +fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox), io::Error>) { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = id_keys.public().into_peer_id(); let noise_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); - let transport = TcpConfig::new() + (peer_id, TcpConfig::new() .nodelay(true) .upgrade(upgrade::Version::V1) .authenticate(NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(libp2p_yamux::Config::default()) - .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) - .boxed(); - (peer_id, transport) + .boxed()) } // Simple Ping-Pong Protocol diff --git a/src/lib.rs b/src/lib.rs index ba1af3ba5dd..cc4708ff390 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -303,8 +303,8 @@ pub fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair) .upgrade(core::upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new())) - .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) - .timeout(std::time::Duration::from_secs(20))) + .timeout(std::time::Duration::from_secs(20)) + .boxed()) } /// Builds an implementation of `Transport` that is suitable for usage with the `Swarm`. @@ -335,6 +335,6 @@ pub fn build_tcp_ws_pnet_noise_mplex_yamux(keypair: identity::Keypair, psk: PreS .upgrade(core::upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new())) - .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) - .timeout(std::time::Duration::from_secs(20))) + .timeout(std::time::Duration::from_secs(20)) + .boxed()) } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 78149956c57..79043d4e091 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -110,7 +110,7 @@ use libp2p_core::{ PendingConnectionError, Substream }, - transport::{TransportError, boxed::Boxed as BoxTransport}, + transport::{self, TransportError}, muxing::{StreamMuxer, StreamMuxerBox}, network::{ Network, @@ -261,7 +261,7 @@ where TConnInfo: ConnectionInfo, { network: Network< - BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>, + transport::Boxed<(TConnInfo, StreamMuxerBox), io::Error>, TInEvent, TOutEvent, NodeHandlerWrapperBuilder, @@ -972,7 +972,7 @@ impl<'a> PollParameters for SwarmPollParameters<'a> { /// including the underlying [`Network`]. pub struct SwarmBuilder { local_peer_id: PeerId, - transport: BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>, + transport: transport::Boxed<(TConnInfo, StreamMuxerBox), io::Error>, behaviour: TBehaviour, network_config: NetworkConfig, } @@ -996,14 +996,9 @@ where TBehaviour: NetworkBehaviour, TTrans::ListenerUpgrade: Send + 'static, TTrans::Dial: Send + 'static, { - let transport = transport - .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) - .boxed(); - SwarmBuilder { local_peer_id, - transport, + transport: transport.boxed(), behaviour, network_config: Default::default(), } @@ -1220,10 +1215,7 @@ mod tests { let transport = transport::MemoryTransport::default() .upgrade(upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(libp2p_mplex::MplexConfig::new()) - .map(|(p, m), _| (p, StreamMuxerBox::new(m))) - .map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); }) - .boxed(); + .multiplex(libp2p_mplex::MplexConfig::new()); let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto)); SwarmBuilder::new(transport, behaviour, pubkey.into()).build() }