Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add visit for merge into/update/replace/insert/delete/copy statement #13848

Merged
merged 44 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
101f1e8
add rewrite rule
JackTan25 Nov 29, 2023
e6b209a
add tests
JackTan25 Nov 29, 2023
3d1d96a
fix test
JackTan25 Nov 29, 2023
006a068
Merge branch 'main' into fix_bug
JackTan25 Nov 29, 2023
e8c8760
add more exprs
JackTan25 Nov 29, 2023
fd91178
Merge branch 'fix_bug' of https://github.com/JackTan25/databend into …
JackTan25 Nov 29, 2023
499d1ce
Merge branch 'main' into fix_bug
JackTan25 Nov 29, 2023
ed3de21
add merge into visit comments
JackTan25 Nov 30, 2023
02ce375
Merge branch 'fix_bug' of https://github.com/JackTan25/databend into …
JackTan25 Nov 30, 2023
4bb134b
fix visit and source for merge,insert,delete,update,replace,copy
JackTan25 Nov 30, 2023
4605958
fix lint
JackTan25 Nov 30, 2023
38802fd
fix lint
JackTan25 Nov 30, 2023
b57799a
Merge branch 'main' into fix_bug
JackTan25 Nov 30, 2023
30a7da3
Merge branch 'main' into fix_bug
JackTan25 Nov 30, 2023
5791538
modify support
JackTan25 Nov 30, 2023
45f61d1
Merge branch 'fix_bug' of https://github.com/JackTan25/databend into …
JackTan25 Nov 30, 2023
814d799
modify
JackTan25 Nov 30, 2023
42966df
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Nov 30, 2023
40650fd
add create table visit
JackTan25 Nov 30, 2023
38f36b6
Merge branch 'main' into fix_bug
JackTan25 Nov 30, 2023
6b24f51
fix test
JackTan25 Nov 30, 2023
391b426
Merge branch 'fix_bug' of https://github.com/JackTan25/databend into …
JackTan25 Nov 30, 2023
2a8e5db
fix test
JackTan25 Nov 30, 2023
b426243
Merge branch 'main' into fix_bug
JackTan25 Nov 30, 2023
3eea3fd
fix
JackTan25 Nov 30, 2023
8972f8a
Merge branch 'main' into fix_bug
JackTan25 Nov 30, 2023
78c0f6f
revert copy
JackTan25 Nov 30, 2023
775d2b4
revert copy
JackTan25 Nov 30, 2023
e7a1625
Merge branch 'fix_bug' of https://github.com/JackTan25/databend into …
JackTan25 Nov 30, 2023
4baa751
fix
JackTan25 Nov 30, 2023
235f1b5
revert test
JackTan25 Dec 1, 2023
a27a2b2
fix
JackTan25 Dec 1, 2023
96312e5
fix comments
JackTan25 Dec 1, 2023
c5077cf
fix
JackTan25 Dec 1, 2023
cb1233a
Merge branch 'main' into fix_bug
JackTan25 Dec 1, 2023
150007f
fix
JackTan25 Dec 1, 2023
a72d8ce
Merge branch 'fix_bug' of https://github.com/JackTan25/databend into …
JackTan25 Dec 1, 2023
7d4e5a8
fix lint
JackTan25 Dec 1, 2023
50dd7ce
fix lint
JackTan25 Dec 1, 2023
164d841
Merge branch 'main' into fix_bug
JackTan25 Dec 1, 2023
bb01af7
fix lint
JackTan25 Dec 1, 2023
8e5e024
Merge branch 'fix_bug' of https://github.com/JackTan25/databend into …
JackTan25 Dec 1, 2023
3bcc1f9
fix
JackTan25 Dec 1, 2023
c519374
Merge branch 'main' into fix_bug
JackTan25 Dec 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 72 additions & 8 deletions src/query/ast/src/visitors/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,16 @@ pub trait Visitor<'ast>: Sized {

fn visit_explain(&mut self, _kind: &'ast ExplainKind, _query: &'ast Statement) {}

fn visit_copy_into_table(&mut self, _copy: &'ast CopyIntoTableStmt) {}
fn visit_copy_into_location(&mut self, _copy: &'ast CopyIntoLocationStmt) {}
fn visit_copy_into_table(&mut self, copy: &'ast CopyIntoTableStmt) {
if let CopyIntoTableSource::Query(query) = &copy.src {
self.visit_query(query)
}
}
fn visit_copy_into_location(&mut self, copy: &'ast CopyIntoLocationStmt) {
if let CopyIntoLocationSource::Query(query) = &copy.src {
self.visit_query(query)
}
}

fn visit_call(&mut self, _call: &'ast CallStmt) {}

Expand Down Expand Up @@ -416,14 +424,66 @@ pub trait Visitor<'ast>: Sized {
fn visit_set_role(&mut self, _is_default: bool, _role_name: &'ast str) {}
fn visit_set_secondary_roles(&mut self, _option: &SecondaryRolesOption) {}

fn visit_insert(&mut self, _insert: &'ast InsertStmt) {}
fn visit_replace(&mut self, _replace: &'ast ReplaceStmt) {}
fn visit_merge_into(&mut self, _merge_into: &'ast MergeIntoStmt) {}
fn visit_insert(&mut self, insert: &'ast InsertStmt) {
if let InsertSource::Select { query } = &insert.source {
self.visit_query(query)
}
}

fn visit_replace(&mut self, replace: &'ast ReplaceStmt) {
if let InsertSource::Select { query, .. } = &replace.source {
self.visit_query(query)
}
}

fn visit_merge_into(&mut self, merge_into: &'ast MergeIntoStmt) {
// for visit merge into, its destination is to do some rules for the exprs
// in merge into before we bind_merge_into, we need to make sure the correct
// exprs rewrite for bind_merge_into
if let MergeSource::Select { query, .. } = &merge_into.source {
self.visit_query(query)
}
self.visit_expr(&merge_into.join_expr);
for operation in &merge_into.merge_options {
match operation {
MergeOption::Match(match_operation) => {
if let Some(expr) = &match_operation.selection {
self.visit_expr(expr)
}
if let MatchOperation::Update { update_list, .. } = &match_operation.operation {
for update in update_list {
self.visit_expr(&update.expr)
}
}
}
MergeOption::Unmatch(unmatch_operation) => {
if let Some(expr) = &unmatch_operation.selection {
self.visit_expr(expr)
}
for expr in &unmatch_operation.insert_operation.values {
self.visit_expr(expr)
}
}
}
}
}

fn visit_insert_source(&mut self, _insert_source: &'ast InsertSource) {}

fn visit_delete(&mut self, _delete: &'ast DeleteStmt) {}
fn visit_delete(&mut self, delete: &'ast DeleteStmt) {
if let Some(expr) = &delete.selection {
self.visit_expr(expr)
}
}

fn visit_update(&mut self, _update: &'ast UpdateStmt) {}
fn visit_update(&mut self, update: &'ast UpdateStmt) {
if let Some(expr) = &update.selection {
self.visit_expr(expr)
}
for update in &update.update_list {
self.visit_expr(&update.expr)
}
}

fn visit_show_catalogs(&mut self, _stmt: &'ast ShowCatalogsStmt) {}

Expand Down Expand Up @@ -459,7 +519,11 @@ pub trait Visitor<'ast>: Sized {

fn visit_show_drop_tables(&mut self, _stmt: &'ast ShowDropTablesStmt) {}

fn visit_create_table(&mut self, _stmt: &'ast CreateTableStmt) {}
fn visit_create_table(&mut self, stmt: &'ast CreateTableStmt) {
if let Some(query) = stmt.as_query.as_deref() {
self.visit_query(query)
}
}

fn visit_create_table_source(&mut self, _source: &'ast CreateTableSource) {}

Expand Down
80 changes: 72 additions & 8 deletions src/query/ast/src/visitors/visitor_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,16 @@ pub trait VisitorMut: Sized {
walk_statement_mut(self, stmt);
}

fn visit_copy_into_table(&mut self, _copy: &mut CopyIntoTableStmt) {}
fn visit_copy_into_location(&mut self, _copy: &mut CopyIntoLocationStmt) {}
fn visit_copy_into_table(&mut self, copy: &mut CopyIntoTableStmt) {
if let CopyIntoTableSource::Query(query) = &mut copy.src {
self.visit_query(query)
}
}
fn visit_copy_into_location(&mut self, copy: &mut CopyIntoLocationStmt) {
if let CopyIntoLocationSource::Query(query) = &mut copy.src {
self.visit_query(query)
}
}

fn visit_call(&mut self, _call: &mut CallStmt) {}

Expand Down Expand Up @@ -430,14 +438,66 @@ pub trait VisitorMut: Sized {
fn visit_set_role(&mut self, _is_default: bool, _role_name: &mut String) {}
fn visit_set_secondary_roles(&mut self, _option: &mut SecondaryRolesOption) {}

fn visit_insert(&mut self, _insert: &mut InsertStmt) {}
fn visit_replace(&mut self, _replace: &mut ReplaceStmt) {}
fn visit_merge_into(&mut self, _merge_into: &mut MergeIntoStmt) {}
fn visit_insert(&mut self, insert: &mut InsertStmt) {
if let InsertSource::Select { query } = &mut insert.source {
self.visit_query(query)
}
}
fn visit_replace(&mut self, replace: &mut ReplaceStmt) {
if let InsertSource::Select { query } = &mut replace.source {
self.visit_query(query)
}
}
fn visit_merge_into(&mut self, merge_into: &mut MergeIntoStmt) {
JackTan25 marked this conversation as resolved.
Show resolved Hide resolved
// for visit merge into, its destination is to do some rules for the exprs
// in merge into before we bind_merge_into, we need to make sure the correct
// exprs rewrite for bind_merge_into
if let MergeSource::Select { query, .. } = &mut merge_into.source {
self.visit_query(query)
}
self.visit_expr(&mut merge_into.join_expr);
for operation in &mut merge_into.merge_options {
match operation {
MergeOption::Match(match_operation) => {
if let Some(expr) = &mut match_operation.selection {
self.visit_expr(expr)
}
if let MatchOperation::Update { update_list, .. } =
&mut match_operation.operation
{
for update in update_list {
self.visit_expr(&mut update.expr)
}
}
}
MergeOption::Unmatch(unmatch_operation) => {
if let Some(expr) = &mut unmatch_operation.selection {
self.visit_expr(expr)
}
for expr in &mut unmatch_operation.insert_operation.values {
self.visit_expr(expr)
}
}
}
}
}

fn visit_insert_source(&mut self, _insert_source: &mut InsertSource) {}

fn visit_delete(&mut self, _delete: &mut DeleteStmt) {}
fn visit_delete(&mut self, delete: &mut DeleteStmt) {
if let Some(expr) = &mut delete.selection {
self.visit_expr(expr)
}
}

fn visit_update(&mut self, _update: &mut UpdateStmt) {}
fn visit_update(&mut self, update: &mut UpdateStmt) {
if let Some(expr) = &mut update.selection {
self.visit_expr(expr)
}
for update in &mut update.update_list {
self.visit_expr(&mut update.expr)
}
}

fn visit_show_catalogs(&mut self, _stmt: &mut ShowCatalogsStmt) {}

Expand Down Expand Up @@ -473,7 +533,11 @@ pub trait VisitorMut: Sized {

fn visit_show_drop_tables(&mut self, _stmt: &mut ShowDropTablesStmt) {}

fn visit_create_table(&mut self, _stmt: &mut CreateTableStmt) {}
fn visit_create_table(&mut self, stmt: &mut CreateTableStmt) {
if let Some(query) = stmt.as_query.as_deref_mut() {
self.visit_query(query)
}
}

fn visit_create_table_source(&mut self, _source: &mut CreateTableSource) {}

Expand Down
115 changes: 115 additions & 0 deletions src/query/sql/src/planner/binder/binder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,121 @@ impl<'a> Binder {
Ok(finder.scalars().is_empty())
}

// add check for SExpr to disable invalid source for copy/insert/merge/replace
pub(crate) fn check_sexpr_top(&self, s_expr: &SExpr) -> Result<bool> {
let f = |scalar: &ScalarExpr| matches!(scalar, ScalarExpr::UDFServerCall(_));
let mut finder = Finder::new(&f);
Self::check_sexpr(s_expr, &mut finder)
}

pub(crate) fn check_sexpr<F>(s_expr: &'a SExpr, f: &'a mut Finder<'a, F>) -> Result<bool>
where F: Fn(&ScalarExpr) -> bool {
let result = match s_expr.plan.as_ref() {
RelOperator::Scan(scan) => {
f.reset_finder();
if let Some(agg_info) = &scan.agg_index {
for predicate in &agg_info.predicates {
f.visit(predicate)?;
}
for selection in &agg_info.selection {
f.visit(&selection.scalar)?;
}
}
if let Some(predicates) = &scan.push_down_predicates {
for predicate in predicates {
f.visit(predicate)?;
}
}
f.scalars().is_empty()
}
RelOperator::Join(join) => {
b41sh marked this conversation as resolved.
Show resolved Hide resolved
f.reset_finder();
for condition in &join.left_conditions {
f.visit(condition)?;
}
for condition in &join.right_conditions {
f.visit(condition)?;
}
for condition in &join.non_equi_conditions {
f.visit(condition)?;
}
f.scalars().is_empty()
}
RelOperator::EvalScalar(eval) => {
f.reset_finder();
for item in &eval.items {
f.visit(&item.scalar)?;
}
f.scalars().is_empty()
}
RelOperator::Filter(filter) => {
f.reset_finder();
for predicate in &filter.predicates {
f.visit(predicate)?;
}
f.scalars().is_empty()
}
RelOperator::Aggregate(aggregate) => {
f.reset_finder();
for item in &aggregate.group_items {
f.visit(&item.scalar)?;
}
for item in &aggregate.aggregate_functions {
f.visit(&item.scalar)?;
}
f.scalars().is_empty()
}
RelOperator::Exchange(exchange) => {
f.reset_finder();
if let crate::plans::Exchange::Hash(hash) = exchange {
for scalar in hash {
f.visit(scalar)?;
}
}
f.scalars().is_empty()
}
RelOperator::RuntimeFilterSource(runtime_filter_source) => {
f.reset_finder();
for scalar in runtime_filter_source.left_runtime_filters.values() {
f.visit(scalar)?;
}
for scalar in runtime_filter_source.right_runtime_filters.values() {
f.visit(scalar)?;
}
f.scalars().is_empty()
}
RelOperator::Window(window) => {
f.reset_finder();
for scalar_item in &window.arguments {
f.visit(&scalar_item.scalar)?;
}
for scalar_item in &window.partition_by {
f.visit(&scalar_item.scalar)?;
}
for info in &window.order_by {
f.visit(&info.order_by_item.scalar)?;
}
f.scalars().is_empty()
}
RelOperator::Udf(_) => false,
_ => true,
};

match result {
true => {
for child in &s_expr.children {
let mut finder = Finder::new(f.find_fn());
let flag = Self::check_sexpr(child.as_ref(), &mut finder)?;
if !flag {
return Ok(false);
}
}
Ok(true)
}
false => Ok(false),
}
}

pub(crate) fn check_allowed_scalar_expr_with_subquery(
&self,
scalar: &ScalarExpr,
Expand Down
13 changes: 11 additions & 2 deletions src/query/sql/src/planner/binder/copy_into_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,17 @@ impl<'a> Binder {
}
}
CopyIntoLocationSource::Query(query) => {
self.bind_statement(bind_context, &Statement::Query(query.clone()))
.await
let select_plan = self
.bind_statement(bind_context, &Statement::Query(query.clone()))
.await?;
if let Plan::Query { s_expr, .. } = &select_plan {
if !self.check_sexpr_top(s_expr)? {
return Err(ErrorCode::SemanticError(
"copy into location source can't contain udf functions".to_string(),
));
}
}
Ok(select_plan)
}
}?;

Expand Down
13 changes: 13 additions & 0 deletions src/query/sql/src/planner/binder/copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,13 @@ impl<'a> Binder {
let plan = self
.bind_copy_into_table_common(bind_context, stmt, location)
.await?;

self.bind_copy_from_query_into_table(bind_context, plan, select_list, alias)
.await
}
}
}

async fn bind_copy_into_table_common(
&mut self,
bind_context: &mut BindContext,
Expand Down Expand Up @@ -331,6 +333,16 @@ impl<'a> Binder {
let select_list = self
.normalize_select_list(&mut from_context, select_list)
.await?;

for item in select_list.items.iter() {
if !self.check_allowed_scalar_expr_with_subquery(&item.scalar)? {
// in fact, if there is a join, we will stop in `check_transform_query()`
return Err(ErrorCode::SemanticError(
"copy into table source can't contain window|aggregate|udf|join functions"
.to_string(),
));
};
}
let (scalar_items, projections) = self.analyze_projection(
&from_context.aggregate_info,
&from_context.windows,
Expand Down Expand Up @@ -359,6 +371,7 @@ impl<'a> Binder {
ignore_result: false,
formatted_ast: None,
}));

Ok(Plan::CopyIntoTable(Box::new(plan)))
}

Expand Down
Loading
Loading