Skip to content

Commit

Permalink
feat(metrics): add chain info metrics (#2386)
Browse files Browse the repository at this point in the history
* remove server-config -> peer-metrics dependency

* feat(metrics): add chain info metric

* remove debug

* add commitment status and fix review issues

* fix build

* fix: fmt

---------

Co-authored-by: folex <[email protected]>
  • Loading branch information
kmd-fl and folex authored Sep 23, 2024
1 parent 7c2ff1a commit 53b7609
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 33 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ impl ChainListener {
.chain_connector
.get_commitment_status(commitment_id)
.await?;
self.observe(|m| m.observe_commiment_status(status as u64));
Ok(Some(status))
} else {
Ok(None)
Expand Down
13 changes: 13 additions & 0 deletions crates/peer-metrics/src/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub struct ChainListenerMetrics {
// How many block we manage to process while processing the block
blocks_processed: Counter,
last_process_block: Gauge,
current_commitment_status: Gauge,
}

impl ChainListenerMetrics {
Expand Down Expand Up @@ -135,6 +136,13 @@ impl ChainListenerMetrics {
"Last processed block from the newHead subscription",
);

let current_commitment_status = register(
sub_registry,
Gauge::default(),
"current_commitment_status",
"Current commitment status",
);

Self {
ccp_requests_total,
ccp_replies_total,
Expand All @@ -147,6 +155,7 @@ impl ChainListenerMetrics {
last_seen_block,
blocks_processed,
last_process_block,
current_commitment_status,
}
}

Expand Down Expand Up @@ -185,4 +194,8 @@ impl ChainListenerMetrics {
self.blocks_processed.inc();
self.last_process_block.set(block_number as i64);
}

pub fn observe_commiment_status(&self, status: u64) {
self.current_commitment_status.set(status as i64);
}
}
68 changes: 57 additions & 11 deletions crates/peer-metrics/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,66 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::info::Info;
use prometheus_client::registry::Registry;

pub fn add_info_metrics(
registry: &mut Registry,
node_version: String,
air_version: String,
spell_version: String,
) {
pub struct NoxInfo {
pub version: NoxVersion,
pub chain_info: ChainInfo,
}

#[derive(Debug, Clone, Hash, Eq, PartialEq, EncodeLabelSet)]
pub struct NoxVersion {
pub node_version: String,
pub air_version: String,
pub spell_version: String,
}

#[derive(Debug, Clone, Hash, Eq, PartialEq, EncodeLabelSet)]
pub struct ChainInfo {
pub peer_id: String,
// Connector Settings
pub http_endpoint: String,
pub diamond_contract_address: String,
pub network_id: u64,
pub default_base_fee: Option<u64>,
pub default_priority_fee: Option<u64>,

// Listener Settings
pub ws_endpoint: String,
pub proof_poll_period_secs: u64,
pub min_batch_count: usize,
pub max_batch_count: usize,
pub max_proof_batch_size: usize,
pub epoch_end_window_secs: u64,
}

impl ChainInfo {
pub fn default(peer_id: String) -> ChainInfo {
ChainInfo {
peer_id,
http_endpoint: "".to_string(),
diamond_contract_address: "".to_string(),
network_id: 0,
default_base_fee: None,
default_priority_fee: None,
ws_endpoint: "".to_string(),
proof_poll_period_secs: 0,
min_batch_count: 0,
max_batch_count: 0,
max_proof_batch_size: 0,
epoch_end_window_secs: 0,
}
}
}

pub fn add_info_metrics(registry: &mut Registry, nox_info: NoxInfo) {
let sub_registry = registry.sub_registry_with_prefix("nox");
let info = Info::new(vec![
("node_version", node_version),
("air_version", air_version),
("spell_version", spell_version),
]);

let info = Info::new(nox_info.version);
sub_registry.register("build", "Nox Info", info);

let chain_info = Info::new(nox_info.chain_info);
sub_registry.register("chain", "Chain Nox Info", chain_info);
}
2 changes: 1 addition & 1 deletion crates/peer-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub use connection_pool::ConnectionPoolMetrics;
pub use connectivity::ConnectivityMetrics;
pub use connectivity::Resolution;
pub use dispatcher::DispatcherMetrics;
pub use info::add_info_metrics;
pub use info::{add_info_metrics, ChainInfo, NoxInfo, NoxVersion};
use particle_execution::ParticleParams;
pub use particle_executor::{FunctionKind, ParticleExecutorMetrics, WorkerLabel, WorkerType};
pub use services_metrics::{
Expand Down
1 change: 0 additions & 1 deletion crates/server-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ fs-utils = { workspace = true }
particle-protocol = { workspace = true }
fluence-libp2p = { workspace = true, features = ["tokio"] }
air-interpreter-fs = { workspace = true }
peer-metrics = { workspace = true }
fluence-keypair = { workspace = true }
types = { workspace = true }
core-distributor = { workspace = true }
Expand Down
7 changes: 0 additions & 7 deletions crates/server-config/src/network_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::time::Duration;

use config_utils::to_peer_id;
use particle_protocol::ProtocolConfig;
use peer_metrics::{ConnectionPoolMetrics, ConnectivityMetrics};

use crate::kademlia_config::KademliaConfig;
use crate::{BootstrapConfig, ResolvedConfig};
Expand All @@ -41,17 +40,13 @@ pub struct NetworkConfig {
pub kademlia_config: KademliaConfig,
pub particle_queue_buffer: usize,
pub bootstrap_frequency: usize,
pub connectivity_metrics: Option<ConnectivityMetrics>,
pub connection_pool_metrics: Option<ConnectionPoolMetrics>,
pub connection_limits: ConnectionLimits,
pub connection_idle_timeout: Duration,
}

impl NetworkConfig {
pub fn new(
libp2p_metrics: Option<Arc<Metrics>>,
connectivity_metrics: Option<ConnectivityMetrics>,
connection_pool_metrics: Option<ConnectionPoolMetrics>,
key_pair: Keypair,
config: &ResolvedConfig,
node_version: &'static str,
Expand All @@ -68,8 +63,6 @@ impl NetworkConfig {
kademlia_config: config.kademlia.clone(),
particle_queue_buffer: config.particle_queue_buffer,
bootstrap_frequency: config.bootstrap_frequency,
connectivity_metrics,
connection_pool_metrics,
connection_limits,
connection_idle_timeout: config.node_config.transport_config.connection_idle_timeout,
}
Expand Down
7 changes: 5 additions & 2 deletions nox/src/behaviour/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use connection_pool::ConnectionPoolBehaviour;
use health::HealthCheckRegistry;
use kademlia::{Kademlia, KademliaConfig};
use particle_protocol::{ExtendedParticle, PROTOCOL_NAME};
use peer_metrics::{ConnectionPoolMetrics, ConnectivityMetrics};
use server_config::NetworkConfig;

use crate::connectivity::Connectivity;
Expand Down Expand Up @@ -68,6 +69,8 @@ impl FluenceNetworkBehaviour {
pub fn new(
cfg: NetworkConfig,
health_registry: Option<&mut HealthCheckRegistry>,
connectivity_metrics: Option<ConnectivityMetrics>,
connection_pool_metrics: Option<ConnectionPoolMetrics>,
) -> (Self, Connectivity, mpsc::Receiver<ExtendedParticle>) {
let local_public_key = cfg.key_pair.public();
let identify = Identify::new(
Expand All @@ -86,7 +89,7 @@ impl FluenceNetworkBehaviour {
cfg.particle_queue_buffer,
cfg.protocol_config,
cfg.local_peer_id,
cfg.connection_pool_metrics,
connection_pool_metrics,
);

let connection_limits = ConnectionLimits::new(cfg.connection_limits);
Expand Down Expand Up @@ -119,7 +122,7 @@ impl FluenceNetworkBehaviour {
connection_pool: connection_pool_api,
bootstrap_nodes: cfg.bootstrap_nodes.into_iter().collect(),
bootstrap_frequency: cfg.bootstrap_frequency,
metrics: cfg.connectivity_metrics,
metrics: connectivity_metrics,
health,
};

Expand Down
51 changes: 41 additions & 10 deletions nox/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ impl<RT: AquaRuntime> Node<RT> {

let network_config = NetworkConfig::new(
libp2p_metrics.clone(),
connectivity_metrics,
connection_pool_metrics,
//connectivity_metrics,
//connection_pool_metrics,
key_pair,
&config,
node_version,
Expand All @@ -297,6 +297,8 @@ impl<RT: AquaRuntime> Node<RT> {
config.external_addresses(),
health_registry.as_mut(),
metrics_registry.as_mut(),
connectivity_metrics,
connection_pool_metrics,
)?;

let (services_metrics_backend, services_metrics) =
Expand Down Expand Up @@ -418,12 +420,35 @@ impl<RT: AquaRuntime> Node<RT> {
}),
};
if let Some(m) = metrics_registry.as_mut() {
peer_metrics::add_info_metrics(
m,
node_info.node_version.to_string(),
node_info.air_version.to_string(),
node_info.spell_version.clone(),
);
let mut chain_info = peer_metrics::ChainInfo::default(peer_id.to_string());
if let Some(connector_cfg) = &config.chain_config {
chain_info.http_endpoint = connector_cfg.http_endpoint.clone();
chain_info.diamond_contract_address =
connector_cfg.diamond_contract_address.clone();
chain_info.network_id = connector_cfg.network_id;
chain_info.default_base_fee = connector_cfg.default_base_fee.clone();
chain_info.default_priority_fee = connector_cfg.default_priority_fee.clone();
}

if let Some(chain_listener_cfg) = &config.chain_listener_config {
chain_info.ws_endpoint = chain_listener_cfg.ws_endpoint.clone();
chain_info.proof_poll_period_secs = chain_listener_cfg.proof_poll_period.as_secs();
chain_info.min_batch_count = chain_listener_cfg.min_batch_count;
chain_info.max_batch_count = chain_listener_cfg.max_batch_count;
chain_info.max_proof_batch_size = chain_listener_cfg.max_proof_batch_size;
chain_info.epoch_end_window_secs = chain_listener_cfg.epoch_end_window.as_secs();
}

let nox_info = peer_metrics::NoxInfo {
version: peer_metrics::NoxVersion {
node_version: node_info.node_version.to_string(),
air_version: node_info.air_version.to_string(),
spell_version: node_info.spell_version.to_string(),
},
chain_info,
};

peer_metrics::add_info_metrics(m, nox_info);
}
custom_service_functions.extend_one(make_peer_builtin(node_info));

Expand Down Expand Up @@ -526,15 +551,21 @@ impl<RT: AquaRuntime> Node<RT> {
external_addresses: Vec<Multiaddr>,
health_registry: Option<&mut HealthCheckRegistry>,
metrics_registry: Option<&mut Registry>,
connectivity_metrics: Option<ConnectivityMetrics>,
connection_pool_metrics: Option<ConnectionPoolMetrics>,
) -> eyre::Result<(
Swarm<FluenceNetworkBehaviour>,
Connectivity,
mpsc::Receiver<ExtendedParticle>,
)> {
let connection_idle_timeout = network_config.connection_idle_timeout;

let (behaviour, connectivity, particle_stream) =
FluenceNetworkBehaviour::new(network_config, health_registry);
let (behaviour, connectivity, particle_stream) = FluenceNetworkBehaviour::new(
network_config,
health_registry,
connectivity_metrics,
connection_pool_metrics,
);

let mut swarm = match metrics_registry {
None => SwarmBuilder::with_existing_identity(key_pair)
Expand Down

0 comments on commit 53b7609

Please sign in to comment.