Skip to content

Commit

Permalink
transports/quic: follow naming convention of libp2p#2217
Browse files Browse the repository at this point in the history
  • Loading branch information
elenaf9 committed Oct 4, 2022
1 parent 831383a commit 8e729bf
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 50 deletions.
6 changes: 3 additions & 3 deletions transports/quic/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ pub enum ToEndpoint {
},
/// Instruct the endpoint to send a packet of data on its UDP socket.
SendUdpPacket(quinn_proto::Transmit),
/// The [`QuicTransport`] [`Dialer`] or [`Listener`] coupled to this endpoint was dropped.
/// The [`GenTransport`] [`Dialer`] or [`Listener`] coupled to this endpoint was dropped.
/// Once all pending connection closed the [`EndpointDriver`] should shut down.
Decoupled,
}
Expand Down Expand Up @@ -239,7 +239,7 @@ pub enum ToEndpoint {
/// - One channel per each existing connection that communicates messages from the [`EndpointDriver`]
/// to that [`Connection`].
/// - One channel for the [`EndpointDriver`] to send newly-opened connections to. The receiving
/// side is processed by the [`QuicTransport`][crate::QuicTransport].
/// side is processed by the [`GenTransport`][crate::GenTransport].
///
/// In order to avoid an unbounded buffering of events, we prioritize sending data on the UDP
/// socket over everything else. If the network interface is too busy to process our packets,
Expand Down Expand Up @@ -338,7 +338,7 @@ impl<P: Provider> EndpointDriver<P> {
}
}

/// Handle a message sent from either the [`QuicTransport`](super::QuicTransport) or a [`Connection`].
/// Handle a message sent from either the [`GenTransport`](super::GenTransport) or a [`Connection`].
fn handle_message(&mut self, to_endpoint: ToEndpoint) -> ControlFlow<()> {
match to_endpoint {
ToEndpoint::Dial { addr, result } => {
Expand Down
14 changes: 7 additions & 7 deletions transports/quic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
//! # #[cfg(feature = "async-std")]
//! # fn main() -> std::io::Result<()> {
//! #
//! use libp2p_quic::{AsyncStdTransport, Config};
//! use libp2p_quic as quic;
//! use libp2p_core::{Multiaddr, Transport};
//!
//! let keypair = libp2p_core::identity::Keypair::generate_ed25519();
//! let quic_config = Config::new(&keypair).expect("could not make config");
//! let quic_config = quic::Config::new(&keypair).expect("could not make config");
//!
//! let mut quic_transport = AsyncStdTransport::new(quic_config);
//! let mut quic_transport = quic::async_std::Transport::new(quic_config);
//!
//! let addr = "/ip4/127.0.0.1/udp/12345/quic".parse().expect("bad address?");
//! quic_transport.listen_on(addr).expect("listen error.");
Expand All @@ -46,7 +46,7 @@
//! # }
//! ```
//!
//! The [`QuicTransport`] struct implements the [`libp2p_core::Transport`]. See the
//! The [`GenTransport`] struct implements the [`libp2p_core::Transport`]. See the
//! documentation of [`libp2p_core`] and of libp2p in general to learn how to use the
//! [`Transport`][libp2p_core::Transport] trait.
//!
Expand All @@ -69,8 +69,8 @@ pub use endpoint::Config;
pub use muxer::QuicMuxer;
pub use quinn_proto::ConnectError as DialError;
#[cfg(feature = "async-std")]
pub use transport::{AsyncStd, AsyncStdTransport};
pub use transport::{Provider, QuicTransport, TransportError};
pub use transport::async_std;
#[cfg(feature = "tokio")]
pub use transport::{Tokio, TokioTransport};
pub use transport::tokio;
pub use transport::{GenTransport, Provider, TransportError};
pub use upgrade::Upgrade;
22 changes: 9 additions & 13 deletions transports/quic/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,10 @@ use crate::{endpoint::EndpointChannel, muxer::QuicMuxer, upgrade::Upgrade};
use crate::{Config, ConnectionError};

#[cfg(feature = "async-std")]
mod async_std;
pub mod async_std;
#[cfg(feature = "tokio")]
mod tokio;
#[cfg(feature = "async-std")]
pub use async_std::{AsyncStd, AsyncStdTransport};
pub mod tokio;
use futures::future::BoxFuture;
#[cfg(feature = "tokio")]
pub use tokio::{Tokio, TokioTransport};

use futures::channel::{mpsc, oneshot};
use futures::ready;
Expand All @@ -61,7 +57,7 @@ use std::{
};

#[derive(Debug)]
pub struct QuicTransport<P> {
pub struct GenTransport<P> {
config: Config,
listeners: SelectAll<Listener>,
/// Dialer for Ipv4 addresses if no matching listener exists.
Expand All @@ -72,7 +68,7 @@ pub struct QuicTransport<P> {
_marker: PhantomData<P>,
}

impl<P> QuicTransport<P> {
impl<P> GenTransport<P> {
pub fn new(config: Config) -> Self {
Self {
listeners: SelectAll::new(),
Expand Down Expand Up @@ -102,7 +98,7 @@ pub enum TransportError {
EndpointDriverCrashed,
}

impl<P: Provider> Transport for QuicTransport<P> {
impl<P: Provider> Transport for GenTransport<P> {
type Output = (PeerId, QuicMuxer);
type Error = TransportError;
type ListenerUpgrade = Upgrade;
Expand Down Expand Up @@ -615,7 +611,7 @@ mod test {
async fn tokio_close_listener() {
let keypair = libp2p_core::identity::Keypair::generate_ed25519();
let config = Config::new(&keypair).unwrap();
let transport = TokioTransport::new(config.clone());
let transport = super::tokio::Transport::new(config.clone());
test_close_listener(transport).await
}

Expand All @@ -624,16 +620,16 @@ mod test {
async fn async_std_close_listener() {
let keypair = libp2p_core::identity::Keypair::generate_ed25519();
let config = Config::new(&keypair).unwrap();
let transport = AsyncStdTransport::new(config.clone());
let transport = super::async_std::Transport::new(config.clone());
test_close_listener(transport).await
}

async fn test_close_listener<P: Provider>(mut transport: QuicTransport<P>) {
async fn test_close_listener<P: Provider>(mut transport: GenTransport<P>) {
assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx))
.now_or_never()
.is_none());

// Run test twice to check that there is no unexpected behaviour if `QuicTransport.listener`
// Run test twice to check that there is no unexpected behaviour if `GenTransport.listener`
// is temporarily empty.
for _ in 0..2 {
let listener = transport
Expand Down
13 changes: 7 additions & 6 deletions transports/quic/src/transport/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,23 @@ use std::{
use async_std_crate::{net::UdpSocket, task::spawn};
use futures::{future::BoxFuture, ready, Future, FutureExt, Stream, StreamExt};

use crate::QuicTransport;
use crate::GenTransport;

use super::Provider;
use super::Provider as ProviderTrait;

pub type AsyncStdTransport = QuicTransport<AsyncStd>;
pub struct AsyncStd {
pub type Transport = GenTransport<Provider>;

pub struct Provider {
socket: Arc<UdpSocket>,
send_packet: Option<BoxFuture<'static, Result<(), io::Error>>>,
recv_stream: ReceiveStream,
}

impl Provider for AsyncStd {
impl ProviderTrait for Provider {
fn from_socket(socket: std::net::UdpSocket) -> io::Result<Self> {
let socket = Arc::new(socket.into());
let recv_stream = ReceiveStream::new(Arc::clone(&socket));
Ok(AsyncStd {
Ok(Provider {
socket,
send_packet: None,
recv_stream,
Expand Down
12 changes: 6 additions & 6 deletions transports/quic/src/transport/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ use futures::{ready, Future};
use tokio_crate::{io::ReadBuf, net::UdpSocket};
use x509_parser::nom::AsBytes;

use crate::QuicTransport;
use crate::GenTransport;

use super::Provider;
use super::Provider as ProviderTrait;

pub type TokioTransport = QuicTransport<Tokio>;
pub struct Tokio {
pub type Transport = GenTransport<Provider>;
pub struct Provider {
socket: UdpSocket,
socket_recv_buffer: Vec<u8>,
next_packet_out: Option<(Vec<u8>, SocketAddr)>,
}

impl Provider for Tokio {
impl ProviderTrait for Provider {
fn from_socket(socket: std::net::UdpSocket) -> std::io::Result<Self> {
let socket = UdpSocket::from_std(socket)?;
Ok(Tokio {
Ok(Provider {
socket,
socket_recv_buffer: vec![0; 65536],
next_packet_out: None,
Expand Down
26 changes: 11 additions & 15 deletions transports/quic/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ use std::{io, iter};

#[cfg(feature = "async-std")]
use async_std_crate as async_std;
#[cfg(feature = "async-std")]
use libp2p_quic::AsyncStd;
#[cfg(feature = "tokio")]
use libp2p_quic::Tokio;
#[cfg(feature = "tokio")]
use tokio_crate as tokio;

Expand All @@ -40,7 +36,7 @@ async fn create_swarm<P: Provider>() -> Swarm<RequestResponse<PingCodec>> {
let keypair = generate_tls_keypair();
let peer_id = keypair.public().to_peer_id();
let config = quic::Config::new(&keypair).unwrap();
let transport = quic::QuicTransport::<P>::new(config);
let transport = quic::GenTransport::<P>::new(config);

let transport = Transport::map(transport, |(peer, muxer), _| {
(peer, StreamMuxerBox::new(muxer))
Expand All @@ -64,13 +60,13 @@ async fn start_listening(swarm: &mut Swarm<RequestResponse<PingCodec>>, addr: &s
#[cfg(feature = "tokio")]
#[tokio::test]
async fn tokio_smoke() {
smoke::<Tokio>().await
smoke::<quic::tokio::Provider>().await
}

#[cfg(feature = "async-std")]
#[async_std::test]
async fn async_std_smoke() {
smoke::<AsyncStd>().await
smoke::<quic::async_std::Provider>().await
}

async fn smoke<P: Provider>() {
Expand Down Expand Up @@ -295,8 +291,8 @@ impl RequestResponseCodec for PingCodec {
#[cfg(feature = "async-std")]
#[async_std::test]
async fn dial_failure() {
let mut a = create_swarm::<AsyncStd>().await;
let mut b = create_swarm::<AsyncStd>().await;
let mut a = create_swarm::<quic::async_std::Provider>().await;
let mut b = create_swarm::<quic::async_std::Provider>().await;

let addr = start_listening(&mut a, "/ip4/127.0.0.1/udp/0/quic").await;

Expand Down Expand Up @@ -445,19 +441,19 @@ fn concurrent_connections_and_streams() {
#[cfg(feature = "tokio")]
tokio::runtime::Runtime::new()
.unwrap()
.block_on(prop::<Tokio>(num_listener, num_streams));
.block_on(prop::<quic::tokio::Provider>(num_listener, num_streams));

#[cfg(feature = "async-std")]
async_std::task::block_on(prop::<AsyncStd>(num_listener, num_streams));
async_std::task::block_on(prop::<quic::async_std::Provider>(num_listener, num_streams));

// QuickCheck::new().quickcheck(prop as fn(_, _) -> _);
}

#[cfg(feature = "tokio")]
#[tokio::test]
async fn endpoint_reuse() {
let mut swarm_a = create_swarm::<Tokio>().await;
let mut swarm_b = create_swarm::<Tokio>().await;
let mut swarm_a = create_swarm::<quic::tokio::Provider>().await;
let mut swarm_b = create_swarm::<quic::tokio::Provider>().await;
let b_peer_id = *swarm_b.local_peer_id();

let a_addr = start_listening(&mut swarm_a, "/ip4/127.0.0.1/udp/0/quic").await;
Expand Down Expand Up @@ -547,8 +543,8 @@ async fn endpoint_reuse() {
#[cfg(feature = "async-std")]
#[async_std::test]
async fn ipv4_dial_ipv6() {
let mut swarm_a = create_swarm::<AsyncStd>().await;
let mut swarm_b = create_swarm::<AsyncStd>().await;
let mut swarm_a = create_swarm::<quic::async_std::Provider>().await;
let mut swarm_b = create_swarm::<quic::async_std::Provider>().await;

let a_addr = start_listening(&mut swarm_a, "/ip6/::1/udp/0/quic").await;

Expand Down

0 comments on commit 8e729bf

Please sign in to comment.