Skip to content

Commit

Permalink
feat: consensus state skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
merklefruit committed Jun 26, 2024
1 parent 256130a commit 43e0a2c
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 61 deletions.
7 changes: 2 additions & 5 deletions bolt-sidecar/src/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,13 @@ impl DerefMut for RpcClient {
#[cfg(test)]
mod tests {
use alloy_consensus::constants::ETH_TO_WEI;
use alloy_node_bindings::{Anvil, AnvilInstance};
use alloy_primitives::{uint, Uint};
use alloy_rpc_types::EIP1186AccountProofResponse;
use reth_primitives::B256;

use super::*;
use crate::test_util::launch_anvil;

fn launch_anvil() -> AnvilInstance {
Anvil::new().block_time(1).spawn()
}
use super::*;

#[tokio::test]
async fn test_rpc_client() {
Expand Down
25 changes: 5 additions & 20 deletions bolt-sidecar/src/crypto/bls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,28 +105,13 @@ fn sign_with_prefix(key: &BlsSecretKey, data: &[u8]) -> Signature {

#[cfg(test)]
mod tests {
use crate::crypto::bls::SignerBLS;
use blst::min_pk::SecretKey;

use super::SignableBLS;
use super::Signer;

fn test_bls_secret_key() -> SecretKey {
SecretKey::key_gen(&[0u8; 32], &[]).unwrap()
}

struct TestSignableData {
data: Vec<u8>,
}

impl SignableBLS for TestSignableData {
fn digest(&self) -> Vec<u8> {
self.data.clone()
}
}
use crate::{
crypto::bls::{SignableBLS, Signer, SignerBLS},
test_util::{test_bls_secret_key, TestSignableData},
};

#[test]
fn test_signer() {
fn test_bls_signer() {
let key = test_bls_secret_key();
let pubkey = key.sk_to_pk();
let signer = Signer::new(key);
Expand Down
17 changes: 8 additions & 9 deletions bolt-sidecar/src/crypto/ecdsa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,20 @@ impl ECDSASigner {

#[cfg(test)]
mod tests {
use super::{ECDSASigner, SignableECDSA};
use secp256k1::{PublicKey, SecretKey};
use crate::test_util::TestSignableData;

impl SignableECDSA for String {
fn digest(&self) -> secp256k1::Message {
secp256k1::Message::from_digest_slice(self.as_bytes()).unwrap()
}
}
use super::ECDSASigner;
use secp256k1::{PublicKey, SecretKey};

#[test]
fn test_signer() {
fn test_ecdsa_signer() {
let secp256k1_key = SecretKey::from_slice(&[1; 32]).unwrap();
let signer = ECDSASigner::new(secp256k1_key);

let message = "hello world".to_string();
let message = TestSignableData {
data: vec![1, 2, 3, 4],
};

let signature = signer.sign_ecdsa(&message);
let pubkey = PublicKey::from_secret_key(&secp256k1::Secp256k1::new(), &secp256k1_key);

Expand Down
4 changes: 4 additions & 0 deletions bolt-sidecar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ pub mod primitives;

/// State management and fetching for EVM simulation
pub mod state;

/// Utilities for testing
#[cfg(test)]
mod test_util;
45 changes: 45 additions & 0 deletions bolt-sidecar/src/state/consensus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#![allow(missing_docs)]
#![allow(unused_variables)]
#![allow(missing_debug_implementations)]

use beacon_api_client::{mainnet::Client, BlockId};
use ethereum_consensus::deneb::BeaconBlockHeader;
use reqwest::Url;

use crate::primitives::ChainHead;

#[derive(Debug, thiserror::Error)]
pub enum ConsensusError {
#[error("beacon API error: {0}")]
BeaconApiError(#[from] beacon_api_client::Error),
}

pub struct ConsensusState {
beacon_api_client: Client,
header: BeaconBlockHeader,
}

impl ConsensusState {
/// Create a new `ConsensusState` with the given beacon client HTTP URL.
pub fn new(url: &str) -> Self {
let url = Url::parse(url).expect("valid beacon client URL");
let beacon_api_client = Client::new(url);

Self {
beacon_api_client,
header: BeaconBlockHeader::default(),
}
}

/// Update the latest head and fetch the relevant data from the beacon chain.
pub async fn update_head(&mut self, head: ChainHead) -> Result<(), ConsensusError> {
let update = self
.beacon_api_client
.get_beacon_header(BlockId::Slot(head.slot()))
.await?;

self.header = update.header.message;

Ok(())
}
}
29 changes: 29 additions & 0 deletions bolt-sidecar/src/state/fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#![allow(missing_docs)]
#![allow(unused_variables)]
#![allow(missing_debug_implementations)]

use std::{collections::HashMap, time::Duration};

use alloy_eips::BlockNumberOrTag;
Expand All @@ -16,6 +20,7 @@ const MAX_RETRIES: u32 = 8;
const RETRY_BACKOFF_MS: u64 = 200;

/// A trait for fetching state updates.
#[async_trait::async_trait]
pub trait StateFetcher {
async fn get_state_update(
&self,
Expand Down Expand Up @@ -52,6 +57,7 @@ impl StateClient {
}
}

#[async_trait::async_trait]
impl StateFetcher for StateClient {
// TODO: should this be durable i.e. retries?
// Yes
Expand Down Expand Up @@ -166,3 +172,26 @@ impl StateFetcher for StateClient {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::launch_anvil;

#[tokio::test]
async fn test_state_client() {
let anvil = launch_anvil();
let client = StateClient::new(&anvil.endpoint(), 8);

let address = anvil.addresses().first().unwrap();
let state = client.get_account_state(address, None).await.unwrap();
assert_eq!(state.balance, U256::from(10000000000000000000000u128));
assert_eq!(state.transaction_count, 0);

let head = client.get_head().await.unwrap();
assert_eq!(head, 0);

let basefee = client.get_basefee(None).await.unwrap();
assert_eq!(basefee, 1_000_000_000);
}
}
118 changes: 118 additions & 0 deletions bolt-sidecar/src/state/head_tracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::time::Duration;

use alloy_rpc_types_beacon::events::HeadEvent;
use beacon_api_client::{mainnet::Client, Topic};
use futures::StreamExt;
use reqwest::Url;
use tokio::{sync::broadcast, task::AbortHandle};

/// Simple actor to keep track of the most recent head of the beacon chain
/// and broadcast updates to its subscribers.
///
/// Durability: the tracker will always attempt to reconnect to the provided
/// beacon client URL in case of disconnection or other errors.
#[derive(Debug)]
pub struct HeadTracker {
/// Channel to receive updates of the "Head" beacon topic
new_heads_rx: broadcast::Receiver<HeadEvent>,
/// Handle to the background task that listens for new head events.
/// Kept to allow for graceful shutdown.
quit: AbortHandle,
}

/// A topic for subscribing to new head events
#[derive(Debug)]
pub struct NewHeadsTopic;

impl Topic for NewHeadsTopic {
const NAME: &'static str = "head";

type Data = HeadEvent;
}

impl HeadTracker {
/// Create a new `HeadTracker` with the given beacon client HTTP URL and
/// start listening for new head events in the background
pub fn start(beacon_url: &str) -> Self {
let beacon_client = Client::new(Url::parse(beacon_url).expect("valid beacon url"));
let (new_heads_tx, new_heads_rx) = broadcast::channel(32);

let task = tokio::spawn(async move {
loop {
let mut event_stream = match beacon_client.get_events::<NewHeadsTopic>().await {
Ok(events) => events,
Err(err) => {
tracing::warn!("failed to subscribe to new heads topic: {:?}", err);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};

let event = match event_stream.next().await {
Some(Ok(event)) => event,
Some(Err(err)) => {
tracing::warn!("error reading new head event stream: {:?}", err);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
None => {
tracing::warn!("new head event stream ended, retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};

if let Err(e) = new_heads_tx.send(event) {
tracing::warn!("failed to broadcast new head event to subscribers: {:?}", e);
}
}
});

Self {
new_heads_rx,
quit: task.abort_handle(),
}
}

/// Stop the tracker and cleanup resources
pub fn stop(self) {
self.quit.abort();
}

/// Get the next head event from the tracker
pub async fn next_head(&mut self) -> Result<HeadEvent, broadcast::error::RecvError> {
self.new_heads_rx.recv().await
}

/// Subscribe to new head events from the tracker
///
/// The returned channel will NOT contain any previously emitted events cached in
/// the tracker, but only new ones received after the call to this method
pub fn subscribe_new_heads(&self) -> broadcast::Receiver<HeadEvent> {
self.new_heads_rx.resubscribe()
}
}

#[cfg(test)]
mod tests {
use crate::{state::head_tracker::HeadTracker, test_util::try_get_beacon_api_url};

#[tokio::test]
async fn test_fetch_next_beacon_head() -> eyre::Result<()> {
let _ = tracing_subscriber::fmt::try_init();

let Some(url) = try_get_beacon_api_url().await else {
tracing::warn!("skipping test: beacon API URL is not reachable");
return Ok(());
};

let mut tracker = HeadTracker::start(url);

let head = tracker.next_head().await?;

assert!(head.slot > 0);
assert!(!head.block.is_empty());

Ok(())
}
}
Loading

0 comments on commit 43e0a2c

Please sign in to comment.