From bf4cbd3b0a11fdb810a9e0bc1c3c3ed0f8873689 Mon Sep 17 00:00:00 2001
From: Pawan Dhananjay
Date: Wed, 26 Jun 2024 16:53:53 -0700
Subject: [PATCH 1/3] Remove all batches related to a peer on disconnect
 (#5969)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Remove all batches related to a peer on disconnect
* Cleanup map entries after disconnect
* Allow lookups to continue in case of disconnections
* Pretty response types
* fmt
* Fix lints
* Remove lookup if it cannot progress
* Fix tests
* Remove poll_close on rpc behaviour
* Remove redundant test
* Fix issue raised by lion
* Revert pretty response types
* Cleanup
* Fix test
* Merge remote-tracking branch 'origin/release-v5.2.1' into rpc-error-on-disconnect-revert
* Apply suggestions from joao

Co-authored-by: João Oliveira

* Fix log
* update request status on no peers found
* Do not remove lookup after peer disconnection
* Add comments about expected event api
* Update single_block_lookup.rs
* Update mod.rs
* Merge branch 'rpc-error-on-disconnect-revert' into 5969-review
* Merge pull request #10 from dapplion/5969-review

Add comments about expected event api
---
 .../lighthouse_network/src/rpc/handler.rs     | 31 ------
 .../lighthouse_network/tests/rpc_tests.rs     | 94 +------------------
 .../network/src/sync/backfill_sync/mod.rs     | 38 +++++++-
 .../network/src/sync/block_lookups/mod.rs     | 42 +++++----
 .../sync/block_lookups/single_block_lookup.rs | 38 ++++++--
 .../network/src/sync/block_lookups/tests.rs   | 48 ++++++----
 .../src/sync/block_sidecar_coupling.rs        | 14 ++-
 beacon_node/network/src/sync/manager.rs       | 27 +++++-
 .../network/src/sync/network_context.rs       | 76 ++++++++++++++-
 .../src/sync/network_context/requests.rs      | 13 ++-
 .../network/src/sync/range_sync/chain.rs      | 26 ++++-
 .../network/src/sync/range_sync/range.rs      |  5 +-
 12 files changed, 270 insertions(+), 182 deletions(-)

diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs
index b7166efc376..6f338ebc8be 100644
--- a/beacon_node/lighthouse_network/src/rpc/handler.rs
+++ b/beacon_node/lighthouse_network/src/rpc/handler.rs
@@ -352,37 +352,6 @@ where
         !matches!(self.state, HandlerState::Deactivated)
     }
 
-    // NOTE: This function gets polled to completion upon a connection close.
-    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
-        // Inform the network behaviour of any failed requests
-
-        while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() {
-            let outbound_info = self
-                .outbound_substreams
-                .remove(&substream_id)
-                .expect("The value must exist for a key");
-            // If the state of the connection is closing, we do not need to report this case to
-            // the behaviour, as the connection has just closed non-gracefully
-            if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) {
-                continue;
-            }
-
-            // Register this request as an RPC Error
-            return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound {
-                error: RPCError::Disconnected,
-                proto: outbound_info.proto,
-                id: outbound_info.req_id,
-            })));
-        }
-
-        // Also handle any events that are awaiting to be sent to the behaviour
-        if !self.events_out.is_empty() {
-            return Poll::Ready(Some(self.events_out.remove(0)));
-        }
-
-        Poll::Ready(None)
-    }
-
     fn poll(
         &mut self,
         cx: &mut Context<'_>,
diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs
index 8d29f5158bc..527b853dc30 100644
--- a/beacon_node/lighthouse_network/tests/rpc_tests.rs
+++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs
@@ -3,7 +3,7 @@
 mod common;
 
 use common::Protocol;
-use lighthouse_network::rpc::{methods::*, RPCError};
+use lighthouse_network::rpc::methods::*;
 use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response};
 use slog::{debug, warn, Level};
 use ssz::Encode;
@@ -1012,98 +1012,6 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
     })
 }
 
-#[test]
-fn test_disconnect_triggers_rpc_error() {
-    // set up the logging. The level and enabled logging or not
-    let log_level = Level::Debug;
-    let enable_logging = false;
-
-    let log = common::build_log(log_level, enable_logging);
-    let spec = E::default_spec();
-
-    let rt = Arc::new(Runtime::new().unwrap());
-    // get sender/receiver
-    rt.block_on(async {
-        let (mut sender, mut receiver) = common::build_node_pair(
-            Arc::downgrade(&rt),
-            &log,
-            ForkName::Base,
-            &spec,
-            Protocol::Tcp,
-        )
-        .await;
-
-        // BlocksByRoot Request
-        let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new(
-            // Must have at least one root for the request to create a stream
-            vec![Hash256::from_low_u64_be(0)],
-            &spec,
-        ));
-
-        // build the sender future
-        let sender_future = async {
-            loop {
-                match sender.next_event().await {
-                    NetworkEvent::PeerConnectedOutgoing(peer_id) => {
-                        // Send a STATUS message
-                        debug!(log, "Sending RPC");
-                        sender
-                            .send_request(peer_id, 42, rpc_request.clone())
-                            .unwrap();
-                    }
-                    NetworkEvent::RPCFailed { error, id: 42, .. } => match error {
-                        RPCError::Disconnected => return,
-                        other => panic!("received unexpected error {:?}", other),
-                    },
-                    other => {
-                        warn!(log, "Ignoring other event {:?}", other);
-                    }
-                }
-            }
-        };
-
-        // determine messages to send (PeerId, RequestId). If some, indicates we still need to send
-        // messages
-        let mut sending_peer = None;
-        let receiver_future = async {
-            loop {
-                // this future either drives the sending/receiving or times out allowing messages to be
-                // sent in the timeout
-                match futures::future::select(
-                    Box::pin(receiver.next_event()),
-                    Box::pin(tokio::time::sleep(Duration::from_secs(1))),
-                )
-                .await
-                {
-                    futures::future::Either::Left((ev, _)) => match ev {
-                        NetworkEvent::RequestReceived { peer_id, .. } => {
-                            sending_peer = Some(peer_id);
-                        }
-                        other => {
-                            warn!(log, "Ignoring other event {:?}", other);
-                        }
-                    },
-                    futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
-                }
-
-                // if we need to send messages send them here. This will happen after a delay
-                if let Some(peer_id) = sending_peer.take() {
-                    warn!(log, "Receiver got request, disconnecting peer");
-                    receiver.__hard_disconnect_testing_only(peer_id);
-                }
-            }
-        };
-
-        tokio::select! {
-            _ = sender_future => {}
-            _ = receiver_future => {}
-            _ = sleep(Duration::from_secs(30)) => {
-                panic!("Future timed out");
-            }
-        }
-    })
-}
-
 /// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC
 /// Goodbye message.
 fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {
diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs
index ce7d04ac0ac..5431e1bcdc1 100644
--- a/beacon_node/network/src/sync/backfill_sync/mod.rs
+++ b/beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -307,7 +307,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
     /// A peer has disconnected.
     /// If the peer has active batches, those are considered failed and re-requested.
     #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
-    pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
+    pub fn peer_disconnected(
+        &mut self,
+        peer_id: &PeerId,
+        network: &mut SyncNetworkContext<T>,
+    ) -> Result<(), BackFillError> {
         if matches!(
             self.state(),
             BackFillState::Failed | BackFillState::NotRequired
@@ -315,7 +319,37 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
             return Ok(());
         }
 
-        self.active_requests.remove(peer_id);
+        if let Some(batch_ids) = self.active_requests.remove(peer_id) {
+            // fail the batches.
+            for id in batch_ids {
+                if let Some(batch) = self.batches.get_mut(&id) {
+                    match batch.download_failed(false) {
+                        Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
+                            self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
+                        }
+                        Ok(BatchOperationOutcome::Continue) => {}
+                        Err(e) => {
+                            self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
+                        }
+                    }
+                    // If we have run out of peers in which to retry this batch, the backfill state
+                    // transitions to a paused state.
+                    // We still need to reset the state for all the affected batches, so we should not
+                    // short circuit early.
+                    if self.retry_batch_download(network, id).is_err() {
+                        debug!(
+                            self.log,
+                            "Batch could not be retried";
+                            "batch_id" => id,
+                            "error" => "no synced peers"
+                        );
+                    }
+                } else {
+                    debug!(self.log, "Batch not found while removing peer";
+                        "peer" => %peer_id, "batch" => id)
+                }
+            }
+        }
 
         // Remove the peer from the participation list
         self.participating_peers.remove(peer_id);
diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs
index f685b7e59db..0148a6548dd 100644
--- a/beacon_node/network/src/sync/block_lookups/mod.rs
+++ b/beacon_node/network/src/sync/block_lookups/mod.rs
@@ -1,3 +1,25 @@
+//! Implements block lookup sync.
+//!
+//! Block lookup sync is triggered when a peer claims to have imported a block we don't know about.
+//! For example, a peer attesting to a head block root that is not in our fork-choice. Lookup sync
+//! is recursive in nature, as we may discover that this attested head block root has a parent that
+//! is also unknown to us.
+//!
+//! Block lookup is implemented as an event-driven state machine. It sends events to the network and
+//! beacon processor, and expects some set of events back. A discrepancy in the expected event API
+//! will result in lookups getting "stuck". A lookup becomes stuck when there is no future event
+//! that will trigger the lookup to make progress. There's a fallback mechanism that drops lookups
+//! that live for too long, logging the line "Notify the devs a sync lookup is stuck".
+//!
+//! The expected event API is documented in the code paths that are making assumptions with the
+//! comment prefix "Lookup sync event safety:"
+//!
+//! Block lookup sync attempts to not re-download or re-process data that we already have. Block
+//! components are cached temporarily in multiple places before they are imported into fork-choice.
+//! Therefore, block lookup sync must peek these caches correctly to decide when to skip a download
+//! or consider a lookup complete. These caches are read from the `SyncNetworkContext` and its state
+//! returned to this module as `LookupRequestResult` variants.
+
 use self::parent_chain::{compute_parent_chains, NodeChain};
 pub use self::single_block_lookup::DownloadResult;
 use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
@@ -410,21 +432,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
     /* Error responses */
 
     pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
-        self.single_block_lookups.retain(|_, lookup| {
+        for (_, lookup) in self.single_block_lookups.iter_mut() {
             lookup.remove_peer(peer_id);
-
-            // Note: this condition should be removed in the future. It's not strictly necessary to drop a
-            // lookup if there are no peers left. Lookup should only be dropped if it can not make progress
-            if lookup.has_no_peers() {
-                debug!(self.log,
-                    "Dropping single lookup after peer disconnection";
-                    "block_root" => ?lookup.block_root()
-                );
-                false
-            } else {
-                true
-            }
-        });
+        }
     }
 
     /* Processing responses */
@@ -787,12 +797,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
         };
 
         if stuck_lookup.id == ancestor_stuck_lookup.id {
-            warn!(self.log, "Notify the devs, a sync lookup is stuck";
+            warn!(self.log, "Notify the devs a sync lookup is stuck";
                 "block_root" => ?stuck_lookup.block_root(),
                 "lookup" => ?stuck_lookup,
             );
         } else {
-            warn!(self.log, "Notify the devs, a sync lookup is stuck";
+            warn!(self.log, "Notify the devs a sync lookup is stuck";
                 "block_root" => ?stuck_lookup.block_root(),
                 "lookup" => ?stuck_lookup,
                 "ancestor_block_root" => ?ancestor_stuck_lookup.block_root(),
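The module comment above ends on the `LookupRequestResult` variants that the rest of this patch keys off. Pieced together only from the variants used in the hunks below (`RequestSent`, `NoRequestNeeded`, `Pending`), the enum plausibly has roughly this shape; the payload types are assumptions, and the real definition lives in `network_context.rs`:

    /// Sketch only: variant names are taken from the hunks in this patch; the
    /// `ReqId` payload and the reason type are assumptions.
    pub enum LookupRequestResult<ReqId> {
        /// A request was sent over the network; exactly one download event
        /// with this id must follow (success, RPC error, or disconnect).
        RequestSent(ReqId),
        /// The data is already available locally; the lookup request can be
        /// advanced straight to its terminal `Processed` state.
        NoRequestNeeded,
        /// Some other in-flight work (e.g. gossip processing) will produce
        /// the event that moves this lookup forward; record the reason.
        Pending(&'static str),
    }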
diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
index 13efd36ab7e..e17991286af 100644
--- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
+++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
@@ -197,21 +197,36 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
         }
 
         let Some(peer_id) = self.use_rand_available_peer() else {
-            // Allow lookup to not have any peers. In that case do nothing. If the lookup does
-            // not have peers for some time, it will be dropped.
+            // Allow lookup to not have any peers and do nothing. This is an optimization to not
+            // lose progress of lookups created from a block with unknown parent before we receive
+            // attestations for said block.
+            // Lookup sync event safety: If a lookup requires peers to make progress, and does
+            // not receive any new peers for some time it will be dropped. If it receives a new
+            // peer it must attempt to make progress.
+            R::request_state_mut(self)
+                .get_state_mut()
+                .update_awaiting_download_status("no peers");
             return Ok(());
         };
 
         let request = R::request_state_mut(self);
         match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
             LookupRequestResult::RequestSent(req_id) => {
+                // Lookup sync event safety: If make_request returns `RequestSent`, we are
+                // guaranteed that `BlockLookups::on_download_response` will be called exactly
+                // once with this `req_id`.
                 request.get_state_mut().on_download_start(req_id)?
             }
             LookupRequestResult::NoRequestNeeded => {
+                // Lookup sync event safety: Advances this request to the terminal `Processed`
+                // state. If all requests reach this state, the request is marked as completed
+                // in `Self::continue_requests`.
                 request.get_state_mut().on_completed_request()?
             }
             // Sync will receive a future event to make progress on the request, do nothing now
             LookupRequestResult::Pending(reason) => {
+                // Lookup sync event safety: Refer to the code paths constructing
+                // `LookupRequestResult::Pending`
                 request
                     .get_state_mut()
                     .update_awaiting_download_status(reason);
@@ -222,16 +237,28 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
         // Otherwise, attempt to progress awaiting processing
         // If this request is awaiting a parent lookup to be processed, do not send for processing.
         // The request will be rejected with unknown parent error.
+        //
+        // TODO: The condition `block_is_processed || Block` can be dropped after checking for
+        // unknown parent root when importing RPC blobs
         } else if !awaiting_parent
             && (block_is_processed || matches!(R::response_type(), ResponseType::Block))
         {
             // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
             // useful to conditionally access the result data.
             if let Some(result) = request.get_state_mut().maybe_start_processing() {
+                // Lookup sync event safety: If `send_for_processing` returns Ok() we are guaranteed
+                // that `BlockLookups::on_processing_result` will be called exactly once with this
+                // lookup_id
                 return R::send_for_processing(id, result, cx);
             }
+            // Lookup sync event safety: If the request is not in `AwaitingDownload` or
+            // `AwaitingProcessing` state it is guaranteed to receive some event to make progress.
         }
+        // Lookup sync event safety: If a lookup is awaiting a parent we are guaranteed to either:
+        // (1) attempt to make progress with `BlockLookups::continue_child_lookups` if the parent
+        // lookup completes, or (2) get dropped if the parent fails and is dropped.
+
         Ok(())
     }
 
@@ -246,10 +273,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
         self.peers.insert(peer_id)
     }
 
-    /// Remove peer from available peers. Return true if there are no more available peers and all
-    /// requests are not expecting any future event (AwaitingDownload).
-    pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
-        self.peers.remove(peer_id)
+    /// Remove peer from available peers.
+    pub fn remove_peer(&mut self, peer_id: &PeerId) {
+        self.peers.remove(peer_id);
     }
 
     /// Returns true if this lookup has zero peers
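The "Lookup sync event safety" comments in this file assume a small per-request state machine driven by the calls visible above (`update_awaiting_download_status`, `on_download_start`, `maybe_start_processing`, `on_completed_request`). A hypothetical reconstruction follows; only `AwaitingDownload`, `AwaitingProcess` and `Processed` are named in the actual comments, the other state names are guesses:

    /// Hypothetical reconstruction, not the actual Lighthouse type.
    enum SingleLookupRequestState<ReqId> {
        /// No request in flight; `reason` records what we are waiting for.
        AwaitingDownload { reason: &'static str },
        /// Set by `on_download_start`; resolved by exactly one download event.
        Downloading { req_id: ReqId },
        /// Download complete; `maybe_start_processing` moves us onwards.
        AwaitingProcess,
        /// Sent to the beacon processor; resolved by one processing result.
        Processing,
        /// Terminal state, also set directly by `on_completed_request`.
        Processed,
    }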
diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs
index a607151bde9..02b07fa43e1 100644
--- a/beacon_node/network/src/sync/block_lookups/tests.rs
+++ b/beacon_node/network/src/sync/block_lookups/tests.rs
@@ -290,6 +290,7 @@ impl TestRig {
             .0
     }
 
+    #[track_caller]
     fn expect_no_active_single_lookups(&self) {
         assert!(
             self.active_single_lookups().is_empty(),
@@ -298,6 +299,7 @@ impl TestRig {
         );
     }
 
+    #[track_caller]
     fn expect_no_active_lookups(&self) {
         self.expect_no_active_single_lookups();
     }
@@ -539,10 +541,6 @@ impl TestRig {
         })
     }
 
-    fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) {
-        self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id));
-    }
-
     /// Return RPCErrors for all active requests of peer
     fn rpc_error_all_active_requests(&mut self, disconnected_peer_id: PeerId) {
         self.drain_network_rx();
@@ -562,6 +560,10 @@ impl TestRig {
         }
     }
 
+    fn peer_disconnected(&mut self, peer_id: PeerId) {
+        self.send_sync_message(SyncMessage::Disconnect(peer_id));
+    }
+
     fn drain_network_rx(&mut self) {
         while let Ok(event) = self.network_rx.try_recv() {
             self.network_rx_queue.push(event);
@@ -1026,6 +1028,28 @@ fn test_single_block_lookup_failure() {
     rig.expect_empty_network();
 }
 
+#[test]
+fn test_single_block_lookup_peer_disconnected_then_rpc_error() {
+    let mut rig = TestRig::test_setup();
+
+    let block_hash = Hash256::random();
+    let peer_id = rig.new_connected_peer();
+
+    // Trigger the request.
+    rig.trigger_unknown_block_from_attestation(block_hash, peer_id);
+    let id = rig.expect_block_lookup_request(block_hash);
+
+    // The peer disconnect event reaches sync before the rpc error.
+    rig.peer_disconnected(peer_id);
+    // The lookup is not removed as it can still potentially make progress.
+    rig.assert_single_lookups_count(1);
+    // The request fails.
+    rig.single_lookup_failed(id, peer_id, RPCError::Disconnected);
+    rig.expect_block_lookup_request(block_hash);
+    // The request should be removed from the network context on disconnection.
+    rig.expect_empty_network();
+}
+
 #[test]
 fn test_single_block_lookup_becomes_parent_request() {
     let mut rig = TestRig::test_setup();
@@ -1289,19 +1313,9 @@ fn test_lookup_peer_disconnected_no_peers_left_while_request() {
     rig.trigger_unknown_parent_block(peer_id, trigger_block.into());
     rig.peer_disconnected(peer_id);
     rig.rpc_error_all_active_requests(peer_id);
-    rig.expect_no_active_lookups();
-}
-
-#[test]
-fn test_lookup_peer_disconnected_no_peers_left_not_while_request() {
-    let mut rig = TestRig::test_setup();
-    let peer_id = rig.new_connected_peer();
-    let trigger_block = rig.rand_block();
-    rig.trigger_unknown_parent_block(peer_id, trigger_block.into());
-    rig.peer_disconnected(peer_id);
-    // Note: this test case may be removed in the future. It's not strictly necessary to drop a
-    // lookup if there are no peers left. Lookup should only be dropped if it can not make progress
-    rig.expect_no_active_lookups();
+    // Erroring all rpc requests and disconnecting the peer shouldn't remove the requests
+    // from the lookups map as they can still progress.
+    rig.assert_single_lookups_count(2);
 }
 
 #[test]
diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs
index d159733cbc7..f31f2921ea2 100644
--- a/beacon_node/network/src/sync/block_sidecar_coupling.rs
+++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs
@@ -1,4 +1,5 @@
 use beacon_chain::block_verification_types::RpcBlock;
+use lighthouse_network::PeerId;
 use ssz_types::VariableList;
 use std::{collections::VecDeque, sync::Arc};
 use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
@@ -17,16 +18,19 @@ pub struct BlocksAndBlobsRequestInfo<E: EthSpec> {
     is_sidecars_stream_terminated: bool,
     /// Used to determine if this accumulator should wait for a sidecars stream termination
     request_type: ByRangeRequestType,
+    /// The peer the request was made to.
+    pub(crate) peer_id: PeerId,
 }
 
 impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
-    pub fn new(request_type: ByRangeRequestType) -> Self {
+    pub fn new(request_type: ByRangeRequestType, peer_id: PeerId) -> Self {
         Self {
             accumulated_blocks: <_>::default(),
             accumulated_sidecars: <_>::default(),
             is_blocks_stream_terminated: <_>::default(),
             is_sidecars_stream_terminated: <_>::default(),
             request_type,
+            peer_id,
         }
     }
 
@@ -109,12 +113,14 @@ mod tests {
     use super::BlocksAndBlobsRequestInfo;
     use crate::sync::range_sync::ByRangeRequestType;
     use beacon_chain::test_utils::{generate_rand_block_and_blobs, NumBlobs};
+    use lighthouse_network::PeerId;
     use rand::SeedableRng;
     use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E};
 
     #[test]
     fn no_blobs_into_responses() {
-        let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::Blocks);
+        let peer_id = PeerId::random();
+        let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::Blocks, peer_id);
         let mut rng = XorShiftRng::from_seed([42; 16]);
         let blocks = (0..4)
             .map(|_| generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng).0)
@@ -133,7 +139,9 @@ mod tests {
 
     #[test]
     fn empty_blobs_into_responses() {
-        let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::BlocksAndBlobs);
+        let peer_id = PeerId::random();
+        let mut info =
+            BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::BlocksAndBlobs, peer_id);
         let mut rng = XorShiftRng::from_seed([42; 16]);
         let blocks = (0..4)
             .map(|_| {
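The `request_type` field documented above ("should wait for a sidecars stream termination") reduces to a completion rule along these lines; the struct and the `is_finished` name are illustrative, only the two termination flags come from the diff:

    // Sketch of the coupling rule, assuming a simplified standalone struct.
    struct StreamState {
        is_blocks_stream_terminated: bool,
        is_sidecars_stream_terminated: bool,
        expects_sidecars: bool, // derived from `ByRangeRequestType` in the real code
    }

    impl StreamState {
        fn is_finished(&self) -> bool {
            // A blocks-only request completes when the blocks stream ends; a
            // blocks-and-blobs request must also see the sidecars stream end.
            self.is_blocks_stream_terminated
                && (!self.expects_sidecars || self.is_sidecars_stream_terminated)
        }
    }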
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index 4c1a1e6b67a..0f8cab18c92 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -372,16 +372,39 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                     Err(_) => self.update_sync_state(),
                 },
             }
+        } else {
+            debug!(
+                self.log,
+                "RPC error for range request has no associated entry in network context, ungraceful disconnect";
+                "peer_id" => %peer_id,
+                "request_id" => %id,
+                "error" => ?error,
+            );
         }
     }
 
+    /// Handles a peer disconnect.
+    ///
+    /// It is important that a peer disconnect retries all the batches/lookups as
+    /// there is no way to guarantee that libp2p always emits an error along with
+    /// the disconnect.
     fn peer_disconnect(&mut self, peer_id: &PeerId) {
+        // Inject a Disconnected error on all requests associated with the disconnected peer
+        // to retry all batches/lookups
+        for request_id in self.network.peer_disconnected(peer_id) {
+            self.inject_error(*peer_id, request_id, RPCError::Disconnected);
+        }
+
+        // Remove peer from all data structures
         self.range_sync.peer_disconnect(&mut self.network, peer_id);
+        let _ = self
+            .backfill_sync
+            .peer_disconnected(peer_id, &mut self.network);
         self.block_lookups.peer_disconnected(peer_id);
+
         // Regardless of the outcome, we update the sync status.
-        let _ = self.backfill_sync.peer_disconnected(peer_id);
         self.update_sync_state();
     }
 
@@ -951,7 +974,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                 self.network.insert_range_blocks_and_blobs_request(
                     id,
                     resp.sender_id,
-                    BlocksAndBlobsRequestInfo::new(resp.request_type),
+                    BlocksAndBlobsRequestInfo::new(resp.request_type, peer_id),
                 );
                 // inform range that the request needs to be treated as failed
                 // With time we will want to downgrade this log
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index f3f82ee011f..6f89b954b3a 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -177,6 +177,46 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         }
     }
 
+    /// Returns the ids of all the requests made to the given peer_id.
+    pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Vec<SyncRequestId> {
+        let failed_range_ids =
+            self.range_blocks_and_blobs_requests
+                .iter()
+                .filter_map(|(id, request)| {
+                    if request.1.peer_id == *peer_id {
+                        Some(SyncRequestId::RangeBlockAndBlobs { id: *id })
+                    } else {
+                        None
+                    }
+                });
+
+        let failed_block_ids = self
+            .blocks_by_root_requests
+            .iter()
+            .filter_map(|(id, request)| {
+                if request.peer_id == *peer_id {
+                    Some(SyncRequestId::SingleBlock { id: *id })
+                } else {
+                    None
+                }
+            });
+        let failed_blob_ids = self
+            .blobs_by_root_requests
+            .iter()
+            .filter_map(|(id, request)| {
+                if request.peer_id == *peer_id {
+                    Some(SyncRequestId::SingleBlob { id: *id })
+                } else {
+                    None
+                }
+            });
+
+        failed_range_ids
+            .chain(failed_block_ids)
+            .chain(failed_blob_ids)
+            .collect()
+    }
+
     pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
         &self.network_beacon_processor.network_globals
     }
@@ -272,8 +312,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         sender_id: RangeRequestId,
     ) -> Result<Id, RpcRequestSendError> {
         let id = self.blocks_by_range_request(peer_id, batch_type, request)?;
-        self.range_blocks_and_blobs_requests
-            .insert(id, (sender_id, BlocksAndBlobsRequestInfo::new(batch_type)));
+        self.range_blocks_and_blobs_requests.insert(
+            id,
+            (
+                sender_id,
+                BlocksAndBlobsRequestInfo::new(batch_type, peer_id),
+            ),
+        );
         Ok(id)
     }
 
@@ -343,7 +388,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             // Block is known are currently processing, expect a future event with the result of
             // processing.
             BlockProcessStatus::NotValidated { .. } => {
-                return Ok(LookupRequestResult::Pending("block in processing cache"))
+                // Lookup sync event safety: If the block is currently in the processing cache, we
+                // are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
+                // make progress on this lookup
+                return Ok(LookupRequestResult::Pending("block in processing cache"));
             }
             // Block is fully validated. If it's not yet imported it's waiting for missing block
            // components. Consider this request completed and do nothing.
@@ -366,6 +414,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
 
         let request = BlocksByRootSingleRequest(block_root);
 
+        // Lookup sync event safety: If network_send.send() returns Ok(_) we are guaranteed that
+        // eventually at least one of these 3 events will be received:
+        // - StreamTermination(request_id): handled by `Self::on_single_block_response`
+        // - RPCError(request_id): handled by `Self::on_single_block_response`
+        // - Disconnect(peer_id): handled by `Self::peer_disconnected`, which converts it to a
+        //   `RPCError(request_id)` event handled by the above method
         self.network_send
             .send(NetworkMessage::SendRequest {
                 peer_id,
                 request_id,
                 request,
             })
             .map_err(|_| RpcRequestSendError::NetworkSendError)?;
 
         self.blocks_by_root_requests
-            .insert(id, ActiveBlocksByRootRequest::new(request));
+            .insert(id, ActiveBlocksByRootRequest::new(request, peer_id));
 
         Ok(LookupRequestResult::RequestSent(req_id))
     }
 
@@ -408,6 +462,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         // latter handle the case where if the peer sent no blobs, penalize.
         // - if `downloaded_block_expected_blobs` is Some = block is downloading or processing.
         // - if `num_expected_blobs` returns Some = block is processed.
+        //
+        // Lookup sync event safety: Reaching this code means that a block is not in any pre-import
+        // cache nor in the request state of this lookup. Therefore, the block must either: (1) not
+        // be downloaded yet or (2) the block is already imported into the fork-choice.
+        // In case (1) the lookup must either successfully download the block or get dropped.
+        // In case (2) the block will be downloaded, processed, reach `BlockIsAlreadyKnown` and
+        // get dropped as completed.
             return Ok(LookupRequestResult::Pending("waiting for block download"));
         };
 
@@ -444,6 +505,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             indices,
         };
 
+        // Lookup sync event safety: Refer to `Self::block_lookup_request` `network_send.send` call
         self.network_send
             .send(NetworkMessage::SendRequest {
                 peer_id,
                 request_id,
                 request,
             })
             .map_err(|_| RpcRequestSendError::NetworkSendError)?;
 
         self.blobs_by_root_requests
-            .insert(id, ActiveBlobsByRootRequest::new(request));
+            .insert(id, ActiveBlobsByRootRequest::new(request, peer_id));
 
         Ok(LookupRequestResult::RequestSent(req_id))
     }
 
@@ -660,6 +722,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             .ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
 
         debug!(self.log, "Sending block for processing"; "block" => ?block_root, "id" => id);
+        // Lookup sync event safety: If `beacon_processor.send_rpc_beacon_block` returns Ok() sync
+        // must receive a single `SyncMessage::BlockComponentProcessed` with this process type
         beacon_processor
             .send_rpc_beacon_block(
                 block_root,
@@ -689,6 +753,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             .ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
 
         debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "id" => id);
+        // Lookup sync event safety: If `beacon_processor.send_rpc_blobs` returns Ok() sync
+        // must receive a single `SyncMessage::BlockComponentProcessed` event with this process type
         beacon_processor
             .send_rpc_blobs(
                 block_root,
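The three-way guarantee spelled out in the comment above (stream termination, RPC error, or a disconnect that is converted back into an RPC error) can be modelled in isolation like this; none of these names are real types from the codebase:

    // Illustrative only: models the invariant that every request id handed out
    // by `RequestSent` is eventually resolved by exactly one of these arms.
    enum Event {
        StreamTermination(u32),
        RpcError(u32),
        Disconnect(Vec<u32>), // the ids returned by peer_disconnected()
    }

    fn resolve(event: Event, mut on_response: impl FnMut(u32)) {
        match event {
            Event::StreamTermination(id) => on_response(id),
            Event::RpcError(id) => on_response(id),
            // A silent disconnect is converted into one RPC error per
            // in-flight request, so the normal error path still runs.
            Event::Disconnect(ids) => ids.into_iter().for_each(on_response),
        }
    }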
diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs
index 6e4683701bd..8387e9b0e1a 100644
--- a/beacon_node/network/src/sync/network_context/requests.rs
+++ b/beacon_node/network/src/sync/network_context/requests.rs
@@ -1,5 +1,8 @@
 use beacon_chain::get_block_root;
-use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest};
+use lighthouse_network::{
+    rpc::{methods::BlobsByRootRequest, BlocksByRootRequest},
+    PeerId,
+};
 use std::sync::Arc;
 use strum::IntoStaticStr;
 use types::{
@@ -20,13 +23,15 @@ pub enum LookupVerifyError {
 pub struct ActiveBlocksByRootRequest {
     request: BlocksByRootSingleRequest,
     resolved: bool,
+    pub(crate) peer_id: PeerId,
 }
 
 impl ActiveBlocksByRootRequest {
-    pub fn new(request: BlocksByRootSingleRequest) -> Self {
+    pub fn new(request: BlocksByRootSingleRequest, peer_id: PeerId) -> Self {
         Self {
             request,
             resolved: false,
+            peer_id,
         }
     }
 
@@ -94,14 +99,16 @@ pub struct ActiveBlobsByRootRequest<E: EthSpec> {
     request: BlobsByRootSingleBlockRequest,
     blobs: Vec<Arc<BlobSidecar<E>>>,
     resolved: bool,
+    pub(crate) peer_id: PeerId,
 }
 
 impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
-    pub fn new(request: BlobsByRootSingleBlockRequest) -> Self {
+    pub fn new(request: BlobsByRootSingleBlockRequest, peer_id: PeerId) -> Self {
         Self {
             request,
             blobs: vec![],
             resolved: false,
+            peer_id,
         }
     }
 
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index 63cafa9aca4..122e8287e61 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -174,8 +174,30 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
 
     /// Removes a peer from the chain.
     /// If the peer has active batches, those are considered failed and re-requested.
-    pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
-        self.peers.remove(peer_id);
+    pub fn remove_peer(
+        &mut self,
+        peer_id: &PeerId,
+        network: &mut SyncNetworkContext<T>,
+    ) -> ProcessingResult {
+        if let Some(batch_ids) = self.peers.remove(peer_id) {
+            // fail the batches.
+            for id in batch_ids {
+                if let Some(batch) = self.batches.get_mut(&id) {
+                    if let BatchOperationOutcome::Failed { blacklist } =
+                        batch.download_failed(true)?
+                    {
+                        return Err(RemoveChain::ChainFailed {
+                            blacklist,
+                            failing_batch: id,
+                        });
+                    }
+                    self.retry_batch_download(network, id)?;
+                } else {
+                    debug!(self.log, "Batch not found while removing peer";
+                        "peer" => %peer_id, "batch" => id)
+                }
+            }
+        }
 
         if self.peers.is_empty() {
             Err(RemoveChain::EmptyPeerPool)
diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs
index fe48db35b45..c8e82666840 100644
--- a/beacon_node/network/src/sync/range_sync/range.rs
+++ b/beacon_node/network/src/sync/range_sync/range.rs
@@ -278,8 +278,9 @@ where
     /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum
     /// retries. In this case, we need to remove the chain.
     fn remove_peer(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {
-        for (removed_chain, sync_type, remove_reason) in
-            self.chains.call_all(|chain| chain.remove_peer(peer_id))
+        for (removed_chain, sync_type, remove_reason) in self
+            .chains
+            .call_all(|chain| chain.remove_peer(peer_id, network))
         {
             self.on_chain_removed(
                 removed_chain,
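Taken together, this first patch funnels every ungraceful disconnect through one path in the sync manager: collect the peer's in-flight request ids, fail each explicitly, then let range sync, backfill and lookups react. A free-standing sketch of that flow, with toy types standing in for the real `SyncManager` and `SyncNetworkContext`:

    // Free-standing sketch; all types are stand-ins for the real ones.
    struct Manager {
        in_flight: Vec<(u64 /* peer */, u32 /* request id */)>,
    }

    impl Manager {
        /// Mirrors the shape of SyncManager::peer_disconnect above.
        fn peer_disconnect(&mut self, peer: u64) {
            // 1. Collect every in-flight request made to this peer, like
            //    SyncNetworkContext::peer_disconnected does for range,
            //    blocks-by-root and blobs-by-root requests.
            let failed: Vec<u32> = self
                .in_flight
                .iter()
                .filter(|(p, _)| *p == peer)
                .map(|(_, id)| *id)
                .collect();
            // 2. Fail each one explicitly, since libp2p is not guaranteed to
            //    emit an RPC error on an ungraceful disconnect.
            for id in failed {
                self.inject_error(peer, id);
            }
            // 3. Range sync and backfill then retry the failed batches, and
            //    lookups merely drop the peer instead of being removed.
        }

        fn inject_error(&mut self, peer: u64, id: u32) {
            self.in_flight.retain(|(p, i)| !(*p == peer && *i == id));
        }
    }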
From b38019cb1071e5b5a3a3c1b4ec95836842b7ff64 Mon Sep 17 00:00:00 2001
From: Lion - dapplion <35266934+dapplion@users.noreply.github.com>
Date: Thu, 27 Jun 2024 01:53:55 +0200
Subject: [PATCH 2/3] Attempt to continue lookups after adding peers (#5993)

* Attempt to continue lookups after adding peers
---
 .../network/src/sync/block_lookups/mod.rs | 22 ++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs
index 0148a6548dd..0ae9bfec521 100644
--- a/beacon_node/network/src/sync/block_lookups/mod.rs
+++ b/beacon_node/network/src/sync/block_lookups/mod.rs
@@ -291,7 +291,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
             }
         }
 
-        if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers) {
+        if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) {
             warn!(self.log, "Error adding peers to ancestor lookup"; "error" => ?e);
         }
 
@@ -844,14 +844,17 @@
         &mut self,
         lookup_id: SingleLookupId,
         peers: &[PeerId],
+        cx: &mut SyncNetworkContext<T>,
     ) -> Result<(), String> {
         let lookup = self
             .single_block_lookups
             .get_mut(&lookup_id)
             .ok_or(format!("Unknown lookup for id {lookup_id}"))?;
 
+        let mut added_some_peer = false;
         for peer in peers {
             if lookup.add_peer(*peer) {
+                added_some_peer = true;
                 debug!(self.log, "Adding peer to existing single block lookup";
                     "block_root" => ?lookup.block_root(),
                     "peer" => ?peer
                 );
             }
         }
 
-        // We may choose to attempt to continue a lookup here. It is possible that a lookup had zero
-        // peers and after adding this set of peers it can make progress again. Note that this
-        // recursive function iterates from child to parent, so continuing the child first is weird.
-        // However, we choose to not attempt to continue the lookup for simplicity. It's not
-        // strictly required and just and optimization for a rare corner case.
-
         if let Some(parent_root) = lookup.awaiting_parent() {
             if let Some((&child_id, _)) = self
                 .single_block_lookups
                 .iter()
                 .find(|(_, l)| l.block_root() == parent_root)
             {
-                self.add_peers_to_lookup_and_ancestors(child_id, peers)
+                self.add_peers_to_lookup_and_ancestors(child_id, peers, cx)
             } else {
                 Err(format!("Lookup references unknown parent {parent_root:?}"))
             }
+        } else if added_some_peer {
+            // If this lookup is not awaiting a parent and we added at least one peer, attempt to
+            // make progress. It is possible that a lookup is created with zero peers, attempted to
+            // make progress, and then receives peers. After that time the lookup will never be
+            // pruned with `drop_lookups_without_peers` because it has peers. This is a rare corner
+            // case, but it can result in stuck lookups.
+            let result = lookup.continue_requests(cx);
+            self.on_lookup_result(lookup_id, result, "add_peers", cx);
+            Ok(())
         } else {
             Ok(())
         }

From 9e12c21f268c80a3f002ae0ca27477f9f512eb6f Mon Sep 17 00:00:00 2001
From: Michael Sproul
Date: Fri, 28 Jun 2024 12:10:32 +1000
Subject: [PATCH 3/3] Release v5.2.1 (testing branch) (#5989)

* Release v5.2.1
---
 Cargo.lock                           | 8 ++++----
 beacon_node/Cargo.toml               | 2 +-
 boot_node/Cargo.toml                 | 2 +-
 common/lighthouse_version/src/lib.rs | 4 ++--
 lcli/Cargo.toml                      | 2 +-
 lighthouse/Cargo.toml                | 2 +-
 6 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a1865289b05..814a9b45ed1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -855,7 +855,7 @@ dependencies = [
 
 [[package]]
 name = "beacon_node"
-version = "5.2.0"
+version = "5.2.1"
 dependencies = [
  "beacon_chain",
  "clap",
@@ -1061,7 +1061,7 @@ dependencies = [
 
 [[package]]
 name = "boot_node"
-version = "5.2.0"
+version = "5.2.1"
 dependencies = [
  "beacon_node",
  "clap",
@@ -4322,7 +4322,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
 
 [[package]]
 name = "lcli"
-version = "5.2.0"
+version = "5.2.1"
 dependencies = [
  "account_utils",
  "beacon_chain",
@@ -4893,7 +4893,7 @@ dependencies = [
 
 [[package]]
 name = "lighthouse"
-version = "5.2.0"
+version = "5.2.1"
 dependencies = [
  "account_manager",
  "account_utils",
diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml
index b95720e807f..a5fd29c971f 100644
--- a/beacon_node/Cargo.toml
+++ b/beacon_node/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "beacon_node"
-version = "5.2.0"
+version = "5.2.1"
 authors = [
     "Paul Hauner ",
     "Age Manning "]
 edition = { workspace = true }
diff --git a/common/lighthouse_version/src/lib.rs b/common/lighthouse_version/src/lib.rs
index 6fb06cc543c..d32d7994689 100644
--- a/common/lighthouse_version/src/lib.rs
+++ b/common/lighthouse_version/src/lib.rs
@@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!(
         // NOTE: using --match instead of --exclude for compatibility with old Git
         "--match=thiswillnevermatchlol"
     ],
-    prefix = "Lighthouse/v5.2.0-",
-    fallback = "Lighthouse/v5.2.0"
+    prefix = "Lighthouse/v5.2.1-",
+    fallback = "Lighthouse/v5.2.1"
 );
 
 /// Returns the first eight characters of the latest commit hash for this build.
diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml
index 73dd93dc3e6..3cddd8ee60b 100644
--- a/lcli/Cargo.toml
+++ b/lcli/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "lcli"
 description = "Lighthouse CLI (modeled after zcli)"
-version = "5.2.0"
+version = "5.2.1"
 authors = ["Paul Hauner "]
 edition = { workspace = true }
diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml
index 20466b5de73..a1674d8d2c0 100644
--- a/lighthouse/Cargo.toml
+++ b/lighthouse/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "lighthouse"
-version = "5.2.0"
+version = "5.2.1"
 authors = ["Sigma Prime "]
 edition = { workspace = true }
 autotests = false