Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: commitment deadline in case of missed block #188

Merged
merged 5 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions bolt-sidecar/src/driver.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use core::fmt;
use std::time::{Duration, Instant};

use alloy::{
rpc::types::beacon::events::HeadEvent,
signers::{local::PrivateKeySigner, Signer as SignerECDSA},
};
use beacon_api_client::mainnet::Client as BeaconClient;
use ethereum_consensus::{
clock::{self, SlotStream, SystemTimeProvider},
phase0::mainnet::SLOTS_PER_EPOCH,
};
use futures::StreamExt;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};

Expand All @@ -24,7 +30,6 @@ use crate::{
};

/// The driver for the sidecar, responsible for managing the main event loop.
#[derive(Debug)]
pub struct SidecarDriver<C, BLS, ECDSA> {
head_tracker: HeadTracker,
execution: ExecutionState<C>,
Expand All @@ -35,6 +40,24 @@ pub struct SidecarDriver<C, BLS, ECDSA> {
mevboost_client: MevBoostClient,
api_events_rx: mpsc::Receiver<CommitmentEvent>,
payload_requests_rx: mpsc::Receiver<FetchPayloadRequest>,
/// Stream of slots made from the consensus clock
slot_stream: SlotStream<SystemTimeProvider>,
}

impl fmt::Debug for SidecarDriver<StateClient, BlsSigner, PrivateKeySigner> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SidecarDriver")
.field("head_tracker", &self.head_tracker)
.field("execution", &self.execution)
.field("consensus", &self.consensus)
.field("constraint_signer", &self.constraint_signer)
.field("commitment_signer", &self.commitment_signer)
.field("local_builder", &self.local_builder)
.field("mevboost_client", &self.mevboost_client)
.field("api_events_rx", &self.api_events_rx)
.field("payload_requests_rx", &self.payload_requests_rx)
.finish()
}
}

impl SidecarDriver<StateClient, BlsSigner, PrivateKeySigner> {
Expand Down Expand Up @@ -74,17 +97,27 @@ impl<C: StateFetcher, BLS: SignerBLS, ECDSA: SignerECDSA> SidecarDriver<C, BLS,
let (api_events_tx, api_events_rx) = mpsc::channel(1024);
CommitmentsApiServer::new(api_addr).run(api_events_tx).await;

// TODO: this can be replaced with ethereum_consensus::clock::from_system_time()
// but using beacon node events is easier to work on a custom devnet for now
// (as we don't need to specify genesis time and slot duration)
let head_tracker = HeadTracker::start(beacon_client.clone());

// TODO: head tracker initializes the genesis timestamp with '0' value
// we should add an async fn to fetch the value for safety
// Initialize the consensus clock.
let consensus_clock = clock::from_system_time(
head_tracker.beacon_genesis_timestamp(),
cfg.chain.slot_time(),
SLOTS_PER_EPOCH,
);
let slot_stream = consensus_clock.into_stream();

let consensus = ConsensusState::new(
beacon_client.clone(),
beacon_client,
cfg.validator_indexes.clone(),
cfg.chain.commitment_deadline(),
);

// TODO: this can be replaced with ethereum_consensus::clock::from_system_time()
// but using beacon node events is easier to work on a custom devnet for now
// (as we don't need to specify genesis time and slot duration)
let head_tracker = HeadTracker::start(beacon_client);

let builder_proxy_cfg = BuilderProxyConfig {
mevboost_url: cfg.mevboost_url.clone(),
server_port: cfg.mevboost_proxy_port,
Expand All @@ -110,6 +143,7 @@ impl<C: StateFetcher, BLS: SignerBLS, ECDSA: SignerECDSA> SidecarDriver<C, BLS,
mevboost_client,
api_events_rx,
payload_requests_rx,
slot_stream,
})
}

Expand All @@ -132,6 +166,11 @@ impl<C: StateFetcher, BLS: SignerBLS, ECDSA: SignerECDSA> SidecarDriver<C, BLS,
Some(payload_request) = self.payload_requests_rx.recv() => {
self.handle_fetch_payload_request(payload_request);
}
Some(slot) = self.slot_stream.next() => {
if let Err(e) = self.consensus.update_slot(slot).await {
error!(err = ?e, "Failed to update consensus state slot");
}
}
}
}
}
Expand Down Expand Up @@ -191,7 +230,7 @@ impl<C: StateFetcher, BLS: SignerBLS, ECDSA: SignerECDSA> SidecarDriver<C, BLS,
};
}

/// Handle a new head event, updating the execution and consensus state.
/// Handle a new head event, updating the execution state.
async fn handle_new_head_event(&mut self, head_event: HeadEvent) {
let slot = head_event.slot;
info!(slot, "Received new head event");
Expand All @@ -200,10 +239,6 @@ impl<C: StateFetcher, BLS: SignerBLS, ECDSA: SignerECDSA> SidecarDriver<C, BLS,
if let Err(e) = self.execution.update_head(None, slot).await {
error!(err = ?e, "Failed to update execution state head");
}

if let Err(e) = self.consensus.update_head(slot).await {
error!(err = ?e, "Failed to update consensus state head");
}
}

/// Handle a commitment deadline event, submitting constraints to the MEV-Boost service.
Expand Down
72 changes: 56 additions & 16 deletions bolt-sidecar/src/state/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::{
time::{Duration, Instant},
};

use beacon_api_client::{mainnet::Client, BlockId, ProposerDuty};
use ethereum_consensus::{deneb::BeaconBlockHeader, phase0::mainnet::SLOTS_PER_EPOCH};
use beacon_api_client::{mainnet::Client, ProposerDuty};
use ethereum_consensus::phase0::mainnet::SLOTS_PER_EPOCH;

use super::CommitmentDeadline;
use crate::{
Expand Down Expand Up @@ -41,7 +41,6 @@ pub struct Epoch {
#[allow(missing_debug_implementations)]
pub struct ConsensusState {
beacon_api_client: Client,
header: BeaconBlockHeader,
epoch: Epoch,
validator_indexes: ValidatorIndexes,
// Timestamp of when the latest slot was received
Expand All @@ -62,7 +61,6 @@ pub struct ConsensusState {
impl fmt::Debug for ConsensusState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConsensusState")
.field("header", &self.header)
.field("epoch", &self.epoch)
.field("latest_slot", &self.latest_slot)
.field("latest_slot_timestamp", &self.latest_slot_timestamp)
Expand All @@ -81,7 +79,6 @@ impl ConsensusState {
ConsensusState {
beacon_api_client,
validator_indexes,
header: BeaconBlockHeader::default(),
epoch: Epoch::default(),
latest_slot: Default::default(),
latest_slot_timestamp: Instant::now(),
Expand Down Expand Up @@ -117,21 +114,16 @@ impl ConsensusState {
}

/// Update the latest head and fetch the relevant data from the beacon chain.
pub async fn update_head(&mut self, head: u64) -> Result<(), ConsensusError> {
pub async fn update_slot(&mut self, slot: u64) -> Result<(), ConsensusError> {
// Reset the commitment deadline to start counting for the next slot.
self.commitment_deadline =
CommitmentDeadline::new(head + 1, self.commitment_deadline_duration);

let update = self.beacon_api_client.get_beacon_header(BlockId::Slot(head)).await?;

self.header = update.header.message;
CommitmentDeadline::new(slot + 1, self.commitment_deadline_duration);

// Update the timestamp with current time
self.latest_slot_timestamp = Instant::now();
self.latest_slot = head;
self.latest_slot = slot;

// Get the current value of slot and epoch
let slot = self.header.slot;
// Calculate the current value of epoch
let epoch = slot / SLOTS_PER_EPOCH;

// If the epoch has changed, update the proposer duties
Expand Down Expand Up @@ -169,9 +161,12 @@ impl ConsensusState {

#[cfg(test)]
mod tests {
use super::*;
use beacon_api_client::ProposerDuty;
use reqwest::Url;
use tracing::warn;

use super::*;
use crate::test_util::try_get_beacon_api_url;

#[tokio::test]
async fn test_find_validator_index_for_slot() {
Expand All @@ -188,7 +183,6 @@ mod tests {
// Create a ConsensusState with the sample proposer duties and validator indexes
let state = ConsensusState {
beacon_api_client: Client::new(Url::parse("http://localhost").unwrap()),
header: BeaconBlockHeader::default(),
epoch: Epoch { value: 0, start_slot: 0, proposer_duties },
latest_slot_timestamp: Instant::now(),
commitment_deadline: CommitmentDeadline::new(0, Duration::from_secs(1)),
Expand All @@ -207,4 +201,50 @@ mod tests {
Err(ConsensusError::ValidatorNotFound)
));
}

#[tokio::test]
async fn test_update_slot() -> eyre::Result<()> {
let _ = tracing_subscriber::fmt::try_init();

let commitment_deadline_duration = Duration::from_secs(1);
let validator_indexes = ValidatorIndexes::from(vec![100, 101, 102]);

let Some(url) = try_get_beacon_api_url().await else {
warn!("skipping test: beacon API URL is not reachable");
return Ok(());
};

let beacon_client = BeaconClient::new(Url::parse(url).unwrap());

// Create the initial ConsensusState
let mut state = ConsensusState {
beacon_api_client: beacon_client,
epoch: Epoch::default(),
latest_slot: Default::default(),
latest_slot_timestamp: Instant::now(),
validator_indexes,
commitment_deadline: CommitmentDeadline::new(0, commitment_deadline_duration),
commitment_deadline_duration,
};

// Update the slot to 32
state.update_slot(32).await.unwrap();

// Check values were updated correctly
assert_eq!(state.latest_slot, 32);
assert!(state.latest_slot_timestamp.elapsed().as_secs() < 1);
assert_eq!(state.epoch.value, 1);
assert_eq!(state.epoch.start_slot, 32);

// Update the slot to 63, which should not update the epoch
state.update_slot(63).await.unwrap();

// Check values were updated correctly
assert_eq!(state.latest_slot, 63);
assert!(state.latest_slot_timestamp.elapsed().as_secs() < 1);
assert_eq!(state.epoch.value, 1);
assert_eq!(state.epoch.start_slot, 32);

Ok(())
}
}
44 changes: 38 additions & 6 deletions bolt-sidecar/src/state/head_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
use std::time::Duration;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};

use alloy::rpc::types::beacon::events::HeadEvent;
use beacon_api_client::Topic;
use futures::StreamExt;
use tokio::{sync::broadcast, task::AbortHandle};
use tokio::{sync::broadcast, task::AbortHandle, time::sleep};
use tracing::warn;

use crate::BeaconClient;

/// The delay between retries when attempting to reconnect to the beacon client
const RETRY_DELAY: Duration = Duration::from_secs(1);

/// Simple actor to keep track of the most recent head of the beacon chain
/// and broadcast updates to its subscribers.
///
Expand All @@ -17,6 +26,8 @@ use crate::BeaconClient;
pub struct HeadTracker {
/// Channel to receive updates of the "Head" beacon topic
new_heads_rx: broadcast::Receiver<HeadEvent>,
/// The genesis timestamp of the beacon chain, used for calculating proposal times
beacon_genesis_timestamp: Arc<AtomicU64>,
/// Handle to the background task that listens for new head events.
/// Kept to allow for graceful shutdown.
quit: AbortHandle,
Expand All @@ -38,13 +49,29 @@ impl HeadTracker {
pub fn start(beacon_client: BeaconClient) -> Self {
let (new_heads_tx, new_heads_rx) = broadcast::channel(32);

let beacon_genesis_timestamp = Arc::new(AtomicU64::new(0));
let beacon_genesis_timestamp_clone = beacon_genesis_timestamp.clone();

let task = tokio::spawn(async move {
loop {
// First, try to get the genesis timestamp and cache it.
let genesis_time = loop {
match beacon_client.get_genesis_details().await {
Ok(genesis_info) => break genesis_info.genesis_time,
Err(err) => {
warn!(?err, "failed to get genesis details");
sleep(RETRY_DELAY).await;
continue;
}
}
};
beacon_genesis_timestamp_clone.store(genesis_time, Ordering::Relaxed);

let mut event_stream = match beacon_client.get_events::<NewHeadsTopic>().await {
Ok(events) => events,
Err(err) => {
warn!(?err, "failed to subscribe to new heads topic, retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
sleep(RETRY_DELAY).await;
continue;
}
};
Expand All @@ -53,12 +80,12 @@ impl HeadTracker {
Some(Ok(event)) => event,
Some(Err(err)) => {
warn!(?err, "error reading new head event stream, retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
sleep(RETRY_DELAY).await;
continue;
}
None => {
warn!("new head event stream ended, retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
sleep(RETRY_DELAY).await;
continue;
}
};
Expand All @@ -69,7 +96,7 @@ impl HeadTracker {
}
});

Self { new_heads_rx, quit: task.abort_handle() }
Self { new_heads_rx, beacon_genesis_timestamp, quit: task.abort_handle() }
}

/// Stop the tracker and cleanup resources
Expand All @@ -89,6 +116,11 @@ impl HeadTracker {
pub fn subscribe_new_heads(&self) -> broadcast::Receiver<HeadEvent> {
self.new_heads_rx.resubscribe()
}

/// Get the genesis timestamp of the beacon chain
pub fn beacon_genesis_timestamp(&self) -> u64 {
self.beacon_genesis_timestamp.load(Ordering::Relaxed)
}
}

#[cfg(test)]
Expand Down
Loading