diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7c497e74584..18e2a45a49f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -121,7 +121,6 @@ use store::{ use task_executor::{ShutdownReason, TaskExecutor}; use tokio_stream::Stream; use tree_hash::TreeHash; -use types::blob_sidecar::FixedBlobSidecarList; use types::payload::BlockProductionVersion; use types::*; @@ -2913,7 +2912,7 @@ impl BeaconChain { self: &Arc, slot: Slot, block_root: Hash256, - blobs: FixedBlobSidecarList, + blobs: Vec>>, ) -> Result> { // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its blobs again. @@ -2927,7 +2926,7 @@ impl BeaconChain { if let Some(event_handler) = self.event_handler.as_ref() { if event_handler.has_blob_sidecar_subscribers() { - for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) { + for blob in blobs.iter() { event_handler.register(EventKind::BlobSidecar( SseBlobSidecar::from_blob_sidecar(blob), )); @@ -3185,17 +3184,13 @@ impl BeaconChain { self: &Arc, slot: Slot, block_root: Hash256, - blobs: FixedBlobSidecarList, + blobs: Vec>>, ) -> Result> { // Need to scope this to ensure the lock is dropped before calling `process_availability` // Even an explicit drop is not enough to convince the borrow checker. { let mut slashable_cache = self.observed_slashable.write(); - for header in blobs - .into_iter() - .filter_map(|b| b.as_ref().map(|b| b.signed_block_header.clone())) - .unique() - { + for header in blobs.iter().map(|b| b.signed_block_header.clone()).unique() { if verify_header_signature::>(self, &header).is_ok() { slashable_cache .observe_slashable( diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 3ef105c6d34..0f0ab93f085 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -15,7 +15,7 @@ use std::fmt::Debug; use std::num::NonZeroUsize; use std::sync::Arc; use task_executor::TaskExecutor; -use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; +use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; mod child_components; @@ -170,13 +170,13 @@ impl DataAvailabilityChecker { pub fn put_rpc_blobs( &self, block_root: Hash256, - blobs: FixedBlobSidecarList, + blobs: Vec>>, ) -> Result, AvailabilityCheckError> { let Some(kzg) = self.kzg.as_ref() else { return Err(AvailabilityCheckError::KzgNotInitialized); }; - let verified_blobs = KzgVerifiedBlobList::new(Vec::from(blobs).into_iter().flatten(), kzg) + let verified_blobs = KzgVerifiedBlobList::new(blobs.into_iter(), kzg) .map_err(AvailabilityCheckError::Kzg)?; self.availability_cache diff --git a/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs b/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs index 184dfc45001..e3e8364920a 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs @@ -14,16 +14,6 @@ pub struct ChildComponents { pub downloaded_blobs: FixedBlobSidecarList, } -impl From> for ChildComponents { - fn from(value: RpcBlock) -> Self { - let (block_root, block, blobs) = value.deconstruct(); - let fixed_blobs = blobs.map(|blobs| { - FixedBlobSidecarList::from(blobs.into_iter().map(Some).collect::>()) - }); - Self::new(block_root, Some(block), fixed_blobs) - } -} - impl ChildComponents { pub fn empty(block_root: Hash256) -> Self { Self { @@ -32,34 +22,56 @@ impl ChildComponents { downloaded_blobs: <_>::default(), } } + pub fn new( block_root: Hash256, block: Option>>, - blobs: Option>, - ) -> Self { + blobs: Option>>>, + ) -> Result { let mut cache = Self::empty(block_root); if let Some(block) = block { cache.merge_block(block); } if let Some(blobs) = blobs { - cache.merge_blobs(blobs); + cache.merge_blobs(blobs)?; } - cache + Ok(cache) + } + + pub fn new_from_rpc_block(value: RpcBlock) -> Result { + let (block_root, block, blobs) = value.deconstruct(); + // Safe to unwrap because we are constructing the struct from a valid RpcBlock + Self::new(block_root, Some(block), blobs.map(Into::into)) } pub fn merge_block(&mut self, block: Arc>) { self.downloaded_block = Some(block); } - pub fn merge_blob(&mut self, blob: Arc>) { + pub fn merge_blob(&mut self, blob: Arc>) -> Result<(), String> { + if blob.index >= E::max_blobs_per_block() as u64 { + return Err(format!("Blob index {} is out of range", blob.index)); + } + self.merge_blob_unchecked(blob); + Ok(()) + } + + pub fn merge_blobs(&mut self, blobs: Vec>>) -> Result<(), String> { + for blob in blobs.into_iter() { + self.merge_blob(blob)?; + } + Ok(()) + } + + pub fn merge_blob_unchecked(&mut self, blob: Arc>) { if let Some(blob_ref) = self.downloaded_blobs.get_mut(blob.index as usize) { *blob_ref = Some(blob); } } - pub fn merge_blobs(&mut self, blobs: FixedBlobSidecarList) { - for blob in blobs.iter().flatten() { - self.merge_blob(blob.clone()); + pub fn merge_blobs_unchecked(&mut self, blobs: Vec>>) { + for blob in blobs.into_iter() { + self.merge_blob_unchecked(blob); } } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 27b9e676da6..f6eeec2131a 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -26,7 +26,6 @@ use tokio::sync::mpsc::{self, error::TrySendError}; use types::*; pub use sync_methods::ChainSegmentProcessId; -use types::blob_sidecar::FixedBlobSidecarList; pub type Error = TrySendError>; @@ -426,11 +425,11 @@ impl NetworkBeaconProcessor { pub fn send_rpc_blobs( self: &Arc, block_root: Hash256, - blobs: FixedBlobSidecarList, + blobs: Vec>>, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Result<(), Error> { - let blob_count = blobs.iter().filter(|b| b.is_some()).count(); + let blob_count = blobs.len(); if blob_count == 0 { return Ok(()); } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 887974c6e0b..d8596d92e76 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -23,8 +23,7 @@ use std::time::Duration; use store::KzgCommitment; use tokio::sync::mpsc; use types::beacon_block_body::format_kzg_commitments; -use types::blob_sidecar::FixedBlobSidecarList; -use types::{Epoch, Hash256}; +use types::{BlobSidecar, Epoch, Hash256}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -201,7 +200,7 @@ impl NetworkBeaconProcessor { pub fn generate_rpc_blobs_process_fn( self: Arc, block_root: Hash256, - blobs: FixedBlobSidecarList, + blobs: Vec>>, seen_timestamp: Duration, process_type: BlockProcessType, ) -> AsyncFn { @@ -217,24 +216,17 @@ impl NetworkBeaconProcessor { pub async fn process_rpc_blobs( self: Arc>, block_root: Hash256, - blobs: FixedBlobSidecarList, + blobs: Vec>>, seen_timestamp: Duration, process_type: BlockProcessType, ) { - let Some(slot) = blobs - .iter() - .find_map(|blob| blob.as_ref().map(|blob| blob.slot())) - else { + let Some(slot) = blobs.iter().map(|blob| blob.slot()).next() else { return; }; let (indices, commitments): (Vec, Vec) = blobs .iter() - .filter_map(|blob_opt| { - blob_opt - .as_ref() - .map(|blob| (blob.index, blob.kzg_commitment)) - }) + .map(|blob| (blob.index, blob.kzg_commitment)) .unzip(); let commitments = format_kzg_commitments(&commitments); diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 3bd39301b21..d1fceae71c8 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -14,8 +14,7 @@ use beacon_chain::data_availability_checker::ChildComponents; use beacon_chain::BeaconChainTypes; use std::sync::Arc; use std::time::Duration; -use types::blob_sidecar::FixedBlobSidecarList; -use types::{Hash256, SignedBeaconBlock}; +use types::{BlobSidecar, Hash256, SignedBeaconBlock}; #[derive(Debug, Copy, Clone)] pub enum ResponseType { @@ -266,8 +265,8 @@ impl RequestState for BlockRequestState impl RequestState for BlobRequestState { type RequestType = BlobsByRootSingleBlockRequest; - type VerifiedResponseType = FixedBlobSidecarList; - type ReconstructedResponseType = FixedBlobSidecarList; + type VerifiedResponseType = Vec>>; + type ReconstructedResponseType = Vec>>; fn new_request(&self) -> Self::RequestType { BlobsByRootSingleBlockRequest { @@ -286,25 +285,24 @@ impl RequestState for BlobRequestState) -> Option { + fn get_parent_root(verified_response: &Vec>>) -> Option { verified_response .into_iter() - .filter_map(|blob| blob.as_ref()) .map(|blob| blob.block_parent_root()) .next() } fn add_to_child_components( - verified_response: FixedBlobSidecarList, + verified_response: Vec>>, components: &mut ChildComponents, ) { - components.merge_blobs(verified_response); + components.merge_blobs(verified_response).unwrap(); } fn verified_to_reconstructed( _block_root: Hash256, - blobs: FixedBlobSidecarList, - ) -> FixedBlobSidecarList { + blobs: Vec>>, + ) -> Vec>> { blobs } @@ -312,7 +310,7 @@ impl RequestState for BlobRequestState, block_root: Hash256, - verified: FixedBlobSidecarList, + verified: Vec>>, duration: Duration, cx: &SyncNetworkContext, ) -> Result<(), LookupRequestError> { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 25652c8074f..329ae4703ff 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,5 +1,6 @@ use self::single_block_lookup::SingleBlockLookup; use super::manager::BlockProcessingResult; +use super::network_context::{LookupFailure, LookupVerifyError}; use super::BatchProcessResult; use super::{manager::BlockProcessType, network_context::SyncNetworkContext}; use crate::metrics; @@ -20,7 +21,6 @@ pub use common::Lookup; pub use common::Parent; pub use common::RequestState; use fnv::FnvHashMap; -use lighthouse_network::rpc::RPCError; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; pub use single_block_lookup::{BlobRequestState, BlockRequestState}; @@ -30,8 +30,7 @@ use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::time::Duration; use store::Hash256; -use types::blob_sidecar::FixedBlobSidecarList; -use types::Slot; +use types::{BlobSidecar, Slot}; pub mod common; mod parent_lookup; @@ -638,10 +637,13 @@ impl BlockLookups { id: SingleLookupReqId, peer_id: &PeerId, cx: &mut SyncNetworkContext, - error: RPCError, + error: LookupFailure, ) { - // Downscore peer even if lookup is not known - self.downscore_on_rpc_error(peer_id, &error, cx); + // Only downscore lookup verify errors. RPC errors are downscored in the network handler. + if let LookupFailure::LookupVerifyError(e) = &error { + // Downscore peer even if lookup is not known + self.downscore_on_rpc_error(peer_id, e, cx); + } let Some(mut parent_lookup) = self.get_parent_lookup::(id) else { debug!(self.log, @@ -671,14 +673,17 @@ impl BlockLookups { id: SingleLookupReqId, peer_id: &PeerId, cx: &mut SyncNetworkContext, - error: RPCError, + error: LookupFailure, ) { - // Downscore peer even if lookup is not known - self.downscore_on_rpc_error(peer_id, &error, cx); + // Only downscore lookup verify errors. RPC errors are downscored in the network handler. + if let LookupFailure::LookupVerifyError(e) = &error { + // Downscore peer even if lookup is not known + self.downscore_on_rpc_error(peer_id, e, cx); + } let log = self.log.clone(); let Some(mut lookup) = self.get_single_lookup::(id) else { - debug!(log, "Error response to dropped lookup"; "error" => ?error); + debug!(log, "Error response to dropped lookup"; "error" => %error); return; }; let block_root = lookup.block_root(); @@ -801,8 +806,17 @@ impl BlockLookups { BlockError::ParentUnknown(block) => { let slot = block.slot(); let parent_root = block.parent_root(); - lookup.add_child_components(block.into()); - Action::ParentUnknown { parent_root, slot } + + match ChildComponents::new_from_rpc_block(block) { + Ok(child) => { + lookup.add_child_components(child); + Action::ParentUnknown { parent_root, slot } + } + Err(e) => { + warn!(self.log, "Dropping parent lookup for invalid block"; "block_root" => %root, "error" => ?e); + Action::Drop + } + } } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { // These errors indicate that the execution layer is offline @@ -1252,7 +1266,7 @@ impl BlockLookups { fn send_blobs_for_processing( &self, block_root: Hash256, - blobs: FixedBlobSidecarList, + blobs: Vec>>, duration: Duration, process_type: BlockProcessType, cx: &SyncNetworkContext, @@ -1322,31 +1336,15 @@ impl BlockLookups { pub fn downscore_on_rpc_error( &self, peer_id: &PeerId, - error: &RPCError, + error: &LookupVerifyError, cx: &SyncNetworkContext, ) { // Note: logging the report event here with the full error display. The log inside // `report_peer` only includes a smaller string, like "invalid_data" - debug!(self.log, "reporting peer for sync lookup error"; "error" => %error); - if let Some(action) = match error { - // Protocol errors are heavily penalized - RPCError::SSZDecodeError(..) - | RPCError::IoError(..) - | RPCError::ErrorResponse(..) - | RPCError::InvalidData(..) - | RPCError::HandlerRejected => Some(PeerAction::LowToleranceError), - // Timing / network errors are less penalized - // TODO: Is IoError a protocol error or network error? - RPCError::StreamTimeout | RPCError::IncompleteStream | RPCError::NegotiationTimeout => { - Some(PeerAction::MidToleranceError) - } - // Not supporting a specific protocol is tolerated. TODO: Are you sure? - RPCError::UnsupportedProtocol => None, - // Our fault, don't penalize peer - RPCError::InternalError(..) | RPCError::Disconnected => None, - } { - cx.report_peer(*peer_id, action, error.into()); - } + let error_str: &'static str = error.into(); + + debug!(self.log, "reporting peer for sync lookup error"; "error" => error_str); + cx.report_peer(*peer_id, PeerAction::LowToleranceError, error_str); } pub fn update_metrics(&self) { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 5bb663967d7..09b8ebafb07 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -167,7 +167,9 @@ impl SingleBlockLookup { if let Some(block) = downloaded_block { existing_components.merge_block(block); } - existing_components.merge_blobs(downloaded_blobs); + let downloaded_blobs = downloaded_blobs.into_iter().cloned().flatten().collect(); + + existing_components.merge_blobs_unchecked(downloaded_blobs); } else { self.child_components = Some(components); } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index fc6ac28bdc7..8e3b35ee5d3 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -15,7 +15,7 @@ use beacon_chain::test_utils::{ build_log, generate_rand_block_and_blobs, BeaconChainHarness, EphemeralHarnessType, NumBlobs, }; use beacon_processor::WorkEvent; -use lighthouse_network::rpc::RPCResponseErrorCode; +use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; use lighthouse_network::types::SyncState; use lighthouse_network::{NetworkGlobals, Request}; use slog::info; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 23bd1010bfe..58daab5539d 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -57,12 +57,10 @@ use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::SyncInfo; use lighthouse_network::{PeerAction, PeerId}; use slog::{crit, debug, error, info, trace, warn, Logger}; -use std::ops::IndexMut; use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::blob_sidecar::FixedBlobSidecarList; use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -574,32 +572,42 @@ impl SyncManager { let block_slot = block.slot(); let parent_root = block.parent_root(); debug!(self.log, "Received unknown parent block message"; "block_root" => %block_root, "parent_root" => %parent_root); - self.handle_unknown_parent( - peer_id, - block_root, - parent_root, - block_slot, - block.into(), - ); + match ChildComponents::new_from_rpc_block(block) { + Ok(child) => self.handle_unknown_parent( + peer_id, + block_root, + parent_root, + block_slot, + child, + ), + Err(e) => { + warn!(self.log, "Peer sent invalid block"; "msg" => e, "peer_id" => %peer_id); + return; + } + } } SyncMessage::UnknownParentBlob(peer_id, blob) => { let blob_slot = blob.slot(); let block_root = blob.block_root(); let parent_root = blob.block_parent_root(); - let blob_index = blob.index; - if blob_index >= T::EthSpec::max_blobs_per_block() as u64 { - warn!(self.log, "Peer sent blob with invalid index"; "index" => blob_index, "peer_id" => %peer_id); - return; - } - let mut blobs = FixedBlobSidecarList::default(); - *blobs.index_mut(blob_index as usize) = Some(blob); + let child_components = match ChildComponents::new( + block_root, + None, + Some(vec![blob]), + ) { + Err(e) => { + warn!(self.log, "Peer sent invalid blob"; "msg" => e, "peer_id" => %peer_id); + return; + } + Ok(child_components) => child_components, + }; debug!(self.log, "Received unknown parent blob message"; "block_root" => %block_root, "parent_root" => %parent_root); self.handle_unknown_parent( peer_id, block_root, parent_root, blob_slot, - ChildComponents::new(block_root, None, Some(blobs)), + child_components, ); } SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index fdfb744e210..6abd36d3e53 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -17,12 +17,12 @@ use fnv::FnvHashMap; use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; +pub use requests::LookupVerifyError; use slog::{debug, trace, warn}; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::blob_sidecar::FixedBlobSidecarList; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; mod requests; @@ -51,7 +51,33 @@ pub enum RpcEvent { RPCError(RPCError), } -pub type RpcProcessingResult = Option>; +pub type RpcProcessingResult = Option>; + +pub enum LookupFailure { + RpcError(RPCError), + LookupVerifyError(LookupVerifyError), +} + +impl std::fmt::Display for LookupFailure { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + LookupFailure::RpcError(e) => write!(f, "RPC Error: {:?}", e), + LookupFailure::LookupVerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e), + } + } +} + +impl From for LookupFailure { + fn from(e: RPCError) -> Self { + LookupFailure::RpcError(e) + } +} + +impl From for LookupFailure { + fn from(e: LookupVerifyError) -> Self { + LookupFailure::LookupVerifyError(e) + } +} /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. pub struct SyncNetworkContext { @@ -445,17 +471,17 @@ impl SyncNetworkContext { // TODO: We could NOT drop the request here, and penalize the peer again if // sends multiple penalizable chunks after the first invalid. request.remove(); - Err(e) + Err(e.into()) } } } RpcEvent::StreamTermination => match request.remove().terminate() { Ok(_) => return None, - Err(e) => Err(e), + Err(e) => Err(e.into()), }, RpcEvent::RPCError(e) => { request.remove(); - Err(e) + Err(e.into()) } }) } @@ -464,50 +490,31 @@ impl SyncNetworkContext { &mut self, request_id: SingleLookupReqId, blob: RpcEvent>>, - ) -> RpcProcessingResult> { + ) -> RpcProcessingResult>>> { let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { return None; }; Some(match blob { RpcEvent::Response(blob, _) => match request.get_mut().add_response(blob) { - // TODO: Should deal only with Vec> - Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs) - .map(|blobs| (blobs, timestamp_now())) - .map_err(RPCError::InvalidData), + Ok(Some(blobs)) => Ok((blobs, timestamp_now())), Ok(None) => return None, Err(e) => { request.remove(); - Err(e) + Err(e.into()) } }, RpcEvent::StreamTermination => { // Stream terminator match request.remove().terminate() { - // TODO: Should deal only with Vec> - Some(blobs) => to_fixed_blob_sidecar_list(blobs) - .map(|blobs| (blobs, timestamp_now())) - .map_err(RPCError::InvalidData), + Some(blobs) => Ok((blobs, timestamp_now())), None => return None, } } RpcEvent::RPCError(e) => { request.remove(); - Err(e) + Err(e.into()) } }) } } - -fn to_fixed_blob_sidecar_list( - blobs: Vec>>, -) -> Result, String> { - let mut fixed_list = FixedBlobSidecarList::default(); - for blob in blobs.into_iter() { - let index = blob.index as usize; - *fixed_list - .get_mut(index) - .ok_or("invalid index".to_string())? = Some(blob) - } - Ok(fixed_list) -} diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 91876bf9c5d..0522b7fa384 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,10 +1,21 @@ use beacon_chain::get_block_root; -use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest, RPCError}; +use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest}; use std::sync::Arc; +use strum::IntoStaticStr; use types::{ blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock, }; +#[derive(Debug, PartialEq, Eq, IntoStaticStr)] +pub enum LookupVerifyError { + NoResponseReturned, + TooManyResponses, + UnrequestedBlockRoot(Hash256), + UnrequestedBlobIndex(u64), + InvalidInclusionProof, + DuplicateData, +} + pub struct ActiveBlocksByRootRequest { request: BlocksByRootSingleRequest, resolved: bool, @@ -24,16 +35,14 @@ impl ActiveBlocksByRootRequest { pub fn add_response( &mut self, block: Arc>, - ) -> Result>, RPCError> { + ) -> Result>, LookupVerifyError> { if self.resolved { - return Err(RPCError::InvalidData("too many responses".to_string())); + return Err(LookupVerifyError::TooManyResponses); } let block_root = get_block_root(&block); if self.request.0 != block_root { - return Err(RPCError::InvalidData(format!( - "un-requested block root {block_root:?}" - ))); + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); } // Valid data, blocks by root expects a single response @@ -41,11 +50,11 @@ impl ActiveBlocksByRootRequest { Ok(block) } - pub fn terminate(self) -> Result<(), RPCError> { + pub fn terminate(self) -> Result<(), LookupVerifyError> { if self.resolved { Ok(()) } else { - Err(RPCError::InvalidData("no response returned".to_string())) + Err(LookupVerifyError::NoResponseReturned) } } } @@ -101,28 +110,23 @@ impl ActiveBlobsByRootRequest { pub fn add_response( &mut self, blob: Arc>, - ) -> Result>>>, RPCError> { + ) -> Result>>>, LookupVerifyError> { if self.resolved { - return Err(RPCError::InvalidData("too many responses".to_string())); + return Err(LookupVerifyError::TooManyResponses); } let block_root = blob.block_root(); if self.request.block_root != block_root { - return Err(RPCError::InvalidData(format!( - "un-requested block root {block_root:?}" - ))); + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); } if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { - return Err(RPCError::InvalidData("invalid inclusion proof".to_string())); + return Err(LookupVerifyError::InvalidInclusionProof); } if !self.request.indices.contains(&blob.index) { - return Err(RPCError::InvalidData(format!( - "un-requested blob index {}", - blob.index - ))); + return Err(LookupVerifyError::UnrequestedBlobIndex(blob.index)); } if self.blobs.iter().any(|b| b.index == blob.index) { - return Err(RPCError::InvalidData("duplicated data".to_string())); + return Err(LookupVerifyError::DuplicateData); } self.blobs.push(blob);