Skip to content

Commit

Permalink
add queue size metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Nov 2, 2023
1 parent d11fc9d commit e982a3e
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 39 deletions.
22 changes: 2 additions & 20 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3317,16 +3317,7 @@ where
Ok(Handler::new(
self.config.protocol_config(),
self.config.idle_timeout(),
self.metrics
.as_ref()
.expect("to be enabled")
.messages_added_to_queue
.clone(),
self.metrics
.as_ref()
.expect("to be enabled")
.messages_removed_from_queue
.clone(),
self.metrics.clone(),
))
}

Expand All @@ -3340,16 +3331,7 @@ where
Ok(Handler::new(
self.config.protocol_config(),
self.config.idle_timeout(),
self.metrics
.as_ref()
.expect("to be enabled")
.messages_added_to_queue
.clone(),
self.metrics
.as_ref()
.expect("to be enabled")
.messages_removed_from_queue
.clone(),
self.metrics.clone(),
))
}

Expand Down
28 changes: 19 additions & 9 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::metrics::Metrics;
use crate::protocol::{GossipsubCodec, ProtocolConfig};
use crate::rpc_proto::proto;
use crate::types::{PeerKind, RawMessage, Rpc};
Expand All @@ -34,8 +35,9 @@ use libp2p_swarm::handler::{
SubstreamProtocol,
};
use libp2p_swarm::Stream;
use prometheus_client::metrics::counter::Counter;
use quick_protobuf::MessageWrite;
use smallvec::SmallVec;
use std::sync::atomic::Ordering;
use std::{
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -127,8 +129,8 @@ pub struct EnabledHandler {
/// decisions about the keep alive state for this connection.
in_mesh: bool,

messages_added_to_queue: Counter,
messages_removed_from_queue: Counter,
/// TODO.
metrics: Option<Metrics>,
}

pub enum DisabledHandler {
Expand Down Expand Up @@ -171,8 +173,7 @@ impl Handler {
pub fn new(
protocol_config: ProtocolConfig,
idle_timeout: Duration,
messages_added_to_queue: Counter,
messages_removed_from_queue: Counter,
metrics: Option<Metrics>,
) -> Self {
Handler::Enabled(EnabledHandler {
listen_protocol: protocol_config,
Expand All @@ -187,8 +188,7 @@ impl Handler {
last_io_activity: Instant::now(),
in_mesh: false,
idle_timeout,
messages_added_to_queue,
messages_removed_from_queue,
metrics,
})
}
}
Expand Down Expand Up @@ -333,7 +333,9 @@ impl EnabledHandler {
// outbound idle state
Some(OutboundSubstreamState::WaitingOutput(substream)) => {
if let Some(message) = self.send_queue.pop() {
self.messages_removed_from_queue.inc();
if let Some(ref metrics) = self.metrics {
metrics.messages_removed_from_queue.inc();
}
self.send_queue.shrink_to_fit();
self.outbound_substream =
Some(OutboundSubstreamState::PendingSend(substream, message));
Expand Down Expand Up @@ -429,7 +431,15 @@ impl ConnectionHandler for Handler {
Handler::Enabled(handler) => match message {
HandlerIn::Message(m) => {
handler.send_queue.push(m);
handler.messages_added_to_queue.inc();
if let Some(ref metrics) = handler.metrics {
let gauge = metrics.messages_queue_bytes.inner();
let queue_size = handler
.send_queue
.iter()
.fold(0, |acc, m| acc + m.get_size());
gauge.store(queue_size as i64, Ordering::SeqCst);
metrics.messages_added_to_queue.inc();
}
}
HandlerIn::JoinedMesh => {
handler.in_mesh = true;
Expand Down
20 changes: 10 additions & 10 deletions protocols/gossipsub/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl Default for Config {
type EverSubscribed = bool;

/// A collection of metrics used throughout the Gossipsub behaviour.
#[derive(Clone)]
pub(crate) struct Metrics {
/* Configuration parameters */
/// Maximum number of topics for which we store metrics. This helps keep the metrics bounded.
Expand Down Expand Up @@ -177,6 +178,7 @@ pub(crate) struct Metrics {

pub messages_added_to_queue: Counter,
pub messages_removed_from_queue: Counter,
pub messages_queue_bytes: Gauge,
}

impl Metrics {
Expand Down Expand Up @@ -306,20 +308,17 @@ impl Metrics {
};
let messages_added_to_queue = {
let metric = Counter::default();
registry.register(
"messages_added_to_queue",
"TODO",
metric.clone(),
);
registry.register("messages_added_to_queue", "TODO", metric.clone());
metric
};
let messages_removed_from_queue = {
let metric = Counter::default();
registry.register(
"messages_removed_from_queue",
"TODO",
metric.clone(),
);
registry.register("messages_removed_from_queue", "TODO", metric.clone());
metric
};
let messages_queue_bytes = {
let metric = Gauge::default();
registry.register("messages_queue_bytes", "TODO", metric.clone());
metric
};

Expand Down Expand Up @@ -350,6 +349,7 @@ impl Metrics {
topic_iwant_msgs,
messages_added_to_queue,
messages_removed_from_queue,
messages_queue_bytes,
}
}

Expand Down

0 comments on commit e982a3e

Please sign in to comment.