Skip to content

Commit

Permalink
fix: do not store null value (#642)
Browse files Browse the repository at this point in the history
* fix: do not store null value

* revision

* add test

* fix clippy
  • Loading branch information
lmatz authored Mar 3, 2022
1 parent ccd54fa commit c49c9aa
Show file tree
Hide file tree
Showing 10 changed files with 363 additions and 165 deletions.
76 changes: 65 additions & 11 deletions rust/common/src/util/ordered/serde.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::cmp::Reverse;

use bytes::BufMut;
use itertools::Itertools;
use memcomparable::from_slice;

use super::OrderedDatum::{NormalOrder, ReversedOrder};
use super::OrderedRow;
Expand All @@ -14,6 +14,9 @@ use crate::types::{
};
use crate::util::sort_util::{OrderPair, OrderType};

/// The special `cell_id` reserved for a whole null row is -1.
///
/// A row whose datums are all null is stored as a single cell keyed by this id.
pub const NULL_ROW_SPECIAL_CELL_ID: i32 = -1;

/// We can use memcomparable serialization to serialize data
/// and flip the bits if the order of that datum is descending.
/// As this is normally used for sorted keys, deserialization is
Expand Down Expand Up @@ -98,23 +101,68 @@ impl OrderedRowDeserializer {
}
}

pub fn serialize_pk(pk: &Row, serializer: &OrderedRowSerializer) -> Result<Vec<u8>> {
type KeyBytes = Vec<u8>;
type ValueBytes = Vec<u8>;

pub fn serialize_pk_and_row(
pk_buf: &[u8],
row: &Option<Row>,
column_ids: &[ColumnId],
) -> Result<Vec<(KeyBytes, Option<ValueBytes>)>> {
if let Some(values) = row.as_ref() {
assert_eq!(values.0.len(), column_ids.len());
}
let mut result = vec![];
serializer.serialize(pk, &mut result);
let mut all_null = true;
for (index, column_id) in column_ids.iter().enumerate() {
let key = [pk_buf, serialize_column_id(column_id)?.as_slice()].concat();
match row {
Some(values) => match &values[index] {
None => {
// This is the case where the datum is null. If all the datums in a row are null,
// we serialize this null row specially, using only a single cell encoding.
}
datum => {
all_null = false;
let value = serialize_cell(datum)?;
result.push((key, Some(value)));
}
},
None => {
// A `None` row means that the row is being deleted, whereas a `None` datum
// represents a null value.
all_null = false;
result.push((key, None));
}
}
}
if all_null {
// Here we use a special column id -1 to represent a row consisting of all null values.
// `MViewTable` has a `get` interface which accepts a cell id. A null row in this case
// would return null datum as it has only a single cell with column id == -1 and `get`
// gets nothing.
let key = [
pk_buf,
serialize_column_id(&ColumnId::from(NULL_ROW_SPECIAL_CELL_ID))?.as_slice(),
]
.concat();
let value = serialize_cell(&None)?;
result.push((key, Some(value)));
}
Ok(result)
}

// TODO(eric): deprecated. Remove when possible
pub fn serialize_cell_idx(cell_idx: i32) -> Result<Vec<u8>> {
let mut buf = Vec::with_capacity(4);
buf.put_i32(cell_idx);
debug_assert_eq!(buf.len(), 4);
Ok(buf)
pub fn serialize_pk(pk: &Row, serializer: &OrderedRowSerializer) -> Result<Vec<u8>> {
let mut result = vec![];
serializer.serialize(pk, &mut result);
Ok(result)
}

pub fn serialize_column_id(column_id: &ColumnId) -> Result<Vec<u8>> {
let mut buf = Vec::with_capacity(4);
buf.put_i32(column_id.get_id());
use serde::Serialize;
let mut serializer = memcomparable::Serializer::new(vec![]);
column_id.get_id().serialize(&mut serializer)?;
let buf = serializer.into_inner();
debug_assert_eq!(buf.len(), 4);
Ok(buf)
}
Expand Down Expand Up @@ -155,6 +203,12 @@ pub fn deserialize_cell(bytes: &[u8], ty: &DataType) -> Result<Datum> {
}
}

/// Decodes a column id from its memcomparable encoding.
///
/// # Panics
///
/// Panics if `bytes` is not exactly 4 bytes long.
pub fn deserialize_column_id(bytes: &[u8]) -> Result<i32> {
    assert_eq!(bytes.len(), 4);
    Ok(from_slice::<i32>(bytes)?)
}

fn deserialize_decimal(bytes: &[u8]) -> Result<Datum> {
// None denotes NULL which is a valid value while Err means invalid encoding.
let null_tag = bytes[0];
Expand Down
124 changes: 124 additions & 0 deletions rust/storage/src/cell_based_row_deserializer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use bytes::Bytes;
use risingwave_common::array::Row;
use risingwave_common::error::Result;
use risingwave_common::types::Datum;
use risingwave_common::util::ordered::{
deserialize_cell, deserialize_column_id, NULL_ROW_SPECIAL_CELL_ID,
};

use crate::TableColumnDesc;

#[derive(Clone)]
pub struct CellBasedRowDeserializer {
table_column_descs: Vec<TableColumnDesc>,
data: Vec<Datum>,
/// `CellBasedRowDeserializer` does not deserialize pk itself. We need to take the key in as
/// we have to know the cell id of each datum. So `pk_bytes` serves as an additional check
/// which should also be done on the caller side.
pk_bytes: Option<Vec<u8>>,
}

impl CellBasedRowDeserializer {
pub fn new(table_column_descs: Vec<TableColumnDesc>) -> Self {
let num_cells = table_column_descs.len();
Self {
table_column_descs,
data: vec![None; num_cells],
pk_bytes: None,
}
}

/// When we encounter a new key, we can be sure that the previous row has been fully
/// deserialized. Then we return the key and the value of the previous row.
pub fn deserialize(
&mut self,
pk_with_cell_id: &Bytes,
cell: &Bytes,
) -> Result<Option<(Vec<u8>, Row)>> {
let pk_with_cell_id = pk_with_cell_id.to_vec();
let pk_vec_len = pk_with_cell_id.len();
let cur_pk_bytes = &pk_with_cell_id[0..pk_vec_len - 4];
let mut result = None;
if let Some(prev_pk_bytes) = &self.pk_bytes && prev_pk_bytes != cur_pk_bytes {
result = self.take();
self.pk_bytes = Some(cur_pk_bytes.to_vec());
} else if self.pk_bytes.is_none() {
self.pk_bytes = Some(cur_pk_bytes.to_vec());
}

let cell_id_bytes = &pk_with_cell_id[pk_vec_len - 4..];
let cell_id = deserialize_column_id(cell_id_bytes)?;
if cell_id == NULL_ROW_SPECIAL_CELL_ID {
// do nothing
} else {
// FIXME: generate a mapping from column_id or cell_id to index.
let data_type = &self.table_column_descs[cell_id as usize].data_type;
let datum = deserialize_cell(cell, data_type)?;
assert!(self.data.get(cell_id as usize).unwrap().is_none());
*self.data.get_mut(cell_id as usize).unwrap() = datum;
}
Ok(result)
}

pub fn take(&mut self) -> Option<(Vec<u8>, Row)> {
let cur_pk_bytes = self.pk_bytes.take();
cur_pk_bytes.map(|bytes| {
let ret = self.data.iter_mut().map(std::mem::take).collect::<Vec<_>>();
(bytes, Row(ret))
})
}
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use risingwave_common::array::Row;
    use risingwave_common::catalog::ColumnId;
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::ordered::serialize_pk_and_row;

    use crate::cell_based_row_deserializer::CellBasedRowDeserializer;
    use crate::TableColumnDesc;

    /// Serializes three rows (partially null, all null, partially null) cell by cell and
    /// checks that the deserializer reassembles exactly those rows.
    #[test]
    fn test_cell_based_deserializer() {
        let column_ids = vec![ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)];
        let table_column_descs = column_ids
            .iter()
            .zip([DataType::Char, DataType::Int32, DataType::Int64])
            .map(|(column_id, data_type)| TableColumnDesc::unnamed(*column_id, data_type))
            .collect::<Vec<_>>();

        let pk1 = vec![0u8, 0u8, 0u8, 0u8];
        let pk2 = vec![0u8, 0u8, 0u8, 1u8];
        let pk3 = vec![0u8, 0u8, 0u8, 2u8];
        let row1 = Row(vec![
            Some(ScalarImpl::Utf8("abc".to_string())),
            None,
            Some(ScalarImpl::Int64(1500)),
        ]);
        let row2 = Row(vec![None, None, None]);
        let row3 = Row(vec![
            None,
            Some(ScalarImpl::Int32(2020)),
            Some(ScalarImpl::Int64(2021)),
        ]);

        // Flatten the cells of all rows into one stream, in pk order.
        let mut cells = Vec::new();
        for (pk, row) in [(&pk1, &row1), (&pk2, &row2), (&pk3, &row3)] {
            cells.extend(serialize_pk_and_row(pk, &Some(row.clone()), &column_ids).unwrap());
        }

        let mut deserializer = CellBasedRowDeserializer::new(table_column_descs);
        let mut rows = Vec::new();
        for (key_bytes, value_bytes) in cells {
            let key = Bytes::from(key_bytes);
            let value = Bytes::from(value_bytes.unwrap());
            if let Some((_pk, row)) = deserializer.deserialize(&key, &value).unwrap() {
                rows.push(row);
            }
        }
        // The last row is only released by an explicit `take`.
        let (_pk, last_row) = deserializer.take().unwrap();
        rows.push(last_row);

        assert_eq!(vec![row1, row2, row3], rows);
    }
}
1 change: 1 addition & 0 deletions rust/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;

pub mod cell_based_row_deserializer;
pub mod hummock;
pub mod keyspace;
pub mod memory;
Expand Down
50 changes: 19 additions & 31 deletions rust/storage/src/table/mview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use risingwave_common::util::ordered::*;
use risingwave_common::util::sort_util::OrderType;

use super::TableIterRef;
use crate::cell_based_row_deserializer::CellBasedRowDeserializer;
use crate::table::{ScannableTable, TableIter};
use crate::{Keyspace, StateStore, TableColumnDesc};

Expand Down Expand Up @@ -99,6 +100,7 @@ impl<S: StateStore> MViewTable<S> {
async fn iter(&self, epoch: u64) -> Result<MViewTableIter<S>> {
MViewTableIter::new(
self.keyspace.clone(),
self.column_descs.clone(),
self.schema.clone(),
self.pk_columns.clone(),
epoch,
Expand All @@ -118,7 +120,8 @@ impl<S: StateStore> MViewTable<S> {
.get(
&[
&serialize_pk(&pk, &self.sort_key_serializer)?[..],
&serialize_cell_idx(cell_idx as i32)?[..],
&serialize_column_id(&ColumnId::from(cell_idx as i32))?
// &serialize_cell_idx(cell_idx as i32)?[..],
]
.concat(),
epoch,
Expand All @@ -139,6 +142,8 @@ impl<S: StateStore> MViewTable<S> {

pub struct MViewTableIter<S: StateStore> {
keyspace: Keyspace<S>,
#[allow(dead_code)]
// TODO: MViewTableIter will be discarded later?
schema: Schema,
// TODO: why pk_columns is not used??
#[allow(dead_code)]
Expand All @@ -153,6 +158,8 @@ pub struct MViewTableIter<S: StateStore> {
err_msg: Option<String>,
/// A epoch representing the read snapshot
epoch: u64,
/// Cell-based row deserializer
cell_based_row_deserializer: CellBasedRowDeserializer,
}

impl<'a, S: StateStore> MViewTableIter<S> {
Expand All @@ -161,12 +168,15 @@ impl<'a, S: StateStore> MViewTableIter<S> {

async fn new(
keyspace: Keyspace<S>,
table_descs: Vec<TableColumnDesc>,
schema: Schema,
pk_columns: Vec<usize>,
epoch: u64,
) -> Result<Self> {
keyspace.state_store().wait_epoch(epoch).await;

let cell_based_row_deserializer = CellBasedRowDeserializer::new(table_descs);

let iter = Self {
keyspace,
schema,
Expand All @@ -176,6 +186,7 @@ impl<'a, S: StateStore> MViewTableIter<S> {
done: false,
err_msg: None,
epoch,
cell_based_row_deserializer,
};
Ok(iter)
}
Expand Down Expand Up @@ -216,9 +227,6 @@ impl<S: StateStore> TableIter for MViewTableIter<S> {
}
}

let mut pk_buf = vec![];
let mut restored = 0;
let mut row = vec![];
loop {
let (key, value) = match self.buf.get(self.next_idx) {
Some(kv) => kv,
Expand All @@ -227,24 +235,13 @@ impl<S: StateStore> TableIter for MViewTableIter<S> {
self.consume_more().await?;
if let Some(item) = self.buf.first() {
item
} else if restored == 0 {
// No more items
self.done = true;
return Ok(None);
} else {
// current item is incomplete
let pk_and_row = self.cell_based_row_deserializer.take();
self.done = true;
self.err_msg = Some(String::from("incomplete item"));
return Err(ErrorCode::InternalError(
self.err_msg.as_ref().unwrap().clone(),
)
.into());
return Ok(pk_and_row.map(|(_pk, row)| row));
}
}
};

self.next_idx += 1;

tracing::trace!(
target: "events::stream::mview::scan",
"mview scanned key = {:?}, value = {:?}",
Expand All @@ -257,22 +254,13 @@ impl<S: StateStore> TableIter for MViewTableIter<S> {
return Err(ErrorCode::InternalError("corrupted key".to_owned()).into());
}

let cur_pk_buf = &key[self.keyspace.key().len()..key.len() - 4];
if restored == 0 {
pk_buf = cur_pk_buf.to_owned();
} else if pk_buf != cur_pk_buf {
return Err(ErrorCode::InternalError("primary key incorrect".to_owned()).into());
}

let datum = deserialize_cell(&value[..], &self.schema.data_types()[restored])?;
row.push(datum);

restored += 1;
if restored == self.schema.len() {
break;
let pk_and_row = self.cell_based_row_deserializer.deserialize(key, value)?;
self.next_idx += 1;
match pk_and_row {
Some(_) => return Ok(pk_and_row.map(|(_pk, row)| row)),
None => {}
}
}
Ok(Some(Row::new(row)))
}
}

Expand Down
Loading

0 comments on commit c49c9aa

Please sign in to comment.