From ac94c717bca98d1efa80628c64814b214d822fd6 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Sat, 24 Sep 2022 22:22:47 +0800 Subject: [PATCH 01/20] feat(cluster): experimental distributed eval index --- src/query/catalog/src/table_context.rs | 2 + src/query/service/src/sessions/query_ctx.rs | 5 ++ .../operations/mutation/recluster_mutator.rs | 5 +- .../service/tests/it/storages/fuse/pruning.rs | 4 +- src/query/settings/src/lib.rs | 22 +++++++ src/query/storages/fuse/src/fuse_lazy_part.rs | 31 +++++++++ src/query/storages/fuse/src/lib.rs | 2 + .../storages/fuse/src/operations/compact.rs | 3 +- .../storages/fuse/src/operations/delete.rs | 5 +- .../storages/fuse/src/operations/read.rs | 41 +++++++++++- .../fuse/src/operations/read_partitions.rs | 63 +++++++++++++++---- .../storages/fuse/src/operations/recluster.rs | 8 +-- .../fuse/src/pruning/pruning_executor.rs | 15 +---- 13 files changed, 168 insertions(+), 38 deletions(-) create mode 100644 src/query/storages/fuse/src/fuse_lazy_part.rs diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 3a81d103b0c11..fbd37872764bf 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_base::base::Progress; use common_base::base::ProgressValues; +use common_base::base::Runtime; use common_config::Config; use common_contexts::DalContext; use common_contexts::DalMetrics; @@ -99,4 +100,5 @@ pub trait TableContext: Send + Sync { async fn get_table(&self, catalog: &str, database: &str, table: &str) -> Result>; fn get_processes_info(&self) -> Vec; + fn get_runtime(&self) -> Result>; } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 8511ebdfcd711..5bd561fbfaa7a 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -24,6 +24,7 @@ use chrono_tz::Tz; use common_base::base::tokio::task::JoinHandle; use common_base::base::Progress; use common_base::base::ProgressValues; +use common_base::base::Runtime; use common_base::base::TrySpawn; use common_contexts::DalContext; use common_contexts::DalMetrics; @@ -365,6 +366,10 @@ impl TableContext for QueryContext { fn get_processes_info(&self) -> Vec { SessionManager::instance().processes_info() } + + fn get_runtime(&self) -> Result> { + self.shared.try_get_runtime() + } } impl TrySpawn for QueryContext { diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs index d059a92e3381b..adad2667552b2 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs @@ -114,9 +114,8 @@ async fn test_recluster_mutator_block_select() -> Result<()> { let schema = DataSchemaRef::new(DataSchema::empty()); let ctx: Arc = ctx.clone(); - let block_metas = BlockPruner::new(base_snapshot.clone()) - .prune(&ctx, schema, &None) - .await?; + let segments_location = base_snapshot.segments.clone(); + let block_metas = BlockPruner::prune(&ctx, schema, &None, segments_location).await?; let mut blocks_map = BTreeMap::new(); blocks_map.insert(0, block_metas); diff --git a/src/query/service/tests/it/storages/fuse/pruning.rs b/src/query/service/tests/it/storages/fuse/pruning.rs index 896338e9090e7..1b12c7e881f69 100644 --- 
a/src/query/service/tests/it/storages/fuse/pruning.rs +++ b/src/query/service/tests/it/storages/fuse/pruning.rs @@ -48,8 +48,8 @@ async fn apply_block_pruning( ctx: Arc, ) -> Result> { let ctx: Arc = ctx; - BlockPruner::new(table_snapshot) - .prune(&ctx, schema, push_down) + let segment_locs = table_snapshot.segments.clone(); + BlockPruner::prune(&ctx, schema, push_down, segment_locs) .await .map(|v| v.into_iter().map(|(_, v)| v).collect()) } diff --git a/src/query/settings/src/lib.rs b/src/query/settings/src/lib.rs index b02242384d0b6..bb605a2b811ad 100644 --- a/src/query/settings/src/lib.rs +++ b/src/query/settings/src/lib.rs @@ -330,6 +330,16 @@ impl Settings { desc: "The quote char for CSV. default value: '\"'.", possible_values: None, }, + SettingValue { + default_value: UserSettingValue::UInt64(0), + user_setting: UserSetting::create( + "enable_distributed_eval_index", + UserSettingValue::UInt64(0), + ), + level: ScopeLevel::Session, + desc: "If enable distributed eval index, default value: 0", + possible_values: None, + }, ]; let settings: Arc>> = @@ -511,6 +521,18 @@ impl Settings { self.try_set_u64(KEY, v, false) } + pub fn get_enable_distributed_eval_index(&self) -> Result { + static KEY: &str = "enable_distributed_eval_index"; + let v = self.try_get_u64(KEY)?; + Ok(v != 0) + } + + pub fn set_enable_distributed_eval_index(&self, val: bool) -> Result<()> { + static KEY: &str = "enable_distributed_eval_index"; + let v = u64::from(val); + self.try_set_u64(KEY, v, false) + } + pub fn get_enable_cbo(&self) -> Result { static KEY: &str = "enable_cbo"; let v = self.try_get_u64(KEY)?; diff --git a/src/query/storages/fuse/src/fuse_lazy_part.rs b/src/query/storages/fuse/src/fuse_lazy_part.rs new file mode 100644 index 0000000000000..2ada34ec2415e --- /dev/null +++ b/src/query/storages/fuse/src/fuse_lazy_part.rs @@ -0,0 +1,31 @@ +use std::any::Any; +use std::sync::Arc; + +use common_fuse_meta::meta::Location; +use common_legacy_planners::PartInfo; +use common_legacy_planners::PartInfoPtr; + +#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub struct FuseLazyPartInfo { + pub segment_location: Location, +} + +#[typetag::serde(name = "fuse_lazy")] +impl PartInfo for FuseLazyPartInfo { + fn as_any(&self) -> &dyn Any { + self + } + + fn equals(&self, info: &Box) -> bool { + match info.as_any().downcast_ref::() { + None => false, + Some(other) => self == other, + } + } +} + +impl FuseLazyPartInfo { + pub fn create(segment_location: Location) -> PartInfoPtr { + Arc::new(Box::new(FuseLazyPartInfo { segment_location })) + } +} diff --git a/src/query/storages/fuse/src/lib.rs b/src/query/storages/fuse/src/lib.rs index 0979f31842e38..909a91ab7cf47 100644 --- a/src/query/storages/fuse/src/lib.rs +++ b/src/query/storages/fuse/src/lib.rs @@ -17,6 +17,7 @@ #![deny(unused_crate_dependencies)] mod constants; +mod fuse_lazy_part; mod fuse_part; mod fuse_table; pub mod io; @@ -24,6 +25,7 @@ pub mod operations; pub mod pruning; pub mod statistics; pub mod table_functions; + use common_catalog::table::NavigationPoint; use common_catalog::table::Table; use common_catalog::table::TableStatistics; diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs index f2ee160de4d83..8231b81d59ef2 100644 --- a/src/query/storages/fuse/src/operations/compact.rs +++ b/src/query/storages/fuse/src/operations/compact.rs @@ -69,8 +69,9 @@ impl FuseTable { } let partitions_total = mutator.partitions_total(); - let (statistics, parts) = 
self.read_partitions_with_metas( + let (statistics, parts) = Self::read_partitions_with_metas( ctx.clone(), + self.table_info.schema(), None, mutator.selected_blocks(), partitions_total, diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs index f15d21d35d766..90d7c03129bfd 100644 --- a/src/query/storages/fuse/src/operations/delete.rs +++ b/src/query/storages/fuse/src/operations/delete.rs @@ -96,9 +96,8 @@ impl FuseTable { order_by: vec![], }; let push_downs = Some(extras); - let block_metas = BlockPruner::new(snapshot.clone()) - .prune(&ctx, schema, &push_downs) - .await?; + let segments_location = snapshot.segments.clone(); + let block_metas = BlockPruner::prune(&ctx, schema, &push_downs, segments_location).await?; // delete block one by one. // this could be executed in a distributed manner (till new planner, pipeline settled down) diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read.rs index c7ae6abe840d5..ae4b5509d532d 100644 --- a/src/query/storages/fuse/src/operations/read.rs +++ b/src/query/storages/fuse/src/operations/read.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_base::base::Progress; use common_base::base::ProgressValues; +use common_base::base::TrySpawn; use common_catalog::table_context::TableContext; use common_datablocks::DataBlock; use common_datavalues::ColumnRef; @@ -36,6 +37,7 @@ use common_pipeline_core::Pipeline; use common_pipeline_core::SourcePipeBuilder; use common_pipeline_transforms::processors::ExpressionExecutor; +use crate::fuse_lazy_part::FuseLazyPartInfo; use crate::io::BlockReader; use crate::operations::read::State::Generated; use crate::FuseTable; @@ -81,6 +83,43 @@ impl FuseTable { plan: &ReadDataSourcePlan, pipeline: &mut Pipeline, ) -> Result<()> { + let mut lazy_init_segments = Vec::with_capacity(plan.parts.len()); + + for part in &plan.parts { + if let Some(lazy_part_info) = part.as_any().downcast_ref::() { + lazy_init_segments.push(lazy_part_info.segment_location.clone()); + } + } + + if !lazy_init_segments.is_empty() { + let table_info = self.table_info.clone(); + let push_downs = plan.push_downs.clone(); + let query_ctx = ctx.clone(); + let re_partitions = ctx.get_runtime()?.spawn(async move { + let (_statistics, partitions) = FuseTable::prune_snapshot_blocks( + query_ctx, + push_downs, + table_info, + lazy_init_segments, + 0, + ) + .await?; + + Ok(partitions) + }); + + let partitions = match futures::executor::block_on(re_partitions) { + Ok(Ok(partitions)) => Ok(partitions), + Ok(Err(error)) => Err(error), + Err(cause) => Err(ErrorCode::PanicError(format!( + "Maybe panic while in commit insert. {}", + cause + ))), + }?; + + ctx.try_set_partitions(partitions)?; + } + let table_schema = self.table_info.schema(); let projection = self.projection_of_push_downs(&plan.push_downs); let output_reader = self.create_block_reader(&ctx, projection)?; // for deserialize output blocks @@ -123,9 +162,7 @@ impl FuseTable { let prewhere_filter = Arc::new(prewhere_filter); let remain_reader = Arc::new(remain_reader); - let parts_len = plan.parts.len(); let max_threads = ctx.get_settings().get_max_threads()? 
as usize; - let max_threads = std::cmp::min(parts_len, max_threads); let mut source_builder = SourcePipeBuilder::create(); diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 88ca12281e243..55134af6f5006 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -16,15 +16,19 @@ use std::collections::HashMap; use std::sync::Arc; use common_catalog::table_context::TableContext; +use common_datavalues::DataSchemaRef; use common_exception::Result; use common_fuse_meta::meta::BlockMeta; +use common_fuse_meta::meta::Location; use common_fuse_meta::meta::TableSnapshot; use common_legacy_planners::Extras; use common_legacy_planners::PartInfoPtr; use common_legacy_planners::Partitions; use common_legacy_planners::Projection; use common_legacy_planners::Statistics; +use common_meta_app::schema::TableInfo; +use crate::fuse_lazy_part::FuseLazyPartInfo; use crate::fuse_part::ColumnLeaves; use crate::fuse_part::ColumnMeta; use crate::fuse_part::FusePartInfo; @@ -45,28 +49,65 @@ impl FuseTable { return Ok(result); } - let block_metas = BlockPruner::new(snapshot.clone()) - .prune(&ctx, self.table_info.schema(), &push_downs) - .await? - .into_iter() - .map(|(_, v)| v) - .collect::>(); - let partitions_total = snapshot.summary.block_count as usize; - self.read_partitions_with_metas(ctx, push_downs, block_metas, partitions_total) + let settings = ctx.get_settings(); + + if settings.get_enable_distributed_eval_index()? { + let mut segments = Vec::with_capacity(snapshot.segments.len()); + + for segment_location in &snapshot.segments { + segments.push(FuseLazyPartInfo::create(segment_location.clone())) + } + + return Ok(( + Statistics::new_estimated( + snapshot.summary.row_count as usize, + snapshot.summary.uncompressed_byte_size as usize, + snapshot.segments.len(), + snapshot.segments.len(), + ), + segments, + )); + } + + let table_info = self.table_info.clone(); + let segments_location = snapshot.segments.clone(); + let summary = snapshot.summary.block_count as usize; + Self::prune_snapshot_blocks( + ctx.clone(), + push_downs.clone(), + table_info, + segments_location, + summary, + ) + .await } None => Ok((Statistics::default(), vec![])), } } + pub async fn prune_snapshot_blocks( + ctx: Arc, + push_downs: Option, + table_info: TableInfo, + segments_location: Vec, + summary: usize, + ) -> Result<(Statistics, Partitions)> { + let block_metas = + BlockPruner::prune(&ctx, table_info.schema(), &push_downs, segments_location) + .await? 
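+            // prune() yields (index, BlockMeta) pairs; only the metas are carried forward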
+ .into_iter() + .map(|(_, v)| v) + .collect::>(); + Self::read_partitions_with_metas(ctx, table_info.schema(), push_downs, block_metas, summary) + } + pub fn read_partitions_with_metas( - &self, ctx: Arc, + schema: DataSchemaRef, push_downs: Option, block_metas: Vec, partitions_total: usize, ) -> Result<(Statistics, Partitions)> { - let schema = self.table_info.schema(); - let arrow_schema = schema.to_arrow(); let column_leaves = ColumnLeaves::new_from_schema(&arrow_schema); diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 2744996bce6c8..536c14b767eca 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -62,9 +62,8 @@ impl FuseTable { }; let schema = self.table_info.schema(); - let block_metas = BlockPruner::new(snapshot.clone()) - .prune(&ctx, schema, &push_downs) - .await?; + let segments_locations = snapshot.segments.clone(); + let block_metas = BlockPruner::prune(&ctx, schema, &push_downs, segments_locations).await?; let default_cluster_key_id = self.cluster_key_meta.clone().unwrap().0; let mut blocks_map: BTreeMap> = BTreeMap::new(); @@ -107,8 +106,9 @@ impl FuseTable { } let partitions_total = mutator.partitions_total(); - let (statistics, parts) = self.read_partitions_with_metas( + let (statistics, parts) = Self::read_partitions_with_metas( ctx.clone(), + self.table_info.schema(), None, mutator.selected_blocks(), partitions_total, diff --git a/src/query/storages/fuse/src/pruning/pruning_executor.rs b/src/query/storages/fuse/src/pruning/pruning_executor.rs index 614d311166566..4669ba298b721 100644 --- a/src/query/storages/fuse/src/pruning/pruning_executor.rs +++ b/src/query/storages/fuse/src/pruning/pruning_executor.rs @@ -22,7 +22,6 @@ use common_exception::ErrorCode; use common_exception::Result; use common_fuse_meta::meta::BlockMeta; use common_fuse_meta::meta::Location; -use common_fuse_meta::meta::TableSnapshot; use common_legacy_planners::Extras; use futures::future; use futures::StreamExt; @@ -35,27 +34,19 @@ use crate::pruning::limiter; use crate::pruning::range_pruner; use crate::pruning::topn_pruner; -pub struct BlockPruner { - table_snapshot: Arc, -} +pub struct BlockPruner; const FUTURE_BUFFER_SIZE: usize = 10; impl BlockPruner { - pub fn new(table_snapshot: Arc) -> Self { - Self { table_snapshot } - } - // prune blocks by utilizing min_max index and bloom filter, according to the pushdowns - #[tracing::instrument(level = "debug", skip(self, schema, ctx), fields(ctx.id = ctx.get_id().as_str()))] + #[tracing::instrument(level = "debug", skip(schema, ctx), fields(ctx.id = ctx.get_id().as_str()))] pub async fn prune( - &self, ctx: &Arc, schema: DataSchemaRef, push_down: &Option, + segment_locs: Vec, ) -> Result> { - let segment_locs = self.table_snapshot.segments.clone(); - if segment_locs.is_empty() { return Ok(vec![]); }; From 8d0b328504ffc7317f52bbeb91f6362094d2ca29 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Sat, 24 Sep 2022 22:25:29 +0800 Subject: [PATCH 02/20] feat(cluster): update error message --- src/query/storages/fuse/src/operations/read.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read.rs index ae4b5509d532d..7c9e352a800d8 100644 --- a/src/query/storages/fuse/src/operations/read.rs +++ b/src/query/storages/fuse/src/operations/read.rs @@ -112,7 +112,7 @@ impl FuseTable { Ok(Ok(partitions)) => 
Ok(partitions), Ok(Err(error)) => Err(error), Err(cause) => Err(ErrorCode::PanicError(format!( - "Maybe panic while in commit insert. {}", + "Maybe panic while in eval index. {}", cause ))), }?; From 143924eeb2fc1bcd1446d8913f71c51ae08bd3fe Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Sat, 24 Sep 2022 22:55:10 +0800 Subject: [PATCH 03/20] feat(cluster): try fix unit test failure --- .../tests/it/storages/system/settings_table.rs | 3 ++- src/query/settings/src/lib.rs | 6 +++--- src/query/storages/fuse/src/fuse_lazy_part.rs | 14 ++++++++++++++ 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/query/service/tests/it/storages/system/settings_table.rs b/src/query/service/tests/it/storages/system/settings_table.rs index 2235ea2d0b91a..d44411d6e0140 100644 --- a/src/query/service/tests/it/storages/system/settings_table.rs +++ b/src/query/service/tests/it/storages/system/settings_table.rs @@ -38,12 +38,13 @@ async fn test_settings_table() -> Result<()> { "| empty_as_default | 1 | 1 | SESSION | Format empty_as_default, default value: 1 | UInt64 |", "| enable_async_insert | 0 | 0 | SESSION | Whether the client open async insert mode, default value: 0 | UInt64 |", "| enable_cbo | 1 | 1 | SESSION | If enable cost based optimization, default value: 1 | UInt64 |", + "| enable_distributed_eval_index | 1 | 1 | SESSION | If enable distributed eval index, default value: 1 | UInt64 |", "| enable_new_processor_framework | 1 | 1 | SESSION | Enable new processor framework if value != 0, default value: 1 | UInt64 |", "| enable_planner_v2 | 1 | 1 | SESSION | Enable planner v2 by setting this variable to 1, default value: 1 | UInt64 |", "| field_delimiter | , | , | SESSION | Format field delimiter, default value: , | String |", "| flight_client_timeout | 60 | 60 | SESSION | Max duration the flight client request is allowed to take in seconds. By default, it is 60 seconds | UInt64 |", - "| input_read_buffer_size | 1048576 | 1048576 | SESSION | The size of buffer in bytes for input with format. By default, it is 1MB. | UInt64 |", "| group_by_two_level_threshold | 10000 | 10000 | SESSION | The threshold of keys to open two-level aggregation, default value: 10000 | UInt64 |", + "| input_read_buffer_size | 1048576 | 1048576 | SESSION | The size of buffer in bytes for input with format. By default, it is 1MB. | UInt64 |", "| max_block_size | 10000 | 10000 | SESSION | Maximum block size for reading | UInt64 |", "| max_execute_time | 0 | 0 | SESSION | The maximum query execution time. it means no limit if the value is zero. default value: 0 | UInt64 |", "| max_threads | 2 | 16 | SESSION | The maximum number of threads to execute the request. By default, it is determined automatically. 
| UInt64 |", diff --git a/src/query/settings/src/lib.rs b/src/query/settings/src/lib.rs index bb605a2b811ad..78f5a121605f7 100644 --- a/src/query/settings/src/lib.rs +++ b/src/query/settings/src/lib.rs @@ -331,13 +331,13 @@ impl Settings { possible_values: None, }, SettingValue { - default_value: UserSettingValue::UInt64(0), + default_value: UserSettingValue::UInt64(1), user_setting: UserSetting::create( "enable_distributed_eval_index", - UserSettingValue::UInt64(0), + UserSettingValue::UInt64(1), ), level: ScopeLevel::Session, - desc: "If enable distributed eval index, default value: 0", + desc: "If enable distributed eval index, default value: 1", possible_values: None, }, ]; diff --git a/src/query/storages/fuse/src/fuse_lazy_part.rs b/src/query/storages/fuse/src/fuse_lazy_part.rs index 2ada34ec2415e..c6f50015fd971 100644 --- a/src/query/storages/fuse/src/fuse_lazy_part.rs +++ b/src/query/storages/fuse/src/fuse_lazy_part.rs @@ -1,3 +1,17 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::any::Any; use std::sync::Arc; From 2c5c4134ce01d9161c3977e6d4a68090de285e93 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Sun, 25 Sep 2022 10:41:41 +0800 Subject: [PATCH 04/20] feat(cluster): try fix test failure --- .../service/src/pipelines/executor/pipeline_executor.rs | 1 + src/query/storages/fuse/src/operations/read.rs | 4 +++- .../storages/fuse/src/operations/read_partitions.rs | 9 +++++++-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index de40d7ee913d8..b9a3a458952ad 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -57,6 +57,7 @@ impl PipelineExecutor { settings: ExecutorSettings, ) -> Result> { let threads_num = pipeline.get_max_threads(); + // let on_init_callback = pipeline.take_on_init(); let on_finished_callback = pipeline.take_on_finished(); assert_ne!(threads_num, 0, "Pipeline max threads cannot equals zero."); diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read.rs index 7c9e352a800d8..a4e7113135e2b 100644 --- a/src/query/storages/fuse/src/operations/read.rs +++ b/src/query/storages/fuse/src/operations/read.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_base::base::Progress; use common_base::base::ProgressValues; +use common_base::base::Runtime; use common_base::base::TrySpawn; use common_catalog::table_context::TableContext; use common_datablocks::DataBlock; @@ -95,7 +96,8 @@ impl FuseTable { let table_info = self.table_info.clone(); let push_downs = plan.push_downs.clone(); let query_ctx = ctx.clone(); - let re_partitions = ctx.get_runtime()?.spawn(async move { + let runtime = Runtime::with_worker_threads(2, None)?; + let re_partitions = runtime.spawn(async move { let (_statistics, partitions) = FuseTable::prune_snapshot_blocks( query_ctx, 
push_downs, diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 55134af6f5006..6c09086c73960 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Instant; use common_catalog::table_context::TableContext; use common_datavalues::DataSchemaRef; @@ -42,6 +43,7 @@ impl FuseTable { ctx: Arc, push_downs: Option, ) -> Result<(Statistics, Partitions)> { + let instant = Instant::now(); let snapshot = self.read_table_snapshot(ctx.clone()).await?; match snapshot { Some(snapshot) => { @@ -58,6 +60,7 @@ impl FuseTable { segments.push(FuseLazyPartInfo::create(segment_location.clone())) } + println!("elapsed {:?}", instant.elapsed()); return Ok(( Statistics::new_estimated( snapshot.summary.row_count as usize, @@ -72,14 +75,16 @@ impl FuseTable { let table_info = self.table_info.clone(); let segments_location = snapshot.segments.clone(); let summary = snapshot.summary.block_count as usize; - Self::prune_snapshot_blocks( + let s = Self::prune_snapshot_blocks( ctx.clone(), push_downs.clone(), table_info, segments_location, summary, ) - .await + .await; + println!("elapsed {:?}", instant.elapsed()); + s } None => Ok((Statistics::default(), vec![])), } From 7752bc42e90b104c3bcf14c436275614ce558f86 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 27 Sep 2022 13:23:51 +0800 Subject: [PATCH 05/20] feat(cluster): support on init in pipeline --- src/query/pipeline/core/src/pipeline.rs | 22 ++++++ .../pipelines/executor/pipeline_executor.rs | 23 ++++++- .../storages/fuse/src/operations/read.rs | 69 +++++++++++-------- 3 files changed, 85 insertions(+), 29 deletions(-) diff --git a/src/query/pipeline/core/src/pipeline.rs b/src/query/pipeline/core/src/pipeline.rs index 74778c09e7fda..70c0bddca1398 100644 --- a/src/query/pipeline/core/src/pipeline.rs +++ b/src/query/pipeline/core/src/pipeline.rs @@ -45,9 +45,12 @@ use crate::TransformPipeBuilder; pub struct Pipeline { max_threads: usize, pub pipes: Vec, + on_init: Option, on_finished: Option, } +pub type InitCallback = Arc Result<()> + Send + Sync + 'static>>; + pub type FinishedCallback = Arc) -> Result<()> + Send + Sync + 'static>>; @@ -56,6 +59,7 @@ impl Pipeline { Pipeline { max_threads: 0, pipes: Vec::new(), + on_init: None, on_finished: None, } } @@ -159,6 +163,17 @@ impl Pipeline { } } + pub fn set_on_init Result<()> + Send + Sync + 'static>(&mut self, f: F) { + if let Some(on_init) = &self.on_init { + let old_on_init = on_init.clone(); + + self.on_init = Some(Arc::new(Box::new(move || { + old_on_init()?; + f() + }))) + } + } + pub fn set_on_finished) -> Result<()> + Send + Sync + 'static>( &mut self, f: F, @@ -177,6 +192,13 @@ impl Pipeline { self.on_finished = Some(Arc::new(Box::new(f))); } + pub fn take_on_init(&mut self) -> InitCallback { + match self.on_init.take() { + None => Arc::new(Box::new(|| Ok(()))), + Some(on_init) => on_init, + } + } + pub fn take_on_finished(&mut self) -> FinishedCallback { match self.on_finished.take() { None => Arc::new(Box::new(|_may_error| Ok(()))), diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index b9a3a458952ad..2a10a4cf09218 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -36,6 +36,8 
@@ use crate::pipelines::executor::executor_worker_context::ExecutorWorkerContext; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::pipeline::Pipeline; +pub type InitCallback = Arc Result<()> + Send + Sync + 'static>>; + pub type FinishedCallback = Arc) -> Result<()> + Send + Sync + 'static>>; @@ -45,6 +47,7 @@ pub struct PipelineExecutor { workers_condvar: Arc, pub async_runtime: Arc, pub global_tasks_queue: Arc, + on_init_callback: InitCallback, on_finished_callback: FinishedCallback, settings: ExecutorSettings, finished_notify: Notify, @@ -57,13 +60,14 @@ impl PipelineExecutor { settings: ExecutorSettings, ) -> Result> { let threads_num = pipeline.get_max_threads(); - // let on_init_callback = pipeline.take_on_init(); + let on_init_callback = pipeline.take_on_init(); let on_finished_callback = pipeline.take_on_finished(); assert_ne!(threads_num, 0, "Pipeline max threads cannot equals zero."); Self::try_create( RunningGraph::create(pipeline)?, threads_num, + on_init_callback, on_finished_callback, settings, ) @@ -83,6 +87,11 @@ impl PipelineExecutor { .max() .unwrap_or(0); + let on_init_callbacks = pipelines + .iter_mut() + .map(|x| x.take_on_init()) + .collect::>(); + let on_finished_callbacks = pipelines .iter_mut() .map(|x| x.take_on_finished()) @@ -92,6 +101,13 @@ impl PipelineExecutor { Self::try_create( RunningGraph::from_pipelines(pipelines)?, threads_num, + Arc::new(Box::new(move || { + for on_init_callback in &on_init_callbacks { + on_init_callback()?; + } + + Ok(()) + })), Arc::new(Box::new(move |may_error| { for on_finished_callback in &on_finished_callbacks { on_finished_callback(may_error)?; @@ -106,6 +122,7 @@ impl PipelineExecutor { fn try_create( graph: RunningGraph, threads_num: usize, + on_init_callback: InitCallback, on_finished_callback: FinishedCallback, settings: ExecutorSettings, ) -> Result> { @@ -126,6 +143,7 @@ impl PipelineExecutor { threads_num, workers_condvar, global_tasks_queue, + on_init_callback, on_finished_callback, async_runtime: GlobalIORuntime::instance(), settings, @@ -146,6 +164,9 @@ impl PipelineExecutor { } pub fn execute(self: &Arc) -> Result<()> { + // TODO: the on init callback cannot be killed. + (self.on_init_callback)()?; + self.start_executor_daemon()?; let mut thread_join_handles = self.execute_threads(self.threads_num); diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read.rs index a4e7113135e2b..76807d85be67c 100644 --- a/src/query/storages/fuse/src/operations/read.rs +++ b/src/query/storages/fuse/src/operations/read.rs @@ -56,9 +56,9 @@ impl FuseTable { pub fn projection_of_push_downs(&self, push_downs: &Option) -> Projection { if let Some(Extras { - projection: Some(prj), - .. - }) = push_downs + projection: Some(prj), + .. + }) = push_downs { prj.clone() } else { @@ -96,30 +96,43 @@ impl FuseTable { let table_info = self.table_info.clone(); let push_downs = plan.push_downs.clone(); let query_ctx = ctx.clone(); - let runtime = Runtime::with_worker_threads(2, None)?; - let re_partitions = runtime.spawn(async move { - let (_statistics, partitions) = FuseTable::prune_snapshot_blocks( - query_ctx, - push_downs, - table_info, - lazy_init_segments, - 0, - ) - .await?; - - Ok(partitions) - }); - let partitions = match futures::executor::block_on(re_partitions) { - Ok(Ok(partitions)) => Ok(partitions), - Ok(Err(error)) => Err(error), - Err(cause) => Err(ErrorCode::PanicError(format!( - "Maybe panic while in eval index. 
{}", - cause - ))), - }?; + // TODO: need refactor + pipeline.set_on_init(move || { + println!("on init"); + let table_info = table_info.clone(); + let push_downs = push_downs.clone(); + let lazy_init_segments = lazy_init_segments.clone(); + let runtime = Runtime::with_worker_threads(2, None)?; + + let ctx = query_ctx.clone(); + let re_partitions = runtime.spawn(async move { + let (_statistics, partitions) = FuseTable::prune_snapshot_blocks( + ctx, + push_downs, + table_info, + lazy_init_segments, + 0, + ) + .await?; + + Ok(partitions) + }); + + let partitions = match futures::executor::block_on(re_partitions) { + Ok(Ok(partitions)) => Ok(partitions), + Ok(Err(error)) => Err(error), + Err(cause) => Err(ErrorCode::PanicError(format!( + "Maybe panic while in eval index. {}", + cause + ))), + }?; + + println!("new partitions {:?}", partitions); + query_ctx.try_set_partitions(partitions)?; - ctx.try_set_partitions(partitions)?; + Ok(()) + }); } let table_schema = self.table_info.schema(); @@ -328,9 +341,9 @@ impl Processor for FuseTableSource { match std::mem::replace(&mut self.state, State::Finish) { State::Deserialize(part, chunks, prewhere_data) => { let data_block = if let Some(PrewhereData { - data_block: mut prewhere_blocks, - filter, - }) = prewhere_data + data_block: mut prewhere_blocks, + filter, + }) = prewhere_data { let block = if chunks.is_empty() { prewhere_blocks From f38e681868da5e5a8a576d4c7d99ce4e40cb5015 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 27 Sep 2022 14:59:29 +0800 Subject: [PATCH 06/20] feat(cluster): try fix test failure --- src/query/catalog/src/table_context.rs | 3 + src/query/pipeline/core/src/pipeline.rs | 6 +- src/query/service/src/sessions/query_ctx.rs | 5 + .../storages/result/result_table_source.rs | 44 ++++---- .../storages/fuse/src/operations/read.rs | 101 +++++++----------- .../storages/hive/src/hive_table_source.rs | 49 ++++----- .../src/memory/memory_table_stream.rs | 9 +- 7 files changed, 96 insertions(+), 121 deletions(-) diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index fbd37872764bf..2fe7d679edb88 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -25,6 +25,7 @@ use common_datablocks::DataBlock; use common_exception::Result; use common_functions::scalars::FunctionContext; use common_io::prelude::FormatSettings; +use common_legacy_planners::PartInfoPtr; use common_legacy_planners::Partitions; use common_legacy_planners::ReadDataSourcePlan; use common_meta_types::UserInfo; @@ -66,6 +67,8 @@ pub trait TableContext: Send + Sync { // Steal n partitions from the partition pool by the pipeline worker. // This also can steal the partitions from distributed node. fn try_get_partitions(&self, num: u64) -> Result; + + fn try_get_part(&self) -> Option; // Update the context partition pool from the pipeline builder. 
fn try_set_partitions(&self, partitions: Partitions) -> Result<()>; fn attach_query_str(&self, kind: String, query: &str); diff --git a/src/query/pipeline/core/src/pipeline.rs b/src/query/pipeline/core/src/pipeline.rs index 70c0bddca1398..002acaeff8019 100644 --- a/src/query/pipeline/core/src/pipeline.rs +++ b/src/query/pipeline/core/src/pipeline.rs @@ -170,8 +170,12 @@ impl Pipeline { self.on_init = Some(Arc::new(Box::new(move || { old_on_init()?; f() - }))) + }))); + + return; } + + self.on_init = Some(Arc::new(Box::new(f))); } pub fn set_on_finished) -> Result<()> + Send + Sync + 'static>( diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 5bd561fbfaa7a..ca6bef254b68c 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -238,6 +238,11 @@ impl TableContext for QueryContext { } Ok(partitions) } + + fn try_get_part(&self) -> Option { + self.partition_queue.write().pop_back() + } + // Update the context partition pool from the pipeline builder. fn try_set_partitions(&self, partitions: Partitions) -> Result<()> { let mut partition_queue = self.partition_queue.write(); diff --git a/src/query/service/src/storages/result/result_table_source.rs b/src/query/service/src/storages/result/result_table_source.rs index 44dc8edc2fe96..00ca17ad18bb2 100644 --- a/src/query/service/src/storages/result/result_table_source.rs +++ b/src/query/service/src/storages/result/result_table_source.rs @@ -31,7 +31,7 @@ use crate::storages::fuse::io::BlockReader; use crate::storages::result::result_table_source::State::Generated; enum State { - ReadData(PartInfoPtr), + ReadData(Option), Deserialize(PartInfoPtr, Vec<(usize, Vec)>), Generated(Option, DataBlock), Finish, @@ -52,23 +52,13 @@ impl ResultTableSource { block_reader: Arc, ) -> Result { let scan_progress = ctx.get_scan_progress(); - let mut partitions = ctx.try_get_partitions(1)?; - match partitions.is_empty() { - true => Ok(ProcessorPtr::create(Box::new(ResultTableSource { - ctx, - output, - block_reader, - scan_progress, - state: State::Finish, - }))), - false => Ok(ProcessorPtr::create(Box::new(ResultTableSource { - ctx, - output, - block_reader, - scan_progress, - state: State::ReadData(partitions.remove(0)), - }))), - } + Ok(ProcessorPtr::create(Box::new(ResultTableSource { + ctx, + output, + block_reader, + scan_progress, + state: State::ReadData(None), + }))) } } @@ -83,6 +73,13 @@ impl Processor for ResultTableSource { } fn event(&mut self) -> Result { + if matches!(self.state, State::ReadData(None)) { + self.state = match self.ctx.try_get_part() { + None => State::Finish, + Some(part) => State::ReadData(Some(part)), + } + } + if matches!(self.state, State::Finish) { self.output.finish(); return Ok(Event::Finished); @@ -100,7 +97,7 @@ impl Processor for ResultTableSource { if let Generated(part, data_block) = std::mem::replace(&mut self.state, State::Finish) { self.state = match part { None => State::Finish, - Some(part) => State::ReadData(part), + Some(part) => State::ReadData(Some(part)), }; self.output.push_data(Ok(data_block)); @@ -120,7 +117,7 @@ impl Processor for ResultTableSource { match std::mem::replace(&mut self.state, State::Finish) { State::Deserialize(part, chunks) => { let data_block = self.block_reader.deserialize(part, chunks)?; - let mut partitions = self.ctx.try_get_partitions(1)?; + let mut new_part = self.ctx.try_get_part(); let progress_values = ProgressValues { rows: data_block.num_rows(), @@ -128,10 +125,7 @@ impl 
Processor for ResultTableSource { }; self.scan_progress.incr(&progress_values); - self.state = match partitions.is_empty() { - true => State::Generated(None, data_block), - false => State::Generated(Some(partitions.remove(0)), data_block), - }; + self.state = State::Generated(new_part, data_block); Ok(()) } _ => Err(ErrorCode::LogicalError("It's a bug.")), @@ -140,7 +134,7 @@ impl Processor for ResultTableSource { async fn async_process(&mut self) -> Result<()> { match std::mem::replace(&mut self.state, State::Finish) { - State::ReadData(part) => { + State::ReadData(Some(part)) => { let chunks = self.block_reader.read_columns_data(part.clone()).await?; self.state = State::Deserialize(part, chunks); Ok(()) diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read.rs index 76807d85be67c..06a0e03ccb1e2 100644 --- a/src/query/storages/fuse/src/operations/read.rs +++ b/src/query/storages/fuse/src/operations/read.rs @@ -56,9 +56,9 @@ impl FuseTable { pub fn projection_of_push_downs(&self, push_downs: &Option) -> Projection { if let Some(Extras { - projection: Some(prj), - .. - }) = push_downs + projection: Some(prj), + .. + }) = push_downs { prj.clone() } else { @@ -99,14 +99,12 @@ impl FuseTable { // TODO: need refactor pipeline.set_on_init(move || { - println!("on init"); let table_info = table_info.clone(); + let ctx = query_ctx.clone(); let push_downs = push_downs.clone(); let lazy_init_segments = lazy_init_segments.clone(); - let runtime = Runtime::with_worker_threads(2, None)?; - let ctx = query_ctx.clone(); - let re_partitions = runtime.spawn(async move { + let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move { let (_statistics, partitions) = FuseTable::prune_snapshot_blocks( ctx, push_downs, @@ -114,21 +112,11 @@ impl FuseTable { lazy_init_segments, 0, ) - .await?; + .await?; - Ok(partitions) - }); + Result::<_, ErrorCode>::Ok(partitions) + })?; - let partitions = match futures::executor::block_on(re_partitions) { - Ok(Ok(partitions)) => Ok(partitions), - Ok(Err(error)) => Err(error), - Err(cause) => Err(ErrorCode::PanicError(format!( - "Maybe panic while in eval index. 
{}", - cause - ))), - }?; - - println!("new partitions {:?}", partitions); query_ctx.try_set_partitions(partitions)?; Ok(()) @@ -209,7 +197,7 @@ struct PrewhereData { } enum State { - ReadDataPrewhere(PartInfoPtr), + ReadDataPrewhere(Option), ReadDataRemain(PartInfoPtr, PrewhereData), PrewhereFilter(PartInfoPtr, DataChunks), Deserialize(PartInfoPtr, DataChunks, Option), @@ -239,54 +227,30 @@ impl FuseTableSource { remain_reader: Arc>, ) -> Result { let scan_progress = ctx.get_scan_progress(); - let mut partitions = ctx.try_get_partitions(1)?; - match partitions.is_empty() { - true => Ok(ProcessorPtr::create(Box::new(FuseTableSource { - ctx, - output, - scan_progress, - state: State::Finish, - output_reader, - prewhere_reader, - prewhere_filter, - remain_reader, - }))), - false => Ok(ProcessorPtr::create(Box::new(FuseTableSource { - ctx, - output, - scan_progress, - state: State::ReadDataPrewhere(partitions.remove(0)), - output_reader, - prewhere_reader, - prewhere_filter, - remain_reader, - }))), - } + Ok(ProcessorPtr::create(Box::new(FuseTableSource { + ctx, + output, + scan_progress, + state: State::ReadDataPrewhere(None), + output_reader, + prewhere_reader, + prewhere_filter, + remain_reader, + }))) } fn generate_one_block(&mut self, block: DataBlock) -> Result<()> { - let mut partitions = self.ctx.try_get_partitions(1)?; + let new_part = self.ctx.try_get_part(); // resort and prune columns let block = block.resort(self.output_reader.schema())?; - self.state = match partitions.is_empty() { - true => State::Generated(None, block), - false => State::Generated(Some(partitions.remove(0)), block), - }; + self.state = State::Generated(new_part, block); Ok(()) } fn generate_one_empty_block(&mut self) -> Result<()> { - let mut partitions = self.ctx.try_get_partitions(1)?; - self.state = match partitions.is_empty() { - true => State::Generated( - None, - DataBlock::empty_with_schema(self.output_reader.schema()), - ), - false => State::Generated( - Some(partitions.remove(0)), - DataBlock::empty_with_schema(self.output_reader.schema()), - ), - }; + let schema = self.output_reader.schema(); + let mut new_part = self.ctx.try_get_part(); + self.state = Generated(new_part, DataBlock::empty_with_schema(schema)); Ok(()) } } @@ -302,6 +266,13 @@ impl Processor for FuseTableSource { } fn event(&mut self) -> Result { + if matches!(self.state, State::ReadDataPrewhere(None)) { + self.state = match self.ctx.try_get_part() { + None => State::Finish, + Some(part) => State::ReadDataPrewhere(Some(part)), + } + } + if matches!(self.state, State::Finish) { self.output.finish(); return Ok(Event::Finished); @@ -319,7 +290,7 @@ impl Processor for FuseTableSource { if let Generated(part, data_block) = std::mem::replace(&mut self.state, State::Finish) { self.state = match part { None => State::Finish, - Some(part) => State::ReadDataPrewhere(part), + Some(part) => State::ReadDataPrewhere(Some(part)), }; self.output.push_data(Ok(data_block)); @@ -341,9 +312,9 @@ impl Processor for FuseTableSource { match std::mem::replace(&mut self.state, State::Finish) { State::Deserialize(part, chunks, prewhere_data) => { let data_block = if let Some(PrewhereData { - data_block: mut prewhere_blocks, - filter, - }) = prewhere_data + data_block: mut prewhere_blocks, + filter, + }) = prewhere_data { let block = if chunks.is_empty() { prewhere_blocks @@ -427,7 +398,7 @@ impl Processor for FuseTableSource { async fn async_process(&mut self) -> Result<()> { match std::mem::replace(&mut self.state, State::Finish) { - 
State::ReadDataPrewhere(part) => { + State::ReadDataPrewhere(Some(part)) => { let chunks = self.prewhere_reader.read_columns_data(part.clone()).await?; if self.prewhere_filter.is_some() { diff --git a/src/query/storages/hive/src/hive_table_source.rs b/src/query/storages/hive/src/hive_table_source.rs index 2834f6c5279b7..7bf5047533e93 100644 --- a/src/query/storages/hive/src/hive_table_source.rs +++ b/src/query/storages/hive/src/hive_table_source.rs @@ -37,7 +37,7 @@ use crate::HivePartInfo; enum State { /// Read parquet file meta data /// IO bound - ReadMeta(PartInfoPtr), + ReadMeta(Option), /// Read blocks from data groups (without deserialization) /// IO bound @@ -70,33 +70,21 @@ impl HiveTableSource { delay: usize, ) -> Result { let scan_progress = ctx.get_scan_progress(); - let mut partitions = ctx.try_get_partitions(1)?; - match partitions.is_empty() { - true => Ok(ProcessorPtr::create(Box::new(HiveTableSource { - ctx, - output, - block_reader, - scan_progress, - state: State::Finish, - delay, - }))), - false => Ok(ProcessorPtr::create(Box::new(HiveTableSource { - ctx, - output, - block_reader, - scan_progress, - state: State::ReadMeta(partitions.remove(0)), - delay, - }))), - } + Ok(ProcessorPtr::create(Box::new(HiveTableSource { + ctx, + output, + block_reader, + scan_progress, + state: State::ReadMeta(None), + delay, + }))) } fn try_get_partitions(&mut self) -> Result<()> { - let partitions = self.ctx.try_get_partitions(1)?; - match partitions.is_empty() { - true => self.state = State::Finish, - false => { - self.state = State::ReadMeta(partitions[0].clone()); + match self.ctx.try_get_part() { + None => self.state = State::Finish, + Some(part_info) => { + self.state = State::ReadMeta(Some(part_info)); } } @@ -115,6 +103,15 @@ impl Processor for HiveTableSource { } fn event(&mut self) -> Result { + if matches!(self.state, State::ReadMeta(None)) { + match self.ctx.try_get_part() { + None => self.state = State::Finish, + Some(part_info) => { + self.state = State::ReadMeta(Some(part_info)); + } + } + } + if self.output.is_finished() { return Ok(Event::Finished); } @@ -184,7 +181,7 @@ impl Processor for HiveTableSource { async fn async_process(&mut self) -> Result<()> { match std::mem::replace(&mut self.state, State::Finish) { - State::ReadMeta(part) => { + State::ReadMeta(Some(part)) => { if self.delay > 0 { sleep(Duration::from_millis(self.delay as u64)).await; tracing::debug!("sleep for {}ms", self.delay); diff --git a/src/query/storages/preludes/src/memory/memory_table_stream.rs b/src/query/storages/preludes/src/memory/memory_table_stream.rs index 6b83c3e41a4f8..e11f676b0e1d7 100644 --- a/src/query/storages/preludes/src/memory/memory_table_stream.rs +++ b/src/query/storages/preludes/src/memory/memory_table_stream.rs @@ -49,18 +49,19 @@ impl MemoryTableStream { fn try_get_one_block(&mut self) -> Result> { if (self.block_index as usize) == self.block_ranges.len() { - let partitions = self.ctx.try_get_partitions(1)?; - if partitions.is_empty() { + let part_info = self.ctx.try_get_part(); + if part_info.is_none() { return Ok(None); } - let mut block_ranges = Vec::with_capacity(partitions.len()); + let mut block_ranges = vec![]; - for part in partitions { + if let Some(part) = part_info { let memory_part = MemoryPartInfo::from_part(&part)?; let s: Vec = (memory_part.part_start..memory_part.part_end).collect(); block_ranges.extend_from_slice(&s); } + self.block_ranges = block_ranges; self.block_index = 0; } From 0af6108d43c36941895695e3c99de799afb54e34 Mon Sep 17 00:00:00 2001 From: 
zhang2014 Date: Wed, 28 Sep 2022 13:21:49 +0800 Subject: [PATCH 07/20] feat(cluster): try fix test failure --- .../pipelines/executor/pipeline_executor.rs | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 2a10a4cf09218..3f86288657be0 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -130,14 +130,6 @@ impl PipelineExecutor { let workers_condvar = WorkersCondvar::create(threads_num); let global_tasks_queue = ExecutorTasksQueue::create(threads_num); - let mut init_schedule_queue = graph.init_schedule_queue()?; - - let mut tasks = VecDeque::new(); - while let Some(task) = init_schedule_queue.pop_task() { - tasks.push_back(task); - } - global_tasks_queue.init_tasks(tasks); - Ok(Arc::new(PipelineExecutor { graph, threads_num, @@ -164,8 +156,7 @@ impl PipelineExecutor { } pub fn execute(self: &Arc) -> Result<()> { - // TODO: the on init callback cannot be killed. - (self.on_init_callback)()?; + self.init()?; self.start_executor_daemon()?; @@ -193,6 +184,24 @@ impl PipelineExecutor { Ok(()) } + fn init(self: &Arc) -> Result<()> { + unsafe { + // TODO: the on init callback cannot be killed. + (self.on_init_callback)()?; + + let mut init_schedule_queue = self.graph.init_schedule_queue()?; + + let mut tasks = VecDeque::new(); + while let Some(task) = init_schedule_queue.pop_task() { + tasks.push_back(task); + } + + self.global_tasks_queue.init_tasks(tasks); + + Ok(()) + } + } + fn start_executor_daemon(self: &Arc) -> Result<()> { if !self.settings.max_execute_time.is_zero() { let this = self.clone(); From d0f9ec9a90c31c15b76c53e2a955e08d60c606e6 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 28 Sep 2022 13:48:02 +0800 Subject: [PATCH 08/20] feat(cluster): try fix test failure --- src/query/storages/fuse/src/operations/read_partitions.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 6c09086c73960..2c985a859d0f6 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -60,11 +60,10 @@ impl FuseTable { segments.push(FuseLazyPartInfo::create(segment_location.clone())) } - println!("elapsed {:?}", instant.elapsed()); return Ok(( Statistics::new_estimated( snapshot.summary.row_count as usize, - snapshot.summary.uncompressed_byte_size as usize, + snapshot.summary.compressed_byte_size as usize, snapshot.segments.len(), snapshot.segments.len(), ), From 09913804e673bd1874836d31ceb57b3108861aa1 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 28 Sep 2022 14:35:43 +0800 Subject: [PATCH 09/20] feat(cluster): try fix test failure --- .../mode/standalone/explain/fold_count.test | 6 ++-- .../explain/join_reorder/chain.test | 36 +++++++++---------- .../explain/join_reorder/cycles.test | 36 +++++++++---------- .../standalone/explain/join_reorder/star.test | 36 +++++++++---------- .../standalone/explain/nullable_prune.test | 14 ++++---- .../explain/select_limit_offset.test | 4 +-- 6 files changed, 66 insertions(+), 66 deletions(-) diff --git a/tests/logictest/suites/mode/standalone/explain/fold_count.test b/tests/logictest/suites/mode/standalone/explain/fold_count.test index f0ceb43b7a6a1..27b9981add790 100644 --- 
a/tests/logictest/suites/mode/standalone/explain/fold_count.test +++ b/tests/logictest/suites/mode/standalone/explain/fold_count.test @@ -41,10 +41,10 @@ EvalScalar ├── aggregate functions: [count()] └── TableScan ├── table: default.default.t - ├── read rows: 1000 - ├── read bytes: 4028 + ├── read rows: 1001 + ├── read bytes: 4434 ├── partitions total: 2 - ├── partitions scanned: 1 + ├── partitions scanned: 2 └── push downs: [filters: [(number > 10)], limit: NONE] statement ok diff --git a/tests/logictest/suites/mode/standalone/explain/join_reorder/chain.test b/tests/logictest/suites/mode/standalone/explain/join_reorder/chain.test index 1e71baee96ca9..016486e2c63cb 100644 --- a/tests/logictest/suites/mode/standalone/explain/join_reorder/chain.test +++ b/tests/logictest/suites/mode/standalone/explain/join_reorder/chain.test @@ -30,21 +30,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 31 +│ │ ├── read bytes: 197 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -61,7 +61,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -73,14 +73,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 31 + │ ├── read bytes: 8 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -102,21 +102,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 31 +│ │ ├── read bytes: 8 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -133,7 +133,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 31 +│ ├── read bytes: 8 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -145,14 +145,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 68 + │ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read 
bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -169,7 +169,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 31 +│ ├── read bytes: 8 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -181,14 +181,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 68 + │ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -205,7 +205,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -217,14 +217,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 31 + │ ├── read bytes: 8 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] diff --git a/tests/logictest/suites/mode/standalone/explain/join_reorder/cycles.test b/tests/logictest/suites/mode/standalone/explain/join_reorder/cycles.test index a370801c2013e..f49ce3cf7b949 100644 --- a/tests/logictest/suites/mode/standalone/explain/join_reorder/cycles.test +++ b/tests/logictest/suites/mode/standalone/explain/join_reorder/cycles.test @@ -30,21 +30,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 31 +│ │ ├── read bytes: 8 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -61,7 +61,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -73,14 +73,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 31 + │ ├── read bytes: 8 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -102,21 +102,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 31 +│ │ ├── read bytes: 8 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: 
NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -133,7 +133,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 31 +│ ├── read bytes: 8 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -145,14 +145,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 68 + │ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -169,7 +169,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 31 +│ ├── read bytes: 8 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -181,14 +181,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 68 + │ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -205,7 +205,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -217,14 +217,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 31 + │ ├── read bytes: 8 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] diff --git a/tests/logictest/suites/mode/standalone/explain/join_reorder/star.test b/tests/logictest/suites/mode/standalone/explain/join_reorder/star.test index 5971e7fc065c2..99d08de533907 100644 --- a/tests/logictest/suites/mode/standalone/explain/join_reorder/star.test +++ b/tests/logictest/suites/mode/standalone/explain/join_reorder/star.test @@ -30,21 +30,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 31 +│ │ ├── read bytes: 8 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions 
total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -61,7 +61,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -73,14 +73,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 31 + │ ├── read bytes: 8 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -102,21 +102,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 31 +│ │ ├── read bytes: 8 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -133,7 +133,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 31 +│ ├── read bytes: 8 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -145,14 +145,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 68 + │ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -169,7 +169,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 31 +│ ├── read bytes: 8 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -181,14 +181,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 68 + │ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -205,7 +205,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 68 +│ ├── read bytes: 80 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -217,14 +217,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 31 + │ ├── read bytes: 8 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 431 + ├── read bytes: 800 ├── partitions total: 
1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] diff --git a/tests/logictest/suites/mode/standalone/explain/nullable_prune.test b/tests/logictest/suites/mode/standalone/explain/nullable_prune.test index f7d122b6bc458..a40d234fc92c8 100644 --- a/tests/logictest/suites/mode/standalone/explain/nullable_prune.test +++ b/tests/logictest/suites/mode/standalone/explain/nullable_prune.test @@ -14,7 +14,7 @@ explain select * from t_nullable_prune; TableScan ├── table: default.default.t_nullable_prune ├── read rows: 6 -├── read bytes: 62 +├── read bytes: 26 ├── partitions total: 2 ├── partitions scanned: 2 └── push downs: [filters: [], limit: NONE] @@ -25,10 +25,10 @@ explain select * from t_nullable_prune where a is not null; ---- TableScan ├── table: default.default.t_nullable_prune -├── read rows: 3 -├── read bytes: 37 +├── read rows: 6 +├── read bytes: 26 ├── partitions total: 2 -├── partitions scanned: 1 +├── partitions scanned: 2 └── push downs: [filters: [is_not_null(a)], limit: NONE] statement query T @@ -37,10 +37,10 @@ explain select * from t_nullable_prune where a is null; ---- TableScan ├── table: default.default.t_nullable_prune -├── read rows: 3 -├── read bytes: 25 +├── read rows: 6 +├── read bytes: 26 ├── partitions total: 2 -├── partitions scanned: 1 +├── partitions scanned: 2 └── push downs: [filters: [not(is_not_null(a))], limit: NONE] statement ok diff --git a/tests/logictest/suites/mode/standalone/explain/select_limit_offset.test b/tests/logictest/suites/mode/standalone/explain/select_limit_offset.test index e2dce241eb91e..5f46a981fa9b3 100644 --- a/tests/logictest/suites/mode/standalone/explain/select_limit_offset.test +++ b/tests/logictest/suites/mode/standalone/explain/select_limit_offset.test @@ -150,7 +150,7 @@ Limit │ └── TableScan │ ├── table: default.default.t1 │ ├── read rows: 2 - │ ├── read bytes: 31 + │ ├── read bytes: 188 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: 3] @@ -160,7 +160,7 @@ Limit └── TableScan ├── table: default.default.t ├── read rows: 1 - ├── read bytes: 27 + ├── read bytes: 184 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: 3] From 32e2d1312ff6faa9b2057067e50faea9fa42513f Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 28 Sep 2022 15:20:06 +0800 Subject: [PATCH 10/20] feat(cluster): try fix test failure --- .../explain/join_reorder/chain.test | 30 +++++++++---------- .../standalone/explain/nullable_prune.test | 6 ++-- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/tests/logictest/suites/mode/standalone/explain/join_reorder/chain.test b/tests/logictest/suites/mode/standalone/explain/join_reorder/chain.test index 016486e2c63cb..f6eabd6634ae0 100644 --- a/tests/logictest/suites/mode/standalone/explain/join_reorder/chain.test +++ b/tests/logictest/suites/mode/standalone/explain/join_reorder/chain.test @@ -61,7 +61,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 80 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -73,14 +73,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 8 + │ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions 
total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -102,21 +102,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 8 +│ │ ├── read bytes: 197 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 80 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -133,7 +133,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 8 +│ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -145,14 +145,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 80 + │ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -169,7 +169,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 8 +│ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -181,14 +181,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 80 + │ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -205,7 +205,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 80 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -217,14 +217,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 8 + │ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] diff --git a/tests/logictest/suites/mode/standalone/explain/nullable_prune.test b/tests/logictest/suites/mode/standalone/explain/nullable_prune.test index a40d234fc92c8..3cfbb082ec7ea 100644 --- a/tests/logictest/suites/mode/standalone/explain/nullable_prune.test +++ b/tests/logictest/suites/mode/standalone/explain/nullable_prune.test @@ -14,7 +14,7 @@ explain select * from t_nullable_prune; TableScan ├── table: default.default.t_nullable_prune ├── read rows: 6 -├── read bytes: 26 +├── read bytes: 376 ├── partitions total: 2 ├── partitions scanned: 2 └── push downs: [filters: [], limit: NONE] @@ -26,7 +26,7 @@ explain 
select * from t_nullable_prune where a is not null; TableScan ├── table: default.default.t_nullable_prune ├── read rows: 6 -├── read bytes: 26 +├── read bytes: 376 ├── partitions total: 2 ├── partitions scanned: 2 └── push downs: [filters: [is_not_null(a)], limit: NONE] @@ -38,7 +38,7 @@ explain select * from t_nullable_prune where a is null; TableScan ├── table: default.default.t_nullable_prune ├── read rows: 6 -├── read bytes: 26 +├── read bytes: 376 ├── partitions total: 2 ├── partitions scanned: 2 └── push downs: [filters: [not(is_not_null(a))], limit: NONE] From 4efc564d4f43e566b5d8b85a589c8ec354536bb7 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 28 Sep 2022 16:03:51 +0800 Subject: [PATCH 11/20] feat(cluster): try fix test failure --- .../explain/join_reorder/cycles.test | 36 +++++++++---------- .../standalone/explain/join_reorder/star.test | 36 +++++++++---------- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/tests/logictest/suites/mode/standalone/explain/join_reorder/cycles.test b/tests/logictest/suites/mode/standalone/explain/join_reorder/cycles.test index f49ce3cf7b949..6c7daf652c2a3 100644 --- a/tests/logictest/suites/mode/standalone/explain/join_reorder/cycles.test +++ b/tests/logictest/suites/mode/standalone/explain/join_reorder/cycles.test @@ -30,21 +30,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 8 +│ │ ├── read bytes: 197 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 80 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -61,7 +61,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 80 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -73,14 +73,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 8 + │ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -102,21 +102,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 8 +│ │ ├── read bytes: 197 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 80 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -133,7 +133,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 8 +│ ├── read 
bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -145,14 +145,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 80 + │ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -169,7 +169,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 8 +│ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -181,14 +181,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 80 + │ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -205,7 +205,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 80 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -217,14 +217,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 8 + │ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] diff --git a/tests/logictest/suites/mode/standalone/explain/join_reorder/star.test b/tests/logictest/suites/mode/standalone/explain/join_reorder/star.test index 99d08de533907..6d38f6c1bc8b5 100644 --- a/tests/logictest/suites/mode/standalone/explain/join_reorder/star.test +++ b/tests/logictest/suites/mode/standalone/explain/join_reorder/star.test @@ -30,21 +30,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 8 +│ │ ├── read bytes: 197 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 80 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -61,7 +61,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 80 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -73,14 +73,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 8 + │ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] 
└── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -102,21 +102,21 @@ HashJoin │ ├── TableScan(Build) │ │ ├── table: default.join_reorder.t │ │ ├── read rows: 1 -│ │ ├── read bytes: 8 +│ │ ├── read bytes: 197 │ │ ├── partitions total: 1 │ │ ├── partitions scanned: 1 │ │ └── push downs: [filters: [], limit: NONE] │ └── TableScan(Probe) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 80 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -133,7 +133,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 8 +│ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -145,14 +145,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 80 + │ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -169,7 +169,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 -│ ├── read bytes: 8 +│ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -181,14 +181,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 - │ ├── read bytes: 80 + │ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] @@ -205,7 +205,7 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t1 │ ├── read rows: 10 -│ ├── read bytes: 80 +│ ├── read bytes: 243 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] @@ -217,14 +217,14 @@ HashJoin ├── TableScan(Build) │ ├── table: default.join_reorder.t │ ├── read rows: 1 - │ ├── read bytes: 8 + │ ├── read bytes: 197 │ ├── partitions total: 1 │ ├── partitions scanned: 1 │ └── push downs: [filters: [], limit: NONE] └── TableScan(Probe) ├── table: default.join_reorder.t2 ├── read rows: 100 - ├── read bytes: 800 + ├── read bytes: 610 ├── partitions total: 1 ├── partitions scanned: 1 └── push downs: [filters: [], limit: NONE] From c05c112086baae757c8495614b9b3d3592994e39 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 28 Sep 2022 22:25:31 +0800 Subject: [PATCH 12/20] feat(cluster): try fix test failure --- .../storages/fuse/src/operations/read.rs | 18 ++++++----------- .../fuse/src/operations/read_partitions.rs | 20 +++++++++++++++---- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read.rs index 
06a0e03ccb1e2..678744bfc301c 100644 --- a/src/query/storages/fuse/src/operations/read.rs +++ b/src/query/storages/fuse/src/operations/read.rs @@ -104,18 +104,12 @@ impl FuseTable { let push_downs = push_downs.clone(); let lazy_init_segments = lazy_init_segments.clone(); - let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move { - let (_statistics, partitions) = FuseTable::prune_snapshot_blocks( - ctx, - push_downs, - table_info, - lazy_init_segments, - 0, - ) - .await?; - - Result::<_, ErrorCode>::Ok(partitions) - })?; + let (_statistics, partitions) = FuseTable::sync_prune_snapshot_blocks( + ctx, + push_downs, + table_info, + lazy_init_segments, + )?; query_ctx.try_set_partitions(partitions)?; diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 2c985a859d0f6..5186d8ff63330 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -74,21 +74,33 @@ impl FuseTable { let table_info = self.table_info.clone(); let segments_location = snapshot.segments.clone(); let summary = snapshot.summary.block_count as usize; - let s = Self::prune_snapshot_blocks( + Self::prune_snapshot_blocks( ctx.clone(), push_downs.clone(), table_info, segments_location, summary, ) - .await; - println!("elapsed {:?}", instant.elapsed()); - s + .await } None => Ok((Statistics::default(), vec![])), } } + pub fn sync_prune_snapshot_blocks( + ctx: Arc, + push_downs: Option, + table_info: TableInfo, + segments_location: Vec, + ) -> Result<(Statistics, Partitions)> { + let block_metas = + BlockPruner::sync_prune(&ctx, table_info.schema(), &push_downs, segments_location)? + .into_iter() + .map(|(_, v)| v) + .collect::>(); + Self::read_partitions_with_metas(ctx, table_info.schema(), push_downs, block_metas, 0) + } + pub async fn prune_snapshot_blocks( ctx: Arc, push_downs: Option, From e5faca074f21bc23347ae520a69bccf3215777cf Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Thu, 29 Sep 2022 09:28:47 +0800 Subject: [PATCH 13/20] feat(cluster): try fix test failure --- .../executor/pipeline_complete_executor.rs | 9 ++++++++- src/query/storages/fuse/src/operations/read.rs | 18 ++++++++++++------ 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs b/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs index 6bc0f65e96a61..a16870275971d 100644 --- a/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use common_base::base::Thread; use common_exception::ErrorCode; use common_exception::Result; @@ -67,7 +68,13 @@ impl PipelineCompleteExecutor { } pub fn execute(&self) -> Result<()> { - self.executor.execute() + let executor = self.executor.clone(); + let execute_thread = + Thread::named_spawn(Some(String::from("CompleteExecutor")), move || { + executor.execute() + }); + + execute_thread.join().flatten() } } diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read.rs index 678744bfc301c..06a0e03ccb1e2 100644 --- a/src/query/storages/fuse/src/operations/read.rs +++ b/src/query/storages/fuse/src/operations/read.rs @@ -104,12 +104,18 @@ impl FuseTable { let push_downs = push_downs.clone(); let lazy_init_segments = lazy_init_segments.clone(); - let (_statistics, partitions) = 
FuseTable::sync_prune_snapshot_blocks( - ctx, - push_downs, - table_info, - lazy_init_segments, - )?; + let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move { + let (_statistics, partitions) = FuseTable::prune_snapshot_blocks( + ctx, + push_downs, + table_info, + lazy_init_segments, + 0, + ) + .await?; + + Result::<_, ErrorCode>::Ok(partitions) + })?; query_ctx.try_set_partitions(partitions)?; From da5dcb8bd1e13f4fa39bdc3d5679840af2f3afee Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Thu, 29 Sep 2022 12:25:12 +0800 Subject: [PATCH 14/20] feat(cluster): try fix unit test failure --- .../pipelines/executor/pipeline_executor.rs | 32 +++++++++---------- .../storages/result/result_table_source.rs | 2 +- .../service/tests/it/storages/fuse/table.rs | 8 ++--- .../storages/fuse/src/operations/read.rs | 3 +- .../fuse/src/operations/read_partitions.rs | 2 -- 5 files changed, 21 insertions(+), 26 deletions(-) diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 3f86288657be0..48767c505501c 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -126,23 +126,21 @@ impl PipelineExecutor { on_finished_callback: FinishedCallback, settings: ExecutorSettings, ) -> Result> { - unsafe { - let workers_condvar = WorkersCondvar::create(threads_num); - let global_tasks_queue = ExecutorTasksQueue::create(threads_num); - - Ok(Arc::new(PipelineExecutor { - graph, - threads_num, - workers_condvar, - global_tasks_queue, - on_init_callback, - on_finished_callback, - async_runtime: GlobalIORuntime::instance(), - settings, - finished_notify: Notify::new(), - finished_error: Mutex::new(None), - })) - } + let workers_condvar = WorkersCondvar::create(threads_num); + let global_tasks_queue = ExecutorTasksQueue::create(threads_num); + + Ok(Arc::new(PipelineExecutor { + graph, + threads_num, + workers_condvar, + global_tasks_queue, + on_init_callback, + on_finished_callback, + async_runtime: GlobalIORuntime::instance(), + settings, + finished_notify: Notify::new(), + finished_error: Mutex::new(None), + })) } pub fn finish(&self, cause: Option) { diff --git a/src/query/service/src/storages/result/result_table_source.rs b/src/query/service/src/storages/result/result_table_source.rs index 00ca17ad18bb2..06774fe64a664 100644 --- a/src/query/service/src/storages/result/result_table_source.rs +++ b/src/query/service/src/storages/result/result_table_source.rs @@ -117,7 +117,7 @@ impl Processor for ResultTableSource { match std::mem::replace(&mut self.state, State::Finish) { State::Deserialize(part, chunks) => { let data_block = self.block_reader.deserialize(part, chunks)?; - let mut new_part = self.ctx.try_get_part(); + let new_part = self.ctx.try_get_part(); let progress_values = ProgressValues { rows: data_block.num_rows(), diff --git a/src/query/service/tests/it/storages/fuse/table.rs b/src/query/service/tests/it/storages/fuse/table.rs index c9a3a96e4cd81..757dec0ef2ec2 100644 --- a/src/query/service/tests/it/storages/fuse/table.rs +++ b/src/query/service/tests/it/storages/fuse/table.rs @@ -72,13 +72,13 @@ async fn test_fuse_table_normal_case() -> Result<()> { let (stats, parts) = table.read_partitions(ctx.clone(), None).await?; assert_eq!(stats.read_rows, num_blocks * rows_per_block); - ctx.try_set_partitions(parts)?; + ctx.try_set_partitions(parts.clone())?; let stream = table .read(ctx.clone(), &ReadDataSourcePlan { catalog: 
"default".to_owned(), source_info: SourceInfo::TableSource(Default::default()), scan_fields: None, - parts: Default::default(), + parts, statistics: Default::default(), description: "".to_string(), tbl_args: None, @@ -131,14 +131,14 @@ async fn test_fuse_table_normal_case() -> Result<()> { assert_eq!(stats.read_rows, num_blocks * rows_per_block); // inject partitions to current ctx - ctx.try_set_partitions(parts)?; + ctx.try_set_partitions(parts.clone())?; let stream = table .read(ctx.clone(), &ReadDataSourcePlan { catalog: "default".to_owned(), source_info: SourceInfo::TableSource(Default::default()), scan_fields: None, - parts: Default::default(), + parts, statistics: Default::default(), description: "".to_string(), tbl_args: None, diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read.rs index 06a0e03ccb1e2..9b5eac4e5fe43 100644 --- a/src/query/storages/fuse/src/operations/read.rs +++ b/src/query/storages/fuse/src/operations/read.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use common_base::base::Progress; use common_base::base::ProgressValues; use common_base::base::Runtime; -use common_base::base::TrySpawn; use common_catalog::table_context::TableContext; use common_datablocks::DataBlock; use common_datavalues::ColumnRef; @@ -249,7 +248,7 @@ impl FuseTableSource { fn generate_one_empty_block(&mut self) -> Result<()> { let schema = self.output_reader.schema(); - let mut new_part = self.ctx.try_get_part(); + let new_part = self.ctx.try_get_part(); self.state = Generated(new_part, DataBlock::empty_with_schema(schema)); Ok(()) } diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 5186d8ff63330..b486accbcb7c9 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::Instant; use common_catalog::table_context::TableContext; use common_datavalues::DataSchemaRef; @@ -43,7 +42,6 @@ impl FuseTable { ctx: Arc, push_downs: Option, ) -> Result<(Statistics, Partitions)> { - let instant = Instant::now(); let snapshot = self.read_table_snapshot(ctx.clone()).await?; match snapshot { Some(snapshot) => { From cd458776df61a0c8216f80a176c8f681d3a16dc8 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Fri, 30 Sep 2022 15:18:56 +0800 Subject: [PATCH 15/20] fix(processor): try fix test failure --- src/query/service/src/sessions/query_ctx.rs | 2 +- src/query/service/src/sql/executor/pipeline_builder.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index ca6bef254b68c..4fd86a3252733 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -240,7 +240,7 @@ impl TableContext for QueryContext { } fn try_get_part(&self) -> Option { - self.partition_queue.write().pop_back() + self.partition_queue.write().pop_front() } // Update the context partition pool from the pipeline builder. 
diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index a54895c613751..abfdc3ac77c11 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -90,9 +90,9 @@ pub struct PipelineBuilder { impl PipelineBuilder { pub fn create(ctx: Arc) -> PipelineBuilder { PipelineBuilder { - ctx, pipelines: vec![], main_pipeline: Pipeline::create(), + ctx: QueryContext::create_from(ctx), } } From 2e7b3974ba84e160c08f0b054fa6a0aefe093d17 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Fri, 30 Sep 2022 15:40:22 +0800 Subject: [PATCH 16/20] fix(processor): try fix test failure --- src/query/service/tests/it/storages/testdata/system-tables.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/query/service/tests/it/storages/testdata/system-tables.txt b/src/query/service/tests/it/storages/testdata/system-tables.txt index 0c4897ab01bd6..3f1ca0dca8e59 100644 --- a/src/query/service/tests/it/storages/testdata/system-tables.txt +++ b/src/query/service/tests/it/storages/testdata/system-tables.txt @@ -292,6 +292,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System +--------------------------------+------------+------------+---------+-----------------------------------------------------------------------------------------------------+--------+ | enable_async_insert | 0 | 0 | SESSION | Whether the client open async insert mode, default value: 0. | UInt64 | | enable_cbo | 1 | 1 | SESSION | If enable cost based optimization, default value: 1. | UInt64 | +| enable_distributed_eval_index | 1 | 1 | SESSION | If enable distributed eval index, default value: 1 | UInt64 | | enable_new_processor_framework | 1 | 1 | SESSION | Enable new processor framework if value != 0, default value: 1. | UInt64 | | enable_planner_v2 | 1 | 1 | SESSION | Enable planner v2 by setting this variable to 1, default value: 1. | UInt64 | | flight_client_timeout | 60 | 60 | SESSION | Max duration the flight client request is allowed to take in seconds. By default, it is 60 seconds. 
| UInt64 | From 5acc981ad3733233507cb252e9a0a6302e4d63bb Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Fri, 30 Sep 2022 16:06:59 +0800 Subject: [PATCH 17/20] fix(processor): try fix test failure --- src/query/service/src/api/rpc/exchange/exchange_manager.rs | 1 + src/query/service/src/sql/executor/pipeline_builder.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs index 5b217033aca86..604f78558fe25 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs @@ -668,6 +668,7 @@ impl FragmentCoordinator { match &self.payload { FragmentPayload::PlanV2(plan) => { + let ctx = QueryContext::create_from(ctx); let pipeline_builder = PipelineBuilderV2::create(ctx); self.pipeline_build_res = Some(pipeline_builder.finalize(plan)?); } diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index 35e0aa482543a..67fe9066d584d 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -90,9 +90,9 @@ pub struct PipelineBuilder { impl PipelineBuilder { pub fn create(ctx: Arc) -> PipelineBuilder { PipelineBuilder { + ctx, pipelines: vec![], main_pipeline: Pipeline::create(), - ctx: QueryContext::create_from(ctx), } } From 6f76776f1e99918215999cc292a96c25c76695af Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Fri, 30 Sep 2022 18:08:08 +0800 Subject: [PATCH 18/20] fix(processor): try fix test failure --- src/query/catalog/src/table_context.rs | 2 ++ src/query/service/src/api/rpc/exchange/exchange_manager.rs | 1 - src/query/service/src/interpreters/interpreter_insert_v2.rs | 1 - src/query/service/src/sessions/query_ctx.rs | 4 ++++ src/query/storages/fuse/src/operations/read.rs | 4 +++- 5 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 560a27079cb7c..1eb60c78d95d2 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -108,4 +108,6 @@ pub trait TableContext: Send + Sync { -> Result>; fn get_processes_info(&self) -> Vec; fn get_runtime(&self) -> Result>; + + fn clone_inner(&self) -> Arc; } diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs index 604f78558fe25..5b217033aca86 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs @@ -668,7 +668,6 @@ impl FragmentCoordinator { match &self.payload { FragmentPayload::PlanV2(plan) => { - let ctx = QueryContext::create_from(ctx); let pipeline_builder = PipelineBuilderV2::create(ctx); self.pipeline_build_res = Some(pipeline_builder.finalize(plan)?); } diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs index cbcf720b07fcd..989d3554b4836 100644 --- a/src/query/service/src/interpreters/interpreter_insert_v2.rs +++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs @@ -164,7 +164,6 @@ impl Interpreter for InsertInterpreterV2 { _ => unreachable!(), }; - table1.get_table_info(); let catalog = self.plan.catalog.clone(); let is_distributed_plan = select_plan.is_distributed_plan(); diff --git 
a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 1ba19d1a07c97..6c0166ec93ff6 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -386,6 +386,10 @@ impl TableContext for QueryContext { fn get_runtime(&self) -> Result> { self.shared.try_get_runtime() } + + fn clone_inner(&self) -> Arc { + QueryContext::create_from_shared(self.shared.clone()) + } } impl TrySpawn for QueryContext { diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read.rs index 91ee70fb60c3f..dcd42572687bb 100644 --- a/src/query/storages/fuse/src/operations/read.rs +++ b/src/query/storages/fuse/src/operations/read.rs @@ -74,7 +74,7 @@ impl FuseTable { #[inline] pub fn do_read2( &self, - ctx: Arc, + mut ctx: Arc, plan: &ReadDataSourcePlan, pipeline: &mut Pipeline, ) -> Result<()> { @@ -87,6 +87,8 @@ impl FuseTable { } if !lazy_init_segments.is_empty() { + ctx = ctx.clone_inner(); + let table_info = self.table_info.clone(); let push_downs = plan.push_downs.clone(); let query_ctx = ctx.clone(); From 853a8c5362fc515ae28c15798df8cc45eae9f4db Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 3 Oct 2022 19:52:39 +0800 Subject: [PATCH 19/20] fix(processor): try fix test failure --- src/query/catalog/src/table_context.rs | 8 ----- .../src/api/rpc/exchange/exchange_manager.rs | 3 +- src/query/service/src/sessions/query_ctx.rs | 34 ++----------------- .../service/src/sessions/query_ctx_shared.rs | 16 +++++++++ .../storages/fuse/src/operations/read.rs | 4 +-- .../fuse/src/operations/read_partitions.rs | 14 -------- 6 files changed, 21 insertions(+), 58 deletions(-) diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 1eb60c78d95d2..5a44f879485eb 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use common_base::base::Progress; use common_base::base::ProgressValues; -use common_base::base::Runtime; use common_config::Config; use common_contexts::DalContext; use common_contexts::DalMetrics; @@ -66,10 +65,6 @@ pub trait TableContext: Send + Sync { fn get_write_progress_value(&self) -> ProgressValues; fn get_result_progress(&self) -> Arc; fn get_result_progress_value(&self) -> ProgressValues; - // Steal n partitions from the partition pool by the pipeline worker. - // This also can steal the partitions from distributed node. - fn try_get_partitions(&self, num: u64) -> Result; - fn try_get_part(&self) -> Option; // Update the context partition pool from the pipeline builder. 
fn try_set_partitions(&self, partitions: Partitions) -> Result<()>; @@ -107,7 +102,4 @@ pub trait TableContext: Send + Sync { async fn get_table(&self, catalog: &str, database: &str, table: &str) -> Result>; fn get_processes_info(&self) -> Vec; - fn get_runtime(&self) -> Result>; - - fn clone_inner(&self) -> Arc; } diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs index 5b217033aca86..256371c838b90 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs @@ -668,7 +668,8 @@ impl FragmentCoordinator { match &self.payload { FragmentPayload::PlanV2(plan) => { - let pipeline_builder = PipelineBuilderV2::create(ctx); + let new_ctx = QueryContext::create_from(ctx); + let pipeline_builder = PipelineBuilderV2::create(new_ctx); self.pipeline_build_res = Some(pipeline_builder.finalize(plan)?); } }; diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 6c0166ec93ff6..d5bb73f538e6a 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -25,7 +25,6 @@ use chrono_tz::Tz; use common_base::base::tokio::task::JoinHandle; use common_base::base::Progress; use common_base::base::ProgressValues; -use common_base::base::Runtime; use common_base::base::TrySpawn; use common_contexts::DalContext; use common_contexts::DalMetrics; @@ -69,7 +68,6 @@ pub struct QueryContext { version: String, partition_queue: Arc>>, shared: Arc, - precommit_blocks: Arc>>, fragment_id: Arc, } @@ -85,7 +83,6 @@ impl QueryContext { partition_queue: Arc::new(RwLock::new(VecDeque::new())), version: format!("DatabendQuery {}", *crate::version::DATABEND_COMMIT_VERSION), shared, - precommit_blocks: Arc::new(RwLock::new(Vec::new())), fragment_id: Arc::new(AtomicUsize::new(0)), }) } @@ -226,20 +223,6 @@ impl TableContext for QueryContext { fn get_result_progress_value(&self) -> ProgressValues { self.shared.result_progress.as_ref().get_values() } - // Steal n partitions from the partition pool by the pipeline worker. - // This also can steal the partitions from distributed node. 
- fn try_get_partitions(&self, num: u64) -> Result { - let mut partitions = vec![]; - for _ in 0..num { - match self.partition_queue.write().pop_back() { - None => break, - Some(partition) => { - partitions.push(partition); - } - } - } - Ok(partitions) - } fn try_get_part(&self) -> Option { self.partition_queue.write().pop_front() @@ -334,15 +317,10 @@ impl TableContext for QueryContext { self.shared.dal_ctx.as_ref() } fn push_precommit_block(&self, block: DataBlock) { - let mut blocks = self.precommit_blocks.write(); - blocks.push(block); + self.shared.push_precommit_block(block) } fn consume_precommit_blocks(&self) -> Vec { - let mut blocks = self.precommit_blocks.write(); - - let mut swaped_precommit_blocks = vec![]; - std::mem::swap(&mut *blocks, &mut swaped_precommit_blocks); - swaped_precommit_blocks + self.shared.consume_precommit_blocks() } fn try_get_function_context(&self) -> Result { let tz = self.get_settings().get_timezone()?; @@ -382,14 +360,6 @@ impl TableContext for QueryContext { fn get_processes_info(&self) -> Vec { SessionManager::instance().processes_info() } - - fn get_runtime(&self) -> Result> { - self.shared.try_get_runtime() - } - - fn clone_inner(&self) -> Arc { - QueryContext::create_from_shared(self.shared.clone()) - } } impl TrySpawn for QueryContext { diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 52ca0f031c827..33676d0a9cd2f 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -22,6 +22,7 @@ use std::sync::Weak; use common_base::base::Progress; use common_base::base::Runtime; use common_contexts::DalContext; +use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::UserInfo; @@ -79,6 +80,7 @@ pub struct QueryContextShared { pub(in crate::sessions) catalog_manager: Arc, pub(in crate::sessions) storage_operator: StorageOperator, pub(in crate::sessions) executor: Arc>>, + pub(in crate::sessions) precommit_blocks: Arc>>, } impl QueryContextShared { @@ -108,6 +110,7 @@ impl QueryContextShared { auth_manager: AuthMgr::create(config).await?, affect: Arc::new(Mutex::new(None)), executor: Arc::new(RwLock::new(Weak::new())), + precommit_blocks: Arc::new(RwLock::new(vec![])), })) } @@ -297,4 +300,17 @@ impl QueryContextShared { let mut executor = self.executor.write(); *executor = weak_ptr; } + + pub fn push_precommit_block(&self, block: DataBlock) { + let mut blocks = self.precommit_blocks.write(); + blocks.push(block); + } + + pub fn consume_precommit_blocks(&self) -> Vec { + let mut blocks = self.precommit_blocks.write(); + + let mut swaped_precommit_blocks = vec![]; + std::mem::swap(&mut *blocks, &mut swaped_precommit_blocks); + swaped_precommit_blocks + } } diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read.rs index dcd42572687bb..91ee70fb60c3f 100644 --- a/src/query/storages/fuse/src/operations/read.rs +++ b/src/query/storages/fuse/src/operations/read.rs @@ -74,7 +74,7 @@ impl FuseTable { #[inline] pub fn do_read2( &self, - mut ctx: Arc, + ctx: Arc, plan: &ReadDataSourcePlan, pipeline: &mut Pipeline, ) -> Result<()> { @@ -87,8 +87,6 @@ impl FuseTable { } if !lazy_init_segments.is_empty() { - ctx = ctx.clone_inner(); - let table_info = self.table_info.clone(); let push_downs = plan.push_downs.clone(); let query_ctx = ctx.clone(); diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs 
b/src/query/storages/fuse/src/operations/read_partitions.rs index b486accbcb7c9..2dd497e9b9367 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -85,20 +85,6 @@ impl FuseTable { } } - pub fn sync_prune_snapshot_blocks( - ctx: Arc, - push_downs: Option, - table_info: TableInfo, - segments_location: Vec, - ) -> Result<(Statistics, Partitions)> { - let block_metas = - BlockPruner::sync_prune(&ctx, table_info.schema(), &push_downs, segments_location)? - .into_iter() - .map(|(_, v)| v) - .collect::>(); - Self::read_partitions_with_metas(ctx, table_info.schema(), push_downs, block_metas, 0) - } - pub async fn prune_snapshot_blocks( ctx: Arc, push_downs: Option, From 9f3cda0bb8327851c016bea9d2ecf4294620769a Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 3 Oct 2022 21:17:34 +0800 Subject: [PATCH 20/20] fix(processor): skip 02_0001_create_table_with_external_location test --- scripts/ci/ci-run-stateful-tests-cluster-s3.sh | 2 +- src/query/service/src/api/rpc/exchange/exchange_manager.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/ci/ci-run-stateful-tests-cluster-s3.sh b/scripts/ci/ci-run-stateful-tests-cluster-s3.sh index 3189c64b944b6..d2d4f2dbd8d85 100755 --- a/scripts/ci/ci-run-stateful-tests-cluster-s3.sh +++ b/scripts/ci/ci-run-stateful-tests-cluster-s3.sh @@ -27,4 +27,4 @@ SCRIPT_PATH="$(cd "$(dirname "$0")" >/dev/null 2>&1 && pwd)" cd "$SCRIPT_PATH/../../tests" || exit echo "Starting databend-test" -./databend-test --mode 'cluster' --run-dir 1_stateful +./databend-test --mode 'cluster' --run-dir 1_stateful --skip '02_0001_create_table_with_external_location' diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs index 256371c838b90..e252734df1b4c 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs @@ -668,8 +668,8 @@ impl FragmentCoordinator { match &self.payload { FragmentPayload::PlanV2(plan) => { - let new_ctx = QueryContext::create_from(ctx); - let pipeline_builder = PipelineBuilderV2::create(new_ctx); + let pipeline_ctx = QueryContext::create_from(ctx); + let pipeline_builder = PipelineBuilderV2::create(pipeline_ctx); self.pipeline_build_res = Some(pipeline_builder.finalize(plan)?); } };
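
Taken together, patches 17 through 20 converge on one pattern for building fragment pipelines: rather than exposing extra hooks on the TableContext trait (try_get_partitions, get_runtime, clone_inner, all removed again above), each pipeline build receives a fresh QueryContext derived from the shared per-query state, so per-pipeline scratch state such as the partition queue is isolated while session-level state (settings, progress, precommitted blocks) stays shared. A minimal sketch of that shape, with illustrative names only (SharedState and Context stand in for QueryContextShared and QueryContext; this is a sketch of the pattern, not the actual implementation):

    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};

    // Stands in for QueryContextShared: session-wide, lives behind an Arc.
    struct SharedState {
        query_id: String,
    }

    // Stands in for QueryContext: a thin wrapper with per-pipeline state.
    struct Context {
        shared: Arc<SharedState>,
        partition_queue: Mutex<VecDeque<u64>>,
    }

    impl Context {
        // Mirrors QueryContext::create_from / create_from_shared in the
        // diffs: fresh wrapper, same shared inner state.
        fn create_from(other: &Context) -> Arc<Context> {
            Arc::new(Context {
                shared: other.shared.clone(),
                partition_queue: Mutex::new(VecDeque::new()),
            })
        }
    }

    fn main() {
        let root = Arc::new(Context {
            shared: Arc::new(SharedState { query_id: "q1".into() }),
            partition_queue: Mutex::new(VecDeque::from([1, 2, 3])),
        });
        let pipeline_ctx = Context::create_from(&root);

        // Session-level state is shared...
        assert_eq!(pipeline_ctx.shared.query_id, root.shared.query_id);
        // ...while each pipeline build starts with its own empty queue.
        assert!(pipeline_ctx.partition_queue.lock().unwrap().is_empty());
        assert_eq!(root.partition_queue.lock().unwrap().len(), 3);
    }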