diff --git a/Cargo.lock b/Cargo.lock index 73a76d54dfe..b0380178273 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2732,6 +2732,7 @@ dependencies = [ "libp2p-ping", "libp2p-relay", "libp2p-swarm", + "once_cell", "prometheus-client", ] @@ -4019,9 +4020,9 @@ dependencies = [ [[package]] name = "prometheus-client" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e227aeb6c2cfec819e999c4773b35f8c7fa37298a203ff46420095458eee567e" +checksum = "38974b1966bd5b6c7c823a20c1e07d5b84b171db20bac601e9b529720f7299f8" dependencies = [ "dtoa", "itoa", diff --git a/examples/metrics/Cargo.toml b/examples/metrics/Cargo.toml index ba08cdaebe7..d170cc2f28d 100644 --- a/examples/metrics/Cargo.toml +++ b/examples/metrics/Cargo.toml @@ -12,4 +12,4 @@ hyper = { version = "0.14", features = ["server", "tcp", "http1"] } libp2p = { path = "../../libp2p", features = ["async-std", "metrics", "ping", "noise", "identify", "tcp", "yamux", "macros"] } log = "0.4.0" tokio = { version = "1", features = ["rt-multi-thread"] } -prometheus-client = "0.20.0" +prometheus-client = "0.21.0" diff --git a/examples/metrics/src/http_service.rs b/examples/metrics/src/http_service.rs index 84102c2b558..46cb7aacb84 100644 --- a/examples/metrics/src/http_service.rs +++ b/examples/metrics/src/http_service.rs @@ -33,7 +33,7 @@ const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;v pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Error> { // Serve on localhost. - let addr = ([127, 0, 0, 1], 0).into(); + let addr = ([127, 0, 0, 1], 8080).into(); // Use the tokio runtime to run the hyper server. let rt = tokio::runtime::Runtime::new()?; diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index 4c653ca0051..ca090d60171 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -1,9 +1,29 @@ ## 0.13.0 - unreleased +- Previously `libp2p-metrics::identify` would increase a counter / gauge / histogram on each + received identify information. These metrics are misleading, as e.g. they depend on the identify + interval and don't represent the set of currently connected peers. With this change, identify + information is tracked for the currently connected peers only. Instead of an increase on each + received identify information, metrics represent the status quo (Gauge). + + Metrics removed: + - `libp2p_identify_protocols` + - `libp2p_identify_received_info_listen_addrs` + - `libp2p_identify_received_info_protocols` + - `libp2p_identify_listen_addresses` + + Metrics added: + - `libp2p_identify_remote_protocols` + - `libp2p_identify_remote_listen_addresses` + - `libp2p_identify_local_observed_addresses` + + See [PR 3325]. + - Raise MSRV to 1.65. See [PR 3715]. [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 +[PR 3325]: https://github.com/libp2p/rust-libp2p/pull/3325 ## 0.12.0 diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index 9fafe680115..78cb5b3fa2c 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -27,7 +27,8 @@ libp2p-ping = { workspace = true, optional = true } libp2p-relay = { workspace = true, optional = true } libp2p-swarm = { workspace = true } libp2p-identity = { workspace = true } -prometheus-client = "0.20.0" +prometheus-client = { version = "0.21.0" } +once_cell = "1.16.0" [target.'cfg(not(target_os = "unknown"))'.dependencies] libp2p-gossipsub = { workspace = true, optional = true } diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index ffd0cdb9fc2..e3e147062b3 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -21,39 +21,78 @@ use crate::protocol_stack; use libp2p_identity::PeerId; use libp2p_swarm::StreamProtocol; -use prometheus_client::encoding::{EncodeLabelSet, EncodeMetric, MetricEncoder}; +use once_cell::sync::Lazy; +use prometheus_client::collector::Collector; +use prometheus_client::encoding::EncodeLabelSet; use prometheus_client::metrics::counter::Counter; -use prometheus_client::metrics::family::Family; -use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; -use prometheus_client::metrics::MetricType; -use prometheus_client::registry::Registry; +use prometheus_client::metrics::family::ConstFamily; +use prometheus_client::metrics::gauge::ConstGauge; +use prometheus_client::registry::{Descriptor, LocalMetric, Registry}; +use prometheus_client::MaybeOwned; +use std::borrow::Cow; use std::collections::HashMap; -use std::iter; use std::sync::{Arc, Mutex}; +static PROTOCOLS_DESCRIPTOR: Lazy = Lazy::new(|| { + Descriptor::new( + "remote_protocols", + r#"Number of connected nodes supporting a specific protocol, with "unrecognized" for each + peer supporting one or more unrecognized protocols"#, + None, + None, + vec![], + ) +}); +static LISTEN_ADDRESSES_DESCRIPTOR: Lazy = Lazy::new(|| { + Descriptor::new( + "remote_listen_addresses", + "Number of connected nodes advertising a specific listen address", + None, + None, + vec![], + ) +}); +static OBSERVED_ADDRESSES_DESCRIPTOR: Lazy = Lazy::new(|| { + Descriptor::new( + "local_observed_addresses", + "Number of connected nodes observing the local node at a specific address", + None, + None, + vec![], + ) +}); +const ALLOWED_PROTOCOLS: &[StreamProtocol] = &[ + #[cfg(feature = "dcutr")] + libp2p_dcutr::PROTOCOL_NAME, + // #[cfg(feature = "gossipsub")] + // #[cfg(not(target_os = "unknown"))] + // TODO: Add Gossipsub protocol name + libp2p_identify::PROTOCOL_NAME, + libp2p_identify::PUSH_PROTOCOL_NAME, + #[cfg(feature = "kad")] + libp2p_kad::PROTOCOL_NAME, + #[cfg(feature = "ping")] + libp2p_ping::PROTOCOL_NAME, + #[cfg(feature = "relay")] + libp2p_relay::STOP_PROTOCOL_NAME, + #[cfg(feature = "relay")] + libp2p_relay::HOP_PROTOCOL_NAME, +]; + pub(crate) struct Metrics { - protocols: Protocols, + peers: Peers, error: Counter, pushed: Counter, received: Counter, - received_info_listen_addrs: Histogram, - received_info_protocols: Histogram, sent: Counter, - listen_addresses: Family, } impl Metrics { pub(crate) fn new(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("identify"); - let protocols = Protocols::default(); - sub_registry.register( - "protocols", - "Number of connected nodes supporting a specific protocol, with \ - \"unrecognized\" for each peer supporting one or more unrecognized \ - protocols", - protocols.clone(), - ); + let peers = Peers::default(); + sub_registry.register_collector(Box::new(peers.clone())); let error = Counter::default(); sub_registry.register( @@ -78,24 +117,6 @@ impl Metrics { received.clone(), ); - let received_info_listen_addrs = - Histogram::new(iter::once(0.0).chain(exponential_buckets(1.0, 2.0, 9))); - sub_registry.register( - "received_info_listen_addrs", - "Number of listen addresses for remote peer received in \ - identification information", - received_info_listen_addrs.clone(), - ); - - let received_info_protocols = - Histogram::new(iter::once(0.0).chain(exponential_buckets(1.0, 2.0, 9))); - sub_registry.register( - "received_info_protocols", - "Number of protocols supported by the remote peer received in \ - identification information", - received_info_protocols.clone(), - ); - let sent = Counter::default(); sub_registry.register( "sent", @@ -104,22 +125,12 @@ impl Metrics { sent.clone(), ); - let listen_addresses = Family::default(); - sub_registry.register( - "listen_addresses", - "Number of listen addresses for remote peer per protocol stack", - listen_addresses.clone(), - ); - Self { - protocols, + peers, error, pushed, received, - received_info_listen_addrs, - received_info_protocols, sent, - listen_addresses, } } } @@ -134,58 +145,8 @@ impl super::Recorder for Metrics { self.pushed.inc(); } libp2p_identify::Event::Received { peer_id, info, .. } => { - { - let mut protocols = info - .protocols - .iter() - .filter(|p| { - let allowed_protocols: &[StreamProtocol] = &[ - #[cfg(feature = "dcutr")] - libp2p_dcutr::PROTOCOL_NAME, - // #[cfg(feature = "gossipsub")] - // #[cfg(not(target_os = "unknown"))] - // TODO: Add Gossipsub protocol name - libp2p_identify::PROTOCOL_NAME, - libp2p_identify::PUSH_PROTOCOL_NAME, - #[cfg(feature = "kad")] - libp2p_kad::PROTOCOL_NAME, - #[cfg(feature = "ping")] - libp2p_ping::PROTOCOL_NAME, - #[cfg(feature = "relay")] - libp2p_relay::STOP_PROTOCOL_NAME, - #[cfg(feature = "relay")] - libp2p_relay::HOP_PROTOCOL_NAME, - ]; - - allowed_protocols.contains(p) - }) - .map(|p| p.to_string()) - .collect::>(); - - // Signal via an additional label value that one or more - // protocols of the remote peer have not been recognized. - if protocols.len() < info.protocols.len() { - protocols.push("unrecognized".to_string()); - } - - protocols.sort_unstable(); - protocols.dedup(); - - self.protocols.add(*peer_id, protocols); - } - self.received.inc(); - self.received_info_protocols - .observe(info.protocols.len() as f64); - self.received_info_listen_addrs - .observe(info.listen_addrs.len() as f64); - for listen_addr in &info.listen_addrs { - self.listen_addresses - .get_or_create(&AddressLabels { - protocols: protocol_stack::as_string(listen_addr), - }) - .inc(); - } + self.peers.record(*peer_id, info.clone()); } libp2p_identify::Event::Sent { .. } => { self.sent.inc(); @@ -203,7 +164,7 @@ impl super::Recorder>>>, -} +#[derive(Default, Debug, Clone)] +struct Peers(Arc>>); -impl Protocols { - fn add(&self, peer: PeerId, protocols: Vec) { - self.peers - .lock() - .expect("Lock not to be poisoned") - .insert(peer, protocols); +impl Peers { + fn record(&self, peer_id: PeerId, info: libp2p_identify::Info) { + self.0.lock().unwrap().insert(peer_id, info); } - fn remove(&self, peer: PeerId) { - self.peers - .lock() - .expect("Lock not to be poisoned") - .remove(&peer); + fn remove(&self, peer_id: PeerId) { + self.0.lock().unwrap().remove(&peer_id); } } -impl EncodeMetric for Protocols { - fn encode(&self, mut encoder: MetricEncoder) -> Result<(), std::fmt::Error> { - let count_by_protocol = self - .peers - .lock() - .expect("Lock not to be poisoned") - .iter() - .fold( - HashMap::::default(), - |mut acc, (_, protocols)| { - for protocol in protocols { - let count = acc.entry(protocol.to_string()).or_default(); - *count += 1; - } - acc - }, - ); +impl Collector for Peers { + fn collect<'a>( + &'a self, + ) -> Box, MaybeOwned<'a, Box>)> + 'a> + { + let mut count_by_protocols: HashMap = Default::default(); + let mut count_by_listen_addresses: HashMap = Default::default(); + let mut count_by_observed_addresses: HashMap = Default::default(); + + for (_, peer_info) in self.0.lock().unwrap().iter() { + { + let mut protocols: Vec<_> = peer_info + .protocols + .iter() + .map(|p| { + if ALLOWED_PROTOCOLS.contains(&p) { + p.to_string() + } else { + "unrecognized".to_string() + } + }) + .collect(); + protocols.sort(); + protocols.dedup(); + + for protocol in protocols.into_iter() { + let count = count_by_protocols.entry(protocol).or_default(); + *count += 1; + } + } + + { + let mut addrs: Vec<_> = peer_info + .listen_addrs + .iter() + .map(protocol_stack::as_string) + .collect(); + addrs.sort(); + addrs.dedup(); + + for addr in addrs { + let count = count_by_listen_addresses.entry(addr).or_default(); + *count += 1; + } + } - for (protocol, count) in count_by_protocol { - encoder - .encode_family(&[("protocol", protocol)])? - .encode_gauge(&count)?; + { + let count = count_by_observed_addresses + .entry(protocol_stack::as_string(&peer_info.observed_addr)) + .or_default(); + *count += 1; + } } - Ok(()) - } + let count_by_protocols: Box = + Box::new(ConstFamily::new(count_by_protocols.into_iter().map( + |(protocol, count)| ([("protocol", protocol)], ConstGauge::new(count)), + ))); + + let count_by_listen_addresses: Box = + Box::new(ConstFamily::new(count_by_listen_addresses.into_iter().map( + |(protocol, count)| ([("listen_address", protocol)], ConstGauge::new(count)), + ))); + + let count_by_observed_addresses: Box = Box::new(ConstFamily::new( + count_by_observed_addresses + .into_iter() + .map(|(protocol, count)| { + ([("observed_address", protocol)], ConstGauge::new(count)) + }), + )); - fn metric_type(&self) -> MetricType { - MetricType::Gauge + Box::new( + [ + ( + Cow::Borrowed(&*PROTOCOLS_DESCRIPTOR), + MaybeOwned::Owned(count_by_protocols), + ), + ( + Cow::Borrowed(&*LISTEN_ADDRESSES_DESCRIPTOR), + MaybeOwned::Owned(count_by_listen_addresses), + ), + ( + Cow::Borrowed(&*OBSERVED_ADDRESSES_DESCRIPTOR), + MaybeOwned::Owned(count_by_observed_addresses), + ), + ] + .into_iter(), + ) } } diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 9cc46dc3664..813428bc008 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -35,7 +35,7 @@ wasm-timer = "0.2.5" instant = "0.1.11" void = "1.0.2" # Metrics dependencies -prometheus-client = "0.20.0" +prometheus-client = "0.21.0" [dev-dependencies] async-std = { version = "1.6.3", features = ["unstable"] }