Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compaction): support dynamic leveled-compaction #2242

Merged
merged 31 commits into from
May 9, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
2e640fa
add compact selector
Little-Wallace Apr 29, 2022
186d9d0
support dynamic level compact
Little-Wallace Apr 29, 2022
df0f913
Merge branch 'main' into wallace/leveled
Little-Wallace Apr 29, 2022
27830a8
add scores
Little-Wallace Apr 29, 2022
41fe3d4
fix
Little-Wallace May 1, 2022
ec33452
add size overlap picker
Little-Wallace May 1, 2022
ba9acb4
fix strategy
Little-Wallace May 2, 2022
71ff671
Merge branch 'wallace/leveled' of github.com:singularity-data/risingw…
Little-Wallace May 2, 2022
7b97532
support check range
Little-Wallace May 2, 2022
be2a138
fix overlap tests
Little-Wallace May 2, 2022
d04a36e
fix initial
Little-Wallace May 2, 2022
ecef019
fix test
Little-Wallace May 3, 2022
578c632
fix level index bug
Little-Wallace May 4, 2022
a0d344c
add more tests
Little-Wallace May 5, 2022
f09dbd9
add comment
Little-Wallace May 5, 2022
e13230d
fix overlap strategy
Little-Wallace May 5, 2022
8a55bd7
Merge branch 'main' into wallace/leveled
Little-Wallace May 6, 2022
249143e
Merge branch 'main' into wallace/leveled
Little-Wallace May 8, 2022
a765642
fix comment
Little-Wallace May 8, 2022
a781183
Merge branch 'main' into wallace/leveled
Little-Wallace May 9, 2022
1dd35d2
Merge branch 'main' into wallace/leveled
Little-Wallace May 9, 2022
5cf15ba
reduce compact duration
Little-Wallace May 9, 2022
a172ada
add some log
Little-Wallace May 9, 2022
1ffc87b
Merge branch 'main' into wallace/leveled
Little-Wallace May 9, 2022
b095c2a
add more log
Little-Wallace May 9, 2022
8723c99
Merge branch 'wallace/leveled' of github.com:singularity-data/risingw…
Little-Wallace May 9, 2022
3ce07e1
add more log
Little-Wallace May 9, 2022
219c345
fix format
Little-Wallace May 9, 2022
1ec0633
add log after lock
Little-Wallace May 9, 2022
4c00e88
remove debug log
Little-Wallace May 9, 2022
89fa6e9
Merge branch 'main' into wallace/leveled
Little-Wallace May 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/meta/src/hummock/compaction/compaction_picker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::sync::Arc;
Little-Wallace marked this conversation as resolved.
Show resolved Hide resolved
use risingwave_pb::hummock::Level;
use crate::hummock::compaction::overlap_strategy::OverlapStrategy;

use crate::hummock::compaction::{CompactionConfig, SearchResult};
use crate::hummock::level_handler::LevelHandler;

pub trait CompactionPicker {
zwang28 marked this conversation as resolved.
Show resolved Hide resolved
fn pick_compaction(
&self,
levels: &[Level],
level_handlers: &mut [LevelHandler],
) -> Option<SearchResult>;
}

pub struct SizeOverlapPicker {
compact_task_id: u64,
config: Arc<CompactionConfig>,
overlap_strategy: Box<dyn OverlapStrategy>,
level: usize,
}

impl SizeOverlapPicker {
pub fn new(compact_task_id: u64, level: usize,
config: Arc<CompactionConfig>, overlap_strategy: Box<dyn OverlapStrategy>) -> SizeOverlapPicker {
SizeOverlapPicker {
compact_task_id,
config,
overlap_strategy,
level
}
}
}

impl CompactionPicker for SizeOverlapPicker {
fn pick_compaction(&self, levels: &[Level], level_handlers: &mut [LevelHandler]) -> Option<SearchResult> {
todo!();
None
}
}
209 changes: 209 additions & 0 deletions src/meta/src/hummock/compaction/level_selector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
use std::collections::HashMap;
Little-Wallace marked this conversation as resolved.
Show resolved Hide resolved
use std::intrinsics::roundf64;
use std::sync::Arc;
use itertools::Itertools;
use risingwave_pb::hummock::Level;

use crate::hummock::compaction::compaction_picker::{CompactionPicker, SizeOverlapPicker};
use crate::hummock::compaction::{CompactionConfig, SearchResult};
use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
use crate::hummock::compaction::tier_compaction_picker::TierCompactionPicker;
use crate::hummock::level_handler::LevelHandler;


pub trait LevelSelector {
fn select_level(
&self,
task_id: u64,
levels: &[Level],
level_handlers: &mut [LevelHandler],
) -> Box<dyn CompactionPicker>;

fn pick_compaction(
&self,
task_id: u64,
levels: &[Level],
level_handlers: &mut [LevelHandler],
) -> Option<SearchResult> {
let picker = self.select_level(task_id, levels, level_handlers);
picker.pick_compaction(levels, level_handlers)
}

fn apply_compact_result(&mut self, levels: &[Level]);
}

// TODO: Set these configurations by meta rpc
pub struct DynamicLevelSelector {
config: Arc<CompactionConfig>,
level_max_bytes: Vec<u64>,
base_level: usize,
}

struct SelectorContext {
// score and level idx.
level_scores: Vec<(u64, usize)>,
}

impl DynamicLevelSelector {
pub fn new(
config: Arc<CompactionConfig>,
) -> Self {
DynamicLevelSelector {
base_level: config.max_level,
level_max_bytes: vec![0u64; config.max_level as usize + 1],
config,
}
}

fn create_compaction_picker(
&self,
level: usize,
task_id: u64,
) -> Box<dyn CompactionPicker> {
let overlap = Box::new(RangeOverlapStrategy::default());
if level == 0 {
Box::new(TierCompactionPicker::new(task_id, self.config.clone(), overlap))
} else {
Box::new(SizeOverlapPicker::new(task_id, level, self.config.clone(), overlap))
}
}

fn get_priority_levels(&self, levels: &[Level], handlers: &mut [LevelHandler]) -> Vec<(u64, usize)> {
let mut scores = vec![];

// The bottommost level can not be input level.
for level in &levels[..self.config.max_level] {
let level_idx = level.level_idx as usize;
let mut total_size = 0;
let mut idle_file_count = 0;
for table in level.table_infos {
if !handlers[level.level_idx as usize].is_pending_compact(&table.id) {
total_size += table.file_size;
idle_file_count += 1;
}
}
if total_size == 0 {
continue;
}
if level.level_idx == 0 {
let score = std::cmp::max(total_size * 100 / self.config.max_bytes_for_level_base ,
(idle_file_count * 100 / self.config.level0_trigger_number as u64));
scores.push((score, 0));
} else {
scores.push((total_size * 100 / self.level_max_bytes[level_idx], level_idx));
}
}
scores.sort_by(|a, b| {
a.0.cmp(&b.0)
});
scores
}
}

impl LevelSelector for DynamicLevelSelector {
fn select_level(
&self,
task_id: u64,
levels: &[Level],
level_handlers: &mut [LevelHandler],
) -> Box<dyn CompactionPicker> {
let level_scores = self.get_priority_levels(levels, level_handlers);
self.create_compaction_picker(level_scores[0].1, task_id)
}

fn pick_compaction(
&self,
task_id: u64,
levels: &[Level],
level_handlers: &mut [LevelHandler],
) -> Option<SearchResult> {
let level_scores = self.get_priority_levels(levels, level_handlers);
for (score, level_idx) in level_scores {
if score <= 100 {
return None;
}
let picker = self.create_compaction_picker(level_idx);
if let Some(ret) = picker.pick_compaction(levels, level_handlers) {
return Some(ret);
}
}
None
}


fn apply_compact_result(&mut self, levels: &[Level]) {
let mut first_non_empty_level = 0;
let mut max_level_size = 0;

for level in levels.iter() {
let mut total_file_size = 0;
for table in &level.table_infos {
total_file_size += table.file_size;
}
if level.level_idx > 0 {
if total_file_size > 0 && first_non_empty_level == 0 {
first_non_empty_level = level.level_idx as usize;
}
max_level_size = std::cmp::max(max_level_size, total_file_size);
}
}

self.level_max_bytes.resize(self.config.max_level as usize + 1, u64::MAX);

if max_level_size == 0 {
// Use the bottommost level.
self.base_level = self.config.max_level;
return;
}

let l0_size = scores[0].0;

let base_bytes_max = std::cmp::max(self.config.max_bytes_for_level_base, l0_size);
let base_bytes_min = base_bytes_max / self.config.max_bytes_for_level_multiplier;

let mut cur_level_size = max_level_size;
for _ in first_non_empty_level..self.config.max_level {
cur_level_size /= self.config.max_bytes_for_level_multiplier;
}

let mut base_level_size = if cur_level_size <= base_bytes_min {
// Case 1. If we make target size of last level to be max_level_size,
// target size of the first non-empty level would be smaller than
// base_bytes_min. We set it be base_bytes_min.
self.base_level = first_non_empty_level;
base_bytes_min + 1
} else {
self.base_level = first_non_empty_level;
while self.base_level > 1 && cur_level_size > base_bytes_min {
self.base_level -= 1;
cur_level_size /= self.config.max_bytes_for_level_multiplier;
}
std::cmp::min(base_bytes_max, cur_level_size)
};

let mut level_multiplier = self.config.max_bytes_for_level_multiplier as f64;

if l0_size > base_level_size && levels[0].table_infos.len() > self.config.level0_max_file_number {
// We adjust the base level according to actual L0 size, and adjust
// the level multiplier accordingly, when the number of L0 files reaches twice the level0_max_file_number.
// We don't do this otherwise to keep the LSM-tree structure stable
// unless the L0 compation is backlogged.
base_level_size = l0_size;
if base_level == self.config.max_level {
// There is only two level (L0 and L1).
level_multiplier = 1.0;
} else {
unsafe {
level_multiplier =
roundf64(std::intrinsics::powf64(max_level_size as f64 / (base_level_size as f64), 1.0 / (self.config.max_level - base_level) as f64));
}
}
}

let mut level_size = base_level_size;
for i in self.base_level..=self.config.max_level {
self.level_max_bytes[i] = std::cmp::max(level_size, base_bytes_max);
level_size = (level_size as f64 * level_multiplier) as u64;
}
}
}
44 changes: 40 additions & 4 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
// limitations under the License.

mod overlap_strategy;
mod compaction_picker;
mod level_selector;
mod tier_compaction_picker;
mod leveled_compaction_picker;

pub use level_selector::MAX_LEVEL;

use std::io::Cursor;

Expand All @@ -27,6 +32,7 @@ use risingwave_pb::hummock::{
};

use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
use crate::hummock::compaction::level_selector::{DynamicLevelSelector, LevelSelector};
use crate::hummock::compaction::tier_compaction_picker::TierCompactionPicker;
use crate::hummock::level_handler::LevelHandler;
use crate::hummock::model::HUMMOCK_DEFAULT_CF_NAME;
Expand All @@ -37,11 +43,17 @@ use crate::storage::{MetaStore, Transaction};
/// Hummock `compact_status` key
/// `cf(hummock_default)`: `hummock_compact_status_key` -> `CompactStatus`
pub(crate) const HUMMOCK_COMPACT_STATUS_KEY: &str = "compact_status";
const DEFAULT_MAX_COMPACTION_BYTES: u64 = 2 * 1024 * 1024 * 1024; // 2GB
const DEFAULT_MAX_BYTES_FOR_LEVEL_BASE: u64 = 1024 * 1024 * 1024;
const DEFAULT_LEVEL0_MAX_FILE_NUMBER: usize = 16;
const DEFAULT_LEVEL0_TRIGGER_NUMBER: usize = 4;
pub const MAX_LEVEL: usize = 6;

#[derive(Clone, PartialEq, Debug)]
pub struct CompactStatus {
pub(crate) level_handlers: Vec<LevelHandler>,
pub(crate) next_compact_task_id: u64,
compaction_selector: Box<dyn LevelSelector>,
}

pub struct SearchResult {
Expand All @@ -50,12 +62,36 @@ pub struct SearchResult {
split_ranges: Vec<KeyRange>,
}


pub struct CompactionConfig {
max_bytes_for_level_base: u64,
max_level: usize,
max_bytes_for_level_multiplier: u64,
max_compaction_bytes: u64,
level0_max_file_number: usize,
level0_trigger_number: usize,
}

impl Default for CompactionConfig {
fn default() -> Self {
Self {
max_bytes_for_level_base: DEFAULT_MAX_BYTES_FOR_LEVEL_BASE, // 1GB
max_bytes_for_level_multiplier: 10,
max_level: MAX_LEVEL,
max_compaction_bytes: DEFAULT_MAX_COMPACTION_BYTES,
level0_max_file_number: DEFAULT_LEVEL0_MAX_FILE_NUMBER,
level0_trigger_number: DEFAULT_LEVEL0_TRIGGER_NUMBER,
}
}
}

impl CompactStatus {
pub fn new() -> CompactStatus {
let vec_handler_having_l0 = vec![LevelHandler::new(0), LevelHandler::new(1)];
CompactStatus {
level_handlers: vec_handler_having_l0,
next_compact_task_id: 1,
compaction_selector: Box::new(DynamicLevelSelector::default()),
}
}

Expand Down Expand Up @@ -136,11 +172,11 @@ impl CompactStatus {

fn pick_compaction(&mut self, levels: Vec<Level>) -> Option<SearchResult> {
// only support compact L0 to L1 or L0 to L0
let picker = TierCompactionPicker::new(
self.compaction_selector.pick_compaction(
self.next_compact_task_id,
Box::new(RangeOverlapStrategy::default()),
);
picker.pick_compaction(levels, &mut self.level_handlers)
&levels,
&mut self.level_handlers,
)
}

/// Declares a task is either finished or canceled.
Expand Down
Loading