Skip to content

Commit

Permalink
Merge pull request #8616 from Xuanwo/cache-operator
Browse files Browse the repository at this point in the history
feat: Add cache operator in global services
  • Loading branch information
Xuanwo authored Nov 3, 2022
2 parents cb2359f + f081659 commit 23b2390
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 35 deletions.
8 changes: 5 additions & 3 deletions src/binaries/query/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,10 @@ async fn main(_global_tracker: Arc<RuntimeTracker>) -> common_exception::Result<
println!("Databend Query");
println!();
println!("Version: {}", *DATABEND_COMMIT_VERSION);
println!("Log:");
println!(" File: {}", conf.log.file);
println!(" Stderr: {}", conf.log.stderr);
println!();
println!("Logging:");
println!(" file: {}", conf.log.file);
println!(" stderr: {}", conf.log.stderr);
println!(
"Meta: {}",
if conf.meta.address.is_empty() && conf.meta.endpoints.is_empty() {
Expand All @@ -187,6 +188,7 @@ async fn main(_global_tracker: Arc<RuntimeTracker>) -> common_exception::Result<
}
);
println!("Storage: {}", conf.storage.params);
println!("Cache: {}", conf.cache.params);
println!();
println!("Admin");
println!(" listened at {}", conf.query.admin_api_address);
Expand Down
24 changes: 12 additions & 12 deletions src/common/storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,44 +85,44 @@ impl Display for StorageParams {
match self {
StorageParams::Azblob(v) => write!(
f,
"azblob://container={},root={},endpoint={}",
"azblob | container={},root={},endpoint={}",
v.container, v.root, v.endpoint_url
),
StorageParams::Fs(v) => write!(f, "fs://root={}", v.root),
StorageParams::Fs(v) => write!(f, "fs | root={}", v.root),
StorageParams::Ftp(v) => {
write!(f, "ftp://root={},endpoint={}", v.root, v.endpoint)
write!(f, "ftp | root={},endpoint={}", v.root, v.endpoint)
}
StorageParams::Gcs(v) => write!(
f,
"gcs://bucket={},root={},endpoint={}",
"gcs | bucket={},root={},endpoint={}",
v.bucket, v.root, v.endpoint_url
),
#[cfg(feature = "storage-hdfs")]
StorageParams::Hdfs(v) => {
write!(f, "hdfs://root={},name_node={}", v.root, v.name_node)
write!(f, "hdfs | root={},name_node={}", v.root, v.name_node)
}
StorageParams::Http(v) => {
write!(f, "http://endpoint={},paths={:?}", v.endpoint_url, v.paths)
write!(f, "http | endpoint={},paths={:?}", v.endpoint_url, v.paths)
}
StorageParams::Ipfs(c) => {
write!(f, "ipfs://endpoint={},root={}", c.endpoint_url, c.root)
write!(f, "ipfs | endpoint={},root={}", c.endpoint_url, c.root)
}
StorageParams::Memory => write!(f, "memory://"),
StorageParams::Moka(_) => write!(f, "moka://"),
StorageParams::Memory => write!(f, "memory"),
StorageParams::Moka(_) => write!(f, "moka"),
StorageParams::Obs(v) => write!(
f,
"obs://bucket={},root={},endpoint={}",
"obs | bucket={},root={},endpoint={}",
v.bucket, v.root, v.endpoint_url
),
StorageParams::Oss(v) => write!(
f,
"oss://bucket={},root={},endpoint={}",
"oss | bucket={},root={},endpoint={}",
v.bucket, v.root, v.endpoint_url
),
StorageParams::S3(v) => {
write!(
f,
"s3://bucket={},root={},endpoint={}",
"s3 | bucket={},root={},endpoint={}",
v.bucket, v.root, v.endpoint_url
)
}
Expand Down
28 changes: 23 additions & 5 deletions src/common/storage/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::config::StorageHttpConfig;
use crate::config::StorageMokaConfig;
use crate::config::StorageObsConfig;
use crate::runtime_layer::RuntimeLayer;
use crate::CacheConfig;
use crate::StorageConfig;
use crate::StorageOssConfig;

Expand Down Expand Up @@ -398,7 +399,7 @@ static CACHE_OPERATOR: OnceCell<Singleton<CacheOperator>> = OnceCell::new();

impl CacheOperator {
pub async fn init(
conf: &StorageConfig,
conf: &CacheConfig,
v: Singleton<CacheOperator>,
) -> common_exception::Result<()> {
v.init(Self::try_create(conf).await?)?;
Expand All @@ -407,15 +408,32 @@ impl CacheOperator {
Ok(())
}

pub async fn try_create(conf: &StorageConfig) -> common_exception::Result<CacheOperator> {
let op = init_operator(&conf.params)?;
pub async fn try_create(conf: &CacheConfig) -> common_exception::Result<CacheOperator> {
let operator = init_operator(&conf.params)?;

// OpenDAL will send a real request to underlying storage to check whether it works or not.
// If this check failed, it's highly possible that the users have configured it wrongly.
//
// Make sure the check is called inside GlobalIORuntime to prevent
// IO hang on reuse connection.
let op = operator.clone();
if let Err(cause) = GlobalIORuntime::instance()
.spawn(async move { op.object("health_check").create().await })
.await
.expect("join must succeed")
{
return Err(ErrorCode::StorageUnavailable(format!(
"current configured cache is not available: config: {:?}, cause: {cause}",
conf
)));
}

Ok(CacheOperator { op })
Ok(CacheOperator { op: operator })
}

pub fn instance() -> CacheOperator {
match CACHE_OPERATOR.get() {
None => panic!("StorageOperator is not init"),
None => panic!("CacheOperator is not init"),
Some(op) => op.get(),
}
}
Expand Down
22 changes: 22 additions & 0 deletions src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_catalog::catalog::CatalogManager;
use common_config::Config;
use common_exception::Result;
use common_fuse_meta::caches::CacheManager;
use common_storage::CacheOperator;
use common_storage::DataOperator;
use common_storage::ShareTableConfig;
use common_tracing::QueryLogger;
Expand All @@ -41,6 +42,7 @@ pub struct GlobalServices {
query_logger: UnsafeCell<Option<Arc<QueryLogger>>>,
cluster_discovery: UnsafeCell<Option<Arc<ClusterDiscovery>>>,
storage_operator: UnsafeCell<Option<DataOperator>>,
cache_operator: UnsafeCell<Option<CacheOperator>>,
cache_manager: UnsafeCell<Option<Arc<CacheManager>>>,
catalog_manager: UnsafeCell<Option<Arc<CatalogManager>>>,
http_query_manager: UnsafeCell<Option<Arc<HttpQueryManager>>>,
Expand All @@ -61,6 +63,7 @@ impl GlobalServices {
query_logger: UnsafeCell::new(None),
cluster_discovery: UnsafeCell::new(None),
storage_operator: UnsafeCell::new(None),
cache_operator: UnsafeCell::new(None),
cache_manager: UnsafeCell::new(None),
catalog_manager: UnsafeCell::new(None),
http_query_manager: UnsafeCell::new(None),
Expand All @@ -82,6 +85,7 @@ impl GlobalServices {
ClusterDiscovery::init(config.clone(), global_services.clone()).await?;

DataOperator::init(&config.storage, global_services.clone()).await?;
CacheOperator::init(&config.cache, global_services.clone()).await?;

ShareTableConfig::init(
&config.query.share_endpoint_address,
Expand Down Expand Up @@ -177,6 +181,24 @@ impl SingletonImpl<DataOperator> for GlobalServices {
}
}

impl SingletonImpl<CacheOperator> for GlobalServices {
fn get(&self) -> CacheOperator {
unsafe {
match &*self.cache_operator.get() {
None => panic!("CacheOperator is not init"),
Some(op) => op.clone(),
}
}
}

fn init(&self, value: CacheOperator) -> Result<()> {
unsafe {
*(self.cache_operator.get() as *mut Option<CacheOperator>) = Some(value);
Ok(())
}
}
}

impl SingletonImpl<Arc<CacheManager>> for GlobalServices {
fn get(&self) -> Arc<CacheManager> {
unsafe {
Expand Down
50 changes: 35 additions & 15 deletions src/query/service/tests/it/tests/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_catalog::catalog::CatalogManager;
use common_config::Config;
use common_exception::Result;
use common_fuse_meta::caches::CacheManager;
use common_storage::CacheOperator;
use common_storage::DataOperator;
use common_tracing::set_panic_hook;
use common_tracing::QueryLogger;
Expand Down Expand Up @@ -62,11 +63,13 @@ impl Drop for TestGuard {
/// Hard code, in order to make each test share the global service instance, we made some hack code
/// - We use thread names as key to store global service instances, because rust test passes the test name through the thread name
/// - In the debug version, we enable the transfer of thread names by environment variables.
#[derive(Default)]
pub struct TestGlobalServices {
global_runtime: Mutex<HashMap<String, Arc<Runtime>>>,
query_logger: Mutex<HashMap<String, Arc<QueryLogger>>>,
cluster_discovery: Mutex<HashMap<String, Arc<ClusterDiscovery>>>,
storage_operator: Mutex<HashMap<String, DataOperator>>,
cache_operator: Mutex<HashMap<String, CacheOperator>>,
cache_manager: Mutex<HashMap<String, Arc<CacheManager>>>,
catalog_manager: Mutex<HashMap<String, Arc<CatalogManager>>>,
http_query_manager: Mutex<HashMap<String, Arc<HttpQueryManager>>>,
Expand All @@ -86,21 +89,7 @@ impl TestGlobalServices {
pub async fn setup(config: Config) -> Result<TestGuard> {
set_panic_hook();
std::env::set_var("UNIT_TEST", "TRUE");
let global_services = GLOBAL.get_or_init(|| {
Arc::new(TestGlobalServices {
global_runtime: Mutex::new(HashMap::new()),
query_logger: Mutex::new(HashMap::new()),
cluster_discovery: Mutex::new(HashMap::new()),
storage_operator: Mutex::new(HashMap::new()),
cache_manager: Mutex::new(HashMap::new()),
catalog_manager: Mutex::new(HashMap::new()),
http_query_manager: Mutex::new(HashMap::new()),
data_exchange_manager: Mutex::new(HashMap::new()),
session_manager: Mutex::new(HashMap::new()),
users_manager: Mutex::new(HashMap::new()),
users_role_manager: Mutex::new(HashMap::new()),
})
});
let global_services = GLOBAL.get_or_init(|| Arc::new(TestGlobalServices::default()));

// The order of initialization is very important
let app_name_shuffle = format!("{}-{}", config.query.tenant_id, config.query.cluster_id);
Expand All @@ -112,6 +101,7 @@ impl TestGlobalServices {
ClusterDiscovery::init(config.clone(), global_services.clone()).await?;

DataOperator::init(&config.storage, global_services.clone()).await?;
CacheOperator::init(&config.cache, global_services.clone()).await?;
CacheManager::init(&config.query, global_services.clone())?;
CatalogManager::init(&config, global_services.clone()).await?;
HttpQueryManager::init(&config, global_services.clone()).await?;
Expand Down Expand Up @@ -163,6 +153,12 @@ impl TestGlobalServices {
drop(storage_operator_guard);
drop(storage_operator);
}
{
let mut cache_operator_guard = self.cache_operator.lock();
let cache_operator = cache_operator_guard.remove(key);
drop(cache_operator_guard);
drop(cache_operator);
}
{
let mut cache_manager_guard = self.cache_manager.lock();
let cache_manager = cache_manager_guard.remove(key);
Expand Down Expand Up @@ -304,6 +300,30 @@ impl SingletonImpl<DataOperator> for TestGlobalServices {
}
}

impl SingletonImpl<CacheOperator> for TestGlobalServices {
fn get(&self) -> CacheOperator {
match std::thread::current().name() {
None => panic!("CacheOperator is not init"),
Some(name) => match self.cache_operator.lock().get(name) {
None => panic!("CacheOperator is not init, while in test '{}'", name),
Some(cache_operator) => cache_operator.clone(),
},
}
}

fn init(&self, value: CacheOperator) -> Result<()> {
match std::thread::current().name() {
None => panic!("thread name is none"),
Some(name) => match self.cache_operator.lock().entry(name.to_string()) {
Entry::Vacant(v) => v.insert(value),
Entry::Occupied(_v) => panic!("CacheOperator set twice in test[{:?}]", name),
},
};

Ok(())
}
}

impl SingletonImpl<Arc<CacheManager>> for TestGlobalServices {
fn get(&self) -> Arc<CacheManager> {
match std::thread::current().name() {
Expand Down

1 comment on commit 23b2390

@vercel
Copy link

@vercel vercel bot commented on 23b2390 Nov 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend.vercel.app
databend.rs
databend-databend.vercel.app

Please sign in to comment.