diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d01613f3dde..279cc0959bc 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2887,7 +2887,7 @@ impl BeaconChain { metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_BLOCK_TIMES); let block_delay = self .slot_clock - .seconds_from_current_slot_start(self.spec.seconds_per_slot) + .seconds_from_current_slot_start() .ok_or(Error::UnableToComputeTimeAtSlot)?; fork_choice @@ -3479,7 +3479,7 @@ impl BeaconChain { let slot_delay = self .slot_clock - .seconds_from_current_slot_start(self.spec.seconds_per_slot) + .seconds_from_current_slot_start() .or_else(|| { warn!( self.log, diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index eecaee6de54..a2d44b643da 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -68,6 +68,8 @@ pub struct ChainConfig { /// /// This is useful for block builders and testing. pub always_prepare_payload: bool, + /// Whether backfill sync processing should be rate-limited. + pub enable_backfill_rate_limiting: bool, } impl Default for ChainConfig { @@ -93,6 +95,7 @@ impl Default for ChainConfig { optimistic_finalized_sync: true, shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE, always_prepare_payload: false, + enable_backfill_rate_limiting: true, } } } diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs index cf4934a6683..00ca60ba992 100644 --- a/beacon_node/network/src/beacon_processor/tests.rs +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -9,7 +9,7 @@ use crate::{service::NetworkMessage, sync::SyncMessage}; use beacon_chain::test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, }; -use beacon_chain::{BeaconChain, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; +use beacon_chain::{BeaconChain, ChainConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; use lighthouse_network::{ discv5::enr::{CombinedKey, EnrBuilder}, rpc::methods::{MetaData, MetaDataV2}, @@ -23,8 +23,8 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use types::{ - Attestation, AttesterSlashing, EthSpec, MainnetEthSpec, ProposerSlashing, SignedBeaconBlock, - SignedVoluntaryExit, SubnetId, + Attestation, AttesterSlashing, Epoch, EthSpec, MainnetEthSpec, ProposerSlashing, + SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; type E = MainnetEthSpec; @@ -70,6 +70,10 @@ impl Drop for TestRig { impl TestRig { pub async fn new(chain_length: u64) -> Self { + Self::new_with_chain_config(chain_length, ChainConfig::default()).await + } + + pub async fn new_with_chain_config(chain_length: u64, chain_config: ChainConfig) -> Self { // This allows for testing voluntary exits without building out a massive chain. let mut spec = E::default_spec(); spec.shard_committee_period = 2; @@ -78,6 +82,7 @@ impl TestRig { .spec(spec) .deterministic_keypairs(VALIDATOR_COUNT) .fresh_ephemeral_store() + .chain_config(chain_config) .build(); harness.advance_slot(); @@ -261,6 +266,14 @@ impl TestRig { self.beacon_processor_tx.try_send(event).unwrap(); } + pub fn enqueue_backfill_batch(&self) { + let event = WorkEvent::chain_segment( + ChainSegmentProcessId::BackSyncBatchId(Epoch::default()), + Vec::default(), + ); + self.beacon_processor_tx.try_send(event).unwrap(); + } + pub fn enqueue_unaggregated_attestation(&self) { let (attestation, subnet_id) = self.attestations.first().unwrap().clone(); self.beacon_processor_tx @@ -873,3 +886,49 @@ async fn test_rpc_block_reprocessing() { // cache handle was dropped. assert_eq!(next_block_root, rig.head_root()); } + +/// Ensure that backfill batches get rate-limited and processing is scheduled at specified intervals. +#[tokio::test] +async fn test_backfill_sync_processing() { + let mut rig = TestRig::new(SMALL_CHAIN).await; + // Note: to verify the exact event times in an integration test is not straight forward here + // (not straight forward to manipulate `TestingSlotClock` due to cloning of `SlotClock` in code) + // and makes the test very slow, hence timing calculation is unit tested separately in + // `work_reprocessing_queue`. + for _ in 0..1 { + rig.enqueue_backfill_batch(); + // ensure queued batch is not processed until later + rig.assert_no_events_for(Duration::from_millis(100)).await; + // A new batch should be processed within a slot. + rig.assert_event_journal_with_timeout( + &[CHAIN_SEGMENT_BACKFILL, WORKER_FREED, NOTHING_TO_DO], + rig.chain.slot_clock.slot_duration(), + ) + .await; + } +} + +/// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled. +#[tokio::test] +async fn test_backfill_sync_processing_rate_limiting_disabled() { + let chain_config = ChainConfig { + enable_backfill_rate_limiting: false, + ..Default::default() + }; + let mut rig = TestRig::new_with_chain_config(SMALL_CHAIN, chain_config).await; + + for _ in 0..3 { + rig.enqueue_backfill_batch(); + } + + // ensure all batches are processed + rig.assert_event_journal_with_timeout( + &[ + CHAIN_SEGMENT_BACKFILL, + CHAIN_SEGMENT_BACKFILL, + CHAIN_SEGMENT_BACKFILL, + ], + Duration::from_millis(100), + ) + .await; +} diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 0e4a08f5d6c..969e1eea3db 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -11,6 +11,7 @@ //! Aggregated and unaggregated attestations that failed verification due to referencing an unknown //! block will be re-queued until their block is imported, or until they expire. use super::MAX_SCHEDULED_WORK_QUEUE_LEN; +use crate::beacon_processor::{ChainSegmentProcessId, Work, WorkEvent}; use crate::metrics; use crate::sync::manager::BlockProcessType; use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; @@ -18,14 +19,17 @@ use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_D use fnv::FnvHashMap; use futures::task::Poll; use futures::{Stream, StreamExt}; +use itertools::Itertools; use lighthouse_network::{MessageId, PeerId}; use logging::TimeLatch; use slog::{crit, debug, error, trace, warn, Logger}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; +use std::future::Future; use std::pin::Pin; use std::task::Context; use std::time::Duration; +use strum::AsRefStr; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time::error::Error as TimeError; @@ -64,7 +68,21 @@ const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; /// How many light client updates we keep before new ones get dropped. const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128; +// Process backfill batch 50%, 60%, 80% through each slot. +// +// Note: use caution to set these fractions in a way that won't cause panic-y +// arithmetic. +pub const BACKFILL_SCHEDULE_IN_SLOT: [(u32, u32); 3] = [ + // One half: 6s on mainnet, 2.5s on Gnosis. + (1, 2), + // Three fifths: 7.2s on mainnet, 3s on Gnosis. + (3, 5), + // Four fifths: 9.6s on mainnet, 4s on Gnosis. + (4, 5), +]; + /// Messages that the scheduler can receive. +#[derive(AsRefStr)] pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. EarlyBlock(QueuedGossipBlock), @@ -83,6 +101,8 @@ pub enum ReprocessQueueMessage { UnknownBlockAggregate(QueuedAggregate), /// A light client optimistic update that references a parent root that has not been seen as a parent. UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate), + /// A new backfill batch that needs to be scheduled for processing. + BackfillSync(QueuedBackfillBatch), } /// Events sent by the scheduler once they are ready for re-processing. @@ -92,6 +112,7 @@ pub enum ReadyWork { Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), LightClientUpdate(QueuedLightClientUpdate), + BackfillSync(QueuedBackfillBatch), } /// An Attestation for which the corresponding block was not seen while processing, queued for @@ -143,6 +164,40 @@ pub struct QueuedRpcBlock { pub should_process: bool, } +/// A backfill batch work that has been queued for processing later. +#[derive(Clone)] +pub struct QueuedBackfillBatch { + pub process_id: ChainSegmentProcessId, + pub blocks: Vec>>, +} + +impl TryFrom> for QueuedBackfillBatch { + type Error = WorkEvent; + + fn try_from(event: WorkEvent) -> Result> { + match event { + WorkEvent { + work: + Work::ChainSegment { + process_id: process_id @ ChainSegmentProcessId::BackSyncBatchId(_), + blocks, + }, + .. + } => Ok(QueuedBackfillBatch { process_id, blocks }), + _ => Err(event), + } + } +} + +impl From> for WorkEvent { + fn from(queued_backfill_batch: QueuedBackfillBatch) -> WorkEvent { + WorkEvent::chain_segment( + queued_backfill_batch.process_id, + queued_backfill_batch.blocks, + ) + } +} + /// Unifies the different messages processed by the block delay queue. enum InboundEvent { /// A gossip block that was queued for later processing and is ready for import. @@ -154,6 +209,8 @@ enum InboundEvent { ReadyAttestation(QueuedAttestationId), /// A light client update that is ready for re-processing. ReadyLightClientUpdate(QueuedLightClientUpdateId), + /// A backfill batch that was queued is ready for processing. + ReadyBackfillSync(QueuedBackfillBatch), /// A `DelayQueue` returned an error. DelayQueueError(TimeError, &'static str), /// A message sent to the `ReprocessQueue` @@ -190,6 +247,8 @@ struct ReprocessQueue { queued_lc_updates: FnvHashMap, DelayKey)>, /// Light Client Updates per parent_root. awaiting_lc_updates_per_parent_root: HashMap>, + /// Queued backfill batches + queued_backfill_batches: Vec>, /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations @@ -199,6 +258,8 @@ struct ReprocessQueue { rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, lc_update_delay_debounce: TimeLatch, + next_backfill_batch_event: Option>>, + slot_clock: Pin>, } pub type QueuedLightClientUpdateId = usize; @@ -286,6 +347,20 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() { + match next_backfill_batch_event.as_mut().poll(cx) { + Poll::Ready(_) => { + let maybe_batch = self.queued_backfill_batches.pop(); + self.recompute_next_backfill_batch_event(); + + if let Some(batch) = maybe_batch { + return Poll::Ready(Some(InboundEvent::ReadyBackfillSync(batch))); + } + } + Poll::Pending => (), + } + } + // Last empty the messages channel. match self.work_reprocessing_rx.poll_recv(cx) { Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))), @@ -322,12 +397,15 @@ pub fn spawn_reprocess_scheduler( queued_unaggregates: FnvHashMap::default(), awaiting_attestations_per_root: HashMap::new(), awaiting_lc_updates_per_parent_root: HashMap::new(), + queued_backfill_batches: Vec::new(), next_attestation: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), lc_update_delay_debounce: TimeLatch::default(), + next_backfill_batch_event: None, + slot_clock: Box::pin(slot_clock.clone()), }; executor.spawn( @@ -678,6 +756,14 @@ impl ReprocessQueue { } } } + InboundEvent::Msg(BackfillSync(queued_backfill_batch)) => { + self.queued_backfill_batches + .insert(0, queued_backfill_batch); + // only recompute if there is no `next_backfill_batch_event` already scheduled + if self.next_backfill_batch_event.is_none() { + self.recompute_next_backfill_batch_event(); + } + } // A block that was queued for later processing is now ready to be processed. InboundEvent::ReadyGossipBlock(ready_block) => { let block_root = ready_block.block.block_root; @@ -785,6 +871,33 @@ impl ReprocessQueue { } } } + InboundEvent::ReadyBackfillSync(queued_backfill_batch) => { + let millis_from_slot_start = slot_clock + .millis_from_current_slot_start() + .map_or("null".to_string(), |duration| { + duration.as_millis().to_string() + }); + + debug!( + log, + "Sending scheduled backfill work"; + "millis_from_slot_start" => millis_from_slot_start + ); + + if self + .ready_work_tx + .try_send(ReadyWork::BackfillSync(queued_backfill_batch.clone())) + .is_err() + { + error!( + log, + "Failed to send scheduled backfill work"; + "info" => "sending work back to queue" + ); + self.queued_backfill_batches + .insert(0, queued_backfill_batch); + } + } } metrics::set_gauge_vec( @@ -808,4 +921,95 @@ impl ReprocessQueue { self.lc_updates_delay_queue.len() as i64, ); } + + fn recompute_next_backfill_batch_event(&mut self) { + // only recompute the `next_backfill_batch_event` if there are backfill batches in the queue + if !self.queued_backfill_batches.is_empty() { + self.next_backfill_batch_event = Some(Box::pin(tokio::time::sleep( + ReprocessQueue::::duration_until_next_backfill_batch_event(&self.slot_clock), + ))); + } else { + self.next_backfill_batch_event = None + } + } + + /// Returns duration until the next scheduled processing time. The schedule ensure that backfill + /// processing is done in windows of time that aren't critical + fn duration_until_next_backfill_batch_event(slot_clock: &T::SlotClock) -> Duration { + let slot_duration = slot_clock.slot_duration(); + slot_clock + .millis_from_current_slot_start() + .and_then(|duration_from_slot_start| { + BACKFILL_SCHEDULE_IN_SLOT + .into_iter() + // Convert fractions to seconds from slot start. + .map(|(multiplier, divisor)| (slot_duration / divisor) * multiplier) + .find_or_first(|&event_duration_from_slot_start| { + event_duration_from_slot_start > duration_from_slot_start + }) + .map(|next_event_time| { + if duration_from_slot_start >= next_event_time { + // event is in the next slot, add duration to next slot + let duration_to_next_slot = slot_duration - duration_from_slot_start; + duration_to_next_slot + next_event_time + } else { + next_event_time - duration_from_slot_start + } + }) + }) + // If we can't read the slot clock, just wait another slot. + .unwrap_or(slot_duration) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use beacon_chain::builder::Witness; + use beacon_chain::eth1_chain::CachingEth1Backend; + use slot_clock::TestingSlotClock; + use store::MemoryStore; + use types::MainnetEthSpec as E; + use types::Slot; + + type TestBeaconChainType = + Witness, E, MemoryStore, MemoryStore>; + + #[test] + fn backfill_processing_schedule_calculation() { + let slot_duration = Duration::from_secs(12); + let slot_clock = TestingSlotClock::new(Slot::new(0), Duration::from_secs(0), slot_duration); + let current_slot_start = slot_clock.start_of(Slot::new(100)).unwrap(); + slot_clock.set_current_time(current_slot_start); + + let event_times = BACKFILL_SCHEDULE_IN_SLOT + .map(|(multiplier, divisor)| (slot_duration / divisor) * multiplier); + + for &event_duration_from_slot_start in event_times.iter() { + let duration_to_next_event = + ReprocessQueue::::duration_until_next_backfill_batch_event( + &slot_clock, + ); + + let current_time = slot_clock.millis_from_current_slot_start().unwrap(); + + assert_eq!( + duration_to_next_event, + event_duration_from_slot_start - current_time + ); + + slot_clock.set_current_time(current_slot_start + event_duration_from_slot_start) + } + + // check for next event beyond the current slot + let duration_to_next_slot = slot_clock.duration_to_next_slot().unwrap(); + let duration_to_next_event = + ReprocessQueue::::duration_until_next_backfill_batch_event( + &slot_clock, + ); + assert_eq!( + duration_to_next_event, + duration_to_next_slot + event_times[0] + ); + } } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 083330753a6..de3d45b09e0 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -268,6 +268,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .min_values(0) .hidden(true) ) + .arg( + Arg::with_name("disable-backfill-rate-limiting") + .long("disable-backfill-rate-limiting") + .help("Disable the backfill sync rate-limiting. This allow users to just sync the entire chain as fast \ + as possible, however it can result in resource contention which degrades staking performance. Stakers \ + should generally choose to avoid this flag since backfill sync is not required for staking.") + .takes_value(false), + ) /* REST API related arguments */ .arg( Arg::with_name("http") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index b9bdf1e965b..288f849f4cf 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -796,6 +796,10 @@ pub fn get_config( client_config.always_prefer_builder_payload = true; } + // Backfill sync rate-limiting + client_config.chain.enable_backfill_rate_limiting = + !cli_args.is_present("disable-backfill-rate-limiting"); + Ok(client_config) } diff --git a/common/slot_clock/src/lib.rs b/common/slot_clock/src/lib.rs index 760b2f9cdb1..8f7bbc1b783 100644 --- a/common/slot_clock/src/lib.rs +++ b/common/slot_clock/src/lib.rs @@ -104,12 +104,23 @@ pub trait SlotClock: Send + Sync + Sized + Clone { self.slot_duration() * 2 / INTERVALS_PER_SLOT as u32 } - /// Returns the `Duration` since the start of the current `Slot`. Useful in determining whether to apply proposer boosts. - fn seconds_from_current_slot_start(&self, seconds_per_slot: u64) -> Option { + /// Returns the `Duration` since the start of the current `Slot` at seconds precision. Useful in determining whether to apply proposer boosts. + fn seconds_from_current_slot_start(&self) -> Option { self.now_duration() .and_then(|now| now.checked_sub(self.genesis_duration())) .map(|duration_into_slot| { - Duration::from_secs(duration_into_slot.as_secs() % seconds_per_slot) + Duration::from_secs(duration_into_slot.as_secs() % self.slot_duration().as_secs()) + }) + } + + /// Returns the `Duration` since the start of the current `Slot` at milliseconds precision. + fn millis_from_current_slot_start(&self) -> Option { + self.now_duration() + .and_then(|now| now.checked_sub(self.genesis_duration())) + .map(|duration_into_slot| { + Duration::from_millis( + (duration_into_slot.as_millis() % self.slot_duration().as_millis()) as u64, + ) }) } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 8ea89f49dee..b6327ade15f 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1052,6 +1052,19 @@ fn disable_upnp_flag() { .with_config(|config| assert!(!config.network.upnp_enabled)); } #[test] +fn disable_backfill_rate_limiting_flag() { + CommandLineTest::new() + .flag("disable-backfill-rate-limiting", None) + .run_with_zero_port() + .with_config(|config| assert!(!config.chain.enable_backfill_rate_limiting)); +} +#[test] +fn default_backfill_rate_limiting_flag() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(config.chain.enable_backfill_rate_limiting)); +} +#[test] fn default_boot_nodes() { let mainnet = vec![ // Lighthouse Team (Sigma Prime) diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index 0e8c9a9ae95..49342d21148 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -425,7 +425,7 @@ impl Tester { .harness .chain .slot_clock - .seconds_from_current_slot_start(self.spec.seconds_per_slot) + .seconds_from_current_slot_start() .unwrap(); let result = self