From fb1bcf951a74c2f1c2621812257fda2cebfdfb65 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Thu, 29 Aug 2024 15:31:17 +0800 Subject: [PATCH 1/8] feat: estimate selectivity by table sample --- Cargo.lock | 1 + src/common/exception/Cargo.toml | 1 + .../src/planner/optimizer/hyper_dp/dphyp.rs | 174 +++++++++++------- .../planner/optimizer/hyper_dp/join_node.rs | 2 +- .../sql/src/planner/optimizer/optimizer.rs | 10 +- 5 files changed, 112 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8cb0c51cbc082..7afbda6e7e410 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3345,6 +3345,7 @@ dependencies = [ "serde_json", "tantivy", "thiserror", + "tokio", "tonic", ] diff --git a/src/common/exception/Cargo.toml b/src/common/exception/Cargo.toml index 033999b74d0d2..23c13145dfadc 100644 --- a/src/common/exception/Cargo.toml +++ b/src/common/exception/Cargo.toml @@ -31,6 +31,7 @@ serde_json = { workspace = true } tantivy = { workspace = true } thiserror = { workspace = true } tonic = { workspace = true } +tokio = "1.39.2" [package.metadata.cargo-machete] ignored = ["geos"] diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs index d3306b5173b35..68da7fdc417b2 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs @@ -16,8 +16,9 @@ use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; -use databend_common_base::runtime::Thread; +use databend_common_base::runtime::spawn; use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use crate::optimizer::hyper_dp::join_node::JoinNode; @@ -72,25 +73,35 @@ impl DPhpy { } } - fn new_children(&mut self, s_expr: &SExpr) -> Result { + async fn new_children(&mut self, s_expr: &SExpr) -> Result { // Parallel process children: start a new dphyp for each child. let ctx = self.ctx.clone(); let metadata = self.metadata.clone(); let left_expr = s_expr.children[0].clone(); - let left_res = Thread::spawn(move || { + let left_res = spawn(async move { let mut dphyp = DPhpy::new(ctx, metadata); - (dphyp.optimize(&left_expr), dphyp.table_index_map) + (dphyp.optimize(&left_expr).await, dphyp.table_index_map) }); let ctx = self.ctx.clone(); let metadata = self.metadata.clone(); let right_expr = s_expr.children[1].clone(); - let right_res = Thread::spawn(move || { + let right_res = spawn(async move { let mut dphyp = DPhpy::new(ctx, metadata); - (dphyp.optimize(&right_expr), dphyp.table_index_map) + (dphyp.optimize(&right_expr).await, dphyp.table_index_map) }); - let left_res = left_res.join()?; + let left_res = left_res.await.map_err(|e| { + ErrorCode::TokioError(format!( + "Cannot join tokio job, err: {:?}", + e + )) + })?; + let right_res = right_res.await.map_err(|e| { + ErrorCode::TokioError(format!( + "Cannot join tokio job, err: {:?}", + e + )) + })?; let (left_expr, _) = left_res.0?; - let right_res = right_res.join()?; let (right_expr, _) = right_res.0?; // Merge `table_index_map` of left and right into current `table_index_map`. 
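The hunk above replaces a blocking `Thread::spawn` with tokio's `spawn` so both children are optimized concurrently on the async runtime, and maps a task-join failure into an error instead of panicking. A minimal sketch of that pattern, assuming a tokio runtime is available; the names and doubling logic are illustrative stand-ins, not this patch's API:

use tokio::task::JoinHandle;

async fn optimize_children(left: u64, right: u64) -> Result<(u64, u64), String> {
    // Stand-ins for the two child DPhpy::optimize calls spawned above.
    let l: JoinHandle<u64> = tokio::spawn(async move { left * 2 });
    let r: JoinHandle<u64> = tokio::spawn(async move { right * 2 });
    // A JoinHandle yields Result<T, JoinError>; mapping the error (as the
    // patch does with ErrorCode::TokioError) turns a panicked or cancelled
    // task into an ordinary query error.
    let l = l.await.map_err(|e| format!("Cannot join tokio job, err: {:?}", e))?;
    let r = r.await.map_err(|e| format!("Cannot join tokio job, err: {:?}", e))?;
    Ok((l, r))
}

#[tokio::main]
async fn main() {
    assert_eq!(optimize_children(3, 4).await, Ok((6, 8)));
}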
@@ -105,7 +116,8 @@ impl DPhpy { } // Traverse the s_expr and get all base relations and join conditions - fn get_base_relations( + #[async_recursion::async_recursion(#[recursive::recursive])] + async fn get_base_relations( &mut self, s_expr: &SExpr, join_conditions: &mut Vec<(ScalarExpr, ScalarExpr)>, @@ -116,7 +128,7 @@ impl DPhpy { if is_subquery { // If it's a subquery, start a new dphyp let mut dphyp = DPhpy::new(self.ctx.clone(), self.metadata.clone()); - let (new_s_expr, _) = dphyp.optimize(s_expr)?; + let (new_s_expr, _) = dphyp.optimize(s_expr).await?; // Merge `table_index_map` of subquery into current `table_index_map`. let relation_idx = self.join_relations.len() as IndexType; for table_index in dphyp.table_index_map.keys() { @@ -196,24 +208,28 @@ impl DPhpy { self.filters.insert(filter); } if !is_inner_join { - let new_s_expr = self.new_children(s_expr)?; + let new_s_expr = self.new_children(s_expr).await?; self.join_relations.push(JoinRelation::new(&new_s_expr)); Ok((Arc::new(new_s_expr), true)) } else { - let left_res = self.get_base_relations( - s_expr.child(0)?, - join_conditions, - true, - None, - left_is_subquery, - )?; - let right_res = self.get_base_relations( - s_expr.child(1)?, - join_conditions, - true, - None, - right_is_subquery, - )?; + let left_res = self + .get_base_relations( + s_expr.child(0)?, + join_conditions, + true, + None, + left_is_subquery, + ) + .await?; + let right_res = self + .get_base_relations( + s_expr.child(1)?, + join_conditions, + true, + None, + right_is_subquery, + ) + .await?; let new_s_expr: Arc = Arc::new(s_expr.replace_children([left_res.0, right_res.0])); Ok((new_s_expr, left_res.1 && right_res.1)) @@ -232,29 +248,27 @@ impl DPhpy { if let RelOperator::Filter(op) = s_expr.plan.as_ref() { self.filters.insert(op.clone()); } - let (child, optimized) = self.get_base_relations( - s_expr.child(0)?, - join_conditions, - true, - Some(s_expr), - false, - )?; + let (child, optimized) = self + .get_base_relations( + s_expr.child(0)?, + join_conditions, + true, + Some(s_expr), + false, + ) + .await?; let new_s_expr = Arc::new(s_expr.replace_children([child])); Ok((new_s_expr, optimized)) } else { - let (child, optimized) = self.get_base_relations( - s_expr.child(0)?, - join_conditions, - false, - None, - false, - )?; + let (child, optimized) = self + .get_base_relations(s_expr.child(0)?, join_conditions, false, None, false) + .await?; let new_s_expr = Arc::new(s_expr.replace_children([child])); Ok((new_s_expr, optimized)) } } RelOperator::UnionAll(_) => { - let new_s_expr = self.new_children(s_expr)?; + let new_s_expr = self.new_children(s_expr).await?; self.join_relations.push(JoinRelation::new(&new_s_expr)); Ok((Arc::new(new_s_expr), true)) } @@ -279,12 +293,13 @@ impl DPhpy { // The input plan tree has been optimized by heuristic optimizer // So filters have pushed down join and cross join has been converted to inner join as possible as we can // The output plan will have optimal join order theoretically - pub fn optimize(&mut self, s_expr: &SExpr) -> Result<(Arc, bool)> { + pub async fn optimize(&mut self, s_expr: &SExpr) -> Result<(Arc, bool)> { // Firstly, we need to extract all join conditions and base tables // `join_condition` is pair, left is left_condition, right is right_condition let mut join_conditions = vec![]; - let (s_expr, optimized) = - self.get_base_relations(s_expr, &mut join_conditions, false, None, false)?; + let (s_expr, optimized) = self + .get_base_relations(s_expr, &mut join_conditions, false, None, false) + .await?; if 
!optimized { return Ok((s_expr, false)); } @@ -336,7 +351,7 @@ impl DPhpy { for (_, neighbors) in self.query_graph.cached_neighbors.iter_mut() { neighbors.sort(); } - self.join_reorder()?; + self.join_reorder().await?; // Get all join relations in `relation_set_tree` let all_relations = self .relation_set_tree @@ -352,7 +367,7 @@ impl DPhpy { // Adaptive Optimization: // If the if the query graph is simple enough, it uses dynamic programming to construct the optimal join tree, // if that is not possible within the given optimization budget, it switches to a greedy approach. - fn join_reorder(&mut self) -> Result<()> { + async fn join_reorder(&mut self) -> Result<()> { // Initial `dp_table` with plan for single relation for (idx, relation) in self.join_relations.iter().enumerate() { // Get nodes in `relation_set_tree` @@ -371,30 +386,30 @@ impl DPhpy { } // First, try to use dynamic programming to find the optimal join order. - if !self.join_reorder_by_dphyp()? { + if !self.join_reorder_by_dphyp().await? { // When DPhpy takes too much time during join ordering, it is necessary to exit the dynamic programming algorithm // and switch to a greedy algorithm to minimizes the overall query time. - self.join_reorder_by_greedy()?; + self.join_reorder_by_greedy().await?; } Ok(()) } // Join reorder by dynamic programming algorithm. - fn join_reorder_by_dphyp(&mut self) -> Result { + async fn join_reorder_by_dphyp(&mut self) -> Result { // Choose all nodes as enumeration start node once (desc order) for idx in (0..self.join_relations.len()).rev() { // Get node from `relation_set_tree` let node = self.relation_set_tree.get_relation_set_by_index(idx)?; // Emit node as subgraph - if !self.emit_csg(&node)? { + if !self.emit_csg(&node).await? { return Ok(false); } // Create forbidden node set // Forbid node idx will less than current idx let forbidden_nodes = (0..idx).collect(); // Enlarge the subgraph recursively - if !self.enumerate_csg_rec(&node, &forbidden_nodes)? { + if !self.enumerate_csg_rec(&node, &forbidden_nodes).await? { return Ok(false); } } @@ -402,7 +417,7 @@ impl DPhpy { } // Join reorder by greedy algorithm. - fn join_reorder_by_greedy(&mut self) -> Result { + async fn join_reorder_by_greedy(&mut self) -> Result { // The Greedy Operator Ordering starts with a single relation and iteratively adds the relation that minimizes the cost of the join. // the algorithm terminates when all relations have been added, the cost of a join is the sum of the cardinalities of the node involved // in the tree, the algorithm is not guaranteed to find the optimal join tree, it is guaranteed to find it in polynomial time. @@ -428,8 +443,9 @@ impl DPhpy { if !join_conditions.is_empty() { // If left_relation set and right_relation set are connected, emit csg-cmp-pair and keep the // minimum cost pair in `dp_table`. - let cost = - self.emit_csg_cmp(left_relation, right_relation, join_conditions)?; + let cost = self + .emit_csg_cmp(left_relation, right_relation, join_conditions) + .await?; // Update the minimum cost pair. 
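A minimal sketch of the Greedy Operator Ordering fallback described in the comment above: start from single relations and repeatedly merge the pair whose join is cheapest until one tree remains. The cardinality model here (a plain product) is an assumption for illustration only; the planner derives real estimates per relation set.

fn greedy_operator_ordering(mut cards: Vec<f64>) -> f64 {
    let mut total_cost = 0.0;
    while cards.len() > 1 {
        let (mut bi, mut bj, mut best) = (0, 1, f64::INFINITY);
        for i in 0..cards.len() {
            for j in i + 1..cards.len() {
                let estimate = cards[i] * cards[j]; // stand-in join cardinality
                if estimate < best {
                    (bi, bj, best) = (i, j, estimate);
                }
            }
        }
        total_cost += best; // cost accumulates the intermediate cardinalities
        cards[bi] = best;   // the merged relation replaces the cheapest pair
        cards.swap_remove(bj);
    }
    total_cost
}

fn main() {
    println!("{}", greedy_operator_ordering(vec![10.0, 100.0, 1000.0]));
}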
if cost < min_cost { min_cost = cost; @@ -448,7 +464,7 @@ impl DPhpy { let mut lowest_index = Vec::with_capacity(2); for (i, relation) in join_relations.iter().enumerate().take(2) { let mut join_node = self.dp_table.get(relation).unwrap().clone(); - let cardinality = join_node.cardinality(&self.join_relations)?; + let cardinality = join_node.cardinality(&self.join_relations).await?; lowest_cost.push(cardinality); lowest_index.push(i); } @@ -459,7 +475,7 @@ impl DPhpy { // Update the minimum cost relation set pair. for (i, relation) in join_relations.iter().enumerate().skip(2) { let mut join_node = self.dp_table.get(relation).unwrap().clone(); - let cardinality = join_node.cardinality(&self.join_relations)?; + let cardinality = join_node.cardinality(&self.join_relations).await?; if cardinality < lowest_cost[0] { lowest_cost[1] = cardinality; lowest_index[1] = i; @@ -480,7 +496,8 @@ impl DPhpy { &join_relations[left_idx], &join_relations[right_idx], vec![], - )?; + ) + .await?; } if left_idx > right_idx { std::mem::swap(&mut left_idx, &mut right_idx); @@ -494,7 +511,7 @@ impl DPhpy { // EmitCsg will take a non-empty subset of hyper_graph's nodes(V) which contains a connected subgraph. // Then it will possibly generate a connected complement which will combine `nodes` to be a csg-cmp-pair. - fn emit_csg(&mut self, nodes: &[IndexType]) -> Result { + async fn emit_csg(&mut self, nodes: &[IndexType]) -> Result { if nodes.len() == self.join_relations.len() { return Ok(true); } @@ -514,11 +531,16 @@ impl DPhpy { // Check if neighbor is connected with `nodes` let join_conditions = self.query_graph.is_connected(nodes, &neighbor_relations)?; if !join_conditions.is_empty() - && !self.try_emit_csg_cmp(nodes, &neighbor_relations, join_conditions)? + && !self + .try_emit_csg_cmp(nodes, &neighbor_relations, join_conditions) + .await? { return Ok(false); } - if !self.enumerate_cmp_rec(nodes, &neighbor_relations, &forbidden_nodes)? { + if !self + .enumerate_cmp_rec(nodes, &neighbor_relations, &forbidden_nodes) + .await? + { return Ok(false); } } @@ -527,7 +549,8 @@ impl DPhpy { // EnumerateCsgRec will extend the given `nodes`. // It'll consider each non-empty, proper subset of the neighborhood of nodes that are not forbidden. - fn enumerate_csg_rec( + #[async_recursion::async_recursion(#[recursive::recursive])] + async fn enumerate_csg_rec( &mut self, nodes: &[IndexType], forbidden_nodes: &HashSet, @@ -552,7 +575,7 @@ impl DPhpy { let merged_relation_set = union(nodes, &neighbor_relations); if self.dp_table.contains_key(&merged_relation_set) && merged_relation_set.len() > nodes.len() - && !self.emit_csg(&merged_relation_set)? + && !self.emit_csg(&merged_relation_set).await? { return Ok(false); } @@ -565,14 +588,17 @@ impl DPhpy { new_forbidden_nodes = forbidden_nodes.clone(); } new_forbidden_nodes.insert(*neighbor); - if !self.enumerate_csg_rec(&merged_sets[idx], &new_forbidden_nodes)? { + if !self + .enumerate_csg_rec(&merged_sets[idx], &new_forbidden_nodes) + .await? 
+ { return Ok(false); } } Ok(true) } - fn try_emit_csg_cmp( + async fn try_emit_csg_cmp( &mut self, left: &[IndexType], right: &[IndexType], @@ -583,7 +609,7 @@ impl DPhpy { // otherwise it will take too much time match self.emit_count > EMIT_THRESHOLD { false => { - self.emit_csg_cmp(left, right, join_conditions)?; + self.emit_csg_cmp(left, right, join_conditions).await?; Ok(true) } true => Ok(false), @@ -591,7 +617,7 @@ impl DPhpy { } // EmitCsgCmp will join the optimal plan from left and right - fn emit_csg_cmp( + async fn emit_csg_cmp( &mut self, left: &[IndexType], right: &[IndexType], @@ -602,8 +628,8 @@ impl DPhpy { let parent_set = union(left, right); let mut left_join = self.dp_table.get(left).unwrap().clone(); let mut right_join = self.dp_table.get(right).unwrap().clone(); - let left_cardinality = left_join.cardinality(&self.join_relations)?; - let right_cardinality = right_join.cardinality(&self.join_relations)?; + let left_cardinality = left_join.cardinality(&self.join_relations).await?; + let right_cardinality = right_join.cardinality(&self.join_relations).await?; if left_cardinality < right_cardinality { for join_condition in join_conditions.iter_mut() { @@ -641,7 +667,7 @@ impl DPhpy { } }; if join_node.join_type == JoinType::Inner { - let cost = join_node.cardinality(&self.join_relations)? + let cost = join_node.cardinality(&self.join_relations).await? + join_node.children[0].cost + join_node.children[1].cost; join_node.set_cost(cost); @@ -668,7 +694,8 @@ impl DPhpy { // The second parameter is a set which is connected and must be extended until a valid csg-cmp-pair is reached. // Therefore, it considers the neighborhood of right. - fn enumerate_cmp_rec( + #[async_recursion::async_recursion(#[recursive::recursive])] + async fn enumerate_cmp_rec( &mut self, left: &[IndexType], right: &[IndexType], @@ -689,7 +716,9 @@ impl DPhpy { if merged_relation_set.len() > right.len() && self.dp_table.contains_key(&merged_relation_set) && !join_conditions.is_empty() - && !self.try_emit_csg_cmp(left, &merged_relation_set, join_conditions)? + && !self + .try_emit_csg_cmp(left, &merged_relation_set, join_conditions) + .await? { return Ok(false); } @@ -699,7 +728,10 @@ impl DPhpy { let mut new_forbidden_nodes = forbidden_nodes.clone(); for (idx, neighbor) in neighbor_set.iter().enumerate() { new_forbidden_nodes.insert(*neighbor); - if !self.enumerate_cmp_rec(left, &merged_sets[idx], &new_forbidden_nodes)? { + if !self + .enumerate_cmp_rec(left, &merged_sets[idx], &new_forbidden_nodes) + .await? 
+ { return Ok(false); } } diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs index 8674aed15f2b5..14f388de194ff 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs @@ -39,7 +39,7 @@ pub struct JoinNode { } impl JoinNode { - pub fn cardinality(&mut self, relations: &[JoinRelation]) -> Result { + pub async fn cardinality(&mut self, relations: &[JoinRelation]) -> Result { if let Some(card) = self.cardinality { return Ok(card); } diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 3bf7843228484..ad648f7a18621 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -334,8 +334,9 @@ pub async fn optimize_query(opt_ctx: &mut OptimizerContext, mut s_expr: SExpr) - // Cost based optimization let mut dphyp_optimized = false; if opt_ctx.enable_dphyp && opt_ctx.enable_join_reorder { - let (dp_res, optimized) = - DPhpy::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone()).optimize(&s_expr)?; + let (dp_res, optimized) = DPhpy::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone()) + .optimize(&s_expr) + .await?; if optimized { s_expr = (*dp_res).clone(); dphyp_optimized = true; @@ -422,8 +423,9 @@ async fn get_optimized_memo(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Res // Cost based optimization let mut dphyp_optimized = false; if opt_ctx.enable_dphyp && opt_ctx.enable_join_reorder { - let (dp_res, optimized) = - DPhpy::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone()).optimize(&s_expr)?; + let (dp_res, optimized) = DPhpy::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone()) + .optimize(&s_expr) + .await?; if optimized { s_expr = (*dp_res).clone(); dphyp_optimized = true; From 7c5094133bc019bb4aa87616962f377f8e25b2bb Mon Sep 17 00:00:00 2001 From: xudong963 Date: Fri, 30 Aug 2024 19:13:25 +0800 Subject: [PATCH 2/8] save --- .../service/src/interpreters/interpreter.rs | 6 +- src/query/service/src/schedulers/scheduler.rs | 33 ++++++ .../dynamic_sample/dynamic_sample.rs | 44 +++++++ .../filter_selectivity_sample.rs | 110 ++++++++++++++++++ .../planner/optimizer/dynamic_sample/mod.rs | 19 +++ .../dynamic_sample/query_sample_executor.rs | 24 ++++ .../src/planner/optimizer/hyper_dp/dphyp.rs | 37 +++--- src/query/sql/src/planner/optimizer/mod.rs | 3 + .../sql/src/planner/optimizer/optimizer.rs | 29 ++++- src/query/sql/src/planner/planner.rs | 17 ++- 10 files changed, 298 insertions(+), 24 deletions(-) create mode 100644 src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs create mode 100644 src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs create mode 100644 src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs create mode 100644 src/query/sql/src/planner/optimizer/dynamic_sample/query_sample_executor.rs diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index 7a0d5bba52682..80b004b6e5ad6 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -52,6 +52,7 @@ use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::executor::PipelinePullingExecutor; use crate::pipelines::PipelineBuildResult; use crate::servers::http::v1::ClientSessionManager; +use crate::schedulers::ServiceQueryExecutor; use 
crate::sessions::QueryContext; use crate::sessions::SessionManager; use crate::sessions::SessionType; @@ -205,7 +206,10 @@ fn log_query_finished(ctx: &QueryContext, error: Option, has_profiles /// /// This function is used to plan the SQL. If an error occurs, we will log the query start and finished. pub async fn interpreter_plan_sql(ctx: Arc, sql: &str) -> Result<(Plan, PlanExtras)> { - let mut planner = Planner::new(ctx.clone()); + let mut planner = Planner::new_with_sample_executor( + ctx.clone(), + Arc::new(ServiceQueryExecutor::new(ctx.clone())), + ); let result = planner.plan_sql(sql).await; let short_sql = short_sql(sql.to_string()); let mut stmt = if let Ok((_, extras)) = &result { diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs index 3333849f3d59c..19ba560b45e39 100644 --- a/src/query/service/src/schedulers/scheduler.rs +++ b/src/query/service/src/schedulers/scheduler.rs @@ -14,8 +14,16 @@ use std::sync::Arc; +use async_trait::async_trait; use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_sql::executor::PhysicalPlanBuilder; +use databend_common_sql::optimizer::QuerySampleExecutor; +use databend_common_sql::optimizer::SExpr; +use futures_util::TryStreamExt; +use crate::pipelines::executor::ExecutorSettings; +use crate::pipelines::executor::PipelinePullingExecutor; use crate::pipelines::PipelineBuildResult; use crate::pipelines::PipelineBuilder; use crate::schedulers::Fragmenter; @@ -24,6 +32,7 @@ use crate::sessions::QueryContext; use crate::sessions::TableContext; use crate::sql::executor::PhysicalPlan; use crate::sql::ColumnBinding; +use crate::stream::PullingExecutorStream; /// Build query pipeline from physical plan. /// If plan is distributed plan it will build_distributed_pipeline @@ -106,3 +115,27 @@ pub async fn build_distributed_pipeline( build_res.set_max_threads(settings.get_max_threads()? as usize); Ok(build_res) } + +pub struct ServiceQueryExecutor { + ctx: Arc, +} + +impl ServiceQueryExecutor { + pub fn new(ctx: Arc) -> Self { + Self { ctx } + } +} + +#[async_trait] +impl QuerySampleExecutor for ServiceQueryExecutor { + async fn execute_query(&self, plan: &PhysicalPlan) -> Result> { + let build_res = build_query_pipeline_without_render_result_set(&self.ctx, plan).await?; + let settings = ExecutorSettings::try_create(self.ctx.clone())?; + let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; + self.ctx.set_executor(pulling_executor.get_inner())?; + + PullingExecutorStream::create(pulling_executor)? + .try_collect::>() + .await + } +} diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs new file mode 100644 index 0000000000000..24a6cd5e684c5 --- /dev/null +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs @@ -0,0 +1,44 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; + +use crate::optimizer::dynamic_sample::filter_selectivity_sample::filter_selectivity_sample; +use crate::optimizer::QuerySampleExecutor; +use crate::optimizer::SExpr; +use crate::plans::RelOperator; +use crate::MetadataRef; + +pub async fn dynamic_sample( + ctx: Arc, + metadata: MetadataRef, + s_expr: &SExpr, + sample_executor: Arc, +) -> Result { + match s_expr.plan() { + RelOperator::Filter(_) => { + filter_selectivity_sample(ctx, metadata, s_expr, sample_executor).await + } + RelOperator::Join(_) => { + unimplemented!("derive_cardinality_by_sample for join is not supported yet") + } + _ => unreachable!( + "Invalid plan for derive_cardinality_by_sample: {:?}", + s_expr.plan() + ), + } +} diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs new file mode 100644 index 0000000000000..58baf154dae6e --- /dev/null +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs @@ -0,0 +1,110 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use databend_common_ast::ast::Sample; +use databend_common_ast::ast::SampleConfig; +use databend_common_ast::ast::SampleLevel; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::DataType; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::NumberScalar; + +use crate::executor::PhysicalPlanBuilder; +use crate::optimizer::QuerySampleExecutor; +use crate::optimizer::SExpr; +use crate::plans::Aggregate; +use crate::plans::AggregateFunction; +use crate::plans::AggregateMode; +use crate::plans::Filter; +use crate::plans::RelOperator; +use crate::plans::ScalarItem; +use crate::MetadataRef; +use crate::ScalarExpr; + +pub async fn filter_selectivity_sample( + ctx: Arc, + metadata: MetadataRef, + s_expr: &SExpr, + sample_executor: Arc, +) -> Result { + // filter cardinality by sample will be called in `dphyp`, so we can ensure the filter is in complex query(contains not only one table) + // Because it's meaningless for filter cardinality by sample in single table query. + let filter = Filter::try_from(s_expr.plan().clone())?; + let child = s_expr.child(0)?; + if let RelOperator::Scan(mut scan) = child.plan().clone() { + // Get the table's num_rows + let num_rows = scan + .statistics + .table_stats + .as_ref() + .and_then(|s| s.num_rows) + .unwrap_or(0); + + // Calculate sample size (0.2% of total data) + let sample_size = (num_rows as f64 * 0.002).ceil(); + + // 2. 
Construct sample field and add it to scan + scan.sample = Some(Sample { + sample_level: SampleLevel::ROW, + sample_conf: SampleConfig::RowsNum(sample_size), + }); + + // Replace old scan in s_expr + let new_child = SExpr::create_leaf(Arc::new(RelOperator::Scan(scan))); + let mut new_s_expr = s_expr.replace_children(vec![Arc::new(new_child)]); + + // Wrap a count aggregate plan to original s_expr + let count_agg = Aggregate { + mode: AggregateMode::Initial, + group_items: vec![], + aggregate_functions: vec![ScalarItem { + scalar: ScalarExpr::AggregateFunction(AggregateFunction { + func_name: "count".to_string(), + distinct: false, + params: vec![], + args: vec![], + return_type: Box::new(DataType::Number(NumberDataType::UInt64)), + display_name: "".to_string(), + }), + index: 0, // Assuming 0 is the correct index for the count result + }], + from_distinct: false, + limit: None, + grouping_sets: None, + }; + new_s_expr = SExpr::create_unary(Arc::new(count_agg.into()), Arc::new(new_s_expr)); + + let mut builder = PhysicalPlanBuilder::new(metadata.clone(), ctx.clone(), true); + let plan = builder.build(s_expr, HashSet::new()).await?; + + let result = sample_executor.execute_query(&plan).await?; + if let Some(block) = result.first() { + if let Some(count) = block.get_last_column().as_number() { + if let Some(NumberScalar::UInt64(sampled_count)) = count.index(0) { + // Compute and return selectivity + let selectivity = sampled_count as f64 / sample_size as f64; + return Ok(selectivity); + } + } + } + } + return Err(ErrorCode::Internal( + "Failed to calculate filter selectivity by sample".to_string(), + )); +} diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs new file mode 100644 index 0000000000000..e8bd463d2bf10 --- /dev/null +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod dynamic_sample; +mod filter_selectivity_sample; +mod query_sample_executor; + +pub use query_sample_executor::QuerySampleExecutor; diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/query_sample_executor.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/query_sample_executor.rs new file mode 100644 index 0000000000000..ca376b2fc6517 --- /dev/null +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/query_sample_executor.rs @@ -0,0 +1,24 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; + +use crate::executor::PhysicalPlan; + +#[async_trait] +pub trait QuerySampleExecutor: Send + Sync { + async fn execute_query(&self, plan: &PhysicalPlan) -> Result>; +} diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs index 68da7fdc417b2..5070e3913d105 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs @@ -28,6 +28,7 @@ use crate::optimizer::hyper_dp::query_graph::QueryGraph; use crate::optimizer::hyper_dp::util::intersect; use crate::optimizer::hyper_dp::util::union; use crate::optimizer::rule::TransformResult; +use crate::optimizer::QuerySampleExecutor; use crate::optimizer::RuleFactory; use crate::optimizer::RuleID; use crate::optimizer::SExpr; @@ -45,6 +46,7 @@ const RELATION_THRESHOLD: usize = 10; // See the paper for more details. pub struct DPhpy { ctx: Arc, + sample_executor: Option>, metadata: MetadataRef, join_relations: Vec, // base table index -> index of join_relations @@ -59,9 +61,14 @@ pub struct DPhpy { } impl DPhpy { - pub fn new(ctx: Arc, metadata: MetadataRef) -> Self { + pub fn new( + ctx: Arc, + metadata: MetadataRef, + sample_executor: Option>, + ) -> Self { Self { ctx, + sample_executor, metadata, join_relations: vec![], table_index_map: Default::default(), @@ -79,28 +86,22 @@ impl DPhpy { let metadata = self.metadata.clone(); let left_expr = s_expr.children[0].clone(); let left_res = spawn(async move { - let mut dphyp = DPhpy::new(ctx, metadata); + let mut dphyp = DPhpy::new(ctx, metadata, self.sample_executor.clone()); (dphyp.optimize(&left_expr).await, dphyp.table_index_map) }); let ctx = self.ctx.clone(); let metadata = self.metadata.clone(); let right_expr = s_expr.children[1].clone(); let right_res = spawn(async move { - let mut dphyp = DPhpy::new(ctx, metadata); + let mut dphyp = DPhpy::new(ctx, metadata, self.sample_executor.clone()); (dphyp.optimize(&right_expr).await, dphyp.table_index_map) }); - let left_res = left_res.await.map_err(|e| { - ErrorCode::TokioError(format!( - "Cannot join tokio job, err: {:?}", - e - )) - })?; - let right_res = right_res.await.map_err(|e| { - ErrorCode::TokioError(format!( - "Cannot join tokio job, err: {:?}", - e - )) - })?; + let left_res = left_res + .await + .map_err(|e| ErrorCode::TokioError(format!("Cannot join tokio job, err: {:?}", e)))?; + let right_res = right_res + .await + .map_err(|e| ErrorCode::TokioError(format!("Cannot join tokio job, err: {:?}", e)))?; let (left_expr, _) = left_res.0?; let (right_expr, _) = right_res.0?; @@ -127,7 +128,11 @@ impl DPhpy { ) -> Result<(Arc, bool)> { if is_subquery { // If it's a subquery, start a new dphyp - let mut dphyp = DPhpy::new(self.ctx.clone(), self.metadata.clone()); + let mut dphyp = DPhpy::new( + self.ctx.clone(), + self.metadata.clone(), + self.sample_executor.clone(), + ); let (new_s_expr, _) = dphyp.optimize(s_expr).await?; // Merge `table_index_map` of subquery into current `table_index_map`. 
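The subquery path above spawns a fresh DPhpy and then folds its discoveries back into the parent. A sketch of that bookkeeping, with simplified stand-in types: every table index found by the sub-optimizer maps to the single relation index the whole subquery occupies in the parent's `join_relations`.

use std::collections::HashMap;

fn merge_subquery_tables(
    parent: &mut HashMap<usize, usize>, // table index -> relation index
    subquery_tables: impl Iterator<Item = usize>,
    relation_idx: usize, // slot the subquery takes in join_relations
) {
    for table_index in subquery_tables {
        parent.insert(table_index, relation_idx);
    }
}

fn main() {
    let mut parent = HashMap::new();
    merge_subquery_tables(&mut parent, [3, 7].into_iter(), 0);
    assert_eq!(parent[&7], 0);
}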
let relation_idx = self.join_relations.len() as IndexType; diff --git a/src/query/sql/src/planner/optimizer/mod.rs b/src/query/sql/src/planner/optimizer/mod.rs index 782618703938f..82582c63fa3dc 100644 --- a/src/query/sql/src/planner/optimizer/mod.rs +++ b/src/query/sql/src/planner/optimizer/mod.rs @@ -33,9 +33,12 @@ pub mod s_expr; mod statistics; mod util; +mod dynamic_sample; + pub use cascades::CascadesOptimizer; pub use decorrelate::FlattenInfo; pub use decorrelate::SubqueryRewriter; +pub use dynamic_sample::QuerySampleExecutor; pub use extract::PatternExtractor; pub use hyper_dp::DPhpy; pub use m_expr::MExpr; diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index ad648f7a18621..a36bb5d33c451 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -41,6 +41,7 @@ use crate::optimizer::join::SingleToInnerOptimizer; use crate::optimizer::rule::TransformResult; use crate::optimizer::statistics::CollectStatisticsOptimizer; use crate::optimizer::util::contains_local_table_scan; +use crate::optimizer::QuerySampleExecutor; use crate::optimizer::RuleFactory; use crate::optimizer::RuleID; use crate::optimizer::SExpr; @@ -66,6 +67,8 @@ pub struct OptimizerContext { enable_distributed_optimization: bool, enable_join_reorder: bool, enable_dphyp: bool, + + sample_executor: Option>, } impl OptimizerContext { @@ -77,6 +80,7 @@ impl OptimizerContext { enable_distributed_optimization: false, enable_join_reorder: true, enable_dphyp: true, + sample_executor: None, } } @@ -94,6 +98,11 @@ impl OptimizerContext { self.enable_dphyp = enable; self } + + pub fn with_sample_executor(mut self, sample_executor: Arc) -> Self { + self.sample_executor = Some(sample_executor); + self + } } /// A recursive optimizer that will apply the given rules recursively. 
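The OptimizerContext changes above thread an optional sample executor through a consuming builder. A self-contained sketch of that shape, with the surrounding planner types reduced to stand-ins:

use std::sync::Arc;

trait QuerySampleExecutor: Send + Sync {}

struct OptimizerContext {
    enable_dphyp: bool,
    sample_executor: Option<Arc<dyn QuerySampleExecutor>>,
}

impl OptimizerContext {
    fn new() -> Self {
        Self { enable_dphyp: true, sample_executor: None }
    }
    // Consuming builder step: takes `mut self`, returns Self, so calls chain.
    fn with_sample_executor(mut self, e: Option<Arc<dyn QuerySampleExecutor>>) -> Self {
        self.sample_executor = e;
        self
    }
}

fn main() {
    let ctx = OptimizerContext::new().with_sample_executor(None);
    assert!(ctx.sample_executor.is_none() && ctx.enable_dphyp);
}

Keeping the field an Option means the planner works unchanged when no executor is registered; only `interpreter_plan_sql`, which constructs the planner with a ServiceQueryExecutor, opts into sampling.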
@@ -334,9 +343,13 @@ pub async fn optimize_query(opt_ctx: &mut OptimizerContext, mut s_expr: SExpr) - // Cost based optimization let mut dphyp_optimized = false; if opt_ctx.enable_dphyp && opt_ctx.enable_join_reorder { - let (dp_res, optimized) = DPhpy::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone()) - .optimize(&s_expr) - .await?; + let (dp_res, optimized) = DPhpy::new( + opt_ctx.table_ctx.clone(), + opt_ctx.metadata.clone(), + opt_ctx.sample_executor.clone(), + ) + .optimize(&s_expr) + .await?; if optimized { s_expr = (*dp_res).clone(); dphyp_optimized = true; @@ -423,9 +436,13 @@ async fn get_optimized_memo(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Res // Cost based optimization let mut dphyp_optimized = false; if opt_ctx.enable_dphyp && opt_ctx.enable_join_reorder { - let (dp_res, optimized) = DPhpy::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone()) - .optimize(&s_expr) - .await?; + let (dp_res, optimized) = DPhpy::new( + opt_ctx.table_ctx.clone(), + opt_ctx.metadata.clone(), + opt_ctx.sample_executor.clone(), + ) + .optimize(&s_expr) + .await?; if optimized { s_expr = (*dp_res).clone(); dphyp_optimized = true; diff --git a/src/query/sql/src/planner/planner.rs b/src/query/sql/src/planner/planner.rs index fe6ce7697dca1..4a6bc44bcdf95 100644 --- a/src/query/sql/src/planner/planner.rs +++ b/src/query/sql/src/planner/planner.rs @@ -39,6 +39,7 @@ use super::semantic::AggregateRewriter; use super::semantic::DistinctToGroupBy; use crate::optimizer::optimize; use crate::optimizer::OptimizerContext; +use crate::optimizer::QuerySampleExecutor; use crate::plans::Insert; use crate::plans::InsertInputSource; use crate::plans::Plan; @@ -53,6 +54,7 @@ const PROBE_INSERT_MAX_TOKENS: usize = 128 * 8; pub struct Planner { pub(crate) ctx: Arc, + pub(crate) sample_executor: Option>, } #[derive(Debug, Clone)] @@ -63,7 +65,20 @@ pub struct PlanExtras { impl Planner { pub fn new(ctx: Arc) -> Self { - Planner { ctx } + Planner { + ctx, + sample_executor: None, + } + } + + pub fn new_with_sample_executor( + ctx: Arc, + sample_executor: Arc, + ) -> Self { + Planner { + ctx, + sample_executor: Some(sample_executor), + } } #[async_backtrace::framed] From 306826a9c4ff6060b298fc6d3beecbb7062ae506 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 2 Sep 2024 15:11:14 +0800 Subject: [PATCH 3/8] update --- src/common/exception/Cargo.toml | 2 +- src/query/service/src/schedulers/scheduler.rs | 2 -- .../dynamic_sample/dynamic_sample.rs | 10 +++--- .../filter_selectivity_sample.rs | 4 +-- .../planner/optimizer/dynamic_sample/mod.rs | 1 + .../src/planner/optimizer/hyper_dp/dphyp.rs | 15 ++++++-- .../planner/optimizer/hyper_dp/join_node.rs | 34 +++++++++++++++++-- .../sql/src/planner/optimizer/optimizer.rs | 9 +++-- src/query/sql/src/planner/planner.rs | 3 +- 9 files changed, 59 insertions(+), 21 deletions(-) diff --git a/src/common/exception/Cargo.toml b/src/common/exception/Cargo.toml index 23c13145dfadc..cc723cc592948 100644 --- a/src/common/exception/Cargo.toml +++ b/src/common/exception/Cargo.toml @@ -30,8 +30,8 @@ serde = { workspace = true } serde_json = { workspace = true } tantivy = { workspace = true } thiserror = { workspace = true } -tonic = { workspace = true } tokio = "1.39.2" +tonic = { workspace = true } [package.metadata.cargo-machete] ignored = ["geos"] diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs index 19ba560b45e39..488b41c3e920c 100644 --- a/src/query/service/src/schedulers/scheduler.rs +++ 
b/src/query/service/src/schedulers/scheduler.rs @@ -17,9 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; use databend_common_exception::Result; use databend_common_expression::DataBlock; -use databend_common_sql::executor::PhysicalPlanBuilder; use databend_common_sql::optimizer::QuerySampleExecutor; -use databend_common_sql::optimizer::SExpr; use futures_util::TryStreamExt; use crate::pipelines::executor::ExecutorSettings; diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs index 24a6cd5e684c5..36ab4f64f0b17 100644 --- a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use crate::optimizer::dynamic_sample::filter_selectivity_sample::filter_selectivity_sample; @@ -33,12 +34,9 @@ pub async fn dynamic_sample( RelOperator::Filter(_) => { filter_selectivity_sample(ctx, metadata, s_expr, sample_executor).await } - RelOperator::Join(_) => { - unimplemented!("derive_cardinality_by_sample for join is not supported yet") - } - _ => unreachable!( - "Invalid plan for derive_cardinality_by_sample: {:?}", + _ => Err(ErrorCode::Unimplemented(format!( + "derive_cardinality_by_sample for {:?} is not supported yet", s_expr.plan() - ), + ))), } } diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs index 58baf154dae6e..95a91dd2ad883 100644 --- a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs @@ -31,7 +31,6 @@ use crate::optimizer::SExpr; use crate::plans::Aggregate; use crate::plans::AggregateFunction; use crate::plans::AggregateMode; -use crate::plans::Filter; use crate::plans::RelOperator; use crate::plans::ScalarItem; use crate::MetadataRef; @@ -45,7 +44,6 @@ pub async fn filter_selectivity_sample( ) -> Result { // filter cardinality by sample will be called in `dphyp`, so we can ensure the filter is in complex query(contains not only one table) // Because it's meaningless for filter cardinality by sample in single table query. 
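As a rough illustration of the estimate `filter_selectivity_sample` produces (a sketch with made-up numbers, not the patch's code): sample about 0.2% of the table's rows, count how many survive the filter, and scale the observed match rate back to the full table.

fn estimate_filtered_cardinality(num_rows: u64, sampled_matches: u64, sample_size: f64) -> f64 {
    // selectivity observed on the sample, applied to the whole table
    let selectivity = sampled_matches as f64 / sample_size;
    selectivity * num_rows as f64
}

fn main() {
    let num_rows = 1_000_000u64;
    let sample_size = (num_rows as f64 * 0.002).ceil(); // 2000 rows
    // If 150 of the 2000 sampled rows pass the filter, selectivity is 0.075,
    // so the estimated filtered cardinality is about 75_000 rows.
    println!("{}", estimate_filtered_cardinality(num_rows, 150, sample_size));
}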
- let filter = Filter::try_from(s_expr.plan().clone())?; let child = s_expr.child(0)?; if let RelOperator::Scan(mut scan) = child.plan().clone() { // Get the table's num_rows @@ -91,7 +89,7 @@ pub async fn filter_selectivity_sample( new_s_expr = SExpr::create_unary(Arc::new(count_agg.into()), Arc::new(new_s_expr)); let mut builder = PhysicalPlanBuilder::new(metadata.clone(), ctx.clone(), true); - let plan = builder.build(s_expr, HashSet::new()).await?; + let plan = builder.build(&new_s_expr, HashSet::new()).await?; let result = sample_executor.execute_query(&plan).await?; if let Some(block) = result.first() { diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs index e8bd463d2bf10..63001e5869f8d 100644 --- a/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs @@ -16,4 +16,5 @@ mod dynamic_sample; mod filter_selectivity_sample; mod query_sample_executor; +pub use dynamic_sample::dynamic_sample; pub use query_sample_executor::QuerySampleExecutor; diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs index 5070e3913d105..c2593504746b9 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs @@ -84,16 +84,18 @@ impl DPhpy { // Parallel process children: start a new dphyp for each child. let ctx = self.ctx.clone(); let metadata = self.metadata.clone(); + let sample_executor = self.sample_executor.clone(); let left_expr = s_expr.children[0].clone(); let left_res = spawn(async move { - let mut dphyp = DPhpy::new(ctx, metadata, self.sample_executor.clone()); + let mut dphyp = DPhpy::new(ctx, metadata, sample_executor); (dphyp.optimize(&left_expr).await, dphyp.table_index_map) }); let ctx = self.ctx.clone(); let metadata = self.metadata.clone(); + let sample_executor = self.sample_executor.clone(); let right_expr = s_expr.children[1].clone(); let right_res = spawn(async move { - let mut dphyp = DPhpy::new(ctx, metadata, self.sample_executor.clone()); + let mut dphyp = DPhpy::new(ctx, metadata, sample_executor); (dphyp.optimize(&right_expr).await, dphyp.table_index_map) }); let left_res = left_res @@ -379,6 +381,8 @@ impl DPhpy { let nodes = self.relation_set_tree.get_relation_set_by_index(idx)?; let ce = relation.cardinality()?; let join = JoinNode { + ctx: self.ctx.clone(), + metadata: self.metadata.clone(), join_type: JoinType::Inner, leaves: Arc::new(nodes.clone()), children: Arc::new(vec![]), @@ -386,6 +390,7 @@ impl DPhpy { cost: 0.0, cardinality: Some(ce), s_expr: None, + sample_executor: self.sample_executor.clone(), }; self.dp_table.insert(nodes, join); } @@ -644,6 +649,8 @@ impl DPhpy { let parent_node = self.dp_table.get(&parent_set); let mut join_node = if !join_conditions.is_empty() { JoinNode { + ctx: self.ctx.clone(), + metadata: self.metadata.clone(), join_type: JoinType::Inner, leaves: Arc::new(parent_set.clone()), children: if left_cardinality < right_cardinality { @@ -655,9 +662,12 @@ impl DPhpy { join_conditions: Arc::new(join_conditions), cardinality: None, s_expr: None, + sample_executor: self.sample_executor.clone(), } } else { JoinNode { + ctx: self.ctx.clone(), + metadata: self.metadata.clone(), join_type: JoinType::Cross, leaves: Arc::new(parent_set.clone()), children: if left_cardinality < right_cardinality { @@ -669,6 +679,7 @@ impl DPhpy { join_conditions: Arc::new(vec![]), cardinality: 
None, s_expr: None, + sample_executor: self.sample_executor.clone(), } }; if join_node.join_type == JoinType::Inner { diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs index 14f388de194ff..9eb60bc90b1ca 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs @@ -14,9 +14,12 @@ use std::sync::Arc; +use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; +use crate::optimizer::dynamic_sample::dynamic_sample; use crate::optimizer::hyper_dp::join_relation::JoinRelation; +use crate::optimizer::QuerySampleExecutor; use crate::optimizer::RelExpr; use crate::optimizer::SExpr; use crate::plans::Join; @@ -24,10 +27,13 @@ use crate::plans::JoinEquiCondition; use crate::plans::JoinType; use crate::plans::RelOperator; use crate::IndexType; +use crate::MetadataRef; use crate::ScalarExpr; -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct JoinNode { + pub ctx: Arc, + pub metadata: MetadataRef, pub join_type: JoinType, pub leaves: Arc>, pub children: Arc>, @@ -36,6 +42,7 @@ pub struct JoinNode { // Cache cardinality/s_expr after computing. pub cardinality: Option, pub s_expr: Option, + pub sample_executor: Option>, } impl JoinNode { @@ -49,8 +56,29 @@ impl JoinNode { self.s_expr = Some(self.s_expr(relations)); self.s_expr.as_ref().unwrap() }; - let rel_expr = RelExpr::with_s_expr(s_expr); - let card = rel_expr.derive_cardinality()?.cardinality; + let card = if let Some(sample_executor) = &self.sample_executor { + match dynamic_sample( + self.ctx.clone(), + self.metadata.clone(), + s_expr, + sample_executor.clone(), + ) + .await + { + Ok(card) => { + dbg!(card); + card + } + Err(e) => { + dbg!(e); + let rel_expr = RelExpr::with_s_expr(s_expr); + rel_expr.derive_cardinality()?.cardinality + } + } + } else { + let rel_expr = RelExpr::with_s_expr(s_expr); + rel_expr.derive_cardinality()?.cardinality + }; self.cardinality = Some(card); Ok(card) } diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index a36bb5d33c451..8b0ecd8d2490f 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -67,7 +67,7 @@ pub struct OptimizerContext { enable_distributed_optimization: bool, enable_join_reorder: bool, enable_dphyp: bool, - + #[educe(Debug(ignore))] sample_executor: Option>, } @@ -99,8 +99,11 @@ impl OptimizerContext { self } - pub fn with_sample_executor(mut self, sample_executor: Arc) -> Self { - self.sample_executor = Some(sample_executor); + pub fn with_sample_executor( + mut self, + sample_executor: Option>, + ) -> Self { + self.sample_executor = sample_executor; self } } diff --git a/src/query/sql/src/planner/planner.rs b/src/query/sql/src/planner/planner.rs index 4a6bc44bcdf95..93bc0722c2acf 100644 --- a/src/query/sql/src/planner/planner.rs +++ b/src/query/sql/src/planner/planner.rs @@ -211,7 +211,8 @@ impl Planner { let opt_ctx = OptimizerContext::new(self.ctx.clone(), metadata.clone()) .with_enable_distributed_optimization(!self.ctx.get_cluster().is_empty()) .with_enable_join_reorder(unsafe { !settings.get_disable_join_reorder()? }) - .with_enable_dphyp(settings.get_enable_dphyp()?); + .with_enable_dphyp(settings.get_enable_dphyp()?) 
+ .with_sample_executor(self.sample_executor.clone()); let optimized_plan = optimize(opt_ctx, plan).await?; let result = (optimized_plan, PlanExtras { From 9d4164e29c67230780e83c76ff530ffa2c13cba9 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Tue, 3 Sep 2024 15:32:26 +0800 Subject: [PATCH 4/8] make it work --- .../dynamic_sample/dynamic_sample.rs | 11 +- .../filter_selectivity_sample.rs | 88 +++++++++------ .../dynamic_sample/join_selectivity_sample.rs | 49 +++++++++ .../planner/optimizer/dynamic_sample/mod.rs | 1 + .../planner/optimizer/hyper_dp/join_node.rs | 4 +- src/query/sql/src/planner/plans/join.rs | 101 ++++++++++-------- 6 files changed, 173 insertions(+), 81 deletions(-) create mode 100644 src/query/sql/src/planner/optimizer/dynamic_sample/join_selectivity_sample.rs diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs index 36ab4f64f0b17..b65a7f45ea588 100644 --- a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs @@ -19,21 +19,30 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use crate::optimizer::dynamic_sample::filter_selectivity_sample::filter_selectivity_sample; +use crate::optimizer::dynamic_sample::join_selectivity_sample::join_selectivity_sample; use crate::optimizer::QuerySampleExecutor; +use crate::optimizer::RelExpr; use crate::optimizer::SExpr; +use crate::optimizer::StatInfo; +use crate::plans::Operator; use crate::plans::RelOperator; use crate::MetadataRef; +#[async_recursion::async_recursion(#[recursive::recursive])] pub async fn dynamic_sample( ctx: Arc, metadata: MetadataRef, s_expr: &SExpr, sample_executor: Arc, -) -> Result { +) -> Result> { match s_expr.plan() { RelOperator::Filter(_) => { filter_selectivity_sample(ctx, metadata, s_expr, sample_executor).await } + RelOperator::Join(_) => { + join_selectivity_sample(ctx, metadata, s_expr, sample_executor).await + } + RelOperator::Scan(_) => s_expr.plan().derive_stats(&RelExpr::with_s_expr(s_expr)), _ => Err(ErrorCode::Unimplemented(format!( "derive_cardinality_by_sample for {:?} is not supported yet", s_expr.plan() diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs index 95a91dd2ad883..38d79dd0f068c 100644 --- a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs @@ -23,11 +23,15 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::DataType; use databend_common_expression::types::NumberDataType; -use databend_common_expression::types::NumberScalar; +use num_traits::ToPrimitive; use crate::executor::PhysicalPlanBuilder; +use crate::optimizer::statistics::CollectStatisticsOptimizer; use crate::optimizer::QuerySampleExecutor; +use crate::optimizer::RelExpr; use crate::optimizer::SExpr; +use crate::optimizer::SelectivityEstimator; +use crate::optimizer::StatInfo; use crate::plans::Aggregate; use crate::plans::AggregateFunction; use crate::plans::AggregateMode; @@ -41,12 +45,12 @@ pub async fn filter_selectivity_sample( metadata: MetadataRef, s_expr: &SExpr, sample_executor: Arc, -) -> Result { +) -> Result> { // filter cardinality by sample will be called in `dphyp`, so we can ensure 
the filter is in complex query(contains not only one table) // Because it's meaningless for filter cardinality by sample in single table query. let child = s_expr.child(0)?; + let child_rel_expr = RelExpr::with_s_expr(child); if let RelOperator::Scan(mut scan) = child.plan().clone() { - // Get the table's num_rows let num_rows = scan .statistics .table_stats @@ -57,52 +61,72 @@ pub async fn filter_selectivity_sample( // Calculate sample size (0.2% of total data) let sample_size = (num_rows as f64 * 0.002).ceil(); - // 2. Construct sample field and add it to scan scan.sample = Some(Sample { sample_level: SampleLevel::ROW, sample_conf: SampleConfig::RowsNum(sample_size), }); - // Replace old scan in s_expr let new_child = SExpr::create_leaf(Arc::new(RelOperator::Scan(scan))); let mut new_s_expr = s_expr.replace_children(vec![Arc::new(new_child)]); + let collect_statistics_optimizer = + CollectStatisticsOptimizer::new(ctx.clone(), metadata.clone()); + new_s_expr = collect_statistics_optimizer.run(&new_s_expr).await?; - // Wrap a count aggregate plan to original s_expr - let count_agg = Aggregate { - mode: AggregateMode::Initial, - group_items: vec![], - aggregate_functions: vec![ScalarItem { - scalar: ScalarExpr::AggregateFunction(AggregateFunction { - func_name: "count".to_string(), - distinct: false, - params: vec![], - args: vec![], - return_type: Box::new(DataType::Number(NumberDataType::UInt64)), - display_name: "".to_string(), - }), - index: 0, // Assuming 0 is the correct index for the count result - }], - from_distinct: false, - limit: None, - grouping_sets: None, - }; - new_s_expr = SExpr::create_unary(Arc::new(count_agg.into()), Arc::new(new_s_expr)); + new_s_expr = SExpr::create_unary( + Arc::new(create_count_aggregate(AggregateMode::Partial).into()), + Arc::new(new_s_expr), + ); + new_s_expr = SExpr::create_unary( + Arc::new(create_count_aggregate(AggregateMode::Final).into()), + Arc::new(new_s_expr), + ); - let mut builder = PhysicalPlanBuilder::new(metadata.clone(), ctx.clone(), true); - let plan = builder.build(&new_s_expr, HashSet::new()).await?; + let mut builder = PhysicalPlanBuilder::new(metadata.clone(), ctx.clone(), false); + let mut required = HashSet::new(); + required.insert(0); + let plan = builder.build(&new_s_expr, required).await?; let result = sample_executor.execute_query(&plan).await?; if let Some(block) = result.first() { if let Some(count) = block.get_last_column().as_number() { - if let Some(NumberScalar::UInt64(sampled_count)) = count.index(0) { + dbg!(count); + if let Some(number_scalar) = count.index(0) { // Compute and return selectivity - let selectivity = sampled_count as f64 / sample_size as f64; - return Ok(selectivity); + let selectivity = number_scalar.to_f64().to_f64().unwrap() / sample_size as f64; + dbg!(selectivity); + let mut statistics = child_rel_expr.derive_cardinality()?.statistics.clone(); + let mut sb = SelectivityEstimator::new(&mut statistics, HashSet::new()); + sb.update_other_statistic_by_selectivity(selectivity); + return Ok(Arc::new(StatInfo { + cardinality: selectivity * num_rows as f64, + statistics, + })); } } } } - return Err(ErrorCode::Internal( + Err(ErrorCode::Internal( "Failed to calculate filter selectivity by sample".to_string(), - )); + )) +} + +fn create_count_aggregate(mode: AggregateMode) -> Aggregate { + Aggregate { + mode, + group_items: vec![], + aggregate_functions: vec![ScalarItem { + scalar: ScalarExpr::AggregateFunction(AggregateFunction { + func_name: "count(*)".to_string(), + distinct: false, + params: vec![], 
+ args: vec![], + return_type: Box::new(DataType::Number(NumberDataType::UInt64)), + display_name: "".to_string(), + }), + index: 0, + }], + from_distinct: false, + limit: None, + grouping_sets: None, + } } diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/join_selectivity_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/join_selectivity_sample.rs new file mode 100644 index 0000000000000..b7fa2affc0e22 --- /dev/null +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/join_selectivity_sample.rs @@ -0,0 +1,49 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; + +use crate::optimizer::dynamic_sample::dynamic_sample; +use crate::optimizer::QuerySampleExecutor; +use crate::optimizer::SExpr; +use crate::optimizer::StatInfo; +use crate::plans::Join; +use crate::MetadataRef; + +pub async fn join_selectivity_sample( + ctx: Arc, + metadata: MetadataRef, + s_expr: &SExpr, + sample_executor: Arc, +) -> Result> { + let left_stat_info = dynamic_sample( + ctx.clone(), + metadata.clone(), + s_expr.child(0)?, + sample_executor.clone(), + ) + .await?; + let right_stat_info = dynamic_sample( + ctx.clone(), + metadata.clone(), + s_expr.child(1)?, + sample_executor.clone(), + ) + .await?; + let join = Join::try_from(s_expr.plan().clone())?; + join.derive_join_stats(left_stat_info, right_stat_info) +} diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs index 63001e5869f8d..be5955ead66e3 100644 --- a/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs @@ -14,6 +14,7 @@ mod dynamic_sample; mod filter_selectivity_sample; +mod join_selectivity_sample; mod query_sample_executor; pub use dynamic_sample::dynamic_sample; diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs index 9eb60bc90b1ca..e92c5a238aa7f 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs @@ -66,8 +66,8 @@ impl JoinNode { .await { Ok(card) => { - dbg!(card); - card + dbg!(card.cardinality); + card.cardinality } Err(e) => { dbg!(e); diff --git a/src/query/sql/src/planner/plans/join.rs b/src/query/sql/src/planner/plans/join.rs index cfc93c8463233..643d44058972c 100644 --- a/src/query/sql/src/planner/plans/join.rs +++ b/src/query/sql/src/planner/plans/join.rs @@ -419,6 +419,59 @@ impl Join { .iter() .any(|condition| condition.is_null_equal) } + + pub fn derive_join_stats( + &self, + left_stat_info: Arc, + right_stat_info: Arc, + ) -> Result> { + let (mut left_cardinality, mut left_statistics) = ( + left_stat_info.cardinality, + left_stat_info.statistics.clone(), + ); + let (mut right_cardinality, mut right_statistics) = ( + right_stat_info.cardinality, + 
+            right_stat_info.statistics.clone(),
+        );
+        // Evaluating join cardinality using histograms.
+        // If histogram is None, will evaluate using NDV.
+        let inner_join_cardinality = self.inner_join_cardinality(
+            &mut left_cardinality,
+            &mut right_cardinality,
+            &mut left_statistics,
+            &mut right_statistics,
+        )?;
+        let cardinality = match self.join_type {
+            JoinType::Inner | JoinType::Cross => inner_join_cardinality,
+            JoinType::Left => f64::max(left_cardinality, inner_join_cardinality),
+            JoinType::Right => f64::max(right_cardinality, inner_join_cardinality),
+            JoinType::Full => {
+                f64::max(left_cardinality, inner_join_cardinality)
+                    + f64::max(right_cardinality, inner_join_cardinality)
+                    - inner_join_cardinality
+            }
+            JoinType::LeftSemi => f64::min(left_cardinality, inner_join_cardinality),
+            JoinType::RightSemi => f64::min(right_cardinality, inner_join_cardinality),
+            JoinType::LeftSingle | JoinType::RightMark | JoinType::LeftAnti => left_cardinality,
+            JoinType::RightSingle | JoinType::LeftMark | JoinType::RightAnti => right_cardinality,
+        };
+        // Derive column statistics
+        let column_stats = if cardinality == 0.0 {
+            HashMap::new()
+        } else {
+            let mut column_stats = HashMap::new();
+            column_stats.extend(left_statistics.column_stats);
+            column_stats.extend(right_statistics.column_stats);
+            column_stats
+        };
+        Ok(Arc::new(StatInfo {
+            cardinality,
+            statistics: Statistics {
+                precise_cardinality: None,
+                column_stats,
+            },
+        }))
+    }
 }
 
 impl Operator for Join {
@@ -518,52 +571,8 @@ impl Operator for Join {
     fn derive_stats(&self, rel_expr: &RelExpr) -> Result<Arc<StatInfo>> {
         let left_stat_info = rel_expr.derive_cardinality_child(0)?;
         let right_stat_info = rel_expr.derive_cardinality_child(1)?;
-        let (mut left_cardinality, mut left_statistics) = (
-            left_stat_info.cardinality,
-            left_stat_info.statistics.clone(),
-        );
-        let (mut right_cardinality, mut right_statistics) = (
-            right_stat_info.cardinality,
-            right_stat_info.statistics.clone(),
-        );
-        // Evaluating join cardinality using histograms.
-        // If histogram is None, will evaluate using NDV.
- let inner_join_cardinality = self.inner_join_cardinality( - &mut left_cardinality, - &mut right_cardinality, - &mut left_statistics, - &mut right_statistics, - )?; - let cardinality = match self.join_type { - JoinType::Inner | JoinType::Cross => inner_join_cardinality, - JoinType::Left => f64::max(left_cardinality, inner_join_cardinality), - JoinType::Right => f64::max(right_cardinality, inner_join_cardinality), - JoinType::Full => { - f64::max(left_cardinality, inner_join_cardinality) - + f64::max(right_cardinality, inner_join_cardinality) - - inner_join_cardinality - } - JoinType::LeftSemi => f64::min(left_cardinality, inner_join_cardinality), - JoinType::RightSemi => f64::min(right_cardinality, inner_join_cardinality), - JoinType::LeftSingle | JoinType::RightMark | JoinType::LeftAnti => left_cardinality, - JoinType::RightSingle | JoinType::LeftMark | JoinType::RightAnti => right_cardinality, - }; - // Derive column statistics - let column_stats = if cardinality == 0.0 { - HashMap::new() - } else { - let mut column_stats = HashMap::new(); - column_stats.extend(left_statistics.column_stats); - column_stats.extend(right_statistics.column_stats); - column_stats - }; - Ok(Arc::new(StatInfo { - cardinality, - statistics: Statistics { - precise_cardinality: None, - column_stats, - }, - })) + let stat_info = self.derive_join_stats(left_stat_info, right_stat_info)?; + Ok(stat_info) } fn compute_required_prop_child( From 8d6eecd8aba5592bc29583c914fc05b9145405ca Mon Sep 17 00:00:00 2001 From: xudong963 Date: Tue, 3 Sep 2024 23:37:41 +0800 Subject: [PATCH 5/8] save --- .../filter_selectivity_sample.rs | 10 ++--- .../src/planner/optimizer/hyper_dp/dphyp.rs | 17 ++++++--- .../planner/optimizer/hyper_dp/join_node.rs | 26 ++----------- .../optimizer/hyper_dp/join_relation.rs | 37 +++++++++++++++++-- 4 files changed, 52 insertions(+), 38 deletions(-) diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs index 38d79dd0f068c..4c9f5fbfa16d6 100644 --- a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs @@ -89,18 +89,18 @@ pub async fn filter_selectivity_sample( let result = sample_executor.execute_query(&plan).await?; if let Some(block) = result.first() { if let Some(count) = block.get_last_column().as_number() { - dbg!(count); if let Some(number_scalar) = count.index(0) { // Compute and return selectivity let selectivity = number_scalar.to_f64().to_f64().unwrap() / sample_size as f64; - dbg!(selectivity); let mut statistics = child_rel_expr.derive_cardinality()?.statistics.clone(); let mut sb = SelectivityEstimator::new(&mut statistics, HashSet::new()); sb.update_other_statistic_by_selectivity(selectivity); - return Ok(Arc::new(StatInfo { + let stat_info = Arc::new(StatInfo { cardinality: selectivity * num_rows as f64, statistics, - })); + }); + *s_expr.stat_info.lock().unwrap() = Some(stat_info.clone()); + return Ok(stat_info); } } } @@ -116,7 +116,7 @@ fn create_count_aggregate(mode: AggregateMode) -> Aggregate { group_items: vec![], aggregate_functions: vec![ScalarItem { scalar: ScalarExpr::AggregateFunction(AggregateFunction { - func_name: "count(*)".to_string(), + func_name: "count".to_string(), distinct: false, params: vec![], args: vec![], diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs 
index c2593504746b9..bf3c08fc540a7 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs @@ -141,7 +141,8 @@ impl DPhpy { for table_index in dphyp.table_index_map.keys() { self.table_index_map.insert(*table_index, relation_idx); } - self.join_relations.push(JoinRelation::new(&new_s_expr)); + self.join_relations + .push(JoinRelation::new(&new_s_expr, self.sample_executor.clone())); return Ok((new_s_expr, true)); } @@ -151,9 +152,9 @@ impl DPhpy { // Check if relation contains filter, if exists, check if the filter in `filters` // If exists, remove it from `filters` self.check_filter(relation); - JoinRelation::new(relation) + JoinRelation::new(relation, self.sample_executor.clone()) } else { - JoinRelation::new(s_expr) + JoinRelation::new(s_expr, self.sample_executor.clone()) }; self.table_index_map .insert(op.table_index, self.join_relations.len() as IndexType); @@ -216,7 +217,8 @@ impl DPhpy { } if !is_inner_join { let new_s_expr = self.new_children(s_expr).await?; - self.join_relations.push(JoinRelation::new(&new_s_expr)); + self.join_relations + .push(JoinRelation::new(&new_s_expr, self.sample_executor.clone())); Ok((Arc::new(new_s_expr), true)) } else { let left_res = self @@ -276,7 +278,8 @@ impl DPhpy { } RelOperator::UnionAll(_) => { let new_s_expr = self.new_children(s_expr).await?; - self.join_relations.push(JoinRelation::new(&new_s_expr)); + self.join_relations + .push(JoinRelation::new(&new_s_expr, self.sample_executor.clone())); Ok((Arc::new(new_s_expr), true)) } RelOperator::Exchange(_) => { @@ -379,7 +382,9 @@ impl DPhpy { for (idx, relation) in self.join_relations.iter().enumerate() { // Get nodes in `relation_set_tree` let nodes = self.relation_set_tree.get_relation_set_by_index(idx)?; - let ce = relation.cardinality()?; + let ce = relation + .cardinality(self.ctx.clone(), self.metadata.clone()) + .await?; let join = JoinNode { ctx: self.ctx.clone(), metadata: self.metadata.clone(), diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs index e92c5a238aa7f..82a297c177553 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs @@ -56,29 +56,9 @@ impl JoinNode { self.s_expr = Some(self.s_expr(relations)); self.s_expr.as_ref().unwrap() }; - let card = if let Some(sample_executor) = &self.sample_executor { - match dynamic_sample( - self.ctx.clone(), - self.metadata.clone(), - s_expr, - sample_executor.clone(), - ) - .await - { - Ok(card) => { - dbg!(card.cardinality); - card.cardinality - } - Err(e) => { - dbg!(e); - let rel_expr = RelExpr::with_s_expr(s_expr); - rel_expr.derive_cardinality()?.cardinality - } - } - } else { - let rel_expr = RelExpr::with_s_expr(s_expr); - rel_expr.derive_cardinality()?.cardinality - }; + + let rel_expr = RelExpr::with_s_expr(s_expr); + let card = rel_expr.derive_cardinality()?.cardinality; self.cardinality = Some(card); Ok(card) } diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs b/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs index 37e4b9fd09212..cfdbca562832f 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs @@ -13,22 +13,29 @@ // limitations under the License. 
 use std::collections::HashSet;
+use std::sync::Arc;
 
 use ahash::HashMap;
+use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 
+use crate::optimizer::dynamic_sample::dynamic_sample;
+use crate::optimizer::QuerySampleExecutor;
 use crate::optimizer::RelExpr;
 use crate::optimizer::SExpr;
 use crate::IndexType;
+use crate::MetadataRef;
 
 pub struct JoinRelation {
     s_expr: SExpr,
+    sample_executor: Option<Arc<dyn QuerySampleExecutor>>,
 }
 
 impl JoinRelation {
-    pub fn new(s_expr: &SExpr) -> Self {
+    pub fn new(s_expr: &SExpr, sample_executor: Option<Arc<dyn QuerySampleExecutor>>) -> Self {
         Self {
             s_expr: s_expr.clone(),
+            sample_executor,
         }
     }
 
@@ -36,9 +43,31 @@ impl JoinRelation {
         self.s_expr.clone()
     }
 
-    pub fn cardinality(&self) -> Result<f64> {
-        let rel_expr = RelExpr::with_s_expr(&self.s_expr);
-        Ok(rel_expr.derive_cardinality()?.cardinality)
+    pub async fn cardinality(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        metadata: MetadataRef,
+    ) -> Result<f64> {
+        let card = if let Some(sample_executor) = &self.sample_executor {
+            match dynamic_sample(
+                ctx.clone(),
+                metadata.clone(),
+                &self.s_expr,
+                sample_executor.clone(),
+            )
+            .await
+            {
+                Ok(card) => card.cardinality,
+                Err(e) => {
+                    let rel_expr = RelExpr::with_s_expr(&self.s_expr);
+                    rel_expr.derive_cardinality()?.cardinality
+                }
+            }
+        } else {
+            let rel_expr = RelExpr::with_s_expr(&self.s_expr);
+            rel_expr.derive_cardinality()?.cardinality
+        };
+        Ok(card)
     }
 }

From 6b5bf73dd7afb135de941875539434207ce793c8 Mon Sep 17 00:00:00 2001
From: xudong963 
Date: Wed, 4 Sep 2024 13:46:00 +0800
Subject: [PATCH 6/8] fix estimate row

---
 .../filter/deduplicate_join_condition.rs      | 11 ++++++++--
 .../src/planner/optimizer/hyper_dp/dphyp.rs   |  9 --------
 .../planner/optimizer/hyper_dp/join_node.rs   |  4 ----
 .../optimizer/hyper_dp/join_relation.rs       |  2 +-
 .../planner/optimizer/join/single_to_inner.rs | 22 +++++++++++++------
 .../sql/src/planner/optimizer/optimizer.rs    | 13 +++++++++--
 6 files changed, 36 insertions(+), 25 deletions(-)

diff --git a/src/query/sql/src/planner/optimizer/filter/deduplicate_join_condition.rs b/src/query/sql/src/planner/optimizer/filter/deduplicate_join_condition.rs
index 47eb35c9a438e..00fa66f05ca4c 100644
--- a/src/query/sql/src/planner/optimizer/filter/deduplicate_join_condition.rs
+++ b/src/query/sql/src/planner/optimizer/filter/deduplicate_join_condition.rs
@@ -95,9 +95,16 @@ impl DeduplicateJoinConditionOptimizer {
 
     pub fn deduplicate_children(&mut self, s_expr: &SExpr) -> Result<SExpr> {
         let mut children = Vec::with_capacity(s_expr.arity());
+        let mut children_changed = false;
         for child in s_expr.children() {
-            let child = self.deduplicate(child)?;
-            children.push(Arc::new(child));
+            let optimized_child = self.deduplicate(child)?;
+            if !optimized_child.eq(child) {
+                children_changed = true;
+            }
+            children.push(Arc::new(optimized_child));
+        }
+        if !children_changed {
+            return Ok(s_expr.clone());
         }
         Ok(s_expr.replace_children(children))
     }
diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs
index bf3c08fc540a7..1cb9762f61c59 100644
--- a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs
+++ b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs
@@ -386,8 +386,6 @@ impl DPhpy {
                 .cardinality(self.ctx.clone(), self.metadata.clone())
                 .await?;
             let join = JoinNode {
-                ctx: self.ctx.clone(),
-                metadata: self.metadata.clone(),
                 join_type: JoinType::Inner,
                 leaves: Arc::new(nodes.clone()),
                 children: Arc::new(vec![]),
@@ -395,7 +393,6 @@ impl DPhpy {
                 cost: 0.0,
                 cardinality: Some(ce),
                 s_expr: None,
-                sample_executor: self.sample_executor.clone(),
             };
             self.dp_table.insert(nodes, join);
         }
@@ -654,8 +651,6 @@ impl DPhpy {
         let parent_node = self.dp_table.get(&parent_set);
         let mut join_node = if !join_conditions.is_empty() {
             JoinNode {
-                ctx: self.ctx.clone(),
-                metadata: self.metadata.clone(),
                 join_type: JoinType::Inner,
                 leaves: Arc::new(parent_set.clone()),
                 children: if left_cardinality < right_cardinality {
@@ -667,12 +662,9 @@ impl DPhpy {
                 join_conditions: Arc::new(join_conditions),
                 cardinality: None,
                 s_expr: None,
-                sample_executor: self.sample_executor.clone(),
             }
         } else {
             JoinNode {
-                ctx: self.ctx.clone(),
-                metadata: self.metadata.clone(),
                 join_type: JoinType::Cross,
                 leaves: Arc::new(parent_set.clone()),
                 children: if left_cardinality < right_cardinality {
@@ -684,7 +676,6 @@ impl DPhpy {
                 join_conditions: Arc::new(vec![]),
                 cardinality: None,
                 s_expr: None,
-                sample_executor: self.sample_executor.clone(),
             }
         };
         if join_node.join_type == JoinType::Inner {
diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs
index 82a297c177553..3ff1ac2d49ad8 100644
--- a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs
+++ b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs
@@ -17,7 +17,6 @@ use std::sync::Arc;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 
-use crate::optimizer::dynamic_sample::dynamic_sample;
 use crate::optimizer::hyper_dp::join_relation::JoinRelation;
 use crate::optimizer::QuerySampleExecutor;
 use crate::optimizer::RelExpr;
@@ -32,8 +31,6 @@ use crate::ScalarExpr;
 
 #[derive(Clone)]
 pub struct JoinNode {
-    pub ctx: Arc<dyn TableContext>,
-    pub metadata: MetadataRef,
     pub join_type: JoinType,
     pub leaves: Arc<Vec<IndexType>>,
     pub children: Arc<Vec<JoinNode>>,
@@ -42,7 +39,6 @@ pub struct JoinNode {
     // Cache cardinality/s_expr after computing.
     pub cardinality: Option<f64>,
     pub s_expr: Option<SExpr>,
-    pub sample_executor: Option<Arc<dyn QuerySampleExecutor>>,
 }
 
 impl JoinNode {
diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs b/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs
index cfdbca562832f..d98b0ebf66b25 100644
--- a/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs
+++ b/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs
@@ -58,7 +58,7 @@ impl JoinRelation {
             .await
             {
                 Ok(card) => card.cardinality,
-                Err(e) => {
+                Err(_) => {
                     let rel_expr = RelExpr::with_s_expr(&self.s_expr);
                     rel_expr.derive_cardinality()?.cardinality
                 }
diff --git a/src/query/sql/src/planner/optimizer/join/single_to_inner.rs b/src/query/sql/src/planner/optimizer/join/single_to_inner.rs
index 85545c1199528..9eb465a36010d 100644
--- a/src/query/sql/src/planner/optimizer/join/single_to_inner.rs
+++ b/src/query/sql/src/planner/optimizer/join/single_to_inner.rs
@@ -33,20 +33,28 @@ impl SingleToInnerOptimizer {
     }
 
     fn single_to_inner(s_expr: &SExpr) -> Result<SExpr> {
-        let s_expr = if let RelOperator::Join(join) = s_expr.plan.as_ref() {
+        let mut s_expr = if let RelOperator::Join(join) = s_expr.plan.as_ref()
+            && join.single_to_inner.is_some()
+        {
             let mut join = join.clone();
-            if join.single_to_inner.is_some() {
-                join.join_type = JoinType::Inner;
-            }
+            join.join_type = JoinType::Inner;
             s_expr.replace_plan(Arc::new(RelOperator::Join(join)))
         } else {
             s_expr.clone()
         };
+        let mut children_changed = false;
         let mut children = Vec::with_capacity(s_expr.arity());
         for child in s_expr.children() {
-            let child = Self::single_to_inner(child)?;
-            children.push(Arc::new(child));
+            let new_child = Self::single_to_inner(child)?;
+            if !new_child.eq(&child) {
+                children_changed = true;
+            }
+            children.push(Arc::new(new_child));
         }
-        Ok(s_expr.replace_children(children))
+        if children_changed {
+            s_expr = s_expr.replace_children(children);
+        }
+
+        Ok(s_expr)
     }
 }
diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs
index 8b0ecd8d2490f..461a267b509c1 100644
--- a/src/query/sql/src/planner/optimizer/optimizer.rs
+++ b/src/query/sql/src/planner/optimizer/optimizer.rs
@@ -130,10 +130,19 @@ impl<'a> RecursiveOptimizer<'a> {
     #[recursive::recursive]
     fn optimize_expression(&self, s_expr: &SExpr) -> Result<SExpr> {
         let mut optimized_children = Vec::with_capacity(s_expr.arity());
+        let mut children_changed = false;
         for expr in s_expr.children() {
-            optimized_children.push(Arc::new(self.run(expr)?));
+            let optimized_child = self.run(expr)?;
+            if !optimized_child.eq(expr) {
+                children_changed = true;
+            }
+            optimized_children.push(Arc::new(optimized_child));
+        }
+        let mut optimized_expr = s_expr.clone();
+        if children_changed {
+            optimized_expr = s_expr.replace_children(optimized_children);
         }
-        let optimized_expr = s_expr.replace_children(optimized_children);
+
         let result = self.apply_transform_rules(&optimized_expr, self.rules)?;
         Ok(result)

From e8d85a4cc9a5ea9bf37cb0c9ee52e8e43dea0a82 Mon Sep 17 00:00:00 2001
From: xudong963 
Date: Wed, 4 Sep 2024 14:34:55 +0800
Subject: [PATCH 7/8] add time budget

---
 Cargo.lock                                    |  1 +
 .../service/src/interpreters/interpreter.rs   |  2 +-
 src/query/settings/src/settings_default.rs    |  8 +++-
 .../settings/src/settings_getter_setter.rs    |  4 ++
 src/query/sql/Cargo.toml                      |  1 +
 .../dynamic_sample/dynamic_sample.rs          | 39 ++++++++++++++++++-
 .../planner/optimizer/hyper_dp/join_node.rs   |  3 --
 .../optimizer/hyper_dp/join_relation.rs       | 12 ++----
 8 files changed, 55 insertions(+), 15 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7afbda6e7e410..09d99a4e4fdca 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4166,6 +4166,7 @@ dependencies = [
  "sha2",
  "simsearch",
  "time",
+ "tokio",
  "url",
 ]
 
diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs
index 80b004b6e5ad6..2154feb8e3e55 100644
--- a/src/query/service/src/interpreters/interpreter.rs
+++ b/src/query/service/src/interpreters/interpreter.rs
@@ -51,8 +51,8 @@ use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
 use crate::pipelines::executor::PipelinePullingExecutor;
 use crate::pipelines::PipelineBuildResult;
-use crate::servers::http::v1::ClientSessionManager;
 use crate::schedulers::ServiceQueryExecutor;
+use crate::servers::http::v1::ClientSessionManager;
 use crate::sessions::QueryContext;
 use crate::sessions::SessionManager;
 use crate::sessions::SessionType;
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index b496271fdb6da..af9dc3af99543 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -848,7 +848,13 @@ impl DefaultSettings {
                 desc: "Seed for random function",
                 mode: SettingMode::Both,
                 range: Some(SettingRange::Numeric(0..=1)),
-            })
+            }),
+            ("dynamic_sample_time_budget_ms", DefaultSettingValue {
+                value: UserSettingValue::UInt64(0),
+                desc: "Time budget for dynamic sample in milliseconds",
+                mode: SettingMode::Both,
+                range: Some(SettingRange::Numeric(0..=u64::MAX)),
+            }),
         ]);
 
         Ok(Arc::new(DefaultSettings {
diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs
index 0cc3b68c96cf7..07e2c1b0dbc49 100644
--- a/src/query/settings/src/settings_getter_setter.rs
+++ b/src/query/settings/src/settings_getter_setter.rs
@@ -701,4 +701,8 @@ impl Settings {
     pub fn get_random_function_seed(&self) -> Result<bool> {
         Ok(self.try_get_u64("random_function_seed")? == 1)
     }
+
+    pub fn get_dynamic_sample_time_budget_ms(&self) -> Result<u64> {
+        self.try_get_u64("dynamic_sample_time_budget_ms")
+    }
 }
diff --git a/src/query/sql/Cargo.toml b/src/query/sql/Cargo.toml
index 2dfa30961a612..93fbfd52b75db 100644
--- a/src/query/sql/Cargo.toml
+++ b/src/query/sql/Cargo.toml
@@ -74,6 +74,7 @@ sha2 = { workspace = true }
 simsearch = "0.2"
 time = "0.3.14"
 url = "2.3.1"
+tokio = "1.39.2"
 
 [lints]
 workspace = true
diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs
index b65a7f45ea588..994683c5c0ef3 100644
--- a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs
+++ b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs
@@ -13,7 +13,9 @@
 // limitations under the License.
 
 use std::sync::Arc;
+use std::time::Duration;
 
+use databend_common_base::base::tokio::time::Instant;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -35,9 +37,44 @@ pub async fn dynamic_sample(
     s_expr: &SExpr,
     sample_executor: Arc<dyn QuerySampleExecutor>,
 ) -> Result<Arc<StatInfo>> {
+    let time_budget =
+        Duration::from_millis(ctx.get_settings().get_dynamic_sample_time_budget_ms()?);
+    let start_time = Instant::now();
+
+    async fn sample_with_budget<F, Fut>(
+        start_time: Instant,
+        time_budget: Duration,
+        fallback: F,
+        sample_fn: impl FnOnce() -> Fut,
+    ) -> Result<Arc<StatInfo>>
+    where
+        F: FnOnce() -> Result<Arc<StatInfo>>,
+        Fut: std::future::Future<Output = Result<Arc<StatInfo>>>,
+    {
+        if time_budget.as_millis() == 0 || start_time.elapsed() > time_budget {
+            fallback()
+        } else {
+            let remaining_time = time_budget - start_time.elapsed();
+            match tokio::time::timeout(remaining_time, sample_fn()).await {
+                Ok(Ok(result)) => Ok(result),
+                // Fall back if sampling times out or if sample_fn returns an error.
+                Ok(Err(_)) | Err(_) => fallback(),
+            }
+        }
+    }
+
     match s_expr.plan() {
         RelOperator::Filter(_) => {
-            filter_selectivity_sample(ctx, metadata, s_expr, sample_executor).await
+            sample_with_budget(
+                start_time,
+                time_budget,
+                || {
+                    let rel_expr = RelExpr::with_s_expr(s_expr);
+                    rel_expr.derive_cardinality()
+                },
+                || filter_selectivity_sample(ctx, metadata, s_expr, sample_executor),
+            )
+            .await
         }
         RelOperator::Join(_) => {
             join_selectivity_sample(ctx, metadata, s_expr, sample_executor).await
diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs
index 3ff1ac2d49ad8..15ca730ae0818 100644
--- a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs
+++ b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs
@@ -14,11 +14,9 @@
 
 use std::sync::Arc;
 
-use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 
-use crate::optimizer::dynamic_sample::dynamic_sample;
 use crate::optimizer::hyper_dp::join_relation::JoinRelation;
-use crate::optimizer::QuerySampleExecutor;
 use crate::optimizer::RelExpr;
 use crate::optimizer::SExpr;
 use crate::plans::Join;
@@ -26,7 +24,6 @@ use crate::plans::JoinEquiCondition;
 use crate::plans::JoinType;
 use crate::plans::RelOperator;
 use crate::IndexType;
-use crate::MetadataRef;
 use crate::ScalarExpr;
 
 #[derive(Clone)]
diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs b/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs
index d98b0ebf66b25..b3e7cc6dbfb70 100644
--- a/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs
+++ b/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs
@@ -49,20 +49,14 @@ impl JoinRelation {
         metadata: MetadataRef,
     ) -> Result<f64> {
         let card = if let Some(sample_executor) = &self.sample_executor {
-            match dynamic_sample(
+            dynamic_sample(
                 ctx.clone(),
                 metadata.clone(),
                 &self.s_expr,
                 sample_executor.clone(),
             )
-            .await
-            {
-                Ok(card) => card.cardinality,
-                Err(_) => {
-                    let rel_expr = RelExpr::with_s_expr(&self.s_expr);
-                    rel_expr.derive_cardinality()?.cardinality
-                }
-            }
+            .await?
+ .cardinality } else { let rel_expr = RelExpr::with_s_expr(&self.s_expr); rel_expr.derive_cardinality()?.cardinality From d49a135806c3fcb12aac9c2d0d099f65c11a84d0 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 4 Sep 2024 17:02:59 +0800 Subject: [PATCH 8/8] add test --- Cargo.lock | 1 - src/common/exception/Cargo.toml | 1 - src/query/sql/Cargo.toml | 2 +- .../dynamic_sample/dynamic_sample.rs | 10 +- .../filter_selectivity_sample.rs | 28 +- .../planner/optimizer/dynamic_sample/mod.rs | 1 + .../planner/optimizer/join/single_to_inner.rs | 2 +- tests/sqllogictests/suites/tpch/sample.test | 407 ++++++++++++++++++ 8 files changed, 430 insertions(+), 22 deletions(-) create mode 100644 tests/sqllogictests/suites/tpch/sample.test diff --git a/Cargo.lock b/Cargo.lock index 09d99a4e4fdca..24a45026bb26c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3345,7 +3345,6 @@ dependencies = [ "serde_json", "tantivy", "thiserror", - "tokio", "tonic", ] diff --git a/src/common/exception/Cargo.toml b/src/common/exception/Cargo.toml index cc723cc592948..033999b74d0d2 100644 --- a/src/common/exception/Cargo.toml +++ b/src/common/exception/Cargo.toml @@ -30,7 +30,6 @@ serde = { workspace = true } serde_json = { workspace = true } tantivy = { workspace = true } thiserror = { workspace = true } -tokio = "1.39.2" tonic = { workspace = true } [package.metadata.cargo-machete] diff --git a/src/query/sql/Cargo.toml b/src/query/sql/Cargo.toml index 93fbfd52b75db..84596e0ed17d2 100644 --- a/src/query/sql/Cargo.toml +++ b/src/query/sql/Cargo.toml @@ -73,8 +73,8 @@ serde = { workspace = true } sha2 = { workspace = true } simsearch = "0.2" time = "0.3.14" -url = "2.3.1" tokio = "1.39.2" +url = "2.3.1" [lints] workspace = true diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs index 994683c5c0ef3..07e8abf91402b 100644 --- a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs @@ -17,7 +17,6 @@ use std::time::Duration; use databend_common_base::base::tokio::time::Instant; use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use crate::optimizer::dynamic_sample::filter_selectivity_sample::filter_selectivity_sample; @@ -80,9 +79,10 @@ pub async fn dynamic_sample( join_selectivity_sample(ctx, metadata, s_expr, sample_executor).await } RelOperator::Scan(_) => s_expr.plan().derive_stats(&RelExpr::with_s_expr(s_expr)), - _ => Err(ErrorCode::Unimplemented(format!( - "derive_cardinality_by_sample for {:?} is not supported yet", - s_expr.plan() - ))), + // Todo: add more operators here, and support more query patterns. 
+ _ => { + let rel_expr = RelExpr::with_s_expr(s_expr); + rel_expr.derive_cardinality() + } } } diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs index 4c9f5fbfa16d6..262bf06900f22 100644 --- a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs @@ -60,17 +60,19 @@ pub async fn filter_selectivity_sample( // Calculate sample size (0.2% of total data) let sample_size = (num_rows as f64 * 0.002).ceil(); - - scan.sample = Some(Sample { - sample_level: SampleLevel::ROW, - sample_conf: SampleConfig::RowsNum(sample_size), - }); - - let new_child = SExpr::create_leaf(Arc::new(RelOperator::Scan(scan))); - let mut new_s_expr = s_expr.replace_children(vec![Arc::new(new_child)]); - let collect_statistics_optimizer = - CollectStatisticsOptimizer::new(ctx.clone(), metadata.clone()); - new_s_expr = collect_statistics_optimizer.run(&new_s_expr).await?; + let mut new_s_expr = s_expr.clone(); + // If the table is too small, we don't need to sample. + if sample_size >= 10.0 { + scan.sample = Some(Sample { + sample_level: SampleLevel::ROW, + sample_conf: SampleConfig::RowsNum(sample_size), + }); + let new_child = SExpr::create_leaf(Arc::new(RelOperator::Scan(scan))); + new_s_expr = s_expr.replace_children(vec![Arc::new(new_child)]); + let collect_statistics_optimizer = + CollectStatisticsOptimizer::new(ctx.clone(), metadata.clone()); + new_s_expr = collect_statistics_optimizer.run(&new_s_expr).await?; + } new_s_expr = SExpr::create_unary( Arc::new(create_count_aggregate(AggregateMode::Partial).into()), @@ -91,12 +93,12 @@ pub async fn filter_selectivity_sample( if let Some(count) = block.get_last_column().as_number() { if let Some(number_scalar) = count.index(0) { // Compute and return selectivity - let selectivity = number_scalar.to_f64().to_f64().unwrap() / sample_size as f64; + let selectivity = number_scalar.to_f64().to_f64().unwrap() / sample_size; let mut statistics = child_rel_expr.derive_cardinality()?.statistics.clone(); let mut sb = SelectivityEstimator::new(&mut statistics, HashSet::new()); sb.update_other_statistic_by_selectivity(selectivity); let stat_info = Arc::new(StatInfo { - cardinality: selectivity * num_rows as f64, + cardinality: (selectivity * num_rows as f64).ceil(), statistics, }); *s_expr.stat_info.lock().unwrap() = Some(stat_info.clone()); diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs index be5955ead66e3..0998554242d58 100644 --- a/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#[allow(clippy::module_inception)]
 mod dynamic_sample;
 mod filter_selectivity_sample;
 mod join_selectivity_sample;
 mod query_sample_executor;
diff --git a/src/query/sql/src/planner/optimizer/join/single_to_inner.rs b/src/query/sql/src/planner/optimizer/join/single_to_inner.rs
index 9eb465a36010d..6ae2cd0f34dd1 100644
--- a/src/query/sql/src/planner/optimizer/join/single_to_inner.rs
+++ b/src/query/sql/src/planner/optimizer/join/single_to_inner.rs
@@ -46,7 +46,7 @@ impl SingleToInnerOptimizer {
         let mut children = Vec::with_capacity(s_expr.arity());
         for child in s_expr.children() {
             let new_child = Self::single_to_inner(child)?;
-            if !new_child.eq(&child) {
+            if !new_child.eq(child) {
                 children_changed = true;
             }
             children.push(Arc::new(new_child));
diff --git a/tests/sqllogictests/suites/tpch/sample.test b/tests/sqllogictests/suites/tpch/sample.test
new file mode 100644
index 0000000000000..3d1fbafaabc43
--- /dev/null
+++ b/tests/sqllogictests/suites/tpch/sample.test
@@ -0,0 +1,407 @@
+statement ok
+set sandbox_tenant = 'test_tenant';
+
+statement ok
+use tpch_test;
+
+# To make the test stable, we set dynamic_sample_time_budget_ms to a large value
+statement ok
+set dynamic_sample_time_budget_ms = 10000;
+
+# cbo will remove the `stat_info` computed by sample, so we disable cbo to see the estimated rows in explain
+statement ok
+set enable_cbo = 0;
+
+statement ok
+set random_function_seed = 1;
+
+# Estimated rows and output rows are similar for the filter.
+# The test is flaky, so it is kept here only as a reference.
+onlyif todo
+query ok
+EXPLAIN ANALYZE PARTIAL
+SELECT
+    *
+FROM
+    orders,
+    lineitem
+WHERE
+    o_orderkey = l_orderkey
+    AND l_shipmode LIKE '%MAIL%';
+----
+HashJoin
+├── estimated rows: 66953.00
+├── output rows: 85.95 thousand
+├── Filter
+│   ├── filters: [like(lineitem.l_shipmode (#23), '%MAIL%')]
+│   ├── estimated rows: 66953.00
+│   ├── output rows: 85.95 thousand
+│   └── TableScan
+│       ├── table: default.tpch_test.lineitem
+│       ├── estimated rows: 600572.00
+│       └── output rows: 600.57 thousand
+└── TableScan
+    ├── table: default.tpch_test.orders
+    ├── estimated rows: 150000.00
+    └── output rows: 150 thousand
+
+statement ok
+set enable_cbo = 1;
+
+# Reuse the queries from `join.test` to check that the dynamic sample framework runs without errors
+query I
+select
+    c_custkey, count(o_orderkey) as c_count
+from
+    customer
+    full outer join
+    orders
+    on c_custkey = o_custkey
+    and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
+group by
+    c_custkey
+order by c_custkey
+    limit 20;
+----
+1 0
+2 0
+3 0
+4 0
+5 0
+6 0
+7 0
+8 0
+9 0
+10 0
+11 0
+12 0
+13 0
+14 0
+15 0
+16 0
+17 0
+18 0
+19 0
+20 0
+
+
+query I
+select
+    c_custkey
+from
+    customer
+    inner join
+    orders
+    on c_custkey = o_custkey
+    and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120 order by c_custkey limit 20;
+----
+101
+101
+101
+101
+101
+101
+101
+101
+101
+101
+101
+101
+103
+103
+103
+103
+103
+103
+103
+103
+
+query I
+select
+    c_custkey, count(o_orderkey) as c_count
+from
+    customer
+    left join
+    orders
+    on c_custkey = o_custkey
+    and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
+group by
+    c_custkey
+order by c_custkey
+    limit 20;
+----
+1 0
+2 0
+3 0
+4 0
+5 0
+6 0
+7 0
+8 0
+9 0
+10 0
+11 0
+12 0
+13 0
+14 0
+15 0
+16 0
+17 0
+18 0
+19 0
+20 0
+
+
+query I
+select
+    c_custkey, count(o_orderkey) as c_count
+from
+    customer
+    right join
+    orders
+    on c_custkey = o_custkey
+    and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
+group by
+    c_custkey
+order by c_custkey
+limit 20;
+----
+101 12
+103 18
+104 7
+106 18
+107 12
+109 25
+110 9
+112 19
+113 17
+115 28
+116 4
+118 18
+119 10
+NULL 149803
+
+query I
+select
+    c_custkey
+from
+    customer
+    left semi join
+    orders
+    on c_custkey = o_custkey
+    and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
+order by c_custkey
+    limit 20;
+----
+101
+103
+104
+106
+107
+109
+110
+112
+113
+115
+116
+118
+119
+
+query I
+select
+    o_custkey
+from
+    customer
+    right semi join
+    orders
+on c_custkey = o_custkey
+    and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
+order by o_custkey
+    limit 20;
+----
+101
+101
+101
+101
+101
+101
+101
+101
+101
+101
+101
+101
+103
+103
+103
+103
+103
+103
+103
+103
+
+query I
+select
+    c_custkey
+from
+    customer
+    left anti join
+    orders
+on c_custkey = o_custkey
+    and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
+order by c_custkey
+    limit 20;
+----
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+16
+17
+18
+19
+20
+
+query I
+select
+    o_custkey
+from
+    customer
+    right anti join
+    orders
+on c_custkey = o_custkey
+    and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
+order by o_custkey
+    limit 20;
+----
+1
+1
+1
+1
+1
+1
+1
+1
+1
+2
+2
+2
+2
+2
+2
+2
+2
+2
+2
+2
+
+query I
+select
+    o_comment
+from
+    customer
+    cross join
+    orders
+where o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
+order by o_comment
+    limit 20;
+----
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias above the carefully ironic packages nag about the pend
+
+statement ok
+set max_block_size = 1024;
+
+
+# Test iejoin with large dataset
+query I
+select l_orderkey from (select * from lineitem order by l_orderkey limit 5000) as l, (select * from orders order by o_orderkey limit 5000) as o where l.l_orderkey > o.o_orderkey and l.l_partkey < o.o_custkey order by l_orderkey limit 10;
+----
+3
+3
+3
+3
+3
+3
+3
+4
+5
+5
+
+statement ok
+set max_block_size = 65536;
+
+query I
+select l_orderkey from (select * from lineitem order by l_orderkey limit 5000) as l, (select * from orders order by o_orderkey limit 5000) as o where l.l_orderkey > o.o_orderkey order by l_orderkey limit 10;
+----
+2
+3
+3
+3
+3
+3
+3
+3
+3
+3
+
+# LEFT OUTER / LEFT SINGLE / FULL
+query I
+select l_orderkey, o_orderdate, o_shippriority from lineitem left join orders on l_orderkey = o_orderkey and o_orderdate < to_date('1995-03-15') order by o_orderdate, l_orderkey limit 5;
+----
+3271 1992-01-01 0
+3271 1992-01-01 0
+3271 1992-01-01 0
+3271 1992-01-01 0
+5607 1992-01-01 0
+
+# LEFT ANTI
+query I
+select o_custkey from orders where not exists (select * from customer where substring(c_phone from 1 for 2) in ('13', '31', '23', '29', '30', '18', '17') and o_custkey 
= c_custkey) order by o_custkey limit 10; +---- +1 +1 +1 +1 +1 +1 +1 +1 +1 +4 + + +statement ok +set random_function_seed = 0; + +statement ok +set dynamic_sample_time_budget_ms = 0;
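
For reviewers: the heart of this series is the time-budget fallback that [PATCH 7/8] adds to `dynamic_sample.rs`. The standalone sketch below restates that pattern outside Databend so it can be read and run in isolation. It is illustrative only, not the patched code: it assumes a plain `tokio` dependency (with the `macros`, `rt-multi-thread`, and `time` features, e.g. `features = ["full"]`), and the name `estimate_with_budget` plus the bare `f64` cardinalities are hypothetical stand-ins for the `StatInfo`-based signatures in the patch.

use std::time::{Duration, Instant};

// Run `sample_fn` under whatever is left of `time_budget`; on timeout or
// error, return the cheap static estimate from `fallback` instead.
// (Illustrative sketch, not Databend code.)
async fn estimate_with_budget<Fallback, Fut>(
    start_time: Instant,
    time_budget: Duration,
    fallback: Fallback,
    sample_fn: impl FnOnce() -> Fut,
) -> f64
where
    Fallback: FnOnce() -> f64,
    Fut: std::future::Future<Output = Result<f64, String>>,
{
    // A zero budget disables sampling; an already-exhausted budget skips it.
    if time_budget.as_millis() == 0 || start_time.elapsed() > time_budget {
        return fallback();
    }
    // saturating_sub avoids an underflow panic if the budget expires between
    // the check above and this subtraction.
    let remaining = time_budget.saturating_sub(start_time.elapsed());
    match tokio::time::timeout(remaining, sample_fn()).await {
        Ok(Ok(card)) => card,
        // Either the sampling query failed or the timer fired first.
        Ok(Err(_)) | Err(_) => fallback(),
    }
}

#[tokio::main]
async fn main() {
    let start = Instant::now();
    let card = estimate_with_budget(
        start,
        Duration::from_millis(50), // budget, cf. dynamic_sample_time_budget_ms
        || 1000.0,                 // cheap static cardinality estimate
        || async {
            // Stand-in for a sampling query that overruns the budget.
            tokio::time::sleep(Duration::from_millis(200)).await;
            Ok::<f64, String>(42.0)
        },
    )
    .await;
    assert_eq!(card, 1000.0); // the slow sample timed out, so the fallback won
    println!("estimated cardinality: {card}");
}

As in the patch, a budget of 0 keeps the optimizer on the purely static path, which is why `sample.test` resets `dynamic_sample_time_budget_ms` to 0 at the end.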