feat(cell_based_table): add sentinel column and cell_based muti_get #1590

Merged (12 commits) on Apr 11, 2022
22 changes: 8 additions & 14 deletions src/common/src/util/ordered/serde.rs
@@ -32,7 +32,7 @@ use crate::util::sort_util::{OrderPair, OrderType};
use crate::util::value_encoding::serialize_cell;

/// The special `cell_id` reserved for a whole null row is `i32::MIN`.
Member:
Please update this doc as well. :)

pub const NULL_ROW_SPECIAL_CELL_ID: ColumnId = ColumnId::new(i32::MIN);
pub const SENTINEL_CELL_ID: ColumnId = ColumnId::new(-1_i32);
Contributor:
IIRC, `-1` was chosen so that `SENTINEL_CELL_ID` comes first when we scan? If so, please add a comment saying so.
It may also help to give examples in the comments, e.g. what the storage format looks like for a normal row and for an all-null row.

Contributor Author:
Sounds good!
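
The normal-row and all-null-row layouts requested above can be sketched as follows. This is a simplified model, not the code in this PR: `cell_key` and `encode_row` are hypothetical helpers, the big-endian column id suffix is an assumed encoding, the empty sentinel value is a placeholder for `serialize_cell(&None)`, and it assumes null datums are not written as cells.

```rust
/// Hypothetical stand-in for the sentinel column id introduced in this PR.
const SENTINEL_CELL_ID: i32 = -1;

/// Assumed key layout: primary-key bytes followed by a 4-byte column id.
/// The real `serialize_column_id` encoding may differ.
fn cell_key(pk: &[u8], column_id: i32) -> Vec<u8> {
    let mut key = pk.to_vec();
    key.extend_from_slice(&column_id.to_be_bytes());
    key
}

/// Returns the (key, value) cells written for one row.
/// A sentinel cell is always written; null datums produce no cell (assumption).
fn encode_row(pk: &[u8], column_ids: &[i32], row: &[Option<i32>]) -> Vec<(Vec<u8>, Vec<u8>)> {
    // Placeholder value; the PR stores `serialize_cell(&None)` here.
    let mut cells = vec![(cell_key(pk, SENTINEL_CELL_ID), Vec::new())];
    for (column_id, datum) in column_ids.iter().zip(row) {
        if let Some(value) = datum {
            cells.push((cell_key(pk, *column_id), value.to_be_bytes().to_vec()));
        }
    }
    cells
}
```

Under these assumptions a row with two non-null columns occupies three cells (sentinel plus two data cells), while an all-null row occupies only the sentinel cell, which is what lets a reader tell an all-null row apart from a missing one.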

Contributor:
Not sure how we encode the column id now. With plain big-endian or little-endian encoding, `-1` might not come first when scanning.
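
The ordering concern can be illustrated with a small sketch. It does not show how `serialize_column_id` actually works (that is not visible in this diff); `encode_be`, `encode_ordered` and `decode_ordered` are hypothetical names. Plain big-endian two's complement puts `-1` after every non-negative id in byte order, while flipping the sign bit first gives a byte order that matches the numeric order.

```rust
/// Plain big-endian two's complement: -1 becomes ff ff ff ff and sorts last.
fn encode_be(id: i32) -> [u8; 4] {
    id.to_be_bytes()
}

/// Order-preserving variant: flip the sign bit, then write big-endian.
/// Byte-wise comparison of the result matches numeric comparison of `id`.
fn encode_ordered(id: i32) -> [u8; 4] {
    ((id as u32) ^ 0x8000_0000).to_be_bytes()
}

/// Inverse of `encode_ordered`.
fn decode_ordered(bytes: [u8; 4]) -> i32 {
    (u32::from_be_bytes(bytes) ^ 0x8000_0000) as i32
}
```

With the sign-bit flip, `-1` sorts immediately before `0`, so a sentinel cell with id `-1` would precede all cells with non-negative column ids under the same primary-key prefix.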


/// We can use memcomparable serialization to serialize data
/// and flip the bits if the order of that datum is descending.
@@ -144,7 +144,6 @@ pub fn serialize_pk_and_row(
assert_eq!(values.0.len(), column_ids.len());
}
let mut result = vec![];
let mut all_null = true;
for (index, column_id) in column_ids.iter().enumerate() {
let key = [pk_buf, serialize_column_id(column_id)?.as_slice()].concat();
match row {
@@ -154,32 +153,27 @@ pub fn serialize_pk_and_row(
// we serialize this null row specially by only using one cell encoding.
}
datum => {
all_null = false;
let value = serialize_cell(datum)?;
result.push((key, Some(value)));
}
},
None => {
// A `None` row means deleting that row, while a `None` datum represents a
// null.
all_null = false;
result.push((key, None));
}
}
}
if all_null {
Contributor:
Please check whether the comments for this whole function are obsolete.

// Here we use a special column id -1 to represent a row consisting of all null values.
// `MViewTable` has a `get` interface which accepts a cell id. A null row in this case
// would return null datum as it has only a single cell with column id == -1 and `get`
// gets nothing.
let key = [
pk_buf,
serialize_column_id(&NULL_ROW_SPECIAL_CELL_ID)?.as_slice(),
]
.concat();

if row.is_none() {
let key = [pk_buf, serialize_column_id(&SENTINEL_CELL_ID)?.as_slice()].concat();
result.push((key, None));
} else {
let key = [pk_buf, serialize_column_id(&SENTINEL_CELL_ID)?.as_slice()].concat();
let value = serialize_cell(&None)?;
result.push((key, Some(value)));
}

Ok(result)
}

22 changes: 11 additions & 11 deletions src/storage/src/cell_based_row_deserializer.rs
@@ -17,9 +17,9 @@ use std::collections::HashMap;
use bytes::Bytes;
use risingwave_common::array::Row;
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::error::Result;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::Datum;
use risingwave_common::util::ordered::{deserialize_column_id, NULL_ROW_SPECIAL_CELL_ID};
use risingwave_common::util::ordered::deserialize_column_id;
use risingwave_common::util::value_encoding::deserialize_cell;

#[derive(Clone)]
@@ -34,7 +34,6 @@ pub struct CellBasedRowDeserializer {
/// which should also be done on the caller side.
pk_bytes: Option<Vec<u8>>,
}

impl CellBasedRowDeserializer {
pub fn new(table_column_descs: Vec<ColumnDesc>) -> Self {
let num_cells = table_column_descs.len();
@@ -60,20 +59,20 @@ impl CellBasedRowDeserializer {
) -> Result<Option<(Vec<u8>, Row)>> {
let pk_with_cell_id = pk_with_cell_id.to_vec();
let pk_vec_len = pk_with_cell_id.len();
let cur_pk_bytes = &pk_with_cell_id[0..pk_vec_len - 4];
if pk_vec_len < 4 {
return Err(ErrorCode::InternalError("corrupted key".to_owned()).into());
}
let (cur_pk_bytes, cell_id_bytes) = pk_with_cell_id.split_at(pk_vec_len - 4);
let mut result = None;
if let Some(prev_pk_bytes) = &self.pk_bytes && prev_pk_bytes != cur_pk_bytes {
let cell_id = deserialize_column_id(cell_id_bytes)?;
if let Some(prev_pk_bytes) = &self.pk_bytes && prev_pk_bytes != cur_pk_bytes {
result = self.take();
self.pk_bytes = Some(cur_pk_bytes.to_vec());
} else if self.pk_bytes.is_none() {
self.pk_bytes = Some(cur_pk_bytes.to_vec());
}

let cell_id_bytes = &pk_with_cell_id[pk_vec_len - 4..];
let cell_id = deserialize_column_id(cell_id_bytes)?;
if cell_id == NULL_ROW_SPECIAL_CELL_ID {
// do nothing
} else if let Some((column_desc, index)) = self.columns.get(&cell_id) {
if let Some((column_desc, index)) = self.columns.get(&cell_id) {
let mut de = value_encoding::Deserializer::new(cell.clone());
if let Some(datum) = deserialize_cell(&mut de, &column_desc.data_type)? {
let old = self.data.get_mut(*index).unwrap().replace(datum);
@@ -82,6 +81,7 @@ impl CellBasedRowDeserializer {
} else {
// ignore this cell
}

Ok(result)
}

@@ -140,7 +140,6 @@ mod tests {
let bytes2 = serialize_pk_and_row(&pk2, &Some(row2.clone()), &column_ids).unwrap();
let bytes3 = serialize_pk_and_row(&pk3, &Some(row3.clone()), &column_ids).unwrap();
let bytes = [bytes1, bytes2, bytes3].concat();

let partial_table_column_descs = table_column_descs.into_iter().skip(1).take(3).collect();
let mut result = vec![];
let mut deserializer = CellBasedRowDeserializer::new(partial_table_column_descs);
@@ -153,6 +152,7 @@ mod tests {
}
}
let pk_and_row = deserializer.take();

result.push(pk_and_row.unwrap().1);

for (expected, result) in [row1, row2, row3].into_iter().zip_eq(result.into_iter()) {
92 changes: 35 additions & 57 deletions src/storage/src/table/cell_based_table.rs
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use bytes::Bytes;
@@ -21,10 +20,8 @@ use risingwave_common::array::column::Column;
use risingwave_common::array::{DataChunk, Row};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::types::Datum;
use risingwave_common::util::ordered::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::deserialize_cell;
use risingwave_hummock_sdk::key::next_key;

use super::TableIter;
@@ -50,8 +47,6 @@ pub struct CellBasedTable<S: StateStore> {
column_descs: Vec<ColumnDesc>,

/// Mapping from column id to column index
column_id_to_column_index: HashMap<ColumnId, usize>,

pk_serializer: Option<OrderedRowSerializer>,

cell_based_row_serializer: CellBasedRowSerializer,
@@ -87,13 +82,11 @@ impl<S: StateStore> CellBasedTable<S> {
.map(|cd| Field::with_name(cd.data_type.clone(), cd.name.clone()))
.collect_vec(),
);
let column_id_to_column_index = generate_column_id_to_column_index_mapping(&column_descs);
let column_ids = generate_column_id(&column_descs);
Self {
keyspace,
schema,
column_descs,
column_id_to_column_index,

pk_serializer: ordered_row_serializer,
cell_based_row_serializer: CellBasedRowSerializer::new(),
@@ -125,9 +118,41 @@ impl<S: StateStore> CellBasedTable<S> {
}

// cell-based interface
pub async fn get_row(&self, _pk: &Row, _epoch: u64) -> StorageResult<Option<Row>> {
// get row by state_store multi get
todo!()
pub async fn get_row(&self, pk: &Row, epoch: u64) -> StorageResult<Option<Row>> {
// TODO: use multi-get for cell_based get_row
let pk_serializer = self.pk_serializer.as_ref().expect("pk_serializer is None");
let serialized_pk = &serialize_pk(pk, pk_serializer).map_err(err)?[..];
let sentinel_key = [
serialized_pk,
&serialize_column_id(&SENTINEL_CELL_ID).map_err(err)?,
]
.concat();
let mut get_res = Vec::new();
let sentinel_cell = self.keyspace.get(&sentinel_key, epoch).await?;

if sentinel_cell.is_none() {
// if sentinel cell is none, this row doesn't exist
return Ok(None);
} else {
get_res.push((sentinel_key, sentinel_cell.unwrap()));
}
for column_id in &self.column_ids {
let key = [serialized_pk, &serialize_column_id(column_id).map_err(err)?].concat();
let state_store_get_res = self.keyspace.get(&key, epoch).await?;
if let Some(state_store_get_res) = state_store_get_res {
get_res.push((key, state_store_get_res));
}
}
let mut cell_based_row_deserializer =
CellBasedRowDeserializer::new(self.column_descs.clone());
Contributor:
We could optimize `CellBasedRowDeserializer` to take a reference in the future, or refactor it to take `Arc<[ColumnDesc]>`.
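
A rough sketch of the `Arc<[ColumnDesc]>` idea, using simplified stand-in types rather than the real `ColumnDesc`, `CellBasedRowDeserializer` and `CellBasedTable` (the `RowDeserializer`, `Table` and `deserializer` names here are hypothetical). The table holds the descriptors behind an `Arc`, and each deserializer clones the pointer instead of the whole vector.

```rust
use std::sync::Arc;

/// Simplified stand-in for the real `ColumnDesc`.
#[derive(Debug, Clone, PartialEq)]
struct ColumnDesc {
    column_id: i32,
    name: String,
}

/// Stand-in deserializer that shares the descriptors instead of owning a copy.
struct RowDeserializer {
    columns: Arc<[ColumnDesc]>,
}

impl RowDeserializer {
    fn new(columns: Arc<[ColumnDesc]>) -> Self {
        Self { columns }
    }

    fn num_columns(&self) -> usize {
        self.columns.len()
    }
}

/// Stand-in table that converts its descriptors into a shared slice once.
struct Table {
    column_descs: Arc<[ColumnDesc]>,
}

impl Table {
    fn new(column_descs: Vec<ColumnDesc>) -> Self {
        Self { column_descs: column_descs.into() }
    }

    /// Cloning the `Arc` bumps a reference count; no `ColumnDesc` is copied.
    fn deserializer(&self) -> RowDeserializer {
        RowDeserializer::new(Arc::clone(&self.column_descs))
    }
}
```

Compared with `self.column_descs.clone()` on a `Vec`, this avoids one allocation and one copy of every descriptor on each `get_row` call, at the cost of changing the constructor signature.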

for (key, value) in get_res {
let deserialize_res = cell_based_row_deserializer
.deserialize(&Bytes::from(key), &value)
.map_err(err)?;
assert!(deserialize_res.is_none());
}
let pk_and_row = cell_based_row_deserializer.take();
Member:
It seems we can unwrap here?
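
The reasoning behind this question can be shown with a toy model of the sentinel lookup. It is only a sketch under assumptions: the `BTreeMap` keyed by `(pk, column_id)` stands in for the state store, and this `get_row` is a hypothetical simplification of the method in the diff. Once the sentinel cell has been found, the row is known to exist, so the result after that point is never "no row".

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the sentinel column id introduced in this PR.
const SENTINEL_CELL_ID: i32 = -1;

/// Toy state store: (primary key, column id) -> cell value.
type Store = BTreeMap<(u32, i32), i32>;

/// Point lookup of a row using the sentinel cell as an existence marker.
fn get_row(store: &Store, pk: u32, column_ids: &[i32]) -> Option<Vec<Option<i32>>> {
    // No sentinel cell means the row does not exist at all.
    store.get(&(pk, SENTINEL_CELL_ID))?;
    // Sentinel present: the row exists, and any absent cell is a null datum.
    Some(
        column_ids
            .iter()
            .map(|id| store.get(&(pk, *id)).copied())
            .collect(),
    )
}
```

In this model an all-null row yields `Some(vec![None, None])` while a missing row yields `None`, which is the distinction the sentinel column is meant to provide.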

Ok(pk_and_row.map(|(_pk, row)| row))
}

pub async fn get_row_by_scan(&self, pk: &Row, epoch: u64) -> StorageResult<Option<Row>> {
@@ -214,58 +239,11 @@ impl<S: StateStore> CellBasedTable<S> {
.await
}

pub async fn get_for_test(
&self,
pk: Row,
column_id: i32,
epoch: u64,
) -> StorageResult<Option<Datum>> {
assert!(
self.pk_serializer.is_some(),
"this table is adhoc and there's no sort key serializer"
);

let column_id = ColumnId::new(column_id);

let column_index = self.column_id_to_column_index.get(&column_id).unwrap();
// TODO(MrCroxx): More efficient encoding is needed.

let buf = self
.keyspace
.get(
&[
&serialize_pk(&pk, self.pk_serializer.as_ref().unwrap()).map_err(err)?[..],
&serialize_column_id(&column_id).map_err(err)?,
]
.concat(),
epoch,
)
.await?;

if let Some(buf) = buf {
let mut de = value_encoding::Deserializer::new(buf);
let cell = deserialize_cell(&mut de, &self.schema.fields[*column_index].data_type)
.map_err(err)?;
Ok(Some(cell))
} else {
Ok(None)
}
}
pub fn schema(&self) -> &Schema {
&self.schema
}
}

fn generate_column_id_to_column_index_mapping(
column_descs: &[ColumnDesc],
) -> HashMap<ColumnId, usize> {
let mut mapping = HashMap::with_capacity(column_descs.len());
for (index, column_desc) in column_descs.iter().enumerate() {
mapping.insert(column_desc.column_id, index);
}
mapping
}

fn generate_column_id(column_descs: &[ColumnDesc]) -> Vec<ColumnId> {
column_descs.iter().map(|d| d.column_id).collect()
}
9 changes: 4 additions & 5 deletions src/stream/src/executor/managed_state/top_n/top_n_state.rs
@@ -291,14 +291,13 @@ impl<S: StateStore, const TOP_N_TYPE: usize> ManagedTopNState<S, TOP_N_TYPE> {
number_rows: Option<usize>,
epoch: u64,
) -> Result<Vec<(OrderedRow, Row)>> {
// We remark that since we now encode a null row by only using a special cell encoding
// instead of # datum in Row of cell encodings. `top_n_count *
// self.data_types.len()` over-calculates the number of kv-pairs that we need to
// read from storage. But it is fine.
// We remark that since we use a sentinel column by encoding a special none cell,
// `top_n_count * self.data_types.len()` over-calculates the number of kv-pairs that
// we need to read from storage. But it is fine.
let pk_row_bytes = self
.keyspace
.scan_strip_prefix(
number_rows.map(|top_n_count| top_n_count * self.data_types.len()),
number_rows.map(|top_n_count| top_n_count * (self.data_types.len() + 1)),
Contributor:
So top-n may scan more cells from storage than it needs? If there are nulls in storage, it will scan more than it needs. Hmm... better to have this refactored in the future, for example by using iterator-based APIs. (cc @lmatz)

Contributor:
Please also update the comments above

Contributor:
Yes, I talked to @wcy-fdu about this part requiring refactoring in the future.
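
The over-scan discussed in this thread comes down to simple arithmetic, sketched below. `scan_limit` and `actual_cells` are hypothetical helpers, and the sketch assumes each row stores one sentinel cell plus one cell per non-null datum.

```rust
/// Upper bound on kv-pairs for `top_n_count` rows: every column plus the sentinel.
fn scan_limit(top_n_count: usize, num_columns: usize) -> usize {
    top_n_count * (num_columns + 1)
}

/// Cells actually stored for the given rows: one sentinel per row
/// plus one cell per non-null datum (assumption).
fn actual_cells(rows: &[Vec<Option<i32>>]) -> usize {
    rows.iter()
        .map(|row| 1 + row.iter().flatten().count())
        .sum()
}
```

Whenever a row contains a null, `actual_cells` is strictly smaller than `scan_limit`, so a scan bounded by `scan_limit` kv-pairs can read past the requested rows into later ones. An iterator-based API that stops after the N-th primary key would avoid relying on this bound.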

epoch,
)
.await?
8 changes: 4 additions & 4 deletions src/stream/src/executor_v2/mview/materialize.rs
@@ -227,15 +227,15 @@ mod tests {
// First stream chunk. We check the existence of (3) -> (3,6)
match materialize_executor.next().await.transpose().unwrap() {
Some(Message::Barrier(_)) => {
// Simply assert there is 3 rows (6 elements) in state store instead of doing full
// comparison
// Simply assert there is 3 rows (9 elements) in state store instead of doing full
// comparison, each row contains one sentinel cell.
assert_eq!(
memory_state_store
.scan::<_, Vec<u8>>(.., None, u64::MAX)
.await
.unwrap()
.len(),
6
9
Contributor:
You can use get_row API to refactor this test case now.

Contributor:
also update comments above

);

// FIXME(Bugen): restore this test by using new `RowTable` interface
Member:
Seems we can restore these tests with get_row.

@@ -261,7 +261,7 @@ mod tests {
.await
.unwrap()
.len(),
6
9
);

// FIXME(Bugen): restore this test by using new `RowTable` interface
6 changes: 3 additions & 3 deletions src/stream/src/executor_v2/mview/state.rs
@@ -136,13 +136,13 @@ mod tests {

state.flush(epoch).await.unwrap();
let data = keyspace.scan(None, epoch).await.unwrap();
// cell-based storage has 4 cells
assert_eq!(data.len(), 4);
// cell-based storage has 6 cells
assert_eq!(data.len(), 6);

epoch += 1;
state.delete(Row(vec![Some(3_i32.into())]));
state.flush(epoch).await.unwrap();
let data = keyspace.scan(None, epoch).await.unwrap();
assert_eq!(data.len(), 2);
assert_eq!(data.len(), 3);
}
}