Skip to content

Commit

Permalink
feat: support explain sink (#4430)
Browse files Browse the repository at this point in the history
* support explain sink

Signed-off-by: tabVersion <[email protected]>

* format

Signed-off-by: tabVersion <[email protected]>

* add ci for explain sink

Signed-off-by: tabVersion <[email protected]>

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tabVersion and mergify[bot] authored Aug 4, 2022
1 parent 5fe991c commit 667ab5d
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 32 deletions.
3 changes: 3 additions & 0 deletions e2e_test/batch/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ create table t(v int);
statement ok
explain create index i on t(v);

statement ok
explain create sink sink_t from t with ( sink_type = 'kafka' )

statement ok
drop table t;
3 changes: 3 additions & 0 deletions e2e_test/ddl/table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ create materialized view ddl_mv as select v2 from ddl_t;
statement ok
explain select v2 from ddl_t;

statement ok
explain create sink sink_t from ddl_t with ( sink_type = 'kafka' );

# Create a mview with duplicated name.
statement error
create materialized view ddl_mv as select v2 from ddl_t;
Expand Down
67 changes: 35 additions & 32 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::rc::Rc;

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::TableDesc;
use risingwave_common::error::Result;
use risingwave_pb::catalog::Sink as ProstSink;
use risingwave_pb::user::grant_privilege::{Action, Object};
Expand All @@ -29,7 +28,7 @@ use crate::catalog::{DatabaseId, SchemaId};
use crate::handler::privilege::ObjectCheckItem;
use crate::optimizer::plan_node::{LogicalScan, StreamSink, StreamTableScan};
use crate::optimizer::PlanRef;
use crate::session::{OptimizerContext, OptimizerContextRef};
use crate::session::{OptimizerContext, OptimizerContextRef, SessionImpl};
use crate::stream_fragmenter::StreamFragmenter;

pub(crate) fn make_prost_sink(
Expand All @@ -52,30 +51,12 @@ pub(crate) fn make_prost_sink(
})
}

fn gen_sink_plan(
pub fn gen_sink_plan(
session: &SessionImpl,
context: OptimizerContextRef,
associated_table_name: String,
associated_table_desc: TableDesc,
properties: HashMap<String, String>,
) -> Result<PlanRef> {
let scan_node = StreamTableScan::new(LogicalScan::create(
associated_table_name,
false,
Rc::new(associated_table_desc),
vec![],
context,
))
.into();

Ok(StreamSink::new(scan_node, properties).into())
}

pub async fn handle_create_sink(
context: OptimizerContext,
stmt: CreateSinkStatement,
) -> Result<PgResponse> {
) -> Result<(PlanRef, ProstSink)> {
let with_properties = handle_with_properties("create_sink", stmt.with_properties.0)?;
let session = context.session_ctx.clone();

let (schema_name, sink_name) = Binder::resolve_table_name(stmt.sink_name.clone())?;

Expand All @@ -85,7 +66,7 @@ pub async fn handle_create_sink(
let schema = catalog_reader.get_schema_by_name(session.database(), &schema_name)?;

check_privileges(
&session,
session,
&vec![ObjectCheckItem::new(
schema.owner(),
Action::Create,
Expand Down Expand Up @@ -123,16 +104,38 @@ pub async fn handle_create_sink(
session.user_id(),
)?;

let graph = {
let plan = gen_sink_plan(
context.into(),
associated_table_name,
associated_table_desc,
with_properties.clone(),
)?;
let scan_node = StreamTableScan::new(LogicalScan::create(
associated_table_name,
false,
Rc::new(associated_table_desc),
vec![],
context,
))
.into();

let plan: PlanRef = StreamSink::new(scan_node, with_properties).into();

let ctx = plan.ctx();
let explain_trace = ctx.is_explain_trace();
if explain_trace {
ctx.trace("Create Sink:".to_string());
ctx.trace(plan.explain_to_string().unwrap());
}

Ok((plan, sink))
}

pub async fn handle_create_sink(
context: OptimizerContext,
stmt: CreateSinkStatement,
) -> Result<PgResponse> {
let session = context.session_ctx.clone();

let (sink, graph) = {
let (plan, sink) = gen_sink_plan(&session, context.into(), stmt)?;
let stream_plan = plan.to_stream_prost();

StreamFragmenter::build_graph(stream_plan)
(sink, StreamFragmenter::build_graph(stream_plan))
};

let catalog_writer = session.env().catalog_writer();
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_sqlparser::ast::Statement;

use super::create_index::gen_create_index_plan;
use super::create_mv::gen_create_mv_plan;
use super::create_sink::gen_sink_plan;
use super::create_table::gen_create_table_plan;
use crate::binder::Binder;
use crate::handler::util::force_local_mode;
Expand All @@ -49,6 +50,8 @@ pub(super) fn handle_explain(
..
} => gen_create_mv_plan(&session, planner.ctx(), query, name)?.0,

Statement::CreateSink { stmt } => gen_sink_plan(&session, planner.ctx(), stmt)?.0,

Statement::CreateTable { name, columns, .. } => {
gen_create_table_plan(&session, planner.ctx(), name, columns)?.0
}
Expand Down

0 comments on commit 667ab5d

Please sign in to comment.