feat(executor): processor level profile(part 1) (#13637)
* feat(executor): processor level profile

zhang2014 authored Nov 8, 2023
1 parent 09edee5 commit 48dbda1
Showing 10 changed files with 168 additions and 24 deletions.
1 change: 1 addition & 0 deletions src/query/pipeline/core/src/processors/mod.rs
@@ -17,6 +17,7 @@ pub mod processor;

mod duplicate_processor;
mod port_trigger;
pub mod profile;
mod resize_processor;
mod shuffle_processor;

9 changes: 9 additions & 0 deletions src/query/pipeline/core/src/processors/processor.rs
@@ -24,6 +24,8 @@ use minitrace::prelude::*;
use petgraph::graph::node_index;
use petgraph::prelude::NodeIndex;

use crate::processors::profile::Profile;

#[derive(Debug)]
pub enum Event {
NeedData,
@@ -78,6 +80,8 @@ pub trait Processor: Send {
async fn async_process(&mut self) -> Result<()> {
Err(ErrorCode::Unimplemented("Unimplemented async_process."))
}

fn record_profile(&self, _profile: &Profile) {}
}

#[derive(Clone)]
@@ -128,6 +132,11 @@ impl ProcessorPtr {
(*self.inner.get()).un_reacted(cause, self.id().index())
}

/// # Safety
pub unsafe fn record_profile(&self, profile: &Profile) {
(*self.inner.get()).record_profile(profile)
}

/// # Safety
pub unsafe fn interrupt(&self) {
(*self.inner.get()).interrupt()
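The `Processor` trait gains a `record_profile` hook with an empty default body, so existing processors keep compiling unchanged and only the ones that care override it; `ProcessorPtr::record_profile` simply forwards to the inner processor. A minimal standalone sketch of that default-hook pattern (the simplified `Profile` and the `TransformDummy` processor are illustrative, not from the commit):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Simplified stand-in for the Profile type added in profile.rs below.
#[derive(Default)]
pub struct Profile {
    pub cpu_time: AtomicU64,
}

pub trait Processor: Send {
    fn name(&self) -> String;

    // Default no-op: processors that do not track anything need no changes;
    // others override this to flush per-processor state into the profile.
    fn record_profile(&self, _profile: &Profile) {}
}

struct TransformDummy;

impl Processor for TransformDummy {
    fn name(&self) -> String {
        "TransformDummy".to_string()
    }

    fn record_profile(&self, profile: &Profile) {
        // Hypothetical override: report the CPU time accumulated so far.
        println!(
            "{}: cpu_time = {} ns",
            self.name(),
            profile.cpu_time.load(Ordering::Relaxed)
        );
    }
}

fn main() {
    let p = TransformDummy;
    p.record_profile(&Profile::default());
}
```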
40 changes: 40 additions & 0 deletions src/query/pipeline/core/src/processors/profile.rs
@@ -0,0 +1,40 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicU64;

#[derive(Default)]
pub struct Profile {
/// The id of processor
pub pid: usize,
/// The name of processor
pub p_name: String,

/// The time spent to process in nanoseconds
pub cpu_time: AtomicU64,
/// The time spent to wait in nanoseconds, usually used to
/// measure the time spent on waiting for I/O
pub wait_time: AtomicU64,
}

impl Profile {
pub fn create(pid: usize, p_name: String) -> Profile {
Profile {
pid,
p_name,
cpu_time: AtomicU64::new(0),
wait_time: AtomicU64::new(0),
}
}
}
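Both counters are `AtomicU64`, so worker threads can accumulate time into a shared `Arc<Profile>` with `fetch_add(..., Ordering::Relaxed)` while another thread reads a live snapshot without any locking. A small self-contained sketch of that intended usage (the processor name and the sleep standing in for `process()` are illustrative):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

#[derive(Default)]
struct Profile {
    pid: usize,
    p_name: String,
    cpu_time: AtomicU64,
    wait_time: AtomicU64,
}

fn main() {
    let profile = Arc::new(Profile {
        pid: 0,
        p_name: "TransformSort".to_string(), // illustrative processor name
        ..Default::default()
    });

    // A worker thread times one call to process() and accumulates it.
    let worker_profile = profile.clone();
    let worker = thread::spawn(move || {
        let start = Instant::now();
        thread::sleep(Duration::from_millis(5)); // stands in for process()
        let nanos = start.elapsed().as_nanos() as u64;
        worker_profile.cpu_time.fetch_add(nanos, Ordering::Relaxed);
    });
    worker.join().unwrap();

    // Any other thread can read a snapshot at any time, without locks.
    println!(
        "{} (pid {}): cpu {} ns, wait {} ns",
        profile.p_name,
        profile.pid,
        profile.cpu_time.load(Ordering::Relaxed),
        profile.wait_time.load(Ordering::Relaxed),
    );
}
```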
26 changes: 23 additions & 3 deletions src/query/service/src/pipelines/executor/executor_graph.rs
@@ -24,6 +24,7 @@ use common_base::runtime::TrySpawn;
use common_exception::ErrorCode;
use common_exception::Result;
use common_pipeline_core::processors::processor::EventCause;
use common_pipeline_core::processors::profile::Profile;
use log::debug;
use log::trace;
use minitrace::prelude::*;
@@ -62,27 +63,31 @@ struct EdgeInfo {
output_index: usize,
}

struct Node {
pub(crate) struct Node {
state: std::sync::Mutex<State>,
processor: ProcessorPtr,
pub(crate) processor: ProcessorPtr,

pub(crate) profile: Arc<Profile>,
updated_list: Arc<UpdateList>,
inputs_port: Vec<Arc<InputPort>>,
outputs_port: Vec<Arc<OutputPort>>,
}

impl Node {
pub fn create(
pid: usize,
processor: &ProcessorPtr,
inputs_port: &[Arc<InputPort>],
outputs_port: &[Arc<OutputPort>],
) -> Arc<Node> {
let p_name = unsafe { processor.name() };
Arc::new(Node {
state: std::sync::Mutex::new(State::Idle),
processor: processor.clone(),
updated_list: UpdateList::create(),
inputs_port: inputs_port.to_vec(),
outputs_port: outputs_port.to_vec(),
profile: Arc::new(Profile::create(pid, p_name)),
})
}

@@ -145,7 +150,9 @@ impl ExecutingGraph {
let mut pipe_edges = Vec::with_capacity(pipe.output_length);

for item in &pipe.items {
let node = Node::create(&item.processor, &item.inputs_port, &item.outputs_port);
let pid = graph.node_count();
let node =
Node::create(pid, &item.processor, &item.inputs_port, &item.outputs_port);

let graph_node_index = graph.add_node(node.clone());
unsafe {
@@ -229,6 +236,7 @@ impl ExecutingGraph {
let mut need_schedule_edges = VecDeque::new();

need_schedule_nodes.push_back(index);

while !need_schedule_nodes.is_empty() || !need_schedule_edges.is_empty() {
// To avoid lock too many times, we will try to cache lock.
let mut state_guard_cache = None;
@@ -426,6 +434,18 @@ impl RunningGraph {
Ok(schedule_queue)
}

pub(crate) fn get_node(&self, pid: NodeIndex) -> &Node {
&self.0.graph[pid]
}

pub fn get_proc_profiles(&self) -> Vec<Arc<Profile>> {
self.0
.graph
.node_weights()
.map(|x| x.profile.clone())
.collect::<Vec<_>>()
}

pub fn interrupt_running_nodes(&self) {
unsafe {
for node_index in self.0.graph.node_indices() {
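Each `Node` now carries an `Arc<Profile>` created at graph-build time: the `pid` is read from `graph.node_count()` immediately before `add_node`, so it matches the index petgraph assigns, and `get_proc_profiles` just clones the `Arc`s out of the node weights. A reduced sketch of that bookkeeping, assuming `petgraph` as a dependency and trimming `Node` down to the profile field:

```rust
use std::sync::Arc;

use petgraph::graph::Graph;

struct Profile {
    pid: usize,
    p_name: String,
}

struct Node {
    profile: Arc<Profile>,
}

fn main() {
    let mut graph: Graph<Arc<Node>, ()> = Graph::new();

    for name in ["Source", "Transform", "Sink"] {
        // node_count() just before add_node equals the index the new node
        // will get, because petgraph assigns node indices sequentially.
        let pid = graph.node_count();
        let node = Arc::new(Node {
            profile: Arc::new(Profile {
                pid,
                p_name: name.to_string(),
            }),
        });
        let index = graph.add_node(node);
        assert_eq!(index.index(), pid);
    }

    // get_proc_profiles(): clone the Arc<Profile> out of every node weight.
    let profiles: Vec<Arc<Profile>> = graph
        .node_weights()
        .map(|node| node.profile.clone())
        .collect();

    for p in &profiles {
        println!("pid={} name={}", p.pid, p.p_name);
    }
}
```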
3 changes: 3 additions & 0 deletions src/query/service/src/pipelines/executor/executor_settings.rs
@@ -21,13 +21,16 @@ use common_settings::Settings;
#[derive(Clone)]
pub struct ExecutorSettings {
pub query_id: Arc<String>,
pub enable_profiling: bool,
pub max_execute_time_in_seconds: Duration,
}

impl ExecutorSettings {
pub fn try_create(settings: &Settings, query_id: String) -> Result<ExecutorSettings> {
let enable_profiling = settings.get_enable_query_profiling()?;
let max_execute_time_in_seconds = settings.get_max_execute_time_in_seconds()?;
Ok(ExecutorSettings {
enable_profiling,
query_id: Arc::new(query_id),
max_execute_time_in_seconds: Duration::from_secs(max_execute_time_in_seconds),
})
16 changes: 14 additions & 2 deletions src/query/service/src/pipelines/executor/executor_tasks.rs
@@ -16,6 +16,7 @@ use std::collections::VecDeque;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use common_base::base::tokio::sync::Notify;
use common_exception::Result;
@@ -197,11 +198,22 @@ pub struct CompletedAsyncTask {
pub id: NodeIndex,
pub worker_id: usize,
pub res: Result<()>,
pub elapsed: Option<Duration>,
}

impl CompletedAsyncTask {
pub fn create(id: NodeIndex, worker_id: usize, res: Result<()>) -> Self {
CompletedAsyncTask { id, worker_id, res }
pub fn create(
id: NodeIndex,
worker_id: usize,
res: Result<()>,
elapsed: Option<Duration>,
) -> Self {
CompletedAsyncTask {
id,
worker_id,
res,
elapsed,
}
}
}

src/query/service/src/pipelines/executor/executor_worker_context.rs
@@ -15,6 +15,8 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use common_exception::ErrorCode;
use common_exception::Result;
@@ -67,20 +69,34 @@ impl ExecutorWorkerContext {
std::mem::replace(&mut self.task, ExecutorTask::None)
}

pub unsafe fn execute_task(&mut self) -> Result<Option<NodeIndex>> {
pub unsafe fn execute_task<const ENABLE_PROFILING: bool>(
&mut self,
) -> Result<(NodeIndex, bool, Option<Duration>)> {
match std::mem::replace(&mut self.task, ExecutorTask::None) {
ExecutorTask::None => Err(ErrorCode::Internal("Execute none task.")),
ExecutorTask::Sync(processor) => self.execute_sync_task(processor),
ExecutorTask::Sync(processor) => self.execute_sync_task::<ENABLE_PROFILING>(processor),
ExecutorTask::AsyncCompleted(task) => match task.res {
Ok(_) => Ok(Some(task.id)),
Ok(_) => Ok((task.id, true, task.elapsed)),
Err(cause) => Err(cause),
},
}
}

unsafe fn execute_sync_task(&mut self, processor: ProcessorPtr) -> Result<Option<NodeIndex>> {
processor.process()?;
Ok(Some(processor.id()))
unsafe fn execute_sync_task<const ENABLE_PROFILING: bool>(
&mut self,
proc: ProcessorPtr,
) -> Result<(NodeIndex, bool, Option<Duration>)> {
match ENABLE_PROFILING {
true => {
let instant = Instant::now();
proc.process()?;
Ok((proc.id(), false, Some(instant.elapsed())))
}
false => {
proc.process()?;
Ok((proc.id(), false, None))
}
}
}

pub fn get_workers_condvar(&self) -> &Arc<WorkersCondvar> {
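`execute_task` is now generic over `const ENABLE_PROFILING: bool`, so the `Instant` bookkeeping is compiled out entirely when profiling is disabled rather than checked per task at runtime; the returned tuple also tells the caller whether the task was async, so the elapsed time can be attributed to the right counter. A self-contained sketch of the same const-generic trick (`work()` is a placeholder for `processor.process()`):

```rust
use std::thread;
use std::time::{Duration, Instant};

fn work() {
    // Placeholder for processor.process().
    thread::sleep(Duration::from_millis(1));
}

// With ENABLE_PROFILING = false the compiler monomorphizes the timing code
// away entirely, so the non-profiled hot path pays nothing for it.
fn execute_task<const ENABLE_PROFILING: bool>() -> Option<Duration> {
    if ENABLE_PROFILING {
        let start = Instant::now();
        work();
        Some(start.elapsed())
    } else {
        work();
        None
    }
}

fn main() {
    // The runtime setting picks one monomorphized version once, outside the
    // hot loop, mirroring execute_single_thread::<true>() / ::<false>().
    let enable_profiling = true;
    let elapsed = match enable_profiling {
        true => execute_task::<true>(),
        false => execute_task::<false>(),
    };
    println!("elapsed = {:?}", elapsed);
}
```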
53 changes: 45 additions & 8 deletions src/query/service/src/pipelines/executor/pipeline_executor.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::intrinsics::assume;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;

@@ -26,6 +28,7 @@ use common_base::runtime::TrySpawn;
use common_base::GLOBAL_TASK;
use common_exception::ErrorCode;
use common_exception::Result;
use common_pipeline_core::processors::profile::Profile;
use common_pipeline_core::LockGuard;
use futures::future::select;
use futures_util::future::Either;
@@ -354,8 +357,14 @@ impl PipelineExecutor {
thread_join_handles.push(Thread::named_spawn(Some(name), move || unsafe {
let _g = span.set_local_parent();
let this_clone = this.clone();
let enable_profiling = this.settings.enable_profiling;
let try_result = catch_unwind(move || -> Result<()> {
match this_clone.execute_single_thread(thread_num) {
let res = match enable_profiling {
true => this_clone.execute_single_thread::<true>(thread_num),
false => this_clone.execute_single_thread::<false>(thread_num),
};

match res {
Ok(_) => Ok(()),
Err(cause) => {
if log::max_level() == LevelFilter::Trace {
@@ -384,7 +393,10 @@ impl PipelineExecutor {
/// # Safety
///
/// Method is thread unsafe and require thread safe call
pub unsafe fn execute_single_thread(&self, thread_num: usize) -> Result<()> {
pub unsafe fn execute_single_thread<const ENABLE_PROFILING: bool>(
&self,
thread_num: usize,
) -> Result<()> {
let workers_condvar = self.workers_condvar.clone();
let mut context = ExecutorWorkerContext::create(
thread_num,
@@ -399,13 +411,34 @@
}

while !self.global_tasks_queue.is_finished() && context.has_task() {
if let Some(executed_pid) = context.execute_task()? {
// Not scheduled graph if pipeline is finished.
if !self.global_tasks_queue.is_finished() {
// We immediately schedule the processor again.
let schedule_queue = self.graph.schedule_queue(executed_pid)?;
schedule_queue.schedule(&self.global_tasks_queue, &mut context, self);
let (executed_pid, is_async, elapsed) =
context.execute_task::<ENABLE_PROFILING>()?;

if ENABLE_PROFILING {
let node = self.graph.get_node(executed_pid);
if let Some(elapsed) = elapsed {
let nanos = elapsed.as_nanos();
assume(nanos < 18446744073709551615_u128);

if is_async {
node.profile
.wait_time
.fetch_add(nanos as u64, Ordering::Relaxed);
} else {
node.profile
.cpu_time
.fetch_add(nanos as u64, Ordering::Relaxed);
}
}

node.processor.record_profile(&node.profile);
}

// Not scheduled graph if pipeline is finished.
if !self.global_tasks_queue.is_finished() {
// We immediately schedule the processor again.
let schedule_queue = self.graph.schedule_queue(executed_pid)?;
schedule_queue.schedule(&self.global_tasks_queue, &mut context, self);
}
}
}
@@ -416,6 +449,10 @@
pub fn format_graph_nodes(&self) -> String {
self.graph.format_graph_nodes()
}

pub fn get_profiles(&self) -> Vec<Arc<Profile>> {
self.graph.get_proc_profiles()
}
}

impl Drop for PipelineExecutor {
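After each task the worker credits the elapsed nanoseconds to exactly one counter: `wait_time` for async completions, `cpu_time` for sync `process()` calls. The commit uses the nightly `std::intrinsics::assume` intrinsic to hint that the nanosecond count fits in a `u64`; the sketch below expresses the same invariant with a `debug_assert!` on stable Rust, and otherwise mirrors the accounting shown in the hunk above:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

#[derive(Default)]
struct Profile {
    cpu_time: AtomicU64,
    wait_time: AtomicU64,
}

// Credit one finished task to exactly one counter: wait_time for async
// (typically I/O-bound) work, cpu_time for synchronous process() calls.
fn account(profile: &Profile, is_async: bool, elapsed: Option<Duration>) {
    if let Some(elapsed) = elapsed {
        let nanos = elapsed.as_nanos();
        // The commit calls the nightly std::intrinsics::assume here; a
        // debug_assert states the same "fits in u64" invariant on stable.
        debug_assert!(nanos < u64::MAX as u128);

        let counter = if is_async {
            &profile.wait_time
        } else {
            &profile.cpu_time
        };
        counter.fetch_add(nanos as u64, Ordering::Relaxed);
    }
}

fn main() {
    let profile = Profile::default();
    account(&profile, false, Some(Duration::from_micros(250))); // sync process()
    account(&profile, true, Some(Duration::from_millis(3))); // async wait
    println!(
        "cpu = {} ns, wait = {} ns",
        profile.cpu_time.load(Ordering::Relaxed),
        profile.wait_time.load(Ordering::Relaxed),
    );
}
```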
15 changes: 10 additions & 5 deletions src/query/service/src/pipelines/executor/processor_async_task.rs
@@ -40,7 +40,7 @@ pub struct ProcessorAsyncTask {
processor_id: NodeIndex,
queue: Arc<ExecutorTasksQueue>,
workers_condvar: Arc<WorkersCondvar>,
inner: BoxFuture<'static, Result<()>>,
inner: BoxFuture<'static, (Duration, Result<()>)>,
}

impl ProcessorAsyncTask {
@@ -88,7 +88,7 @@ impl ProcessorAsyncTask {
);
}
Either::Right((res, _)) => {
return res;
return (start.elapsed(), res);
}
}
}
@@ -116,17 +116,22 @@ impl Future for ProcessorAsyncTask {

match catch_unwind(move || inner.poll(cx)) {
Ok(Poll::Pending) => Poll::Pending,
Ok(Poll::Ready(res)) => {
Ok(Poll::Ready((elapsed, res))) => {
self.queue.completed_async_task(
self.workers_condvar.clone(),
CompletedAsyncTask::create(self.processor_id, self.worker_id, res),
CompletedAsyncTask::create(
self.processor_id,
self.worker_id,
res,
Some(elapsed),
),
);
Poll::Ready(())
}
Err(cause) => {
self.queue.completed_async_task(
self.workers_condvar.clone(),
CompletedAsyncTask::create(self.processor_id, self.worker_id, Err(cause)),
CompletedAsyncTask::create(self.processor_id, self.worker_id, Err(cause), None),
);

Poll::Ready(())
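The async task's inner future now resolves to `(Duration, Result<()>)`, so the wall-clock time between creation and completion can be reported back through `CompletedAsyncTask::elapsed`. A minimal standalone wrapper showing that shape (it assumes the `futures` crate for `block_on`; the commit wires the timing through `ProcessorAsyncTask` itself rather than a generic adapter):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

/// Wraps a future so it also reports how long it ran, wall-clock,
/// measured from construction to completion.
struct Timed<F> {
    start: Instant,
    inner: Pin<Box<F>>,
}

impl<F: Future> Timed<F> {
    fn new(inner: F) -> Self {
        Timed {
            start: Instant::now(),
            inner: Box::pin(inner),
        }
    }
}

impl<F: Future> Future for Timed<F> {
    type Output = (Duration, F::Output);

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Timed<F> is Unpin because the inner future is boxed, so plain
        // field access through the pin is fine here.
        match self.inner.as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(out) => Poll::Ready((self.start.elapsed(), out)),
        }
    }
}

fn main() {
    // Hypothetical usage with a trivial ready future; in the commit the
    // elapsed value ends up in CompletedAsyncTask::elapsed.
    let fut = Timed::new(async { Ok::<(), ()>(()) });
    let (elapsed, res) = futures::executor::block_on(fut);
    println!("elapsed = {:?}, res = {:?}", elapsed, res);
}
```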