Skip to content

Commit

Permalink
Revert "Report ConsumeWorkerMetrics at slot transitions (#3212)" (#4667)
Browse files Browse the repository at this point in the history
(cherry picked from commit c83075b)

# Conflicts:
#	core/src/banking_stage/consume_worker.rs
  • Loading branch information
apfitzge authored and mergify[bot] committed Jan 30, 2025
1 parent c48f513 commit 5b1294b
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 41 deletions.
59 changes: 19 additions & 40 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ use {
solana_measure::measure_us,
solana_poh::leader_bank_notifier::LeaderBankNotifier,
solana_runtime::bank::Bank,
<<<<<<< HEAD
solana_sdk::clock::Slot,
=======
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
solana_sdk::timing::AtomicInterval,
>>>>>>> c83075b27 (Revert "Report ConsumeWorkerMetrics at slot transitions (#3212)" (#4667))
solana_svm::transaction_error_metrics::TransactionErrorMetrics,
std::{
sync::{
Expand Down Expand Up @@ -178,43 +183,32 @@ fn try_drain_iter<T>(work: T, receiver: &Receiver<T>) -> impl Iterator<Item = T>
/// done.
pub(crate) struct ConsumeWorkerMetrics {
id: String,
interval: AtomicInterval,
has_data: AtomicBool,
slot: AtomicU64,

count_metrics: ConsumeWorkerCountMetrics,
error_metrics: ConsumeWorkerTransactionErrorMetrics,
timing_metrics: ConsumeWorkerTimingMetrics,
}

impl ConsumeWorkerMetrics {
/// Report and reset metrics when the worker did some work and:
/// a) (when a leader) Previous slot is not the same as current.
/// b) (when not a leader) report the metrics accumulated so far.
pub fn maybe_report_and_reset(&self, slot: Option<Slot>) {
let prev_slot_id: u64 = self.slot.load(Ordering::Relaxed);
if let Some(slot) = slot {
if slot != prev_slot_id {
if !self.has_data.swap(false, Ordering::Relaxed) {
return;
}
self.count_metrics.report_and_reset(&self.id, slot);
self.timing_metrics.report_and_reset(&self.id, slot);
self.error_metrics.report_and_reset(&self.id, slot);
self.slot.swap(slot, Ordering::Relaxed);
}
} else if prev_slot_id != 0 {
self.count_metrics.report_and_reset(&self.id, prev_slot_id);
self.timing_metrics.report_and_reset(&self.id, prev_slot_id);
self.error_metrics.report_and_reset(&self.id, prev_slot_id);
self.slot.swap(0, Ordering::Relaxed);
/// Report and reset metrics iff the interval has elapsed and the worker did some work.
pub fn maybe_report_and_reset(&self) {
const REPORT_INTERVAL_MS: u64 = 1000;
if self.interval.should_update(REPORT_INTERVAL_MS)
&& self.has_data.swap(false, Ordering::Relaxed)
{
self.count_metrics.report_and_reset(&self.id);
self.timing_metrics.report_and_reset(&self.id);
self.error_metrics.report_and_reset(&self.id);
}
}

fn new(id: u32) -> Self {
Self {
id: id.to_string(),
interval: AtomicInterval::default(),
has_data: AtomicBool::new(false),
slot: AtomicU64::new(0),
count_metrics: ConsumeWorkerCountMetrics::default(),
error_metrics: ConsumeWorkerTransactionErrorMetrics::default(),
timing_metrics: ConsumeWorkerTimingMetrics::default(),
Expand Down Expand Up @@ -451,7 +445,7 @@ impl Default for ConsumeWorkerCountMetrics {
}

impl ConsumeWorkerCountMetrics {
fn report_and_reset(&self, id: &str, slot: u64) {
fn report_and_reset(&self, id: &str) {
datapoint_info!(
"banking_stage_worker_counts",
"id" => id,
Expand Down Expand Up @@ -499,11 +493,6 @@ impl ConsumeWorkerCountMetrics {
self.max_prioritization_fees.swap(0, Ordering::Relaxed),
i64
),
(
"slot",
slot,
i64
),
);
}
}
Expand All @@ -522,7 +511,7 @@ struct ConsumeWorkerTimingMetrics {
}

impl ConsumeWorkerTimingMetrics {
fn report_and_reset(&self, id: &str, slot: u64) {
fn report_and_reset(&self, id: &str) {
datapoint_info!(
"banking_stage_worker_timing",
"id" => id,
Expand Down Expand Up @@ -563,11 +552,6 @@ impl ConsumeWorkerTimingMetrics {
self.wait_for_bank_failure_us.swap(0, Ordering::Relaxed),
i64
),
(
"slot",
slot,
i64
),
);
}
}
Expand Down Expand Up @@ -601,7 +585,7 @@ struct ConsumeWorkerTransactionErrorMetrics {
}

impl ConsumeWorkerTransactionErrorMetrics {
fn report_and_reset(&self, id: &str, slot: u64) {
fn report_and_reset(&self, id: &str) {
datapoint_info!(
"banking_stage_worker_error_metrics",
"id" => id,
Expand Down Expand Up @@ -712,11 +696,6 @@ impl ConsumeWorkerTransactionErrorMetrics {
.swap(0, Ordering::Relaxed),
i64
),
(
"slot",
slot,
i64
),
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl<T: LikeClusterInfo> SchedulerController<T> {
.maybe_report_and_reset_interval(should_report);
self.worker_metrics
.iter()
.for_each(|metrics| metrics.maybe_report_and_reset(new_leader_slot));
.for_each(|metrics| metrics.maybe_report_and_reset());
}

Ok(())
Expand Down

0 comments on commit 5b1294b

Please sign in to comment.