diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 6852761d8bf..88083b57151 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -16,7 +16,7 @@ use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; pub use single_block_lookup::{BlobRequestState, BlockRequestState}; -use slog::{debug, error, trace, warn, Logger}; +use slog::{debug, error, warn, Logger}; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; @@ -233,13 +233,17 @@ impl BlockLookups { .iter_mut() .find(|(_id, lookup)| lookup.is_for_block(block_root)) { - trace!(self.log, "Adding peer to existing single block lookup"; "block_root" => %block_root); - lookup.add_peers(peers); + for peer in peers { + if lookup.add_peer(*peer) { + debug!(self.log, "Adding peer to existing single block lookup"; "block_root" => ?block_root, "peer" => ?peer); + } + } + if let Some(block_component) = block_component { let component_type = block_component.get_type(); let imported = lookup.add_child_components(block_component); if !imported { - debug!(self.log, "Lookup child component ignored"; "block_root" => %block_root, "type" => component_type); + debug!(self.log, "Lookup child component ignored"; "block_root" => ?block_root, "type" => component_type); } } return true; @@ -252,10 +256,15 @@ impl BlockLookups { .iter() .any(|(_, lookup)| lookup.is_for_block(awaiting_parent)) { + warn!(self.log, "Ignoring child lookup parent lookup not found"; "block_root" => ?awaiting_parent); return false; } } + // If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve), + // signal here to hold processing downloaded data. + let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent); + let msg = if block_component.is_some() { "Searching for components of a block with unknown parent" } else { @@ -265,14 +274,11 @@ impl BlockLookups { self.log, "{}", msg; "peer_ids" => ?peers, - "block" => ?block_root, + "block_root" => ?block_root, + "id" => lookup.id, ); metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED); - // If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve), - // signal here to hold processing downloaded data. - let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent); - // Add block components to the new request if let Some(block_component) = block_component { lookup.add_child_components(block_component); @@ -338,7 +344,8 @@ impl BlockLookups { Ok((response, seen_timestamp)) => { debug!(self.log, "Received lookup download success"; - "block_root" => %block_root, + "block_root" => ?block_root, + "id" => id, "peer_id" => %peer_id, "response_type" => ?response_type, ); @@ -357,7 +364,8 @@ impl BlockLookups { Err(e) => { debug!(self.log, "Received lookup download failure"; - "block_root" => %block_root, + "block_root" => ?block_root, + "id" => id, "peer_id" => %peer_id, "response_type" => ?response_type, "error" => %e, @@ -380,7 +388,7 @@ impl BlockLookups { req.should_drop_lookup_on_disconnected_peer(peer_id ); if should_drop_lookup { - debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => %req.block_root()); + debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?req.block_root()); } !should_drop_lookup @@ -425,6 +433,7 @@ impl BlockLookups { "Received lookup processing result"; "component" => ?R::response_type(), "block_root" => ?block_root, + "id" => lookup_id, "result" => ?result, ); @@ -496,7 +505,7 @@ impl BlockLookups { debug!( self.log, "Single block lookup failed. Execution layer is offline / unsynced / misconfigured"; - "block_root" => %block_root, + "block_root" => ?block_root, "error" => ?e ); Action::Drop @@ -505,7 +514,7 @@ impl BlockLookups { if e.category() == AvailabilityCheckErrorCategory::Internal => { // There errors indicate internal problems and should not downscore the peer - warn!(self.log, "Internal availability check failure"; "block_root" => %block_root, "error" => ?e); + warn!(self.log, "Internal availability check failure"; "block_root" => ?block_root, "error" => ?e); // Here we choose *not* to call `on_processing_failure` because this could result in a bad // lookup state transition. This error invalidates both blob and block requests, and we don't know the @@ -514,7 +523,7 @@ impl BlockLookups { Action::Drop } other => { - debug!(self.log, "Invalid lookup component"; "block_root" => %block_root, "component" => ?R::response_type(), "error" => ?other); + debug!(self.log, "Invalid lookup component"; "block_root" => ?block_root, "component" => ?R::response_type(), "error" => ?other); let peer_id = request_state.on_processing_failure()?; cx.report_peer( peer_id, @@ -540,7 +549,7 @@ impl BlockLookups { Action::ParentUnknown { parent_root } => { let peers = lookup.all_available_peers().cloned().collect::>(); lookup.set_awaiting_parent(parent_root); - debug!(self.log, "Marking lookup as awaiting parent"; "lookup" => %block_root, "parent_root" => %parent_root); + debug!(self.log, "Marking lookup as awaiting parent"; "id" => lookup.id, "block_root" => ?block_root, "parent_root" => ?parent_root); self.search_parent_of_child(parent_root, block_root, &peers, cx); Ok(LookupResult::Pending) } @@ -562,7 +571,7 @@ impl BlockLookups { for (id, lookup) in self.single_block_lookups.iter_mut() { if lookup.awaiting_parent() == Some(block_root) { lookup.resolve_awaiting_parent(); - debug!(self.log, "Continuing child lookup"; "parent_root" => %block_root, "block_root" => %lookup.block_root()); + debug!(self.log, "Continuing child lookup"; "parent_root" => ?block_root, "id" => id, "block_root" => ?lookup.block_root()); let result = lookup.continue_requests(cx); lookup_results.push((*id, result)); } @@ -578,7 +587,7 @@ impl BlockLookups { /// dropped. pub fn drop_lookup_and_children(&mut self, dropped_id: SingleLookupId) { if let Some(dropped_lookup) = self.single_block_lookups.remove(&dropped_id) { - debug!(self.log, "Dropping child lookup"; "id" => ?dropped_id, "block_root" => %dropped_lookup.block_root()); + debug!(self.log, "Dropping child lookup"; "id" => ?dropped_id, "block_root" => ?dropped_lookup.block_root()); let child_lookups = self .single_block_lookups @@ -605,11 +614,13 @@ impl BlockLookups { Ok(LookupResult::Pending) => {} // no action Ok(LookupResult::Completed) => { if let Some(lookup) = self.single_block_lookups.remove(&id) { - debug!(self.log, "Dropping completed lookup"; "block" => %lookup.block_root()); + debug!(self.log, "Dropping completed lookup"; "block" => ?lookup.block_root(), "id" => id); metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED); // Block imported, continue the requests of pending child blocks self.continue_child_lookups(lookup.block_root(), cx); self.update_metrics(); + } else { + debug!(self.log, "Attempting to drop non-existent lookup"; "id" => id); } } Err(error) => { diff --git a/beacon_node/network/src/sync/block_lookups/parent_chain.rs b/beacon_node/network/src/sync/block_lookups/parent_chain.rs index 55f2cfe1292..7f4fe5119f6 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_chain.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_chain.rs @@ -55,7 +55,7 @@ pub(crate) fn compute_parent_chains(nodes: &[Node]) -> Vec { // Iterate blocks with no children for tip in nodes { let mut block_root = tip.block_root; - if parent_to_child.get(&block_root).is_none() { + if !parent_to_child.contains_key(&block_root) { let mut chain = vec![]; // Resolve chain of blocks diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index b642ec8e5b2..692d4d6445a 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -172,17 +172,12 @@ impl SingleBlockLookup { ) } - /// Add all given peers to both block and blob request states. - pub fn add_peer(&mut self, peer_id: PeerId) { - self.block_request_state.state.add_peer(&peer_id); - self.blob_request_state.state.add_peer(&peer_id); - } - - /// Add all given peers to both block and blob request states. - pub fn add_peers(&mut self, peers: &[PeerId]) { - for peer in peers { - self.add_peer(*peer); - } + /// Add peer to all request states. The peer must be able to serve this request. + /// Returns true if the peer was newly inserted into some request state. + pub fn add_peer(&mut self, peer_id: PeerId) -> bool { + let inserted_block = self.block_request_state.state.add_peer(&peer_id); + let inserted_blob = self.blob_request_state.state.add_peer(&peer_id); + inserted_block || inserted_blob } /// Returns true if the block has already been downloaded. @@ -464,9 +459,10 @@ impl SingleLookupRequestState { self.failed_processing >= self.failed_downloading } - /// This method should be used for peers wrapped in `PeerId::BlockAndBlobs`. - pub fn add_peer(&mut self, peer_id: &PeerId) { - self.available_peers.insert(*peer_id); + /// Add peer to this request states. The peer must be able to serve this request. + /// Returns true if the peer is newly inserted. + pub fn add_peer(&mut self, peer_id: &PeerId) -> bool { + self.available_peers.insert(*peer_id) } /// If a peer disconnects, this request could be failed. If so, an error is returned diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 0836d97c49f..8ca2cadaade 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -56,7 +56,7 @@ use lighthouse_network::rpc::RPCError; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::SyncInfo; use lighthouse_network::{PeerAction, PeerId}; -use slog::{crit, debug, error, info, trace, warn, Logger}; +use slog::{crit, debug, error, info, o, trace, warn, Logger}; use std::ops::Sub; use std::sync::Arc; use std::time::Duration; @@ -257,9 +257,16 @@ impl SyncManager { beacon_chain.clone(), log.clone(), ), - range_sync: RangeSync::new(beacon_chain.clone(), log.clone()), - backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()), - block_lookups: BlockLookups::new(log.clone()), + range_sync: RangeSync::new( + beacon_chain.clone(), + log.new(o!("service" => "range_sync")), + ), + backfill_sync: BackFillSync::new( + beacon_chain.clone(), + network_globals, + log.new(o!("service" => "backfill_sync")), + ), + block_lookups: BlockLookups::new(log.new(o!("service"=> "lookup_sync"))), log: log.clone(), } }