Skip to content

Commit

Permalink
feat(batch): apply row-based encoding for materialize executor and ba…
Browse files Browse the repository at this point in the history
…tch mode (risingwavelabs#4335)

* x

* row-based mv and batch

* remove redundant code

* fmt happy

* fmt happy

* doc

* remove clone

* refactor

* fix column_mapping

* update comments

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and nasnoisaac committed Aug 9, 2022
1 parent be4fd63 commit 5f046c7
Show file tree
Hide file tree
Showing 15 changed files with 307 additions and 153 deletions.
23 changes: 7 additions & 16 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Row};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, OrderedColumnDesc, Schema, TableId};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::{DataType, Datum, ScalarImpl};
use risingwave_common::util::select_all;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{scan_range, ScanRange};
use risingwave_pb::plan_common::{CellBasedTableDesc, OrderType as ProstOrderType};
use risingwave_storage::row_serde::CellBasedRowSerde;
use risingwave_storage::table::storage_table::{BatchDedupPkIter, StorageTable, StorageTableIter};
use risingwave_storage::row_serde::RowBasedSerde;
use risingwave_storage::table::storage_table::{RowBasedStorageTable, StorageTableIter};
use risingwave_storage::table::{Distribution, TableIter};
use risingwave_storage::{dispatch_state_store, Keyspace, StateStore, StateStoreImpl};

Expand All @@ -49,8 +49,8 @@ pub struct RowSeqScanExecutor<S: StateStore> {
}

pub enum ScanType<S: StateStore> {
TableScan(BatchDedupPkIter<S, CellBasedRowSerde>),
RangeScan(StorageTableIter<S, CellBasedRowSerde>),
TableScan(StorageTableIter<S, RowBasedSerde>),
RangeScan(StorageTableIter<S, RowBasedSerde>),
PointGet(Option<Row>),
}

Expand Down Expand Up @@ -156,15 +156,6 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
.map(ColumnId::from)
.collect();

// TODO: remove this
let pk_descs = table_desc
.order_key
.iter()
.map(|order| OrderedColumnDesc {
column_desc: column_descs[order.index as usize].clone(),
order: OrderType::from_prost(&ProstOrderType::from_i32(order.order_type).unwrap()),
})
.collect_vec();
let pk_types = table_desc
.order_key
.iter()
Expand Down Expand Up @@ -203,7 +194,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {

dispatch_state_store!(source.context().try_get_state_store()?, state_store, {
let batch_stats = source.context().stats();
let table = StorageTable::new_partial(
let table = RowBasedStorageTable::new_partial(
state_store.clone(),
table_id,
column_descs,
Expand All @@ -215,7 +206,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
let keyspace = Keyspace::table_root(state_store.clone(), &table_id);

if seq_scan_node.scan_ranges.is_empty() {
let iter = table.batch_dedup_pk_iter(source.epoch, &pk_descs).await?;
let iter = table.batch_iter(source.epoch).await?;
return Ok(Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(iter)],
Expand Down
51 changes: 9 additions & 42 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_batch::executor::{
RowSeqScanExecutor, ScanType,
};
use risingwave_common::array::{Array, DataChunk, F64Array, I64Array, Row};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, OrderedColumnDesc, Schema, TableId};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::column_nonnull;
use risingwave_common::error::{Result, RwError};
use risingwave_common::test_prelude::DataChunkTestExt;
Expand All @@ -36,8 +36,8 @@ use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::plan_common::ColumnDesc as ProstColumnDesc;
use risingwave_source::{MemSourceManager, SourceManager};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::state_table::StateTable;
use risingwave_storage::table::storage_table::StorageTable;
use risingwave_storage::table::state_table::RowBasedStateTable;
use risingwave_storage::table::storage_table::RowBasedStorageTable;
use risingwave_storage::Keyspace;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::{
Expand Down Expand Up @@ -200,30 +200,17 @@ async fn test_table_v2_materialize() -> Result<()> {
.collect_vec();

// Since we have not polled `Materialize`, we cannot scan anything from this table
let table = StorageTable::new_for_test(
let table = RowBasedStorageTable::new_for_test(
memory_state_store.clone(),
source_table_id,
column_descs.clone(),
vec![OrderType::Ascending],
vec![0],
);

let ordered_column_descs: Vec<OrderedColumnDesc> = column_descs
.iter()
.take(1)
.map(|d| OrderedColumnDesc {
column_desc: d.clone(),
order: OrderType::Ascending,
})
.collect();

let scan = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(
table
.batch_dedup_pk_iter(u64::MAX, &ordered_column_descs)
.await?,
)],
vec![ScanType::TableScan(table.batch_iter(u64::MAX).await?)],
1024,
"RowSeqExecutor2".to_string(),
Arc::new(BatchMetrics::unused()),
Expand Down Expand Up @@ -281,11 +268,7 @@ async fn test_table_v2_materialize() -> Result<()> {
// Scan the table again, we are able to get the data now!
let scan = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(
table
.batch_dedup_pk_iter(u64::MAX, &ordered_column_descs)
.await?,
)],
vec![ScanType::TableScan(table.batch_iter(u64::MAX).await?)],
1024,
"RowSeqScanExecutor2".to_string(),
Arc::new(BatchMetrics::unused()),
Expand Down Expand Up @@ -352,11 +335,7 @@ async fn test_table_v2_materialize() -> Result<()> {
// Scan the table again, we are able to see the deletion now!
let scan = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(
table
.batch_dedup_pk_iter(u64::MAX, &ordered_column_descs)
.await?,
)],
vec![ScanType::TableScan(table.batch_iter(u64::MAX).await?)],
1024,
"RowSeqScanExecutor2".to_string(),
Arc::new(BatchMetrics::unused()),
Expand Down Expand Up @@ -389,7 +368,7 @@ async fn test_row_seq_scan() -> Result<()> {
ColumnDesc::unnamed(ColumnId::from(2), schema[2].data_type.clone()),
];

let mut state = StateTable::new_without_distribution(
let mut state = RowBasedStateTable::new_without_distribution(
memory_state_store.clone(),
TableId::from(0x42),
column_descs.clone(),
Expand All @@ -416,22 +395,10 @@ async fn test_row_seq_scan() -> Result<()> {
.unwrap();
state.commit(epoch).await.unwrap();

let pk_descs: Vec<OrderedColumnDesc> = column_descs
.iter()
.take(1)
.map(|d| OrderedColumnDesc {
column_desc: d.clone(),
order: OrderType::Ascending,
})
.collect();

let executor = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(
table
.batch_dedup_pk_iter(u64::MAX, &pk_descs)
.await
.unwrap(),
table.batch_iter(u64::MAX).await.unwrap(),
)],
1,
"RowSeqScanExecutor2".to_string(),
Expand Down
37 changes: 23 additions & 14 deletions src/storage/src/row_serde/cell_based_row_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,47 +28,56 @@ use crate::row_serde::ColumnDescMapping;

#[allow(clippy::len_without_is_empty)]
impl ColumnDescMapping {
fn new_inner(output_columns: Vec<ColumnDesc>, all_data_types: Vec<DataType>) -> Arc<Self> {
let id_to_column_index = output_columns
fn new_inner(
output_columns: Vec<ColumnDesc>,
all_data_types: Vec<DataType>,
output_index: Vec<usize>,
) -> Arc<Self> {
let output_id_to_index = output_columns
.iter()
.enumerate()
.map(|(index, d)| (d.column_id, index))
.collect();

Self {
output_columns,
id_to_column_index,
output_id_to_index,
all_data_types,
output_index,
}
.into()
}

/// Create a mapping with given `output_columns`.
pub fn new(output_columns: Vec<ColumnDesc>) -> Arc<Self> {
let all_data_types = output_columns.iter().map(|d| d.data_type.clone()).collect();
Self::new_inner(output_columns, all_data_types)
let output_index: Vec<usize> = output_columns
.iter()
.map(|c| c.column_id.get_id() as usize)
.collect();
Self::new_inner(output_columns, all_data_types, output_index)
}

/// Create a mapping with given `table_columns` projected on the `column_ids`.
pub fn new_partial(table_columns: &[ColumnDesc], column_ids: &[ColumnId]) -> Arc<Self> {
pub fn new_partial(table_columns: &[ColumnDesc], output_column_ids: &[ColumnId]) -> Arc<Self> {
let all_data_types = table_columns.iter().map(|d| d.data_type.clone()).collect();

let mut table_columns = table_columns
.iter()
.map(|c| (c.column_id, c.clone()))
.enumerate()
.map(|(index, c)| (c.column_id, (c.clone(), index)))
.collect::<HashMap<_, _>>();

let output_columns = column_ids
let (output_columns, output_index): (
Vec<risingwave_common::catalog::ColumnDesc>,
Vec<usize>,
) = output_column_ids
.iter()
.map(|id| table_columns.remove(id).unwrap())
.collect();

Self::new_inner(output_columns, all_data_types)
.unzip();
Self::new_inner(output_columns, all_data_types, output_index)
}

/// Get the [`ColumnDesc`] and its index in the output with given `id`.
pub fn get(&self, id: ColumnId) -> Option<(&ColumnDesc, usize)> {
self.id_to_column_index
self.output_id_to_index
.get(&id)
.map(|&index| (&self.output_columns[index], index))
}
Expand Down
8 changes: 7 additions & 1 deletion src/storage/src/row_serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,17 @@ pub trait RowSerialize: Clone {
/// Record mapping from [`ColumnDesc`], [`ColumnId`], and output index of columns in a table.
#[derive(Clone)]
pub struct ColumnDescMapping {
/// `output_columns` is the subset of the table's columns to be partially scanned.
pub output_columns: Vec<ColumnDesc>,

pub id_to_column_index: HashMap<ColumnId, usize>,
/// The output column's column index in output row, which is used in cell-based deserialize.
pub output_id_to_index: HashMap<ColumnId, usize>,

/// The full row data types, which is used in row-based deserialize.
pub all_data_types: Vec<DataType>,

/// The output column's column index in full row, which is used in row-based deserialize.
pub output_index: Vec<usize>,
}

/// `Decoding` defines an interface for decoding a key row from kv storage.
Expand Down
82 changes: 75 additions & 7 deletions src/storage/src/row_serde/row_based_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,14 @@ impl RowBasedDeserializer {
}

let (vnode, key_bytes) = parse_raw_key_to_vnode_and_key(raw_key);
Ok(Some((
vnode,
key_bytes.to_vec(),
row_based_deserialize_inner(&self.column_mapping.all_data_types, value.as_ref())?,
)))
let mut origin_row =
row_based_deserialize_inner(&self.column_mapping.all_data_types, value.as_ref())?;

let mut output_row = Vec::with_capacity(self.column_mapping.output_index.len());
for col_idx in &self.column_mapping.output_index {
output_row.push(origin_row.0[*col_idx].take());
}
Ok(Some((vnode, key_bytes.to_vec(), Row(output_row))))
}
}

Expand All @@ -93,7 +96,7 @@ mod tests {
use crate::table::storage_table::DEFAULT_VNODE;

#[test]
fn test_row_based_serialize_and_deserialize_with_null() {
fn test_row_based_serialize_and_deserialize_full_row() {
let row = Row(vec![
Some(ScalarImpl::Utf8("string".into())),
Some(ScalarImpl::Bool(true)),
Expand All @@ -117,7 +120,7 @@ mod tests {
DataType::Decimal,
DataType::Interval,
];
let column_ids = (1..=row.size())
let column_ids = (0..=row.size() - 1)
.map(|i| ColumnId::new(i as _))
.collect_vec();

Expand All @@ -140,4 +143,69 @@ mod tests {
assert_eq!(row, row1.unwrap().2);
}
}

#[test]
fn test_row_based_serialize_and_deserialize_partial_row() {
    // A full row covering both fixed- and variable-length scalar kinds.
    let full_row = Row(vec![
        Some(ScalarImpl::Utf8("string".into())),
        Some(ScalarImpl::Bool(true)),
        Some(ScalarImpl::Int16(1)),
        Some(ScalarImpl::Int32(2)),
        Some(ScalarImpl::Int64(3)),
        Some(ScalarImpl::Float32(4.0.into())),
        Some(ScalarImpl::Float64(5.0.into())),
        Some(ScalarImpl::Decimal("-233.3".parse().unwrap())),
        Some(ScalarImpl::Interval(IntervalUnit::new(7, 8, 9))),
    ]);

    let schema_types = vec![
        DataType::Varchar,
        DataType::Boolean,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal,
        DataType::Interval,
    ];

    let total = full_row.size();
    let all_ids = (0..total).map(|i| ColumnId::new(i as _)).collect_vec();
    // Project away the first two columns (the Varchar and the Boolean).
    let projected_ids = (2..total).map(|i| ColumnId::new(i as _)).collect_vec();

    let descs = schema_types
        .iter()
        .zip_eq(all_ids.iter())
        .map(|(ty, id)| ColumnDesc::unnamed(*id, ty.clone()))
        .collect_vec();

    // Serialize the complete row, then deserialize through a partial mapping.
    let mut serializer = RowBasedSerializer::create_row_serializer(&[], &descs, &all_ids);
    let encoded = serializer.serialize(DEFAULT_VNODE, &[], full_row).unwrap();
    // each cell will add a is_none flag (u8)

    let mut deserializer = RowBasedDeserializer::create_row_deserializer(
        ColumnDescMapping::new_partial(&descs, &projected_ids),
    );

    // Expected result: the original row minus its first two cells.
    let expected = Row(vec![
        Some(ScalarImpl::Int16(1)),
        Some(ScalarImpl::Int32(2)),
        Some(ScalarImpl::Int64(3)),
        Some(ScalarImpl::Float32(4.0.into())),
        Some(ScalarImpl::Float64(5.0.into())),
        Some(ScalarImpl::Decimal("-233.3".parse().unwrap())),
        Some(ScalarImpl::Interval(IntervalUnit::new(7, 8, 9))),
    ]);

    for (pk, value) in encoded {
        // The stored value still encodes the full row, regardless of projection.
        assert_eq!(value.len(), 11 + 2 + 3 + 5 + 9 + 5 + 9 + 17 + 17);
        let decoded = deserializer.deserialize(pk, value).unwrap();
        assert_eq!(DEFAULT_VNODE, decoded.clone().unwrap().0);
        assert_eq!(expected, decoded.unwrap().2);
    }
}
}
Loading

0 comments on commit 5f046c7

Please sign in to comment.