Skip to content

Commit

Permalink
Merge pull request #48 from chainbound/feat/sidecar/relay
Browse files Browse the repository at this point in the history
Sidecar: Add relay connections
  • Loading branch information
Jonas Bostoen authored Jun 3, 2024
2 parents bd57954 + 4499b8e commit f475b9d
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 39 deletions.
6 changes: 6 additions & 0 deletions bolt-sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
name = "bolt-sidecar"
version = "0.1.0"
edition = "2021"
default-run = "bolt-sidecar"

[dependencies]
# core
Expand Down Expand Up @@ -56,3 +57,8 @@ tracing-subscriber = "0.3.18"

[dev-dependencies]
alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy" }


[[bin]]
name = "bolt-sidecar"
path = "bin/sidecar.rs"
26 changes: 7 additions & 19 deletions bolt-sidecar/src/main.rs → bolt-sidecar/bin/sidecar.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,20 @@
#![doc = include_str!("../README.md")]
#![warn(missing_debug_implementations, missing_docs, rustdoc::all)]
#![deny(unused_must_use, rust_2018_idioms)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use bolt_sidecar::{
config::{Config, Opts},
json_rpc::start_server,
};

use clap::Parser;
use tracing::info;

mod client;
mod common;
mod config;
mod crypto;
mod json_rpc;
mod pubsub;
mod state;
mod template;
mod types;

#[tokio::main]
async fn main() -> eyre::Result<()> {
tracing_subscriber::fmt::init();

info!("Starting sidecar");

let opts = config::Opts::parse();

let config = config::Config::try_from(opts)?;

let shutdown_tx = json_rpc::start_server(config.rpc_port, config.private_key).await?;
let opts = Opts::parse();
let config = Config::try_from(opts)?;
let shutdown_tx = start_server(config.rpc_port, config.private_key, config.relays).await?;

tokio::signal::ctrl_c().await?;
shutdown_tx.send(()).await.ok();
Expand Down
26 changes: 21 additions & 5 deletions bolt-sidecar/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,41 @@ use std::str::FromStr;
use clap::Parser;
use secp256k1::{rand, SecretKey};

#[derive(Parser)]
pub(super) struct Opts {
/// Port to listen on for incoming JSON-RPC requests.
/// Command-line options for the sidecar
#[derive(Parser, Debug)]
pub struct Opts {
/// Port to listen on for incoming JSON-RPC requests
#[clap(short = 'p', long)]
pub(super) port: Option<u16>,
/// Private key to use for signing preconfirmation requests.
/// Private key to use for signing preconfirmation requests
#[clap(short = 'k', long)]
pub(super) private_key: String,
/// Max commitments to accept per block.
/// List of relay HTTP endpoints to use
#[clap(short = 'r', long)]
pub(super) relays: Vec<String>,
/// Max commitments to accept per block
#[clap(short = 'm', long)]
pub(super) max_commitments: Option<usize>,
}

/// Configuration options for the sidecar
#[derive(Debug)]
pub struct Config {
/// Port to listen on for incoming JSON-RPC requests
pub rpc_port: u16,
/// Private key to use for signing preconfirmation requests
pub private_key: SecretKey,
/// List of relay HTTP endpoints to use
pub relays: Vec<String>,
/// Limits for the sidecar
pub limits: Limits,
}

impl Default for Config {
fn default() -> Self {
Self {
rpc_port: 8000,
relays: Vec::new(),
private_key: SecretKey::new(&mut rand::thread_rng()),
limits: Limits::default(),
}
Expand All @@ -46,13 +58,17 @@ impl TryFrom<Opts> for Config {
config.limits.max_commitments_per_slot = max_commitments;
}

config.relays = opts.relays;
config.private_key = SecretKey::from_str(&opts.private_key)?;

Ok(config)
}
}

/// Limits for the sidecar.
#[derive(Debug)]
pub struct Limits {
/// Maximum number of commitments to accept per block
pub max_commitments_per_slot: usize,
}

Expand Down
31 changes: 22 additions & 9 deletions bolt-sidecar/src/json_rpc/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use super::types::InclusionRequestParams;
use crate::{
crypto::{SignableECDSA, Signer},
json_rpc::types::InclusionRequestResponse,
relays::RelayManager,
types::Slot,
};

Expand Down Expand Up @@ -51,24 +52,32 @@ pub trait CommitmentsRpc {
/// # Functionality
/// - We keep track of API requests in a local cache in order to avoid
/// accepting duplicate commitments from users.
/// - We also sign each request with a BLS signature to irrevocably bind
/// the request to this sidecar's identity.
/// - We also sign each request to irrevocably bind it to this
/// sidecar's validator identity.
pub struct JsonRpcApi {
/// A cache of commitment requests.
cache: Arc<RwLock<lru::LruCache<Slot, Vec<InclusionRequestParams>>>>,
/// The signer for this sidecar.
signer: Signer,
/// The manager to interact with all connected relays.
relay_manager: RelayManager,
}

impl JsonRpcApi {
/// Create a new instance of the JSON-RPC API.
pub fn new(private_key: SecretKey) -> Self {
pub fn new(private_key: SecretKey, relays: Vec<String>) -> Arc<Self> {
let cap = NonZeroUsize::new(DEFAULT_API_REQUEST_CACHE_SIZE).unwrap();

Self {
Arc::new(Self {
cache: Arc::new(RwLock::new(lru::LruCache::new(cap))),
signer: Signer::new(private_key),
}
relay_manager: RelayManager::new(relays),
})
}

/// Shut down the API and all connected relays gracefully.
pub fn shutdown(&self) {
self.relay_manager.shutdown();
}
}

Expand Down Expand Up @@ -102,6 +111,7 @@ impl CommitmentsRpc for JsonRpcApi {
}

{
// check for duplicate requests and update the cache if necessary
let mut cache = self.cache.write();
if let Some(commitments) = cache.get_mut(&params.message.slot) {
if commitments.iter().any(|p| p == &params) {
Expand All @@ -122,12 +132,15 @@ impl CommitmentsRpc for JsonRpcApi {

// TODO: check if there is enough time left in the current slot

// TODO: If valid, broadcast the commitment to all connected relays

Ok(serde_json::to_value(InclusionRequestResponse {
let response = serde_json::to_value(InclusionRequestResponse {
request: params,
signature: sidecar_signature,
})?)
})?;

// broadcast the commitment to all connected relays in the background
self.relay_manager.broadcast_commitment(response.clone());

Ok(response)
}
}

Expand Down
21 changes: 15 additions & 6 deletions bolt-sidecar/src/json_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,35 @@ mod types;
use self::api::CommitmentsRpc;
use self::spec::{JsonRpcError, JsonRpcRequest, JsonRpcResponse};

pub async fn start_server(port: u16, pk: SecretKey) -> eyre::Result<mpsc::Sender<()>> {
/// Start the JSON-RPC server. Returns a sender that can be used to send a shutdown signal.
pub async fn start_server(
port: u16,
pk: SecretKey,
relays: Vec<String>,
) -> eyre::Result<mpsc::Sender<()>> {
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
let cors = warp::cors().allow_any_origin().allow_method(Method::POST);

let rpc_api = Arc::new(api::JsonRpcApi::new(pk));
let rpc_api = api::JsonRpcApi::new(pk, relays);
let rpc_api_context = Arc::clone(&rpc_api);

let shutdown_fn = async move {
shutdown_rx.recv().await;
rpc_api.shutdown();
};

let rpc = warp::post()
.and(warp::path::end())
.and(warp::body::bytes())
.and(warp::header::exact("content-type", "application/json"))
.and(warp::any().map(move || Arc::clone(&rpc_api)))
.and(warp::any().map(move || Arc::clone(&rpc_api_context)))
.and_then(handle_rpc_request)
.and_then(|reply| async move { Ok::<_, Rejection>(warp::reply::json(&reply)) })
.recover(handle_rejection)
.with(cors);

let (addr, server) =
warp::serve(rpc).bind_with_graceful_shutdown(([0, 0, 0, 0], port), async move {
shutdown_rx.recv().await;
});
warp::serve(rpc).bind_with_graceful_shutdown(([0, 0, 0, 0], port), shutdown_fn);

tokio::spawn(server);
info!("RPC HTTP server listening on http://{}", addr);
Expand Down
19 changes: 19 additions & 0 deletions bolt-sidecar/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#![doc = include_str!("../README.md")]
#![warn(missing_debug_implementations, missing_docs, rustdoc::all)]
#![deny(unused_must_use, rust_2018_idioms)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod client;
mod common;
mod crypto;
mod pubsub;
mod relays;
mod state;
mod template;
mod types;

/// Configuration and command-line argument parsing for the sidecar
pub mod config;

/// JSON-RPC server and handlers for the sidecar
pub mod json_rpc;
114 changes: 114 additions & 0 deletions bolt-sidecar/src/relays.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use serde_json::Value;
use tokio::sync::broadcast;
use tracing::{debug, error, warn};

/// The endpoint for the relay constraints API (where to broadcast commitments).
const RELAY_CONSTRAINTS_ENDPOINT: &str = "/eth/v1/builder/constraints";

#[derive(Debug, Clone)]
pub enum RelayCommand {
BroadcastCommitment { params: Value },
Shutdown,
}

/// Component responsible for dispatching commands to all relays
pub struct RelayManager {
cmd_tx: broadcast::Sender<RelayCommand>,
}

impl RelayManager {
/// Create a new relay manager with the given endpoints and start the relay clients.
/// This method will spawn a new background task for each relay client.
pub fn new(endpoints: Vec<String>) -> Self {
let (cmd_tx, _) = broadcast::channel(64);

for endpoint in endpoints {
let relay = Relay {
cmd_rx: cmd_tx.subscribe(),
api: RelayClient {
endpoint: endpoint.trim_end_matches('/').to_string(),
client: reqwest::Client::new(),
},
};
tokio::spawn(relay.start());
}

Self { cmd_tx }
}

/// Broadcasts a commitment to all connected relays in the background.
pub fn broadcast_commitment(&self, params: Value) {
let _ = self
.cmd_tx
.send(RelayCommand::BroadcastCommitment { params });
}

/// Shuts down all relay clients gracefully.
pub fn shutdown(&self) {
let _ = self.cmd_tx.send(RelayCommand::Shutdown);
}
}

pub struct Relay<R> {
cmd_rx: broadcast::Receiver<RelayCommand>,
api: R,
}

pub trait RelayClientAPI {
fn endpoint(&self) -> &str;

fn broadcast_commitment(&self, params: Value);
}

impl<R: RelayClientAPI + Sync> Relay<R> {
/// Start the relay client in a background task
async fn start(mut self) {
loop {
tokio::select! {
Ok(cmd) = self.cmd_rx.recv() => {
match cmd {
RelayCommand::BroadcastCommitment { params } => {
self.api.broadcast_commitment(params);
}
RelayCommand::Shutdown => {
warn!("Shutting down relay client: {}", self.api.endpoint());
break;
}
}
},
}
}
}
}

pub struct RelayClient {
endpoint: String,
client: reqwest::Client,
}

impl RelayClientAPI for RelayClient {
fn endpoint(&self) -> &str {
&self.endpoint
}

fn broadcast_commitment(&self, params: Value) {
let endpoint = format!("{}{}", self.endpoint, RELAY_CONSTRAINTS_ENDPOINT);
let request = self.client.post(endpoint.clone()).json(&params);

tokio::spawn(async move {
let response = match request.send().await {
Ok(res) => res,
Err(e) => {
error!("Failed to broadcast commitment to {}: {}", endpoint, e);
return;
}
};

debug!(
"Broadcasted commitment to {} with status: {}",
endpoint,
response.status()
);
});
}
}

0 comments on commit f475b9d

Please sign in to comment.