
feat: expand operator #3563

Merged (13 commits) on Jul 5, 2022
8 changes: 8 additions & 0 deletions proto/batch_plan.proto
@@ -109,6 +109,13 @@ message HashAggNode {
  repeated expr.AggCall agg_calls = 2;
}

message ExpandNode {
  message Keys {
    repeated uint32 keys = 1;
  }
  repeated Keys expanded_keys = 1;
}
Contributor:
How about making it more general, as Exprs? Then we could reuse it in HopWindowExecutor.

likg227 (Contributor, Author) commented on Jun 30, 2022:

Do you mean making Keys more general?

Contributor:

Yes, IIUC it's just InputRef, right?

likg227 (Contributor, Author) commented on Jun 30, 2022:

It's actually indices of columns...

Contributor:

They are quite similar to me 😂

Contributor:

Yes, InputRef is essentially the same as uint32 here. But I do not see why we need to generalize from InputRef to arbitrary Expr, or how this is related to HopWindowExecutor.

As for uint32 vs InputRef, I prefer to avoid InputRef: it lacks the return_type part needed to be treated as a genuine Expr, and the caller (the ExecutorBuilders, including hop) actually just takes column_idx out of it rather than building it into an expression and evaluating it.
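
For illustration, a minimal, self-contained sketch of the two decoding styles under discussion (the struct names are hypothetical stand-ins for the generated protobuf types, not RisingWave's actual ones): with plain uint32 the builder reads the column index directly, while with InputRef it would unwrap an expression node only to recover the same index.

// Hypothetical stand-ins for the generated protobuf structs.
struct KeysNode {
    keys: Vec<u32>,
}

struct InputRef {
    column_idx: u32,
}

struct InputRefsNode {
    keys: Vec<InputRef>,
}

// With `repeated uint32`, the builder consumes indices directly.
fn indices_from_uint32(node: &KeysNode) -> Vec<usize> {
    node.keys.iter().map(|k| *k as usize).collect()
}

// With `repeated InputRef`, each node is unwrapped just to recover
// the same index (and a return_type would still sit unused on the wire).
fn indices_from_input_refs(node: &InputRefsNode) -> Vec<usize> {
    node.keys.iter().map(|r| r.column_idx as usize).collect()
}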

Contributor:

The expand operator is just the following expressions:

[
  [InputRef(1), InputRef(2)],
  [InputRef(3), InputRef(4)]
]

If we changed HopWindowExecutor into an ExpandExecutor, it would just be the following expressions:

[
  [window start expr in interval 1, window end expr in interval 1, ... original exprs],
  [window start expr in interval 2, window end expr in interval 2, ... original exprs],
  [window start expr in interval 3, window end expr in interval 3, ... original exprs],
  ...
]

And these can be computed by the frontend, and reused by both the streaming and batch executors. As for the data type, I don't see it as a blocking issue for us.
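
For what it's worth, a rough sketch of that generalization (simplified, hypothetical types; Expr, Chunk, and Column here are stand-ins, not RisingWave's actual traits): a node holding N rows of expressions emits N expanded copies of each input chunk, one output column per expression.

// Simplified stand-ins for illustration only.
struct Column(Vec<Option<i64>>);

struct Chunk {
    columns: Vec<Column>,
}

trait Expr {
    fn eval(&self, input: &Chunk) -> Column;
}

struct GeneralizedExpand {
    // expr_rows[i] produces the i-th expanded copy of each input chunk.
    expr_rows: Vec<Vec<Box<dyn Expr>>>,
}

impl GeneralizedExpand {
    fn expand(&self, input: &Chunk) -> Vec<Chunk> {
        self.expr_rows
            .iter()
            .map(|row| Chunk {
                columns: row.iter().map(|e| e.eval(input)).collect(),
            })
            .collect()
    }
}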

Contributor:

Got it, and +1 for this. Maybe we can give it a try after this PR is merged and well tested with distinct agg.

Contributor:

This is an interesting generalization. We can give it a try, but probably after we have a working e2e distinct agg, as tianshuo suggested above.

Actually, this generalized node is just multi-row expr evaluation. If we wanted to, it could also cover the use cases of project (a single row of exprs) and values (input fixed to one row with zero columns). I'm just not sure whether this generalization goes too far.

Also to my understanding, it would be

[
  [LiteralNull, InputRef(1), InputRef(2), LiteralNull, LiteralNull],
  [LiteralNull, LiteralNull, LiteralNull, InputRef(3), InputRef(4)]
]

rather than the example above.

I also agree that the data type is not a big issue. I was just listing the differences between our InputRef in Rust vs InputRef in proto.
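
To make that layout concrete (a hand-worked example, not from the PR): with one input row (c0..c4) = (1, 2, 3, 4, 5) and subsets [[1, 2], [3, 4]], this produces (NULL, 2, 3, NULL, NULL) and (NULL, NULL, NULL, 4, 5), i.e. the output keeps all five columns and NULLs out the ones not in the current subset (the executor additionally appends a flag column).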

Contributor:

I'm OK with trying it after distinct agg has been resolved.
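
(For context, as the comments above suggest, the immediate use case is multi-distinct aggregation, e.g. COUNT(DISTINCT a) together with COUNT(DISTINCT b): as I understand it, each distinct column set becomes one key subset, and the trailing flag column lets the downstream aggregation tell the duplicated rows apart.)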


message SortAggNode {
  repeated expr.ExprNode group_keys = 1;
  repeated expr.AggCall agg_calls = 2;
@@ -210,6 +217,7 @@ message PlanNode {
    HopWindowNode hop_window = 25;
    TableFunctionNode table_function = 26;
    SysRowSeqScanNode sys_row_seq_scan = 27;
    ExpandNode expand = 28;
  }
  string identity = 24;
}
8 changes: 8 additions & 0 deletions proto/stream_plan.proto
@@ -221,6 +221,13 @@ message LookupUnionNode {
  repeated uint32 order = 1;
}

message ExpandNode {
  message Keys {
    repeated uint32 keys = 1;
  }
  repeated Keys expanded_keys = 1;
}

message StreamNode {
  oneof node_body {
    SourceNode source = 100;
@@ -243,6 +250,7 @@ message StreamNode {
    LookupUnionNode lookup_union = 117;
    UnionNode union = 118;
    DeltaIndexJoinNode delta_index_join = 119;
    ExpandNode expand = 120;
  }
  // The id for the operator.
  uint64 operator_id = 1;
202 changes: 202 additions & 0 deletions src/batch/src/executor/expand.rs
@@ -0,0 +1,202 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::column::Column;
use risingwave_common::array::{DataChunk, PrimitiveArray, Vis};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::{DataChunkBuilder, SlicedDataChunk};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
use crate::task::BatchTaskContext;

/// `ExpandExecutor` duplicates each input chunk once per key subset, keeping
/// only the columns in that subset and NULL-ing out the rest.
pub struct ExpandExecutor {
    /// Column indices to keep for each expanded copy of the input.
    expanded_keys: Vec<Vec<usize>>,
    child: BoxedExecutor,
    schema: Schema,
    identity: String,
}

impl Executor for ExpandExecutor {
    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}

impl ExpandExecutor {
    #[try_stream(boxed, ok = DataChunk, error = RwError)]
    async fn do_execute(self: Box<Self>) {
        let mut data_chunk_builder = DataChunkBuilder::with_default_size(self.schema.data_types());

        #[for_await]
        for data_chunk in self.child.execute() {
            // TODO: handle dummy chunk.
            let data_chunk: DataChunk = data_chunk?.compact()?;
            let cardinality = data_chunk.cardinality();
            let (columns, vis) = data_chunk.into_parts();
            assert_eq!(vis, Vis::Compact(cardinality));

            // Pre-build an all-NULL column for each input column; these pad
            // the positions that are not kept by the current key subset.
            let null_columns: Vec<Column> = columns
                .iter()
                .map(|column| {
                    let array = column.array_ref();
                    let mut builder = array.create_builder(cardinality)?;
                    (0..cardinality).try_for_each(|_i| builder.append_null())?;
                    Ok::<Column, RwError>(Column::new(Arc::new(builder.finish()?)))
                })
                .try_collect()?;

            // Emit one copy of the chunk per key subset: kept columns are
            // copied from the input, the rest stay NULL, and a trailing flag
            // column records which subset (i) produced the rows.
            for (i, keys) in self.expanded_keys.iter().enumerate() {
                let mut new_columns = null_columns.clone();
                for key in keys {
                    new_columns[*key] = columns[*key].clone();
                }
                let flags = Column::from(PrimitiveArray::<i64>::from_slice(&vec![
                    Some(i as i64);
                    cardinality
                ])?);
                new_columns.push(flags);
                let new_data_chunk = DataChunk::new(new_columns, vis.clone());
                let mut sliced_data_chunk = SlicedDataChunk::new_checked(new_data_chunk)?;
                // Feed the chunk into the builder slice by slice, yielding
                // full-sized output chunks as they fill up.
                loop {
                    let (left_data, output) = data_chunk_builder.append_chunk(sliced_data_chunk)?;
                    match (left_data, output) {
                        (Some(left_data), Some(output)) => {
                            sliced_data_chunk = left_data;
                            yield output;
                        }
                        (None, Some(output)) => {
                            yield output;
                            break;
                        }
                        (None, None) => {
                            break;
                        }
                        _ => {
                            return Err(
                                InternalError("Data chunk builder error".to_string()).into()
                            );
                        }
                    }
                }
            }
        }
        // Flush whatever is left in the builder.
        if let Some(chunk) = data_chunk_builder.consume_all()? {
            yield chunk;
        }
    }
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for ExpandExecutor {
    async fn new_boxed_executor<C: BatchTaskContext>(
        source: &ExecutorBuilder<C>,
        mut inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        ensure!(inputs.len() == 1);
        let expand_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::Expand
        )?;

        let expanded_keys = expand_node
            .expanded_keys
            .iter()
            .map(|keys| keys.keys.iter().map(|key| *key as usize).collect_vec())
            .collect_vec();

        let child = inputs.remove(0);

        // The output schema is the input schema plus a trailing Int64 `flag`
        // column identifying the key subset.
        let mut schema = child.schema().clone();
        schema
            .fields
            .push(Field::with_name(DataType::Int64, "flag"));

        Ok(Box::new(Self {
            expanded_keys,
            child,
            schema,
            identity: "ExpandExecutor".to_string(),
        }))
    }
}

#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use risingwave_common::array::{DataChunk, DataChunkTestExt};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::ExpandExecutor;
    use crate::executor::test_utils::MockExecutor;
    use crate::executor::Executor;

    #[tokio::test]
    async fn test_expand_executor() {
        let mock_schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
            ],
        };
        let expand_schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int64),
            ],
        };
        let mut mock_executor = MockExecutor::new(mock_schema);
        mock_executor.add(DataChunk::from_pretty(
            "i i i
             1 2 3
             2 3 4",
        ));
        let expanded_keys = vec![vec![0, 1], vec![1, 2]];
        let expand_executor = Box::new(ExpandExecutor {
            expanded_keys,
            child: Box::new(mock_executor),
            schema: expand_schema,
            identity: "ExpandExecutor".to_string(),
        });
        let mut stream = expand_executor.execute();
        let res = stream.next().await.unwrap().unwrap();
        let expected_chunk = DataChunk::from_pretty(
            "i i i I
             1 2 . 0
             2 3 . 0
             . 2 3 1
             . 3 4 1",
        );
        assert_eq!(res, expected_chunk);
    }
}
3 changes: 3 additions & 0 deletions src/batch/src/executor/mod.rs
@@ -13,6 +13,7 @@
// limitations under the License.

mod delete;
mod expand;
mod filter;
mod generic_exchange;
mod hash_agg;
@@ -36,6 +37,7 @@ mod values;

use async_recursion::async_recursion;
pub use delete::*;
pub use expand::*;
pub use filter::*;
use futures::stream::BoxStream;
pub use generic_exchange::*;
@@ -187,6 +189,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
            NodeBody::TableFunction => TableFunctionExecutorBuilder,
            NodeBody::HopWindow => HopWindowExecutor,
            NodeBody::SysRowSeqScan => SysRowSeqScanExecutorBuilder,
            NodeBody::Expand => ExpandExecutor,
        }
        .await?;
        let input_desc = real_executor.identity().to_string();
2 changes: 1 addition & 1 deletion src/common/src/array/data_chunk.rs
@@ -38,7 +38,7 @@ pub struct DataChunk {

/// `Vis` is a visibility bitmap of rows. When all rows are visible, it is considered compact and
/// is represented by a single cardinality number rather than that many of ones.
-#[derive(Clone, PartialEq)]
+#[derive(Clone, PartialEq, Debug)]
pub enum Vis {
    Bitmap(Bitmap),
    Compact(usize), // equivalent to all ones of this size
103 changes: 103 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_expand.rs
@@ -0,0 +1,103 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use itertools::Itertools;
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::expand_node::Keys;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ExpandNode;

use crate::optimizer::plan_node::{
    LogicalExpand, PlanBase, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, ToLocalBatch,
};
use crate::optimizer::property::{Distribution, Order};
use crate::optimizer::PlanRef;

#[derive(Debug, Clone)]
pub struct BatchExpand {
    pub base: PlanBase,
    logical: LogicalExpand,
}

impl BatchExpand {
    pub fn new(logical: LogicalExpand) -> Self {
        let ctx = logical.base.ctx.clone();
        let base = PlanBase::new_batch(
            ctx,
            logical.schema().clone(),
            Distribution::SomeShard,
            Order::any(),
Contributor (on lines +41 to +42):
When the input has HashShard(hash_keys), and all subsets cover hash_keys, does the output provide HashShard(hash_keys) as well? For example:

input: HashShard(4,2)
column_subsets: [[1,2,4], [2,3,4], [2,4]]

SomeShard is also valid but may result in unnecessary exchanges. We can also leave this as a later improvement.

Contributor:

I am not sure about the rows generated with NULL values... maybe we can give it SomeShard first to ensure correctness. After we have tests, we can give it a better (more accurate) distribution.
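
(A possible reconciliation, if my reading is right: when every subset covers hash_keys, those columns are never replaced by NULLs, so the expanded rows hash exactly as their originals do and HashShard(hash_keys) would still hold; the NULL concern only applies to columns outside the subsets.)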

        );
        BatchExpand { base, logical }
    }

    pub fn expanded_keys(&self) -> &Vec<Vec<usize>> {
        self.logical.expanded_keys()
    }
}

impl fmt::Display for BatchExpand {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "BatchExpand {{ expanded_keys: {:#?} }}",
            self.expanded_keys()
        )
    }
}

impl PlanTreeNodeUnary for BatchExpand {
    fn input(&self) -> PlanRef {
        self.logical.input()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(self.logical.clone_with_input(input))
    }
}

impl_plan_tree_node_for_unary! { BatchExpand }

impl ToDistributedBatch for BatchExpand {
    fn to_distributed(&self) -> Result<PlanRef> {
        let new_input = self.input().to_distributed()?;
        Ok(self.clone_with_input(new_input).into())
    }
}

impl ToBatchProst for BatchExpand {
    fn to_batch_prost_body(&self) -> NodeBody {
        NodeBody::Expand(ExpandNode {
            expanded_keys: self
                .expanded_keys()
                .iter()
                .map(|keys| keys_to_protobuf(keys))
                .collect_vec(),
        })
    }
}

fn keys_to_protobuf(keys: &[usize]) -> Keys {
    let keys = keys.iter().map(|key| *key as u32).collect_vec();
    Keys { keys }
}

impl ToLocalBatch for BatchExpand {
    fn to_local(&self) -> Result<PlanRef> {
        let new_input = self.input().to_local()?;
        Ok(self.clone_with_input(new_input).into())
    }
}