From 784ef5fb43e2630572640c519e2cbf7f6bb50e73 Mon Sep 17 00:00:00 2001
From: Lion - dapplion <35266934+dapplion@users.noreply.github.com>
Date: Thu, 27 Jun 2024 20:21:40 +0200
Subject: [PATCH] Pass vec to range sync batch (#5710)

* Pass vec to range sync batch
---
 .../network/src/sync/backfill_sync/mod.rs | 12 +---
 beacon_node/network/src/sync/manager.rs   | 59 ++++++++-----------
 .../network/src/sync/range_sync/batch.rs  | 42 ++++---------
 .../network/src/sync/range_sync/chain.rs  | 10 +---
 .../network/src/sync/range_sync/range.rs  |  8 +--
 5 files changed, 46 insertions(+), 85 deletions(-)

diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs
index ce7d04ac0ac..728642cc78b 100644
--- a/beacon_node/network/src/sync/backfill_sync/mod.rs
+++ b/beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -369,7 +369,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         batch_id: BatchId,
         peer_id: &PeerId,
         request_id: Id,
-        beacon_block: Option<RpcBlock<T::EthSpec>>,
+        blocks: Vec<RpcBlock<T::EthSpec>>,
     ) -> Result<ProcessResult, BackFillError> {
         // check if we have this batch
         let batch = match self.batches.get_mut(&batch_id) {
@@ -392,20 +392,14 @@
             }
         };

-        if let Some(block) = beacon_block {
-            // This is not a stream termination, simply add the block to the request
-            if let Err(e) = batch.add_block(block) {
-                self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
-            }
-            Ok(ProcessResult::Successful)
-        } else {
+        {
             // A stream termination has been sent. This batch has ended. Process a completed batch.
             // Remove the request from the peer's active batches
             self.active_requests
                 .get_mut(peer_id)
                 .map(|active_requests| active_requests.remove(&batch_id));

-            match batch.download_completed() {
+            match batch.download_completed(blocks) {
                 Ok(received) => {
                     let awaiting_batches = self.processing_target.saturating_sub(batch_id)
                         / BACKFILL_EPOCHS_PER_BATCH;
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index 4c1a1e6b67a..fc1a218d82a 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -908,39 +908,32 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         {
             match resp.responses {
                 Ok(blocks) => {
-                    for block in blocks
-                        .into_iter()
-                        .map(Some)
-                        // chain the stream terminator
-                        .chain(vec![None])
-                    {
-                        match resp.sender_id {
-                            RangeRequestId::RangeSync { chain_id, batch_id } => {
-                                self.range_sync.blocks_by_range_response(
-                                    &mut self.network,
-                                    peer_id,
-                                    chain_id,
-                                    batch_id,
-                                    id,
-                                    block,
-                                );
-                                self.update_sync_state();
-                            }
-                            RangeRequestId::BackfillSync { batch_id } => {
-                                match self.backfill_sync.on_block_response(
-                                    &mut self.network,
-                                    batch_id,
-                                    &peer_id,
-                                    id,
-                                    block,
-                                ) {
-                                    Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
-                                    Ok(ProcessResult::Successful) => {}
-                                    Err(_error) => {
-                                        // The backfill sync has failed, errors are reported
-                                        // within.
-                                        self.update_sync_state();
-                                    }
+                    match resp.sender_id {
+                        RangeRequestId::RangeSync { chain_id, batch_id } => {
+                            self.range_sync.blocks_by_range_response(
+                                &mut self.network,
+                                peer_id,
+                                chain_id,
+                                batch_id,
+                                id,
+                                blocks,
+                            );
+                            self.update_sync_state();
+                        }
+                        RangeRequestId::BackfillSync { batch_id } => {
+                            match self.backfill_sync.on_block_response(
+                                &mut self.network,
+                                batch_id,
+                                &peer_id,
+                                id,
+                                blocks,
+                            ) {
+                                Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
+                                Ok(ProcessResult::Successful) => {}
+                                Err(_error) => {
+                                    // The backfill sync has failed, errors are reported
+                                    // within.
+                                    self.update_sync_state();
                                 }
                             }
                         }
diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs
index 75cb49d176d..baba8c9a620 100644
--- a/beacon_node/network/src/sync/range_sync/batch.rs
+++ b/beacon_node/network/src/sync/range_sync/batch.rs
@@ -116,7 +116,7 @@ pub enum BatchState<E: EthSpec> {
     /// The batch has failed either downloading or processing, but can be requested again.
     AwaitingDownload,
     /// The batch is being downloaded.
-    Downloading(PeerId, Vec<RpcBlock<E>>, Id),
+    Downloading(PeerId, Id),
     /// The batch has been completely downloaded and is ready for processing.
     AwaitingProcessing(PeerId, Vec<RpcBlock<E>>),
     /// The batch is being processed.
@@ -199,7 +199,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {

     /// Verifies if an incoming block belongs to this batch.
     pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &Id) -> bool {
-        if let BatchState::Downloading(expected_peer, _, expected_id) = &self.state {
+        if let BatchState::Downloading(expected_peer, expected_id) = &self.state {
             return peer_id == expected_peer && expected_id == request_id;
         }
         false
@@ -209,7 +209,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
     pub fn current_peer(&self) -> Option<&PeerId> {
         match &self.state {
             BatchState::AwaitingDownload | BatchState::Failed => None,
-            BatchState::Downloading(peer_id, _, _)
+            BatchState::Downloading(peer_id, _)
             | BatchState::AwaitingProcessing(peer_id, _)
             | BatchState::Processing(Attempt { peer_id, .. })
             | BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id),
@@ -250,36 +250,18 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
         &self.failed_processing_attempts
     }

-    /// Adds a block to a downloading batch.
-    pub fn add_block(&mut self, block: RpcBlock<E>) -> Result<(), WrongState> {
-        match self.state.poison() {
-            BatchState::Downloading(peer, mut blocks, req_id) => {
-                blocks.push(block);
-                self.state = BatchState::Downloading(peer, blocks, req_id);
-                Ok(())
-            }
-            BatchState::Poisoned => unreachable!("Poisoned batch"),
-            other => {
-                self.state = other;
-                Err(WrongState(format!(
-                    "Add block for batch in wrong state {:?}",
-                    self.state
-                )))
-            }
-        }
-    }
-
     /// Marks the batch as ready to be processed if the blocks are in the range. The number of
     /// received blocks is returned, or the wrong batch end on failure
     #[must_use = "Batch may have failed"]
     pub fn download_completed(
         &mut self,
+        blocks: Vec<RpcBlock<E>>,
     ) -> Result<
         usize, /* Received blocks */
         Result<(Slot, Slot, BatchOperationOutcome), WrongState>,
     > {
         match self.state.poison() {
-            BatchState::Downloading(peer, blocks, _request_id) => {
+            BatchState::Downloading(peer, _request_id) => {
                 // verify that blocks are in range
                 if let Some(last_slot) = blocks.last().map(|b| b.slot()) {
                     // the batch is non-empty
@@ -336,7 +318,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
         mark_failed: bool,
     ) -> Result<BatchOperationOutcome, WrongState> {
         match self.state.poison() {
-            BatchState::Downloading(peer, _, _request_id) => {
+            BatchState::Downloading(peer, _request_id) => {
                 // register the attempt and check if the batch can be tried again
                 if mark_failed {
                     self.failed_download_attempts.push(peer);
@@ -369,7 +351,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
     ) -> Result<(), WrongState> {
         match self.state.poison() {
             BatchState::AwaitingDownload => {
-                self.state = BatchState::Downloading(peer, Vec::new(), request_id);
+                self.state = BatchState::Downloading(peer, request_id);
                 Ok(())
             }
             BatchState::Poisoned => unreachable!("Poisoned batch"),
@@ -536,13 +518,9 @@ impl<E: EthSpec> std::fmt::Debug for BatchState<E> {
             BatchState::AwaitingProcessing(ref peer, ref blocks) => {
                 write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len())
             }
-            BatchState::Downloading(peer, blocks, request_id) => write!(
-                f,
-                "Downloading({}, {} blocks, {})",
-                peer,
-                blocks.len(),
-                request_id
-            ),
+            BatchState::Downloading(peer, request_id) => {
+                write!(f, "Downloading({}, {})", peer, request_id)
+            }
             BatchState::Poisoned => f.write_str("Poisoned"),
         }
     }
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index 63cafa9aca4..13c9f4be3bf 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -200,7 +200,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         batch_id: BatchId,
         peer_id: &PeerId,
         request_id: Id,
-        beacon_block: Option<RpcBlock<T::EthSpec>>,
+        blocks: Vec<RpcBlock<T::EthSpec>>,
     ) -> ProcessingResult {
         // check if we have this batch
         let batch = match self.batches.get_mut(&batch_id) {
@@ -221,18 +221,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
             }
         };

-        if let Some(block) = beacon_block {
-            // This is not a stream termination, simply add the block to the request
-            batch.add_block(block)?;
-            Ok(KeepChain)
-        } else {
+        {
             // A stream termination has been sent. This batch has ended. Process a completed batch.
             // Remove the request from the peer's active batches
             self.peers
                 .get_mut(peer_id)
                 .map(|active_requests| active_requests.remove(&batch_id));

-            match batch.download_completed() {
+            match batch.download_completed(blocks) {
                 Ok(received) => {
                     let awaiting_batches = batch_id
                         .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target))
diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs
index fe48db35b45..5393b8792c0 100644
--- a/beacon_node/network/src/sync/range_sync/range.rs
+++ b/beacon_node/network/src/sync/range_sync/range.rs
@@ -210,11 +210,11 @@ where
         chain_id: ChainId,
         batch_id: BatchId,
         request_id: Id,
-        beacon_block: Option<RpcBlock<T::EthSpec>>,
+        blocks: Vec<RpcBlock<T::EthSpec>>,
     ) {
         // check if this chunk removes the chain
         match self.chains.call_by_id(chain_id, |chain| {
-            chain.on_block_response(network, batch_id, &peer_id, request_id, beacon_block)
+            chain.on_block_response(network, batch_id, &peer_id, request_id, blocks)
         }) {
             Ok((removed_chain, sync_type)) => {
                 if let Some((removed_chain, remove_reason)) = removed_chain {
@@ -795,7 +795,7 @@ mod tests {
         rig.cx.update_execution_engine_state(EngineState::Offline);

         // send the response to the request
-        range.blocks_by_range_response(&mut rig.cx, peer1, chain1, batch1, id1, None);
+        range.blocks_by_range_response(&mut rig.cx, peer1, chain1, batch1, id1, vec![]);

         // the beacon processor shouldn't have received any work
         rig.expect_empty_processor();
@@ -809,7 +809,7 @@ mod tests {
         rig.complete_range_block_and_blobs_response(block_req, blob_req_opt);

         // send the response to the request
-        range.blocks_by_range_response(&mut rig.cx, peer2, chain2, batch2, id2, None);
+        range.blocks_by_range_response(&mut rig.cx, peer2, chain2, batch2, id2, vec![]);

         // the beacon processor shouldn't have received any work
         rig.expect_empty_processor();
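
Illustrative note (not part of the patch): the change above replaces the one-block-at-a-time callback, where each call carried an Option and None signalled the end of the RPC stream, with a single call that hands the batch the complete Vec once the stream has finished. A minimal sketch of that calling convention follows; the types Block, BatchState, and Batch below are simplified stand-ins invented for illustration, not Lighthouse's real definitions.

// Sketch of the "pass the whole vec once" pattern, with hypothetical types.
#[derive(Debug)]
struct Block {
    slot: u64,
}

#[derive(Debug)]
enum BatchState {
    // The request is in flight; no blocks are buffered in the batch itself.
    Downloading,
    // The full download has been handed over and is ready for processing.
    AwaitingProcessing(Vec<Block>),
}

struct Batch {
    state: BatchState,
}

impl Batch {
    fn new() -> Self {
        Self {
            state: BatchState::Downloading,
        }
    }

    /// The whole download is delivered in one call. An empty vec simply means
    /// the peer returned no blocks for the requested range.
    fn download_completed(&mut self, blocks: Vec<Block>) -> Result<usize, &'static str> {
        match self.state {
            BatchState::Downloading => {
                let received = blocks.len();
                self.state = BatchState::AwaitingProcessing(blocks);
                Ok(received)
            }
            _ => Err("download_completed called in wrong state"),
        }
    }
}

fn main() {
    let mut batch = Batch::new();
    // The network layer buffers the RPC response and passes it on in one go.
    let blocks = vec![Block { slot: 32 }, Block { slot: 33 }];
    let received = batch.download_completed(blocks).expect("batch was downloading");
    println!("received {received} blocks; state: {:?}", batch.state);
}

In this shape, an empty vec plays the role the None terminator used to play (the request completed but returned no blocks), which is why the tests in range.rs above now pass vec![] where they previously passed None.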