Skip to content

Commit

Permalink
transports/quic: test endpoint re-use
Browse files Browse the repository at this point in the history
  • Loading branch information
elenaf9 committed Jul 10, 2022
1 parent e5e5b34 commit 39d855c
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 4 deletions.
4 changes: 2 additions & 2 deletions transports/quic/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ impl Transport for QuicTransport {
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
let socket_addr = multiaddr_to_socketaddr(&addr)
.ok_or(TransportError::MultiaddrNotSupported(addr))?;
let socket_addr =
multiaddr_to_socketaddr(&addr).ok_or(TransportError::MultiaddrNotSupported(addr))?;
let listener_id = ListenerId::new();
let listener = Listener::new(listener_id, socket_addr, self.config.clone())
.map_err(TransportError::Other)?;
Expand Down
109 changes: 107 additions & 2 deletions transports/quic/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ use anyhow::Result;
use async_trait::async_trait;
use futures::future::FutureExt;
use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use futures::select;
use futures::stream::StreamExt;
use futures::task::Spawn;
use libp2p::core::multiaddr::Protocol;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::{upgrade, Transport};
use libp2p::core::{upgrade, ConnectedPoint, Transport};
use libp2p::request_response::{
ProtocolName, ProtocolSupport, RequestResponse, RequestResponseCodec, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage,
};
use libp2p::swarm::{Swarm, SwarmEvent};
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::swarm::{DialError, Swarm, SwarmEvent};
use libp2p_quic::{Config as QuicConfig, QuicTransport};
use rand::RngCore;
use std::num::NonZeroU8;
Expand Down Expand Up @@ -437,3 +440,105 @@ fn concurrent_connections_and_streams() {

// QuickCheck::new().quickcheck(prop as fn(_, _) -> _);
}

#[async_std::test]
async fn endpoint_reuse() -> Result<()> {
setup_global_subscriber();

let mut swarm_a = create_swarm(false).await?;
let mut swarm_b = create_swarm(false).await?;
let b_peer_id = *swarm_b.local_peer_id();

swarm_a.listen_on("/ip4/127.0.0.1/udp/0/quic".parse()?)?;
let a_addr = match swarm_a.next().await {
Some(SwarmEvent::NewListenAddr { address, .. }) => address,
e => panic!("{:?}", e),
};

swarm_b.dial(a_addr.clone()).unwrap();
let b_send_back_addr = loop {
select! {
ev = swarm_a.select_next_some() => match ev {
SwarmEvent::ConnectionEstablished { endpoint, .. } => {
break endpoint.get_remote_address().clone()
}
SwarmEvent::IncomingConnection { local_addr, ..} => {
assert!(swarm_a.listeners().any(|a| a == &local_addr));
}
e => panic!("{:?}", e),
},
ev = swarm_b.select_next_some() => match ev {
SwarmEvent::ConnectionEstablished { .. } => {},
e => panic!("{:?}", e),
}
}
};

let dial_opts = DialOpts::peer_id(b_peer_id)
.addresses(vec![b_send_back_addr.clone()])
.extend_addresses_through_behaviour()
.condition(PeerCondition::Always)
.build();
swarm_a.dial(dial_opts).unwrap();

// Expect the dial to fail since b is not listening on an address.
loop {
select! {
ev = swarm_a.select_next_some() => match ev {
SwarmEvent::ConnectionEstablished { ..} => panic!("Unexpected dial success."),
SwarmEvent::OutgoingConnectionError {error, .. } => {
assert!(matches!(error, DialError::Transport(_)));
break
}
_ => {}
},
_ = swarm_b.select_next_some() => {},
}
}
swarm_b.listen_on("/ip4/127.0.0.1/udp/0/quic".parse()?)?;
let b_addr = match swarm_b.next().await {
Some(SwarmEvent::NewListenAddr { address, .. }) => address,
e => panic!("{:?}", e),
};

let dial_opts = DialOpts::peer_id(b_peer_id)
.addresses(vec![b_addr.clone(), b_send_back_addr])
.condition(PeerCondition::Always)
.build();
swarm_a.dial(dial_opts).unwrap();
let expected_b_addr = b_addr.with(Protocol::P2p(b_peer_id.into()));

let mut a_reported = false;
let mut b_reported = false;
while !a_reported || !b_reported {
select! {
ev = swarm_a.select_next_some() => match ev{
SwarmEvent::ConnectionEstablished { endpoint, ..} => {
assert!(endpoint.is_dialer());
assert_eq!(endpoint.get_remote_address(), &expected_b_addr);
a_reported = true;
}
SwarmEvent::OutgoingConnectionError {error, .. } => {
panic!("Unexpected error {:}", error)
}
_ => {}
},
ev = swarm_b.select_next_some() => match ev{
SwarmEvent::ConnectionEstablished { endpoint, ..} => {
match endpoint {
ConnectedPoint::Dialer{..} => panic!("Unexpected outbound connection"),
ConnectedPoint::Listener {send_back_addr, local_addr} => {
// Expect that the local listening endpoint was used for dialing.
assert!(swarm_b.listeners().any(|a| a == &local_addr));
assert_eq!(send_back_addr, a_addr);
b_reported = true;
}
}
}
_ => {}
},
}
}

Ok(())
}

0 comments on commit 39d855c

Please sign in to comment.