feat(hummock): remove combine group for compaction #3138

Merged 2 commits on Jun 13, 2022
1 change: 1 addition & 0 deletions proto/hummock.proto

@@ -21,6 +21,7 @@ message SstableInfo {
   KeyRange key_range = 2;
   uint64 file_size = 3;
   repeated common.VNodeBitmap vnode_bitmaps = 4;
+  uint64 unit_id = 5;
 }
 
 enum LevelType {
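Because `unit_id` is appended as a new trailing field (tag 5), the change is wire-compatible: `SstableInfo` messages encoded before this PR decode with `unit_id` at the proto3 default of 0, and only struct-literal construction sites need updating, as the meta-side diffs below show. A minimal standalone sketch of that behaviour, assuming the prost crate and hypothetical `SstableInfoV1`/`SstableInfoV2` structs rather than the real generated type:

// Hypothetical before/after versions of the message; field tags mirror hummock.proto.
use prost::Message;

#[derive(Clone, PartialEq, Message)]
struct SstableInfoV1 {
    #[prost(uint64, tag = "1")]
    id: u64,
    #[prost(uint64, tag = "3")]
    file_size: u64,
}

#[derive(Clone, PartialEq, Message)]
struct SstableInfoV2 {
    #[prost(uint64, tag = "1")]
    id: u64,
    #[prost(uint64, tag = "3")]
    file_size: u64,
    // New trailing field, as added in this PR.
    #[prost(uint64, tag = "5")]
    unit_id: u64,
}

fn main() {
    // Encode with the old schema, decode with the new one.
    let old = SstableInfoV1 { id: 7, file_size: 1024 };
    let new = SstableInfoV2::decode(old.encode_to_vec().as_slice()).unwrap();
    assert_eq!(new.unit_id, 0); // the missing field falls back to the proto3 default
    println!("{:?}", new);
}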
1 change: 1 addition & 0 deletions src/meta/src/hummock/compaction/tier_compaction_picker.rs

@@ -380,6 +380,7 @@ pub mod tests {
             }),
             file_size: (right - left + 1) as u64,
             vnode_bitmaps: vec![],
+            unit_id: u64::MAX,
         }
     }
 
1 change: 1 addition & 0 deletions src/meta/src/hummock/test_utils.rs

@@ -109,6 +109,7 @@ pub fn generate_test_tables(epoch: u64, sst_ids: Vec<HummockSSTableId>) -> Vec<S
                     bitmap: vec![],
                 },
             ],
+            unit_id: 0,
         });
     }
     sst_info
27 changes: 15 additions & 12 deletions src/storage/src/hummock/compactor.rs

@@ -35,11 +35,11 @@ use risingwave_rpc_client::HummockMetaClient;
 use tokio::sync::oneshot::Sender;
 use tokio::task::JoinHandle;
 
-use super::group_builder::KeyValueGroupingImpl::VirtualNode;
 use super::group_builder::{GroupedSstableBuilder, VirtualNodeGrouping};
 use super::iterator::{BoxedForwardHummockIterator, ConcatIterator, MergeIterator};
 use super::{HummockResult, SSTableBuilder, SSTableIterator, SSTableIteratorType, Sstable};
 use crate::hummock::compaction_executor::CompactionExecutor;
+use crate::hummock::group_builder::KeyValueGrouping;
 use crate::hummock::iterator::ReadOptions;
 use crate::hummock::multi_builder::SealedSstableBuilder;
 use crate::hummock::shared_buffer::shared_buffer_uploader::UploadTaskPayload;

@@ -136,7 +136,7 @@ pub struct Compactor {
     compact_task: CompactTask,
 }
 
-pub type CompactOutput = (usize, Vec<(Sstable, Vec<VNodeBitmap>)>);
+pub type CompactOutput = (usize, Vec<(Sstable, u64, Vec<VNodeBitmap>)>);
 
 impl Compactor {
     /// Create a new compactor.

@@ -151,7 +151,7 @@ impl Compactor {
     pub async fn compact_shared_buffer(
         context: Arc<CompactorContext>,
         payload: &UploadTaskPayload,
-    ) -> HummockResult<Vec<(Sstable, Vec<VNodeBitmap>)>> {
+    ) -> HummockResult<Vec<(Sstable, u64, Vec<VNodeBitmap>)>> {
         let mut start_user_keys = payload
             .iter()
             .flat_map(|data_list| data_list.iter().map(UncommittedData::start_user_key))

@@ -253,7 +253,7 @@ impl Compactor {
         let mut level0 = Vec::with_capacity(parallelism);
 
         for (_, sst) in output_ssts {
-            for (table, _) in &sst {
+            for (table, _, _) in &sst {
                 compactor
                     .context
                     .stats

@@ -444,7 +444,7 @@ impl Compactor {
             .reserve(self.compact_task.splits.len());
         let mut compaction_write_bytes = 0;
         for (_, ssts) in output_ssts {
-            for (sst, vnode_bitmaps) in ssts {
+            for (sst, unit_id, vnode_bitmaps) in ssts {
                 let sst_info = SstableInfo {
                     id: sst.id,
                     key_range: Some(risingwave_pb::hummock::KeyRange {

@@ -454,6 +454,7 @@ impl Compactor {
                     }),
                     file_size: sst.meta.estimated_size as u64,
                     vnode_bitmaps,
+                    unit_id,
                 };
                 compaction_write_bytes += sst_info.file_size;
                 self.compact_task.sorted_output_ssts.push(sst_info);

@@ -507,11 +508,11 @@ impl Compactor {
                 let timer = Instant::now();
                 let table_id = (self.context.sstable_id_generator)().await?;
                 let cost = (timer.elapsed().as_secs_f64() * 1000000.0).round() as u64;
-                let builder = SSTableBuilder::new(self.context.options.as_ref().into());
+                let builder = SSTableBuilder::new(table_id, self.context.options.as_ref().into());
                 get_id_time.fetch_add(cost, Ordering::Relaxed);
-                Ok((table_id, builder))
+                Ok(builder)
             },
-            VirtualNode(VirtualNodeGrouping::new(vnode2unit)),
+            VirtualNodeGrouping::new(vnode2unit),
             self.context.sstable_store.clone(),
         );
 

@@ -543,11 +544,12 @@ impl Compactor {
             vnode_bitmaps,
             upload_join_handle,
             data_len,
+            unit_id,
         } in sealed_builders
         {
             let sst = Sstable { id: table_id, meta };
             let len = data_len;
-            ssts.push((sst.clone(), vnode_bitmaps));
+            ssts.push((sst.clone(), unit_id, vnode_bitmaps));
             upload_join_handles.push(upload_join_handle);
 
             if self.context.is_share_buffer_compact {

@@ -779,8 +781,8 @@ impl Compactor {
         (join_handle, shutdown_tx)
     }
 
-    async fn compact_and_build_sst<B, F>(
-        sst_builder: &mut GroupedSstableBuilder<B>,
+    async fn compact_and_build_sst<B, G, F>(
+        sst_builder: &mut GroupedSstableBuilder<B, G>,
         kr: KeyRange,
         mut iter: BoxedForwardHummockIterator,
         has_user_key_overlap: bool,

@@ -789,7 +791,8 @@ impl Compactor {
     ) -> HummockResult<()>
     where
         B: Clone + Fn() -> F,
-        F: Future<Output = HummockResult<(u64, SSTableBuilder)>>,
+        G: KeyValueGrouping,
+        F: Future<Output = HummockResult<SSTableBuilder>>,
     {
         if !kr.left.is_empty() {
             iter.seek(&kr.left).await?;
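Two things change on the compactor side: the builder factory now returns the `SSTableBuilder` directly (the id is passed to `SSTableBuilder::new` instead of being returned alongside the builder), and `compact_and_build_sst`/`GroupedSstableBuilder` take the grouping strategy as a generic `G: KeyValueGrouping` parameter, so `VirtualNodeGrouping` is passed as-is rather than wrapped in the removed `KeyValueGroupingImpl::VirtualNode` variant. A standalone sketch of that enum-wrapper-to-generic-parameter pattern, using simplified stand-in types rather than the real risingwave_storage API:

use std::collections::HashMap;

// Stand-in for the KeyValueGrouping trait: maps a key to a compaction unit.
trait KeyValueGrouping {
    fn group(&self, full_key: &[u8]) -> u64;
}

// Stand-in for VirtualNodeGrouping: resolves a key's vnode to a unit id.
struct VirtualNodeGrouping {
    vnode2unit: HashMap<u32, u64>,
}

impl KeyValueGrouping for VirtualNodeGrouping {
    fn group(&self, full_key: &[u8]) -> u64 {
        // Hypothetical vnode derivation; the real code reads it from key metadata.
        let vnode = full_key.iter().map(|b| *b as u32).sum::<u32>() % 256;
        *self.vnode2unit.get(&vnode).unwrap_or(&u64::MAX)
    }
}

// Stand-in for GroupedSstableBuilder<B, G>: generic over the grouping, no enum wrapper.
struct GroupedSstableBuilder<G: KeyValueGrouping> {
    grouping: G,
}

impl<G: KeyValueGrouping> GroupedSstableBuilder<G> {
    fn new(grouping: G) -> Self {
        Self { grouping }
    }

    fn unit_for(&self, full_key: &[u8]) -> u64 {
        self.grouping.group(full_key)
    }
}

fn main() {
    let grouping = VirtualNodeGrouping {
        vnode2unit: HashMap::from([(1u32, 10u64), (2, 20)]),
    };
    // Before this PR the call site wrapped the grouping in an enum variant.
    let builder = GroupedSstableBuilder::new(grouping);
    println!("unit = {}", builder.unit_for(b"some_user_key"));
}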
src/storage/src/hummock/shared_buffer/shared_buffer_uploader.rs

@@ -203,7 +203,7 @@ impl SharedBufferUploader {
 
         let uploaded_sst_info: Vec<SstableInfo> = tables
             .into_iter()
-            .map(|(sst, vnode_bitmaps)| SstableInfo {
+            .map(|(sst, unit_id, vnode_bitmaps)| SstableInfo {
                 id: sst.id,
                 key_range: Some(risingwave_pb::hummock::KeyRange {
                     left: sst.meta.smallest_key.clone(),

@@ -212,6 +212,7 @@ impl SharedBufferUploader {
                 }),
                 file_size: sst.meta.estimated_size as u64,
                 vnode_bitmaps,
+                unit_id,
             })
             .collect();
 
13 changes: 8 additions & 5 deletions src/storage/src/hummock/sstable/builder.rs

@@ -86,10 +86,11 @@ pub struct SSTableBuilder {
     /// Last added full key.
     last_full_key: Bytes,
     key_count: usize,
+    sstable_id: u64,
 }
 
 impl SSTableBuilder {
-    pub fn new(options: SSTableBuilderOptions) -> Self {
+    pub fn new(sstable_id: u64, options: SSTableBuilderOptions) -> Self {
         Self {
             options: options.clone(),
             buf: BytesMut::with_capacity(options.capacity),

@@ -99,6 +100,7 @@ impl SSTableBuilder {
             user_key_hashes: Vec::with_capacity(options.capacity / DEFAULT_ENTRY_SIZE + 1),
             last_full_key: Bytes::default(),
             key_count: 0,
+            sstable_id,
         }
     }
 

@@ -162,7 +164,7 @@ impl SSTableBuilder {
     /// ```plain
     /// | Block 0 | ... | Block N-1 | N (4B) |
     /// ```
-    pub fn finish(mut self) -> (Bytes, SstableMeta, Vec<VNodeBitmap>) {
+    pub fn finish(mut self) -> (u64, Bytes, SstableMeta, Vec<VNodeBitmap>) {
         let smallest_key = self.block_metas[0].smallest_key.clone();
         let largest_key = self.last_full_key.to_vec();
         self.build_block();

@@ -187,6 +189,7 @@ impl SSTableBuilder {
         };
 
         (
+            self.sstable_id,
            self.buf.freeze(),
            meta,
            self.vnode_bitmaps

@@ -248,20 +251,20 @@ pub(super) mod tests {
            compression_algorithm: CompressionAlgorithm::None,
        };
 
-       let b = SSTableBuilder::new(opt);
+       let b = SSTableBuilder::new(0, opt);
 
        b.finish();
    }
 
    #[test]
    fn test_smallest_key_and_largest_key() {
-       let mut b = SSTableBuilder::new(default_builder_opt_for_test());
+       let mut b = SSTableBuilder::new(0, default_builder_opt_for_test());
 
        for i in 0..TEST_KEYS_COUNT {
            b.add(&test_key_of(i), HummockValue::put(&test_value_of(i)));
        }
 
-       let (_, meta, _) = b.finish();
+       let (_, _, meta, _) = b.finish();
 
        assert_eq!(test_key_of(0), meta.smallest_key);
        assert_eq!(test_key_of(TEST_KEYS_COUNT - 1), meta.largest_key);
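The net effect in the builder is that an SSTable's id now lives with the builder for its whole lifetime: it is supplied to `new()` and handed back by `finish()`, so callers no longer need to carry `(id, builder)` pairs and each sealed output can be associated with the `unit_id` of the group that produced it. A minimal stand-in sketch of that id-threading shape (simplified types, not the real `SSTableBuilder`):

// Simplified metadata; the real finish() also returns the encoded data and vnode bitmaps.
struct SstMeta {
    smallest_key: Vec<u8>,
    largest_key: Vec<u8>,
}

struct BuilderSketch {
    sstable_id: u64,
    keys: Vec<Vec<u8>>,
}

impl BuilderSketch {
    // The id is fixed at construction time instead of being returned by the factory.
    fn new(sstable_id: u64) -> Self {
        Self { sstable_id, keys: Vec::new() }
    }

    fn add(&mut self, key: &[u8]) {
        self.keys.push(key.to_vec());
    }

    // Hands the id back together with the metadata, mirroring the new tuple shape.
    fn finish(self) -> (u64, SstMeta) {
        let meta = SstMeta {
            smallest_key: self.keys.first().cloned().unwrap_or_default(),
            largest_key: self.keys.last().cloned().unwrap_or_default(),
        };
        (self.sstable_id, meta)
    }
}

fn main() {
    let mut b = BuilderSketch::new(42);
    b.add(b"key-0");
    b.add(b"key-9");
    let (id, meta) = b.finish();
    assert_eq!(id, 42);
    println!("sst {} spans {:?}..{:?}", id, meta.smallest_key, meta.largest_key);
}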