Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): support send spill file stats to client #17186

Merged
merged 2 commits (source and target branch names were rendered as links and are not preserved in this text capture)
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/base/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub use ordered_float::OrderedFloat;
pub use profiling::Profiling;
pub use progress::Progress;
pub use progress::ProgressValues;
pub use progress::SpillProgress;
pub use select::select3;
pub use select::Select3Output;
pub use semaphore::Semaphore;
Expand Down
17 changes: 17 additions & 0 deletions src/common/base/src/base/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,20 @@ impl Progress {
ProgressValues { rows, bytes }
}
}

/// Cumulative statistics about data spilled to storage during query
/// execution: how many spill files were written and their total size
/// in bytes. Sent to clients as part of query progress reporting.
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct SpillProgress {
    pub file_nums: usize,
    pub bytes: usize,
}

impl SpillProgress {
    /// Build a progress record from a file count and a byte total.
    pub fn new(file_nums: usize, bytes: usize) -> Self {
        SpillProgress { file_nums, bytes }
    }

    /// Fold another progress record into this one.
    pub fn incr(&mut self, other: &SpillProgress) {
        let SpillProgress { file_nums, bytes } = *other;
        self.file_nums += file_nums;
        self.bytes += bytes;
    }
}
6 changes: 3 additions & 3 deletions src/query/service/src/interpreters/hook/vacuum_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {

let mut node_files = HashMap::new();
for node in cluster.nodes.iter() {
let num = query_ctx.get_spill_file_nums(Some(node.id.clone()));
if num != 0 {
let stats = query_ctx.get_spill_file_stats(Some(node.id.clone()));
if stats.file_nums != 0 {
if let Some(index) = cluster.index_of_nodeid(&node.id) {
node_files.insert(index, num);
node_files.insert(index, stats.file_nums);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ impl StatisticsSender {
progress_info.push(ProgressInfo::ResultProgress(result_progress_values));
}

let spill_file_nums = ctx.get_spill_file_nums(None);
if spill_file_nums != 0 {
progress_info.push(ProgressInfo::SpillTotalFileNums(spill_file_nums));
let stats = ctx.get_spill_file_stats(None);
if stats.file_nums != 0 {
progress_info.push(ProgressInfo::SpillTotalStats(stats))
}
progress_info
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use byteorder::BigEndian;
use byteorder::ReadBytesExt;
use byteorder::WriteBytesExt;
use databend_common_base::base::ProgressValues;
use databend_common_base::base::SpillProgress;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

Expand All @@ -33,7 +34,7 @@ pub enum ProgressInfo {
ScanProgress(ProgressValues),
WriteProgress(ProgressValues),
ResultProgress(ProgressValues),
SpillTotalFileNums(usize),
SpillTotalStats(SpillProgress),
}

impl ProgressInfo {
Expand All @@ -42,8 +43,8 @@ impl ProgressInfo {
ProgressInfo::ScanProgress(values) => ctx.get_scan_progress().incr(values),
ProgressInfo::WriteProgress(values) => ctx.get_write_progress().incr(values),
ProgressInfo::ResultProgress(values) => ctx.get_result_progress().incr(values),
ProgressInfo::SpillTotalFileNums(values) => {
ctx.set_cluster_spill_file_nums(source_target, *values)
ProgressInfo::SpillTotalStats(values) => {
ctx.set_cluster_spill_progress(source_target, values.clone())
}
};
}
Expand All @@ -53,9 +54,10 @@ impl ProgressInfo {
ProgressInfo::ScanProgress(values) => (1_u8, values),
ProgressInfo::WriteProgress(values) => (2_u8, values),
ProgressInfo::ResultProgress(values) => (3_u8, values),
ProgressInfo::SpillTotalFileNums(values) => {
ProgressInfo::SpillTotalStats(values) => {
bytes.write_u8(4)?;
bytes.write_u64::<BigEndian>(values as u64)?;
bytes.write_u64::<BigEndian>(values.file_nums as u64)?;
bytes.write_u64::<BigEndian>(values.bytes as u64)?;
return Ok(());
}
};
Expand All @@ -70,8 +72,11 @@ impl ProgressInfo {
let info_type = bytes.read_u8()?;

if info_type == 4 {
let values = bytes.read_u64::<BigEndian>()? as usize;
return Ok(ProgressInfo::SpillTotalFileNums(values));
let nums = bytes.read_u64::<BigEndian>()? as usize;
let bytes = bytes.read_u64::<BigEndian>()? as usize;
return Ok(ProgressInfo::SpillTotalStats(SpillProgress::new(
nums, bytes,
)));
}

let rows = bytes.read_u64::<BigEndian>()? as usize;
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::SystemTime;

use databend_common_base::base::tokio::sync::RwLock;
use databend_common_base::base::ProgressValues;
use databend_common_base::base::SpillProgress;
use databend_common_base::runtime::CatchUnwindFuture;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand Down Expand Up @@ -78,6 +79,7 @@ pub struct Progresses {
pub write_progress: ProgressValues,
pub result_progress: ProgressValues,
pub total_scan: ProgressValues,
pub spill_progress: SpillProgress,
}

impl Progresses {
Expand All @@ -87,6 +89,7 @@ impl Progresses {
write_progress: ctx.get_write_progress_value(),
result_progress: ctx.get_result_progress_value(),
total_scan: ctx.get_total_scan_value(),
spill_progress: ctx.get_total_spill_progress(),
}
}
}
Expand Down
36 changes: 26 additions & 10 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use dashmap::mapref::multiple::RefMulti;
use dashmap::DashMap;
use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::base::SpillProgress;
use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_base::runtime::GlobalIORuntime;
Expand Down Expand Up @@ -318,7 +319,7 @@ impl QueryContext {

// Reset per-query spill bookkeeping and record the new initial query id.
// NOTE(review): the two spill maps are cleared before the id is swapped,
// so stale spill state is never attributed to the new query id — confirm
// concurrent readers tolerate the window between the three separate lock
// acquisitions.
pub fn update_init_query_id(&self, id: String) {
// Drop locations/layouts of files spilled by the previous query.
self.shared.spilled_files.write().clear();
// Drop per-node spill progress accumulated for the previous query.
self.shared.cluster_spill_progress.write().clear();
*self.shared.init_query_id.write() = id;
}

Expand Down Expand Up @@ -363,32 +364,47 @@ impl QueryContext {
&self,
location: crate::spillers::Location,
layout: crate::spillers::Layout,
data_size: usize,
) {
if matches!(location, crate::spillers::Location::Remote(_)) {
let current_id = self.get_cluster().local_id();
let mut w = self.shared.cluster_spill_file_nums.write();
w.entry(current_id).and_modify(|e| *e += 1).or_insert(1);
let mut w = self.shared.cluster_spill_progress.write();
let p = SpillProgress::new(1, data_size);
w.entry(current_id)
.and_modify(|stats| {
stats.incr(&p);
})
.or_insert(p);
}
{
let mut w = self.shared.spilled_files.write();
w.insert(location, layout);
}
}

pub fn set_cluster_spill_file_nums(&self, source_target: &str, num: usize) {
if num != 0 {
pub fn set_cluster_spill_progress(&self, source_target: &str, stats: SpillProgress) {
if stats.file_nums != 0 {
let _ = self
.shared
.cluster_spill_file_nums
.cluster_spill_progress
.write()
.insert(source_target.to_string(), num);
.insert(source_target.to_string(), stats);
}
}

pub fn get_spill_file_nums(&self, node_id: Option<String>) -> usize {
let r = self.shared.cluster_spill_file_nums.read();
pub fn get_spill_file_stats(&self, node_id: Option<String>) -> SpillProgress {
let r = self.shared.cluster_spill_progress.read();
let node_id = node_id.unwrap_or(self.get_cluster().local_id());
r.get(&node_id).cloned().unwrap_or(0)
r.get(&node_id).cloned().unwrap_or(SpillProgress::default())
}

/// Aggregate spill statistics across every node that reported progress.
pub fn get_total_spill_progress(&self) -> SpillProgress {
    let progress = self.shared.cluster_spill_progress.read();
    let mut total = SpillProgress::default();
    // The map keys (node ids) are irrelevant here; only the per-node
    // stats are summed, so iterate values directly.
    for stats in progress.values() {
        total.incr(stats);
    }
    total
}

pub fn get_spill_layout(
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::time::SystemTime;
use dashmap::DashMap;
use databend_common_base::base::short_sql;
use databend_common_base::base::Progress;
use databend_common_base::base::SpillProgress;
use databend_common_base::runtime::drop_guard;
use databend_common_base::runtime::Runtime;
use databend_common_catalog::catalog::Catalog;
Expand Down Expand Up @@ -143,7 +144,7 @@ pub struct QueryContextShared {

pub(in crate::sessions) query_queued_duration: Arc<RwLock<Duration>>,

pub(in crate::sessions) cluster_spill_file_nums: Arc<RwLock<HashMap<String, usize>>>,
pub(in crate::sessions) cluster_spill_progress: Arc<RwLock<HashMap<String, SpillProgress>>>,
pub(in crate::sessions) spilled_files:
Arc<RwLock<HashMap<crate::spillers::Location, crate::spillers::Layout>>>,
}
Expand Down Expand Up @@ -203,7 +204,7 @@ impl QueryContextShared {
multi_table_insert_status: Default::default(),
query_queued_duration: Arc::new(RwLock::new(Duration::from_secs(0))),

cluster_spill_file_nums: Default::default(),
cluster_spill_progress: Default::default(),
spilled_files: Default::default(),
}))
}
Expand Down
22 changes: 15 additions & 7 deletions src/query/service/src/spillers/spiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,21 @@ impl Spiller {

/// Spill some [`DataBlock`] to storage. These blocks will be concat into one.
/// Spill some [`DataBlock`] to storage. These blocks will be concat into one.
pub async fn spill(&self, data_block: Vec<DataBlock>) -> Result<Location> {
    let (location, layout, written_bytes) = self.spill_unmanage(data_block).await?;

    // Register the spilled file with the query context (feeds the
    // client-visible spill stats) and remember its columns layout so the
    // data can be read back later.
    self.ctx
        .add_spill_file(location.clone(), layout.clone(), written_bytes);
    {
        let mut private = self.private_spilled_files.write();
        private.insert(location.clone(), layout);
    }
    Ok(location)
}

async fn spill_unmanage(&self, data_block: Vec<DataBlock>) -> Result<(Location, Layout)> {
async fn spill_unmanage(
&self,
data_block: Vec<DataBlock>,
) -> Result<(Location, Layout, usize)> {
debug_assert!(!data_block.is_empty());
let instant = Instant::now();

Expand All @@ -176,7 +180,7 @@ impl Spiller {
// Record statistics.
record_write_profile(&location, &instant, data_size);
let layout = columns_layout.pop().unwrap();
Ok((location, layout))
Ok((location, layout, data_size))
}

pub fn create_unique_location(&self) -> String {
Expand Down Expand Up @@ -205,8 +209,11 @@ impl Spiller {
}

writer.close().await?;
self.ctx
.add_spill_file(Location::Remote(location.clone()), Layout::Aggregate);
self.ctx.add_spill_file(
Location::Remote(location.clone()),
Layout::Aggregate,
write_bytes,
);

self.private_spilled_files
.write()
Expand Down Expand Up @@ -289,7 +296,8 @@ impl Spiller {
// Record statistics.
record_write_profile(&location, &instant, write_bytes);

self.ctx.add_spill_file(location.clone(), layout.clone());
self.ctx
.add_spill_file(location.clone(), layout.clone(), write_bytes);
self.private_spilled_files
.write()
.insert(location.clone(), layout);
Expand Down
2 changes: 1 addition & 1 deletion tests/sqllogictests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ static HYBRID_CONFIGS: LazyLock<Vec<(Box<ClientType>, usize)>> = LazyLock::new(|
(Box::new(ClientType::MySQL), 3),
(
Box::new(ClientType::Ttc(
"sundyli/ttc-rust:latest".to_string(),
"datafuselabs/ttc-rust:latest".to_string(),
TTC_PORT_START,
)),
7,
Expand Down
Loading