feat: expand operator #3563
Conversation
license-eye has checked 890 files in total.
| Valid | Invalid | Ignored | Fixed |
|-------|---------|---------|-------|
| 887   | 2       | 1       | 0     |
Invalid file list:
- src/frontend/src/optimizer/plan_node/stream_expand.rs
- src/stream/src/executor/expand.rs
Force-pushed from b62588b to 325b38e.
Codecov Report
```
@@            Coverage Diff             @@
##             main    #3563      +/-   ##
==========================================
- Coverage   74.39%   74.34%     -0.05%
==========================================
  Files         781      787         +6
  Lines      111016   111448       +432
==========================================
+ Hits        82593    82859       +266
- Misses      28423    28589       +166
```
Flags with carried-forward coverage won't be shown.
proto/batch_plan.proto (outdated):

```proto
  message Keys {
    repeated uint32 keys = 1;
  }
  repeated Keys expanded_keys = 1;
}
```
How about making it more general, as `Exprs`? Then we could reuse it in `HopWindowExecutor`.
Do you mean making `Keys` more general?
Yes, IIUC it's just `InputRef`, right?
It's actually indices of columns...
They look quite similar to me 😂
Yes, `InputRef` is essentially the same as `uint32` here. But I do not see why we need to generalize from `InputRef` to arbitrary `Expr`, or how this is related to `HopWindowExecutor`.

As for `uint32` vs `InputRef`, I prefer to avoid `InputRef`: it lacks the `return_type` part needed to be treated as a genuine `Expr`, and the callers (the `ExecutorBuilder`s, including hop) actually just take the `column_idx` out of it rather than building it into an expression and evaluating it.
The expand operator is just the following list of expression lists:

```
[
  [InputRef(1), InputRef(2)],
  [InputRef(3), InputRef(4)]
]
```

If we change `HopWindowExecutor` to `ExpandExecutor`, it is also just a list of expression lists:

```
[
  [window start expr in interval 1, window end expr in interval 1, ... original exprs],
  [window start expr in interval 2, window end expr in interval 2, ... original exprs],
  [window start expr in interval 3, window end expr in interval 3, ... original exprs],
  ...
]
```
These lists can be computed by the frontend and reused by both the streaming and batch executors. As for the data type, I don't see it as a blocking issue for us.
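To make the shape of this proposal concrete, here is a minimal Rust sketch with closures standing in for real expressions; all names (`Expr`, `multi_row_project`) are illustrative, not RisingWave's actual types:

```rust
// A closure stands in for a compiled expression over one input row.
type Expr = Box<dyn Fn(&[i64]) -> Option<i64>>;

/// Evaluate every expression list against each input row, emitting one output
/// row per list: Expand, HopWindow, and even Project all fit this shape.
fn multi_row_project(input: &[Vec<i64>], expr_lists: &[Vec<Expr>]) -> Vec<Vec<Option<i64>>> {
    let mut output = Vec::new();
    for row in input {
        for exprs in expr_lists {
            output.push(exprs.iter().map(|e| e(row)).collect());
        }
    }
    output
}

fn main() {
    // Expand-like usage: the first list keeps columns 1 and 2, the second
    // keeps column 3; everything else is padded with NULLs.
    let lists: Vec<Vec<Expr>> = vec![
        vec![
            Box::new(|r: &[i64]| Some(r[1])),
            Box::new(|r: &[i64]| Some(r[2])),
            Box::new(|_: &[i64]| None),
        ],
        vec![
            Box::new(|_: &[i64]| None),
            Box::new(|_: &[i64]| None),
            Box::new(|r: &[i64]| Some(r[3])),
        ],
    ];
    let rows = vec![vec![10, 11, 12, 13]];
    println!("{:?}", multi_row_project(&rows, &lists));
}
```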
Got it, and +1 for this. Maybe we can give it a try after this PR is merged and well tested with distinct agg.
This is an interesting generalization. We can give it a try, but probably after we have a working e2e distinct agg, as tianshuo suggested above.

Actually, this generalized node is just multi-row expr evaluation. If we want to, it can also cover the use cases of `project` (a single row of exprs) and `values` (input fixed to 1-row-0-col). I'm just not sure whether this generalization is too much.

Also, to my understanding, it would be

```
[
  [LiteralNull, InputRef(1), InputRef(2), LiteralNull, LiteralNull],
  [LiteralNull, LiteralNull, LiteralNull, InputRef(3), InputRef(4)]
]
```

rather than the example above.

I also agree that the data type is not a big issue. I was just listing the differences between our `InputRef` in Rust vs `InputRef` in proto.
I'm OK with trying it after distinct agg has been resolved.
```rust
Distribution::SomeShard,
Order::any(),
```
When the input has `HashShard(hash_keys)` and all `subset`s cover `hash_keys`, does the output provide `HashShard(hash_keys)` as well? For example:

```
input: HashShard(4,2)
column_subsets: [[1,2,4], [2,3,4], [2,4]]
```

`SomeShard` is also valid but may result in unnecessary exchanges. We can also leave this as a later improvement.
I am not sure about the rows generated with the null values... Maybe we can give it `SomeShard` first to ensure correctness. Once it is well tested, we can give it a better (more accurate) distribution.
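For reference, a minimal sketch of the cover check discussed above, over a simplified representation (plain index slices instead of the actual planner types):

```rust
/// Returns true iff every column subset contains all of the input's hash keys.
/// If so, the expanded output could in principle keep HashShard(hash_keys);
/// otherwise SomeShard is the conservative choice.
fn subsets_cover_hash_keys(hash_keys: &[usize], column_subsets: &[Vec<usize>]) -> bool {
    column_subsets
        .iter()
        .all(|subset| hash_keys.iter().all(|key| subset.contains(key)))
}

fn main() {
    // The example from the comment above: HashShard(4,2) could be preserved
    // because every subset contains both columns 4 and 2.
    let hash_keys = [4, 2];
    let subsets = vec![vec![1, 2, 4], vec![2, 3, 4], vec![2, 4]];
    assert!(subsets_cover_hash_keys(&hash_keys, &subsets));
}
```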
src/stream/src/executor/expand.rs (outdated):

```rust
match msg {
    Message::Chunk(chunk) => {
```
nit: avoid deep indentation with an early return:

```rust
let chunk = match msg {
    Message::Chunk(chunk) => chunk,
    m => {
        yield m;
        continue;
    }
};
```

Note: `let_else` is syntactically the same but seems problematic: fix(streaming): fix possible ub in hop executor #3270
```rust
/// [`LogicalExpand`] expands one row into multiple rows according to `column_subsets`.
///
/// It can be used to implement distinct aggregation and grouping sets.
#[derive(Debug, Clone)]
pub struct LogicalExpand {
    pub base: PlanBase,
    column_subsets: Vec<Vec<usize>>,
    input: PlanRef,
}
```
Would you please add more comments on the struct and on the field `column_subsets`?
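For illustration, a sketch of the kind of field documentation being requested; the wording and example are hypothetical, not the final doc comment:

```rust
pub struct LogicalExpand {
    pub base: PlanBase,
    /// A list of subsets of input column indices. For each subset, every
    /// input row is emitted once: the columns in the subset keep their
    /// original values and all other columns are set to NULL.
    ///
    /// For example, with input columns `(a, b, c)` and
    /// `column_subsets = [[0], [1, 2]]`, the row `(1, 2, 3)` expands into
    /// `(1, NULL, NULL)` and `(NULL, 2, 3)`.
    column_subsets: Vec<Vec<usize>>,
    input: PlanRef,
}
```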
proto/batch_plan.proto (outdated):

```diff
@@ -109,6 +109,13 @@ message HashAggNode {
   repeated expr.AggCall agg_calls = 2;
 }

+message ExpandNode {
+  message Subset {
+    repeated uint32 subset = 1;
```
What about calling this `keys`? That way, `repeated` fields are always named as the plural form of their element.
```rust
    data_chunk: DataChunk,
) -> ArrayResult<Vec<DataChunk>> {
    let mut sliced_data_chunk = SlicedDataChunk::new_checked(data_chunk)?;
    let mut res = vec![];
```
This defeats the purpose of `SlicedDataChunk` and `yield`. We should yield as soon as one output chunk is available, rather than collecting all of them in a vec.

It is possible to wrap the loop into a utility function like this, but it should also yield output one by one. I am okay with keeping the previous impl and doing the refactor later.
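For reference, a minimal sketch of the lazy shape being suggested, using a plain iterator over stand-in types (the real executor would yield from an async stream; none of these names are the actual API):

```rust
/// Stand-in for a DataChunk: rows of nullable values.
type Chunk = Vec<Vec<Option<i64>>>;

/// Produce one expanded chunk per column subset, lazily: each chunk is built
/// only when the consumer asks for it, instead of collecting a Vec up front.
fn expand_lazily<'a>(
    chunk: &'a Chunk,
    column_subsets: &'a [Vec<usize>],
) -> impl Iterator<Item = Chunk> + 'a {
    column_subsets.iter().map(move |subset| {
        chunk
            .iter()
            .map(|row| {
                row.iter()
                    .enumerate()
                    .map(|(i, v)| if subset.contains(&i) { *v } else { None })
                    .collect()
            })
            .collect()
    })
}

fn main() {
    let chunk: Chunk = vec![vec![Some(1), Some(2), Some(3)]];
    let subsets = vec![vec![0], vec![1, 2]];
    for expanded in expand_lazily(&chunk, &subsets) {
        println!("{:?}", expanded); // consume each chunk as soon as it's ready
    }
}
```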
src/common/src/array/column.rs (outdated):

```rust
        new_columns.push(flags);
        Ok::<Vec<Column>, ArrayError>(new_columns)
    })
    .try_collect()
```
Similarly, it is better to yield the output of each subset rather than processing all subsets at once.

However, it would be unnecessarily complex to yield inside this function. We can keep it as a normal function whose input and output cover one subset only, with the shared null column built once and passed into each call.
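A minimal sketch of that shape, with `Arc`-backed stand-ins for the real column types (illustrative names only): the all-null column is built once by the caller and cheaply cloned into each per-subset call.

```rust
use std::sync::Arc;

/// Stand-in for an ArrayRef/Column: a shared, immutable column of values.
type ColumnRef = Arc<Vec<Option<i64>>>;

/// Built once by the caller and reused across all subsets.
fn build_null_column(len: usize) -> ColumnRef {
    Arc::new(vec![None; len])
}

/// Expand the columns for a single subset: columns in the subset keep their
/// values; the rest share the pre-built all-null column (an Arc clone, so
/// there is no per-subset rebuilding).
fn expand_one_subset(
    columns: &[ColumnRef],
    subset: &[usize],
    null_column: &ColumnRef,
) -> Vec<ColumnRef> {
    columns
        .iter()
        .enumerate()
        .map(|(i, col)| {
            if subset.contains(&i) {
                Arc::clone(col)
            } else {
                Arc::clone(null_column)
            }
        })
        .collect()
}

fn main() {
    let columns = vec![Arc::new(vec![Some(1)]), Arc::new(vec![Some(2)])];
    let null_column = build_null_column(1);
    let expanded = expand_one_subset(&columns, &[0], &null_column);
    println!("{:?}", expanded); // [[Some(1)], [None]]
}
```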
LGTM. Let's also open issues for things left for future improvement:
- reuse with hop
- accurate dist
- all-null column builder
```rust
for msg in self.input.execute() {
    match msg? {
        Message::Chunk(chunk) => {
            // TODO: handle dummy chunk.
```
Is it hard? If not, I recommend fixing this ASAP.
- add LogicalExpand.
- add unit test for LogicalExpand.
- add BatchExpand.
- add batch expand executor.
- add stream expand executor.
- add StreamExpand.
- clean up code.
- improve quality of code.
- improve col_prune.
- improve quality of code.

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.
What's changed and what's your intention?
Checklist

- `./risedev check` (or alias, `./risedev c`)

Refer to a related PR or issue link (optional)