Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: log bandwidth on substream instead of socket level #3180

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@

# 0.51.0 [unreleased]

- Count bandwidth at the application level. Previously `BandwidthLogging` would implement `Transport` and now implements `StreamMuxer` ([PR 3180](https://github.com/libp2p/rust-libp2p/pull/3180)).
- `BandwidthLogging::new` now requires a 2nd argument: `Arc<BandwidthSinks>`
- Remove `BandwidthFuture`
- Rename `BandwidthConnecLogging` to `InstrumentedStream`
- Remove `SimpleProtocol` due to being unused. See [`libp2p::core::upgrade`](https://docs.rs/libp2p/0.50.0/libp2p/core/upgrade/index.html) for alternatives. See [PR 3191].

- Update individual crates.
- Update to [`libp2p-dcutr` `v0.9.0`](protocols/dcutr/CHANGELOG.md#090).

Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ env_logger = "0.10.0"
clap = { version = "4.0.13", features = ["derive"] }
tokio = { version = "1.15", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread"] }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why split these with a newline?

libp2p-mplex = { version = "0.38.0", path = "muxers/mplex" }
libp2p-noise = { version = "0.41.0", path = "transports/noise" }
libp2p-tcp = { version = "0.38.0", path = "transports/tcp", features = ["tokio"] }
Comment on lines +139 to +141
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a deliberate choice to link these with a version?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be fixed with #3261.


[workspace]
members = [
"core",
Expand Down
149 changes: 53 additions & 96 deletions src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{
core::{
transport::{TransportError, TransportEvent},
Transport,
},
Multiaddr,
};
use crate::core::muxing::{StreamMuxer, StreamMuxerEvent};

use futures::{
io::{IoSlice, IoSliceMut},
prelude::*,
ready,
};
use libp2p_core::transport::ListenerId;
use std::{
convert::TryFrom as _,
io,
Expand All @@ -43,130 +36,94 @@ use std::{
task::{Context, Poll},
};

/// Wraps around a `Transport` and counts the number of bytes that go through all the opened
/// connections.
/// Wraps around a [`StreamMuxer`] and counts the number of bytes that go through all the opened
/// streams.
#[derive(Clone)]
#[pin_project::pin_project]
pub struct BandwidthLogging<TInner> {
pub(crate) struct BandwidthLogging<SMInner> {
#[pin]
inner: TInner,
inner: SMInner,
sinks: Arc<BandwidthSinks>,
}

impl<TInner> BandwidthLogging<TInner> {
/// Creates a new [`BandwidthLogging`] around the transport.
pub fn new(inner: TInner) -> (Self, Arc<BandwidthSinks>) {
let sink = Arc::new(BandwidthSinks {
inbound: AtomicU64::new(0),
outbound: AtomicU64::new(0),
});

let trans = BandwidthLogging {
inner,
sinks: sink.clone(),
};

(trans, sink)
impl<SMInner> BandwidthLogging<SMInner> {
/// Creates a new [`BandwidthLogging`] around the stream muxer.
pub(crate) fn new(inner: SMInner, sinks: Arc<BandwidthSinks>) -> Self {
Self { inner, sinks }
}
}

impl<TInner> Transport for BandwidthLogging<TInner>
impl<SMInner> StreamMuxer for BandwidthLogging<SMInner>
where
TInner: Transport,
SMInner: StreamMuxer,
{
type Output = BandwidthConnecLogging<TInner::Output>;
type Error = TInner::Error;
type ListenerUpgrade = BandwidthFuture<TInner::ListenerUpgrade>;
type Dial = BandwidthFuture<TInner::Dial>;
type Substream = InstrumentedStream<SMInner::Substream>;
type Error = SMInner::Error;

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
let this = self.project();
match this.inner.poll(cx) {
Poll::Ready(event) => {
let event = event.map_upgrade({
let sinks = this.sinks.clone();
|inner| BandwidthFuture { inner, sinks }
});
Poll::Ready(event)
}
Poll::Pending => Poll::Pending,
}
}

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
self.inner.listen_on(addr)
this.inner.poll(cx)
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
self.inner.remove_listener(id)
}

fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let sinks = self.sinks.clone();
self.inner
.dial(addr)
.map(move |fut| BandwidthFuture { inner: fut, sinks })
}

fn dial_as_listener(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
let sinks = self.sinks.clone();
self.inner
.dial_as_listener(addr)
.map(move |fut| BandwidthFuture { inner: fut, sinks })
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let this = self.project();
let inner = ready!(this.inner.poll_inbound(cx)?);
let logged = InstrumentedStream {
inner,
sinks: this.sinks.clone(),
};
Poll::Ready(Ok(logged))
}
}

/// Wraps around a `Future` that produces a connection. Wraps the connection around a bandwidth
/// counter.
#[pin_project::pin_project]
pub struct BandwidthFuture<TInner> {
#[pin]
inner: TInner,
sinks: Arc<BandwidthSinks>,
}

impl<TInner: TryFuture> Future for BandwidthFuture<TInner> {
type Output = Result<BandwidthConnecLogging<TInner::Ok>, TInner::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let this = self.project();
let inner = ready!(this.inner.try_poll(cx)?);
let logged = BandwidthConnecLogging {
let inner = ready!(this.inner.poll_outbound(cx)?);
let logged = InstrumentedStream {
inner,
sinks: this.sinks.clone(),
};
Poll::Ready(Ok(logged))
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.inner.poll_close(cx)
}
}

/// Allows obtaining the average bandwidth of the connections created from a [`BandwidthLogging`].
/// Allows obtaining the average bandwidth of the streams.
pub struct BandwidthSinks {
inbound: AtomicU64,
outbound: AtomicU64,
}

impl BandwidthSinks {
/// Returns the total number of bytes that have been downloaded on all the connections spawned
/// through the [`BandwidthLogging`].
/// Returns a new [`BandwidthSinks`].
pub(crate) fn new() -> Arc<Self> {
Arc::new(Self {
inbound: AtomicU64::new(0),
outbound: AtomicU64::new(0),
})
}

/// Returns the total number of bytes that have been downloaded on all the streams.
///
/// > **Note**: This method is by design subject to race conditions. The returned value should
/// > only ever be used for statistics purposes.
pub fn total_inbound(&self) -> u64 {
self.inbound.load(Ordering::Relaxed)
}

/// Returns the total number of bytes that have been uploaded on all the connections spawned
/// through the [`BandwidthLogging`].
/// Returns the total number of bytes that have been uploaded on all the streams.
///
/// > **Note**: This method is by design subject to race conditions. The returned value should
/// > only ever be used for statistics purposes.
Expand All @@ -175,15 +132,15 @@ impl BandwidthSinks {
}
}

/// Wraps around an `AsyncRead + AsyncWrite` and logs the bandwidth that goes through it.
/// Wraps around an [`AsyncRead`] + [`AsyncWrite`] and logs the bandwidth that goes through it.
#[pin_project::pin_project]
pub struct BandwidthConnecLogging<TInner> {
pub(crate) struct InstrumentedStream<SMInner> {
#[pin]
inner: TInner,
inner: SMInner,
sinks: Arc<BandwidthSinks>,
}

impl<TInner: AsyncRead> AsyncRead for BandwidthConnecLogging<TInner> {
impl<SMInner: AsyncRead> AsyncRead for InstrumentedStream<SMInner> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -213,7 +170,7 @@ impl<TInner: AsyncRead> AsyncRead for BandwidthConnecLogging<TInner> {
}
}

impl<TInner: AsyncWrite> AsyncWrite for BandwidthConnecLogging<TInner> {
impl<SMInner: AsyncWrite> AsyncWrite for InstrumentedStream<SMInner> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down
66 changes: 59 additions & 7 deletions src/transport_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,74 @@

//! Provides the `TransportExt` trait.

use crate::{bandwidth::BandwidthLogging, bandwidth::BandwidthSinks, Transport};
use crate::core::{
muxing::{StreamMuxer, StreamMuxerBox},
transport::Boxed,
PeerId,
};
use crate::{
bandwidth::{BandwidthLogging, BandwidthSinks},
Transport,
};
use std::sync::Arc;

/// Trait automatically implemented on all objects that implement `Transport`. Provides some
/// additional utilities.
pub trait TransportExt: Transport {
/// Adds a layer on the `Transport` that logs all trafic that passes through the sockets
/// Adds a layer on the `Transport` that logs all trafic that passes through the streams
/// created by it.
///
melekes marked this conversation as resolved.
Show resolved Hide resolved
/// This method returns an `Arc<BandwidthSinks>` that can be used to retreive the total number
/// of bytes transferred through the sockets.
fn with_bandwidth_logging(self) -> (BandwidthLogging<Self>, Arc<BandwidthSinks>)
/// This method returns an `Arc<BandwidthSinks>` that can be used to retrieve the total number
/// of bytes transferred through the streams.
///
/// # Example
///
/// ```
/// use libp2p_mplex as mplex;
/// use libp2p_noise as noise;
/// use libp2p_tcp as tcp;
/// use libp2p::{
/// core::upgrade,
/// identity,
/// TransportExt,
/// Transport,
/// };
///
/// let id_keys = identity::Keypair::generate_ed25519();
///
/// let transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true))
/// .upgrade(upgrade::Version::V1)
/// .authenticate(
/// noise::NoiseAuthenticated::xx(&id_keys)
/// .expect("Signing libp2p-noise static DH keypair failed."),
/// )
/// .multiplex(mplex::MplexConfig::new())
/// .boxed();
///
/// let (transport, sinks) = transport.with_bandwidth_logging();
/// ```
fn with_bandwidth_logging<S>(self) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc<BandwidthSinks>)
melekes marked this conversation as resolved.
Show resolved Hide resolved
where
Self: Sized,
Self: Sized + Send + Unpin + 'static,
Self::Dial: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
Self::Error: Send + Sync,
Self::Output: Into<(PeerId, S)>,
S: StreamMuxer + Send + 'static,
S::Substream: Send + 'static,
S::Error: Send + Sync + 'static,
{
BandwidthLogging::new(self)
let sinks = BandwidthSinks::new();
let sinks_copy = sinks.clone();
let transport = Transport::map(self, |output, _| {
let (peer_id, stream_muxer_box) = output.into();
(
peer_id,
StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sinks_copy)),
)
})
.boxed();
(transport, sinks)
}
}

Expand Down