Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(executor): Introduce QueryProfileManager to collect query profilings #11760

Merged
merged 4 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ members = [
"src/common/openai",
"src/common/tracing",
"src/common/storage",
"src/common/profile",
"src/common/vector",
"src/common/license",
# Query
Expand All @@ -45,6 +44,7 @@ members = [
"src/query/pipeline/sinks",
"src/query/pipeline/sources",
"src/query/pipeline/transforms",
"src/query/profile",
"src/query/settings",
"src/query/sql",
"src/query/storages/common/blocks",
Expand Down
71 changes: 0 additions & 71 deletions src/common/profile/src/span.rs

This file was deleted.

1 change: 1 addition & 0 deletions src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ common-io = { path = "../../common/io" }
common-meta-app = { path = "../../meta/app" }
common-meta-types = { path = "../../meta/types" }
common-pipeline-core = { path = "../pipeline/core" }
common-profile = { path = "../profile" }
common-settings = { path = "../settings" }
common-storage = { path = "../../common/storage" }

Expand Down
3 changes: 3 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use common_meta_app::principal::OnErrorMode;
use common_meta_app::principal::RoleInfo;
use common_meta_app::principal::UserInfo;
use common_pipeline_core::InputError;
use common_profile::QueryProfileManager;
use common_settings::ChangeValue;
use common_settings::Settings;
use common_storage::DataOperator;
Expand Down Expand Up @@ -135,6 +136,8 @@ pub trait TableContext: Send + Sync {
fn apply_changed_settings(&self, changes: HashMap<String, ChangeValue>) -> Result<()>;
fn get_changed_settings(&self) -> HashMap<String, ChangeValue>;

fn get_query_profile_manager(&self) -> Arc<QueryProfileManager>;

// Get the storage data accessor operator from the session manager.
fn get_data_operator(&self) -> Result<DataOperator>;

Expand Down
2 changes: 1 addition & 1 deletion src/query/pipeline/transforms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ common-base = { path = "../../../common/base" }
common-exception = { path = "../../../common/exception" }
common-expression = { path = "../../expression" }
common-pipeline-core = { path = "../core" }
common-profile = { path = "../../../common/profile" }
common-profile = { path = "../../profile" }
match-template = "0.0.1"

async-backtrace = { workspace = true }
Expand Down
18 changes: 9 additions & 9 deletions src/query/pipeline/transforms/src/processors/profile_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ use std::time::Instant;
use common_exception::Result;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::Processor;
use common_profile::ProfSpanBuilder;
use common_profile::ProfSpanSetRef;
use common_profile::ProcessorProfile;
use common_profile::SharedProcessorProfiles;

pub struct ProfileWrapper<T> {
inner: T,
prof_span_id: u32,
prof_span_set: ProfSpanSetRef,
prof_span_builder: ProfSpanBuilder,
prof_span_set: SharedProcessorProfiles,

prof: ProcessorProfile,
}

impl<T> ProfileWrapper<T>
Expand All @@ -33,13 +34,13 @@ where T: Processor + 'static
pub fn create(
inner: T,
prof_span_id: u32,
prof_span_set: ProfSpanSetRef,
prof_span_set: SharedProcessorProfiles,
) -> Box<dyn Processor> {
Box::new(Self {
inner,
prof_span_id,
prof_span_set,
prof_span_builder: ProfSpanBuilder::default(),
prof: ProcessorProfile::default(),
})
}
}
Expand All @@ -62,7 +63,7 @@ where T: Processor + 'static
self.prof_span_set
.lock()
.unwrap()
.update(self.prof_span_id, self.prof_span_builder.clone().finish());
.update(self.prof_span_id, self.prof);
Ok(Event::Finished)
}
v => Ok(v),
Expand All @@ -73,8 +74,7 @@ where T: Processor + 'static
let instant = Instant::now();
self.inner.process()?;
let elapsed = instant.elapsed();
self.prof_span_builder
.accumulate_process_time(elapsed.as_nanos() as u64);
self.prof = self.prof + ProcessorProfile { cpu_time: elapsed };
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use common_expression::DataSchemaRef;
use common_expression::SortColumnDescription;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_core::Pipeline;
use common_profile::ProfSpanSetRef;
use common_profile::SharedProcessorProfiles;

use super::transform_multi_sort_merge::try_add_multi_sort_merge;
use super::transform_sort_merge::try_create_transform_sort_merge;
Expand All @@ -33,7 +33,7 @@ pub fn build_full_sort_pipeline(
limit: Option<usize>,
partial_block_size: usize,
final_block_size: usize,
prof_info: Option<(u32, ProfSpanSetRef)>,
prof_info: Option<(u32, SharedProcessorProfiles)>,
after_exchange: bool,
) -> Result<()> {
// Partial sort
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,8 @@ edition = { workspace = true }
[lib]
doctest = false
test = false

[dependencies]
common-base = { path = "../../common/base" }

dashmap = "5.4"
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod span;
pub use span::*;
mod mgr;
mod proc;
mod prof;

pub use mgr::QueryProfileManager;
pub use proc::*;
pub use prof::*;
134 changes: 134 additions & 0 deletions src/query/profile/src/mgr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;

use common_base::base::GlobalInstance;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;

use crate::prof::QueryProfile;

/// Default number of query profiles kept by the profile cache.
/// Used by both `QueryProfileManager::init` and the `Default` impl of
/// `QueryProfileManager`.
const DEFAULT_QUERY_PROFILE_LIMIT: usize = 20;

/// Manager of query profiling.
/// This is a singleton in every databend-query process; the shared instance is
/// registered by `QueryProfileManager::init` and fetched with
/// `QueryProfileManager::instance`.
pub struct QueryProfileManager {
/// Bounded cache of query profiles, keyed by query ID.
profiles: Lru,
}

impl QueryProfileManager {
    /// Builds a manager whose profile cache is created with the given `capacity`.
    fn new(capacity: usize) -> Self {
        let profiles = Lru::new(capacity);
        Self { profiles }
    }

    /// Creates a manager with `DEFAULT_QUERY_PROFILE_LIMIT` capacity and hands it,
    /// wrapped in an `Arc`, to `GlobalInstance::set`.
    pub fn init() {
        let manager = Arc::new(Self::new(DEFAULT_QUERY_PROFILE_LIMIT));
        GlobalInstance::set(manager);
    }

    /// Returns the shared manager obtained from `GlobalInstance::get`.
    // NOTE(review): presumably `init` must have run first — confirm against `GlobalInstance`.
    pub fn instance() -> Arc<Self> {
        GlobalInstance::get()
    }

    /// Looks up the profile recorded for `query_id`.
    ///
    /// Returns `None` when the cache holds no profile under that ID.
    pub fn get(&self, query_id: &str) -> Option<Arc<QueryProfile>> {
        let cache = &self.profiles;
        cache.get(query_id)
    }

    /// Stores `query_profile` in the cache under its own `query_id`.
    pub fn insert(&self, query_profile: Arc<QueryProfile>) {
        let key = query_profile.query_id.clone();
        self.profiles.insert(key, query_profile);
    }

    /// Returns every query profile currently held by the cache.
    pub fn list_all(&self) -> Vec<Arc<QueryProfile>> {
        let cache = &self.profiles;
        cache.list_all()
    }
}

/// A default manager keeps up to `DEFAULT_QUERY_PROFILE_LIMIT` query profiles.
impl Default for QueryProfileManager {
    fn default() -> Self {
        Self::new(DEFAULT_QUERY_PROFILE_LIMIT)
    }
}

/// A bounded cache of query profiles, keyed by query ID.
///
/// Despite the name, lookups never reorder the list below: `insert` appends new
/// IDs at the back and evicts from the front, so the profile that was inserted
/// first is the one removed when the cache is full.
struct Lru {
/// The maximum number of query profiles to keep in memory.
/// When an insert of a new query ID finds the cache at this size,
/// the oldest inserted profile is removed first.
capacity: usize,

/// The query profiles.
/// The key is the query ID.
/// The value is the query profile.
profiles: DashMap<String, Arc<QueryProfile>>,

/// Query IDs in insertion order (front = oldest), used to pick the
/// eviction victim. The mutex is taken by `insert` only.
lru: Mutex<VecDeque<String>>,
}

impl Lru {
    /// Creates an empty cache that keeps at most `capacity` query profiles.
    ///
    /// Both the map and the eviction queue are pre-sized to `capacity`.
    pub fn new(capacity: usize) -> Self {
        Lru {
            capacity,
            profiles: DashMap::with_capacity(capacity),
            lru: Mutex::new(VecDeque::with_capacity(capacity)),
        }
    }

    /// Returns the query profile stored under `query_id`, or `None` if there is none.
    ///
    /// Only the profile map is consulted; the eviction queue is neither locked
    /// nor reordered, so a lookup does not postpone the eviction of an entry.
    pub fn get(&self, query_id: &str) -> Option<Arc<QueryProfile>> {
        self.profiles.get(query_id).map(|v| v.value().clone())
    }

    /// Inserts a query profile under `query_id`.
    ///
    /// * If `query_id` is already present, its profile is replaced and its
    ///   position in the eviction queue is left untouched.
    /// * Otherwise, if the cache is full, the oldest inserted profile is evicted
    ///   before the new one is stored.
    /// * A cache created with a capacity of zero retains nothing.
    ///
    /// # Panics
    ///
    /// Panics if the eviction-queue mutex is poisoned.
    pub fn insert(&self, query_id: String, query_profile: Arc<QueryProfile>) {
        // Without this guard a zero-capacity cache would still keep one profile:
        // `pop_front` finds nothing to evict and the insert below goes ahead.
        if self.capacity == 0 {
            return;
        }

        // Lock the queue for the whole operation so that the queue and the
        // profile map are updated together.
        let mut lru = self.lru.lock().unwrap();

        // Known query ID: overwrite in place, no eviction and no queue change.
        if let Entry::Occupied(mut prof) = self.profiles.entry(query_id.clone()) {
            prof.insert(query_profile);
            return;
        }

        // New query ID and the cache is full: drop the oldest inserted profile.
        if self.profiles.len() >= self.capacity {
            if let Some(evicted_id) = lru.pop_front() {
                self.profiles.remove(&evicted_id);
            }
        }

        self.profiles.insert(query_id.clone(), query_profile);
        lru.push_back(query_id);
    }

    /// Returns all query profiles currently stored.
    ///
    /// The result is collected by iterating the profile map; the eviction queue
    /// is not locked or read, so this code gives no guarantee that the order
    /// matches insertion order.
    pub fn list_all(&self) -> Vec<Arc<QueryProfile>> {
        self.profiles.iter().map(|v| v.value().clone()).collect()
    }
}
Loading