feat(scheduler): exchange node rewrite in serialization #525
Codecov Report
@@ Coverage Diff @@
## main #525 +/- ##
============================================
+ Coverage 72.20% 72.36% +0.16%
Complexity 2766 2766
============================================
Files 931 936 +5
Lines 54671 55117 +446
Branches 1787 1787
============================================
+ Hits 39473 39884 +411
- Misses 14308 14343 +35
Partials 890 890
@@ -18,6 +23,24 @@ static ANY_DISTRIBUTION: Distribution = Distribution::Any;

#[allow(dead_code)]
impl Distribution {
    pub fn to_prost(&self, output_count: u32) -> ExchangeInfo {
here. cc @st1page. I don't get how Distribution::AnyShard is mapped to ExchangeInfo. Let's talk offline.
I think Distribution::AnyShard will not and should not appear here. It is only used in the optimizer as a "requirement" when optimizing the plan of a subtree. I think you can just panic here on Distribution::AnyShard and Distribution::Any.
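A minimal sketch of what this suggestion could look like, assuming simplified stand-ins for the PR's types. The enum definitions, the key type inside HashShard, and the free function to_distribution_mode are hypothetical; only the variant names come from the diff under review.

```rust
// Hypothetical stand-ins for the PR's types; the real definitions live in the
// optimizer and in the generated prost code.
#[derive(Debug, Clone, PartialEq)]
pub enum Distribution {
    Any,
    AnyShard,
    Single,
    Broadcast,
    HashShard(Vec<usize>), // key type is an assumption
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DistributionMode {
    Single,
    Broadcast,
    Hash,
}

/// Maps a concrete distribution to an exchange mode. The requirement-only
/// variants panic instead of silently falling through to `Hash`.
pub fn to_distribution_mode(dist: &Distribution) -> DistributionMode {
    match dist {
        Distribution::Single => DistributionMode::Single,
        Distribution::Broadcast => DistributionMode::Broadcast,
        Distribution::HashShard(_keys) => DistributionMode::Hash,
        Distribution::Any | Distribution::AnyShard => {
            panic!("{:?} is an optimizer requirement, not a physical distribution", dist)
        }
    }
}
```

Listing the two variants explicitly instead of using a `_` arm means the compiler flags this match if a new Distribution variant is added later.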
NodeBody::RowSeqScan(RowSeqScanNode {
    table_desc: Some(CellBasedTableDesc {
        table_id: self.logical.table_id.table_id,
        pk: vec![],
    }),
    ..Default::default()
Generally LGTM, except these.
If I understand correctly, you will abstract a Scheduler later?
Distribution::Single => DistributionMode::Single,
Distribution::Broadcast => DistributionMode::Broadcast,
Distribution::HashShard(_keys) => DistributionMode::Hash,
_ => DistributionMode::Hash,
As @st1page suggested, I think it's better to panic! here.
We cannot simply panic: currently our BatchSeqScan uses Distribution::any().
let base = BatchBase {
    id: ctx.borrow_mut().get_id(),
    order: Order::any().clone(),
    dist: Distribution::any().clone(),
    ctx: ctx.clone(),
};
I will fix this later.
    host: host.clone(),
    sink_id,
};
exchange_node.sources.push(exchange_source);
I am confused by ExchangeSource and TaskSinkId. What do they mean: an exchange source or a target?
Yes, we should add some comments on ExchangeSource and TaskSinkId in a later PR.
Yes... It needs to bridge between the handler and the real Scheduler.
@@ -13,6 +20,24 @@ pub enum Distribution {
static ANY_DISTRIBUTION: Distribution = Distribution::Any;
#[allow(dead_code)]
impl Distribution {
    pub fn to_prost(&self, output_count: u32) -> ExchangeInfo {
Why does to_prost on Distribution produce an ExchangeInfo? It seems weird.
We do not have any other concept that maps to ExchangeInfo.
What's changed and what's your intention?
Main idea: add exchange node serialization. The exchange node is special: to_batch_prost alone cannot finish its serialization because it knows nothing about the exchange sources, so that step has to be done in the scheduler. This PR adds that step and checks the serialization results after the rewrite.
Code change:
rewrite_exchange method, used when serializing a stage. The to_batch_prost() of BatchExchange does nothing on its own because it has no WorkerNode info; the missing fields are filled in by rewrite_exchange.
After this PR, serialization of a single stage works. The scheduler will use it when distributing tasks.
Next plan: provide the handler with an interface for using the scheduler (accept a PlanRef and return a QueryResultLocation).
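The rewrite step described above could be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: the fields of HostAddress and TaskSinkId, the child_tasks parameter, and the function signature are hypothetical. Only the host and sink_id fields of ExchangeSource and the push into exchange_node.sources appear in the diff.

```rust
// Hypothetical stand-ins for the generated prost messages.
#[derive(Debug, Clone, PartialEq)]
pub struct HostAddress {
    pub host: String, // assumed field
    pub port: u16,    // assumed field
}

#[derive(Debug, Clone, PartialEq)]
pub struct TaskSinkId {
    pub task_id: u32, // assumed field
    pub sink_id: u32, // assumed field
}

#[derive(Debug, Clone, PartialEq)]
pub struct ExchangeSource {
    pub host: HostAddress,
    pub sink_id: TaskSinkId,
}

/// What `to_batch_prost()` would leave behind: an exchange node whose
/// `sources` list is still empty.
#[derive(Debug, Default)]
pub struct ExchangeNode {
    pub sources: Vec<ExchangeSource>,
}

/// Fills in the exchange sources once the scheduler knows where the child
/// stage's tasks run (`child_tasks` is an assumed input shape).
pub fn rewrite_exchange(
    exchange_node: &mut ExchangeNode,
    child_tasks: &[(HostAddress, TaskSinkId)],
) {
    for (host, sink_id) in child_tasks {
        let exchange_source = ExchangeSource {
            host: host.clone(),
            sink_id: sink_id.clone(),
        };
        exchange_node.sources.push(exchange_source);
    }
}
```

In this sketch the plan node stays free of worker information until scheduling time, which matches the split described in the PR: the optimizer serializes what it knows, and the scheduler completes the exchange node.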
Checklist
Refer to a related PR or issue link (optional)