Skip to content

Commit

Permalink
feat(frontend): support stream scan plan
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh committed Mar 15, 2022
1 parent e3b2814 commit c587e98
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 10 deletions.
4 changes: 2 additions & 2 deletions rust/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use fixedbitset::FixedBitSet;
use risingwave_common::catalog::Schema;
use risingwave_common::error::Result;

use super::{ColPrunable, LogicalBase, PlanRef, ToBatch, ToStream};
use super::{ColPrunable, LogicalBase, PlanRef, StreamTableSource, ToBatch, ToStream};
use crate::catalog::{ColumnId, TableId};
use crate::optimizer::plan_node::BatchSeqScan;
use crate::optimizer::property::WithSchema;
Expand Down Expand Up @@ -106,6 +106,6 @@ impl ToBatch for LogicalScan {

impl ToStream for LogicalScan {
fn to_stream(&self) -> PlanRef {
todo!()
StreamTableSource::new(self.clone()).into()
}
}
4 changes: 2 additions & 2 deletions rust/frontend/src/optimizer/plan_node/stream_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub struct StreamProject {
}

impl fmt::Display for StreamProject {
fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
todo!()
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.logical.fmt_with_name(f, "StreamProject")
}
}

Expand Down
27 changes: 21 additions & 6 deletions rust/frontend/src/optimizer/plan_node/stream_table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,42 @@ use std::fmt;

use risingwave_common::catalog::Schema;

use super::{StreamBase, ToStreamProst};
use crate::optimizer::property::WithSchema;
use super::{LogicalScan, StreamBase, ToStreamProst};
use crate::optimizer::property::{Distribution, WithSchema};

/// `StreamTableSource` continuously streams data from internal table or various kinds of
/// external sources.
#[derive(Debug, Clone)]
pub struct StreamTableSource {
pub base: StreamBase,
// TODO(catalog)
// TODO: replace this with actual table. Currently we place the logical scan node here only to
// pass plan tests.
logical: LogicalScan,
}

impl StreamTableSource {
pub fn new(logical: LogicalScan) -> Self {
let ctx = logical.base.ctx.clone();
// TODO: derive from input
let base = StreamBase {
dist: Distribution::any().clone(),
id: ctx.borrow_mut().get_id(),
ctx: ctx.clone(),
};
Self { logical, base }
}
}

impl WithSchema for StreamTableSource {
fn schema(&self) -> &Schema {
todo!()
self.logical.schema()
}
}

impl_plan_tree_node_for_leaf! {StreamTableSource}
impl fmt::Display for StreamTableSource {
fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
todo!()
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "StreamTableSource {{ logical: {} }}", self.logical)
}
}

Expand Down
14 changes: 14 additions & 0 deletions rust/frontend/test_runner/tests/testdata/basic_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
batch_plan: |
BatchProject { exprs: [$0, $1, $2], expr_alias: [Some("_row_id"), Some("v1"), Some("v2")] }
BatchScan { table: "t", columns: ["_row_id", "v1", "v2"] }
stream_plan: |
StreamProject { exprs: [$0, $1, $2], expr_alias: [Some("_row_id"), Some("v1"), Some("v2")] }
StreamTableSource { logical: LogicalScan { table: "t", columns: ["_row_id", "v1", "v2"] } }
- sql: |
create table t (v1 bigint, v2 double precision);
select t2.* from t;
Expand All @@ -33,6 +36,10 @@
BatchProject { exprs: [$0], expr_alias: [Some("_row_id")] }
BatchFilter { predicate: Condition { conjunctions: [Or(And(And(And(GreaterThan(1:Int32, 2:Int32), Equal(1:Int32, 1:Int32)), LessThan(3:Int32, 1:Int32)), NotEqual(4:Int32, 1:Int32)), And(And(Equal(1:Int32, 1:Int32), GreaterThanOrEqual(2:Int32, 1:Int32)), LessThanOrEqual(1:Int32, 2:Int32)))] } }
BatchScan { table: "t", columns: ["_row_id"] }
stream_plan: |
StreamProject { exprs: [$0], expr_alias: [Some("_row_id")] }
StreamFilter { predicate: Condition { conjunctions: [Or(And(And(And(GreaterThan(1:Int32, 2:Int32), Equal(1:Int32, 1:Int32)), LessThan(3:Int32, 1:Int32)), NotEqual(4:Int32, 1:Int32)), And(And(Equal(1:Int32, 1:Int32), GreaterThanOrEqual(2:Int32, 1:Int32)), LessThanOrEqual(1:Int32, 2:Int32)))] } }
StreamTableSource { logical: LogicalScan { table: "t", columns: ["_row_id"] } }
- sql: |
create table t (v1 int);
select * from t where v1<1;
Expand All @@ -44,6 +51,10 @@
BatchProject { exprs: [$0, $1], expr_alias: [Some("_row_id"), Some("v1")] }
BatchFilter { predicate: Condition { conjunctions: [LessThan($1, 1:Int32)] } }
BatchScan { table: "t", columns: ["_row_id", "v1"] }
stream_plan: |
StreamProject { exprs: [$0, $1], expr_alias: [Some("_row_id"), Some("v1")] }
StreamFilter { predicate: Condition { conjunctions: [LessThan($1, 1:Int32)] } }
StreamTableSource { logical: LogicalScan { table: "t", columns: ["_row_id", "v1"] } }
- sql: |
create table t (v1 int, v2 int);
insert into t values (22, 33), (44, 55);
Expand Down Expand Up @@ -72,6 +83,9 @@
batch_plan: |
BatchProject { exprs: [$0], expr_alias: [Some("v1")] }
BatchScan { table: "t", columns: ["v1"] }
stream_plan: |
StreamProject { exprs: [$0], expr_alias: [Some("v1")] }
StreamTableSource { logical: LogicalScan { table: "t", columns: ["v1"] } }
- sql: |
values(cast(1 as bigint));
logical_plan: |
Expand Down

0 comments on commit c587e98

Please sign in to comment.