Skip to content

Commit

Permalink
fix(batch scheduler): intermediate stage may have single distribution (
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Jun 13, 2022
1 parent 5e852a7 commit fb9174f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 50 deletions.
3 changes: 2 additions & 1 deletion e2e_test/batch/tpch.slt
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ include ./tpch/q11.slt.part
include ./tpch/q12.slt.part
include ./tpch/q13.slt.part
include ./tpch/q14.slt.part
#include ./tpch/q15.slt.part
include ./tpch/q15.slt.part
include ./tpch/q16.slt.part
include ./tpch/q17.slt.part
include ./tpch/q18.slt.part
include ./tpch/q19.slt.part
include ./tpch/q20.slt.part
include ./tpch/q21.slt.part
include ./tpch/q22.slt.part

include ../tpch/drop_tables.slt.part
31 changes: 7 additions & 24 deletions src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,33 +299,20 @@ impl StageGraphBuilder {
impl BatchPlanFragmenter {
/// Split the plan node into each stages, based on exchange node.
pub fn split(mut self, batch_node: PlanRef) -> Result<Query> {
let root_stage = self.new_stage(batch_node.clone(), None, None);
let root_stage = self.new_stage(batch_node.clone(), Distribution::Single.to_prost(1));
let stage_graph = self.stage_graph_builder.build(root_stage.id);
Ok(Query {
stage_graph,
query_id: self.query_id,
})
}

fn new_stage(
&mut self,
root: PlanRef,
parent_parallelism: Option<u32>,
exchange_info: Option<ExchangeInfo>,
) -> QueryStageRef {
fn new_stage(&mut self, root: PlanRef, exchange_info: ExchangeInfo) -> QueryStageRef {
let next_stage_id = self.next_stage_id;
self.next_stage_id += 1;
let parallelism = match parent_parallelism {
// Non-root node
Some(_) => self.worker_node_manager.worker_node_count(),
// Root node.
None => 1,
};

let exchange_info = match exchange_info {
Some(info) => info,
// Root stage, the exchange info should always be Single
None => Distribution::Single.to_prost(1),
let parallelism = match root.distribution() {
Distribution::Single => 1,
_ => self.worker_node_manager.worker_node_count(),
};

let mut builder = QueryStageBuilder::new(
Expand Down Expand Up @@ -375,12 +362,8 @@ impl BatchPlanFragmenter {
parent_exec_node: Option<&mut ExecutionPlanNode>,
) {
let mut execution_plan_node = ExecutionPlanNode::from(node.clone());
let child_exchange_info = Some(node.distribution().to_prost(builder.parallelism));
let child_stage = self.new_stage(
node.inputs()[0].clone(),
Some(builder.parallelism),
child_exchange_info,
);
let child_exchange_info = node.distribution().to_prost(builder.parallelism);
let child_stage = self.new_stage(node.inputs()[0].clone(), child_exchange_info);
execution_plan_node.stage_id = Some(child_stage.id);

if let Some(parent) = parent_exec_node {
Expand Down
40 changes: 15 additions & 25 deletions src/frontend/test_runner/tests/testdata/tpch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1218,44 +1218,34 @@
before:
- create_tables
sql: |
select
s_suppkey,
s_name,
s_address,
s_phone,
total_revenue
from
supplier,
(
select
with revenue0 (supplier_no, total_revenue) as (
select
l_suppkey,
sum(l_extendedprice * (1 - l_discount)) as total_revenue
sum(l_extendedprice * (1 - l_discount))
from
lineitem
where
l_shipdate >= date '1993-01-01'
and l_shipdate < date '1993-01-01' + interval '3' month
group by
l_suppkey
) as revenue0 (supplier_no, total_revenue)
)
select
s_suppkey,
s_name,
s_address,
s_phone,
total_revenue
from
supplier,
revenue0
where
s_suppkey = supplier_no
and total_revenue = (
select
max(total_revenue) as max_revenue
max(total_revenue)
from
(
select
l_suppkey,
sum(l_extendedprice * (1 - l_discount)) as total_revenue
from
lineitem
where
l_shipdate >= date '1993-01-01'
and l_shipdate < date '1993-01-01' + interval '3' month
group by
l_suppkey
) as revenue0 (supplier_no, total_revenue)
revenue0
)
order by
s_suppkey;
Expand Down

0 comments on commit fb9174f

Please sign in to comment.