From 9c0a7cec34281f82e0730c7a5fcfb4687b40a7be Mon Sep 17 00:00:00 2001
From: thedevbirb
Date: Wed, 15 Jan 2025 14:58:46 +0100
Subject: [PATCH 1/4] chore(sidecar): instrument poll with rpc_url

---
 .../src/api/commitments/firewall/processor.rs | 32 ++++++++-----------
 1 file changed, 13 insertions(+), 19 deletions(-)

diff --git a/bolt-sidecar/src/api/commitments/firewall/processor.rs b/bolt-sidecar/src/api/commitments/firewall/processor.rs
index 6704decf..e2d2c2e8 100644
--- a/bolt-sidecar/src/api/commitments/firewall/processor.rs
+++ b/bolt-sidecar/src/api/commitments/firewall/processor.rs
@@ -17,7 +17,7 @@ use tokio_tungstenite::{
     tungstenite::{self, Message},
     MaybeTlsStream, WebSocketStream,
 };
-use tracing::{debug, error, info, trace, warn};
+use tracing::{debug, error, info, instrument, trace, warn};
 use uuid::Uuid;
 
 use crate::{
@@ -140,12 +140,12 @@ impl CommitmentRequestProcessor {
 impl Future for CommitmentRequestProcessor {
     type Output = InterruptReason;
 
+    #[instrument(name = "", skip_all, fields(url = %self.url))]
     fn poll(
         self: Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Self::Output> {
         let this = self.get_mut();
-        let rpc_url = this.url.clone();
 
         loop {
             let mut progress = false;
@@ -170,7 +170,7 @@ impl Future for CommitmentRequestProcessor {
 
                 // Try to send the message to the sink, for later flushing.
                 if let Err(e) = this.write_sink.start_send_unpin(message) {
-                    error!(?e, ?rpc_url, "failed to send message to websocket write sink");
+                    error!(?e, "failed to send message to websocket write sink");
                     continue;
                 }
             }
@@ -210,7 +210,7 @@ impl Future for CommitmentRequestProcessor {
                     progress = true;
 
                     let Some(res_message) = maybe_message else {
-                        warn!(?rpc_url, "websocket read stream terminated");
+                        warn!("websocket read stream terminated");
                         return Poll::Ready(InterruptReason::ReadStreamTerminated);
                     };
 
@@ -219,27 +219,23 @@ impl Future for CommitmentRequestProcessor {
                     match res_message {
                         Ok(Message::Text(text)) => {
                             this.handle_text_message(text);
                         }
                         Ok(Message::Close(_)) => {
-                            warn!(?rpc_url, "websocket connection closed by server");
+                            warn!("websocket connection closed by server");
                             return Poll::Ready(InterruptReason::ConnectionClosed);
                         }
                         Ok(Message::Binary(data)) => {
-                            debug!(
-                                ?rpc_url,
-                                bytes_len = data.len(),
-                                "received unexpected binary message"
-                            );
+                            debug!(bytes_len = data.len(), "received unexpected binary message");
                         }
                         Ok(Message::Ping(_)) => {
-                            trace!(?rpc_url, "received ping message");
+                            trace!("received ping message");
                         }
                         Ok(Message::Pong(_)) => {
-                            trace!(?rpc_url, "received pong message");
+                            trace!("received pong message");
                         }
                         Ok(Message::Frame(_)) => {
-                            debug!(?rpc_url, "received unexpected raw frame");
+                            debug!("received unexpected raw frame");
                         }
                         Err(e) => {
-                            error!(?e, ?rpc_url, "error reading message from websocket connection");
+                            error!(?e, "error reading message from websocket connection");
                         }
                     }
                 }
@@ -292,15 +288,13 @@ impl CommitmentRequestProcessor {
     }
 
     fn handle_text_message(&mut self, text: String) {
-        let rpc_url = self.url.clone();
-
-        trace!(?rpc_url, text, "received text message from websocket connection");
+        trace!(text, "received text message from websocket connection");
 
         let (tx, rx) = oneshot::channel();
 
         let request = match serde_json::from_str::<JsonRpcRequest>(&text) {
             Ok(req) => req,
             Err(e) => {
-                warn!(?e, ?rpc_url, "failed to parse JSON-RPC request");
+                warn!(?e, "failed to parse JSON-RPC request");
                 return;
             }
         };
@@ -363,7 +357,7 @@ impl CommitmentRequestProcessor {
                 self.pending_commitment_responses.push(PendingCommitmentResponse::new(rx, id));
             }
             other => {
-                warn!(?rpc_url, "unsupported method: {}", other);
+                warn!("unsupported method: {}", other);
             }
         };
     }

From a52df0f58588898af5a5af26ddaa4cd352807b4c Mon Sep 17 00:00:00 2001
From: thedevbirb
Date: Wed, 15 Jan 2025 15:30:01 +0100
Subject: [PATCH 2/4] fix(sidecar): drop PingTicker and use intervals

---
 .../src/api/commitments/firewall/processor.rs | 33 ++++++-----
 .../src/api/commitments/firewall/receiver.rs  | 57 ++-----------------
 2 files changed, 26 insertions(+), 64 deletions(-)

diff --git a/bolt-sidecar/src/api/commitments/firewall/processor.rs b/bolt-sidecar/src/api/commitments/firewall/processor.rs
index e2d2c2e8..a4e75ddd 100644
--- a/bolt-sidecar/src/api/commitments/firewall/processor.rs
+++ b/bolt-sidecar/src/api/commitments/firewall/processor.rs
@@ -4,6 +4,7 @@ use futures::{
 };
 use serde::Serialize;
 use serde_json::{json, Value};
+use std::time::Duration;
 use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll};
 use tokio::{
     net::TcpStream,
@@ -12,6 +13,7 @@ use tokio::{
         mpsc,
         oneshot::{self, error::RecvError},
     },
+    time::Interval,
 };
 use tokio_tungstenite::{
     tungstenite::{self, Message},
@@ -39,6 +41,12 @@ use crate::{
     },
 };
 
+/// The interval at which to send ping messages to the connected websocket servers.
+#[cfg(test)]
+const PING_INTERVAL: Duration = Duration::from_secs(4);
+#[cfg(not(test))]
+const PING_INTERVAL: Duration = Duration::from_secs(30);
+
 /// The reason for interrupting the [CommitmentRequestProcessor] future.
 #[derive(Debug)]
 pub enum InterruptReason {
@@ -99,8 +107,8 @@ pub struct CommitmentRequestProcessor {
     write_sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
     /// The websocket reader stream.
     read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
-    /// The receiver for keep-alive ping messages.
-    ping_rx: broadcast::Receiver<()>,
+    /// The interval at which to send ping messages to the connected websocket server.
+    ping_interval: Interval,
     /// The receiver for shutdown signals.
     shutdown_rx: broadcast::Receiver<()>,
     /// The collection of pending commitment requests responses, sent with [api_events_tx].
@@ -118,7 +126,6 @@ impl CommitmentRequestProcessor {
         state: ProcessorState,
         tx: mpsc::Sender<CommitmentEvent>,
         stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
-        ping_rx: broadcast::Receiver<()>,
         shutdown_rx: broadcast::Receiver<()>,
     ) -> Self {
         let (write_sink, read_stream) = stream.split();
@@ -129,7 +136,7 @@ impl CommitmentRequestProcessor {
             api_events_tx: tx,
             write_sink,
             read_stream,
-            ping_rx,
+            ping_interval: tokio::time::interval(PING_INTERVAL),
             shutdown_rx,
             pending_commitment_responses: FuturesUnordered::new(),
             outgoing_messages: VecDeque::new(),
@@ -192,6 +199,10 @@ impl Future for CommitmentRequestProcessor {
             }
 
             // 4. Handle shutdown signals before accepting any new work
+            //
+            // FIXME: (thedevbirb, 2025-01-15) this does not register the context waker when
+            // polling the shutdown signal. As such, a shutdown is only noticed if a message
+            // is sent _and_ some other event wakes this task.
             match this.shutdown_rx.try_recv() {
                 Ok(_) => {
                     info!("received shutdown signal. closing websocket connection...");
@@ -241,18 +252,14 @@ impl Future for CommitmentRequestProcessor {
             }
 
             // 6. Handle ping messages
-            match this.ping_rx.try_recv() {
-                Ok(_) => {
+            match this.ping_interval.poll_tick(cx) {
+                Poll::Ready(_) => {
                     progress = true;
+
+                    trace!("sending ping message to websocket connection");
                     this.outgoing_messages.push_back(Message::Ping(vec![8, 0, 1, 7]));
                 }
-                Err(TryRecvError::Closed) => {
-                    error!("ping channel for keep-alive messages closed");
-                }
-                Err(TryRecvError::Lagged(lost)) => {
-                    error!("ping channel for keep-alives lagged by {} messages", lost)
-                }
-                _ => {}
+                Poll::Pending => { /* fallthrough */ }
             }
 
             if !progress {
diff --git a/bolt-sidecar/src/api/commitments/firewall/receiver.rs b/bolt-sidecar/src/api/commitments/firewall/receiver.rs
index 03ab6226..6b94c85c 100644
--- a/bolt-sidecar/src/api/commitments/firewall/receiver.rs
+++ b/bolt-sidecar/src/api/commitments/firewall/receiver.rs
@@ -1,8 +1,5 @@
 use alloy::signers::local::PrivateKeySigner;
-use std::{
-    fmt::{self, Debug, Formatter},
-    time::Duration,
-};
+use std::fmt::{self, Debug, Formatter};
 use thiserror::Error;
 use tokio::sync::{broadcast, mpsc};
 use tokio_tungstenite::{
@@ -28,14 +25,9 @@ use super::{
     processor::{CommitmentRequestProcessor, InterruptReason, ProcessorState},
 };
-/// The interval at which to send ping messages from connected clients.
-#[cfg(test)]
-const PING_INTERVAL: Duration = Duration::from_secs(4);
-#[cfg(not(test))]
-const PING_INTERVAL: Duration = Duration::from_secs(30);
-
 /// The maximum number of retries to attempt when reconnecting to a websocket server.
-const MAX_RETRIES: usize = 1000;
+/// Try indefinitely.
+const MAX_RETRIES: usize = usize::MAX;
 
 /// The maximum message size to receive via websocket connection, in bytes, set to 32MiB.
 ///
 /// It is enough to account for a commitment request with 6 blobs and the largest
@@ -114,10 +106,8 @@ impl CommitmentsReceiver {
         // mpsc channel where every websocket connection will send commitment events over its own
         // tx to a single receiver.
         let (api_events_tx, api_events_rx) = mpsc::channel(self.urls.len() * 2);
-        let (ping_tx, ping_rx) = broadcast::channel::<()>(1);
         let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
 
-        PingTicker::new(PING_INTERVAL).spawn(ping_tx);
         ShutdownTicker::new(self.signal).spawn(shutdown_tx);
 
         let signer = PrivateKeySigner::from_signing_key(self.operator_private_key.0);
@@ -129,7 +119,6 @@ impl CommitmentsReceiver {
             // task.
             let url = url.to_string();
             let api_events_tx = api_events_tx.clone();
-            let ping_rx = ping_rx.resubscribe();
             let shutdown_rx = shutdown_rx.resubscribe();
             let signer = signer.clone();
 
@@ -151,12 +140,10 @@ impl CommitmentsReceiver {
                         .expect("failed to produce JWT");
 
                     let api_events_tx = api_events_tx.clone();
-                    let ping_rx = ping_rx.resubscribe();
                     let shutdown_rx = shutdown_rx.resubscribe();
 
                     async move {
-                        handle_connection(url, state, jwt, api_events_tx, ping_rx, shutdown_rx)
-                            .await
+                        handle_connection(url, state, jwt, api_events_tx, shutdown_rx).await
                     }
                 },
                 handle_error,
@@ -180,7 +167,6 @@ async fn handle_connection(
     state: ProcessorState,
     jwt: String,
     api_events_tx: mpsc::Sender<CommitmentEvent>,
-    ping_rx: broadcast::Receiver<()>,
     shutdown_rx: broadcast::Receiver<()>,
 ) -> Result<(), ConnectionHandlerError> {
     let ws_config =
@@ -197,14 +183,8 @@ async fn handle_connection(
 
     // For each opened connection, create a new commitment processor
     // able to handle incoming message requests.
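+    // NOTE: keep-alive pings are now driven by the processor itself through an internal
+    // `tokio::time::Interval` (see `PING_INTERVAL` in processor.rs above), so no ping
+    // channel needs to be threaded through here anymore.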
-    let commitment_request_processor = CommitmentRequestProcessor::new(
-        url,
-        state,
-        api_events_tx,
-        stream,
-        ping_rx,
-        shutdown_rx,
-    );
+    let commitment_request_processor =
+        CommitmentRequestProcessor::new(url, state, api_events_tx, stream, shutdown_rx);
     let interrupt_reason = commitment_request_processor.await;
     Err(ConnectionHandlerError::ProcessorInterrupted(interrupt_reason))
 }
@@ -242,31 +222,6 @@ fn handle_error(err: &ConnectionHandlerError) -> bool {
     !is_shutdown
 }
 
-/// A ping ticker that sends ping messages to connected clients at regular intervals.
-struct PingTicker {
-    interval: Duration,
-}
-
-impl PingTicker {
-    pub fn new(interval: Duration) -> Self {
-        Self { interval }
-    }
-
-    /// Spawn a task to ping connected clients at regular intervals.
-    pub fn spawn(self, tx: broadcast::Sender<()>) {
-        tokio::spawn(async move {
-            let mut interval = tokio::time::interval(self.interval);
-
-            loop {
-                interval.tick().await;
-                if tx.send(()).is_err() {
-                    error!("internal error while sending ping task: dropped receiver")
-                }
-            }
-        });
-    }
-}
-
 /// A shutdown ticker that sends a shutdown signal to connected clients.
 struct ShutdownTicker {
     signal: ShutdownSignal,

From 987410d6cb61be7a15e84ab6b4b1f2cb37e18095 Mon Sep 17 00:00:00 2001
From: thedevbirb
Date: Wed, 15 Jan 2025 16:35:05 +0100
Subject: [PATCH 3/4] fix(sidecar): use watch channel for shutdown

---
 .../src/api/commitments/firewall/processor.rs | 24 +++++++++++--------
 .../src/api/commitments/firewall/receiver.rs  | 12 +++++-----
 2 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/bolt-sidecar/src/api/commitments/firewall/processor.rs b/bolt-sidecar/src/api/commitments/firewall/processor.rs
index a4e75ddd..c3c58cfb 100644
--- a/bolt-sidecar/src/api/commitments/firewall/processor.rs
+++ b/bolt-sidecar/src/api/commitments/firewall/processor.rs
@@ -1,4 +1,5 @@
 use futures::{
+    pin_mut,
     stream::{FuturesUnordered, SplitSink, SplitStream},
     FutureExt, SinkExt, StreamExt,
 };
@@ -9,9 +10,9 @@ use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll};
 use tokio::{
     net::TcpStream,
     sync::{
-        broadcast::{self, error::TryRecvError},
         mpsc,
         oneshot::{self, error::RecvError},
+        watch,
     },
     time::Interval,
 };
@@ -110,7 +111,7 @@ pub struct CommitmentRequestProcessor {
     /// The interval at which to send ping messages to the connected websocket server.
     ping_interval: Interval,
     /// The receiver for shutdown signals.
-    shutdown_rx: broadcast::Receiver<()>,
+    shutdown_rx: watch::Receiver<()>,
     /// The collection of pending commitment requests responses, sent with [api_events_tx].
     /// SAFETY: the `poll` implementation of this struct promptly handles these responses and
     /// ensures this vector doesn't grow indefinitely.
@@ -126,7 +127,7 @@ impl CommitmentRequestProcessor {
         state: ProcessorState,
         tx: mpsc::Sender<CommitmentEvent>,
         stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
-        shutdown_rx: broadcast::Receiver<()>,
+        shutdown_rx: watch::Receiver<()>,
     ) -> Self {
         let (write_sink, read_stream) = stream.split();
@@ -200,18 +201,21 @@ impl Future for CommitmentRequestProcessor {
 
             // 4. Handle shutdown signals before accepting any new work
             //
-            // FIXME: (thedevbirb, 2025-01-15) this does not register the context waker when
-            // polling the shutdown signal. As such, a shutdown is only noticed if a message
-            // is sent _and_ some other event wakes this task.
-            match this.shutdown_rx.try_recv() {
-                Ok(_) => {
+            // NOTE: (thedevbirb, 2025-01-15) this is a temporary workaround to ensure that the
+            // shutdown signal wakes the task. https://github.com/chainbound/bolt/issues/673
+            // will be a generalized solution to this problem.
+            let mut shutdown = this.shutdown_rx.clone();
+            let shutdown = shutdown.changed();
+            pin_mut!(shutdown);
+            match shutdown.poll_unpin(cx) {
+                Poll::Ready(Ok(_)) => {
                     info!("received shutdown signal. closing websocket connection...");
                     return Poll::Ready(InterruptReason::Shutdown);
                 }
-                Err(TryRecvError::Closed) => {
+                Poll::Ready(Err(_)) => {
                     error!("shutdown channel closed");
                 }
-                _ => {}
+                Poll::Pending => { /* fallthrough */ }
             }
 
             // Incoming work tasks
diff --git a/bolt-sidecar/src/api/commitments/firewall/receiver.rs b/bolt-sidecar/src/api/commitments/firewall/receiver.rs
index 6b94c85c..28d4b4e3 100644
--- a/bolt-sidecar/src/api/commitments/firewall/receiver.rs
+++ b/bolt-sidecar/src/api/commitments/firewall/receiver.rs
@@ -1,7 +1,7 @@
 use alloy::signers::local::PrivateKeySigner;
 use std::fmt::{self, Debug, Formatter};
 use thiserror::Error;
-use tokio::sync::{broadcast, mpsc};
+use tokio::sync::{mpsc, watch};
 use tokio_tungstenite::{
     connect_async_with_config,
     tungstenite::{self, client::IntoClientRequest, protocol::WebSocketConfig},
@@ -106,7 +106,7 @@ impl CommitmentsReceiver {
         // mpsc channel where every websocket connection will send commitment events over its own
         // tx to a single receiver.
         let (api_events_tx, api_events_rx) = mpsc::channel(self.urls.len() * 2);
-        let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
+        let (shutdown_tx, shutdown_rx) = watch::channel(());
 
         ShutdownTicker::new(self.signal).spawn(shutdown_tx);
 
@@ -119,7 +119,7 @@ impl CommitmentsReceiver {
             // task.
             let url = url.to_string();
             let api_events_tx = api_events_tx.clone();
-            let shutdown_rx = shutdown_rx.resubscribe();
+            let shutdown_rx = shutdown_rx.clone();
             let signer = signer.clone();
 
             tokio::spawn(async move {
@@ -140,7 +140,7 @@ impl CommitmentsReceiver {
                         .expect("failed to produce JWT");
 
                     let api_events_tx = api_events_tx.clone();
-                    let shutdown_rx = shutdown_rx.resubscribe();
+                    let shutdown_rx = shutdown_rx.clone();
 
                     async move {
                         handle_connection(url, state, jwt, api_events_tx, shutdown_rx).await
@@ -167,7 +167,7 @@ async fn handle_connection(
     state: ProcessorState,
     jwt: String,
     api_events_tx: mpsc::Sender<CommitmentEvent>,
-    shutdown_rx: broadcast::Receiver<()>,
+    shutdown_rx: watch::Receiver<()>,
 ) -> Result<(), ConnectionHandlerError> {
     let ws_config =
         WebSocketConfig { max_message_size: Some(MAX_MESSAGE_SIZE), ..Default::default() };
@@ -234,7 +234,7 @@ impl ShutdownTicker {
     }
 
     /// Spawn a task to send the shutdown message to the connected clients.
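+    ///
+    /// A minimal sketch of the intended wiring (illustrative; `signal` stands for any
+    /// future that resolves on shutdown):
+    ///
+    /// ```ignore
+    /// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());
+    /// ShutdownTicker::new(signal).spawn(shutdown_tx);
+    /// // every connection task holds a clone of `shutdown_rx` and polls `changed()`
+    /// ```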
-    pub fn spawn(self, tx: broadcast::Sender<()>) {
+    pub fn spawn(self, tx: watch::Sender<()>) {
         tokio::spawn(async move {
             self.signal.await;
 

From add06f1b47aecacfad38743cb04a4d69e3a76da8 Mon Sep 17 00:00:00 2001
From: thedevbirb
Date: Wed, 15 Jan 2025 17:24:56 +0100
Subject: [PATCH 4/4] chore(sidecar): allow retry_with_backoff/_if to retry
 indefinitely via Option

---
 .../src/api/commitments/firewall/receiver.rs |  6 +--
 bolt-sidecar/src/common/backoff.rs           | 42 ++++++++++++-------
 bolt-sidecar/src/driver.rs                   |  2 +-
 3 files changed, 29 insertions(+), 21 deletions(-)

diff --git a/bolt-sidecar/src/api/commitments/firewall/receiver.rs b/bolt-sidecar/src/api/commitments/firewall/receiver.rs
index 28d4b4e3..946574a2 100644
--- a/bolt-sidecar/src/api/commitments/firewall/receiver.rs
+++ b/bolt-sidecar/src/api/commitments/firewall/receiver.rs
@@ -25,10 +25,6 @@ use super::{
     processor::{CommitmentRequestProcessor, InterruptReason, ProcessorState},
 };
 
-/// The maximum number of retries to attempt when reconnecting to a websocket server.
-/// Try indefinitely.
-const MAX_RETRIES: usize = usize::MAX;
-
 /// The maximum message size to receive via websocket connection, in bytes, set to 32MiB.
 ///
 /// It is enough to account for a commitment request with 6 blobs and the largest
@@ -124,7 +120,7 @@ impl CommitmentsReceiver {
 
             tokio::spawn(async move {
                 retry_with_backoff_if(
-                    MAX_RETRIES,
+                    None,
                     Some(retry_config),
                     // NOTE: this needs to be a closure because it must be re-called upon failure.
                     // As such we also need to clone the inputs again.
diff --git a/bolt-sidecar/src/common/backoff.rs b/bolt-sidecar/src/common/backoff.rs
index 4ffc6eb6..e39058c7 100644
--- a/bolt-sidecar/src/common/backoff.rs
+++ b/bolt-sidecar/src/common/backoff.rs
@@ -17,8 +17,10 @@ pub struct RetryConfig {
 }
 
 /// Retry a future with exponential backoff and jitter.
+///
+/// If `max_retries` is `None`, the future will be retried indefinitely.
 pub async fn retry_with_backoff<F, T, E>(
-    max_retries: usize,
+    max_retries: Option<usize>,
     config: Option<RetryConfig>,
     fut: impl Fn() -> F,
 ) -> Result<T, E>
@@ -30,16 +32,22 @@ where
 
     let backoff = ExponentialBackoff::from_millis(config.initial_delay_ms)
         .factor(config.factor)
-        .max_delay(Duration::from_secs(config.max_delay_secs))
-        .take(max_retries)
-        .map(jitter);
+        .max_delay(Duration::from_secs(config.max_delay_secs))
+        // NOTE: apply jitter in both branches so capped and uncapped retries behave alike.
+        .map(jitter);
 
-    Retry::spawn(backoff, fut).await
+    match max_retries {
+        Some(max_retries) => {
+            let backoff = backoff.take(max_retries);
+            Retry::spawn(backoff, fut).await
+        }
+        None => Retry::spawn(backoff, fut).await,
+    }
 }
 
 /// Retry a future with exponential backoff and jitter if the error matches a condition.
+///
+/// If `max_retries` is `None`, the future will be retried indefinitely.
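+///
+/// A usage sketch (`connect` and `is_retryable` are illustrative placeholders, not items
+/// from this crate):
+///
+/// ```ignore
+/// // Retry at most 5 times, but only while the error is considered transient.
+/// let res = retry_with_backoff_if(Some(5), None, || connect(), |e| e.is_retryable()).await;
+/// // Retry forever with the default backoff configuration.
+/// let res = retry_with_backoff_if(None, None, || connect(), |_| true).await;
+/// ```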
 pub async fn retry_with_backoff_if<F, T, E>(
-    max_retries: usize,
+    max_retries: Option<usize>,
     config: Option<RetryConfig>,
     fut: impl Fn() -> F,
     condition: impl FnMut(&E) -> bool,
@@ -52,11 +60,15 @@ where
 
     let backoff = ExponentialBackoff::from_millis(config.initial_delay_ms)
         .factor(config.factor)
-        .max_delay(Duration::from_secs(config.max_delay_secs))
-        .take(max_retries)
-        .map(jitter);
+        .max_delay(Duration::from_secs(config.max_delay_secs))
+        // NOTE: apply jitter in both branches so capped and uncapped retries behave alike.
+        .map(jitter);
 
-    RetryIf::spawn(backoff, fut, condition).await
+    match max_retries {
+        Some(max_retries) => {
+            let backoff = backoff.take(max_retries);
+            RetryIf::spawn(backoff, fut, condition).await
+        }
+        None => RetryIf::spawn(backoff, fut, condition).await,
+    }
 }
 
 #[cfg(test)]
@@ -99,7 +111,7 @@ mod tests {
     async fn test_retry_success_without_retry() {
         let counter = Arc::new(Mutex::new(Counter::new(0)));
 
-        let result = retry_with_backoff(5, None, || {
+        let result = retry_with_backoff(Some(5), None, || {
             let counter = Arc::clone(&counter);
             async move {
                 let mut counter = counter.lock().await;
@@ -116,7 +128,7 @@ mod tests {
     async fn test_retry_until_success() {
         let counter = Arc::new(Mutex::new(Counter::new(3))); // Fail 3 times, succeed on 4th
 
-        let result = retry_with_backoff(5, None, || async {
+        let result = retry_with_backoff(Some(5), None, || async {
             let counter = Arc::clone(&counter);
             let mut counter = counter.lock().await;
             counter.retryable_fn().await
@@ -132,7 +144,7 @@ mod tests {
         let counter = Arc::new(Mutex::new(Counter::new(3))); // Fail 3 times, succeed on 4th
 
         let result = retry_with_backoff_if(
-            5,
+            Some(5),
             None,
             || async {
                 let counter = Arc::clone(&counter);
@@ -151,7 +163,7 @@ mod tests {
     async fn test_max_retries_reached() {
         let counter = Arc::new(Mutex::new(Counter::new(5))); // Fail 5 times, max retries = 3
 
-        let result = retry_with_backoff(3, None, || {
+        let result = retry_with_backoff(Some(3), None, || {
             let counter = Arc::clone(&counter);
             async move {
                 let mut counter = counter.lock().await;
@@ -169,7 +181,7 @@ mod tests {
         let counter = Arc::new(Mutex::new(Counter::new(3))); // Fail 3 times, succeed on 4th
         let start_time = Instant::now();
 
-        let result = retry_with_backoff(5, None, || {
+        let result = retry_with_backoff(Some(5), None, || {
             let counter = Arc::clone(&counter);
             async move {
                 let mut counter = counter.lock().await;
diff --git a/bolt-sidecar/src/driver.rs b/bolt-sidecar/src/driver.rs
index cadfbdde..5f1e3141 100644
--- a/bolt-sidecar/src/driver.rs
+++ b/bolt-sidecar/src/driver.rs
@@ -430,7 +430,7 @@ impl SidecarDriver {
         let constraints_client = Arc::new(self.constraints_client.clone());
 
         // Submit constraints to the constraints service with an exponential retry mechanism.
-        tokio::spawn(retry_with_backoff(10, None, move || {
+        tokio::spawn(retry_with_backoff(Some(10), None, move || {
             let constraints_client = Arc::clone(&constraints_client);
             let constraints = Arc::clone(&constraints);
             async move {