Skip to content

Commit

Permalink
feat(row-based encoding): introduce row-based encoding by using value…
Browse files Browse the repository at this point in the history
… encoding (#3835)

* introduce row-based encoding 1.0
  • Loading branch information
wcy-fdu authored Jul 13, 2022
1 parent 1966c93 commit 8231cf1
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 24 deletions.
24 changes: 0 additions & 24 deletions src/common/src/array/data_chunk_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,16 @@
use std::hash::{BuildHasher, Hash, Hasher};
use std::ops;

use bytes::Buf;
use itertools::Itertools;

use super::column::Column;
use crate::array::DataChunk;
use crate::error::Result as RwResult;
use crate::hash::HashCode;
use crate::types::{
deserialize_datum_from, deserialize_datum_not_null_from, hash_datum, serialize_datum_into,
serialize_datum_not_null_into, DataType, Datum, DatumRef, ToOwnedDatum,
};
use crate::util::sort_util::OrderType;
use crate::util::value_encoding::{deserialize_datum, serialize_datum};

impl DataChunk {
/// Get an iterator for visible rows.
pub fn rows(&self) -> impl Iterator<Item = RowRef> {
Expand Down Expand Up @@ -305,17 +301,6 @@ impl Row {
Ok(serializer.into_inner())
}

/// Serialize the row into a value encode bytes.
///
/// All values are nullable. Each value will have 1 extra byte to indicate whether it is null.
pub fn value_encode(&self) -> RwResult<Vec<u8>> {
let mut vec = vec![];
for v in &self.0 {
vec.extend(serialize_datum(v)?);
}
Ok(vec)
}

/// Return number of cells in the row.
pub fn size(&self) -> usize {
self.0.len()
Expand Down Expand Up @@ -405,15 +390,6 @@ impl RowDeserializer {
let datum = deserialize_datum_from(&self.data_types[datum_idx], &mut deserializer)?;
Ok(datum)
}

/// Deserialize the row from a value encoding bytes.
pub fn value_decode(&self, mut data: impl Buf) -> RwResult<Row> {
let mut values = Vec::with_capacity(self.data_types.len());
for ty in &self.data_types {
values.push(deserialize_datum(&mut data, ty)?);
}
Ok(Row(values))
}
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub mod cell_based_row_deserializer;
pub mod cell_based_row_serializer;
pub mod dedup_pk_cell_based_row_deserializer;
pub mod dedup_pk_cell_based_row_serializer;
pub mod row_based_deserializer;
pub mod row_based_serializer;

pub type KeyBytes = Vec<u8>;
pub type ValueBytes = Vec<u8>;
Expand Down
115 changes: 115 additions & 0 deletions src/storage/src/encoding/row_based_deserializer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes::Buf;
use risingwave_common::array::Row;
use risingwave_common::error::Result;
use risingwave_common::types::DataType;
use risingwave_common::util::value_encoding::deserialize_datum;

#[derive(Clone)]
pub struct RowBasedDeserializer {
data_types: Vec<DataType>,
}

impl RowBasedDeserializer {
pub fn new(data_types: Vec<DataType>) -> Self {
Self { data_types }
}

/// Deserialize bytes into a row.
pub fn deserialize(&self, mut row: impl Buf) -> Result<Row> {
// value encoding
let mut values = Vec::with_capacity(self.data_types.len());
for ty in &self.data_types {
values.push(deserialize_datum(&mut row, ty)?);
}
Ok(Row(values))
}
}

#[cfg(test)]
mod tests {
use risingwave_common::array::Row;
use risingwave_common::types::{DataType, IntervalUnit, ScalarImpl};

use crate::encoding::row_based_deserializer::RowBasedDeserializer;
use crate::encoding::row_based_serializer::RowBasedSerializer;

#[test]
fn test_row_based_serialize_and_deserialize_not_null() {
let row = Row(vec![
Some(ScalarImpl::Utf8("string".into())),
Some(ScalarImpl::Bool(true)),
Some(ScalarImpl::Int16(1)),
Some(ScalarImpl::Int32(2)),
Some(ScalarImpl::Int64(3)),
Some(ScalarImpl::Float32(4.0.into())),
Some(ScalarImpl::Float64(5.0.into())),
Some(ScalarImpl::Decimal("-233.3".parse().unwrap())),
Some(ScalarImpl::Interval(IntervalUnit::new(7, 8, 9))),
]);
let mut se = RowBasedSerializer::new();
let bytes = se.serialize(&row).unwrap();
// each cell will add a null_tag (u8)
assert_eq!(bytes.len(), 11 + 2 + 3 + 5 + 9 + 5 + 9 + 17 + 17);

let de = RowBasedDeserializer::new(vec![
DataType::Varchar,
DataType::Boolean,
DataType::Int16,
DataType::Int32,
DataType::Int64,
DataType::Float32,
DataType::Float64,
DataType::Decimal,
DataType::Interval,
]);
let row1 = de.deserialize(&*bytes).unwrap();
assert_eq!(row, row1);
}

#[test]
fn test_row_based_serialize_and_deserialize_with_null() {
let row = Row(vec![
Some(ScalarImpl::Utf8("string".into())),
Some(ScalarImpl::Bool(true)),
Some(ScalarImpl::Int16(1)),
Some(ScalarImpl::Int32(2)),
Some(ScalarImpl::Int64(3)),
Some(ScalarImpl::Float32(4.0.into())),
Some(ScalarImpl::Float64(5.0.into())),
Some(ScalarImpl::Decimal("-233.3".parse().unwrap())),
Some(ScalarImpl::Interval(IntervalUnit::new(7, 8, 9))),
]);
let mut se = RowBasedSerializer::new();
let bytes = se.serialize(&row).unwrap();
// each cell will add a is_none flag (u8)
assert_eq!(bytes.len(), 11 + 2 + 3 + 5 + 9 + 5 + 9 + 17 + 17);

let de = RowBasedDeserializer::new(vec![
DataType::Varchar,
DataType::Boolean,
DataType::Int16,
DataType::Int32,
DataType::Int64,
DataType::Float32,
DataType::Float64,
DataType::Decimal,
DataType::Interval,
]);
let row1 = de.deserialize(&*bytes).unwrap();
assert_eq!(row, row1);
}
}
43 changes: 43 additions & 0 deletions src/storage/src/encoding/row_based_serializer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::Row;
use risingwave_common::error::Result;
use risingwave_common::util::value_encoding::serialize_datum;
type ValueBytes = Vec<u8>;

#[derive(Clone)]
pub struct RowBasedSerializer {}

impl RowBasedSerializer {
pub fn new() -> Self {
Self {}
}

/// Serialize the row into a value encode bytes.
/// All values are nullable. Each value will have 1 extra byte to indicate whether it is null.
pub fn serialize(&mut self, row: &Row) -> Result<ValueBytes> {
let mut res = vec![];
for cell in &row.0 {
res.extend(serialize_datum(cell)?);
}
Ok(res)
}
}

impl Default for RowBasedSerializer {
fn default() -> Self {
Self::new()
}
}

0 comments on commit 8231cf1

Please sign in to comment.