Skip to content

Commit

Permalink
Check da_checker before doing a block lookup request
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed May 1, 2024
1 parent beaa586 commit 2a314ee
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 19 deletions.
21 changes: 6 additions & 15 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::sync::block_lookups::{
BlobRequestState, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::{
BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest, SyncNetworkContext,
};
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use std::sync::Arc;
Expand Down Expand Up @@ -42,9 +40,6 @@ pub(crate) struct BlockIsProcessed(pub bool);
/// safety when handling a block/blob response ensuring we only mutate the correct corresponding
/// state.
pub trait RequestState<T: BeaconChainTypes> {
/// The type of the request .
type RequestType;

/// The type created after validation.
type VerifiedResponseType: Clone;

Expand All @@ -71,9 +66,11 @@ pub trait RequestState<T: BeaconChainTypes> {
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;

// make_request returns true only if a request was made
// make_request returns true only if a request needs to be made
if self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
self.get_state_mut().on_download_start()?;
} else {
self.get_state_mut().on_completed_request()?;
}

// Otherwise, attempt to progress awaiting processing
Expand Down Expand Up @@ -126,7 +123,6 @@ pub trait RequestState<T: BeaconChainTypes> {
}

impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
type RequestType = BlocksByRootSingleRequest;
type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;

fn make_request(
Expand All @@ -136,12 +132,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
_: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
cx.block_lookup_request(
id,
peer_id,
BlocksByRootSingleRequest(self.requested_block_root),
)
.map_err(LookupRequestError::SendFailed)
cx.block_lookup_request(id, peer_id, self.requested_block_root)
.map_err(LookupRequestError::SendFailed)
}

fn send_for_processing(
Expand Down Expand Up @@ -179,7 +171,6 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
}

impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
type RequestType = BlobsByRootSingleBlockRequest;
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;

fn make_request(
Expand Down
12 changes: 10 additions & 2 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
error: LookupRequestError,
source: &str,
) {
debug!(self.log, "Dropping lookup on request error"; "id" => id, "source" => source, "error" => ?error);
match error {
LookupRequestError::AlreadyImported => {
debug!(self.log, "Lookup components already imported"; "id" => id);
self.single_block_lookups.remove(&id);
}
other => {
debug!(self.log, "Dropping lookup on request error"; "id" => id, "source" => source, "error" => ?other);
self.drop_lookup_and_children(id);
}
}
metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[error.as_metric()]);
self.drop_lookup_and_children(id);
self.update_metrics();
}

Expand Down
25 changes: 25 additions & 0 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub enum LookupRequestError {
NoPeers,
SendFailed(&'static str),
BadState(String),
AlreadyImported,
}

pub struct SingleBlockLookup<T: BeaconChainTypes> {
Expand Down Expand Up @@ -119,6 +120,17 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// TODO: Check what's necessary to download, specially for blobs
self.continue_request::<BlockRequestState<T::EthSpec>>(cx)?;
self.continue_request::<BlobRequestState<T::EthSpec>>(cx)?;

// If all components of this lookup are already processed, there will be no future events
// that can make progress so it must be dropped. This case can happen if we receive the
// components from gossip during a retry. This case is not technically an error, but it's
// easier to handle this way similarly to `BlockError::BlockIsAlreadyKnown`.
if self.block_request_state.state.is_processed()
&& self.blob_request_state.state.is_processed()
{
return Err(LookupRequestError::AlreadyImported);
}

Ok(())
}

Expand Down Expand Up @@ -411,6 +423,19 @@ impl<T: Clone> SingleLookupRequestState<T> {
}
}

/// Mark a request as complete without any download or processing
pub fn on_completed_request(&mut self) -> Result<(), LookupRequestError> {
match &self.state {
State::AwaitingDownload => {
self.state = State::Processed(peer_id);
Ok(())
}
other => Err(LookupRequestError::BadState(format!(
"Bad state on_completed_request expected AwaitingDownload got {other}"
))),
}
}

/// The total number of failures, whether it be processing or downloading.
pub fn failed_attempts(&self) -> u8 {
self.failed_processing + self.failed_downloading
Expand Down
12 changes: 10 additions & 2 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
}

/// Request block of `block_root` if necessary by checking:
/// - If the da_checker has a pending block from gossip or a previous request
pub fn block_lookup_request(
&mut self,
lookup_id: SingleLookupId,
peer_id: PeerId,
request: BlocksByRootSingleRequest,
block_root: Hash256,
) -> Result<bool, &'static str> {
if self.chain.data_availability_checker.has_block(&block_root) {
return Ok(false);
}

let id = SingleLookupReqId {
lookup_id,
req_id: self.next_id(),
Expand All @@ -311,11 +317,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.log,
"Sending BlocksByRoot Request";
"method" => "BlocksByRoot",
"block_root" => ?request.0,
"block_root" => ?block_root,
"peer" => %peer_id,
"id" => ?id
);

let request = BlocksByRootSingleRequest(block_root);

self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: Request::BlocksByRoot(request.into_request(&self.chain.spec)),
Expand Down

0 comments on commit 2a314ee

Please sign in to comment.