-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #97 from chainbound/feat/fallback-builder-api
feat(sidecar): integrate local builder with builder api
- Loading branch information
Showing
33 changed files
with
1,161 additions
and
597 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,142 +1,169 @@ | ||
use std::time::Duration; | ||
|
||
use alloy_rpc_types_beacon::events::HeadEvent; | ||
use tokio::sync::mpsc; | ||
use tracing::info; | ||
|
||
use bolt_sidecar::{ | ||
crypto::{ | ||
bls::{Signer, SignerBLS}, | ||
SignableBLS, | ||
}, | ||
json_rpc::{self, api::ApiError}, | ||
crypto::{bls::Signer, SignableBLS, SignerBLS}, | ||
json_rpc::api::{ApiError, ApiEvent}, | ||
primitives::{ | ||
BatchedSignedConstraints, ChainHead, CommitmentRequest, ConstraintsMessage, | ||
LocalPayloadFetcher, SignedConstraints, | ||
CommitmentRequest, ConstraintsMessage, FetchPayloadRequest, LocalPayloadFetcher, | ||
SignedConstraints, | ||
}, | ||
spec::ConstraintsApi, | ||
start_builder_proxy, | ||
state::{ | ||
fetcher::{StateClient, StateFetcher}, | ||
ConsensusState, ExecutionState, | ||
}, | ||
BuilderProxyConfig, Config, MevBoostClient, | ||
start_builder_proxy_server, start_rpc_server, | ||
state::{ConsensusState, ExecutionState, HeadTracker, StateClient}, | ||
BuilderProxyConfig, Config, ConstraintsApi, LocalBuilder, MevBoostClient, | ||
}; | ||
|
||
use tokio::sync::mpsc; | ||
use tracing::info; | ||
|
||
#[tokio::main] | ||
async fn main() -> eyre::Result<()> { | ||
tracing_subscriber::fmt::init(); | ||
|
||
info!("Starting sidecar"); | ||
|
||
let config = Config::parse_from_cli()?; | ||
|
||
info!(chain = config.chain.name(), "Starting Bolt sidecar"); | ||
|
||
// TODO: support external signers | ||
// probably it's cleanest to have the Config parser initialize a generic Signer | ||
let signer = Signer::new(config.private_key.clone().unwrap()); | ||
|
||
let state_client = StateClient::new(&config.execution_api_url, 8); | ||
let mevboost_client = MevBoostClient::new(&config.mevboost_url); | ||
let state_client = StateClient::new(&config.execution_api_url); | ||
let mut execution_state = ExecutionState::new(state_client).await?; | ||
|
||
let head = state_client.get_head().await?; | ||
let mut execution_state = ExecutionState::new(state_client, ChainHead::new(0, head)).await?; | ||
let mevboost_client = MevBoostClient::new(&config.mevboost_url); | ||
|
||
let (api_events, mut api_events_rx) = mpsc::channel(1024); | ||
let shutdown_tx = json_rpc::start_server(&config, api_events).await?; | ||
let consensus_state = ConsensusState::new(&config.beacon_api_url, &config.validator_indexes); | ||
let shutdown_tx = start_rpc_server(&config, api_events).await?; | ||
let mut consensus_state = ConsensusState::new( | ||
&config.beacon_api_url, | ||
&config.validator_indexes, | ||
config.chain.commitment_deadline(), | ||
); | ||
|
||
// TODO: this can be replaced with ethereum_consensus::clock::from_system_time() | ||
// but using beacon node events is easier to work on a custom devnet for now | ||
// (as we don't need to specify genesis time and slot duration) | ||
let mut head_tracker = HeadTracker::start(&config.beacon_api_url); | ||
|
||
let builder_proxy_config = BuilderProxyConfig { | ||
mevboost_url: config.mevboost_url, | ||
mevboost_url: config.mevboost_url.clone(), | ||
server_port: config.mevboost_proxy_port, | ||
}; | ||
|
||
let (payload_tx, mut payload_rx) = mpsc::channel(16); | ||
let payload_fetcher = LocalPayloadFetcher::new(payload_tx); | ||
|
||
let mut local_builder = LocalBuilder::new(&config); | ||
|
||
tokio::spawn(async move { | ||
loop { | ||
if let Err(e) = | ||
start_builder_proxy(payload_fetcher.clone(), builder_proxy_config.clone()).await | ||
{ | ||
tracing::error!("Builder API proxy failed: {:?}", e); | ||
tokio::time::sleep(Duration::from_secs(5)).await; | ||
} | ||
if let Err(e) = start_builder_proxy_server(payload_fetcher, builder_proxy_config).await { | ||
tracing::error!("Builder API proxy failed: {:?}", e); | ||
} | ||
}); | ||
|
||
// TODO: parallelize this | ||
loop { | ||
tokio::select! { | ||
Some(event) = api_events_rx.recv() => { | ||
tracing::info!("Received commitment request: {:?}", event.request); | ||
let request = event.request; | ||
Some(ApiEvent { request, response_tx }) = api_events_rx.recv() => { | ||
tracing::info!("Received commitment request: {:?}", request); | ||
|
||
let validator_index = match consensus_state.validate_request(&CommitmentRequest::Inclusion(request.clone())) { | ||
let validator_index = match consensus_state.validate_request(&request) { | ||
Ok(index) => index, | ||
Err(e) => { | ||
tracing::error!("Failed to validate request: {:?}", e); | ||
let _ = event.response.send(Err(ApiError::Custom(e.to_string()))); | ||
let _ = response_tx.send(Err(ApiError::Custom(e.to_string()))); | ||
continue; | ||
} | ||
}; | ||
|
||
if let Err(e) = execution_state | ||
.try_commit(&CommitmentRequest::Inclusion(request.clone())) | ||
.try_commit(&request) | ||
.await | ||
{ | ||
tracing::error!("Failed to commit request: {:?}", e); | ||
let _ = event.response.send(Err(ApiError::Custom(e.to_string()))); | ||
let _ = response_tx.send(Err(ApiError::Custom(e.to_string()))); | ||
continue; | ||
} | ||
|
||
// TODO: match when we have more request types | ||
let CommitmentRequest::Inclusion(request) = request; | ||
|
||
tracing::info!( | ||
tx_hash = %request.tx.tx_hash(), | ||
tx_hash = %request.tx.hash(), | ||
"Validation against execution state passed" | ||
); | ||
|
||
// parse the request into constraints and sign them with the sidecar signer | ||
let message = ConstraintsMessage::build(validator_index, request.slot, request.clone()); | ||
let message = ConstraintsMessage::build(validator_index, request.slot, request); | ||
|
||
let signature = signer.sign(&message.digest())?; | ||
let signed_constraints: BatchedSignedConstraints = | ||
vec![SignedConstraints { message, signature: signature.to_string() }]; | ||
let signature = signer.sign(&message.digest())?.to_string(); | ||
let signed_constraints = vec![SignedConstraints { message, signature }]; | ||
|
||
// TODO: fix retry logic | ||
let max_retries = 5; | ||
let mut i = 0; | ||
while let Err(e) = mevboost_client | ||
'inner: while let Err(e) = mevboost_client | ||
.submit_constraints(&signed_constraints) | ||
.await | ||
{ | ||
tracing::error!(error = ?e, "Error submitting constraints, retrying..."); | ||
tracing::error!(err = ?e, "Error submitting constraints, retrying..."); | ||
tokio::time::sleep(Duration::from_millis(100)).await; | ||
i+=1; | ||
if i >= max_retries { | ||
break | ||
break 'inner | ||
} | ||
} | ||
} | ||
Some(request) = payload_rx.recv() => { | ||
tracing::info!("Received local payload request: {:?}", request); | ||
let Some(response) = execution_state.get_block_template(request.slot) else { | ||
tracing::warn!("No block template found for slot {} when requested", request.slot); | ||
let _ = request.response.send(None); | ||
|
||
let res = serde_json::to_value(signed_constraints).map_err(Into::into); | ||
let _ = response_tx.send(res).ok(); | ||
}, | ||
Ok(HeadEvent { slot, .. }) = head_tracker.next_head() => { | ||
tracing::info!(slot, "Received new head event"); | ||
|
||
// We use None to signal that we want to fetch the latest EL head | ||
if let Err(e) = execution_state.update_head(None).await { | ||
tracing::error!(err = ?e, "Failed to update execution state head"); | ||
} | ||
|
||
if let Err(e) = consensus_state.update_head(slot).await { | ||
tracing::error!(err = ?e, "Failed to update consensus state head"); | ||
} | ||
}, | ||
Some(slot) = consensus_state.commitment_deadline.wait() => { | ||
tracing::info!(slot, "Commitment deadline reached, starting to build local block"); | ||
|
||
let Some(template) = execution_state.get_block_template(slot) else { | ||
tracing::warn!("No block template found for slot {slot} when requested"); | ||
continue; | ||
}; | ||
|
||
// For fallback block building, we need to turn a block template into an actual SignedBuilderBid. | ||
// This will also require building the full ExecutionPayload that we want the proposer to commit to. | ||
// Once we have that, we need to send it as response to the validator via the pending get_header RPC call. | ||
// The validator will then call get_payload with the corresponding SignedBlindedBeaconBlock. We then need to | ||
// respond with the full ExecutionPayload inside the BeaconBlock (+ blobs if any). | ||
if let Err(e) = local_builder.build_new_local_payload(template.transactions).await { | ||
tracing::error!(err = ?e, "CRITICAL: Error while building local payload at slot deadline for {slot}"); | ||
}; | ||
}, | ||
Some(FetchPayloadRequest { slot, response_tx }) = payload_rx.recv() => { | ||
tracing::info!(slot, "Received local payload request"); | ||
|
||
let _ = request.response.send(None); | ||
} | ||
let Some(payload_and_bid) = local_builder.get_cached_payload() else { | ||
tracing::warn!("No local payload found for {slot}"); | ||
let _ = response_tx.send(None); | ||
continue; | ||
}; | ||
|
||
else => break, | ||
if let Err(e) = response_tx.send(Some(payload_and_bid)) { | ||
tracing::error!(err = ?e, "Failed to send payload and bid in response channel"); | ||
} else { | ||
tracing::debug!("Sent payload and bid to response channel"); | ||
} | ||
}, | ||
Ok(_) = tokio::signal::ctrl_c() => { | ||
tracing::info!("Received SIGINT, shutting down..."); | ||
shutdown_tx.send(()).await.ok(); | ||
break; | ||
} | ||
} | ||
} | ||
|
||
tokio::signal::ctrl_c().await?; | ||
shutdown_tx.send(()).await.ok(); | ||
|
||
Ok(()) | ||
} |
Oops, something went wrong.