Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: read break down #1094

Merged
merged 3 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,337 changes: 1,257 additions & 1,080 deletions grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rust/meta/src/hummock/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async fn get_hummock_storage() -> (HummockStorage, Arc<HummockManager<MemStore>>
});
let hummock_meta_client = Arc::new(get_hummock_meta_client().await);
let obj_client = Arc::new(InMemObjectStore::new());
let sstable_store = Arc::new(SstableStore::new(obj_client.clone(), remote_dir));
let sstable_store = Arc::new(SstableStore::new(obj_client.clone(), remote_dir, None));
let local_version_manager = Arc::new(LocalVersionManager::new(sstable_store.clone()));
let storage = HummockStorage::with_default_stats(
options.clone(),
Expand Down
1 change: 1 addition & 0 deletions rust/storage/src/hummock/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl Compactor {
))
},
)),
Some(context.stats.clone()),
);

let context_clone = context.clone();
Expand Down
2 changes: 1 addition & 1 deletion rust/storage/src/hummock/iterator/concat_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<TI: SSTableIteratorType> HummockIterator for ConcatIteratorInner<TI> {

self.seek_idx(table_idx, Some(key)).await?;
if !self.is_valid() {
// seek to next block
// seek to next table
self.seek_idx(table_idx + 1, None).await?;
}
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions rust/storage/src/hummock/iterator/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ mod test {
.map(|x| Box::new(x) as BoxedHummockIterator)
.collect_vec();

let mut mi = MergeIterator::new(iters);
let mut mi = MergeIterator::new(iters, None);
let mut i = 0;
mi.rewind().await.unwrap();
while mi.is_valid() {
Expand Down Expand Up @@ -84,7 +84,7 @@ mod test {
.map(|x| Box::new(x) as BoxedHummockIterator)
.collect_vec();

let mut mi = MergeIterator::new(iters);
let mut mi = MergeIterator::new(iters, None);
let test_validator = &validators[2];

// right edge case
Expand Down Expand Up @@ -131,7 +131,7 @@ mod test {
Box::new(SSTableIterator::new(Arc::new(table1), sstable_store)),
];

let mut mi = MergeIterator::new(iters);
let mut mi = MergeIterator::new(iters, None);

mi.rewind().await.unwrap();
let mut count = 0;
Expand Down
44 changes: 43 additions & 1 deletion rust/storage/src/hummock/iterator/merge_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//
use std::collections::binary_heap::PeekMut;
use std::collections::{BinaryHeap, LinkedList};
use std::sync::Arc;

use async_trait::async_trait;

Expand All @@ -22,6 +23,7 @@ use crate::hummock::iterator::{BoxedHummockIterator, HummockIterator};
use crate::hummock::value::HummockValue;
use crate::hummock::version_cmp::VersionedComparator;
use crate::hummock::HummockResult;
use crate::monitor::StateStoreMetrics;

pub struct Node<'a, const DIRECTION: usize>(BoxedHummockIterator<'a>);

Expand Down Expand Up @@ -56,14 +58,21 @@ pub struct MergeIteratorInner<'a, const DIRECTION: usize> {

/// The heap for merge sort.
heap: BinaryHeap<Node<'a, DIRECTION>>,

/// Statistics.
stats: Option<Arc<StateStoreMetrics>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if remove Option, and pass StateStoreMetrics::unused() in unit tests?
Now all these metrics structs implemented a interface xxx::unused() for unit tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if remove Option, and pass StateStoreMetrics::unused() in unit tests? Now all these metrics structs implemented a interface xxx::unused() for unit tests.

Sounds good!

}

impl<'a, const DIRECTION: usize> MergeIteratorInner<'a, DIRECTION> {
/// Caller should make sure that `iterators`'s direction is the same as `DIRECTION`.
pub fn new(iterators: impl IntoIterator<Item = BoxedHummockIterator<'a>>) -> Self {
pub fn new(
iterators: impl IntoIterator<Item = BoxedHummockIterator<'a>>,
stats: Option<Arc<StateStoreMetrics>>,
) -> Self {
Self {
unused_iters: iterators.into_iter().collect(),
heap: BinaryHeap::new(),
stats,
}
}

Expand All @@ -88,6 +97,18 @@ impl<'a, const DIRECTION: usize> MergeIteratorInner<'a, DIRECTION> {
#[async_trait]
impl<const DIRECTION: usize> HummockIterator for MergeIteratorInner<'_, DIRECTION> {
async fn next(&mut self) -> HummockResult<()> {
let timer = if self.stats.is_some() {
Some(
self.stats
.as_ref()
.unwrap()
.iter_merge_next_duration
.start_timer(),
)
} else {
None
};

let mut node = self.heap.peek_mut().expect("no inner iter");

node.0.next().await?;
Expand All @@ -100,6 +121,10 @@ impl<const DIRECTION: usize> HummockIterator for MergeIteratorInner<'_, DIRECTIO
drop(node);
}

if let Some(timer) = timer {
timer.observe_duration();
}

Ok(())
}

Expand All @@ -123,9 +148,26 @@ impl<const DIRECTION: usize> HummockIterator for MergeIteratorInner<'_, DIRECTIO
}

async fn seek(&mut self, key: &[u8]) -> HummockResult<()> {
let timer = if self.stats.is_some() {
Some(
self.stats
.as_ref()
.unwrap()
.iter_merge_seek_duration
.start_timer(),
)
} else {
None
};

self.reset_heap();
futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.seek(key))).await?;
self.build_heap();

if let Some(timer) = timer {
timer.observe_duration();
}

Ok(())
}
}
6 changes: 3 additions & 3 deletions rust/storage/src/hummock/iterator/reverse_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mod test {
.map(|x| Box::new(x) as BoxedHummockIterator)
.collect_vec();

let mut mi = ReverseMergeIterator::new(iters);
let mut mi = ReverseMergeIterator::new(iters, None);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to pass StateStoreMetrics::unused()

let mut i = 0;
mi.rewind().await.unwrap();
while mi.is_valid() {
Expand Down Expand Up @@ -90,7 +90,7 @@ mod test {
.map(|x| Box::new(x) as BoxedHummockIterator)
.collect_vec();

let mut mi = ReverseMergeIterator::new(iters);
let mut mi = ReverseMergeIterator::new(iters, None);
let test_validator = &validators[2];

// right edge case
Expand Down Expand Up @@ -137,7 +137,7 @@ mod test {
Box::new(ReverseSSTableIterator::new(Arc::new(table0), sstable_store)),
];

let mut mi = ReverseMergeIterator::new(iters);
let mut mi = ReverseMergeIterator::new(iters, None);

mi.rewind().await.unwrap();
let mut count = 0;
Expand Down
16 changes: 8 additions & 8 deletions rust/storage/src/hummock/iterator/reverse_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ mod tests {
.map(|x| Box::new(x) as BoxedHummockIterator)
.collect_vec();

let mi = ReverseMergeIterator::new(iters);
let mi = ReverseMergeIterator::new(iters, None);
let mut ui = ReverseUserIterator::new(mi, (Unbounded, Unbounded));
ui.rewind().await.unwrap();

Expand Down Expand Up @@ -355,7 +355,7 @@ mod tests {
.map(|x| Box::new(x) as BoxedHummockIterator)
.collect_vec();

let mi = ReverseMergeIterator::new(iters);
let mi = ReverseMergeIterator::new(iters, None);
let mut ui = ReverseUserIterator::new(mi, (Unbounded, Unbounded));
let test_validator = &validators[2];

Expand Down Expand Up @@ -418,7 +418,7 @@ mod tests {
)),
Box::new(SSTableIterator::new(Arc::new(table1), sstable_store)),
];
let mi = ReverseMergeIterator::new(iters);
let mi = ReverseMergeIterator::new(iters, None);
let mut ui = ReverseUserIterator::new(mi, (Unbounded, Unbounded));

ui.rewind().await.unwrap();
Expand Down Expand Up @@ -463,7 +463,7 @@ mod tests {
Arc::new(table),
sstable_store,
))];
let mi = ReverseMergeIterator::new(iters);
let mi = ReverseMergeIterator::new(iters, None);

let begin_key = Included(user_key(iterator_test_key_of_epoch(0, 2, 0).as_slice()).to_vec());
let end_key = Included(user_key(iterator_test_key_of_epoch(0, 7, 0).as_slice()).to_vec());
Expand Down Expand Up @@ -543,7 +543,7 @@ mod tests {
Arc::new(table),
sstable_store,
))];
let mi = ReverseMergeIterator::new(iters);
let mi = ReverseMergeIterator::new(iters, None);

let begin_key = Excluded(user_key(iterator_test_key_of_epoch(0, 2, 0).as_slice()).to_vec());
let end_key = Included(user_key(iterator_test_key_of_epoch(0, 7, 0).as_slice()).to_vec());
Expand Down Expand Up @@ -624,7 +624,7 @@ mod tests {
Arc::new(table),
sstable_store,
))];
let mi = ReverseMergeIterator::new(iters);
let mi = ReverseMergeIterator::new(iters, None);
let end_key = Included(user_key(iterator_test_key_of_epoch(0, 7, 0).as_slice()).to_vec());

let mut ui = ReverseUserIterator::new(mi, (Unbounded, end_key));
Expand Down Expand Up @@ -703,7 +703,7 @@ mod tests {
Arc::new(table),
sstable_store,
))];
let mi = ReverseMergeIterator::new(iters);
let mi = ReverseMergeIterator::new(iters, None);
let begin_key = Included(user_key(iterator_test_key_of_epoch(0, 2, 0).as_slice()).to_vec());

let mut ui = ReverseUserIterator::new(mi, (begin_key, Unbounded));
Expand Down Expand Up @@ -795,7 +795,7 @@ mod tests {
Arc::new(clone_sst(&table)),
sstable_store,
))];
let rsi = ReverseMergeIterator::new(iters);
let rsi = ReverseMergeIterator::new(iters, None);
let mut ruki = ReverseUserIterator::new(rsi, (start_bound, end_bound));
let num_puts: usize = truth
.iter()
Expand Down
2 changes: 1 addition & 1 deletion rust/storage/src/hummock/iterator/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ pub fn mock_sstable_store() -> SstableStoreRef {

pub fn mock_sstable_store_with_object_store(object_store: ObjectStoreRef) -> SstableStoreRef {
let path = "test".to_string();
Arc::new(SstableStore::new(object_store, path))
Arc::new(SstableStore::new(object_store, path, None))
}

#[cfg(test)]
Expand Down
14 changes: 7 additions & 7 deletions rust/storage/src/hummock/iterator/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ mod tests {
.map(|x| Box::new(x) as BoxedHummockIterator)
.collect_vec();

let mi = MergeIterator::new(iters);
let mi = MergeIterator::new(iters, None);
let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));
ui.rewind().await.unwrap();

Expand Down Expand Up @@ -318,7 +318,7 @@ mod tests {
.map(|x| Box::new(x) as BoxedHummockIterator)
.collect_vec();

let mi = MergeIterator::new(iters);
let mi = MergeIterator::new(iters, None);
let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));
let test_validator = &validators[2];

Expand Down Expand Up @@ -385,7 +385,7 @@ mod tests {
sstable_store.clone(),
)),
];
let mi = MergeIterator::new(iters);
let mi = MergeIterator::new(iters, None);
let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));
ui.rewind().await.unwrap();

Expand Down Expand Up @@ -427,7 +427,7 @@ mod tests {
Arc::new(table),
sstable_store,
))];
let mi = MergeIterator::new(iters);
let mi = MergeIterator::new(iters, None);

let begin_key = Included(user_key(iterator_test_key_of_epoch(0, 2, 0).as_slice()).to_vec());
let end_key = Included(user_key(iterator_test_key_of_epoch(0, 7, 0).as_slice()).to_vec());
Expand Down Expand Up @@ -507,7 +507,7 @@ mod tests {
Arc::new(table),
sstable_store,
))];
let mi = MergeIterator::new(iters);
let mi = MergeIterator::new(iters, None);

let begin_key = Included(user_key(iterator_test_key_of_epoch(0, 2, 0).as_slice()).to_vec());
let end_key = Excluded(user_key(iterator_test_key_of_epoch(0, 7, 0).as_slice()).to_vec());
Expand Down Expand Up @@ -588,7 +588,7 @@ mod tests {
Arc::new(table),
sstable_store,
))];
let mi = MergeIterator::new(iters);
let mi = MergeIterator::new(iters, None);
let end_key = Included(user_key(iterator_test_key_of_epoch(0, 7, 0).as_slice()).to_vec());

let mut ui = UserIterator::for_test(mi, (Unbounded, end_key));
Expand Down Expand Up @@ -671,7 +671,7 @@ mod tests {
Arc::new(table),
sstable_store,
))];
let mi = MergeIterator::new(iters);
let mi = MergeIterator::new(iters, None);
let begin_key = Included(user_key(iterator_test_key_of_epoch(0, 2, 0).as_slice()).to_vec());

let mut ui = UserIterator::for_test(mi, (begin_key, Unbounded));
Expand Down
Loading