diff --git a/scripts/ci/ci-run-stateful-tests-cluster-s3.sh b/scripts/ci/ci-run-stateful-tests-cluster-s3.sh
index 3189c64b944b6..d2d4f2dbd8d85 100755
--- a/scripts/ci/ci-run-stateful-tests-cluster-s3.sh
+++ b/scripts/ci/ci-run-stateful-tests-cluster-s3.sh
@@ -27,4 +27,4 @@ SCRIPT_PATH="$(cd "$(dirname "$0")" >/dev/null 2>&1 && pwd)"
 cd "$SCRIPT_PATH/../../tests" || exit

 echo "Starting databend-test"
-./databend-test --mode 'cluster' --run-dir 1_stateful
+./databend-test --mode 'cluster' --run-dir 1_stateful --skip '02_0001_create_table_with_external_location'
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index c8322d15977a6..5a44f879485eb 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -25,6 +25,7 @@ use common_datablocks::DataBlock;
 use common_exception::Result;
 use common_functions::scalars::FunctionContext;
 use common_io::prelude::FormatSettings;
+use common_legacy_planners::PartInfoPtr;
 use common_legacy_planners::Partitions;
 use common_legacy_planners::ReadDataSourcePlan;
 use common_meta_types::UserInfo;
@@ -64,9 +65,7 @@ pub trait TableContext: Send + Sync {
     fn get_write_progress_value(&self) -> ProgressValues;
     fn get_result_progress(&self) -> Arc<Progress>;
     fn get_result_progress_value(&self) -> ProgressValues;
-    // Steal n partitions from the partition pool by the pipeline worker.
-    // This also can steal the partitions from distributed node.
-    fn try_get_partitions(&self, num: u64) -> Result<Partitions>;
+    fn try_get_part(&self) -> Option<PartInfoPtr>;
     // Update the context partition pool from the pipeline builder.
     fn try_set_partitions(&self, partitions: Partitions) -> Result<()>;
     fn attach_query_str(&self, kind: String, query: &str);
diff --git a/src/query/pipeline/core/src/pipeline.rs b/src/query/pipeline/core/src/pipeline.rs
index 74778c09e7fda..002acaeff8019 100644
--- a/src/query/pipeline/core/src/pipeline.rs
+++ b/src/query/pipeline/core/src/pipeline.rs
@@ -45,9 +45,12 @@ use crate::TransformPipeBuilder;
 pub struct Pipeline {
     max_threads: usize,
     pub pipes: Vec<Pipe>,
+    on_init: Option<InitCallback>,
     on_finished: Option<FinishedCallback>,
 }

+pub type InitCallback = Arc<Box<dyn Fn() -> Result<()> + Send + Sync + 'static>>;
+
 pub type FinishedCallback =
     Arc<Box<dyn Fn(&Option<ErrorCode>) -> Result<()> + Send + Sync + 'static>>;

@@ -56,6 +59,7 @@ impl Pipeline {
         Pipeline {
             max_threads: 0,
             pipes: Vec::new(),
+            on_init: None,
             on_finished: None,
         }
     }
@@ -159,6 +163,21 @@ impl Pipeline {
         }
     }

+    pub fn set_on_init<F: Fn() -> Result<()> + Send + Sync + 'static>(&mut self, f: F) {
+        if let Some(on_init) = &self.on_init {
+            let old_on_init = on_init.clone();
+
+            self.on_init = Some(Arc::new(Box::new(move || {
+                old_on_init()?;
+                f()
+            })));
+
+            return;
+        }
+
+        self.on_init = Some(Arc::new(Box::new(f)));
+    }
+
     pub fn set_on_finished<F: Fn(&Option<ErrorCode>) -> Result<()> + Send + Sync + 'static>(
         &mut self,
         f: F,
     ) {
@@ -177,6 +196,13 @@
         self.on_finished = Some(Arc::new(Box::new(f)));
     }

+    pub fn take_on_init(&mut self) -> InitCallback {
+        match self.on_init.take() {
+            None => Arc::new(Box::new(|| Ok(()))),
+            Some(on_init) => on_init,
+        }
+    }
+
     pub fn take_on_finished(&mut self) -> FinishedCallback {
         match self.on_finished.take() {
             None => Arc::new(Box::new(|_may_error| Ok(()))),
diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs
index 5b217033aca86..e252734df1b4c 100644
--- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs
+++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs
@@ -668,7 +668,8 @@ impl FragmentCoordinator {
             match
&self.payload { FragmentPayload::PlanV2(plan) => { - let pipeline_builder = PipelineBuilderV2::create(ctx); + let pipeline_ctx = QueryContext::create_from(ctx); + let pipeline_builder = PipelineBuilderV2::create(pipeline_ctx); self.pipeline_build_res = Some(pipeline_builder.finalize(plan)?); } }; diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs index cbcf720b07fcd..989d3554b4836 100644 --- a/src/query/service/src/interpreters/interpreter_insert_v2.rs +++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs @@ -164,7 +164,6 @@ impl Interpreter for InsertInterpreterV2 { _ => unreachable!(), }; - table1.get_table_info(); let catalog = self.plan.catalog.clone(); let is_distributed_plan = select_plan.is_distributed_plan(); diff --git a/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs b/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs index 6bc0f65e96a61..a16870275971d 100644 --- a/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use common_base::base::Thread; use common_exception::ErrorCode; use common_exception::Result; @@ -67,7 +68,13 @@ impl PipelineCompleteExecutor { } pub fn execute(&self) -> Result<()> { - self.executor.execute() + let executor = self.executor.clone(); + let execute_thread = + Thread::named_spawn(Some(String::from("CompleteExecutor")), move || { + executor.execute() + }); + + execute_thread.join().flatten() } } diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index de40d7ee913d8..48767c505501c 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -36,6 +36,8 @@ use crate::pipelines::executor::executor_worker_context::ExecutorWorkerContext; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::pipeline::Pipeline; +pub type InitCallback = Arc Result<()> + Send + Sync + 'static>>; + pub type FinishedCallback = Arc) -> Result<()> + Send + Sync + 'static>>; @@ -45,6 +47,7 @@ pub struct PipelineExecutor { workers_condvar: Arc, pub async_runtime: Arc, pub global_tasks_queue: Arc, + on_init_callback: InitCallback, on_finished_callback: FinishedCallback, settings: ExecutorSettings, finished_notify: Notify, @@ -57,12 +60,14 @@ impl PipelineExecutor { settings: ExecutorSettings, ) -> Result> { let threads_num = pipeline.get_max_threads(); + let on_init_callback = pipeline.take_on_init(); let on_finished_callback = pipeline.take_on_finished(); assert_ne!(threads_num, 0, "Pipeline max threads cannot equals zero."); Self::try_create( RunningGraph::create(pipeline)?, threads_num, + on_init_callback, on_finished_callback, settings, ) @@ -82,6 +87,11 @@ impl PipelineExecutor { .max() .unwrap_or(0); + let on_init_callbacks = pipelines + .iter_mut() + .map(|x| x.take_on_init()) + .collect::>(); + let on_finished_callbacks = pipelines .iter_mut() .map(|x| x.take_on_finished()) @@ -91,6 +101,13 @@ impl PipelineExecutor { Self::try_create( RunningGraph::from_pipelines(pipelines)?, threads_num, + Arc::new(Box::new(move || { + for on_init_callback in &on_init_callbacks { + on_init_callback()?; + } + + Ok(()) + })), Arc::new(Box::new(move |may_error| { for on_finished_callback in &on_finished_callbacks { 
on_finished_callback(may_error)?; @@ -105,33 +122,25 @@ impl PipelineExecutor { fn try_create( graph: RunningGraph, threads_num: usize, + on_init_callback: InitCallback, on_finished_callback: FinishedCallback, settings: ExecutorSettings, ) -> Result> { - unsafe { - let workers_condvar = WorkersCondvar::create(threads_num); - let global_tasks_queue = ExecutorTasksQueue::create(threads_num); - - let mut init_schedule_queue = graph.init_schedule_queue()?; + let workers_condvar = WorkersCondvar::create(threads_num); + let global_tasks_queue = ExecutorTasksQueue::create(threads_num); - let mut tasks = VecDeque::new(); - while let Some(task) = init_schedule_queue.pop_task() { - tasks.push_back(task); - } - global_tasks_queue.init_tasks(tasks); - - Ok(Arc::new(PipelineExecutor { - graph, - threads_num, - workers_condvar, - global_tasks_queue, - on_finished_callback, - async_runtime: GlobalIORuntime::instance(), - settings, - finished_notify: Notify::new(), - finished_error: Mutex::new(None), - })) - } + Ok(Arc::new(PipelineExecutor { + graph, + threads_num, + workers_condvar, + global_tasks_queue, + on_init_callback, + on_finished_callback, + async_runtime: GlobalIORuntime::instance(), + settings, + finished_notify: Notify::new(), + finished_error: Mutex::new(None), + })) } pub fn finish(&self, cause: Option) { @@ -145,6 +154,8 @@ impl PipelineExecutor { } pub fn execute(self: &Arc) -> Result<()> { + self.init()?; + self.start_executor_daemon()?; let mut thread_join_handles = self.execute_threads(self.threads_num); @@ -171,6 +182,24 @@ impl PipelineExecutor { Ok(()) } + fn init(self: &Arc) -> Result<()> { + unsafe { + // TODO: the on init callback cannot be killed. + (self.on_init_callback)()?; + + let mut init_schedule_queue = self.graph.init_schedule_queue()?; + + let mut tasks = VecDeque::new(); + while let Some(task) = init_schedule_queue.pop_task() { + tasks.push_back(task); + } + + self.global_tasks_queue.init_tasks(tasks); + + Ok(()) + } + } + fn start_executor_daemon(self: &Arc) -> Result<()> { if !self.settings.max_execute_time.is_zero() { let this = self.clone(); diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index eefc3336ee62b..d5bb73f538e6a 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -68,7 +68,6 @@ pub struct QueryContext { version: String, partition_queue: Arc>>, shared: Arc, - precommit_blocks: Arc>>, fragment_id: Arc, } @@ -84,7 +83,6 @@ impl QueryContext { partition_queue: Arc::new(RwLock::new(VecDeque::new())), version: format!("DatabendQuery {}", *crate::version::DATABEND_COMMIT_VERSION), shared, - precommit_blocks: Arc::new(RwLock::new(Vec::new())), fragment_id: Arc::new(AtomicUsize::new(0)), }) } @@ -225,20 +223,11 @@ impl TableContext for QueryContext { fn get_result_progress_value(&self) -> ProgressValues { self.shared.result_progress.as_ref().get_values() } - // Steal n partitions from the partition pool by the pipeline worker. - // This also can steal the partitions from distributed node. - fn try_get_partitions(&self, num: u64) -> Result { - let mut partitions = vec![]; - for _ in 0..num { - match self.partition_queue.write().pop_back() { - None => break, - Some(partition) => { - partitions.push(partition); - } - } - } - Ok(partitions) + + fn try_get_part(&self) -> Option { + self.partition_queue.write().pop_front() } + // Update the context partition pool from the pipeline builder. 
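Note: the `try_get_part` implementation above replaces batch partition stealing (`try_get_partitions(n)`, popped from the back) with a pull model where each worker takes one part at a time from the front of the shared queue, so parts are handed out in plan order. A minimal sketch of that pattern, using a hypothetical simplified stand-in for `PartInfoPtr` and the context's queue rather than the real Databend types:

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for PartInfoPtr; the real type is an Arc'd trait object.
type PartInfoPtr = Arc<String>;

struct Ctx {
    partition_queue: Mutex<VecDeque<PartInfoPtr>>,
}

impl Ctx {
    // Mirrors the new TableContext::try_get_part: one part per call,
    // popped from the front so parts are consumed in plan order.
    fn try_get_part(&self) -> Option<PartInfoPtr> {
        self.partition_queue.lock().unwrap().pop_front()
    }
}

fn main() {
    let ctx = Ctx {
        partition_queue: Mutex::new((0..3).map(|i| Arc::new(format!("part-{i}"))).collect()),
    };
    // A source processor drains the queue lazily instead of owning a batch:
    // a slow worker just stops pulling, so skew between workers costs less.
    while let Some(part) = ctx.try_get_part() {
        println!("processing {part}");
    }
}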
fn try_set_partitions(&self, partitions: Partitions) -> Result<()> { let mut partition_queue = self.partition_queue.write(); @@ -328,15 +317,10 @@ impl TableContext for QueryContext { self.shared.dal_ctx.as_ref() } fn push_precommit_block(&self, block: DataBlock) { - let mut blocks = self.precommit_blocks.write(); - blocks.push(block); + self.shared.push_precommit_block(block) } fn consume_precommit_blocks(&self) -> Vec { - let mut blocks = self.precommit_blocks.write(); - - let mut swaped_precommit_blocks = vec![]; - std::mem::swap(&mut *blocks, &mut swaped_precommit_blocks); - swaped_precommit_blocks + self.shared.consume_precommit_blocks() } fn try_get_function_context(&self) -> Result { let tz = self.get_settings().get_timezone()?; diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 52ca0f031c827..33676d0a9cd2f 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -22,6 +22,7 @@ use std::sync::Weak; use common_base::base::Progress; use common_base::base::Runtime; use common_contexts::DalContext; +use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::UserInfo; @@ -79,6 +80,7 @@ pub struct QueryContextShared { pub(in crate::sessions) catalog_manager: Arc, pub(in crate::sessions) storage_operator: StorageOperator, pub(in crate::sessions) executor: Arc>>, + pub(in crate::sessions) precommit_blocks: Arc>>, } impl QueryContextShared { @@ -108,6 +110,7 @@ impl QueryContextShared { auth_manager: AuthMgr::create(config).await?, affect: Arc::new(Mutex::new(None)), executor: Arc::new(RwLock::new(Weak::new())), + precommit_blocks: Arc::new(RwLock::new(vec![])), })) } @@ -297,4 +300,17 @@ impl QueryContextShared { let mut executor = self.executor.write(); *executor = weak_ptr; } + + pub fn push_precommit_block(&self, block: DataBlock) { + let mut blocks = self.precommit_blocks.write(); + blocks.push(block); + } + + pub fn consume_precommit_blocks(&self) -> Vec { + let mut blocks = self.precommit_blocks.write(); + + let mut swaped_precommit_blocks = vec![]; + std::mem::swap(&mut *blocks, &mut swaped_precommit_blocks); + swaped_precommit_blocks + } } diff --git a/src/query/service/src/storages/result/result_table_source.rs b/src/query/service/src/storages/result/result_table_source.rs index 98c1b757a58ce..6ea95cd26f708 100644 --- a/src/query/service/src/storages/result/result_table_source.rs +++ b/src/query/service/src/storages/result/result_table_source.rs @@ -31,7 +31,7 @@ use crate::storages::fuse::io::BlockReader; use crate::storages::result::result_table_source::State::Generated; enum State { - ReadData(PartInfoPtr), + ReadData(Option), Deserialize(PartInfoPtr, Vec<(usize, Vec)>), Generated(Option, DataBlock), Finish, @@ -52,23 +52,13 @@ impl ResultTableSource { block_reader: Arc, ) -> Result { let scan_progress = ctx.get_scan_progress(); - let mut partitions = ctx.try_get_partitions(1)?; - match partitions.is_empty() { - true => Ok(ProcessorPtr::create(Box::new(ResultTableSource { - ctx, - output, - block_reader, - scan_progress, - state: State::Finish, - }))), - false => Ok(ProcessorPtr::create(Box::new(ResultTableSource { - ctx, - output, - block_reader, - scan_progress, - state: State::ReadData(partitions.remove(0)), - }))), - } + Ok(ProcessorPtr::create(Box::new(ResultTableSource { + ctx, + output, + block_reader, + scan_progress, + state: State::ReadData(None), + }))) } } @@ -83,6 
+73,13 @@ impl Processor for ResultTableSource { } fn event(&mut self) -> Result { + if matches!(self.state, State::ReadData(None)) { + self.state = match self.ctx.try_get_part() { + None => State::Finish, + Some(part) => State::ReadData(Some(part)), + } + } + if matches!(self.state, State::Finish) { self.output.finish(); return Ok(Event::Finished); @@ -100,7 +97,7 @@ impl Processor for ResultTableSource { if let Generated(part, data_block) = std::mem::replace(&mut self.state, State::Finish) { self.state = match part { None => State::Finish, - Some(part) => State::ReadData(part), + Some(part) => State::ReadData(Some(part)), }; self.output.push_data(Ok(data_block)); @@ -120,7 +117,7 @@ impl Processor for ResultTableSource { match std::mem::replace(&mut self.state, State::Finish) { State::Deserialize(part, chunks) => { let data_block = self.block_reader.deserialize(part, chunks)?; - let mut partitions = self.ctx.try_get_partitions(1)?; + let new_part = self.ctx.try_get_part(); let progress_values = ProgressValues { rows: data_block.num_rows(), @@ -128,10 +125,7 @@ impl Processor for ResultTableSource { }; self.scan_progress.incr(&progress_values); - self.state = match partitions.is_empty() { - true => State::Generated(None, data_block), - false => State::Generated(Some(partitions.remove(0)), data_block), - }; + self.state = State::Generated(new_part, data_block); Ok(()) } _ => Err(ErrorCode::LogicalError("It's a bug.")), @@ -140,7 +134,7 @@ impl Processor for ResultTableSource { async fn async_process(&mut self) -> Result<()> { match std::mem::replace(&mut self.state, State::Finish) { - State::ReadData(part) => { + State::ReadData(Some(part)) => { let chunks = self.block_reader.read_columns_data(part.clone()).await?; self.state = State::Deserialize(part, chunks); Ok(()) diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs index 81fab945336ec..3094a0e028f7f 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs @@ -114,9 +114,8 @@ async fn test_recluster_mutator_block_select() -> Result<()> { let schema = DataSchemaRef::new(DataSchema::empty()); let ctx: Arc = ctx.clone(); - let block_metas = BlockPruner::new(base_snapshot.clone()) - .prune(&ctx, schema, &None) - .await?; + let segments_location = base_snapshot.segments.clone(); + let block_metas = BlockPruner::prune(&ctx, schema, &None, segments_location).await?; let mut blocks_map = BTreeMap::new(); blocks_map.insert(0, block_metas); diff --git a/src/query/service/tests/it/storages/fuse/pruning.rs b/src/query/service/tests/it/storages/fuse/pruning.rs index b05078d7ce5bf..8b2fc1c1d59fa 100644 --- a/src/query/service/tests/it/storages/fuse/pruning.rs +++ b/src/query/service/tests/it/storages/fuse/pruning.rs @@ -48,8 +48,8 @@ async fn apply_block_pruning( ctx: Arc, ) -> Result> { let ctx: Arc = ctx; - BlockPruner::new(table_snapshot) - .prune(&ctx, schema, push_down) + let segment_locs = table_snapshot.segments.clone(); + BlockPruner::prune(&ctx, schema, push_down, segment_locs) .await .map(|v| v.into_iter().map(|(_, v)| v).collect()) } diff --git a/src/query/service/tests/it/storages/fuse/table.rs b/src/query/service/tests/it/storages/fuse/table.rs index 788ccc5b5d922..c95a59b0398f6 100644 --- a/src/query/service/tests/it/storages/fuse/table.rs +++ 
b/src/query/service/tests/it/storages/fuse/table.rs @@ -72,13 +72,13 @@ async fn test_fuse_table_normal_case() -> Result<()> { let (stats, parts) = table.read_partitions(ctx.clone(), None).await?; assert_eq!(stats.read_rows, num_blocks * rows_per_block); - ctx.try_set_partitions(parts)?; + ctx.try_set_partitions(parts.clone())?; let stream = table .read(ctx.clone(), &ReadDataSourcePlan { catalog: "default".to_owned(), source_info: SourceInfo::TableSource(Default::default()), scan_fields: None, - parts: Default::default(), + parts, statistics: Default::default(), description: "".to_string(), tbl_args: None, @@ -131,14 +131,14 @@ async fn test_fuse_table_normal_case() -> Result<()> { assert_eq!(stats.read_rows, num_blocks * rows_per_block); // inject partitions to current ctx - ctx.try_set_partitions(parts)?; + ctx.try_set_partitions(parts.clone())?; let stream = table .read(ctx.clone(), &ReadDataSourcePlan { catalog: "default".to_owned(), source_info: SourceInfo::TableSource(Default::default()), scan_fields: None, - parts: Default::default(), + parts, statistics: Default::default(), description: "".to_string(), tbl_args: None, diff --git a/src/query/service/tests/it/storages/testdata/system-tables.txt b/src/query/service/tests/it/storages/testdata/system-tables.txt index 0c4897ab01bd6..3f1ca0dca8e59 100644 --- a/src/query/service/tests/it/storages/testdata/system-tables.txt +++ b/src/query/service/tests/it/storages/testdata/system-tables.txt @@ -292,6 +292,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System +--------------------------------+------------+------------+---------+-----------------------------------------------------------------------------------------------------+--------+ | enable_async_insert | 0 | 0 | SESSION | Whether the client open async insert mode, default value: 0. | UInt64 | | enable_cbo | 1 | 1 | SESSION | If enable cost based optimization, default value: 1. | UInt64 | +| enable_distributed_eval_index | 1 | 1 | SESSION | If enable distributed eval index, default value: 1 | UInt64 | | enable_new_processor_framework | 1 | 1 | SESSION | Enable new processor framework if value != 0, default value: 1. | UInt64 | | enable_planner_v2 | 1 | 1 | SESSION | Enable planner v2 by setting this variable to 1, default value: 1. | UInt64 | | flight_client_timeout | 60 | 60 | SESSION | Max duration the flight client request is allowed to take in seconds. By default, it is 60 seconds. | UInt64 | diff --git a/src/query/settings/src/lib.rs b/src/query/settings/src/lib.rs index dd3a13b71340a..a4fd139c2f685 100644 --- a/src/query/settings/src/lib.rs +++ b/src/query/settings/src/lib.rs @@ -336,6 +336,16 @@ impl Settings { desc: "The maximum query execution time. it means no limit if the value is zero. 
default value: 0.", possible_values: None, }, + SettingValue { + default_value: UserSettingValue::UInt64(1), + user_setting: UserSetting::create( + "enable_distributed_eval_index", + UserSettingValue::UInt64(1), + ), + level: ScopeLevel::Session, + desc: "If enable distributed eval index, default value: 1", + possible_values: None, + }, ]; let settings: Arc>> = @@ -517,6 +527,18 @@ impl Settings { self.try_set_u64(KEY, v, false) } + pub fn get_enable_distributed_eval_index(&self) -> Result { + static KEY: &str = "enable_distributed_eval_index"; + let v = self.try_get_u64(KEY)?; + Ok(v != 0) + } + + pub fn set_enable_distributed_eval_index(&self, val: bool) -> Result<()> { + static KEY: &str = "enable_distributed_eval_index"; + let v = u64::from(val); + self.try_set_u64(KEY, v, false) + } + pub fn get_enable_cbo(&self) -> Result { static KEY: &str = "enable_cbo"; let v = self.try_get_u64(KEY)?; diff --git a/src/query/storages/fuse/src/fuse_lazy_part.rs b/src/query/storages/fuse/src/fuse_lazy_part.rs new file mode 100644 index 0000000000000..c6f50015fd971 --- /dev/null +++ b/src/query/storages/fuse/src/fuse_lazy_part.rs @@ -0,0 +1,45 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::Arc; + +use common_fuse_meta::meta::Location; +use common_legacy_planners::PartInfo; +use common_legacy_planners::PartInfoPtr; + +#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub struct FuseLazyPartInfo { + pub segment_location: Location, +} + +#[typetag::serde(name = "fuse_lazy")] +impl PartInfo for FuseLazyPartInfo { + fn as_any(&self) -> &dyn Any { + self + } + + fn equals(&self, info: &Box) -> bool { + match info.as_any().downcast_ref::() { + None => false, + Some(other) => self == other, + } + } +} + +impl FuseLazyPartInfo { + pub fn create(segment_location: Location) -> PartInfoPtr { + Arc::new(Box::new(FuseLazyPartInfo { segment_location })) + } +} diff --git a/src/query/storages/fuse/src/lib.rs b/src/query/storages/fuse/src/lib.rs index 0979f31842e38..909a91ab7cf47 100644 --- a/src/query/storages/fuse/src/lib.rs +++ b/src/query/storages/fuse/src/lib.rs @@ -17,6 +17,7 @@ #![deny(unused_crate_dependencies)] mod constants; +mod fuse_lazy_part; mod fuse_part; mod fuse_table; pub mod io; @@ -24,6 +25,7 @@ pub mod operations; pub mod pruning; pub mod statistics; pub mod table_functions; + use common_catalog::table::NavigationPoint; use common_catalog::table::Table; use common_catalog::table::TableStatistics; diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs index a7591da4928af..01c09d7074c3c 100644 --- a/src/query/storages/fuse/src/operations/compact.rs +++ b/src/query/storages/fuse/src/operations/compact.rs @@ -69,8 +69,9 @@ impl FuseTable { } let partitions_total = mutator.partitions_total(); - let (statistics, parts) = self.read_partitions_with_metas( + let (statistics, parts) = Self::read_partitions_with_metas( ctx.clone(), + 
self.table_info.schema(), None, mutator.selected_blocks(), partitions_total, diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs index 701d6bb363192..b1b42ccee26cf 100644 --- a/src/query/storages/fuse/src/operations/delete.rs +++ b/src/query/storages/fuse/src/operations/delete.rs @@ -95,9 +95,8 @@ impl FuseTable { order_by: vec![], }; let push_downs = Some(extras); - let block_metas = BlockPruner::new(snapshot.clone()) - .prune(&ctx, schema, &push_downs) - .await?; + let segments_location = snapshot.segments.clone(); + let block_metas = BlockPruner::prune(&ctx, schema, &push_downs, segments_location).await?; // delete block one by one. // this could be executed in a distributed manner (till new planner, pipeline settled down) diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read.rs index ee0e7eb42f5d1..91ee70fb60c3f 100644 --- a/src/query/storages/fuse/src/operations/read.rs +++ b/src/query/storages/fuse/src/operations/read.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_base::base::Progress; use common_base::base::ProgressValues; +use common_base::base::Runtime; use common_catalog::table_context::TableContext; use common_datablocks::DataBlock; use common_datavalues::ColumnRef; @@ -36,6 +37,7 @@ use common_pipeline_core::Pipeline; use common_pipeline_core::SourcePipeBuilder; use common_pipeline_transforms::processors::ExpressionExecutor; +use crate::fuse_lazy_part::FuseLazyPartInfo; use crate::io::BlockReader; use crate::operations::read::State::Generated; use crate::FuseTable; @@ -76,6 +78,45 @@ impl FuseTable { plan: &ReadDataSourcePlan, pipeline: &mut Pipeline, ) -> Result<()> { + let mut lazy_init_segments = Vec::with_capacity(plan.parts.len()); + + for part in &plan.parts { + if let Some(lazy_part_info) = part.as_any().downcast_ref::() { + lazy_init_segments.push(lazy_part_info.segment_location.clone()); + } + } + + if !lazy_init_segments.is_empty() { + let table_info = self.table_info.clone(); + let push_downs = plan.push_downs.clone(); + let query_ctx = ctx.clone(); + + // TODO: need refactor + pipeline.set_on_init(move || { + let table_info = table_info.clone(); + let ctx = query_ctx.clone(); + let push_downs = push_downs.clone(); + let lazy_init_segments = lazy_init_segments.clone(); + + let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move { + let (_statistics, partitions) = FuseTable::prune_snapshot_blocks( + ctx, + push_downs, + table_info, + lazy_init_segments, + 0, + ) + .await?; + + Result::<_, ErrorCode>::Ok(partitions) + })?; + + query_ctx.try_set_partitions(partitions)?; + + Ok(()) + }); + } + let table_schema = self.table_info.schema(); let projection = self.projection_of_push_downs(&plan.push_downs); let output_reader = self.create_block_reader(projection)?; // for deserialize output blocks @@ -117,9 +158,7 @@ impl FuseTable { let prewhere_filter = Arc::new(prewhere_filter); let remain_reader = Arc::new(remain_reader); - let parts_len = plan.parts.len(); let max_threads = ctx.get_settings().get_max_threads()? 
as usize; - let max_threads = std::cmp::min(parts_len, max_threads); let mut source_builder = SourcePipeBuilder::create(); @@ -151,7 +190,7 @@ struct PrewhereData { } enum State { - ReadDataPrewhere(PartInfoPtr), + ReadDataPrewhere(Option), ReadDataRemain(PartInfoPtr, PrewhereData), PrewhereFilter(PartInfoPtr, DataChunks), Deserialize(PartInfoPtr, DataChunks, Option), @@ -181,54 +220,30 @@ impl FuseTableSource { remain_reader: Arc>, ) -> Result { let scan_progress = ctx.get_scan_progress(); - let mut partitions = ctx.try_get_partitions(1)?; - match partitions.is_empty() { - true => Ok(ProcessorPtr::create(Box::new(FuseTableSource { - ctx, - output, - scan_progress, - state: State::Finish, - output_reader, - prewhere_reader, - prewhere_filter, - remain_reader, - }))), - false => Ok(ProcessorPtr::create(Box::new(FuseTableSource { - ctx, - output, - scan_progress, - state: State::ReadDataPrewhere(partitions.remove(0)), - output_reader, - prewhere_reader, - prewhere_filter, - remain_reader, - }))), - } + Ok(ProcessorPtr::create(Box::new(FuseTableSource { + ctx, + output, + scan_progress, + state: State::ReadDataPrewhere(None), + output_reader, + prewhere_reader, + prewhere_filter, + remain_reader, + }))) } fn generate_one_block(&mut self, block: DataBlock) -> Result<()> { - let mut partitions = self.ctx.try_get_partitions(1)?; + let new_part = self.ctx.try_get_part(); // resort and prune columns let block = block.resort(self.output_reader.schema())?; - self.state = match partitions.is_empty() { - true => State::Generated(None, block), - false => State::Generated(Some(partitions.remove(0)), block), - }; + self.state = State::Generated(new_part, block); Ok(()) } fn generate_one_empty_block(&mut self) -> Result<()> { - let mut partitions = self.ctx.try_get_partitions(1)?; - self.state = match partitions.is_empty() { - true => State::Generated( - None, - DataBlock::empty_with_schema(self.output_reader.schema()), - ), - false => State::Generated( - Some(partitions.remove(0)), - DataBlock::empty_with_schema(self.output_reader.schema()), - ), - }; + let schema = self.output_reader.schema(); + let new_part = self.ctx.try_get_part(); + self.state = Generated(new_part, DataBlock::empty_with_schema(schema)); Ok(()) } } @@ -244,6 +259,13 @@ impl Processor for FuseTableSource { } fn event(&mut self) -> Result { + if matches!(self.state, State::ReadDataPrewhere(None)) { + self.state = match self.ctx.try_get_part() { + None => State::Finish, + Some(part) => State::ReadDataPrewhere(Some(part)), + } + } + if matches!(self.state, State::Finish) { self.output.finish(); return Ok(Event::Finished); @@ -261,7 +283,7 @@ impl Processor for FuseTableSource { if let Generated(part, data_block) = std::mem::replace(&mut self.state, State::Finish) { self.state = match part { None => State::Finish, - Some(part) => State::ReadDataPrewhere(part), + Some(part) => State::ReadDataPrewhere(Some(part)), }; self.output.push_data(Ok(data_block)); @@ -369,7 +391,7 @@ impl Processor for FuseTableSource { async fn async_process(&mut self) -> Result<()> { match std::mem::replace(&mut self.state, State::Finish) { - State::ReadDataPrewhere(part) => { + State::ReadDataPrewhere(Some(part)) => { let chunks = self.prewhere_reader.read_columns_data(part.clone()).await?; if self.prewhere_filter.is_some() { diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 88ca12281e243..2dd497e9b9367 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ 
b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -16,15 +16,19 @@ use std::collections::HashMap; use std::sync::Arc; use common_catalog::table_context::TableContext; +use common_datavalues::DataSchemaRef; use common_exception::Result; use common_fuse_meta::meta::BlockMeta; +use common_fuse_meta::meta::Location; use common_fuse_meta::meta::TableSnapshot; use common_legacy_planners::Extras; use common_legacy_planners::PartInfoPtr; use common_legacy_planners::Partitions; use common_legacy_planners::Projection; use common_legacy_planners::Statistics; +use common_meta_app::schema::TableInfo; +use crate::fuse_lazy_part::FuseLazyPartInfo; use crate::fuse_part::ColumnLeaves; use crate::fuse_part::ColumnMeta; use crate::fuse_part::FusePartInfo; @@ -45,28 +49,65 @@ impl FuseTable { return Ok(result); } - let block_metas = BlockPruner::new(snapshot.clone()) - .prune(&ctx, self.table_info.schema(), &push_downs) - .await? - .into_iter() - .map(|(_, v)| v) - .collect::>(); - let partitions_total = snapshot.summary.block_count as usize; - self.read_partitions_with_metas(ctx, push_downs, block_metas, partitions_total) + let settings = ctx.get_settings(); + + if settings.get_enable_distributed_eval_index()? { + let mut segments = Vec::with_capacity(snapshot.segments.len()); + + for segment_location in &snapshot.segments { + segments.push(FuseLazyPartInfo::create(segment_location.clone())) + } + + return Ok(( + Statistics::new_estimated( + snapshot.summary.row_count as usize, + snapshot.summary.compressed_byte_size as usize, + snapshot.segments.len(), + snapshot.segments.len(), + ), + segments, + )); + } + + let table_info = self.table_info.clone(); + let segments_location = snapshot.segments.clone(); + let summary = snapshot.summary.block_count as usize; + Self::prune_snapshot_blocks( + ctx.clone(), + push_downs.clone(), + table_info, + segments_location, + summary, + ) + .await } None => Ok((Statistics::default(), vec![])), } } + pub async fn prune_snapshot_blocks( + ctx: Arc, + push_downs: Option, + table_info: TableInfo, + segments_location: Vec, + summary: usize, + ) -> Result<(Statistics, Partitions)> { + let block_metas = + BlockPruner::prune(&ctx, table_info.schema(), &push_downs, segments_location) + .await? 
+ .into_iter() + .map(|(_, v)| v) + .collect::>(); + Self::read_partitions_with_metas(ctx, table_info.schema(), push_downs, block_metas, summary) + } + pub fn read_partitions_with_metas( - &self, ctx: Arc, + schema: DataSchemaRef, push_downs: Option, block_metas: Vec, partitions_total: usize, ) -> Result<(Statistics, Partitions)> { - let schema = self.table_info.schema(); - let arrow_schema = schema.to_arrow(); let column_leaves = ColumnLeaves::new_from_schema(&arrow_schema); diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 133ed33bbe0fc..4ffd40dc9fb5d 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -61,9 +61,8 @@ impl FuseTable { }; let schema = self.table_info.schema(); - let block_metas = BlockPruner::new(snapshot.clone()) - .prune(&ctx, schema, &push_downs) - .await?; + let segments_locations = snapshot.segments.clone(); + let block_metas = BlockPruner::prune(&ctx, schema, &push_downs, segments_locations).await?; let default_cluster_key_id = self.cluster_key_meta.clone().unwrap().0; let mut blocks_map: BTreeMap> = BTreeMap::new(); @@ -107,8 +106,9 @@ impl FuseTable { } let partitions_total = mutator.partitions_total(); - let (statistics, parts) = self.read_partitions_with_metas( + let (statistics, parts) = Self::read_partitions_with_metas( ctx.clone(), + self.table_info.schema(), None, mutator.selected_blocks(), partitions_total, diff --git a/src/query/storages/fuse/src/pruning/pruning_executor.rs b/src/query/storages/fuse/src/pruning/pruning_executor.rs index 5f60fd5e5bedc..e7b0a794e755d 100644 --- a/src/query/storages/fuse/src/pruning/pruning_executor.rs +++ b/src/query/storages/fuse/src/pruning/pruning_executor.rs @@ -23,7 +23,6 @@ use common_exception::ErrorCode; use common_exception::Result; use common_fuse_meta::meta::BlockMeta; use common_fuse_meta::meta::Location; -use common_fuse_meta::meta::TableSnapshot; use common_legacy_planners::Extras; use futures::future; use futures::StreamExt; @@ -36,41 +35,33 @@ use crate::pruning::limiter; use crate::pruning::range_pruner; use crate::pruning::topn_pruner; -pub struct BlockPruner { - table_snapshot: Arc, -} +pub struct BlockPruner; const FUTURE_BUFFER_SIZE: usize = 10; impl BlockPruner { - pub fn new(table_snapshot: Arc) -> Self { - Self { table_snapshot } - } - // Sync version of method `prune` // // Please note that it will take a significant period of time to prune a large table, and // thread that calls this method will be blocked. 
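Note: `BlockPruner` loses its `table_snapshot` field here and becomes a unit struct whose `prune` takes the segment locations as an argument; that is what lets both `read_partitions` and the deferred pipeline-init path call it without holding a snapshot. A compact sketch of the before/after shape, with hypothetical simplified types in place of the real fuse metadata:

// Before: the pruner owned an Arc<TableSnapshot> and read the segment
// list from it. After: a unit struct, segments supplied by the caller.
type Location = (String, u64);

struct BlockPruner;

impl BlockPruner {
    // Associated function, no &self: callable from read_partitions and
    // from a pipeline on_init closure without sharing a snapshot.
    fn prune(segment_locs: Vec<Location>) -> Vec<String> {
        if segment_locs.is_empty() {
            return vec![];
        }
        // Stand-in for the real min/max-index pruning over each segment.
        segment_locs
            .into_iter()
            .map(|(path, version)| format!("{path}@v{version}"))
            .collect()
    }
}

fn main() {
    let kept = BlockPruner::prune(vec![("seg/0".into(), 1), ("seg/1".into(), 1)]);
    assert_eq!(kept.len(), 2);
}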
- #[tracing::instrument(level = "debug", skip(self, schema, ctx), fields(ctx.id = ctx.get_id().as_str()))] + #[tracing::instrument(level = "debug", skip(schema, ctx), fields(ctx.id = ctx.get_id().as_str()))] pub fn sync_prune( - &self, ctx: &Arc, schema: DataSchemaRef, push_down: &Option, + segment_locs: Vec, ) -> Result> { - futures::executor::block_on(self.prune(ctx, schema, push_down)) + futures::executor::block_on(Self::prune(ctx, schema, push_down, segment_locs)) } // prune blocks by utilizing min_max index and filter, according to the pushdowns - #[tracing::instrument(level = "debug", skip(self, schema, ctx), fields(ctx.id = ctx.get_id().as_str()))] + #[tracing::instrument(level = "debug", skip(schema, ctx), fields(ctx.id = ctx.get_id().as_str()))] pub async fn prune( - &self, ctx: &Arc, schema: DataSchemaRef, push_down: &Option, + segment_locs: Vec, ) -> Result> { - let segment_locs = self.table_snapshot.segments.clone(); - if segment_locs.is_empty() { return Ok(vec![]); }; diff --git a/src/query/storages/hive/src/hive_table_source.rs b/src/query/storages/hive/src/hive_table_source.rs index 4e4ffba6e9adf..1eb25eea942e5 100644 --- a/src/query/storages/hive/src/hive_table_source.rs +++ b/src/query/storages/hive/src/hive_table_source.rs @@ -37,7 +37,7 @@ use crate::HivePartInfo; enum State { /// Read parquet file meta data /// IO bound - ReadMeta(PartInfoPtr), + ReadMeta(Option), /// Read blocks from data groups (without deserialization) /// IO bound @@ -70,33 +70,21 @@ impl HiveTableSource { delay: usize, ) -> Result { let scan_progress = ctx.get_scan_progress(); - let mut partitions = ctx.try_get_partitions(1)?; - match partitions.is_empty() { - true => Ok(ProcessorPtr::create(Box::new(HiveTableSource { - ctx, - output, - block_reader, - scan_progress, - state: State::Finish, - delay, - }))), - false => Ok(ProcessorPtr::create(Box::new(HiveTableSource { - ctx, - output, - block_reader, - scan_progress, - state: State::ReadMeta(partitions.remove(0)), - delay, - }))), - } + Ok(ProcessorPtr::create(Box::new(HiveTableSource { + ctx, + output, + block_reader, + scan_progress, + state: State::ReadMeta(None), + delay, + }))) } fn try_get_partitions(&mut self) -> Result<()> { - let partitions = self.ctx.try_get_partitions(1)?; - match partitions.is_empty() { - true => self.state = State::Finish, - false => { - self.state = State::ReadMeta(partitions[0].clone()); + match self.ctx.try_get_part() { + None => self.state = State::Finish, + Some(part_info) => { + self.state = State::ReadMeta(Some(part_info)); } } @@ -115,6 +103,15 @@ impl Processor for HiveTableSource { } fn event(&mut self) -> Result { + if matches!(self.state, State::ReadMeta(None)) { + match self.ctx.try_get_part() { + None => self.state = State::Finish, + Some(part_info) => { + self.state = State::ReadMeta(Some(part_info)); + } + } + } + if self.output.is_finished() { return Ok(Event::Finished); } @@ -184,7 +181,7 @@ impl Processor for HiveTableSource { async fn async_process(&mut self) -> Result<()> { match std::mem::replace(&mut self.state, State::Finish) { - State::ReadMeta(part) => { + State::ReadMeta(Some(part)) => { if self.delay > 0 { sleep(Duration::from_millis(self.delay as u64)).await; tracing::debug!("sleep for {}ms", self.delay); diff --git a/src/query/storages/preludes/src/memory/memory_table_stream.rs b/src/query/storages/preludes/src/memory/memory_table_stream.rs index 6b83c3e41a4f8..e11f676b0e1d7 100644 --- a/src/query/storages/preludes/src/memory/memory_table_stream.rs +++ 
b/src/query/storages/preludes/src/memory/memory_table_stream.rs @@ -49,18 +49,19 @@ impl MemoryTableStream { fn try_get_one_block(&mut self) -> Result> { if (self.block_index as usize) == self.block_ranges.len() { - let partitions = self.ctx.try_get_partitions(1)?; - if partitions.is_empty() { + let part_info = self.ctx.try_get_part(); + if part_info.is_none() { return Ok(None); } - let mut block_ranges = Vec::with_capacity(partitions.len()); + let mut block_ranges = vec![]; - for part in partitions { + if let Some(part) = part_info { let memory_part = MemoryPartInfo::from_part(&part)?; let s: Vec = (memory_part.part_start..memory_part.part_end).collect(); block_ranges.extend_from_slice(&s); } + self.block_ranges = block_ranges; self.block_index = 0; } diff --git a/tests/logictest/suites/mode/standalone/explain/fold_count.test b/tests/logictest/suites/mode/standalone/explain/fold_count.test index f0ceb43b7a6a1..27b9981add790 100644 --- a/tests/logictest/suites/mode/standalone/explain/fold_count.test +++ b/tests/logictest/suites/mode/standalone/explain/fold_count.test @@ -41,10 +41,10 @@ EvalScalar ├── aggregate functions: [count()] └── TableScan ├── table: default.default.t - ├── read rows: 1000 - ├── read bytes: 4028 + ├── read rows: 1001 + ├── read bytes: 4434 ├── partitions total: 2 - ├── partitions scanned: 1 + ├── partitions scanned: 2 └── push downs: [filters: [(number > 10)], limit: NONE] statement ok diff --git a/tests/logictest/suites/mode/standalone/explain/join_reorder/chain.test b/tests/logictest/suites/mode/standalone/explain/join_reorder/chain.test index 1e71baee96ca9..f6eabd6634ae0 100644 --- a/tests/logictest/suites/mode/standalone/explain/join_reorder/chain.test +++ b/tests/logictest/suites/mode/standalone/explain/join_reorder/chain.test @@ -30,21 +30,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 31 +│ │ ├── read bytes: 197 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -61,7 +61,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -73,14 +73,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 31 + │ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -102,21 +102,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 31 +│ │ ├── read bytes: 197 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── 
read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -133,7 +133,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 31 +│ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -145,14 +145,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 68 + │ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -169,7 +169,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 31 +│ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -181,14 +181,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 68 + │ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -205,7 +205,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -217,14 +217,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 31 + │ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] diff --git a/tests/logictest/suites/mode/standalone/explain/join_reorder/cycles.test b/tests/logictest/suites/mode/standalone/explain/join_reorder/cycles.test index a370801c2013e..6c7daf652c2a3 100644 --- a/tests/logictest/suites/mode/standalone/explain/join_reorder/cycles.test +++ b/tests/logictest/suites/mode/standalone/explain/join_reorder/cycles.test @@ -30,21 +30,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 31 +│ │ ├── read bytes: 197 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] 
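Note: the expected-output churn in these join_reorder suites (and the pruning suites below) follows from the planner change above: with `enable_distributed_eval_index` on, `read_partitions` returns `Statistics::new_estimated(...)` built from the snapshot summary instead of exact post-pruning numbers, so standalone explain now reports whole-table rows/bytes and counts every segment as scanned. A sketch of that estimate, using a simplified stand-in for the planner's `Statistics`:

// Simplified stand-in for common_legacy_planners::Statistics; only the
// fields that appear in the explain output are modeled here.
#[derive(Debug)]
struct Statistics {
    read_rows: usize,
    read_bytes: usize,
    partitions_total: usize,
    partitions_scanned: usize,
}

impl Statistics {
    fn new_estimated(rows: usize, bytes: usize, total: usize, scanned: usize) -> Self {
        Statistics { read_rows: rows, read_bytes: bytes, partitions_total: total, partitions_scanned: scanned }
    }
}

fn main() {
    // Hypothetical snapshot summary: 6 rows, 376 compressed bytes, 2 segments.
    let (row_count, compressed_bytes, segments) = (6, 376, 2);
    // Plan-time estimate: pruning is deferred to pipeline init, so every
    // segment counts as scanned and bytes reflect the whole table.
    let stats = Statistics::new_estimated(row_count, compressed_bytes, segments, segments);
    println!("{stats:?}");
}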
@@ -61,7 +61,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -73,14 +73,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 31 + │ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -102,21 +102,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 31 +│ │ ├── read bytes: 197 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -133,7 +133,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 31 +│ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -145,14 +145,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 68 + │ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -169,7 +169,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 31 +│ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -181,14 +181,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 68 + │ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -205,7 +205,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -217,14 +217,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 31 + │ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], 
limit: NONE] diff --git a/tests/logictest/suites/mode/standalone/explain/join_reorder/star.test b/tests/logictest/suites/mode/standalone/explain/join_reorder/star.test index 5971e7fc065c2..6d38f6c1bc8b5 100644 --- a/tests/logictest/suites/mode/standalone/explain/join_reorder/star.test +++ b/tests/logictest/suites/mode/standalone/explain/join_reorder/star.test @@ -30,21 +30,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 31 +│ │ ├── read bytes: 197 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -61,7 +61,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -73,14 +73,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 31 + │ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -102,21 +102,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 31 +│ │ ├── read bytes: 197 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -133,7 +133,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 31 +│ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -145,14 +145,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 68 + │ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -169,7 +169,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 31 +│ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -181,14 +181,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - 
│ ├── read bytes: 68 + │ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -205,7 +205,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -217,14 +217,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 31 + │ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] diff --git a/tests/logictest/suites/mode/standalone/explain/nullable_prune.test b/tests/logictest/suites/mode/standalone/explain/nullable_prune.test index f7d122b6bc458..3cfbb082ec7ea 100644 --- a/tests/logictest/suites/mode/standalone/explain/nullable_prune.test +++ b/tests/logictest/suites/mode/standalone/explain/nullable_prune.test @@ -14,7 +14,7 @@ explain select * from t_nullable_prune; TableScan ├── table: default.default.t_nullable_prune ├── read rows: 6 -├── read bytes: 62 +├── read bytes: 376 ├── partitions total: 2 ├── partitions scanned: 2 └── push downs: [filters: [], limit: NONE] @@ -25,10 +25,10 @@ explain select * from t_nullable_prune where a is not null; ---- TableScan ├── table: default.default.t_nullable_prune -├── read rows: 3 -├── read bytes: 37 +├── read rows: 6 +├── read bytes: 376 ├── partitions total: 2 -├── partitions scanned: 1 +├── partitions scanned: 2 └── push downs: [filters: [is_not_null(a)], limit: NONE] statement query T @@ -37,10 +37,10 @@ explain select * from t_nullable_prune where a is null; ---- TableScan ├── table: default.default.t_nullable_prune -├── read rows: 3 -├── read bytes: 25 +├── read rows: 6 +├── read bytes: 376 ├── partitions total: 2 -├── partitions scanned: 1 +├── partitions scanned: 2 └── push downs: [filters: [not(is_not_null(a))], limit: NONE] statement ok diff --git a/tests/logictest/suites/mode/standalone/explain/select_limit_offset.test b/tests/logictest/suites/mode/standalone/explain/select_limit_offset.test index e2dce241eb91e..5f46a981fa9b3 100644 --- a/tests/logictest/suites/mode/standalone/explain/select_limit_offset.test +++ b/tests/logictest/suites/mode/standalone/explain/select_limit_offset.test @@ -150,7 +150,7 @@ Limit │ └── TableScan │ ├── table: default.default.t1 │ ├── read rows: 2 - │ ├── read bytes: 31 + │ ├── read bytes: 188 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: 3] @@ -160,7 +160,7 @@ Limit └── TableScan ├── table: default.default.t ├── read rows: 1 - ├── read bytes: 27 + ├── read bytes: 184 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: 3]
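Note: the `Pipeline::set_on_init` added in pipeline.rs composes callbacks instead of overwriting them, and `PipelineExecutor::init()` invokes the composed closure once before seeding the global task queue; `FuseTable::do_read2` relies on that hook to prune lazy segment parts and call `try_set_partitions` at execution start. A self-contained sketch of the chaining behavior, assuming the same `Arc<Box<dyn Fn() -> Result<()>>>` callback shape as the diff and a simplified `String` error type:

use std::sync::Arc;

type Result<T> = std::result::Result<T, String>;
type InitCallback = Arc<Box<dyn Fn() -> Result<()> + Send + Sync + 'static>>;

#[derive(Default)]
struct Pipeline {
    on_init: Option<InitCallback>,
}

impl Pipeline {
    // Registering a second callback wraps the first, so callbacks run in
    // registration order and the executor only ever sees one closure.
    fn set_on_init<F: Fn() -> Result<()> + Send + Sync + 'static>(&mut self, f: F) {
        if let Some(old_on_init) = self.on_init.take() {
            self.on_init = Some(Arc::new(Box::new(move || {
                old_on_init()?;
                f()
            })));
            return;
        }
        self.on_init = Some(Arc::new(Box::new(f)));
    }

    // Mirrors Pipeline::take_on_init: a no-op callback when none was set.
    fn take_on_init(&mut self) -> InitCallback {
        match self.on_init.take() {
            None => Arc::new(Box::new(|| Ok(()))),
            Some(on_init) => on_init,
        }
    }
}

fn main() -> Result<()> {
    let mut pipeline = Pipeline::default();
    pipeline.set_on_init(|| {
        println!("resolve lazy parts, e.g. prune then try_set_partitions");
        Ok(())
    });
    pipeline.set_on_init(|| {
        println!("second init step, runs after the first");
        Ok(())
    });
    // The executor calls this exactly once, in init(), before scheduling tasks.
    (pipeline.take_on_init())()
}

The composition matters because several operators may each hang an init step on the same pipeline; `PipelineExecutor::from_pipelines` performs the analogous fold across pipelines by looping over the collected callbacks.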