Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compaction): add more metrics into compaction #771

Merged
merged 2 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,18 @@ message KeyRange {
bool inf = 3;
}

message TableSetStatistics {
uint32 level_idx = 1;
double size_gb = 2;
uint64 cnt = 3;
}

message CompactMetrics {
TableSetStatistics read_level_n = 1;
TableSetStatistics read_level_nplus1 = 2;
TableSetStatistics write = 3;
}

message CompactTask {
// SSTs to be compacted, which will be removed from LSM after compaction
repeated LevelEntry input_ssts = 1;
Expand All @@ -149,6 +161,7 @@ message CompactTask {
// compacion output will be added to [`target_level`] of LSM after compaction
uint32 target_level = 6;
bool is_target_ultimate_and_leveling = 7;
CompactMetrics metrics = 8;
}

message ReportCompactionTasksRequest {
Expand Down
29 changes: 26 additions & 3 deletions rust/meta/src/hummock/compaction.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use bytes::Bytes;
use itertools::{EitherOrBoth, Itertools};
use risingwave_common::error::Result;
use risingwave_pb::hummock::{CompactTask, Level, LevelEntry, LevelType, SstableInfo};
use risingwave_pb::hummock::{
CompactMetrics, CompactTask, Level, LevelEntry, LevelType, SstableInfo, TableSetStatistics,
};
use risingwave_storage::hummock::key::{user_key, FullKey};
use risingwave_storage::hummock::key_range::KeyRange;
use risingwave_storage::hummock::{HummockEpoch, HummockSSTableId};
Expand Down Expand Up @@ -148,7 +150,7 @@ impl CompactStatus {

if is_select_idle {
let insert_point =
compacting_key_ranges.partition_point(|(ongoing_key_range, _)| {
compacting_key_ranges.partition_point(|(ongoing_key_range, _, _)| {
user_key(&ongoing_key_range.right) < user_key(&key_range.left)
});
if insert_point >= compacting_key_ranges.len()
Expand Down Expand Up @@ -179,7 +181,11 @@ impl CompactStatus {
if overlap_all_idle {
compacting_key_ranges.insert(
insert_point,
(key_range.clone(), next_task_id),
(
key_range.clone(),
next_task_id,
select_level_inputs.len() as u64,
),
);

let mut suc_table_ids =
Expand Down Expand Up @@ -300,6 +306,23 @@ impl CompactStatus {
is_target_ultimate_and_leveling: target_level as usize
== self.level_handlers.len() - 1
&& is_target_level_leveling,
metrics: Some(CompactMetrics {
read_level_n: Some(TableSetStatistics {
level_idx: select_level,
size_gb: 0f64,
cnt: 0,
}),
read_level_nplus1: Some(TableSetStatistics {
level_idx: target_level,
size_gb: 0f64,
cnt: 0,
}),
write: Some(TableSetStatistics {
level_idx: target_level,
size_gb: 0f64,
cnt: 0,
}),
}),
};
Some(compact_task)
}
Expand Down
9 changes: 8 additions & 1 deletion rust/meta/src/hummock/compactor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ impl CompactorManager {
#[cfg(test)]
mod tests {
use risingwave_common::error::Result;
use risingwave_pb::hummock::{CompactTask, SubscribeCompactTasksResponse};
use risingwave_pb::hummock::{
CompactMetrics, CompactTask, SubscribeCompactTasksResponse, TableSetStatistics,
};
use tokio::sync::mpsc::error::TryRecvError;

use crate::hummock::CompactorManager;
Expand All @@ -120,6 +122,11 @@ mod tests {
task_id,
target_level: 0,
is_target_ultimate_and_leveling: false,
metrics: Some(CompactMetrics {
read_level_n: Some(TableSetStatistics::default()),
read_level_nplus1: Some(TableSetStatistics::default()),
write: Some(TableSetStatistics::default()),
}),
}
}

Expand Down
31 changes: 25 additions & 6 deletions rust/meta/src/hummock/hummock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,35 @@ where
}

fn trigger_sst_stat(&self, compact_status: &CompactStatus) {
let reduce_compact_cnt = |compacting_key_ranges: &Vec<(
risingwave_storage::hummock::key_range::KeyRange,
u64,
u64,
)>| {
compacting_key_ranges
.iter()
.fold(0, |accum, elem| accum + elem.2)
};
for (idx, level_handler) in enumerate(compact_status.level_handlers.iter()) {
let sst_cnt = match level_handler {
LevelHandler::Nonoverlapping(ssts, _) => ssts.len(),
LevelHandler::Overlapping(ssts, _) => ssts.len(),
let (sst_num, compact_cnt) = match level_handler {
LevelHandler::Nonoverlapping(ssts, compacting_key_ranges) => {
(ssts.len(), reduce_compact_cnt(compacting_key_ranges))
}
LevelHandler::Overlapping(ssts, compacting_key_ranges) => {
(ssts.len(), reduce_compact_cnt(compacting_key_ranges))
}
};
let level_label = String::from("L") + &idx.to_string();
self.metrics
.level_sst_num
.get_metric_with_label_values(&[&level_label])
.unwrap()
.set(sst_num as i64);
self.metrics
.level_payload
.get_metric_with_label_values(&[&(String::from("L") + &idx.to_string())])
.level_compact_cnt
.get_metric_with_label_values(&[&level_label])
.unwrap()
.set(sst_cnt as i64);
.set(compact_cnt as i64);
}
}

Expand Down
10 changes: 5 additions & 5 deletions rust/meta/src/hummock/level_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ impl From<&SstableInfo> for SSTableStat {
pub enum LevelHandler {
/// * `Vec<SSTableStat>` - existing SSTs in this level, arranged in order no matter Tiering or
/// Leveling
/// * `Vec<(KeyRange, u64)>` - key ranges (and corresponding compaction task id) to be merged
/// to bottom level in order
Nonoverlapping(Vec<SSTableStat>, Vec<(KeyRange, u64)>),
Overlapping(Vec<SSTableStat>, Vec<(KeyRange, u64)>),
/// * `Vec<(KeyRange, u64, u64)>` - key ranges (and corresponding compaction task id, #SSTs) to
/// be merged to bottom level in order
Nonoverlapping(Vec<SSTableStat>, Vec<(KeyRange, u64, u64)>),
Overlapping(Vec<SSTableStat>, Vec<(KeyRange, u64, u64)>),
}

impl LevelHandler {
fn clear_compacting_range(&mut self, clear_task_id: u64) {
match self {
LevelHandler::Overlapping(_, compacting_key_ranges)
| LevelHandler::Nonoverlapping(_, compacting_key_ranges) => {
compacting_key_ranges.retain(|(_, task_id)| *task_id != clear_task_id);
compacting_key_ranges.retain(|(_, task_id, _)| *task_id != clear_task_id);
}
}
}
Expand Down
17 changes: 14 additions & 3 deletions rust/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ pub struct MetaMetrics {
/// latency of each barrier
pub barrier_latency: Histogram,
/// num of SSTs in each level
pub level_payload: IntGaugeVec,
pub level_sst_num: IntGaugeVec,
/// num of SSTs to be merged to next level in each level
pub level_compact_cnt: IntGaugeVec,
}

impl MetaMetrics {
Expand All @@ -48,19 +50,28 @@ impl MetaMetrics {
);
let barrier_latency = register_histogram_with_registry!(opts, registry).unwrap();

let level_payload = register_int_gauge_vec_with_registry!(
let level_sst_num = register_int_gauge_vec_with_registry!(
"storage_level_sst_num",
"num of SSTs in each level",
&["level_index"],
registry
)
.unwrap();

let level_compact_cnt = register_int_gauge_vec_with_registry!(
"storage_level_compact_cnt",
"num of SSTs to be merged to next level in each level",
&["level_index"],
registry
)
.unwrap();

Self {
registry,
grpc_latency,
barrier_latency,
level_payload,
level_sst_num,
level_compact_cnt,
}
}

Expand Down
62 changes: 47 additions & 15 deletions rust/storage/src/hummock/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use futures::stream::{self, StreamExt};
use futures::Future;
use risingwave_common::error::RwError;
use risingwave_pb::hummock::{
CompactTask, LevelEntry, LevelType, SstableInfo, SubscribeCompactTasksResponse, VacuumTask,
CompactMetrics, CompactTask, LevelEntry, LevelType, SstableInfo, SubscribeCompactTasksResponse,
TableSetStatistics, VacuumTask,
};
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -35,23 +36,41 @@ pub struct SubCompactContext {
}

pub struct Compactor;

impl Compactor {
pub async fn run_compact(
context: &SubCompactContext,
compact_task: &mut CompactTask,
) -> HummockResult<()> {
let mut overlapping_tables = vec![];
let mut non_overlapping_table_seqs = vec![];
let target_level = compact_task.target_level;
let add_table = |accumu: &mut TableSetStatistics, table: &Sstable| {
accumu.size_gb += table.meta.estimated_size as f64 / (1024 * 1024 * 1024) as f64;
accumu.cnt += 1;
};
let accumulating_readsize =
|metrics: &mut CompactMetrics, level_idx: u32, tables: &Vec<Arc<Sstable>>| {
let read_statistics: &mut TableSetStatistics = if level_idx == target_level {
metrics.read_level_nplus1.as_mut().unwrap()
} else {
metrics.read_level_n.as_mut().unwrap()
};
for table in tables {
add_table(read_statistics, table);
}
};
for LevelEntry {
level: opt_level, ..
level_idx,
level: opt_level,
..
} in &compact_task.input_ssts
{
let level = opt_level.as_ref().unwrap();
let tables = context
.local_version_manager
.pick_few_tables(level.get_table_ids())
.await?;
accumulating_readsize(compact_task.metrics.as_mut().unwrap(), *level_idx, &tables);
if level.get_level_type().unwrap() == LevelType::Nonoverlapping {
non_overlapping_table_seqs.push(tables);
} else {
Expand Down Expand Up @@ -128,7 +147,28 @@ impl Compactor {

sub_compact_outputsets.sort_by_key(|(sub_kr_idx, _)| *sub_kr_idx);
for (_, sub_output) in sub_compact_outputsets {
compact_task.sorted_output_ssts.extend(sub_output);
for table in &sub_output {
add_table(
compact_task
.metrics
.as_mut()
.unwrap()
.write
.as_mut()
.unwrap(),
table,
);
}
compact_task
.sorted_output_ssts
.extend(sub_output.iter().map(|sst| SstableInfo {
id: sst.id,
key_range: Some(risingwave_pb::hummock::KeyRange {
left: sst.meta.get_smallest_key().to_vec(),
right: sst.meta.get_largest_key().to_vec(),
inf: false,
}),
}));
}

Ok(())
Expand Down Expand Up @@ -204,7 +244,7 @@ impl Compactor {
context: SubCompactContext,
kr: KeyRange,
iter: MergeIterator<'_>,
output_sst_infos: &mut Vec<SstableInfo>,
output_ssts: &mut Vec<Sstable>,
// TODO: better naming
is_target_ultimate_and_leveling: bool,
watermark: Epoch,
Expand All @@ -223,7 +263,7 @@ impl Compactor {
// Seal table for each split
builder.seal_current();

output_sst_infos.reserve(builder.len());
output_ssts.reserve(builder.len());
// TODO: decide upload concurrency
for (table_id, data, meta) in builder.finish() {
let sst = Sstable { id: table_id, meta };
Expand All @@ -236,15 +276,7 @@ impl Compactor {
} else {
context.stats.compaction_upload_sst_counts.inc();
}
let info = SstableInfo {
id: table_id,
key_range: Some(risingwave_pb::hummock::KeyRange {
left: sst.meta.get_smallest_key().to_vec(),
right: sst.meta.get_largest_key().to_vec(),
inf: false,
}),
};
output_sst_infos.push(info);
output_ssts.push(sst);
}

Ok(())
Expand Down
19 changes: 18 additions & 1 deletion rust/storage/src/hummock/shared_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock as PLRwLock;
use risingwave_common::error::Result;
use risingwave_pb::hummock::SstableInfo;
use tokio::task::JoinHandle;

use super::compactor::{Compactor, SubCompactContext};
Expand Down Expand Up @@ -418,7 +419,23 @@ impl SharedBufferUploader {

// Add all tables at once.
let timer = self.stats.batch_write_add_l0_latency.start_timer();
let version = self.hummock_meta_client.add_tables(epoch, tables).await?;
let version = self
.hummock_meta_client
.add_tables(
epoch,
tables
.iter()
.map(|sst| SstableInfo {
id: sst.id,
key_range: Some(risingwave_pb::hummock::KeyRange {
left: sst.meta.get_smallest_key().to_vec(),
right: sst.meta.get_largest_key().to_vec(),
inf: false,
}),
})
.collect(),
)
.await?;
timer.observe_duration();

// Ensure the added data is available locally
Expand Down