Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add range sync metrics to track efficiency #6095

Merged
merged 2 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,36 @@ lazy_static! {
"Number of Syncing chains in range, per range type",
&["range_type"]
);
pub static ref SYNCING_CHAINS_REMOVED: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_range_removed_chains_total",
"Total count of range syncing chains removed per range type",
&["range_type"]
);
pub static ref SYNCING_CHAINS_ADDED: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_range_added_chains_total",
"Total count of range syncing chains added per range type",
&["range_type"]
);
pub static ref SYNCING_CHAINS_DROPPED_BLOCKS: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_range_chains_dropped_blocks_total",
"Total count of dropped blocks when removing a syncing chain per range type",
&["range_type"]
);
pub static ref SYNCING_CHAINS_IGNORED_BLOCKS: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_range_chains_ignored_blocks_total",
"Total count of ignored blocks when processing a syncing chain batch per chain type",
&["chain_type"]
);
pub static ref SYNCING_CHAINS_PROCESSED_BATCHES: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_range_chains_processed_batches_total",
"Total count of processed batches in a syncing chain batch per chain type",
&["chain_type"]
);
pub static ref SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: Result<Histogram> = try_create_histogram_with_buckets(
"sync_range_chain_batch_awaiting_processing_seconds",
"Time range sync batches spend in AwaitingProcessing state",
Ok(vec![0.01,0.02,0.05,0.1,0.2,0.5,1.0,2.0,5.0,10.0,20.0])
);
pub static ref SYNC_SINGLE_BLOCK_LOOKUPS: Result<IntGauge> = try_create_int_gauge(
"sync_single_block_lookups",
"Number of single block lookups underway"
Expand Down
14 changes: 8 additions & 6 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.process_blocks(downloaded_blocks.iter(), notify_execution_layer)
.await
{
(_, Ok(_)) => {
(imported_blocks, Ok(_)) => {
debug!(self.log, "Batch processed";
"batch_epoch" => epoch,
"first_block_slot" => start_slot,
Expand All @@ -335,7 +335,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"processed_blocks" => sent_blocks,
"service"=> "sync");
BatchProcessResult::Success {
was_non_empty: sent_blocks > 0,
sent_blocks,
imported_blocks,
}
}
(imported_blocks, Err(e)) => {
Expand All @@ -349,7 +350,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"service" => "sync");
match e.peer_action {
Some(penalty) => BatchProcessResult::FaultyFailure {
imported_blocks: imported_blocks > 0,
imported_blocks,
penalty,
},
None => BatchProcessResult::NonFaultyFailure,
Expand All @@ -368,7 +369,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.sum::<usize>();

match self.process_backfill_blocks(downloaded_blocks) {
(_, Ok(_)) => {
(imported_blocks, Ok(_)) => {
debug!(self.log, "Backfill batch processed";
"batch_epoch" => epoch,
"first_block_slot" => start_slot,
Expand All @@ -377,7 +378,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"processed_blobs" => n_blobs,
"service"=> "sync");
BatchProcessResult::Success {
was_non_empty: sent_blocks > 0,
sent_blocks,
imported_blocks,
}
}
(_, Err(e)) => {
Expand All @@ -390,7 +392,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"service" => "sync");
match e.peer_action {
Some(penalty) => BatchProcessResult::FaultyFailure {
imported_blocks: false,
imported_blocks: 0,
penalty,
},
None => BatchProcessResult::NonFaultyFailure,
Expand Down
10 changes: 6 additions & 4 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// result callback. This is done, because an empty batch could end a chain and the logic
// for removing chains and checking completion is in the callback.

let blocks = match batch.start_processing() {
let (blocks, _) = match batch.start_processing() {
Err(e) => {
return self
.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))
Expand Down Expand Up @@ -615,13 +615,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));

match result {
BatchProcessResult::Success { was_non_empty } => {
BatchProcessResult::Success {
imported_blocks, ..
} => {
if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) {
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
}
// If the processed batch was not empty, we can validate previous unvalidated
// blocks.
if *was_non_empty {
if *imported_blocks > 0 {
self.advance_chain(network, batch_id);
}

Expand Down Expand Up @@ -677,7 +679,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {

Ok(BatchOperationOutcome::Continue) => {
// chain can continue. Check if it can be progressed
if *imported_blocks {
if *imported_blocks > 0 {
// At least one block was successfully verified and imported, then we can be sure all
// previous batches are valid and we only need to download the current failed
// batch.
Expand Down
5 changes: 3 additions & 2 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,12 @@ pub enum BlockProcessingResult<E: EthSpec> {
pub enum BatchProcessResult {
/// The batch was completed successfully. It carries whether the sent batch contained blocks.
Success {
was_non_empty: bool,
sent_blocks: usize,
imported_blocks: usize,
},
/// The batch processing failed. It carries whether the processing imported any block.
FaultyFailure {
imported_blocks: bool,
imported_blocks: usize,
penalty: PeerAction,
},
NonFaultyFailure,
Expand Down
28 changes: 21 additions & 7 deletions beacon_node/network/src/sync/range_sync/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use lighthouse_network::PeerId;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::ops::Sub;
use std::time::{Duration, Instant};
use strum::Display;
use types::{Epoch, EthSpec, Slot};

Expand Down Expand Up @@ -118,7 +119,7 @@ pub enum BatchState<E: EthSpec> {
/// The batch is being downloaded.
Downloading(PeerId, Id),
/// The batch has been completely downloaded and is ready for processing.
AwaitingProcessing(PeerId, Vec<RpcBlock<E>>),
AwaitingProcessing(PeerId, Vec<RpcBlock<E>>, Instant),
/// The batch is being processed.
Processing(Attempt),
/// The batch was successfully processed and is waiting to be validated.
Expand Down Expand Up @@ -210,13 +211,26 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
match &self.state {
BatchState::AwaitingDownload | BatchState::Failed => None,
BatchState::Downloading(peer_id, _)
| BatchState::AwaitingProcessing(peer_id, _)
| BatchState::AwaitingProcessing(peer_id, _, _)
| BatchState::Processing(Attempt { peer_id, .. })
| BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id),
BatchState::Poisoned => unreachable!("Poisoned batch"),
}
}

/// Returns the count of stored pending blocks if in awaiting processing state
pub fn pending_blocks(&self) -> usize {
match &self.state {
BatchState::AwaitingProcessing(_, blocks, _) => blocks.len(),
BatchState::AwaitingDownload
| BatchState::Downloading { .. }
| BatchState::Processing { .. }
| BatchState::AwaitingValidation { .. }
| BatchState::Poisoned
| BatchState::Failed => 0,
}
}

/// Returns a BlocksByRange request associated with the batch.
pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) {
(
Expand Down Expand Up @@ -293,7 +307,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
}

let received = blocks.len();
self.state = BatchState::AwaitingProcessing(peer, blocks);
self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now());
Ok(received)
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
Expand Down Expand Up @@ -365,11 +379,11 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
}
}

pub fn start_processing(&mut self) -> Result<Vec<RpcBlock<E>>, WrongState> {
pub fn start_processing(&mut self) -> Result<(Vec<RpcBlock<E>>, Duration), WrongState> {
match self.state.poison() {
BatchState::AwaitingProcessing(peer, blocks) => {
BatchState::AwaitingProcessing(peer, blocks, start_instant) => {
self.state = BatchState::Processing(Attempt::new::<B, E>(peer, &blocks));
Ok(blocks)
Ok((blocks, start_instant.elapsed()))
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
Expand Down Expand Up @@ -515,7 +529,7 @@ impl<E: EthSpec> std::fmt::Debug for BatchState<E> {
}) => write!(f, "AwaitingValidation({})", peer_id),
BatchState::AwaitingDownload => f.write_str("AwaitingDownload"),
BatchState::Failed => f.write_str("Failed"),
BatchState::AwaitingProcessing(ref peer, ref blocks) => {
BatchState::AwaitingProcessing(ref peer, ref blocks, _) => {
write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len())
}
BatchState::Downloading(peer, request_id) => {
Expand Down
62 changes: 58 additions & 4 deletions beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use super::RangeSyncType;
use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::network_context::RangeRequestId;
use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult};
Expand All @@ -11,6 +13,7 @@ use rand::{seq::SliceRandom, Rng};
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use std::hash::{Hash, Hasher};
use strum::IntoStaticStr;
use types::{Epoch, EthSpec, Hash256, Slot};

/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
Expand Down Expand Up @@ -53,13 +56,23 @@ pub struct KeepChain;
pub type ChainId = u64;
pub type BatchId = Epoch;

#[derive(Debug, Copy, Clone, IntoStaticStr)]
pub enum SyncingChainType {
Head,
Finalized,
Backfill,
}

/// A chain of blocks that need to be downloaded. Peers who claim to contain the target head
/// root are grouped into the peer pool and queried for batches when downloading the
/// chain.
pub struct SyncingChain<T: BeaconChainTypes> {
/// A random id used to identify this chain.
id: ChainId,

/// SyncingChain type
pub chain_type: SyncingChainType,

/// The start of the chain segment. Any epoch previous to this one has been validated.
pub start_epoch: Epoch,

Expand Down Expand Up @@ -126,6 +139,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
target_head_slot: Slot,
target_head_root: Hash256,
peer_id: PeerId,
chain_type: SyncingChainType,
log: &slog::Logger,
) -> Self {
let mut peers = FnvHashMap::default();
Expand All @@ -135,6 +149,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {

SyncingChain {
id,
chain_type,
start_epoch,
target_head_slot,
target_head_root,
Expand Down Expand Up @@ -171,6 +186,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.validated_batches * EPOCHS_PER_BATCH
}

/// Returns the total count of pending blocks in all the batches of this chain
pub fn pending_blocks(&self) -> usize {
self.batches
.values()
.map(|batch| batch.pending_blocks())
.sum()
}

/// Removes a peer from the chain.
/// If the peer has active batches, those are considered failed and re-requested.
pub fn remove_peer(
Expand Down Expand Up @@ -305,7 +328,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// result callback. This is done, because an empty batch could end a chain and the logic
// for removing chains and checking completion is in the callback.

let blocks = batch.start_processing()?;
let (blocks, duration_in_awaiting_processing) = batch.start_processing()?;
metrics::observe_duration(
&metrics::SYNCING_CHAIN_BATCH_AWAITING_PROCESSING,
duration_in_awaiting_processing,
);

let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id);
self.current_processing_batch = Some(batch_id);

Expand Down Expand Up @@ -469,10 +497,27 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// We consider three cases. Batch was successfully processed, Batch failed processing due
// to a faulty peer, or batch failed processing but the peer can't be deemed faulty.
match result {
BatchProcessResult::Success { was_non_empty } => {
BatchProcessResult::Success {
sent_blocks,
imported_blocks,
} => {
if sent_blocks > imported_blocks {
let ignored_blocks = sent_blocks - imported_blocks;
metrics::inc_counter_vec_by(
&metrics::SYNCING_CHAINS_IGNORED_BLOCKS,
&[self.chain_type.into()],
ignored_blocks as u64,
);
}
metrics::inc_counter_vec(
&metrics::SYNCING_CHAINS_PROCESSED_BATCHES,
&[self.chain_type.into()],
);

batch.processing_completed(BatchProcessingResult::Success)?;

if *was_non_empty {
// was not empty = sent_blocks > 0
if *sent_blocks > 0 {
// If the processed batch was not empty, we can validate previous unvalidated
// blocks.
self.advance_chain(network, batch_id);
Expand Down Expand Up @@ -515,7 +560,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
match batch.processing_completed(BatchProcessingResult::FaultyFailure)? {
BatchOperationOutcome::Continue => {
// Chain can continue. Check if it can be moved forward.
if *imported_blocks {
if *imported_blocks > 0 {
// At least one block was successfully verified and imported, so we can be sure all
// previous batches are valid and we only need to download the current failed
// batch.
Expand Down Expand Up @@ -1142,3 +1187,12 @@ impl RemoveChain {
)
}
}

impl From<RangeSyncType> for SyncingChainType {
fn from(value: RangeSyncType) -> Self {
match value {
RangeSyncType::Head => Self::Head,
RangeSyncType::Finalized => Self::Finalized,
}
}
}
Loading
Loading