diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs index 143106cd1add52..45c00909d754c0 100644 --- a/core/src/banking_stage/consume_worker.rs +++ b/core/src/banking_stage/consume_worker.rs @@ -8,7 +8,12 @@ use { solana_measure::measure_us, solana_poh::leader_bank_notifier::LeaderBankNotifier, solana_runtime::bank::Bank, +<<<<<<< HEAD solana_sdk::clock::Slot, +======= + solana_runtime_transaction::transaction_with_meta::TransactionWithMeta, + solana_sdk::timing::AtomicInterval, +>>>>>>> c83075b27 (Revert "Report ConsumeWorkerMetrics at slot transitions (#3212)" (#4667)) solana_svm::transaction_error_metrics::TransactionErrorMetrics, std::{ sync::{ @@ -178,8 +183,8 @@ fn try_drain_iter(work: T, receiver: &Receiver) -> impl Iterator /// done. pub(crate) struct ConsumeWorkerMetrics { id: String, + interval: AtomicInterval, has_data: AtomicBool, - slot: AtomicU64, count_metrics: ConsumeWorkerCountMetrics, error_metrics: ConsumeWorkerTransactionErrorMetrics, @@ -187,34 +192,23 @@ pub(crate) struct ConsumeWorkerMetrics { } impl ConsumeWorkerMetrics { - /// Report and reset metrics when the worker did some work and: - /// a) (when a leader) Previous slot is not the same as current. - /// b) (when not a leader) report the metrics accumulated so far. - pub fn maybe_report_and_reset(&self, slot: Option) { - let prev_slot_id: u64 = self.slot.load(Ordering::Relaxed); - if let Some(slot) = slot { - if slot != prev_slot_id { - if !self.has_data.swap(false, Ordering::Relaxed) { - return; - } - self.count_metrics.report_and_reset(&self.id, slot); - self.timing_metrics.report_and_reset(&self.id, slot); - self.error_metrics.report_and_reset(&self.id, slot); - self.slot.swap(slot, Ordering::Relaxed); - } - } else if prev_slot_id != 0 { - self.count_metrics.report_and_reset(&self.id, prev_slot_id); - self.timing_metrics.report_and_reset(&self.id, prev_slot_id); - self.error_metrics.report_and_reset(&self.id, prev_slot_id); - self.slot.swap(0, Ordering::Relaxed); + /// Report and reset metrics iff the interval has elapsed and the worker did some work. + pub fn maybe_report_and_reset(&self) { + const REPORT_INTERVAL_MS: u64 = 1000; + if self.interval.should_update(REPORT_INTERVAL_MS) + && self.has_data.swap(false, Ordering::Relaxed) + { + self.count_metrics.report_and_reset(&self.id); + self.timing_metrics.report_and_reset(&self.id); + self.error_metrics.report_and_reset(&self.id); } } fn new(id: u32) -> Self { Self { id: id.to_string(), + interval: AtomicInterval::default(), has_data: AtomicBool::new(false), - slot: AtomicU64::new(0), count_metrics: ConsumeWorkerCountMetrics::default(), error_metrics: ConsumeWorkerTransactionErrorMetrics::default(), timing_metrics: ConsumeWorkerTimingMetrics::default(), @@ -451,7 +445,7 @@ impl Default for ConsumeWorkerCountMetrics { } impl ConsumeWorkerCountMetrics { - fn report_and_reset(&self, id: &str, slot: u64) { + fn report_and_reset(&self, id: &str) { datapoint_info!( "banking_stage_worker_counts", "id" => id, @@ -499,11 +493,6 @@ impl ConsumeWorkerCountMetrics { self.max_prioritization_fees.swap(0, Ordering::Relaxed), i64 ), - ( - "slot", - slot, - i64 - ), ); } } @@ -522,7 +511,7 @@ struct ConsumeWorkerTimingMetrics { } impl ConsumeWorkerTimingMetrics { - fn report_and_reset(&self, id: &str, slot: u64) { + fn report_and_reset(&self, id: &str) { datapoint_info!( "banking_stage_worker_timing", "id" => id, @@ -563,11 +552,6 @@ impl ConsumeWorkerTimingMetrics { self.wait_for_bank_failure_us.swap(0, Ordering::Relaxed), i64 ), - ( - "slot", - slot, - i64 - ), ); } } @@ -601,7 +585,7 @@ struct ConsumeWorkerTransactionErrorMetrics { } impl ConsumeWorkerTransactionErrorMetrics { - fn report_and_reset(&self, id: &str, slot: u64) { + fn report_and_reset(&self, id: &str) { datapoint_info!( "banking_stage_worker_error_metrics", "id" => id, @@ -712,11 +696,6 @@ impl ConsumeWorkerTransactionErrorMetrics { .swap(0, Ordering::Relaxed), i64 ), - ( - "slot", - slot, - i64 - ), ); } } diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index cfcfbc2dbf986f..cc8efefc03df55 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -140,7 +140,7 @@ impl SchedulerController { .maybe_report_and_reset_interval(should_report); self.worker_metrics .iter() - .for_each(|metrics| metrics.maybe_report_and_reset(new_leader_slot)); + .for_each(|metrics| metrics.maybe_report_and_reset()); } Ok(())