Skip to content

Commit

Permalink
fix(sidecar): drop pingticker and use intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
thedevbirb committed Jan 15, 2025
1 parent 9c0a7ce commit a52df0f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 64 deletions.
33 changes: 20 additions & 13 deletions bolt-sidecar/src/api/commitments/firewall/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::{
};
use serde::Serialize;
use serde_json::{json, Value};
use std::time::Duration;
use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll};
use tokio::{
net::TcpStream,
Expand All @@ -12,6 +13,7 @@ use tokio::{
mpsc,
oneshot::{self, error::RecvError},
},
time::Interval,
};
use tokio_tungstenite::{
tungstenite::{self, Message},
Expand Down Expand Up @@ -39,6 +41,12 @@ use crate::{
},
};

/// The interval at which to send ping messages from connected clients.
#[cfg(test)]
const PING_INTERVAL: Duration = Duration::from_secs(4);
#[cfg(not(test))]
const PING_INTERVAL: Duration = Duration::from_secs(30);

/// The reason for interrupting the [CommitmentRequestProcessor] future.
#[derive(Debug)]
pub enum InterruptReason {
Expand Down Expand Up @@ -99,8 +107,8 @@ pub struct CommitmentRequestProcessor {
write_sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
/// The websocket reader stream.
read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
/// The receiver for keep-alive ping messages.
ping_rx: broadcast::Receiver<()>,
/// The interval at which to send ping messages to the connected websocket server.
ping_interval: Interval,
/// The receiver for shutdown signals.
shutdown_rx: broadcast::Receiver<()>,
/// The collection of pending commitment requests responses, sent with [api_events_tx].
Expand All @@ -118,7 +126,6 @@ impl CommitmentRequestProcessor {
state: ProcessorState,
tx: mpsc::Sender<CommitmentEvent>,
stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
ping_rx: broadcast::Receiver<()>,
shutdown_rx: broadcast::Receiver<()>,
) -> Self {
let (write_sink, read_stream) = stream.split();
Expand All @@ -129,7 +136,7 @@ impl CommitmentRequestProcessor {
api_events_tx: tx,
write_sink,
read_stream,
ping_rx,
ping_interval: tokio::time::interval(PING_INTERVAL),
shutdown_rx,
pending_commitment_responses: FuturesUnordered::new(),
outgoing_messages: VecDeque::new(),
Expand Down Expand Up @@ -192,6 +199,10 @@ impl Future for CommitmentRequestProcessor {
}

// 4. Handle shutdown signals before accepting any new work
//
// FIXME: (thedevbirb, 2025-01-15) this is not using the context waker to poll the shutdown signal.
// As such it will be triggered only if a message is sent _and_ another
// event will wake this task.
match this.shutdown_rx.try_recv() {
Ok(_) => {
info!("received shutdown signal. closing websocket connection...");
Expand Down Expand Up @@ -241,18 +252,14 @@ impl Future for CommitmentRequestProcessor {
}

// 6. Handle ping messages
match this.ping_rx.try_recv() {
Ok(_) => {
match this.ping_interval.poll_tick(cx) {
Poll::Ready(_) => {
progress = true;

trace!("sending ping message to websocket connection");
this.outgoing_messages.push_back(Message::Ping(vec![8, 0, 1, 7]));
}
Err(TryRecvError::Closed) => {
error!("ping channel for keep-alive messages closed");
}
Err(TryRecvError::Lagged(lost)) => {
error!("ping channel for keep-alives lagged by {} messages", lost)
}
_ => {}
Poll::Pending => { /* fallthrough */ }
}

if !progress {
Expand Down
57 changes: 6 additions & 51 deletions bolt-sidecar/src/api/commitments/firewall/receiver.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use alloy::signers::local::PrivateKeySigner;
use std::{
fmt::{self, Debug, Formatter},
time::Duration,
};
use std::fmt::{self, Debug, Formatter};
use thiserror::Error;
use tokio::sync::{broadcast, mpsc};
use tokio_tungstenite::{
Expand All @@ -28,14 +25,9 @@ use super::{
processor::{CommitmentRequestProcessor, InterruptReason, ProcessorState},
};

/// The interval at which to send ping messages from connected clients.
#[cfg(test)]
const PING_INTERVAL: Duration = Duration::from_secs(4);
#[cfg(not(test))]
const PING_INTERVAL: Duration = Duration::from_secs(30);

/// The maximum number of retries to attempt when reconnecting to a websocket server.
const MAX_RETRIES: usize = 1000;
/// Try indefinitely.
const MAX_RETRIES: usize = usize::MAX;

/// The maximum messages size to receive via websocket connection, in bits, set to 32MiB.
///
Expand Down Expand Up @@ -114,10 +106,8 @@ impl CommitmentsReceiver {
// mspc channel where every websocket connection will send commitment events over its own
// tx to a single receiver.
let (api_events_tx, api_events_rx) = mpsc::channel(self.urls.len() * 2);
let (ping_tx, ping_rx) = broadcast::channel::<()>(1);
let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);

PingTicker::new(PING_INTERVAL).spawn(ping_tx);
ShutdownTicker::new(self.signal).spawn(shutdown_tx);

let signer = PrivateKeySigner::from_signing_key(self.operator_private_key.0);
Expand All @@ -129,7 +119,6 @@ impl CommitmentsReceiver {
// task.
let url = url.to_string();
let api_events_tx = api_events_tx.clone();
let ping_rx = ping_rx.resubscribe();
let shutdown_rx = shutdown_rx.resubscribe();
let signer = signer.clone();

Expand All @@ -151,12 +140,10 @@ impl CommitmentsReceiver {
.expect("failed to produce JWT");

let api_events_tx = api_events_tx.clone();
let ping_rx = ping_rx.resubscribe();
let shutdown_rx = shutdown_rx.resubscribe();

async move {
handle_connection(url, state, jwt, api_events_tx, ping_rx, shutdown_rx)
.await
handle_connection(url, state, jwt, api_events_tx, shutdown_rx).await
}
},
handle_error,
Expand All @@ -180,7 +167,6 @@ async fn handle_connection(
state: ProcessorState,
jwt: String,
api_events_tx: mpsc::Sender<CommitmentEvent>,
ping_rx: broadcast::Receiver<()>,
shutdown_rx: broadcast::Receiver<()>,
) -> Result<(), ConnectionHandlerError> {
let ws_config =
Expand All @@ -197,14 +183,8 @@ async fn handle_connection(

// For each opened connection, create a new commitment processor
// able to handle incoming message requests.
let commitment_request_processor = CommitmentRequestProcessor::new(
url,
state,
api_events_tx,
stream,
ping_rx,
shutdown_rx,
);
let commitment_request_processor =
CommitmentRequestProcessor::new(url, state, api_events_tx, stream, shutdown_rx);
let interrupt_reason = commitment_request_processor.await;
Err(ConnectionHandlerError::ProcessorInterrupted(interrupt_reason))
}
Expand Down Expand Up @@ -242,31 +222,6 @@ fn handle_error(err: &ConnectionHandlerError) -> bool {
!is_shutdown
}

/// A ping ticker that sends ping messages to connected clients at regular intervals.
struct PingTicker {
interval: Duration,
}

impl PingTicker {
pub fn new(interval: Duration) -> Self {
Self { interval }
}

/// Spawn a task to ping connected clients at regular intervals.
pub fn spawn(self, tx: broadcast::Sender<()>) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(self.interval);

loop {
interval.tick().await;
if tx.send(()).is_err() {
error!("internal error while sending ping task: dropped receiver")
}
}
});
}
}

/// A shutdown ticker that sends a shutdown signal to connected clients.
struct ShutdownTicker {
signal: ShutdownSignal,
Expand Down

0 comments on commit a52df0f

Please sign in to comment.