Skip to content

Commit

Permalink
Merge pull request #127 from chainbound/fix/get-payload-mevboost
Browse files Browse the repository at this point in the history
fix(sidecar): deserialize get_payload response from mev-boost
  • Loading branch information
merklefruit authored Jul 15, 2024
2 parents 48a471e + db7d643 commit 96634a7
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 37 deletions.
3 changes: 1 addition & 2 deletions bolt-sidecar/bin/sidecar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ async fn main() -> eyre::Result<()> {

execution_state.add_constraint(request.slot, signed_constraints.clone());


let res = serde_json::to_value(signed_constraints).map_err(Into::into);
let _ = response_tx.send(res).ok();
let _ = response_tx.send(res);
},
Ok(HeadEvent { slot, .. }) = head_tracker.next_head() => {
tracing::info!(slot, "Received new head event");
Expand Down
10 changes: 5 additions & 5 deletions bolt-sidecar/src/api/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use std::{sync::Arc, time::Duration};
use tokio::net::TcpListener;

use super::spec::{
BuilderApi, BuilderApiError, ConstraintsApi, GET_HEADER_PATH, GET_PAYLOAD_PATH,
REGISTER_VALIDATORS_PATH, STATUS_PATH,
BuilderApiError, ConstraintsApi, GET_HEADER_PATH, GET_PAYLOAD_PATH, REGISTER_VALIDATORS_PATH,
STATUS_PATH,
};
use crate::{
client::mevboost::MevBoostClient,
Expand All @@ -35,7 +35,7 @@ const GET_HEADER_WITH_PROOFS_TIMEOUT: Duration = Duration::from_millis(500);

/// A proxy server for the builder API.
/// Forwards all requests to the target after interception.
pub struct BuilderProxyServer<T: BuilderApi, P> {
pub struct BuilderProxyServer<T, P> {
proxy_target: T,
// TODO: fill with local payload when we fetch a payload
// in failed get_header
Expand Down Expand Up @@ -127,7 +127,7 @@ where
Ok(res) => match res {
Err(builder_err) => builder_err,
Ok(header) => {
tracing::debug!(elapsed = ?start.elapsed(), "Returning signed builder bid: {:?}", header);
tracing::debug!(elapsed = ?start.elapsed(), "Returning signed builder bid");
return Ok(Json(header));
}
},
Expand Down Expand Up @@ -215,7 +215,7 @@ where
});
};

tracing::info!("Local block found, returning: {payload:?}");
tracing::debug!("Local block found, returning: {payload:?}");
return Ok(Json(payload));
}

Expand Down
5 changes: 5 additions & 0 deletions bolt-sidecar/src/api/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub enum BuilderApiError {
InvalidFork(String),
#[error("Invalid local payload block hash. expected: {expected}, got: {have}")]
InvalidLocalPayloadBlockHash { expected: String, have: String },
#[error("Generic error: {0}")]
Generic(String),
}

impl IntoResponse for BuilderApiError {
Expand Down Expand Up @@ -110,6 +112,9 @@ impl IntoResponse for BuilderApiError {
BuilderApiError::InvalidLocalPayloadBlockHash { .. } => {
(StatusCode::BAD_REQUEST, self.to_string()).into_response()
}
BuilderApiError::Generic(err) => {
(StatusCode::INTERNAL_SERVER_ERROR, Json(err)).into_response()
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions bolt-sidecar/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ pub mod rpc;

// Re-export the beacon_api_client
pub use beacon_api_client::mainnet::Client as BeaconClient;

#[cfg(test)]
mod test_util;
137 changes: 137 additions & 0 deletions bolt-sidecar/src/client/test_util/deneb_get_payload_response.json

Large diffs are not rendered by default.

110 changes: 110 additions & 0 deletions bolt-sidecar/src/client/test_util/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#![allow(unused)]

use beacon_api_client::VersionedValue;
use ethereum_consensus::{
builder::SignedValidatorRegistration,
deneb::{
self,
mainnet::{BlobsBundle, SignedBlindedBeaconBlock},
},
types::mainnet::ExecutionPayload,
};
use reqwest::StatusCode;
use serde_json::Value;
use tokio::sync::watch;

use crate::{
api::{builder::GetHeaderParams, spec::BuilderApiError},
primitives::{BatchedSignedConstraints, GetPayloadResponse, PayloadAndBlobs, SignedBuilderBid},
BuilderApi, ConstraintsApi,
};

/// Create a `GetPayloadResponse` with a default `Deneb` execution payload.
pub fn make_get_payload_response() -> GetPayloadResponse {
let execution_payload = ExecutionPayload::Deneb(deneb::ExecutionPayload::default());

let blobs_bundle = BlobsBundle::default();

GetPayloadResponse::Deneb(PayloadAndBlobs {
execution_payload,
blobs_bundle,
})
}

pub struct MockMevBoost {
pub response_rx: watch::Receiver<Value>,
}

impl MockMevBoost {
pub fn new() -> (Self, watch::Sender<Value>) {
let (response_tx, response_rx) = watch::channel(Value::Null);
(Self { response_rx }, response_tx)
}
}

#[async_trait::async_trait]
impl BuilderApi for MockMevBoost {
async fn status(&self) -> Result<StatusCode, BuilderApiError> {
Err(BuilderApiError::Generic(
"MockMevBoost does not support getting status".to_string(),
))
}

async fn register_validators(
&self,
_registrations: Vec<SignedValidatorRegistration>,
) -> Result<(), BuilderApiError> {
Err(BuilderApiError::Generic(
"MockMevBoost does not support registering validators".to_string(),
))
}

async fn get_header(
&self,
_params: GetHeaderParams,
) -> Result<SignedBuilderBid, BuilderApiError> {
let response = self.response_rx.borrow().clone();
let bid = serde_json::from_value(response)?;
Ok(bid)
}

async fn get_payload(
&self,
_signed_block: SignedBlindedBeaconBlock,
) -> Result<GetPayloadResponse, BuilderApiError> {
let response = self.response_rx.borrow().clone();
let payload = serde_json::from_value(response)?;
Ok(payload)
}
}

#[async_trait::async_trait]
impl ConstraintsApi for MockMevBoost {
async fn submit_constraints(
&self,
_constraints: &BatchedSignedConstraints,
) -> Result<(), BuilderApiError> {
Err(BuilderApiError::Generic(
"MockMevBoost does not support submitting constraints".to_string(),
))
}

async fn get_header_with_proofs(
&self,
_params: GetHeaderParams,
) -> Result<VersionedValue<SignedBuilderBid>, BuilderApiError> {
let response = self.response_rx.borrow().clone();
let bid = serde_json::from_value(response)?;
Ok(bid)
}
}

#[test]
fn test_decode_get_payload_response() {
let stringified =
std::fs::read_to_string("./src/client/test_util/deneb_get_payload_response.json")
.expect("failed to read get payload response file");

let parsed_response: GetPayloadResponse =
serde_json::from_str(&stringified).expect("failed to parse get payload response");
}
23 changes: 1 addition & 22 deletions bolt-sidecar/src/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl Default for PayloadAndBlobs {
}
}

#[derive(Debug, serde::Serialize)]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(tag = "version", content = "data")]
pub enum GetPayloadResponse {
#[serde(rename = "bellatrix")]
Expand Down Expand Up @@ -191,27 +191,6 @@ impl GetPayloadResponse {
}
}

impl<'de> serde::Deserialize<'de> for GetPayloadResponse {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let value = serde_json::Value::deserialize(deserializer)?;
if let Ok(inner) = <_ as serde::Deserialize>::deserialize(&value) {
return Ok(Self::Capella(inner));
}
if let Ok(inner) = <_ as serde::Deserialize>::deserialize(&value) {
return Ok(Self::Deneb(inner));
}
if let Ok(inner) = <_ as serde::Deserialize>::deserialize(&value) {
return Ok(Self::Bellatrix(inner));
}
Err(serde::de::Error::custom(
"no variant could be deserialized from input for GetPayloadResponse",
))
}
}

/// A struct representing the current chain head.
#[derive(Debug, Clone)]
pub struct ChainHead {
Expand Down
6 changes: 2 additions & 4 deletions bolt-sidecar/src/state/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,8 @@ impl<C: StateFetcher> ExecutionState<C> {
self.slot = slot;

// TODO: invalidate any state that we don't need anymore (will be based on block template)
let update = self
.client
.get_state_update(self.account_states.keys().collect::<Vec<_>>(), block_number)
.await?;
let accounts = self.account_states.keys().collect::<Vec<_>>();
let update = self.client.get_state_update(accounts, block_number).await?;

self.apply_state_update(update);

Expand Down
8 changes: 4 additions & 4 deletions bolt-sidecar/src/state/head_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl HeadTracker {
let mut event_stream = match beacon_client.get_events::<NewHeadsTopic>().await {
Ok(events) => events,
Err(err) => {
tracing::warn!("failed to subscribe to new heads topic: {:?}", err);
tracing::warn!(?err, "failed to subscribe to new heads topic, retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
Expand All @@ -51,7 +51,7 @@ impl HeadTracker {
let event = match event_stream.next().await {
Some(Ok(event)) => event,
Some(Err(err)) => {
tracing::warn!("error reading new head event stream: {:?}", err);
tracing::warn!(?err, "error reading new head event stream, retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
Expand All @@ -62,8 +62,8 @@ impl HeadTracker {
}
};

if let Err(e) = new_heads_tx.send(event) {
tracing::warn!("failed to broadcast new head event to subscribers: {:?}", e);
if let Err(err) = new_heads_tx.send(event) {
tracing::warn!(?err, "failed to broadcast new head event to subscribers");
}
}
});
Expand Down

0 comments on commit 96634a7

Please sign in to comment.