From 48dbda1ebfb43432dc0e86216b36d356c33d675a Mon Sep 17 00:00:00 2001 From: Winter Zhang Date: Wed, 8 Nov 2023 18:02:17 +0800 Subject: [PATCH] feat(executor): processor level profile(part 1) (#13637) * feat(executor): processor level profile * feat(executor): processor level profile * feat(executor): processor level profile * feat(executor): processor level profile --- src/query/pipeline/core/src/processors/mod.rs | 1 + .../pipeline/core/src/processors/processor.rs | 9 ++++ .../pipeline/core/src/processors/profile.rs | 40 ++++++++++++++ .../src/pipelines/executor/executor_graph.rs | 26 +++++++-- .../pipelines/executor/executor_settings.rs | 3 ++ .../src/pipelines/executor/executor_tasks.rs | 16 +++++- .../executor/executor_worker_context.rs | 28 +++++++--- .../pipelines/executor/pipeline_executor.rs | 53 ++++++++++++++++--- .../executor/processor_async_task.rs | 15 ++++-- .../pipelines/executor/pipeline_executor.rs | 1 + 10 files changed, 168 insertions(+), 24 deletions(-) create mode 100644 src/query/pipeline/core/src/processors/profile.rs diff --git a/src/query/pipeline/core/src/processors/mod.rs b/src/query/pipeline/core/src/processors/mod.rs index 629c052e2257b..cb6d8731f1e85 100644 --- a/src/query/pipeline/core/src/processors/mod.rs +++ b/src/query/pipeline/core/src/processors/mod.rs @@ -17,6 +17,7 @@ pub mod processor; mod duplicate_processor; mod port_trigger; +pub mod profile; mod resize_processor; mod shuffle_processor; diff --git a/src/query/pipeline/core/src/processors/processor.rs b/src/query/pipeline/core/src/processors/processor.rs index e7c167367f5ce..cb8adc6e1b395 100644 --- a/src/query/pipeline/core/src/processors/processor.rs +++ b/src/query/pipeline/core/src/processors/processor.rs @@ -24,6 +24,8 @@ use minitrace::prelude::*; use petgraph::graph::node_index; use petgraph::prelude::NodeIndex; +use crate::processors::profile::Profile; + #[derive(Debug)] pub enum Event { NeedData, @@ -78,6 +80,8 @@ pub trait Processor: Send { async fn async_process(&mut self) -> Result<()> { Err(ErrorCode::Unimplemented("Unimplemented async_process.")) } + + fn record_profile(&self, _profile: &Profile) {} } #[derive(Clone)] @@ -128,6 +132,11 @@ impl ProcessorPtr { (*self.inner.get()).un_reacted(cause, self.id().index()) } + /// # Safety + pub unsafe fn record_profile(&self, profile: &Profile) { + (*self.inner.get()).record_profile(profile) + } + /// # Safety pub unsafe fn interrupt(&self) { (*self.inner.get()).interrupt() diff --git a/src/query/pipeline/core/src/processors/profile.rs b/src/query/pipeline/core/src/processors/profile.rs new file mode 100644 index 0000000000000..e556b1e5cb762 --- /dev/null +++ b/src/query/pipeline/core/src/processors/profile.rs @@ -0,0 +1,40 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicU64; + +#[derive(Default)] +pub struct Profile { + /// The id of processor + pub pid: usize, + /// The name of processor + pub p_name: String, + + /// The time spent to process in nanoseconds + pub cpu_time: AtomicU64, + /// The time spent to wait in nanoseconds, usually used to + /// measure the time spent on waiting for I/O + pub wait_time: AtomicU64, +} + +impl Profile { + pub fn create(pid: usize, p_name: String) -> Profile { + Profile { + pid, + p_name, + cpu_time: AtomicU64::new(0), + wait_time: AtomicU64::new(0), + } + } +} diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index 0b72e8885e5f2..a40b271af6abb 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -24,6 +24,7 @@ use common_base::runtime::TrySpawn; use common_exception::ErrorCode; use common_exception::Result; use common_pipeline_core::processors::processor::EventCause; +use common_pipeline_core::processors::profile::Profile; use log::debug; use log::trace; use minitrace::prelude::*; @@ -62,10 +63,11 @@ struct EdgeInfo { output_index: usize, } -struct Node { +pub(crate) struct Node { state: std::sync::Mutex, - processor: ProcessorPtr, + pub(crate) processor: ProcessorPtr, + pub(crate) profile: Arc, updated_list: Arc, inputs_port: Vec>, outputs_port: Vec>, @@ -73,16 +75,19 @@ struct Node { impl Node { pub fn create( + pid: usize, processor: &ProcessorPtr, inputs_port: &[Arc], outputs_port: &[Arc], ) -> Arc { + let p_name = unsafe { processor.name() }; Arc::new(Node { state: std::sync::Mutex::new(State::Idle), processor: processor.clone(), updated_list: UpdateList::create(), inputs_port: inputs_port.to_vec(), outputs_port: outputs_port.to_vec(), + profile: Arc::new(Profile::create(pid, p_name)), }) } @@ -145,7 +150,9 @@ impl ExecutingGraph { let mut pipe_edges = Vec::with_capacity(pipe.output_length); for item in &pipe.items { - let node = Node::create(&item.processor, &item.inputs_port, &item.outputs_port); + let pid = graph.node_count(); + let node = + Node::create(pid, &item.processor, &item.inputs_port, &item.outputs_port); let graph_node_index = graph.add_node(node.clone()); unsafe { @@ -229,6 +236,7 @@ impl ExecutingGraph { let mut need_schedule_edges = VecDeque::new(); need_schedule_nodes.push_back(index); + while !need_schedule_nodes.is_empty() || !need_schedule_edges.is_empty() { // To avoid lock too many times, we will try to cache lock. let mut state_guard_cache = None; @@ -426,6 +434,18 @@ impl RunningGraph { Ok(schedule_queue) } + pub(crate) fn get_node(&self, pid: NodeIndex) -> &Node { + &self.0.graph[pid] + } + + pub fn get_proc_profiles(&self) -> Vec> { + self.0 + .graph + .node_weights() + .map(|x| x.profile.clone()) + .collect::>() + } + pub fn interrupt_running_nodes(&self) { unsafe { for node_index in self.0.graph.node_indices() { diff --git a/src/query/service/src/pipelines/executor/executor_settings.rs b/src/query/service/src/pipelines/executor/executor_settings.rs index b937ee680db1b..095b77ee6b2c8 100644 --- a/src/query/service/src/pipelines/executor/executor_settings.rs +++ b/src/query/service/src/pipelines/executor/executor_settings.rs @@ -21,13 +21,16 @@ use common_settings::Settings; #[derive(Clone)] pub struct ExecutorSettings { pub query_id: Arc, + pub enable_profiling: bool, pub max_execute_time_in_seconds: Duration, } impl ExecutorSettings { pub fn try_create(settings: &Settings, query_id: String) -> Result { + let enable_profiling = settings.get_enable_query_profiling()?; let max_execute_time_in_seconds = settings.get_max_execute_time_in_seconds()?; Ok(ExecutorSettings { + enable_profiling, query_id: Arc::new(query_id), max_execute_time_in_seconds: Duration::from_secs(max_execute_time_in_seconds), }) diff --git a/src/query/service/src/pipelines/executor/executor_tasks.rs b/src/query/service/src/pipelines/executor/executor_tasks.rs index deb16bf9b59f1..e51120fcc916e 100644 --- a/src/query/service/src/pipelines/executor/executor_tasks.rs +++ b/src/query/service/src/pipelines/executor/executor_tasks.rs @@ -16,6 +16,7 @@ use std::collections::VecDeque; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; use common_base::base::tokio::sync::Notify; use common_exception::Result; @@ -197,11 +198,22 @@ pub struct CompletedAsyncTask { pub id: NodeIndex, pub worker_id: usize, pub res: Result<()>, + pub elapsed: Option, } impl CompletedAsyncTask { - pub fn create(id: NodeIndex, worker_id: usize, res: Result<()>) -> Self { - CompletedAsyncTask { id, worker_id, res } + pub fn create( + id: NodeIndex, + worker_id: usize, + res: Result<()>, + elapsed: Option, + ) -> Self { + CompletedAsyncTask { + id, + worker_id, + res, + elapsed, + } } } diff --git a/src/query/service/src/pipelines/executor/executor_worker_context.rs b/src/query/service/src/pipelines/executor/executor_worker_context.rs index 24643afb4ac81..561303d90b41a 100644 --- a/src/query/service/src/pipelines/executor/executor_worker_context.rs +++ b/src/query/service/src/pipelines/executor/executor_worker_context.rs @@ -15,6 +15,8 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; use common_exception::ErrorCode; use common_exception::Result; @@ -67,20 +69,34 @@ impl ExecutorWorkerContext { std::mem::replace(&mut self.task, ExecutorTask::None) } - pub unsafe fn execute_task(&mut self) -> Result> { + pub unsafe fn execute_task( + &mut self, + ) -> Result<(NodeIndex, bool, Option)> { match std::mem::replace(&mut self.task, ExecutorTask::None) { ExecutorTask::None => Err(ErrorCode::Internal("Execute none task.")), - ExecutorTask::Sync(processor) => self.execute_sync_task(processor), + ExecutorTask::Sync(processor) => self.execute_sync_task::(processor), ExecutorTask::AsyncCompleted(task) => match task.res { - Ok(_) => Ok(Some(task.id)), + Ok(_) => Ok((task.id, true, task.elapsed)), Err(cause) => Err(cause), }, } } - unsafe fn execute_sync_task(&mut self, processor: ProcessorPtr) -> Result> { - processor.process()?; - Ok(Some(processor.id())) + unsafe fn execute_sync_task( + &mut self, + proc: ProcessorPtr, + ) -> Result<(NodeIndex, bool, Option)> { + match ENABLE_PROFILING { + true => { + let instant = Instant::now(); + proc.process()?; + Ok((proc.id(), false, Some(instant.elapsed()))) + } + false => { + proc.process()?; + Ok((proc.id(), false, None)) + } + } } pub fn get_workers_condvar(&self) -> &Arc { diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index b4d8a04c6ef8d..f1f748e2c1df3 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::intrinsics::assume; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Instant; @@ -26,6 +28,7 @@ use common_base::runtime::TrySpawn; use common_base::GLOBAL_TASK; use common_exception::ErrorCode; use common_exception::Result; +use common_pipeline_core::processors::profile::Profile; use common_pipeline_core::LockGuard; use futures::future::select; use futures_util::future::Either; @@ -354,8 +357,14 @@ impl PipelineExecutor { thread_join_handles.push(Thread::named_spawn(Some(name), move || unsafe { let _g = span.set_local_parent(); let this_clone = this.clone(); + let enable_profiling = this.settings.enable_profiling; let try_result = catch_unwind(move || -> Result<()> { - match this_clone.execute_single_thread(thread_num) { + let res = match enable_profiling { + true => this_clone.execute_single_thread::(thread_num), + false => this_clone.execute_single_thread::(thread_num), + }; + + match res { Ok(_) => Ok(()), Err(cause) => { if log::max_level() == LevelFilter::Trace { @@ -384,7 +393,10 @@ impl PipelineExecutor { /// # Safety /// /// Method is thread unsafe and require thread safe call - pub unsafe fn execute_single_thread(&self, thread_num: usize) -> Result<()> { + pub unsafe fn execute_single_thread( + &self, + thread_num: usize, + ) -> Result<()> { let workers_condvar = self.workers_condvar.clone(); let mut context = ExecutorWorkerContext::create( thread_num, @@ -399,13 +411,34 @@ impl PipelineExecutor { } while !self.global_tasks_queue.is_finished() && context.has_task() { - if let Some(executed_pid) = context.execute_task()? { - // Not scheduled graph if pipeline is finished. - if !self.global_tasks_queue.is_finished() { - // We immediately schedule the processor again. - let schedule_queue = self.graph.schedule_queue(executed_pid)?; - schedule_queue.schedule(&self.global_tasks_queue, &mut context, self); + let (executed_pid, is_async, elapsed) = + context.execute_task::()?; + + if ENABLE_PROFILING { + let node = self.graph.get_node(executed_pid); + if let Some(elapsed) = elapsed { + let nanos = elapsed.as_nanos(); + assume(nanos < 18446744073709551615_u128); + + if is_async { + node.profile + .wait_time + .fetch_add(nanos as u64, Ordering::Relaxed); + } else { + node.profile + .cpu_time + .fetch_add(nanos as u64, Ordering::Relaxed); + } } + + node.processor.record_profile(&node.profile); + } + + // Not scheduled graph if pipeline is finished. + if !self.global_tasks_queue.is_finished() { + // We immediately schedule the processor again. + let schedule_queue = self.graph.schedule_queue(executed_pid)?; + schedule_queue.schedule(&self.global_tasks_queue, &mut context, self); } } } @@ -416,6 +449,10 @@ impl PipelineExecutor { pub fn format_graph_nodes(&self) -> String { self.graph.format_graph_nodes() } + + pub fn get_profiles(&self) -> Vec> { + self.graph.get_proc_profiles() + } } impl Drop for PipelineExecutor { diff --git a/src/query/service/src/pipelines/executor/processor_async_task.rs b/src/query/service/src/pipelines/executor/processor_async_task.rs index b27a6922b82cf..d52eb7b07fb2b 100644 --- a/src/query/service/src/pipelines/executor/processor_async_task.rs +++ b/src/query/service/src/pipelines/executor/processor_async_task.rs @@ -40,7 +40,7 @@ pub struct ProcessorAsyncTask { processor_id: NodeIndex, queue: Arc, workers_condvar: Arc, - inner: BoxFuture<'static, Result<()>>, + inner: BoxFuture<'static, (Duration, Result<()>)>, } impl ProcessorAsyncTask { @@ -88,7 +88,7 @@ impl ProcessorAsyncTask { ); } Either::Right((res, _)) => { - return res; + return (start.elapsed(), res); } } } @@ -116,17 +116,22 @@ impl Future for ProcessorAsyncTask { match catch_unwind(move || inner.poll(cx)) { Ok(Poll::Pending) => Poll::Pending, - Ok(Poll::Ready(res)) => { + Ok(Poll::Ready((elapsed, res))) => { self.queue.completed_async_task( self.workers_condvar.clone(), - CompletedAsyncTask::create(self.processor_id, self.worker_id, res), + CompletedAsyncTask::create( + self.processor_id, + self.worker_id, + res, + Some(elapsed), + ), ); Poll::Ready(()) } Err(cause) => { self.queue.completed_async_task( self.workers_condvar.clone(), - CompletedAsyncTask::create(self.processor_id, self.worker_id, Err(cause)), + CompletedAsyncTask::create(self.processor_id, self.worker_id, Err(cause), None), ); Poll::Ready(()) diff --git a/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs b/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs index 084f4ba7be25e..83c72971a6b4e 100644 --- a/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs @@ -39,6 +39,7 @@ use databend_query::test_kits::create_query_context; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_always_call_on_finished() -> Result<()> { let settings = ExecutorSettings { + enable_profiling: false, query_id: Arc::new("".to_string()), max_execute_time_in_seconds: Default::default(), };