From 4773b489c5c04a9e6bfc8573d5d9b2320e9fe101 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 15 Nov 2023 19:10:41 +0300 Subject: [PATCH 1/3] Clone state ahead of block production --- beacon_node/beacon_chain/src/beacon_chain.rs | 61 +++++++++++++++---- beacon_node/beacon_chain/src/builder.rs | 1 + .../beacon_chain/src/state_advance_timer.rs | 48 ++++++++++++--- 3 files changed, 89 insertions(+), 21 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 2fd70056cc1..a5ad39a5514 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -482,6 +482,8 @@ pub struct BeaconChain { pub data_availability_checker: Arc>, /// The KZG trusted setup used by this chain. pub kzg: Option>, + #[allow(clippy::type_complexity)] + pub block_production_state: Arc)>>>, } pub enum BeaconBlockResponseType { @@ -4030,7 +4032,16 @@ impl BeaconChain { ); (re_org_state.pre_state, re_org_state.state_root) } - // Normal case: proposing a block atop the current head. Use the snapshot cache. + // Normal case: proposing a block atop the current head using the cache. + else if let Some((_, cached_state)) = self + .block_production_state + .lock() + .take() + .filter(|(cached_block_root, _)| *cached_block_root == head_block_root) + { + (cached_state.pre_state, cached_state.state_root) + } + // Fall back to a direct read of the snapshot cache. 
else if let Some(pre_state) = self .snapshot_cache .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) @@ -4038,6 +4049,12 @@ impl BeaconChain { snapshot_cache.get_state_for_block_production(head_block_root) }) { + warn!( + self.log, + "Block production cache miss"; + "message" => "falling back to snapshot cache clone", + "slot" => slot + ); (pre_state.pre_state, pre_state.state_root) } else { warn!( @@ -4161,12 +4178,27 @@ impl BeaconChain { drop(proposer_head_timer); let re_org_parent_block = proposer_head.parent_node.root; - // Only attempt a re-org if we hit the snapshot cache. + // Only attempt a re-org if we hit the block production cache or snapshot cache. let pre_state = self - .snapshot_cache - .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) - .and_then(|snapshot_cache| { - snapshot_cache.get_state_for_block_production(re_org_parent_block) + .block_production_state + .lock() + .take() + .and_then(|(cached_block_root, state)| { + (cached_block_root == re_org_parent_block).then_some(state) + }) + .or_else(|| { + warn!( + self.log, + "Block production cache miss"; + "message" => "falling back to snapshot cache during re-org", + "slot" => slot, + "block_root" => ?re_org_parent_block + ); + self.snapshot_cache + .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .and_then(|snapshot_cache| { + snapshot_cache.get_state_for_block_production(re_org_parent_block) + }) }) .or_else(|| { debug!( @@ -5326,15 +5358,18 @@ impl BeaconChain { /// /// This function will result in a call to `forkchoiceUpdated` on the EL if we're in the /// tail-end of the slot (as defined by `self.config.prepare_payload_lookahead`). + /// + /// Return `Ok(Some(head_block_root))` if this node prepared to propose at the next slot on + /// top of `head_block_root`. 
pub async fn prepare_beacon_proposer( self: &Arc, current_slot: Slot, - ) -> Result<(), Error> { + ) -> Result, Error> { let prepare_slot = current_slot + 1; // There's no need to run the proposer preparation routine before the bellatrix fork. if self.slot_is_prior_to_bellatrix(prepare_slot) { - return Ok(()); + return Ok(None); } let execution_layer = self @@ -5347,7 +5382,7 @@ impl BeaconChain { if !self.config.always_prepare_payload && !execution_layer.has_any_proposer_preparation_data().await { - return Ok(()); + return Ok(None); } // Load the cached head and its forkchoice update parameters. @@ -5394,7 +5429,7 @@ impl BeaconChain { let Some((forkchoice_update_params, Some(pre_payload_attributes))) = maybe_prep_data else { // Appropriate log messages have already been logged above and in // `get_pre_payload_attributes`. - return Ok(()); + return Ok(None); }; // If the execution layer doesn't have any proposer data for this validator then we assume @@ -5405,7 +5440,7 @@ impl BeaconChain { .has_proposer_preparation_data(proposer) .await { - return Ok(()); + return Ok(None); } // Fetch payload attributes from the execution layer's cache, or compute them from scratch @@ -5500,7 +5535,7 @@ impl BeaconChain { "prepare_slot" => prepare_slot, "validator" => proposer, ); - return Ok(()); + return Ok(None); }; // If we are close enough to the proposal slot, send an fcU, which will have payload @@ -5523,7 +5558,7 @@ impl BeaconChain { .await?; } - Ok(()) + Ok(Some(head_root)) } pub async fn update_execution_engine_forkchoice( diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index bffb23aeb7e..fbd255126ee 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -925,6 +925,7 @@ where .map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?, ), kzg, + block_production_state: Arc::new(Mutex::new(None)), }; let head = beacon_chain.head_snapshot(); diff --git 
a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index f3e97168a5c..b3908c589c3 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -227,19 +227,51 @@ async fn state_advance_timer( // Prepare proposers so that the node can send payload attributes in the case where // it decides to abandon a proposer boost re-org. - if let Err(e) = beacon_chain.prepare_beacon_proposer(current_slot).await { - warn!( - log, - "Unable to prepare proposer with lookahead"; - "error" => ?e, - "slot" => next_slot, - ); - } + let proposer_head = beacon_chain + .prepare_beacon_proposer(current_slot) + .await + .unwrap_or_else(|e| { + warn!( + log, + "Unable to prepare proposer with lookahead"; + "error" => ?e, + "slot" => next_slot, + ); + None + }); // Use a blocking task to avoid blocking the core executor whilst waiting for locks // in `ForkChoiceSignalTx`. beacon_chain.task_executor.clone().spawn_blocking( move || { + // If we're proposing, clone the head state preemptively so that it isn't on the hot + // path of proposing. We can delete this once we have tree-states. + if let Some(proposer_head) = proposer_head { + if let Some(proposer_state) = beacon_chain + .snapshot_cache + .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .and_then(|snapshot_cache| { + snapshot_cache.get_state_for_block_production(proposer_head) + }) + { + *beacon_chain.block_production_state.lock() = + Some((proposer_head, proposer_state)); + debug!( + log, + "Cloned state ready for block production"; + "head_block_root" => ?proposer_head, + "slot" => next_slot + ); + } + } else { + warn!( + log, + "Block production state missing from snapshot cache"; + "head_block_root" => ?proposer_head, + "slot" => next_slot + ); + } + // Signal block proposal for the next slot (if it happens to be waiting). 
if let Some(tx) = &beacon_chain.fork_choice_signal_tx { if let Err(e) = tx.notify_fork_choice_complete(next_slot) { From e8c25d5789a9c5037567c51f8b09db535d7e45e2 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 20 Nov 2023 12:16:35 +0300 Subject: [PATCH 2/3] Add pruning and fix logging --- beacon_node/beacon_chain/src/beacon_chain.rs | 3 ++ .../beacon_chain/src/state_advance_timer.rs | 30 ++++++++++++++----- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a5ad39a5514..ddee0dd3153 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -482,6 +482,9 @@ pub struct BeaconChain { pub data_availability_checker: Arc>, /// The KZG trusted setup used by this chain. pub kzg: Option>, + /// State with complete tree hash cache, ready for block production. + /// + /// NB: We can delete this once we have tree-states. #[allow(clippy::type_complexity)] pub block_production_state: Arc)>>>, } diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index b3908c589c3..00c3455c032 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -45,6 +45,9 @@ const MAX_ADVANCE_DISTANCE: u64 = 4; /// impact whilst having 8 epochs without a block is a comfortable grace period. const MAX_FORK_CHOICE_DISTANCE: u64 = 256; +/// Drop any unused block production state cache after this many slots. +const MAX_BLOCK_PRODUCTION_CACHE_DISTANCE: u64 = 4; + #[derive(Debug)] enum Error { BeaconChain(BeaconChainError), @@ -244,8 +247,8 @@ async fn state_advance_timer( // in `ForkChoiceSignalTx`. beacon_chain.task_executor.clone().spawn_blocking( move || { - // If we're proposing, clone the head state preemptively so that it isn't on the hot - // path of proposing. 
We can delete this once we have tree-states. + // If we're proposing, clone the head state preemptively so that it isn't on + // the hot path of proposing. We can delete this once we have tree-states. if let Some(proposer_head) = proposer_head { if let Some(proposer_state) = beacon_chain .snapshot_cache @@ -262,14 +265,25 @@ async fn state_advance_timer( "head_block_root" => ?proposer_head, "slot" => next_slot ); + } else { + warn!( + log, + "Block production state missing from snapshot cache"; + "head_block_root" => ?proposer_head, + "slot" => next_slot + ); } } else { - warn!( - log, - "Block production state missing from snapshot cache"; - "head_block_root" => ?proposer_head, - "slot" => next_slot - ); + // If we aren't proposing, drop any old block production cache to save + // memory. + let mut cache = beacon_chain.block_production_state.lock(); + if let Some((_, state)) = &*cache { + if state.pre_state.slot() + MAX_BLOCK_PRODUCTION_CACHE_DISTANCE + <= current_slot + { + drop(cache.take()); + } + } } // Signal block proposal for the next slot (if it happens to be waiting). From 6d5d558a35c6863cf263f535805a1fc3d83488b2 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Sat, 25 Nov 2023 14:05:09 +0400 Subject: [PATCH 3/3] Don't hold 2 states in mem --- .../beacon_chain/src/state_advance_timer.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 00c3455c032..c04815ebc13 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -250,6 +250,18 @@ async fn state_advance_timer( // If we're proposing, clone the head state preemptively so that it isn't on // the hot path of proposing. We can delete this once we have tree-states. 
                 if let Some(proposer_head) = proposer_head {
+                    let mut cache = beacon_chain.block_production_state.lock();
+
+                    // Avoid holding two states in memory. It's OK to hold the lock because
+                    // we always lock the block production cache before the snapshot cache
+                    // and we prefer for block production to wait for the block production
+                    // cache if a clone is in progress.
+                    if cache
+                        .as_ref()
+                        .map_or(false, |(cached_head, _)| *cached_head != proposer_head)
+                    {
+                        drop(cache.take());
+                    }
                     if let Some(proposer_state) = beacon_chain
                         .snapshot_cache
                         .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
                         .and_then(|snapshot_cache| {
                             snapshot_cache.get_state_for_block_production(proposer_head)
                         })
                     {
-                        *beacon_chain.block_production_state.lock() =
-                            Some((proposer_head, proposer_state));
+                        *cache = Some((proposer_head, proposer_state));
                         debug!(
                             log,
                             "Cloned state ready for block production";