Skip to content

Commit

Permalink
feat(executor): Introduce QueryProfileManager to collect query prof…
Browse files Browse the repository at this point in the history
…ilings (databendlabs#11760)

* introduce profile manager to collect query profilings

* fix license header

* fix license header

* format
  • Loading branch information
leiysky authored and andylokandy committed Nov 27, 2023
1 parent c751983 commit 3cb790b
Show file tree
Hide file tree
Showing 36 changed files with 882 additions and 219 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ members = [
"src/common/openai",
"src/common/tracing",
"src/common/storage",
"src/common/profile",
"src/common/vector",
"src/common/license",
# Query
Expand All @@ -45,6 +44,7 @@ members = [
"src/query/pipeline/sinks",
"src/query/pipeline/sources",
"src/query/pipeline/transforms",
"src/query/profile",
"src/query/settings",
"src/query/sql",
"src/query/storages/common/blocks",
Expand Down
71 changes: 0 additions & 71 deletions src/common/profile/src/span.rs

This file was deleted.

1 change: 1 addition & 0 deletions src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ common-io = { path = "../../common/io" }
common-meta-app = { path = "../../meta/app" }
common-meta-types = { path = "../../meta/types" }
common-pipeline-core = { path = "../pipeline/core" }
common-profile = { path = "../profile" }
common-settings = { path = "../settings" }
common-storage = { path = "../../common/storage" }

Expand Down
3 changes: 3 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use common_meta_app::principal::OnErrorMode;
use common_meta_app::principal::RoleInfo;
use common_meta_app::principal::UserInfo;
use common_pipeline_core::InputError;
use common_profile::QueryProfileManager;
use common_settings::ChangeValue;
use common_settings::Settings;
use common_storage::DataOperator;
Expand Down Expand Up @@ -135,6 +136,8 @@ pub trait TableContext: Send + Sync {
fn apply_changed_settings(&self, changes: HashMap<String, ChangeValue>) -> Result<()>;
fn get_changed_settings(&self) -> HashMap<String, ChangeValue>;

fn get_query_profile_manager(&self) -> Arc<QueryProfileManager>;

// Get the storage data accessor operator from the session manager.
fn get_data_operator(&self) -> Result<DataOperator>;

Expand Down
2 changes: 1 addition & 1 deletion src/query/pipeline/transforms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ common-base = { path = "../../../common/base" }
common-exception = { path = "../../../common/exception" }
common-expression = { path = "../../expression" }
common-pipeline-core = { path = "../core" }
common-profile = { path = "../../../common/profile" }
common-profile = { path = "../../profile" }
match-template = "0.0.1"

async-backtrace = { workspace = true }
Expand Down
18 changes: 9 additions & 9 deletions src/query/pipeline/transforms/src/processors/profile_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ use std::time::Instant;
use common_exception::Result;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::Processor;
use common_profile::ProfSpanBuilder;
use common_profile::ProfSpanSetRef;
use common_profile::ProcessorProfile;
use common_profile::SharedProcessorProfiles;

pub struct ProfileWrapper<T> {
inner: T,
prof_span_id: u32,
prof_span_set: ProfSpanSetRef,
prof_span_builder: ProfSpanBuilder,
prof_span_set: SharedProcessorProfiles,

prof: ProcessorProfile,
}

impl<T> ProfileWrapper<T>
Expand All @@ -33,13 +34,13 @@ where T: Processor + 'static
pub fn create(
inner: T,
prof_span_id: u32,
prof_span_set: ProfSpanSetRef,
prof_span_set: SharedProcessorProfiles,
) -> Box<dyn Processor> {
Box::new(Self {
inner,
prof_span_id,
prof_span_set,
prof_span_builder: ProfSpanBuilder::default(),
prof: ProcessorProfile::default(),
})
}
}
Expand All @@ -62,7 +63,7 @@ where T: Processor + 'static
self.prof_span_set
.lock()
.unwrap()
.update(self.prof_span_id, self.prof_span_builder.clone().finish());
.update(self.prof_span_id, self.prof);
Ok(Event::Finished)
}
v => Ok(v),
Expand All @@ -73,8 +74,7 @@ where T: Processor + 'static
let instant = Instant::now();
self.inner.process()?;
let elapsed = instant.elapsed();
self.prof_span_builder
.accumulate_process_time(elapsed.as_nanos() as u64);
self.prof = self.prof + ProcessorProfile { cpu_time: elapsed };
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use common_expression::DataSchemaRef;
use common_expression::SortColumnDescription;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_core::Pipeline;
use common_profile::ProfSpanSetRef;
use common_profile::SharedProcessorProfiles;

use super::transform_multi_sort_merge::try_add_multi_sort_merge;
use super::transform_sort_merge::try_create_transform_sort_merge;
Expand All @@ -33,7 +33,7 @@ pub fn build_full_sort_pipeline(
limit: Option<usize>,
partial_block_size: usize,
final_block_size: usize,
prof_info: Option<(u32, ProfSpanSetRef)>,
prof_info: Option<(u32, SharedProcessorProfiles)>,
after_exchange: bool,
) -> Result<()> {
// Partial sort
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,8 @@ edition = { workspace = true }
[lib]
doctest = false
test = false

[dependencies]
common-base = { path = "../../common/base" }

dashmap = "5.4"
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod span;
pub use span::*;
mod mgr;
mod proc;
mod prof;

pub use mgr::QueryProfileManager;
pub use proc::*;
pub use prof::*;
134 changes: 134 additions & 0 deletions src/query/profile/src/mgr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;

use common_base::base::GlobalInstance;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;

use crate::prof::QueryProfile;

/// Default maximum number of query profiles kept in memory by [`QueryProfileManager`].
const DEFAULT_QUERY_PROFILE_LIMIT: usize = 20;

/// Manager of query profiling.
///
/// Keeps recently inserted [`QueryProfile`]s in a bounded in-memory cache so that
/// they can be looked up by query ID or listed.
/// This is a singleton in every databend-query process.
pub struct QueryProfileManager {
/// Bounded cache of query profiles, keyed by query ID.
profiles: Lru,
}

impl QueryProfileManager {
    /// Builds a manager whose cache is created with the given `capacity`.
    fn new(capacity: usize) -> Self {
        let profiles = Lru::new(capacity);
        Self { profiles }
    }

    /// Registers a manager with the default capacity in `GlobalInstance`.
    pub fn init() {
        let manager = Arc::new(Self::new(DEFAULT_QUERY_PROFILE_LIMIT));
        GlobalInstance::set(manager);
    }

    /// Fetches the shared manager from `GlobalInstance`.
    pub fn instance() -> Arc<Self> {
        GlobalInstance::get()
    }

    /// Looks up the profile cached for `query_id`, returning `None` when absent.
    pub fn get(&self, query_id: &str) -> Option<Arc<QueryProfile>> {
        self.profiles.get(query_id)
    }

    /// Caches `query_profile`, keyed by the query ID it carries.
    pub fn insert(&self, query_profile: Arc<QueryProfile>) {
        let query_id = query_profile.query_id.clone();
        self.profiles.insert(query_id, query_profile);
    }

    /// Returns every query profile currently held in the cache.
    pub fn list_all(&self) -> Vec<Arc<QueryProfile>> {
        self.profiles.list_all()
    }
}

impl Default for QueryProfileManager {
    /// Creates a manager sized by `DEFAULT_QUERY_PROFILE_LIMIT`.
    fn default() -> Self {
        Self::new(DEFAULT_QUERY_PROFILE_LIMIT)
    }
}

/// A bounded cache of query profiles.
///
/// NOTE: despite the name, eviction follows insertion order rather than recency
/// of use: `get` never touches the queue and re-inserting an existing query ID
/// does not move it, so the ID that was first inserted earliest is evicted first.
struct Lru {
/// The maximum number of query profiles to keep in memory.
/// Once this many profiles are stored, inserting a new query ID
/// removes the oldest one first.
capacity: usize,

/// The query profiles.
/// The key is the query ID.
/// The value is the query profile.
profiles: DashMap<String, Arc<QueryProfile>>,

/// Queue of query IDs in insertion order; the front is the next eviction candidate.
/// `insert` holds this mutex for its whole duration, which keeps the queue
/// and `profiles` consistent with each other.
lru: Mutex<VecDeque<String>>,
}

impl Lru {
    /// Creates an empty cache that retains at most `capacity` query profiles.
    ///
    /// A `capacity` of zero yields a cache that never stores anything.
    pub fn new(capacity: usize) -> Self {
        Lru {
            capacity,
            profiles: DashMap::with_capacity(capacity),
            lru: Mutex::new(VecDeque::with_capacity(capacity)),
        }
    }

    /// Gets the query profile by the query ID.
    ///
    /// Only the concurrent map is consulted; the eviction queue is neither locked
    /// nor reordered, so a lookup does not protect an entry from later eviction.
    pub fn get(&self, query_id: &str) -> Option<Arc<QueryProfile>> {
        self.profiles.get(query_id).map(|v| v.value().clone())
    }

    /// Inserts a query profile.
    ///
    /// If `query_id` is already cached its profile is replaced in place and its
    /// position in the eviction queue is left unchanged. Otherwise, when the cache
    /// is full, the oldest inserted query ID is evicted before the new one is added.
    ///
    /// Calls are serialized through the queue mutex, so this is safe to call from
    /// multiple threads.
    pub fn insert(&self, query_id: String, query_profile: Arc<QueryProfile>) {
        // A zero-capacity cache must never retain an entry. Without this guard the
        // eviction step below would find an empty queue and the insert would still
        // go through, exceeding the configured bound.
        if self.capacity == 0 {
            return;
        }

        // Lock the queue to keep it consistent with the map. The cache is only
        // diagnostic data, so a poisoned mutex is recovered from instead of making
        // every later insert panic.
        let mut lru = self
            .lru
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        if let Entry::Occupied(mut prof) = self.profiles.entry(query_id.clone()) {
            prof.insert(query_profile);
            return;
        }
        // The entry handle above is released at the end of the `if let`, before the
        // map is touched again below (see the DashMap `entry` locking notes).

        if self.profiles.len() >= self.capacity {
            if let Some(evicted_id) = lru.pop_front() {
                self.profiles.remove(&evicted_id);
            }
        }

        self.profiles.insert(query_id.clone(), query_profile);
        lru.push_back(query_id);
    }

    /// Lists all query profiles.
    ///
    /// The result is collected by iterating the concurrent map, so its order
    /// follows the map's iteration rather than the eviction queue. The queue
    /// mutex is not acquired.
    pub fn list_all(&self) -> Vec<Arc<QueryProfile>> {
        self.profiles.iter().map(|v| v.value().clone()).collect()
    }
}
Loading

0 comments on commit 3cb790b

Please sign in to comment.