Skip to content

Commit

Permalink
feat(storage): support deserialize_prefix_len for prefix_bloom_filter (
Browse files Browse the repository at this point in the history
…#3807)

* feat(storage): support deserialize_prefix_len for prefix_bloom_filter

* fix(storage): fix deserialize_prefix unit-test

* chore(storage): add doc for encoding_data_size

* fix(storage): fix encoding_data_size to return truely length in storage_encoding
  • Loading branch information
Li0k authored Jul 13, 2022
1 parent edc6660 commit ca49f6d
Show file tree
Hide file tree
Showing 3 changed files with 389 additions and 2 deletions.
49 changes: 49 additions & 0 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,55 @@ impl ScalarImpl {
})
}

/// Deserialize the `data_size` of `input_data_type` in `storage_encoding`. This function will
/// consume the offset of deserializer then return the length (without memcopy, only length
/// calculation). The difference between `encoding_data_size` and `ScalarImpl::data_size` is
/// that `ScalarImpl::data_size` calculates the `memory_length` of type instead of
/// `storage_encoding`
pub fn encoding_data_size(
data_type: &DataType,
deserializer: &mut memcomparable::Deserializer<impl Buf>,
) -> memcomparable::Result<usize> {
let base_position = deserializer.position();
let null_tag = u8::deserialize(&mut *deserializer)?;
match null_tag {
0 => {}
1 => {
use std::mem::size_of;
let len = match data_type {
DataType::Int16 => size_of::<i16>(),
DataType::Int32 => size_of::<i32>(),
DataType::Int64 => size_of::<i64>(),
DataType::Float32 => size_of::<OrderedF32>(),
DataType::Float64 => size_of::<OrderedF64>(),
DataType::Date => size_of::<NaiveDateWrapper>(),
DataType::Time => size_of::<NaiveTimeWrapper>(),
DataType::Timestamp => size_of::<NaiveDateTimeWrapper>(),
DataType::Timestampz => size_of::<NaiveDateTimeWrapper>(),
DataType::Boolean => size_of::<u8>(),
DataType::Interval => size_of::<IntervalUnit>(),

DataType::Decimal => deserializer.read_decimal_len()?,
DataType::List { .. } | DataType::Struct { .. } => {
// these two types is var-length and should only be determine at runtime.
// TODO: need some test for this case (e.g. e2e test)
deserializer.read_struct_and_list_len()?
}
DataType::Varchar => deserializer.read_bytes_len()?,
};

// consume offset of fixed_type
if deserializer.position() == base_position + 1 {
// fixed type
deserializer.advance(len);
}
}
_ => return Err(memcomparable::Error::InvalidTagEncoding(null_tag as _)),
}

Ok(deserializer.position() - base_position)
}

pub fn to_protobuf(&self) -> Vec<u8> {
let body = match self {
ScalarImpl::Int16(v) => v.to_be_bytes().to_vec(),
Expand Down
255 changes: 254 additions & 1 deletion src/common/src/util/ordered/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,38 @@ impl OrderedRowDeserializer {
pub fn get_order_types(&self) -> &[OrderType] {
&self.order_types
}

pub fn deserialize_prefix_len_with_column_indices(
&self,
key: &[u8],
column_indices: Vec<usize>,
) -> memcomparable::Result<usize> {
use crate::types::ScalarImpl;
let mut len: usize = 0;
for index in column_indices {
let data_type = &self.data_types[index];
let order_type = &self.order_types[index];
let data = &key[len..];
let mut deserializer = memcomparable::Deserializer::new(data);
deserializer.set_reverse(*order_type == OrderType::Descending);

len += ScalarImpl::encoding_data_size(data_type, &mut deserializer)?;
}

Ok(len)
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;

use super::*;
use crate::array::{I16Array, Utf8Array};
use crate::array_nonnull;
use crate::types::ScalarImpl::{Int16, Utf8};
use crate::types::chrono_wrapper::*;
use crate::types::interval;
use crate::types::ScalarImpl::{self, *};

#[test]
fn test_ordered_row_serializer() {
Expand Down Expand Up @@ -317,4 +341,233 @@ mod tests {
);
}
}

#[test]
fn test_deserialize_with_column_indices() {
let order_types = vec![OrderType::Descending, OrderType::Ascending];
let serializer = OrderedRowSerializer::new(order_types.clone());
let schema = vec![DataType::Varchar, DataType::Int16];
let row1 = Row(vec![Some(Utf8("abc".to_string())), Some(Int16(5))]);
let rows = vec![row1.clone()];
let deserializer = OrderedRowDeserializer::new(schema, order_types);
let mut array = vec![];
for row in &rows {
let mut row_bytes = vec![];
serializer.serialize(row, &mut row_bytes);
array.push(row_bytes);
}

{
let row_0_idx_0_len = deserializer
.deserialize_prefix_len_with_column_indices(&array[0], vec![0])
.unwrap();

let schema = vec![DataType::Varchar];
let order_types = vec![OrderType::Descending];
let deserializer = OrderedRowDeserializer::new(schema, order_types.clone());
let prefix_slice = &array[0][0..row_0_idx_0_len];
assert_eq!(
deserializer.deserialize(prefix_slice).unwrap(),
OrderedRow::new(Row(vec![Some(Utf8("abc".to_string()))]), &order_types)
);
}

{
let row_0_idx_1_len = deserializer
.deserialize_prefix_len_with_column_indices(&array[0], vec![0, 1])
.unwrap();

let order_types = vec![OrderType::Descending, OrderType::Ascending];
let schema = vec![DataType::Varchar, DataType::Int16];
let deserializer = OrderedRowDeserializer::new(schema, order_types.clone());
let prefix_slice = &array[0][0..row_0_idx_1_len];
assert_eq!(
deserializer.deserialize(prefix_slice).unwrap(),
OrderedRow::new(row1, &order_types)
);
}
}

#[test]
fn test_encoding_data_size() {
use std::mem::size_of;

use crate::types::interval::IntervalUnit;
use crate::types::OrderedF64;

let order_types = vec![OrderType::Ascending];
let serializer = OrderedRowSerializer::new(order_types);

// test fixed_size
{
{
// test None
let row = Row(vec![None]);
let mut row_bytes = vec![];
serializer.serialize(&row, &mut row_bytes);
let mut deserializer = memcomparable::Deserializer::new(&row_bytes[..]);
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Int16, &mut deserializer).unwrap();
assert_eq!(1, encoding_data_size);
}

{
// float64
let row = Row(vec![Some(ScalarImpl::Float64(6.4.into()))]);
let mut row_bytes = vec![];
serializer.serialize(&row, &mut row_bytes);
let mut deserializer = memcomparable::Deserializer::new(&row_bytes[..]);
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Float64, &mut deserializer).unwrap();
let data_size = size_of::<OrderedF64>();
assert_eq!(8, data_size);
assert_eq!(1 + data_size, encoding_data_size);
}

{
// bool
let row = Row(vec![Some(ScalarImpl::Bool(false))]);
let mut row_bytes = vec![];
serializer.serialize(&row, &mut row_bytes);
let mut deserializer = memcomparable::Deserializer::new(&row_bytes[..]);
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Boolean, &mut deserializer).unwrap();

let data_size = size_of::<u8>();
assert_eq!(1, data_size);
assert_eq!(1 + data_size, encoding_data_size);
}

{
// ts
let row = Row(vec![Some(ScalarImpl::NaiveDateTime(
NaiveDateTimeWrapper::default(),
))]);
let mut row_bytes = vec![];
serializer.serialize(&row, &mut row_bytes);
let mut deserializer = memcomparable::Deserializer::new(&row_bytes[..]);
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Timestamp, &mut deserializer)
.unwrap();
let data_size = size_of::<NaiveDateTimeWrapper>();
assert_eq!(12, data_size);
assert_eq!(1 + data_size, encoding_data_size);
}

{
// interval
let row = Row(vec![Some(ScalarImpl::Interval(
interval::IntervalUnit::default(),
))]);
let mut row_bytes = vec![];
serializer.serialize(&row, &mut row_bytes);
let mut deserializer = memcomparable::Deserializer::new(&row_bytes[..]);
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Interval, &mut deserializer).unwrap();
let data_size = size_of::<IntervalUnit>();
assert_eq!(16, data_size);
assert_eq!(1 + data_size, encoding_data_size);
}
}

{
// test dynamic_size
{
// test decimal
pub use crate::types::decimal::Decimal;

{
let d = Decimal::from_str("41721.900909090909090909090909").unwrap();
let row = Row(vec![Some(ScalarImpl::Decimal(d))]);
let mut row_bytes = vec![];
serializer.serialize(&row, &mut row_bytes);
let mut deserializer = memcomparable::Deserializer::new(&row_bytes[..]);
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Decimal, &mut deserializer)
.unwrap();
// [nulltag, flag, decimal_chunk, 0]
assert_eq!(18, encoding_data_size);
}

{
let d = Decimal::from_str("1").unwrap();
let row = Row(vec![Some(ScalarImpl::Decimal(d))]);
let mut row_bytes = vec![];
serializer.serialize(&row, &mut row_bytes);
let mut deserializer = memcomparable::Deserializer::new(&row_bytes[..]);
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Decimal, &mut deserializer)
.unwrap();
// [nulltag, flag, decimal_chunk, 0]
assert_eq!(4, encoding_data_size);
}

{
let d = Decimal::from_str("inf").unwrap();
let row = Row(vec![Some(ScalarImpl::Decimal(d))]);
let mut row_bytes = vec![];
serializer.serialize(&row, &mut row_bytes);
let mut deserializer = memcomparable::Deserializer::new(&row_bytes[..]);
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Decimal, &mut deserializer)
.unwrap();

assert_eq!(3, encoding_data_size); // [1, 35, 0]
}

{
let d = Decimal::from_str("nan").unwrap();
let row = Row(vec![Some(ScalarImpl::Decimal(d))]);
let mut row_bytes = vec![];
serializer.serialize(&row, &mut row_bytes);
let mut deserializer = memcomparable::Deserializer::new(&row_bytes[..]);
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Decimal, &mut deserializer)
.unwrap();
assert_eq!(3, encoding_data_size); // [1, 6, 0]
}

{
// TODO(test list / struct)
}

{
// test varchar
let varchar = "abcdefghijklmn";
let row = Row(vec![Some(Utf8(varchar.to_string()))]);
let mut row_bytes = vec![];
serializer.serialize(&row, &mut row_bytes);
let mut deserializer = memcomparable::Deserializer::new(&row_bytes[..]);
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Varchar, &mut deserializer)
.unwrap();
// [1, 1, 97, 98, 99, 100, 101, 102, 103, 104, 9, 105, 106, 107, 108, 109, 110,
// 0, 0, 6]
assert_eq!(6 + varchar.len(), encoding_data_size);
}

{
{
// test varchar Descending
let order_types = vec![OrderType::Descending];
let serializer = OrderedRowSerializer::new(order_types);
let varchar = "abcdefghijklmnopq";
let row = Row(vec![Some(Utf8(varchar.to_string()))]);
let mut row_bytes = vec![];
serializer.serialize(&row, &mut row_bytes);
let mut deserializer = memcomparable::Deserializer::new(&row_bytes[..]);
deserializer.set_reverse(true);
let encoding_data_size =
ScalarImpl::encoding_data_size(&DataType::Varchar, &mut deserializer)
.unwrap();

// [254, 254, 158, 157, 156, 155, 154, 153, 152, 151, 246, 150, 149, 148,
// 147, 146, 145, 144, 143, 246, 142, 255, 255, 255, 255, 255, 255, 255,
// 254]
assert_eq!(12 + varchar.len(), encoding_data_size);
}
}
}
}
}
}
Loading

0 comments on commit ca49f6d

Please sign in to comment.