From 94987cea1c855a98624cfa7ac6a8305c6e7e4b44 Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Wed, 24 Apr 2024 13:23:27 +0800 Subject: [PATCH 1/3] chore(planner): fix merge into statistics --- .../sql/src/planner/optimizer/optimizer.rs | 7 ++-- ...39_target_build_merge_into_standalone.test | 6 +-- .../mode/standalone/explain/merge_into.test | 38 +++++++++---------- 3 files changed, 25 insertions(+), 26 deletions(-) diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index a7d537d5cd750..e197f27026e7a 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -379,10 +379,9 @@ async fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box) -> } // replace right source let mut join_sexpr = plan.input.clone(); - join_sexpr = Box::new(join_sexpr.replace_children(vec![ - Arc::new(join_sexpr.child(0)?.clone()), - Arc::new(right_source), - ])); + let left_target = optimize_query(opt_ctx.clone(), join_sexpr.child(0)?.clone()).await?; + join_sexpr = + Box::new(join_sexpr.replace_children(vec![Arc::new(left_target), Arc::new(right_source)])); let join_op = Join::try_from(join_sexpr.plan().clone())?; let non_equal_join = join_op.right_conditions.is_empty() && join_op.left_conditions.is_empty(); diff --git a/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test b/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test index a53712011579d..46199069ff991 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test @@ -80,7 +80,7 @@ HashJoin │ ├── partitions scanned: 4 │ ├── pruning stats: [segments: , blocks: ] │ ├── push downs: [filters: [], limit: NONE] -│ └── estimated rows: 0.00 +│ └── estimated rows: 8.00 └── TableScan(Probe) ├── table: default.default.source_optimization ├── output columns: [a (#0), b (#1), c (#2)] @@ -155,7 +155,7 @@ HashJoin │ ├── partitions scanned: 4 │ ├── pruning stats: [segments: , blocks: ] │ ├── push downs: [filters: [], limit: NONE] -│ └── estimated rows: 0.00 +│ └── estimated rows: 8.00 └── TableScan(Probe) ├── table: default.default.source_optimization ├── output columns: [a (#0), b (#1), c (#2)] @@ -247,7 +247,7 @@ HashJoin │ ├── partitions scanned: 4 │ ├── pruning stats: [segments: , blocks: ] │ ├── push downs: [filters: [], limit: NONE] -│ └── estimated rows: 0.00 +│ └── estimated rows: 8.00 └── TableScan(Probe) ├── table: default.default.source_optimization ├── output columns: [a (#0), b (#1), c (#2)] diff --git a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test index f0b10a76cd32e..e615633e26dd4 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test @@ -51,7 +51,7 @@ HashJoin │ ├── partitions scanned: 1 │ ├── pruning stats: [segments: , blocks: ] │ ├── push downs: [filters: [], limit: NONE] -│ └── estimated rows: 0.00 +│ └── estimated rows: 4.00 └── TableScan(Probe) ├── table: default.default.employees2 ├── output columns: [employee_id (#0), employee_name (#1), department (#2)] @@ -78,32 +78,32 @@ target_table: default.default.salaries2 ├── matched update: [condition: None,update set salary = plus(salaries2.salary (#4), 500.00)] └── unmatched insert: [condition: None,insert into (employee_id,salary) values(CAST(employees2.employee_id (#0) AS Int32 NULL),CAST(55000.00 AS Decimal(10, 2) NULL))] HashJoin -├── output columns: [employees2.employee_id (#0), employees2.employee_name (#1), employees2.department (#2), salaries2.employee_id (#3), salaries2.salary (#4), salaries2._row_id (#5)] -├── join type: LEFT OUTER -├── build keys: [salaries2.employee_id (#3)] -├── probe keys: [employees2.employee_id (#0)] +├── output columns: [salaries2.employee_id (#3), salaries2.salary (#4), salaries2._row_id (#5), employees2.employee_id (#0), employees2.employee_name (#1), employees2.department (#2)] +├── join type: RIGHT OUTER +├── build keys: [employees2.employee_id (#0)] +├── probe keys: [salaries2.employee_id (#3)] ├── filters: [] ├── estimated rows: 4.00 ├── TableScan(Build) -│ ├── table: default.default.salaries2 -│ ├── output columns: [employee_id (#3), salary (#4), _row_id (#5)] -│ ├── read rows: 6 +│ ├── table: default.default.employees2 +│ ├── output columns: [employee_id (#0), employee_name (#1), department (#2)] +│ ├── read rows: 4 │ ├── read size: < 1 KiB -│ ├── partitions total: 2 -│ ├── partitions scanned: 2 -│ ├── pruning stats: [segments: , blocks: ] +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] │ ├── push downs: [filters: [], limit: NONE] -│ └── estimated rows: 0.00 +│ └── estimated rows: 4.00 └── TableScan(Probe) - ├── table: default.default.employees2 - ├── output columns: [employee_id (#0), employee_name (#1), department (#2)] - ├── read rows: 4 + ├── table: default.default.salaries2 + ├── output columns: [employee_id (#3), salary (#4), _row_id (#5)] + ├── read rows: 6 ├── read size: < 1 KiB - ├── partitions total: 1 - ├── partitions scanned: 1 - ├── pruning stats: [segments: , blocks: ] + ├── partitions total: 2 + ├── partitions scanned: 2 + ├── pruning stats: [segments: , blocks: ] ├── push downs: [filters: [], limit: NONE] - └── estimated rows: 4.00 + └── estimated rows: 6.00 ## test update column only optimization statement ok From cc2cd831557e4bff67665fae33be7898ae7cdde2 Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Wed, 24 Apr 2024 13:50:14 +0800 Subject: [PATCH 2/3] chore: fix sqllogictest --- .../merge_into_non_equal_distributed.test | 107 +++++++++--------- 1 file changed, 52 insertions(+), 55 deletions(-) diff --git a/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test b/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test index d9e4ea3934f9b..fba36d5327c8f 100644 --- a/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test +++ b/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test @@ -58,44 +58,44 @@ explain merge into t1 using t2 on t1.a < t2.a when matched then update * when no ---- MergeInto: target_table: default.default.t1 -├── distributed: false +├── distributed: true ├── target_build_optimization: false ├── can_try_update_column_only: true ├── matched update: [condition: None,update set a = a (#0)] └── unmatched insert: [condition: None,insert into (a) values(CAST(a (#0) AS Int32 NULL))] HashJoin -├── output columns: [t2.a (#0), t1.a (#1), t1._row_id (#2)] -├── join type: LEFT OUTER +├── output columns: [t1.a (#1), t1._row_id (#2), t2.a (#0)] +├── join type: RIGHT OUTER ├── build keys: [] ├── probe keys: [] ├── filters: [t1.a (#1) < t2.a (#0)] -├── estimated rows: 1.00 +├── estimated rows: 15.00 ├── Exchange(Build) -│ ├── output columns: [t1.a (#1), t1._row_id (#2)] +│ ├── output columns: [t2.a (#0)] │ ├── exchange type: Merge │ └── TableScan -│ ├── table: default.default.t1 -│ ├── output columns: [a (#1), _row_id (#2)] -│ ├── read rows: 15 +│ ├── table: default.default.t2 +│ ├── output columns: [a (#0)] +│ ├── read rows: 1 │ ├── read size: < 1 KiB -│ ├── partitions total: 3 -│ ├── partitions scanned: 3 -│ ├── pruning stats: [segments: , blocks: ] +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] │ ├── push downs: [filters: [], limit: NONE] -│ └── estimated rows: 0.00 +│ └── estimated rows: 1.00 └── Exchange(Probe) - ├── output columns: [t2.a (#0)] + ├── output columns: [t1.a (#1), t1._row_id (#2)] ├── exchange type: Merge └── TableScan - ├── table: default.default.t2 - ├── output columns: [a (#0)] - ├── read rows: 1 + ├── table: default.default.t1 + ├── output columns: [a (#1), _row_id (#2)] + ├── read rows: 15 ├── read size: < 1 KiB - ├── partitions total: 1 - ├── partitions scanned: 1 - ├── pruning stats: [segments: , blocks: ] + ├── partitions total: 3 + ├── partitions scanned: 3 + ├── pruning stats: [segments: , blocks: ] ├── push downs: [filters: [], limit: NONE] - └── estimated rows: 1.00 + └── estimated rows: 15.00 query TT merge into t1 using t2 on t1.a < t2.a when matched then update * when not matched then insert *; @@ -162,41 +162,38 @@ target_table: default.default.t1 ├── can_try_update_column_only: true ├── matched update: [condition: None,update set a = a (#0)] └── unmatched insert: [condition: None,insert into (a) values(CAST(a (#0) AS Int32 NULL))] -Exchange -├── output columns: [stage._$1 (#0), t1.a (#1), t1._row_id (#2)] -├── exchange type: Merge -└── HashJoin - ├── output columns: [stage._$1 (#0), t1.a (#1), t1._row_id (#2)] - ├── join type: LEFT OUTER - ├── build keys: [CAST(t1.a (#1) AS Int64 NULL)] - ├── probe keys: [CAST(t2.a (#0) AS Int64 NULL)] - ├── filters: [] - ├── estimated rows: 0.00 - ├── Exchange(Build) - │ ├── output columns: [t1.a (#1), t1._row_id (#2)] - │ ├── exchange type: Hash(CAST(t1.a (#1) AS Int64 NULL)) - │ └── TableScan - │ ├── table: default.default.t1 - │ ├── output columns: [a (#1), _row_id (#2)] - │ ├── read rows: 2 - │ ├── read size: < 1 KiB - │ ├── partitions total: 1 - │ ├── partitions scanned: 1 - │ ├── pruning stats: [segments: , blocks: ] - │ ├── push downs: [filters: [], limit: NONE] - │ └── estimated rows: 0.00 - └── Exchange(Probe) - ├── output columns: [stage._$1 (#0)] - ├── exchange type: Hash(CAST(t2.a (#0) AS Int64 NULL)) - └── TableScan - ├── table: default.system.stage - ├── output columns: [_$1 (#0)] - ├── read rows: 6 - ├── read size: < 1 KiB - ├── partitions total: 1 - ├── partitions scanned: 1 - ├── push downs: [filters: [], limit: NONE] - └── estimated rows: 0.00 +HashJoin +├── output columns: [t1.a (#1), t1._row_id (#2), stage._$1 (#0)] +├── join type: RIGHT OUTER +├── build keys: [CAST(t2.a (#0) AS Int64 NULL)] +├── probe keys: [CAST(t1.a (#1) AS Int64 NULL)] +├── filters: [] +├── estimated rows: 0.00 +├── Exchange(Build) +│ ├── output columns: [stage._$1 (#0)] +│ ├── exchange type: Merge +│ └── TableScan +│ ├── table: default.system.stage +│ ├── output columns: [_$1 (#0)] +│ ├── read rows: 6 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── push downs: [filters: [], limit: NONE] +│ └── estimated rows: 0.00 +└── Exchange(Probe) + ├── output columns: [t1.a (#1), t1._row_id (#2)] + ├── exchange type: Merge + └── TableScan + ├── table: default.default.t1 + ├── output columns: [a (#1), _row_id (#2)] + ├── read rows: 2 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 2.00 query TT merge into t1 using (select $1 as a from @ss) as t2 on t1.a = t2.a when matched then update * when not matched then insert *; From c562b067d7db268161cddcc1fe70ba93214ef33c Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Wed, 24 Apr 2024 14:21:52 +0800 Subject: [PATCH 3/3] chore: fix sqllogictest --- .../suites/mode/cluster/merge_into_non_equal_distributed.test | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test b/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test index fba36d5327c8f..a71e2bed9df7a 100644 --- a/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test +++ b/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test @@ -58,7 +58,7 @@ explain merge into t1 using t2 on t1.a < t2.a when matched then update * when no ---- MergeInto: target_table: default.default.t1 -├── distributed: true +├── distributed: false ├── target_build_optimization: false ├── can_try_update_column_only: true ├── matched update: [condition: None,update set a = a (#0)] @@ -157,7 +157,7 @@ explain merge into t1 using (select $1 as a from @ss) as t2 on t1.a = t2.a when ---- MergeInto: target_table: default.default.t1 -├── distributed: true +├── distributed: false ├── target_build_optimization: false ├── can_try_update_column_only: true ├── matched update: [condition: None,update set a = a (#0)]