From 101f1e803a612ac3f7b4bd4d62905d7e41852b0d Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Wed, 29 Nov 2023 15:05:35 +0800 Subject: [PATCH 01/26] add rewrite rule --- src/query/ast/src/visitors/visitor_mut.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/query/ast/src/visitors/visitor_mut.rs b/src/query/ast/src/visitors/visitor_mut.rs index 5c417f67cabed..1ef949f195f0e 100644 --- a/src/query/ast/src/visitors/visitor_mut.rs +++ b/src/query/ast/src/visitors/visitor_mut.rs @@ -431,7 +431,11 @@ pub trait VisitorMut: Sized { fn visit_insert(&mut self, _insert: &mut InsertStmt) {} fn visit_replace(&mut self, _replace: &mut ReplaceStmt) {} - fn visit_merge_into(&mut self, _merge_into: &mut MergeIntoStmt) {} + fn visit_merge_into(&mut self, merge_into: &mut MergeIntoStmt) { + if let MergeSource::Select { query, .. } = &mut merge_into.source { + self.visit_query(query) + } + } fn visit_insert_source(&mut self, _insert_source: &mut InsertSource) {} fn visit_delete(&mut self, _delete: &mut DeleteStmt) {} From e6b209af98edef31e43e70362910e4902bd4350c Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Wed, 29 Nov 2023 16:21:16 +0800 Subject: [PATCH 02/26] add tests --- .../09_fuse_engine/09_0026_merge_into.test | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into.test index a74243b22a88f..35e3da9c11b6d 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into.test @@ -901,6 +901,47 @@ WHEN NOT MATCHED THEN INSERT (order_id, user_id, order_type, asset_type, quantity, price, status, created_at, updated_at) VALUES ((SELECT MAX(order_id) FROM orders) + 1, synthetic_orders.user_id, synthetic_orders.synthetic_order_type, synthetic_orders.asset_type, synthetic_orders.total_quantity, 0, 'pending', synthetic_orders.synthetic_date, synthetic_orders.synthetic_date); +## issue #13810: rewrite rule test +statement ok +create table rule_target(a int,b string,c string); + +statement ok +truncate table orders; + +statement ok +insert into orders values(200007,7,'buy','BTC',4.81412194,48.14121943,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200015,15,'buy','BTC',3.78463552,37.84635523,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200019,19,'buy','BTC',1.61186913,16.11869132,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200031,31,'buy','BTC',3.99013730,39.90137297,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200047,47,'buy','BTC',0.98841829,9.88418289,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200077,77,'buy','BTC',2.07360391,20.73603908,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200087,87,'sell','ETH',9.64567442,96.45674419,'pending',to_date('2021-01-01'),to_date('2021-01-01')), +(200095,95,'buy','BTC',2.26686563,22.66865634,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200098,98,'buy','BTC',1.37252960,13.72529599,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200102,102,'buy','BTC',1.53596481,15.35964815,'completed',to_date('2021-01-01'),to_date('2021-01-01')); + +statement ok +MERGE INTO orders USING ( + SELECT o.order_id, o.user_id, o.order_type, o.asset_type, o.quantity + a.avg_quantity AS new_quantity, o.price, o.status, o.created_at, o.updated_at + FROM orders o + INNER JOIN ( + SELECT user_id, asset_type, sum(quantity) AS avg_quantity + FROM orders + GROUP BY user_id, asset_type + ) a ON o.user_id = a.user_id AND o.asset_type = a.asset_type +) AS joined_data ON orders.order_id = joined_data.order_id + WHEN MATCHED THEN + UPDATE SET orders.quantity = joined_data.new_quantity; + +query TTTT +SELECT SUM(quantity) AS total_quantity, + AVG(quantity) AS average_quantity, + MIN(quantity) AS min_quantity, + MAX(quantity) AS max_quantity +FROM orders; +---- +64.16764110 6.416764110000 1.97683658 19.29134884 + statement ok set enable_distributed_merge_into = 0; From 3d1d96acfaccba69df99652e2f94be0e8b84edd3 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Wed, 29 Nov 2023 17:26:08 +0800 Subject: [PATCH 03/26] fix test --- .../09_fuse_engine/09_0026_merge_into.test | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into.test index 35e3da9c11b6d..616d3ed9165d3 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into.test @@ -903,10 +903,20 @@ WHEN NOT MATCHED THEN ## issue #13810: rewrite rule test statement ok -create table rule_target(a int,b string,c string); - -statement ok -truncate table orders; +DROP TABLE IF EXISTS orders; + +statement ok +CREATE TABLE orders ( + order_id INT NOT NULL, + user_id INT NOT NULL, + order_type VARCHAR NOT NULL, + asset_type VARCHAR NOT NULL, + quantity DECIMAL(18,8) NOT NULL, + price DECIMAL(18,8) NOT NULL, + status VARCHAR NOT NULL, + created_at DATE NOT NULL, + updated_at DATE NOT NULL +) row_per_block=5113; statement ok insert into orders values(200007,7,'buy','BTC',4.81412194,48.14121943,'completed',to_date('2021-01-01'),to_date('2021-01-01')), From e8c87604f71c8384d19986afc8f9c93b1eb6576a Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Wed, 29 Nov 2023 19:42:42 +0800 Subject: [PATCH 04/26] add more exprs --- src/query/ast/src/visitors/visitor.rs | 29 ++++++++++++++++++++++- src/query/ast/src/visitors/visitor_mut.rs | 25 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/src/query/ast/src/visitors/visitor.rs b/src/query/ast/src/visitors/visitor.rs index dc814041d31d6..35167812f7914 100644 --- a/src/query/ast/src/visitors/visitor.rs +++ b/src/query/ast/src/visitors/visitor.rs @@ -418,7 +418,34 @@ pub trait Visitor<'ast>: Sized { fn visit_insert(&mut self, _insert: &'ast InsertStmt) {} fn visit_replace(&mut self, _replace: &'ast ReplaceStmt) {} - fn visit_merge_into(&mut self, _merge_into: &'ast MergeIntoStmt) {} + fn visit_merge_into(&mut self, merge_into: &'ast MergeIntoStmt) { + if let MergeSource::Select { query, .. } = &merge_into.source { + self.visit_query(query) + } + self.visit_expr(&merge_into.join_expr); + for operation in &merge_into.merge_options { + match operation { + MergeOption::Match(match_operation) => { + if let Some(expr) = &match_operation.selection { + self.visit_expr(expr) + } + if let MatchOperation::Update { update_list, .. } = &match_operation.operation { + for update in update_list { + self.visit_expr(&update.expr) + } + } + } + MergeOption::Unmatch(unmatch_operation) => { + if let Some(expr) = &unmatch_operation.selection { + self.visit_expr(expr) + } + for expr in &unmatch_operation.insert_operation.values { + self.visit_expr(expr) + } + } + } + } + } fn visit_insert_source(&mut self, _insert_source: &'ast InsertSource) {} fn visit_delete(&mut self, _delete: &'ast DeleteStmt) {} diff --git a/src/query/ast/src/visitors/visitor_mut.rs b/src/query/ast/src/visitors/visitor_mut.rs index 1ef949f195f0e..36b7cd936d3ec 100644 --- a/src/query/ast/src/visitors/visitor_mut.rs +++ b/src/query/ast/src/visitors/visitor_mut.rs @@ -435,6 +435,31 @@ pub trait VisitorMut: Sized { if let MergeSource::Select { query, .. } = &mut merge_into.source { self.visit_query(query) } + self.visit_expr(&mut merge_into.join_expr); + for operation in &mut merge_into.merge_options { + match operation { + MergeOption::Match(match_operation) => { + if let Some(expr) = &mut match_operation.selection { + self.visit_expr(expr) + } + if let MatchOperation::Update { update_list, .. } = + &mut match_operation.operation + { + for update in update_list { + self.visit_expr(&mut update.expr) + } + } + } + MergeOption::Unmatch(unmatch_operation) => { + if let Some(expr) = &mut unmatch_operation.selection { + self.visit_expr(expr) + } + for expr in &mut unmatch_operation.insert_operation.values { + self.visit_expr(expr) + } + } + } + } } fn visit_insert_source(&mut self, _insert_source: &mut InsertSource) {} From ed3de213c87420fd8d55e218aa9a4f26bac2ee31 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 30 Nov 2023 10:03:05 +0800 Subject: [PATCH 05/26] add merge into visit comments --- src/query/ast/src/visitors/visitor.rs | 3 +++ src/query/ast/src/visitors/visitor_mut.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/query/ast/src/visitors/visitor.rs b/src/query/ast/src/visitors/visitor.rs index 35167812f7914..0612782db6813 100644 --- a/src/query/ast/src/visitors/visitor.rs +++ b/src/query/ast/src/visitors/visitor.rs @@ -419,6 +419,9 @@ pub trait Visitor<'ast>: Sized { fn visit_insert(&mut self, _insert: &'ast InsertStmt) {} fn visit_replace(&mut self, _replace: &'ast ReplaceStmt) {} fn visit_merge_into(&mut self, merge_into: &'ast MergeIntoStmt) { + // for visit merge into, its destination is to do some rules for the exprs + // in merge into before we bind_merge_into, we need to make sure the correct + // exprs rewrite for bind_merge_into if let MergeSource::Select { query, .. } = &merge_into.source { self.visit_query(query) } diff --git a/src/query/ast/src/visitors/visitor_mut.rs b/src/query/ast/src/visitors/visitor_mut.rs index 36b7cd936d3ec..2c0314c7ae0e6 100644 --- a/src/query/ast/src/visitors/visitor_mut.rs +++ b/src/query/ast/src/visitors/visitor_mut.rs @@ -432,6 +432,9 @@ pub trait VisitorMut: Sized { fn visit_insert(&mut self, _insert: &mut InsertStmt) {} fn visit_replace(&mut self, _replace: &mut ReplaceStmt) {} fn visit_merge_into(&mut self, merge_into: &mut MergeIntoStmt) { + // for visit merge into, its destination is to do some rules for the exprs + // in merge into before we bind_merge_into, we need to make sure the correct + // exprs rewrite for bind_merge_into if let MergeSource::Select { query, .. } = &mut merge_into.source { self.visit_query(query) } From 4bb134b199484b4748855fee9a171c57a4482c1c Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 30 Nov 2023 13:16:26 +0800 Subject: [PATCH 06/26] fix visit and source for merge,insert,delete,update,replace,copy --- src/query/ast/src/visitors/visitor.rs | 36 ++++- src/query/ast/src/visitors/visitor_mut.rs | 34 +++- src/query/sql/src/planner/binder/binder.rs | 151 ++++++++++++++++++ .../src/planner/binder/copy_into_location.rs | 14 +- src/query/sql/src/planner/binder/insert.rs | 9 ++ .../sql/src/planner/binder/merge_into.rs | 6 + src/query/sql/src/planner/binder/replace.rs | 8 + .../sql/src/planner/binder/scalar_common.rs | 8 + 8 files changed, 254 insertions(+), 12 deletions(-) diff --git a/src/query/ast/src/visitors/visitor.rs b/src/query/ast/src/visitors/visitor.rs index 0612782db6813..e02c05be4f88f 100644 --- a/src/query/ast/src/visitors/visitor.rs +++ b/src/query/ast/src/visitors/visitor.rs @@ -379,7 +379,11 @@ pub trait Visitor<'ast>: Sized { fn visit_explain(&mut self, _kind: &'ast ExplainKind, _query: &'ast Statement) {} fn visit_copy_into_table(&mut self, _copy: &'ast CopyIntoTableStmt) {} - fn visit_copy_into_location(&mut self, _copy: &'ast CopyIntoLocationStmt) {} + fn visit_copy_into_location(&mut self, copy: &'ast CopyIntoLocationStmt) { + if let CopyIntoLocationSource::Query(query) = ©.src { + self.visit_query(query) + } + } fn visit_call(&mut self, _call: &'ast CallStmt) {} @@ -416,8 +420,18 @@ pub trait Visitor<'ast>: Sized { fn visit_set_role(&mut self, _is_default: bool, _role_name: &'ast str) {} fn visit_set_secondary_roles(&mut self, _option: &SecondaryRolesOption) {} - fn visit_insert(&mut self, _insert: &'ast InsertStmt) {} - fn visit_replace(&mut self, _replace: &'ast ReplaceStmt) {} + fn visit_insert(&mut self, insert: &'ast InsertStmt) { + if let InsertSource::Select { query } = &insert.source { + self.visit_query(query) + } + } + + fn visit_replace(&mut self, replace: &'ast ReplaceStmt) { + if let InsertSource::Select { query, .. } = &replace.source { + self.visit_query(query) + } + } + fn visit_merge_into(&mut self, merge_into: &'ast MergeIntoStmt) { // for visit merge into, its destination is to do some rules for the exprs // in merge into before we bind_merge_into, we need to make sure the correct @@ -449,11 +463,23 @@ pub trait Visitor<'ast>: Sized { } } } + fn visit_insert_source(&mut self, _insert_source: &'ast InsertSource) {} - fn visit_delete(&mut self, _delete: &'ast DeleteStmt) {} + fn visit_delete(&mut self, delete: &'ast DeleteStmt) { + if let Some(expr) = &delete.selection { + self.visit_expr(expr) + } + } - fn visit_update(&mut self, _update: &'ast UpdateStmt) {} + fn visit_update(&mut self, update: &'ast UpdateStmt) { + if let Some(expr) = &update.selection { + self.visit_expr(expr) + } + for update in &update.update_list { + self.visit_expr(&update.expr) + } + } fn visit_show_catalogs(&mut self, _stmt: &'ast ShowCatalogsStmt) {} diff --git a/src/query/ast/src/visitors/visitor_mut.rs b/src/query/ast/src/visitors/visitor_mut.rs index 2c0314c7ae0e6..d7f7a6e4229ef 100644 --- a/src/query/ast/src/visitors/visitor_mut.rs +++ b/src/query/ast/src/visitors/visitor_mut.rs @@ -394,7 +394,11 @@ pub trait VisitorMut: Sized { } fn visit_copy_into_table(&mut self, _copy: &mut CopyIntoTableStmt) {} - fn visit_copy_into_location(&mut self, _copy: &mut CopyIntoLocationStmt) {} + fn visit_copy_into_location(&mut self, copy: &mut CopyIntoLocationStmt) { + if let CopyIntoLocationSource::Query(query) = &mut copy.src { + self.visit_query(query) + } + } fn visit_call(&mut self, _call: &mut CallStmt) {} @@ -429,8 +433,16 @@ pub trait VisitorMut: Sized { fn visit_set_role(&mut self, _is_default: bool, _role_name: &mut String) {} fn visit_set_secondary_roles(&mut self, _option: &mut SecondaryRolesOption) {} - fn visit_insert(&mut self, _insert: &mut InsertStmt) {} - fn visit_replace(&mut self, _replace: &mut ReplaceStmt) {} + fn visit_insert(&mut self, insert: &mut InsertStmt) { + if let InsertSource::Select { query } = &mut insert.source { + self.visit_query(query) + } + } + fn visit_replace(&mut self, replace: &mut ReplaceStmt) { + if let InsertSource::Select { query } = &mut replace.source { + self.visit_query(query) + } + } fn visit_merge_into(&mut self, merge_into: &mut MergeIntoStmt) { // for visit merge into, its destination is to do some rules for the exprs // in merge into before we bind_merge_into, we need to make sure the correct @@ -464,11 +476,23 @@ pub trait VisitorMut: Sized { } } } + fn visit_insert_source(&mut self, _insert_source: &mut InsertSource) {} - fn visit_delete(&mut self, _delete: &mut DeleteStmt) {} + fn visit_delete(&mut self, delete: &mut DeleteStmt) { + if let Some(expr) = &mut delete.selection { + self.visit_expr(expr) + } + } - fn visit_update(&mut self, _update: &mut UpdateStmt) {} + fn visit_update(&mut self, update: &mut UpdateStmt) { + if let Some(expr) = &mut update.selection { + self.visit_expr(expr) + } + for update in &mut update.update_list { + self.visit_expr(&mut update.expr) + } + } fn visit_show_catalogs(&mut self, _stmt: &mut ShowCatalogsStmt) {} diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index ebb7652c8bbcd..7fbf0e07fb3be 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -71,6 +71,13 @@ use crate::ScalarExpr; use crate::TypeChecker; use crate::Visibility; +pub enum CheckType { + Merge, + Insert, + Replace, + Copy, +} + /// Binder is responsible to transform AST of a query into a canonical logical SExpr. /// /// During this phase, it will: @@ -674,6 +681,150 @@ impl<'a> Binder { Ok(finder.scalars().is_empty()) } + pub(crate) fn check_sexpr_top(&self, s_expr: &SExpr, top_check: CheckType) -> Result { + let f = match top_check { + CheckType::Copy => |scalar: &ScalarExpr| { + matches!( + scalar, + ScalarExpr::WindowFunction(_) + | ScalarExpr::AggregateFunction(_) + | ScalarExpr::UDFServerCall(_) + ) + }, + _ => |scalar: &ScalarExpr| matches!(scalar, ScalarExpr::UDFServerCall(_)), + }; + + let mut finder = Finder::new(&f); + self.check_sexpr(s_expr, &mut finder) + } + + pub(crate) fn check_sexpr( + &self, + s_expr: &'a SExpr, + f: &'a mut Finder<'a, F>, + ) -> Result + where + F: Fn(&ScalarExpr) -> bool, + { + let result = match s_expr.plan.as_ref() { + RelOperator::Scan(scan) => { + f.reset_finder(); + if let Some(agg_info) = &scan.agg_index { + for predicate in &agg_info.predicates { + f.visit(predicate)?; + } + for selection in &agg_info.selection { + f.visit(&selection.scalar)?; + } + } + if let Some(predicates) = &scan.push_down_predicates { + for predicate in predicates { + f.visit(predicate)?; + } + } + f.scalars().is_empty() + } + RelOperator::Join(join) => { + f.reset_finder(); + for condition in &join.left_conditions { + f.visit(condition)?; + } + for condition in &join.right_conditions { + f.visit(condition)?; + } + for condition in &join.non_equi_conditions { + f.visit(condition)?; + } + f.scalars().is_empty() + } + RelOperator::EvalScalar(eval) => { + f.reset_finder(); + for item in &eval.items { + f.visit(&item.scalar)?; + } + f.scalars().is_empty() + } + RelOperator::Filter(filter) => { + f.reset_finder(); + for predicate in &filter.predicates { + f.visit(predicate)?; + } + f.scalars().is_empty() + } + RelOperator::Aggregate(aggregate) => { + f.reset_finder(); + for item in &aggregate.group_items { + f.visit(&item.scalar)?; + } + for item in &aggregate.aggregate_functions { + f.visit(&item.scalar)?; + } + f.scalars().is_empty() + } + RelOperator::Exchange(exchange) => { + f.reset_finder(); + if let crate::plans::Exchange::Hash(hash) = exchange { + for scalar in hash { + f.visit(scalar)?; + } + } + f.scalars().is_empty() + } + RelOperator::RuntimeFilterSource(runtime_filter_source) => { + f.reset_finder(); + for scalar in runtime_filter_source.left_runtime_filters.values() { + f.visit(scalar)?; + } + for scalar in runtime_filter_source.right_runtime_filters.values() { + f.visit(scalar)?; + } + f.scalars().is_empty() + } + RelOperator::Window(window) => { + f.reset_finder(); + for scalar_item in &window.arguments { + f.visit(&scalar_item.scalar)?; + } + for scalar_item in &window.partition_by { + f.visit(&scalar_item.scalar)?; + } + for info in &window.order_by { + f.visit(&info.order_by_item.scalar)?; + } + f.scalars().is_empty() + } + RelOperator::ProjectSet(set) => { + f.reset_finder(); + for item in &set.srfs { + f.visit(&item.scalar)?; + } + f.scalars().is_empty() + } + RelOperator::Udf(udf) => { + f.reset_finder(); + for item in &udf.items { + f.visit(&item.scalar)?; + } + f.scalars().is_empty() + } + _ => true, + }; + + match result { + true => { + for child in &s_expr.children { + let mut finder = Finder::new(f.find_fn()); + let flag = self.check_sexpr(child.as_ref(), &mut finder)?; + if !flag { + return Ok(false); + } + } + Ok(true) + } + false => Ok(false), + } + } + pub(crate) fn check_allowed_scalar_expr_with_subquery( &self, scalar: &ScalarExpr, diff --git a/src/query/sql/src/planner/binder/copy_into_location.rs b/src/query/sql/src/planner/binder/copy_into_location.rs index 33c76efbf43b8..770040da799fe 100644 --- a/src/query/sql/src/planner/binder/copy_into_location.rs +++ b/src/query/sql/src/planner/binder/copy_into_location.rs @@ -59,8 +59,18 @@ impl<'a> Binder { } } CopyIntoLocationSource::Query(query) => { - self.bind_statement(bind_context, &Statement::Query(query.clone())) - .await + let select_plan = self + .bind_statement(bind_context, &Statement::Query(query.clone())) + .await?; + if let Plan::Query { s_expr, .. } = &select_plan { + if !self.check_sexpr_top(s_expr, super::binder::CheckType::Copy)? { + return Err(ErrorCode::SemanticError( + "copy into location source's condition can't contain window|aggregate|udf functions" + .to_string(), + )); + } + } + Ok(select_plan) } }?; diff --git a/src/query/sql/src/planner/binder/insert.rs b/src/query/sql/src/planner/binder/insert.rs index eff62f987374f..9fbd39acf6db0 100644 --- a/src/query/sql/src/planner/binder/insert.rs +++ b/src/query/sql/src/planner/binder/insert.rs @@ -153,6 +153,15 @@ impl Binder { let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig { enable_distributed_optimization: !self.ctx.get_cluster().is_empty(), })); + + if let Plan::Query { s_expr, .. } = &select_plan { + if !self.check_sexpr_top(s_expr, super::binder::CheckType::Insert)? { + return Err(ErrorCode::SemanticError( + "insert source's condition can't contain udf functions".to_string(), + )); + } + } + let optimized_plan = optimize(self.ctx.clone(), opt_ctx, select_plan)?; Ok(InsertInputSource::SelectPlan(Box::new(optimized_plan))) } diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs index 351fb97bba274..67550de266a52 100644 --- a/src/query/sql/src/planner/binder/merge_into.rs +++ b/src/query/sql/src/planner/binder/merge_into.rs @@ -127,6 +127,12 @@ impl Binder { let (source_expr, mut source_context) = self.bind_single_table(bind_context, &source_data).await?; + if !self.check_sexpr_top(&source_expr, super::binder::CheckType::Merge)? { + return Err(ErrorCode::SemanticError( + "replace source's condition can't contain udf functions".to_string(), + )); + } + // add all left source columns for read // todo: (JackTan25) do column prune after finish "split expr for target and source" let mut columns_set = source_context.column_set(); diff --git a/src/query/sql/src/planner/binder/replace.rs b/src/query/sql/src/planner/binder/replace.rs index ee4a11acdadf3..35903edf7864f 100644 --- a/src/query/sql/src/planner/binder/replace.rs +++ b/src/query/sql/src/planner/binder/replace.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use common_ast::ast::InsertSource; use common_ast::ast::ReplaceStmt; use common_ast::ast::Statement; +use common_exception::ErrorCode; use common_exception::Result; use common_meta_app::principal::FileFormatOptionsAst; use common_meta_app::principal::OnErrorMode; @@ -136,6 +137,13 @@ impl Binder { InsertSource::Select { query } => { let statement = Statement::Query(query); let select_plan = self.bind_statement(bind_context, &statement).await?; + if let Plan::Query { s_expr, .. } = &select_plan { + if !self.check_sexpr_top(s_expr, super::binder::CheckType::Replace)? { + return Err(ErrorCode::SemanticError( + "replace source's condition can't contain udf functions".to_string(), + )); + } + } let enable_distributed_optimization = false; let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig { enable_distributed_optimization, diff --git a/src/query/sql/src/planner/binder/scalar_common.rs b/src/query/sql/src/planner/binder/scalar_common.rs index 32e4ec9468a91..f017f4eb44076 100644 --- a/src/query/sql/src/planner/binder/scalar_common.rs +++ b/src/query/sql/src/planner/binder/scalar_common.rs @@ -48,6 +48,14 @@ where F: Fn(&ScalarExpr) -> bool pub fn scalars(&self) -> &[ScalarExpr] { &self.scalars } + + pub fn reset_finder(&mut self) { + self.scalars.clear() + } + + pub fn find_fn(&self) -> &'a F { + self.find_fn + } } impl<'a, F> Visitor<'a> for Finder<'a, F> From 4605958eea4e064238d230693489496b3bc25596 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 30 Nov 2023 13:36:34 +0800 Subject: [PATCH 07/26] fix lint --- src/query/sql/src/planner/binder/binder.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index 7fbf0e07fb3be..a66e2b4abb0a6 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -681,6 +681,7 @@ impl<'a> Binder { Ok(finder.scalars().is_empty()) } + // add check for SExpr to disable invalid source for copy/insert/merge/replace pub(crate) fn check_sexpr_top(&self, s_expr: &SExpr, top_check: CheckType) -> Result { let f = match top_check { CheckType::Copy => |scalar: &ScalarExpr| { @@ -698,14 +699,8 @@ impl<'a> Binder { self.check_sexpr(s_expr, &mut finder) } - pub(crate) fn check_sexpr( - &self, - s_expr: &'a SExpr, - f: &'a mut Finder<'a, F>, - ) -> Result - where - F: Fn(&ScalarExpr) -> bool, - { + pub(crate) fn check_sexpr(s_expr: &'a SExpr, f: &'a mut Finder<'a, F>) -> Result + where F: Fn(&ScalarExpr) -> bool { let result = match s_expr.plan.as_ref() { RelOperator::Scan(scan) => { f.reset_finder(); @@ -814,7 +809,7 @@ impl<'a> Binder { true => { for child in &s_expr.children { let mut finder = Finder::new(f.find_fn()); - let flag = self.check_sexpr(child.as_ref(), &mut finder)?; + let flag = Self::check_sexpr(child.as_ref(), &mut finder)?; if !flag { return Ok(false); } From 38802fd0408dacc63e0ba948dd620b80f16d755f Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 30 Nov 2023 13:36:45 +0800 Subject: [PATCH 08/26] fix lint --- src/query/sql/src/planner/binder/binder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index a66e2b4abb0a6..cec4bcca989c6 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -696,7 +696,7 @@ impl<'a> Binder { }; let mut finder = Finder::new(&f); - self.check_sexpr(s_expr, &mut finder) + Self::check_sexpr(s_expr, &mut finder) } pub(crate) fn check_sexpr(s_expr: &'a SExpr, f: &'a mut Finder<'a, F>) -> Result From 5791538a24296f7a246e7d5766a134c360d34ba0 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 30 Nov 2023 15:08:41 +0800 Subject: [PATCH 09/26] modify support --- src/query/sql/src/planner/binder/binder.rs | 72 +++++++++++----------- 1 file changed, 37 insertions(+), 35 deletions(-) diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index cec4bcca989c6..eff8508ab247b 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -696,11 +696,17 @@ impl<'a> Binder { }; let mut finder = Finder::new(&f); - Self::check_sexpr(s_expr, &mut finder) + Self::check_sexpr(s_expr, &mut finder, &top_check) } - pub(crate) fn check_sexpr(s_expr: &'a SExpr, f: &'a mut Finder<'a, F>) -> Result - where F: Fn(&ScalarExpr) -> bool { + pub(crate) fn check_sexpr( + s_expr: &'a SExpr, + f: &'a mut Finder<'a, F>, + top_check: &CheckType, + ) -> Result + where + F: Fn(&ScalarExpr) -> bool, + { let result = match s_expr.plan.as_ref() { RelOperator::Scan(scan) => { f.reset_finder(); @@ -747,14 +753,18 @@ impl<'a> Binder { f.scalars().is_empty() } RelOperator::Aggregate(aggregate) => { - f.reset_finder(); - for item in &aggregate.group_items { - f.visit(&item.scalar)?; - } - for item in &aggregate.aggregate_functions { - f.visit(&item.scalar)?; + if let CheckType::Copy = top_check { + false + } else { + f.reset_finder(); + for item in &aggregate.group_items { + f.visit(&item.scalar)?; + } + for item in &aggregate.aggregate_functions { + f.visit(&item.scalar)?; + } + f.scalars().is_empty() } - f.scalars().is_empty() } RelOperator::Exchange(exchange) => { f.reset_finder(); @@ -776,32 +786,24 @@ impl<'a> Binder { f.scalars().is_empty() } RelOperator::Window(window) => { - f.reset_finder(); - for scalar_item in &window.arguments { - f.visit(&scalar_item.scalar)?; - } - for scalar_item in &window.partition_by { - f.visit(&scalar_item.scalar)?; - } - for info in &window.order_by { - f.visit(&info.order_by_item.scalar)?; - } - f.scalars().is_empty() - } - RelOperator::ProjectSet(set) => { - f.reset_finder(); - for item in &set.srfs { - f.visit(&item.scalar)?; - } - f.scalars().is_empty() - } - RelOperator::Udf(udf) => { - f.reset_finder(); - for item in &udf.items { - f.visit(&item.scalar)?; + if let CheckType::Copy = top_check { + false + } else { + f.reset_finder(); + for scalar_item in &window.arguments { + f.visit(&scalar_item.scalar)?; + } + for scalar_item in &window.partition_by { + f.visit(&scalar_item.scalar)?; + } + for info in &window.order_by { + f.visit(&info.order_by_item.scalar)?; + } + f.scalars().is_empty() } - f.scalars().is_empty() } + RelOperator::ProjectSet(_) => false, + RelOperator::Udf(_) => false, _ => true, }; @@ -809,7 +811,7 @@ impl<'a> Binder { true => { for child in &s_expr.children { let mut finder = Finder::new(f.find_fn()); - let flag = Self::check_sexpr(child.as_ref(), &mut finder)?; + let flag = Self::check_sexpr(child.as_ref(), &mut finder, &top_check)?; if !flag { return Ok(false); } From 814d799efce379e2429869e6f8ac9c9161c9d940 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 30 Nov 2023 16:55:21 +0800 Subject: [PATCH 10/26] modify --- src/query/ast/src/visitors/visitor.rs | 6 +- src/query/ast/src/visitors/visitor_mut.rs | 6 +- src/query/sql/src/planner/binder/binder.rs | 44 ++++---- .../src/planner/binder/copy_into_location.rs | 5 +- .../sql/src/planner/binder/copy_into_table.rs | 8 ++ src/query/sql/src/planner/binder/insert.rs | 2 +- .../sql/src/planner/binder/merge_into.rs | 2 +- src/query/sql/src/planner/binder/replace.rs | 2 +- .../base/09_fuse_engine/09_0032_pr13848.test | 105 ++++++++++++++++++ 9 files changed, 152 insertions(+), 28 deletions(-) create mode 100644 tests/sqllogictests/suites/base/09_fuse_engine/09_0032_pr13848.test diff --git a/src/query/ast/src/visitors/visitor.rs b/src/query/ast/src/visitors/visitor.rs index e02c05be4f88f..72dd8d6345b56 100644 --- a/src/query/ast/src/visitors/visitor.rs +++ b/src/query/ast/src/visitors/visitor.rs @@ -378,7 +378,11 @@ pub trait Visitor<'ast>: Sized { fn visit_explain(&mut self, _kind: &'ast ExplainKind, _query: &'ast Statement) {} - fn visit_copy_into_table(&mut self, _copy: &'ast CopyIntoTableStmt) {} + fn visit_copy_into_table(&mut self, copy: &'ast CopyIntoTableStmt) { + if let CopyIntoTableSource::Query(query) = ©.src { + self.visit_query(query) + } + } fn visit_copy_into_location(&mut self, copy: &'ast CopyIntoLocationStmt) { if let CopyIntoLocationSource::Query(query) = ©.src { self.visit_query(query) diff --git a/src/query/ast/src/visitors/visitor_mut.rs b/src/query/ast/src/visitors/visitor_mut.rs index d7f7a6e4229ef..913d338603e08 100644 --- a/src/query/ast/src/visitors/visitor_mut.rs +++ b/src/query/ast/src/visitors/visitor_mut.rs @@ -393,7 +393,11 @@ pub trait VisitorMut: Sized { walk_statement_mut(self, stmt); } - fn visit_copy_into_table(&mut self, _copy: &mut CopyIntoTableStmt) {} + fn visit_copy_into_table(&mut self, copy: &mut CopyIntoTableStmt) { + if let CopyIntoTableSource::Query(query) = &mut copy.src { + self.visit_query(query) + } + } fn visit_copy_into_location(&mut self, copy: &mut CopyIntoLocationStmt) { if let CopyIntoLocationSource::Query(query) = &mut copy.src { self.visit_query(query) diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index eff8508ab247b..e0b6e5ae80b21 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -75,7 +75,8 @@ pub enum CheckType { Merge, Insert, Replace, - Copy, + CopyIntoTable, + CopyIntoLocation, } /// Binder is responsible to transform AST of a query into a canonical logical SExpr. @@ -682,9 +683,9 @@ impl<'a> Binder { } // add check for SExpr to disable invalid source for copy/insert/merge/replace - pub(crate) fn check_sexpr_top(&self, s_expr: &SExpr, top_check: CheckType) -> Result { - let f = match top_check { - CheckType::Copy => |scalar: &ScalarExpr| { + pub(crate) fn check_sexpr_top(&self, s_expr: &SExpr, check_type: CheckType) -> Result { + let f = match check_type { + CheckType::CopyIntoTable => |scalar: &ScalarExpr| { matches!( scalar, ScalarExpr::WindowFunction(_) @@ -696,13 +697,13 @@ impl<'a> Binder { }; let mut finder = Finder::new(&f); - Self::check_sexpr(s_expr, &mut finder, &top_check) + Self::check_sexpr(s_expr, &mut finder, &check_type) } pub(crate) fn check_sexpr( s_expr: &'a SExpr, f: &'a mut Finder<'a, F>, - top_check: &CheckType, + check_type: &CheckType, ) -> Result where F: Fn(&ScalarExpr) -> bool, @@ -726,17 +727,21 @@ impl<'a> Binder { f.scalars().is_empty() } RelOperator::Join(join) => { - f.reset_finder(); - for condition in &join.left_conditions { - f.visit(condition)?; - } - for condition in &join.right_conditions { - f.visit(condition)?; - } - for condition in &join.non_equi_conditions { - f.visit(condition)?; + if let CheckType::CopyIntoTable = check_type { + false + } else { + f.reset_finder(); + for condition in &join.left_conditions { + f.visit(condition)?; + } + for condition in &join.right_conditions { + f.visit(condition)?; + } + for condition in &join.non_equi_conditions { + f.visit(condition)?; + } + f.scalars().is_empty() } - f.scalars().is_empty() } RelOperator::EvalScalar(eval) => { f.reset_finder(); @@ -753,7 +758,7 @@ impl<'a> Binder { f.scalars().is_empty() } RelOperator::Aggregate(aggregate) => { - if let CheckType::Copy = top_check { + if let CheckType::CopyIntoTable = check_type { false } else { f.reset_finder(); @@ -786,7 +791,7 @@ impl<'a> Binder { f.scalars().is_empty() } RelOperator::Window(window) => { - if let CheckType::Copy = top_check { + if let CheckType::CopyIntoTable = check_type { false } else { f.reset_finder(); @@ -802,7 +807,6 @@ impl<'a> Binder { f.scalars().is_empty() } } - RelOperator::ProjectSet(_) => false, RelOperator::Udf(_) => false, _ => true, }; @@ -811,7 +815,7 @@ impl<'a> Binder { true => { for child in &s_expr.children { let mut finder = Finder::new(f.find_fn()); - let flag = Self::check_sexpr(child.as_ref(), &mut finder, &top_check)?; + let flag = Self::check_sexpr(child.as_ref(), &mut finder, check_type)?; if !flag { return Ok(false); } diff --git a/src/query/sql/src/planner/binder/copy_into_location.rs b/src/query/sql/src/planner/binder/copy_into_location.rs index 770040da799fe..d31808c78567a 100644 --- a/src/query/sql/src/planner/binder/copy_into_location.rs +++ b/src/query/sql/src/planner/binder/copy_into_location.rs @@ -63,10 +63,9 @@ impl<'a> Binder { .bind_statement(bind_context, &Statement::Query(query.clone())) .await?; if let Plan::Query { s_expr, .. } = &select_plan { - if !self.check_sexpr_top(s_expr, super::binder::CheckType::Copy)? { + if !self.check_sexpr_top(s_expr, super::binder::CheckType::CopyIntoLocation)? { return Err(ErrorCode::SemanticError( - "copy into location source's condition can't contain window|aggregate|udf functions" - .to_string(), + "copy into location source can't contain udf functions".to_string(), )); } } diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index 0cec396b2b71f..c081f9401cb70 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -91,6 +91,14 @@ impl<'a> Binder { let plan = self .bind_copy_into_table_common(bind_context, stmt, location) .await?; + if let Some(Plan::Query { s_expr, .. }) = plan.query.as_deref() { + if !self.check_sexpr_top(s_expr, super::binder::CheckType::CopyIntoTable)? { + return Err(ErrorCode::SemanticError( + "copy into table source can't contain window|aggregate|udf|join functions" + .to_string(), + )); + } + } self.bind_copy_from_query_into_table(bind_context, plan, select_list, alias) .await } diff --git a/src/query/sql/src/planner/binder/insert.rs b/src/query/sql/src/planner/binder/insert.rs index 9fbd39acf6db0..ef2e6d1432af9 100644 --- a/src/query/sql/src/planner/binder/insert.rs +++ b/src/query/sql/src/planner/binder/insert.rs @@ -157,7 +157,7 @@ impl Binder { if let Plan::Query { s_expr, .. } = &select_plan { if !self.check_sexpr_top(s_expr, super::binder::CheckType::Insert)? { return Err(ErrorCode::SemanticError( - "insert source's condition can't contain udf functions".to_string(), + "insert source can't contain udf functions".to_string(), )); } } diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs index 67550de266a52..db9cda0f4cb56 100644 --- a/src/query/sql/src/planner/binder/merge_into.rs +++ b/src/query/sql/src/planner/binder/merge_into.rs @@ -129,7 +129,7 @@ impl Binder { if !self.check_sexpr_top(&source_expr, super::binder::CheckType::Merge)? { return Err(ErrorCode::SemanticError( - "replace source's condition can't contain udf functions".to_string(), + "replace source can't contain udf functions".to_string(), )); } diff --git a/src/query/sql/src/planner/binder/replace.rs b/src/query/sql/src/planner/binder/replace.rs index 35903edf7864f..faf699c7fccf8 100644 --- a/src/query/sql/src/planner/binder/replace.rs +++ b/src/query/sql/src/planner/binder/replace.rs @@ -140,7 +140,7 @@ impl Binder { if let Plan::Query { s_expr, .. } = &select_plan { if !self.check_sexpr_top(s_expr, super::binder::CheckType::Replace)? { return Err(ErrorCode::SemanticError( - "replace source's condition can't contain udf functions".to_string(), + "replace source can't contain udf functions".to_string(), )); } } diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0032_pr13848.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0032_pr13848.test new file mode 100644 index 0000000000000..09c339d2678d6 --- /dev/null +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0032_pr13848.test @@ -0,0 +1,105 @@ +## test window,agg,join's correctess +statement ok +set enable_experimental_merge_into = 1; + +statement ok +set enable_distributed_merge_into = 1; + +statement ok +create table merge_target_0(a int,b string); + +statement ok +create table merge_source_0(a int,b string); + +statement ok +insert into merge_target_0 values(1,'a1'),(2,'b1'); + +statement ok +insert into merge_target_0 values(3,'a2'),(4,'b2'); + +statement ok +insert into merge_source_0 values(1,'a3'),(3,'b3'); + +statement ok +insert into merge_source_0 values(5,'a4'),(6,'b6'); + +## test window,agg,join +## 1. join test +statement ok +merge into merge_target_0 as t1 using +(select t2.a,t3.b from merge_source_0 as t2 inner join merge_source_0 as t3 on t2.a = t3.a) as t4 +on t4.a = t1.a when matched then update * when not matched then insert *; + +query TT +select * from merge_target_0 order by a,b; +----- +1 a3 +2 b1 +3 b3 +4 b2 +5 a4 +6 b6 + +statement ok +truncate table merge_source_0; + +statement ok +insert into merge_source_0 values(1,'c7'),(3,'c7'); + +query TT +select * from merge_source_0 order by a,b; +---- +1 c7 +3 c7 + +## 2. agg test +statement ok +merge into merge_target_0 as t1 using (select avg(a) as a,b from merge_source_0 group by b) as t2 on t1.a = t2.a +when matched then update * when not matched then insert *; + +query TT +select * from merge_target_0 order by a,b; +---- +1 a3 +2 c7 +3 b3 +4 b2 +5 a4 +6 b6 + +## 2. window func test +statement ok +merge into merge_target_0 as t1 using (select row_number() OVER (PARTITION BY b ORDER BY a) as a,'d1' as b from merge_source_0) as t2 on t1.a = t2.a +when matched then update * when not matched then insert *; + +query TT +select * from merge_target_0 order by a,b; +---- +1 d1 +2 d1 +3 b3 +4 b2 +5 a4 +6 b6 + +### test copy into table unsupport (@youngsofun suppory query) +##statement ok +##create table copy_table_test0(a int,b string); + +## test agg +##statement error 1065 +##copy into copy_table_test0 from (select avg(a) as a,b from merge_source_0 group by b); + +## test window +##statement error 1065 +##copy into copy_table_test0 from (select row_number() OVER (PARTITION BY b ORDER BY a) as a,'d1' as b from merge_source_0); + +## test join +##statement error 1065 +##copy into copy_table_test0 from (select t2.a,t3.b from merge_source_0 as t2 inner join merge_source_0 as t3 on t2.a = t3.a); + +statement ok +set enable_distributed_merge_into = 0; + +statement ok +set enable_experimental_merge_into = 0; \ No newline at end of file From 40650fddb905c36cbfb2a4f930e9555257e78a5a Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 30 Nov 2023 18:09:08 +0800 Subject: [PATCH 11/26] add create table visit --- src/query/ast/src/visitors/visitor.rs | 6 +++- src/query/ast/src/visitors/visitor_mut.rs | 6 +++- .../sql/src/planner/binder/copy_into_table.rs | 17 ++++++----- .../base/09_fuse_engine/09_0032_pr13848.test | 29 ++++++++++++------- 4 files changed, 38 insertions(+), 20 deletions(-) diff --git a/src/query/ast/src/visitors/visitor.rs b/src/query/ast/src/visitors/visitor.rs index 64b98a3016445..e45e3bdfbea3e 100644 --- a/src/query/ast/src/visitors/visitor.rs +++ b/src/query/ast/src/visitors/visitor.rs @@ -519,7 +519,11 @@ pub trait Visitor<'ast>: Sized { fn visit_show_drop_tables(&mut self, _stmt: &'ast ShowDropTablesStmt) {} - fn visit_create_table(&mut self, _stmt: &'ast CreateTableStmt) {} + fn visit_create_table(&mut self, stmt: &'ast CreateTableStmt) { + if let Some(query) = stmt.as_query.as_deref() { + self.visit_query(query) + } + } fn visit_create_table_source(&mut self, _source: &'ast CreateTableSource) {} diff --git a/src/query/ast/src/visitors/visitor_mut.rs b/src/query/ast/src/visitors/visitor_mut.rs index 461856263f255..bc7f9f99f6f36 100644 --- a/src/query/ast/src/visitors/visitor_mut.rs +++ b/src/query/ast/src/visitors/visitor_mut.rs @@ -533,7 +533,11 @@ pub trait VisitorMut: Sized { fn visit_show_drop_tables(&mut self, _stmt: &mut ShowDropTablesStmt) {} - fn visit_create_table(&mut self, _stmt: &mut CreateTableStmt) {} + fn visit_create_table(&mut self, stmt: &mut CreateTableStmt) { + if let Some(query) = stmt.as_query.as_deref_mut() { + self.visit_query(query) + } + } fn visit_create_table_source(&mut self, _source: &mut CreateTableSource) {} diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index fa72fec2f8282..3857a54abb746 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -91,14 +91,17 @@ impl<'a> Binder { let plan = self .bind_copy_into_table_common(bind_context, stmt, location) .await?; - if let Some(Plan::Query { s_expr, .. }) = plan.query.as_deref() { - if !self.check_sexpr_top(s_expr, super::binder::CheckType::CopyIntoTable)? { - return Err(ErrorCode::SemanticError( - "copy into table source can't contain window|aggregate|udf|join functions" - .to_string(), - )); - } + + // just check query + let mut tmp_bind_context = BindContext::new(); + let (bind_query, _) = self.bind_query(&mut tmp_bind_context, query).await?; + if !self.check_sexpr_top(&bind_query, super::binder::CheckType::CopyIntoTable)? { + return Err(ErrorCode::SemanticError( + "copy into table source can't contain window|aggregate|udf|join functions" + .to_string(), + )); } + self.bind_copy_from_query_into_table(bind_context, plan, select_list, alias) .await } diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0032_pr13848.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0032_pr13848.test index 09c339d2678d6..2a6fb2fa61d98 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0032_pr13848.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0032_pr13848.test @@ -32,7 +32,7 @@ on t4.a = t1.a when matched then update * when not matched then insert *; query TT select * from merge_target_0 order by a,b; ------ +---- 1 a3 2 b1 3 b3 @@ -82,21 +82,28 @@ select * from merge_target_0 order by a,b; 5 a4 6 b6 -### test copy into table unsupport (@youngsofun suppory query) -##statement ok -##create table copy_table_test0(a int,b string); +### test copy into table unsupport +statement ok +create table copy_table_test0(a int,b string); + +statement ok +create stage parquet_table0 FILE_FORMAT = (TYPE = PARQUET); + +statement ok +copy into @parquet_table0 from (select a,b from merge_source_0 limit 2) ## test agg -##statement error 1065 -##copy into copy_table_test0 from (select avg(a) as a,b from merge_source_0 group by b); +statement error 1065 +copy into copy_table_test0 from (select avg($1) as a,'b' as b from @parquet_table0); ## test window -##statement error 1065 -##copy into copy_table_test0 from (select row_number() OVER (PARTITION BY b ORDER BY a) as a,'d1' as b from merge_source_0); +statement error 1065 +copy into copy_table_test0 from (select row_number() OVER (PARTITION BY b ORDER BY a) as a,'d1' as b from @parquet_table0); -## test join -##statement error 1065 -##copy into copy_table_test0 from (select t2.a,t3.b from merge_source_0 as t2 inner join merge_source_0 as t3 on t2.a = t3.a); +## test join, but it's not checked as `copy into table source can't contain window|aggregate|udf|join functions` +## it's `query as source of copy only allow projection on one stage table.`, because it always support only one table. +statement error 1005 +copy into copy_table_test0 from (select t2.a,t3.b from @parquet_table0 as t ,@parquet_table0 as t3 where t2.a = t3.a); statement ok set enable_distributed_merge_into = 0; From 6b24f519e101670f3d97bba31520637c4e5a0786 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 30 Nov 2023 19:08:45 +0800 Subject: [PATCH 12/26] fix test --- src/query/sql/src/planner/binder/copy_into_table.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index 3857a54abb746..90e5d29d4c137 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -94,7 +94,13 @@ impl<'a> Binder { // just check query let mut tmp_bind_context = BindContext::new(); - let (bind_query, _) = self.bind_query(&mut tmp_bind_context, query).await?; + let mut new_binder = Binder::new( + self.ctx.clone(), + self.catalogs.clone(), + self.name_resolution_ctx.clone(), + self.metadata.clone(), + ); + let (bind_query, _) = new_binder.bind_query(&mut tmp_bind_context, query).await?; if !self.check_sexpr_top(&bind_query, super::binder::CheckType::CopyIntoTable)? { return Err(ErrorCode::SemanticError( "copy into table source can't contain window|aggregate|udf|join functions" From 2a8e5dba710a85c0066dafc7a313f08ff2fbd1f3 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 30 Nov 2023 19:09:20 +0800 Subject: [PATCH 13/26] fix test --- src/query/sql/src/planner/binder/copy_into_table.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index 90e5d29d4c137..4a637d3462f22 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -94,13 +94,13 @@ impl<'a> Binder { // just check query let mut tmp_bind_context = BindContext::new(); - let mut new_binder = Binder::new( + let mut tmp_binder = Binder::new( self.ctx.clone(), self.catalogs.clone(), self.name_resolution_ctx.clone(), self.metadata.clone(), ); - let (bind_query, _) = new_binder.bind_query(&mut tmp_bind_context, query).await?; + let (bind_query, _) = tmp_binder.bind_query(&mut tmp_bind_context, query).await?; if !self.check_sexpr_top(&bind_query, super::binder::CheckType::CopyIntoTable)? { return Err(ErrorCode::SemanticError( "copy into table source can't contain window|aggregate|udf|join functions" From 3eea3fd1ae4583be974b6fb8ad22f440e56c2b60 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 30 Nov 2023 19:50:20 +0800 Subject: [PATCH 14/26] fix --- src/query/ast/src/visitors/visitor.rs | 20 ++++++++++--------- .../sql/src/planner/binder/copy_into_table.rs | 8 +------- .../02_0000_function_aggregate_mix.test | 2 +- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/src/query/ast/src/visitors/visitor.rs b/src/query/ast/src/visitors/visitor.rs index e45e3bdfbea3e..181cbb6334317 100644 --- a/src/query/ast/src/visitors/visitor.rs +++ b/src/query/ast/src/visitors/visitor.rs @@ -378,15 +378,17 @@ pub trait Visitor<'ast>: Sized { fn visit_explain(&mut self, _kind: &'ast ExplainKind, _query: &'ast Statement) {} - fn visit_copy_into_table(&mut self, copy: &'ast CopyIntoTableStmt) { - if let CopyIntoTableSource::Query(query) = ©.src { - self.visit_query(query) - } - } - fn visit_copy_into_location(&mut self, copy: &'ast CopyIntoLocationStmt) { - if let CopyIntoLocationSource::Query(query) = ©.src { - self.visit_query(query) - } + fn visit_copy_into_table(&mut self, _copy: &'ast CopyIntoTableStmt) { + // todo!(youngsofun) please fix this + // if let CopyIntoTableSource::Query(query) = ©.src { + // self.visit_query(query) + // } + } + fn visit_copy_into_location(&mut self, _copy: &'ast CopyIntoLocationStmt) { + // todo!(youngsofun) please fix this + // if let CopyIntoLocationSource::Query(query) = ©.src { + // self.visit_query(query) + // } } fn visit_call(&mut self, _call: &'ast CallStmt) {} diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index 4a637d3462f22..3857a54abb746 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -94,13 +94,7 @@ impl<'a> Binder { // just check query let mut tmp_bind_context = BindContext::new(); - let mut tmp_binder = Binder::new( - self.ctx.clone(), - self.catalogs.clone(), - self.name_resolution_ctx.clone(), - self.metadata.clone(), - ); - let (bind_query, _) = tmp_binder.bind_query(&mut tmp_bind_context, query).await?; + let (bind_query, _) = self.bind_query(&mut tmp_bind_context, query).await?; if !self.check_sexpr_top(&bind_query, super::binder::CheckType::CopyIntoTable)? { return Err(ErrorCode::SemanticError( "copy into table source can't contain window|aggregate|udf|join functions" diff --git a/tests/sqllogictests/suites/query/02_function/02_0000_function_aggregate_mix.test b/tests/sqllogictests/suites/query/02_function/02_0000_function_aggregate_mix.test index 337b79301f1e5..07b9ac602eeb3 100644 --- a/tests/sqllogictests/suites/query/02_function/02_0000_function_aggregate_mix.test +++ b/tests/sqllogictests/suites/query/02_function/02_0000_function_aggregate_mix.test @@ -378,7 +378,7 @@ create table aggavg1 as select t.goodsid, t.md, avg(t.avgcostvalue) avgcostvalue query TTFFF select * from aggavg1 ---- -149350 246 18.99817596 37.99635193 2 +149350 246 18.998175965000 37.99635193 2 statement ok DROP TABLE aggr From 78c0f6f655c5ee65cdd6434ab0a07746838c5e90 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 1 Dec 2023 00:41:45 +0800 Subject: [PATCH 15/26] revert copy --- src/query/ast/src/visitors/visitor.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/query/ast/src/visitors/visitor.rs b/src/query/ast/src/visitors/visitor.rs index 181cbb6334317..14be08e4dcdf5 100644 --- a/src/query/ast/src/visitors/visitor.rs +++ b/src/query/ast/src/visitors/visitor.rs @@ -379,16 +379,14 @@ pub trait Visitor<'ast>: Sized { fn visit_explain(&mut self, _kind: &'ast ExplainKind, _query: &'ast Statement) {} fn visit_copy_into_table(&mut self, _copy: &'ast CopyIntoTableStmt) { - // todo!(youngsofun) please fix this - // if let CopyIntoTableSource::Query(query) = ©.src { - // self.visit_query(query) - // } + if let CopyIntoTableSource::Query(query) = ©.src { + self.visit_query(query) + } } fn visit_copy_into_location(&mut self, _copy: &'ast CopyIntoLocationStmt) { - // todo!(youngsofun) please fix this - // if let CopyIntoLocationSource::Query(query) = ©.src { - // self.visit_query(query) - // } + if let CopyIntoLocationSource::Query(query) = ©.src { + self.visit_query(query) + } } fn visit_call(&mut self, _call: &'ast CallStmt) {} From 775d2b4dbb5b901d50d8c4f39a57afe8b1874eaa Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 1 Dec 2023 00:42:02 +0800 Subject: [PATCH 16/26] revert copy --- src/query/ast/src/visitors/visitor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/ast/src/visitors/visitor.rs b/src/query/ast/src/visitors/visitor.rs index 14be08e4dcdf5..e45e3bdfbea3e 100644 --- a/src/query/ast/src/visitors/visitor.rs +++ b/src/query/ast/src/visitors/visitor.rs @@ -378,12 +378,12 @@ pub trait Visitor<'ast>: Sized { fn visit_explain(&mut self, _kind: &'ast ExplainKind, _query: &'ast Statement) {} - fn visit_copy_into_table(&mut self, _copy: &'ast CopyIntoTableStmt) { + fn visit_copy_into_table(&mut self, copy: &'ast CopyIntoTableStmt) { if let CopyIntoTableSource::Query(query) = ©.src { self.visit_query(query) } } - fn visit_copy_into_location(&mut self, _copy: &'ast CopyIntoLocationStmt) { + fn visit_copy_into_location(&mut self, copy: &'ast CopyIntoLocationStmt) { if let CopyIntoLocationSource::Query(query) = ©.src { self.visit_query(query) } From 4baa751cc11f9086472f3b2bb14d91b20596f3db Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 1 Dec 2023 01:05:11 +0800 Subject: [PATCH 17/26] fix --- src/query/sql/src/planner/binder/copy_into_table.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index 3857a54abb746..4a637d3462f22 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -94,7 +94,13 @@ impl<'a> Binder { // just check query let mut tmp_bind_context = BindContext::new(); - let (bind_query, _) = self.bind_query(&mut tmp_bind_context, query).await?; + let mut tmp_binder = Binder::new( + self.ctx.clone(), + self.catalogs.clone(), + self.name_resolution_ctx.clone(), + self.metadata.clone(), + ); + let (bind_query, _) = tmp_binder.bind_query(&mut tmp_bind_context, query).await?; if !self.check_sexpr_top(&bind_query, super::binder::CheckType::CopyIntoTable)? { return Err(ErrorCode::SemanticError( "copy into table source can't contain window|aggregate|udf|join functions" From 235f1b5b1ce81f68539932e595c348e1b4bbc9b3 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 1 Dec 2023 13:32:07 +0800 Subject: [PATCH 18/26] revert test --- src/query/ast/src/visitors/visitor.rs | 8 ++++---- src/query/ast/src/visitors/visitor_mut.rs | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/query/ast/src/visitors/visitor.rs b/src/query/ast/src/visitors/visitor.rs index e45e3bdfbea3e..2a637919b7141 100644 --- a/src/query/ast/src/visitors/visitor.rs +++ b/src/query/ast/src/visitors/visitor.rs @@ -378,10 +378,10 @@ pub trait Visitor<'ast>: Sized { fn visit_explain(&mut self, _kind: &'ast ExplainKind, _query: &'ast Statement) {} - fn visit_copy_into_table(&mut self, copy: &'ast CopyIntoTableStmt) { - if let CopyIntoTableSource::Query(query) = ©.src { - self.visit_query(query) - } + fn visit_copy_into_table(&mut self, _copy: &'ast CopyIntoTableStmt) { + // if let CopyIntoTableSource::Query(query) = ©.src { + // self.visit_query(query) + // } } fn visit_copy_into_location(&mut self, copy: &'ast CopyIntoLocationStmt) { if let CopyIntoLocationSource::Query(query) = ©.src { diff --git a/src/query/ast/src/visitors/visitor_mut.rs b/src/query/ast/src/visitors/visitor_mut.rs index bc7f9f99f6f36..2d4ebf112e92f 100644 --- a/src/query/ast/src/visitors/visitor_mut.rs +++ b/src/query/ast/src/visitors/visitor_mut.rs @@ -394,10 +394,10 @@ pub trait VisitorMut: Sized { walk_statement_mut(self, stmt); } - fn visit_copy_into_table(&mut self, copy: &mut CopyIntoTableStmt) { - if let CopyIntoTableSource::Query(query) = &mut copy.src { - self.visit_query(query) - } + fn visit_copy_into_table(&mut self, _copy: &mut CopyIntoTableStmt) { + // if let CopyIntoTableSource::Query(query) = &mut copy.src { + // self.visit_query(query) + // } } fn visit_copy_into_location(&mut self, copy: &mut CopyIntoLocationStmt) { if let CopyIntoLocationSource::Query(query) = &mut copy.src { From a27a2b2f341dc04d2661ddd3927cc9d987fb4f66 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 1 Dec 2023 15:49:25 +0800 Subject: [PATCH 19/26] fix --- src/query/ast/src/visitors/visitor.rs | 8 +++---- src/query/ast/src/visitors/visitor_mut.rs | 8 +++---- .../sql/src/planner/binder/copy_into_table.rs | 24 +++++++------------ 3 files changed, 16 insertions(+), 24 deletions(-) diff --git a/src/query/ast/src/visitors/visitor.rs b/src/query/ast/src/visitors/visitor.rs index 2a637919b7141..e45e3bdfbea3e 100644 --- a/src/query/ast/src/visitors/visitor.rs +++ b/src/query/ast/src/visitors/visitor.rs @@ -378,10 +378,10 @@ pub trait Visitor<'ast>: Sized { fn visit_explain(&mut self, _kind: &'ast ExplainKind, _query: &'ast Statement) {} - fn visit_copy_into_table(&mut self, _copy: &'ast CopyIntoTableStmt) { - // if let CopyIntoTableSource::Query(query) = ©.src { - // self.visit_query(query) - // } + fn visit_copy_into_table(&mut self, copy: &'ast CopyIntoTableStmt) { + if let CopyIntoTableSource::Query(query) = ©.src { + self.visit_query(query) + } } fn visit_copy_into_location(&mut self, copy: &'ast CopyIntoLocationStmt) { if let CopyIntoLocationSource::Query(query) = ©.src { diff --git a/src/query/ast/src/visitors/visitor_mut.rs b/src/query/ast/src/visitors/visitor_mut.rs index 2d4ebf112e92f..bc7f9f99f6f36 100644 --- a/src/query/ast/src/visitors/visitor_mut.rs +++ b/src/query/ast/src/visitors/visitor_mut.rs @@ -394,10 +394,10 @@ pub trait VisitorMut: Sized { walk_statement_mut(self, stmt); } - fn visit_copy_into_table(&mut self, _copy: &mut CopyIntoTableStmt) { - // if let CopyIntoTableSource::Query(query) = &mut copy.src { - // self.visit_query(query) - // } + fn visit_copy_into_table(&mut self, copy: &mut CopyIntoTableStmt) { + if let CopyIntoTableSource::Query(query) = &mut copy.src { + self.visit_query(query) + } } fn visit_copy_into_location(&mut self, copy: &mut CopyIntoLocationStmt) { if let CopyIntoLocationSource::Query(query) = &mut copy.src { diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index 4a637d3462f22..db31b19372271 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -92,27 +92,12 @@ impl<'a> Binder { .bind_copy_into_table_common(bind_context, stmt, location) .await?; - // just check query - let mut tmp_bind_context = BindContext::new(); - let mut tmp_binder = Binder::new( - self.ctx.clone(), - self.catalogs.clone(), - self.name_resolution_ctx.clone(), - self.metadata.clone(), - ); - let (bind_query, _) = tmp_binder.bind_query(&mut tmp_bind_context, query).await?; - if !self.check_sexpr_top(&bind_query, super::binder::CheckType::CopyIntoTable)? { - return Err(ErrorCode::SemanticError( - "copy into table source can't contain window|aggregate|udf|join functions" - .to_string(), - )); - } - self.bind_copy_from_query_into_table(bind_context, plan, select_list, alias) .await } } } + async fn bind_copy_into_table_common( &mut self, bind_context: &mut BindContext, @@ -368,6 +353,12 @@ impl<'a> Binder { output_context.parent = from_context.parent; output_context.columns = from_context.columns; + if !self.check_sexpr_top(&s_expr, super::binder::CheckType::CopyIntoTable)? { + return Err(ErrorCode::SemanticError( + "insert source can't contain udf functions".to_string(), + )); + } + plan.query = Some(Box::new(Plan::Query { s_expr: Box::new(s_expr), metadata: self.metadata.clone(), @@ -376,6 +367,7 @@ impl<'a> Binder { ignore_result: false, formatted_ast: None, })); + Ok(Plan::CopyIntoTable(Box::new(plan))) } From 96312e5e1d29ede39948af429ac9d73cf8f16637 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 1 Dec 2023 16:00:31 +0800 Subject: [PATCH 20/26] fix comments --- src/query/sql/src/planner/binder/copy_into_table.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index db31b19372271..03b8730198254 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -355,7 +355,8 @@ impl<'a> Binder { if !self.check_sexpr_top(&s_expr, super::binder::CheckType::CopyIntoTable)? { return Err(ErrorCode::SemanticError( - "insert source can't contain udf functions".to_string(), + "copy into table source can't contain window|aggregate|udf|join functions" + .to_string(), )); } From c5077cfc0edf6e539a04076563adf43415f3dfe7 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 1 Dec 2023 16:16:54 +0800 Subject: [PATCH 21/26] fix --- src/query/sql/src/planner/binder/binder.rs | 80 +++++++------------ .../sql/src/planner/binder/copy_into_table.rs | 19 +++-- 2 files changed, 40 insertions(+), 59 deletions(-) diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index e0b6e5ae80b21..34fc822bcf01a 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -75,7 +75,6 @@ pub enum CheckType { Merge, Insert, Replace, - CopyIntoTable, CopyIntoLocation, } @@ -684,18 +683,7 @@ impl<'a> Binder { // add check for SExpr to disable invalid source for copy/insert/merge/replace pub(crate) fn check_sexpr_top(&self, s_expr: &SExpr, check_type: CheckType) -> Result { - let f = match check_type { - CheckType::CopyIntoTable => |scalar: &ScalarExpr| { - matches!( - scalar, - ScalarExpr::WindowFunction(_) - | ScalarExpr::AggregateFunction(_) - | ScalarExpr::UDFServerCall(_) - ) - }, - _ => |scalar: &ScalarExpr| matches!(scalar, ScalarExpr::UDFServerCall(_)), - }; - + let f = |scalar: &ScalarExpr| matches!(scalar, ScalarExpr::UDFServerCall(_)); let mut finder = Finder::new(&f); Self::check_sexpr(s_expr, &mut finder, &check_type) } @@ -727,21 +715,17 @@ impl<'a> Binder { f.scalars().is_empty() } RelOperator::Join(join) => { - if let CheckType::CopyIntoTable = check_type { - false - } else { - f.reset_finder(); - for condition in &join.left_conditions { - f.visit(condition)?; - } - for condition in &join.right_conditions { - f.visit(condition)?; - } - for condition in &join.non_equi_conditions { - f.visit(condition)?; - } - f.scalars().is_empty() + f.reset_finder(); + for condition in &join.left_conditions { + f.visit(condition)?; } + for condition in &join.right_conditions { + f.visit(condition)?; + } + for condition in &join.non_equi_conditions { + f.visit(condition)?; + } + f.scalars().is_empty() } RelOperator::EvalScalar(eval) => { f.reset_finder(); @@ -758,18 +742,14 @@ impl<'a> Binder { f.scalars().is_empty() } RelOperator::Aggregate(aggregate) => { - if let CheckType::CopyIntoTable = check_type { - false - } else { - f.reset_finder(); - for item in &aggregate.group_items { - f.visit(&item.scalar)?; - } - for item in &aggregate.aggregate_functions { - f.visit(&item.scalar)?; - } - f.scalars().is_empty() + f.reset_finder(); + for item in &aggregate.group_items { + f.visit(&item.scalar)?; + } + for item in &aggregate.aggregate_functions { + f.visit(&item.scalar)?; } + f.scalars().is_empty() } RelOperator::Exchange(exchange) => { f.reset_finder(); @@ -791,21 +771,17 @@ impl<'a> Binder { f.scalars().is_empty() } RelOperator::Window(window) => { - if let CheckType::CopyIntoTable = check_type { - false - } else { - f.reset_finder(); - for scalar_item in &window.arguments { - f.visit(&scalar_item.scalar)?; - } - for scalar_item in &window.partition_by { - f.visit(&scalar_item.scalar)?; - } - for info in &window.order_by { - f.visit(&info.order_by_item.scalar)?; - } - f.scalars().is_empty() + f.reset_finder(); + for scalar_item in &window.arguments { + f.visit(&scalar_item.scalar)?; } + for scalar_item in &window.partition_by { + f.visit(&scalar_item.scalar)?; + } + for info in &window.order_by { + f.visit(&info.order_by_item.scalar)?; + } + f.scalars().is_empty() } RelOperator::Udf(_) => false, _ => true, diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index 03b8730198254..1653ece9ef955 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -65,6 +65,7 @@ use crate::BindContext; use crate::Metadata; use crate::NameResolutionContext; use crate::ScalarBinder; +use crate::ScalarExpr; impl<'a> Binder { #[async_backtrace::framed] @@ -333,6 +334,17 @@ impl<'a> Binder { let select_list = self .normalize_select_list(&mut from_context, select_list) .await?; + + for item in select_list.items.iter() { + if matches!(&item.scalar, ScalarExpr::AggregateFunction(_)) + || matches!(&item.scalar, ScalarExpr::WindowFunction(_)) + { + return Err(ErrorCode::SemanticError( + "copy into table source can't contain window|aggregate|udf|join functions" + .to_string(), + )); + }; + } let (scalar_items, projections) = self.analyze_projection( &from_context.aggregate_info, &from_context.windows, @@ -353,13 +365,6 @@ impl<'a> Binder { output_context.parent = from_context.parent; output_context.columns = from_context.columns; - if !self.check_sexpr_top(&s_expr, super::binder::CheckType::CopyIntoTable)? { - return Err(ErrorCode::SemanticError( - "copy into table source can't contain window|aggregate|udf|join functions" - .to_string(), - )); - } - plan.query = Some(Box::new(Plan::Query { s_expr: Box::new(s_expr), metadata: self.metadata.clone(), From 150007fdb6c85ee7f568f6e8ec97449033531e5b Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 1 Dec 2023 16:30:08 +0800 Subject: [PATCH 22/26] fix --- src/query/sql/src/planner/binder/copy_into_table.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index 1653ece9ef955..aeaa1450d4a63 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -336,9 +336,8 @@ impl<'a> Binder { .await?; for item in select_list.items.iter() { - if matches!(&item.scalar, ScalarExpr::AggregateFunction(_)) - || matches!(&item.scalar, ScalarExpr::WindowFunction(_)) - { + if self.check_allowed_scalar_expr_with_subquery(&item.scalar)? { + // in fact, if there is a join, we will stop in `check_transform_query()` return Err(ErrorCode::SemanticError( "copy into table source can't contain window|aggregate|udf|join functions" .to_string(), From 7d4e5a8e2d2ff5b4a05f2395a11f872ef5644cf2 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 1 Dec 2023 16:36:06 +0800 Subject: [PATCH 23/26] fix lint --- src/query/sql/src/planner/binder/copy_into_table.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index aeaa1450d4a63..89470c1ba1173 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -65,7 +65,6 @@ use crate::BindContext; use crate::Metadata; use crate::NameResolutionContext; use crate::ScalarBinder; -use crate::ScalarExpr; impl<'a> Binder { #[async_backtrace::framed] From 50dd7ce934300519ceedbe1f245528e6ccde6fbb Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 1 Dec 2023 16:39:54 +0800 Subject: [PATCH 24/26] fix lint --- .../interpreters/interpreter_merge_into.rs | 7 +++--- src/query/sql/src/planner/binder/binder.rs | 23 ++++--------------- .../src/planner/binder/copy_into_location.rs | 2 +- src/query/sql/src/planner/binder/insert.rs | 2 +- .../sql/src/planner/binder/merge_into.rs | 2 +- src/query/sql/src/planner/binder/replace.rs | 2 +- 6 files changed, 13 insertions(+), 25 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index dc5b9ee405d92..77e210b31b54b 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -91,9 +91,10 @@ impl Interpreter for MergeIntoInterpreter { .await?; // Add table lock before execution. - let table_lock = LockManager::create_table_lock(table_info)?; - let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; - build_res.main_pipeline.add_lock_guard(lock_guard); + // todo!(@zhyass) :But for now the lock maybe exist problem, let's open this after fix it. + // let table_lock = LockManager::create_table_lock(table_info)?; + // let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; + // build_res.main_pipeline.add_lock_guard(lock_guard); // Compact if 'enable_recluster_after_write' on. { diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index 34fc822bcf01a..30a2ed94904c1 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -71,13 +71,6 @@ use crate::ScalarExpr; use crate::TypeChecker; use crate::Visibility; -pub enum CheckType { - Merge, - Insert, - Replace, - CopyIntoLocation, -} - /// Binder is responsible to transform AST of a query into a canonical logical SExpr. /// /// During this phase, it will: @@ -682,20 +675,14 @@ impl<'a> Binder { } // add check for SExpr to disable invalid source for copy/insert/merge/replace - pub(crate) fn check_sexpr_top(&self, s_expr: &SExpr, check_type: CheckType) -> Result { + pub(crate) fn check_sexpr_top(&self, s_expr: &SExpr) -> Result { let f = |scalar: &ScalarExpr| matches!(scalar, ScalarExpr::UDFServerCall(_)); let mut finder = Finder::new(&f); - Self::check_sexpr(s_expr, &mut finder, &check_type) + Self::check_sexpr(s_expr, &mut finder) } - pub(crate) fn check_sexpr( - s_expr: &'a SExpr, - f: &'a mut Finder<'a, F>, - check_type: &CheckType, - ) -> Result - where - F: Fn(&ScalarExpr) -> bool, - { + pub(crate) fn check_sexpr(s_expr: &'a SExpr, f: &'a mut Finder<'a, F>) -> Result + where F: Fn(&ScalarExpr) -> bool { let result = match s_expr.plan.as_ref() { RelOperator::Scan(scan) => { f.reset_finder(); @@ -791,7 +778,7 @@ impl<'a> Binder { true => { for child in &s_expr.children { let mut finder = Finder::new(f.find_fn()); - let flag = Self::check_sexpr(child.as_ref(), &mut finder, check_type)?; + let flag = Self::check_sexpr(child.as_ref(), &mut finder)?; if !flag { return Ok(false); } diff --git a/src/query/sql/src/planner/binder/copy_into_location.rs b/src/query/sql/src/planner/binder/copy_into_location.rs index d31808c78567a..9f0f98ee158dd 100644 --- a/src/query/sql/src/planner/binder/copy_into_location.rs +++ b/src/query/sql/src/planner/binder/copy_into_location.rs @@ -63,7 +63,7 @@ impl<'a> Binder { .bind_statement(bind_context, &Statement::Query(query.clone())) .await?; if let Plan::Query { s_expr, .. } = &select_plan { - if !self.check_sexpr_top(s_expr, super::binder::CheckType::CopyIntoLocation)? { + if !self.check_sexpr_top(s_expr)? { return Err(ErrorCode::SemanticError( "copy into location source can't contain udf functions".to_string(), )); diff --git a/src/query/sql/src/planner/binder/insert.rs b/src/query/sql/src/planner/binder/insert.rs index ef2e6d1432af9..2db84057eb1c8 100644 --- a/src/query/sql/src/planner/binder/insert.rs +++ b/src/query/sql/src/planner/binder/insert.rs @@ -155,7 +155,7 @@ impl Binder { })); if let Plan::Query { s_expr, .. } = &select_plan { - if !self.check_sexpr_top(s_expr, super::binder::CheckType::Insert)? { + if !self.check_sexpr_top(s_expr)? { return Err(ErrorCode::SemanticError( "insert source can't contain udf functions".to_string(), )); diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs index 6cbe1d81f1801..c2f33778b17b7 100644 --- a/src/query/sql/src/planner/binder/merge_into.rs +++ b/src/query/sql/src/planner/binder/merge_into.rs @@ -199,7 +199,7 @@ impl Binder { let (source_expr, mut source_context) = self.bind_single_table(bind_context, &source_data).await?; - if !self.check_sexpr_top(&source_expr, super::binder::CheckType::Merge)? { + if !self.check_sexpr_top(&source_expr)? { return Err(ErrorCode::SemanticError( "replace source can't contain udf functions".to_string(), )); diff --git a/src/query/sql/src/planner/binder/replace.rs b/src/query/sql/src/planner/binder/replace.rs index faf699c7fccf8..148623c45f8c7 100644 --- a/src/query/sql/src/planner/binder/replace.rs +++ b/src/query/sql/src/planner/binder/replace.rs @@ -138,7 +138,7 @@ impl Binder { let statement = Statement::Query(query); let select_plan = self.bind_statement(bind_context, &statement).await?; if let Plan::Query { s_expr, .. } = &select_plan { - if !self.check_sexpr_top(s_expr, super::binder::CheckType::Replace)? { + if !self.check_sexpr_top(s_expr)? { return Err(ErrorCode::SemanticError( "replace source can't contain udf functions".to_string(), )); From bb01af7b4b6bf6211b6e15c72e3f1a9adb322d32 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 1 Dec 2023 16:41:45 +0800 Subject: [PATCH 25/26] fix lint --- .../service/src/interpreters/interpreter_merge_into.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 77e210b31b54b..dc5b9ee405d92 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -91,10 +91,9 @@ impl Interpreter for MergeIntoInterpreter { .await?; // Add table lock before execution. - // todo!(@zhyass) :But for now the lock maybe exist problem, let's open this after fix it. - // let table_lock = LockManager::create_table_lock(table_info)?; - // let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; - // build_res.main_pipeline.add_lock_guard(lock_guard); + let table_lock = LockManager::create_table_lock(table_info)?; + let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; + build_res.main_pipeline.add_lock_guard(lock_guard); // Compact if 'enable_recluster_after_write' on. { From 3bcc1f936ef9d842a4d2665d06c94b028c6f34f9 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 1 Dec 2023 17:23:14 +0800 Subject: [PATCH 26/26] fix --- src/query/sql/src/planner/binder/copy_into_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index 89470c1ba1173..ff9731af48365 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -335,7 +335,7 @@ impl<'a> Binder { .await?; for item in select_list.items.iter() { - if self.check_allowed_scalar_expr_with_subquery(&item.scalar)? { + if !self.check_allowed_scalar_expr_with_subquery(&item.scalar)? { // in fact, if there is a join, we will stop in `check_transform_query()` return Err(ErrorCode::SemanticError( "copy into table source can't contain window|aggregate|udf|join functions"