Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(batch): use wrapper executor to impl time and i32 generate series #2524

Merged
merged 13 commits into from
May 17, 2022
36 changes: 36 additions & 0 deletions e2e_test/v2/batch/generate_series.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
query I
select generate_series from generate_series('2008-03-01 00:00:00'::TIMESTAMP,'2008-03-04 12:00:00'::TIMESTAMP, interval '12' hour)
----
2008-03-01 00:00:00
2008-03-01 12:00:00
2008-03-02 00:00:00
2008-03-02 12:00:00
2008-03-03 00:00:00
2008-03-03 12:00:00
2008-03-04 00:00:00
2008-03-04 12:00:00

query II
SELECT * FROM generate_series('2'::INT,'10'::INT,'2'::INT)
----
2
4
6
8
10

query III
SELECT * FROM generate_series('2'::INT + '2'::INT,'10'::INT,'2'::INT);
----
4
6
8
10

query IIII
SELECT generate_series FROM generate_series('2'::INT + '2'::INT,'10'::INT,'2'::INT);
----
4
6
8
10
17 changes: 5 additions & 12 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,10 @@ message HopWindowNode {
data.IntervalUnit window_size = 3;
}

message GenerateInt32SeriesNode {
int32 start = 1;
int32 stop = 2;
int32 step = 3;
}

message GenerateTimeSeriesNode {
string start = 1;
string stop = 2;
data.IntervalUnit step = 3;
message GenerateSeriesNode {
expr.ExprNode start = 1;
expr.ExprNode stop = 2;
expr.ExprNode step = 3;
}

// Task is a running instance of Stage.
Expand Down Expand Up @@ -164,9 +158,8 @@ message PlanNode {
HashJoinNode hash_join = 19;
MergeSortExchangeNode merge_sort_exchange = 21;
SortMergeJoinNode sort_merge_join = 22;
GenerateInt32SeriesNode generate_int32_series = 23;
HopWindowNode hop_window = 25;
GenerateTimeSeriesNode generate_time_series = 26;
GenerateSeriesNode generate_series = 26;
D2Lark marked this conversation as resolved.
Show resolved Hide resolved
}
string identity = 24;
}
Expand Down
13 changes: 6 additions & 7 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use risingwave_pb::batch_plan::PlanNode;

use crate::executor2::{
BoxedExecutor2, BoxedExecutor2Builder, DeleteExecutor2, ExchangeExecutor2, FilterExecutor2,
GenerateSeriesI32Executor2, GenerateSeriesTimestampExecutor2, HashAggExecutor2Builder,
HashJoinExecutor2Builder, HopWindowExecutor2, InsertExecutor2, LimitExecutor2,
MergeSortExchangeExecutor2, NestedLoopJoinExecutor2, OrderByExecutor2, ProjectExecutor2,
RowSeqScanExecutor2Builder, SortAggExecutor2, SortMergeJoinExecutor2, StreamScanExecutor2,
TopNExecutor2, TraceExecutor2, ValuesExecutor2,
GenerateSeriesExecutor2Wrapper, HashAggExecutor2Builder, HashJoinExecutor2Builder,
HopWindowExecutor2, InsertExecutor2, LimitExecutor2, MergeSortExchangeExecutor2,
NestedLoopJoinExecutor2, OrderByExecutor2, ProjectExecutor2, RowSeqScanExecutor2Builder,
SortAggExecutor2, SortMergeJoinExecutor2, StreamScanExecutor2, TopNExecutor2, TraceExecutor2,
ValuesExecutor2,
};
use crate::task::{BatchEnvironment, TaskId};

Expand Down Expand Up @@ -114,8 +114,7 @@ impl<'a> ExecutorBuilder<'a> {
NodeBody::SortMergeJoin => SortMergeJoinExecutor2,
NodeBody::HashAgg => HashAggExecutor2Builder,
NodeBody::MergeSortExchange => MergeSortExchangeExecutor2,
NodeBody::GenerateInt32Series => GenerateSeriesI32Executor2,
NodeBody::GenerateTimeSeries => GenerateSeriesTimestampExecutor2,
NodeBody::GenerateSeries => GenerateSeriesExecutor2Wrapper,
NodeBody::HopWindow => HopWindowExecutor2,
}?;
let input_desc = real_executor.identity().to_string();
Expand Down
133 changes: 133 additions & 0 deletions src/batch/src/executor2/generate_i32_series.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures_async_stream::try_stream;
use risingwave_common::array::column::Column;
use risingwave_common::array::{ArrayBuilder, DataChunk, I32ArrayBuilder};
use risingwave_common::catalog::Schema;
use risingwave_common::error::RwError;
use risingwave_common::util::chunk_coalesce::DEFAULT_CHUNK_BUFFER_SIZE;

use crate::executor2::{BoxedDataChunkStream, Executor2};

pub struct GenerateSeriesI32Executor2 {
start: i32,
stop: i32,
step: i32,

schema: Schema,
identity: String,
}

impl GenerateSeriesI32Executor2 {
pub fn new(start: i32, stop: i32, step: i32, schema: Schema, identity: String) -> Self {
Self {
start,
stop,
step,
schema,
identity,
}
}
}

impl Executor2 for GenerateSeriesI32Executor2 {
fn schema(&self) -> &Schema {
&self.schema
}

fn identity(&self) -> &str {
&self.identity
}

fn execute(self: Box<Self>) -> BoxedDataChunkStream {
self.do_execute()
}
}

impl GenerateSeriesI32Executor2 {
#[try_stream(boxed, ok = DataChunk, error = RwError)]
async fn do_execute(self: Box<Self>) {
let Self {
start, stop, step, ..
} = *self;

let mut rest_rows = ((stop - start) / step + 1) as usize;
let mut cur = start;

// Simulate a do-while loop.
while rest_rows > 0 {
let chunk_size = rest_rows.min(DEFAULT_CHUNK_BUFFER_SIZE);
let mut builder = I32ArrayBuilder::new(chunk_size)?;

for _ in 0..chunk_size {
builder.append(Some(cur)).unwrap();
cur += self.step;
}

let arr = builder.finish()?;
let columns = vec![Column::new(Arc::new(arr.into()))];
let chunk: DataChunk = DataChunk::builder().columns(columns).build();

yield chunk;

rest_rows -= chunk_size;
}
}
}

#[cfg(test)]
mod tests {
use futures::StreamExt;
use risingwave_common::array::{Array, ArrayImpl};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::try_match_expand;
use risingwave_common::types::DataType;

use super::*;

#[tokio::test]
async fn test_generate_series() {
generate_series_test_case(2, 4, 1).await;
generate_series_test_case(0, 9, 2).await;
generate_series_test_case(0, (DEFAULT_CHUNK_BUFFER_SIZE * 2 + 3) as i32, 1).await;
}

async fn generate_series_test_case(start: i32, stop: i32, step: i32) {
let executor = Box::new(GenerateSeriesI32Executor2 {
start,
stop,
step,
schema: Schema::new(vec![Field::unnamed(DataType::Int32)]),
identity: "GenerateSeriesI32Executor2".to_string(),
});
let mut remained_values = ((stop - start) / step + 1) as usize;
let mut stream = executor.execute();
while remained_values > 0 {
let chunk = stream.next().await.unwrap().unwrap();
let col = chunk.column_at(0);
let arr = try_match_expand!(col.array_ref(), ArrayImpl::Int32).unwrap();

if remained_values > DEFAULT_CHUNK_BUFFER_SIZE {
assert_eq!(arr.len(), DEFAULT_CHUNK_BUFFER_SIZE);
} else {
assert_eq!(arr.len(), remained_values);
}
remained_values -= arr.len();
}
assert!(stream.next().await.is_none());
}
}
Loading