Skip to content

Commit

Permalink
Move upgrades to libp2p-swarm
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Nov 2, 2023
1 parent ec24c4d commit 5d5a042
Show file tree
Hide file tree
Showing 41 changed files with 653 additions and 837 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ pub use peer_record::PeerRecord;
pub use signed_envelope::SignedEnvelope;
pub use translation::address_translation;
pub use transport::Transport;
pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
Expand Down
301 changes: 283 additions & 18 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,25 @@

//! Configuration of transport protocol upgrades.
pub use crate::upgrade::Version;
pub(crate) use multistream_select::Version;

use crate::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
use crate::{connection::ConnectedPoint, Negotiated};
use crate::{
connection::ConnectedPoint,
muxing::{StreamMuxer, StreamMuxerBox},
transport::{
and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerId, Transport,
TransportError, TransportEvent,
},
upgrade::{
self, apply_inbound, apply_outbound, InboundConnectionUpgrade, InboundUpgradeApply,
OutboundConnectionUpgrade, OutboundUpgradeApply, UpgradeError,
},
Negotiated,
upgrade::{self},
};
use futures::{prelude::*, ready};
use futures::{future::Either, prelude::*, ready};
use libp2p_identity::PeerId;
use log::debug;
use multiaddr::Multiaddr;
use std::{
error::Error,
fmt,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture, NegotiationError};
use std::{error::Error, fmt, time::Duration};
use std::{mem, pin::Pin, task::Context, task::Poll};

/// A `Builder` facilitates upgrading of a [`Transport`] for use with
/// a `Swarm`.
Expand Down Expand Up @@ -108,7 +102,7 @@ where
let version = self.version;
Authenticated(Builder::new(
self.inner.and_then(move |conn, endpoint| Authenticate {
inner: upgrade::apply(conn, upgrade, endpoint, version),
inner: apply(conn, upgrade, endpoint, version),
}),
version,
))
Expand Down Expand Up @@ -242,7 +236,7 @@ where
{
let version = self.0.version;
Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, upgrade, endpoint, version);
let upgrade = apply(c, upgrade, endpoint, version);
Multiplex {
peer_id: Some(i),
upgrade,
Expand Down Expand Up @@ -276,7 +270,7 @@ where
{
let version = self.0.version;
Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| {
let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), endpoint, version);
let upgrade = apply(c, up(&peer_id, &endpoint), endpoint, version);
Multiplex {
peer_id: Some(peer_id),
upgrade,
Expand Down Expand Up @@ -616,3 +610,274 @@ where
U: InboundConnectionUpgrade<Negotiated<C>>,
{
}

// TODO: Still needed?
/// Applies an upgrade to the inbound and outbound direction of a connection or substream.
pub(crate) fn apply<C, U>(
conn: C,
up: U,
cp: ConnectedPoint,
v: Version,
) -> Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundConnectionUpgrade<Negotiated<C>> + OutboundConnectionUpgrade<Negotiated<C>>,
{
match cp {
ConnectedPoint::Dialer { role_override, .. } if role_override.is_dialer() => {
Either::Right(apply_outbound(conn, up, v))
}
_ => Either::Left(apply_inbound(conn, up)),
}
}

/// Tries to perform an upgrade on an inbound connection or substream.
pub(crate) fn apply_inbound<C, U>(conn: C, up: U) -> InboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundConnectionUpgrade<Negotiated<C>>,
{
InboundUpgradeApply {
inner: InboundUpgradeApplyState::Init {
future: multistream_select::listener_select_proto(conn, up.protocol_info()),
upgrade: up,
},
}
}

/// Tries to perform an upgrade on an outbound connection or substream.
pub(crate) fn apply_outbound<C, U>(conn: C, up: U, v: Version) -> OutboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundConnectionUpgrade<Negotiated<C>>,
{
OutboundUpgradeApply {
inner: OutboundUpgradeApplyState::Init {
future: multistream_select::dialer_select_proto(conn, up.protocol_info(), v),
upgrade: up,
},
}
}

/// Future returned by `apply_inbound`. Drives the upgrade process.
pub struct InboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundConnectionUpgrade<Negotiated<C>>,
{
inner: InboundUpgradeApplyState<C, U>,
}

#[allow(clippy::large_enum_variant)]
enum InboundUpgradeApplyState<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundConnectionUpgrade<Negotiated<C>>,
{
Init {
future: ListenerSelectFuture<C, U::Info>,
upgrade: U,
},
Upgrade {
future: Pin<Box<U::Future>>,
name: String,
},
Undefined,
}

impl<C, U> Unpin for InboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundConnectionUpgrade<Negotiated<C>>,
{
}

impl<C, U> Future for InboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundConnectionUpgrade<Negotiated<C>>,
{
type Output = Result<U::Output, UpgradeError<U::Error>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match mem::replace(&mut self.inner, InboundUpgradeApplyState::Undefined) {
InboundUpgradeApplyState::Init {
mut future,
upgrade,
} => {
let (info, io) = match Future::poll(Pin::new(&mut future), cx)? {
Poll::Ready(x) => x,
Poll::Pending => {
self.inner = InboundUpgradeApplyState::Init { future, upgrade };
return Poll::Pending;
}
};
self.inner = InboundUpgradeApplyState::Upgrade {
future: Box::pin(upgrade.upgrade_inbound(io, info.clone())),
name: info.as_ref().to_owned(),
};
}
InboundUpgradeApplyState::Upgrade { mut future, name } => {
match Future::poll(Pin::new(&mut future), cx) {
Poll::Pending => {
self.inner = InboundUpgradeApplyState::Upgrade { future, name };
return Poll::Pending;
}
Poll::Ready(Ok(x)) => {
log::trace!("Upgraded inbound stream to {name}");
return Poll::Ready(Ok(x));
}
Poll::Ready(Err(e)) => {
debug!("Failed to upgrade inbound stream to {name}");
return Poll::Ready(Err(UpgradeError::Apply(e)));
}
}
}
InboundUpgradeApplyState::Undefined => {
panic!("InboundUpgradeApplyState::poll called after completion")
}
}
}
}
}

/// Future returned by `apply_outbound`. Drives the upgrade process.
pub struct OutboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundConnectionUpgrade<Negotiated<C>>,
{
inner: OutboundUpgradeApplyState<C, U>,
}

enum OutboundUpgradeApplyState<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundConnectionUpgrade<Negotiated<C>>,
{
Init {
future: DialerSelectFuture<C, <U::InfoIter as IntoIterator>::IntoIter>,
upgrade: U,
},
Upgrade {
future: Pin<Box<U::Future>>,
name: String,
},
Undefined,
}

impl<C, U> Unpin for OutboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundConnectionUpgrade<Negotiated<C>>,
{
}

impl<C, U> Future for OutboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundConnectionUpgrade<Negotiated<C>>,
{
type Output = Result<U::Output, UpgradeError<U::Error>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match mem::replace(&mut self.inner, OutboundUpgradeApplyState::Undefined) {
OutboundUpgradeApplyState::Init {
mut future,
upgrade,
} => {
let (info, connection) = match Future::poll(Pin::new(&mut future), cx)? {
Poll::Ready(x) => x,
Poll::Pending => {
self.inner = OutboundUpgradeApplyState::Init { future, upgrade };
return Poll::Pending;
}
};
self.inner = OutboundUpgradeApplyState::Upgrade {
future: Box::pin(upgrade.upgrade_outbound(connection, info.clone())),
name: info.as_ref().to_owned(),
};
}
OutboundUpgradeApplyState::Upgrade { mut future, name } => {
match Future::poll(Pin::new(&mut future), cx) {
Poll::Pending => {
self.inner = OutboundUpgradeApplyState::Upgrade { future, name };
return Poll::Pending;
}
Poll::Ready(Ok(x)) => {
log::trace!("Upgraded outbound stream to {name}",);
return Poll::Ready(Ok(x));
}
Poll::Ready(Err(e)) => {
debug!("Failed to upgrade outbound stream to {name}",);
return Poll::Ready(Err(UpgradeError::Apply(e)));
}
}
}
OutboundUpgradeApplyState::Undefined => {
panic!("OutboundUpgradeApplyState::poll called after completion")
}
}
}
}
}

/// Error that can happen when upgrading a connection or substream to use a protocol.
#[derive(Debug)]
pub enum UpgradeError<E> {
/// Error during the negotiation process.
Select(NegotiationError),
/// Error during the post-negotiation handshake.
Apply(E),
}

impl<E> UpgradeError<E> {
pub fn map_err<F, T>(self, f: F) -> UpgradeError<T>
where
F: FnOnce(E) -> T,
{
match self {
UpgradeError::Select(e) => UpgradeError::Select(e),
UpgradeError::Apply(e) => UpgradeError::Apply(f(e)),
}
}

pub fn into_err<T>(self) -> UpgradeError<T>
where
T: From<E>,
{
self.map_err(Into::into)
}
}

impl<E> fmt::Display for UpgradeError<E>
where
E: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
UpgradeError::Select(_) => write!(f, "Multistream select failed"),
UpgradeError::Apply(_) => write!(f, "Handshake failed"),
}
}
}

impl<E> std::error::Error for UpgradeError<E>
where
E: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
UpgradeError::Select(e) => Some(e),
UpgradeError::Apply(e) => Some(e),
}
}
}

impl<E> From<NegotiationError> for UpgradeError<E> {
fn from(e: NegotiationError) -> Self {
UpgradeError::Select(e)
}
}
Loading

0 comments on commit 5d5a042

Please sign in to comment.