Skip to content

Commit

Permalink
chore(sidecar): call trace manager cleanup and docs
Browse files Browse the repository at this point in the history
Co-authored-by: nicolas <[email protected]>
  • Loading branch information
thedevbirb and merklefruit committed Jun 22, 2024
1 parent 991ee2b commit 29ca9d4
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 231 deletions.
1 change: 1 addition & 0 deletions bolt-sidecar/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
target/
.env
1 change: 1 addition & 0 deletions bolt-sidecar/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bolt-sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ hex = "0.4.3"
eyre = "0.6.12"
thiserror = "1.0"
rand = "0.8.5"
dotenvy = "0.15.7"

# tracing
tracing = "0.1.40"
Expand Down
77 changes: 58 additions & 19 deletions bolt-sidecar/src/builder/call_trace_manager.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
//! Module that defines the [CallTraceManager] actor, which is responsible for
//! handling trace requests for transactions and accumulating the state diffs
//! for each block that is traced.
use std::{
collections::{HashMap, VecDeque},
hash::Hash,
pin::Pin,
task::Poll,
task::{Context, Poll},
};

use alloy_eips::BlockId;
use alloy_primitives::{Address, BlockNumber, U64};
use alloy_primitives::{BlockNumber, U64};
use alloy_rpc_types::{
state::{AccountOverride, StateOverride},
TransactionRequest,
};
use alloy_rpc_types_trace::geth::{
AccountState, GethDebugBuiltInTracerType, GethDebugTracerType, GethDebugTracingCallOptions,
GethDebugTracingOptions, GethDefaultTracingOptions, GethTrace, PreStateFrame, TraceResult,
GethDebugTracingOptions, GethDefaultTracingOptions, GethTrace, PreStateFrame,
};
use alloy_transport::TransportResult;
use futures::{stream::FuturesOrdered, Future, StreamExt};
Expand All @@ -31,7 +33,9 @@ pub enum TraceCommand {
/// considering the given block as starting point and accumulating
/// the results on a state diff map.
AddTrace {
/// The transaction to trace
transaction: TransactionRequest,
/// The block in which the transaction should be simulated on
block: BlockNumber,
},
/// Request to get the accumulated state diffs for a bundle of transactions
Expand All @@ -40,35 +44,52 @@ pub enum TraceCommand {
/// The result is sent back through a response channel as soon as the last
/// pending trace request for that block has been processed.
FetchAccumulatedDiffs {
/// The block of the accumulated diffs to fetch
block: BlockNumber,
/// The oneshot channel to receive the accumulated diffs
res: oneshot::Sender<Option<StateOverride>>,
},
}

/// The handle to control the [CallTraceManager] actor in a
/// thread-safe, non-blocking way.
#[derive(Debug, Clone)]
pub struct CallTraceHandle {
cmd_tx: mpsc::Sender<TraceCommand>,
}

impl CallTraceHandle {
/// Request the trace for the given transaction on the provided block
pub async fn add_trace(&self, transaction: TransactionRequest, block: BlockNumber) {
let _ = self
.cmd_tx
.send(TraceCommand::AddTrace { transaction, block })
.await;
}

/// Request the accumulated state diffs for a given block from previously
/// traced transactions.
///
/// If the diffs are not available yet, this function
/// will hang until the last transaction has been processed and the diffs are ready.
pub async fn fetch_accumulated_diffs(&self, block: BlockNumber) -> Option<StateOverride> {
let (res_tx, res_rx) = oneshot::channel();
let _ = self
.cmd_tx
.send(TraceCommand::FetchAccumulatedDiffs { block, res: res_tx });
.send(TraceCommand::FetchAccumulatedDiffs { block, res: res_tx })
.await;

res_rx.await.unwrap()
}
}

/// The [CallTraceManager] actor is responsible for handling trace requests for transactions
/// and accumulating the state diffs for each block that is traced. It listens for incoming
/// trace requests and processes them in the background using the given RPC client.
///
/// The actor is implemented as a future that can be polled in the background.
#[derive(Debug)]
#[must_use = "CallTraceManager does nothing unless polled"]
pub struct CallTraceManager {
rpc: RpcClient,
cmd_rx: mpsc::Receiver<TraceCommand>,
Expand All @@ -80,6 +101,33 @@ pub struct CallTraceManager {

type TraceFuture = JoinHandle<(BlockNumber, TransportResult<GethTrace>)>;

impl Future for CallTraceManager {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

loop {
match this.cmd_rx.poll_recv(cx) {
Poll::Ready(Some(cmd)) => this.handle_new_trace_command(cmd),
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
}

match this.pending_traces.poll_next_unpin(cx) {
Poll::Ready(Some(Ok((block, trace_result)))) => {
this.handle_trace_result(block, trace_result)
}
Poll::Ready(Some(Err(e))) => {
tracing::error!(err = ?e, "Error while tracing transaction");
}
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
}
}
}
}

impl CallTraceManager {
/// Creates a new [CallTraceManager] instance, which will listen for incoming
/// trace requests and process them in the background using the given RPC client.
Expand All @@ -100,22 +148,11 @@ impl CallTraceManager {
)
}

/// Runs the [CallTraceManager] actor, processing incoming trace requests and
/// accumulating the resulting state diffs for each block in the background.
pub async fn run(mut self) {
tokio::select! {
Some(request) = self.cmd_rx.recv() => {
self.handle_new_trace_command(request);
},
Some(Ok((block, trace_result))) = self.pending_traces.next() => {
self.handle_trace_result(block, trace_result);
}
}
}

fn handle_new_trace_command(&mut self, cmd: TraceCommand) {
match cmd {
TraceCommand::AddTrace { transaction, block } => {
tracing::debug!(block = block, "Received new transaction trace request");

// TODO: handle the case where the block is in the future.
// Requires a execution block interval ticker.

Expand All @@ -133,6 +170,8 @@ impl CallTraceManager {
}
}
TraceCommand::FetchAccumulatedDiffs { block, res } => {
tracing::debug!(block = block, "Fetching accumulated state diffs");

if self.pending_traces.is_empty() {
// If there are no pending traces for the given block, and the
// accumulated state diffs are already available, send the result
Expand Down
5 changes: 1 addition & 4 deletions bolt-sidecar/src/builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(unused_imports)]

pub mod template;
pub use template::BlockTemplate;

pub mod payload_builder;

pub mod state_root;

pub mod call_trace_manager;
Expand Down
Loading

0 comments on commit 29ca9d4

Please sign in to comment.