Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check da_checker before doing a block lookup request #5681

Merged
merged 3 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,19 @@ lazy_static! {
"sync_parent_block_lookups",
"Number of parent block lookups underway"
);
pub static ref SYNC_LOOKUP_CREATED: Result<IntCounter> = try_create_int_counter(
"sync_lookups_created_total",
"Total count of sync lookups created",
);
pub static ref SYNC_LOOKUP_DROPPED: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_lookups_dropped_total",
"Total count of sync lookups dropped by reason",
&["reason"]
);
pub static ref SYNC_LOOKUP_COMPLETED: Result<IntCounter> = try_create_int_counter(
"sync_lookups_completed_total",
"Total count of sync lookups completed",
);

/*
* Block Delay Metrics
Expand Down
25 changes: 9 additions & 16 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::sync::block_lookups::{
BlobRequestState, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::{
BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest, SyncNetworkContext,
};
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use std::sync::Arc;
Expand Down Expand Up @@ -42,9 +40,6 @@ pub(crate) struct BlockIsProcessed(pub bool);
/// safety when handling a block/blob response ensuring we only mutate the correct corresponding
/// state.
pub trait RequestState<T: BeaconChainTypes> {
/// The type of the request .
type RequestType;

/// The type created after validation.
type VerifiedResponseType: Clone;

Expand All @@ -71,9 +66,11 @@ pub trait RequestState<T: BeaconChainTypes> {
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;

// make_request returns true only if a request was made
// make_request returns true only if a request needs to be made
if self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
self.get_state_mut().on_download_start()?;
} else {
self.get_state_mut().on_completed_request()?;
}

// Otherwise, attempt to progress awaiting processing
Expand All @@ -92,7 +89,9 @@ pub trait RequestState<T: BeaconChainTypes> {
Ok(())
}

/// Send the request to the network service.
/// Request the network context to prepare a request of a component of `block_root`. If the
/// request is not necessary because the component is already known / processed, return false.
/// Return true if it sent a request and we can expect an event back from the network.
fn make_request(
&self,
id: Id,
Expand Down Expand Up @@ -126,7 +125,6 @@ pub trait RequestState<T: BeaconChainTypes> {
}

impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
type RequestType = BlocksByRootSingleRequest;
type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;

fn make_request(
Expand All @@ -136,12 +134,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
_: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
cx.block_lookup_request(
id,
peer_id,
BlocksByRootSingleRequest(self.requested_block_root),
)
.map_err(LookupRequestError::SendFailed)
cx.block_lookup_request(id, peer_id, self.requested_block_root)
.map_err(LookupRequestError::SendFailed)
}

fn send_for_processing(
Expand Down Expand Up @@ -179,7 +173,6 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
}

impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
type RequestType = BlobsByRootSingleBlockRequest;
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;

fn make_request(
Expand Down
117 changes: 67 additions & 50 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use self::parent_chain::{compute_parent_chains, NodeChain};
pub use self::single_block_lookup::DownloadResult;
use self::single_block_lookup::{LookupRequestError, SingleBlockLookup};
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
use super::manager::{BlockProcessType, BlockProcessingResult};
use super::network_context::{RpcProcessingResult, SyncNetworkContext};
use crate::metrics;
Expand All @@ -17,6 +17,7 @@ use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::{BlobRequestState, BlockRequestState};
use slog::{debug, error, trace, warn, Logger};
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;
use store::Hash256;
Expand Down Expand Up @@ -266,6 +267,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"peer_ids" => ?peers,
"block" => ?block_root,
);
metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED);

// If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve),
// signal here to hold processing downloaded data.
Expand All @@ -276,14 +278,20 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
lookup.add_child_components(block_component);
}

if let Err(e) = lookup.continue_requests(cx) {
self.on_lookup_request_error(lookup.id, e, "new_current_lookup");
false
} else {
self.single_block_lookups.insert(lookup.id, lookup);
self.update_metrics();
true
}
let id = lookup.id;
let lookup = match self.single_block_lookups.entry(id) {
Entry::Vacant(entry) => entry.insert(lookup),
Entry::Occupied(_) => {
// Should never happen
warn!(self.log, "Lookup exists with same id"; "id" => id);
return false;
}
};

let result = lookup.continue_requests(cx);
self.on_lookup_result(id, result, "new_current_lookup", cx);
self.update_metrics();
true
}

/* Lookup responses */
Expand All @@ -296,9 +304,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
response: RpcProcessingResult<R::VerifiedResponseType>,
cx: &mut SyncNetworkContext<T>,
) {
if let Err(e) = self.on_download_response_inner::<R>(id, peer_id, response, cx) {
self.on_lookup_request_error(id, e, "download_response");
}
let result = self.on_download_response_inner::<R>(id, peer_id, response, cx);
self.on_lookup_result(id, result, "download_response", cx);
}

/// Process a block or blob response received from a single lookup request.
Expand All @@ -308,7 +315,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_id: PeerId,
response: RpcProcessingResult<R::VerifiedResponseType>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
) -> Result<LookupResult, LookupRequestError> {
// Downscore peer even if lookup is not known
// Only downscore lookup verify errors. RPC errors are downscored in the network handler.
if let Err(LookupFailure::LookupVerifyError(e)) = &response {
Expand All @@ -321,7 +328,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// We don't have the ability to cancel in-flight RPC requests. So this can happen
// if we started this RPC request, and later saw the block/blobs via gossip.
debug!(self.log, "Block returned for single block lookup not present"; "id" => id);
return Ok(());
return Err(LookupRequestError::UnknownLookup);
};

let block_root = lookup.block_root();
Expand All @@ -346,7 +353,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_id,
})?;
// continue_request will send for processing as the request state is AwaitingProcessing
lookup.continue_request::<R>(cx)
}
Err(e) => {
debug!(self.log,
Expand All @@ -359,9 +365,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

request_state.on_download_failure()?;
// continue_request will retry a download as the request state is AwaitingDownload
lookup.continue_request::<R>(cx)
}
}

lookup.continue_requests(cx)
}

/* Error responses */
Expand All @@ -388,27 +395,26 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
result: BlockProcessingResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
if let Err(e) = match process_type {
let lookup_result = match process_type {
BlockProcessType::SingleBlock { id } => {
self.on_processing_result_inner::<BlockRequestState<T::EthSpec>>(id, result, cx)
}
BlockProcessType::SingleBlob { id } => {
self.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, result, cx)
}
} {
self.on_lookup_request_error(process_type.id(), e, "processing_result");
}
};
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
}

pub fn on_processing_result_inner<R: RequestState<T>>(
&mut self,
lookup_id: SingleLookupId,
result: BlockProcessingResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
) -> Result<LookupResult, LookupRequestError> {
let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else {
debug!(self.log, "Unknown single block lookup"; "id" => lookup_id);
return Ok(());
return Err(LookupRequestError::UnknownLookup);
};

let block_root = lookup.block_root();
Expand Down Expand Up @@ -442,15 +448,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// wrong. If we already had both a block and blobs response processed, we should penalize the
// blobs peer because they did not provide all blobs on the initial request.
if lookup.both_components_processed() {
let blob_peer = lookup
if let Some(blob_peer) = lookup
.blob_request_state
.state
.on_post_process_validation_failure()?;
cx.report_peer(
blob_peer,
PeerAction::MidToleranceError,
"sent_incomplete_blobs",
);
.on_post_process_validation_failure()?
{
cx.report_peer(
blob_peer,
PeerAction::MidToleranceError,
"sent_incomplete_blobs",
);
}
}
Action::Retry
}
Expand Down Expand Up @@ -527,47 +535,41 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Action::Retry => {
// Trigger download for all components in case `MissingComponents` failed the blob
// request. Also if blobs are `AwaitingProcessing` and need to be progressed
lookup.continue_requests(cx)?;
lookup.continue_requests(cx)
}
Action::ParentUnknown { parent_root } => {
let peers = lookup.all_available_peers().cloned().collect::<Vec<_>>();
lookup.set_awaiting_parent(parent_root);
debug!(self.log, "Marking lookup as awaiting parent"; "lookup" => %block_root, "parent_root" => %parent_root);
self.search_parent_of_child(parent_root, block_root, &peers, cx);
Ok(LookupResult::Pending)
}
Action::Drop => {
// Drop with noop
self.drop_lookup_and_children(lookup_id);
self.update_metrics();
Err(LookupRequestError::Failed)
}
Action::Continue => {
// Drop this completed lookup only
self.single_block_lookups.remove(&lookup_id);
self.update_metrics();
debug!(self.log, "Dropping completed lookup"; "block" => %block_root);
// Block imported, continue the requests of pending child blocks
self.continue_child_lookups(block_root, cx);
Ok(LookupResult::Completed)
}
}
Ok(())
}

/// Makes progress on the immediate children of `block_root`
pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext<T>) {
let mut failed_lookups = vec![]; // < need to clean failed lookups latter to re-borrow &mut self
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self

for (id, lookup) in self.single_block_lookups.iter_mut() {
if lookup.awaiting_parent() == Some(block_root) {
lookup.resolve_awaiting_parent();
debug!(self.log, "Continuing child lookup"; "parent_root" => %block_root, "block_root" => %lookup.block_root());
if let Err(e) = lookup.continue_requests(cx) {
failed_lookups.push((*id, e));
}
let result = lookup.continue_requests(cx);
lookup_results.push((*id, result));
}
}

for (id, e) in failed_lookups {
self.on_lookup_request_error(id, e, "continue_child_lookups");
for (id, result) in lookup_results {
self.on_lookup_result(id, result, "continue_child_lookups", cx);
}
}

Expand All @@ -592,16 +594,31 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

/// Common handler a lookup request error, drop it and update metrics
fn on_lookup_request_error(
fn on_lookup_result(
&mut self,
id: SingleLookupId,
error: LookupRequestError,
result: Result<LookupResult, LookupRequestError>,
source: &str,
cx: &mut SyncNetworkContext<T>,
) {
debug!(self.log, "Dropping lookup on request error"; "id" => id, "source" => source, "error" => ?error);
metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[error.as_metric()]);
self.drop_lookup_and_children(id);
self.update_metrics();
match result {
Ok(LookupResult::Pending) => {} // no action
Ok(LookupResult::Completed) => {
if let Some(lookup) = self.single_block_lookups.remove(&id) {
debug!(self.log, "Dropping completed lookup"; "block" => %lookup.block_root());
metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED);
// Block imported, continue the requests of pending child blocks
self.continue_child_lookups(lookup.block_root(), cx);
self.update_metrics();
}
}
Err(error) => {
debug!(self.log, "Dropping lookup on request error"; "id" => id, "source" => source, "error" => ?error);
metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[error.into()]);
self.drop_lookup_and_children(id);
self.update_metrics();
}
}
}

/* Helper functions */
Expand Down
Loading