Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move sync lookup trait function to its caller #5704

Merged
merged 2 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 1 addition & 54 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{
BlobRequestState, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::block_verification_types::RpcBlock;
Expand All @@ -26,11 +24,6 @@ pub enum ResponseType {
/// is further back than the most recent head slot.
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;

/// Wrapper around bool to prevent mixing this argument with `BlockIsProcessed`
pub(crate) struct AwaitingParent(pub bool);
/// Wrapper around bool to prevent mixing this argument with `AwaitingParent`
pub(crate) struct BlockIsProcessed(pub bool);

/// This trait unifies common single block lookup functionality across blocks and blobs. This
/// includes making requests, verifying responses, and handling processing results. A
/// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is
Expand All @@ -43,52 +36,6 @@ pub trait RequestState<T: BeaconChainTypes> {
/// The type created after validation.
type VerifiedResponseType: Clone;

/// Potentially makes progress on this request if it's in a progress-able state
fn continue_request(
&mut self,
id: Id,
awaiting_parent: AwaitingParent,
downloaded_block_expected_blobs: Option<usize>,
block_is_processed: BlockIsProcessed,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
// Attempt to progress awaiting downloads
if self.get_state().is_awaiting_download() {
// Verify the current request has not exceeded the maximum number of attempts.
let request_state = self.get_state();
if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS {
let cannot_process = request_state.more_failed_processing_attempts();
return Err(LookupRequestError::TooManyAttempts { cannot_process });
}

let peer_id = self
.get_state_mut()
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;

// make_request returns true only if a request needs to be made
if self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
self.get_state_mut().on_download_start()?;
} else {
self.get_state_mut().on_completed_request()?;
}

// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
} else if !awaiting_parent.0
&& (block_is_processed.0 || matches!(Self::response_type(), ResponseType::Block))
{
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
if let Some(result) = self.get_state_mut().maybe_start_processing() {
return Self::send_for_processing(id, result, cx);
}
}

Ok(())
}

/// Request the network context to prepare a request of a component of `block_root`. If the
/// request is not necessary because the component is already known / processed, return false.
/// Return true if it sent a request and we can expect an event back from the network.
Expand Down
50 changes: 40 additions & 10 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::common::{AwaitingParent, BlockIsProcessed};
use super::{BlockComponent, PeerId};
use super::common::ResponseType;
use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id;
use crate::sync::network_context::SyncNetworkContext;
Expand Down Expand Up @@ -150,7 +150,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}

/// Wrapper around `RequestState::continue_request` to inject lookup data
/// Potentially makes progress on this request if it's in a progress-able state
pub fn continue_request<R: RequestState<T>>(
&mut self,
cx: &mut SyncNetworkContext<T>,
Expand All @@ -163,13 +163,43 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
.peek_downloaded_data()
.map(|block| block.num_expected_blobs());
let block_is_processed = self.block_request_state.state.is_processed();
R::request_state_mut(self).continue_request(
id,
AwaitingParent(awaiting_parent),
downloaded_block_expected_blobs,
BlockIsProcessed(block_is_processed),
cx,
)
let request = R::request_state_mut(self);

// Attempt to progress awaiting downloads
if request.get_state().is_awaiting_download() {
// Verify the current request has not exceeded the maximum number of attempts.
let request_state = request.get_state();
if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS {
let cannot_process = request_state.more_failed_processing_attempts();
return Err(LookupRequestError::TooManyAttempts { cannot_process });
}

let peer_id = request
.get_state_mut()
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;

// make_request returns true only if a request needs to be made
if request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
request.get_state_mut().on_download_start()?;
} else {
request.get_state_mut().on_completed_request()?;
}

// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
} else if !awaiting_parent
&& (block_is_processed || matches!(R::response_type(), ResponseType::Block))
{
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
if let Some(result) = request.get_state_mut().maybe_start_processing() {
return R::send_for_processing(id, result, cx);
}
}

Ok(())
}

/// Add all given peers to both block and blob request states.
Expand Down
Loading