diff --git a/Cargo.lock b/Cargo.lock
index 8cb0c51cbc082..24a45026bb26c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4165,6 +4165,7 @@ dependencies = [
  "sha2",
  "simsearch",
  "time",
+ "tokio",
  "url",
 ]
diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs
index 7a0d5bba52682..2154feb8e3e55 100644
--- a/src/query/service/src/interpreters/interpreter.rs
+++ b/src/query/service/src/interpreters/interpreter.rs
@@ -51,6 +51,7 @@ use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
 use crate::pipelines::executor::PipelinePullingExecutor;
 use crate::pipelines::PipelineBuildResult;
+use crate::schedulers::ServiceQueryExecutor;
 use crate::servers::http::v1::ClientSessionManager;
 use crate::sessions::QueryContext;
 use crate::sessions::SessionManager;
@@ -205,7 +206,10 @@ fn log_query_finished(ctx: &QueryContext, error: Option<ErrorCode>, has_profiles
 ///
 /// This function is used to plan the SQL. If an error occurs, we will log the query start and finished.
 pub async fn interpreter_plan_sql(ctx: Arc<QueryContext>, sql: &str) -> Result<(Plan, PlanExtras)> {
-    let mut planner = Planner::new(ctx.clone());
+    let mut planner = Planner::new_with_sample_executor(
+        ctx.clone(),
+        Arc::new(ServiceQueryExecutor::new(ctx.clone())),
+    );
     let result = planner.plan_sql(sql).await;
     let short_sql = short_sql(sql.to_string());
     let mut stmt = if let Ok((_, extras)) = &result {
diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs
index 3333849f3d59c..488b41c3e920c 100644
--- a/src/query/service/src/schedulers/scheduler.rs
+++ b/src/query/service/src/schedulers/scheduler.rs
@@ -14,8 +14,14 @@
 use std::sync::Arc;
 
+use async_trait::async_trait;
 use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
+use databend_common_sql::optimizer::QuerySampleExecutor;
+use futures_util::TryStreamExt;
 
+use crate::pipelines::executor::ExecutorSettings;
+use crate::pipelines::executor::PipelinePullingExecutor;
 use crate::pipelines::PipelineBuildResult;
 use crate::pipelines::PipelineBuilder;
 use crate::schedulers::Fragmenter;
@@ -24,6 +30,7 @@ use crate::sessions::QueryContext;
 use crate::sessions::TableContext;
 use crate::sql::executor::PhysicalPlan;
 use crate::sql::ColumnBinding;
+use crate::stream::PullingExecutorStream;
 
 /// Build query pipeline from physical plan.
 /// If plan is distributed plan it will build_distributed_pipeline
@@ -106,3 +113,27 @@ pub async fn build_distributed_pipeline(
     build_res.set_max_threads(settings.get_max_threads()? as usize);
     Ok(build_res)
 }
+
+pub struct ServiceQueryExecutor {
+    ctx: Arc<QueryContext>,
+}
+
+impl ServiceQueryExecutor {
+    pub fn new(ctx: Arc<QueryContext>) -> Self {
+        Self { ctx }
+    }
+}
+
+#[async_trait]
+impl QuerySampleExecutor for ServiceQueryExecutor {
+    async fn execute_query(&self, plan: &PhysicalPlan) -> Result<Vec<DataBlock>> {
+        let build_res = build_query_pipeline_without_render_result_set(&self.ctx, plan).await?;
+        let settings = ExecutorSettings::try_create(self.ctx.clone())?;
+        let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
+        self.ctx.set_executor(pulling_executor.get_inner())?;
+
+        PullingExecutorStream::create(pulling_executor)?
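+            // Drain the sample pipeline fully into memory; sample plans are small
+            // aggregations (e.g. a COUNT over a sampled scan), so this stays cheap.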
+            .try_collect::<Vec<DataBlock>>()
+            .await
+    }
+}
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index b496271fdb6da..af9dc3af99543 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -848,7 +848,13 @@ impl DefaultSettings {
                 desc: "Seed for random function",
                 mode: SettingMode::Both,
                 range: Some(SettingRange::Numeric(0..=1)),
-            })
+            }),
+            ("dynamic_sample_time_budget_ms", DefaultSettingValue {
+                value: UserSettingValue::UInt64(0),
+                desc: "Time budget for dynamic sample in milliseconds",
+                mode: SettingMode::Both,
+                range: Some(SettingRange::Numeric(0..=u64::MAX)),
+            }),
         ]);
 
         Ok(Arc::new(DefaultSettings {
diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs
index 0cc3b68c96cf7..07e2c1b0dbc49 100644
--- a/src/query/settings/src/settings_getter_setter.rs
+++ b/src/query/settings/src/settings_getter_setter.rs
@@ -701,4 +701,8 @@ impl Settings {
     pub fn get_random_function_seed(&self) -> Result<bool> {
         Ok(self.try_get_u64("random_function_seed")? == 1)
     }
+
+    pub fn get_dynamic_sample_time_budget_ms(&self) -> Result<u64> {
+        self.try_get_u64("dynamic_sample_time_budget_ms")
+    }
 }
diff --git a/src/query/sql/Cargo.toml b/src/query/sql/Cargo.toml
index 2dfa30961a612..84596e0ed17d2 100644
--- a/src/query/sql/Cargo.toml
+++ b/src/query/sql/Cargo.toml
@@ -73,6 +73,7 @@ serde = { workspace = true }
 sha2 = { workspace = true }
 simsearch = "0.2"
 time = "0.3.14"
+tokio = "1.39.2"
 url = "2.3.1"
 
 [lints]
diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs
new file mode 100644
index 0000000000000..07e8abf91402b
--- /dev/null
+++ b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs
@@ -0,0 +1,88 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
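+
+//! Derive an operator's `StatInfo` by executing a sampled query, guarded by a
+//! time budget (`dynamic_sample_time_budget_ms`) with a fallback to static
+//! cardinality derivation.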
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use databend_common_base::base::tokio::time::Instant;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::Result;
+
+use crate::optimizer::dynamic_sample::filter_selectivity_sample::filter_selectivity_sample;
+use crate::optimizer::dynamic_sample::join_selectivity_sample::join_selectivity_sample;
+use crate::optimizer::QuerySampleExecutor;
+use crate::optimizer::RelExpr;
+use crate::optimizer::SExpr;
+use crate::optimizer::StatInfo;
+use crate::plans::Operator;
+use crate::plans::RelOperator;
+use crate::MetadataRef;
+
+#[async_recursion::async_recursion(#[recursive::recursive])]
+pub async fn dynamic_sample(
+    ctx: Arc<dyn TableContext>,
+    metadata: MetadataRef,
+    s_expr: &SExpr,
+    sample_executor: Arc<dyn QuerySampleExecutor>,
+) -> Result<Arc<StatInfo>> {
+    let time_budget =
+        Duration::from_millis(ctx.get_settings().get_dynamic_sample_time_budget_ms()?);
+    let start_time = Instant::now();
+
+    async fn sample_with_budget<F, Fut>(
+        start_time: Instant,
+        time_budget: Duration,
+        fallback: F,
+        sample_fn: impl FnOnce() -> Fut,
+    ) -> Result<Arc<StatInfo>>
+    where
+        F: FnOnce() -> Result<Arc<StatInfo>>,
+        Fut: std::future::Future<Output = Result<Arc<StatInfo>>>,
+    {
+        if time_budget.as_millis() == 0 || start_time.elapsed() > time_budget {
+            fallback()
+        } else {
+            let remaining_time = time_budget - start_time.elapsed();
+            match tokio::time::timeout(remaining_time, sample_fn()).await {
+                Ok(Ok(result)) => Ok(result),
+                // Fall back if the sample timed out or if `sample_fn` returned an error.
+                Ok(Err(_)) | Err(_) => fallback(),
+            }
+        }
+    }
+
+    match s_expr.plan() {
+        RelOperator::Filter(_) => {
+            sample_with_budget(
+                start_time,
+                time_budget,
+                || {
+                    let rel_expr = RelExpr::with_s_expr(s_expr);
+                    rel_expr.derive_cardinality()
+                },
+                || filter_selectivity_sample(ctx, metadata, s_expr, sample_executor),
+            )
+            .await
+        }
+        RelOperator::Join(_) => {
+            join_selectivity_sample(ctx, metadata, s_expr, sample_executor).await
+        }
+        RelOperator::Scan(_) => s_expr.plan().derive_stats(&RelExpr::with_s_expr(s_expr)),
+        // TODO: support more operators and query patterns here.
+        _ => {
+            let rel_expr = RelExpr::with_s_expr(s_expr);
+            rel_expr.derive_cardinality()
+        }
+    }
+}
diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs
new file mode 100644
index 0000000000000..262bf06900f22
--- /dev/null
+++ b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs
@@ -0,0 +1,134 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
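+
+//! Estimate a filter's selectivity by counting how many rows of a small row
+//! sample of the underlying scan survive the predicate.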
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use databend_common_ast::ast::Sample;
+use databend_common_ast::ast::SampleConfig;
+use databend_common_ast::ast::SampleLevel;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_expression::types::DataType;
+use databend_common_expression::types::NumberDataType;
+use num_traits::ToPrimitive;
+
+use crate::executor::PhysicalPlanBuilder;
+use crate::optimizer::statistics::CollectStatisticsOptimizer;
+use crate::optimizer::QuerySampleExecutor;
+use crate::optimizer::RelExpr;
+use crate::optimizer::SExpr;
+use crate::optimizer::SelectivityEstimator;
+use crate::optimizer::StatInfo;
+use crate::plans::Aggregate;
+use crate::plans::AggregateFunction;
+use crate::plans::AggregateMode;
+use crate::plans::RelOperator;
+use crate::plans::ScalarItem;
+use crate::MetadataRef;
+use crate::ScalarExpr;
+
+pub async fn filter_selectivity_sample(
+    ctx: Arc<dyn TableContext>,
+    metadata: MetadataRef,
+    s_expr: &SExpr,
+    sample_executor: Arc<dyn QuerySampleExecutor>,
+) -> Result<Arc<StatInfo>> {
+    // Filter selectivity sampling is only invoked from `dphyp`, so the filter is
+    // guaranteed to be part of a multi-table query; sampling a filter's
+    // cardinality in a single-table query would be pointless.
+    let child = s_expr.child(0)?;
+    let child_rel_expr = RelExpr::with_s_expr(child);
+    if let RelOperator::Scan(mut scan) = child.plan().clone() {
+        let num_rows = scan
+            .statistics
+            .table_stats
+            .as_ref()
+            .and_then(|s| s.num_rows)
+            .unwrap_or(0);
+
+        // Calculate the sample size (0.2% of the total rows).
+        let sample_size = (num_rows as f64 * 0.002).ceil();
+        let mut new_s_expr = s_expr.clone();
+        // If the table is too small, we don't need to sample.
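+        // Below roughly 10 sampled rows the estimate would be too noisy, so the
+        // count below runs against the unsampled relation instead.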
+ if sample_size >= 10.0 { + scan.sample = Some(Sample { + sample_level: SampleLevel::ROW, + sample_conf: SampleConfig::RowsNum(sample_size), + }); + let new_child = SExpr::create_leaf(Arc::new(RelOperator::Scan(scan))); + new_s_expr = s_expr.replace_children(vec![Arc::new(new_child)]); + let collect_statistics_optimizer = + CollectStatisticsOptimizer::new(ctx.clone(), metadata.clone()); + new_s_expr = collect_statistics_optimizer.run(&new_s_expr).await?; + } + + new_s_expr = SExpr::create_unary( + Arc::new(create_count_aggregate(AggregateMode::Partial).into()), + Arc::new(new_s_expr), + ); + new_s_expr = SExpr::create_unary( + Arc::new(create_count_aggregate(AggregateMode::Final).into()), + Arc::new(new_s_expr), + ); + + let mut builder = PhysicalPlanBuilder::new(metadata.clone(), ctx.clone(), false); + let mut required = HashSet::new(); + required.insert(0); + let plan = builder.build(&new_s_expr, required).await?; + + let result = sample_executor.execute_query(&plan).await?; + if let Some(block) = result.first() { + if let Some(count) = block.get_last_column().as_number() { + if let Some(number_scalar) = count.index(0) { + // Compute and return selectivity + let selectivity = number_scalar.to_f64().to_f64().unwrap() / sample_size; + let mut statistics = child_rel_expr.derive_cardinality()?.statistics.clone(); + let mut sb = SelectivityEstimator::new(&mut statistics, HashSet::new()); + sb.update_other_statistic_by_selectivity(selectivity); + let stat_info = Arc::new(StatInfo { + cardinality: (selectivity * num_rows as f64).ceil(), + statistics, + }); + *s_expr.stat_info.lock().unwrap() = Some(stat_info.clone()); + return Ok(stat_info); + } + } + } + } + Err(ErrorCode::Internal( + "Failed to calculate filter selectivity by sample".to_string(), + )) +} + +fn create_count_aggregate(mode: AggregateMode) -> Aggregate { + Aggregate { + mode, + group_items: vec![], + aggregate_functions: vec![ScalarItem { + scalar: ScalarExpr::AggregateFunction(AggregateFunction { + func_name: "count".to_string(), + distinct: false, + params: vec![], + args: vec![], + return_type: Box::new(DataType::Number(NumberDataType::UInt64)), + display_name: "".to_string(), + }), + index: 0, + }], + from_distinct: false, + limit: None, + grouping_sets: None, + } +} diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/join_selectivity_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/join_selectivity_sample.rs new file mode 100644 index 0000000000000..b7fa2affc0e22 --- /dev/null +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/join_selectivity_sample.rs @@ -0,0 +1,49 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
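+
+//! Estimate join cardinality by sampling both children and combining the
+//! resulting statistics via `Join::derive_join_stats`.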
+
+use std::sync::Arc;
+
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::Result;
+
+use crate::optimizer::dynamic_sample::dynamic_sample;
+use crate::optimizer::QuerySampleExecutor;
+use crate::optimizer::SExpr;
+use crate::optimizer::StatInfo;
+use crate::plans::Join;
+use crate::MetadataRef;
+
+pub async fn join_selectivity_sample(
+    ctx: Arc<dyn TableContext>,
+    metadata: MetadataRef,
+    s_expr: &SExpr,
+    sample_executor: Arc<dyn QuerySampleExecutor>,
+) -> Result<Arc<StatInfo>> {
+    let left_stat_info = dynamic_sample(
+        ctx.clone(),
+        metadata.clone(),
+        s_expr.child(0)?,
+        sample_executor.clone(),
+    )
+    .await?;
+    let right_stat_info = dynamic_sample(
+        ctx.clone(),
+        metadata.clone(),
+        s_expr.child(1)?,
+        sample_executor.clone(),
+    )
+    .await?;
+    let join = Join::try_from(s_expr.plan().clone())?;
+    join.derive_join_stats(left_stat_info, right_stat_info)
+}
diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs
new file mode 100644
index 0000000000000..0998554242d58
--- /dev/null
+++ b/src/query/sql/src/planner/optimizer/dynamic_sample/mod.rs
@@ -0,0 +1,22 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#[allow(clippy::module_inception)]
+mod dynamic_sample;
+mod filter_selectivity_sample;
+mod join_selectivity_sample;
+mod query_sample_executor;
+
+pub use dynamic_sample::dynamic_sample;
+pub use query_sample_executor::QuerySampleExecutor;
diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/query_sample_executor.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/query_sample_executor.rs
new file mode 100644
index 0000000000000..ca376b2fc6517
--- /dev/null
+++ b/src/query/sql/src/planner/optimizer/dynamic_sample/query_sample_executor.rs
@@ -0,0 +1,24 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
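+
+//! Planner-side abstraction for running sample queries: the `sql` crate defines
+//! the trait, while the service crate supplies the implementation
+//! (`ServiceQueryExecutor`), avoiding a dependency on the pipeline executor.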
+
+use async_trait::async_trait;
+use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
+
+use crate::executor::PhysicalPlan;
+
+#[async_trait]
+pub trait QuerySampleExecutor: Send + Sync {
+    async fn execute_query(&self, plan: &PhysicalPlan) -> Result<Vec<DataBlock>>;
+}
diff --git a/src/query/sql/src/planner/optimizer/filter/deduplicate_join_condition.rs b/src/query/sql/src/planner/optimizer/filter/deduplicate_join_condition.rs
index 47eb35c9a438e..00fa66f05ca4c 100644
--- a/src/query/sql/src/planner/optimizer/filter/deduplicate_join_condition.rs
+++ b/src/query/sql/src/planner/optimizer/filter/deduplicate_join_condition.rs
@@ -95,9 +95,16 @@ impl DeduplicateJoinConditionOptimizer {
 
     pub fn deduplicate_children(&mut self, s_expr: &SExpr) -> Result<SExpr> {
         let mut children = Vec::with_capacity(s_expr.arity());
+        let mut children_changed = false;
         for child in s_expr.children() {
-            let child = self.deduplicate(child)?;
-            children.push(Arc::new(child));
+            let optimized_child = self.deduplicate(child)?;
+            if !optimized_child.eq(child) {
+                children_changed = true;
+            }
+            children.push(Arc::new(optimized_child));
+        }
+        if !children_changed {
+            return Ok(s_expr.clone());
         }
         Ok(s_expr.replace_children(children))
     }
diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs
index d3306b5173b35..1cb9762f61c59 100644
--- a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs
+++ b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs
@@ -16,8 +16,9 @@ use std::collections::HashMap;
 use std::collections::HashSet;
 use std::sync::Arc;
 
-use databend_common_base::runtime::Thread;
+use databend_common_base::runtime::spawn;
 use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 
 use crate::optimizer::hyper_dp::join_node::JoinNode;
@@ -27,6 +28,7 @@ use crate::optimizer::hyper_dp::query_graph::QueryGraph;
 use crate::optimizer::hyper_dp::util::intersect;
 use crate::optimizer::hyper_dp::util::union;
 use crate::optimizer::rule::TransformResult;
+use crate::optimizer::QuerySampleExecutor;
 use crate::optimizer::RuleFactory;
 use crate::optimizer::RuleID;
 use crate::optimizer::SExpr;
@@ -44,6 +46,7 @@ const RELATION_THRESHOLD: usize = 10;
 // See the paper for more details.
 pub struct DPhpy {
     ctx: Arc<dyn TableContext>,
+    sample_executor: Option<Arc<dyn QuerySampleExecutor>>,
     metadata: MetadataRef,
     join_relations: Vec<JoinRelation>,
     // base table index -> index of join_relations
@@ -58,9 +61,14 @@ pub struct DPhpy {
 }
 
 impl DPhpy {
-    pub fn new(ctx: Arc<dyn TableContext>, metadata: MetadataRef) -> Self {
+    pub fn new(
+        ctx: Arc<dyn TableContext>,
+        metadata: MetadataRef,
+        sample_executor: Option<Arc<dyn QuerySampleExecutor>>,
+    ) -> Self {
         Self {
             ctx,
+            sample_executor,
             metadata,
             join_relations: vec![],
             table_index_map: Default::default(),
@@ -72,25 +80,31 @@ impl DPhpy {
         }
     }
 
-    fn new_children(&mut self, s_expr: &SExpr) -> Result<SExpr> {
+    async fn new_children(&mut self, s_expr: &SExpr) -> Result<SExpr> {
         // Parallel process children: start a new dphyp for each child.
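+        // Children are optimized on tokio tasks (instead of OS threads) because
+        // cardinality estimation may now await sample queries.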
         let ctx = self.ctx.clone();
         let metadata = self.metadata.clone();
+        let sample_executor = self.sample_executor.clone();
         let left_expr = s_expr.children[0].clone();
-        let left_res = Thread::spawn(move || {
-            let mut dphyp = DPhpy::new(ctx, metadata);
-            (dphyp.optimize(&left_expr), dphyp.table_index_map)
+        let left_res = spawn(async move {
+            let mut dphyp = DPhpy::new(ctx, metadata, sample_executor);
+            (dphyp.optimize(&left_expr).await, dphyp.table_index_map)
         });
         let ctx = self.ctx.clone();
         let metadata = self.metadata.clone();
+        let sample_executor = self.sample_executor.clone();
         let right_expr = s_expr.children[1].clone();
-        let right_res = Thread::spawn(move || {
-            let mut dphyp = DPhpy::new(ctx, metadata);
-            (dphyp.optimize(&right_expr), dphyp.table_index_map)
+        let right_res = spawn(async move {
+            let mut dphyp = DPhpy::new(ctx, metadata, sample_executor);
+            (dphyp.optimize(&right_expr).await, dphyp.table_index_map)
         });
-        let left_res = left_res.join()?;
+        let left_res = left_res
+            .await
+            .map_err(|e| ErrorCode::TokioError(format!("Cannot join tokio job, err: {:?}", e)))?;
+        let right_res = right_res
+            .await
+            .map_err(|e| ErrorCode::TokioError(format!("Cannot join tokio job, err: {:?}", e)))?;
         let (left_expr, _) = left_res.0?;
-        let right_res = right_res.join()?;
         let (right_expr, _) = right_res.0?;
 
         // Merge `table_index_map` of left and right into current `table_index_map`.
@@ -105,7 +119,8 @@ impl DPhpy {
     }
 
     // Traverse the s_expr and get all base relations and join conditions
-    fn get_base_relations(
+    #[async_recursion::async_recursion(#[recursive::recursive])]
+    async fn get_base_relations(
         &mut self,
         s_expr: &SExpr,
         join_conditions: &mut Vec<(ScalarExpr, ScalarExpr)>,
@@ -115,14 +130,19 @@
     ) -> Result<(Arc<SExpr>, bool)> {
         if is_subquery {
             // If it's a subquery, start a new dphyp
-            let mut dphyp = DPhpy::new(self.ctx.clone(), self.metadata.clone());
-            let (new_s_expr, _) = dphyp.optimize(s_expr)?;
+            let mut dphyp = DPhpy::new(
+                self.ctx.clone(),
+                self.metadata.clone(),
+                self.sample_executor.clone(),
+            );
+            let (new_s_expr, _) = dphyp.optimize(s_expr).await?;
             // Merge `table_index_map` of subquery into current `table_index_map`.
             let relation_idx = self.join_relations.len() as IndexType;
             for table_index in dphyp.table_index_map.keys() {
                 self.table_index_map.insert(*table_index, relation_idx);
             }
-            self.join_relations.push(JoinRelation::new(&new_s_expr));
+            self.join_relations
+                .push(JoinRelation::new(&new_s_expr, self.sample_executor.clone()));
             return Ok((new_s_expr, true));
         }
 
@@ -132,9 +152,9 @@ impl DPhpy {
                     // If the relation contains a filter, check whether that filter is in `filters`;
                     // if it is, remove it from `filters`.
                     self.check_filter(relation);
-                    JoinRelation::new(relation)
+                    JoinRelation::new(relation, self.sample_executor.clone())
                 } else {
-                    JoinRelation::new(s_expr)
+                    JoinRelation::new(s_expr, self.sample_executor.clone())
                 };
                 self.table_index_map
                     .insert(op.table_index, self.join_relations.len() as IndexType);
@@ -196,24 +216,29 @@ impl DPhpy {
                         self.filters.insert(filter);
                     }
                     if !is_inner_join {
-                        let new_s_expr = self.new_children(s_expr)?;
-                        self.join_relations.push(JoinRelation::new(&new_s_expr));
+                        let new_s_expr = self.new_children(s_expr).await?;
+                        self.join_relations
+                            .push(JoinRelation::new(&new_s_expr, self.sample_executor.clone()));
                         Ok((Arc::new(new_s_expr), true))
                     } else {
-                        let left_res = self.get_base_relations(
-                            s_expr.child(0)?,
-                            join_conditions,
-                            true,
-                            None,
-                            left_is_subquery,
-                        )?;
-                        let right_res = self.get_base_relations(
-                            s_expr.child(1)?,
-                            join_conditions,
-                            true,
-                            None,
-                            right_is_subquery,
-                        )?;
+                        let left_res = self
+                            .get_base_relations(
+                                s_expr.child(0)?,
+                                join_conditions,
+                                true,
+                                None,
+                                left_is_subquery,
+                            )
+                            .await?;
+                        let right_res = self
+                            .get_base_relations(
+                                s_expr.child(1)?,
+                                join_conditions,
+                                true,
+                                None,
+                                right_is_subquery,
+                            )
+                            .await?;
                         let new_s_expr: Arc<SExpr> =
                             Arc::new(s_expr.replace_children([left_res.0, right_res.0]));
                         Ok((new_s_expr, left_res.1 && right_res.1))
@@ -232,30 +257,29 @@ impl DPhpy {
                 if let RelOperator::Filter(op) = s_expr.plan.as_ref() {
                     self.filters.insert(op.clone());
                 }
-                let (child, optimized) = self.get_base_relations(
-                    s_expr.child(0)?,
-                    join_conditions,
-                    true,
-                    Some(s_expr),
-                    false,
-                )?;
+                let (child, optimized) = self
+                    .get_base_relations(
+                        s_expr.child(0)?,
+                        join_conditions,
+                        true,
+                        Some(s_expr),
+                        false,
+                    )
+                    .await?;
                 let new_s_expr = Arc::new(s_expr.replace_children([child]));
                 Ok((new_s_expr, optimized))
             } else {
-                let (child, optimized) = self.get_base_relations(
-                    s_expr.child(0)?,
-                    join_conditions,
-                    false,
-                    None,
-                    false,
-                )?;
+                let (child, optimized) = self
+                    .get_base_relations(s_expr.child(0)?, join_conditions, false, None, false)
+                    .await?;
                 let new_s_expr = Arc::new(s_expr.replace_children([child]));
                 Ok((new_s_expr, optimized))
             }
         }
         RelOperator::UnionAll(_) => {
-            let new_s_expr = self.new_children(s_expr)?;
-            self.join_relations.push(JoinRelation::new(&new_s_expr));
+            let new_s_expr = self.new_children(s_expr).await?;
+            self.join_relations
+                .push(JoinRelation::new(&new_s_expr, self.sample_executor.clone()));
             Ok((Arc::new(new_s_expr), true))
         }
         RelOperator::Exchange(_) => {
@@ -279,12 +303,13 @@ impl DPhpy {
     // The input plan tree has been optimized by the heuristic optimizer,
    // so filters have been pushed down through joins and cross joins have been converted to inner joins where possible.
    // The output plan will theoretically have the optimal join order.
-    pub fn optimize(&mut self, s_expr: &SExpr) -> Result<(Arc<SExpr>, bool)> {
+    pub async fn optimize(&mut self, s_expr: &SExpr) -> Result<(Arc<SExpr>, bool)> {
        // First, we need to extract all join conditions and base tables.
        // `join_condition` is a pair: left is left_condition,
        // right is right_condition.
         let mut join_conditions = vec![];
-        let (s_expr, optimized) =
-            self.get_base_relations(s_expr, &mut join_conditions, false, None, false)?;
+        let (s_expr, optimized) = self
+            .get_base_relations(s_expr, &mut join_conditions, false, None, false)
+            .await?;
         if !optimized {
             return Ok((s_expr, false));
         }
@@ -336,7 +361,7 @@ impl DPhpy {
         for (_, neighbors) in self.query_graph.cached_neighbors.iter_mut() {
             neighbors.sort();
         }
-        self.join_reorder()?;
+        self.join_reorder().await?;
         // Get all join relations in `relation_set_tree`
         let all_relations = self
             .relation_set_tree
@@ -352,12 +377,14 @@ impl DPhpy {
     // Adaptive Optimization:
    // If the query graph is simple enough, it uses dynamic programming to construct the optimal join tree;
    // if that is not possible within the given optimization budget, it switches to a greedy approach.
-    fn join_reorder(&mut self) -> Result<()> {
+    async fn join_reorder(&mut self) -> Result<()> {
        // Initialize `dp_table` with the plan for each single relation
         for (idx, relation) in self.join_relations.iter().enumerate() {
             // Get nodes in `relation_set_tree`
             let nodes = self.relation_set_tree.get_relation_set_by_index(idx)?;
-            let ce = relation.cardinality()?;
+            let ce = relation
+                .cardinality(self.ctx.clone(), self.metadata.clone())
+                .await?;
             let join = JoinNode {
                 join_type: JoinType::Inner,
                 leaves: Arc::new(nodes.clone()),
@@ -371,30 +398,30 @@ impl DPhpy {
         }
 
         // First, try to use dynamic programming to find the optimal join order.
-        if !self.join_reorder_by_dphyp()? {
+        if !self.join_reorder_by_dphyp().await? {
             // When DPhpy takes too much time during join ordering, it is necessary to exit the dynamic programming algorithm
             // and switch to a greedy algorithm to minimize the overall query time.
-            self.join_reorder_by_greedy()?;
+            self.join_reorder_by_greedy().await?;
         }
         Ok(())
     }
 
     // Join reorder by dynamic programming algorithm.
-    fn join_reorder_by_dphyp(&mut self) -> Result<bool> {
+    async fn join_reorder_by_dphyp(&mut self) -> Result<bool> {
         // Choose all nodes as enumeration start node once (desc order)
         for idx in (0..self.join_relations.len()).rev() {
             // Get node from `relation_set_tree`
             let node = self.relation_set_tree.get_relation_set_by_index(idx)?;
             // Emit node as subgraph
-            if !self.emit_csg(&node)? {
+            if !self.emit_csg(&node).await? {
                 return Ok(false);
             }
             // Create the forbidden node set:
             // forbidden nodes are those whose idx is less than the current idx.
             let forbidden_nodes = (0..idx).collect();
             // Enlarge the subgraph recursively
-            if !self.enumerate_csg_rec(&node, &forbidden_nodes)? {
+            if !self.enumerate_csg_rec(&node, &forbidden_nodes).await? {
                 return Ok(false);
             }
         }
@@ -402,7 +429,7 @@ impl DPhpy {
     }
 
     // Join reorder by greedy algorithm.
-    fn join_reorder_by_greedy(&mut self) -> Result<bool> {
+    async fn join_reorder_by_greedy(&mut self) -> Result<bool> {
         // Greedy Operator Ordering starts with a single relation and iteratively adds the relation
         // that minimizes the cost of the join; the cost of a join is the sum of the cardinalities
         // of the nodes involved in the tree. The algorithm terminates once all relations have been
         // added. It is not guaranteed to find the optimal join tree, but it finds one in polynomial time.
@@ -428,8 +455,9 @@ impl DPhpy {
                     if !join_conditions.is_empty() {
                         // If left_relation set and right_relation set are connected, emit csg-cmp-pair and keep the
                         // minimum cost pair in `dp_table`.
-                        let cost =
-                            self.emit_csg_cmp(left_relation, right_relation, join_conditions)?;
+                        let cost = self
+                            .emit_csg_cmp(left_relation, right_relation, join_conditions)
+                            .await?;
                         // Update the minimum cost pair.
                         if cost < min_cost {
                             min_cost = cost;
@@ -448,7 +476,7 @@ impl DPhpy {
         let mut lowest_index = Vec::with_capacity(2);
         for (i, relation) in join_relations.iter().enumerate().take(2) {
             let mut join_node = self.dp_table.get(relation).unwrap().clone();
-            let cardinality = join_node.cardinality(&self.join_relations)?;
+            let cardinality = join_node.cardinality(&self.join_relations).await?;
             lowest_cost.push(cardinality);
             lowest_index.push(i);
         }
@@ -459,7 +487,7 @@ impl DPhpy {
         // Update the minimum cost relation set pair.
         for (i, relation) in join_relations.iter().enumerate().skip(2) {
             let mut join_node = self.dp_table.get(relation).unwrap().clone();
-            let cardinality = join_node.cardinality(&self.join_relations)?;
+            let cardinality = join_node.cardinality(&self.join_relations).await?;
             if cardinality < lowest_cost[0] {
                 lowest_cost[1] = cardinality;
                 lowest_index[1] = i;
@@ -480,7 +508,8 @@ impl DPhpy {
                 &join_relations[left_idx],
                 &join_relations[right_idx],
                 vec![],
-            )?;
+            )
+            .await?;
         }
         if left_idx > right_idx {
             std::mem::swap(&mut left_idx, &mut right_idx);
@@ -494,7 +523,7 @@ impl DPhpy {
 
     // EmitCsg will take a non-empty subset of hyper_graph's nodes(V) which contains a connected subgraph.
     // Then it will possibly generate a connected complement which will combine `nodes` to be a csg-cmp-pair.
-    fn emit_csg(&mut self, nodes: &[IndexType]) -> Result<bool> {
+    async fn emit_csg(&mut self, nodes: &[IndexType]) -> Result<bool> {
         if nodes.len() == self.join_relations.len() {
             return Ok(true);
         }
@@ -514,11 +543,16 @@ impl DPhpy {
             // Check if neighbor is connected with `nodes`
             let join_conditions = self.query_graph.is_connected(nodes, &neighbor_relations)?;
             if !join_conditions.is_empty()
-                && !self.try_emit_csg_cmp(nodes, &neighbor_relations, join_conditions)?
+                && !self
+                    .try_emit_csg_cmp(nodes, &neighbor_relations, join_conditions)
+                    .await?
             {
                 return Ok(false);
             }
-            if !self.enumerate_cmp_rec(nodes, &neighbor_relations, &forbidden_nodes)? {
+            if !self
+                .enumerate_cmp_rec(nodes, &neighbor_relations, &forbidden_nodes)
+                .await?
+            {
                 return Ok(false);
             }
         }
@@ -527,7 +561,8 @@ impl DPhpy {
 
     // EnumerateCsgRec will extend the given `nodes`.
     // It'll consider each non-empty, proper subset of the neighborhood of nodes that are not forbidden.
-    fn enumerate_csg_rec(
+    #[async_recursion::async_recursion(#[recursive::recursive])]
+    async fn enumerate_csg_rec(
         &mut self,
         nodes: &[IndexType],
         forbidden_nodes: &HashSet<IndexType>,
@@ -552,7 +587,7 @@ impl DPhpy {
             let merged_relation_set = union(nodes, &neighbor_relations);
             if self.dp_table.contains_key(&merged_relation_set)
                 && merged_relation_set.len() > nodes.len()
                && !self.emit_csg(&merged_relation_set).await?
             {
                 return Ok(false);
             }
@@ -565,14 +600,17 @@ impl DPhpy {
                 new_forbidden_nodes = forbidden_nodes.clone();
             }
             new_forbidden_nodes.insert(*neighbor);
-            if !self.enumerate_csg_rec(&merged_sets[idx], &new_forbidden_nodes)? {
+            if !self
+                .enumerate_csg_rec(&merged_sets[idx], &new_forbidden_nodes)
+                .await?
+ { return Ok(false); } } Ok(true) } - fn try_emit_csg_cmp( + async fn try_emit_csg_cmp( &mut self, left: &[IndexType], right: &[IndexType], @@ -583,7 +621,7 @@ impl DPhpy { // otherwise it will take too much time match self.emit_count > EMIT_THRESHOLD { false => { - self.emit_csg_cmp(left, right, join_conditions)?; + self.emit_csg_cmp(left, right, join_conditions).await?; Ok(true) } true => Ok(false), @@ -591,7 +629,7 @@ impl DPhpy { } // EmitCsgCmp will join the optimal plan from left and right - fn emit_csg_cmp( + async fn emit_csg_cmp( &mut self, left: &[IndexType], right: &[IndexType], @@ -602,8 +640,8 @@ impl DPhpy { let parent_set = union(left, right); let mut left_join = self.dp_table.get(left).unwrap().clone(); let mut right_join = self.dp_table.get(right).unwrap().clone(); - let left_cardinality = left_join.cardinality(&self.join_relations)?; - let right_cardinality = right_join.cardinality(&self.join_relations)?; + let left_cardinality = left_join.cardinality(&self.join_relations).await?; + let right_cardinality = right_join.cardinality(&self.join_relations).await?; if left_cardinality < right_cardinality { for join_condition in join_conditions.iter_mut() { @@ -641,7 +679,7 @@ impl DPhpy { } }; if join_node.join_type == JoinType::Inner { - let cost = join_node.cardinality(&self.join_relations)? + let cost = join_node.cardinality(&self.join_relations).await? + join_node.children[0].cost + join_node.children[1].cost; join_node.set_cost(cost); @@ -668,7 +706,8 @@ impl DPhpy { // The second parameter is a set which is connected and must be extended until a valid csg-cmp-pair is reached. // Therefore, it considers the neighborhood of right. - fn enumerate_cmp_rec( + #[async_recursion::async_recursion(#[recursive::recursive])] + async fn enumerate_cmp_rec( &mut self, left: &[IndexType], right: &[IndexType], @@ -689,7 +728,9 @@ impl DPhpy { if merged_relation_set.len() > right.len() && self.dp_table.contains_key(&merged_relation_set) && !join_conditions.is_empty() - && !self.try_emit_csg_cmp(left, &merged_relation_set, join_conditions)? + && !self + .try_emit_csg_cmp(left, &merged_relation_set, join_conditions) + .await? { return Ok(false); } @@ -699,7 +740,10 @@ impl DPhpy { let mut new_forbidden_nodes = forbidden_nodes.clone(); for (idx, neighbor) in neighbor_set.iter().enumerate() { new_forbidden_nodes.insert(*neighbor); - if !self.enumerate_cmp_rec(left, &merged_sets[idx], &new_forbidden_nodes)? { + if !self + .enumerate_cmp_rec(left, &merged_sets[idx], &new_forbidden_nodes) + .await? 
+            {
                 return Ok(false);
             }
         }
diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs
index 8674aed15f2b5..15ca730ae0818 100644
--- a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs
+++ b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs
@@ -26,7 +26,7 @@ use crate::plans::RelOperator;
 use crate::IndexType;
 use crate::ScalarExpr;
 
-#[derive(Clone, Debug)]
+#[derive(Clone)]
 pub struct JoinNode {
     pub join_type: JoinType,
     pub leaves: Arc<Vec<IndexType>>,
@@ -39,7 +39,7 @@ pub struct JoinNode {
 }
 
 impl JoinNode {
-    pub fn cardinality(&mut self, relations: &[JoinRelation]) -> Result<f64> {
+    pub async fn cardinality(&mut self, relations: &[JoinRelation]) -> Result<f64> {
         if let Some(card) = self.cardinality {
             return Ok(card);
         }
@@ -49,6 +49,7 @@ impl JoinNode {
             self.s_expr = Some(self.s_expr(relations));
             self.s_expr.as_ref().unwrap()
         };
+
         let rel_expr = RelExpr::with_s_expr(s_expr);
         let card = rel_expr.derive_cardinality()?.cardinality;
         self.cardinality = Some(card);
diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs b/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs
index 37e4b9fd09212..b3e7cc6dbfb70 100644
--- a/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs
+++ b/src/query/sql/src/planner/optimizer/hyper_dp/join_relation.rs
@@ -13,22 +13,29 @@
 // limitations under the License.
 
 use std::collections::HashSet;
+use std::sync::Arc;
 
 use ahash::HashMap;
+use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 
+use crate::optimizer::dynamic_sample::dynamic_sample;
+use crate::optimizer::QuerySampleExecutor;
 use crate::optimizer::RelExpr;
 use crate::optimizer::SExpr;
 use crate::IndexType;
+use crate::MetadataRef;
 
 pub struct JoinRelation {
     s_expr: SExpr,
+    sample_executor: Option<Arc<dyn QuerySampleExecutor>>,
 }
 
 impl JoinRelation {
-    pub fn new(s_expr: &SExpr) -> Self {
+    pub fn new(s_expr: &SExpr, sample_executor: Option<Arc<dyn QuerySampleExecutor>>) -> Self {
         Self {
             s_expr: s_expr.clone(),
+            sample_executor,
         }
     }
 
@@ -36,9 +43,25 @@ impl JoinRelation {
         self.s_expr.clone()
     }
 
-    pub fn cardinality(&self) -> Result<f64> {
-        let rel_expr = RelExpr::with_s_expr(&self.s_expr);
-        Ok(rel_expr.derive_cardinality()?.cardinality)
+    pub async fn cardinality(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        metadata: MetadataRef,
+    ) -> Result<f64> {
+        let card = if let Some(sample_executor) = &self.sample_executor {
+            dynamic_sample(
+                ctx.clone(),
+                metadata.clone(),
+                &self.s_expr,
+                sample_executor.clone(),
+            )
+            .await?
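+            // `dynamic_sample` returns a full `StatInfo`; join ordering only
+            // needs its cardinality.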
+            .cardinality
+        } else {
+            let rel_expr = RelExpr::with_s_expr(&self.s_expr);
+            rel_expr.derive_cardinality()?.cardinality
+        };
+        Ok(card)
     }
 }
diff --git a/src/query/sql/src/planner/optimizer/join/single_to_inner.rs b/src/query/sql/src/planner/optimizer/join/single_to_inner.rs
index 85545c1199528..6ae2cd0f34dd1 100644
--- a/src/query/sql/src/planner/optimizer/join/single_to_inner.rs
+++ b/src/query/sql/src/planner/optimizer/join/single_to_inner.rs
@@ -33,20 +33,28 @@ impl SingleToInnerOptimizer {
     }
 
     fn single_to_inner(s_expr: &SExpr) -> Result<SExpr> {
-        let s_expr = if let RelOperator::Join(join) = s_expr.plan.as_ref() {
+        let mut s_expr = if let RelOperator::Join(join) = s_expr.plan.as_ref()
+            && join.single_to_inner.is_some()
+        {
             let mut join = join.clone();
-            if join.single_to_inner.is_some() {
-                join.join_type = JoinType::Inner;
-            }
+            join.join_type = JoinType::Inner;
             s_expr.replace_plan(Arc::new(RelOperator::Join(join)))
         } else {
             s_expr.clone()
         };
+        let mut children_changed = false;
         let mut children = Vec::with_capacity(s_expr.arity());
         for child in s_expr.children() {
-            let child = Self::single_to_inner(child)?;
-            children.push(Arc::new(child));
+            let new_child = Self::single_to_inner(child)?;
+            if !new_child.eq(child) {
+                children_changed = true;
+            }
+            children.push(Arc::new(new_child));
         }
-        Ok(s_expr.replace_children(children))
+        if children_changed {
+            s_expr = s_expr.replace_children(children);
+        }
+
+        Ok(s_expr)
     }
 }
diff --git a/src/query/sql/src/planner/optimizer/mod.rs b/src/query/sql/src/planner/optimizer/mod.rs
index 782618703938f..82582c63fa3dc 100644
--- a/src/query/sql/src/planner/optimizer/mod.rs
+++ b/src/query/sql/src/planner/optimizer/mod.rs
@@ -33,9 +33,12 @@ pub mod s_expr;
 mod statistics;
 mod util;
 
+mod dynamic_sample;
+
 pub use cascades::CascadesOptimizer;
 pub use decorrelate::FlattenInfo;
 pub use decorrelate::SubqueryRewriter;
+pub use dynamic_sample::QuerySampleExecutor;
 pub use extract::PatternExtractor;
 pub use hyper_dp::DPhpy;
 pub use m_expr::MExpr;
diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs
index 3bf7843228484..461a267b509c1 100644
--- a/src/query/sql/src/planner/optimizer/optimizer.rs
+++ b/src/query/sql/src/planner/optimizer/optimizer.rs
@@ -41,6 +41,7 @@ use crate::optimizer::join::SingleToInnerOptimizer;
 use crate::optimizer::rule::TransformResult;
 use crate::optimizer::statistics::CollectStatisticsOptimizer;
 use crate::optimizer::util::contains_local_table_scan;
+use crate::optimizer::QuerySampleExecutor;
 use crate::optimizer::RuleFactory;
 use crate::optimizer::RuleID;
 use crate::optimizer::SExpr;
@@ -66,6 +67,8 @@ pub struct OptimizerContext {
     enable_distributed_optimization: bool,
     enable_join_reorder: bool,
     enable_dphyp: bool,
+    #[educe(Debug(ignore))]
+    sample_executor: Option<Arc<dyn QuerySampleExecutor>>,
 }
 
 impl OptimizerContext {
@@ -77,6 +80,7 @@ impl OptimizerContext {
             enable_distributed_optimization: false,
             enable_join_reorder: true,
             enable_dphyp: true,
+            sample_executor: None,
         }
     }
 
@@ -94,6 +98,14 @@ impl OptimizerContext {
         self.enable_dphyp = enable;
         self
     }
+
+    pub fn with_sample_executor(
+        mut self,
+        sample_executor: Option<Arc<dyn QuerySampleExecutor>>,
+    ) -> Self {
+        self.sample_executor = sample_executor;
+        self
+    }
 }
 
 /// A recursive optimizer that will apply the given rules recursively.
@@ -118,10 +130,19 @@ impl<'a> RecursiveOptimizer<'a> {
     #[recursive::recursive]
     fn optimize_expression(&self, s_expr: &SExpr) -> Result<SExpr> {
         let mut optimized_children = Vec::with_capacity(s_expr.arity());
+        let mut children_changed = false;
         for expr in s_expr.children() {
-            optimized_children.push(Arc::new(self.run(expr)?));
+            let optimized_child = self.run(expr)?;
+            if !optimized_child.eq(expr) {
+                children_changed = true;
+            }
+            optimized_children.push(Arc::new(optimized_child));
+        }
+        let mut optimized_expr = s_expr.clone();
+        if children_changed {
+            optimized_expr = s_expr.replace_children(optimized_children);
         }
-        let optimized_expr = s_expr.replace_children(optimized_children);
+
         let result = self.apply_transform_rules(&optimized_expr, self.rules)?;
 
         Ok(result)
@@ -334,8 +355,13 @@ pub async fn optimize_query(opt_ctx: &mut OptimizerContext, mut s_expr: SExpr) -
     // Cost based optimization
     let mut dphyp_optimized = false;
     if opt_ctx.enable_dphyp && opt_ctx.enable_join_reorder {
-        let (dp_res, optimized) =
-            DPhpy::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone()).optimize(&s_expr)?;
+        let (dp_res, optimized) = DPhpy::new(
+            opt_ctx.table_ctx.clone(),
+            opt_ctx.metadata.clone(),
+            opt_ctx.sample_executor.clone(),
+        )
+        .optimize(&s_expr)
+        .await?;
         if optimized {
             s_expr = (*dp_res).clone();
             dphyp_optimized = true;
@@ -422,8 +448,13 @@ async fn get_optimized_memo(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Res
     // Cost based optimization
     let mut dphyp_optimized = false;
     if opt_ctx.enable_dphyp && opt_ctx.enable_join_reorder {
-        let (dp_res, optimized) =
-            DPhpy::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone()).optimize(&s_expr)?;
+        let (dp_res, optimized) = DPhpy::new(
+            opt_ctx.table_ctx.clone(),
+            opt_ctx.metadata.clone(),
+            opt_ctx.sample_executor.clone(),
+        )
+        .optimize(&s_expr)
+        .await?;
         if optimized {
             s_expr = (*dp_res).clone();
             dphyp_optimized = true;
diff --git a/src/query/sql/src/planner/planner.rs b/src/query/sql/src/planner/planner.rs
index fe6ce7697dca1..93bc0722c2acf 100644
--- a/src/query/sql/src/planner/planner.rs
+++ b/src/query/sql/src/planner/planner.rs
@@ -39,6 +39,7 @@ use super::semantic::AggregateRewriter;
 use super::semantic::DistinctToGroupBy;
 use crate::optimizer::optimize;
 use crate::optimizer::OptimizerContext;
+use crate::optimizer::QuerySampleExecutor;
 use crate::plans::Insert;
 use crate::plans::InsertInputSource;
 use crate::plans::Plan;
@@ -53,6 +54,7 @@ const PROBE_INSERT_MAX_TOKENS: usize = 128 * 8;
 
 pub struct Planner {
     pub(crate) ctx: Arc<dyn TableContext>,
+    pub(crate) sample_executor: Option<Arc<dyn QuerySampleExecutor>>,
 }
 
 #[derive(Debug, Clone)]
@@ -63,7 +65,20 @@ pub struct PlanExtras {
 
 impl Planner {
     pub fn new(ctx: Arc<dyn TableContext>) -> Self {
-        Planner { ctx }
+        Planner {
+            ctx,
+            sample_executor: None,
+        }
+    }
+
+    pub fn new_with_sample_executor(
+        ctx: Arc<dyn TableContext>,
+        sample_executor: Arc<dyn QuerySampleExecutor>,
+    ) -> Self {
+        Planner {
+            ctx,
+            sample_executor: Some(sample_executor),
+        }
     }
 
     #[async_backtrace::framed]
@@ -196,7 +211,8 @@ impl Planner {
         let opt_ctx = OptimizerContext::new(self.ctx.clone(), metadata.clone())
             .with_enable_distributed_optimization(!self.ctx.get_cluster().is_empty())
             .with_enable_join_reorder(unsafe { !settings.get_disable_join_reorder()? })
-            .with_enable_dphyp(settings.get_enable_dphyp()?);
+            .with_enable_dphyp(settings.get_enable_dphyp()?)
+            .with_sample_executor(self.sample_executor.clone());
 
         let optimized_plan = optimize(opt_ctx, plan).await?;
         let result = (optimized_plan, PlanExtras {
diff --git a/src/query/sql/src/planner/plans/join.rs b/src/query/sql/src/planner/plans/join.rs
index cfc93c8463233..643d44058972c 100644
--- a/src/query/sql/src/planner/plans/join.rs
+++ b/src/query/sql/src/planner/plans/join.rs
@@ -419,6 +419,59 @@ impl Join {
             .iter()
             .any(|condition| condition.is_null_equal)
     }
+
+    pub fn derive_join_stats(
+        &self,
+        left_stat_info: Arc<StatInfo>,
+        right_stat_info: Arc<StatInfo>,
+    ) -> Result<Arc<StatInfo>> {
+        let (mut left_cardinality, mut left_statistics) = (
+            left_stat_info.cardinality,
+            left_stat_info.statistics.clone(),
+        );
+        let (mut right_cardinality, mut right_statistics) = (
+            right_stat_info.cardinality,
+            right_stat_info.statistics.clone(),
+        );
+        // Evaluating join cardinality using histograms.
+        // If histogram is None, will evaluate using NDV.
+        let inner_join_cardinality = self.inner_join_cardinality(
+            &mut left_cardinality,
+            &mut right_cardinality,
+            &mut left_statistics,
+            &mut right_statistics,
+        )?;
+        let cardinality = match self.join_type {
+            JoinType::Inner | JoinType::Cross => inner_join_cardinality,
+            JoinType::Left => f64::max(left_cardinality, inner_join_cardinality),
+            JoinType::Right => f64::max(right_cardinality, inner_join_cardinality),
+            JoinType::Full => {
+                f64::max(left_cardinality, inner_join_cardinality)
+                    + f64::max(right_cardinality, inner_join_cardinality)
+                    - inner_join_cardinality
+            }
+            JoinType::LeftSemi => f64::min(left_cardinality, inner_join_cardinality),
+            JoinType::RightSemi => f64::min(right_cardinality, inner_join_cardinality),
+            JoinType::LeftSingle | JoinType::RightMark | JoinType::LeftAnti => left_cardinality,
+            JoinType::RightSingle | JoinType::LeftMark | JoinType::RightAnti => right_cardinality,
+        };
+        // Derive column statistics
+        let column_stats = if cardinality == 0.0 {
+            HashMap::new()
+        } else {
+            let mut column_stats = HashMap::new();
+            column_stats.extend(left_statistics.column_stats);
+            column_stats.extend(right_statistics.column_stats);
+            column_stats
+        };
+        Ok(Arc::new(StatInfo {
+            cardinality,
+            statistics: Statistics {
+                precise_cardinality: None,
+                column_stats,
+            },
+        }))
+    }
 }
 
 impl Operator for Join {
@@ -518,52 +571,8 @@ impl Operator for Join {
     fn derive_stats(&self, rel_expr: &RelExpr) -> Result<Arc<StatInfo>> {
         let left_stat_info = rel_expr.derive_cardinality_child(0)?;
         let right_stat_info = rel_expr.derive_cardinality_child(1)?;
-        let (mut left_cardinality, mut left_statistics) = (
-            left_stat_info.cardinality,
-            left_stat_info.statistics.clone(),
-        );
-        let (mut right_cardinality, mut right_statistics) = (
-            right_stat_info.cardinality,
-            right_stat_info.statistics.clone(),
-        );
-        // Evaluating join cardinality using histograms.
-        // If histogram is None, will evaluate using NDV.
-        let inner_join_cardinality = self.inner_join_cardinality(
-            &mut left_cardinality,
-            &mut right_cardinality,
-            &mut left_statistics,
-            &mut right_statistics,
-        )?;
-        let cardinality = match self.join_type {
-            JoinType::Inner | JoinType::Cross => inner_join_cardinality,
-            JoinType::Left => f64::max(left_cardinality, inner_join_cardinality),
-            JoinType::Right => f64::max(right_cardinality, inner_join_cardinality),
-            JoinType::Full => {
-                f64::max(left_cardinality, inner_join_cardinality)
-                    + f64::max(right_cardinality, inner_join_cardinality)
-                    - inner_join_cardinality
-            }
-            JoinType::LeftSemi => f64::min(left_cardinality, inner_join_cardinality),
-            JoinType::RightSemi => f64::min(right_cardinality, inner_join_cardinality),
-            JoinType::LeftSingle | JoinType::RightMark | JoinType::LeftAnti => left_cardinality,
-            JoinType::RightSingle | JoinType::LeftMark | JoinType::RightAnti => right_cardinality,
-        };
-        // Derive column statistics
-        let column_stats = if cardinality == 0.0 {
-            HashMap::new()
-        } else {
-            let mut column_stats = HashMap::new();
-            column_stats.extend(left_statistics.column_stats);
-            column_stats.extend(right_statistics.column_stats);
-            column_stats
-        };
-        Ok(Arc::new(StatInfo {
-            cardinality,
-            statistics: Statistics {
-                precise_cardinality: None,
-                column_stats,
-            },
-        }))
+        let stat_info = self.derive_join_stats(left_stat_info, right_stat_info)?;
+        Ok(stat_info)
     }
 
     fn compute_required_prop_child(
diff --git a/tests/sqllogictests/suites/tpch/sample.test b/tests/sqllogictests/suites/tpch/sample.test
new file mode 100644
index 0000000000000..3d1fbafaabc43
--- /dev/null
+++ b/tests/sqllogictests/suites/tpch/sample.test
@@ -0,0 +1,407 @@
+statement ok
+set sandbox_tenant = 'test_tenant';
+
+statement ok
+use tpch_test;
+
+# To make the test stable, we set dynamic_sample_time_budget_ms to a large value
+statement ok
+set dynamic_sample_time_budget_ms = 10000;
+
+# cbo will remove the `stat_info` computed by sample, so we need to disable cbo to see the estimated row info in explain
+statement ok
+set enable_cbo = 0;
+
+statement ok
+set random_function_seed = 1;
+
+# estimated rows and output rows are similar for filter
+# the test is flaky, so it is only kept here as a reference.
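+# `onlyif todo` matches no engine, so the runner skips the case below.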
+onlyif todo +query ok +EXPLAIN ANALYZE PARTIAL +SELECT + * +FROM + orders, + lineitem +WHERE + o_orderkey = l_orderkey + AND l_shipmode LIKE '%MAIL%'; +---- +HashJoin +├── estimated rows: 66953.00 +├── output rows: 85.95 thousand +├── Filter +│ ├── filters: [like(lineitem.l_shipmode (#23), '%MAIL%')] +│ ├── estimated rows: 66953.00 +│ ├── output rows: 85.95 thousand +│ └── TableScan +│ ├── table: default.tpch_test.lineitem +│ ├── estimated rows: 600572.00 +│ └── output rows: 600.57 thousand +└── TableScan + ├── table: default.tpch_test.orders + ├── estimated rows: 150000.00 + └── output rows: 150 thousand + +statement ok +set enable_cbo = 1; + +# use `join.test` to test dynamic sample framework without error +query I +select + c_custkey, count(o_orderkey) as c_count +from + customer + full outer join + orders + on c_custkey = o_custkey + and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120 +group by + c_custkey +order by c_custkey + limit 20; +---- +1 0 +2 0 +3 0 +4 0 +5 0 +6 0 +7 0 +8 0 +9 0 +10 0 +11 0 +12 0 +13 0 +14 0 +15 0 +16 0 +17 0 +18 0 +19 0 +20 0 + + +query I +select + c_custkey +from + customer + inner join + orders + on c_custkey = o_custkey + and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120 order by c_custkey limit 20; +---- +101 +101 +101 +101 +101 +101 +101 +101 +101 +101 +101 +101 +103 +103 +103 +103 +103 +103 +103 +103 + +query I +select + c_custkey, count(o_orderkey) as c_count +from + customer + left join + orders + on c_custkey = o_custkey + and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120 +group by + c_custkey +order by c_custkey + limit 20; +---- +1 0 +2 0 +3 0 +4 0 +5 0 +6 0 +7 0 +8 0 +9 0 +10 0 +11 0 +12 0 +13 0 +14 0 +15 0 +16 0 +17 0 +18 0 +19 0 +20 0 + + +query I +select + c_custkey, count(o_orderkey) as c_count +from + customer + right join + orders + on c_custkey = o_custkey + and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120 +group by + c_custkey +order by c_custkey +limit 20; +---- +101 12 +103 18 +104 7 +106 18 +107 12 +109 25 +110 9 +112 19 +113 17 +115 28 +116 4 +118 18 +119 10 +NULL 149803 + +query I +select + c_custkey +from + customer + left semi join + orders + on c_custkey = o_custkey + and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120 +order by c_custkey + limit 20; +---- +101 +103 +104 +106 +107 +109 +110 +112 +113 +115 +116 +118 +119 + +query I +select + o_custkey +from + customer + right semi join + orders +on c_custkey = o_custkey + and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120 +order by o_custkey + limit 20; +---- +101 +101 +101 +101 +101 +101 +101 +101 +101 +101 +101 +101 +103 +103 +103 +103 +103 +103 +103 +103 + +query I +select + c_custkey +from + customer + left anti join + orders +on c_custkey = o_custkey + and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120 +order by c_custkey + limit 20; +---- +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +20 + +query I +select + o_custkey +from + customer + right anti join + orders +on c_custkey = o_custkey + and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120 +order by o_custkey + limit 20; +---- +1 +1 +1 +1 +1 +1 +1 +1 +1 +2 +2 +2 +2 +2 +2 +2 +2 +2 +2 +2 + +query I +select + o_comment +from + customer + cross join + orders +where o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120 
+order by o_comment
+ limit 20;
+----
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias about the blithely ironic a
+ Tiresias above the carefully ironic packages nag about the pend
+
+statement ok
+set max_block_size = 1024;
+
+
+# Test iejoin with large dataset
+query I
+select l_orderkey from (select * from lineitem order by l_orderkey limit 5000) as l, (select * from orders order by o_orderkey limit 5000) as o where l.l_orderkey > o.o_orderkey and l.l_partkey < o.o_custkey order by l_orderkey limit 10;
+----
+3
+3
+3
+3
+3
+3
+3
+4
+5
+5
+
+statement ok
+set max_block_size = 65536;
+
+query I
+select l_orderkey from (select * from lineitem order by l_orderkey limit 5000) as l, (select * from orders order by o_orderkey limit 5000) as o where l.l_orderkey > o.o_orderkey order by l_orderkey limit 10;
+----
+2
+3
+3
+3
+3
+3
+3
+3
+3
+3
+
+# LEFT OUTER / LEFT SINGLE / FULL
+query I
+select l_orderkey, o_orderdate, o_shippriority from lineitem left join orders on l_orderkey = o_orderkey and o_orderdate < to_date('1995-03-15') order by o_orderdate, l_orderkey limit 5;
+----
+3271 1992-01-01 0
+3271 1992-01-01 0
+3271 1992-01-01 0
+3271 1992-01-01 0
+5607 1992-01-01 0
+
+# LEFT ANTI
+query I
+select o_custkey from orders where not exists (select * from customer where substring(c_phone from 1 for 2) in ('13', '31', '23', '29', '30', '18', '17') and o_custkey = c_custkey) order by o_custkey limit 10;
+----
+1
+1
+1
+1
+1
+1
+1
+1
+1
+4
+
+
+statement ok
+set random_function_seed = 0;
+
+statement ok
+set dynamic_sample_time_budget_ms = 0;