From 2db922d930541d4266ad7dc90964eaef953698cd Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Tue, 7 Jan 2025 10:26:34 +0800 Subject: [PATCH] feat(query): support send spill file stats to client --- src/common/base/src/base/mod.rs | 1 + src/common/base/src/base/progress.rs | 17 +++++++++ .../src/interpreters/hook/vacuum_hook.rs | 6 ++-- .../flight/v1/exchange/statistics_sender.rs | 6 ++-- .../v1/packets/packet_data_progressinfo.rs | 19 ++++++---- .../servers/http/v1/query/execute_state.rs | 3 ++ src/query/service/src/sessions/query_ctx.rs | 36 +++++++++++++------ .../service/src/sessions/query_ctx_shared.rs | 5 +-- src/query/service/src/spillers/spiller.rs | 22 ++++++++---- tests/sqllogictests/src/main.rs | 2 +- 10 files changed, 84 insertions(+), 33 deletions(-) diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs index e77671ec3cb48..ad268f081a8f4 100644 --- a/src/common/base/src/base/mod.rs +++ b/src/common/base/src/base/mod.rs @@ -41,6 +41,7 @@ pub use ordered_float::OrderedFloat; pub use profiling::Profiling; pub use progress::Progress; pub use progress::ProgressValues; +pub use progress::SpillProgress; pub use select::select3; pub use select::Select3Output; pub use semaphore::Semaphore; diff --git a/src/common/base/src/base/progress.rs b/src/common/base/src/base/progress.rs index a125a545d1a38..0b444164fe690 100644 --- a/src/common/base/src/base/progress.rs +++ b/src/common/base/src/base/progress.rs @@ -62,3 +62,20 @@ impl Progress { ProgressValues { rows, bytes } } } + +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub struct SpillProgress { + pub file_nums: usize, + pub bytes: usize, +} + +impl SpillProgress { + pub fn new(file_nums: usize, bytes: usize) -> Self { + Self { file_nums, bytes } + } + + pub fn incr(&mut self, other: &SpillProgress) { + self.file_nums += other.file_nums; + self.bytes += other.bytes; + } +} diff --git a/src/query/service/src/interpreters/hook/vacuum_hook.rs b/src/query/service/src/interpreters/hook/vacuum_hook.rs index 271120514b039..e8229029a33e0 100644 --- a/src/query/service/src/interpreters/hook/vacuum_hook.rs +++ b/src/query/service/src/interpreters/hook/vacuum_hook.rs @@ -48,10 +48,10 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc) -> Result<()> { let mut node_files = HashMap::new(); for node in cluster.nodes.iter() { - let num = query_ctx.get_spill_file_nums(Some(node.id.clone())); - if num != 0 { + let stats = query_ctx.get_spill_file_stats(Some(node.id.clone())); + if stats.file_nums != 0 { if let Some(index) = cluster.index_of_nodeid(&node.id) { - node_files.insert(index, num); + node_files.insert(index, stats.file_nums); } } } diff --git a/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs b/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs index 8a3e5db2df15e..33ea8c1f915b7 100644 --- a/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs +++ b/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs @@ -230,9 +230,9 @@ impl StatisticsSender { progress_info.push(ProgressInfo::ResultProgress(result_progress_values)); } - let spill_file_nums = ctx.get_spill_file_nums(None); - if spill_file_nums != 0 { - progress_info.push(ProgressInfo::SpillTotalFileNums(spill_file_nums)); + let stats = ctx.get_spill_file_stats(None); + if stats.file_nums != 0 { + progress_info.push(ProgressInfo::SpillTotalStats(stats)) } progress_info } diff --git a/src/query/service/src/servers/flight/v1/packets/packet_data_progressinfo.rs b/src/query/service/src/servers/flight/v1/packets/packet_data_progressinfo.rs index bba7ef9e5011f..87a64a0fe8a0e 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_data_progressinfo.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_data_progressinfo.rs @@ -21,6 +21,7 @@ use byteorder::BigEndian; use byteorder::ReadBytesExt; use byteorder::WriteBytesExt; use databend_common_base::base::ProgressValues; +use databend_common_base::base::SpillProgress; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -33,7 +34,7 @@ pub enum ProgressInfo { ScanProgress(ProgressValues), WriteProgress(ProgressValues), ResultProgress(ProgressValues), - SpillTotalFileNums(usize), + SpillTotalStats(SpillProgress), } impl ProgressInfo { @@ -42,8 +43,8 @@ impl ProgressInfo { ProgressInfo::ScanProgress(values) => ctx.get_scan_progress().incr(values), ProgressInfo::WriteProgress(values) => ctx.get_write_progress().incr(values), ProgressInfo::ResultProgress(values) => ctx.get_result_progress().incr(values), - ProgressInfo::SpillTotalFileNums(values) => { - ctx.set_cluster_spill_file_nums(source_target, *values) + ProgressInfo::SpillTotalStats(values) => { + ctx.set_cluster_spill_progress(source_target, values.clone()) } }; } @@ -53,9 +54,10 @@ impl ProgressInfo { ProgressInfo::ScanProgress(values) => (1_u8, values), ProgressInfo::WriteProgress(values) => (2_u8, values), ProgressInfo::ResultProgress(values) => (3_u8, values), - ProgressInfo::SpillTotalFileNums(values) => { + ProgressInfo::SpillTotalStats(values) => { bytes.write_u8(4)?; - bytes.write_u64::(values as u64)?; + bytes.write_u64::(values.file_nums as u64)?; + bytes.write_u64::(values.bytes as u64)?; return Ok(()); } }; @@ -70,8 +72,11 @@ impl ProgressInfo { let info_type = bytes.read_u8()?; if info_type == 4 { - let values = bytes.read_u64::()? as usize; - return Ok(ProgressInfo::SpillTotalFileNums(values)); + let nums = bytes.read_u64::()? as usize; + let bytes = bytes.read_u64::()? as usize; + return Ok(ProgressInfo::SpillTotalStats(SpillProgress::new( + nums, bytes, + ))); } let rows = bytes.read_u64::()? as usize; diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index d1e2fe8e0bdfa..14741394c1bb5 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -18,6 +18,7 @@ use std::time::SystemTime; use databend_common_base::base::tokio::sync::RwLock; use databend_common_base::base::ProgressValues; +use databend_common_base::base::SpillProgress; use databend_common_base::runtime::CatchUnwindFuture; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -78,6 +79,7 @@ pub struct Progresses { pub write_progress: ProgressValues, pub result_progress: ProgressValues, pub total_scan: ProgressValues, + pub spill_progress: SpillProgress, } impl Progresses { @@ -87,6 +89,7 @@ impl Progresses { write_progress: ctx.get_write_progress_value(), result_progress: ctx.get_result_progress_value(), total_scan: ctx.get_total_scan_value(), + spill_progress: ctx.get_total_spill_progress(), } } } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index e1d56ae423e5c..0392c22f6336b 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -33,6 +33,7 @@ use dashmap::mapref::multiple::RefMulti; use dashmap::DashMap; use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; +use databend_common_base::base::SpillProgress; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_base::runtime::GlobalIORuntime; @@ -318,7 +319,7 @@ impl QueryContext { pub fn update_init_query_id(&self, id: String) { self.shared.spilled_files.write().clear(); - self.shared.cluster_spill_file_nums.write().clear(); + self.shared.cluster_spill_progress.write().clear(); *self.shared.init_query_id.write() = id; } @@ -363,11 +364,17 @@ impl QueryContext { &self, location: crate::spillers::Location, layout: crate::spillers::Layout, + data_size: usize, ) { if matches!(location, crate::spillers::Location::Remote(_)) { let current_id = self.get_cluster().local_id(); - let mut w = self.shared.cluster_spill_file_nums.write(); - w.entry(current_id).and_modify(|e| *e += 1).or_insert(1); + let mut w = self.shared.cluster_spill_progress.write(); + let p = SpillProgress::new(1, data_size); + w.entry(current_id) + .and_modify(|stats| { + stats.incr(&p); + }) + .or_insert(p); } { let mut w = self.shared.spilled_files.write(); @@ -375,20 +382,29 @@ impl QueryContext { } } - pub fn set_cluster_spill_file_nums(&self, source_target: &str, num: usize) { - if num != 0 { + pub fn set_cluster_spill_progress(&self, source_target: &str, stats: SpillProgress) { + if stats.file_nums != 0 { let _ = self .shared - .cluster_spill_file_nums + .cluster_spill_progress .write() - .insert(source_target.to_string(), num); + .insert(source_target.to_string(), stats); } } - pub fn get_spill_file_nums(&self, node_id: Option) -> usize { - let r = self.shared.cluster_spill_file_nums.read(); + pub fn get_spill_file_stats(&self, node_id: Option) -> SpillProgress { + let r = self.shared.cluster_spill_progress.read(); let node_id = node_id.unwrap_or(self.get_cluster().local_id()); - r.get(&node_id).cloned().unwrap_or(0) + r.get(&node_id).cloned().unwrap_or(SpillProgress::default()) + } + + pub fn get_total_spill_progress(&self) -> SpillProgress { + let r = self.shared.cluster_spill_progress.read(); + let mut total = SpillProgress::default(); + for (_, stats) in r.iter() { + total.incr(stats); + } + total } pub fn get_spill_layout( diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 6a760f11fc61e..c31fe083316b2 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -24,6 +24,7 @@ use std::time::SystemTime; use dashmap::DashMap; use databend_common_base::base::short_sql; use databend_common_base::base::Progress; +use databend_common_base::base::SpillProgress; use databend_common_base::runtime::drop_guard; use databend_common_base::runtime::Runtime; use databend_common_catalog::catalog::Catalog; @@ -143,7 +144,7 @@ pub struct QueryContextShared { pub(in crate::sessions) query_queued_duration: Arc>, - pub(in crate::sessions) cluster_spill_file_nums: Arc>>, + pub(in crate::sessions) cluster_spill_progress: Arc>>, pub(in crate::sessions) spilled_files: Arc>>, } @@ -203,7 +204,7 @@ impl QueryContextShared { multi_table_insert_status: Default::default(), query_queued_duration: Arc::new(RwLock::new(Duration::from_secs(0))), - cluster_spill_file_nums: Default::default(), + cluster_spill_progress: Default::default(), spilled_files: Default::default(), })) } diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 3193924569ef2..5111d597c0346 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -147,17 +147,21 @@ impl Spiller { /// Spill some [`DataBlock`] to storage. These blocks will be concat into one. pub async fn spill(&self, data_block: Vec) -> Result { - let (location, layout) = self.spill_unmanage(data_block).await?; + let (location, layout, data_size) = self.spill_unmanage(data_block).await?; // Record columns layout for spilled data. - self.ctx.add_spill_file(location.clone(), layout.clone()); + self.ctx + .add_spill_file(location.clone(), layout.clone(), data_size); self.private_spilled_files .write() .insert(location.clone(), layout); Ok(location) } - async fn spill_unmanage(&self, data_block: Vec) -> Result<(Location, Layout)> { + async fn spill_unmanage( + &self, + data_block: Vec, + ) -> Result<(Location, Layout, usize)> { debug_assert!(!data_block.is_empty()); let instant = Instant::now(); @@ -176,7 +180,7 @@ impl Spiller { // Record statistics. record_write_profile(&location, &instant, data_size); let layout = columns_layout.pop().unwrap(); - Ok((location, layout)) + Ok((location, layout, data_size)) } pub fn create_unique_location(&self) -> String { @@ -205,8 +209,11 @@ impl Spiller { } writer.close().await?; - self.ctx - .add_spill_file(Location::Remote(location.clone()), Layout::Aggregate); + self.ctx.add_spill_file( + Location::Remote(location.clone()), + Layout::Aggregate, + write_bytes, + ); self.private_spilled_files .write() @@ -289,7 +296,8 @@ impl Spiller { // Record statistics. record_write_profile(&location, &instant, write_bytes); - self.ctx.add_spill_file(location.clone(), layout.clone()); + self.ctx + .add_spill_file(location.clone(), layout.clone(), write_bytes); self.private_spilled_files .write() .insert(location.clone(), layout); diff --git a/tests/sqllogictests/src/main.rs b/tests/sqllogictests/src/main.rs index 36537af3c7226..05dd03bedd274 100644 --- a/tests/sqllogictests/src/main.rs +++ b/tests/sqllogictests/src/main.rs @@ -63,7 +63,7 @@ static HYBRID_CONFIGS: LazyLock, usize)>> = LazyLock::new(| (Box::new(ClientType::MySQL), 3), ( Box::new(ClientType::Ttc( - "sundyli/ttc-rust:latest".to_string(), + "datafuselabs/ttc-rust:latest".to_string(), TTC_PORT_START, )), 7,