Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Transport::box_multiplexed #3313

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@

- Improve error messages in case keys cannot be decoded because of missing feature flags. See [PR 2972].

- Add `Transport::box_multiplexed` implementation for boxing multiplexed transports into the
`Boxed<(PeerId, StreamMuxerBox)>` type used with the swarm.
Deprecate `Multiplexed::boxed` in favor of `box_multiplexed`. See [PR 3313].

[PR 3313]: https://github.com/libp2p/rust-libp2p/pull/3313
[PR 3031]: https://github.com/libp2p/rust-libp2p/pull/3031
[PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058
[PR 3097]: https://github.com/libp2p/rust-libp2p/pull/3097
Expand Down
18 changes: 18 additions & 0 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,24 @@ where
}
}

impl<T, A, B> From<EitherOutput<(T, A), (T, B)>> for (T, EitherOutput<A, B>) {
fn from(either_output: EitherOutput<(T, A), (T, B)>) -> Self {
match either_output {
EitherOutput::First((t, a)) => (t, EitherOutput::First(a)),
EitherOutput::Second((t, b)) => (t, EitherOutput::Second(b)),
}
}
}

impl<T, A, B> From<EitherOutput<(A, T), (B, T)>> for (EitherOutput<A, B>, T) {
fn from(either_output: EitherOutput<(A, T), (B, T)>) -> Self {
match either_output {
EitherOutput::First((a, t)) => (EitherOutput::First(a), t),
EitherOutput::Second((b, t)) => (EitherOutput::Second(b), t),
}
}
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherFutureProj)]
#[derive(Debug, Copy, Clone)]
Expand Down
22 changes: 21 additions & 1 deletion core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ pub mod upgrade;
mod boxed;
mod optional;

use crate::ConnectedPoint;
use crate::{muxing::StreamMuxerBox, ConnectedPoint, PeerId, StreamMuxer};

use self::boxed::boxed;
pub use self::boxed::Boxed;
pub use self::choice::OrTransport;
pub use self::memory::MemoryTransport;
Expand Down Expand Up @@ -236,6 +237,25 @@ pub trait Transport {
{
upgrade::Builder::new(self, version)
}

/// Box a multiplexed transport, including the inner muxer
/// and all errors.
fn box_multiplexed<M>(self) -> Boxed<(PeerId, StreamMuxerBox)>
where
Self: Sized + Send + Unpin + 'static,
Self::Dial: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
Self::Error: Send + Sync,
Self::Output: Into<(PeerId, M)>,
M: StreamMuxer + Send + 'static,
M::Substream: Send + 'static,
M::Error: Send + Sync + 'static,
{
boxed(self.map(|o, _| {
let (i, m) = o.into();
(i, StreamMuxerBox::new(m))
}))
}
}

/// The ID of a single listener.
Expand Down
7 changes: 4 additions & 3 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use crate::{
connection::ConnectedPoint,
muxing::{StreamMuxer, StreamMuxerBox},
transport::{
and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerId, Transport,
TransportError, TransportEvent,
and_then::AndThen, timeout::TransportTimeout, ListenerId, Transport, TransportError,
TransportEvent,
},
upgrade::{
self, apply_inbound, apply_outbound, InboundUpgrade, InboundUpgradeApply, OutboundUpgrade,
Expand Down Expand Up @@ -293,6 +293,7 @@ pub struct Multiplexed<T>(#[pin] T);
impl<T> Multiplexed<T> {
/// Boxes the authenticated, multiplexed transport, including
/// the [`StreamMuxer`] and custom transport errors.
#[deprecated(since = "0.39.0", note = "Use `Transport::box_multiplexed` instead.")]
pub fn boxed<M>(self) -> super::Boxed<(PeerId, StreamMuxerBox)>
where
T: Transport<Output = (PeerId, M)> + Sized + Send + Unpin + 'static,
Expand All @@ -303,7 +304,7 @@ impl<T> Multiplexed<T> {
M::Substream: Send + 'static,
M::Error: Send + Sync + 'static,
{
boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))
self.box_multiplexed()
}

/// Adds a timeout to the setup and protocol upgrade process for all
Expand Down
4 changes: 2 additions & 2 deletions core/tests/transport_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ fn upgrade_pipeline() {
.apply(HelloUpgrade {})
.apply(HelloUpgrade {})
.multiplex(MplexConfig::default())
.boxed();
.box_multiplexed();

let dialer_keys = identity::Keypair::generate_ed25519();
let dialer_id = dialer_keys.public().to_peer_id();
Expand All @@ -97,7 +97,7 @@ fn upgrade_pipeline() {
.apply(HelloUpgrade {})
.apply(HelloUpgrade {})
.multiplex(MplexConfig::default())
.boxed();
.box_multiplexed();

let listen_addr1 = Multiaddr::from(Protocol::Memory(random::<u64>()));
let listen_addr2 = listen_addr1.clone();
Expand Down
2 changes: 1 addition & 1 deletion examples/chat-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.expect("Signing libp2p-noise static DH keypair failed."),
)
.multiplex(mplex::MplexConfig::new())
.boxed();
.box_multiplexed();

// Create a Floodsub topic
let floodsub_topic = floodsub::Topic::new("chat");
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub fn build_transport(
.authenticate(noise_config)
.multiplex(yamux_config)
.timeout(Duration::from_secs(20))
.boxed()
.box_multiplexed()
}

/// Get the current ipfs repo path, either from the IPFS_PATH environment variable or
Expand Down
2 changes: 1 addition & 1 deletion misc/metrics/examples/metrics/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ fn main() -> Result<(), Box<dyn Error>> {
.upgrade(Version::V1)
.authenticate(noise::NoiseAuthenticated::xx(&local_key)?)
.multiplex(yamux::YamuxConfig::default())
.boxed(),
.box_multiplexed(),
Behaviour::new(local_pub_key),
local_peer_id,
);
Expand Down
2 changes: 1 addition & 1 deletion misc/multistream-select/tests/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn mk_transport(up: upgrade::Version) -> (PeerId, TestTransport) {
local_public_key: keys.public(),
})
.multiplex(MplexConfig::default())
.boxed(),
.box_multiplexed(),
)
}

Expand Down
4 changes: 2 additions & 2 deletions muxers/mplex/benches/split_send_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ fn tcp_transport(split_send_size: usize) -> BenchTransport {
.authenticate(PlainText2Config { local_public_key })
.multiplex(mplex)
.timeout(Duration::from_secs(5))
.boxed()
.box_multiplexed()
}

fn mem_transport(split_send_size: usize) -> BenchTransport {
Expand All @@ -189,7 +189,7 @@ fn mem_transport(split_send_size: usize) -> BenchTransport {
.authenticate(PlainText2Config { local_public_key })
.multiplex(mplex)
.timeout(Duration::from_secs(5))
.boxed()
.box_multiplexed()
}

criterion_group!(split_send_size, prepare);
Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/examples/autonat_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.upgrade(Version::V1)
.authenticate(noise::NoiseAuthenticated::xx(&local_key)?)
.multiplex(yamux::YamuxConfig::default())
.boxed();
.box_multiplexed();

let behaviour = Behaviour::new(local_key.public());

Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/examples/autonat_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.upgrade(Version::V1)
.authenticate(noise::NoiseAuthenticated::xx(&local_key)?)
.multiplex(yamux::YamuxConfig::default())
.boxed();
.box_multiplexed();

let behaviour = Behaviour::new(local_key.public());

Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn init_swarm(config: Config) -> Swarm<Behaviour> {
.upgrade(Version::V1)
.authenticate(noise::NoiseAuthenticated::xx(&keypair).unwrap())
.multiplex(yamux::YamuxConfig::default())
.boxed();
.box_multiplexed();
let behaviour = Behaviour::new(local_id, config);
Swarm::with_async_std_executor(transport, behaviour, local_id)
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn init_swarm(config: Config) -> Swarm<Behaviour> {
.upgrade(Version::V1)
.authenticate(noise::NoiseAuthenticated::xx(&keypair).unwrap())
.multiplex(yamux::YamuxConfig::default())
.boxed();
.box_multiplexed();
let behaviour = Behaviour::new(local_id, config);
Swarm::with_async_std_executor(transport, behaviour, local_id)
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/dcutr/examples/dcutr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ fn main() -> Result<(), Box<dyn Error>> {
.expect("Signing libp2p-noise static DH keypair failed."),
)
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed();
.box_multiplexed();

#[derive(NetworkBehaviour)]
#[behaviour(
Expand Down
2 changes: 1 addition & 1 deletion protocols/dcutr/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ where
.upgrade(Version::V1)
.authenticate(PlainText2Config { local_public_key })
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed()
.box_multiplexed()
}

#[derive(NetworkBehaviour)]
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
//! .upgrade(libp2p_core::upgrade::Version::V1)
//! .authenticate(libp2p_noise::NoiseAuthenticated::xx(&local_key).unwrap())
//! .multiplex(libp2p_mplex::MplexConfig::new())
//! .boxed();
//! .box_multiplexed();
//!
//! // Create a Gossipsub topic
//! let topic = libp2p_gossipsub::IdentTopic::new("example");
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ fn build_node() -> (Multiaddr, Swarm<Gossipsub>) {
local_public_key: public_key.clone(),
})
.multiplex(yamux::YamuxConfig::default())
.boxed();
.box_multiplexed();

let peer_id = public_key.to_peer_id();

Expand Down
2 changes: 1 addition & 1 deletion protocols/identify/examples/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&local_key).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed();
.box_multiplexed();

// Create a identify network behaviour.
let behaviour = identify::Behaviour::new(identify::Config::new(
Expand Down
2 changes: 1 addition & 1 deletion protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ mod tests {
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(MplexConfig::new())
.boxed();
.box_multiplexed();
(pubkey, transport)
}

Expand Down
2 changes: 1 addition & 1 deletion protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) {
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseAuthenticated::xx(&local_key).unwrap())
.multiplex(yamux::YamuxConfig::default())
.boxed();
.box_multiplexed();

let local_id = local_public_key.to_peer_id();
let store = MemoryStore::new(local_id);
Expand Down
2 changes: 1 addition & 1 deletion protocols/mdns/tests/use-async-std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn create_swarm(config: Config) -> Result<Swarm<Behaviour>, Box<dyn Error>
.upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&id_keys).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed();
.box_multiplexed();
let behaviour = Behaviour::new(config, peer_id)?;
let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id);
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
Expand Down
2 changes: 1 addition & 1 deletion protocols/mdns/tests/use-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn create_swarm(config: Config) -> Result<Swarm<Behaviour>, Box<dyn Error>
.upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&id_keys).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed();
.box_multiplexed();
let behaviour = Behaviour::new(config, peer_id)?;
let mut swarm = Swarm::with_tokio_executor(transport, behaviour, peer_id);
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
Expand Down
2 changes: 1 addition & 1 deletion protocols/ping/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ fn mk_transport(muxer: MuxerChoice) -> (PeerId, transport::Boxed<(PeerId, Stream
MuxerChoice::Yamux => upgrade::EitherUpgrade::A(yamux::YamuxConfig::default()),
MuxerChoice::Mplex => upgrade::EitherUpgrade::B(mplex::MplexConfig::default()),
})
.boxed(),
.box_multiplexed(),
)
}

Expand Down
2 changes: 1 addition & 1 deletion protocols/relay/examples/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn main() -> Result<(), Box<dyn Error>> {
.expect("Signing libp2p-noise static DH keypair failed."),
)
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed();
.box_multiplexed();

let behaviour = Behaviour {
relay: relay::Behaviour::new(local_peer_id, Default::default()),
Expand Down
2 changes: 1 addition & 1 deletion protocols/relay/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ where
.upgrade(upgrade::Version::V1)
.authenticate(PlainText2Config { local_public_key })
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed()
.box_multiplexed()
}

#[derive(NetworkBehaviour)]
Expand Down
2 changes: 1 addition & 1 deletion protocols/rendezvous/examples/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn main() {
.upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed(),
.box_multiplexed(),
MyBehaviour {
rendezvous: rendezvous::client::Behaviour::new(identity.clone()),
ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
Expand Down
2 changes: 1 addition & 1 deletion protocols/rendezvous/examples/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn main() {
.upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed(),
.box_multiplexed(),
MyBehaviour {
rendezvous: rendezvous::client::Behaviour::new(identity.clone()),
ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
Expand Down
2 changes: 1 addition & 1 deletion protocols/rendezvous/examples/register_with_identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn main() {
.upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed(),
.box_multiplexed(),
MyBehaviour {
identify: identify::Behaviour::new(identify::Config::new(
"rendezvous-example/1.0.0".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion protocols/rendezvous/examples/rendezvous_point.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn main() {
.upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed(),
.box_multiplexed(),
MyBehaviour {
identify: identify::Behaviour::new(identify::Config::new(
"rendezvous-example/1.0.0".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion protocols/rendezvous/tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ where
MplexConfig::new(),
))
.timeout(Duration::from_secs(5))
.boxed();
.box_multiplexed();

Swarm::with_tokio_executor(transport, behaviour_fn(peer_id, identity), peer_id)
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/request-response/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) {
.upgrade(upgrade::Version::V1)
.authenticate(NoiseAuthenticated::xx(&id_keys).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default())
.boxed(),
.box_multiplexed(),
)
}

Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ pub async fn development_transport(
mplex::MplexConfig::default(),
))
.timeout(std::time::Duration::from_secs(20))
.boxed())
.box_multiplexed())
}

/// Builds a `Transport` based on TCP/IP that supports the most commonly-used features of libp2p:
Expand Down Expand Up @@ -267,5 +267,5 @@ pub fn tokio_development_transport(
mplex::MplexConfig::default(),
))
.timeout(std::time::Duration::from_secs(20))
.boxed())
.box_multiplexed())
}
2 changes: 1 addition & 1 deletion src/transport_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub trait TransportExt: Transport {
/// .expect("Signing libp2p-noise static DH keypair failed."),
/// )
/// .multiplex(mplex::MplexConfig::new())
/// .boxed();
/// .box_multiplexed();
///
/// let (transport, sinks) = transport.with_bandwidth_logging();
/// ```
Expand Down
Loading