Skip to content

Commit

Permalink
Check da_checker before doing a block lookup request (#5681)
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit 4a23356
Author: dapplion <[email protected]>
Date:   Wed May 1 20:23:56 2024 +0900

    Ensure consistent handling of lookup result

commit 2a314ee
Author: dapplion <[email protected]>
Date:   Wed May 1 18:30:50 2024 +0900

    Check da_checker before doing a block lookup request
  • Loading branch information
jimmygchen committed May 1, 2024
1 parent 5a467cf commit 1544247
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 85 deletions.
8 changes: 8 additions & 0 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,19 @@ lazy_static! {
"sync_parent_block_lookups",
"Number of parent block lookups underway"
);
pub static ref SYNC_LOOKUP_CREATED: Result<IntCounter> = try_create_int_counter(
"sync_lookups_created_total",
"Total count of sync lookups created",
);
pub static ref SYNC_LOOKUP_DROPPED: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_lookups_dropped_total",
"Total count of sync lookups dropped by reason",
&["reason"]
);
pub static ref SYNC_LOOKUP_COMPLETED: Result<IntCounter> = try_create_int_counter(
"sync_lookups_completed_total",
"Total count of sync lookups completed",
);

/*
* Block Delay Metrics
Expand Down
25 changes: 9 additions & 16 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::sync::block_lookups::{
BlobRequestState, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::{
BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest, SyncNetworkContext,
};
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use std::sync::Arc;
Expand Down Expand Up @@ -42,9 +40,6 @@ pub(crate) struct BlockIsProcessed(pub bool);
/// safety when handling a block/blob response ensuring we only mutate the correct corresponding
/// state.
pub trait RequestState<T: BeaconChainTypes> {
/// The type of the request .
type RequestType;

/// The type created after validation.
type VerifiedResponseType: Clone;

Expand All @@ -71,9 +66,11 @@ pub trait RequestState<T: BeaconChainTypes> {
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;

// make_request returns true only if a request was made
// make_request returns true only if a request needs to be made
if self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
self.get_state_mut().on_download_start()?;
} else {
self.get_state_mut().on_completed_request()?;
}

// Otherwise, attempt to progress awaiting processing
Expand All @@ -92,7 +89,9 @@ pub trait RequestState<T: BeaconChainTypes> {
Ok(())
}

/// Send the request to the network service.
/// Request the network context to prepare a request of a component of `block_root`. If the
/// request is not necessary because the component is already known / processed, return false.
/// Return true if it sent a request and we can expect an event back from the network.
fn make_request(
&self,
id: Id,
Expand Down Expand Up @@ -126,7 +125,6 @@ pub trait RequestState<T: BeaconChainTypes> {
}

impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
type RequestType = BlocksByRootSingleRequest;
type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;

fn make_request(
Expand All @@ -136,12 +134,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
_: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
cx.block_lookup_request(
id,
peer_id,
BlocksByRootSingleRequest(self.requested_block_root),
)
.map_err(LookupRequestError::SendFailed)
cx.block_lookup_request(id, peer_id, self.requested_block_root)
.map_err(LookupRequestError::SendFailed)
}

fn send_for_processing(
Expand Down Expand Up @@ -179,7 +173,6 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
}

impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
type RequestType = BlobsByRootSingleBlockRequest;
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;

fn make_request(
Expand Down
117 changes: 67 additions & 50 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use self::parent_chain::{compute_parent_chains, NodeChain};
pub use self::single_block_lookup::DownloadResult;
use self::single_block_lookup::{LookupRequestError, SingleBlockLookup};
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
use super::manager::{BlockProcessType, BlockProcessingResult};
use super::network_context::{RpcProcessingResult, SyncNetworkContext};
use crate::metrics;
Expand All @@ -17,6 +17,7 @@ use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::{BlobRequestState, BlockRequestState};
use slog::{debug, error, trace, warn, Logger};
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;
use store::Hash256;
Expand Down Expand Up @@ -266,6 +267,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"peer_ids" => ?peers,
"block" => ?block_root,
);
metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED);

// If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve),
// signal here to hold processing downloaded data.
Expand All @@ -276,14 +278,20 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
lookup.add_child_components(block_component);
}

if let Err(e) = lookup.continue_requests(cx) {
self.on_lookup_request_error(lookup.id, e, "new_current_lookup");
false
} else {
self.single_block_lookups.insert(lookup.id, lookup);
self.update_metrics();
true
}
let id = lookup.id;
let lookup = match self.single_block_lookups.entry(id) {
Entry::Vacant(entry) => entry.insert(lookup),
Entry::Occupied(_) => {
// Should never happen
warn!(self.log, "Lookup exists with same id"; "id" => id);
return false;
}
};

let result = lookup.continue_requests(cx);
self.on_lookup_result(id, result, "new_current_lookup", cx);
self.update_metrics();
true
}

/* Lookup responses */
Expand All @@ -296,9 +304,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
response: RpcProcessingResult<R::VerifiedResponseType>,
cx: &mut SyncNetworkContext<T>,
) {
if let Err(e) = self.on_download_response_inner::<R>(id, peer_id, response, cx) {
self.on_lookup_request_error(id, e, "download_response");
}
let result = self.on_download_response_inner::<R>(id, peer_id, response, cx);
self.on_lookup_result(id, result, "download_response", cx);
}

/// Process a block or blob response received from a single lookup request.
Expand All @@ -308,7 +315,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_id: PeerId,
response: RpcProcessingResult<R::VerifiedResponseType>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
) -> Result<LookupResult, LookupRequestError> {
// Downscore peer even if lookup is not known
// Only downscore lookup verify errors. RPC errors are downscored in the network handler.
if let Err(LookupFailure::LookupVerifyError(e)) = &response {
Expand All @@ -321,7 +328,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// We don't have the ability to cancel in-flight RPC requests. So this can happen
// if we started this RPC request, and later saw the block/blobs via gossip.
debug!(self.log, "Block returned for single block lookup not present"; "id" => id);
return Ok(());
return Err(LookupRequestError::UnknownLookup);
};

let block_root = lookup.block_root();
Expand All @@ -346,7 +353,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_id,
})?;
// continue_request will send for processing as the request state is AwaitingProcessing
lookup.continue_request::<R>(cx)
}
Err(e) => {
debug!(self.log,
Expand All @@ -359,9 +365,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

request_state.on_download_failure()?;
// continue_request will retry a download as the request state is AwaitingDownload
lookup.continue_request::<R>(cx)
}
}

lookup.continue_requests(cx)
}

/* Error responses */
Expand All @@ -381,27 +388,26 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
result: BlockProcessingResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
if let Err(e) = match process_type {
let lookup_result = match process_type {
BlockProcessType::SingleBlock { id } => {
self.on_processing_result_inner::<BlockRequestState<T::EthSpec>>(id, result, cx)
}
BlockProcessType::SingleBlob { id } => {
self.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, result, cx)
}
} {
self.on_lookup_request_error(process_type.id(), e, "processing_result");
}
};
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
}

pub fn on_processing_result_inner<R: RequestState<T>>(
&mut self,
lookup_id: SingleLookupId,
result: BlockProcessingResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
) -> Result<LookupResult, LookupRequestError> {
let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else {
debug!(self.log, "Unknown single block lookup"; "id" => lookup_id);
return Ok(());
return Err(LookupRequestError::UnknownLookup);
};

let block_root = lookup.block_root();
Expand Down Expand Up @@ -435,15 +441,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// wrong. If we already had both a block and blobs response processed, we should penalize the
// blobs peer because they did not provide all blobs on the initial request.
if lookup.both_components_processed() {
let blob_peer = lookup
if let Some(blob_peer) = lookup
.blob_request_state
.state
.on_post_process_validation_failure()?;
cx.report_peer(
blob_peer,
PeerAction::MidToleranceError,
"sent_incomplete_blobs",
);
.on_post_process_validation_failure()?
{
cx.report_peer(
blob_peer,
PeerAction::MidToleranceError,
"sent_incomplete_blobs",
);
}
}
Action::Retry
}
Expand Down Expand Up @@ -520,47 +528,41 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Action::Retry => {
// Trigger download for all components in case `MissingComponents` failed the blob
// request. Also if blobs are `AwaitingProcessing` and need to be progressed
lookup.continue_requests(cx)?;
lookup.continue_requests(cx)
}
Action::ParentUnknown { parent_root } => {
let peers = lookup.all_available_peers().cloned().collect::<Vec<_>>();
lookup.set_awaiting_parent(parent_root);
debug!(self.log, "Marking lookup as awaiting parent"; "lookup" => %block_root, "parent_root" => %parent_root);
self.search_parent_of_child(parent_root, block_root, &peers, cx);
Ok(LookupResult::Pending)
}
Action::Drop => {
// Drop with noop
self.drop_lookup_and_children(lookup_id);
self.update_metrics();
Err(LookupRequestError::Failed)
}
Action::Continue => {
// Drop this completed lookup only
self.single_block_lookups.remove(&lookup_id);
self.update_metrics();
debug!(self.log, "Dropping completed lookup"; "block" => %block_root);
// Block imported, continue the requests of pending child blocks
self.continue_child_lookups(block_root, cx);
Ok(LookupResult::Completed)
}
}
Ok(())
}

/// Makes progress on the immediate children of `block_root`
pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext<T>) {
let mut failed_lookups = vec![]; // < need to clean failed lookups latter to re-borrow &mut self
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self

for (id, lookup) in self.single_block_lookups.iter_mut() {
if lookup.awaiting_parent() == Some(block_root) {
lookup.resolve_awaiting_parent();
debug!(self.log, "Continuing child lookup"; "parent_root" => %block_root, "block_root" => %lookup.block_root());
if let Err(e) = lookup.continue_requests(cx) {
failed_lookups.push((*id, e));
}
let result = lookup.continue_requests(cx);
lookup_results.push((*id, result));
}
}

for (id, e) in failed_lookups {
self.on_lookup_request_error(id, e, "continue_child_lookups");
for (id, result) in lookup_results {
self.on_lookup_result(id, result, "continue_child_lookups", cx);
}
}

Expand All @@ -585,16 +587,31 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

/// Common handler a lookup request error, drop it and update metrics
fn on_lookup_request_error(
fn on_lookup_result(
&mut self,
id: SingleLookupId,
error: LookupRequestError,
result: Result<LookupResult, LookupRequestError>,
source: &str,
cx: &mut SyncNetworkContext<T>,
) {
debug!(self.log, "Dropping lookup on request error"; "id" => id, "source" => source, "error" => ?error);
metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[error.as_metric()]);
self.drop_lookup_and_children(id);
self.update_metrics();
match result {
Ok(LookupResult::Pending) => {} // no action
Ok(LookupResult::Completed) => {
if let Some(lookup) = self.single_block_lookups.remove(&id) {
debug!(self.log, "Dropping completed lookup"; "block" => %lookup.block_root());
metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED);
// Block imported, continue the requests of pending child blocks
self.continue_child_lookups(lookup.block_root(), cx);
self.update_metrics();
}
}
Err(error) => {
debug!(self.log, "Dropping lookup on request error"; "id" => id, "source" => source, "error" => ?error);
metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[error.into()]);
self.drop_lookup_and_children(id);
self.update_metrics();
}
}
}

/* Helper functions */
Expand Down
Loading

0 comments on commit 1544247

Please sign in to comment.