Skip to content

Commit

Permalink
feat(relational_iter): add state_table_iter with pk_prefix and pk_bounds (#3008)
Browse files Browse the repository at this point in the history

* add iter with pk_bounds and pk_prefix
  • Loading branch information
wcy-fdu authored Jun 8, 2022
1 parent 408e9fb commit 001814f
Show file tree
Hide file tree
Showing 5 changed files with 724 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/common/src/array/data_chunk_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ impl ops::Index<usize> for Row {
}
}

/// Identity `AsRef` conversion so that an owned [`Row`] can be passed to
/// generic APIs that accept `impl AsRef<Row>` (e.g. pk-bound parameters).
impl AsRef<Row> for Row {
    #[inline]
    fn as_ref(&self) -> &Self {
        self
    }
}

// TODO: remove this due to implicit allocation
impl From<RowRef<'_>> for Row {
fn from(row_ref: RowRef<'_>) -> Self {
Expand Down
30 changes: 30 additions & 0 deletions src/storage/src/keyspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

use std::future::Future;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::RangeBounds;

use bytes::{BufMut, Bytes, BytesMut};
use risingwave_common::catalog::TableId;
Expand Down Expand Up @@ -142,6 +144,34 @@ impl<S: StateStore> Keyspace<S> {
Ok(strip_prefix_iterator)
}

pub async fn iter_with_range<R, B>(
&self,
pk_bounds: R,
epoch: u64,
) -> StorageResult<StripPrefixIterator<S::Iter>>
where
R: RangeBounds<B> + Send,
B: AsRef<[u8]> + Send,
{
let start = match pk_bounds.start_bound() {
Included(k) => Included(Bytes::copy_from_slice(k.as_ref())),
Excluded(k) => Excluded(Bytes::copy_from_slice(k.as_ref())),
Unbounded => Unbounded,
};
let end = match pk_bounds.end_bound() {
Included(k) => Included(Bytes::copy_from_slice(k.as_ref())),
Excluded(k) => Excluded(Bytes::copy_from_slice(k.as_ref())),
Unbounded => Unbounded,
};
let range = (start, end);
let iter = self.store.iter(range, epoch).await?;
let strip_prefix_iterator = StripPrefixIterator {
iter,
prefix_len: self.prefix.len(),
};
Ok(strip_prefix_iterator)
}

/// Gets the underlying state store.
pub fn state_store(&self) -> S {
self.store.clone()
Expand Down
21 changes: 21 additions & 0 deletions src/storage/src/table/cell_based_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::ops::RangeBounds;
use std::sync::Arc;

use bytes::Bytes;
Expand Down Expand Up @@ -570,6 +571,26 @@ impl<S: StateStore> CellBasedTableStreamingIter<S> {
Ok(iter)
}

pub async fn new_with_bounds<R, B>(
keyspace: &Keyspace<S>,
table_descs: Vec<ColumnDesc>,
pk_bounds: R,
epoch: u64,
) -> StorageResult<Self>
where
R: RangeBounds<B> + Send,
B: AsRef<[u8]> + Send,
{
let cell_based_row_deserializer = CellBasedRowDeserializer::new(table_descs);

let iter = keyspace.iter_with_range(pk_bounds, epoch).await?;
let iter = Self {
iter,
cell_based_row_deserializer,
};
Ok(iter)
}

/// Yield a row with its primary key.
#[try_stream(ok = (Vec<u8>, Row), error = StorageError)]
pub async fn into_stream(mut self) {
Expand Down
194 changes: 194 additions & 0 deletions src/storage/src/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::btree_map;
use std::marker::PhantomData;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::Arc;

use futures::{pin_mut, Stream, StreamExt};
Expand All @@ -24,6 +26,7 @@ use risingwave_common::catalog::ColumnDesc;
use risingwave_common::error::RwError;
use risingwave_common::util::ordered::{serialize_pk, OrderedRowSerializer};
use risingwave_common::util::sort_util::OrderType;
use risingwave_hummock_sdk::key::next_key;

use super::cell_based_table::{CellBasedTable, CellBasedTableStreamingIter};
use super::mem_table::{MemTable, RowOp};
Expand Down Expand Up @@ -135,6 +138,96 @@ impl<S: StateStore> StateTable<S> {
epoch,
))
}

/// This function scans rows from the relational table with specific `pk_bounds`.
pub async fn iter_with_pk_bounds<R, B>(
&self,
pk_bounds: R,
epoch: u64,
) -> StorageResult<impl RowStream<'_>>
where
R: RangeBounds<B> + Send + Clone + 'static,
B: AsRef<Row> + Send + Clone + 'static,
{
let mem_table_iter = self.mem_table.buffer.iter();
let pk_serializer = self
.cell_based_table
.pk_serializer
.as_ref()
.expect("pk_serializer is None");
let cell_based_start_key = match pk_bounds.start_bound() {
Included(k) => Included(
self.keyspace
.prefixed_key(&serialize_pk(k.as_ref(), pk_serializer).map_err(err)?),
),
Excluded(k) => Excluded(
self.keyspace
.prefixed_key(&serialize_pk(k.as_ref(), pk_serializer).map_err(err)?),
),
Unbounded => Unbounded,
};
let cell_based_end_key = match pk_bounds.end_bound() {
Included(k) => Included(
self.keyspace
.prefixed_key(&serialize_pk(k.as_ref(), pk_serializer).map_err(err)?),
),
Excluded(k) => Excluded(
self.keyspace
.prefixed_key(&serialize_pk(k.as_ref(), pk_serializer).map_err(err)?),
),
Unbounded => Unbounded,
};
let cell_based_bounds = (cell_based_start_key, cell_based_end_key);

let mem_table_start_key = match pk_bounds.start_bound() {
Included(k) => Included(serialize_pk(k.as_ref(), pk_serializer).map_err(err)?),
Excluded(k) => Excluded(serialize_pk(k.as_ref(), pk_serializer).map_err(err)?),
Unbounded => Unbounded,
};
let mem_table_end_key = match pk_bounds.end_bound() {
Included(k) => Included(serialize_pk(k.as_ref(), pk_serializer).map_err(err)?),
Excluded(k) => Excluded(serialize_pk(k.as_ref(), pk_serializer).map_err(err)?),
Unbounded => Unbounded,
};
let mem_table_bounds = (mem_table_start_key, mem_table_end_key);
Ok(StateTableRowIter::into_stream_inner(
&self.keyspace,
self.column_descs.clone(),
mem_table_iter,
cell_based_bounds,
mem_table_bounds,
epoch,
))
}

/// This function scans rows from the relational table with specific `pk_prefix`.
pub async fn iter_with_pk_prefix(
&self,
pk_prefix: Row,
prefix_serializer: OrderedRowSerializer,
epoch: u64,
) -> StorageResult<impl RowStream<'_>> {
let mem_table_iter = self.mem_table.buffer.iter();
let key_bytes = serialize_pk(&pk_prefix, &prefix_serializer).map_err(err)?;
let start_key_with_prefix = self.keyspace.prefixed_key(&key_bytes);
let cell_based_bounds = (
Included(start_key_with_prefix.clone()),
Included(next_key(start_key_with_prefix.as_slice())),
);

let mem_table_bounds = (
Included(key_bytes.clone()),
Included(next_key(key_bytes.as_slice())),
);
Ok(StateTableRowIter::into_stream_inner(
&self.keyspace,
self.column_descs.clone(),
mem_table_iter,
cell_based_bounds,
mem_table_bounds,
epoch,
))
}
}

pub trait RowStream<'a> = Stream<Item = StorageResult<Cow<'a, Row>>> + 'a;
Expand Down Expand Up @@ -235,6 +328,107 @@ impl<S: StateStore> StateTableRowIter<S> {
}
}
}

/// This function scans kv pairs from the `shared_storage`(`cell_based_table`) and
/// memory(`mem_table`) with pk_bounds, and will be used in `iter_with_pk_prefix` and
/// `iter_with_pk_bounds`.
///
/// Both sources are consumed as peekable iterators sorted by serialized pk and
/// merge-joined: on equal keys the mem-table op wins (Insert/Update yields the
/// in-memory row, Delete hides the stored row) and the storage entry is
/// discarded. The storage iterator is already range-limited via
/// `cell_based_bounds`; mem-table entries are additionally filtered against
/// `mem_table_bounds` here, since the caller hands over the whole buffer
/// iterator.
#[try_stream(ok = Cow<'a, Row>, error = StorageError)]
async fn into_stream_inner<'a>(
keyspace: &'a Keyspace<S>,
table_descs: Vec<ColumnDesc>,
mem_table_iter: MemTableIter<'a>,
cell_based_bounds: (Bound<Vec<u8>>, Bound<Vec<u8>>),
mem_table_bounds: (Bound<Vec<u8>>, Bound<Vec<u8>>),
epoch: u64,
) {
let cell_based_table_iter: futures::stream::Peekable<_> =
CellBasedTableStreamingIter::new_with_bounds(
keyspace,
table_descs,
cell_based_bounds,
epoch,
)
.await?
.into_stream()
.peekable();
pin_mut!(cell_based_table_iter);

// Wrap mem-table items in `Ok` so both sides of the merge peek at `Result`s.
let mut mem_table_iter = mem_table_iter
.map(|(k, v)| Ok::<_, StorageError>((k, v)))
.peekable();
loop {
match (
cell_based_table_iter.as_mut().peek().await,
mem_table_iter.peek(),
) {
// Both sources exhausted: the merged stream ends.
(None, None) => break,
// Only storage has rows left: drain it.
(Some(_), None) => {
let row: Row = cell_based_table_iter.next().await.unwrap()?.1;
yield Cow::Owned(row);
}
// Only the mem-table has entries left: yield in-bound inserts/updates;
// deletes of rows absent from storage produce nothing.
(None, Some(_)) => {
let (mem_table_pk, row_op) = mem_table_iter.next().unwrap()?;

if mem_table_bounds.contains(mem_table_pk) {
match row_op {
RowOp::Insert(row) | RowOp::Update((_, row)) => {
yield Cow::Borrowed(row);
}
_ => {}
}
}
}

(
Some(Ok((cell_based_pk, cell_based_row))),
Some(Ok((mem_table_pk, _mem_table_row_op))),
) => {
match cell_based_pk.cmp(mem_table_pk) {
Ordering::Less => {
// The cell-based (storage) item is smaller and will be returned.
let row: Row = cell_based_table_iter.next().await.unwrap()?.1;
yield Cow::Owned(row);
}
Ordering::Equal => {
// Same pk on both sides: the mem-table item shadows the
// stored row and will be returned, and BOTH iterators must
// be advanced exactly once.
let (mem_table_pk, row_op) = mem_table_iter.next().unwrap()?;
if mem_table_bounds.contains(mem_table_pk) {
match row_op {
RowOp::Insert(row) => yield Cow::Borrowed(row),
RowOp::Delete(_) => {}
RowOp::Update((old_row, new_row)) => {
// An Update's old value must match what storage holds.
debug_assert!(old_row == cell_based_row);
yield Cow::Borrowed(new_row);
}
}
}
// Discard the shadowed storage entry (still propagate its error).
cell_based_table_iter.next().await.unwrap()?;
}
Ordering::Greater => {
// The mem-table item is smaller and will be returned. Its pk
// is absent from storage, so an Update here is impossible.
let (mem_table_pk, row_op) = mem_table_iter.next().unwrap()?;
if mem_table_bounds.contains(mem_table_pk) {
match row_op {
RowOp::Insert(row) => yield Cow::Borrowed(row),
RowOp::Delete(_) => {}
RowOp::Update(_) => unreachable!(),
}
}
}
}
}
// At least one side peeked an `Err`: advance to propagate the error
// through `?`; control never reaches the `unreachable!()`.
(Some(_), Some(_)) => {
// Throw the error.
cell_based_table_iter.next().await.unwrap()?;
mem_table_iter.next().unwrap()?;

unreachable!()
}
}
}
}
}

fn err(rw: impl Into<RwError>) -> StorageError {
Expand Down
Loading

0 comments on commit 001814f

Please sign in to comment.