Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sidecar: Add relay connections #48

Merged
merged 2 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bolt-sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
name = "bolt-sidecar"
version = "0.1.0"
edition = "2021"
default-run = "bolt-sidecar"

[dependencies]
# core
Expand Down Expand Up @@ -56,3 +57,8 @@ tracing-subscriber = "0.3.18"

[dev-dependencies]
alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy" }


[[bin]]
name = "bolt-sidecar"
path = "bin/sidecar.rs"
26 changes: 7 additions & 19 deletions bolt-sidecar/src/main.rs → bolt-sidecar/bin/sidecar.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,20 @@
#![doc = include_str!("../README.md")]
#![warn(missing_debug_implementations, missing_docs, rustdoc::all)]
#![deny(unused_must_use, rust_2018_idioms)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use bolt_sidecar::{
config::{Config, Opts},
json_rpc::start_server,
};

use clap::Parser;
use tracing::info;

mod client;
mod common;
mod config;
mod crypto;
mod json_rpc;
mod pubsub;
mod state;
mod template;
mod types;

#[tokio::main]
async fn main() -> eyre::Result<()> {
tracing_subscriber::fmt::init();

info!("Starting sidecar");

let opts = config::Opts::parse();

let config = config::Config::try_from(opts)?;

let shutdown_tx = json_rpc::start_server(config.rpc_port, config.private_key).await?;
let opts = Opts::parse();
let config = Config::try_from(opts)?;
let shutdown_tx = start_server(config.rpc_port, config.private_key, config.relays).await?;

tokio::signal::ctrl_c().await?;
shutdown_tx.send(()).await.ok();
Expand Down
26 changes: 21 additions & 5 deletions bolt-sidecar/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,41 @@ use std::str::FromStr;
use clap::Parser;
use secp256k1::{rand, SecretKey};

#[derive(Parser)]
pub(super) struct Opts {
/// Port to listen on for incoming JSON-RPC requests.
/// Command-line options for the sidecar
#[derive(Parser, Debug)]
pub struct Opts {
/// Port to listen on for incoming JSON-RPC requests
#[clap(short = 'p', long)]
pub(super) port: Option<u16>,
/// Private key to use for signing preconfirmation requests.
/// Private key to use for signing preconfirmation requests
#[clap(short = 'k', long)]
pub(super) private_key: String,
/// Max commitments to accept per block.
/// List of relay HTTP endpoints to use
#[clap(short = 'r', long)]
pub(super) relays: Vec<String>,
/// Max commitments to accept per block
#[clap(short = 'm', long)]
pub(super) max_commitments: Option<usize>,
}

/// Configuration options for the sidecar
#[derive(Debug)]
pub struct Config {
/// Port to listen on for incoming JSON-RPC requests
pub rpc_port: u16,
/// Private key to use for signing preconfirmation requests
pub private_key: SecretKey,
/// List of relay HTTP endpoints to use
pub relays: Vec<String>,
/// Limits for the sidecar
pub limits: Limits,
}

impl Default for Config {
fn default() -> Self {
Self {
rpc_port: 8000,
relays: Vec::new(),
private_key: SecretKey::new(&mut rand::thread_rng()),
limits: Limits::default(),
}
Expand All @@ -46,13 +58,17 @@ impl TryFrom<Opts> for Config {
config.limits.max_commitments_per_slot = max_commitments;
}

config.relays = opts.relays;
config.private_key = SecretKey::from_str(&opts.private_key)?;

Ok(config)
}
}

/// Limits for the sidecar.
#[derive(Debug)]
pub struct Limits {
/// Maximum number of commitments to accept per block
pub max_commitments_per_slot: usize,
}

Expand Down
31 changes: 22 additions & 9 deletions bolt-sidecar/src/json_rpc/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use super::types::InclusionRequestParams;
use crate::{
crypto::{SignableECDSA, Signer},
json_rpc::types::InclusionRequestResponse,
relays::RelayManager,
types::Slot,
};

Expand Down Expand Up @@ -51,24 +52,32 @@ pub trait CommitmentsRpc {
/// # Functionality
/// - We keep track of API requests in a local cache in order to avoid
/// accepting duplicate commitments from users.
/// - We also sign each request with a BLS signature to irrevocably bind
/// the request to this sidecar's identity.
/// - We also sign each request to irrevocably bind it to this
/// sidecar's validator identity.
pub struct JsonRpcApi {
/// A cache of commitment requests.
cache: Arc<RwLock<lru::LruCache<Slot, Vec<InclusionRequestParams>>>>,
/// The signer for this sidecar.
signer: Signer,
/// The manager to interact with all connected relays.
relay_manager: RelayManager,
}

impl JsonRpcApi {
/// Create a new instance of the JSON-RPC API.
pub fn new(private_key: SecretKey) -> Self {
pub fn new(private_key: SecretKey, relays: Vec<String>) -> Arc<Self> {
let cap = NonZeroUsize::new(DEFAULT_API_REQUEST_CACHE_SIZE).unwrap();

Self {
Arc::new(Self {
cache: Arc::new(RwLock::new(lru::LruCache::new(cap))),
signer: Signer::new(private_key),
}
relay_manager: RelayManager::new(relays),
})
}

/// Shut down the API and all connected relays gracefully.
pub fn shutdown(&self) {
self.relay_manager.shutdown();
}
}

Expand Down Expand Up @@ -102,6 +111,7 @@ impl CommitmentsRpc for JsonRpcApi {
}

{
// check for duplicate requests and update the cache if necessary
let mut cache = self.cache.write();
if let Some(commitments) = cache.get_mut(&params.message.slot) {
if commitments.iter().any(|p| p == &params) {
Expand All @@ -122,12 +132,15 @@ impl CommitmentsRpc for JsonRpcApi {

// TODO: check if there is enough time left in the current slot

// TODO: If valid, broadcast the commitment to all connected relays

Ok(serde_json::to_value(InclusionRequestResponse {
let response = serde_json::to_value(InclusionRequestResponse {
request: params,
signature: sidecar_signature,
})?)
})?;

// broadcast the commitment to all connected relays in the background
self.relay_manager.broadcast_commitment(response.clone());

Ok(response)
}
}

Expand Down
21 changes: 15 additions & 6 deletions bolt-sidecar/src/json_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,35 @@ mod types;
use self::api::CommitmentsRpc;
use self::spec::{JsonRpcError, JsonRpcRequest, JsonRpcResponse};

pub async fn start_server(port: u16, pk: SecretKey) -> eyre::Result<mpsc::Sender<()>> {
/// Start the JSON-RPC server. Returns a sender that can be used to send a shutdown signal.
pub async fn start_server(
port: u16,
pk: SecretKey,
relays: Vec<String>,
) -> eyre::Result<mpsc::Sender<()>> {
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
let cors = warp::cors().allow_any_origin().allow_method(Method::POST);

let rpc_api = Arc::new(api::JsonRpcApi::new(pk));
let rpc_api = api::JsonRpcApi::new(pk, relays);
let rpc_api_context = Arc::clone(&rpc_api);

let shutdown_fn = async move {
shutdown_rx.recv().await;
rpc_api.shutdown();
};

let rpc = warp::post()
.and(warp::path::end())
.and(warp::body::bytes())
.and(warp::header::exact("content-type", "application/json"))
.and(warp::any().map(move || Arc::clone(&rpc_api)))
.and(warp::any().map(move || Arc::clone(&rpc_api_context)))
.and_then(handle_rpc_request)
.and_then(|reply| async move { Ok::<_, Rejection>(warp::reply::json(&reply)) })
.recover(handle_rejection)
.with(cors);

let (addr, server) =
warp::serve(rpc).bind_with_graceful_shutdown(([0, 0, 0, 0], port), async move {
shutdown_rx.recv().await;
});
warp::serve(rpc).bind_with_graceful_shutdown(([0, 0, 0, 0], port), shutdown_fn);

tokio::spawn(server);
info!("RPC HTTP server listening on http://{}", addr);
Expand Down
19 changes: 19 additions & 0 deletions bolt-sidecar/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#![doc = include_str!("../README.md")]
#![warn(missing_debug_implementations, missing_docs, rustdoc::all)]
#![deny(unused_must_use, rust_2018_idioms)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod client;
mod common;
mod crypto;
mod pubsub;
mod relays;
mod state;
mod template;
mod types;

/// Configuration and command-line argument parsing for the sidecar
pub mod config;

/// JSON-RPC server and handlers for the sidecar
pub mod json_rpc;
114 changes: 114 additions & 0 deletions bolt-sidecar/src/relays.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use serde_json::Value;
use tokio::sync::broadcast;
use tracing::{debug, error, warn};

/// The endpoint for the relay constraints API (where to broadcast commitments).
const RELAY_CONSTRAINTS_ENDPOINT: &str = "/eth/v1/builder/constraints";

#[derive(Debug, Clone)]
pub enum RelayCommand {
BroadcastCommitment { params: Value },
Shutdown,
}

/// Component responsible for dispatching commands to all relays
pub struct RelayManager {
cmd_tx: broadcast::Sender<RelayCommand>,
}

impl RelayManager {
/// Create a new relay manager with the given endpoints and start the relay clients.
/// This method will spawn a new background task for each relay client.
pub fn new(endpoints: Vec<String>) -> Self {
let (cmd_tx, _) = broadcast::channel(64);

for endpoint in endpoints {
let relay = Relay {
cmd_rx: cmd_tx.subscribe(),
api: RelayClient {
endpoint: endpoint.trim_end_matches('/').to_string(),
client: reqwest::Client::new(),
},
};
tokio::spawn(relay.start());
}

Self { cmd_tx }
}

/// Broadcasts a commitment to all connected relays in the background.
pub fn broadcast_commitment(&self, params: Value) {
let _ = self
.cmd_tx
.send(RelayCommand::BroadcastCommitment { params });
}

/// Shuts down all relay clients gracefully.
pub fn shutdown(&self) {
let _ = self.cmd_tx.send(RelayCommand::Shutdown);
}
}

pub struct Relay<R> {
cmd_rx: broadcast::Receiver<RelayCommand>,
api: R,
}

pub trait RelayClientAPI {
fn endpoint(&self) -> &str;

fn broadcast_commitment(&self, params: Value);
}

impl<R: RelayClientAPI + Sync> Relay<R> {
/// Start the relay client in a background task
async fn start(mut self) {
loop {
tokio::select! {
Ok(cmd) = self.cmd_rx.recv() => {
match cmd {
RelayCommand::BroadcastCommitment { params } => {
self.api.broadcast_commitment(params);
}
RelayCommand::Shutdown => {
warn!("Shutting down relay client: {}", self.api.endpoint());
break;
}
}
},
}
}
}
}

pub struct RelayClient {
endpoint: String,
client: reqwest::Client,
}

impl RelayClientAPI for RelayClient {
fn endpoint(&self) -> &str {
&self.endpoint
}

fn broadcast_commitment(&self, params: Value) {
let endpoint = format!("{}{}", self.endpoint, RELAY_CONSTRAINTS_ENDPOINT);
let request = self.client.post(endpoint.clone()).json(&params);

tokio::spawn(async move {
let response = match request.send().await {
Ok(res) => res,
Err(e) => {
error!("Failed to broadcast commitment to {}: {}", endpoint, e);
return;
}
};

debug!(
"Broadcasted commitment to {} with status: {}",
endpoint,
response.status()
);
});
}
}