Skip to content

Commit

Permalink
Merge pull request #181 from chainbound/nico/feat/sidecar-driver
Browse files Browse the repository at this point in the history
feat(sidecar): added driver on higher register (`tokio::select`)
  • Loading branch information
merklefruit authored Aug 5, 2024
2 parents 368983d + 90f6a8a commit 1c01e7b
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 255 deletions.
192 changes: 13 additions & 179 deletions bolt-sidecar/bin/sidecar.rs
Original file line number Diff line number Diff line change
@@ -1,188 +1,22 @@
use std::time::Duration;

use alloy::{rpc::types::beacon::events::HeadEvent, signers};
use tokio::sync::mpsc;

use bolt_sidecar::{
commitments::{
server::{CommitmentsApiServer, Event},
spec,
},
crypto::{bls::Signer, SignableBLS, SignerBLS},
primitives::{
CommitmentRequest, ConstraintsMessage, FetchPayloadRequest, LocalPayloadFetcher,
SignedConstraints,
},
start_builder_proxy_server,
state::{ConsensusState, ExecutionState, HeadTracker, StateClient},
BeaconClient, BuilderProxyConfig, Config, ConstraintsApi, LocalBuilder, MevBoostClient,
};
use bolt_sidecar::{Config, SidecarDriver};
use eyre::{bail, Result};
use tracing::info;

#[tokio::main]
async fn main() -> eyre::Result<()> {
async fn main() -> Result<()> {
// TODO: improve telemetry setup (#116)
tracing_subscriber::fmt::init();

let config = Config::parse_from_cli()?;

tracing::info!(chain = config.chain.name(), "Starting Bolt sidecar");

// TODO: support external signers
// probably it's cleanest to have the Config parser initialize a generic Signer
let signer = Signer::new(config.private_key.clone().unwrap());

// TODO: support external signers
let commitment_signer = signers::local::PrivateKeySigner::random();

let state_client = StateClient::new(config.execution_api_url.clone());
let mut execution_state = ExecutionState::new(state_client, config.limits).await?;

let mevboost_client = MevBoostClient::new(config.mevboost_url.clone());
let beacon_client = BeaconClient::new(config.beacon_api_url.clone());

let (api_events, mut api_events_rx) = mpsc::channel(1024);

let mut api_server = CommitmentsApiServer::new(format!("0.0.0.0:{}", config.rpc_port));

api_server.run(api_events).await?;

let mut consensus_state = ConsensusState::new(
beacon_client.clone(),
config.validator_indexes.clone(),
config.chain.commitment_deadline(),
);

// TODO: this can be replaced with ethereum_consensus::clock::from_system_time()
// but using beacon node events is easier to work on a custom devnet for now
// (as we don't need to specify genesis time and slot duration)
let mut head_tracker = HeadTracker::start(beacon_client);

let builder_proxy_config = BuilderProxyConfig {
mevboost_url: config.mevboost_url.clone(),
server_port: config.mevboost_proxy_port,
let config = match Config::parse_from_cli() {
Ok(config) => config,
Err(err) => bail!("Failed to parse CLI arguments: {:?}", err),
};

let (payload_tx, mut payload_rx) = mpsc::channel(16);
let payload_fetcher = LocalPayloadFetcher::new(payload_tx);

let mut local_builder = LocalBuilder::new(&config);

tokio::spawn(async move {
if let Err(e) = start_builder_proxy_server(payload_fetcher, builder_proxy_config).await {
tracing::error!("Builder API proxy failed: {:?}", e);
}
});

// TODO: parallelize this
loop {
tokio::select! {
Some(Event { mut request, response }) = api_events_rx.recv() => {
let start = std::time::Instant::now();

let validator_index = match consensus_state.validate_request(&request) {
Ok(index) => index,
Err(e) => {
tracing::error!(err = ?e, "Consensus State: Failed to validate request");
let _ = response.send(Err(spec::Error::ValidationFailed(e.to_string())));
continue;
}
};

if let Err(e) = execution_state.validate_commitment_request(&mut request).await {
tracing::error!(err = ?e, "Execution State: Failed to validate request");
let _ = response.send(Err(spec::Error::ValidationFailed(e.to_string())));
continue;
};

// TODO: match when we have more request types
let CommitmentRequest::Inclusion(ref req) = request;
tracing::info!(
elapsed = ?start.elapsed(),
digest = ?req.digest(),
"Validation against execution state passed"
);

// parse the request into constraints and sign them with the sidecar signer
let slot = req.slot;
let message = ConstraintsMessage::build(validator_index, req.clone());
let signature = signer.sign(&message.digest())?.to_string();
let signed_constraints = SignedConstraints { message, signature };

execution_state.add_constraint(slot, signed_constraints);

// Create a commitment by signing the request with the commitment signer
match request.commit_and_sign(&commitment_signer).await {
Ok(commitment) => {
let _ = response.send(Ok(commitment));
},
Err(e) => {
tracing::error!(err = ?e, "Failed to sign commitment");
let _ = response.send(Err(spec::Error::Internal));
}
}
},
Ok(HeadEvent { slot, .. }) = head_tracker.next_head() => {
tracing::info!(slot, "Received new head event");

// We use None to signal that we want to fetch the latest EL head
if let Err(e) = execution_state.update_head(None, slot).await {
tracing::error!(err = ?e, "Failed to update execution state head");
}

if let Err(e) = consensus_state.update_head(slot).await {
tracing::error!(err = ?e, "Failed to update consensus state head");
}
},
Some(slot) = consensus_state.commitment_deadline.wait() => {
tracing::info!(slot, "Commitment deadline reached, starting to build local block");

let Some(template) = execution_state.remove_block_template(slot) else {
tracing::warn!("No block template found for slot {slot} when requested");
continue;
};

tracing::trace!(?template.signed_constraints_list, "Submitting constraints to MEV-Boost");

// TODO: fix retry logic, and move this to separate task
let max_retries = 5;
let mut i = 0;
'inner: while let Err(e) = mevboost_client
.submit_constraints(&template.signed_constraints_list)
.await
{
tracing::error!(err = ?e, "Error submitting constraints, retrying...");
tokio::time::sleep(Duration::from_millis(100)).await;
i+=1;
if i >= max_retries {
tracing::error!("Max retries reached while submitting to MEV-Boost");
break 'inner
}
}

if let Err(e) = local_builder.build_new_local_payload(&template).await {
tracing::error!(err = ?e, "CRITICAL: Error while building local payload at slot deadline for {slot}");
};
},
Some(FetchPayloadRequest { slot, response_tx }) = payload_rx.recv() => {
tracing::info!(slot, "Received local payload request");

let Some(payload_and_bid) = local_builder.get_cached_payload() else {
tracing::warn!("No local payload found for {slot}");
let _ = response_tx.send(None);
continue;
};

if let Err(e) = response_tx.send(Some(payload_and_bid)) {
tracing::error!(err = ?e, "Failed to send payload and bid in response channel");
} else {
tracing::debug!("Sent payload and bid to response channel");
}
},
Ok(_) = tokio::signal::ctrl_c() => {
tracing::info!("Received SIGINT, shutting down...");
break;
}
}
}
info!(chain = config.chain.name(), "Starting Bolt sidecar");
match SidecarDriver::new(config).await {
Ok(driver) => driver.run_forever().await,
Err(err) => bail!("Failed to initialize the sidecar driver: {:?}", err),
};

Ok(())
}
3 changes: 2 additions & 1 deletion bolt-sidecar/src/api/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ where
.route(GET_PAYLOAD_PATH, post(BuilderProxyServer::get_payload))
.with_state(server);

let listener = TcpListener::bind(format!("0.0.0.0:{}", config.server_port)).await?;
let addr = format!("0.0.0.0:{}", config.server_port);
let listener = TcpListener::bind(addr).await?;
axum::serve(listener, router).await?;

Ok(())
Expand Down
42 changes: 23 additions & 19 deletions bolt-sidecar/src/api/commitments/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::HashSet,
fmt::Debug,
fmt,
future::Future,
net::{SocketAddr, ToSocketAddrs},
pin::Pin,
Expand All @@ -9,13 +9,14 @@ use std::{
};

use alloy::primitives::{Address, Signature};
use axum::{extract::State, http::HeaderMap, routing::post, Json};
use axum::{extract::State, http::HeaderMap, routing::post, Json, Router};
use axum_extra::extract::WithRejection;
use serde_json::Value;
use tokio::{
net::TcpListener,
sync::{mpsc, oneshot},
};
use tracing::error;

use crate::{
common::CARGO_PKG_VERSION,
Expand Down Expand Up @@ -94,8 +95,8 @@ pub struct CommitmentsApiServer {
signal: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

impl Debug for CommitmentsApiServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Debug for CommitmentsApiServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CommitmentsApiServer")
.field("addr", &self.addr)
.finish()
Expand Down Expand Up @@ -125,33 +126,37 @@ impl CommitmentsApiServer {
}
}

/// Runs the JSON-RPC server in the background, sending events to the provided channel.
pub async fn run(&mut self, events_tx: mpsc::Sender<Event>) -> eyre::Result<()> {
/// Runs the JSON-RPC server, sending events to the provided channel.
pub async fn run(&mut self, events_tx: mpsc::Sender<Event>) {
let api = Arc::new(CommitmentsApiInner::new(events_tx));

let router = axum::Router::new()
let router = Router::new()
.route("/", post(Self::handle_rpc))
.with_state(api);

let listener = TcpListener::bind(self.addr).await?;
let addr = listener.local_addr()?;
let listener = match TcpListener::bind(self.addr).await {
Ok(listener) => listener,
Err(err) => {
error!(?err, "Failed to bind Commitments API server");
panic!("Failed to bind Commitments API server");
}
};

let addr = listener.local_addr().expect("Failed to get local address");
self.addr = addr;

tracing::info!("Commitments RPC server bound to {addr}");

let signal = self.signal.take();
let signal = self.signal.take().expect("Signal not set");

tokio::spawn(async move {
if let Err(e) = axum::serve(listener, router)
.with_graceful_shutdown(signal.unwrap())
if let Err(err) = axum::serve(listener, router)
.with_graceful_shutdown(signal)
.await
{
tracing::error!("Server error: {:?}", e);
tracing::error!(?err, "Commitments API Server error");
}
});

Ok(())
}

/// Returns the local addr the server is listening on (or configured with).
Expand All @@ -175,10 +180,9 @@ impl CommitmentsApiServer {
match payload.method.as_str() {
GET_VERSION_METHOD => {
let version_string = format!("bolt-sidecar-v{CARGO_PKG_VERSION}");
let result = serde_json::to_value(version_string).unwrap_or(Value::Null);
Ok(Json(JsonResponse {
id: payload.id,
result,
result: Value::String(version_string),
..Default::default()
}))
}
Expand Down Expand Up @@ -289,7 +293,7 @@ mod test {

let (events_tx, _) = mpsc::channel(1);

server.run(events_tx).await.unwrap();
server.run(events_tx).await;
let addr = server.local_addr();

let sk = SecretKey::random(&mut rand::thread_rng());
Expand Down Expand Up @@ -333,7 +337,7 @@ mod test {

let (events_tx, mut events) = mpsc::channel(1);

server.run(events_tx).await.unwrap();
server.run(events_tx).await;
let addr = server.local_addr();

let sk = SecretKey::random(&mut rand::thread_rng());
Expand Down
21 changes: 16 additions & 5 deletions bolt-sidecar/src/api/commitments/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use alloy::primitives::SignatureError;
use axum::{extract::rejection::JsonRejection, http::StatusCode, response::IntoResponse, Json};
use thiserror::Error;

use crate::primitives::{commitment::InclusionCommitment, InclusionRequest};
use crate::{
primitives::{commitment::InclusionCommitment, InclusionRequest},
state::{consensus::ConsensusError, ValidationError},
};

use super::jsonrpc::JsonResponse;

Expand All @@ -18,9 +21,12 @@ pub enum Error {
/// Request rejected.
#[error("Request rejected: {0}")]
Rejected(#[from] RejectionError),
/// Consensus validation failed.
#[error("Consensus validation error: {0}")]
Consensus(#[from] ConsensusError),
/// Request validation failed.
#[error("{0}")]
ValidationFailed(String),
#[error("Validation failed: {0}")]
Validation(#[from] ValidationError),
/// Duplicate request.
#[error("Duplicate request")]
Duplicate,
Expand Down Expand Up @@ -80,9 +86,14 @@ impl IntoResponse for Error {
Json(JsonResponse::from_error(-32005, err.to_string())),
)
.into_response(),
Error::ValidationFailed(message) => (
Error::Consensus(err) => (
StatusCode::BAD_REQUEST,
Json(JsonResponse::from_error(-32006, err.to_string())),
)
.into_response(),
Error::Validation(err) => (
StatusCode::BAD_REQUEST,
Json(JsonResponse::from_error(-32006, message)),
Json(JsonResponse::from_error(-32006, err.to_string())),
)
.into_response(),
Error::MalformedHeader => (
Expand Down
2 changes: 1 addition & 1 deletion bolt-sidecar/src/client/mevboost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
};

/// A client for interacting with the MEV-Boost API.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MevBoostClient {
url: Url,
client: reqwest::Client,
Expand Down
Loading

0 comments on commit 1c01e7b

Please sign in to comment.