fix(sidecar): ping messages were not sent #683

Merged: 4 commits, Jan 15, 2025
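Why the pings were not sent: the previous implementation pushed keep-alive ticks through a tokio broadcast channel fed by a separate PingTicker task, and the processor future checked that channel with try_recv(). Since try_recv() never registers a waker, nothing guaranteed the processor would be polled when a tick arrived, so pings were silently skipped unless unrelated traffic happened to wake the task. This PR removes the PingTicker and its channel, moves PING_INTERVAL into processor.rs, and polls a tokio::time::Interval directly inside Future::poll, where poll_tick registers the waker. The shutdown path gets the same treatment: the broadcast channel becomes a tokio::sync::watch channel whose changed() future is polled on every loop iteration (marked in the diff as a temporary workaround, to be generalized by chainbound/bolt#673). Two smaller cleanups ride along: per-call rpc_url log fields are replaced by an #[instrument] span that carries the URL, and the reconnect loop now passes None in place of the old MAX_RETRIES = 1000 cap.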
Changes from all commits
83 changes: 44 additions & 39 deletions bolt-sidecar/src/api/commitments/firewall/processor.rs
@@ -1,23 +1,26 @@
use futures::{
pin_mut,
stream::{FuturesUnordered, SplitSink, SplitStream},
FutureExt, SinkExt, StreamExt,
};
use serde::Serialize;
use serde_json::{json, Value};
use std::time::Duration;
use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll};
use tokio::{
net::TcpStream,
sync::{
broadcast::{self, error::TryRecvError},
mpsc,
oneshot::{self, error::RecvError},
watch,
},
time::Interval,
};
use tokio_tungstenite::{
tungstenite::{self, Message},
MaybeTlsStream, WebSocketStream,
};
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, instrument, trace, warn};
use uuid::Uuid;

use crate::{
@@ -39,6 +42,12 @@ use crate::{
},
};

/// The interval at which to send ping messages to the connected websocket server.
#[cfg(test)]
const PING_INTERVAL: Duration = Duration::from_secs(4);
#[cfg(not(test))]
const PING_INTERVAL: Duration = Duration::from_secs(30);
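(The cfg(test) split presumably keeps integration tests fast: with a 4-second interval a test can observe at least one ping without waiting out the 30-second production value.)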

/// The reason for interrupting the [CommitmentRequestProcessor] future.
#[derive(Debug)]
pub enum InterruptReason {
@@ -99,10 +108,10 @@ pub struct CommitmentRequestProcessor {
write_sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
/// The websocket reader stream.
read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
/// The receiver for keep-alive ping messages.
ping_rx: broadcast::Receiver<()>,
/// The interval at which to send ping messages to the connected websocket server.
ping_interval: Interval,
/// The receiver for shutdown signals.
shutdown_rx: broadcast::Receiver<()>,
shutdown_rx: watch::Receiver<()>,
/// The collection of pending commitment request responses, sent with [api_events_tx].
/// SAFETY: the `poll` implementation of this struct promptly handles these responses and
/// ensures this vector doesn't grow indefinitely.
@@ -118,8 +127,7 @@ impl CommitmentRequestProcessor {
state: ProcessorState,
tx: mpsc::Sender<CommitmentEvent>,
stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
ping_rx: broadcast::Receiver<()>,
shutdown_rx: broadcast::Receiver<()>,
shutdown_rx: watch::Receiver<()>,
) -> Self {
let (write_sink, read_stream) = stream.split();

@@ -129,7 +137,7 @@
api_events_tx: tx,
write_sink,
read_stream,
ping_rx,
ping_interval: tokio::time::interval(PING_INTERVAL),
shutdown_rx,
pending_commitment_responses: FuturesUnordered::new(),
outgoing_messages: VecDeque::new(),
@@ -140,12 +148,12 @@
impl Future for CommitmentRequestProcessor {
type Output = InterruptReason;

#[instrument(name = "", skip_all, fields(url = %self.url))]
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.get_mut();
let rpc_url = this.url.clone();

loop {
let mut progress = false;
@@ -170,7 +178,7 @@

// Try to send the message to the sink, for later flushing.
if let Err(e) = this.write_sink.start_send_unpin(message) {
error!(?e, ?rpc_url, "failed to send message to websocket write sink");
error!(?e, "failed to send message to websocket write sink");
continue;
}
}
@@ -192,15 +200,22 @@
}

// 4. Handle shutdown signals before accepting any new work
match this.shutdown_rx.try_recv() {
Ok(_) => {
//
// NOTE: (thedevbirb, 2025-01-15) this is a temporary workaround to ensure that the
// shutdown signal wakes the task. https://github.com/chainbound/bolt/issues/673
// will be a generalized solution to this problem.
let mut shutdown = this.shutdown_rx.clone();
let shutdown = shutdown.changed();
pin_mut!(shutdown);
match shutdown.poll_unpin(cx) {
Poll::Ready(Ok(_)) => {
info!("received shutdown signal. closing websocket connection...");
return Poll::Ready(InterruptReason::Shutdown);
}
Err(TryRecvError::Closed) => {
Poll::Ready(Err(_)) => {
error!("shutdown channel closed");
}
_ => {}
Poll::Pending => { /* fallthrough */ }
}
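The block above (clone the watch receiver, build its changed() future, pin it, poll it once per iteration) is what makes the shutdown signal actually wake this task; the old try_recv() call made no such guarantee. A minimal standalone sketch of the same pattern, with a hypothetical Worker type that is not part of this PR:

    use std::{future::Future, pin::Pin, task::{Context, Poll}};
    use futures::{pin_mut, FutureExt};
    use tokio::sync::watch;

    struct Worker {
        shutdown_rx: watch::Receiver<()>,
    }

    impl Future for Worker {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let this = self.get_mut();

            // Poll `changed()` once per iteration. Pending registers this
            // task's waker, so a later `send()` is guaranteed to wake us.
            // Ready(Ok) is a shutdown signal; Ready(Err) means the sender
            // was dropped, which we also treat as shutdown here.
            let mut shutdown = this.shutdown_rx.clone();
            let shutdown = shutdown.changed();
            pin_mut!(shutdown);
            match shutdown.poll_unpin(cx) {
                Poll::Ready(_) => Poll::Ready(()),
                Poll::Pending => Poll::Pending,
            }
        }
    }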

// Incoming work tasks
@@ -210,7 +225,7 @@
progress = true;

let Some(res_message) = maybe_message else {
warn!(?rpc_url, "websocket read stream terminated");
warn!("websocket read stream terminated");
return Poll::Ready(InterruptReason::ReadStreamTerminated);
};

@@ -219,44 +234,36 @@
this.handle_text_message(text);
}
Ok(Message::Close(_)) => {
warn!(?rpc_url, "websocket connection closed by server");
warn!("websocket connection closed by server");
return Poll::Ready(InterruptReason::ConnectionClosed);
}
Ok(Message::Binary(data)) => {
debug!(
?rpc_url,
bytes_len = data.len(),
"received unexpected binary message"
);
debug!(bytes_len = data.len(), "received unexpected binary message");
}
Ok(Message::Ping(_)) => {
trace!(?rpc_url, "received ping message");
trace!("received ping message");
}
Ok(Message::Pong(_)) => {
trace!(?rpc_url, "received pong message");
trace!("received pong message");
}
Ok(Message::Frame(_)) => {
debug!(?rpc_url, "received unexpected raw frame");
debug!("received unexpected raw frame");
}
Err(e) => {
error!(?e, ?rpc_url, "error reading message from websocket connection");
error!(?e, "error reading message from websocket connection");
}
}
}

// 6. Handle ping messages
match this.ping_rx.try_recv() {
Ok(_) => {
match this.ping_interval.poll_tick(cx) {
Poll::Ready(_) => {
progress = true;

trace!("sending ping message to websocket connection");
this.outgoing_messages.push_back(Message::Ping(vec![8, 0, 1, 7]));
}
Err(TryRecvError::Closed) => {
error!("ping channel for keep-alive messages closed");
}
Err(TryRecvError::Lagged(lost)) => {
error!("ping channel for keep-alives lagged by {} messages", lost)
}
_ => {}
Poll::Pending => { /* fallthrough */ }
}

if !progress {
@@ -292,15 +299,13 @@ impl CommitmentRequestProcessor {
}

fn handle_text_message(&mut self, text: String) {
let rpc_url = self.url.clone();

trace!(?rpc_url, text, "received text message from websocket connection");
trace!(text, "received text message from websocket connection");
let (tx, rx) = oneshot::channel();

let request = match serde_json::from_str::<JsonRpcRequestUuid>(&text) {
Ok(req) => req,
Err(e) => {
warn!(?e, ?rpc_url, "failed to parse JSON-RPC request");
warn!(?e, "failed to parse JSON-RPC request");
return;
}
};
Expand Down Expand Up @@ -363,7 +368,7 @@ impl CommitmentRequestProcessor {
self.pending_commitment_responses.push(PendingCommitmentResponse::new(rx, id));
}
other => {
warn!(?rpc_url, "unsupported method: {}", other);
warn!("unsupported method: {}", other);
}
};
}
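The heart of the fix is in step 6 above: Interval::poll_tick registers the task's waker when it returns Pending, so the runtime is guaranteed to poll the processor again when the next tick fires; the old broadcast try_recv() check offered no wake-up at all. A minimal sketch of the pattern in isolation, with a hypothetical Pinger type that is not from this PR:

    use std::{future::Future, pin::Pin, task::{Context, Poll}, time::Duration};
    use tokio::time::{interval, Interval};

    struct Pinger {
        ping_interval: Interval,
        pings_sent: u64,
    }

    impl Future for Pinger {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let this = self.get_mut();

            // Ready means a tick elapsed; Pending means the waker is now
            // registered and the runtime will re-poll when the interval fires.
            while this.ping_interval.poll_tick(cx).is_ready() {
                this.pings_sent += 1;
                // The real processor queues Message::Ping(...) onto the
                // websocket sink here instead of incrementing a counter.
            }

            Poll::Pending // keep running; the real processor has exit paths
        }
    }

    #[tokio::main]
    async fn main() {
        // Never completes in this sketch; run with a timeout when trying it.
        Pinger { ping_interval: interval(Duration::from_secs(4)), pings_sent: 0 }.await;
    }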
71 changes: 11 additions & 60 deletions bolt-sidecar/src/api/commitments/firewall/receiver.rs
@@ -1,10 +1,7 @@
use alloy::signers::local::PrivateKeySigner;
use std::{
fmt::{self, Debug, Formatter},
time::Duration,
};
use std::fmt::{self, Debug, Formatter};
use thiserror::Error;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::{mpsc, watch};
use tokio_tungstenite::{
connect_async_with_config,
tungstenite::{self, client::IntoClientRequest, protocol::WebSocketConfig},
@@ -28,15 +25,6 @@ use super::{
processor::{CommitmentRequestProcessor, InterruptReason, ProcessorState},
};

/// The interval at which to send ping messages from connected clients.
#[cfg(test)]
const PING_INTERVAL: Duration = Duration::from_secs(4);
#[cfg(not(test))]
const PING_INTERVAL: Duration = Duration::from_secs(30);

/// The maximum number of retries to attempt when reconnecting to a websocket server.
const MAX_RETRIES: usize = 1000;

/// The maximum message size to receive via websocket connection, in bytes, set to 32 MiB.
///
/// It is enough to account for a commitment request with 6 blobs and the largest
@@ -114,10 +102,8 @@ impl CommitmentsReceiver {
// mpsc channel where every websocket connection will send commitment events over its own
// tx to a single receiver.
let (api_events_tx, api_events_rx) = mpsc::channel(self.urls.len() * 2);
let (ping_tx, ping_rx) = broadcast::channel::<()>(1);
let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
let (shutdown_tx, shutdown_rx) = watch::channel(());

PingTicker::new(PING_INTERVAL).spawn(ping_tx);
ShutdownTicker::new(self.signal).spawn(shutdown_tx);

let signer = PrivateKeySigner::from_signing_key(self.operator_private_key.0);
@@ -129,13 +115,12 @@
// task.
let url = url.to_string();
let api_events_tx = api_events_tx.clone();
let ping_rx = ping_rx.resubscribe();
let shutdown_rx = shutdown_rx.resubscribe();
let shutdown_rx = shutdown_rx.clone();
let signer = signer.clone();

tokio::spawn(async move {
retry_with_backoff_if(
MAX_RETRIES,
None,
Some(retry_config),
// NOTE: this needs to be a closure because it must be re-called upon failure.
// As such we also need to clone the inputs again.
@@ -151,12 +136,10 @@
.expect("failed to produce JWT");

let api_events_tx = api_events_tx.clone();
let ping_rx = ping_rx.resubscribe();
let shutdown_rx = shutdown_rx.resubscribe();
let shutdown_rx = shutdown_rx.clone();

async move {
handle_connection(url, state, jwt, api_events_tx, ping_rx, shutdown_rx)
.await
handle_connection(url, state, jwt, api_events_tx, shutdown_rx).await
}
},
handle_error,
@@ -180,8 +163,7 @@ async fn handle_connection(
state: ProcessorState,
jwt: String,
api_events_tx: mpsc::Sender<CommitmentEvent>,
ping_rx: broadcast::Receiver<()>,
shutdown_rx: broadcast::Receiver<()>,
shutdown_rx: watch::Receiver<()>,
) -> Result<(), ConnectionHandlerError> {
let ws_config =
WebSocketConfig { max_message_size: Some(MAX_MESSAGE_SIZE), ..Default::default() };
@@ -197,14 +179,8 @@

// For each opened connection, create a new commitment processor
// able to handle incoming message requests.
let commitment_request_processor = CommitmentRequestProcessor::new(
url,
state,
api_events_tx,
stream,
ping_rx,
shutdown_rx,
);
let commitment_request_processor =
CommitmentRequestProcessor::new(url, state, api_events_tx, stream, shutdown_rx);
let interrupt_reason = commitment_request_processor.await;
Err(ConnectionHandlerError::ProcessorInterrupted(interrupt_reason))
}
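A watch channel fits this fan-out better than the old broadcast channel: each connection task takes a cheap clone() of the receiver (no resubscribe() calls and no lag bookkeeping), and a single send() reaches every clone. A small sketch of the wiring, hypothetical and not part of this PR:

    use std::time::Duration;
    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        let (shutdown_tx, shutdown_rx) = watch::channel(());

        // One receiver clone per connection task, as in handle_connection.
        for i in 0..3 {
            let mut rx = shutdown_rx.clone();
            tokio::spawn(async move {
                // Resolves when the sender publishes a new value; an Err
                // means the sender was dropped, which also implies shutdown.
                let _ = rx.changed().await;
                println!("connection {i}: shutting down");
            });
        }

        // A single send() wakes every clone at once.
        shutdown_tx.send(()).expect("at least one receiver alive");
        tokio::time::sleep(Duration::from_millis(50)).await;
    }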
@@ -242,31 +218,6 @@ fn handle_error(err: &ConnectionHandlerError) -> bool {
!is_shutdown
}

/// A ping ticker that sends ping messages to connected clients at regular intervals.
struct PingTicker {
interval: Duration,
}

impl PingTicker {
pub fn new(interval: Duration) -> Self {
Self { interval }
}

/// Spawn a task to ping connected clients at regular intervals.
pub fn spawn(self, tx: broadcast::Sender<()>) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(self.interval);

loop {
interval.tick().await;
if tx.send(()).is_err() {
error!("internal error while sending ping task: dropped receiver")
}
}
});
}
}

/// A shutdown ticker that sends a shutdown signal to connected clients.
struct ShutdownTicker {
signal: ShutdownSignal,
@@ -279,7 +230,7 @@ impl ShutdownTicker {
}

/// Spawn a task to send the shutdown message to the connected clients.
pub fn spawn(self, tx: broadcast::Sender<()>) {
pub fn spawn(self, tx: watch::Sender<()>) {
tokio::spawn(async move {
self.signal.await;

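One more pattern from the processor diff is worth a note: the #[instrument] attribute on poll replaces the ?rpc_url field that was previously attached to every log call by hand. The span records the URL once and every event emitted inside the function inherits it. A hedged sketch of the idea, assuming the tracing and tracing-subscriber crates; the names below are hypothetical, while the PR itself uses name = "" and fields(url = %self.url):

    use tracing::{info, instrument};

    struct Processor {
        url: String,
    }

    impl Processor {
        // The span carries `url`, so log lines inside no longer repeat it.
        #[instrument(name = "processor", skip_all, fields(url = %self.url))]
        fn poll_once(&mut self) {
            info!("sending ping message"); // rendered with url=... attached
        }
    }

    fn main() {
        tracing_subscriber::fmt().init();
        Processor { url: "wss://rpc.example.com".into() }.poll_once();
    }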