Skip to content

Commit

Permalink
feat(node): Add syncing window for header sync
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Jun 27, 2024
1 parent cd2afb1 commit ea7d372
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 22 deletions.
50 changes: 41 additions & 9 deletions node/src/p2p/header_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::p2p::{P2pCmd, P2pError};
use crate::store::header_ranges::{HeaderRange, RangeLengthExt};

/// Maximum number of headers requested from a peer in a single request.
const MAX_AMOUNT_PER_REQ: u64 = 64;
/// Maximum number of header requests kept in flight concurrently per session.
const MAX_CONCURRENT_REQS: usize = 8;

type Result<T, E = P2pError> = std::result::Result<T, E>;

Expand Down Expand Up @@ -139,16 +139,18 @@ impl HeaderSession {
}
}

/// Take the next batch of up to `limit` headers from the back of `range_to_fetch`
/// (highest heights first).
///
/// Returns `None` when `limit` is zero or when there is no range left to fetch.
/// The remaining (lower) part of the range, if any, is put back into `range_to_fetch`.
fn take_next_batch(range_to_fetch: &mut Option<HeaderRange>, limit: u64) -> Option<HeaderRange> {
    // calculate potential end offset before we modify range_to_fetch;
    // checked_sub also rejects limit == 0 early
    let end_offset = limit.checked_sub(1)?;

    let to_fetch = range_to_fetch.take()?;
    if to_fetch.len() <= limit {
        // the whole remaining range fits into a single batch
        Some(to_fetch)
    } else {
        // to_fetch.len() > limit, so `*to_fetch.end() - limit` cannot underflow
        let _ = range_to_fetch.insert(*to_fetch.start()..=*to_fetch.end() - limit);
        Some(*to_fetch.end() - end_offset..=*to_fetch.end())
    }
}

Expand Down Expand Up @@ -201,18 +203,18 @@ mod tests {
result_tx.send(res).unwrap();
});

for i in 0..8 {
for i in (0..8).rev() {
let (height, amount, respond_to) =
p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1 + 64 * i);
assert_eq!(height, 9 + 64 * i);
assert_eq!(amount, 64);
let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
}

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 513);
assert_eq!(height, 1);
assert_eq!(amount, 8);
let start = (height - 1) as usize;
let end = start + amount as usize;
Expand Down Expand Up @@ -299,8 +301,38 @@ mod tests {
fn take_next_batch_truncated_batch() {
    let mut range_to_fetch = Some(1..=10);
    let batch = take_next_batch(&mut range_to_fetch, 5);
    // batches are taken from the back (highest heights) of the range
    assert_eq!(batch, Some(6..=10));
    assert_eq!(range_to_fetch, Some(1..=5));
}

#[test]
fn take_next_batch_truncated_calc() {
    let mut range_to_fetch = Some(1..=512);

    // Batches of 64 are repeatedly taken from the back of the range:
    // 449..=512 first, then 385..=448, and so on down to 1..=64.
    for batch_start in (1..=449).rev().step_by(64) {
        let batch = take_next_batch(&mut range_to_fetch, 64);
        assert_eq!(batch, Some(batch_start..=batch_start + 63));

        // everything below the batch is left to be fetched later
        let remaining = (batch_start > 1).then(|| 1..=batch_start - 1);
        assert_eq!(range_to_fetch, remaining);
    }
}

#[test]
Expand Down
67 changes: 67 additions & 0 deletions node/src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use celestia_tendermint::Time;
use celestia_types::ExtendedHeader;
use futures::FutureExt;
use serde::Serialize;
Expand All @@ -34,6 +35,7 @@ type Result<T, E = SyncerError> = std::result::Result<T, E>;

// maximum number of headers the syncer requests in one batch
const MAX_HEADERS_IN_BATCH: u64 = 512;
// upper bound for the exponential backoff between init retries — TODO confirm against `try_init`
const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);
// headers older than `now - SYNCING_WINDOW` are not fetched during backward sync
const SYNCING_WINDOW: Duration = Duration::from_secs(30 * 24 * 60 * 60); // 30 days

/// Representation of all the errors that can occur when interacting with the [`Syncer`].
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -443,6 +445,19 @@ where
return;
}

let syncing_window_start = Time::now().checked_sub(SYNCING_WINDOW).unwrap_or_else(|| {
warn!("underflow when computing syncing window start, defaulting to unix epoch");
Time::unix_epoch()
});

// make sure we're inside the syncing window before we start
if let Ok(known_header) = self.store.get_by_height(next_batch.end() + 1).await {
if known_header.time().before(syncing_window_start) {
debug!("synced to the end of syncing window");
return;
}
}

if self.p2p.peer_tracker_info().num_connected_peers == 0 {
// No connected peers. We can't do the request.
// This will be recovered by `run`.
Expand Down Expand Up @@ -665,6 +680,58 @@ mod tests {
p2p_mock.expect_no_cmd().await;
}

#[async_test]
async fn syncing_window_edge() {
    // 1200 headers dated ~31 days ago (just outside the 30-day syncing window),
    // followed by current-time headers up to height 2049
    let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
    let mut gen = ExtendedHeaderGenerator::new();
    gen.set_time((Time::now() - month_and_day_ago).expect("to not underflow"));
    let mut headers = gen.next_many(1200);
    gen.set_time(Time::now());
    headers.append(&mut gen.next_many(2049 - 1200));

    let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
    assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;

    // Syncer requested the first batch ([1537..=2048])
    handle_session_batch(
        &mut p2p_mock,
        &headers,
        vec![
            (1537, 64),
            (1601, 64),
            (1665, 64),
            (1729, 64),
            (1793, 64),
            (1857, 64),
            (1921, 64),
            (1985, 64),
        ],
    )
    .await;
    assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;

    // Syncer requested the second batch ([1025..=1536]); its lower heights were
    // generated ~31 days ago, i.e. past the syncing window edge
    handle_session_batch(
        &mut p2p_mock,
        &headers,
        vec![
            (1025, 64),
            (1089, 64),
            (1153, 64),
            (1217, 64),
            (1281, 64),
            (1345, 64),
            (1409, 64),
            (1473, 64),
        ],
    )
    .await;
    assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;

    // Syncer is fully synced and awaiting events: no further batches are expected
    // because the stored header at height 1025 is older than the syncing window
    p2p_mock.expect_no_cmd().await;
}

#[async_test]
async fn start_with_filled_store() {
let (p2p, mut p2p_mock) = P2p::mocked();
Expand Down
61 changes: 48 additions & 13 deletions types/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Utilities for writing tests.
use std::time::Duration;

use celestia_tendermint::block::header::{Header, Version};
use celestia_tendermint::block::{parts, Commit, CommitSig};
Expand All @@ -23,6 +24,7 @@ pub struct ExtendedHeaderGenerator {
chain_id: chain::Id,
key: SigningKey,
current_header: Option<ExtendedHeader>,
spoofed_time: Option<Time>,
}

impl ExtendedHeaderGenerator {
Expand All @@ -35,6 +37,7 @@ impl ExtendedHeaderGenerator {
chain_id,
key,
current_header: None,
spoofed_time: None,
}
}

Expand All @@ -53,7 +56,13 @@ impl ExtendedHeaderGenerator {
gen.current_header = if prev_height == 0 {
None
} else {
Some(generate_new(prev_height, &gen.chain_id, &gen.key, None))
Some(generate_new(
prev_height,
&gen.chain_id,
Time::now(),
&gen.key,
None,
))
};

gen
Expand All @@ -69,9 +78,10 @@ impl ExtendedHeaderGenerator {
/// ```
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> ExtendedHeader {
let time = self.get_and_increment_time();
let header = match self.current_header {
Some(ref header) => generate_next(1, header, &self.key, None),
None => generate_new(GENESIS_HEIGHT, &self.chain_id, &self.key, None),
Some(ref header) => generate_next(1, header, time, &self.key, None),
None => generate_new(GENESIS_HEIGHT, &self.chain_id, time, &self.key, None),
};

self.current_header = Some(header.clone());
Expand All @@ -91,9 +101,10 @@ impl ExtendedHeaderGenerator {
/// ```
#[allow(clippy::should_implement_trait)]
pub fn next_with_dah(&mut self, dah: DataAvailabilityHeader) -> ExtendedHeader {
let time = self.get_and_increment_time();
let header = match self.current_header {
Some(ref header) => generate_next(1, header, &self.key, Some(dah)),
None => generate_new(GENESIS_HEIGHT, &self.chain_id, &self.key, Some(dah)),
Some(ref header) => generate_next(1, header, time, &self.key, Some(dah)),
None => generate_new(GENESIS_HEIGHT, &self.chain_id, time, &self.key, Some(dah)),
};

self.current_header = Some(header.clone());
Expand Down Expand Up @@ -128,7 +139,8 @@ impl ExtendedHeaderGenerator {
///
/// This method does not change the state of `ExtendedHeaderGenerator`.
pub fn next_of(&self, header: &ExtendedHeader) -> ExtendedHeader {
    // use the spoofed time when one is set, but do not advance it —
    // this method must leave the generator state untouched
    let time = self.spoofed_time.unwrap_or_else(Time::now);
    generate_next(1, header, time, &self.key, None)
}

/// Generates the next header of the provided header with the given [`DataAvailabilityHeader`].
Expand All @@ -155,7 +167,8 @@ impl ExtendedHeaderGenerator {
header: &ExtendedHeader,
dah: DataAvailabilityHeader,
) -> ExtendedHeader {
generate_next(1, header, &self.key, Some(dah))
let time = self.spoofed_time.unwrap_or_else(Time::now);
generate_next(1, header, time, &self.key, Some(dah))
}

/// Generates the next amount of headers of the provided header.
Expand Down Expand Up @@ -220,9 +233,11 @@ impl ExtendedHeaderGenerator {
return;
}

let time = self.get_and_increment_time();

let header = match self.current_header {
Some(ref header) => generate_next(amount, header, &self.key, None),
None => generate_new(amount, &self.chain_id, &self.key, None),
Some(ref header) => generate_next(amount, header, time, &self.key, None),
None => generate_new(amount, &self.chain_id, time, &self.key, None),
};

self.current_header = Some(header.clone());
Expand Down Expand Up @@ -250,6 +265,24 @@ impl ExtendedHeaderGenerator {
/// Create an identical copy of the generator that can evolve independently
/// of the original.
pub fn fork(&self) -> ExtendedHeaderGenerator {
    Clone::clone(self)
}

/// Override the timestamp used for headers generated from this point on;
/// they will carry `time` as their creation time.
pub fn set_time(&mut self, time: Time) {
    self.spoofed_time.replace(time);
}

// Produce the timestamp for the next generated header. A spoofed time is
// advanced by 1 ms on every call, since two headers must never share the
// exact same timestamp.
// NOTE(review): the *advanced* value is both stored and returned, so the
// first header after `set_time(t)` is stamped `t + 1ms`, not `t`.
fn get_and_increment_time(&mut self) -> Time {
    match self.spoofed_time.take() {
        // no spoofed time configured — fall back to the wall clock
        None => Time::now(),
        Some(prev) => {
            let advanced = (prev + Duration::from_millis(1)).expect("not to overflow");
            self.spoofed_time = Some(advanced);
            advanced
        }
    }
}
}

impl Default for ExtendedHeaderGenerator {
Expand Down Expand Up @@ -338,6 +371,7 @@ pub(crate) fn random_bytes(len: usize) -> Vec<u8> {
fn generate_new(
height: u64,
chain_id: &chain::Id,
time: Time,
signing_key: &SigningKey,
dah: Option<DataAvailabilityHeader>,
) -> ExtendedHeader {
Expand Down Expand Up @@ -365,7 +399,7 @@ fn generate_new(
},
chain_id: chain_id.clone(),
height: height.try_into().unwrap(),
time: Time::now(),
time,
last_block_id,
last_commit_hash: Hash::default_sha256(),
data_hash: Hash::None,
Expand All @@ -391,7 +425,7 @@ fn generate_new(
},
signatures: vec![CommitSig::BlockIdFlagCommit {
validator_address,
timestamp: Time::now(),
timestamp: time,
signature: None,
}],
},
Expand Down Expand Up @@ -423,6 +457,7 @@ fn generate_new(
fn generate_next(
increment: u64,
current: &ExtendedHeader,
time: Time,
signing_key: &SigningKey,
dah: Option<DataAvailabilityHeader>,
) -> ExtendedHeader {
Expand All @@ -449,7 +484,7 @@ fn generate_next(
version: current.header.version,
chain_id: current.header.chain_id.clone(),
height,
time: Time::now(),
time,
last_block_id,
last_commit_hash: Hash::default_sha256(),
data_hash: Hash::None,
Expand All @@ -475,7 +510,7 @@ fn generate_next(
},
signatures: vec![CommitSig::BlockIdFlagCommit {
validator_address,
timestamp: Time::now(),
timestamp: time,
signature: None,
}],
},
Expand Down

0 comments on commit ea7d372

Please sign in to comment.