diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index e309d53a8f5..e56d55eee5a 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -93,8 +93,8 @@ impl Transport for QuicTransport { type Dial = Pin> + Send>>; fn listen_on(&mut self, addr: Multiaddr) -> Result> { - let socket_addr = multiaddr_to_socketaddr(&addr) - .ok_or(TransportError::MultiaddrNotSupported(addr))?; + let socket_addr = + multiaddr_to_socketaddr(&addr).ok_or(TransportError::MultiaddrNotSupported(addr))?; let listener_id = ListenerId::new(); let listener = Listener::new(listener_id, socket_addr, self.config.clone()) .map_err(TransportError::Other)?; diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index d00cb78cfbb..dbd66815406 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -2,15 +2,18 @@ use anyhow::Result; use async_trait::async_trait; use futures::future::FutureExt; use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use futures::select; use futures::stream::StreamExt; use futures::task::Spawn; +use libp2p::core::multiaddr::Protocol; use libp2p::core::muxing::StreamMuxerBox; -use libp2p::core::{upgrade, Transport}; +use libp2p::core::{upgrade, ConnectedPoint, Transport}; use libp2p::request_response::{ ProtocolName, ProtocolSupport, RequestResponse, RequestResponseCodec, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, }; -use libp2p::swarm::{Swarm, SwarmEvent}; +use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; +use libp2p::swarm::{DialError, Swarm, SwarmEvent}; use libp2p_quic::{Config as QuicConfig, QuicTransport}; use rand::RngCore; use std::num::NonZeroU8; @@ -437,3 +440,105 @@ fn concurrent_connections_and_streams() { // QuickCheck::new().quickcheck(prop as fn(_, _) -> _); } + +#[async_std::test] +async fn endpoint_reuse() -> Result<()> { + setup_global_subscriber(); + + let mut swarm_a = create_swarm(false).await?; + let mut swarm_b = create_swarm(false).await?; + let b_peer_id = *swarm_b.local_peer_id(); + + swarm_a.listen_on("/ip4/127.0.0.1/udp/0/quic".parse()?)?; + let a_addr = match swarm_a.next().await { + Some(SwarmEvent::NewListenAddr { address, .. }) => address, + e => panic!("{:?}", e), + }; + + swarm_b.dial(a_addr.clone()).unwrap(); + let b_send_back_addr = loop { + select! { + ev = swarm_a.select_next_some() => match ev { + SwarmEvent::ConnectionEstablished { endpoint, .. } => { + break endpoint.get_remote_address().clone() + } + SwarmEvent::IncomingConnection { local_addr, ..} => { + assert!(swarm_a.listeners().any(|a| a == &local_addr)); + } + e => panic!("{:?}", e), + }, + ev = swarm_b.select_next_some() => match ev { + SwarmEvent::ConnectionEstablished { .. } => {}, + e => panic!("{:?}", e), + } + } + }; + + let dial_opts = DialOpts::peer_id(b_peer_id) + .addresses(vec![b_send_back_addr.clone()]) + .extend_addresses_through_behaviour() + .condition(PeerCondition::Always) + .build(); + swarm_a.dial(dial_opts).unwrap(); + + // Expect the dial to fail since b is not listening on an address. + loop { + select! { + ev = swarm_a.select_next_some() => match ev { + SwarmEvent::ConnectionEstablished { ..} => panic!("Unexpected dial success."), + SwarmEvent::OutgoingConnectionError {error, .. } => { + assert!(matches!(error, DialError::Transport(_))); + break + } + _ => {} + }, + _ = swarm_b.select_next_some() => {}, + } + } + swarm_b.listen_on("/ip4/127.0.0.1/udp/0/quic".parse()?)?; + let b_addr = match swarm_b.next().await { + Some(SwarmEvent::NewListenAddr { address, .. }) => address, + e => panic!("{:?}", e), + }; + + let dial_opts = DialOpts::peer_id(b_peer_id) + .addresses(vec![b_addr.clone(), b_send_back_addr]) + .condition(PeerCondition::Always) + .build(); + swarm_a.dial(dial_opts).unwrap(); + let expected_b_addr = b_addr.with(Protocol::P2p(b_peer_id.into())); + + let mut a_reported = false; + let mut b_reported = false; + while !a_reported || !b_reported { + select! { + ev = swarm_a.select_next_some() => match ev{ + SwarmEvent::ConnectionEstablished { endpoint, ..} => { + assert!(endpoint.is_dialer()); + assert_eq!(endpoint.get_remote_address(), &expected_b_addr); + a_reported = true; + } + SwarmEvent::OutgoingConnectionError {error, .. } => { + panic!("Unexpected error {:}", error) + } + _ => {} + }, + ev = swarm_b.select_next_some() => match ev{ + SwarmEvent::ConnectionEstablished { endpoint, ..} => { + match endpoint { + ConnectedPoint::Dialer{..} => panic!("Unexpected outbound connection"), + ConnectedPoint::Listener {send_back_addr, local_addr} => { + // Expect that the local listening endpoint was used for dialing. + assert!(swarm_b.listeners().any(|a| a == &local_addr)); + assert_eq!(send_back_addr, a_addr); + b_reported = true; + } + } + } + _ => {} + }, + } + } + + Ok(()) +}