fix(planner): Tumble can accept CTE as input (risingwavelabs#4450)
fix tumble window

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
jon-chuang and mergify[bot] authored Aug 5, 2022
1 parent a232f2e commit ecdadce
Showing 2 changed files with 47 additions and 12 deletions.
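Context for the change: the tumble planning code below previously accepted only Relation::Source and Relation::BaseTable as input, so feeding a CTE to tumble() failed at binding. The updated test in time_window.yaml now exercises exactly that case:

    create table t1 (id int, v1 int, created_at date);
    with t2 as (select * from t1 where v1 >= 10)
    select * from tumble(t2, created_at, interval '3' day);

The fix adds a Relation::Subquery arm (which is how the bound CTE reference reaches the planner here, judging by the new test plans) and reduces every supported input to its column data types, the only information the subsequent plan construction needs.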
36 changes: 29 additions & 7 deletions src/frontend/src/planner/relation.rs
@@ -112,17 +112,39 @@ impl Planner {
     ) -> Result<PlanRef> {
         let mut args = args.into_iter();
 
-        let cols = match &input {
-            Relation::Source(s) => s.catalog.columns.to_vec(),
-            Relation::BaseTable(t) => t.table_catalog.columns().to_vec(),
-            _ => return Err(ErrorCode::BindError("the ".to_string()).into()),
+        let col_data_types: Vec<_> = match &input {
+            Relation::Source(s) => s
+                .catalog
+                .columns
+                .iter()
+                .map(|col| col.data_type().clone())
+                .collect(),
+            Relation::BaseTable(t) => t
+                .table_catalog
+                .columns
+                .iter()
+                .map(|col| col.data_type().clone())
+                .collect(),
+            Relation::Subquery(q) => q
+                .query
+                .schema()
+                .fields
+                .iter()
+                .map(|f| f.data_type())
+                .collect(),
+            r => {
+                return Err(ErrorCode::BindError(format!(
+                    "Invalid input relation to tumble: {r:?}"
+                ))
+                .into())
+            }
         };
 
         match (args.next(), args.next()) {
             (Some(window_size @ ExprImpl::Literal(_)), None) => {
-                let mut exprs = Vec::with_capacity(cols.len() + 2);
-                for (idx, col) in cols.iter().enumerate() {
-                    exprs.push(InputRef::new(idx, col.data_type().clone()).into());
+                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
+                for (idx, col_dt) in col_data_types.iter().enumerate() {
+                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                 }
                 let window_start: ExprImpl = FunctionCall::new(
                     ExprType::TumbleStart,
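The refactor above is worth spelling out: the old code kept whole catalog column entries (cols), which only tables and sources could supply; the new code keeps just a Vec<DataType>, which any relation with a schema (including a subquery) can produce. A minimal sketch of that pattern, using simplified stand-in types rather than RisingWave's actual Relation/Schema/InputRef API:

    // Simplified stand-ins for the planner's types: any relation that can
    // report its column data types can feed the tumble expansion.
    #[derive(Debug, Clone)]
    enum DataType {
        Int32,
        Date,
    }

    struct Field {
        data_type: DataType,
    }

    struct Schema {
        fields: Vec<Field>,
    }

    #[allow(dead_code)]
    enum Relation {
        BaseTable(Schema), // column types from the catalog
        Subquery(Schema),  // column types from the bound query's schema
        Share,             // stand-in for an unsupported variant
    }

    fn tumble_input_types(input: &Relation) -> Result<Vec<DataType>, String> {
        match input {
            // Both supported variants collapse to the same representation,
            // so the code that consumes the types no longer cares where
            // the columns came from.
            Relation::BaseTable(s) | Relation::Subquery(s) => {
                Ok(s.fields.iter().map(|f| f.data_type.clone()).collect())
            }
            _ => Err("Invalid input relation to tumble".to_string()),
        }
    }

    fn main() {
        let cte = Relation::Subquery(Schema {
            fields: vec![
                Field { data_type: DataType::Int32 },
                Field { data_type: DataType::Date },
            ],
        });
        // One input reference per input column, as the patched loop does.
        for (idx, dt) in tumble_input_types(&cte).unwrap().iter().enumerate() {
            println!("InputRef({idx}, {dt:?})");
        }
    }

Collapsing both arms to one representation is what lets the InputRef-building loop stay unchanged apart from the renamed binding.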
23 changes: 18 additions & 5 deletions src/frontend/test_runner/tests/testdata/time_window.yaml
@@ -134,19 +134,23 @@
 - sql: |
     create table t1 (id int, v1 int, created_at date);
     with t2 as (select * from t1 where v1 >= 10)
-    select * from tumble(t1, created_at, interval '3' day);
+    select * from tumble(t2, created_at, interval '3' day);
   logical_plan: |
-    LogicalProject { exprs: [t1.id, t1.v1, t1.created_at, TumbleStart(t1.created_at, '3 days 00:00:00':Interval), (TumbleStart(t1.created_at, '3 days 00:00:00':Interval) + '3 days 00:00:00':Interval)] }
-      LogicalProject { exprs: [t1._row_id, t1.id, t1.v1, t1.created_at, TumbleStart(t1.created_at, '3 days 00:00:00':Interval), (TumbleStart(t1.created_at, '3 days 00:00:00':Interval) + '3 days 00:00:00':Interval)] }
-        LogicalScan { table: t1, columns: [t1._row_id, t1.id, t1.v1, t1.created_at] }
+    LogicalProject { exprs: [t1.id, t1.v1, t1.created_at, TumbleStart(t1.created_at, '3 days 00:00:00':Interval), (TumbleStart(t1.created_at, '3 days 00:00:00':Interval) + '3 days 00:00:00':Interval)] }
+      LogicalProject { exprs: [t1.id, t1.v1, t1.created_at] }
+        LogicalFilter { predicate: (t1.v1 >= 10:Int32) }
+          LogicalScan { table: t1, columns: [t1._row_id, t1.id, t1.v1, t1.created_at] }
   batch_plan: |
     BatchExchange { order: [], dist: Single }
       BatchProject { exprs: [t1.id, t1.v1, t1.created_at, TumbleStart(t1.created_at, '3 days 00:00:00':Interval), (TumbleStart(t1.created_at, '3 days 00:00:00':Interval) + '3 days 00:00:00':Interval)] }
-        BatchScan { table: t1, columns: [t1.id, t1.v1, t1.created_at], distribution: SomeShard }
+        BatchFilter { predicate: (t1.v1 >= 10:Int32) }
+          BatchScan { table: t1, columns: [t1.id, t1.v1, t1.created_at], distribution: SomeShard }
   stream_plan: |
     StreamMaterialize { columns: [id, v1, created_at, window_start, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id] }
       StreamProject { exprs: [t1.id, t1.v1, t1.created_at, TumbleStart(t1.created_at, '3 days 00:00:00':Interval), (TumbleStart(t1.created_at, '3 days 00:00:00':Interval) + '3 days 00:00:00':Interval), t1._row_id] }
-        StreamTableScan { table: t1, columns: [t1.id, t1.v1, t1.created_at, t1._row_id], pk: [t1._row_id], distribution: HashShard(t1._row_id) }
+        StreamFilter { predicate: (t1.v1 >= 10:Int32) }
+          StreamTableScan { table: t1, columns: [t1.id, t1.v1, t1.created_at, t1._row_id], pk: [t1._row_id], distribution: HashShard(t1._row_id) }
 - sql: |
     create table t1 (id int, v1 int, created_at date);
     with t2 as (select * from t1 where v1 >= 10)
@@ -167,3 +171,12 @@
     StreamHopWindow { time_col: t1.created_at, slide: 1 day 00:00:00, size: 3 days 00:00:00, output: [t1.id, t1.v1, t1.created_at, window_start, window_end, t1._row_id] }
       StreamFilter { predicate: (t1.v1 >= 10:Int32) }
         StreamTableScan { table: t1, columns: [t1.id, t1.v1, t1.created_at, t1._row_id], pk: [t1._row_id], distribution: HashShard(t1._row_id) }
+- sql: |
+    with t(ts) as (values ('2020-01-01 12:00:00'::timestamp)) select * from tumble(t, ts, interval '10' second) as z;
+  logical_plan: |
+    LogicalProject { exprs: [, TumbleStart(, '00:00:10':Interval), (TumbleStart(, '00:00:10':Interval) + '00:00:10':Interval)] }
+      LogicalProject { exprs: [, TumbleStart(, '00:00:10':Interval), (TumbleStart(, '00:00:10':Interval) + '00:00:10':Interval)] }
+        LogicalValues { rows: [['2020-01-01 12:00:00':Varchar::Timestamp]], schema: Schema { fields: [:Timestamp] } }
+  batch_plan: |
+    BatchProject { exprs: [, TumbleStart(, '00:00:10':Interval), (TumbleStart(, '00:00:10':Interval) + '00:00:10':Interval)] }
+      BatchValues { rows: [['2020-01-01 12:00:00':Varchar::Timestamp]] }

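For reference on what these plans compute: every test above emits window_start as TumbleStart(time_col, size) and window_end as window_start plus size, i.e. each row is tagged with the fixed-size window containing its timestamp. A sketch of that arithmetic over epoch seconds, assuming TumbleStart floors the timestamp to a multiple of the window size (the usual tumble semantics; RisingWave's actual implementation may differ in origin or units):

    // Tumble-window arithmetic over Unix epoch seconds. Assumed semantics:
    // window_start = floor(ts / size) * size; window_end = window_start + size.
    fn tumble_start(ts_secs: i64, window_secs: i64) -> i64 {
        assert!(window_secs > 0, "window size must be positive");
        ts_secs.div_euclid(window_secs) * window_secs
    }

    fn main() {
        // A row at 2020-01-01 12:00:03 UTC with a 10-second window,
        // mirroring the `interval '10' second` test case above.
        let ts = 1_577_880_003;
        let size = 10;
        let start = tumble_start(ts, size);
        let end = start + size;
        assert_eq!(start, 1_577_880_000); // 12:00:00
        assert_eq!(end, 1_577_880_010); // 12:00:10
        println!("window: [{start}, {end})");
    }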