Skip to content

Commit

Permalink
feat(query): support send spill file stats to client (#17186)
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li authored Jan 7, 2025
1 parent c17767b commit f715a98
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 33 deletions.
1 change: 1 addition & 0 deletions src/common/base/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub use ordered_float::OrderedFloat;
pub use profiling::Profiling;
pub use progress::Progress;
pub use progress::ProgressValues;
pub use progress::SpillProgress;
pub use select::select3;
pub use select::Select3Output;
pub use semaphore::Semaphore;
Expand Down
17 changes: 17 additions & 0 deletions src/common/base/src/base/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,20 @@ impl Progress {
ProgressValues { rows, bytes }
}
}

/// Cumulative statistics about data spilled to storage during query
/// execution: the number of spill files produced and their total size
/// in bytes.
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct SpillProgress {
    pub file_nums: usize,
    pub bytes: usize,
}

impl SpillProgress {
    /// Builds a progress record from a file count and a byte total.
    pub fn new(file_nums: usize, bytes: usize) -> Self {
        SpillProgress { file_nums, bytes }
    }

    /// Accumulates another progress record into this one.
    pub fn incr(&mut self, other: &SpillProgress) {
        let SpillProgress { file_nums, bytes } = other;
        self.file_nums += file_nums;
        self.bytes += bytes;
    }
}
6 changes: 3 additions & 3 deletions src/query/service/src/interpreters/hook/vacuum_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {

let mut node_files = HashMap::new();
for node in cluster.nodes.iter() {
let num = query_ctx.get_spill_file_nums(Some(node.id.clone()));
if num != 0 {
let stats = query_ctx.get_spill_file_stats(Some(node.id.clone()));
if stats.file_nums != 0 {
if let Some(index) = cluster.index_of_nodeid(&node.id) {
node_files.insert(index, num);
node_files.insert(index, stats.file_nums);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ impl StatisticsSender {
progress_info.push(ProgressInfo::ResultProgress(result_progress_values));
}

let spill_file_nums = ctx.get_spill_file_nums(None);
if spill_file_nums != 0 {
progress_info.push(ProgressInfo::SpillTotalFileNums(spill_file_nums));
let stats = ctx.get_spill_file_stats(None);
if stats.file_nums != 0 {
progress_info.push(ProgressInfo::SpillTotalStats(stats))
}
progress_info
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use byteorder::BigEndian;
use byteorder::ReadBytesExt;
use byteorder::WriteBytesExt;
use databend_common_base::base::ProgressValues;
use databend_common_base::base::SpillProgress;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

Expand All @@ -33,7 +34,7 @@ pub enum ProgressInfo {
ScanProgress(ProgressValues),
WriteProgress(ProgressValues),
ResultProgress(ProgressValues),
SpillTotalFileNums(usize),
SpillTotalStats(SpillProgress),
}

impl ProgressInfo {
Expand All @@ -42,8 +43,8 @@ impl ProgressInfo {
ProgressInfo::ScanProgress(values) => ctx.get_scan_progress().incr(values),
ProgressInfo::WriteProgress(values) => ctx.get_write_progress().incr(values),
ProgressInfo::ResultProgress(values) => ctx.get_result_progress().incr(values),
ProgressInfo::SpillTotalFileNums(values) => {
ctx.set_cluster_spill_file_nums(source_target, *values)
ProgressInfo::SpillTotalStats(values) => {
ctx.set_cluster_spill_progress(source_target, values.clone())
}
};
}
Expand All @@ -53,9 +54,10 @@ impl ProgressInfo {
ProgressInfo::ScanProgress(values) => (1_u8, values),
ProgressInfo::WriteProgress(values) => (2_u8, values),
ProgressInfo::ResultProgress(values) => (3_u8, values),
ProgressInfo::SpillTotalFileNums(values) => {
ProgressInfo::SpillTotalStats(values) => {
bytes.write_u8(4)?;
bytes.write_u64::<BigEndian>(values as u64)?;
bytes.write_u64::<BigEndian>(values.file_nums as u64)?;
bytes.write_u64::<BigEndian>(values.bytes as u64)?;
return Ok(());
}
};
Expand All @@ -70,8 +72,11 @@ impl ProgressInfo {
let info_type = bytes.read_u8()?;

if info_type == 4 {
let values = bytes.read_u64::<BigEndian>()? as usize;
return Ok(ProgressInfo::SpillTotalFileNums(values));
let nums = bytes.read_u64::<BigEndian>()? as usize;
let bytes = bytes.read_u64::<BigEndian>()? as usize;
return Ok(ProgressInfo::SpillTotalStats(SpillProgress::new(
nums, bytes,
)));
}

let rows = bytes.read_u64::<BigEndian>()? as usize;
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::SystemTime;

use databend_common_base::base::tokio::sync::RwLock;
use databend_common_base::base::ProgressValues;
use databend_common_base::base::SpillProgress;
use databend_common_base::runtime::CatchUnwindFuture;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand Down Expand Up @@ -78,6 +79,7 @@ pub struct Progresses {
pub write_progress: ProgressValues,
pub result_progress: ProgressValues,
pub total_scan: ProgressValues,
pub spill_progress: SpillProgress,
}

impl Progresses {
Expand All @@ -87,6 +89,7 @@ impl Progresses {
write_progress: ctx.get_write_progress_value(),
result_progress: ctx.get_result_progress_value(),
total_scan: ctx.get_total_scan_value(),
spill_progress: ctx.get_total_spill_progress(),
}
}
}
Expand Down
36 changes: 26 additions & 10 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use dashmap::mapref::multiple::RefMulti;
use dashmap::DashMap;
use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::base::SpillProgress;
use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_base::runtime::GlobalIORuntime;
Expand Down Expand Up @@ -318,7 +319,7 @@ impl QueryContext {

pub fn update_init_query_id(&self, id: String) {
self.shared.spilled_files.write().clear();
self.shared.cluster_spill_file_nums.write().clear();
self.shared.cluster_spill_progress.write().clear();
*self.shared.init_query_id.write() = id;
}

Expand Down Expand Up @@ -363,32 +364,47 @@ impl QueryContext {
&self,
location: crate::spillers::Location,
layout: crate::spillers::Layout,
data_size: usize,
) {
if matches!(location, crate::spillers::Location::Remote(_)) {
let current_id = self.get_cluster().local_id();
let mut w = self.shared.cluster_spill_file_nums.write();
w.entry(current_id).and_modify(|e| *e += 1).or_insert(1);
let mut w = self.shared.cluster_spill_progress.write();
let p = SpillProgress::new(1, data_size);
w.entry(current_id)
.and_modify(|stats| {
stats.incr(&p);
})
.or_insert(p);
}
{
let mut w = self.shared.spilled_files.write();
w.insert(location, layout);
}
}

pub fn set_cluster_spill_file_nums(&self, source_target: &str, num: usize) {
if num != 0 {
pub fn set_cluster_spill_progress(&self, source_target: &str, stats: SpillProgress) {
if stats.file_nums != 0 {
let _ = self
.shared
.cluster_spill_file_nums
.cluster_spill_progress
.write()
.insert(source_target.to_string(), num);
.insert(source_target.to_string(), stats);
}
}

pub fn get_spill_file_nums(&self, node_id: Option<String>) -> usize {
let r = self.shared.cluster_spill_file_nums.read();
pub fn get_spill_file_stats(&self, node_id: Option<String>) -> SpillProgress {
let r = self.shared.cluster_spill_progress.read();
let node_id = node_id.unwrap_or(self.get_cluster().local_id());
r.get(&node_id).cloned().unwrap_or(0)
r.get(&node_id).cloned().unwrap_or(SpillProgress::default())
}

/// Aggregates the spill statistics of every node in the cluster into a
/// single [`SpillProgress`] total.
pub fn get_total_spill_progress(&self) -> SpillProgress {
    let r = self.shared.cluster_spill_progress.read();
    let mut total = SpillProgress::default();
    // Keys (node ids) are irrelevant here; iterate values directly.
    for stats in r.values() {
        total.incr(stats);
    }
    total
}

pub fn get_spill_layout(
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::time::SystemTime;
use dashmap::DashMap;
use databend_common_base::base::short_sql;
use databend_common_base::base::Progress;
use databend_common_base::base::SpillProgress;
use databend_common_base::runtime::drop_guard;
use databend_common_base::runtime::Runtime;
use databend_common_catalog::catalog::Catalog;
Expand Down Expand Up @@ -143,7 +144,7 @@ pub struct QueryContextShared {

pub(in crate::sessions) query_queued_duration: Arc<RwLock<Duration>>,

pub(in crate::sessions) cluster_spill_file_nums: Arc<RwLock<HashMap<String, usize>>>,
pub(in crate::sessions) cluster_spill_progress: Arc<RwLock<HashMap<String, SpillProgress>>>,
pub(in crate::sessions) spilled_files:
Arc<RwLock<HashMap<crate::spillers::Location, crate::spillers::Layout>>>,
}
Expand Down Expand Up @@ -203,7 +204,7 @@ impl QueryContextShared {
multi_table_insert_status: Default::default(),
query_queued_duration: Arc::new(RwLock::new(Duration::from_secs(0))),

cluster_spill_file_nums: Default::default(),
cluster_spill_progress: Default::default(),
spilled_files: Default::default(),
}))
}
Expand Down
22 changes: 15 additions & 7 deletions src/query/service/src/spillers/spiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,21 @@ impl Spiller {

/// Spill some [`DataBlock`] to storage. These blocks will be concat into one.
pub async fn spill(&self, data_block: Vec<DataBlock>) -> Result<Location> {
let (location, layout) = self.spill_unmanage(data_block).await?;
let (location, layout, data_size) = self.spill_unmanage(data_block).await?;

// Record columns layout for spilled data.
self.ctx.add_spill_file(location.clone(), layout.clone());
self.ctx
.add_spill_file(location.clone(), layout.clone(), data_size);
self.private_spilled_files
.write()
.insert(location.clone(), layout);
Ok(location)
}

async fn spill_unmanage(&self, data_block: Vec<DataBlock>) -> Result<(Location, Layout)> {
async fn spill_unmanage(
&self,
data_block: Vec<DataBlock>,
) -> Result<(Location, Layout, usize)> {
debug_assert!(!data_block.is_empty());
let instant = Instant::now();

Expand All @@ -176,7 +180,7 @@ impl Spiller {
// Record statistics.
record_write_profile(&location, &instant, data_size);
let layout = columns_layout.pop().unwrap();
Ok((location, layout))
Ok((location, layout, data_size))
}

pub fn create_unique_location(&self) -> String {
Expand Down Expand Up @@ -205,8 +209,11 @@ impl Spiller {
}

writer.close().await?;
self.ctx
.add_spill_file(Location::Remote(location.clone()), Layout::Aggregate);
self.ctx.add_spill_file(
Location::Remote(location.clone()),
Layout::Aggregate,
write_bytes,
);

self.private_spilled_files
.write()
Expand Down Expand Up @@ -289,7 +296,8 @@ impl Spiller {
// Record statistics.
record_write_profile(&location, &instant, write_bytes);

self.ctx.add_spill_file(location.clone(), layout.clone());
self.ctx
.add_spill_file(location.clone(), layout.clone(), write_bytes);
self.private_spilled_files
.write()
.insert(location.clone(), layout);
Expand Down
2 changes: 1 addition & 1 deletion tests/sqllogictests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ static HYBRID_CONFIGS: LazyLock<Vec<(Box<ClientType>, usize)>> = LazyLock::new(|
(Box::new(ClientType::MySQL), 3),
(
Box::new(ClientType::Ttc(
"sundyli/ttc-rust:latest".to_string(),
"datafuselabs/ttc-rust:latest".to_string(),
TTC_PORT_START,
)),
7,
Expand Down

0 comments on commit f715a98

Please sign in to comment.