Skip to content

Commit

Permalink
Move utilities to the bottom
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Nov 2, 2022
1 parent 445eea2 commit 578e42e
Showing 1 changed file with 150 additions and 148 deletions.
298 changes: 150 additions & 148 deletions transports/quic/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,6 @@ use std::io;
use std::num::NonZeroU8;
use std::time::Duration;

fn generate_tls_keypair() -> libp2p::identity::Keypair {
libp2p::identity::Keypair::generate_ed25519()
}

fn create_transport<P: Provider>() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) {
let keypair = generate_tls_keypair();
let peer_id = keypair.public().to_peer_id();
let mut config = quic::Config::new(&keypair);
config.handshake_timeout = Duration::from_secs(1);

let transport = quic::GenTransport::<P>::new(config)
.map(|(p, c), _| (p, StreamMuxerBox::new(c)))
.boxed();

(peer_id, transport)
}

async fn start_listening(transport: &mut Boxed<(PeerId, StreamMuxerBox)>, addr: &str) -> Multiaddr {
transport.listen_on(addr.parse().unwrap()).unwrap();
match transport.next().await {
Some(TransportEvent::NewAddress { listen_addr, .. }) => listen_addr,
e => panic!("{:?}", e),
}
}

#[cfg(feature = "tokio")]
#[tokio::test]
async fn tokio_smoke() {
Expand All @@ -56,20 +31,6 @@ async fn async_std_smoke() {
smoke::<quic::async_std::Provider>().await
}

async fn smoke<P: Provider>() {
let _ = env_logger::try_init();

let (a_peer_id, mut a_transport) = create_transport::<P>();
let (b_peer_id, mut b_transport) = create_transport::<P>();

let addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic").await;
let ((a_connected, _, _), (b_connected, _)) =
connect(&mut a_transport, &mut b_transport, addr).await;

assert_eq!(a_connected, b_peer_id);
assert_eq!(b_connected, a_peer_id);
}

#[cfg(feature = "async-std")]
#[async_std::test]
async fn dial_failure() {
Expand All @@ -88,6 +49,116 @@ async fn dial_failure() {
};
}


#[cfg(feature = "tokio")]
#[tokio::test]
async fn endpoint_reuse() {
let _ = env_logger::try_init();
let (_, mut a_transport) = create_transport::<quic::tokio::Provider>();
let (_, mut b_transport) = create_transport::<quic::tokio::Provider>();

let a_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic").await;
let ((_, b_send_back_addr, _), _) =
connect(&mut a_transport, &mut b_transport, a_addr.clone()).await;

// Expect the dial to fail since b is not listening on an address.
match dial(&mut a_transport, b_send_back_addr).await {
Ok(_) => panic!("Expected dial to fail"),
Err(error) => {
assert_eq!("Handshake with the remote timed out.", error.to_string())
}
};

let b_addr = start_listening(&mut b_transport, "/ip4/127.0.0.1/udp/0/quic").await;
let ((_, a_send_back_addr, _), _) = connect(&mut b_transport, &mut a_transport, b_addr).await;

assert_eq!(a_send_back_addr, a_addr);
}

#[cfg(feature = "async-std")]
#[async_std::test]
async fn ipv4_dial_ipv6() {
let _ = env_logger::try_init();
let (a_peer_id, mut a_transport) = create_transport::<quic::async_std::Provider>();
let (b_peer_id, mut b_transport) = create_transport::<quic::async_std::Provider>();

let a_addr = start_listening(&mut a_transport, "/ip6/::1/udp/0/quic").await;
let ((a_connected, _, _), (b_connected, _)) =
connect(&mut a_transport, &mut b_transport, a_addr).await;

assert_eq!(a_connected, b_peer_id);
assert_eq!(b_connected, a_peer_id);
}

#[cfg(feature = "async-std")]
#[async_std::test]
#[ignore] // Transport currently does not validate PeerId. Delete this test?
async fn wrong_peerid() {
use libp2p::PeerId;

let (a_peer_id, mut a_transport) = create_transport::<quic::async_std::Provider>();
let (b_peer_id, mut b_transport) = create_transport::<quic::async_std::Provider>();

let a_addr = start_listening(&mut a_transport, "/ip6/::1/udp/0/quic").await;
let a_addr_random_peer = a_addr.with(Protocol::P2p(PeerId::random().into()));

let ((a_connected, _, _), (b_connected, _)) =
connect(&mut a_transport, &mut b_transport, a_addr_random_peer).await;

assert_ne!(a_connected, b_peer_id);
assert_eq!(b_connected, a_peer_id);
}

#[cfg(feature = "async-std")]
fn new_tcp_quic_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) {
let keypair = generate_tls_keypair();
let peer_id = keypair.public().to_peer_id();
let mut config = quic::Config::new(&keypair);
config.handshake_timeout = Duration::from_secs(1);

let quic_transport = quic::async_std::Transport::new(config);
let tcp_transport = tcp::async_io::Transport::new(tcp::Config::default())
.upgrade(upgrade::Version::V1)
.authenticate(
noise::NoiseConfig::xx(
noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&keypair)
.unwrap(),
)
.into_authenticated(),
)
.multiplex(yamux::YamuxConfig::default());

let transport = OrTransport::new(quic_transport, tcp_transport)
.map(|either_output, _| match either_output {
EitherOutput::First((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
EitherOutput::Second((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed();

(peer_id, transport)
}

#[cfg(feature = "async-std")]
#[async_std::test]
async fn tcp_and_quic() {
let (a_peer_id, mut a_transport) = new_tcp_quic_transport();
let (b_peer_id, mut b_transport) = new_tcp_quic_transport();

let quic_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic").await;
let tcp_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/tcp/0").await;

let ((a_connected, _, _), (b_connected, _)) =
connect(&mut a_transport, &mut b_transport, quic_addr).await;
assert_eq!(a_connected, b_peer_id);
assert_eq!(b_connected, a_peer_id);

let ((a_connected, _, _), (b_connected, _)) =
connect(&mut a_transport, &mut b_transport, tcp_addr).await;
assert_eq!(a_connected, b_peer_id);
assert_eq!(b_connected, a_peer_id);
}

// Note: This test should likely be ported to the muxer compliance test suite.
#[cfg(feature = "async-std")]
#[test]
Expand All @@ -112,6 +183,20 @@ fn concurrent_connections_and_streams_tokio() {
.quickcheck(prop::<quic::tokio::Provider> as fn(_, _) -> _);
}

async fn smoke<P: Provider>() {
let _ = env_logger::try_init();

let (a_peer_id, mut a_transport) = create_transport::<P>();
let (b_peer_id, mut b_transport) = create_transport::<P>();

let addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic").await;
let ((a_connected, _, _), (b_connected, _)) =
connect(&mut a_transport, &mut b_transport, addr).await;

assert_eq!(a_connected, b_peer_id);
assert_eq!(b_connected, a_peer_id);
}

fn prop<P: Provider + BlockOn>(
number_listeners: NonZeroU8,
number_streams: NonZeroU8,
Expand Down Expand Up @@ -270,115 +355,6 @@ async fn open_outbound_streams<P: Provider, const BUFFER_SIZE: usize>(
while let Ok(_) = future::poll_fn(|cx| connection.poll_unpin(cx)).await {}
}

#[cfg(feature = "tokio")]
#[tokio::test]
async fn endpoint_reuse() {
let _ = env_logger::try_init();
let (_, mut a_transport) = create_transport::<quic::tokio::Provider>();
let (_, mut b_transport) = create_transport::<quic::tokio::Provider>();

let a_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic").await;
let ((_, b_send_back_addr, _), _) =
connect(&mut a_transport, &mut b_transport, a_addr.clone()).await;

// Expect the dial to fail since b is not listening on an address.
match dial(&mut a_transport, b_send_back_addr).await {
Ok(_) => panic!("Expected dial to fail"),
Err(error) => {
assert_eq!("Handshake with the remote timed out.", error.to_string())
}
};

let b_addr = start_listening(&mut b_transport, "/ip4/127.0.0.1/udp/0/quic").await;
let ((_, a_send_back_addr, _), _) = connect(&mut b_transport, &mut a_transport, b_addr).await;

assert_eq!(a_send_back_addr, a_addr);
}

#[cfg(feature = "async-std")]
#[async_std::test]
async fn ipv4_dial_ipv6() {
let _ = env_logger::try_init();
let (a_peer_id, mut a_transport) = create_transport::<quic::async_std::Provider>();
let (b_peer_id, mut b_transport) = create_transport::<quic::async_std::Provider>();

let a_addr = start_listening(&mut a_transport, "/ip6/::1/udp/0/quic").await;
let ((a_connected, _, _), (b_connected, _)) =
connect(&mut a_transport, &mut b_transport, a_addr).await;

assert_eq!(a_connected, b_peer_id);
assert_eq!(b_connected, a_peer_id);
}

#[cfg(feature = "async-std")]
#[async_std::test]
#[ignore] // Transport currently does not validate PeerId. Delete this test?
async fn wrong_peerid() {
use libp2p::PeerId;

let (a_peer_id, mut a_transport) = create_transport::<quic::async_std::Provider>();
let (b_peer_id, mut b_transport) = create_transport::<quic::async_std::Provider>();

let a_addr = start_listening(&mut a_transport, "/ip6/::1/udp/0/quic").await;
let a_addr_random_peer = a_addr.with(Protocol::P2p(PeerId::random().into()));

let ((a_connected, _, _), (b_connected, _)) =
connect(&mut a_transport, &mut b_transport, a_addr_random_peer).await;

assert_ne!(a_connected, b_peer_id);
assert_eq!(b_connected, a_peer_id);
}

#[cfg(feature = "async-std")]
fn new_tcp_quic_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) {
let keypair = generate_tls_keypair();
let peer_id = keypair.public().to_peer_id();
let mut config = quic::Config::new(&keypair);
config.handshake_timeout = Duration::from_secs(1);

let quic_transport = quic::async_std::Transport::new(config);
let tcp_transport = tcp::async_io::Transport::new(tcp::Config::default())
.upgrade(upgrade::Version::V1)
.authenticate(
noise::NoiseConfig::xx(
noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&keypair)
.unwrap(),
)
.into_authenticated(),
)
.multiplex(yamux::YamuxConfig::default());

let transport = OrTransport::new(quic_transport, tcp_transport)
.map(|either_output, _| match either_output {
EitherOutput::First((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
EitherOutput::Second((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed();

(peer_id, transport)
}

#[cfg(feature = "async-std")]
#[async_std::test]
async fn tcp_and_quic() {
let (a_peer_id, mut a_transport) = new_tcp_quic_transport();
let (b_peer_id, mut b_transport) = new_tcp_quic_transport();

let quic_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic").await;
let tcp_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/tcp/0").await;

let ((a_connected, _, _), (b_connected, _)) =
connect(&mut a_transport, &mut b_transport, quic_addr).await;
assert_eq!(a_connected, b_peer_id);
assert_eq!(b_connected, a_peer_id);

let ((a_connected, _, _), (b_connected, _)) =
connect(&mut a_transport, &mut b_transport, tcp_addr).await;
assert_eq!(a_connected, b_peer_id);
assert_eq!(b_connected, a_peer_id);
}

/// Helper function for driving two transports until they established a connection.
async fn connect(
listener: &mut Boxed<(PeerId, StreamMuxerBox)>,
Expand Down Expand Up @@ -416,6 +392,32 @@ async fn dial(
}
}

fn generate_tls_keypair() -> libp2p::identity::Keypair {
libp2p::identity::Keypair::generate_ed25519()
}

fn create_transport<P: Provider>() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) {
let keypair = generate_tls_keypair();
let peer_id = keypair.public().to_peer_id();
let mut config = quic::Config::new(&keypair);
config.handshake_timeout = Duration::from_secs(1);

let transport = quic::GenTransport::<P>::new(config)
.map(|(p, c), _| (p, StreamMuxerBox::new(c)))
.boxed();

(peer_id, transport)
}

async fn start_listening(transport: &mut Boxed<(PeerId, StreamMuxerBox)>, addr: &str) -> Multiaddr {
transport.listen_on(addr.parse().unwrap()).unwrap();
match transport.next().await {
Some(TransportEvent::NewAddress { listen_addr, .. }) => listen_addr,
e => panic!("{:?}", e),
}
}


trait BlockOn {
fn block_on<R>(future: impl Future<Output = R> + Send) -> R;
}
Expand Down

0 comments on commit 578e42e

Please sign in to comment.