From 3cb790b541012dd65f8d6e86617336f10fa4439f Mon Sep 17 00:00:00 2001 From: Yu Lei Date: Thu, 15 Jun 2023 14:22:58 +0800 Subject: [PATCH] feat(executor): Introduce `QueryProfileManager` to collect query profilings (#11760) * introduce profile manager to collect query profilings * fix license header * fix license header * format --- Cargo.lock | 5 + Cargo.toml | 2 +- src/common/profile/src/span.rs | 71 ---- src/query/catalog/Cargo.toml | 1 + src/query/catalog/src/table_context.rs | 3 + src/query/pipeline/transforms/Cargo.toml | 2 +- .../src/processors/profile_wrapper.rs | 18 +- .../processors/transforms/transform_sort.rs | 4 +- src/{common => query}/profile/Cargo.toml | 5 + src/{common => query}/profile/src/lib.rs | 9 +- src/query/profile/src/mgr.rs | 134 ++++++++ src/query/profile/src/proc.rs | 72 +++++ src/query/profile/src/prof.rs | 60 ++++ src/query/service/Cargo.toml | 2 +- .../src/api/rpc/exchange/exchange_manager.rs | 9 +- .../src/databases/system/system_database.rs | 2 + src/query/service/src/global_services.rs | 2 + .../src/interpreters/interpreter_explain.rs | 14 +- .../src/interpreters/interpreter_insert.rs | 6 + .../src/pipelines/pipeline_build_res.rs | 8 +- .../service/src/pipelines/pipeline_builder.rs | 6 +- .../src/schedulers/fragments/fragmenter.rs | 10 + .../query_fragment_actions_display.rs | 4 +- src/query/service/src/schedulers/scheduler.rs | 9 +- src/query/service/src/sessions/query_ctx.rs | 9 + .../service/src/sessions/query_ctx_shared.rs | 16 +- .../it/storages/fuse/operations/commit.rs | 5 + src/query/sql/Cargo.toml | 2 +- src/query/sql/src/executor/format.rs | 154 +++------ src/query/sql/src/executor/mod.rs | 2 + src/query/sql/src/executor/physical_plan.rs | 38 +++ .../sql/src/executor/physical_plan_builder.rs | 2 + .../sql/src/executor/physical_plan_visitor.rs | 6 + src/query/sql/src/executor/profile.rs | 305 ++++++++++++++++++ src/query/storages/system/src/lib.rs | 2 + .../system/src/query_profile_table.rs | 102 ++++++ 36 files changed, 882 insertions(+), 219 deletions(-) delete mode 100644 src/common/profile/src/span.rs rename src/{common => query}/profile/Cargo.toml (74%) rename src/{common => query}/profile/src/lib.rs (85%) create mode 100644 src/query/profile/src/mgr.rs create mode 100644 src/query/profile/src/proc.rs create mode 100644 src/query/profile/src/prof.rs create mode 100644 src/query/sql/src/executor/profile.rs create mode 100644 src/query/storages/system/src/query_profile_table.rs diff --git a/Cargo.lock b/Cargo.lock index 0afe96c7f3f03..18ad3ece6c75a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1753,6 +1753,7 @@ dependencies = [ "common-meta-app", "common-meta-types", "common-pipeline-core", + "common-profile", "common-settings", "common-storage", "dashmap", @@ -2413,6 +2414,10 @@ dependencies = [ [[package]] name = "common-profile" version = "0.1.0" +dependencies = [ + "common-base", + "dashmap", +] [[package]] name = "common-proto-conv" diff --git a/Cargo.toml b/Cargo.toml index 640460d0ea6cf..aa2cef641d9e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,6 @@ members = [ "src/common/openai", "src/common/tracing", "src/common/storage", - "src/common/profile", "src/common/vector", "src/common/license", # Query @@ -45,6 +44,7 @@ members = [ "src/query/pipeline/sinks", "src/query/pipeline/sources", "src/query/pipeline/transforms", + "src/query/profile", "src/query/settings", "src/query/sql", "src/query/storages/common/blocks", diff --git a/src/common/profile/src/span.rs b/src/common/profile/src/span.rs deleted file mode 100644 index 
e84f049dad85e..0000000000000
--- a/src/common/profile/src/span.rs
+++ /dev/null
@@ -1,71 +0,0 @@
-// Copyright 2021 Datafuse Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::collections::HashMap;
-use std::fmt::Debug;
-use std::sync::Arc;
-use std::sync::Mutex;
-
-pub type ProfSpanSetRef = Arc<Mutex<ProfSpanSet>>;
-
-#[derive(Default)]
-pub struct ProfSpan {
-    /// The time spent to process in nanoseconds
-    pub process_time: u64,
-}
-
-impl ProfSpan {
-    pub fn add(&mut self, other: &Self) {
-        self.process_time += other.process_time;
-    }
-}
-
-#[derive(Default)]
-pub struct ProfSpanSet<K = u32> {
-    spans: HashMap<K, ProfSpan>,
-}
-
-impl<K> ProfSpanSet<K>
-where K: std::hash::Hash + Eq + PartialEq + Clone + Debug
-{
-    pub fn update(&mut self, key: K, span: ProfSpan) {
-        let entry = self.spans.entry(key).or_insert_with(ProfSpan::default);
-        entry.add(&span);
-    }
-
-    pub fn iter(&self) -> impl Iterator<Item = (&K, &ProfSpan)> {
-        self.spans.iter()
-    }
-
-    pub fn get(&self, k: &K) -> Option<&ProfSpan> {
-        self.spans.get(k)
-    }
-}
-
-#[derive(Clone, Default)]
-pub struct ProfSpanBuilder {
-    process_time: u64,
-}
-
-impl ProfSpanBuilder {
-    pub fn accumulate_process_time(&mut self, nanos: u64) {
-        self.process_time += nanos;
-    }
-
-    pub fn finish(self) -> ProfSpan {
-        ProfSpan {
-            process_time: self.process_time,
-        }
-    }
-}
diff --git a/src/query/catalog/Cargo.toml b/src/query/catalog/Cargo.toml
index 5866cfbb5da59..f30d5bddb88d5 100644
--- a/src/query/catalog/Cargo.toml
+++ b/src/query/catalog/Cargo.toml
@@ -17,6 +17,7 @@ common-io = { path = "../../common/io" }
 common-meta-app = { path = "../../meta/app" }
 common-meta-types = { path = "../../meta/types" }
 common-pipeline-core = { path = "../pipeline/core" }
+common-profile = { path = "../profile" }
 common-settings = { path = "../settings" }
 common-storage = { path = "../../common/storage" }
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index 3e7afab35c089..f7169f2ff4f35 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -30,6 +30,7 @@ use common_meta_app::principal::OnErrorMode;
 use common_meta_app::principal::RoleInfo;
 use common_meta_app::principal::UserInfo;
 use common_pipeline_core::InputError;
+use common_profile::QueryProfileManager;
 use common_settings::ChangeValue;
 use common_settings::Settings;
 use common_storage::DataOperator;
@@ -135,6 +136,8 @@ pub trait TableContext: Send + Sync {
     fn apply_changed_settings(&self, changes: HashMap<String, ChangeValue>) -> Result<()>;
     fn get_changed_settings(&self) -> HashMap<String, ChangeValue>;
 
+    fn get_query_profile_manager(&self) -> Arc<QueryProfileManager>;
+
     // Get the storage data accessor operator from the session manager.
     fn get_data_operator(&self) -> Result<DataOperator>;
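The new `get_query_profile_manager` trait method exposes the process-wide profile store to anything holding a `TableContext`. A minimal sketch of how a caller can read back a recorded profile; the helper function and its name are illustrative, not part of the patch:

```rust
use std::sync::Arc;

use common_catalog::table_context::TableContext;

// Illustrative helper (not in the patch): print the per-plan-node CPU time
// recorded for a finished query, if its profile is still cached.
fn print_query_cpu_times(ctx: &Arc<dyn TableContext>, query_id: &str) {
    let prof_mgr = ctx.get_query_profile_manager();
    if let Some(profile) = prof_mgr.get(query_id) {
        for node in &profile.plan_node_profs {
            println!(
                "plan {} ({}): {:?}",
                node.id, node.plan_node_name, node.cpu_time
            );
        }
    }
}
```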
diff --git a/src/query/pipeline/transforms/Cargo.toml b/src/query/pipeline/transforms/Cargo.toml
index d02728e25ee1d..498e80d47bb76 100644
--- a/src/query/pipeline/transforms/Cargo.toml
+++ b/src/query/pipeline/transforms/Cargo.toml
@@ -13,7 +13,7 @@ common-base = { path = "../../../common/base" }
 common-exception = { path = "../../../common/exception" }
 common-expression = { path = "../../expression" }
 common-pipeline-core = { path = "../core" }
-common-profile = { path = "../../../common/profile" }
+common-profile = { path = "../../profile" }
 match-template = "0.0.1"
 async-backtrace = { workspace = true }
diff --git a/src/query/pipeline/transforms/src/processors/profile_wrapper.rs b/src/query/pipeline/transforms/src/processors/profile_wrapper.rs
index 0b99c5739f05a..6eac56c7c3cb3 100644
--- a/src/query/pipeline/transforms/src/processors/profile_wrapper.rs
+++ b/src/query/pipeline/transforms/src/processors/profile_wrapper.rs
@@ -17,14 +17,15 @@ use std::time::Instant;
 use common_exception::Result;
 use common_pipeline_core::processors::processor::Event;
 use common_pipeline_core::processors::Processor;
-use common_profile::ProfSpanBuilder;
-use common_profile::ProfSpanSetRef;
+use common_profile::ProcessorProfile;
+use common_profile::SharedProcessorProfiles;
 
 pub struct ProfileWrapper<T> {
     inner: T,
     prof_span_id: u32,
-    prof_span_set: ProfSpanSetRef,
-    prof_span_builder: ProfSpanBuilder,
+    prof_span_set: SharedProcessorProfiles,
+
+    prof: ProcessorProfile,
 }
 
 impl<T> ProfileWrapper<T>
 where T: Processor + 'static
 {
     pub fn create(
         inner: T,
         prof_span_id: u32,
-        prof_span_set: ProfSpanSetRef,
+        prof_span_set: SharedProcessorProfiles,
     ) -> Box<dyn Processor> {
         Box::new(Self {
             inner,
             prof_span_id,
             prof_span_set,
-            prof_span_builder: ProfSpanBuilder::default(),
+            prof: ProcessorProfile::default(),
         })
     }
 }
@@ -62,7 +63,7 @@ where T: Processor + 'static
                 self.prof_span_set
                     .lock()
                     .unwrap()
-                    .update(self.prof_span_id, self.prof_span_builder.clone().finish());
+                    .update(self.prof_span_id, self.prof);
                 Ok(Event::Finished)
             }
             v => Ok(v),
@@ -73,8 +74,7 @@ where T: Processor + 'static
         let instant = Instant::now();
         self.inner.process()?;
         let elapsed = instant.elapsed();
-        self.prof_span_builder
-            .accumulate_process_time(elapsed.as_nanos() as u64);
+        self.prof = self.prof + ProcessorProfile { cpu_time: elapsed };
         Ok(())
     }
diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort.rs
index 7c9f57b58c718..f0360d5a12404 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort.rs
@@ -17,7 +17,7 @@ use common_expression::DataSchemaRef;
 use common_expression::SortColumnDescription;
 use common_pipeline_core::processors::processor::ProcessorPtr;
 use common_pipeline_core::Pipeline;
-use common_profile::ProfSpanSetRef;
+use common_profile::SharedProcessorProfiles;
 
 use super::transform_multi_sort_merge::try_add_multi_sort_merge;
 use super::transform_sort_merge::try_create_transform_sort_merge;
@@ -33,7 +33,7 @@ pub fn build_full_sort_pipeline(
     limit: Option<usize>,
     partial_block_size: usize,
     final_block_size: usize,
-    prof_info: Option<(u32, ProfSpanSetRef)>,
+    prof_info: Option<(u32, SharedProcessorProfiles)>,
     after_exchange: bool,
 ) -> Result<()> {
     // Partial sort
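`ProfileWrapper` is the measuring device of this patch: it times every `process()` call of the wrapped processor and, on `Event::Finished`, flushes the accumulated `ProcessorProfile` into the shared profile set under the plan node's id. A sketch of the wrap-when-enabled pattern the pipeline builder uses; the generic helper below is illustrative:

```rust
use common_pipeline_core::processors::Processor;
use common_pipeline_transforms::processors::ProfileWrapper;
use common_profile::SharedProcessorProfiles;

// Illustrative helper (not in the patch): wrap any processor so its CPU time
// is charged to the plan node identified by `plan_id`.
fn wrap_if_profiling<T: Processor + 'static>(
    transform: T,
    plan_id: u32,
    enable_profiling: bool,
    profiles: SharedProcessorProfiles,
) -> Box<dyn Processor> {
    if enable_profiling {
        // Times each process() call and reports the sum on finish.
        ProfileWrapper::create(transform, plan_id, profiles)
    } else {
        Box::new(transform)
    }
}
```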
diff --git a/src/common/profile/Cargo.toml b/src/query/profile/Cargo.toml
similarity index 74%
rename from src/common/profile/Cargo.toml
rename to src/query/profile/Cargo.toml
index 958cae4afdad8..1e26ed7113abe 100644
--- a/src/common/profile/Cargo.toml
+++ b/src/query/profile/Cargo.toml
@@ -9,3 +9,8 @@ edition = { workspace = true }
 [lib]
 doctest = false
 test = false
+
+[dependencies]
+common-base = { path = "../../common/base" }
+
+dashmap = "5.4"
diff --git a/src/common/profile/src/lib.rs b/src/query/profile/src/lib.rs
similarity index 85%
rename from src/common/profile/src/lib.rs
rename to src/query/profile/src/lib.rs
index 708b2c752d2a3..5c3fb77006e64 100644
--- a/src/common/profile/src/lib.rs
+++ b/src/query/profile/src/lib.rs
@@ -12,5 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-mod span;
-pub use span::*;
+mod mgr;
+mod proc;
+mod prof;
+
+pub use mgr::QueryProfileManager;
+pub use proc::*;
+pub use prof::*;
diff --git a/src/query/profile/src/mgr.rs b/src/query/profile/src/mgr.rs
new file mode 100644
index 0000000000000..97035a2a0a69d
--- /dev/null
+++ b/src/query/profile/src/mgr.rs
@@ -0,0 +1,134 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::VecDeque;
+use std::sync::Arc;
+use std::sync::Mutex;
+
+use common_base::base::GlobalInstance;
+use dashmap::mapref::entry::Entry;
+use dashmap::DashMap;
+
+use crate::prof::QueryProfile;
+
+/// Default capacity of the LRU cache of query profiles.
+const DEFAULT_QUERY_PROFILE_LIMIT: usize = 20;
+
+/// Manager of query profiling.
+/// This is a singleton in every databend-query process.
+pub struct QueryProfileManager {
+    /// The LRU cache of query profiles.
+    profiles: Lru,
+}
+
+impl QueryProfileManager {
+    fn new(capacity: usize) -> Self {
+        QueryProfileManager {
+            profiles: Lru::new(capacity),
+        }
+    }
+
+    pub fn init() {
+        GlobalInstance::set(Arc::new(Self::new(DEFAULT_QUERY_PROFILE_LIMIT)));
+    }
+
+    pub fn instance() -> Arc<QueryProfileManager> {
+        GlobalInstance::get()
+    }
+
+    /// Try to get the query profile by query ID.
+    pub fn get(&self, query_id: &str) -> Option<Arc<QueryProfile>> {
+        self.profiles.get(query_id)
+    }
+
+    /// Inserts a query profile.
+    pub fn insert(&self, query_profile: Arc<QueryProfile>) {
+        self.profiles
+            .insert(query_profile.query_id.clone(), query_profile);
+    }
+
+    /// Lists all query profiles.
+    pub fn list_all(&self) -> Vec<Arc<QueryProfile>> {
+        self.profiles.list_all()
+    }
+}
+
+impl Default for QueryProfileManager {
+    fn default() -> Self {
+        QueryProfileManager::new(DEFAULT_QUERY_PROFILE_LIMIT)
+    }
+}
+
+/// An LRU cache of query profiles. The expired query profiles
+/// will be removed.
+struct Lru {
+    /// The maximum number of query profiles to keep in memory.
+    /// If the number of query profiles exceeds this number,
+    /// the oldest one will be removed.
+    capacity: usize,
+
+    /// The query profiles.
+    /// The key is the query ID.
+    /// The value is the query profile.
+    profiles: DashMap<String, Arc<QueryProfile>>,
+
+    /// An LRU list of query IDs.
+    lru: Mutex<VecDeque<String>>,
+}
+
+impl Lru {
+    /// Creates a new LRU cache.
+    pub fn new(capacity: usize) -> Self {
+        Lru {
+            capacity,
+            profiles: DashMap::with_capacity(capacity),
+            lru: Mutex::new(VecDeque::with_capacity(capacity)),
+        }
+    }
+
+    /// Gets the query profile by the query ID.
+    /// Note that this method needs to acquire the shared lock of the LRU list,
+    /// so don't call it while the lock is already held.
+    pub fn get(&self, query_id: &str) -> Option<Arc<QueryProfile>> {
+        self.profiles.get(query_id).map(|v| v.value().clone())
+    }
+
+    /// Inserts a query profile.
+    /// This operation is thread-safe.
+    pub fn insert(&self, query_id: String, query_profile: Arc<QueryProfile>) {
+        // Lock the LRU list to ensure the consistency between
+        // the LRU list and the query profiles.
+        let mut lru = self.lru.lock().unwrap();
+
+        if let Entry::Occupied(mut prof) = self.profiles.entry(query_id.clone()) {
+            prof.insert(query_profile);
+            return;
+        }
+
+        if self.profiles.len() >= self.capacity {
+            if let Some(query_id) = lru.pop_front() {
+                self.profiles.remove(&query_id);
+            }
+        }
+
+        self.profiles.insert(query_id.clone(), query_profile);
+        lru.push_back(query_id);
+    }
+
+    /// Lists all query profiles.
+    /// Note that this method needs to acquire the shared lock of the LRU list,
+    /// so don't call it while the lock is already held.
+    pub fn list_all(&self) -> Vec<Arc<QueryProfile>> {
+        self.profiles.iter().map(|v| v.value().clone()).collect()
+    }
+}
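Note that despite the `Lru` name, `get` never touches the recency list, so eviction is effectively by insertion order (FIFO). A small sketch of the observable behavior, assuming the default capacity of 20:

```rust
use std::sync::Arc;

use common_profile::QueryProfile;
use common_profile::QueryProfileManager;

fn main() {
    // Default capacity is DEFAULT_QUERY_PROFILE_LIMIT (20).
    let mgr = QueryProfileManager::default();

    for i in 0..25 {
        mgr.insert(Arc::new(QueryProfile::new(format!("query-{i}"), vec![])));
    }

    // The five oldest profiles were evicted to respect the capacity.
    assert!(mgr.get("query-0").is_none());
    assert!(mgr.get("query-24").is_some());
}
```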
diff --git a/src/query/profile/src/proc.rs b/src/query/profile/src/proc.rs
new file mode 100644
index 0000000000000..cff15c389264c
--- /dev/null
+++ b/src/query/profile/src/proc.rs
@@ -0,0 +1,72 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::time::Duration;
+
+pub type SharedProcessorProfiles = Arc<Mutex<ProcessorProfiles<u32>>>;
+
+/// Execution profile information of a `Processor`.
+/// Can be merged with another `ProcessorProfile` using
+/// `add` or the `+` operator.
+///
+/// # Example
+/// ```
+/// use common_profile::ProcessorProfile;
+///
+/// let profile1 = ProcessorProfile::default();
+/// let profile2 = ProcessorProfile::default();
+/// let profile = profile1 + profile2;
+/// ```
+#[derive(Default, Clone, Copy, Debug)]
+pub struct ProcessorProfile {
+    /// The time spent on processing
+    pub cpu_time: Duration,
+}
+
+impl std::ops::Add for ProcessorProfile {
+    type Output = Self;
+
+    fn add(self, rhs: Self) -> Self::Output {
+        Self {
+            cpu_time: self.cpu_time + rhs.cpu_time,
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct ProcessorProfiles<K = u32> {
+    spans: HashMap<K, ProcessorProfile>,
+}
+
+impl<K> ProcessorProfiles<K>
+where K: std::hash::Hash + Eq + PartialEq + Clone + Debug
+{
+    pub fn update(&mut self, key: K, span: ProcessorProfile) {
+        let entry = self
+            .spans
+            .entry(key)
+            .or_insert_with(ProcessorProfile::default);
+        *entry = *entry + span;
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = (&K, &ProcessorProfile)> {
+        self.spans.iter()
+    }
+
+    pub fn get(&self, k: &K) -> Option<&ProcessorProfile> {
+        self.spans.get(k)
+    }
+}
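`ProcessorProfile` is `Copy` and merges with `+`, which is exactly what `ProfileWrapper` relies on. A minimal sketch of two processors belonging to the same plan node (id `7`) accumulating into one entry, assuming the `u32`-keyed alias as reconstructed above:

```rust
use std::time::Duration;

use common_profile::ProcessorProfile;
use common_profile::SharedProcessorProfiles;

fn main() {
    let profiles = SharedProcessorProfiles::default();

    // Two pipeline processors executing the same plan node each
    // contribute their measured CPU time under the same key.
    let span = ProcessorProfile {
        cpu_time: Duration::from_millis(3),
    };
    profiles.lock().unwrap().update(7u32, span);
    profiles.lock().unwrap().update(7u32, span);

    let merged = *profiles.lock().unwrap().get(&7u32).unwrap();
    assert_eq!(merged.cpu_time, Duration::from_millis(6));
}
```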
diff --git a/src/query/profile/src/prof.rs b/src/query/profile/src/prof.rs
new file mode 100644
index 0000000000000..ca37c4fe1705e
--- /dev/null
+++ b/src/query/profile/src/prof.rs
@@ -0,0 +1,60 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+#[derive(Clone)]
+pub struct QueryProfile {
+    /// Query ID of the query profile
+    pub query_id: String,
+
+    /// Flattened plan node profiles
+    pub plan_node_profs: Vec<PlanNodeProfile>,
+}
+
+impl QueryProfile {
+    pub fn new(query_id: String, plan_node_profs: Vec<PlanNodeProfile>) -> Self {
+        QueryProfile {
+            query_id,
+            plan_node_profs,
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct PlanNodeProfile {
+    /// ID of the `PhysicalPlan`
+    pub id: u32,
+
+    /// Name of the `PhysicalPlan`, e.g. `HashJoin`
+    pub plan_node_name: String,
+
+    // TODO(leiysky): make this a typed enum
+    /// Description of the `PhysicalPlan`
+    pub description: String,
+
+    /// The time spent on processing
+    pub cpu_time: Duration,
+}
+
+impl PlanNodeProfile {
+    pub fn new(id: u32, plan_node_name: String, description: String, cpu_time: Duration) -> Self {
+        PlanNodeProfile {
+            id,
+            plan_node_name,
+            description,
+            cpu_time,
+        }
+    }
+}
diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml
index a207ced40160b..d00ea55e5cb61 100644
--- a/src/query/service/Cargo.toml
+++ b/src/query/service/Cargo.toml
@@ -76,7 +76,7 @@ common-pipeline-core = { path = "../pipeline/core" }
 common-pipeline-sinks = { path = "../pipeline/sinks" }
 common-pipeline-sources = { path = "../pipeline/sources" }
 common-pipeline-transforms = { path = "../pipeline/transforms" }
-common-profile = { path = "../../common/profile" }
+common-profile = { path = "../profile" }
 common-settings = { path = "../settings" }
 common-sharing = { path = "../sharing" }
 common-sql = { path = "../sql" }
diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs
index a278c68d061db..48031b3773f17 100644
--- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs
+++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs
@@ -29,7 +29,7 @@ use common_config::GlobalConfig;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_grpc::ConnectionFactory;
-use common_profile::ProfSpanSetRef;
+use common_profile::SharedProcessorProfiles;
 use common_sql::executor::PhysicalPlan;
 use parking_lot::Mutex;
 use parking_lot::ReentrantMutex;
@@ -802,8 +802,11 @@ impl FragmentCoordinator {
             self.initialized = true;
 
             let pipeline_ctx = QueryContext::create_from(ctx);
-            let pipeline_builder =
-                PipelineBuilder::create(pipeline_ctx, enable_profiling, ProfSpanSetRef::default());
+            let pipeline_builder = PipelineBuilder::create(
+                pipeline_ctx,
+                enable_profiling,
+                SharedProcessorProfiles::default(),
+            );
 
             self.pipeline_build_res = Some(pipeline_builder.finalize(&self.physical_plan)?);
         }
diff --git a/src/query/service/src/databases/system/system_database.rs b/src/query/service/src/databases/system/system_database.rs
index 14c2885300d41..2ff270ae05c45 100644
--- a/src/query/service/src/databases/system/system_database.rs
+++ b/src/query/service/src/databases/system/system_database.rs
@@ -40,6 +40,7 @@ use common_storages_system::OneTable;
 use common_storages_system::ProcessesTable;
 use common_storages_system::QueryCacheTable;
 use common_storages_system::QueryLogTable;
+use common_storages_system::QueryProfileTable;
 use common_storages_system::RolesTable;
 use common_storages_system::SettingsTable;
 use common_storages_system::StagesTable;
@@ -103,6 +104,7 @@ impl SystemDatabase {
             TableFunctionsTable::create(sys_db_meta.next_table_id()),
             CachesTable::create(sys_db_meta.next_table_id()),
             IndexesTable::create(sys_db_meta.next_table_id()),
+            QueryProfileTable::create(sys_db_meta.next_table_id()),
         ];
 
         let disable_tables = Self::disable_system_tables();
diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs
index 9fa0b2c7637a1..357b0360db6df 100644
--- a/src/query/service/src/global_services.rs
+++ b/src/query/service/src/global_services.rs
@@ -19,6 +19,7 @@ use common_catalog::catalog::CatalogManager;
 use common_config::GlobalConfig;
 use common_config::InnerConfig;
 use common_exception::Result;
+use
common_profile::QueryProfileManager; use common_sharing::ShareEndpointManager; use common_storage::DataOperator; use common_storage::ShareTableConfig; @@ -80,6 +81,7 @@ impl GlobalServices { .await?; RoleCacheManager::init()?; ShareEndpointManager::init()?; + QueryProfileManager::init(); Ok(()) } diff --git a/src/query/service/src/interpreters/interpreter_explain.rs b/src/query/service/src/interpreters/interpreter_explain.rs index 9c59a7d2dbe74..1fdb534c85937 100644 --- a/src/query/service/src/interpreters/interpreter_explain.rs +++ b/src/query/service/src/interpreters/interpreter_explain.rs @@ -25,7 +25,8 @@ use common_expression::DataField; use common_expression::DataSchemaRef; use common_expression::DataSchemaRefExt; use common_expression::FromData; -use common_profile::ProfSpanSetRef; +use common_profile::SharedProcessorProfiles; +use common_sql::executor::ProfileHelper; use common_sql::MetadataRef; use crate::interpreters::Interpreter; @@ -202,7 +203,7 @@ impl ExplainInterpreter { metadata: &MetadataRef, ) -> Result> { let result = plan - .format(metadata.clone(), ProfSpanSetRef::default())? + .format(metadata.clone(), SharedProcessorProfiles::default())? .format_pretty()?; let line_split_result: Vec<&str> = result.lines().collect(); let formatted_plan = StringType::from_data(line_split_result); @@ -293,7 +294,7 @@ impl ExplainInterpreter { let settings = self.ctx.get_settings(); let query_id = self.ctx.get_id(); build_res.set_max_threads(settings.get_max_threads()? as usize); - let settings = ExecutorSettings::try_create(&settings, query_id)?; + let settings = ExecutorSettings::try_create(&settings, query_id.clone())?; // Drain the data if build_res.main_pipeline.is_complete_pipeline()? { @@ -309,6 +310,13 @@ impl ExplainInterpreter { while (pulling_executor.pull_data()?).is_some() {} } + let profile = + ProfileHelper::build_query_profile(&query_id, &plan, &prof_span_set.lock().unwrap())?; + + // Record the query profile + let prof_mgr = self.ctx.get_query_profile_manager(); + prof_mgr.insert(Arc::new(profile)); + let result = plan .format(metadata.clone(), prof_span_set)? .format_pretty()?; diff --git a/src/query/service/src/interpreters/interpreter_insert.rs b/src/query/service/src/interpreters/interpreter_insert.rs index 4898be2fc0cf8..778bd1bda83ec 100644 --- a/src/query/service/src/interpreters/interpreter_insert.rs +++ b/src/query/service/src/interpreters/interpreter_insert.rs @@ -212,6 +212,9 @@ impl Interpreter for InsertInterpreter { let input = exchange.input.clone(); exchange.input = Box::new(PhysicalPlan::DistributedInsertSelect(Box::new( DistributedInsertSelect { + // TODO(leiysky): we reuse the id of exchange here, + // which is not correct. We should generate a new id for insert. + plan_id: exchange.plan_id, input, catalog, table_info: table1.get_table_info().clone(), @@ -226,6 +229,9 @@ impl Interpreter for InsertInterpreter { other_plan => { // insert should wait until all nodes finished PhysicalPlan::DistributedInsertSelect(Box::new(DistributedInsertSelect { + // TODO: we reuse the id of other plan here, + // which is not correct. We should generate a new id for insert. 
+ plan_id: other_plan.get_id(), input: Box::new(other_plan), catalog, table_info: table1.get_table_info().clone(), diff --git a/src/query/service/src/pipelines/pipeline_build_res.rs b/src/query/service/src/pipelines/pipeline_build_res.rs index f410f39cea011..2acee48a867e2 100644 --- a/src/query/service/src/pipelines/pipeline_build_res.rs +++ b/src/query/service/src/pipelines/pipeline_build_res.rs @@ -20,7 +20,7 @@ use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::Pipeline; use common_pipeline_core::SourcePipeBuilder; use common_pipeline_sources::OneBlockSource; -use common_profile::ProfSpanSetRef; +use common_profile::SharedProcessorProfiles; use crate::api::DefaultExchangeInjector; use crate::api::ExchangeInjector; @@ -32,7 +32,7 @@ pub struct PipelineBuildResult { /// Set of profiling spans for the query. /// Will be empty if profiling is disabled. - pub prof_span_set: ProfSpanSetRef, + pub prof_span_set: SharedProcessorProfiles, pub exchange_injector: Arc, } @@ -42,7 +42,7 @@ impl PipelineBuildResult { PipelineBuildResult { main_pipeline: Pipeline::create(), sources_pipelines: vec![], - prof_span_set: ProfSpanSetRef::default(), + prof_span_set: SharedProcessorProfiles::default(), exchange_injector: DefaultExchangeInjector::create(), } } @@ -61,7 +61,7 @@ impl PipelineBuildResult { Ok(PipelineBuildResult { main_pipeline, sources_pipelines: vec![], - prof_span_set: ProfSpanSetRef::default(), + prof_span_set: SharedProcessorProfiles::default(), exchange_injector: DefaultExchangeInjector::create(), }) } diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index e534747afed94..00070586b7359 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -43,7 +43,7 @@ use common_pipeline_sinks::Sinker; use common_pipeline_sinks::UnionReceiveSink; use common_pipeline_transforms::processors::transforms::build_full_sort_pipeline; use common_pipeline_transforms::processors::ProfileWrapper; -use common_profile::ProfSpanSetRef; +use common_profile::SharedProcessorProfiles; use common_sql::evaluator::BlockOperator; use common_sql::evaluator::CompoundBlockOperator; use common_sql::executor::AggregateExpand; @@ -123,7 +123,7 @@ pub struct PipelineBuilder { pub index: Option, enable_profiling: bool, - prof_span_set: ProfSpanSetRef, + prof_span_set: SharedProcessorProfiles, exchange_injector: Arc, } @@ -131,7 +131,7 @@ impl PipelineBuilder { pub fn create( ctx: Arc, enable_profiling: bool, - prof_span_set: ProfSpanSetRef, + prof_span_set: SharedProcessorProfiles, ) -> PipelineBuilder { PipelineBuilder { enable_profiling, diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index a9e0e246c4587..aa1b1c1ad7fd9 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -160,8 +160,14 @@ impl PhysicalPlanReplacer for Fragmenter { let input = self.replace(plan.input.as_ref())?; let input_schema = input.output_schema()?; + let plan_id = plan.plan_id; + let source_fragment_id = self.ctx.get_fragment_id(); let plan = PhysicalPlan::ExchangeSink(ExchangeSink { + // TODO(leiysky): we reuse the plan id here, + // should generate a new one for the sink. 
+ plan_id, + input: Box::new(input), schema: input_schema.clone(), kind: plan.kind.clone(), @@ -205,6 +211,10 @@ impl PhysicalPlanReplacer for Fragmenter { self.fragments.push(source_fragment); Ok(PhysicalPlan::ExchangeSource(ExchangeSource { + // TODO(leiysky): we reuse the plan id here, + // should generate a new one for the source. + plan_id, + schema: input_schema, query_id: self.query_id.clone(), diff --git a/src/query/service/src/schedulers/fragments/query_fragment_actions_display.rs b/src/query/service/src/schedulers/fragments/query_fragment_actions_display.rs index 24192b3db53c3..8b73627cb0842 100644 --- a/src/query/service/src/schedulers/fragments/query_fragment_actions_display.rs +++ b/src/query/service/src/schedulers/fragments/query_fragment_actions_display.rs @@ -15,7 +15,7 @@ use std::fmt::Display; use std::fmt::Formatter; -use common_profile::ProfSpanSetRef; +use common_profile::SharedProcessorProfiles; use common_sql::MetadataRef; use crate::api::DataExchange; @@ -80,7 +80,7 @@ impl<'a> Display for QueryFragmentActionsWrap<'a> { let fragment_action = &self.inner.fragment_actions[0]; let plan_display_string = fragment_action .physical_plan - .format(self.metadata.clone(), ProfSpanSetRef::default()) + .format(self.metadata.clone(), SharedProcessorProfiles::default()) .and_then(|node| node.format_pretty_with_prefix(" ")) .unwrap(); write!(f, "{}", plan_display_string)?; diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs index 9596be5b27dd0..397eb5c0c649d 100644 --- a/src/query/service/src/schedulers/scheduler.rs +++ b/src/query/service/src/schedulers/scheduler.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use common_exception::ErrorCode; use common_exception::Result; -use common_profile::ProfSpanSetRef; +use common_profile::SharedProcessorProfiles; use crate::pipelines::PipelineBuildResult; use crate::pipelines::PipelineBuilder; @@ -67,8 +67,11 @@ pub async fn build_local_pipeline( plan: &PhysicalPlan, enable_profiling: bool, ) -> Result { - let pipeline = - PipelineBuilder::create(ctx.clone(), enable_profiling, ProfSpanSetRef::default()); + let pipeline = PipelineBuilder::create( + ctx.clone(), + enable_profiling, + SharedProcessorProfiles::default(), + ); let mut build_res = pipeline.finalize(plan)?; let settings = ctx.get_settings(); diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 14693694793e8..91850390da3b5 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -51,6 +51,7 @@ use common_meta_app::principal::UserInfo; use common_meta_app::schema::GetTableCopiedFileReq; use common_meta_app::schema::TableInfo; use common_pipeline_core::InputError; +use common_profile::QueryProfileManager; use common_settings::ChangeValue; use common_settings::Settings; use common_storage::DataOperator; @@ -166,6 +167,10 @@ impl QueryContext { DataExchangeManager::instance() } + pub fn get_query_profile_manager(&self) -> Arc { + self.shared.get_query_profile_manager() + } + // Get the current session. pub fn get_current_session(&self) -> Arc { self.shared.session.clone() @@ -526,6 +531,10 @@ impl TableContext for QueryContext { self.query_settings.get_changes() } + fn get_query_profile_manager(&self) -> Arc { + self.shared.get_query_profile_manager() + } + // Get the storage data accessor operator from the session manager. 
fn get_data_operator(&self) -> Result { Ok(self.shared.data_operator.clone()) diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 3aae351a798dc..0ff001b2a4970 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -29,6 +29,7 @@ use common_meta_app::principal::OnErrorMode; use common_meta_app::principal::RoleInfo; use common_meta_app::principal::UserInfo; use common_pipeline_core::InputError; +use common_profile::QueryProfileManager; use common_settings::ChangeValue; use common_settings::Settings; use common_storage::DataOperator; @@ -48,14 +49,6 @@ use crate::storages::Table; type DatabaseAndTable = (String, String, String); /// Data that needs to be shared in a query context. -/// This is very useful, for example, for queries: -/// USE database_1; -/// SELECT -/// (SELECT scalar FROM table_name_1) AS scalar_1, -/// (SELECT scalar FROM table_name_2) AS scalar_2, -/// (SELECT scalar FROM table_name_3) AS scalar_3 -/// FROM table_name_4; -/// For each subquery, they will share a runtime, session, progress, init_query_id pub struct QueryContextShared { /// total_scan_values for scan stats pub(in crate::sessions) total_scan_values: Arc, @@ -91,6 +84,8 @@ pub struct QueryContextShared { pub(in crate::sessions) cacheable: Arc, // Status info. pub(in crate::sessions) status: Arc>, + /// Query profile manager + pub(in crate::sessions) profile_mgr: Arc, } impl QueryContextShared { @@ -123,6 +118,7 @@ impl QueryContextShared { partitions_shas: Arc::new(RwLock::new(vec![])), cacheable: Arc::new(AtomicBool::new(true)), status: Arc::new(RwLock::new("null".to_string())), + profile_mgr: QueryProfileManager::instance(), })) } @@ -373,6 +369,10 @@ impl QueryContextShared { let status = self.status.read(); status.clone() } + + pub fn get_query_profile_manager(&self) -> Arc { + self.profile_mgr.clone() + } } impl Drop for QueryContextShared { diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index ded6e8f3eaba3..0b5a30d439028 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -84,6 +84,7 @@ use common_meta_app::schema::UpsertTableOptionReq; use common_meta_app::schema::VirtualColumnMeta; use common_meta_types::MetaId; use common_pipeline_core::InputError; +use common_profile::QueryProfileManager; use common_settings::ChangeValue; use common_settings::Settings; use common_storage::DataOperator; @@ -521,6 +522,10 @@ impl TableContext for CtxDelegation { todo!() } + fn get_query_profile_manager(&self) -> Arc { + todo!() + } + fn get_data_operator(&self) -> Result { self.ctx.get_data_operator() } diff --git a/src/query/sql/Cargo.toml b/src/query/sql/Cargo.toml index 5e7a3769056ec..d2554bf7ad1b5 100644 --- a/src/query/sql/Cargo.toml +++ b/src/query/sql/Cargo.toml @@ -28,12 +28,12 @@ common-functions = { path = "../functions" } common-license = { path = "../../common/license" } common-meta-app = { path = "../../meta/app" } common-meta-types = { path = "../../meta/types" } -common-profile = { path = "../../common/profile" } common-pipeline-core = { path = "../pipeline/core" } common-pipeline-sources = { path = "../pipeline/sources" } common-pipeline-transforms = { path = "../pipeline/transforms" } +common-profile = { path = "../profile" } common-settings = { path = "../settings" } common-storage 
= { path = "../../common/storage" } common-storages-parquet = { path = "../storages/parquet" } diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index 5168c31a1135b..92164d3d69fcd 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -16,7 +16,7 @@ use common_ast::ast::FormatTreeNode; use common_catalog::plan::PartStatistics; use common_exception::Result; use common_functions::BUILTIN_FUNCTIONS; -use common_profile::ProfSpanSetRef; +use common_profile::SharedProcessorProfiles; use itertools::Itertools; use super::AggregateExpand; @@ -52,7 +52,7 @@ impl PhysicalPlan { pub fn format( &self, metadata: MetadataRef, - prof_span_set: ProfSpanSetRef, + prof_span_set: SharedProcessorProfiles, ) -> Result> { to_format_tree(self, &metadata, &prof_span_set) } @@ -114,7 +114,7 @@ impl PhysicalPlan { fn to_format_tree( plan: &PhysicalPlan, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { match plan { PhysicalPlan::TableScan(plan) => table_scan_to_format_tree(plan, metadata), @@ -152,6 +152,20 @@ fn to_format_tree( } } +/// Helper function to add profile info to the format tree. +fn append_profile_info( + children: &mut Vec>, + prof_set: &SharedProcessorProfiles, + plan_id: u32, +) { + if let Some(prof) = prof_set.lock().unwrap().get(&plan_id) { + children.push(FormatTreeNode::new(format!( + "total cpu time: {}ms", + prof.cpu_time.as_secs_f64() * 1000.0 + ))); + } +} + fn table_scan_to_format_tree( plan: &TableScan, metadata: &MetadataRef, @@ -269,7 +283,7 @@ fn table_scan_to_format_tree( fn filter_to_format_tree( plan: &Filter, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let filter = plan .predicates @@ -283,12 +297,7 @@ fn filter_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.push(to_format_tree(&plan.input, metadata, prof_span_set)?); @@ -301,7 +310,7 @@ fn filter_to_format_tree( fn project_to_format_tree( plan: &Project, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let columns = plan .columns @@ -317,12 +326,7 @@ fn project_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.push(to_format_tree(&plan.input, metadata, prof_span_set)?); @@ -335,7 +339,7 @@ fn project_to_format_tree( fn eval_scalar_to_format_tree( plan: &EvalScalar, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let scalars = plan .exprs @@ -350,12 +354,7 @@ fn eval_scalar_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - 
} + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.push(to_format_tree(&plan.input, metadata, prof_span_set)?); @@ -380,7 +379,7 @@ pub fn pretty_display_agg_desc(desc: &AggregateFunctionDesc, metadata: &Metadata fn aggregate_expand_to_format_tree( plan: &AggregateExpand, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let sets = plan .grouping_sets @@ -402,12 +401,7 @@ fn aggregate_expand_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.push(to_format_tree(&plan.input, metadata, prof_span_set)?); @@ -420,7 +414,7 @@ fn aggregate_expand_to_format_tree( fn aggregate_partial_to_format_tree( plan: &AggregatePartial, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let group_by = plan .group_by @@ -448,12 +442,7 @@ fn aggregate_partial_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.push(to_format_tree(&plan.input, metadata, prof_span_set)?); @@ -466,7 +455,7 @@ fn aggregate_partial_to_format_tree( fn aggregate_final_to_format_tree( plan: &AggregateFinal, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let group_by = plan .group_by @@ -500,12 +489,7 @@ fn aggregate_final_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.push(to_format_tree(&plan.input, metadata, prof_span_set)?); @@ -518,7 +502,7 @@ fn aggregate_final_to_format_tree( fn window_to_format_tree( plan: &Window, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let partition_by = plan .partition_by @@ -554,12 +538,7 @@ fn window_to_format_tree( FormatTreeNode::new(format!("frame: [{frame}]")), ]; - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.push(to_format_tree(&plan.input, metadata, prof_span_set)?); @@ -572,7 +551,7 @@ fn window_to_format_tree( fn sort_to_format_tree( plan: &Sort, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let sort_keys = plan .order_by @@ -600,12 +579,7 @@ fn sort_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 
1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.push(to_format_tree(&plan.input, metadata, prof_span_set)?); @@ -615,7 +589,7 @@ fn sort_to_format_tree( fn limit_to_format_tree( plan: &Limit, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let mut children = vec![ FormatTreeNode::new(format!( @@ -631,12 +605,7 @@ fn limit_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.push(to_format_tree(&plan.input, metadata, prof_span_set)?); @@ -646,7 +615,7 @@ fn limit_to_format_tree( fn row_fetch_to_format_tree( plan: &RowFetch, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let table_schema = plan.source.source_info.schema(); let projected_schema = plan.cols_to_fetch.project_schema(&table_schema); @@ -662,12 +631,7 @@ fn row_fetch_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.push(to_format_tree(&plan.input, metadata, prof_span_set)?); @@ -680,7 +644,7 @@ fn row_fetch_to_format_tree( fn range_join_to_format_tree( plan: &RangeJoin, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let range_join_conditions = plan .conditions @@ -722,12 +686,7 @@ fn range_join_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.push(left_child); children.push(right_child); @@ -744,7 +703,7 @@ fn range_join_to_format_tree( fn hash_join_to_format_tree( plan: &HashJoin, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let build_keys = plan .build_keys @@ -783,12 +742,7 @@ fn hash_join_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.push(build_child); children.push(probe_child); @@ -802,7 +756,7 @@ fn hash_join_to_format_tree( fn exchange_to_format_tree( plan: &Exchange, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { Ok(FormatTreeNode::with_children("Exchange".to_string(), vec![ FormatTreeNode::new(format!("exchange type: {}", match plan.kind { @@ -825,7 +779,7 @@ fn exchange_to_format_tree( 
fn union_all_to_format_tree( plan: &UnionAll, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let mut children = vec![]; @@ -834,12 +788,7 @@ fn union_all_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.extend(vec![ to_format_tree(&plan.left, metadata, prof_span_set)?, @@ -899,7 +848,7 @@ fn exchange_source_to_format_tree(plan: &ExchangeSource) -> Result Result> { let mut children = vec![]; @@ -919,7 +868,7 @@ fn exchange_sink_to_format_tree( fn distributed_insert_to_format_tree( plan: &DistributedInsertSelect, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let children = vec![to_format_tree(&plan.input, metadata, prof_span_set)?]; @@ -932,7 +881,7 @@ fn distributed_insert_to_format_tree( fn project_set_to_format_tree( plan: &ProjectSet, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let mut children = vec![]; @@ -941,12 +890,7 @@ fn project_set_to_format_tree( children.extend(items); } - if let Some(prof_span) = prof_span_set.lock().unwrap().get(&plan.plan_id) { - let process_time = prof_span.process_time / 1000 / 1000; // milliseconds - children.push(FormatTreeNode::new(format!( - "total process time: {process_time}ms" - ))); - } + append_profile_info(&mut children, prof_span_set, plan.plan_id); children.extend(vec![FormatTreeNode::new(format!( "set returning functions: {}", @@ -968,7 +912,7 @@ fn project_set_to_format_tree( fn runtime_filter_source_to_format_tree( plan: &RuntimeFilterSource, metadata: &MetadataRef, - prof_span_set: &ProfSpanSetRef, + prof_span_set: &SharedProcessorProfiles, ) -> Result> { let children = vec![ to_format_tree(&plan.left_side, metadata, prof_span_set)?, diff --git a/src/query/sql/src/executor/mod.rs b/src/query/sql/src/executor/mod.rs index fa6ce91bc2ef3..3bf9ce5b6cef2 100644 --- a/src/query/sql/src/executor/mod.rs +++ b/src/query/sql/src/executor/mod.rs @@ -19,6 +19,7 @@ mod physical_plan; mod physical_plan_builder; mod physical_plan_display; mod physical_plan_visitor; +mod profile; pub mod table_read_plan; mod util; @@ -31,4 +32,5 @@ pub use physical_plan::*; pub use physical_plan_builder::PhysicalPlanBuilder; pub use physical_plan_builder::RangeJoinCondition; pub use physical_plan_visitor::PhysicalPlanReplacer; +pub use profile::*; pub use util::*; diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index 6f07d3f44f77a..ba3e04ba05bd6 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -569,6 +569,9 @@ impl RangeJoin { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct Exchange { + /// A unique id of operator in a `PhysicalPlan` tree. + pub plan_id: u32, + pub input: Box, pub kind: FragmentKind, pub keys: Vec, @@ -582,6 +585,9 @@ impl Exchange { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct ExchangeSource { + /// A unique id of operator in a `PhysicalPlan` tree. 
+ pub plan_id: u32, + /// Output schema of exchanged data pub schema: DataSchemaRef, @@ -609,6 +615,9 @@ pub enum FragmentKind { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct ExchangeSink { + /// A unique id of operator in a `PhysicalPlan` tree. + pub plan_id: u32, + pub input: Box, /// Input schema of exchanged data pub schema: DataSchemaRef, @@ -651,6 +660,9 @@ impl UnionAll { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct DistributedInsertSelect { + /// A unique id of operator in a `PhysicalPlan` tree. + pub plan_id: u32, + pub input: Box, pub catalog: String, pub table_info: TableInfo, @@ -724,6 +736,32 @@ impl PhysicalPlan { ) } + /// Get the id of the plan node + pub fn get_id(&self) -> u32 { + match self { + PhysicalPlan::TableScan(v) => v.plan_id, + PhysicalPlan::Filter(v) => v.plan_id, + PhysicalPlan::Project(v) => v.plan_id, + PhysicalPlan::EvalScalar(v) => v.plan_id, + PhysicalPlan::ProjectSet(v) => v.plan_id, + PhysicalPlan::AggregateExpand(v) => v.plan_id, + PhysicalPlan::AggregatePartial(v) => v.plan_id, + PhysicalPlan::AggregateFinal(v) => v.plan_id, + PhysicalPlan::Window(v) => v.plan_id, + PhysicalPlan::Sort(v) => v.plan_id, + PhysicalPlan::Limit(v) => v.plan_id, + PhysicalPlan::RowFetch(v) => v.plan_id, + PhysicalPlan::HashJoin(v) => v.plan_id, + PhysicalPlan::RangeJoin(v) => v.plan_id, + PhysicalPlan::Exchange(v) => v.plan_id, + PhysicalPlan::UnionAll(v) => v.plan_id, + PhysicalPlan::RuntimeFilterSource(v) => v.plan_id, + PhysicalPlan::DistributedInsertSelect(v) => v.plan_id, + PhysicalPlan::ExchangeSource(v) => v.plan_id, + PhysicalPlan::ExchangeSink(v) => v.plan_id, + } + } + pub fn output_schema(&self) -> Result { match self { PhysicalPlan::TableScan(plan) => plan.output_schema(), diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs index 3bdf2732ad951..ad10d970c3305 100644 --- a/src/query/sql/src/executor/physical_plan_builder.rs +++ b/src/query/sql/src/executor/physical_plan_builder.rs @@ -590,6 +590,7 @@ impl PhysicalPlanBuilder { .data_type(); PhysicalPlan::Exchange(PhysicalExchange { + plan_id: self.next_plan_id(), kind, input: Box::new(PhysicalPlan::AggregatePartial( aggregate_partial, @@ -787,6 +788,7 @@ impl PhysicalPlanBuilder { Exchange::Merge => FragmentKind::Merge, }; Ok(PhysicalPlan::Exchange(PhysicalExchange { + plan_id: self.next_plan_id(), input, kind, keys, diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index 9ff978c59bf7e..3f7ca9dccfa42 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -231,6 +231,7 @@ pub trait PhysicalPlanReplacer { let input = self.replace(&plan.input)?; Ok(PhysicalPlan::Exchange(Exchange { + plan_id: plan.plan_id, input: Box::new(input), kind: plan.kind.clone(), keys: plan.keys.clone(), @@ -245,6 +246,10 @@ pub trait PhysicalPlanReplacer { let input = self.replace(&plan.input)?; Ok(PhysicalPlan::ExchangeSink(ExchangeSink { + // TODO(leiysky): we reuse the plan id of the Exchange node here, + // should generate a new one. 
+            plan_id: plan.plan_id,
+
             input: Box::new(input),
             schema: plan.schema.clone(),
             kind: plan.kind.clone(),
@@ -273,6 +278,7 @@ pub trait PhysicalPlanReplacer {
 
         Ok(PhysicalPlan::DistributedInsertSelect(Box::new(
             DistributedInsertSelect {
+                plan_id: plan.plan_id,
                 input: Box::new(input),
                 catalog: plan.catalog.clone(),
                 table_info: plan.table_info.clone(),
diff --git a/src/query/sql/src/executor/profile.rs b/src/query/sql/src/executor/profile.rs
new file mode 100644
index 0000000000000..ac6d6628ad592
--- /dev/null
+++ b/src/query/sql/src/executor/profile.rs
@@ -0,0 +1,305 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_exception::ErrorCode;
+use common_exception::Result;
+use common_profile::PlanNodeProfile;
+use common_profile::ProcessorProfiles;
+use common_profile::QueryProfile;
+
+use crate::executor::PhysicalPlan;
+
+pub struct ProfileHelper;
+
+impl ProfileHelper {
+    pub fn build_query_profile(
+        query_id: &str,
+        plan: &PhysicalPlan,
+        profs: &ProcessorProfiles,
+    ) -> Result<QueryProfile> {
+        let mut plan_node_profs = vec![];
+        flatten_plan_node_profile(plan, profs, &mut plan_node_profs)?;
+
+        Ok(QueryProfile::new(query_id.to_string(), plan_node_profs))
+    }
+}
+
+fn flatten_plan_node_profile(
+    plan: &PhysicalPlan,
+    profs: &ProcessorProfiles,
+    plan_node_profs: &mut Vec<PlanNodeProfile>,
+) -> Result<()> {
+    match plan {
+        PhysicalPlan::TableScan(scan) => {
+            let prof = PlanNodeProfile {
+                id: scan.plan_id,
+                plan_node_name: "TableScan".to_string(),
+                description: "".to_string(),
+                // We don't record the time spent on table scan for now
+                cpu_time: Default::default(),
+            };
+            plan_node_profs.push(prof);
+        }
+        PhysicalPlan::Filter(filter) => {
+            flatten_plan_node_profile(&filter.input, profs, plan_node_profs)?;
+            let proc_prof = profs
+                .get(&filter.plan_id)
+                .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?;
+            let prof = PlanNodeProfile {
+                id: filter.plan_id,
+                plan_node_name: "Filter".to_string(),
+                description: "".to_string(),
+                cpu_time: proc_prof.cpu_time,
+            };
+            plan_node_profs.push(prof);
+        }
+        PhysicalPlan::Project(project) => {
+            flatten_plan_node_profile(&project.input, profs, plan_node_profs)?;
+            let proc_prof = profs
+                .get(&project.plan_id)
+                .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?;
+            let prof = PlanNodeProfile {
+                id: project.plan_id,
+                plan_node_name: "Project".to_string(),
+                description: "".to_string(),
+                cpu_time: proc_prof.cpu_time,
+            };
+            plan_node_profs.push(prof);
+        }
+        PhysicalPlan::EvalScalar(eval) => {
+            flatten_plan_node_profile(&eval.input, profs, plan_node_profs)?;
+            let proc_prof = profs
+                .get(&eval.plan_id)
+                .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?;
+            let prof = PlanNodeProfile {
+                id: eval.plan_id,
+                plan_node_name: "EvalScalar".to_string(),
+                description: "".to_string(),
+                cpu_time: proc_prof.cpu_time,
+            };
+            plan_node_profs.push(prof);
+        }
+        PhysicalPlan::ProjectSet(project_set) => {
+            flatten_plan_node_profile(&project_set.input, profs, plan_node_profs)?;
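+            // (Every remaining arm repeats this recipe: recurse into the
+            // node's children first so profiles are flattened bottom-up, then
+            // look up this node's accumulated ProcessorProfile by its plan_id.)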
+ let proc_prof = profs + .get(&project_set.plan_id) + .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let prof = PlanNodeProfile { + id: project_set.plan_id, + plan_node_name: "ProjectSet".to_string(), + description: "".to_string(), + cpu_time: proc_prof.cpu_time, + }; + plan_node_profs.push(prof); + } + PhysicalPlan::AggregateExpand(expand) => { + flatten_plan_node_profile(&expand.input, profs, plan_node_profs)?; + let proc_prof = profs + .get(&expand.plan_id) + .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let prof = PlanNodeProfile { + id: expand.plan_id, + plan_node_name: "AggregateExpand".to_string(), + description: "".to_string(), + cpu_time: proc_prof.cpu_time, + }; + plan_node_profs.push(prof); + } + PhysicalPlan::AggregatePartial(agg_partial) => { + flatten_plan_node_profile(&agg_partial.input, profs, plan_node_profs)?; + let proc_prof = profs + .get(&agg_partial.plan_id) + .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let prof = PlanNodeProfile { + id: agg_partial.plan_id, + plan_node_name: "AggregatePartial".to_string(), + description: "".to_string(), + cpu_time: proc_prof.cpu_time, + }; + plan_node_profs.push(prof); + } + PhysicalPlan::AggregateFinal(agg_final) => { + flatten_plan_node_profile(&agg_final.input, profs, plan_node_profs)?; + let proc_prof = profs + .get(&agg_final.plan_id) + .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let prof = PlanNodeProfile { + id: agg_final.plan_id, + plan_node_name: "AggregateFinal".to_string(), + description: "".to_string(), + cpu_time: proc_prof.cpu_time, + }; + plan_node_profs.push(prof); + } + PhysicalPlan::Window(window) => { + flatten_plan_node_profile(&window.input, profs, plan_node_profs)?; + let proc_prof = profs + .get(&window.plan_id) + .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let prof = PlanNodeProfile { + id: window.plan_id, + plan_node_name: "Window".to_string(), + description: "".to_string(), + cpu_time: proc_prof.cpu_time, + }; + plan_node_profs.push(prof); + } + PhysicalPlan::Sort(sort) => { + flatten_plan_node_profile(&sort.input, profs, plan_node_profs)?; + let proc_prof = profs + .get(&sort.plan_id) + .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let prof = PlanNodeProfile { + id: sort.plan_id, + plan_node_name: "Sort".to_string(), + description: "".to_string(), + cpu_time: proc_prof.cpu_time, + }; + plan_node_profs.push(prof); + } + PhysicalPlan::Limit(limit) => { + flatten_plan_node_profile(&limit.input, profs, plan_node_profs)?; + let proc_prof = profs + .get(&limit.plan_id) + .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let prof = PlanNodeProfile { + id: limit.plan_id, + plan_node_name: "Limit".to_string(), + description: "".to_string(), + cpu_time: proc_prof.cpu_time, + }; + plan_node_profs.push(prof); + } + PhysicalPlan::RowFetch(fetch) => { + flatten_plan_node_profile(&fetch.input, profs, plan_node_profs)?; + let proc_prof = profs + .get(&fetch.plan_id) + .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let prof = PlanNodeProfile { + id: fetch.plan_id, + plan_node_name: "RowFetch".to_string(), + description: "".to_string(), + cpu_time: proc_prof.cpu_time, + }; + plan_node_profs.push(prof); + } + PhysicalPlan::HashJoin(hash_join) => { + flatten_plan_node_profile(&hash_join.probe, profs, plan_node_profs)?; + flatten_plan_node_profile(&hash_join.build, profs, plan_node_profs)?; + let proc_prof = profs + 
+            let proc_prof = profs
+                .get(&hash_join.plan_id)
+                .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?;
+            let prof = PlanNodeProfile {
+                id: hash_join.plan_id,
+                plan_node_name: "HashJoin".to_string(),
+                description: "".to_string(),
+                cpu_time: proc_prof.cpu_time,
+            };
+            plan_node_profs.push(prof);
+        }
+        PhysicalPlan::RangeJoin(range_join) => {
+            flatten_plan_node_profile(&range_join.left, profs, plan_node_profs)?;
+            flatten_plan_node_profile(&range_join.right, profs, plan_node_profs)?;
+            let proc_prof = profs
+                .get(&range_join.plan_id)
+                .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?;
+            let prof = PlanNodeProfile {
+                id: range_join.plan_id,
+                plan_node_name: "RangeJoin".to_string(),
+                description: "".to_string(),
+                cpu_time: proc_prof.cpu_time,
+            };
+            plan_node_profs.push(prof);
+        }
+        PhysicalPlan::Exchange(exchange) => {
+            flatten_plan_node_profile(&exchange.input, profs, plan_node_profs)?;
+            let proc_prof = profs
+                .get(&exchange.plan_id)
+                .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?;
+            let prof = PlanNodeProfile {
+                id: exchange.plan_id,
+                plan_node_name: "Exchange".to_string(),
+                description: "".to_string(),
+                cpu_time: proc_prof.cpu_time,
+            };
+            plan_node_profs.push(prof);
+        }
+        PhysicalPlan::UnionAll(union) => {
+            flatten_plan_node_profile(&union.left, profs, plan_node_profs)?;
+            flatten_plan_node_profile(&union.right, profs, plan_node_profs)?;
+            let proc_prof = profs
+                .get(&union.plan_id)
+                .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?;
+            let prof = PlanNodeProfile {
+                id: union.plan_id,
+                plan_node_name: "UnionAll".to_string(),
+                description: "".to_string(),
+                cpu_time: proc_prof.cpu_time,
+            };
+            plan_node_profs.push(prof);
+        }
+        PhysicalPlan::RuntimeFilterSource(source) => {
+            let proc_prof = profs
+                .get(&source.plan_id)
+                .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?;
+            let prof = PlanNodeProfile {
+                id: source.plan_id,
+                plan_node_name: "RuntimeFilterSource".to_string(),
+                description: "".to_string(),
+                cpu_time: proc_prof.cpu_time,
+            };
+            plan_node_profs.push(prof);
+        }
+        PhysicalPlan::DistributedInsertSelect(select) => {
+            flatten_plan_node_profile(&select.input, profs, plan_node_profs)?;
+            let proc_prof = profs
+                .get(&select.plan_id)
+                .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?;
+            let prof = PlanNodeProfile {
+                id: select.plan_id,
+                plan_node_name: "DistributedInsertSelect".to_string(),
+                description: "".to_string(),
+                cpu_time: proc_prof.cpu_time,
+            };
+            plan_node_profs.push(prof);
+        }
+        PhysicalPlan::ExchangeSource(source) => {
+            let proc_prof = profs
+                .get(&source.plan_id)
+                .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?;
+            let prof = PlanNodeProfile {
+                id: source.plan_id,
+                plan_node_name: "ExchangeSource".to_string(),
+                description: "".to_string(),
+                cpu_time: proc_prof.cpu_time,
+            };
+            plan_node_profs.push(prof);
+        }
+        PhysicalPlan::ExchangeSink(sink) => {
+            flatten_plan_node_profile(&sink.input, profs, plan_node_profs)?;
+            let proc_prof = profs
+                .get(&sink.plan_id)
+                .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?;
+            let prof = PlanNodeProfile {
+                id: sink.plan_id,
+                plan_node_name: "ExchangeSink".to_string(),
+                description: "".to_string(),
+                cpu_time: proc_prof.cpu_time,
+            };
+            plan_node_profs.push(prof);
+        }
+    }
+
+    Ok(())
+}
diff --git a/src/query/storages/system/src/lib.rs b/src/query/storages/system/src/lib.rs
index 9811adeef37d9..1948721b56c4d 100644
--- a/src/query/storages/system/src/lib.rs
+++ b/src/query/storages/system/src/lib.rs
@@ -38,6 +38,7 @@ mod one_table;
 mod processes_table;
 mod query_cache_table;
 mod query_log_table;
+mod query_profile_table;
 mod roles_table;
 mod settings_table;
 mod stages_table;
@@ -76,6 +77,7 @@ pub use query_log_table::LogType;
 pub use query_log_table::QueryLogElement;
 pub use query_log_table::QueryLogQueue;
 pub use query_log_table::QueryLogTable;
+pub use query_profile_table::QueryProfileTable;
 pub use roles_table::RolesTable;
 pub use settings_table::SettingsTable;
 pub use stages_table::StagesTable;
diff --git a/src/query/storages/system/src/query_profile_table.rs b/src/query/storages/system/src/query_profile_table.rs
new file mode 100644
index 0000000000000..c2af843fa8456
--- /dev/null
+++ b/src/query/storages/system/src/query_profile_table.rs
@@ -0,0 +1,102 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_catalog::table::Table;
+use common_catalog::table_context::TableContext;
+use common_expression::types::NumberDataType;
+use common_expression::types::StringType;
+use common_expression::types::UInt32Type;
+use common_expression::types::UInt64Type;
+use common_expression::DataBlock;
+use common_expression::FromData;
+use common_expression::TableDataType;
+use common_expression::TableField;
+use common_expression::TableSchemaRefExt;
+use common_meta_app::schema::TableIdent;
+use common_meta_app::schema::TableInfo;
+use common_meta_app::schema::TableMeta;
+
+use crate::SyncOneBlockSystemTable;
+use crate::SyncSystemTable;
+
+pub struct QueryProfileTable {
+    table_info: TableInfo,
+}
+
+impl QueryProfileTable {
+    pub fn create(table_id: u64) -> Arc<dyn Table> {
+        let schema = TableSchemaRefExt::create(vec![
+            TableField::new("query_id", TableDataType::String),
+            TableField::new("plan_id", TableDataType::Number(NumberDataType::UInt32)),
+            TableField::new("plan_name", TableDataType::String),
+            TableField::new("description", TableDataType::String),
+            TableField::new("cpu_time", TableDataType::Number(NumberDataType::UInt64)),
+        ]);
+
+        let table_info = TableInfo {
+            desc: "'system'.'query_profile'".to_string(),
+            ident: TableIdent::new(table_id, 0),
+            name: "query_profile".to_string(),
+            meta: TableMeta {
+                schema,
+                engine: "QueryProfile".to_string(),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        SyncOneBlockSystemTable::create(Self { table_info })
+    }
+}
+
+impl SyncSystemTable for QueryProfileTable {
+    const NAME: &'static str = "system.query_profile";
+
+    fn get_table_info(&self) -> &TableInfo {
+        &self.table_info
+    }
+
+    fn get_full_data(&self, ctx: Arc<dyn TableContext>) -> common_exception::Result<DataBlock> {
+        let profile_mgr = ctx.get_query_profile_manager();
+        let query_profs = profile_mgr.list_all();
+
+        let mut query_ids: Vec<Vec<u8>> = Vec::with_capacity(query_profs.len());
+        let mut plan_ids: Vec<u32> = Vec::with_capacity(query_profs.len());
+        let mut plan_names: Vec<Vec<u8>> = Vec::with_capacity(query_profs.len());
+        let mut descriptions: Vec<Vec<u8>> = Vec::with_capacity(query_profs.len());
+        let mut cpu_times: Vec<u64> = Vec::with_capacity(query_profs.len());
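+        // Emit one output row per plan node: every collected query profile is
+        // flattened into the parallel column vectors built above.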
+        for prof in query_profs.iter() {
+            for plan_prof in prof.plan_node_profs.iter() {
+                query_ids.push(prof.query_id.clone().into_bytes());
+                plan_ids.push(plan_prof.id);
+                plan_names.push(plan_prof.plan_node_name.clone().into_bytes());
+                descriptions.push(plan_prof.description.clone().into_bytes());
+                cpu_times.push(plan_prof.cpu_time.as_nanos() as u64);
+            }
+        }
+
+        let block = DataBlock::new_from_columns(vec![
+            StringType::from_data(query_ids),
+            UInt32Type::from_data(plan_ids),
+            StringType::from_data(plan_names),
+            StringType::from_data(descriptions),
+            UInt64Type::from_data(cpu_times),
+        ]);
+
+        Ok(block)
+    }
+}
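
---

Usage sketch (illustrative, not part of the patch): how the pieces above are
expected to fit together once a query finishes. `ProfileHelper::build_query_profile`,
`get_query_profile_manager`, and `list_all` appear in this patch; the manager's
registration method (`insert` below) and the local variable names are assumptions.

    // Hypothetical call site at the end of query execution.
    let profile = ProfileHelper::build_query_profile(
        &query_id,            // id of the finished query
        &physical_plan,       // root of the executed PhysicalPlan tree
        &processor_profiles,  // per-plan-node CPU times collected by the executor
    )?;
    // Assumed registration API; afterwards the profile is visible through
    // QueryProfileManager::list_all() and thus the system.query_profile table.
    ctx.get_query_profile_manager().insert(Arc::new(profile));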