Skip to content

Commit

Permalink
syncing window estimate in calculations
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Jul 1, 2024
1 parent 82582af commit 0e3df13
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 38 deletions.
97 changes: 67 additions & 30 deletions node/src/store/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::ops::RangeInclusive;
use celestia_types::ExtendedHeader;

use crate::executor::yield_now;
use crate::store::header_ranges::HeaderRange;
use crate::store::header_ranges::{HeaderRange, RangeLengthExt};
use crate::store::{Result, StoreError};

pub(crate) const VALIDATIONS_PER_YIELD: usize = 4;
Expand All @@ -13,34 +13,48 @@ pub(crate) const VALIDATIONS_PER_YIELD: usize = 4;
pub(crate) fn calculate_range_to_fetch(
head_height: u64,
store_headers: &[RangeInclusive<u64>],
syncing_window_edge: Option<u64>,
limit: u64,
) -> HeaderRange {
let mut missing_range = get_most_recent_missing_range(head_height, store_headers);

// truncate to syncing window, if height is known
if let Some(window_edge) = syncing_window_edge {
if missing_range.start() < &window_edge {
missing_range = window_edge + 1..=*missing_range.end();
}
}

// truncate number of headers to limit
if missing_range.len() > limit {
let end = missing_range.end();
let start = end.saturating_sub(limit) + 1;
missing_range = start..=*end;
}

missing_range
}

fn get_most_recent_missing_range(
head_height: u64,
store_headers: &[RangeInclusive<u64>],
) -> HeaderRange {
let mut store_headers_iter = store_headers.iter().rev();

let Some(store_head_range) = store_headers_iter.next() else {
// empty store, just fetch from head
return head_height.saturating_sub(limit) + 1..=head_height;
// empty store, we're missing everything
return 1..=head_height;
};

if store_head_range.end() != &head_height {
// if we haven't caught up with network head, start from there
let fetch_start = u64::max(
store_head_range.end() + 1,
head_height.saturating_sub(limit) + 1,
);
return fetch_start..=head_height;
return store_head_range.end() + 1..=head_height;
}

// there exists a range contiguous with network head. inspect previous range end
let penultimate_range_end = store_headers_iter.next().map(|r| *r.end()).unwrap_or(0);

let fetch_end = store_head_range.start().saturating_sub(1);
let fetch_start = u64::max(
penultimate_range_end + 1,
fetch_end.saturating_sub(limit) + 1,
);

fetch_start..=fetch_end
penultimate_range_end + 1..=store_head_range.start().saturating_sub(1)
}

pub(crate) fn try_consolidate_ranges(
Expand Down Expand Up @@ -129,60 +143,83 @@ mod tests {
let head_height = 1024;
let ranges = [256..=512];

let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 16);
assert_eq!(fetch_range, 1009..=1024);

let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 511);
assert_eq!(fetch_range, 514..=1024);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 512);
assert_eq!(fetch_range, 513..=1024);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 513);
assert_eq!(fetch_range, 513..=1024);

let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 1024);
assert_eq!(fetch_range, 513..=1024);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, Some(900), 1024);
assert_eq!(fetch_range, 901..=1024);
}

#[test]
fn calculate_range_to_fetch_empty_store() {
let fetch_range = calculate_range_to_fetch(1, &[], 100);
let fetch_range = calculate_range_to_fetch(1, &[], None, 100);
assert_eq!(fetch_range, 1..=1);

let fetch_range = calculate_range_to_fetch(100, &[], 10);
let fetch_range = calculate_range_to_fetch(100, &[], None, 10);
assert_eq!(fetch_range, 91..=100);

let fetch_range = calculate_range_to_fetch(100, &[], Some(75), 50);
assert_eq!(fetch_range, 76..=100);
}

#[test]
fn calculate_range_to_fetch_fully_synced() {
let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
let fetch_range = calculate_range_to_fetch(1, &[1..=1], None, 100);
assert!(fetch_range.is_empty());

let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
let fetch_range = calculate_range_to_fetch(100, &[1..=100], None, 10);
assert!(fetch_range.is_empty());

let fetch_range = calculate_range_to_fetch(100, &[1..=100], Some(100), 10);
assert!(fetch_range.is_empty());
}

#[test]
fn calculate_range_to_fetch_caught_up() {
let head_height = 4000;

let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], None, 500);
assert_eq!(fetch_range, 2500..=2999);
let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], Some(2600), 500);
assert_eq!(fetch_range, 2601..=2999);
let fetch_range =
calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], None, 500);
assert_eq!(fetch_range, 2500..=2999);
let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
let fetch_range =
calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], None, 500);
assert_eq!(fetch_range, 2801..=2999);
let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
let fetch_range =
calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], Some(2000), 500);
assert_eq!(fetch_range, 2801..=2999);
let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], None, 500);
assert_eq!(fetch_range, 1..=299);
let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], Some(2000), 500);
assert!(fetch_range.is_empty());
}

#[test]
fn calculate_range_to_fetch_catching_up() {
let head_height = 4000;

let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], None, 500);
assert_eq!(fetch_range, 3501..=4000);
let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], Some(3600), 500);
assert_eq!(fetch_range, 3601..=4000);
let fetch_range =
calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], None, 500);
assert_eq!(fetch_range, 3801..=4000);
let fetch_range =
calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], Some(3900), 500);
assert_eq!(fetch_range, 3901..=4000);
}

#[test]
Expand Down
22 changes: 14 additions & 8 deletions node/src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ where
headers_tx: mpsc::Sender<Result<Vec<ExtendedHeader>, P2pError>>,
headers_rx: mpsc::Receiver<Result<Vec<ExtendedHeader>, P2pError>>,
ongoing_batch: Option<Ongoing>,
estimated_syncing_window_end: Option<u64>,
}

struct Ongoing {
Expand Down Expand Up @@ -206,6 +207,7 @@ where
headers_tx,
headers_rx,
ongoing_batch: None,
estimated_syncing_window_end: None,
})
}

Expand Down Expand Up @@ -437,6 +439,7 @@ where
let next_batch = calculate_range_to_fetch(
subjective_head_height,
store_ranges.as_ref(),
self.estimated_syncing_window_end,
MAX_HEADERS_IN_BATCH,
);

Expand All @@ -445,16 +448,10 @@ where
return;
}

let syncing_window_start = Time::now().checked_sub(SYNCING_WINDOW).unwrap_or_else(|| {
warn!("underflow when computing syncing window start, defaulting to unix epoch");
Time::unix_epoch()
});

// make sure we're inside the syncing window before we start
if let Ok(known_header) = self.store.get_by_height(next_batch.end() + 1).await {
if known_header.time().before(syncing_window_start) {
debug!("synced to the end of syncing window");
return;
if !in_syncing_window(&known_header) {
self.estimated_syncing_window_end = Some(known_header.height().value());
}
}

Expand Down Expand Up @@ -510,6 +507,15 @@ where
}
}

fn in_syncing_window(header: &ExtendedHeader) -> bool {
let syncing_window_start = Time::now().checked_sub(SYNCING_WINDOW).unwrap_or_else(|| {
warn!("underflow when computing syncing window start, defaulting to unix epoch");
Time::unix_epoch()
});

header.time().after(syncing_window_start)
}

async fn try_init<S>(p2p: &P2p, store: &S) -> Result<u64>
where
S: Store,
Expand Down

0 comments on commit 0e3df13

Please sign in to comment.