From 55479a6f257bcea17ce4220a270d3319d24db3a8 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 3 Nov 2022 10:37:41 +0800 Subject: [PATCH 1/3] feat: Add cache operator in global services Signed-off-by: Xuanwo --- src/common/storage/src/operator.rs | 28 +++++++++++++++++++----- src/query/service/src/global_services.rs | 22 +++++++++++++++++++ 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/src/common/storage/src/operator.rs b/src/common/storage/src/operator.rs index eadfe6025d50d..572f0baf18a10 100644 --- a/src/common/storage/src/operator.rs +++ b/src/common/storage/src/operator.rs @@ -48,6 +48,7 @@ use crate::config::StorageHttpConfig; use crate::config::StorageMokaConfig; use crate::config::StorageObsConfig; use crate::runtime_layer::RuntimeLayer; +use crate::CacheConfig; use crate::StorageConfig; use crate::StorageOssConfig; @@ -398,7 +399,7 @@ static CACHE_OPERATOR: OnceCell> = OnceCell::new(); impl CacheOperator { pub async fn init( - conf: &StorageConfig, + conf: &CacheConfig, v: Singleton, ) -> common_exception::Result<()> { v.init(Self::try_create(conf).await?)?; @@ -407,15 +408,32 @@ impl CacheOperator { Ok(()) } - pub async fn try_create(conf: &StorageConfig) -> common_exception::Result { - let op = init_operator(&conf.params)?; + pub async fn try_create(conf: &CacheConfig) -> common_exception::Result { + let operator = init_operator(&conf.params)?; + + // OpenDAL will send a real request to underlying storage to check whether it works or not. + // If this check failed, it's highly possible that the users have configured it wrongly. + // + // Make sure the check is called inside GlobalIORuntime to prevent + // IO hang on reuse connection. + let op = operator.clone(); + if let Err(cause) = GlobalIORuntime::instance() + .spawn(async move { op.object("health_check").create().await }) + .await + .expect("join must succeed") + { + return Err(ErrorCode::StorageUnavailable(format!( + "current configured cache is not available: config: {:?}, cause: {cause}", + conf + ))); + } - Ok(CacheOperator { op }) + Ok(CacheOperator { op: operator }) } pub fn instance() -> CacheOperator { match CACHE_OPERATOR.get() { - None => panic!("StorageOperator is not init"), + None => panic!("CacheOperator is not init"), Some(op) => op.get(), } } diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs index 9d12969d43ed8..10cb89e9c29d4 100644 --- a/src/query/service/src/global_services.rs +++ b/src/query/service/src/global_services.rs @@ -24,6 +24,7 @@ use common_catalog::catalog::CatalogManager; use common_config::Config; use common_exception::Result; use common_fuse_meta::caches::CacheManager; +use common_storage::CacheOperator; use common_storage::DataOperator; use common_storage::ShareTableConfig; use common_tracing::QueryLogger; @@ -41,6 +42,7 @@ pub struct GlobalServices { query_logger: UnsafeCell>>, cluster_discovery: UnsafeCell>>, storage_operator: UnsafeCell>, + cache_operator: UnsafeCell>, cache_manager: UnsafeCell>>, catalog_manager: UnsafeCell>>, http_query_manager: UnsafeCell>>, @@ -61,6 +63,7 @@ impl GlobalServices { query_logger: UnsafeCell::new(None), cluster_discovery: UnsafeCell::new(None), storage_operator: UnsafeCell::new(None), + cache_operator: UnsafeCell::new(None), cache_manager: UnsafeCell::new(None), catalog_manager: UnsafeCell::new(None), http_query_manager: UnsafeCell::new(None), @@ -82,6 +85,7 @@ impl GlobalServices { ClusterDiscovery::init(config.clone(), global_services.clone()).await?; DataOperator::init(&config.storage, global_services.clone()).await?; + CacheOperator::init(&config.cache, global_services.clone()).await?; ShareTableConfig::init( &config.query.share_endpoint_address, @@ -177,6 +181,24 @@ impl SingletonImpl for GlobalServices { } } +impl SingletonImpl for GlobalServices { + fn get(&self) -> CacheOperator { + unsafe { + match &*self.cache_operator.get() { + None => panic!("CacheOperator is not init"), + Some(op) => op.clone(), + } + } + } + + fn init(&self, value: CacheOperator) -> Result<()> { + unsafe { + *(self.cache_operator.get() as *mut Option) = Some(value); + Ok(()) + } + } +} + impl SingletonImpl> for GlobalServices { fn get(&self) -> Arc { unsafe { From 10d7d15bdc4c4aec36fb39dd99a7aa4db162e51a Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 3 Nov 2022 10:46:49 +0800 Subject: [PATCH 2/3] Improve display Signed-off-by: Xuanwo --- src/binaries/query/main.rs | 8 +++++--- src/common/storage/src/config.rs | 24 ++++++++++++------------ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/binaries/query/main.rs b/src/binaries/query/main.rs index 823823cdb54d8..7b54189ecd553 100644 --- a/src/binaries/query/main.rs +++ b/src/binaries/query/main.rs @@ -173,9 +173,10 @@ async fn main(_global_tracker: Arc) -> common_exception::Result< println!("Databend Query"); println!(); println!("Version: {}", *DATABEND_COMMIT_VERSION); - println!("Log:"); - println!(" File: {}", conf.log.file); - println!(" Stderr: {}", conf.log.stderr); + println!(); + println!("Logging:"); + println!(" file: {}", conf.log.file); + println!(" stderr: {}", conf.log.stderr); println!( "Meta: {}", if conf.meta.address.is_empty() && conf.meta.endpoints.is_empty() { @@ -187,6 +188,7 @@ async fn main(_global_tracker: Arc) -> common_exception::Result< } ); println!("Storage: {}", conf.storage.params); + println!("Cache: {}", conf.cache.params); println!(); println!("Admin"); println!(" listened at {}", conf.query.admin_api_address); diff --git a/src/common/storage/src/config.rs b/src/common/storage/src/config.rs index 72b7b8e5ea6c6..43402e6031b81 100644 --- a/src/common/storage/src/config.rs +++ b/src/common/storage/src/config.rs @@ -85,44 +85,44 @@ impl Display for StorageParams { match self { StorageParams::Azblob(v) => write!( f, - "azblob://container={},root={},endpoint={}", + "azblob | container={},root={},endpoint={}", v.container, v.root, v.endpoint_url ), - StorageParams::Fs(v) => write!(f, "fs://root={}", v.root), + StorageParams::Fs(v) => write!(f, "fs | root={}", v.root), StorageParams::Ftp(v) => { - write!(f, "ftp://root={},endpoint={}", v.root, v.endpoint) + write!(f, "ftp | root={},endpoint={}", v.root, v.endpoint) } StorageParams::Gcs(v) => write!( f, - "gcs://bucket={},root={},endpoint={}", + "gcs | bucket={},root={},endpoint={}", v.bucket, v.root, v.endpoint_url ), #[cfg(feature = "storage-hdfs")] StorageParams::Hdfs(v) => { - write!(f, "hdfs://root={},name_node={}", v.root, v.name_node) + write!(f, "hdfs | root={},name_node={}", v.root, v.name_node) } StorageParams::Http(v) => { - write!(f, "http://endpoint={},paths={:?}", v.endpoint_url, v.paths) + write!(f, "http | endpoint={},paths={:?}", v.endpoint_url, v.paths) } StorageParams::Ipfs(c) => { - write!(f, "ipfs://endpoint={},root={}", c.endpoint_url, c.root) + write!(f, "ipfs | endpoint={},root={}", c.endpoint_url, c.root) } - StorageParams::Memory => write!(f, "memory://"), - StorageParams::Moka(_) => write!(f, "moka://"), + StorageParams::Memory => write!(f, "memory"), + StorageParams::Moka(_) => write!(f, "moka"), StorageParams::Obs(v) => write!( f, - "obs://bucket={},root={},endpoint={}", + "obs | bucket={},root={},endpoint={}", v.bucket, v.root, v.endpoint_url ), StorageParams::Oss(v) => write!( f, - "oss://bucket={},root={},endpoint={}", + "oss | bucket={},root={},endpoint={}", v.bucket, v.root, v.endpoint_url ), StorageParams::S3(v) => { write!( f, - "s3://bucket={},root={},endpoint={}", + "s3 | bucket={},root={},endpoint={}", v.bucket, v.root, v.endpoint_url ) } From f08165949dd2be660ba55be99f5c64e244e2f929 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 3 Nov 2022 12:51:55 +0800 Subject: [PATCH 3/3] Fix test global services Signed-off-by: Xuanwo --- src/query/service/tests/it/tests/sessions.rs | 50 ++++++++++++++------ 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/src/query/service/tests/it/tests/sessions.rs b/src/query/service/tests/it/tests/sessions.rs index e2c9aece8c7de..5ebd5b44a10ea 100644 --- a/src/query/service/tests/it/tests/sessions.rs +++ b/src/query/service/tests/it/tests/sessions.rs @@ -24,6 +24,7 @@ use common_catalog::catalog::CatalogManager; use common_config::Config; use common_exception::Result; use common_fuse_meta::caches::CacheManager; +use common_storage::CacheOperator; use common_storage::DataOperator; use common_tracing::set_panic_hook; use common_tracing::QueryLogger; @@ -62,11 +63,13 @@ impl Drop for TestGuard { /// Hard code, in order to make each test share the global service instance, we made some hack code /// - We use thread names as key to store global service instances, because rust test passes the test name through the thread name /// - In the debug version, we enable the transfer of thread names by environment variables. +#[derive(Default)] pub struct TestGlobalServices { global_runtime: Mutex>>, query_logger: Mutex>>, cluster_discovery: Mutex>>, storage_operator: Mutex>, + cache_operator: Mutex>, cache_manager: Mutex>>, catalog_manager: Mutex>>, http_query_manager: Mutex>>, @@ -86,21 +89,7 @@ impl TestGlobalServices { pub async fn setup(config: Config) -> Result { set_panic_hook(); std::env::set_var("UNIT_TEST", "TRUE"); - let global_services = GLOBAL.get_or_init(|| { - Arc::new(TestGlobalServices { - global_runtime: Mutex::new(HashMap::new()), - query_logger: Mutex::new(HashMap::new()), - cluster_discovery: Mutex::new(HashMap::new()), - storage_operator: Mutex::new(HashMap::new()), - cache_manager: Mutex::new(HashMap::new()), - catalog_manager: Mutex::new(HashMap::new()), - http_query_manager: Mutex::new(HashMap::new()), - data_exchange_manager: Mutex::new(HashMap::new()), - session_manager: Mutex::new(HashMap::new()), - users_manager: Mutex::new(HashMap::new()), - users_role_manager: Mutex::new(HashMap::new()), - }) - }); + let global_services = GLOBAL.get_or_init(|| Arc::new(TestGlobalServices::default())); // The order of initialization is very important let app_name_shuffle = format!("{}-{}", config.query.tenant_id, config.query.cluster_id); @@ -112,6 +101,7 @@ impl TestGlobalServices { ClusterDiscovery::init(config.clone(), global_services.clone()).await?; DataOperator::init(&config.storage, global_services.clone()).await?; + CacheOperator::init(&config.cache, global_services.clone()).await?; CacheManager::init(&config.query, global_services.clone())?; CatalogManager::init(&config, global_services.clone()).await?; HttpQueryManager::init(&config, global_services.clone()).await?; @@ -163,6 +153,12 @@ impl TestGlobalServices { drop(storage_operator_guard); drop(storage_operator); } + { + let mut cache_operator_guard = self.cache_operator.lock(); + let cache_operator = cache_operator_guard.remove(key); + drop(cache_operator_guard); + drop(cache_operator); + } { let mut cache_manager_guard = self.cache_manager.lock(); let cache_manager = cache_manager_guard.remove(key); @@ -304,6 +300,30 @@ impl SingletonImpl for TestGlobalServices { } } +impl SingletonImpl for TestGlobalServices { + fn get(&self) -> CacheOperator { + match std::thread::current().name() { + None => panic!("CacheOperator is not init"), + Some(name) => match self.cache_operator.lock().get(name) { + None => panic!("CacheOperator is not init, while in test '{}'", name), + Some(cache_operator) => cache_operator.clone(), + }, + } + } + + fn init(&self, value: CacheOperator) -> Result<()> { + match std::thread::current().name() { + None => panic!("thread name is none"), + Some(name) => match self.cache_operator.lock().entry(name.to_string()) { + Entry::Vacant(v) => v.insert(value), + Entry::Occupied(_v) => panic!("CacheOperator set twice in test[{:?}]", name), + }, + }; + + Ok(()) + } +} + impl SingletonImpl> for TestGlobalServices { fn get(&self) -> Arc { match std::thread::current().name() {