diff --git a/bolt-sidecar/src/driver.rs b/bolt-sidecar/src/driver.rs index 0bdc6041..b13ad005 100644 --- a/bolt-sidecar/src/driver.rs +++ b/bolt-sidecar/src/driver.rs @@ -1,3 +1,4 @@ +use core::fmt; use std::time::{Duration, Instant}; use alloy::{ @@ -5,6 +6,11 @@ use alloy::{ signers::{local::PrivateKeySigner, Signer as SignerECDSA}, }; use beacon_api_client::mainnet::Client as BeaconClient; +use ethereum_consensus::{ + clock::{self, SlotStream, SystemTimeProvider}, + phase0::mainnet::SLOTS_PER_EPOCH, +}; +use futures::StreamExt; use tokio::sync::mpsc; use tracing::{debug, error, info, warn}; @@ -24,7 +30,6 @@ use crate::{ }; /// The driver for the sidecar, responsible for managing the main event loop. -#[derive(Debug)] pub struct SidecarDriver { head_tracker: HeadTracker, execution: ExecutionState, @@ -35,6 +40,24 @@ pub struct SidecarDriver { mevboost_client: MevBoostClient, api_events_rx: mpsc::Receiver, payload_requests_rx: mpsc::Receiver, + /// Stream of slots made from the consensus clock + slot_stream: SlotStream, +} + +impl fmt::Debug for SidecarDriver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SidecarDriver") + .field("head_tracker", &self.head_tracker) + .field("execution", &self.execution) + .field("consensus", &self.consensus) + .field("constraint_signer", &self.constraint_signer) + .field("commitment_signer", &self.commitment_signer) + .field("local_builder", &self.local_builder) + .field("mevboost_client", &self.mevboost_client) + .field("api_events_rx", &self.api_events_rx) + .field("payload_requests_rx", &self.payload_requests_rx) + .finish() + } } impl SidecarDriver { @@ -74,17 +97,27 @@ impl SidecarDriver SidecarDriver SidecarDriver { self.handle_fetch_payload_request(payload_request); } + Some(slot) = self.slot_stream.next() => { + if let Err(e) = self.consensus.update_slot(slot).await { + error!(err = ?e, "Failed to update consensus state slot"); + } + } } } } @@ -191,7 +230,7 @@ impl SidecarDriver SidecarDriver) -> fmt::Result { f.debug_struct("ConsensusState") - .field("header", &self.header) .field("epoch", &self.epoch) .field("latest_slot", &self.latest_slot) .field("latest_slot_timestamp", &self.latest_slot_timestamp) @@ -81,7 +79,6 @@ impl ConsensusState { ConsensusState { beacon_api_client, validator_indexes, - header: BeaconBlockHeader::default(), epoch: Epoch::default(), latest_slot: Default::default(), latest_slot_timestamp: Instant::now(), @@ -117,21 +114,16 @@ impl ConsensusState { } /// Update the latest head and fetch the relevant data from the beacon chain. - pub async fn update_head(&mut self, head: u64) -> Result<(), ConsensusError> { + pub async fn update_slot(&mut self, slot: u64) -> Result<(), ConsensusError> { // Reset the commitment deadline to start counting for the next slot. self.commitment_deadline = - CommitmentDeadline::new(head + 1, self.commitment_deadline_duration); - - let update = self.beacon_api_client.get_beacon_header(BlockId::Slot(head)).await?; - - self.header = update.header.message; + CommitmentDeadline::new(slot + 1, self.commitment_deadline_duration); // Update the timestamp with current time self.latest_slot_timestamp = Instant::now(); - self.latest_slot = head; + self.latest_slot = slot; - // Get the current value of slot and epoch - let slot = self.header.slot; + // Calculate the current value of epoch let epoch = slot / SLOTS_PER_EPOCH; // If the epoch has changed, update the proposer duties @@ -169,9 +161,12 @@ impl ConsensusState { #[cfg(test)] mod tests { - use super::*; use beacon_api_client::ProposerDuty; use reqwest::Url; + use tracing::warn; + + use super::*; + use crate::test_util::try_get_beacon_api_url; #[tokio::test] async fn test_find_validator_index_for_slot() { @@ -188,7 +183,6 @@ mod tests { // Create a ConsensusState with the sample proposer duties and validator indexes let state = ConsensusState { beacon_api_client: Client::new(Url::parse("http://localhost").unwrap()), - header: BeaconBlockHeader::default(), epoch: Epoch { value: 0, start_slot: 0, proposer_duties }, latest_slot_timestamp: Instant::now(), commitment_deadline: CommitmentDeadline::new(0, Duration::from_secs(1)), @@ -207,4 +201,50 @@ mod tests { Err(ConsensusError::ValidatorNotFound) )); } + + #[tokio::test] + async fn test_update_slot() -> eyre::Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + + let commitment_deadline_duration = Duration::from_secs(1); + let validator_indexes = ValidatorIndexes::from(vec![100, 101, 102]); + + let Some(url) = try_get_beacon_api_url().await else { + warn!("skipping test: beacon API URL is not reachable"); + return Ok(()); + }; + + let beacon_client = BeaconClient::new(Url::parse(url).unwrap()); + + // Create the initial ConsensusState + let mut state = ConsensusState { + beacon_api_client: beacon_client, + epoch: Epoch::default(), + latest_slot: Default::default(), + latest_slot_timestamp: Instant::now(), + validator_indexes, + commitment_deadline: CommitmentDeadline::new(0, commitment_deadline_duration), + commitment_deadline_duration, + }; + + // Update the slot to 32 + state.update_slot(32).await.unwrap(); + + // Check values were updated correctly + assert_eq!(state.latest_slot, 32); + assert!(state.latest_slot_timestamp.elapsed().as_secs() < 1); + assert_eq!(state.epoch.value, 1); + assert_eq!(state.epoch.start_slot, 32); + + // Update the slot to 63, which should not update the epoch + state.update_slot(63).await.unwrap(); + + // Check values were updated correctly + assert_eq!(state.latest_slot, 63); + assert!(state.latest_slot_timestamp.elapsed().as_secs() < 1); + assert_eq!(state.epoch.value, 1); + assert_eq!(state.epoch.start_slot, 32); + + Ok(()) + } } diff --git a/bolt-sidecar/src/state/head_tracker.rs b/bolt-sidecar/src/state/head_tracker.rs index 3ca48ff5..5195881d 100644 --- a/bolt-sidecar/src/state/head_tracker.rs +++ b/bolt-sidecar/src/state/head_tracker.rs @@ -1,13 +1,22 @@ -use std::time::Duration; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; use alloy::rpc::types::beacon::events::HeadEvent; use beacon_api_client::Topic; use futures::StreamExt; -use tokio::{sync::broadcast, task::AbortHandle}; +use tokio::{sync::broadcast, task::AbortHandle, time::sleep}; use tracing::warn; use crate::BeaconClient; +/// The delay between retries when attempting to reconnect to the beacon client +const RETRY_DELAY: Duration = Duration::from_secs(1); + /// Simple actor to keep track of the most recent head of the beacon chain /// and broadcast updates to its subscribers. /// @@ -17,6 +26,8 @@ use crate::BeaconClient; pub struct HeadTracker { /// Channel to receive updates of the "Head" beacon topic new_heads_rx: broadcast::Receiver, + /// The genesis timestamp of the beacon chain, used for calculating proposal times + beacon_genesis_timestamp: Arc, /// Handle to the background task that listens for new head events. /// Kept to allow for graceful shutdown. quit: AbortHandle, @@ -38,13 +49,29 @@ impl HeadTracker { pub fn start(beacon_client: BeaconClient) -> Self { let (new_heads_tx, new_heads_rx) = broadcast::channel(32); + let beacon_genesis_timestamp = Arc::new(AtomicU64::new(0)); + let beacon_genesis_timestamp_clone = beacon_genesis_timestamp.clone(); + let task = tokio::spawn(async move { loop { + // First, try to get the genesis timestamp and cache it. + let genesis_time = loop { + match beacon_client.get_genesis_details().await { + Ok(genesis_info) => break genesis_info.genesis_time, + Err(err) => { + warn!(?err, "failed to get genesis details"); + sleep(RETRY_DELAY).await; + continue; + } + } + }; + beacon_genesis_timestamp_clone.store(genesis_time, Ordering::Relaxed); + let mut event_stream = match beacon_client.get_events::().await { Ok(events) => events, Err(err) => { warn!(?err, "failed to subscribe to new heads topic, retrying..."); - tokio::time::sleep(Duration::from_secs(1)).await; + sleep(RETRY_DELAY).await; continue; } }; @@ -53,12 +80,12 @@ impl HeadTracker { Some(Ok(event)) => event, Some(Err(err)) => { warn!(?err, "error reading new head event stream, retrying..."); - tokio::time::sleep(Duration::from_secs(1)).await; + sleep(RETRY_DELAY).await; continue; } None => { warn!("new head event stream ended, retrying..."); - tokio::time::sleep(Duration::from_secs(1)).await; + sleep(RETRY_DELAY).await; continue; } }; @@ -69,7 +96,7 @@ impl HeadTracker { } }); - Self { new_heads_rx, quit: task.abort_handle() } + Self { new_heads_rx, beacon_genesis_timestamp, quit: task.abort_handle() } } /// Stop the tracker and cleanup resources @@ -89,6 +116,11 @@ impl HeadTracker { pub fn subscribe_new_heads(&self) -> broadcast::Receiver { self.new_heads_rx.resubscribe() } + + /// Get the genesis timestamp of the beacon chain + pub fn beacon_genesis_timestamp(&self) -> u64 { + self.beacon_genesis_timestamp.load(Ordering::Relaxed) + } } #[cfg(test)]