Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(perf): use libp2p-request-response #3646

Merged
merged 41 commits into from
May 28, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
e4afb8e
Have at most one stream in-flight
mxinden Mar 20, 2023
1c5384d
Implement single connection requests per second benchmark
mxinden Mar 20, 2023
2bc92fe
Use `colored` crate for log output
mxinden Mar 20, 2023
9dffaf7
Implement connections per second benchmark
mxinden Mar 20, 2023
bff4279
Add ping latency benchmark
mxinden Mar 20, 2023
dfa1e57
Fix units
mxinden Mar 20, 2023
ed6dd3f
Print benchmark output according to json schema
mxinden Mar 24, 2023
6778a6f
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into perf
mxinden Apr 5, 2023
bebb7e7
Update data schema
mxinden Apr 8, 2023
a01ff2c
Introduce custom benchmark
mxinden Apr 11, 2023
8cef2d6
Fix forwarding upload and download bytes
mxinden Apr 11, 2023
095ceb6
Wait for remote to close read side
mxinden Apr 11, 2023
0019366
Provide option to pass secret key seed
mxinden Apr 12, 2023
2a68630
Follow go implementation entry point convention
mxinden Apr 12, 2023
d3c3ad4
Change to tokio runtime
thomaseizinger Apr 13, 2023
285a175
Fix DNS imports
mxinden Apr 13, 2023
cc064f5
Use nodelay don't reuse port
mxinden Apr 14, 2023
4cde5b5
Increase read buffer
mxinden Apr 14, 2023
3aec5e5
Set TCP nodelay on server
mxinden Apr 14, 2023
cef0f23
Merge server and client into single entry point
mxinden Apr 24, 2023
66a28b3
Run n times on same connection
mxinden Apr 24, 2023
da5cc28
Fix info log
mxinden Apr 24, 2023
90964ff
Use request-response and adjust to new flags
mxinden May 1, 2023
e984af6
Debug log level
mxinden May 1, 2023
59dd784
request-response closes for us
mxinden May 1, 2023
9d49f26
Bump timeouts
mxinden May 1, 2023
613e684
Log on info
mxinden May 1, 2023
f515473
Fix compilation
mxinden May 1, 2023
59d8f3f
Remove n-times flag
mxinden May 11, 2023
1d18761
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into perf
mxinden May 11, 2023
08e4276
Provide ip:port
mxinden May 12, 2023
647d76c
Remove schema
mxinden May 19, 2023
c4c84e7
Differentiate by up- and download time
mxinden May 19, 2023
fa6886c
Remove Benchmark trait
mxinden May 19, 2023
488f007
Use serde camelCase
mxinden May 19, 2023
ec52392
Log with millis to debug RTT
mxinden May 26, 2023
15e3642
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into perf
mxinden May 26, 2023
e343e5a
Use `swarm` to contruct server
mxinden May 28, 2023
539eb3d
Fix clippy
mxinden May 28, 2023
095f920
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into perf
mxinden May 28, 2023
6bb18a5
Use tls instead of noise
mxinden May 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions protocols/perf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
anyhow = "1"
async-std = { version = "1.9.0", features = ["attributes"] }
async-trait = "0.1"
clap = { version = "4.1.6", features = ["derive"] }
colored = "2"
env_logger = "0.10.0"
futures = "0.3.26"
instant = "0.1.11"
Expand Down
273 changes: 224 additions & 49 deletions protocols/perf/src/bin/perf-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
// DEALINGS IN THE SOFTWARE.

use anyhow::{bail, Result};
use async_trait::async_trait;
use clap::Parser;
use colored::*;
use futures::{future::Either, StreamExt};
use instant::Instant;
use libp2p_core::{muxing::StreamMuxerBox, transport::OrTransport, upgrade, Multiaddr, Transport};
use libp2p_dns::DnsConfig;
use libp2p_identity::PeerId;
use libp2p_perf::client::RunParams;
use libp2p_swarm::{SwarmBuilder, SwarmEvent};
use libp2p_swarm::{Swarm, SwarmBuilder, SwarmEvent};
use log::info;

#[derive(Debug, Parser)]
Expand All @@ -41,8 +44,217 @@ async fn main() -> Result<()> {

let opts = Opts::parse();

info!("Initiating performance tests with {}", opts.server_address);
let benchmarks: [Box<dyn Benchmark>; 3] = [
Box::new(Throughput {}),
Box::new(RequestsPerSecond {}),
Box::new(ConnectionsPerSecond {}),
];

for benchmark in benchmarks {
info!(
"{}",
format!("Start benchmark: {}", benchmark.name()).underline(),
);

benchmark.run(opts.server_address.clone()).await?;
}

Ok(())
}

#[async_trait]
trait Benchmark {
fn name(&self) -> &'static str;

async fn run(&self, server_address: Multiaddr) -> Result<()>;
}

struct Throughput {}

#[async_trait]
impl Benchmark for Throughput {
fn name(&self) -> &'static str {
"single connection single channel throughput"
}

async fn run(&self, server_address: Multiaddr) -> Result<()> {
let mut swarm = swarm().await;

let server_peer_id = connect(&mut swarm, server_address.clone()).await?;

swarm.behaviour_mut().perf(
server_peer_id,
RunParams {
to_send: 10 * 1024 * 1024,
to_receive: 10 * 1024 * 1024,
},
)?;

let stats = loop {
match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
info!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, result }) => {
break result?
}
e => panic!("{e:?}"),
}
};

let sent_mebibytes = stats.params.to_send as f64 / 1024.0 / 1024.0;
let sent_time = (stats.timers.write_done - stats.timers.write_start).as_secs_f64();
let sent_bandwidth_mebibit_second =
format!("{:.2} MiBit/s", (sent_mebibytes * 8.0) / sent_time).bold();

let received_mebibytes = stats.params.to_receive as f64 / 1024.0 / 1024.0;
let receive_time = (stats.timers.read_done - stats.timers.write_done).as_secs_f64();
let receive_bandwidth_mebibit_second =
format!("{:.2} MiBit/s", (received_mebibytes * 8.0) / receive_time).bold();

info!(
"Finished: sent {sent_mebibytes:.2} MiB in {sent_time:.2} s with \
{sent_bandwidth_mebibit_second} and received \
{received_mebibytes:.2} MiB in {receive_time:.2} s with \
{receive_bandwidth_mebibit_second}\n",
);
Ok(())
}
}

struct RequestsPerSecond {}

#[async_trait]
impl Benchmark for RequestsPerSecond {
fn name(&self) -> &'static str {
"single connection parallel requests per second"
}

async fn run(&self, server_address: Multiaddr) -> Result<()> {
let mut swarm = swarm().await;

let server_peer_id = connect(&mut swarm, server_address.clone()).await?;

let num = 10_000;
let to_send = 1;
let to_receive = 1;

for _ in 0..num {
swarm.behaviour_mut().perf(
server_peer_id,
RunParams {
to_send,
to_receive,
},
)?;
}

let mut finished = 0;
let start = Instant::now();

loop {
match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
info!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
SwarmEvent::Behaviour(libp2p_perf::client::Event {
id: _,
result: Ok(_),
}) => {
finished += 1;

if finished == num {
break;
}
}
e => panic!("{e:?}"),
}
}

let duration = start.elapsed().as_secs_f64();
let requests_per_second = format!("{:.2} req/s", num as f64 / duration).bold();

info!(
"Finished: sent {num} {to_send} bytes requests with {to_receive} bytes response each within {duration:.2} s thus {requests_per_second}\n",
);

Ok(())
}
}

struct ConnectionsPerSecond {}

#[async_trait]
impl Benchmark for ConnectionsPerSecond {
fn name(&self) -> &'static str {
"sequential connections with single request per second"
}

async fn run(&self, server_address: Multiaddr) -> Result<()> {
let num = 100;
let to_send = 1;
let to_receive = 1;
let start = Instant::now();
let mut finished = 0;

for _ in 0..num {
let mut swarm = swarm().await;

let server_peer_id = connect(&mut swarm, server_address.clone()).await?;

swarm.behaviour_mut().perf(
server_peer_id,
RunParams {
to_send,
to_receive,
},
)?;

match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
info!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
SwarmEvent::Behaviour(libp2p_perf::client::Event {
id: _,
result: Ok(_),
}) => {
finished += 1;

if finished == num {
break;
}
}
e => panic!("{e:?}"),
};
}

let duration = start.elapsed().as_secs_f64();
let connections_per_second = format!("{:.2} conns/s", num as f64 / duration).bold();

info!(
"Finished: established {num} connections with one {to_send} bytes request with {to_receive} bytes response each within {duration:.2} s thus {connections_per_second}\n",
);

Ok(())
}
}

async fn swarm() -> Swarm<libp2p_perf::client::Behaviour> {
// Create a random PeerId
let local_key = libp2p_identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
Expand Down Expand Up @@ -74,15 +286,21 @@ async fn main() -> Result<()> {
.boxed()
};

let mut swarm = SwarmBuilder::with_async_std_executor(
SwarmBuilder::with_async_std_executor(
transport,
libp2p_perf::client::Behaviour::default(),
local_peer_id,
)
.substream_upgrade_protocol_override(upgrade::Version::V1Lazy)
.build();
.build()
}

async fn connect(
swarm: &mut Swarm<libp2p_perf::client::Behaviour>,
server_address: Multiaddr,
) -> Result<PeerId> {
swarm.dial(server_address.clone()).unwrap();

swarm.dial(opts.server_address.clone()).unwrap();
let server_peer_id = loop {
match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id,
Expand All @@ -93,48 +311,5 @@ async fn main() -> Result<()> {
}
};

info!(
"Connection to {} established. Launching benchmarks.",
opts.server_address
);

swarm.behaviour_mut().perf(
server_peer_id,
RunParams {
to_send: 10 * 1024 * 1024,
to_receive: 10 * 1024 * 1024,
},
)?;

let stats = loop {
match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
info!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, result }) => break result?,
e => panic!("{e:?}"),
}
};

let sent_mebibytes = stats.params.to_send as f64 / 1024.0 / 1024.0;
let sent_time = (stats.timers.write_done - stats.timers.write_start).as_secs_f64();
let sent_bandwidth_mebibit_second = (sent_mebibytes * 8.0) / sent_time;

let received_mebibytes = stats.params.to_receive as f64 / 1024.0 / 1024.0;
let receive_time = (stats.timers.read_done - stats.timers.write_done).as_secs_f64();
let receive_bandwidth_mebibit_second = (received_mebibytes * 8.0) / receive_time;

info!(
"Finished run: Sent {sent_mebibytes:.2} MiB in {sent_time:.2} s with \
{sent_bandwidth_mebibit_second:.2} MiBit/s and received \
{received_mebibytes:.2} MiB in {receive_time:.2} s with \
{receive_bandwidth_mebibit_second:.2} MiBit/s",
);

Ok(())
Ok(server_peer_id)
}
Loading