Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(batch): fix batch scan distribution & handle more than one table scans in one stage #3736

Merged
merged 3 commits into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions e2e_test/batch/basic/join.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@ select * from t1 join t2 using(v1) join t3 using(v2);
----
1 2 1 3 2 3

statement ok
create index i1 on t1(v1);

statement ok
create index i2 on t2(v1);

query II
select * from i1 join i2 using(v1);
----
1 2 1 3

statement ok
drop index i1;

statement ok
drop index i2;

statement ok
drop table t1;

Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ impl BatchSeqScan {
if self.logical.is_sys_table() {
Distribution::Single
} else {
Distribution::SomeShard
match self.logical.distribution_key() {
Some(dist_key) => Distribution::HashShard(dist_key),
None => Distribution::SomeShard,
}
},
self.scan_range.clone(),
)
Expand Down
11 changes: 7 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,12 @@ impl LogicalScan {
&self.indexes
}

/// distribution keys stored in catalog only contains column index of the table (`table_idx`),
/// so we need to convert it to `operator_idx` when filling distributions.
pub fn map_distribution_key(&self) -> Vec<usize> {
/// The mapped distribution key of the scan operator.
///
/// The column indices in it are positions within `required_col_idx`,
/// rather than positions among all the columns of the table
/// (which would be the table's distribution key).
pub fn distribution_key(&self) -> Option<Vec<usize>> {
let tb_idx_to_op_idx = self
.required_col_idx
.iter()
Expand All @@ -192,7 +195,7 @@ impl LogicalScan {
self.table_desc
.distribution_key
.iter()
.map(|&tb_idx| tb_idx_to_op_idx[&tb_idx])
.map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
.collect()
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl StreamIndexScan {
ctx,
logical.schema().clone(),
logical.base.pk_indices.clone(),
Distribution::HashShard(logical.map_distribution_key()),
Distribution::HashShard(logical.distribution_key().unwrap()),
false, // TODO: determine the `append-only` field of table scan
);
Self {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl StreamTableScan {
logical.schema().clone(),
logical.base.pk_indices.clone(),
// follows upstream distribution from TableCatalog
Distribution::HashShard(logical.map_distribution_key()),
Distribution::HashShard(logical.distribution_key().unwrap()),
logical.table_desc().appendonly,
);
Self {
Expand Down
18 changes: 7 additions & 11 deletions src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,17 +394,13 @@ impl BatchPlanFragmenter {
// Check out the comments for `has_table_scan` in `QueryStage`.
let scan_node: Option<&BatchSeqScan> = node.as_batch_seq_scan();
if let Some(scan_node) = scan_node {
// TODO: handle multiple table scan inside a stage
assert!(
builder.table_scan_info.is_none()
|| builder
.table_scan_info
.as_ref()
.unwrap()
.vnode_bitmaps
.is_none(),
"multiple table scan inside a stage"
);
if builder.table_scan_info.is_some() {
// There is already a scan node in this stage.
// The nodes share the same distribution, but may have different vnode
// partitions. We simply reuse the same partition for all
// the scan nodes.
return;
}

builder.table_scan_info = Some({
let table_desc = scan_node.logical().table_desc();
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/test_runner/tests/testdata/join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,13 @@
LogicalJoin { type: Inner, on: ($1 = $3), output_indices: all }
LogicalScan { table: t1, output_columns: [v1, v2], required_columns: [$1:v1, $2:v2], predicate: ($1 > 100:Int32) }
LogicalScan { table: t2, output_columns: [v1, v2], required_columns: [$1:v1, $2:v2], predicate: ($1 < 1000:Int32) }
- sql: |
/* Left & right has same distribution. There should be no exchange below hash join */
create table t(x int);
create index i on t(x);
select * from i join i as ii on i.x=ii.x;
batch_plan: |
BatchExchange { order: [], dist: Single }
BatchHashJoin { type: Inner, predicate: $0 = $1, output_indices: all }
BatchScan { table: i, columns: [x] }
BatchScan { table: i, columns: [x] }