Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node): Add syncing window for header sync #309

Merged
merged 6 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions node/src/p2p/header_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::p2p::{P2pCmd, P2pError};
use crate::store::header_ranges::{HeaderRange, RangeLengthExt};

const MAX_AMOUNT_PER_REQ: u64 = 64;
const MAX_CONCURRENT_REQS: usize = 1;
const MAX_CONCURRENT_REQS: usize = 8;
oblique marked this conversation as resolved.
Show resolved Hide resolved

type Result<T, E = P2pError> = std::result::Result<T, E>;

Expand Down Expand Up @@ -139,16 +139,18 @@ impl HeaderSession {
}
}

/// Take the next batch of up to `limit` headers from the back (highest
/// heights) of the `range_to_fetch`, leaving the remainder in place.
///
/// Returns `None` when `limit` is 0 or when there is nothing left to fetch.
fn take_next_batch(range_to_fetch: &mut Option<HeaderRange>, limit: u64) -> Option<HeaderRange> {
    // calculate potential end offset before we modify range_to_fetch;
    // `checked_sub` also bails out early for `limit == 0`
    let end_offset = limit.checked_sub(1)?;

    let to_fetch = range_to_fetch.take()?;
    if to_fetch.len() <= limit {
        // the whole remaining range fits in a single batch
        Some(to_fetch)
    } else {
        // to_fetch.len() > limit, we shouldn't underflow here
        let _ = range_to_fetch.insert(*to_fetch.start()..=*to_fetch.end() - limit);
        Some(*to_fetch.end() - end_offset..=*to_fetch.end())
    }
}

Expand Down Expand Up @@ -201,18 +203,18 @@ mod tests {
result_tx.send(res).unwrap();
});

for i in 0..8 {
for i in (0..8).rev() {
let (height, amount, respond_to) =
p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1 + 64 * i);
assert_eq!(height, 9 + 64 * i);
assert_eq!(amount, 64);
let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
}

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 513);
assert_eq!(height, 1);
assert_eq!(amount, 8);
let start = (height - 1) as usize;
let end = start + amount as usize;
Expand Down Expand Up @@ -299,8 +301,38 @@ mod tests {
fn take_next_batch_truncated_batch() {
    // batches are taken from the back (highest heights) of the range
    let mut range_to_fetch = Some(1..=10);
    let batch = take_next_batch(&mut range_to_fetch, 5);
    assert_eq!(batch, Some(6..=10));
    assert_eq!(range_to_fetch, Some(1..=5));
}

#[test]
fn take_next_batch_truncated_calc() {
    // Repeatedly taking 64-header batches out of 1..=512 walks from the back
    // of the range towards the front, 64 headers at a time.
    let mut range_to_fetch = Some(1..=512);

    for i in (0..8).rev() {
        let expected_start = 64 * i + 1;
        let batch = take_next_batch(&mut range_to_fetch, 64);
        assert_eq!(batch, Some(expected_start..=expected_start + 63));
        if i == 0 {
            // last batch drains the range completely
            assert_eq!(range_to_fetch, None);
        } else {
            assert_eq!(range_to_fetch, Some(1..=expected_start - 1));
        }
    }
}

#[test]
Expand Down
97 changes: 67 additions & 30 deletions node/src/store/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::ops::RangeInclusive;
use celestia_types::ExtendedHeader;

use crate::executor::yield_now;
use crate::store::header_ranges::HeaderRange;
use crate::store::header_ranges::{HeaderRange, RangeLengthExt};
use crate::store::{Result, StoreError};

pub(crate) const VALIDATIONS_PER_YIELD: usize = 4;
Expand All @@ -13,34 +13,48 @@ pub(crate) const VALIDATIONS_PER_YIELD: usize = 4;
/// Calculates the next range of headers to be fetched.
///
/// Starts from the most recent gap in the store, then truncates it to the
/// syncing window edge (when a height is known) and to at most `limit`
/// headers, keeping the highest heights.
pub(crate) fn calculate_range_to_fetch(
    head_height: u64,
    store_headers: &[RangeInclusive<u64>],
    syncing_window_edge: Option<u64>,
    limit: u64,
) -> HeaderRange {
    let missing = get_most_recent_missing_range(head_height, store_headers);
    let range_end = *missing.end();

    // truncate to syncing window, if height is known
    let range_start = match syncing_window_edge {
        Some(window_edge) if *missing.start() < window_edge => window_edge + 1,
        _ => *missing.start(),
    };
    let truncated = range_start..=range_end;

    // truncate number of headers to limit, keeping the back of the range
    if truncated.len() > limit {
        range_end.saturating_sub(limit) + 1..=range_end
    } else {
        truncated
    }
}

fn get_most_recent_missing_range(
head_height: u64,
store_headers: &[RangeInclusive<u64>],
) -> HeaderRange {
let mut store_headers_iter = store_headers.iter().rev();

let Some(store_head_range) = store_headers_iter.next() else {
// empty store, just fetch from head
return head_height.saturating_sub(limit) + 1..=head_height;
// empty store, we're missing everything
return 1..=head_height;
};

if store_head_range.end() != &head_height {
oblique marked this conversation as resolved.
Show resolved Hide resolved
// if we haven't caught up with network head, start from there
let fetch_start = u64::max(
store_head_range.end() + 1,
head_height.saturating_sub(limit) + 1,
);
return fetch_start..=head_height;
return store_head_range.end() + 1..=head_height;
}

// there exists a range contiguous with network head. inspect previous range end
let penultimate_range_end = store_headers_iter.next().map(|r| *r.end()).unwrap_or(0);

let fetch_end = store_head_range.start().saturating_sub(1);
let fetch_start = u64::max(
penultimate_range_end + 1,
fetch_end.saturating_sub(limit) + 1,
);

fetch_start..=fetch_end
penultimate_range_end + 1..=store_head_range.start().saturating_sub(1)
}

pub(crate) fn try_consolidate_ranges(
Expand Down Expand Up @@ -129,60 +143,83 @@ mod tests {
let head_height = 1024;
let ranges = [256..=512];

let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 16);
assert_eq!(fetch_range, 1009..=1024);

let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 511);
assert_eq!(fetch_range, 514..=1024);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 512);
assert_eq!(fetch_range, 513..=1024);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 513);
assert_eq!(fetch_range, 513..=1024);

let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 1024);
assert_eq!(fetch_range, 513..=1024);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, Some(900), 1024);
assert_eq!(fetch_range, 901..=1024);
}

#[test]
fn calculate_range_to_fetch_empty_store() {
    let fetch_range = calculate_range_to_fetch(1, &[], None, 100);
    assert_eq!(fetch_range, 1..=1);

    let fetch_range = calculate_range_to_fetch(100, &[], None, 10);
    assert_eq!(fetch_range, 91..=100);

    // syncing window edge cuts off everything at or below height 75
    let fetch_range = calculate_range_to_fetch(100, &[], Some(75), 50);
    assert_eq!(fetch_range, 76..=100);
}

#[test]
fn calculate_range_to_fetch_fully_synced() {
    let fetch_range = calculate_range_to_fetch(1, &[1..=1], None, 100);
    assert!(fetch_range.is_empty());

    let fetch_range = calculate_range_to_fetch(100, &[1..=100], None, 10);
    assert!(fetch_range.is_empty());

    // a syncing window edge doesn't change the result for a full store
    let fetch_range = calculate_range_to_fetch(100, &[1..=100], Some(100), 10);
    assert!(fetch_range.is_empty());
}

#[test]
fn calculate_range_to_fetch_caught_up() {
    let head_height = 4000;

    let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], None, 500);
    assert_eq!(fetch_range, 2500..=2999);
    // syncing window edge truncates the start of the missing range
    let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], Some(2600), 500);
    assert_eq!(fetch_range, 2601..=2999);
    let fetch_range =
        calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], None, 500);
    assert_eq!(fetch_range, 2500..=2999);
    let fetch_range =
        calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], None, 500);
    assert_eq!(fetch_range, 2801..=2999);
    // edge below the missing range has no effect
    let fetch_range =
        calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], Some(2000), 500);
    assert_eq!(fetch_range, 2801..=2999);
    let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], None, 500);
    assert_eq!(fetch_range, 1..=299);
    // remaining gap lies entirely behind the syncing window
    let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], Some(2000), 500);
    assert!(fetch_range.is_empty());
}

#[test]
fn calculate_range_to_fetch_catching_up() {
    let head_height = 4000;

    let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], None, 500);
    assert_eq!(fetch_range, 3501..=4000);
    // syncing window edge further truncates the range to fetch
    let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], Some(3600), 500);
    assert_eq!(fetch_range, 3601..=4000);
    let fetch_range =
        calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], None, 500);
    assert_eq!(fetch_range, 3801..=4000);
    let fetch_range =
        calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], Some(3900), 500);
    assert_eq!(fetch_range, 3901..=4000);
}

#[test]
Expand Down
74 changes: 74 additions & 0 deletions node/src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use celestia_tendermint::Time;
use celestia_types::ExtendedHeader;
use futures::FutureExt;
use serde::{Deserialize, Serialize};
Expand All @@ -36,6 +37,7 @@ type Result<T, E = SyncerError> = std::result::Result<T, E>;

const MAX_HEADERS_IN_BATCH: u64 = 512;
const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);
const SYNCING_WINDOW: Duration = Duration::from_secs(30 * 24 * 60 * 60); // 30 days

/// Representation of all the errors that can occur when interacting with the [`Syncer`].
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -180,6 +182,7 @@ where
headers_tx: mpsc::Sender<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
headers_rx: mpsc::Receiver<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
ongoing_batch: Option<Ongoing>,
estimated_syncing_window_end: Option<u64>,
}

struct Ongoing {
Expand Down Expand Up @@ -210,6 +213,7 @@ where
headers_tx,
headers_rx,
ongoing_batch: None,
estimated_syncing_window_end: None,
})
}

Expand Down Expand Up @@ -466,6 +470,7 @@ where
let next_batch = calculate_range_to_fetch(
subjective_head_height,
store_ranges.as_ref(),
self.estimated_syncing_window_end,
MAX_HEADERS_IN_BATCH,
);

Expand All @@ -474,6 +479,14 @@ where
return;
}

// make sure we're inside the syncing window before we start
if let Ok(known_header) = self.store.get_by_height(next_batch.end() + 1).await {
if !in_syncing_window(&known_header) {
self.estimated_syncing_window_end = Some(known_header.height().value());
return;
}
}

self.event_pub.send(NodeEvent::FetchingHeadersStarted {
from_height: *next_batch.start(),
to_height: *next_batch.end(),
Expand Down Expand Up @@ -540,6 +553,15 @@ where
}
}

/// Returns whether `header`'s timestamp falls inside the syncing window,
/// i.e. is newer than `now - SYNCING_WINDOW`.
fn in_syncing_window(header: &ExtendedHeader) -> bool {
    let window_start = match Time::now().checked_sub(SYNCING_WINDOW) {
        Some(start) => start,
        None => {
            warn!("underflow when computing syncing window start, defaulting to unix epoch");
            Time::unix_epoch()
        }
    };

    header.time().after(window_start)
}

async fn try_init<S>(p2p: &P2p, store: &S) -> Result<u64>
where
S: Store,
Expand Down Expand Up @@ -713,6 +735,58 @@ mod tests {
p2p_mock.expect_no_cmd().await;
}

#[async_test]
async fn syncing_window_edge() {
    // Generate 1200 headers that are older than the syncing window, and the
    // remaining ones with fresh timestamps.
    let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
    let mut gen = ExtendedHeaderGenerator::new();
    gen.set_time((Time::now() - month_and_day_ago).expect("to not underflow"));
    let mut headers = gen.next_many(1200);
    gen.set_time(Time::now());
    headers.append(&mut gen.next_many(2049 - 1200));

    let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
    assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;

    // Syncer requested the first batch ([1537..=2048]): 8 requests of 64
    // headers each, in ascending order.
    handle_session_batch(
        &mut p2p_mock,
        &headers,
        (0..8).map(|i| (1537 + 64 * i, 64)).collect(),
    )
    .await;
    assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;

    // Syncer requested the second batch ([1025, 1536]) hitting the syncing window
    handle_session_batch(
        &mut p2p_mock,
        &headers,
        (0..8).map(|i| (1025 + 64 * i, 64)).collect(),
    )
    .await;
    assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;

    // Syncer is fully synced and awaiting for events
    p2p_mock.expect_no_cmd().await;
}

#[async_test]
async fn start_with_filled_store() {
let events = EventChannel::new();
Expand Down
Loading
Loading