Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat(storage): support vnode hint in storage table #3628

Merged
merged 13 commits into from
Jul 5, 2022
Merged
13 changes: 3 additions & 10 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,16 +225,9 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
ScanType::PointGet(row)
} else {
assert!(pk_prefix_value.size() < pk_len);

let iter = if is_full_range(&next_col_bounds) {
table
.batch_iter_with_pk_prefix(source.epoch, pk_prefix_value)
.await?
} else {
table
.batch_iter_with_pk_bounds(source.epoch, pk_prefix_value, next_col_bounds)
.await?
};
let iter = table
.batch_iter_with_pk_bounds(source.epoch, &pk_prefix_value, next_col_bounds)
.await?;
ScanType::RangeScan(iter)
};

Expand Down
11 changes: 8 additions & 3 deletions src/common/src/util/ordered/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::cmp::Reverse;

use itertools::Itertools;
Expand Down Expand Up @@ -76,9 +77,13 @@ impl OrderedRowSerializer {
}

#[must_use]
pub fn prefix(&self, len: usize) -> Self {
Self {
order_types: self.order_types[..len].to_vec(),
pub fn prefix(&self, len: usize) -> Cow<Self> {
if len == self.order_types.len() {
Cow::Borrowed(self)
} else {
Cow::Owned(Self {
order_types: self.order_types[..len].to_vec(),
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ fn next_key_no_alloc(key: &[u8]) -> Option<(&[u8], u8)> {
// End Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.

/// Get the end bound of the given `prefix` when transforming it to a key range.
fn end_bound_of_prefix(prefix: &[u8]) -> Bound<Vec<u8>> {
pub fn end_bound_of_prefix(prefix: &[u8]) -> Bound<Vec<u8>> {
if let Some((s, e)) = next_key_no_alloc(prefix) {
let mut res = Vec::with_capacity(s.len() + 1);
res.extend_from_slice(s);
Expand Down
61 changes: 15 additions & 46 deletions src/storage/src/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::borrow::Cow;
use std::cmp::Ordering;
use std::marker::PhantomData;
use std::ops::{Index, RangeBounds};
use std::ops::Index;

use futures::{pin_mut, Stream, StreamExt};
use futures_async_stream::try_stream;
Expand Down Expand Up @@ -195,48 +195,9 @@ impl<S: StateStore, E: Encoding> StateTableBase<S, E> {

/// Iterator functions.
impl<S: StateStore> StateTable<S> {
async fn iter_with_encoded_key_range<'a, R>(
&'a self,
encoded_key_range: R,
epoch: u64,
) -> StorageResult<RowStream<'a, S>>
where
R: RangeBounds<Vec<u8>> + Send + Clone + 'a,
{
let storage_table_iter = self
.storage_table
.streaming_iter_with_encoded_key_range(encoded_key_range.clone(), epoch)
.await?;
let mem_table_iter = self.mem_table.iter(encoded_key_range);

Ok(StateTableRowIter::new(mem_table_iter, storage_table_iter).into_stream())
}

/// This function scans rows from the relational table.
pub async fn iter(&self, epoch: u64) -> StorageResult<RowStream<'_, S>> {
self.iter_with_pk_bounds::<_, Row>(.., epoch).await
}

/// This function scans rows from the relational table with specific `pk_bounds`.
pub async fn iter_with_pk_bounds<R, B>(
&self,
pk_bounds: R,
epoch: u64,
) -> StorageResult<RowStream<'_, S>>
where
R: RangeBounds<B> + Send + Clone + 'static,
B: AsRef<Row> + Send + Clone + 'static,
{
let encoded_start_key = pk_bounds
.start_bound()
.map(|pk| serialize_pk(pk.as_ref(), self.pk_serializer()));
let encoded_end_key = pk_bounds
.end_bound()
.map(|pk| serialize_pk(pk.as_ref(), self.pk_serializer()));
let encoded_key_range = (encoded_start_key, encoded_end_key);

self.iter_with_encoded_key_range(encoded_key_range, epoch)
.await
self.iter_with_pk_prefix(Row::empty(), epoch).await
}

/// This function scans rows from the relational table with specific `pk_prefix`.
Expand All @@ -245,12 +206,20 @@ impl<S: StateStore> StateTable<S> {
pk_prefix: &'a Row,
epoch: u64,
) -> StorageResult<RowStream<'a, S>> {
let prefix_serializer = self.pk_serializer().prefix(pk_prefix.size());
let encoded_prefix = serialize_pk(pk_prefix, &prefix_serializer);
let encoded_key_range = range_of_prefix(&encoded_prefix);
let storage_table_iter = self
.storage_table
.streaming_iter_with_pk_bounds(epoch, pk_prefix, ..)
.await?;

self.iter_with_encoded_key_range(encoded_key_range, epoch)
.await
let mem_table_iter = {
// TODO: reuse calculated serialized key from cell-based table.
let prefix_serializer = self.pk_serializer().prefix(pk_prefix.size());
let encoded_prefix = serialize_pk(pk_prefix, &prefix_serializer);
let encoded_key_range = range_of_prefix(&encoded_prefix);
self.mem_table.iter(encoded_key_range)
};

Ok(StateTableRowIter::new(mem_table_iter, storage_table_iter).into_stream())
}
}

Expand Down
Loading