Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(executor): processor level profile(part 1) #13637

Merged
merged 4 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/query/pipeline/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod processor;

mod duplicate_processor;
mod port_trigger;
pub mod profile;
mod resize_processor;
mod shuffle_processor;

Expand Down
9 changes: 9 additions & 0 deletions src/query/pipeline/core/src/processors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use minitrace::prelude::*;
use petgraph::graph::node_index;
use petgraph::prelude::NodeIndex;

use crate::processors::profile::Profile;

#[derive(Debug)]
pub enum Event {
NeedData,
Expand Down Expand Up @@ -78,6 +80,8 @@ pub trait Processor: Send {
async fn async_process(&mut self) -> Result<()> {
Err(ErrorCode::Unimplemented("Unimplemented async_process."))
}

/// Hook that receives this processor's [`Profile`].
///
/// The default implementation is empty, so processors that do not override
/// it record nothing.
fn record_profile(&self, _profile: &Profile) {}
}

#[derive(Clone)]
Expand Down Expand Up @@ -128,6 +132,11 @@ impl ProcessorPtr {
(*self.inner.get()).un_reacted(cause, self.id().index())
}

/// Forwards `profile` to the wrapped processor's `record_profile`.
///
/// # Safety
///
/// Dereferences the result of `self.inner.get()` to reach the processor.
/// NOTE(review): presumably the caller must guarantee that nothing else is
/// accessing the inner processor in a conflicting way during the call —
/// confirm against the definition of `inner`.
pub unsafe fn record_profile(&self, profile: &Profile) {
(*self.inner.get()).record_profile(profile)
}

/// # Safety
pub unsafe fn interrupt(&self) {
(*self.inner.get()).interrupt()
Expand Down
40 changes: 40 additions & 0 deletions src/query/pipeline/core/src/processors/profile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicU64;

/// Per-processor profiling counters.
///
/// A `Profile` identifies one processor (`pid`, `p_name`) and accumulates its
/// timings in atomic counters, so they can be updated through a shared
/// reference.
#[derive(Debug, Default)]
pub struct Profile {
    /// The id of processor
    pub pid: usize,
    /// The name of processor
    pub p_name: String,

    /// The time spent to process in nanoseconds
    pub cpu_time: AtomicU64,
    /// The time spent to wait in nanoseconds, usually used to
    /// measure the time spent on waiting for I/O
    pub wait_time: AtomicU64,
}

impl Profile {
    /// Creates the profile of processor `pid` named `p_name`.
    ///
    /// Both `cpu_time` and `wait_time` start at zero.
    pub fn create(pid: usize, p_name: String) -> Profile {
        Profile {
            pid,
            p_name,
            cpu_time: AtomicU64::new(0),
            wait_time: AtomicU64::new(0),
        }
    }
}
26 changes: 23 additions & 3 deletions src/query/service/src/pipelines/executor/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_base::runtime::TrySpawn;
use common_exception::ErrorCode;
use common_exception::Result;
use common_pipeline_core::processors::processor::EventCause;
use common_pipeline_core::processors::profile::Profile;
use log::debug;
use log::trace;
use minitrace::prelude::*;
Expand Down Expand Up @@ -62,27 +63,31 @@ struct EdgeInfo {
output_index: usize,
}

struct Node {
pub(crate) struct Node {
state: std::sync::Mutex<State>,
processor: ProcessorPtr,
pub(crate) processor: ProcessorPtr,

pub(crate) profile: Arc<Profile>,
updated_list: Arc<UpdateList>,
inputs_port: Vec<Arc<InputPort>>,
outputs_port: Vec<Arc<OutputPort>>,
}

impl Node {
pub fn create(
pid: usize,
processor: &ProcessorPtr,
inputs_port: &[Arc<InputPort>],
outputs_port: &[Arc<OutputPort>],
) -> Arc<Node> {
let p_name = unsafe { processor.name() };
Arc::new(Node {
state: std::sync::Mutex::new(State::Idle),
processor: processor.clone(),
updated_list: UpdateList::create(),
inputs_port: inputs_port.to_vec(),
outputs_port: outputs_port.to_vec(),
profile: Arc::new(Profile::create(pid, p_name)),
})
}

Expand Down Expand Up @@ -145,7 +150,9 @@ impl ExecutingGraph {
let mut pipe_edges = Vec::with_capacity(pipe.output_length);

for item in &pipe.items {
let node = Node::create(&item.processor, &item.inputs_port, &item.outputs_port);
let pid = graph.node_count();
let node =
Node::create(pid, &item.processor, &item.inputs_port, &item.outputs_port);

let graph_node_index = graph.add_node(node.clone());
unsafe {
Expand Down Expand Up @@ -229,6 +236,7 @@ impl ExecutingGraph {
let mut need_schedule_edges = VecDeque::new();

need_schedule_nodes.push_back(index);

while !need_schedule_nodes.is_empty() || !need_schedule_edges.is_empty() {
// To avoid locking too many times, we try to cache the lock guard.
let mut state_guard_cache = None;
Expand Down Expand Up @@ -426,6 +434,18 @@ impl RunningGraph {
Ok(schedule_queue)
}

/// Returns a reference to the graph node stored at index `pid`.
///
/// NOTE(review): the lookup uses index syntax on the graph, which presumably
/// panics when `pid` is not a valid node index — confirm against petgraph.
pub(crate) fn get_node(&self, pid: NodeIndex) -> &Node {
&self.0.graph[pid]
}

/// Collects the profile handle of every node in the graph.
///
/// Each entry is a clone of the node's `Arc<Profile>`, so the returned
/// vector shares its counters with the graph instead of copying them.
pub fn get_proc_profiles(&self) -> Vec<Arc<Profile>> {
    let nodes = self.0.graph.node_weights();
    nodes.map(|node| Arc::clone(&node.profile)).collect()
}

pub fn interrupt_running_nodes(&self) {
unsafe {
for node_index in self.0.graph.node_indices() {
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/pipelines/executor/executor_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ use common_settings::Settings;
/// Settings consumed by the pipeline executor.
#[derive(Clone)]
pub struct ExecutorSettings {
/// Id of the query, held behind an `Arc`.
pub query_id: Arc<String>,
/// Whether query profiling is enabled (read from the query settings).
pub enable_profiling: bool,
/// Maximum execution time, built from a whole number of seconds.
pub max_execute_time_in_seconds: Duration,
}

impl ExecutorSettings {
pub fn try_create(settings: &Settings, query_id: String) -> Result<ExecutorSettings> {
let enable_profiling = settings.get_enable_query_profiling()?;
let max_execute_time_in_seconds = settings.get_max_execute_time_in_seconds()?;
Ok(ExecutorSettings {
enable_profiling,
query_id: Arc::new(query_id),
max_execute_time_in_seconds: Duration::from_secs(max_execute_time_in_seconds),
})
Expand Down
16 changes: 14 additions & 2 deletions src/query/service/src/pipelines/executor/executor_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::VecDeque;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use common_base::base::tokio::sync::Notify;
use common_exception::Result;
Expand Down Expand Up @@ -197,11 +198,22 @@ pub struct CompletedAsyncTask {
pub id: NodeIndex,
pub worker_id: usize,
pub res: Result<()>,
pub elapsed: Option<Duration>,
}

impl CompletedAsyncTask {
pub fn create(id: NodeIndex, worker_id: usize, res: Result<()>) -> Self {
CompletedAsyncTask { id, worker_id, res }
pub fn create(
id: NodeIndex,
worker_id: usize,
res: Result<()>,
elapsed: Option<Duration>,
) -> Self {
CompletedAsyncTask {
id,
worker_id,
res,
elapsed,
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use common_exception::ErrorCode;
use common_exception::Result;
Expand Down Expand Up @@ -67,20 +69,34 @@ impl ExecutorWorkerContext {
std::mem::replace(&mut self.task, ExecutorTask::None)
}

pub unsafe fn execute_task(&mut self) -> Result<Option<NodeIndex>> {
pub unsafe fn execute_task<const ENABLE_PROFILING: bool>(
&mut self,
) -> Result<(NodeIndex, bool, Option<Duration>)> {
match std::mem::replace(&mut self.task, ExecutorTask::None) {
ExecutorTask::None => Err(ErrorCode::Internal("Execute none task.")),
ExecutorTask::Sync(processor) => self.execute_sync_task(processor),
ExecutorTask::Sync(processor) => self.execute_sync_task::<ENABLE_PROFILING>(processor),
ExecutorTask::AsyncCompleted(task) => match task.res {
Ok(_) => Ok(Some(task.id)),
Ok(_) => Ok((task.id, true, task.elapsed)),
Err(cause) => Err(cause),
},
}
}

unsafe fn execute_sync_task(&mut self, processor: ProcessorPtr) -> Result<Option<NodeIndex>> {
processor.process()?;
Ok(Some(processor.id()))
/// Runs a synchronous processor once.
///
/// On success returns the processor id, `false` (the task is not an async
/// completion), and the time spent in `process` when `ENABLE_PROFILING` is
/// `true`, or `None` when it is `false`.
unsafe fn execute_sync_task<const ENABLE_PROFILING: bool>(
    &mut self,
    proc: ProcessorPtr,
) -> Result<(NodeIndex, bool, Option<Duration>)> {
    // The clock is only read when profiling is enabled.
    let started_at = ENABLE_PROFILING.then(Instant::now);
    proc.process()?;
    Ok((proc.id(), false, started_at.map(|begin| begin.elapsed())))
}

pub fn get_workers_condvar(&self) -> &Arc<WorkersCondvar> {
Expand Down
53 changes: 45 additions & 8 deletions src/query/service/src/pipelines/executor/pipeline_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::intrinsics::assume;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;

Expand All @@ -26,6 +28,7 @@ use common_base::runtime::TrySpawn;
use common_base::GLOBAL_TASK;
use common_exception::ErrorCode;
use common_exception::Result;
use common_pipeline_core::processors::profile::Profile;
use common_pipeline_core::LockGuard;
use futures::future::select;
use futures_util::future::Either;
Expand Down Expand Up @@ -354,8 +357,14 @@ impl PipelineExecutor {
thread_join_handles.push(Thread::named_spawn(Some(name), move || unsafe {
let _g = span.set_local_parent();
let this_clone = this.clone();
let enable_profiling = this.settings.enable_profiling;
let try_result = catch_unwind(move || -> Result<()> {
match this_clone.execute_single_thread(thread_num) {
let res = match enable_profiling {
true => this_clone.execute_single_thread::<true>(thread_num),
false => this_clone.execute_single_thread::<false>(thread_num),
};

match res {
Ok(_) => Ok(()),
Err(cause) => {
if log::max_level() == LevelFilter::Trace {
Expand Down Expand Up @@ -384,7 +393,10 @@ impl PipelineExecutor {
/// # Safety
///
/// This method is not thread-safe; the caller must ensure it is invoked in a thread-safe manner.
pub unsafe fn execute_single_thread(&self, thread_num: usize) -> Result<()> {
pub unsafe fn execute_single_thread<const ENABLE_PROFILING: bool>(
&self,
thread_num: usize,
) -> Result<()> {
let workers_condvar = self.workers_condvar.clone();
let mut context = ExecutorWorkerContext::create(
thread_num,
Expand All @@ -399,13 +411,34 @@ impl PipelineExecutor {
}

while !self.global_tasks_queue.is_finished() && context.has_task() {
if let Some(executed_pid) = context.execute_task()? {
// Not scheduled graph if pipeline is finished.
if !self.global_tasks_queue.is_finished() {
// We immediately schedule the processor again.
let schedule_queue = self.graph.schedule_queue(executed_pid)?;
schedule_queue.schedule(&self.global_tasks_queue, &mut context, self);
let (executed_pid, is_async, elapsed) =
context.execute_task::<ENABLE_PROFILING>()?;

if ENABLE_PROFILING {
let node = self.graph.get_node(executed_pid);
if let Some(elapsed) = elapsed {
let nanos = elapsed.as_nanos();
assume(nanos < 18446744073709551615_u128);

if is_async {
node.profile
.wait_time
.fetch_add(nanos as u64, Ordering::Relaxed);
} else {
node.profile
.cpu_time
.fetch_add(nanos as u64, Ordering::Relaxed);
}
}

node.processor.record_profile(&node.profile);
}

// Not scheduled graph if pipeline is finished.
if !self.global_tasks_queue.is_finished() {
// We immediately schedule the processor again.
let schedule_queue = self.graph.schedule_queue(executed_pid)?;
schedule_queue.schedule(&self.global_tasks_queue, &mut context, self);
}
}
}
Expand All @@ -416,6 +449,10 @@ impl PipelineExecutor {
/// Returns the graph's textual description of its nodes, as produced by
/// `self.graph.format_graph_nodes()`.
pub fn format_graph_nodes(&self) -> String {
self.graph.format_graph_nodes()
}

/// Returns the profiles of the processors in this executor's graph, as
/// reported by `self.graph.get_proc_profiles()`.
pub fn get_profiles(&self) -> Vec<Arc<Profile>> {
self.graph.get_proc_profiles()
}
}

impl Drop for PipelineExecutor {
Expand Down
15 changes: 10 additions & 5 deletions src/query/service/src/pipelines/executor/processor_async_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct ProcessorAsyncTask {
processor_id: NodeIndex,
queue: Arc<ExecutorTasksQueue>,
workers_condvar: Arc<WorkersCondvar>,
inner: BoxFuture<'static, Result<()>>,
inner: BoxFuture<'static, (Duration, Result<()>)>,
}

impl ProcessorAsyncTask {
Expand Down Expand Up @@ -88,7 +88,7 @@ impl ProcessorAsyncTask {
);
}
Either::Right((res, _)) => {
return res;
return (start.elapsed(), res);
}
}
}
Expand Down Expand Up @@ -116,17 +116,22 @@ impl Future for ProcessorAsyncTask {

match catch_unwind(move || inner.poll(cx)) {
Ok(Poll::Pending) => Poll::Pending,
Ok(Poll::Ready(res)) => {
Ok(Poll::Ready((elapsed, res))) => {
self.queue.completed_async_task(
self.workers_condvar.clone(),
CompletedAsyncTask::create(self.processor_id, self.worker_id, res),
CompletedAsyncTask::create(
self.processor_id,
self.worker_id,
res,
Some(elapsed),
),
);
Poll::Ready(())
}
Err(cause) => {
self.queue.completed_async_task(
self.workers_condvar.clone(),
CompletedAsyncTask::create(self.processor_id, self.worker_id, Err(cause)),
CompletedAsyncTask::create(self.processor_id, self.worker_id, Err(cause), None),
);

Poll::Ready(())
Expand Down
Loading