From 0e12f288ff71eb1b5bb26bae3e32d431c376eb46 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 23 Dec 2024 09:57:08 +0800 Subject: [PATCH] feat(query): refactor vacuum temp files (#17089) * update * update * update * update * update * update * update * fix * fix * update * update * Update push_down_filter_join_left_outer.test * update tests * update tests * update tests * update reset --- .../src/storages/fuse/operations/handler.rs | 6 +- .../fuse/operations/vacuum_temporary_files.rs | 385 +++++++++--------- .../it/storages/fuse/operations/vacuum.rs | 9 +- .../vacuum_handler/src/vacuum_handler.rs | 13 +- src/query/pipeline/core/src/lib.rs | 1 - src/query/pipeline/core/src/pipeline.rs | 7 - src/query/service/src/clusters/cluster.rs | 10 + .../src/interpreters/hook/vacuum_hook.rs | 12 +- .../service/src/interpreters/interpreter.rs | 5 +- .../interpreters/interpreter_table_analyze.rs | 2 +- .../interpreter_vacuum_temporary_files.rs | 6 +- .../pipelines/builders/builder_aggregate.rs | 23 +- .../src/pipelines/builders/builder_join.rs | 4 +- .../src/pipelines/builders/builder_sort.rs | 8 +- .../pipelines/builders/builder_union_all.rs | 2 +- .../service/src/pipelines/pipeline_builder.rs | 9 + .../aggregator/aggregate_exchange_injector.rs | 31 +- .../serde/transform_aggregate_spill_writer.rs | 97 ++--- ...transform_exchange_aggregate_serializer.rs | 73 ++-- .../transforms/hash_join/hash_join_spiller.rs | 15 +- .../transforms/transform_sort_spill.rs | 5 - .../transforms/transform_stream_sort_spill.rs | 10 +- .../transform_window_partition_collect.rs | 5 +- .../flight/v1/exchange/exchange_manager.rs | 2 +- .../flight/v1/packets/packet_publisher.rs | 2 +- .../src/servers/http/v1/query/http_query.rs | 2 +- .../servers/mysql/mysql_interactive_worker.rs | 2 +- src/query/service/src/sessions/query_ctx.rs | 89 +++- .../service/src/sessions/query_ctx_shared.rs | 4 + src/query/service/src/spillers/serialize.rs | 2 + src/query/service/src/spillers/spiller.rs | 77 +++- .../others/suggested_background_tasks.rs | 3 +- .../service/tests/it/spillers/spiller.rs | 5 +- .../sql/src/planner/semantic/type_check.rs | 21 +- .../storages/system/src/temp_files_table.rs | 3 +- .../03_0000_vacuum_temporary_files.test | 24 +- .../push_down_filter_join_left_outer.test | 4 +- .../push_down_filter_join_left_outer.test | 2 +- .../query/functions/02_0070_function_nvl.test | 2 +- tests/sqllogictests/suites/tpch/spill.test | 188 ++++----- .../20+_others/20_0014_sort_spill.result | 22 +- ...4_sort_spill.sql => 20_0014_sort_spill.sh} | 16 +- 42 files changed, 684 insertions(+), 524 deletions(-) rename tests/suites/0_stateless/20+_others/{20_0014_sort_spill.sql => 20_0014_sort_spill.sh} (84%) mode change 100644 => 100755 diff --git a/src/query/ee/src/storages/fuse/operations/handler.rs b/src/query/ee/src/storages/fuse/operations/handler.rs index 51574620ee98d..9de42f9aecbf4 100644 --- a/src/query/ee/src/storages/fuse/operations/handler.rs +++ b/src/query/ee/src/storages/fuse/operations/handler.rs @@ -13,7 +13,6 @@ // limitations under the License. 
use std::sync::Arc; -use std::time::Duration; use chrono::DateTime; use chrono::Utc; @@ -24,6 +23,7 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_storages_fuse::FuseTable; use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropTablesResult; +use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions; use databend_enterprise_vacuum_handler::VacuumHandler; use databend_enterprise_vacuum_handler::VacuumHandlerWrapper; @@ -57,10 +57,10 @@ impl VacuumHandler for RealVacuumHandler { &self, abort_checker: AbortChecker, temporary_dir: String, - retain: Option, + options: &VacuumTempOptions, vacuum_limit: usize, ) -> Result { - do_vacuum_temporary_files(abort_checker, temporary_dir, retain, vacuum_limit).await + do_vacuum_temporary_files(abort_checker, temporary_dir, options, vacuum_limit).await } } diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs b/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs index 7678b1e4f181c..91dd9b285ba41 100644 --- a/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs +++ b/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; +use std::io::BufRead; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; @@ -20,243 +22,258 @@ use std::time::UNIX_EPOCH; use databend_common_catalog::table_context::AbortChecker; use databend_common_exception::Result; use databend_common_storage::DataOperator; +use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions; use futures_util::stream; use futures_util::TryStreamExt; use log::info; -use opendal::Entry; -use opendal::EntryMode; -use opendal::Metakey; +use opendal::Buffer; +use opendal::ErrorKind; // Default retention duration for temporary files: 3 days. 
const DEFAULT_RETAIN_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 3); +const SPILL_META_SUFFIX: &str = ".list"; #[async_backtrace::framed] pub async fn do_vacuum_temporary_files( abort_checker: AbortChecker, temporary_dir: String, - retain: Option, + options: &VacuumTempOptions, limit: usize, ) -> Result { if limit == 0 { return Ok(0); } - let expire_time = retain.unwrap_or(DEFAULT_RETAIN_DURATION).as_millis() as i64; + match options { + VacuumTempOptions::QueryHook(nodes_num, query_id) => { + vacuum_query_hook(abort_checker, &temporary_dir, *nodes_num, query_id, limit).await + } + VacuumTempOptions::VacuumCommand(duration) => { + vacuum_by_duration(abort_checker, &temporary_dir, limit, duration).await + } + } +} + +async fn vacuum_by_duration( + abort_checker: AbortChecker, + temporary_dir: &str, + mut limit: usize, + duration: &Option, +) -> Result { + let operator = DataOperator::instance().operator(); + let start_time = Instant::now(); + + let expire_time = duration.unwrap_or(DEFAULT_RETAIN_DURATION).as_millis() as i64; let timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as i64; - let operator = DataOperator::instance().operator(); - + let mut removed_total = 0; let temporary_dir = format!("{}/", temporary_dir.trim_end_matches('/')); + let mut ds = operator.lister_with(&temporary_dir).await?; + + let mut temp_files = Vec::new(); + let mut gc_metas = HashSet::new(); + + // We may delete next entries during iteration + // So we can't use + loop { + let de = ds.try_next().await; + match de { + Ok(Some(de)) => { + abort_checker.try_check_aborting()?; + if de.path() == temporary_dir { + continue; + } + let name = de.name(); + let meta = if de.metadata().last_modified().is_none() { + operator.stat(de.path()).await + } else { + Ok(de.metadata().clone()) + }; + + if meta.is_err() { + continue; + } + let meta = meta.unwrap(); - let mut ds = operator - .lister_with(&temporary_dir) - .metakey(Metakey::Mode | Metakey::LastModified) - .await?; - - let mut removed_temp_files = 0; - let mut total_cleaned_size = 0; - let mut total_batch_size = 0; - let start_time = Instant::now(); - - while removed_temp_files < limit { - let instant = Instant::now(); - let mut end_of_stream = true; - let mut remove_temp_files_path = Vec::with_capacity(1000); - let mut batch_size = 0; - - while let Some(de) = ds.try_next().await? { - abort_checker.try_check_aborting()?; - if de.path() == temporary_dir { - continue; - } - - let meta = de.metadata(); - - match meta.mode() { - EntryMode::DIR => { - let life_mills = - match operator.exists(&format!("{}finished", de.path())).await? 
{ - true => 0, - false => expire_time, - }; - - vacuum_finished_query( - &abort_checker, - start_time, - &mut removed_temp_files, - &mut total_cleaned_size, - &mut batch_size, - &de, + if let Some(modified) = meta.last_modified() { + if timestamp - modified.timestamp_millis() < expire_time { + continue; + } + } + if meta.is_file() { + if name.ends_with(SPILL_META_SUFFIX) { + if gc_metas.contains(name) { + continue; + } + let removed = + vacuum_by_meta(&temporary_dir, de.path(), limit, &mut removed_total) + .await?; + limit = limit.saturating_sub(removed); + gc_metas.insert(name.to_owned()); + } else { + temp_files.push(de.path().to_owned()); + if temp_files.len() >= limit { + break; + } + } + } else { + let removed = vacuum_by_meta( + &temporary_dir, + &format!("{}{}", de.path().trim_end_matches('/'), SPILL_META_SUFFIX), limit, - timestamp, - life_mills, + &mut removed_total, ) .await?; + // by meta + if removed > 0 { + let meta_name = format!("{}{}", name, SPILL_META_SUFFIX); + if gc_metas.contains(&meta_name) { + continue; + } - if removed_temp_files >= limit { - end_of_stream = false; - break; + limit = limit.saturating_sub(removed); + gc_metas.insert(meta_name); + } else { + // by list + let removed = + vacuum_by_list_dir(de.path(), limit, &mut removed_total).await?; + limit = limit.saturating_sub(removed); } } - EntryMode::FILE => { - if let Some(modified) = meta.last_modified() { - if timestamp - modified.timestamp_millis() >= expire_time { - removed_temp_files += 1; - remove_temp_files_path.push(de.path().to_string()); - batch_size += meta.content_length() as usize; - - if removed_temp_files >= limit || remove_temp_files_path.len() >= 1000 { - end_of_stream = false; - break; - } - } - } + if limit == 0 { + break; } - EntryMode::Unknown => unreachable!(), } + Ok(None) => break, + Err(e) if e.kind() == ErrorKind::NotFound => continue, + Err(e) => return Err(e.into()), } + } - if !remove_temp_files_path.is_empty() { - let cur_removed = remove_temp_files_path.len(); - total_cleaned_size += batch_size; - operator - .remove_via(stream::iter(remove_temp_files_path)) - .await?; - - // Log for the current batch - info!( - "vacuum removed {} temp files in {:?}(elapsed: {} seconds), batch size: {} bytes", - cur_removed, - temporary_dir, - instant.elapsed().as_secs(), - batch_size - ); - - // Log for the total progress - info!( - "Total progress: {} files removed, total cleaned size: {} bytes, total batch size: {} bytes", - removed_temp_files, - total_cleaned_size, - total_batch_size + batch_size - ); - } - - total_batch_size += batch_size; - - if end_of_stream { - break; - } + if temp_files.len() <= limit { + removed_total += temp_files.len(); + let _ = operator + .remove_via(stream::iter(temp_files.into_iter())) + .await; } // Log for the final total progress info!( - "vacuum finished, total cleaned {} files, total cleaned size: {} bytes, total elapsed: {} seconds", - removed_temp_files, - total_cleaned_size, + "vacuum command finished, total cleaned {} files, total elapsed: {} seconds", + removed_total, start_time.elapsed().as_secs() ); + Ok(removed_total) +} + +async fn vacuum_query_hook( + abort_checker: AbortChecker, + temporary_dir: &str, + nodes_num: usize, + query_id: &str, + mut limit: usize, +) -> Result { + let mut removed_total = 0; - Ok(removed_temp_files) + for i in 0..nodes_num { + if limit == 0 { + break; + } + abort_checker.try_check_aborting()?; + let meta_file_path = format!("{}/{}_{}{}", temporary_dir, query_id, i, SPILL_META_SUFFIX); + let removed = + 
vacuum_by_meta(temporary_dir, &meta_file_path, limit, &mut removed_total).await?; + limit = limit.saturating_sub(removed); + } + Ok(removed_total) } -async fn vacuum_finished_query( - abort_checker: &AbortChecker, - total_instant: Instant, - removed_temp_files: &mut usize, - total_cleaned_size: &mut usize, - batch_size: &mut usize, - parent: &Entry, +async fn vacuum_by_meta( + temporary_dir: &str, + meta_file_path: &str, limit: usize, - timestamp: i64, - life_mills: i64, -) -> Result<()> { + removed_total: &mut usize, +) -> Result { let operator = DataOperator::instance().operator(); + let meta: Buffer; + let r = operator.read(meta_file_path).await; + match r { + Ok(r) => meta = r, + Err(e) if e.kind() == ErrorKind::NotFound => return Ok(0), + Err(e) => return Err(e.into()), + } + let meta = meta.to_bytes(); + let start_time = Instant::now(); + let files: Vec = meta.lines().map(|x| Ok(x?)).collect::>>()?; - let mut all_files_removed = true; - let mut ds = operator - .lister_with(parent.path()) - .metakey(Metakey::Mode | Metakey::LastModified) - .await?; - - while *removed_temp_files < limit { - let instant = Instant::now(); - - let mut end_of_stream = true; - let mut all_each_files_removed = true; - let mut remove_temp_files_path = Vec::with_capacity(1001); - - while let Some(de) = ds.try_next().await? { - abort_checker.try_check_aborting()?; - if de.path() == parent.path() { - continue; - } - - let meta = de.metadata(); - if meta.is_file() { - if de.name() == "finished" { - continue; - } - - if let Some(modified) = meta.last_modified() { - if timestamp - modified.timestamp_millis() >= life_mills { - *removed_temp_files += 1; - remove_temp_files_path.push(de.path().to_string()); - *batch_size += meta.content_length() as usize; - - if *removed_temp_files >= limit || remove_temp_files_path.len() >= 1000 { - end_of_stream = false; - break; - } - - continue; - } - } - } - - all_each_files_removed = false; - } + let (to_be_removed, remain) = files.split_at(limit.min(files.len())); + let remain = remain.to_vec(); - all_files_removed &= all_each_files_removed; + let cur_removed = to_be_removed.len(); + let remove_temp_files_path = stream::iter( + files + .into_iter() + .filter(|f| f.starts_with(temporary_dir)) + .take(limit), + ); + let _ = operator.remove_via(remove_temp_files_path).await; - if !remove_temp_files_path.is_empty() { - let cur_removed = remove_temp_files_path.len(); - *total_cleaned_size += *batch_size; - operator - .remove_via(stream::iter(remove_temp_files_path)) - .await?; + // update unfinished meta file + if !remain.is_empty() { + let remain = remain.join("\n"); + operator.write(meta_file_path, remain).await?; + } - // Log for the current batch - info!( - "vacuum removed {} temp files in {:?}(elapsed: {} seconds), batch size: {} bytes", - cur_removed, - parent.path(), - instant.elapsed().as_secs(), - *batch_size - ); + *removed_total += cur_removed; + // Log for the current batch + info!( + "Total progress: {} files removed, now vacuum removed {} temp files from meta: {}(elapsed: {} seconds)", + *removed_total, + cur_removed, + meta_file_path, + start_time.elapsed().as_secs(), + ); + + Ok(cur_removed) +} - // Log for the total progress - info!( - "Total progress: {} files removed, total cleaned size: {} bytes, total elapsed: {} seconds", - *removed_temp_files, - *total_cleaned_size, - total_instant.elapsed().as_secs() - ); - } +async fn vacuum_by_list_dir( + dir_path: &str, + limit: usize, + removed_total: &mut usize, +) -> Result { + let start_time = Instant::now(); + let 
operator = DataOperator::instance().operator(); + let mut r = operator.lister_with(dir_path).recursive(true).await?; + let mut batches = vec![]; - if end_of_stream { - break; + while let Some(entry) = r.try_next().await? { + // Let's remove it at last + if entry.path() == dir_path { + continue; } + let path = entry.path().to_string(); + batches.push(path); } + batches.push(dir_path.to_owned()); - if all_files_removed { - operator - .delete(&format!("{}finished", parent.path())) - .await?; - operator.delete(parent.path()).await?; - } + let cur_removed = batches.len().min(limit); + let _ = operator + .remove_via(stream::iter(batches.into_iter().take(limit))) + .await; - Ok(()) + *removed_total += cur_removed; + // Log for the current batch + info!( + "Total progress: {} files removed, now vacuum removed {} temp files from list query dir: {}(elapsed: {} seconds)", + *removed_total, + cur_removed, + dir_path, + start_time.elapsed().as_secs(), + ); + + Ok(cur_removed) } diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs index db935672a8064..d7430eab68254 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs @@ -29,6 +29,7 @@ use databend_enterprise_query::storages::fuse::operations::vacuum_drop_tables::d use databend_enterprise_query::storages::fuse::operations::vacuum_drop_tables::vacuum_drop_tables_by_table_info; use databend_enterprise_query::storages::fuse::operations::vacuum_temporary_files::do_vacuum_temporary_files; use databend_enterprise_query::storages::fuse::vacuum_drop_tables; +use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions; use databend_query::test_kits::*; use databend_storages_common_io::Files; use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID; @@ -147,7 +148,7 @@ async fn test_do_vacuum_temporary_files() -> Result<()> { let r = do_vacuum_temporary_files( Arc::new(AbortRightNow), "test_dir/".to_string(), - Some(Duration::from_secs(2)), + &VacuumTempOptions::VacuumCommand(Some(Duration::from_secs(2))), 1, ) .await; @@ -159,7 +160,7 @@ async fn test_do_vacuum_temporary_files() -> Result<()> { do_vacuum_temporary_files( no_abort.clone(), "test_dir/".to_string(), - Some(Duration::from_secs(2)), + &VacuumTempOptions::VacuumCommand(Some(Duration::from_secs(2))), 1, ) .await?; @@ -176,7 +177,7 @@ async fn test_do_vacuum_temporary_files() -> Result<()> { do_vacuum_temporary_files( no_abort.clone(), "test_dir/".to_string(), - Some(Duration::from_secs(2)), + &VacuumTempOptions::VacuumCommand(Some(Duration::from_secs(2))), 2, ) .await?; @@ -187,7 +188,7 @@ async fn test_do_vacuum_temporary_files() -> Result<()> { do_vacuum_temporary_files( no_abort.clone(), "test_dir/".to_string(), - Some(Duration::from_secs(3)), + &VacuumTempOptions::VacuumCommand(Some(Duration::from_secs(3))), 1000, ) .await?; diff --git a/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs b/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs index 7f2640ea0bf2c..e83b94181de94 100644 --- a/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs +++ b/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs @@ -51,11 +51,18 @@ pub trait VacuumHandler: Sync + Send { &self, abort_checker: AbortChecker, temporary_dir: String, - retain: Option, + options: &VacuumTempOptions, vacuum_limit: usize, ) -> Result; } +#[derive(Debug, Clone)] +pub enum VacuumTempOptions { + // nodes_num, query_id + QueryHook(usize, 
String), + VacuumCommand(Option), +} + pub struct VacuumHandlerWrapper { handler: Box, } @@ -95,11 +102,11 @@ impl VacuumHandlerWrapper { &self, abort_checker: AbortChecker, temporary_dir: String, - retain: Option, + options: &VacuumTempOptions, vacuum_limit: usize, ) -> Result { self.handler - .do_vacuum_temporary_files(abort_checker, temporary_dir, retain, vacuum_limit) + .do_vacuum_temporary_files(abort_checker, temporary_dir, options, vacuum_limit) .await } } diff --git a/src/query/pipeline/core/src/lib.rs b/src/query/pipeline/core/src/lib.rs index 8e0a3ab7dd59b..d064965129771 100644 --- a/src/query/pipeline/core/src/lib.rs +++ b/src/query/pipeline/core/src/lib.rs @@ -41,7 +41,6 @@ pub use pipe::PipeItem; pub use pipe::SinkPipeBuilder; pub use pipe::SourcePipeBuilder; pub use pipe::TransformPipeBuilder; -pub use pipeline::query_spill_prefix; pub use pipeline::DynTransformBuilder; pub use pipeline::Pipeline; pub use processors::PlanProfile; diff --git a/src/query/pipeline/core/src/pipeline.rs b/src/query/pipeline/core/src/pipeline.rs index 9d74f386490e1..8072b7b997b88 100644 --- a/src/query/pipeline/core/src/pipeline.rs +++ b/src/query/pipeline/core/src/pipeline.rs @@ -579,10 +579,3 @@ impl Drop for Pipeline { }) } } - -pub fn query_spill_prefix(tenant: &str, query_id: &str) -> String { - match query_id.is_empty() { - true => format!("_query_spill/{}", tenant), - false => format!("_query_spill/{}/{}", tenant, query_id), - } -} diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index 8ab381200664d..83065031d5fe5 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -79,6 +79,7 @@ pub trait ClusterHelper { fn is_empty(&self) -> bool; fn is_local(&self, node: &NodeInfo) -> bool; fn local_id(&self) -> String; + fn ordered_index(&self) -> usize; fn get_nodes(&self) -> Vec>; @@ -115,6 +116,15 @@ impl ClusterHelper for Cluster { self.local_id.clone() } + fn ordered_index(&self) -> usize { + let mut nodes = self.get_nodes(); + nodes.sort_by(|a, b| a.id.cmp(&b.id)); + nodes + .iter() + .position(|x| x.id == self.local_id) + .unwrap_or(0) + } + fn get_nodes(&self) -> Vec> { self.nodes.to_vec() } diff --git a/src/query/service/src/interpreters/hook/vacuum_hook.rs b/src/query/service/src/interpreters/hook/vacuum_hook.rs index 31e55c3c5e7fc..1e1907b549ab9 100644 --- a/src/query/service/src/interpreters/hook/vacuum_hook.rs +++ b/src/query/service/src/interpreters/hook/vacuum_hook.rs @@ -13,27 +13,26 @@ // limitations under the License. 
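For orientation, a minimal self-contained sketch (not part of the patch) of how the pieces introduced above are expected to fit together: the two VacuumTempOptions modes, the sorted-node index from Cluster::ordered_index, and the per-node ".list" meta path consumed by vacuum_query_hook. The enum and the naming scheme mirror the diff; the tenant, query id, and node names are placeholders.

use std::time::Duration;

const SPILL_META_SUFFIX: &str = ".list";

#[derive(Debug)]
enum VacuumTempOptions {
    // nodes_num, query_id
    QueryHook(usize, String),
    // retention window; the patch defaults to 3 days when None
    VacuumCommand(Option<Duration>),
}

// Each node derives a stable, distinct index by sorting the cluster
// membership, mirroring Cluster::ordered_index above.
fn ordered_index(mut node_ids: Vec<String>, local_id: &str) -> usize {
    node_ids.sort();
    node_ids.iter().position(|id| id == local_id).unwrap_or(0)
}

// The per-node spill meta written at query end and consumed by the query hook.
fn spill_meta_path(tenant: &str, query_id: &str, node_index: usize) -> String {
    format!("_query_spill/{tenant}/{query_id}_{node_index}{SPILL_META_SUFFIX}")
}

fn main() {
    let idx = ordered_index(vec!["node-b".to_string(), "node-a".to_string()], "node-b");
    assert_eq!(idx, 1);

    // After a query finishes, the hook vacuums one meta file per node ...
    let hook = VacuumTempOptions::QueryHook(2, "query-123".to_string());
    // ... while VACUUM TEMPORARY FILES falls back to age-based cleanup.
    let command = VacuumTempOptions::VacuumCommand(None);

    println!(
        "{} {:?} {:?}",
        spill_meta_path("tenant1", "query-123", idx),
        hook,
        command
    );
}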
use std::sync::Arc; -use std::time::Duration; use databend_common_base::runtime::GlobalIORuntime; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_license::license::Feature::Vacuum; use databend_common_license::license_manager::LicenseManagerSwitch; -use databend_common_pipeline_core::query_spill_prefix; use databend_common_storage::DataOperator; use databend_enterprise_vacuum_handler::get_vacuum_handler; +use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions; use databend_storages_common_cache::TempDirManager; use log::warn; use opendal::Buffer; use rand::Rng; +use crate::clusters::ClusterHelper; use crate::sessions::QueryContext; pub fn hook_vacuum_temp_files(query_ctx: &Arc) -> Result<()> { - let tenant = query_ctx.get_tenant(); let settings = query_ctx.get_settings(); - let spill_prefix = query_spill_prefix(tenant.tenant_name(), &query_ctx.get_id()); + let spill_prefix = query_ctx.query_tenant_spill_prefix(); let vacuum_limit = settings.get_max_vacuum_temp_files_after_query()?; // disable all s3 operator if vacuum limit = 0 @@ -44,13 +43,16 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc) -> Result<()> { { let handler = get_vacuum_handler(); + let cluster_nodes = query_ctx.get_cluster().get_nodes().len(); + let query_id = query_ctx.get_id(); + let abort_checker = query_ctx.clone().get_abort_checker(); let _ = GlobalIORuntime::instance().block_on(async move { let removed_files = handler .do_vacuum_temporary_files( abort_checker, spill_prefix.clone(), - Some(Duration::from_secs(0)), + &VacuumTempOptions::QueryHook(cluster_nodes, query_id), vacuum_limit as usize, ) .await; diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index 1c79b737c5b56..3948cd47f256e 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -232,7 +232,7 @@ async fn plan_sql( let mut planner = Planner::new_with_query_executor( ctx.clone(), Arc::new(ServiceQueryExecutor::new(QueryContext::create_from( - ctx.clone(), + ctx.as_ref(), ))), ); @@ -292,7 +292,6 @@ fn attach_query_hash(ctx: &Arc, stmt: &mut Option, sql: pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc) -> Result<()> { let mut has_profiles = false; query_ctx.add_query_profiles(&info.profiling); - let query_profiles = query_ctx.get_query_profiles(); if !query_profiles.is_empty() { has_profiles = true; @@ -320,9 +319,7 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc) } hook_clear_m_cte_temp_table(&query_ctx)?; - hook_vacuum_temp_files(&query_ctx)?; - hook_disk_temp_dir(&query_ctx)?; let err_opt = match &info.res { diff --git a/src/query/service/src/interpreters/interpreter_table_analyze.rs b/src/query/service/src/interpreters/interpreter_table_analyze.rs index f3a5d2b53bccf..e6f3867ac2c80 100644 --- a/src/query/service/src/interpreters/interpreter_table_analyze.rs +++ b/src/query/service/src/interpreters/interpreter_table_analyze.rs @@ -228,7 +228,7 @@ impl Interpreter for AnalyzeTableInterpreter { histogram_plan = remove_exchange(histogram_plan); } let mut histogram_build_res = build_query_pipeline( - &QueryContext::create_from(self.ctx.clone()), + &QueryContext::create_from(self.ctx.as_ref()), &bind_context.columns, &histogram_plan, false, diff --git a/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs b/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs index 
56345773ce6c5..78c4757b4c6e4 100644 --- a/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs +++ b/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs @@ -20,9 +20,9 @@ use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_license::license::Feature::Vacuum; use databend_common_license::license_manager::LicenseManagerSwitch; -use databend_common_pipeline_core::query_spill_prefix; use databend_common_sql::plans::VacuumTemporaryFilesPlan; use databend_enterprise_vacuum_handler::get_vacuum_handler; +use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; @@ -57,12 +57,12 @@ impl Interpreter for VacuumTemporaryFilesInterpreter { let handler = get_vacuum_handler(); - let temporary_files_prefix = query_spill_prefix(self.ctx.get_tenant().tenant_name(), ""); + let temporary_files_prefix = self.ctx.query_tenant_spill_prefix(); let removed_files = handler .do_vacuum_temporary_files( self.ctx.clone().get_abort_checker(), temporary_files_prefix, - self.plan.retain, + &VacuumTempOptions::VacuumCommand(self.plan.retain), self.plan.limit.map(|x| x as usize).unwrap_or(usize::MAX), ) .await?; diff --git a/src/query/service/src/pipelines/builders/builder_aggregate.rs b/src/query/service/src/pipelines/builders/builder_aggregate.rs index 2c2223efd2f27..1bd90e015149e 100644 --- a/src/query/service/src/pipelines/builders/builder_aggregate.rs +++ b/src/query/service/src/pipelines/builders/builder_aggregate.rs @@ -23,7 +23,6 @@ use databend_common_expression::LimitType; use databend_common_expression::SortColumnDescription; use databend_common_functions::aggregates::AggregateFunctionFactory; use databend_common_pipeline_core::processors::ProcessorPtr; -use databend_common_pipeline_core::query_spill_prefix; use databend_common_pipeline_transforms::processors::TransformPipelineHelper; use databend_common_pipeline_transforms::processors::TransformSortPartial; use databend_common_sql::executor::physical_plans::AggregateExpand; @@ -162,17 +161,19 @@ impl PipelineBuilder { // If cluster mode, spill write will be completed in exchange serialize, because we need scatter the block data first if !self.is_exchange_neighbor { let operator = DataOperator::instance().operator(); - let location_prefix = - query_spill_prefix(self.ctx.get_tenant().tenant_name(), &self.ctx.get_id()); + let location_prefix = self.ctx.query_id_spill_prefix(); + self.main_pipeline.add_transform(|input, output| { - Ok(ProcessorPtr::create(TransformAggregateSpillWriter::create( - self.ctx.clone(), - input, - output, - operator.clone(), - params.clone(), - location_prefix.clone(), - ))) + Ok(ProcessorPtr::create( + TransformAggregateSpillWriter::try_create( + self.ctx.clone(), + input, + output, + operator.clone(), + params.clone(), + location_prefix.clone(), + )?, + )) })?; } diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs index 08fe01e8eb6ec..5dc183497928d 100644 --- a/src/query/service/src/pipelines/builders/builder_join.rs +++ b/src/query/service/src/pipelines/builders/builder_join.rs @@ -65,7 +65,7 @@ impl PipelineBuilder { range_join: &RangeJoin, state: Arc, ) -> Result<()> { - let right_side_context = QueryContext::create_from(self.ctx.clone()); + let right_side_context = QueryContext::create_from(self.ctx.as_ref()); let mut right_side_builder = PipelineBuilder::create( 
self.func_ctx.clone(), self.settings.clone(), @@ -134,7 +134,7 @@ impl PipelineBuilder { hash_join_plan: &HashJoin, join_state: Arc, ) -> Result<()> { - let build_side_context = QueryContext::create_from(self.ctx.clone()); + let build_side_context = QueryContext::create_from(self.ctx.as_ref()); let mut build_side_builder = PipelineBuilder::create( self.func_ctx.clone(), self.settings.clone(), diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs index ced60f0c5c9eb..3b2300f346201 100644 --- a/src/query/service/src/pipelines/builders/builder_sort.rs +++ b/src/query/service/src/pipelines/builders/builder_sort.rs @@ -19,7 +19,6 @@ use databend_common_expression::DataSchemaRef; use databend_common_expression::LimitType; use databend_common_expression::SortColumnDescription; use databend_common_pipeline_core::processors::ProcessorPtr; -use databend_common_pipeline_core::query_spill_prefix; use databend_common_pipeline_core::Pipeline; use databend_common_pipeline_transforms::processors::add_k_way_merge_sort; use databend_common_pipeline_transforms::processors::sort::utils::add_order_field; @@ -277,12 +276,11 @@ impl SortPipelineBuilder { if may_spill { let schema = add_order_field(sort_merge_output_schema.clone(), &self.sort_desc); + let location_prefix = self.ctx.query_id_spill_prefix(); + let config = SpillerConfig { spiller_type: SpillerType::OrderBy, - location_prefix: query_spill_prefix( - self.ctx.get_tenant().tenant_name(), - &self.ctx.get_id(), - ), + location_prefix, disk_spill: None, use_parquet: settings.get_spilling_file_format()?.is_parquet(), }; diff --git a/src/query/service/src/pipelines/builders/builder_union_all.rs b/src/query/service/src/pipelines/builders/builder_union_all.rs index ab4b572519ba4..61890313a3673 100644 --- a/src/query/service/src/pipelines/builders/builder_union_all.rs +++ b/src/query/service/src/pipelines/builders/builder_union_all.rs @@ -71,7 +71,7 @@ impl PipelineBuilder { } fn expand_union_all(&mut self, input: &PhysicalPlan) -> Result> { - let union_ctx = QueryContext::create_from(self.ctx.clone()); + let union_ctx = QueryContext::create_from(self.ctx.as_ref()); let mut pipeline_builder = PipelineBuilder::create( self.func_ctx.clone(), self.settings.clone(), diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 59b4f57e156fa..8a87ce1e7eb7c 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -20,8 +20,10 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::DataField; use databend_common_expression::FunctionContext; +use databend_common_pipeline_core::always_callback; use databend_common_pipeline_core::processors::PlanScope; use databend_common_pipeline_core::processors::PlanScopeGuard; +use databend_common_pipeline_core::ExecutionInfo; use databend_common_pipeline_core::Pipeline; use databend_common_settings::Settings; use databend_common_sql::executor::PhysicalPlan; @@ -91,6 +93,13 @@ impl PipelineBuilder { } } + // unload spill metas + self.main_pipeline + .set_on_finished(always_callback(move |_info: &ExecutionInfo| { + self.ctx.unload_spill_meta(); + Ok(()) + })); + Ok(PipelineBuildResult { main_pipeline: self.main_pipeline, sources_pipelines: self.pipelines, diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs 
b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs index 83d5dd0f791fa..b19f952c78eca 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use bumpalo::Bump; -use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockMetaInfoDowncast; @@ -24,7 +23,6 @@ use databend_common_expression::PartitionedPayload; use databend_common_expression::Payload; use databend_common_expression::PayloadFlushState; use databend_common_pipeline_core::processors::ProcessorPtr; -use databend_common_pipeline_core::query_spill_prefix; use databend_common_pipeline_core::Pipeline; use databend_common_settings::FlightCompression; use databend_common_storage::DataOperator; @@ -211,7 +209,6 @@ impl FlightScatter for HashTableHashScatter { pub struct AggregateInjector { ctx: Arc, - tenant: String, aggregator_params: Arc, } @@ -220,10 +217,8 @@ impl AggregateInjector { ctx: Arc, params: Arc, ) -> Arc { - let tenant = ctx.get_tenant(); Arc::new(AggregateInjector { ctx, - tenant: tenant.tenant_name().to_string(), aggregator_params: params, }) } @@ -259,17 +254,19 @@ impl ExchangeInjector for AggregateInjector { let params = self.aggregator_params.clone(); let operator = DataOperator::instance().operator(); - let location_prefix = query_spill_prefix(&self.tenant, &self.ctx.get_id()); + let location_prefix = self.ctx.query_id_spill_prefix(); pipeline.add_transform(|input, output| { - Ok(ProcessorPtr::create(TransformAggregateSpillWriter::create( - self.ctx.clone(), - input, - output, - operator.clone(), - params.clone(), - location_prefix.clone(), - ))) + Ok(ProcessorPtr::create( + TransformAggregateSpillWriter::try_create( + self.ctx.clone(), + input, + output, + operator.clone(), + params.clone(), + location_prefix.clone(), + )?, + )) })?; pipeline.add_transform(|input, output| { @@ -285,7 +282,7 @@ impl ExchangeInjector for AggregateInjector { ) -> Result<()> { let params = self.aggregator_params.clone(); let operator = DataOperator::instance().operator(); - let location_prefix = query_spill_prefix(&self.tenant, &self.ctx.get_id()); + let location_prefix = self.ctx.query_id_spill_prefix(); let schema = shuffle_params.schema.clone(); let local_id = &shuffle_params.executor_id; @@ -297,7 +294,7 @@ impl ExchangeInjector for AggregateInjector { pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create( - TransformExchangeAggregateSerializer::create( + TransformExchangeAggregateSerializer::try_create( self.ctx.clone(), input, output, @@ -307,7 +304,7 @@ impl ExchangeInjector for AggregateInjector { compression, schema.clone(), local_pos, - ), + )?, )) })?; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs index 5bfba9d7f2bb8..d37051de63f96 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs @@ -16,7 +16,6 @@ use std::any::Any; use std::sync::Arc; use std::time::Instant; -use 
databend_common_base::base::GlobalUniqName; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; @@ -39,6 +38,9 @@ use crate::pipelines::processors::transforms::aggregator::AggregateMeta; use crate::pipelines::processors::transforms::aggregator::AggregatorParams; use crate::pipelines::processors::transforms::aggregator::BucketSpilledPayload; use crate::sessions::QueryContext; +use crate::spillers::Spiller; +use crate::spillers::SpillerConfig; +use crate::spillers::SpillerType; pub struct TransformAggregateSpillWriter { ctx: Arc, @@ -46,33 +48,39 @@ pub struct TransformAggregateSpillWriter { output: Arc, _params: Arc, - operator: Operator, - location_prefix: String, + spiller: Arc, spilled_block: Option, spilling_meta: Option, spilling_future: Option>>, } impl TransformAggregateSpillWriter { - pub fn create( + pub fn try_create( ctx: Arc, input: Arc, output: Arc, operator: Operator, params: Arc, location_prefix: String, - ) -> Box { - Box::new(TransformAggregateSpillWriter { + ) -> Result> { + let config = SpillerConfig { + spiller_type: SpillerType::Aggregation, + location_prefix, + disk_spill: None, + use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), + }; + + let spiller = Spiller::create(ctx.clone(), operator, config.clone())?; + Ok(Box::new(TransformAggregateSpillWriter { ctx, input, output, _params: params, - operator, - location_prefix, + spiller: Arc::new(spiller), spilled_block: None, spilling_meta: None, spilling_future: None, - }) + })) } } @@ -148,8 +156,7 @@ impl Processor for TransformAggregateSpillWriter { AggregateMeta::AggregateSpilling(payload) => { self.spilling_future = Some(agg_spilling_aggregate_payload( self.ctx.clone(), - self.operator.clone(), - &self.location_prefix, + self.spiller.clone(), payload, )?); @@ -176,19 +183,16 @@ impl Processor for TransformAggregateSpillWriter { pub fn agg_spilling_aggregate_payload( ctx: Arc, - operator: Operator, - location_prefix: &str, + spiller: Arc, partitioned_payload: PartitionedPayload, ) -> Result>> { - let unique_name = GlobalUniqName::unique(); - let location = format!("{}/{}", location_prefix, unique_name); - let mut write_size = 0; let partition_count = partitioned_payload.partition_count(); let mut write_data = Vec::with_capacity(partition_count); let mut spilled_buckets_payloads = Vec::with_capacity(partition_count); // Record how many rows are spilled. 
let mut rows = 0; + let location = spiller.create_unique_location(); for (bucket, payload) in partitioned_payload.payloads.into_iter().enumerate() { if payload.len() == 0 { continue; @@ -221,51 +225,38 @@ pub fn agg_spilling_aggregate_payload( Ok(Box::pin(async move { let instant = Instant::now(); - - let mut write_bytes = 0; - if !write_data.is_empty() { - let mut writer = operator - .writer_with(&location) - .chunk(8 * 1024 * 1024) + let (location, write_bytes) = spiller + .spill_stream_aggregate_buffer(Some(location), write_data) .await?; - for write_bucket_data in write_data.into_iter() { - for data in write_bucket_data.into_iter() { - write_bytes += data.len(); - writer.write(data).await?; - } + // perf + { + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, + instant.elapsed().as_millis() as usize, + ); } - writer.close().await?; - } + { + let progress_val = ProgressValues { + rows, + bytes: write_bytes, + }; + ctx.get_aggregate_spill_progress().incr(&progress_val); + } - // perf - { - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); - Profile::record_usize_profile( - ProfileStatisticsName::RemoteSpillWriteBytes, - write_bytes, - ); - Profile::record_usize_profile( - ProfileStatisticsName::RemoteSpillWriteTime, - instant.elapsed().as_millis() as usize, + info!( + "Write aggregate spill {} successfully, elapsed: {:?}", + location, + instant.elapsed() ); } - { - let progress_val = ProgressValues { - rows, - bytes: write_bytes, - }; - ctx.get_aggregate_spill_progress().incr(&progress_val); - } - - info!( - "Write aggregate spill {} successfully, elapsed: {:?}", - location, - instant.elapsed() - ); - Ok(DataBlock::empty_with_meta(AggregateMeta::create_spilled( spilled_buckets_payloads, ))) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 64422ad916870..65e9e5cdc513a 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -17,7 +17,6 @@ use std::time::Instant; use arrow_ipc::writer::IpcWriteOptions; use arrow_ipc::CompressionType; -use databend_common_base::base::GlobalUniqName; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; @@ -57,20 +56,22 @@ use crate::pipelines::processors::transforms::aggregator::SerializeAggregateStre use crate::servers::flight::v1::exchange::serde::serialize_block; use crate::servers::flight::v1::exchange::ExchangeShuffleMeta; use crate::sessions::QueryContext; +use crate::spillers::Spiller; +use crate::spillers::SpillerConfig; +use crate::spillers::SpillerType; pub struct TransformExchangeAggregateSerializer { ctx: Arc, local_pos: usize, options: IpcWriteOptions, - operator: Operator, - location_prefix: String, params: Arc, + spiller: Arc, } impl TransformExchangeAggregateSerializer { #[allow(clippy::too_many_arguments)] - pub fn create( + pub fn try_create( ctx: Arc, input: Arc, output: Arc, @@ -81,7 +82,7 @@ impl 
TransformExchangeAggregateSerializer { compression: Option, _schema: DataSchemaRef, local_pos: usize, - ) -> Box { + ) -> Result> { let compression = match compression { None => None, Some(compression) => match compression { @@ -89,17 +90,27 @@ impl TransformExchangeAggregateSerializer { FlightCompression::Zstd => Some(CompressionType::ZSTD), }, }; - - BlockMetaTransformer::create(input, output, TransformExchangeAggregateSerializer { - ctx, - params, - operator, + let config = SpillerConfig { + spiller_type: SpillerType::Aggregation, location_prefix, - local_pos, - options: IpcWriteOptions::default() - .try_with_compression(compression) - .unwrap(), - }) + disk_spill: None, + use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), + }; + + let spiller = Spiller::create(ctx.clone(), operator, config.clone())?; + Ok(BlockMetaTransformer::create( + input, + output, + TransformExchangeAggregateSerializer { + ctx, + params, + local_pos, + spiller: spiller.into(), + options: IpcWriteOptions::default() + .try_with_compression(compression) + .unwrap(), + }, + )) } } @@ -125,14 +136,12 @@ impl BlockMetaTransform for TransformExchangeAggregateSeria match index == self.local_pos { true => local_agg_spilling_aggregate_payload( self.ctx.clone(), - self.operator.clone(), - &self.location_prefix, + self.spiller.clone(), payload, )?, - false => agg_spilling_aggregate_payload( + false => exchange_agg_spilling_aggregate_payload( self.ctx.clone(), - self.operator.clone(), - &self.location_prefix, + self.spiller.clone(), payload, )?, }, @@ -175,15 +184,11 @@ impl BlockMetaTransform for TransformExchangeAggregateSeria } } -fn agg_spilling_aggregate_payload( +fn exchange_agg_spilling_aggregate_payload( ctx: Arc, - operator: Operator, - location_prefix: &str, + spiller: Arc, partitioned_payload: PartitionedPayload, ) -> Result>> { - let unique_name = GlobalUniqName::unique(); - let location = format!("{}/{}", location_prefix, unique_name); - let partition_count = partitioned_payload.partition_count(); let mut write_size = 0; let mut write_data = Vec::with_capacity(partition_count); @@ -225,21 +230,9 @@ fn agg_spilling_aggregate_payload( Ok(Box::pin(async move { if !write_data.is_empty() { let instant = Instant::now(); - - let mut write_bytes = 0; - let mut writer = operator - .writer_with(&location) - .chunk(8 * 1024 * 1024) + let (location, write_bytes) = spiller + .spill_stream_aggregate_buffer(None, write_data) .await?; - for write_bucket_data in write_data.into_iter() { - for data in write_bucket_data.into_iter() { - write_bytes += data.len(); - writer.write(data).await?; - } - } - - writer.close().await?; - // perf { Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs index 23f0472489ed3..fa6d4a9abf954 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs @@ -20,7 +20,6 @@ use databend_common_expression::DataBlock; use databend_common_expression::Expr; use databend_common_expression::FunctionContext; use databend_common_expression::HashMethodKind; -use databend_common_pipeline_core::query_spill_prefix; use databend_common_sql::plans::JoinType; use databend_common_storage::DataOperator; use databend_common_storages_fuse::TableContext; @@ -67,9 
+66,12 @@ impl HashJoinSpiller { } else { SpillerType::HashJoinProbe }; + + let location_prefix = ctx.query_id_spill_prefix(); + let spill_config = SpillerConfig { spiller_type, - location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()), + location_prefix, disk_spill: None, use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), }; @@ -183,9 +185,10 @@ impl HashJoinSpiller { } } else { // Cross join. - let spilled_files = self.spiller.spilled_files(); - if !spilled_files.is_empty() { - let file_index = self.next_restore_file; + let spilled_files = self.spiller.private_spilled_files(); + let file_index = self.next_restore_file; + + if file_index < spilled_files.len() { let spilled_data = self .spiller .read_spilled_file(&spilled_files[file_index]) @@ -246,7 +249,7 @@ impl HashJoinSpiller { } pub fn has_next_restore_file(&self) -> bool { - self.next_restore_file < self.spiller.spilled_files().len() + self.next_restore_file < self.spiller.private_spilled_files().len() || (self.next_restore_file == 0 && !self.partition_buffer.is_partition_empty(0)) } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs index c65d1e964b0b7..1b4848c8fbb6f 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs @@ -159,8 +159,6 @@ where } Some(None) => unreachable!(), None => { - // If we get a memory block at initial state, it means we will never spill data. - debug_assert!(self.spiller.columns_layout.is_empty()); self.output_block(block); self.state = State::NoSpill; Ok(Event::NeedConsume) @@ -296,9 +294,6 @@ where let spiller_snapshot = Arc::new(self.spiller.clone()); for _ in 0..num_streams - streams.len() { let files = self.unmerged_blocks.pop_front().unwrap(); - for file in files.iter() { - self.spiller.columns_layout.remove(file); - } let stream = BlockStream::Spilled((files, spiller_snapshot.clone())); streams.push(stream); } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_stream_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_stream_sort_spill.rs index 7078ca4ee2a77..1a338592cdafd 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_stream_sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_stream_sort_spill.rs @@ -47,7 +47,6 @@ use databend_common_pipeline_transforms::processors::SortSpillParams; use rand::rngs::StdRng; use rand::SeedableRng; -use crate::spillers::Layout; use crate::spillers::Location; use crate::spillers::Spiller; @@ -694,7 +693,7 @@ where struct SpillableBlock { data: Option, rows: usize, - location: Option<(Location, Layout)>, + location: Option, domain: Column, processed: usize, } @@ -732,7 +731,7 @@ impl SpillableBlock { async fn spill(&mut self, spiller: &Spiller) -> Result<()> { let data = self.data.take().unwrap(); if self.location.is_none() { - let location = spiller.spill_unmanage(vec![data]).await?; + let location = spiller.spill(vec![data]).await?; self.location = Some(location); } Ok(()) @@ -829,10 +828,7 @@ impl BoundBlockStream { } let location = block.location.as_ref().unwrap(); - let data = self - .spiller - .read_unmanage_spilled_file(&location.0, &location.1) - .await?; + let data = self.spiller.read_spilled_file(location).await?; block.data = Some(if block.processed != 0 { 
debug_assert_eq!(block.rows + block.processed, data.num_rows()); data.slice(block.processed..data.num_rows()) diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs index 746735aa97436..e6c6ed20be85e 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs @@ -29,11 +29,9 @@ use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::Processor; -use databend_common_pipeline_core::query_spill_prefix; use databend_common_pipeline_transforms::processors::sort_merge; use databend_common_settings::Settings; use databend_common_storage::DataOperator; -use databend_common_storages_fuse::TableContext; use super::WindowPartitionBuffer; use super::WindowPartitionMeta; @@ -115,9 +113,10 @@ impl TransformWindowPartitionCollect { partition_id[*partition] = new_partition_id; } + let location_prefix = ctx.query_id_spill_prefix(); let spill_config = SpillerConfig { spiller_type: SpillerType::Window, - location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()), + location_prefix, disk_spill, use_parquet: settings.get_spilling_file_format()?.is_parquet(), }; diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 1cbb961f798d7..d158ccf3c9b89 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -965,7 +965,7 @@ impl FragmentCoordinator { if !self.initialized { self.initialized = true; - let pipeline_ctx = QueryContext::create_from(ctx.clone()); + let pipeline_ctx = QueryContext::create_from(ctx.as_ref()); unsafe { pipeline_ctx diff --git a/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs b/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs index 12bb478627759..1b091e396b2ba 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs @@ -170,7 +170,7 @@ impl QueryEnv { local_id: GlobalConfig::instance().query.node_id.clone(), }))?; - query_ctx.set_id(self.query_id.clone()); + query_ctx.update_init_query_id(self.query_id.clone()); query_ctx.attach_query_str(self.query_kind, "".to_string()); Ok(query_ctx) diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index 0464b2fe49f0c..d8a1017fbd932 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -485,7 +485,7 @@ impl HttpQuery { } // TODO: validate the query_id to be uuid format - ctx.set_id(query_id.clone()); + ctx.update_init_query_id(query_id.clone()); let session_id = session.get_id().clone(); let node_id = ctx.get_cluster().local_id.clone(); diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs index 0f014f0f77f46..bf64b9bfd9ba9 100644 --- 
a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs +++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs @@ -384,7 +384,7 @@ impl InteractiveWorkerBase { None => { info!("Normal query: {}", query); let context = self.session.create_query_context().await?; - context.set_id(query_id); + context.update_init_query_id(query_id); // Use interpreter_plan_sql, we can write the query log if an error occurs. let (plan, _, _guard) = interpreter_plan_sql(context.clone(), query, true).await?; diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 140efbf3eb29d..c372651b9dd75 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -35,6 +35,7 @@ use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; use databend_common_base::JoinHandle; use databend_common_catalog::catalog::CATALOG_DEFAULT; @@ -122,6 +123,7 @@ use xorf::BinaryFuse16; use crate::catalogs::Catalog; use crate::clusters::Cluster; +use crate::clusters::ClusterHelper; use crate::locks::LockManager; use crate::pipelines::executor::PipelineExecutor; use crate::servers::flight::v1::exchange::DataExchangeManager; @@ -139,7 +141,6 @@ const MYSQL_VERSION: &str = "8.0.26"; const CLICKHOUSE_VERSION: &str = "8.12.14"; const COPIED_FILES_FILTER_BATCH_SIZE: usize = 1000; -#[derive(Clone)] pub struct QueryContext { version: String, mysql_version: String, @@ -160,7 +161,7 @@ impl QueryContext { // Each table will create a new QueryContext // So partition_queue could be independent in each table context // see `builder_join.rs` for more details - pub fn create_from(other: Arc) -> Arc { + pub fn create_from(other: &QueryContext) -> Arc { QueryContext::create_from_shared(other.shared.clone()) } @@ -315,7 +316,8 @@ impl QueryContext { self.shared.set_affect(affect) } - pub fn set_id(&self, id: String) { + pub fn update_init_query_id(&self, id: String) { + self.shared.spilled_files.write().clear(); *self.shared.init_query_id.write() = id; } @@ -356,6 +358,44 @@ impl QueryContext { self.shared.clear_tables_cache() } + pub fn add_spill_file( + &self, + location: crate::spillers::Location, + layout: crate::spillers::Layout, + ) { + let mut w = self.shared.spilled_files.write(); + w.insert(location, layout); + } + + pub fn get_spill_layout( + &self, + location: &crate::spillers::Location, + ) -> Option { + let r = self.shared.spilled_files.read(); + r.get(location).cloned() + } + + pub fn get_spilled_files(&self) -> Vec { + let r = self.shared.spilled_files.read(); + r.keys().cloned().collect() + } + + pub fn query_tenant_spill_prefix(&self) -> String { + let tenant = self.get_tenant(); + format!("_query_spill/{}", tenant.tenant_name()) + } + + pub fn query_id_spill_prefix(&self) -> String { + let tenant = self.get_tenant(); + let node_index = self.get_cluster().ordered_index(); + format!( + "_query_spill/{}/{}_{}", + tenant.tenant_name(), + self.get_id(), + node_index + ) + } + #[async_backtrace::framed] async fn get_table_from_shared( &self, @@ -387,6 +427,49 @@ impl QueryContext { }; Ok(table) } + + pub fn unload_spill_meta(&self) { + const SPILL_META_SUFFIX: &str = ".list"; + let mut w = self.shared.spilled_files.write(); + let mut remote_spill_files = w + .iter() + .map(|(k, 
_)| k) + .filter_map(|l| match l { + crate::spillers::Location::Remote(r) => Some(r), + _ => None, + }) + .cloned() + .collect::>(); + w.clear(); + + if !remote_spill_files.is_empty() { + let location_prefix = self.query_tenant_spill_prefix(); + let node_idx = self.get_cluster().ordered_index(); + let meta_path = format!( + "{}/{}_{}{}", + location_prefix, + self.get_id(), + node_idx, + SPILL_META_SUFFIX + ); + let op = DataOperator::instance().operator(); + // append dir and current meta + remote_spill_files.push(meta_path.clone()); + remote_spill_files.push(format!( + "{}/{}_{}/", + location_prefix, + self.get_id(), + node_idx + )); + let joined_contents = remote_spill_files.join("\n"); + + if let Err(e) = GlobalIORuntime::instance().block_on::<(), (), _>(async move { + Ok(op.write(&meta_path, joined_contents).await?) + }) { + log::error!("create spill meta file error: {}", e); + } + } + } } #[async_trait::async_trait] diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 86ffdcfae17f1..d5da138021735 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -142,6 +142,9 @@ pub struct QueryContextShared { pub(in crate::sessions) query_cache_metrics: DataCacheMetrics, pub(in crate::sessions) query_queued_duration: Arc>, + + pub(in crate::sessions) spilled_files: + Arc>>, } impl QueryContextShared { @@ -198,6 +201,7 @@ impl QueryContextShared { merge_into_join: Default::default(), multi_table_insert_status: Default::default(), query_queued_duration: Arc::new(RwLock::new(Duration::from_secs(0))), + spilled_files: Default::default(), })) } diff --git a/src/query/service/src/spillers/serialize.rs b/src/query/service/src/spillers/serialize.rs index 30c851718042a..ad65616469437 100644 --- a/src/query/service/src/spillers/serialize.rs +++ b/src/query/service/src/spillers/serialize.rs @@ -43,6 +43,7 @@ use parquet::format::FileMetaData; pub enum Layout { ArrowIpc(Box<[usize]>), Parquet, + Aggregate, } pub(super) struct BlocksEncoder { @@ -116,6 +117,7 @@ pub(super) fn deserialize_block(columns_layout: &Layout, mut data: Buffer) -> Da DataBlock::new_from_columns(columns) } Layout::Parquet => bare_blocks_from_parquet(Reader(data)).unwrap(), + Layout::Aggregate => unreachable!(), } } diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 95878cc38d224..3193924569ef2 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -35,6 +35,7 @@ use databend_storages_common_cache::TempDir; use databend_storages_common_cache::TempPath; use opendal::Buffer; use opendal::Operator; +use parking_lot::RwLock; use super::serialize::*; use crate::sessions::QueryContext; @@ -46,8 +47,7 @@ pub enum SpillerType { HashJoinProbe, Window, OrderBy, - // Todo: Add more spillers type - // Aggregation + Aggregation, } impl Display for SpillerType { @@ -57,6 +57,7 @@ impl Display for SpillerType { SpillerType::HashJoinProbe => write!(f, "HashJoinProbe"), SpillerType::Window => write!(f, "Window"), SpillerType::OrderBy => write!(f, "OrderBy"), + SpillerType::Aggregation => write!(f, "Aggregation"), } } } @@ -92,11 +93,12 @@ pub struct Spiller { local_operator: Option, use_parquet: bool, _spiller_type: SpillerType, + + // Stores the spilled files that controlled by current spiller + private_spilled_files: Arc>>, pub join_spilling_partition_bits: usize, /// 1 partition -> N partition files pub 
partition_location: HashMap>, - /// Record columns layout for spilled data, will be used when read data from disk - pub columns_layout: HashMap, /// Record how many bytes have been spilled for each partition. pub partition_spilled_bytes: HashMap, } @@ -132,9 +134,9 @@ impl Spiller { local_operator, use_parquet, _spiller_type: spiller_type, + private_spilled_files: Default::default(), join_spilling_partition_bits: settings.get_join_spilling_partition_bits()?, partition_location: Default::default(), - columns_layout: Default::default(), partition_spilled_bytes: Default::default(), }) } @@ -144,16 +146,18 @@ impl Spiller { } /// Spill some [`DataBlock`] to storage. These blocks will be concat into one. - pub async fn spill(&mut self, data_block: Vec) -> Result { + pub async fn spill(&self, data_block: Vec) -> Result { let (location, layout) = self.spill_unmanage(data_block).await?; // Record columns layout for spilled data. - self.columns_layout.insert(location.clone(), layout); - + self.ctx.add_spill_file(location.clone(), layout.clone()); + self.private_spilled_files + .write() + .insert(location.clone(), layout); Ok(location) } - pub async fn spill_unmanage(&self, data_block: Vec) -> Result<(Location, Layout)> { + async fn spill_unmanage(&self, data_block: Vec) -> Result<(Location, Layout)> { debug_assert!(!data_block.is_empty()); let instant = Instant::now(); @@ -171,12 +175,45 @@ impl Spiller { // Record statistics. record_write_profile(&location, &instant, data_size); - let layout = columns_layout.pop().unwrap(); - Ok((location, layout)) } + pub fn create_unique_location(&self) -> String { + format!("{}/{}", self.location_prefix, GlobalUniqName::unique()) + } + + pub async fn spill_stream_aggregate_buffer( + &self, + location: Option, + write_data: Vec>>, + ) -> Result<(String, usize)> { + let mut write_bytes = 0; + let location = location + .unwrap_or_else(|| format!("{}/{}", self.location_prefix, GlobalUniqName::unique())); + + let mut writer = self + .operator + .writer_with(&location) + .chunk(8 * 1024 * 1024) + .await?; + for write_bucket_data in write_data.into_iter() { + for data in write_bucket_data.into_iter() { + write_bytes += data.len(); + writer.write(data).await?; + } + } + + writer.close().await?; + self.ctx + .add_spill_file(Location::Remote(location.clone()), Layout::Aggregate); + + self.private_spilled_files + .write() + .insert(Location::Remote(location.clone()), Layout::Aggregate); + Ok((location, write_bytes)) + } + #[async_backtrace::framed] /// Spill data block with partition pub async fn spill_with_partition( @@ -234,6 +271,7 @@ impl Spiller { .. } = encoder; + let layout = columns_layout.last().unwrap().clone(); let partitions = partition_ids .into_iter() .zip( @@ -248,10 +286,13 @@ impl Spiller { // Spill data to storage. let instant = Instant::now(); let location = self.write_encodes(write_bytes, buf).await?; - // Record statistics. record_write_profile(&location, &instant, write_bytes); + self.ctx.add_spill_file(location.clone(), layout.clone()); + self.private_spilled_files + .write() + .insert(location.clone(), layout); Ok(MergedPartition { location, partitions, @@ -261,11 +302,11 @@ impl Spiller { /// Read a certain file to a [`DataBlock`]. /// We should guarantee that the file is managed by this spiller. 
pub async fn read_spilled_file(&self, location: &Location) -> Result { - let layout = self.columns_layout.get(location).unwrap(); - self.read_unmanage_spilled_file(location, layout).await + let layout = self.ctx.get_spill_layout(location).unwrap(); + self.read_unmanage_spilled_file(location, &layout).await } - pub async fn read_unmanage_spilled_file( + async fn read_unmanage_spilled_file( &self, location: &Location, columns_layout: &Layout, @@ -279,6 +320,7 @@ impl Spiller { debug_assert_eq!(path.size(), layout.iter().sum::()) } Layout::Parquet => {} + Layout::Aggregate => {} } match self.local_operator { @@ -442,8 +484,9 @@ impl Spiller { BlocksEncoder::new(self.use_parquet, align, 8 * 1024 * 1024) } - pub(crate) fn spilled_files(&self) -> Vec { - self.columns_layout.keys().cloned().collect() + pub(crate) fn private_spilled_files(&self) -> Vec { + let r = self.private_spilled_files.read(); + r.keys().cloned().collect() } } diff --git a/src/query/service/src/table_functions/others/suggested_background_tasks.rs b/src/query/service/src/table_functions/others/suggested_background_tasks.rs index b1654fc23af4b..637b99b5fe591 100644 --- a/src/query/service/src/table_functions/others/suggested_background_tasks.rs +++ b/src/query/service/src/table_functions/others/suggested_background_tasks.rs @@ -242,7 +242,8 @@ impl AsyncSource for SuggestedBackgroundTasksSource { LicenseManagerSwitch::instance() .check_enterprise_enabled(ctx.get_license_key(), Feature::BackgroundService)?; - let suggestions = Self::all_suggestions(Arc::new(ctx.clone())).await?; + let ctx = QueryContext::create_from(ctx); + let suggestions = Self::all_suggestions(ctx).await?; Ok(Some(self.to_block(suggestions)?)) } } diff --git a/src/query/service/tests/it/spillers/spiller.rs b/src/query/service/tests/it/spillers/spiller.rs index 52319f6a667b8..afdfc3ef715f3 100644 --- a/src/query/service/tests/it/spillers/spiller.rs +++ b/src/query/service/tests/it/spillers/spiller.rs @@ -24,7 +24,6 @@ use databend_common_expression::types::NumberScalar; use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_expression::ScalarRef; -use databend_common_pipeline_core::query_spill_prefix; use databend_common_storage::DataOperator; use databend_query::spillers::Location; use databend_query::spillers::Spiller; @@ -37,10 +36,10 @@ async fn test_spill_with_partition() -> Result<()> { let fixture = TestFixture::setup().await?; let ctx = fixture.new_query_ctx().await?; - let tenant = ctx.get_tenant(); + let location_prefix = ctx.query_id_spill_prefix(); let spiller_config = SpillerConfig { spiller_type: SpillerType::HashJoinBuild, - location_prefix: query_spill_prefix(tenant.tenant_name(), &ctx.get_id()), + location_prefix, disk_spill: None, use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), }; diff --git a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs index cccedda5e664e..ae48c35f55a9d 100644 --- a/src/query/sql/src/planner/semantic/type_check.rs +++ b/src/query/sql/src/planner/semantic/type_check.rs @@ -3220,10 +3220,23 @@ impl<'a> TypeChecker<'a> { ])) } ("ifnull" | "nvl", args) => { - // Rewrite ifnull(x, y) to coalesce(x, y) - // Rewrite nvl(x, y) to coalesce(x, y) - // nvl is essentially an alias for ifnull. 
- Some(self.resolve_function(span, "coalesce", vec![], args)) + if args.len() == 2 { + // Rewrite ifnull(x, y) | nvl(x, y) to if(is_null(x), y, x) + Some(self.resolve_function(span, "if", vec![], &[ + &Expr::IsNull { + span, + expr: Box::new(args[0].clone()), + not: false, + }, + args[1], + args[0], + ])) + } else { + // Rewrite ifnull(args) to coalesce(x, y) + // Rewrite nvl(args) to coalesce(args) + // nvl is essentially an alias for ifnull. + Some(self.resolve_function(span, "coalesce", vec![], args)) + } } ("nvl2", &[arg_x, arg_y, arg_z]) => { // Rewrite nvl2(x, y, z) to if(is_not_null(x), y, z) diff --git a/src/query/storages/system/src/temp_files_table.rs b/src/query/storages/system/src/temp_files_table.rs index e714adacc4ad5..972308cef170f 100644 --- a/src/query/storages/system/src/temp_files_table.rs +++ b/src/query/storages/system/src/temp_files_table.rs @@ -43,7 +43,6 @@ use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::ProcessorPtr; -use databend_common_pipeline_core::query_spill_prefix; use databend_common_pipeline_core::Pipeline; use databend_common_pipeline_sources::EmptySource; use databend_common_pipeline_sources::StreamSource; @@ -152,7 +151,7 @@ impl TempFilesTable { push_downs: Option, ) -> Result { let tenant = ctx.get_tenant(); - let location_prefix = format!("{}/", query_spill_prefix(tenant.tenant_name(), "")); + let location_prefix = format!("_query_spill/{}/", tenant.tenant_name()); let limit = push_downs.as_ref().and_then(|x| x.limit); let operator = DataOperator::instance().operator(); diff --git a/tests/sqllogictests/suites/ee/03_ee_vacuum/03_0000_vacuum_temporary_files.test b/tests/sqllogictests/suites/ee/03_ee_vacuum/03_0000_vacuum_temporary_files.test index 5b786560ae53d..3cc188418475b 100644 --- a/tests/sqllogictests/suites/ee/03_ee_vacuum/03_0000_vacuum_temporary_files.test +++ b/tests/sqllogictests/suites/ee/03_ee_vacuum/03_0000_vacuum_temporary_files.test @@ -12,31 +12,35 @@ ## See the License for the specific language governing permissions and ## limitations under the License. 
-onlyif mysql statement ok set max_threads = 8; -onlyif mysql statement ok set aggregate_spilling_bytes_threshold_per_proc = 1024 * 1024 * 1; -onlyif mysql +query TIFS +SELECT COUNT() FROM (SELECT number::string, count() FROM numbers_mt(100000) group by number::string order by 1 desc); +---- +100000 + +query TIFS +SELECT COUNT() > 0 as c FROM system.temp_files +---- +0 + statement ok set max_vacuum_temp_files_after_query = 1; -onlyif mysql query TIFS -SELECT COUNT() FROM (SELECT number::string, count() FROM numbers_mt(100000) group by number::string); +SELECT COUNT() FROM (SELECT number::string, count() FROM numbers_mt(100000) group by number::string order by 1 desc); ---- 100000 -onlyif mysql query TIFS -SELECT COUNT() FROM (SELECT COUNT() as c FROM system.temp_files) WHERE c > 0; +SELECT COUNT() > 0 as c FROM system.temp_files ---- 1 -onlyif mysql query TIFS SELECT sleep(2) from numbers(1); ---- @@ -45,20 +49,16 @@ SELECT sleep(2) from numbers(1); statement ok VACUUM TEMPORARY FILES RETAIN 2 SECONDS; -onlyif mysql query TIFS SELECT COUNT() FROM (SELECT COUNT() as c FROM system.temp_files) WHERE c > 0; ---- 0 -onlyif mysql statement ok unset max_threads; -onlyif mysql statement ok set max_vacuum_temp_files_after_query = 0; -onlyif mysql statement ok set aggregate_spilling_bytes_threshold_per_proc = 0; diff --git a/tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_join/push_down_filter_join_left_outer.test b/tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_join/push_down_filter_join_left_outer.test index e198e6215683b..8d68e6c3e8d92 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_join/push_down_filter_join_left_outer.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_join/push_down_filter_join_left_outer.test @@ -241,7 +241,7 @@ HashJoin │ └── estimated rows: 0.00 └── Filter(Probe) ├── output columns: [m1.id (#0)] - ├── filters: [is_true(if(CAST(is_not_null(base.context (#1)) AS Boolean NULL), CAST(assume_not_null(base.context (#1)) AS String NULL), true, '', NULL) = '')] + ├── filters: [is_true(if(CAST(NOT is_not_null(base.context (#1)) AS Boolean NULL), '', base.context (#1)) = '')] ├── estimated rows: 0.00 └── TableScan ├── table: default.default.m1 @@ -250,7 +250,7 @@ HashJoin ├── read size: 0 ├── partitions total: 0 ├── partitions scanned: 0 - ├── push downs: [filters: [is_true(if(CAST(is_not_null(m1.context (#1)) AS Boolean NULL), CAST(assume_not_null(m1.context (#1)) AS String NULL), true, '', NULL) = '')], limit: NONE] + ├── push downs: [filters: [is_true(if(CAST(NOT is_not_null(m1.context (#1)) AS Boolean NULL), '', m1.context (#1)) = '')], limit: NONE] └── estimated rows: 0.00 statement ok diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/push_down_filter/push_down_filter_join/push_down_filter_join_left_outer.test b/tests/sqllogictests/suites/mode/standalone/explain_native/push_down_filter/push_down_filter_join/push_down_filter_join_left_outer.test index 98246dc50c830..bcfa4e97b75eb 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain_native/push_down_filter/push_down_filter_join/push_down_filter_join_left_outer.test +++ b/tests/sqllogictests/suites/mode/standalone/explain_native/push_down_filter/push_down_filter_join/push_down_filter_join_left_outer.test @@ -222,7 +222,7 @@ HashJoin ├── read size: 0 ├── partitions total: 0 ├── partitions scanned: 0 - ├── push downs: [filters: 
[is_true(if(CAST(is_not_null(m1.context (#1)) AS Boolean NULL), CAST(assume_not_null(m1.context (#1)) AS String NULL), true, '', NULL) = '')], limit: NONE] + ├── push downs: [filters: [is_true(if(CAST(NOT is_not_null(m1.context (#1)) AS Boolean NULL), '', m1.context (#1)) = '')], limit: NONE] └── estimated rows: 0.00 statement ok diff --git a/tests/sqllogictests/suites/query/functions/02_0070_function_nvl.test b/tests/sqllogictests/suites/query/functions/02_0070_function_nvl.test index a001f1d49e51f..e10d5f0a96ece 100644 --- a/tests/sqllogictests/suites/query/functions/02_0070_function_nvl.test +++ b/tests/sqllogictests/suites/query/functions/02_0070_function_nvl.test @@ -1,5 +1,5 @@ query B -select nvl(nullif(rand()>0.5, true), false) is null from (unnest(range(1,10))); +select nvl(nullif(rand()>0, true), false) is null from (unnest(range(1,10))); ---- 0 0 diff --git a/tests/sqllogictests/suites/tpch/spill.test b/tests/sqllogictests/suites/tpch/spill.test index c393f2082b61f..2ae54528132b4 100644 --- a/tests/sqllogictests/suites/tpch/spill.test +++ b/tests/sqllogictests/suites/tpch/spill.test @@ -42,7 +42,7 @@ include ./queries.test query I SELECT count(*), sum(p_size), - sum(l_linenumber) + sum(l_linenumber) > 30000000 -- cross join may have different order FROM ( SELECT * FROM lineitem, @@ -50,7 +50,7 @@ FROM ( LIMIT 10000000 ); ---- -10000000 253667500 31260000 +10000000 253667500 1 # INNER JOIN @@ -69,8 +69,8 @@ WHERE o_orderkey IN ( FROM lineitem GROUP BY l_orderkey HAVING sum(l_quantity) > 300 - ) AND - c_custkey=o_custkey AND + ) AND + c_custkey=o_custkey AND o_orderkey=l_orderkey GROUP BY c_name, c_custkey, @@ -89,25 +89,25 @@ Customer#000011459 11459 551136 1993-05-19 386812.74 308.00 # LEFT OUTER JOIN query I -SELECT - c_count, - count(*) AS custdist -FROM +SELECT + c_count, + count(*) AS custdist +FROM ( - SELECT - c_custkey, - count(o_orderkey) AS c_count - FROM - customer - RIGHT OUTER JOIN orders ON c_custkey = o_custkey - AND o_comment NOT LIKE '%pending%deposits%' - GROUP BY + SELECT + c_custkey, + count(o_orderkey) AS c_count + FROM + customer + RIGHT OUTER JOIN orders ON c_custkey = o_custkey + AND o_comment NOT LIKE '%pending%deposits%' + GROUP BY c_custkey - ) c_orders -GROUP BY - c_count -ORDER BY - custdist DESC, + ) c_orders +GROUP BY + c_count +ORDER BY + custdist DESC, c_count DESC; ---- 10 676 @@ -208,25 +208,25 @@ where lineitem.l_receiptdate > lineitem.l_commitdate and # RIGHT OUTER JOIN query I -SELECT - c_count, - count(*) AS custdist -FROM +SELECT + c_count, + count(*) AS custdist +FROM ( - SELECT - c_custkey, - count(o_orderkey) AS c_count - FROM - customer - LEFT OUTER JOIN orders ON c_custkey = o_custkey - AND o_comment NOT LIKE '%pending%deposits%' - GROUP BY + SELECT + c_custkey, + count(o_orderkey) AS c_count + FROM + customer + LEFT OUTER JOIN orders ON c_custkey = o_custkey + AND o_comment NOT LIKE '%pending%deposits%' + GROUP BY c_custkey - ) c_orders -GROUP BY - c_count -ORDER BY - custdist DESC, + ) c_orders +GROUP BY + c_count +ORDER BY + custdist DESC, c_count DESC; ---- 0 5000 @@ -270,38 +270,38 @@ ORDER BY # RIGHT SEMI JOIN query I -select - sum(numwait) -from +select + sum(numwait) +from ( - select - s_name, + select + s_name, truncate( - count(*), + count(*), 4 - ) as numwait - from - supplier, - lineitem l1, - orders - where - s_suppkey = l1.l_suppkey - and o_orderkey = l1.l_orderkey - and o_orderstatus = 'F' - and l1.l_receiptdate > l1.l_commitdate + ) as numwait + from + supplier, + lineitem l1, + orders + where + s_suppkey = l1.l_suppkey 
+ and o_orderkey = l1.l_orderkey + and o_orderstatus = 'F' + and l1.l_receiptdate > l1.l_commitdate and exists ( - select - * - from - lineitem l2 - where - l2.l_orderkey = l1.l_orderkey + select + * + from + lineitem l2 + where + l2.l_orderkey = l1.l_orderkey and l2.l_suppkey <> l1.l_suppkey - ) - group by - s_name - order by - numwait desc, + ) + group by + s_name + order by + numwait desc, s_name ); ---- @@ -310,39 +310,39 @@ from # RIGHT ANTI JOIN query I -select - sum(numwait) -from +select + sum(numwait) +from ( - select - s_name, + select + s_name, truncate( - count(*), + count(*), 4 - ) as numwait - from - supplier, - lineitem l1, - orders - where - s_suppkey = l1.l_suppkey - and o_orderkey = l1.l_orderkey - and o_orderstatus = 'F' - and l1.l_receiptdate > l1.l_commitdate + ) as numwait + from + supplier, + lineitem l1, + orders + where + s_suppkey = l1.l_suppkey + and o_orderkey = l1.l_orderkey + and o_orderstatus = 'F' + and l1.l_receiptdate > l1.l_commitdate and not exists ( - select - * - from - lineitem l3 - where - l3.l_orderkey = l1.l_orderkey - and l3.l_suppkey <> l1.l_suppkey + select + * + from + lineitem l3 + where + l3.l_orderkey = l1.l_orderkey + and l3.l_suppkey <> l1.l_suppkey and l3.l_receiptdate > l3.l_commitdate - ) - group by - s_name - order by - numwait desc, + ) + group by + s_name + order by + numwait desc, s_name ); ---- @@ -376,7 +376,7 @@ FROM ( SELECT c_custkey, count(o_orderkey) AS c_count FROM customer - FULL OUTER JOIN orders ON c_custkey=o_custkey AND + FULL OUTER JOIN orders ON c_custkey=o_custkey AND o_comment NOT LIKE '%pending%deposits%' GROUP BY c_custkey ) c_orders diff --git a/tests/suites/0_stateless/20+_others/20_0014_sort_spill.result b/tests/suites/0_stateless/20+_others/20_0014_sort_spill.result index a0967d9ce6914..91e71a1f07d42 100644 --- a/tests/suites/0_stateless/20+_others/20_0014_sort_spill.result +++ b/tests/suites/0_stateless/20+_others/20_0014_sort_spill.result @@ -1,19 +1,19 @@ ==TEST GLOBAL SORT== 3 1 -0 -1 +false +true NULL 1 ==Test if the spill is activated== -2 +true ==Enable sort_spilling_bytes_threshold_per_proc== 1 -0 -1 +false +true NULL -1 -0 +true +false NULL 3 7 2 8 @@ -40,7 +40,7 @@ NULL 6 2 5 1 ==Test a== -16 +true ==Test b== 1 2 NULL @@ -66,10 +66,10 @@ NULL NULL ==TEST TOP-N SORT== 1 ==Test c== -0 +false ==Test d== 1 -1 +true ==Test e== 1 2 NULL @@ -83,4 +83,4 @@ NULL NULL 2 5 ==Test f== 1 -9 +true diff --git a/tests/suites/0_stateless/20+_others/20_0014_sort_spill.sql b/tests/suites/0_stateless/20+_others/20_0014_sort_spill.sh old mode 100644 new mode 100755 similarity index 84% rename from tests/suites/0_stateless/20+_others/20_0014_sort_spill.sql rename to tests/suites/0_stateless/20+_others/20_0014_sort_spill.sh index fb07f5d5f7ad3..ac73bc14e261b --- a/tests/suites/0_stateless/20+_others/20_0014_sort_spill.sql +++ b/tests/suites/0_stateless/20+_others/20_0014_sort_spill.sh @@ -1,3 +1,8 @@ +#!/usr/bin/env bash +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../../../shell_env.sh + +$BENDSQL_CLIENT_CONNECT --query=""" SELECT '==TEST GLOBAL SORT=='; SET max_vacuum_temp_files_after_query= 0; set sort_spilling_bytes_threshold_per_proc = 8; @@ -19,7 +24,7 @@ SELECT '==Test if the spill is activated=='; -- Test if the spill is activated. 
set sort_spilling_bytes_threshold_per_proc = 0; -SELECT any_if(count, number = 2) - any_if(count, number = 1) FROM temp_files_count; +SELECT (any_if(count, number = 2) - any_if(count, number = 1)) > 0 FROM temp_files_count; set sort_spilling_bytes_threshold_per_proc = 8; SELECT '==Enable sort_spilling_bytes_threshold_per_proc=='; @@ -50,7 +55,7 @@ INSERT INTO temp_files_count SELECT COUNT() as count, 4 as number FROM system.te SELECT '==Test a=='; set sort_spilling_bytes_threshold_per_proc = 0; -SELECT any_if(count, number = 4) - any_if(count, number = 3) FROM temp_files_count; +SELECT (any_if(count, number = 4) - any_if(count, number = 3)) > 0 FROM temp_files_count; set sort_spilling_bytes_threshold_per_proc = 8; SELECT '==Test b=='; @@ -80,7 +85,7 @@ SELECT '==Test d=='; INSERT INTO temp_files_count SELECT COUNT() as count, 6 as number FROM system.temp_files; set sort_spilling_bytes_threshold_per_proc = 0; -SELECT any_if(count, number = 6) - any_if(count, number = 5) FROM temp_files_count; +SELECT (any_if(count, number = 6) - any_if(count, number = 5)) > 0 FROM temp_files_count; set sort_spilling_bytes_threshold_per_proc = 60; SELECT '==Test e=='; @@ -98,4 +103,7 @@ INSERT INTO temp_files_count SELECT COUNT() as count, 8 as number FROM system.te unset max_vacuum_temp_files_after_query; unset enable_experimental_stream_sort_spilling; set sort_spilling_bytes_threshold_per_proc = 0; -SELECT any_if(count, number = 8) - any_if(count, number = 7) FROM temp_files_count; \ No newline at end of file +SELECT (any_if(count, number = 8) - any_if(count, number = 7)) > 0 FROM temp_files_count; + +SET max_vacuum_temp_files_after_query= 300000; +"""
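
The spill bookkeeping above replaces the per-spiller columns_layout map with a query-wide registry: every Spiller::spill call records the file's Location and Layout through QueryContext::add_spill_file, read_spilled_file looks the layout back up with get_spill_layout, update_init_query_id clears the map so a reused session starts clean, and the spiller keeps its own private_spilled_files map only for the files it created itself. Below is a minimal, self-contained sketch of that registry pattern. It uses std::sync::RwLock and simplified stand-ins for the crate's Location and Layout types (the patch itself uses parking_lot::RwLock and the real spiller types), so it illustrates the shape of the API rather than the actual implementation.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Location {
    Remote(String),
    Local(String),
}

#[derive(Clone, Debug)]
enum Layout {
    Parquet,
    Aggregate,
}

#[derive(Default, Clone)]
struct SpillRegistry {
    // Shared by all spillers of one query; cleared when the query id is reset.
    files: Arc<RwLock<HashMap<Location, Layout>>>,
}

impl SpillRegistry {
    fn add_spill_file(&self, location: Location, layout: Layout) {
        self.files.write().unwrap().insert(location, layout);
    }

    fn get_spill_layout(&self, location: &Location) -> Option<Layout> {
        self.files.read().unwrap().get(location).cloned()
    }

    fn spilled_files(&self) -> Vec<Location> {
        self.files.read().unwrap().keys().cloned().collect()
    }

    fn clear(&self) {
        self.files.write().unwrap().clear();
    }
}

fn main() {
    let registry = SpillRegistry::default();
    let loc = Location::Remote("_query_spill/tenant1/q1_0/part-0".to_string());
    registry.add_spill_file(loc.clone(), Layout::Parquet);
    assert_eq!(registry.spilled_files().len(), 1);
    assert!(registry.get_spill_layout(&loc).is_some());
    registry.clear();
    assert!(registry.spilled_files().is_empty());
}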
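
The spill paths follow a fixed layout: _query_spill/<tenant> is the tenant prefix (which is also what system.temp_files now lists), _query_spill/<tenant>/<query_id>_<node_index> is the per-query prefix built from the cluster node's ordered index, and unload_spill_meta writes a <query_id>_<node_index>.list file under the tenant prefix containing every remote spill path of the query, plus the meta file itself and the per-query directory, one path per line. The sketch below rebuilds those strings and adds a hypothetical parse_spill_meta helper to show how a later vacuum pass could read the list back line by line; the helper and the simple string plumbing are assumptions, only the path format mirrors the patch.

fn query_tenant_spill_prefix(tenant: &str) -> String {
    format!("_query_spill/{}", tenant)
}

fn query_id_spill_prefix(tenant: &str, query_id: &str, node_index: usize) -> String {
    format!("_query_spill/{}/{}_{}", tenant, query_id, node_index)
}

// Builds (meta_path, meta_contents) the way unload_spill_meta does: the remote
// spill files, then the meta file itself, then the per-query directory.
fn build_spill_meta(
    tenant: &str,
    query_id: &str,
    node_index: usize,
    mut remote_spill_files: Vec<String>,
) -> (String, String) {
    const SPILL_META_SUFFIX: &str = ".list";
    let prefix = query_tenant_spill_prefix(tenant);
    let meta_path = format!("{}/{}_{}{}", prefix, query_id, node_index, SPILL_META_SUFFIX);
    remote_spill_files.push(meta_path.clone());
    remote_spill_files.push(format!("{}/{}_{}/", prefix, query_id, node_index));
    (meta_path, remote_spill_files.join("\n"))
}

// Hypothetical reader side: one spill path per line.
fn parse_spill_meta(content: &str) -> Vec<String> {
    content.lines().map(str::to_string).collect()
}

fn main() {
    let spilled = vec![format!("{}/part-0", query_id_spill_prefix("tenant1", "q1", 0))];
    let (meta_path, content) = build_spill_meta("tenant1", "q1", 0, spilled);
    assert_eq!(meta_path, "_query_spill/tenant1/q1_0.list");
    assert_eq!(parse_spill_meta(&content), vec![
        "_query_spill/tenant1/q1_0/part-0".to_string(),
        "_query_spill/tenant1/q1_0.list".to_string(),
        "_query_spill/tenant1/q1_0/".to_string(),
    ]);
}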
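
spill_stream_aggregate_buffer streams nested per-bucket buffers (Vec<Vec<Vec<u8>>>) into a single object, returns the location together with the number of bytes written, and registers the file under Layout::Aggregate (which is why deserialize_block treats that variant as unreachable: aggregate payloads are read back by the aggregate-specific path, not the generic block reader). The sketch below keeps only the byte accounting and swaps the opendal writer with its 8 MiB chunk hint for a plain std::io::Write sink, so it illustrates the loop, not the storage layer.

use std::io::Write;

fn write_aggregate_buffers<W: Write>(
    mut writer: W,
    write_data: Vec<Vec<Vec<u8>>>,
) -> std::io::Result<usize> {
    let mut write_bytes = 0;
    for bucket in write_data {
        for data in bucket {
            // Count every innermost buffer before handing it to the writer.
            write_bytes += data.len();
            writer.write_all(&data)?;
        }
    }
    writer.flush()?;
    Ok(write_bytes)
}

fn main() -> std::io::Result<()> {
    let mut sink = Vec::new();
    let buffers = vec![vec![vec![1u8, 2, 3]], vec![vec![4u8, 5]]];
    let written = write_aggregate_buffers(&mut sink, buffers)?;
    assert_eq!(written, 5);
    assert_eq!(sink, vec![1, 2, 3, 4, 5]);
    Ok(())
}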
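
The type_check.rs change narrows the two-argument ifnull/nvl rewrite from coalesce(x, y) to if(is_null(x), y, x), while any other arity still falls back to coalesce. That is what produces the simpler pushed-down filters in the updated push_down_filter_join_left_outer explain tests, where the is_null side shows up as NOT is_not_null. The toy version below applies the same rule to a simplified expression tree; the real rewrite builds the equivalent shape with the planner's own AST and resolve_function calls.

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Func { name: String, args: Vec<Expr> },
}

fn func(name: &str, args: Vec<Expr>) -> Expr {
    Expr::Func { name: name.to_string(), args }
}

fn rewrite_ifnull(args: Vec<Expr>) -> Expr {
    if args.len() == 2 {
        // ifnull(x, y) | nvl(x, y)  =>  if(is_null(x), y, x)
        let x = args[0].clone();
        let y = args[1].clone();
        func("if", vec![func("is_null", vec![x.clone()]), y, x])
    } else {
        // Any other arity keeps the old behaviour: coalesce(args)
        func("coalesce", args)
    }
}

fn main() {
    let x = Expr::Column("x".to_string());
    let y = Expr::Column("y".to_string());
    let rewritten = rewrite_ifnull(vec![x.clone(), y.clone()]);
    assert_eq!(
        rewritten,
        func("if", vec![func("is_null", vec![x.clone()]), y, x])
    );
}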