Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compaction): metric by frequence and #sst side, compaction doc #829

Merged
merged 4 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rust/meta/src/hummock/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl CompactStatus {
let is_select_level_leveling = matches!(prior, LevelHandler::Nonoverlapping(_, _));
let target_level = select_level + 1;
let is_target_level_leveling = matches!(posterior, LevelHandler::Nonoverlapping(_, _));
// plan to select and merge table(s) in `select_level` into `target_level`
match prior {
LevelHandler::Overlapping(l_n, compacting_key_ranges)
| LevelHandler::Nonoverlapping(l_n, compacting_key_ranges) => {
Expand All @@ -110,6 +111,8 @@ impl CompactStatus {
let mut select_level_inputs = vec![*table_id];
let key_range;
let mut tier_key_range;
// Must ensure that there exists no SSTs in `select_level` which have
// overlapping user key with `select_level_inputs`
if !is_select_level_leveling {
tier_key_range = sst_key_range.clone();

Expand Down Expand Up @@ -153,6 +156,8 @@ impl CompactStatus {
compacting_key_ranges.partition_point(|(ongoing_key_range, _, _)| {
user_key(&ongoing_key_range.right) < user_key(&key_range.left)
});
// if following condition is not satisfied, it may result in two overlapping
// SSTs in target level
if insert_point >= compacting_key_ranges.len()
|| user_key(&compacting_key_ranges[insert_point].0.left)
> user_key(&key_range.right)
Expand All @@ -179,6 +184,7 @@ impl CompactStatus {
overlap_end += 1;
}
if overlap_all_idle {
// Here, we have known that `select_level_input` is valid
compacting_key_ranges.insert(
insert_point,
(
Expand Down
59 changes: 53 additions & 6 deletions rust/meta/src/hummock/hummock_manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use itertools::{enumerate, Itertools};
use prometheus::core::{AtomicF64, GenericGauge};
use prometheus::core::{AtomicF64, AtomicU64, GenericCounter};
use prost::Message;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::hummock::hummock_version::HummockVersionRefId;
Expand Down Expand Up @@ -239,39 +239,86 @@ where
}
}

fn single_level_stat<T: FnMut(String) -> prometheus::Result<GenericGauge<AtomicF64>>>(
fn single_level_stat_bytes<
T: FnMut(String) -> prometheus::Result<GenericCounter<AtomicF64>>,
>(
mut metric_vec: T,
level_stat: &TableSetStatistics,
) {
let level_label = String::from("L") + &level_stat.level_idx.to_string();
metric_vec(level_label).unwrap().add(level_stat.size_gb);
metric_vec(level_label).unwrap().inc_by(level_stat.size_gb);
}

fn single_level_stat_sstn<T: FnMut(String) -> prometheus::Result<GenericCounter<AtomicU64>>>(
mut metric_vec: T,
level_stat: &TableSetStatistics,
) {
let level_label = String::from("L") + &level_stat.level_idx.to_string();
metric_vec(level_label).unwrap().inc_by(level_stat.cnt);
}

fn trigger_rw_stat(&self, compact_metrics: &CompactMetrics) {
Self::single_level_stat(
self.metrics
.level_compact_frequence
.get_metric_with_label_values(&[&(String::from("L")
+ &compact_metrics
.read_level_n
.as_ref()
.unwrap()
.level_idx
.to_string())])
.unwrap()
.inc();

Self::single_level_stat_bytes(
|label| {
self.metrics
.level_compact_read_curr
.get_metric_with_label_values(&[&label])
},
compact_metrics.read_level_n.as_ref().unwrap(),
);
Self::single_level_stat(
Self::single_level_stat_bytes(
|label| {
self.metrics
.level_compact_read_next
.get_metric_with_label_values(&[&label])
},
compact_metrics.read_level_nplus1.as_ref().unwrap(),
);
Self::single_level_stat(
Self::single_level_stat_bytes(
|label| {
self.metrics
.level_compact_write
.get_metric_with_label_values(&[&label])
},
compact_metrics.write.as_ref().unwrap(),
);

Self::single_level_stat_sstn(
|label| {
self.metrics
.level_compact_read_sstn_curr
.get_metric_with_label_values(&[&label])
},
compact_metrics.read_level_n.as_ref().unwrap(),
);
Self::single_level_stat_sstn(
|label| {
self.metrics
.level_compact_read_sstn_next
.get_metric_with_label_values(&[&label])
},
compact_metrics.read_level_nplus1.as_ref().unwrap(),
);
Self::single_level_stat_sstn(
|label| {
self.metrics
.level_compact_write_sstn
.get_metric_with_label_values(&[&label])
},
compact_metrics.write.as_ref().unwrap(),
);
}

pub async fn add_tables(
Expand Down
63 changes: 54 additions & 9 deletions rust/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use std::sync::Arc;
use hyper::{Body, Request, Response};
use itertools::Itertools;
use prometheus::{
histogram_opts, register_gauge_vec_with_registry, register_histogram_vec_with_registry,
register_histogram_with_registry, register_int_gauge_vec_with_registry, Encoder, GaugeVec,
Histogram, HistogramVec, IntGaugeVec, Registry, TextEncoder, DEFAULT_BUCKETS,
histogram_opts, register_counter_vec_with_registry, register_histogram_vec_with_registry,
register_histogram_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry, CounterVec, Encoder, Histogram, HistogramVec,
IntCounterVec, IntGaugeVec, Registry, TextEncoder, DEFAULT_BUCKETS,
};
use tower::make::Shared;
use tower::ServiceBuilder;
Expand All @@ -29,11 +30,19 @@ pub struct MetaMetrics {
/// num of SSTs to be merged to next level in each level
pub level_compact_cnt: IntGaugeVec,
/// GBs read from current level during history compactions to next level
pub level_compact_read_curr: GaugeVec,
pub level_compact_read_curr: CounterVec,
/// GBs read from next level during history compactions to next level
pub level_compact_read_next: GaugeVec,
pub level_compact_read_next: CounterVec,
/// GBs written into next level during history compactions to next level
pub level_compact_write: GaugeVec,
pub level_compact_write: CounterVec,
/// num of SSTs read from current level during history compactions to next level
pub level_compact_read_sstn_curr: IntCounterVec,
/// num of SSTs read from next level during history compactions to next level
pub level_compact_read_sstn_next: IntCounterVec,
/// num of SSTs written into next level during history compactions to next level
pub level_compact_write_sstn: IntCounterVec,
/// num of compactions from each level to next level
pub level_compact_frequence: IntCounterVec,
}

impl MetaMetrics {
Expand Down Expand Up @@ -72,30 +81,62 @@ impl MetaMetrics {
)
.unwrap();

let level_compact_read_curr = register_gauge_vec_with_registry!(
let level_compact_read_curr = register_counter_vec_with_registry!(
"storage_level_compact_read_curr",
"GBs read from current level during history compactions to next level",
&["level_index"],
registry
)
.unwrap();

let level_compact_read_next = register_gauge_vec_with_registry!(
let level_compact_read_next = register_counter_vec_with_registry!(
"storage_level_compact_read_next",
"GBs read from next level during history compactions to next level",
&["level_index"],
registry
)
.unwrap();

let level_compact_write = register_gauge_vec_with_registry!(
let level_compact_write = register_counter_vec_with_registry!(
"storage_level_compact_write",
"GBs written into next level during history compactions to next level",
&["level_index"],
registry
)
.unwrap();

let level_compact_read_sstn_curr = register_int_counter_vec_with_registry!(
"storage_level_compact_read_sstn_curr",
"num of SSTs read from current level during history compactions to next level",
&["level_index"],
registry
)
.unwrap();

let level_compact_read_sstn_next = register_int_counter_vec_with_registry!(
"storage_level_compact_read_sstn_next",
"num of SSTs read from next level during history compactions to next level",
&["level_index"],
registry
)
.unwrap();

let level_compact_write_sstn = register_int_counter_vec_with_registry!(
"storage_level_compact_write_sstn",
"num of SSTs written into next level during history compactions to next level",
&["level_index"],
registry
)
.unwrap();

let level_compact_frequence = register_int_counter_vec_with_registry!(
"storage_level_compact_frequence",
"num of compactions from each level to next level",
&["level_index"],
registry
)
.unwrap();

Self {
registry,
grpc_latency,
Expand All @@ -105,6 +146,10 @@ impl MetaMetrics {
level_compact_read_curr,
level_compact_read_next,
level_compact_write,
level_compact_read_sstn_curr,
level_compact_read_sstn_next,
level_compact_write_sstn,
level_compact_frequence,
}
}

Expand Down
3 changes: 3 additions & 0 deletions rust/storage/src/hummock/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl Compactor {
sub_result?
}

// `sorted_output_ssts` must be sorted by key range
sub_compact_outputsets.sort_by_key(|(sub_kr_idx, _)| *sub_kr_idx);
for (_, sub_output) in sub_compact_outputsets {
for table in &sub_output {
Expand Down Expand Up @@ -224,13 +225,15 @@ impl Compactor {
let epoch = get_epoch(iter_key);

if epoch < watermark {
// Only retain latest key which satisfies epoch < watermark
skip_key = BytesMut::from(iter_key);
if matches!(iter.value(), HummockValue::Delete) && !has_user_key_overlap {
iter.next().await?;
continue;
}
}

// Don't allow two SSTs to share same user key
sst_builder
.add_full_key(FullKey::from_slice(iter_key), iter.value(), is_new_user_key)
.await?;
Expand Down