From 2a59aedc2ffcc25cd24a11893bb0d2515bc1256d Mon Sep 17 00:00:00 2001 From: thedevbirb Date: Fri, 28 Jun 2024 18:36:09 +0200 Subject: [PATCH] fix(sidecar): builder api + fallback wip Co-authored-by merklefruit --- Justfile | 6 +++ bolt-sidecar/bin/sidecar.rs | 59 +++++++++++++-------- bolt-sidecar/src/api/builder.rs | 23 ++++---- bolt-sidecar/src/builder/mod.rs | 4 +- bolt-sidecar/src/builder/payload_builder.rs | 47 ++++++++++++---- bolt-sidecar/src/json_rpc/api.rs | 57 ++++++++++---------- bolt-sidecar/src/primitives/mod.rs | 31 +++++++++-- scripts/kurtosis_config.yaml | 2 +- 8 files changed, 155 insertions(+), 74 deletions(-) diff --git a/Justfile b/Justfile index a824b857..0ae5d272 100644 --- a/Justfile +++ b/Justfile @@ -76,6 +76,12 @@ sidecar-dump: @id=$(docker ps -n 100 | grep sidecar | awk -F' ' '{print $1}') && \ docker logs $id 2>&1 | tee sidecar_dump.log + +# show the logs for the bolt devnet builder +kill-builder: + @id=$(docker ps -n 100 | grep bolt-builder | awk -F' ' '{print $1}') && \ + docker stop $id + # show the dora explorer in the browser. NOTE: works only for Linux and MacOS at the moment dora: @url=$(just inspect | grep 'dora\s*http' | awk -F'-> ' '{print $2}' | awk '{print $1}') && \ diff --git a/bolt-sidecar/bin/sidecar.rs b/bolt-sidecar/bin/sidecar.rs index 9d414d1f..37af2489 100644 --- a/bolt-sidecar/bin/sidecar.rs +++ b/bolt-sidecar/bin/sidecar.rs @@ -6,10 +6,10 @@ use bolt_sidecar::{ bls::{Signer, SignerBLS}, SignableBLS, }, - json_rpc, + json_rpc::{self, api::ApiEvent}, primitives::{ - BatchedSignedConstraints, ChainHead, ConstraintsMessage, LocalPayloadFetcher, - SignedConstraints, + BatchedSignedConstraints, ChainHead, ConstraintsMessage, FetchPayloadRequest, + LocalPayloadFetcher, SignedConstraints, }, spec::ConstraintsApi, start_builder_proxy, @@ -53,11 +53,14 @@ async fn main() -> eyre::Result<()> { let (payload_tx, mut payload_rx) = mpsc::channel(16); let payload_fetcher = LocalPayloadFetcher::new(payload_tx); + tracing::info!("JWT secret: {}", config.jwt_hex); + let mut local_builder = LocalBuilder::new( BlsSecretKey::try_from(config.builder_private_key.to_bytes().as_ref())?, &config.execution_api_url, &config.engine_api_url, &config.jwt_hex, + &config.beacon_api_url, config.fee_recipient, ); @@ -75,9 +78,8 @@ async fn main() -> eyre::Result<()> { // TODO: parallelize this loop { tokio::select! { - Some(event) = api_events_rx.recv() => { - tracing::info!("Received commitment request: {:?}", event.request); - let request = event.request; + Some(ApiEvent { request, response_tx }) = api_events_rx.recv() => { + tracing::info!("Received commitment request: {:?}", request); // if let Err (e) = consensus_state.validate_request(&CommitmentRequest::Inclusion(request.clone())) { // tracing::error!("Failed to validate request: {:?}", e); @@ -90,7 +92,7 @@ async fn main() -> eyre::Result<()> { // .await // { // tracing::error!("Failed to commit request: {:?}", e); - // let _ = event.response.send(Err(ApiError::Custom(e.to_string()))); + // let _ = response_tx.send(Err(ApiError::Custom(e.to_string()))); // continue; // } execution_state.commit_transaction(request.slot, request.tx.clone()); @@ -111,7 +113,7 @@ async fn main() -> eyre::Result<()> { // TODO: fix retry logic let max_retries = 5; let mut i = 0; - while let Err(e) = mevboost_client + 'inner: while let Err(e) = mevboost_client .submit_constraints(&signed_constraints) .await { @@ -119,15 +121,19 @@ async fn main() -> eyre::Result<()> { tokio::time::sleep(Duration::from_millis(100)).await; i+=1; if i >= max_retries { - break + break 'inner } } + + let res = serde_json::to_value(signed_constraints).map_err(Into::into); + let _ = response_tx.send(res).ok(); } - Some(request) = payload_rx.recv() => { - tracing::info!("Received local payload request: {:?}", request); - let Some(response) = execution_state.get_block_template(request.slot) else { - tracing::warn!("No block template found for slot {} when requested", request.slot); - let _ = request.response.send(None); + Some(FetchPayloadRequest { slot, response_tx }) = payload_rx.recv() => { + tracing::info!(slot, "Received local payload request"); + + let Some(template) = execution_state.get_block_template(slot) else { + tracing::warn!("No block template found for slot {slot} when requested"); + let _ = response_tx.send(None); continue; }; @@ -136,17 +142,28 @@ async fn main() -> eyre::Result<()> { // Once we have that, we need to send it as response to the validator via the pending get_header RPC call. // The validator will then call get_payload with the corresponding SignedBlindedBeaconBlock. We then need to // respond with the full ExecutionPayload inside the BeaconBlock (+ blobs if any). - let payload_and_bid = local_builder.build_new_local_payload(response.transactions).await?; + let payload_and_bid = match local_builder.build_new_local_payload(template.transactions).await { + Ok(res) => res, + Err(e) => { + tracing::error!(err = ?e, "CRITICAL: Error while building local payload for slot {slot}"); + let _ = response_tx.send(None); + continue; + } + }; - let _ = request.response.send(Some(payload_and_bid)); + if let Err(e) = response_tx.send(Some(payload_and_bid)) { + tracing::error!(err = ?e, "Failed to send payload and bid in response channel"); + } else { + tracing::debug!("Sent payload and bid to response channel"); + } + }, + Ok(_) = tokio::signal::ctrl_c() => { + tracing::info!("Received SIGINT, shutting down..."); + shutdown_tx.send(()).await.ok(); + break; } - - else => break, } } - tokio::signal::ctrl_c().await?; - shutdown_tx.send(()).await.ok(); - Ok(()) } diff --git a/bolt-sidecar/src/api/builder.rs b/bolt-sidecar/src/api/builder.rs index 0e8631a1..146f26f6 100644 --- a/bolt-sidecar/src/api/builder.rs +++ b/bolt-sidecar/src/api/builder.rs @@ -129,16 +129,21 @@ where }; // On ANY error, we fall back to locally built block - tracing::error!(slot, elapsed = ?start.elapsed(), err = ?err, "Proxy error, fetching local payload instead"); + tracing::warn!(slot, elapsed = ?start.elapsed(), err = ?err, "Proxy error, fetching local payload instead"); - let payload = server - .payload_fetcher - .fetch_payload(slot) - .await - // TODO: handle failure? In this case, we don't have a fallback block - // which means we haven't made any commitments. This means the beacon client should - // fallback to local block building. - .ok_or(BuilderApiError::FailedToFetchLocalPayload(slot))?; + let payload = match server.payload_fetcher.fetch_payload(slot).await { + Some(payload) => { + tracing::info!(elapsed = ?start.elapsed(), "Fetched local payload for slot {slot}"); + payload + } + None => { + // TODO: handle failure? In this case, we don't have a fallback block + // which means we haven't made any commitments. This means the beacon client should + // fallback to local block building. + tracing::error!("No local payload produced for slot {slot}"); + return Err(BuilderApiError::FailedToFetchLocalPayload(slot)); + } + }; let hash = payload.bid.message.header.block_hash.clone(); let number = payload.bid.message.header.block_number; diff --git a/bolt-sidecar/src/builder/mod.rs b/bolt-sidecar/src/builder/mod.rs index 1e2d59fa..9fe6537e 100644 --- a/bolt-sidecar/src/builder/mod.rs +++ b/bolt-sidecar/src/builder/mod.rs @@ -74,6 +74,7 @@ impl LocalBuilder { execution_rpc_url: &str, engine_rpc_url: &str, engine_jwt_secret: &str, + beacon_api_url: &str, fee_recipient: Address, ) -> Self { Self { @@ -81,8 +82,9 @@ impl LocalBuilder { fallback_builder: FallbackPayloadBuilder::new( engine_jwt_secret, fee_recipient, - execution_rpc_url, engine_rpc_url, + execution_rpc_url, + beacon_api_url, ), } } diff --git a/bolt-sidecar/src/builder/payload_builder.rs b/bolt-sidecar/src/builder/payload_builder.rs index 05b52039..10707e85 100644 --- a/bolt-sidecar/src/builder/payload_builder.rs +++ b/bolt-sidecar/src/builder/payload_builder.rs @@ -37,6 +37,7 @@ const DEFAULT_EXTRA_DATA: &str = "Selfbuilt w Bolt"; pub struct FallbackPayloadBuilder { extra_data: Bytes, fee_recipient: Address, + beacon_api_url: String, execution_rpc_client: RpcClient, engine_hinter: EngineHinter, } @@ -48,6 +49,7 @@ impl FallbackPayloadBuilder { fee_recipient: Address, engine_rpc_url: &str, execution_rpc_url: &str, + beacon_api_url: &str, ) -> Self { let engine_hinter = EngineHinter { client: reqwest::Client::new(), @@ -58,6 +60,7 @@ impl FallbackPayloadBuilder { Self { fee_recipient, engine_hinter, + beacon_api_url: beacon_api_url.to_string(), extra_data: hex::encode(DEFAULT_EXTRA_DATA).into(), execution_rpc_client: RpcClient::new(execution_rpc_url), } @@ -99,9 +102,10 @@ impl FallbackPayloadBuilder { transactions: Vec, ) -> Result { let latest_block = self.execution_rpc_client.get_block(None, true).await?; + tracing::info!(num = ?latest_block.header.number, "got latest block"); // TODO: refactor this once ConsensusState (https://github.com/chainbound/bolt/issues/58) is ready - let beacon_api_endpoint = reqwest::Url::parse("http://remotebeast:3500").unwrap(); + let beacon_api_endpoint = reqwest::Url::parse(&self.beacon_api_url).unwrap(); let beacon_api = beacon_api_client::mainnet::Client::new(beacon_api_endpoint); let withdrawals = beacon_api @@ -111,6 +115,7 @@ impl FallbackPayloadBuilder { .into_iter() .map(to_reth_withdrawal) .collect::>(); + tracing::info!(amount = ?withdrawals.len(), "got withdrawals"); let withdrawals = if withdrawals.is_empty() { None @@ -122,7 +127,10 @@ impl FallbackPayloadBuilder { // when using the beacon_api_client crate directly, so we use reqwest temporarily. // this is to be refactored. let prev_randao = reqwest::Client::new() - .get("http://remotebeast:3500/eth/v1/beacon/states/head/randao") + .get(format!( + "{}/eth/v1/beacon/states/head/randao", + self.beacon_api_url + )) .send() .await .unwrap() @@ -135,17 +143,20 @@ impl FallbackPayloadBuilder { .as_str() .unwrap(); let prev_randao = B256::from_hex(prev_randao).unwrap(); + tracing::info!("got prev_randao"); let parent_beacon_block_root = beacon_api .get_beacon_block_root(BlockId::Head) .await .unwrap(); + tracing::info!(parent = ?parent_beacon_block_root, "got parent_beacon_block_root"); let versioned_hashes = transactions .iter() .flat_map(|tx| tx.blob_versioned_hashes()) .flatten() .collect::>(); + tracing::info!(amount = ?versioned_hashes.len(), "got versioned_hashes"); let base_fee = calc_next_block_base_fee( latest_block.header.gas_used, @@ -179,7 +190,7 @@ impl FallbackPayloadBuilder { }; let mut hints = Hints::default(); - let max_iterations = 5; + let max_iterations = 10; let mut i = 0; loop { let header = build_header_with_hints_and_context(&latest_block, &hints, &ctx); @@ -190,24 +201,38 @@ impl FallbackPayloadBuilder { let hinted_hash = hints.block_hash.unwrap_or(sealed_block.hash()); let exec_payload = to_alloy_execution_payload(&sealed_block, hinted_hash); + tracing::info!("pre hint fetch"); let engine_hint = self .engine_hinter .fetch_next_payload_hint(&exec_payload, &versioned_hashes, parent_beacon_block_root) .await?; + tracing::info!(hint = ?engine_hint, "post hint fetch"); match engine_hint { EngineApiHint::BlockHash(hash) => hints.block_hash = Some(hash), - EngineApiHint::GasUsed(gas) => hints.gas_used = Some(gas), - EngineApiHint::StateRoot(hash) => hints.state_root = Some(hash), - EngineApiHint::ReceiptsRoot(hash) => hints.receipts_root = Some(hash), - EngineApiHint::LogsBloom(bloom) => hints.logs_bloom = Some(bloom), + EngineApiHint::GasUsed(gas) => { + hints.gas_used = Some(gas); + hints.block_hash = None; + } + EngineApiHint::StateRoot(hash) => { + hints.state_root = Some(hash); + hints.block_hash = None + } + EngineApiHint::ReceiptsRoot(hash) => { + hints.receipts_root = Some(hash); + hints.block_hash = None + } + EngineApiHint::LogsBloom(bloom) => { + hints.logs_bloom = Some(bloom); + hints.block_hash = None + } EngineApiHint::ValidPayload => return Ok(sealed_block), } if i > max_iterations { return Err(BuilderError::Custom( - "Failed to fetch all missing header values from geth error messages" + "Too many iterations: Failed to fetch all missing header values from geth error messages" .to_string(), )); } @@ -249,8 +274,12 @@ impl EngineHinter { versioned_hashes: &[B256], parent_beacon_root: B256, ) -> Result { + tracing::info!("jwt_hex: {:?}", self.jwt_hex); + let auth_jwt = secret_to_bearer_header(&JwtSecret::from_hex(&self.jwt_hex)?); + tracing::info!("auth_jwt: {:?}", auth_jwt); + let body = format!( r#"{{"id":1,"jsonrpc":"2.0","method":"engine_newPayloadV3","params":[{}, {}, "{:?}"]}}"#, serde_json::to_string(&exec_payload)?, @@ -376,7 +405,7 @@ mod tests { let execution = "http://remotebeast:8545"; let engine = "http://remotebeast:8551"; - let builder = FallbackPayloadBuilder::new(&jwt, Address::default(), engine, execution); + let builder = FallbackPayloadBuilder::new(&jwt, Address::default(), engine, execution, ""); let sk = SigningKey::from_slice(hex::decode(raw_sk)?.as_slice())?; let signer = PrivateKeySigner::from_signing_key(sk.clone()); diff --git a/bolt-sidecar/src/json_rpc/api.rs b/bolt-sidecar/src/json_rpc/api.rs index 92a4e91b..6f4dcf89 100644 --- a/bolt-sidecar/src/json_rpc/api.rs +++ b/bolt-sidecar/src/json_rpc/api.rs @@ -55,7 +55,7 @@ pub struct ApiEvent { /// TODO: change to commitment request pub request: InclusionRequest, /// The sender to respond to. - pub response: oneshot::Sender, + pub response_tx: oneshot::Sender, } /// The struct that implements handlers for all JSON-RPC API methods. @@ -85,7 +85,6 @@ impl JsonRpcApi { } } -#[allow(dead_code)] fn internal_error() -> ApiError { ApiError::Custom("internal server error".to_string()) } @@ -103,25 +102,25 @@ impl CommitmentsRpc for JsonRpcApi { )); }; - let params = serde_json::from_value::(params)?; + let request = serde_json::from_value::(params)?; #[allow(irrefutable_let_patterns)] // TODO: remove this when we have more request types - let CommitmentRequest::Inclusion(params) = params + let CommitmentRequest::Inclusion(request) = request else { return Err(ApiError::Custom( "request must be an inclusion request".to_string(), )); }; - info!(?params, "received inclusion commitment request"); + info!(?request, "received inclusion commitment request"); - let tx_sender = params.tx.recover_signer().ok_or(ApiError::Custom( + let tx_sender = request.tx.recover_signer().ok_or(ApiError::Custom( "failed to recover signer from transaction".to_string(), ))?; // validate the user's signature - let signer_address = params + let signer_address = request .signature - .recover_address_from_prehash(¶ms.digest())?; + .recover_address_from_prehash(&request.digest())?; // TODO: relax this check to allow for external signers to request commitments // about transactions that they did not sign themselves @@ -135,41 +134,41 @@ impl CommitmentsRpc for JsonRpcApi { { // check for duplicate requests and update the cache if necessary let mut cache = self.cache.write(); - if let Some(commitments) = cache.get_mut(¶ms.slot) { + if let Some(commitments) = cache.get_mut(&request.slot) { if commitments .iter() - .any(|p| matches!(p, CommitmentRequest::Inclusion(req) if req == ¶ms)) + .any(|p| matches!(p, CommitmentRequest::Inclusion(req) if req == &request)) { return Err(ApiError::DuplicateRequest); } - commitments.push(params.clone().into()); + commitments.push(request.clone().into()); } else { - cache.put(params.slot, vec![params.clone().into()]); + cache.put(request.slot, vec![request.clone().into()]); } } // Drop the lock - let (tx, rx) = oneshot::channel(); + let (response_tx, response_rx) = oneshot::channel(); // send the request to the event loop - self.event_tx - .send(ApiEvent { - request: params.clone(), - response: tx, - }) - .await - .map_err(|e| { - internal_error_with_message(format!("error sending api event, err = {:?}", e)) - })?; - - let _response = rx.await.map_err(|e| { - internal_error_with_message(format!( - "error receiving api event response, err = {:?}", - e - )) + let event = ApiEvent { + request, + response_tx, + }; + self.event_tx.send(event).await.map_err(|e| { + internal_error_with_message(format!("error sending api event, err = {:?}", e)) })?; - Ok(serde_json::to_value("test")?) + match response_rx.await { + // TODO: format the user response to be more clear. Right now it's just the raw + // signed constraints object. + // Docs: https://chainbound.github.io/bolt-docs/api/commitments-api#bolt_inclusionpreconfirmation + Ok(event_response) => event_response, + Err(e) => { + tracing::error!(err = ?e, "error receiving api event response from event loop"); + Err(internal_error()) + } + } } } diff --git a/bolt-sidecar/src/primitives/mod.rs b/bolt-sidecar/src/primitives/mod.rs index a3479520..0d99277b 100644 --- a/bolt-sidecar/src/primitives/mod.rs +++ b/bolt-sidecar/src/primitives/mod.rs @@ -3,7 +3,7 @@ use std::sync::{atomic::AtomicU64, Arc}; -use alloy_primitives::{B256, U256}; +use alloy_primitives::U256; use ethereum_consensus::{ capella, crypto::{KzgCommitment, PublicKey as BlsPublicKey, Signature as BlsSignature}, @@ -87,7 +87,7 @@ pub struct MerkleMultiProof { #[derive(Debug)] pub struct FetchPayloadRequest { pub slot: u64, - pub response: oneshot::Sender>, + pub response_tx: oneshot::Sender>, } #[derive(Debug)] @@ -112,11 +112,34 @@ impl PayloadFetcher for LocalPayloadFetcher { async fn fetch_payload(&self, slot: u64) -> Option { let (tx, rx) = oneshot::channel(); - let fetch_params = FetchPayloadRequest { slot, response: tx }; + let fetch_params = FetchPayloadRequest { + slot, + response_tx: tx, + }; self.tx.send(fetch_params).await.ok()?; - rx.await.ok().flatten() + match rx.await { + Ok(Some(payload_and_bid)) => { + tracing::debug!("LocalPayloadFetcher -- fetched payload for slot {}", slot); + Some(payload_and_bid) + } + Ok(None) => { + tracing::warn!( + "LocalPayloadFetcher -- no payload fetched for slot {}", + slot + ); + None + } + Err(e) => { + tracing::error!( + "LocalPayloadFetcher -- error fetching payload for slot {}: {:?}", + slot, + e + ); + None + } + } } } diff --git a/scripts/kurtosis_config.yaml b/scripts/kurtosis_config.yaml index dc13cbf8..f540a3ba 100644 --- a/scripts/kurtosis_config.yaml +++ b/scripts/kurtosis_config.yaml @@ -1,5 +1,5 @@ network_params: - seconds_per_slot: 2 # 2 seconds are the minimum for testing + seconds_per_slot: 4 # 2 seconds are the minimum for testing tx_spammer_params: tx_spammer_extra_args: ["--slot-time=1", "--accounts=10", "--txcount=1"]