Skip to content

Commit

Permalink
refactor(compaction): separate select and validate logic in get_compa…
Browse files Browse the repository at this point in the history
…ct_task (#1160)
  • Loading branch information
soundOfDestiny authored Mar 31, 2022
1 parent c964573 commit 9b9ea94
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 42 deletions.
1 change: 1 addition & 0 deletions rust/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ parking_lot = { version = "0.12", features = ["arc_lock"] }
paste = "1"
prometheus = "0.13"
prost = "0.9"
rand = "0.8"
risingwave_common = { path = "../common" }
risingwave_connector = { path = "../connector" }
risingwave_pb = { path = "../prost" }
Expand Down
103 changes: 61 additions & 42 deletions rust/meta/src/hummock/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::io::Cursor;
use bytes::Bytes;
use itertools::{EitherOrBoth, Itertools};
use prost::Message;
use rand::seq::SliceRandom;
use rand::thread_rng;
use risingwave_common::error::Result;
use risingwave_pb::hummock::{
CompactMetrics, CompactTask, Level, LevelEntry, LevelType, TableSetStatistics,
Expand Down Expand Up @@ -110,58 +112,77 @@ impl CompactStatus {
let mut found = SearchResult::NotFound;
let next_task_id = self.next_compact_task_id;
let (prior, posterior) = self.level_handlers.split_at_mut(select_level as usize + 1);
let target_level = select_level + 1;
let (prior, posterior) = (prior.last_mut().unwrap(), posterior.first_mut().unwrap());
let is_select_level_leveling = matches!(prior, LevelHandler::Nonoverlapping(_, _));
let target_level = select_level + 1;
let is_target_level_leveling = matches!(posterior, LevelHandler::Nonoverlapping(_, _));
// Try to select and merge table(s) in `select_level` into `target_level`
match prior {
LevelHandler::Overlapping(l_n, compacting_key_ranges)
| LevelHandler::Nonoverlapping(l_n, compacting_key_ranges) => {
let mut sst_idx = 0;
let l_n_len = l_n.len();
while sst_idx < l_n_len {
let mut next_sst_idx = sst_idx + 1;
let SSTableStat {
key_range: sst_key_range,
table_id,
..
} = &l_n[sst_idx];
let mut select_level_inputs = vec![*table_id];
let key_range;
let mut tier_key_range;
// Must ensure that there exists no SSTs in `select_level` which have
// overlapping user key with `select_level_inputs`
if !is_select_level_leveling {
tier_key_range = sst_key_range.clone();

next_sst_idx = sst_idx;
for (
delta_idx,
SSTableStat {
key_range: other_key_range,
table_id: other_table_id,
..
},
) in l_n[sst_idx + 1..].iter().enumerate()
{
if user_key(&other_key_range.left) <= user_key(&tier_key_range.right) {
select_level_inputs.push(*other_table_id);
tier_key_range.full_key_extend(other_key_range);
} else {
next_sst_idx = sst_idx + 1 + delta_idx;
break;
let mut polysst_candidates = Vec::with_capacity(l_n_len);
{
let mut sst_idx = 0;
while sst_idx < l_n_len {
let mut next_sst_idx = sst_idx + 1;
let SSTableStat {
key_range: sst_key_range,
table_id,
..
} = &l_n[sst_idx];
let mut select_level_inputs = vec![*table_id];
let key_range;
let mut tier_key_range;
// Must ensure that there exists no SSTs in `select_level` which have
// overlapping user key with `select_level_inputs`
if !is_select_level_leveling {
tier_key_range = sst_key_range.clone();

next_sst_idx = sst_idx;
for (
delta_idx,
SSTableStat {
key_range: other_key_range,
table_id: other_table_id,
..
},
) in l_n[sst_idx + 1..].iter().enumerate()
{
if user_key(&other_key_range.left)
<= user_key(&tier_key_range.right)
{
select_level_inputs.push(*other_table_id);
tier_key_range.full_key_extend(other_key_range);
} else {
next_sst_idx = sst_idx + 1 + delta_idx;
break;
}
}
}
if next_sst_idx == sst_idx {
next_sst_idx = l_n_len;
if next_sst_idx == sst_idx {
next_sst_idx = l_n_len;
}

key_range = &tier_key_range;
} else {
key_range = sst_key_range;
}

key_range = &tier_key_range;
} else {
key_range = sst_key_range;
polysst_candidates.push((
(sst_idx, next_sst_idx),
select_level_inputs,
key_range.clone(),
));

sst_idx = next_sst_idx;
}
}

let mut rng = thread_rng();
polysst_candidates.shuffle(&mut rng);

for ((sst_idx, next_sst_idx), select_level_inputs, key_range) in polysst_candidates
{
let mut is_select_idle = true;
for SSTableStat { compact_task, .. } in &l_n[sst_idx..next_sst_idx] {
if compact_task.is_some() {
Expand All @@ -185,7 +206,6 @@ impl CompactStatus {
LevelHandler::Overlapping(_, _) => unimplemented!(),
LevelHandler::Nonoverlapping(l_n_suc, _) => {
let mut overlap_all_idle = true;
// TODO: use pointer last time to avoid binary search
let overlap_begin = l_n_suc.partition_point(|table_status| {
user_key(&table_status.key_range.right)
< user_key(&key_range.left)
Expand All @@ -207,7 +227,7 @@ impl CompactStatus {
compacting_key_ranges.insert(
insert_point,
(
key_range.clone(),
key_range,
next_task_id,
select_level_inputs.len() as u64,
),
Expand Down Expand Up @@ -260,7 +280,6 @@ impl CompactStatus {
}
}
}
sst_idx = next_sst_idx;
}
match &found {
SearchResult::Found(select_ln_ids, _, _) => {
Expand Down

0 comments on commit 9b9ea94

Please sign in to comment.