Merge pull request #683 from chainbound/lore/fix/processor-pings
fix(sidecar): ping messages were not sent
merklefruit authored Jan 15, 2025
2 parents bb3287a + add06f1 commit c0b9664
Showing 4 changed files with 83 additions and 115 deletions.
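The root cause, as far as the diff shows, is a wake-up problem: the processor previously drained a `broadcast` ping channel with `try_recv()` inside its hand-written `poll`, and `try_recv()` does not register the task's waker, so the future was never re-polled when a ping arrived unless some other event happened to wake it. The fix drops the `PingTicker`/broadcast pair and polls a `tokio::time::Interval` directly; `poll_tick` registers the waker while the next tick is pending. A minimal sketch of the two polling styles, using illustrative function names rather than the sidecar's own:

```rust
// Illustrative only: a compile-ready contrast of the two polling styles,
// not the sidecar's actual types. Assumes the `tokio` crate with the
// "sync" and "time" features enabled.
use std::task::{Context, Poll};
use tokio::sync::broadcast;
use tokio::time::Interval;

/// Old style: `try_recv()` never registers the task's waker, so if nothing
/// else wakes the future, queued pings are simply never seen.
fn poll_ping_via_broadcast(rx: &mut broadcast::Receiver<()>) -> bool {
    rx.try_recv().is_ok()
}

/// New style: `Interval::poll_tick` registers `cx.waker()` while the next
/// tick is pending, so the executor re-polls the future when it elapses.
fn poll_ping_via_interval(interval: &mut Interval, cx: &mut Context<'_>) -> bool {
    matches!(interval.poll_tick(cx), Poll::Ready(_))
}
```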
83 changes: 44 additions & 39 deletions bolt-sidecar/src/api/commitments/firewall/processor.rs
@@ -1,23 +1,26 @@
use futures::{
pin_mut,
stream::{FuturesUnordered, SplitSink, SplitStream},
FutureExt, SinkExt, StreamExt,
};
use serde::Serialize;
use serde_json::{json, Value};
use std::time::Duration;
use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll};
use tokio::{
net::TcpStream,
sync::{
broadcast::{self, error::TryRecvError},
mpsc,
oneshot::{self, error::RecvError},
watch,
},
time::Interval,
};
use tokio_tungstenite::{
tungstenite::{self, Message},
MaybeTlsStream, WebSocketStream,
};
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, instrument, trace, warn};
use uuid::Uuid;

use crate::{
@@ -39,6 +42,12 @@ use crate::{
},
};

/// The interval at which to send ping messages from connected clients.
#[cfg(test)]
const PING_INTERVAL: Duration = Duration::from_secs(4);
#[cfg(not(test))]
const PING_INTERVAL: Duration = Duration::from_secs(30);

/// The reason for interrupting the [CommitmentRequestProcessor] future.
#[derive(Debug)]
pub enum InterruptReason {
@@ -99,10 +108,10 @@ pub struct CommitmentRequestProcessor {
write_sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
/// The websocket reader stream.
read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
/// The receiver for keep-alive ping messages.
ping_rx: broadcast::Receiver<()>,
/// The interval at which to send ping messages to the connected websocket server.
ping_interval: Interval,
/// The receiver for shutdown signals.
shutdown_rx: broadcast::Receiver<()>,
shutdown_rx: watch::Receiver<()>,
/// The collection of pending commitment request responses, sent with [api_events_tx].
/// SAFETY: the `poll` implementation of this struct promptly handles these responses and
/// ensures this vector doesn't grow indefinitely.
@@ -118,8 +127,7 @@ impl CommitmentRequestProcessor {
state: ProcessorState,
tx: mpsc::Sender<CommitmentEvent>,
stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
ping_rx: broadcast::Receiver<()>,
shutdown_rx: broadcast::Receiver<()>,
shutdown_rx: watch::Receiver<()>,
) -> Self {
let (write_sink, read_stream) = stream.split();

@@ -129,7 +137,7 @@
api_events_tx: tx,
write_sink,
read_stream,
ping_rx,
ping_interval: tokio::time::interval(PING_INTERVAL),
shutdown_rx,
pending_commitment_responses: FuturesUnordered::new(),
outgoing_messages: VecDeque::new(),
@@ -140,12 +148,12 @@
impl Future for CommitmentRequestProcessor {
type Output = InterruptReason;

#[instrument(name = "", skip_all, fields(url = %self.url))]
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.get_mut();
let rpc_url = this.url.clone();

loop {
let mut progress = false;
@@ -170,7 +178,7 @@ impl Future for CommitmentRequestProcessor {

// Try to send the message to the sink, for later flushing.
if let Err(e) = this.write_sink.start_send_unpin(message) {
error!(?e, ?rpc_url, "failed to send message to websocket write sink");
error!(?e, "failed to send message to websocket write sink");
continue;
}
}
@@ -192,15 +200,22 @@
}

// 4. Handle shutdown signals before accepting any new work
match this.shutdown_rx.try_recv() {
Ok(_) => {
//
// NOTE: (thedevbirb, 2025-01-15) this is a temporary workaround to ensure that the
// shutdown signal wakes the task. https://github.com/chainbound/bolt/issues/673
// will be a generalized solution to this problem.
let mut shutdown = this.shutdown_rx.clone();
let shutdown = shutdown.changed();
pin_mut!(shutdown);
match shutdown.poll_unpin(cx) {
Poll::Ready(Ok(_)) => {
info!("received shutdown signal. closing websocket connection...");
return Poll::Ready(InterruptReason::Shutdown);
}
Err(TryRecvError::Closed) => {
Poll::Ready(Err(_)) => {
error!("shutdown channel closed");
}
_ => {}
Poll::Pending => { /* fallthrough */ }
}

// Incoming work tasks
@@ -210,7 +225,7 @@
progress = true;

let Some(res_message) = maybe_message else {
warn!(?rpc_url, "websocket read stream terminated");
warn!("websocket read stream terminated");
return Poll::Ready(InterruptReason::ReadStreamTerminated);
};

@@ -219,44 +234,36 @@
this.handle_text_message(text);
}
Ok(Message::Close(_)) => {
warn!(?rpc_url, "websocket connection closed by server");
warn!("websocket connection closed by server");
return Poll::Ready(InterruptReason::ConnectionClosed);
}
Ok(Message::Binary(data)) => {
debug!(
?rpc_url,
bytes_len = data.len(),
"received unexpected binary message"
);
debug!(bytes_len = data.len(), "received unexpected binary message");
}
Ok(Message::Ping(_)) => {
trace!(?rpc_url, "received ping message");
trace!("received ping message");
}
Ok(Message::Pong(_)) => {
trace!(?rpc_url, "received pong message");
trace!("received pong message");
}
Ok(Message::Frame(_)) => {
debug!(?rpc_url, "received unexpected raw frame");
debug!("received unexpected raw frame");
}
Err(e) => {
error!(?e, ?rpc_url, "error reading message from websocket connection");
error!(?e, "error reading message from websocket connection");
}
}
}

// 6. Handle ping messages
match this.ping_rx.try_recv() {
Ok(_) => {
match this.ping_interval.poll_tick(cx) {
Poll::Ready(_) => {
progress = true;

trace!("sending ping message to websocket connection");
this.outgoing_messages.push_back(Message::Ping(vec![8, 0, 1, 7]));
}
Err(TryRecvError::Closed) => {
error!("ping channel for keep-alive messages closed");
}
Err(TryRecvError::Lagged(lost)) => {
error!("ping channel for keep-alives lagged by {} messages", lost)
}
_ => {}
Poll::Pending => { /* fallthrough */ }
}

if !progress {
@@ -292,15 +299,13 @@ impl CommitmentRequestProcessor {
}

fn handle_text_message(&mut self, text: String) {
let rpc_url = self.url.clone();

trace!(?rpc_url, text, "received text message from websocket connection");
trace!(text, "received text message from websocket connection");
let (tx, rx) = oneshot::channel();

let request = match serde_json::from_str::<JsonRpcRequestUuid>(&text) {
Ok(req) => req,
Err(e) => {
warn!(?e, ?rpc_url, "failed to parse JSON-RPC request");
warn!(?e, "failed to parse JSON-RPC request");
return;
}
};
@@ -363,7 +368,7 @@ impl CommitmentRequestProcessor {
self.pending_commitment_responses.push(PendingCommitmentResponse::new(rx, id));
}
other => {
warn!(?rpc_url, "unsupported method: {}", other);
warn!("unsupported method: {}", other);
}
};
}
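The new processor drives the interval straight from its `Future::poll` implementation and pushes a `Message::Ping` into `outgoing_messages` on every tick; the per-call `?rpc_url` log fields are replaced by an `#[instrument]` span carrying the URL. A self-contained sketch of the same pattern (hypothetical `PingDriver` type, not the sidecar's), showing why `poll_tick` alone is enough to keep the task alive:

```rust
// Hypothetical stand-alone example of polling a tokio Interval from a
// hand-written Future, mirroring the pattern used in the processor above.
// Assumes tokio with the "time", "rt-multi-thread" and "macros" features.
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};
use tokio::time::Interval;

struct PingDriver {
    interval: Interval,
    pings_sent: usize,
}

impl Future for PingDriver {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        // `poll_tick` registers the waker whenever the next tick is pending,
        // so the executor re-polls this future without any helper channel.
        while this.interval.poll_tick(cx).is_ready() {
            this.pings_sent += 1;
            println!("tick {} -> would enqueue Message::Ping here", this.pings_sent);
            if this.pings_sent == 3 {
                return Poll::Ready(this.pings_sent);
            }
        }
        Poll::Pending
    }
}

#[tokio::main]
async fn main() {
    let driver = PingDriver {
        interval: tokio::time::interval(Duration::from_millis(200)),
        pings_sent: 0,
    };
    println!("sent {} pings", driver.await);
}
```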
71 changes: 11 additions & 60 deletions bolt-sidecar/src/api/commitments/firewall/receiver.rs
@@ -1,10 +1,7 @@
use alloy::signers::local::PrivateKeySigner;
use std::{
fmt::{self, Debug, Formatter},
time::Duration,
};
use std::fmt::{self, Debug, Formatter};
use thiserror::Error;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::{mpsc, watch};
use tokio_tungstenite::{
connect_async_with_config,
tungstenite::{self, client::IntoClientRequest, protocol::WebSocketConfig},
@@ -28,15 +25,6 @@ use super::{
processor::{CommitmentRequestProcessor, InterruptReason, ProcessorState},
};

/// The interval at which to send ping messages from connected clients.
#[cfg(test)]
const PING_INTERVAL: Duration = Duration::from_secs(4);
#[cfg(not(test))]
const PING_INTERVAL: Duration = Duration::from_secs(30);

/// The maximum number of retries to attempt when reconnecting to a websocket server.
const MAX_RETRIES: usize = 1000;

/// The maximum messages size to receive via websocket connection, in bits, set to 32MiB.
///
/// It is enough to account for a commitment request with 6 blobs and the largest
@@ -114,10 +102,8 @@ impl CommitmentsReceiver {
// mspc channel where every websocket connection will send commitment events over its own
// tx to a single receiver.
let (api_events_tx, api_events_rx) = mpsc::channel(self.urls.len() * 2);
let (ping_tx, ping_rx) = broadcast::channel::<()>(1);
let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
let (shutdown_tx, shutdown_rx) = watch::channel(());

PingTicker::new(PING_INTERVAL).spawn(ping_tx);
ShutdownTicker::new(self.signal).spawn(shutdown_tx);

let signer = PrivateKeySigner::from_signing_key(self.operator_private_key.0);
@@ -129,13 +115,12 @@
// task.
let url = url.to_string();
let api_events_tx = api_events_tx.clone();
let ping_rx = ping_rx.resubscribe();
let shutdown_rx = shutdown_rx.resubscribe();
let shutdown_rx = shutdown_rx.clone();
let signer = signer.clone();

tokio::spawn(async move {
retry_with_backoff_if(
MAX_RETRIES,
None,
Some(retry_config),
// NOTE: this needs to be a closure because it must be re-called upon failure.
// As such we also need to clone the inputs again.
@@ -151,12 +136,10 @@
.expect("failed to produce JWT");

let api_events_tx = api_events_tx.clone();
let ping_rx = ping_rx.resubscribe();
let shutdown_rx = shutdown_rx.resubscribe();
let shutdown_rx = shutdown_rx.clone();

async move {
handle_connection(url, state, jwt, api_events_tx, ping_rx, shutdown_rx)
.await
handle_connection(url, state, jwt, api_events_tx, shutdown_rx).await
}
},
handle_error,
@@ -180,8 +163,7 @@ async fn handle_connection(
state: ProcessorState,
jwt: String,
api_events_tx: mpsc::Sender<CommitmentEvent>,
ping_rx: broadcast::Receiver<()>,
shutdown_rx: broadcast::Receiver<()>,
shutdown_rx: watch::Receiver<()>,
) -> Result<(), ConnectionHandlerError> {
let ws_config =
WebSocketConfig { max_message_size: Some(MAX_MESSAGE_SIZE), ..Default::default() };
@@ -197,14 +179,8 @@

// For each opened connection, create a new commitment processor
// able to handle incoming message requests.
let commitment_request_processor = CommitmentRequestProcessor::new(
url,
state,
api_events_tx,
stream,
ping_rx,
shutdown_rx,
);
let commitment_request_processor =
CommitmentRequestProcessor::new(url, state, api_events_tx, stream, shutdown_rx);
let interrupt_reason = commitment_request_processor.await;
Err(ConnectionHandlerError::ProcessorInterrupted(interrupt_reason))
}
@@ -242,31 +218,6 @@ fn handle_error(err: &ConnectionHandlerError) -> bool {
!is_shutdown
}

/// A ping ticker that sends ping messages to connected clients at regular intervals.
struct PingTicker {
interval: Duration,
}

impl PingTicker {
pub fn new(interval: Duration) -> Self {
Self { interval }
}

/// Spawn a task to ping connected clients at regular intervals.
pub fn spawn(self, tx: broadcast::Sender<()>) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(self.interval);

loop {
interval.tick().await;
if tx.send(()).is_err() {
error!("internal error while sending ping task: dropped receiver")
}
}
});
}
}

/// A shutdown ticker that sends a shutdown signal to connected clients.
struct ShutdownTicker {
signal: ShutdownSignal,
@@ -279,7 +230,7 @@ impl ShutdownTicker {
}

/// Spawn a task to shutdown message to the connected clients.
pub fn spawn(self, tx: broadcast::Sender<()>) {
pub fn spawn(self, tx: watch::Sender<()>) {
tokio::spawn(async move {
self.signal.await;

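On the receiver side, the dedicated `PingTicker` and the `broadcast` shutdown channel are both gone: shutdown now fans out through a `tokio::sync::watch` channel, whose `changed()` future wakes every cloned receiver when the `ShutdownTicker` sends. A minimal sketch of that fan-out (the names here are illustrative, not the sidecar's API):

```rust
// Illustrative fan-out of a shutdown signal over tokio::sync::watch;
// assumes tokio with the "sync", "rt-multi-thread" and "macros" features.
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = watch::channel(());

    let mut handles = Vec::new();
    for id in 0..3 {
        // Each connection task holds a cheap clone of the receiver.
        let mut rx = shutdown_rx.clone();
        handles.push(tokio::spawn(async move {
            // `changed()` resolves once the sender publishes a new value,
            // or returns Err if the sender side is dropped.
            if rx.changed().await.is_ok() {
                println!("task {id}: shutting down");
            }
        }));
    }

    // Stand-in for the ShutdownTicker firing on the shutdown signal.
    shutdown_tx.send(()).expect("at least one receiver is still alive");

    for handle in handles {
        handle.await.expect("task panicked");
    }
}
```

In the processor's `poll`, the same receiver is polled by pinning `changed()` on the stack with `pin_mut!`, which is the temporary workaround referenced in the NOTE above until chainbound/bolt#673 provides a generalized solution.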