Skip to content

Commit

Permalink
fix(query): fix a number of bugs in the spill config. (#17103)
Browse files Browse the repository at this point in the history
* refactor(config): change spill_local_disk_path from OsString to String and update related logic

- Updated SpillConfig struct to use String instead of OsString for spill_local_disk_path.
- Adjusted conversion logic in cache_config_converters to handle the new String type.
- Added a test for spill configuration to ensure correct loading and path assignment.
- Minor improvements in error handling and temporary directory management.

* fix
  • Loading branch information
forsaken628 authored Dec 24, 2024
1 parent b72f3e1 commit ee3e395
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 18 deletions.
25 changes: 14 additions & 11 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::env;
use std::ffi::OsString;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
Expand Down Expand Up @@ -2958,7 +2957,7 @@ pub struct DiskCacheConfig {
pub struct SpillConfig {
/// Path of spill to local disk. disable if it's empty.
#[clap(long, value_name = "VALUE", default_value = "")]
pub spill_local_disk_path: OsString,
pub spill_local_disk_path: String,

#[clap(long, value_name = "VALUE", default_value = "30")]
/// Percentage of reserve disk space that won't be used for spill to local disk.
Expand Down Expand Up @@ -3039,7 +3038,11 @@ mod cache_config_converters {
{
spill.spill_local_disk_path = PathBuf::from(&cache.disk_cache_config.path)
.join("temp/_query_spill")
.into();
.into_os_string()
.into_string()
.map_err(|s| {
ErrorCode::Internal(format!("failed to convert os string to string: {s:?}"))
})?
};

Ok(InnerConfig {
Expand Down Expand Up @@ -3120,20 +3123,20 @@ mod cache_config_converters {
fn try_from(value: SpillConfig) -> std::result::Result<Self, Self::Error> {
let SpillConfig {
spill_local_disk_path,
spill_local_disk_reserved_space_percentage: spill_local_disk_max_space_percentage,
spill_local_disk_reserved_space_percentage: reserved,
spill_local_disk_max_bytes,
} = value;
if !spill_local_disk_max_space_percentage.is_normal()
|| spill_local_disk_max_space_percentage.is_sign_negative()
|| spill_local_disk_max_space_percentage > OrderedFloat(100.0)
if !reserved.is_normal()
|| reserved.is_sign_negative()
|| reserved > OrderedFloat(100.0)
{
return Err(ErrorCode::InvalidArgument(
"invalid spill_local_disk_max_space_percentage",
));
Err(ErrorCode::InvalidArgument(format!(
"invalid spill_local_disk_reserved_space_percentage: {reserved}"
)))?;
}
Ok(Self {
path: spill_local_disk_path,
reserved_disk_ratio: spill_local_disk_max_space_percentage / 100.0,
reserved_disk_ratio: reserved / 100.0,
global_bytes_limit: spill_local_disk_max_bytes,
})
}
Expand Down
5 changes: 2 additions & 3 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::collections::HashMap;
use std::ffi::OsString;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Display;
Expand Down Expand Up @@ -719,7 +718,7 @@ impl Default for CacheConfig {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SpillConfig {
/// Path of spill to local disk. disable if it's empty.
pub path: OsString,
pub path: String,

/// Ratio of the reserve of the disk space.
pub reserved_disk_ratio: OrderedFloat<f64>,
Expand All @@ -731,7 +730,7 @@ pub struct SpillConfig {
impl Default for SpillConfig {
fn default() -> Self {
Self {
path: OsString::from(""),
path: "".to_string(),
reserved_disk_ratio: OrderedFloat(0.3),
global_bytes_limit: u64::MAX,
}
Expand Down
32 changes: 32 additions & 0 deletions src/query/service/tests/it/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,38 @@ protocol = "binary"
Ok(())
}

#[test]
fn test_spill_config() -> Result<()> {
let file_path = temp_dir().join("databend_test_spill_config.toml");

let mut f = fs::File::create(&file_path)?;
f.write_all(
r#"
[spill]
spill_local_disk_path = "/data/spill"
spill_local_disk_reserved_space_percentage = 0.5
"#
.as_bytes(),
)?;

// Make sure all data flushed.
f.flush()?;

temp_env::with_vars(
vec![("CONFIG_FILE", Some(file_path.to_string_lossy().as_ref()))],
|| {
let cfg = InnerConfig::load_for_test().expect("config load failed");

assert_eq!(cfg.spill.path, "/data/spill");
},
);

// remove temp file
fs::remove_file(file_path)?;

Ok(())
}

/// Test new hive catalog
#[test]
fn test_override_config_new_hive_catalog() -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'query' | 'udf_server_allow_list' | '' | '' |
| 'query' | 'udfs' | '{"name":"test_builtin_ping","definition":"CREATE OR REPLACE FUNCTION test_builtin_ping (STRING)\n RETURNS STRING\n LANGUAGE python\nHANDLER = 'ping'\nADDRESS = 'https://databend.com';"}' | '' |
| 'query' | 'users' | '{"name":"root","auth_type":"no_password","auth_string":null}' | '' |
| 'spill' | 'spill_local_disk_max_bytes' | '18446744073709551615' | '' |
| 'spill' | 'spill_local_disk_path' | '' | '' |
| 'spill' | 'spill_local_disk_reserved_space_percentage' | '30.0' | '' |
| 'storage' | 'allow_insecure' | 'true' | '' |
| 'storage' | 'azblob.account_key' | '' | '' |
| 'storage' | 'azblob.account_name' | '' | '' |
Expand Down
7 changes: 3 additions & 4 deletions src/query/storages/common/cache/src/temp_dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl TempDirManager {
if let Err(e) = remove_dir_all(&path) {
if !matches!(e.kind(), ErrorKind::NotFound) {
return Err(ErrorCode::StorageUnavailable(format!(
"can't clean temp dir: {e}",
"can't clean temp dir {path:?}: {e}",
)));
}
}
Expand Down Expand Up @@ -373,7 +373,6 @@ impl Drop for InnerPath {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::ffi::OsString;
use std::fs;
use std::sync::atomic::Ordering;

Expand All @@ -385,7 +384,7 @@ mod tests {
GlobalInstance::init_testing(thread.name().unwrap());

let config = SpillConfig {
path: OsString::from("test_data"),
path: "test_data".to_string(),
reserved_disk_ratio: 0.01.into(),
global_bytes_limit: 1 << 30,
};
Expand Down Expand Up @@ -425,7 +424,7 @@ mod tests {
GlobalInstance::init_testing(thread.name().unwrap());

let config = SpillConfig {
path: OsString::from("test_data2"),
path: "test_data2".to_string(),
reserved_disk_ratio: 0.99.into(),
global_bytes_limit: 1 << 30,
};
Expand Down
11 changes: 11 additions & 0 deletions src/query/storages/system/src/configs_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ impl SyncSystemTable for ConfigsTable {
cache_config_value,
);

let spill_config = config.spill;
let spill_config_value = serde_json::to_value(spill_config)?;
ConfigsTable::extract_config(
&mut names,
&mut values,
&mut groups,
&mut descs,
"spill".to_string(),
spill_config_value,
);

let storage_config = config.storage;
let storage_config_value = serde_json::to_value(storage_config)?;
ConfigsTable::extract_config(
Expand Down

0 comments on commit ee3e395

Please sign in to comment.