Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sidecar): added driver on higher register (tokio::select) #181

Merged
merged 3 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 13 additions & 179 deletions bolt-sidecar/bin/sidecar.rs
Original file line number Diff line number Diff line change
@@ -1,188 +1,22 @@
use std::time::Duration;

use alloy::{rpc::types::beacon::events::HeadEvent, signers};
use tokio::sync::mpsc;

use bolt_sidecar::{
commitments::{
server::{CommitmentsApiServer, Event},
spec,
},
crypto::{bls::Signer, SignableBLS, SignerBLS},
primitives::{
CommitmentRequest, ConstraintsMessage, FetchPayloadRequest, LocalPayloadFetcher,
SignedConstraints,
},
start_builder_proxy_server,
state::{ConsensusState, ExecutionState, HeadTracker, StateClient},
BeaconClient, BuilderProxyConfig, Config, ConstraintsApi, LocalBuilder, MevBoostClient,
};
use bolt_sidecar::{Config, SidecarDriver};
use eyre::{bail, Result};
use tracing::info;

#[tokio::main]
async fn main() -> eyre::Result<()> {
async fn main() -> Result<()> {
// TODO: improve telemetry setup (#116)
tracing_subscriber::fmt::init();

let config = Config::parse_from_cli()?;

tracing::info!(chain = config.chain.name(), "Starting Bolt sidecar");

// TODO: support external signers
// probably it's cleanest to have the Config parser initialize a generic Signer
let signer = Signer::new(config.private_key.clone().unwrap());

// TODO: support external signers
let commitment_signer = signers::local::PrivateKeySigner::random();

let state_client = StateClient::new(config.execution_api_url.clone());
let mut execution_state = ExecutionState::new(state_client, config.limits).await?;

let mevboost_client = MevBoostClient::new(config.mevboost_url.clone());
let beacon_client = BeaconClient::new(config.beacon_api_url.clone());

let (api_events, mut api_events_rx) = mpsc::channel(1024);

let mut api_server = CommitmentsApiServer::new(format!("0.0.0.0:{}", config.rpc_port));

api_server.run(api_events).await?;

let mut consensus_state = ConsensusState::new(
beacon_client.clone(),
config.validator_indexes.clone(),
config.chain.commitment_deadline(),
);

// TODO: this can be replaced with ethereum_consensus::clock::from_system_time()
// but using beacon node events is easier to work on a custom devnet for now
// (as we don't need to specify genesis time and slot duration)
let mut head_tracker = HeadTracker::start(beacon_client);

let builder_proxy_config = BuilderProxyConfig {
mevboost_url: config.mevboost_url.clone(),
server_port: config.mevboost_proxy_port,
let config = match Config::parse_from_cli() {
Ok(config) => config,
Err(err) => bail!("Failed to parse CLI arguments: {:?}", err),
};

let (payload_tx, mut payload_rx) = mpsc::channel(16);
let payload_fetcher = LocalPayloadFetcher::new(payload_tx);

let mut local_builder = LocalBuilder::new(&config);

tokio::spawn(async move {
if let Err(e) = start_builder_proxy_server(payload_fetcher, builder_proxy_config).await {
tracing::error!("Builder API proxy failed: {:?}", e);
}
});

// TODO: parallelize this
loop {
tokio::select! {
Some(Event { mut request, response }) = api_events_rx.recv() => {
let start = std::time::Instant::now();

let validator_index = match consensus_state.validate_request(&request) {
Ok(index) => index,
Err(e) => {
tracing::error!(err = ?e, "Consensus State: Failed to validate request");
let _ = response.send(Err(spec::Error::ValidationFailed(e.to_string())));
continue;
}
};

if let Err(e) = execution_state.validate_commitment_request(&mut request).await {
tracing::error!(err = ?e, "Execution State: Failed to validate request");
let _ = response.send(Err(spec::Error::ValidationFailed(e.to_string())));
continue;
};

// TODO: match when we have more request types
let CommitmentRequest::Inclusion(ref req) = request;
tracing::info!(
elapsed = ?start.elapsed(),
digest = ?req.digest(),
"Validation against execution state passed"
);

// parse the request into constraints and sign them with the sidecar signer
let slot = req.slot;
let message = ConstraintsMessage::build(validator_index, req.clone());
let signature = signer.sign(&message.digest())?.to_string();
let signed_constraints = SignedConstraints { message, signature };

execution_state.add_constraint(slot, signed_constraints);

// Create a commitment by signing the request with the commitment signer
match request.commit_and_sign(&commitment_signer).await {
Ok(commitment) => {
let _ = response.send(Ok(commitment));
},
Err(e) => {
tracing::error!(err = ?e, "Failed to sign commitment");
let _ = response.send(Err(spec::Error::Internal));
}
}
},
Ok(HeadEvent { slot, .. }) = head_tracker.next_head() => {
tracing::info!(slot, "Received new head event");

// We use None to signal that we want to fetch the latest EL head
if let Err(e) = execution_state.update_head(None, slot).await {
tracing::error!(err = ?e, "Failed to update execution state head");
}

if let Err(e) = consensus_state.update_head(slot).await {
tracing::error!(err = ?e, "Failed to update consensus state head");
}
},
Some(slot) = consensus_state.commitment_deadline.wait() => {
tracing::info!(slot, "Commitment deadline reached, starting to build local block");

let Some(template) = execution_state.remove_block_template(slot) else {
tracing::warn!("No block template found for slot {slot} when requested");
continue;
};

tracing::trace!(?template.signed_constraints_list, "Submitting constraints to MEV-Boost");

// TODO: fix retry logic, and move this to separate task
let max_retries = 5;
let mut i = 0;
'inner: while let Err(e) = mevboost_client
.submit_constraints(&template.signed_constraints_list)
.await
{
tracing::error!(err = ?e, "Error submitting constraints, retrying...");
tokio::time::sleep(Duration::from_millis(100)).await;
i+=1;
if i >= max_retries {
tracing::error!("Max retries reached while submitting to MEV-Boost");
break 'inner
}
}

if let Err(e) = local_builder.build_new_local_payload(&template).await {
tracing::error!(err = ?e, "CRITICAL: Error while building local payload at slot deadline for {slot}");
};
},
Some(FetchPayloadRequest { slot, response_tx }) = payload_rx.recv() => {
tracing::info!(slot, "Received local payload request");

let Some(payload_and_bid) = local_builder.get_cached_payload() else {
tracing::warn!("No local payload found for {slot}");
let _ = response_tx.send(None);
continue;
};

if let Err(e) = response_tx.send(Some(payload_and_bid)) {
tracing::error!(err = ?e, "Failed to send payload and bid in response channel");
} else {
tracing::debug!("Sent payload and bid to response channel");
}
},
Ok(_) = tokio::signal::ctrl_c() => {
tracing::info!("Received SIGINT, shutting down...");
break;
}
}
}
info!(chain = config.chain.name(), "Starting Bolt sidecar");
match SidecarDriver::new(config).await {
Ok(driver) => driver.run_forever().await,
Err(err) => bail!("Failed to initialize the sidecar driver: {:?}", err),
};

Ok(())
}
3 changes: 2 additions & 1 deletion bolt-sidecar/src/api/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ where
.route(GET_PAYLOAD_PATH, post(BuilderProxyServer::get_payload))
.with_state(server);

let listener = TcpListener::bind(format!("0.0.0.0:{}", config.server_port)).await?;
let addr = format!("0.0.0.0:{}", config.server_port);
let listener = TcpListener::bind(addr).await?;
axum::serve(listener, router).await?;

Ok(())
Expand Down
42 changes: 23 additions & 19 deletions bolt-sidecar/src/api/commitments/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::HashSet,
fmt::Debug,
fmt,
future::Future,
net::{SocketAddr, ToSocketAddrs},
pin::Pin,
Expand All @@ -9,13 +9,14 @@ use std::{
};

use alloy::primitives::{Address, Signature};
use axum::{extract::State, http::HeaderMap, routing::post, Json};
use axum::{extract::State, http::HeaderMap, routing::post, Json, Router};
use axum_extra::extract::WithRejection;
use serde_json::Value;
use tokio::{
net::TcpListener,
sync::{mpsc, oneshot},
};
use tracing::error;

use crate::{
common::CARGO_PKG_VERSION,
Expand Down Expand Up @@ -94,8 +95,8 @@ pub struct CommitmentsApiServer {
signal: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

impl Debug for CommitmentsApiServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Debug for CommitmentsApiServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CommitmentsApiServer")
.field("addr", &self.addr)
.finish()
Expand Down Expand Up @@ -125,33 +126,37 @@ impl CommitmentsApiServer {
}
}

/// Runs the JSON-RPC server in the background, sending events to the provided channel.
pub async fn run(&mut self, events_tx: mpsc::Sender<Event>) -> eyre::Result<()> {
/// Runs the JSON-RPC server, sending events to the provided channel.
pub async fn run(&mut self, events_tx: mpsc::Sender<Event>) {
let api = Arc::new(CommitmentsApiInner::new(events_tx));

let router = axum::Router::new()
let router = Router::new()
.route("/", post(Self::handle_rpc))
.with_state(api);

let listener = TcpListener::bind(self.addr).await?;
let addr = listener.local_addr()?;
let listener = match TcpListener::bind(self.addr).await {
Ok(listener) => listener,
Err(err) => {
error!(?err, "Failed to bind Commitments API server");
panic!("Failed to bind Commitments API server");
}
};

let addr = listener.local_addr().expect("Failed to get local address");
self.addr = addr;

tracing::info!("Commitments RPC server bound to {addr}");

let signal = self.signal.take();
let signal = self.signal.take().expect("Signal not set");

tokio::spawn(async move {
if let Err(e) = axum::serve(listener, router)
.with_graceful_shutdown(signal.unwrap())
if let Err(err) = axum::serve(listener, router)
.with_graceful_shutdown(signal)
.await
{
tracing::error!("Server error: {:?}", e);
tracing::error!(?err, "Commitments API Server error");
}
});

Ok(())
}

/// Returns the local addr the server is listening on (or configured with).
Expand All @@ -175,10 +180,9 @@ impl CommitmentsApiServer {
match payload.method.as_str() {
GET_VERSION_METHOD => {
let version_string = format!("bolt-sidecar-v{CARGO_PKG_VERSION}");
let result = serde_json::to_value(version_string).unwrap_or(Value::Null);
Ok(Json(JsonResponse {
id: payload.id,
result,
result: Value::String(version_string),
..Default::default()
}))
}
Expand Down Expand Up @@ -289,7 +293,7 @@ mod test {

let (events_tx, _) = mpsc::channel(1);

server.run(events_tx).await.unwrap();
server.run(events_tx).await;
let addr = server.local_addr();

let sk = SecretKey::random(&mut rand::thread_rng());
Expand Down Expand Up @@ -333,7 +337,7 @@ mod test {

let (events_tx, mut events) = mpsc::channel(1);

server.run(events_tx).await.unwrap();
server.run(events_tx).await;
let addr = server.local_addr();

let sk = SecretKey::random(&mut rand::thread_rng());
Expand Down
21 changes: 16 additions & 5 deletions bolt-sidecar/src/api/commitments/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use alloy::primitives::SignatureError;
use axum::{extract::rejection::JsonRejection, http::StatusCode, response::IntoResponse, Json};
use thiserror::Error;

use crate::primitives::{commitment::InclusionCommitment, InclusionRequest};
use crate::{
primitives::{commitment::InclusionCommitment, InclusionRequest},
state::{consensus::ConsensusError, ValidationError},
};

use super::jsonrpc::JsonResponse;

Expand All @@ -18,9 +21,12 @@ pub enum Error {
/// Request rejected.
#[error("Request rejected: {0}")]
Rejected(#[from] RejectionError),
/// Consensus validation failed.
#[error("Consensus validation error: {0}")]
Consensus(#[from] ConsensusError),
/// Request validation failed.
#[error("{0}")]
ValidationFailed(String),
#[error("Validation failed: {0}")]
Validation(#[from] ValidationError),
/// Duplicate request.
#[error("Duplicate request")]
Duplicate,
Expand Down Expand Up @@ -80,9 +86,14 @@ impl IntoResponse for Error {
Json(JsonResponse::from_error(-32005, err.to_string())),
)
.into_response(),
Error::ValidationFailed(message) => (
Error::Consensus(err) => (
StatusCode::BAD_REQUEST,
Json(JsonResponse::from_error(-32006, err.to_string())),
)
.into_response(),
Error::Validation(err) => (
StatusCode::BAD_REQUEST,
Json(JsonResponse::from_error(-32006, message)),
Json(JsonResponse::from_error(-32006, err.to_string())),
)
.into_response(),
Error::MalformedHeader => (
Expand Down
2 changes: 1 addition & 1 deletion bolt-sidecar/src/client/mevboost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
};

/// A client for interacting with the MEV-Boost API.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MevBoostClient {
url: Url,
client: reqwest::Client,
Expand Down
Loading
Loading