From 853a8c5362fc515ae28c15798df8cc45eae9f4db Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 3 Oct 2022 19:52:39 +0800 Subject: [PATCH] fix(processor): try fix test failure --- src/query/catalog/src/table_context.rs | 8 ----- .../src/api/rpc/exchange/exchange_manager.rs | 3 +- src/query/service/src/sessions/query_ctx.rs | 34 ++----------------- .../service/src/sessions/query_ctx_shared.rs | 16 +++++++++ .../storages/fuse/src/operations/read.rs | 4 +-- .../fuse/src/operations/read_partitions.rs | 14 -------- 6 files changed, 21 insertions(+), 58 deletions(-) diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 1eb60c78d95d2..5a44f879485eb 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use common_base::base::Progress; use common_base::base::ProgressValues; -use common_base::base::Runtime; use common_config::Config; use common_contexts::DalContext; use common_contexts::DalMetrics; @@ -66,10 +65,6 @@ pub trait TableContext: Send + Sync { fn get_write_progress_value(&self) -> ProgressValues; fn get_result_progress(&self) -> Arc; fn get_result_progress_value(&self) -> ProgressValues; - // Steal n partitions from the partition pool by the pipeline worker. - // This also can steal the partitions from distributed node. - fn try_get_partitions(&self, num: u64) -> Result; - fn try_get_part(&self) -> Option; // Update the context partition pool from the pipeline builder. fn try_set_partitions(&self, partitions: Partitions) -> Result<()>; @@ -107,7 +102,4 @@ pub trait TableContext: Send + Sync { async fn get_table(&self, catalog: &str, database: &str, table: &str) -> Result>; fn get_processes_info(&self) -> Vec; - fn get_runtime(&self) -> Result>; - - fn clone_inner(&self) -> Arc; } diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs index 5b217033aca86..256371c838b90 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs @@ -668,7 +668,8 @@ impl FragmentCoordinator { match &self.payload { FragmentPayload::PlanV2(plan) => { - let pipeline_builder = PipelineBuilderV2::create(ctx); + let new_ctx = QueryContext::create_from(ctx); + let pipeline_builder = PipelineBuilderV2::create(new_ctx); self.pipeline_build_res = Some(pipeline_builder.finalize(plan)?); } }; diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 6c0166ec93ff6..d5bb73f538e6a 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -25,7 +25,6 @@ use chrono_tz::Tz; use common_base::base::tokio::task::JoinHandle; use common_base::base::Progress; use common_base::base::ProgressValues; -use common_base::base::Runtime; use common_base::base::TrySpawn; use common_contexts::DalContext; use common_contexts::DalMetrics; @@ -69,7 +68,6 @@ pub struct QueryContext { version: String, partition_queue: Arc>>, shared: Arc, - precommit_blocks: Arc>>, fragment_id: Arc, } @@ -85,7 +83,6 @@ impl QueryContext { partition_queue: Arc::new(RwLock::new(VecDeque::new())), version: format!("DatabendQuery {}", *crate::version::DATABEND_COMMIT_VERSION), shared, - precommit_blocks: Arc::new(RwLock::new(Vec::new())), fragment_id: Arc::new(AtomicUsize::new(0)), }) } @@ -226,20 +223,6 @@ impl TableContext for QueryContext { fn get_result_progress_value(&self) -> ProgressValues { self.shared.result_progress.as_ref().get_values() } - // Steal n partitions from the partition pool by the pipeline worker. - // This also can steal the partitions from distributed node. - fn try_get_partitions(&self, num: u64) -> Result { - let mut partitions = vec![]; - for _ in 0..num { - match self.partition_queue.write().pop_back() { - None => break, - Some(partition) => { - partitions.push(partition); - } - } - } - Ok(partitions) - } fn try_get_part(&self) -> Option { self.partition_queue.write().pop_front() @@ -334,15 +317,10 @@ impl TableContext for QueryContext { self.shared.dal_ctx.as_ref() } fn push_precommit_block(&self, block: DataBlock) { - let mut blocks = self.precommit_blocks.write(); - blocks.push(block); + self.shared.push_precommit_block(block) } fn consume_precommit_blocks(&self) -> Vec { - let mut blocks = self.precommit_blocks.write(); - - let mut swaped_precommit_blocks = vec![]; - std::mem::swap(&mut *blocks, &mut swaped_precommit_blocks); - swaped_precommit_blocks + self.shared.consume_precommit_blocks() } fn try_get_function_context(&self) -> Result { let tz = self.get_settings().get_timezone()?; @@ -382,14 +360,6 @@ impl TableContext for QueryContext { fn get_processes_info(&self) -> Vec { SessionManager::instance().processes_info() } - - fn get_runtime(&self) -> Result> { - self.shared.try_get_runtime() - } - - fn clone_inner(&self) -> Arc { - QueryContext::create_from_shared(self.shared.clone()) - } } impl TrySpawn for QueryContext { diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 52ca0f031c827..33676d0a9cd2f 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -22,6 +22,7 @@ use std::sync::Weak; use common_base::base::Progress; use common_base::base::Runtime; use common_contexts::DalContext; +use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::UserInfo; @@ -79,6 +80,7 @@ pub struct QueryContextShared { pub(in crate::sessions) catalog_manager: Arc, pub(in crate::sessions) storage_operator: StorageOperator, pub(in crate::sessions) executor: Arc>>, + pub(in crate::sessions) precommit_blocks: Arc>>, } impl QueryContextShared { @@ -108,6 +110,7 @@ impl QueryContextShared { auth_manager: AuthMgr::create(config).await?, affect: Arc::new(Mutex::new(None)), executor: Arc::new(RwLock::new(Weak::new())), + precommit_blocks: Arc::new(RwLock::new(vec![])), })) } @@ -297,4 +300,17 @@ impl QueryContextShared { let mut executor = self.executor.write(); *executor = weak_ptr; } + + pub fn push_precommit_block(&self, block: DataBlock) { + let mut blocks = self.precommit_blocks.write(); + blocks.push(block); + } + + pub fn consume_precommit_blocks(&self) -> Vec { + let mut blocks = self.precommit_blocks.write(); + + let mut swaped_precommit_blocks = vec![]; + std::mem::swap(&mut *blocks, &mut swaped_precommit_blocks); + swaped_precommit_blocks + } } diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read.rs index dcd42572687bb..91ee70fb60c3f 100644 --- a/src/query/storages/fuse/src/operations/read.rs +++ b/src/query/storages/fuse/src/operations/read.rs @@ -74,7 +74,7 @@ impl FuseTable { #[inline] pub fn do_read2( &self, - mut ctx: Arc, + ctx: Arc, plan: &ReadDataSourcePlan, pipeline: &mut Pipeline, ) -> Result<()> { @@ -87,8 +87,6 @@ impl FuseTable { } if !lazy_init_segments.is_empty() { - ctx = ctx.clone_inner(); - let table_info = self.table_info.clone(); let push_downs = plan.push_downs.clone(); let query_ctx = ctx.clone(); diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index b486accbcb7c9..2dd497e9b9367 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -85,20 +85,6 @@ impl FuseTable { } } - pub fn sync_prune_snapshot_blocks( - ctx: Arc, - push_downs: Option, - table_info: TableInfo, - segments_location: Vec, - ) -> Result<(Statistics, Partitions)> { - let block_metas = - BlockPruner::sync_prune(&ctx, table_info.schema(), &push_downs, segments_location)? - .into_iter() - .map(|(_, v)| v) - .collect::>(); - Self::read_partitions_with_metas(ctx, table_info.schema(), push_downs, block_metas, 0) - } - pub async fn prune_snapshot_blocks( ctx: Arc, push_downs: Option,