Skip to content

Commit

Permalink
Rate limiting backfill sync (sigp#3936)
Browse files Browse the repository at this point in the history
- Introduce a new `rate_limiting_backfill_queue` - any new inbound backfill work events gets immediately sent to this FIFO queue **without any processing**
- Spawn a `backfill_scheduler` routine that pops a backfill event from the FIFO queue at specified intervals (currently halfway through a slot, or at 6s after slot start for 12s slots) and sends the event to `BeaconProcessor` via a `scheduled_backfill_work_tx` channel
- This channel gets polled last in the `InboundEvents`, and work event received is  wrapped in a `InboundEvent::ScheduledBackfillWork` enum variant, which gets processed immediately or queued by the `BeaconProcessor` (existing logic applies from here)

Diagram comparing backfill processing with / without rate-limiting:
sigp#3212 (comment)

See this comment for @paulhauner's  explanation and solution: sigp#3212 (comment)

I've compared this branch (with backfill processing rate limited to to 1 and 3 batches per slot) against the latest stable version. The CPU usage during backfill sync is reduced by ~5% - 20%, more details on this page:

https://hackmd.io/@jimmygchen/SJuVpJL3j

The above testing is done on Goerli (as I don't currently have hardware for Mainnet), I'm guessing the differences are likely to be bigger on mainnet due to block size.

- [x] Experiment with processing multiple batches per slot. (need to think about how to do this for different slot durations)
- [x] Add option to disable rate-limiting, enabed by default.
- [x] (No longer required now we're reusing the reprocessing queue) Complete the `backfill_scheduler` task when backfill sync is completed or not required
  • Loading branch information
jimmygchen authored and Woodpile37 committed Jan 6, 2024
1 parent 04907da commit bb1916e
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 9 deletions.
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2887,7 +2887,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_BLOCK_TIMES);
let block_delay = self
.slot_clock
.seconds_from_current_slot_start(self.spec.seconds_per_slot)
.seconds_from_current_slot_start()
.ok_or(Error::UnableToComputeTimeAtSlot)?;

fork_choice
Expand Down Expand Up @@ -3479,7 +3479,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let slot_delay = self
.slot_clock
.seconds_from_current_slot_start(self.spec.seconds_per_slot)
.seconds_from_current_slot_start()
.or_else(|| {
warn!(
self.log,
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub struct ChainConfig {
///
/// This is useful for block builders and testing.
pub always_prepare_payload: bool,
/// Whether backfill sync processing should be rate-limited.
pub enable_backfill_rate_limiting: bool,
}

impl Default for ChainConfig {
Expand All @@ -93,6 +95,7 @@ impl Default for ChainConfig {
optimistic_finalized_sync: true,
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
always_prepare_payload: false,
enable_backfill_rate_limiting: true,
}
}
}
65 changes: 62 additions & 3 deletions beacon_node/network/src/beacon_processor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{service::NetworkMessage, sync::SyncMessage};
use beacon_chain::test_utils::{
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
};
use beacon_chain::{BeaconChain, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
use beacon_chain::{BeaconChain, ChainConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
use lighthouse_network::{
discv5::enr::{CombinedKey, EnrBuilder},
rpc::methods::{MetaData, MetaDataV2},
Expand All @@ -23,8 +23,8 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, EthSpec, MainnetEthSpec, ProposerSlashing, SignedBeaconBlock,
SignedVoluntaryExit, SubnetId,
Attestation, AttesterSlashing, Epoch, EthSpec, MainnetEthSpec, ProposerSlashing,
SignedBeaconBlock, SignedVoluntaryExit, SubnetId,
};

type E = MainnetEthSpec;
Expand Down Expand Up @@ -70,6 +70,10 @@ impl Drop for TestRig {

impl TestRig {
pub async fn new(chain_length: u64) -> Self {
Self::new_with_chain_config(chain_length, ChainConfig::default()).await
}

pub async fn new_with_chain_config(chain_length: u64, chain_config: ChainConfig) -> Self {
// This allows for testing voluntary exits without building out a massive chain.
let mut spec = E::default_spec();
spec.shard_committee_period = 2;
Expand All @@ -78,6 +82,7 @@ impl TestRig {
.spec(spec)
.deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store()
.chain_config(chain_config)
.build();

harness.advance_slot();
Expand Down Expand Up @@ -261,6 +266,14 @@ impl TestRig {
self.beacon_processor_tx.try_send(event).unwrap();
}

pub fn enqueue_backfill_batch(&self) {
let event = WorkEvent::chain_segment(
ChainSegmentProcessId::BackSyncBatchId(Epoch::default()),
Vec::default(),
);
self.beacon_processor_tx.try_send(event).unwrap();
}

pub fn enqueue_unaggregated_attestation(&self) {
let (attestation, subnet_id) = self.attestations.first().unwrap().clone();
self.beacon_processor_tx
Expand Down Expand Up @@ -873,3 +886,49 @@ async fn test_rpc_block_reprocessing() {
// cache handle was dropped.
assert_eq!(next_block_root, rig.head_root());
}

/// Ensure that backfill batches get rate-limited and processing is scheduled at specified intervals.
#[tokio::test]
async fn test_backfill_sync_processing() {
let mut rig = TestRig::new(SMALL_CHAIN).await;
// Note: to verify the exact event times in an integration test is not straight forward here
// (not straight forward to manipulate `TestingSlotClock` due to cloning of `SlotClock` in code)
// and makes the test very slow, hence timing calculation is unit tested separately in
// `work_reprocessing_queue`.
for _ in 0..1 {
rig.enqueue_backfill_batch();
// ensure queued batch is not processed until later
rig.assert_no_events_for(Duration::from_millis(100)).await;
// A new batch should be processed within a slot.
rig.assert_event_journal_with_timeout(
&[CHAIN_SEGMENT_BACKFILL, WORKER_FREED, NOTHING_TO_DO],
rig.chain.slot_clock.slot_duration(),
)
.await;
}
}

/// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled.
#[tokio::test]
async fn test_backfill_sync_processing_rate_limiting_disabled() {
let chain_config = ChainConfig {
enable_backfill_rate_limiting: false,
..Default::default()
};
let mut rig = TestRig::new_with_chain_config(SMALL_CHAIN, chain_config).await;

for _ in 0..3 {
rig.enqueue_backfill_batch();
}

// ensure all batches are processed
rig.assert_event_journal_with_timeout(
&[
CHAIN_SEGMENT_BACKFILL,
CHAIN_SEGMENT_BACKFILL,
CHAIN_SEGMENT_BACKFILL,
],
Duration::from_millis(100),
)
.await;
}
Loading

0 comments on commit bb1916e

Please sign in to comment.