From 29ca9d4f6b2211ceabd702704767807974908fb3 Mon Sep 17 00:00:00 2001 From: thedevbirb Date: Thu, 20 Jun 2024 14:56:08 +0200 Subject: [PATCH] chore(sidecar): call trace manager cleanup and docs Co-authored-by: nicolas --- bolt-sidecar/.gitignore | 1 + bolt-sidecar/Cargo.lock | 1 + bolt-sidecar/Cargo.toml | 1 + .../src/builder/call_trace_manager.rs | 77 ++++-- bolt-sidecar/src/builder/mod.rs | 5 +- bolt-sidecar/src/builder/state_root.rs | 260 ++++-------------- bolt-sidecar/src/builder/template.rs | 1 + bolt-sidecar/src/client/rpc.rs | 30 +- 8 files changed, 145 insertions(+), 231 deletions(-) diff --git a/bolt-sidecar/.gitignore b/bolt-sidecar/.gitignore index 2f7896d1..5f32e705 100644 --- a/bolt-sidecar/.gitignore +++ b/bolt-sidecar/.gitignore @@ -1 +1,2 @@ target/ +.env \ No newline at end of file diff --git a/bolt-sidecar/Cargo.lock b/bolt-sidecar/Cargo.lock index 53c86e4b..853b68c4 100644 --- a/bolt-sidecar/Cargo.lock +++ b/bolt-sidecar/Cargo.lock @@ -1228,6 +1228,7 @@ dependencies = [ "cb-common", "cb-crypto", "clap", + "dotenvy", "ethereum-consensus", "eyre", "futures", diff --git a/bolt-sidecar/Cargo.toml b/bolt-sidecar/Cargo.toml index db684591..b608177f 100644 --- a/bolt-sidecar/Cargo.toml +++ b/bolt-sidecar/Cargo.toml @@ -60,6 +60,7 @@ hex = "0.4.3" eyre = "0.6.12" thiserror = "1.0" rand = "0.8.5" +dotenvy = "0.15.7" # tracing tracing = "0.1.40" diff --git a/bolt-sidecar/src/builder/call_trace_manager.rs b/bolt-sidecar/src/builder/call_trace_manager.rs index 8fe88d4b..8d7fcf1a 100644 --- a/bolt-sidecar/src/builder/call_trace_manager.rs +++ b/bolt-sidecar/src/builder/call_trace_manager.rs @@ -1,19 +1,21 @@ +//! Module that defines the [CallTraceManager] actor, which is responsible for +//! handling trace requests for transactions and accumulating the state diffs +//! for each block that is traced. + use std::{ collections::{HashMap, VecDeque}, - hash::Hash, pin::Pin, - task::Poll, + task::{Context, Poll}, }; -use alloy_eips::BlockId; -use alloy_primitives::{Address, BlockNumber, U64}; +use alloy_primitives::{BlockNumber, U64}; use alloy_rpc_types::{ state::{AccountOverride, StateOverride}, TransactionRequest, }; use alloy_rpc_types_trace::geth::{ AccountState, GethDebugBuiltInTracerType, GethDebugTracerType, GethDebugTracingCallOptions, - GethDebugTracingOptions, GethDefaultTracingOptions, GethTrace, PreStateFrame, TraceResult, + GethDebugTracingOptions, GethDefaultTracingOptions, GethTrace, PreStateFrame, }; use alloy_transport::TransportResult; use futures::{stream::FuturesOrdered, Future, StreamExt}; @@ -31,7 +33,9 @@ pub enum TraceCommand { /// considering the given block as starting point and accumulating /// the results on a state diff map. AddTrace { + /// The transaction to trace transaction: TransactionRequest, + /// The block in which the transaction should be simulated on block: BlockNumber, }, /// Request to get the accumulated state diffs for a bundle of transactions @@ -40,17 +44,22 @@ pub enum TraceCommand { /// The result is sent back through a response channel as soon as the last /// pending trace request for that block has been processed. FetchAccumulatedDiffs { + /// The block of the accumulated diffs to fetch block: BlockNumber, + /// The oneshot channel to receive the accumulated diffs res: oneshot::Sender>, }, } +/// The handle to control the [CallTraceManager] actor in a +/// thread-safe, non-blocking way. #[derive(Debug, Clone)] pub struct CallTraceHandle { cmd_tx: mpsc::Sender, } impl CallTraceHandle { + /// Request the trace for the given transaction on the provided block pub async fn add_trace(&self, transaction: TransactionRequest, block: BlockNumber) { let _ = self .cmd_tx @@ -58,17 +67,29 @@ impl CallTraceHandle { .await; } + /// Request the accumulated state diffs for a given block from previously + /// traced transactions. + /// + /// If the diffs are not available yet, this function + /// will hang until the last transaction has been processed and the diffs are ready. pub async fn fetch_accumulated_diffs(&self, block: BlockNumber) -> Option { let (res_tx, res_rx) = oneshot::channel(); let _ = self .cmd_tx - .send(TraceCommand::FetchAccumulatedDiffs { block, res: res_tx }); + .send(TraceCommand::FetchAccumulatedDiffs { block, res: res_tx }) + .await; res_rx.await.unwrap() } } +/// The [CallTraceManager] actor is responsible for handling trace requests for transactions +/// and accumulating the state diffs for each block that is traced. It listens for incoming +/// trace requests and processes them in the background using the given RPC client. +/// +/// The actor is implemented as a future that can be polled in the background. #[derive(Debug)] +#[must_use = "CallTraceManager does nothing unless polled"] pub struct CallTraceManager { rpc: RpcClient, cmd_rx: mpsc::Receiver, @@ -80,6 +101,33 @@ pub struct CallTraceManager { type TraceFuture = JoinHandle<(BlockNumber, TransportResult)>; +impl Future for CallTraceManager { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + match this.cmd_rx.poll_recv(cx) { + Poll::Ready(Some(cmd)) => this.handle_new_trace_command(cmd), + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + } + + match this.pending_traces.poll_next_unpin(cx) { + Poll::Ready(Some(Ok((block, trace_result)))) => { + this.handle_trace_result(block, trace_result) + } + Poll::Ready(Some(Err(e))) => { + tracing::error!(err = ?e, "Error while tracing transaction"); + } + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + } + } + } +} + impl CallTraceManager { /// Creates a new [CallTraceManager] instance, which will listen for incoming /// trace requests and process them in the background using the given RPC client. @@ -100,22 +148,11 @@ impl CallTraceManager { ) } - /// Runs the [CallTraceManager] actor, processing incoming trace requests and - /// accumulating the resulting state diffs for each block in the background. - pub async fn run(mut self) { - tokio::select! { - Some(request) = self.cmd_rx.recv() => { - self.handle_new_trace_command(request); - }, - Some(Ok((block, trace_result))) = self.pending_traces.next() => { - self.handle_trace_result(block, trace_result); - } - } - } - fn handle_new_trace_command(&mut self, cmd: TraceCommand) { match cmd { TraceCommand::AddTrace { transaction, block } => { + tracing::debug!(block = block, "Received new transaction trace request"); + // TODO: handle the case where the block is in the future. // Requires a execution block interval ticker. @@ -133,6 +170,8 @@ impl CallTraceManager { } } TraceCommand::FetchAccumulatedDiffs { block, res } => { + tracing::debug!(block = block, "Fetching accumulated state diffs"); + if self.pending_traces.is_empty() { // If there are no pending traces for the given block, and the // accumulated state diffs are already available, send the result diff --git a/bolt-sidecar/src/builder/mod.rs b/bolt-sidecar/src/builder/mod.rs index cc237ee5..d70f2976 100644 --- a/bolt-sidecar/src/builder/mod.rs +++ b/bolt-sidecar/src/builder/mod.rs @@ -1,11 +1,8 @@ -#![allow(dead_code)] -#![allow(missing_docs)] -#![allow(unused_imports)] - pub mod template; pub use template::BlockTemplate; pub mod payload_builder; + pub mod state_root; pub mod call_trace_manager; diff --git a/bolt-sidecar/src/builder/state_root.rs b/bolt-sidecar/src/builder/state_root.rs index c248298c..bc855835 100644 --- a/bolt-sidecar/src/builder/state_root.rs +++ b/bolt-sidecar/src/builder/state_root.rs @@ -1,65 +1,28 @@ -use std::{collections::HashMap, pin::Pin, task::Poll}; - -use alloy_eips::BlockId; -use alloy_primitives::{keccak256, Address, BlockNumber}; -use alloy_rpc_types::TransactionRequest; -use alloy_rpc_types_trace::geth::{GethTrace, TraceResult}; -use futures::{stream::FuturesOrdered, Future}; -use tokio::sync::{mpsc, oneshot}; - -use crate::RpcClient; - -// Steps: -// - Get previous block state root -// - Create partial MPT from state root -// - Simulate fallback block using trace_callMany RPC endpoint (or multiple debug_traceCall in Geth) -// - Get state diffs of touched accounts -// - Get account proofs for touched accounts (at the previous block) from local EL node, using eth_getProof -// - Load account proofs into partial MPT -// - Apply state diffs to partial MPT -// - Build new state root from partial MPT -// - Build new valid header with new state root -// - ? -// - Profit - -// Components: -// 1. Add new methods to RpcClient: `eth_getProof`, `debug_traceCall`, `trace_callMany` -- DONE except `debug_traceCall` -// 2. Import, fork or re-implement an MPT library with "manual enough" access -// 3. Implement "state root calculator" using RpcClient and MPT library above - -// async fn debug_trace_call_many( -// &self, -// bundles: Vec, -// state_context: Option, -// opts: Option, -// ) -> RpcResult>> { -// let _permit = self.acquire_trace_permit().await; -// Ok(Self::debug_trace_call_many(self, bundles, state_context, opts).await?) -// } +//! State root builder, responsible for constructing a new block's state_root +//! from a series of transaction traces and storage proofs. #[cfg(test)] mod tests { use std::collections::HashSet; - use alloy_consensus::Account; use alloy_primitives::{keccak256, B256, U256}; - use alloy_rpc_types_trace::{ - geth::{ - GethDebugBuiltInTracerType, GethDebugTracerConfig, GethDebugTracerType, - GethDebugTracingCallOptions, GethDebugTracingOptions, GethDefaultTracingOptions, - GethTrace, PreStateConfig, PreStateFrame, - }, - parity::{ChangedType, Delta, TraceType}, - }; - use partial_mpt::{AccountData, StateTrie}; + use partial_mpt::StateTrie; - use crate::client::rpc::RpcClient; + use crate::{builder::CallTraceManager, client::rpc::RpcClient}; #[tokio::test] async fn test_trace_call() -> eyre::Result<()> { - let client = - RpcClient::new("https://nd-357-128-191.p2pify.com/31a0ef20aa969b0d191eb99065458caa"); + dotenvy::dotenv().ok(); + tracing_subscriber::fmt::init(); + + tracing::info!("Starting test_trace_call"); + + let rpc_url = std::env::var("RPC_URL").expect("RPC_URL must be set"); + let client = RpcClient::new(&rpc_url); + + let (call_trace_manager, call_trace_handler) = CallTraceManager::new(&rpc_url); + tokio::spawn(call_trace_manager); // https://etherscan.io/block/20125606 let block_number = 20125606; @@ -69,20 +32,6 @@ mod tests { let mut state_trie = StateTrie::from_root(latest_state_root); - // let mut trace_types = HashSet::new(); - // trace_types.insert(TraceType::StateDiff); - // trace_types.insert(TraceType::Trace); - - let mut debug_trace_call_responses = Vec::new(); - - let mut geth_debug_tracing_options = GethDebugTracingOptions::default().with_tracer( - GethDebugTracerType::BuiltInTracer(GethDebugBuiltInTracerType::PreStateTracer), - ); - geth_debug_tracing_options.config = - GethDefaultTracingOptions::default().with_disable_storage(false); - let geth_debug_tracing_call_options = GethDebugTracingCallOptions::default() - .with_tracing_options(geth_debug_tracing_options.clone()); - let tx_requests = latest_block .transactions .as_transactions() @@ -92,165 +41,62 @@ mod tests { .collect::>(); for tx in tx_requests.iter() { - let debug_trace_call_res = client - .debug_trace_call( - tx.clone(), - Some(block_number), - Some(geth_debug_tracing_call_options.clone()), - ) - .await?; - debug_trace_call_responses.push(debug_trace_call_res); + call_trace_handler.add_trace(tx.clone(), block_number).await; } - for (i, res) in debug_trace_call_responses.iter().enumerate() { - // println!("res[{}]: {:#?}", i, res); - if let GethTrace::PreStateTracer(trace) = res { - println!("trace[{}]: {:#?}", i, trace); - } - // println!("debug_trace_call_responses[{}]: {:#?}", i, res); - } + let diffs = call_trace_handler + .fetch_accumulated_diffs(block_number) + .await + .unwrap(); - // get the list of touched accounts from the state diffs - let touched_accounts = debug_trace_call_responses - .iter() - .map(|trace| match trace { - GethTrace::PreStateTracer(PreStateFrame::Default(frame)) => { - frame.0.keys().cloned().collect::>() - } - _ => vec![], - }) - .flatten() - .collect::>(); - - println!("Touched accounts: {:?}", touched_accounts.len()); + println!("Touched accounts: {:?}", diffs.keys().len()); // load the touched account proofs in the trie let start = std::time::Instant::now(); - for account in touched_accounts.clone() { + for account in diffs.keys().collect::>().clone() { let proof = client - .get_proof(account, vec![], Some(block_number)) + .get_proof(*account, vec![], Some(block_number)) .await?; state_trie.load_proof(proof).unwrap(); } + println!( "Loaded proofs for {} accounts in {:?}", - touched_accounts.len(), + diffs.keys().collect::>().len(), start.elapsed() ); // now apply state diffs to the trie - for trace in debug_trace_call_responses.iter() { - if let GethTrace::PreStateTracer(PreStateFrame::Default(frame)) = trace { - frame.0.iter().for_each(|(address, account)| { - if let Some(balance) = account.balance { - state_trie - .account_trie - .set_balance(*address, balance) - .unwrap(); - } - if let Some(nonce) = account.nonce { - state_trie - .account_trie - .set_nonce(*address, U256::from(nonce)) - .unwrap(); - } - if let Some(code) = account.code.clone() { - state_trie - .account_trie - .set_code_hash(*address, keccak256(code)) - .unwrap(); - } - for (key, value) in account.storage.iter() { - state_trie - .set_storage_value( - *address, - U256::from_be_bytes(key.0), - U256::from_be_bytes(value.0), - ) - .unwrap(); - } - }) - } else { - continue; + for (address, diff) in diffs.iter() { + if let Some(balance) = diff.balance { + state_trie + .account_trie + .set_balance(*address, balance) + .unwrap(); + } + if let Some(nonce) = diff.nonce { + state_trie + .account_trie + .set_nonce(*address, U256::from(nonce)) + .unwrap(); + } + if let Some(code) = diff.code.clone() { + state_trie + .account_trie + .set_code_hash(*address, keccak256(code)) + .unwrap(); + } + if let Some(ref state_diff) = diff.state_diff { + for (key, value) in state_diff.iter() { + state_trie + .set_storage_value( + *address, + U256::from_be_bytes(key.0), + U256::from_be_bytes(value.0), + ) + .unwrap(); + } } - - // for (address, diff) in trace.state_diff.unwrap().iter() { - // match diff.balance { - // Delta::Added(n) => { - // state_trie.account_trie.set_balance(*address, n).unwrap(); - // } - // Delta::Changed(ChangedType { to, .. }) => { - // state_trie.account_trie.set_balance(*address, to).unwrap() - // } - // Delta::Removed(_) => { - // state_trie - // .account_trie - // .set_balance(*address, U256::ZERO) - // .unwrap(); - // } - // Delta::Unchanged => { /* do nothing */ } - // } - // match diff.nonce { - // Delta::Added(n) => { - // state_trie.account_trie.set_nonce(*address, n.to()).unwrap(); - // } - // Delta::Changed(ChangedType { to, .. }) => { - // state_trie - // .account_trie - // .set_nonce(*address, to.to()) - // .unwrap(); - // } - // Delta::Removed(_) => { - // state_trie - // .account_trie - // .set_nonce(*address, U256::ZERO) - // .unwrap(); - // } - // Delta::Unchanged => { /* do nothing */ } - // } - // match diff.nonce { - // Delta::Added(n) => { - // state_trie.account_trie.set_nonce(*address, n.to()).unwrap(); - // } - // Delta::Changed(ChangedType { to, .. }) => { - // state_trie - // .account_trie - // .set_nonce(*address, to.to()) - // .unwrap(); - // } - // Delta::Removed(_) => { - // state_trie - // .account_trie - // .set_nonce(*address, U256::ZERO) - // .unwrap(); - // } - // Delta::Unchanged => { /* do nothing */ } - // } - - // // For each storage key-value pair - // for (key, value) in diff.storage.iter() { - // let key = U256::from_be_bytes(key.0); - - // match value { - // Delta::Added(n) => { - // state_trie - // .set_storage_value(*address, key, U256::from_be_bytes(n.0)) - // .unwrap(); - // } - // Delta::Changed(ChangedType { to, .. }) => { - // state_trie - // .set_storage_value(*address, key, U256::from_be_bytes(to.0)) - // .unwrap(); - // } - // Delta::Removed(_) => { - // state_trie - // .set_storage_value(*address, key, U256::ZERO) - // .unwrap(); - // } - // Delta::Unchanged => { /* do nothing */ } - // } - // } - // } } // now we can get the new state root diff --git a/bolt-sidecar/src/builder/template.rs b/bolt-sidecar/src/builder/template.rs index 10c23225..5cbe2622 100644 --- a/bolt-sidecar/src/builder/template.rs +++ b/bolt-sidecar/src/builder/template.rs @@ -154,6 +154,7 @@ impl StateDiff { } } +#[derive(Debug)] pub struct Unsigned { message: T, } diff --git a/bolt-sidecar/src/client/rpc.rs b/bolt-sidecar/src/client/rpc.rs index 65a74d21..f69a6ba7 100644 --- a/bolt-sidecar/src/client/rpc.rs +++ b/bolt-sidecar/src/client/rpc.rs @@ -2,6 +2,7 @@ //! It provides a simple interface to interact with the Execution layer JSON-RPC API. use alloy_rpc_types_trace::geth::{GethDebugTracingCallOptions, GethTrace}; +use futures::future::join_all; use std::{ collections::HashSet, ops::{Deref, DerefMut}, @@ -11,7 +12,7 @@ use std::{ use alloy::ClientBuilder; use alloy_eips::BlockNumberOrTag; use alloy_primitives::{Address, B256, U256, U64}; -use alloy_rpc_client as alloy; +use alloy_rpc_client::{self as alloy, Waiter}; use alloy_rpc_types::{Block, EIP1186AccountProofResponse, FeeHistory, TransactionRequest}; use alloy_rpc_types_trace::parity::{TraceResults, TraceType}; use alloy_transport::TransportResult; @@ -106,6 +107,32 @@ impl RpcClient { Ok(self.0.request("eth_getProof", params).await?) } + /// Perform multiple `eth_getProof` calls in a single batch. + pub async fn get_proof_batched( + &self, + opts: Vec<(Address, Vec, BlockNumberOrTag)>, + ) -> TransportResult> { + let mut batch = self.0.new_batch(); + + let mut proofs: Vec> = Vec::new(); + + for params in opts { + proofs.push( + batch + .add_call("eth_getProof", ¶ms) + .expect("Correct parameters"), + ); + } + + batch.send().await?; + + // Important: join_all will preserve the order of the proofs + join_all(proofs) + .await + .into_iter() + .collect::, _>>() + } + /// Performs multiple call traces on top of the same block. i.e. transaction n will be executed /// on top of a pending block with all n-1 transactions applied (traced) first. /// @@ -121,6 +148,7 @@ impl RpcClient { Ok(self.0.request("trace_callMany", params).await?) } + /// Performs the `debug_traceCall` JSON-RPC method. pub async fn debug_trace_call( &self, tx: TransactionRequest,