Skip to content

Commit

Permalink
refactor(streaming): use table catalog for materialize executor (#3742)
Browse files Browse the repository at this point in the history
* use catalog for materialize executor
  • Loading branch information
wcy-fdu authored Jul 8, 2022
1 parent 23f41ab commit 8fbe3f9
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 53 deletions.
8 changes: 3 additions & 5 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@ message FilterNode {
message MaterializeNode {
uint32 table_id = 1;
// Column indexes and orders of primary key
repeated plan_common.ColumnOrder column_orders = 2;
// Column IDs of input schema
repeated int32 column_ids = 3;
// Hash key of the materialize node, which is a subset of pk.
repeated uint32 distribution_key = 4;
repeated plan_common.ColumnOrder column_orders = 3;
// Table catalog used for internal table states; the executor reads the column IDs and distribution key from it.
catalog.Table table = 4;
}

// Remark by Yanghao: for both local and global we use the same node in the protobuf.
Expand Down
19 changes: 5 additions & 14 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fmt;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{ColumnDesc, TableId};
use risingwave_common::catalog::{ColumnDesc, DatabaseId, SchemaId, TableId};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::Result;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
Expand Down Expand Up @@ -240,25 +240,16 @@ impl ToStreamProst for StreamMaterialize {
// The frontend does not need a table id for the materialize node; the id will be generated by
// the meta catalog service.
table_id: 0,
column_ids: self
.table()
.columns()
.iter()
.map(|col| ColumnId::get_id(&col.column_desc.column_id))
.collect(),
column_orders: self
.table()
.order_key()
.iter()
.map(FieldOrder::to_protobuf)
.collect(),
distribution_key: self
.base
.dist
.dist_column_indices()
.iter()
.map(|idx| *idx as u32)
.collect_vec(),
table: Some(self.table().to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
)),
})
}
}
146 changes: 123 additions & 23 deletions src/frontend/test_runner/tests/testdata/stream_proto.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,32 @@
columnOrders:
- orderType: ASCENDING
index: 1
columnIds:
- 0
- 1
distributionKey:
- 1
table:
id: 4294967294
schemaId: 2147483646
databaseId: 2147483646
name: test
columns:
- columnDesc:
columnType:
typeName: INT32
isNullable: true
name: v1
- columnDesc:
columnType:
typeName: INT64
isNullable: true
columnId: 1
name: _row_id
isHidden: true
orderKey:
- orderType: ASCENDING
index: 1
distributionKey:
- 1
pk:
- 1
owner: root
---
id: 4294967294
schemaId: 1
Expand Down Expand Up @@ -204,11 +225,32 @@
columnOrders:
- orderType: ASCENDING
index: 1
columnIds:
- 0
- 1
distributionKey:
- 1
table:
id: 4294967294
schemaId: 2147483646
databaseId: 2147483646
name: test
columns:
- columnDesc:
columnType:
typeName: INT32
isNullable: true
name: v1
- columnDesc:
columnType:
typeName: INT64
isNullable: true
columnId: 1
name: _row_id
isHidden: true
orderKey:
- orderType: ASCENDING
index: 1
distributionKey:
- 1
pk:
- 1
owner: root
---
id: 4294967294
schemaId: 1
Expand Down Expand Up @@ -315,11 +357,32 @@
columnOrders:
- orderType: ASCENDING
index: 1
columnIds:
- 0
- 1
distributionKey:
- 1
table:
id: 4294967294
schemaId: 2147483646
databaseId: 2147483646
name: test
columns:
- columnDesc:
columnType:
typeName: INT32
isNullable: true
name: v1
- columnDesc:
columnType:
typeName: INT64
isNullable: true
columnId: 1
name: _row_id
isHidden: true
orderKey:
- orderType: ASCENDING
index: 1
distributionKey:
- 1
pk:
- 1
owner: root
---
id: 4294967294
schemaId: 1
Expand Down Expand Up @@ -525,9 +588,25 @@
isNullable: true
name: "agg#1"
materialize:
columnIds:
- 0
- 1
table:
id: 4294967294
schemaId: 2147483646
databaseId: 2147483646
name: test
columns:
- columnDesc:
columnType:
typeName: INT64
isNullable: true
name: "agg#0"
isHidden: true
- columnDesc:
columnType:
typeName: INT64
isNullable: true
columnId: 1
name: sum
owner: root
---
id: 4294967294
schemaId: 1
Expand Down Expand Up @@ -791,11 +870,32 @@
columnOrders:
- orderType: ASCENDING
index: 1
columnIds:
- 0
- 1
distributionKey:
- 1
table:
id: 4294967294
schemaId: 2147483646
databaseId: 2147483646
name: test
columns:
- columnDesc:
columnType:
typeName: INT64
isNullable: true
name: sum_v1
- columnDesc:
columnType:
typeName: INT32
isNullable: true
columnId: 1
name: v2
isHidden: true
orderKey:
- orderType: ASCENDING
index: 1
distributionKey:
- 1
pk:
- 1
owner: root
---
id: 4294967294
schemaId: 1
Expand Down
3 changes: 1 addition & 2 deletions src/meta/src/stream/test_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,8 @@ fn make_stream_node() -> StreamNode {
pk_indices: vec![],
node_body: Some(NodeBody::Materialize(MaterializeNode {
table_id: 1,
column_ids: vec![0_i32, 1_i32],
table: Some(make_internal_table(true)),
column_orders: vec![make_column_order(1), make_column_order(2)],
distribution_key: Default::default(),
})),
fields: vec![], // TODO: fill this later
operator_id: 7,
Expand Down
12 changes: 6 additions & 6 deletions src/stream/src/executor/mview/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,29 @@ pub struct MaterializeExecutor<S: StateStore> {

state_table: StateTable<S>,

/// Columns of arrange keys (including pk, group key, join keys, etc.)
/// Columns of arrange keys (including pk, group keys, join keys, etc.)
arrange_columns: Vec<usize>,

info: ExecutorInfo,
}

impl<S: StateStore> MaterializeExecutor<S> {
/// Create a new `MaterializeExecutor` with distribution specified with `distribution_key` and
/// `vnodes`. For singleton distribution, `distribution_key` should be empty and `vnodes`
/// Create a new `MaterializeExecutor` with distribution specified by `distribution_key` and
/// `vnodes`. For singleton distribution, `distribution_key` should be empty and `vnodes`
/// should be `None`.
#[allow(clippy::too_many_arguments)]
pub fn new(
input: BoxedExecutor,
store: S,
table_id: TableId,
order_key: Vec<OrderPair>,
key: Vec<OrderPair>,
column_ids: Vec<ColumnId>,
executor_id: u64,
distribution_key: Vec<usize>,
vnodes: Option<Arc<Bitmap>>,
) -> Self {
let arrange_columns: Vec<usize> = order_key.iter().map(|k| k.column_idx).collect();
let arrange_order_types = order_key.iter().map(|k| k.order_type).collect();
let arrange_columns: Vec<usize> = key.iter().map(|k| k.column_idx).collect();
let arrange_order_types = key.iter().map(|k| k.order_type).collect();

let schema = input.schema().clone();
let columns = column_ids
Expand Down
8 changes: 5 additions & 3 deletions src/stream/src/from_proto/mview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ impl ExecutorBuilder for MaterializeExecutorBuilder {
.map(OrderPair::from_prost)
.collect();
let column_ids = node
.column_ids
.get_table()?
.get_columns()
.iter()
.map(|id| ColumnId::from(*id))
.map(|t| ColumnId::from(t.column_desc.as_ref().unwrap().column_id))
.collect();

let distribution_key = node
.distribution_key
.get_table()?
.get_distribution_key()
.iter()
.map(|key| *key as usize)
.collect();
Expand Down

0 comments on commit 8fbe3f9

Please sign in to comment.