Skip to content

Commit

Permalink
feat(compaction): add more metrics into compaction (#771)
Browse files Browse the repository at this point in the history
* feat(compaction): stat active sst cnts in each level

* feat(compaction): stat read and write size in each compaction
  • Loading branch information
soundOfDestiny authored Mar 9, 2022
1 parent 3a8cdaf commit 6528f8b
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 34 deletions.
13 changes: 13 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,18 @@ message KeyRange {
bool inf = 3;
}

// Aggregate statistics over a set of SSTs, used as one entry of `CompactMetrics`.
message TableSetStatistics {
// Index of the LSM level this set of tables is associated with.
uint32 level_idx = 1;
// Accumulated table size in GiB; the compactor adds `estimated_size / 1024^3` per table.
double size_gb = 2;
// Number of tables accumulated into this set.
uint64 cnt = 3;
}

// Read/write statistics of a single compaction, carried on `CompactTask.metrics`.
message CompactMetrics {
// Input tables read from a level other than the task's target level.
TableSetStatistics read_level_n = 1;
// Input tables read from the task's target level.
TableSetStatistics read_level_nplus1 = 2;
// Output tables written by the compaction.
TableSetStatistics write = 3;
}

message CompactTask {
// SSTs to be compacted, which will be removed from LSM after compaction
repeated LevelEntry input_ssts = 1;
Expand All @@ -149,6 +161,7 @@ message CompactTask {
// compaction output will be added to [`target_level`] of LSM after compaction
uint32 target_level = 6;
bool is_target_ultimate_and_leveling = 7;
CompactMetrics metrics = 8;
}

message ReportCompactionTasksRequest {
Expand Down
29 changes: 26 additions & 3 deletions rust/meta/src/hummock/compaction.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use bytes::Bytes;
use itertools::{EitherOrBoth, Itertools};
use risingwave_common::error::Result;
use risingwave_pb::hummock::{CompactTask, Level, LevelEntry, LevelType, SstableInfo};
use risingwave_pb::hummock::{
CompactMetrics, CompactTask, Level, LevelEntry, LevelType, SstableInfo, TableSetStatistics,
};
use risingwave_storage::hummock::key::{user_key, FullKey};
use risingwave_storage::hummock::key_range::KeyRange;
use risingwave_storage::hummock::{HummockEpoch, HummockSSTableId};
Expand Down Expand Up @@ -148,7 +150,7 @@ impl CompactStatus {

if is_select_idle {
let insert_point =
compacting_key_ranges.partition_point(|(ongoing_key_range, _)| {
compacting_key_ranges.partition_point(|(ongoing_key_range, _, _)| {
user_key(&ongoing_key_range.right) < user_key(&key_range.left)
});
if insert_point >= compacting_key_ranges.len()
Expand Down Expand Up @@ -179,7 +181,11 @@ impl CompactStatus {
if overlap_all_idle {
compacting_key_ranges.insert(
insert_point,
(key_range.clone(), next_task_id),
(
key_range.clone(),
next_task_id,
select_level_inputs.len() as u64,
),
);

let mut suc_table_ids =
Expand Down Expand Up @@ -300,6 +306,23 @@ impl CompactStatus {
is_target_ultimate_and_leveling: target_level as usize
== self.level_handlers.len() - 1
&& is_target_level_leveling,
metrics: Some(CompactMetrics {
read_level_n: Some(TableSetStatistics {
level_idx: select_level,
size_gb: 0f64,
cnt: 0,
}),
read_level_nplus1: Some(TableSetStatistics {
level_idx: target_level,
size_gb: 0f64,
cnt: 0,
}),
write: Some(TableSetStatistics {
level_idx: target_level,
size_gb: 0f64,
cnt: 0,
}),
}),
};
Some(compact_task)
}
Expand Down
9 changes: 8 additions & 1 deletion rust/meta/src/hummock/compactor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ impl CompactorManager {
#[cfg(test)]
mod tests {
use risingwave_common::error::Result;
use risingwave_pb::hummock::{CompactTask, SubscribeCompactTasksResponse};
use risingwave_pb::hummock::{
CompactMetrics, CompactTask, SubscribeCompactTasksResponse, TableSetStatistics,
};
use tokio::sync::mpsc::error::TryRecvError;

use crate::hummock::CompactorManager;
Expand All @@ -120,6 +122,11 @@ mod tests {
task_id,
target_level: 0,
is_target_ultimate_and_leveling: false,
metrics: Some(CompactMetrics {
read_level_n: Some(TableSetStatistics::default()),
read_level_nplus1: Some(TableSetStatistics::default()),
write: Some(TableSetStatistics::default()),
}),
}
}

Expand Down
31 changes: 25 additions & 6 deletions rust/meta/src/hummock/hummock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,35 @@ where
}

fn trigger_sst_stat(&self, compact_status: &CompactStatus) {
let reduce_compact_cnt = |compacting_key_ranges: &Vec<(
risingwave_storage::hummock::key_range::KeyRange,
u64,
u64,
)>| {
compacting_key_ranges
.iter()
.fold(0, |accum, elem| accum + elem.2)
};
for (idx, level_handler) in enumerate(compact_status.level_handlers.iter()) {
let sst_cnt = match level_handler {
LevelHandler::Nonoverlapping(ssts, _) => ssts.len(),
LevelHandler::Overlapping(ssts, _) => ssts.len(),
let (sst_num, compact_cnt) = match level_handler {
LevelHandler::Nonoverlapping(ssts, compacting_key_ranges) => {
(ssts.len(), reduce_compact_cnt(compacting_key_ranges))
}
LevelHandler::Overlapping(ssts, compacting_key_ranges) => {
(ssts.len(), reduce_compact_cnt(compacting_key_ranges))
}
};
let level_label = String::from("L") + &idx.to_string();
self.metrics
.level_sst_num
.get_metric_with_label_values(&[&level_label])
.unwrap()
.set(sst_num as i64);
self.metrics
.level_payload
.get_metric_with_label_values(&[&(String::from("L") + &idx.to_string())])
.level_compact_cnt
.get_metric_with_label_values(&[&level_label])
.unwrap()
.set(sst_cnt as i64);
.set(compact_cnt as i64);
}
}

Expand Down
10 changes: 5 additions & 5 deletions rust/meta/src/hummock/level_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ impl From<&SstableInfo> for SSTableStat {
pub enum LevelHandler {
/// * `Vec<SSTableStat>` - existing SSTs in this level, arranged in order no matter Tiering or
/// Leveling
/// * `Vec<(KeyRange, u64)>` - key ranges (and corresponding compaction task id) to be merged
/// to bottom level in order
Nonoverlapping(Vec<SSTableStat>, Vec<(KeyRange, u64)>),
Overlapping(Vec<SSTableStat>, Vec<(KeyRange, u64)>),
/// * `Vec<(KeyRange, u64, u64)>` - key ranges (and corresponding compaction task id, #SSTs) to
/// be merged to bottom level in order
Nonoverlapping(Vec<SSTableStat>, Vec<(KeyRange, u64, u64)>),
Overlapping(Vec<SSTableStat>, Vec<(KeyRange, u64, u64)>),
}

impl LevelHandler {
fn clear_compacting_range(&mut self, clear_task_id: u64) {
match self {
LevelHandler::Overlapping(_, compacting_key_ranges)
| LevelHandler::Nonoverlapping(_, compacting_key_ranges) => {
compacting_key_ranges.retain(|(_, task_id)| *task_id != clear_task_id);
compacting_key_ranges.retain(|(_, task_id, _)| *task_id != clear_task_id);
}
}
}
Expand Down
17 changes: 14 additions & 3 deletions rust/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ pub struct MetaMetrics {
/// latency of each barrier
pub barrier_latency: Histogram,
/// num of SSTs in each level
pub level_payload: IntGaugeVec,
pub level_sst_num: IntGaugeVec,
/// num of SSTs to be merged to next level in each level
pub level_compact_cnt: IntGaugeVec,
}

impl MetaMetrics {
Expand All @@ -48,19 +50,28 @@ impl MetaMetrics {
);
let barrier_latency = register_histogram_with_registry!(opts, registry).unwrap();

let level_payload = register_int_gauge_vec_with_registry!(
let level_sst_num = register_int_gauge_vec_with_registry!(
"storage_level_sst_num",
"num of SSTs in each level",
&["level_index"],
registry
)
.unwrap();

let level_compact_cnt = register_int_gauge_vec_with_registry!(
"storage_level_compact_cnt",
"num of SSTs to be merged to next level in each level",
&["level_index"],
registry
)
.unwrap();

Self {
registry,
grpc_latency,
barrier_latency,
level_payload,
level_sst_num,
level_compact_cnt,
}
}

Expand Down
62 changes: 47 additions & 15 deletions rust/storage/src/hummock/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use futures::stream::{self, StreamExt};
use futures::Future;
use risingwave_common::error::RwError;
use risingwave_pb::hummock::{
CompactTask, LevelEntry, LevelType, SstableInfo, SubscribeCompactTasksResponse, VacuumTask,
CompactMetrics, CompactTask, LevelEntry, LevelType, SstableInfo, SubscribeCompactTasksResponse,
TableSetStatistics, VacuumTask,
};
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -35,23 +36,41 @@ pub struct SubCompactContext {
}

pub struct Compactor;

impl Compactor {
pub async fn run_compact(
context: &SubCompactContext,
compact_task: &mut CompactTask,
) -> HummockResult<()> {
let mut overlapping_tables = vec![];
let mut non_overlapping_table_seqs = vec![];
let target_level = compact_task.target_level;
let add_table = |accumu: &mut TableSetStatistics, table: &Sstable| {
accumu.size_gb += table.meta.estimated_size as f64 / (1024 * 1024 * 1024) as f64;
accumu.cnt += 1;
};
let accumulating_readsize =
|metrics: &mut CompactMetrics, level_idx: u32, tables: &Vec<Arc<Sstable>>| {
let read_statistics: &mut TableSetStatistics = if level_idx == target_level {
metrics.read_level_nplus1.as_mut().unwrap()
} else {
metrics.read_level_n.as_mut().unwrap()
};
for table in tables {
add_table(read_statistics, table);
}
};
for LevelEntry {
level: opt_level, ..
level_idx,
level: opt_level,
..
} in &compact_task.input_ssts
{
let level = opt_level.as_ref().unwrap();
let tables = context
.local_version_manager
.pick_few_tables(level.get_table_ids())
.await?;
accumulating_readsize(compact_task.metrics.as_mut().unwrap(), *level_idx, &tables);
if level.get_level_type().unwrap() == LevelType::Nonoverlapping {
non_overlapping_table_seqs.push(tables);
} else {
Expand Down Expand Up @@ -128,7 +147,28 @@ impl Compactor {

sub_compact_outputsets.sort_by_key(|(sub_kr_idx, _)| *sub_kr_idx);
for (_, sub_output) in sub_compact_outputsets {
compact_task.sorted_output_ssts.extend(sub_output);
for table in &sub_output {
add_table(
compact_task
.metrics
.as_mut()
.unwrap()
.write
.as_mut()
.unwrap(),
table,
);
}
compact_task
.sorted_output_ssts
.extend(sub_output.iter().map(|sst| SstableInfo {
id: sst.id,
key_range: Some(risingwave_pb::hummock::KeyRange {
left: sst.meta.get_smallest_key().to_vec(),
right: sst.meta.get_largest_key().to_vec(),
inf: false,
}),
}));
}

Ok(())
Expand Down Expand Up @@ -204,7 +244,7 @@ impl Compactor {
context: SubCompactContext,
kr: KeyRange,
iter: MergeIterator<'_>,
output_sst_infos: &mut Vec<SstableInfo>,
output_ssts: &mut Vec<Sstable>,
// TODO: better naming
is_target_ultimate_and_leveling: bool,
watermark: Epoch,
Expand All @@ -223,7 +263,7 @@ impl Compactor {
// Seal table for each split
builder.seal_current();

output_sst_infos.reserve(builder.len());
output_ssts.reserve(builder.len());
// TODO: decide upload concurrency
for (table_id, data, meta) in builder.finish() {
let sst = Sstable { id: table_id, meta };
Expand All @@ -236,15 +276,7 @@ impl Compactor {
} else {
context.stats.compaction_upload_sst_counts.inc();
}
let info = SstableInfo {
id: table_id,
key_range: Some(risingwave_pb::hummock::KeyRange {
left: sst.meta.get_smallest_key().to_vec(),
right: sst.meta.get_largest_key().to_vec(),
inf: false,
}),
};
output_sst_infos.push(info);
output_ssts.push(sst);
}

Ok(())
Expand Down
19 changes: 18 additions & 1 deletion rust/storage/src/hummock/shared_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock as PLRwLock;
use risingwave_common::error::Result;
use risingwave_pb::hummock::SstableInfo;
use tokio::task::JoinHandle;

use super::compactor::{Compactor, SubCompactContext};
Expand Down Expand Up @@ -418,7 +419,23 @@ impl SharedBufferUploader {

// Add all tables at once.
let timer = self.stats.batch_write_add_l0_latency.start_timer();
let version = self.hummock_meta_client.add_tables(epoch, tables).await?;
let version = self
.hummock_meta_client
.add_tables(
epoch,
tables
.iter()
.map(|sst| SstableInfo {
id: sst.id,
key_range: Some(risingwave_pb::hummock::KeyRange {
left: sst.meta.get_smallest_key().to_vec(),
right: sst.meta.get_largest_key().to_vec(),
inf: false,
}),
})
.collect(),
)
.await?;
timer.observe_duration();

// Ensure the added data is available locally
Expand Down

0 comments on commit 6528f8b

Please sign in to comment.