From 2a99817ee331c0295c1ae65231d957901c0c870c Mon Sep 17 00:00:00 2001 From: aspect Date: Wed, 11 Sep 2024 00:38:39 +0300 Subject: [PATCH] 15 rc4 updates (#554) * metrics: fix first snapshot sample + cleanup * Wallet SDK: scan() - fix UtxoContext processing latency during scan. Add UtxoProcessor notification lock to the scan processor. * cleanup --- Cargo.lock | 2 +- metrics/core/src/data.rs | 2 +- metrics/core/src/lib.rs | 92 ++----------------------------- wallet/core/src/imports.rs | 4 +- wallet/core/src/utxo/context.rs | 12 ++-- wallet/core/src/utxo/processor.rs | 10 ++-- wallet/core/src/utxo/scan.rs | 9 ++- 7 files changed, 28 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c61450bd9a..557802dde4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3687,7 +3687,7 @@ dependencies = [ [[package]] name = "kaspa-wrpc-simple-client-example" -version = "0.14.5" +version = "0.14.7" dependencies = [ "futures", "kaspa-rpc-core", diff --git a/metrics/core/src/data.rs b/metrics/core/src/data.rs index d8030f1f3d..d47941aa7c 100644 --- a/metrics/core/src/data.rs +++ b/metrics/core/src/data.rs @@ -922,7 +922,7 @@ pub fn as_data_size(bytes: f64, si: bool) -> String { } /// Format supplied value as a float with 2 decimal places. -fn format_as_float(f: f64, short: bool) -> String { +pub fn format_as_float(f: f64, short: bool) -> String { if short { if f < 1000.0 { format_with_precision(f) diff --git a/metrics/core/src/lib.rs b/metrics/core/src/lib.rs index b88dcf0946..53519b0f0c 100644 --- a/metrics/core/src/lib.rs +++ b/metrics/core/src/lib.rs @@ -74,6 +74,8 @@ impl Metrics { let interval = interval(Duration::from_secs(1)); pin_mut!(interval); + let mut first = true; + loop { select! { _ = task_ctl_receiver.recv().fuse() => { @@ -81,17 +83,16 @@ impl Metrics { }, _ = interval.next().fuse() => { - // current_metrics_data = MetricsData::new(unixtime_as_millis_f64()); - if let Some(rpc) = this.rpc() { - // if let Err(err) = this.sample_metrics(rpc.clone(), &mut current_metrics_data).await { match this.sample_metrics(rpc.clone()).await { Ok(incoming_data) => { let last_metrics_data = current_metrics_data; current_metrics_data = incoming_data; this.data.lock().unwrap().replace(current_metrics_data.clone()); - if let Some(sink) = this.sink() { + if first { + first = false; + } else if let Some(sink) = this.sink() { let snapshot = MetricsSnapshot::from((&last_metrics_data, ¤t_metrics_data)); if let Some(future) = sink(snapshot) { future.await.ok(); @@ -100,7 +101,6 @@ impl Metrics { } Err(err) => { - // current_metrics_data = last_metrics_data.clone(); log_trace!("Metrics::sample_metrics() error: {}", err); } } @@ -120,87 +120,7 @@ impl Metrics { Ok(()) } - // --- samplers - async fn sample_metrics(self: &Arc, rpc: Arc) -> Result { - // let GetMetricsResponse { - // server_time: _, - // consensus_metrics, - // connection_metrics, - // bandwidth_metrics, - // process_metrics, - // storage_metrics, - // custom_metrics: _, - // } = - let response = rpc.get_metrics(true, true, true, true, true, false).await?; - - MetricsData::try_from(response) - - // if let Some(consensus_metrics) = consensus_metrics { - // data.node_blocks_submitted_count = consensus_metrics.node_blocks_submitted_count; - // data.node_headers_processed_count = consensus_metrics.node_headers_processed_count; - // data.node_dependencies_processed_count = consensus_metrics.node_dependencies_processed_count; - // data.node_bodies_processed_count = consensus_metrics.node_bodies_processed_count; - // data.node_transactions_processed_count = consensus_metrics.node_transactions_processed_count; - // data.node_chain_blocks_processed_count = consensus_metrics.node_chain_blocks_processed_count; - // data.node_mass_processed_count = consensus_metrics.node_mass_processed_count; - // // -- - // data.node_database_blocks_count = consensus_metrics.node_database_blocks_count; - // data.node_database_headers_count = consensus_metrics.node_database_headers_count; - // data.network_mempool_size = consensus_metrics.network_mempool_size; - // data.network_tip_hashes_count = consensus_metrics.network_tip_hashes_count; - // data.network_difficulty = consensus_metrics.network_difficulty; - // data.network_past_median_time = consensus_metrics.network_past_median_time; - // data.network_virtual_parent_hashes_count = consensus_metrics.network_virtual_parent_hashes_count; - // data.network_virtual_daa_score = consensus_metrics.network_virtual_daa_score; - // } - - // if let Some(connection_metrics) = connection_metrics { - // data.node_borsh_live_connections = connection_metrics.borsh_live_connections; - // data.node_borsh_connection_attempts = connection_metrics.borsh_connection_attempts; - // data.node_borsh_handshake_failures = connection_metrics.borsh_handshake_failures; - // data.node_json_live_connections = connection_metrics.json_live_connections; - // data.node_json_connection_attempts = connection_metrics.json_connection_attempts; - // data.node_json_handshake_failures = connection_metrics.json_handshake_failures; - // data.node_active_peers = connection_metrics.active_peers; - // } - - // if let Some(bandwidth_metrics) = bandwidth_metrics { - // data.node_borsh_bytes_tx = bandwidth_metrics.borsh_bytes_tx; - // data.node_borsh_bytes_rx = bandwidth_metrics.borsh_bytes_rx; - // data.node_json_bytes_tx = bandwidth_metrics.json_bytes_tx; - // data.node_json_bytes_rx = bandwidth_metrics.json_bytes_rx; - // data.node_p2p_bytes_tx = bandwidth_metrics.p2p_bytes_tx; - // data.node_p2p_bytes_rx = bandwidth_metrics.p2p_bytes_rx; - // data.node_grpc_user_bytes_tx = bandwidth_metrics.grpc_bytes_tx; - // data.node_grpc_user_bytes_rx = bandwidth_metrics.grpc_bytes_rx; - - // data.node_total_bytes_tx = bandwidth_metrics.borsh_bytes_tx - // + bandwidth_metrics.json_bytes_tx - // + bandwidth_metrics.p2p_bytes_tx - // + bandwidth_metrics.grpc_bytes_tx; - - // data.node_total_bytes_rx = bandwidth_metrics.borsh_bytes_rx - // + bandwidth_metrics.json_bytes_rx - // + bandwidth_metrics.p2p_bytes_rx - // + bandwidth_metrics.grpc_bytes_rx; - // } - - // if let Some(process_metrics) = process_metrics { - // data.node_resident_set_size_bytes = process_metrics.resident_set_size; - // data.node_virtual_memory_size_bytes = process_metrics.virtual_memory_size; - // data.node_cpu_cores = process_metrics.core_num; - // data.node_cpu_usage = process_metrics.cpu_usage; - // data.node_file_handles = process_metrics.fd_num; - // data.node_disk_io_read_bytes = process_metrics.disk_io_read_bytes; - // data.node_disk_io_write_bytes = process_metrics.disk_io_write_bytes; - // data.node_disk_io_read_per_sec = process_metrics.disk_io_read_per_sec; - // data.node_disk_io_write_per_sec = process_metrics.disk_io_write_per_sec; - // } - - // if let Some(storage_metrics) = storage_metrics { - // data.node_storage_size_bytes = storage_metrics.storage_size_bytes; - // } - // Ok(()) + MetricsData::try_from(rpc.get_metrics(true, true, true, true, true, false).await?) } } diff --git a/wallet/core/src/imports.rs b/wallet/core/src/imports.rs index 94638bf78f..9129d8349f 100644 --- a/wallet/core/src/imports.rs +++ b/wallet/core/src/imports.rs @@ -24,7 +24,9 @@ pub use crate::wallet::*; pub use crate::{storage, utils}; pub use ahash::{AHashMap, AHashSet}; -pub use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; +pub use async_std::sync::{ + Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard as AsyncRwLockReadGuard, +}; pub use async_trait::async_trait; pub use borsh::{BorshDeserialize, BorshSerialize}; pub use cfg_if::cfg_if; diff --git a/wallet/core/src/utxo/context.rs b/wallet/core/src/utxo/context.rs index e8d1ff39be..39575a64f6 100644 --- a/wallet/core/src/utxo/context.rs +++ b/wallet/core/src/utxo/context.rs @@ -319,7 +319,7 @@ impl UtxoContext { } Ok(()) } else { - log_warn!("ignoring duplicate utxo entry"); + // log_warn!("Warning: Ignoring duplicate UTXO entry"); Ok(()) } } @@ -347,7 +347,7 @@ impl UtxoContext { remove_mature_ids.push(id); } } else if context.outgoing.get(&utxo.transaction_id()).is_none() { - log_error!("Error: UTXO not found in UtxoContext map!"); + // log_warm!("Warning: UTXO not found in UtxoContext map!"); } } @@ -374,10 +374,10 @@ impl UtxoContext { context.mature.sorted_insert_binary_asc_by_key(utxo_entry.clone(), |entry| entry.amount_as_ref()); } else { log_error!("Error: non-pending utxo promotion!"); - unreachable!("Error: non-pending utxo promotion!"); } } + // sanity check if self.context().outgoing.get(&txid).is_some() { unreachable!("Error: promotion of the outgoing transaction!"); } @@ -421,7 +421,7 @@ impl UtxoContext { let mut context = self.context(); let mut pending = vec![]; - let mut mature = vec![]; + let mut mature = Vec::with_capacity(utxo_entries.len()); let params = NetworkParams::from(self.processor().network_id()?); @@ -444,7 +444,6 @@ impl UtxoContext { } Maturity::Confirmed => { mature.push(utxo_entry.clone()); - context.mature.sorted_insert_binary_asc_by_key(utxo_entry.clone(), |entry| entry.amount_as_ref()); } } } else { @@ -452,6 +451,9 @@ impl UtxoContext { } } + context.mature.extend(mature.iter().cloned()); + context.mature.sort_by_key(|entry| entry.amount()); + (pending, mature) }; diff --git a/wallet/core/src/utxo/processor.rs b/wallet/core/src/utxo/processor.rs index 4195a00b5f..b72b9784ad 100644 --- a/wallet/core/src/utxo/processor.rs +++ b/wallet/core/src/utxo/processor.rs @@ -33,8 +33,6 @@ use kaspa_rpc_core::{ notify::connection::{ChannelConnection, ChannelType}, Notification, }; -// use workflow_core::task; -// use kaspa_metrics_core::{Metrics,Metric}; pub struct Inner { /// Coinbase UTXOs in stasis @@ -58,7 +56,7 @@ pub struct Inner { sync_proc: SyncMonitor, multiplexer: Multiplexer>, wallet_bus: Option>, - notification_guard: AsyncMutex<()>, + notification_guard: AsyncRwLock<()>, connect_disconnect_guard: AsyncMutex<()>, metrics: Arc, metrics_kinds: Mutex>, @@ -161,8 +159,8 @@ impl UtxoProcessor { &self.inner.multiplexer } - pub async fn notification_lock(&self) -> AsyncMutexGuard<()> { - self.inner.notification_guard.lock().await + pub async fn notification_lock(&self) -> AsyncRwLockReadGuard<()> { + self.inner.notification_guard.read().await } pub fn sync_proc(&self) -> &SyncMonitor { @@ -577,7 +575,7 @@ impl UtxoProcessor { } async fn handle_notification(&self, notification: Notification) -> Result<()> { - let _lock = self.notification_lock().await; + let _lock = self.inner.notification_guard.write().await; match notification { Notification::VirtualDaaScoreChanged(virtual_daa_score_changed_notification) => { diff --git a/wallet/core/src/utxo/scan.rs b/wallet/core/src/utxo/scan.rs index f01257c965..fff6effa94 100644 --- a/wallet/core/src/utxo/scan.rs +++ b/wallet/core/src/utxo/scan.rs @@ -55,6 +55,9 @@ impl Scan { } pub async fn scan(&self, utxo_context: &UtxoContext) -> Result<()> { + // block notifications while scanning... + let _lock = utxo_context.processor().notification_lock().await; + match &self.provider { Provider::AddressManager(address_manager) => self.scan_with_address_manager(address_manager, utxo_context).await, Provider::AddressSet(addresses) => self.scan_with_address_set(addresses, utxo_context).await, @@ -86,9 +89,9 @@ impl Scan { let ts = Instant::now(); let resp = utxo_context.processor().rpc_api().get_utxos_by_addresses(addresses).await?; - let elapsed_msec = ts.elapsed().as_secs_f32(); - if elapsed_msec > 1.0 { - log_warn!("get_utxos_by_address() fetched {} entries in: {} msec", resp.len(), elapsed_msec); + let elapsed_sec = ts.elapsed().as_secs_f32(); + if elapsed_sec > 1.0 { + log_warn!("get_utxos_by_address() fetched {} entries in: {} msec", resp.len(), elapsed_sec); } yield_executor().await;