Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add webrtc transport multidim interop #100

Merged
merged 11 commits into from
Jan 18, 2023
29 changes: 29 additions & 0 deletions multidim-interop/rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions multidim-interop/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ tokio = { version = "1.24.1", features = ["full"] }

libp2pv0500 = { package = "libp2p", version = "0.50.0", features = ["websocket", "webrtc", "quic", "mplex", "yamux", "tcp", "tokio", "ping", "noise", "tls", "dns", "rsa", "macros"] }
rand = "0.8.5"
strum = { version = "0.24.1", features = ["derive"] }

65 changes: 40 additions & 25 deletions multidim-interop/rust/src/bin/testplan_0500.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,46 @@ use libp2p::core::upgrade::EitherUpgrade;
use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent};
use libp2p::websocket::WsConfig;
use libp2p::{
core, identity, mplex, noise, ping, webrtc, yamux, Multiaddr, PeerId, Swarm, Transport,
core, identity, mplex, noise, ping, webrtc, yamux, Multiaddr, PeerId, Swarm, Transport as _,
};
use libp2pv0500 as libp2p;
use testplan::{run_ping, PingSwarm};
use testplan::{run_ping, Muxer, PingSwarm, SecProtocol, Transport};

fn build_builder<T, C>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function doesn't have much value because the type constraints are so insane. You are likely better off by just inlining things and accepting the duplication between the different invocations.

Copy link
Member Author

@jxs jxs Jan 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still feel it's more lines of code and repetition, but feel free to submit a PR addressing that after Thomas

builder: core::transport::upgrade::Builder<T>,
secure_channel_param: &str,
muxer_param: &str,
secure_channel_param: SecProtocol,
muxer_param: Muxer,
local_key: &identity::Keypair,
) -> Boxed<(libp2p::PeerId, StreamMuxerBox)>
where
T: Transport<Output = C> + Send + Unpin + 'static,
T: libp2p::Transport<Output = C> + Send + Unpin + 'static,
<T as libp2p::Transport>::Error: Sync + Send + 'static,
<T as libp2p::Transport>::ListenerUpgrade: Send,
<T as libp2p::Transport>::Dial: Send,
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
let mux_upgrade = match muxer_param {
"yamux" => EitherUpgrade::A(yamux::YamuxConfig::default()),
"mplex" => EitherUpgrade::B(mplex::MplexConfig::default()),
_ => panic!("Unsupported muxer"),
Muxer::Yamux => EitherUpgrade::A(yamux::YamuxConfig::default()),
Muxer::Mplex => EitherUpgrade::B(mplex::MplexConfig::default()),
muxer => panic!("muxer {muxer:?} not supported for build_builder"),
};

let timeout = Duration::from_secs(5);

match secure_channel_param {
"noise" => builder
SecProtocol::Noise => builder
.authenticate(noise::NoiseAuthenticated::xx(&local_key).unwrap())
.multiplex(mux_upgrade)
.timeout(timeout)
.boxed(),
"tls" => builder
SecProtocol::Tls => builder
.authenticate(libp2p::tls::Config::new(&local_key).unwrap())
.multiplex(mux_upgrade)
.timeout(timeout)
.boxed(),
_ => panic!("Unsupported secure channel"),
sec_protocol => {
panic!("security protocol {sec_protocol:?} not supported for build_builder")
}
}
}

Expand All @@ -57,46 +59,60 @@ async fn main() -> Result<()> {
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());

let transport_param =
env::var("transport").context("transport environment variable is not set")?;
let secure_channel_param =
env::var("security").context("security environment variable is not set")?;
let muxer_param = env::var("muxer").context("muxer environment variable is not set")?;
let transport_param: testplan::Transport = env::var("transport")
.context("transport environment variable is not set")?
.parse()
.context("unsupported transport")?;

let secure_channel_param: testplan::SecProtocol = env::var("security")
.context("security environment variable is not set")?
.parse()
.context("unsupported secure channel")?;

let muxer_param: Muxer = env::var("muxer")
.context("muxer environment variable is not set")?
.parse()
.context("unsupported muxer")?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Until we move the test to our repository (where we deduplicate the test using Git revisions), I don't think it makes much sense to be typed here because we have to repeat it in every test.

Copy link
Member Author

@jxs jxs Jan 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's now all on the testplans lib, which we can depend on the rust-libp2p repo and use from there wdyt?
update: wrote this before reading libp2p/rust-libp2p#3331 (comment) let's continue there.

let ip = env::var("ip").context("ip environment variable is not set")?;

let is_dialer = env::var("is_dialer")
.unwrap_or("true".into())
.parse::<bool>()?;

let redis_addr = env::var("REDIS_ADDR")
.map(|addr| format!("redis://{addr}"))
.unwrap_or("redis://redis:6379".into());

let client = redis::Client::open(redis_addr).context("Could not connect to redis")?;

let (boxed_transport, local_addr) = match transport_param.as_str() {
"quic-v1" => {
let (boxed_transport, local_addr) = match transport_param {
Transport::QuicV1 => {
let builder =
libp2p::quic::tokio::Transport::new(libp2p::quic::Config::new(&local_key))
.map(|(p, c), _| (p, StreamMuxerBox::new(c)));
(builder.boxed(), format!("/ip4/{ip}/udp/0/quic-v1"))
}
"tcp" => {
Transport::Tcp => {
let builder = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new())
.upgrade(libp2p::core::upgrade::Version::V1Lazy);

(
build_builder(builder, &secure_channel_param, &muxer_param, &local_key),
build_builder(builder, secure_channel_param, muxer_param, &local_key),
format!("/ip4/{ip}/tcp/0"),
)
}
"ws" => {
Transport::Ws => {
let builder = WsConfig::new(libp2p::tcp::tokio::Transport::new(
libp2p::tcp::Config::new(),
))
.upgrade(libp2p::core::upgrade::Version::V1Lazy);

(
build_builder(builder, &secure_channel_param, &muxer_param, &local_key),
build_builder(builder, secure_channel_param, muxer_param, &local_key),
format!("/ip4/{ip}/tcp/0/ws"),
)
}
"webrtc" => (
Transport::Webrtc => (
webrtc::tokio::Transport::new(
local_key,
webrtc::tokio::Certificate::generate(&mut rand::thread_rng())?,
Expand All @@ -105,7 +121,6 @@ async fn main() -> Result<()> {
.boxed(),
format!("/ip4/{ip}/udp/0/webrtc"),
),
_ => panic!("Unsupported"),
};

let swarm = OrphanRuleWorkaround(Swarm::with_tokio_executor(
Expand All @@ -119,7 +134,7 @@ async fn main() -> Result<()> {

// Use peer id as a String so that `run_ping` does not depend on a specific libp2p version.
let local_peer_id = local_peer_id.to_string();
run_ping(client, swarm, &local_addr, &local_peer_id).await?;
run_ping(client, swarm, &local_addr, &local_peer_id, is_dialer).await?;

Ok(())
}
Expand Down
41 changes: 36 additions & 5 deletions multidim-interop/rust/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,44 @@
use std::env;
use std::time::Duration;

use anyhow::{Context, Result};
use env_logger::Env;
use log::info;
use redis::{AsyncCommands, Client as Rclient};
use strum::EnumString;

const REDIS_TIMEOUT: usize = 10;

/// Supported transports by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum Transport {
Tcp,
QuicV1,
Webrtc,
Ws,
}
mxinden marked this conversation as resolved.
Show resolved Hide resolved

/// Supported stream multiplexers by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum Muxer {
Mplex,
Yamux,
Quic,
Webrtc,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These aren't strictly muxers. I think we should leave the muxer variable empty when we don't explicitly set one. IMO, that will be cleaner implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah makes sense, thanks Thomas addressed :)

}

/// Supported security protocols by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum SecProtocol {
Noise,
Tls,
Quic,
Webrtc,
}

/// PingSwarm allows us to abstract over libp2p versions for `run_ping`.
#[async_trait::async_trait]
pub trait PingSwarm: Sized + Send + 'static {
async fn listen_on(&mut self, address: &str) -> Result<String>;
Expand All @@ -23,11 +54,15 @@ pub trait PingSwarm: Sized + Send + 'static {
fn local_peer_id(&self) -> String;
}

/// Run a ping interop test. Based on `is_dialer`, either dial the address
/// retrieved via `listenAddr` key over the redis connection. Or wait to be pinged and have
/// `dialerDone` key ready on the redis connection.
pub async fn run_ping<S>(
client: Rclient,
mut swarm: S,
local_addr: &str,
local_peer_id: &str,
is_dialer: bool,
) -> Result<()>
where
S: PingSwarm,
Expand All @@ -37,10 +72,6 @@ where
info!("Running ping test: {}", swarm.local_peer_id());
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();

let is_dialer = env::var("is_dialer")
.unwrap_or("true".into())
.parse::<bool>()?;

info!(
"Test instance, listening for incoming connections on: {:?}.",
local_addr
Expand Down