Skip to content

Commit

Permalink
update rust test plans to use enums instead.
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Jan 16, 2023
1 parent acac1d9 commit f999445
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 30 deletions.
29 changes: 29 additions & 0 deletions multidim-interop/rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions multidim-interop/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ tokio = { version = "1.24.1", features = ["full"] }

libp2pv0500 = { package = "libp2p", version = "0.50.0", features = ["websocket", "webrtc", "quic", "mplex", "yamux", "tcp", "tokio", "ping", "noise", "tls", "dns", "rsa", "macros"] }
rand = "0.8.5"
strum = { version = "0.24.1", features = ["derive"] }

61 changes: 36 additions & 25 deletions multidim-interop/rust/src/bin/testplan_0500.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,42 @@ use libp2p::core::upgrade::EitherUpgrade;
use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent};
use libp2p::websocket::WsConfig;
use libp2p::{
core, identity, mplex, noise, ping, webrtc, yamux, Multiaddr, PeerId, Swarm, Transport,
core, identity, mplex, noise, ping, webrtc, yamux, Multiaddr, PeerId, Swarm, Transport as _,
};
use libp2pv0500 as libp2p;
use testplan::{run_ping, PingSwarm};
use testplan::{run_ping, Muxer, PingSwarm, SecProtocol, Transport};

fn build_builder<T, C>(
builder: core::transport::upgrade::Builder<T>,
secure_channel_param: &str,
muxer_param: &str,
secure_channel_param: SecProtocol,
muxer_param: Muxer,
local_key: &identity::Keypair,
) -> Boxed<(libp2p::PeerId, StreamMuxerBox)>
where
T: Transport<Output = C> + Send + Unpin + 'static,
T: libp2p::Transport<Output = C> + Send + Unpin + 'static,
<T as libp2p::Transport>::Error: Sync + Send + 'static,
<T as libp2p::Transport>::ListenerUpgrade: Send,
<T as libp2p::Transport>::Dial: Send,
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
let mux_upgrade = match muxer_param {
"yamux" => EitherUpgrade::A(yamux::YamuxConfig::default()),
"mplex" => EitherUpgrade::B(mplex::MplexConfig::default()),
_ => panic!("Unsupported muxer"),
Muxer::Yamux => EitherUpgrade::A(yamux::YamuxConfig::default()),
Muxer::Mplex => EitherUpgrade::B(mplex::MplexConfig::default()),
};

let timeout = Duration::from_secs(5);

match secure_channel_param {
"noise" => builder
SecProtocol::Noise => builder
.authenticate(noise::NoiseAuthenticated::xx(&local_key).unwrap())
.multiplex(mux_upgrade)
.timeout(timeout)
.boxed(),
"tls" => builder
SecProtocol::Tls => builder
.authenticate(libp2p::tls::Config::new(&local_key).unwrap())
.multiplex(mux_upgrade)
.timeout(timeout)
.boxed(),
_ => panic!("Unsupported secure channel"),
}
}

Expand All @@ -57,46 +55,60 @@ async fn main() -> Result<()> {
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());

let transport_param =
env::var("transport").context("transport environment variable is not set")?;
let secure_channel_param =
env::var("security").context("security environment variable is not set")?;
let muxer_param = env::var("muxer").context("muxer environment variable is not set")?;
let transport_param: testplan::Transport = env::var("transport")
.context("transport environment variable is not set")?
.parse()
.context("unsupported transport")?;

let secure_channel_param: testplan::SecProtocol = env::var("security")
.context("security environment variable is not set")?
.parse()
.context("unsupported secure channel")?;

let muxer_param: Muxer = env::var("muxer")
.context("muxer environment variable is not set")?
.parse()
.context("unsupported muxer")?;
let ip = env::var("ip").context("ip environment variable is not set")?;

let is_dialer = env::var("is_dialer")
.unwrap_or("true".into())
.parse::<bool>()?;

let redis_addr = env::var("REDIS_ADDR")
.map(|addr| format!("redis://{addr}"))
.unwrap_or("redis://redis:6379".into());

let client = redis::Client::open(redis_addr).context("Could not connect to redis")?;

let (boxed_transport, local_addr) = match transport_param.as_str() {
"quic-v1" => {
let (boxed_transport, local_addr) = match transport_param {
Transport::QuicV1 => {
let builder =
libp2p::quic::tokio::Transport::new(libp2p::quic::Config::new(&local_key))
.map(|(p, c), _| (p, StreamMuxerBox::new(c)));
(builder.boxed(), format!("/ip4/{ip}/udp/0/quic-v1"))
}
"tcp" => {
Transport::Tcp => {
let builder = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new())
.upgrade(libp2p::core::upgrade::Version::V1Lazy);

(
build_builder(builder, &secure_channel_param, &muxer_param, &local_key),
build_builder(builder, secure_channel_param, muxer_param, &local_key),
format!("/ip4/{ip}/tcp/0"),
)
}
"ws" => {
Transport::Ws => {
let builder = WsConfig::new(libp2p::tcp::tokio::Transport::new(
libp2p::tcp::Config::new(),
))
.upgrade(libp2p::core::upgrade::Version::V1Lazy);

(
build_builder(builder, &secure_channel_param, &muxer_param, &local_key),
build_builder(builder, secure_channel_param, muxer_param, &local_key),
format!("/ip4/{ip}/tcp/0/ws"),
)
}
"webrtc" => (
Transport::Webrtc => (
webrtc::tokio::Transport::new(
local_key,
webrtc::tokio::Certificate::generate(&mut rand::thread_rng())?,
Expand All @@ -105,7 +117,6 @@ async fn main() -> Result<()> {
.boxed(),
format!("/ip4/{ip}/udp/0/webrtc"),
),
_ => panic!("Unsupported"),
};

let swarm = OrphanRuleWorkaround(Swarm::with_tokio_executor(
Expand All @@ -119,7 +130,7 @@ async fn main() -> Result<()> {

// Use peer id as a String so that `run_ping` does not depend on a specific libp2p version.
let local_peer_id = local_peer_id.to_string();
run_ping(client, swarm, &local_addr, &local_peer_id).await?;
run_ping(client, swarm, &local_addr, &local_peer_id, is_dialer).await?;

Ok(())
}
Expand Down
37 changes: 32 additions & 5 deletions multidim-interop/rust/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,40 @@
use std::env;
use std::time::Duration;

use anyhow::{Context, Result};
use env_logger::Env;
use log::info;
use redis::{AsyncCommands, Client as Rclient};
use strum::EnumString;

const REDIS_TIMEOUT: usize = 10;

/// Supported transports by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum Transport {
Tcp,
QuicV1,
Webrtc,
Ws,
}

/// Supported stream multiplexers by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum Muxer {
Mplex,
Yamux,
}

/// Supported security protocols by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum SecProtocol {
Noise,
Tls,
}

/// PingSwarm allows us to abstract over libp2p versions for `run_ping`.
#[async_trait::async_trait]
pub trait PingSwarm: Sized + Send + 'static {
async fn listen_on(&mut self, address: &str) -> Result<String>;
Expand All @@ -23,11 +50,15 @@ pub trait PingSwarm: Sized + Send + 'static {
fn local_peer_id(&self) -> String;
}

/// Run a ping interop test. Based on `is_dialer`, either dial the address
/// retrieved via `listenAddr` key over the redis connection. Or wait to be pinged and have
/// `dialerDone` key ready on the redis connection.
pub async fn run_ping<S>(
client: Rclient,
mut swarm: S,
local_addr: &str,
local_peer_id: &str,
is_dialer: bool,
) -> Result<()>
where
S: PingSwarm,
Expand All @@ -37,10 +68,6 @@ where
info!("Running ping test: {}", swarm.local_peer_id());
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();

let is_dialer = env::var("is_dialer")
.unwrap_or("true".into())
.parse::<bool>()?;

info!(
"Test instance, listening for incoming connections on: {:?}.",
local_addr
Expand Down

0 comments on commit f999445

Please sign in to comment.