Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!(node): make syncer batch sizes configurable #327

Merged
merged 3 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
test-wasm:
runs-on: ubuntu-latest
env:
WASM_BINDGEN_TEST_TIMEOUT: 60
WASM_BINDGEN_TEST_TIMEOUT: 120
steps:
- uses: actions/checkout@v1

Expand Down
1 change: 1 addition & 0 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub(crate) async fn run(args: Params) -> Result<()> {
p2p_local_keypair,
p2p_bootnodes,
p2p_listen_on: args.listen_addrs,
sync_batch_size: 512,
blockstore,
store,
})
Expand Down
1 change: 1 addition & 0 deletions node-wasm/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ impl WasmNodeConfig {
p2p_bootnodes,
p2p_local_keypair,
p2p_listen_on: vec![],
sync_batch_size: 128,
blockstore,
store,
})
Expand Down
1 change: 1 addition & 0 deletions node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ async fn main() {
p2p_local_keypair,
p2p_bootnodes,
p2p_listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
sync_batch_size: 512,
blockstore,
store,
})
Expand Down
3 changes: 3 additions & 0 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ where
pub p2p_bootnodes: Vec<Multiaddr>,
/// List of the addresses where [`Node`] will listen for incoming connections.
pub p2p_listen_on: Vec<Multiaddr>,
/// Maximum size of batches in which headers will be synced.
zvolin marked this conversation as resolved.
Show resolved Hide resolved
pub sync_batch_size: u64,
/// The blockstore for bitswap.
pub blockstore: B,
/// The store for headers.
Expand Down Expand Up @@ -123,6 +125,7 @@ where
store: store.clone(),
p2p: p2p.clone(),
event_pub: event_channel.publisher(),
batch_size: config.sync_batch_size,
})?);

let daser = Arc::new(Daser::start(DaserArgs {
Expand Down
2 changes: 1 addition & 1 deletion node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, trace, warn};

mod header_ex;
mod header_session;
pub(crate) mod header_session;
pub(crate) mod shwap;
mod swarm;

Expand Down
126 changes: 89 additions & 37 deletions node/src/p2p/header_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use crate::executor::spawn;
use crate::p2p::header_ex::utils::HeaderRequestExt;
use crate::p2p::{P2pCmd, P2pError};

const MAX_AMOUNT_PER_REQ: u64 = 64;
const MAX_CONCURRENT_REQS: usize = 8;
pub(crate) const MIN_AMOUNT_PER_REQ: u64 = 8;
pub(crate) const MAX_AMOUNT_PER_REQ: u64 = 64;
pub(crate) const MAX_CONCURRENT_REQS: usize = 8;

type Result<T, E = P2pError> = std::result::Result<T, E>;

Expand All @@ -19,6 +20,7 @@ pub(crate) struct HeaderSession {
response_tx: mpsc::Sender<(u64, u64, Result<Vec<ExtendedHeader>>)>,
response_rx: mpsc::Receiver<(u64, u64, Result<Vec<ExtendedHeader>>)>,
ongoing: usize,
batch_size: u64,
}

impl HeaderSession {
Expand All @@ -32,13 +34,18 @@ impl HeaderSession {
/// [`Store::get_stored_header_ranges`]: crate::store::Store::get_stored_header_ranges
pub(crate) fn new(range: BlockRange, cmd_tx: mpsc::Sender<P2pCmd>) -> Self {
let (response_tx, response_rx) = mpsc::channel(MAX_CONCURRENT_REQS);
let batch_size = range
.len()
.div_ceil(MAX_CONCURRENT_REQS as u64)
.clamp(MIN_AMOUNT_PER_REQ, MAX_AMOUNT_PER_REQ);

HeaderSession {
to_fetch: Some(range),
cmd_tx,
response_tx,
response_rx,
ongoing: 0,
batch_size,
}
}

Expand Down Expand Up @@ -100,7 +107,7 @@ impl HeaderSession {
}

pub(crate) async fn send_next_request(&mut self) -> Result<()> {
let Some(range) = take_next_batch(&mut self.to_fetch, MAX_AMOUNT_PER_REQ) else {
let Some(range) = take_next_batch(&mut self.to_fetch, self.batch_size) else {
return Ok(());
};

Expand Down Expand Up @@ -161,28 +168,50 @@ mod tests {
use crate::test_utils::async_test;
use celestia_types::test_utils::ExtendedHeaderGenerator;

#[async_test]
async fn retry_on_missing_range() {
async fn test_batching(to_fetch: u64, batches: usize, batch_size: u64) {
let (_p2p, mut p2p_mock) = P2p::mocked();
let mut gen = ExtendedHeaderGenerator::new();
let headers = gen.next_many(64);
let headers = gen.next_many(to_fetch);

let mut session = HeaderSession::new(1..=to_fetch, p2p_mock.cmd_tx.clone());
assert_eq!(session.batch_size, batch_size);

let mut session = HeaderSession::new(1..=64, p2p_mock.cmd_tx.clone());
let (result_tx, result_rx) = oneshot::channel();
spawn(async move {
let res = session.run().await;
result_tx.send(res).unwrap();
});

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 64);
respond_to.send(Ok(headers[..60].to_vec())).unwrap();
// all batches but last should have batch_size
let full_batches = batches - 1;
for i in 1..=full_batches {
let (height, amount, respond_to) =
p2p_mock.expect_header_request_for_height_cmd().await;

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 61);
assert_eq!(amount, 4);
respond_to.send(Ok(headers[60..64].to_vec())).unwrap();
// batches will be taken backward
let end_offset = i as u64 * batch_size;
let expected_start = 1 + to_fetch - end_offset;
assert_eq!(height, expected_start);
assert_eq!(amount, batch_size);

let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
}

// last batch should be what's left
let leftover = to_fetch - batch_size * full_batches as u64;
if leftover > 0 {
let (height, amount, respond_to) =
p2p_mock.expect_header_request_for_height_cmd().await;

assert_eq!(height, 1);
assert_eq!(amount, leftover);

let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
}

p2p_mock.expect_no_cmd().await;

Expand All @@ -191,34 +220,57 @@ mod tests {
}

#[async_test]
async fn nine_batches() {
async fn split_range_to_batches() {
// single batch smaller or equal MIN_AMOUNT_PER_REQ
test_batching(1, 1, MIN_AMOUNT_PER_REQ).await;
test_batching(5, 1, MIN_AMOUNT_PER_REQ).await;
test_batching(7, 1, MIN_AMOUNT_PER_REQ).await;
test_batching(8, 1, MIN_AMOUNT_PER_REQ).await;

// many batches of MIN_AMOUNT_PER_REQ
test_batching(10, 2, MIN_AMOUNT_PER_REQ).await;
test_batching(16, 2, MIN_AMOUNT_PER_REQ).await;
test_batching(30, 4, MIN_AMOUNT_PER_REQ).await;
test_batching(50, 7, MIN_AMOUNT_PER_REQ).await;
test_batching(63, 8, MIN_AMOUNT_PER_REQ).await;
test_batching(64, 8, MIN_AMOUNT_PER_REQ).await;

// bigger batches which are fetched in a single go (with MAX_CONCURRENT_REQS requests)
test_batching(65, MAX_CONCURRENT_REQS, 9).await;
test_batching(128, MAX_CONCURRENT_REQS, 16).await;
test_batching(129, MAX_CONCURRENT_REQS, 17).await;
test_batching(256, MAX_CONCURRENT_REQS, 32).await;
test_batching(500, MAX_CONCURRENT_REQS, 63).await;
test_batching(512, MAX_CONCURRENT_REQS, 64).await;

// more than MAX_AMOUNT_PER_REQ batches
test_batching(520, 9, MAX_AMOUNT_PER_REQ).await;
test_batching(600, 10, MAX_AMOUNT_PER_REQ).await;
test_batching(1024, 16, MAX_AMOUNT_PER_REQ).await;
}

#[async_test]
async fn retry_on_missing_range() {
let (_p2p, mut p2p_mock) = P2p::mocked();
let mut gen = ExtendedHeaderGenerator::new();
let headers = gen.next_many(520);
let headers = gen.next_many(8);

let mut session = HeaderSession::new(1..=520, p2p_mock.cmd_tx.clone());
let mut session = HeaderSession::new(1..=8, p2p_mock.cmd_tx.clone());
let (result_tx, result_rx) = oneshot::channel();
spawn(async move {
let res = session.run().await;
result_tx.send(res).unwrap();
});

for i in (0..8).rev() {
let (height, amount, respond_to) =
p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 9 + 64 * i);
assert_eq!(amount, 64);
let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
}

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 8);
let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
respond_to.send(Ok(headers[..6].to_vec())).unwrap();

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 7);
assert_eq!(amount, 2);
respond_to.send(Ok(headers[6..8].to_vec())).unwrap();

p2p_mock.expect_no_cmd().await;

Expand All @@ -230,9 +282,9 @@ mod tests {
async fn not_found_is_not_fatal() {
let (_p2p, mut p2p_mock) = P2p::mocked();
let mut gen = ExtendedHeaderGenerator::new();
let headers = gen.next_many(64);
let headers = gen.next_many(8);

let mut session = HeaderSession::new(1..=64, p2p_mock.cmd_tx.clone());
let mut session = HeaderSession::new(1..=8, p2p_mock.cmd_tx.clone());
let (result_tx, result_rx) = oneshot::channel();
spawn(async move {
let res = session.run().await;
Expand All @@ -241,14 +293,14 @@ mod tests {

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 64);
assert_eq!(amount, 8);
respond_to
.send(Err(P2pError::HeaderEx(HeaderExError::HeaderNotFound)))
.unwrap();

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 64);
assert_eq!(amount, 8);
respond_to.send(Ok(headers.clone())).unwrap();

p2p_mock.expect_no_cmd().await;
Expand All @@ -261,7 +313,7 @@ mod tests {
async fn no_peers_is_fatal() {
let (_p2p, mut p2p_mock) = P2p::mocked();

let mut session = HeaderSession::new(1..=64, p2p_mock.cmd_tx.clone());
let mut session = HeaderSession::new(1..=8, p2p_mock.cmd_tx.clone());
let (result_tx, result_rx) = oneshot::channel();
spawn(async move {
let res = session.run().await;
Expand All @@ -270,7 +322,7 @@ mod tests {

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 64);
assert_eq!(amount, 8);
respond_to.send(Err(P2pError::NoConnectedPeers)).unwrap();

p2p_mock.expect_no_cmd().await;
Expand Down
Loading
Loading