Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): apply row-based encoding for materialize executor and batch mode #4335

Merged
merged 13 commits into from
Aug 2, 2022
23 changes: 7 additions & 16 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Row};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, OrderedColumnDesc, Schema, TableId};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::{DataType, Datum, ScalarImpl};
use risingwave_common::util::select_all;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{scan_range, ScanRange};
use risingwave_pb::plan_common::{CellBasedTableDesc, OrderType as ProstOrderType};
use risingwave_storage::row_serde::CellBasedRowSerde;
use risingwave_storage::table::storage_table::{BatchDedupPkIter, StorageTable, StorageTableIter};
use risingwave_storage::row_serde::RowBasedSerde;
use risingwave_storage::table::storage_table::{RowBasedStorageTable, StorageTableIter};
use risingwave_storage::table::{Distribution, TableIter};
use risingwave_storage::{dispatch_state_store, Keyspace, StateStore, StateStoreImpl};

Expand All @@ -49,8 +49,8 @@ pub struct RowSeqScanExecutor<S: StateStore> {
}

pub enum ScanType<S: StateStore> {
TableScan(BatchDedupPkIter<S, CellBasedRowSerde>),
RangeScan(StorageTableIter<S, CellBasedRowSerde>),
TableScan(StorageTableIter<S, RowBasedSerde>),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need to differentiate TableScan and RangeScan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether we need to differentiate, if we have row-based dudup pk. cc @BugenZhao

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With dedup fully implemented, they should also use the same iterator. For now I think there's no need to differentiate them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will refactor this part in next PR.

RangeScan(StorageTableIter<S, RowBasedSerde>),
PointGet(Option<Row>),
}

Expand Down Expand Up @@ -156,15 +156,6 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
.map(ColumnId::from)
.collect();

// TODO: remove this
let pk_descs = table_desc
.order_key
.iter()
.map(|order| OrderedColumnDesc {
column_desc: column_descs[order.index as usize].clone(),
order: OrderType::from_prost(&ProstOrderType::from_i32(order.order_type).unwrap()),
})
.collect_vec();
let pk_types = table_desc
.order_key
.iter()
Expand Down Expand Up @@ -203,7 +194,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {

dispatch_state_store!(source.context().try_get_state_store()?, state_store, {
let batch_stats = source.context().stats();
let table = StorageTable::new_partial(
let table = RowBasedStorageTable::new_partial(
state_store.clone(),
table_id,
column_descs,
Expand All @@ -215,7 +206,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
let keyspace = Keyspace::table_root(state_store.clone(), &table_id);

if seq_scan_node.scan_ranges.is_empty() {
let iter = table.batch_dedup_pk_iter(source.epoch, &pk_descs).await?;
let iter = table.batch_iter(source.epoch).await?;
return Ok(Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(iter)],
Expand Down
51 changes: 9 additions & 42 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_batch::executor::{
RowSeqScanExecutor, ScanType,
};
use risingwave_common::array::{Array, DataChunk, F64Array, I64Array, Row};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, OrderedColumnDesc, Schema, TableId};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::column_nonnull;
use risingwave_common::error::{Result, RwError};
use risingwave_common::test_prelude::DataChunkTestExt;
Expand All @@ -36,8 +36,8 @@ use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::plan_common::ColumnDesc as ProstColumnDesc;
use risingwave_source::{MemSourceManager, SourceManager};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::state_table::StateTable;
use risingwave_storage::table::storage_table::StorageTable;
use risingwave_storage::table::state_table::RowBasedStateTable;
use risingwave_storage::table::storage_table::RowBasedStorageTable;
use risingwave_storage::Keyspace;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::{
Expand Down Expand Up @@ -200,30 +200,17 @@ async fn test_table_v2_materialize() -> Result<()> {
.collect_vec();

// Since we have not polled `Materialize`, we cannot scan anything from this table
let table = StorageTable::new_for_test(
let table = RowBasedStorageTable::new_for_test(
memory_state_store.clone(),
source_table_id,
column_descs.clone(),
vec![OrderType::Ascending],
vec![0],
);

let ordered_column_descs: Vec<OrderedColumnDesc> = column_descs
.iter()
.take(1)
.map(|d| OrderedColumnDesc {
column_desc: d.clone(),
order: OrderType::Ascending,
})
.collect();

let scan = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(
table
.batch_dedup_pk_iter(u64::MAX, &ordered_column_descs)
.await?,
)],
vec![ScanType::TableScan(table.batch_iter(u64::MAX).await?)],
1024,
"RowSeqExecutor2".to_string(),
Arc::new(BatchMetrics::unused()),
Expand Down Expand Up @@ -281,11 +268,7 @@ async fn test_table_v2_materialize() -> Result<()> {
// Scan the table again, we are able to get the data now!
let scan = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(
table
.batch_dedup_pk_iter(u64::MAX, &ordered_column_descs)
.await?,
)],
vec![ScanType::TableScan(table.batch_iter(u64::MAX).await?)],
1024,
"RowSeqScanExecutor2".to_string(),
Arc::new(BatchMetrics::unused()),
Expand Down Expand Up @@ -352,11 +335,7 @@ async fn test_table_v2_materialize() -> Result<()> {
// Scan the table again, we are able to see the deletion now!
let scan = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(
table
.batch_dedup_pk_iter(u64::MAX, &ordered_column_descs)
.await?,
)],
vec![ScanType::TableScan(table.batch_iter(u64::MAX).await?)],
1024,
"RowSeqScanExecutor2".to_string(),
Arc::new(BatchMetrics::unused()),
Expand Down Expand Up @@ -389,7 +368,7 @@ async fn test_row_seq_scan() -> Result<()> {
ColumnDesc::unnamed(ColumnId::from(2), schema[2].data_type.clone()),
];

let mut state = StateTable::new_without_distribution(
let mut state = RowBasedStateTable::new_without_distribution(
memory_state_store.clone(),
TableId::from(0x42),
column_descs.clone(),
Expand All @@ -416,22 +395,10 @@ async fn test_row_seq_scan() -> Result<()> {
.unwrap();
state.commit(epoch).await.unwrap();

let pk_descs: Vec<OrderedColumnDesc> = column_descs
.iter()
.take(1)
.map(|d| OrderedColumnDesc {
column_desc: d.clone(),
order: OrderType::Ascending,
})
.collect();

let executor = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
vec![ScanType::TableScan(
table
.batch_dedup_pk_iter(u64::MAX, &pk_descs)
.await
.unwrap(),
table.batch_iter(u64::MAX).await.unwrap(),
)],
1,
"RowSeqScanExecutor2".to_string(),
Expand Down
21 changes: 15 additions & 6 deletions src/storage/src/row_serde/row_based_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use bytes::{Buf, Bytes};
use itertools::Itertools;
use risingwave_common::array::Row;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::{DataType, VirtualNode, VIRTUAL_NODE_SIZE};
Expand Down Expand Up @@ -64,11 +65,19 @@ impl RowBasedDeserializer {
}

let (vnode, key_bytes) = parse_raw_key_to_vnode_and_key(raw_key);
Ok(Some((
vnode,
key_bytes.to_vec(),
row_based_deserialize_inner(&self.column_mapping.all_data_types, value.as_ref())?,
)))
let origin_row =
row_based_deserialize_inner(&self.column_mapping.all_data_types, value.as_ref())?;
let output_column_ids = self
.column_mapping
.output_columns
.iter()
.map(|c| c.column_id)
.collect_vec();
let mut output_row = vec![];
for col_id in output_column_ids {
output_row.push(origin_row.0[col_id.get_id() as usize].clone());
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(Some((vnode, key_bytes.to_vec(), Row(output_row))))
}
}

Expand Down Expand Up @@ -117,7 +126,7 @@ mod tests {
DataType::Decimal,
DataType::Interval,
];
let column_ids = (1..=row.size())
let column_ids = (0..=row.size() - 1)
.map(|i| ColumnId::new(i as _))
.collect_vec();

Expand Down
6 changes: 4 additions & 2 deletions src/storage/src/table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ use crate::error::{StorageError, StorageResult};
use crate::keyspace::StripPrefixIterator;
use crate::row_serde::cell_based_encoding_util::serialize_pk_and_column_id;
use crate::row_serde::{
serialize_pk, CellBasedRowSerde, ColumnDescMapping, RowDeserialize, RowSerde, RowSerialize,
serialize_pk, CellBasedRowSerde, ColumnDescMapping, RowBasedSerde, RowDeserialize, RowSerde,
RowSerialize,
};
use crate::storage_value::StorageValue;
use crate::store::{ReadOptions, WriteOptions};
Expand All @@ -64,6 +65,8 @@ pub const DEFAULT_VNODE: VirtualNode = 0;
/// encoding format: [keyspace | pk | `column_id` (4B)] -> value.
/// if the key of the column id does not exist, it will be Null in the relation
pub type StorageTable<S, const T: AccessType> = StorageTableBase<S, CellBasedRowSerde, T>;

pub type RowBasedStorageTable<S, const T: AccessType> = StorageTableBase<S, RowBasedSerde, T>;
/// [`StorageTableBase`] is the interface accessing relational data in KV(`StateStore`) with
/// encoding format: [keyspace | pk | `column_id` (4B)] -> value.
/// if the key of the column id does not exist, it will be Null in the relation.
Expand Down Expand Up @@ -876,7 +879,6 @@ impl<S: StateStore, RS: RowSerde> StorageTableIterInner<S, RS> {
}

let row_deserializer = RS::create_deserializer(table_descs);

let iter = keyspace
.iter_with_range(raw_key_range, read_options)
.await?;
Expand Down
Loading