Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): apply row-based encoding for materialize executor and batch mode #4335

Merged
merged 13 commits into from
Aug 2, 2022
23 changes: 7 additions & 16 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Row};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, OrderedColumnDesc, Schema, TableId};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::{DataType, Datum, ScalarImpl};
use risingwave_common::util::select_all;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{scan_range, ScanRange};
use risingwave_pb::plan_common::{CellBasedTableDesc, OrderType as ProstOrderType};
use risingwave_storage::row_serde::CellBasedRowSerde;
use risingwave_storage::table::storage_table::{BatchDedupPkIter, StorageTable, StorageTableIter};
use risingwave_storage::row_serde::RowBasedSerde;
use risingwave_storage::table::storage_table::{RowBasedStorageTable, StorageTableIter};
use risingwave_storage::table::{Distribution, TableIter};
use risingwave_storage::{dispatch_state_store, Keyspace, StateStore, StateStoreImpl};

Expand All @@ -49,8 +49,8 @@ pub struct RowSeqScanExecutor<S: StateStore> {
}

pub enum ScanType<S: StateStore> {
TableScan(BatchDedupPkIter<S, CellBasedRowSerde>),
RangeScan(StorageTableIter<S, CellBasedRowSerde>),
TableScan(StorageTableIter<S, RowBasedSerde>),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need to differentiate TableScan and RangeScan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether we need to differentiate them, if we have row-based dedup pk. cc @BugenZhao

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With dedup fully implemented, they should also use the same iterator. For now I think there's no need to differentiate them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will refactor this part in next PR.

RangeScan(StorageTableIter<S, RowBasedSerde>),
PointGet(Option<Row>),
}

Expand Down Expand Up @@ -156,15 +156,6 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
.map(ColumnId::from)
.collect();

// TODO: remove this
let pk_descs = table_desc
.order_key
.iter()
.map(|order| OrderedColumnDesc {
column_desc: column_descs[order.index as usize].clone(),
order: OrderType::from_prost(&ProstOrderType::from_i32(order.order_type).unwrap()),
})
.collect_vec();
let pk_types = table_desc
.order_key
.iter()
Expand Down Expand Up @@ -203,7 +194,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {

dispatch_state_store!(source.context().try_get_state_store()?, state_store, {
let batch_stats = source.context().stats();
let table = StorageTable::new_partial(
let table = RowBasedStorageTable::new_partial(
state_store.clone(),
table_id,
column_descs,
Expand All @@ -215,7 +206,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
let keyspace = Keyspace::table_root(state_store.clone(), &table_id);

if seq_scan_node.scan_ranges.is_empty() {
let iter = table.batch_dedup_pk_iter(source.epoch, &pk_descs).await?;
let iter = table.batch_iter(source.epoch).await?;
return Ok(Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(iter)],
Expand Down
51 changes: 9 additions & 42 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_batch::executor::{
RowSeqScanExecutor, ScanType,
};
use risingwave_common::array::{Array, DataChunk, F64Array, I64Array, Row};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, OrderedColumnDesc, Schema, TableId};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::column_nonnull;
use risingwave_common::error::{Result, RwError};
use risingwave_common::test_prelude::DataChunkTestExt;
Expand All @@ -36,8 +36,8 @@ use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::plan_common::ColumnDesc as ProstColumnDesc;
use risingwave_source::{MemSourceManager, SourceManager};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::state_table::StateTable;
use risingwave_storage::table::storage_table::StorageTable;
use risingwave_storage::table::state_table::RowBasedStateTable;
use risingwave_storage::table::storage_table::RowBasedStorageTable;
use risingwave_storage::Keyspace;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::{
Expand Down Expand Up @@ -200,30 +200,17 @@ async fn test_table_v2_materialize() -> Result<()> {
.collect_vec();

// Since we have not polled `Materialize`, we cannot scan anything from this table
let table = StorageTable::new_for_test(
let table = RowBasedStorageTable::new_for_test(
memory_state_store.clone(),
source_table_id,
column_descs.clone(),
vec![OrderType::Ascending],
vec![0],
);

let ordered_column_descs: Vec<OrderedColumnDesc> = column_descs
.iter()
.take(1)
.map(|d| OrderedColumnDesc {
column_desc: d.clone(),
order: OrderType::Ascending,
})
.collect();

let scan = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(
table
.batch_dedup_pk_iter(u64::MAX, &ordered_column_descs)
.await?,
)],
vec![ScanType::TableScan(table.batch_iter(u64::MAX).await?)],
1024,
"RowSeqExecutor2".to_string(),
Arc::new(BatchMetrics::unused()),
Expand Down Expand Up @@ -281,11 +268,7 @@ async fn test_table_v2_materialize() -> Result<()> {
// Scan the table again, we are able to get the data now!
let scan = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(
table
.batch_dedup_pk_iter(u64::MAX, &ordered_column_descs)
.await?,
)],
vec![ScanType::TableScan(table.batch_iter(u64::MAX).await?)],
1024,
"RowSeqScanExecutor2".to_string(),
Arc::new(BatchMetrics::unused()),
Expand Down Expand Up @@ -352,11 +335,7 @@ async fn test_table_v2_materialize() -> Result<()> {
// Scan the table again, we are able to see the deletion now!
let scan = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(
table
.batch_dedup_pk_iter(u64::MAX, &ordered_column_descs)
.await?,
)],
vec![ScanType::TableScan(table.batch_iter(u64::MAX).await?)],
1024,
"RowSeqScanExecutor2".to_string(),
Arc::new(BatchMetrics::unused()),
Expand Down Expand Up @@ -389,7 +368,7 @@ async fn test_row_seq_scan() -> Result<()> {
ColumnDesc::unnamed(ColumnId::from(2), schema[2].data_type.clone()),
];

let mut state = StateTable::new_without_distribution(
let mut state = RowBasedStateTable::new_without_distribution(
memory_state_store.clone(),
TableId::from(0x42),
column_descs.clone(),
Expand All @@ -416,22 +395,10 @@ async fn test_row_seq_scan() -> Result<()> {
.unwrap();
state.commit(epoch).await.unwrap();

let pk_descs: Vec<OrderedColumnDesc> = column_descs
.iter()
.take(1)
.map(|d| OrderedColumnDesc {
column_desc: d.clone(),
order: OrderType::Ascending,
})
.collect();

let executor = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(
table
.batch_dedup_pk_iter(u64::MAX, &pk_descs)
.await
.unwrap(),
table.batch_iter(u64::MAX).await.unwrap(),
)],
1,
"RowSeqScanExecutor2".to_string(),
Expand Down
37 changes: 23 additions & 14 deletions src/storage/src/row_serde/cell_based_row_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,47 +28,56 @@ use crate::row_serde::ColumnDescMapping;

#[allow(clippy::len_without_is_empty)]
impl ColumnDescMapping {
fn new_inner(output_columns: Vec<ColumnDesc>, all_data_types: Vec<DataType>) -> Arc<Self> {
let id_to_column_index = output_columns
fn new_inner(
output_columns: Vec<ColumnDesc>,
all_data_types: Vec<DataType>,
output_index: Vec<usize>,
) -> Arc<Self> {
let output_id_to_index = output_columns
.iter()
.enumerate()
.map(|(index, d)| (d.column_id, index))
.collect();

Self {
output_columns,
id_to_column_index,
output_id_to_index,
all_data_types,
output_index,
}
.into()
}

/// Create a mapping with given `output_columns`.
pub fn new(output_columns: Vec<ColumnDesc>) -> Arc<Self> {
let all_data_types = output_columns.iter().map(|d| d.data_type.clone()).collect();
Self::new_inner(output_columns, all_data_types)
let output_index: Vec<usize> = output_columns
.iter()
.map(|c| c.column_id.get_id() as usize)
.collect();
Self::new_inner(output_columns, all_data_types, output_index)
}

/// Create a mapping with given `table_columns` projected on the `column_ids`.
pub fn new_partial(table_columns: &[ColumnDesc], column_ids: &[ColumnId]) -> Arc<Self> {
pub fn new_partial(table_columns: &[ColumnDesc], output_column_ids: &[ColumnId]) -> Arc<Self> {
let all_data_types = table_columns.iter().map(|d| d.data_type.clone()).collect();

let mut table_columns = table_columns
.iter()
.map(|c| (c.column_id, c.clone()))
.enumerate()
.map(|(index, c)| (c.column_id, (c.clone(), index)))
.collect::<HashMap<_, _>>();

let output_columns = column_ids
let (output_columns, output_index): (
Vec<risingwave_common::catalog::ColumnDesc>,
Vec<usize>,
) = output_column_ids
.iter()
.map(|id| table_columns.remove(id).unwrap())
.collect();

Self::new_inner(output_columns, all_data_types)
.unzip();
Self::new_inner(output_columns, all_data_types, output_index)
}

/// Get the [`ColumnDesc`] and its index in the output with given `id`.
pub fn get(&self, id: ColumnId) -> Option<(&ColumnDesc, usize)> {
self.id_to_column_index
self.output_id_to_index
.get(&id)
.map(|&index| (&self.output_columns[index], index))
}
Expand Down
8 changes: 7 additions & 1 deletion src/storage/src/row_serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,17 @@ pub trait RowSerialize: Clone {
/// Record mapping from [`ColumnDesc`], [`ColumnId`], and output index of columns in a table.
#[derive(Clone)]
pub struct ColumnDescMapping {
/// The subset of the table's columns to be output by a (possibly partial) scan.
pub output_columns: Vec<ColumnDesc>,

pub id_to_column_index: HashMap<ColumnId, usize>,
/// Maps each output column's id to its index in the output row; used in cell-based deserialization.
pub output_id_to_index: HashMap<ColumnId, usize>,

/// The data types of the full row; used in row-based deserialization.
pub all_data_types: Vec<DataType>,

/// Each output column's index in the full row; used in row-based deserialization.
pub output_index: Vec<usize>,
}

/// `Decoding` defines an interface for decoding a key row from kv storage.
Expand Down
82 changes: 75 additions & 7 deletions src/storage/src/row_serde/row_based_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,14 @@ impl RowBasedDeserializer {
}

let (vnode, key_bytes) = parse_raw_key_to_vnode_and_key(raw_key);
Ok(Some((
vnode,
key_bytes.to_vec(),
row_based_deserialize_inner(&self.column_mapping.all_data_types, value.as_ref())?,
)))
let mut origin_row =
row_based_deserialize_inner(&self.column_mapping.all_data_types, value.as_ref())?;

let mut output_row = Vec::with_capacity(self.column_mapping.output_index.len());
for col_idx in &self.column_mapping.output_index {
output_row.push(origin_row.0[*col_idx].take());
}
Ok(Some((vnode, key_bytes.to_vec(), Row(output_row))))
}
}

Expand All @@ -93,7 +96,7 @@ mod tests {
use crate::table::storage_table::DEFAULT_VNODE;

#[test]
fn test_row_based_serialize_and_deserialize_with_null() {
fn test_row_based_serialize_and_deserialize_full_row() {
let row = Row(vec![
Some(ScalarImpl::Utf8("string".into())),
Some(ScalarImpl::Bool(true)),
Expand All @@ -117,7 +120,7 @@ mod tests {
DataType::Decimal,
DataType::Interval,
];
let column_ids = (1..=row.size())
let column_ids = (0..=row.size() - 1)
.map(|i| ColumnId::new(i as _))
.collect_vec();

Expand All @@ -140,4 +143,69 @@ mod tests {
assert_eq!(row, row1.unwrap().2);
}
}

#[test]
fn test_row_based_serialize_and_deserialize_partial_row() {
    // A full source row covering every supported scalar kind, so the
    // round-trip exercises each datum encoder/decoder at least once.
    let row = Row(vec![
        Some(ScalarImpl::Utf8("string".into())),
        Some(ScalarImpl::Bool(true)),
        Some(ScalarImpl::Int16(1)),
        Some(ScalarImpl::Int32(2)),
        Some(ScalarImpl::Int64(3)),
        Some(ScalarImpl::Float32(4.0.into())),
        Some(ScalarImpl::Float64(5.0.into())),
        Some(ScalarImpl::Decimal("-233.3".parse().unwrap())),
        Some(ScalarImpl::Interval(IntervalUnit::new(7, 8, 9))),
    ]);

    let data_types = vec![
        DataType::Varchar,
        DataType::Boolean,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal,
        DataType::Interval,
    ];
    // Column ids 0..n for the full table schema.
    let column_ids = (0..row.size()).map(|i| ColumnId::new(i as _)).collect_vec();

    // Project away the first two columns (Varchar, Boolean) on the output side.
    let output_column_ids = (2..row.size()).map(|i| ColumnId::new(i as _)).collect_vec();
    let column_descs = data_types
        .iter()
        .zip_eq(column_ids.iter())
        .map(|(d, i)| ColumnDesc::unnamed(*i, d.clone()))
        .collect_vec();

    // Serialize the FULL row; the projection happens only at deserialization
    // time via the partial `ColumnDescMapping`.
    let mut se = RowBasedSerializer::create_row_serializer(&[], &column_descs, &column_ids);
    let value_bytes = se.serialize(DEFAULT_VNODE, &[], row).unwrap();

    let mut de = RowBasedDeserializer::create_row_deserializer(ColumnDescMapping::new_partial(
        &column_descs,
        &output_column_ids,
    ));

    // Expected output: the full row minus its first two columns.
    let partial_row = Row(vec![
        Some(ScalarImpl::Int16(1)),
        Some(ScalarImpl::Int32(2)),
        Some(ScalarImpl::Int64(3)),
        Some(ScalarImpl::Float32(4.0.into())),
        Some(ScalarImpl::Float64(5.0.into())),
        Some(ScalarImpl::Decimal("-233.3".parse().unwrap())),
        Some(ScalarImpl::Interval(IntervalUnit::new(7, 8, 9))),
    ]);
    for (pk, value) in value_bytes {
        // Each datum in the serialized value carries a 1-byte null flag plus
        // its payload, so the full 9-column row encodes to exactly this many
        // bytes (the projection does not shrink the stored value).
        assert_eq!(value.len(), 11 + 2 + 3 + 5 + 9 + 5 + 9 + 17 + 17);
        // Destructure once instead of `clone().unwrap()` + `unwrap()`.
        let (vnode, _key_bytes, decoded) = de
            .deserialize(pk, value)
            .unwrap()
            .expect("deserializer should yield a row");
        assert_eq!(vnode, DEFAULT_VNODE);
        assert_eq!(decoded, partial_row);
    }
}
}
Loading