Commit 5fe991c

fix: use SomeShard as the distribution of batch scan (risingwavelabs#4420)

* fix: use SomeShard as the distribution of batch scan

* drop

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
xxchan and mergify[bot] authored Aug 4, 2022
1 parent 2d42495 commit 5fe991c
Showing 10 changed files with 129 additions and 53 deletions.
24 changes: 24 additions & 0 deletions e2e_test/batch/basic/join.slt.part
@@ -62,3 +62,27 @@ query I
select count(*) from (values (1, 2), (3, 4)) as a, (values (9),(4),(1)) as b;
----
6

statement ok
create table t(x int);

statement ok
create index i on t(x);

statement ok
insert into t values (1),(2),(3),(4),(5);

query I rowsort
Select * from t join i using(x)
----
1
2
3
4
5

statement ok
drop index i;

statement ok
drop table t;
15 changes: 13 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_hash_join.rs
@@ -51,6 +51,9 @@ impl BatchHashJoin {
&logical
.l2i_col_mapping()
.composite(&logical.i2o_col_mapping()),
&logical
.r2i_col_mapping()
.composite(&logical.i2o_col_mapping()),
);
let base = PlanBase::new_batch(ctx, logical.schema().clone(), dist, Order::any());

@@ -65,13 +68,21 @@
left: &Distribution,
right: &Distribution,
l2o_mapping: &ColIndexMapping,
r2o_mapping: &ColIndexMapping,
) -> Distribution {
match (left, right) {
(Distribution::Single, Distribution::Single) => Distribution::Single,
(Distribution::HashShard(_), Distribution::HashShard(_)) => {
(Distribution::HashShard(_), Distribution::HashShard(_) | Distribution::SomeShard) => {
l2o_mapping.rewrite_provided_distribution(left)
}
(_, _) => unreachable!(),
(Distribution::SomeShard, Distribution::HashShard(_)) => {
r2o_mapping.rewrite_provided_distribution(right)
}
(Distribution::SomeShard, Distribution::SomeShard) => Distribution::SomeShard,
(_, _) => unreachable!(
"suspicious distribution: left: {:?}, right: {:?}",
left, right
),
}
}

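A minimal, self-contained sketch of the decision the match above encodes, using simplified stand-in types rather than RisingWave's actual planner structs: a HashShard input can lend its distribution to the join output, while two SomeShard inputs yield only SomeShard. The real code additionally remaps the surviving key into output column positions via the l2o/r2o `ColIndexMapping`s, which this sketch omits.

// Simplified illustration only; not the real `Distribution` type.
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    Single,
    SomeShard,
    HashShard(Vec<usize>), // column indices the rows are hashed on
}

fn join_output_dist(left: &Distribution, right: &Distribution) -> Distribution {
    use Distribution::*;
    match (left, right) {
        (Single, Single) => Single,
        // The real planner also rewrites the surviving key through the
        // l2o/r2o column mappings; omitted here for brevity.
        (HashShard(key), HashShard(_) | SomeShard) => HashShard(key.clone()),
        (SomeShard, HashShard(key)) => HashShard(key.clone()),
        (SomeShard, SomeShard) => SomeShard,
        (l, r) => unreachable!("suspicious distribution: left: {:?}, right: {:?}", l, r),
    }
}

fn main() {
    let hashed = Distribution::HashShard(vec![0]);
    let some = Distribution::SomeShard;
    assert_eq!(join_output_dist(&hashed, &some), Distribution::HashShard(vec![0]));
    assert_eq!(join_output_dist(&some, &some), Distribution::SomeShard);
}
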
22 changes: 13 additions & 9 deletions src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
@@ -78,15 +78,19 @@ impl BatchSeqScan {
if self.logical.is_sys_table() {
Distribution::Single
} else {
match self.logical.distribution_key() {
// FIXME: Should be `Single` if no distribution key.
// Currently the task will be scheduled to frontend under local mode, which is
// unimplemented yet. Enable this when it's done.
//
// Some(dist_key) if dist_key.is_empty() => Distribution::Single,
Some(dist_key) => Distribution::HashShard(dist_key),
None => Distribution::SomeShard,
}
// FIXME: Should be `Single` if no distribution key.
// Currently the task will be scheduled to frontend under local mode, which is
// unimplemented yet. Enable this when it's done.

// For other batch operators, `HashShard` is a simple hashing, i.e.,
// `target_shard = hash(dist_key) % shard_num`
//
// But MV is actually sharded by consistent hashing, i.e.,
// `target_shard = vnode_mapping.map(hash(dist_key) % vnode_num)`
//
// They are incompatible, so we just specify its distribution as `SomeShard`
// to force an exchange to be inserted.
Distribution::SomeShard
},
self.scan_ranges.clone(),
)
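
The comment in the hunk above is the heart of the fix. The following standalone sketch (with a hypothetical vnode mapping and made-up key hashes, not RisingWave's actual code) shows how the two placement schemes can disagree for the same key hash, which is why the scan can only claim SomeShard and an exchange must be forced:

// Illustration with made-up numbers; the real vnode mapping comes from cluster metadata.
fn batch_shard(key_hash: u64, shard_num: u64) -> u64 {
    // Simple modulo placement assumed by other batch operators:
    // target_shard = hash(dist_key) % shard_num
    key_hash % shard_num
}

fn mv_shard(key_hash: u64, vnode_mapping: &[u64]) -> u64 {
    // Consistent-hash placement used by MV storage:
    // target_shard = vnode_mapping.map(hash(dist_key) % vnode_num)
    let vnode = (key_hash % vnode_mapping.len() as u64) as usize;
    vnode_mapping[vnode]
}

fn main() {
    let shard_num = 4;
    // Hypothetical vnode -> parallel-unit assignment; rebalancing can change it freely.
    let vnode_mapping = [0u64, 0, 1, 1, 2, 2, 3, 3];
    for key_hash in [5u64, 42, 1000] {
        println!(
            "hash {}: modulo hashing picks shard {}, vnode mapping picks {}",
            key_hash,
            batch_shard(key_hash, shard_num),
            mv_shard(key_hash, &vnode_mapping)
        );
    }
}
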
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -213,7 +213,7 @@ impl LogicalScan {

/// The mapped distribution key of the scan operator.
///
/// The column indices in it is the position in the `required_col_idx`,instead of the position
/// The column indices in it is the position in the `required_col_idx`, instead of the position
/// in all the columns of the table (which is the table's distribution key).
///
/// Return `None` if the table's distribution key are not all in the `required_col_idx`.
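
For context on the doc comment touched above, here is a small hypothetical helper (not the actual `LogicalScan` method) illustrating what "mapped distribution key" means: the table's distribution key positions are translated into positions within `required_col_idx`, and `None` is returned when any key column is not scanned.

// Hypothetical sketch; names and signature are illustrative only.
fn mapped_distribution_key(
    table_dist_key: &[usize],   // positions in the full table schema
    required_col_idx: &[usize], // columns the scan outputs, as table positions
) -> Option<Vec<usize>> {
    table_dist_key
        .iter()
        .map(|dist_col| required_col_idx.iter().position(|c| c == dist_col))
        .collect() // None as soon as a key column is missing from the scan
}

fn main() {
    // Table columns: [id, v1, v2]; distribution key: [0] (id).
    let dist_key = [0];
    // Scan outputs [v1, id]: id sits at position 1 of the scan output.
    assert_eq!(mapped_distribution_key(&dist_key, &[1, 0]), Some(vec![1]));
    // Scan outputs [v1, v2]: id is not scanned, so there is no usable key.
    assert_eq!(mapped_distribution_key(&dist_key, &[1, 2]), None);
}
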
4 changes: 2 additions & 2 deletions src/frontend/test_runner/tests/testdata/basic_query.yaml
@@ -111,14 +111,14 @@
delete from t;
batch_plan: |
BatchDelete { table: t }
BatchScan { table: t, columns: [t._row_id, t.v1, t.v2], distribution: HashShard(t._row_id) }
BatchScan { table: t, columns: [t._row_id, t.v1, t.v2], distribution: SomeShard }
- sql: |
create table t (v1 int, v2 int);
delete from t where v1 = 1;
batch_plan: |
BatchDelete { table: t }
BatchFilter { predicate: (t.v1 = 1:Int32) }
BatchScan { table: t, columns: [t._row_id, t.v1, t.v2], distribution: HashShard(t._row_id) }
BatchScan { table: t, columns: [t._row_id, t.v1, t.v2], distribution: SomeShard }
- sql: |
select * from generate_series('2'::INT,'10'::INT,'2'::INT);
batch_plan: |
43 changes: 40 additions & 3 deletions src/frontend/test_runner/tests/testdata/join.yaml
@@ -189,15 +189,52 @@
LogicalScan { table: t1, output_columns: [t1.v1, t1.v2], required_columns: [v1, v2], predicate: (t1.v1 > 100:Int32) }
LogicalScan { table: t2, output_columns: [t2.v1, t2.v2], required_columns: [v1, v2], predicate: (t2.v1 < 1000:Int32) }
- sql: |
/* Left & right has same distribution. There should be no exchange below hash join */
/* Left & right has same SomeShard distribution. There should still be exchanges below hash join */
create table t(x int);
create index i on t(x);
select * from i join i as ii on i.x=ii.x;
batch_plan: |
BatchExchange { order: [], dist: Single }
BatchHashJoin { type: Inner, predicate: i.x = i.x, output: all }
BatchScan { table: i, columns: [i.x], distribution: HashShard(i.x) }
BatchScan { table: i, columns: [i.x], distribution: HashShard(i.x) }
BatchExchange { order: [], dist: HashShard(i.x) }
BatchScan { table: i, columns: [i.x], distribution: SomeShard }
BatchExchange { order: [], dist: HashShard(i.x) }
BatchScan { table: i, columns: [i.x], distribution: SomeShard }
- sql: |
/* Left & right has same SomeShard distribution. There should still be exchanges below hash join */
create table t(x int);
create index i on t(x);
select * from i join t on i.x=t.x;
batch_plan: |
BatchExchange { order: [], dist: Single }
BatchHashJoin { type: Inner, predicate: i.x = t.x, output: all }
BatchExchange { order: [], dist: HashShard(i.x) }
BatchScan { table: i, columns: [i.x], distribution: SomeShard }
BatchExchange { order: [], dist: HashShard(t.x) }
BatchScan { table: t, columns: [t.x], distribution: SomeShard }
- sql: |
/* Left & right has same HashShard distribution. There should be no exchange below hash join */
create table t(x int);
create index i on t(x);
select * from
(select * from i join i as ii using (x)) t1
full join
(select * from i join i as ii using (x)) t2
using (x);
batch_plan: |
BatchExchange { order: [], dist: Single }
BatchProject { exprs: [Coalesce(i.x, i.x)] }
BatchHashJoin { type: FullOuter, predicate: i.x = i.x, output: all }
BatchHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x] }
BatchExchange { order: [], dist: HashShard(i.x) }
BatchScan { table: i, columns: [i.x], distribution: SomeShard }
BatchExchange { order: [], dist: HashShard(i.x) }
BatchScan { table: i, columns: [i.x], distribution: SomeShard }
BatchHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x] }
BatchExchange { order: [], dist: HashShard(i.x) }
BatchScan { table: i, columns: [i.x], distribution: SomeShard }
BatchExchange { order: [], dist: HashShard(i.x) }
BatchScan { table: i, columns: [i.x], distribution: SomeShard }
- sql: |
/* Use lookup join */
create table t1 (v1 int, v2 int);
2 changes: 1 addition & 1 deletion src/frontend/test_runner/tests/testdata/mv_on_mv.yaml
@@ -29,7 +29,7 @@
select v from mv; -- FIXME: there should not be a `Single` exchange here
batch_plan: |
BatchExchange { order: [], dist: Single }
BatchScan { table: mv, columns: [mv.v], distribution: HashShard() }
BatchScan { table: mv, columns: [mv.v], distribution: SomeShard }
- id: single_fragment_mv_on_singleton_mv
before:
- create_singleton_mv
12 changes: 6 additions & 6 deletions src/frontend/test_runner/tests/testdata/project_set.yaml
@@ -24,7 +24,7 @@
BatchProject { exprs: [Unnest($1)] }
BatchExchange { order: [], dist: Single }
BatchProjectSet { select_list: [Unnest($1)] }
BatchScan { table: t, columns: [t._row_id, t.x], distribution: HashShard(t._row_id) }
BatchScan { table: t, columns: [t._row_id, t.x], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [projected_row_id(hidden), unnest, t._row_id(hidden)], pk_columns: [t._row_id, projected_row_id] }
StreamProjectSet { select_list: [Unnest($1), $0] }
@@ -37,7 +37,7 @@
BatchProject { exprs: [Unnest($1), 1:Int32] }
BatchExchange { order: [], dist: Single }
BatchProjectSet { select_list: [Unnest($1), 1:Int32] }
BatchScan { table: t, columns: [t._row_id, t.x], distribution: HashShard(t._row_id) }
BatchScan { table: t, columns: [t._row_id, t.x], distribution: SomeShard }
- sql: |
/* multiple table functions */
create table t(x int[]);
@@ -46,7 +46,7 @@
BatchProject { exprs: [Unnest($1), Unnest(Array(1:Int32, 2:Int32))] }
BatchExchange { order: [], dist: Single }
BatchProjectSet { select_list: [Unnest($1), Unnest(Array(1:Int32, 2:Int32))] }
BatchScan { table: t, columns: [t._row_id, t.x], distribution: HashShard(t._row_id) }
BatchScan { table: t, columns: [t._row_id, t.x], distribution: SomeShard }
- sql: |
/* table functions as parameters of usual functions */
create table t(x int);
@@ -55,7 +55,7 @@
BatchExchange { order: [], dist: Single }
BatchProject { exprs: [Neg(Generate($1, $1, $1))] }
BatchProjectSet { select_list: [$0, $1, Generate($1, $1, $1)] }
BatchScan { table: t, columns: [t._row_id, t.x], distribution: HashShard(t._row_id) }
BatchScan { table: t, columns: [t._row_id, t.x], distribution: SomeShard }
- sql: |
/* table functions as parameters of usual functions */
create table t(x int[]);
@@ -65,7 +65,7 @@
BatchExchange { order: [], dist: Single }
BatchProjectSet { select_list: [($3 * $4), Unnest($2)] }
BatchProjectSet { select_list: [$0, $1, Unnest($1), Unnest($1)] }
BatchScan { table: t, columns: [t._row_id, t.x], distribution: HashShard(t._row_id) }
BatchScan { table: t, columns: [t._row_id, t.x], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [projected_row_id(hidden), a, b, t._row_id(hidden), projected_row_id#1(hidden)], pk_columns: [t._row_id, projected_row_id#1, projected_row_id] }
StreamProjectSet { select_list: [($3 * $4), Unnest($2), $1, $0] }
@@ -80,4 +80,4 @@
BatchExchange { order: [], dist: Single }
BatchProjectSet { select_list: [Generate($3, 100:Int32, 1:Int32)] }
BatchProjectSet { select_list: [$0, $1, Unnest($1)] }
BatchScan { table: t, columns: [t._row_id, t.x], distribution: HashShard(t._row_id) }
BatchScan { table: t, columns: [t._row_id, t.x], distribution: SomeShard }