Skip to content

Commit

Permalink
Fix test global services
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Nov 3, 2022
1 parent 10d7d15 commit f081659
Showing 1 changed file with 35 additions and 15 deletions.
50 changes: 35 additions & 15 deletions src/query/service/tests/it/tests/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_catalog::catalog::CatalogManager;
use common_config::Config;
use common_exception::Result;
use common_fuse_meta::caches::CacheManager;
use common_storage::CacheOperator;
use common_storage::DataOperator;
use common_tracing::set_panic_hook;
use common_tracing::QueryLogger;
Expand Down Expand Up @@ -62,11 +63,13 @@ impl Drop for TestGuard {
/// Hard code, in order to make each test share the global service instance, we made some hack code
/// - We use thread names as key to store global service instances, because rust test passes the test name through the thread name
/// - In the debug version, we enable the transfer of thread names by environment variables.
#[derive(Default)]
pub struct TestGlobalServices {
global_runtime: Mutex<HashMap<String, Arc<Runtime>>>,
query_logger: Mutex<HashMap<String, Arc<QueryLogger>>>,
cluster_discovery: Mutex<HashMap<String, Arc<ClusterDiscovery>>>,
storage_operator: Mutex<HashMap<String, DataOperator>>,
cache_operator: Mutex<HashMap<String, CacheOperator>>,
cache_manager: Mutex<HashMap<String, Arc<CacheManager>>>,
catalog_manager: Mutex<HashMap<String, Arc<CatalogManager>>>,
http_query_manager: Mutex<HashMap<String, Arc<HttpQueryManager>>>,
Expand All @@ -86,21 +89,7 @@ impl TestGlobalServices {
pub async fn setup(config: Config) -> Result<TestGuard> {
set_panic_hook();
std::env::set_var("UNIT_TEST", "TRUE");
let global_services = GLOBAL.get_or_init(|| {
Arc::new(TestGlobalServices {
global_runtime: Mutex::new(HashMap::new()),
query_logger: Mutex::new(HashMap::new()),
cluster_discovery: Mutex::new(HashMap::new()),
storage_operator: Mutex::new(HashMap::new()),
cache_manager: Mutex::new(HashMap::new()),
catalog_manager: Mutex::new(HashMap::new()),
http_query_manager: Mutex::new(HashMap::new()),
data_exchange_manager: Mutex::new(HashMap::new()),
session_manager: Mutex::new(HashMap::new()),
users_manager: Mutex::new(HashMap::new()),
users_role_manager: Mutex::new(HashMap::new()),
})
});
let global_services = GLOBAL.get_or_init(|| Arc::new(TestGlobalServices::default()));

// The order of initialization is very important
let app_name_shuffle = format!("{}-{}", config.query.tenant_id, config.query.cluster_id);
Expand All @@ -112,6 +101,7 @@ impl TestGlobalServices {
ClusterDiscovery::init(config.clone(), global_services.clone()).await?;

DataOperator::init(&config.storage, global_services.clone()).await?;
CacheOperator::init(&config.cache, global_services.clone()).await?;
CacheManager::init(&config.query, global_services.clone())?;
CatalogManager::init(&config, global_services.clone()).await?;
HttpQueryManager::init(&config, global_services.clone()).await?;
Expand Down Expand Up @@ -163,6 +153,12 @@ impl TestGlobalServices {
drop(storage_operator_guard);
drop(storage_operator);
}
{
let mut cache_operator_guard = self.cache_operator.lock();
let cache_operator = cache_operator_guard.remove(key);
drop(cache_operator_guard);
drop(cache_operator);
}
{
let mut cache_manager_guard = self.cache_manager.lock();
let cache_manager = cache_manager_guard.remove(key);
Expand Down Expand Up @@ -304,6 +300,30 @@ impl SingletonImpl<DataOperator> for TestGlobalServices {
}
}

impl SingletonImpl<CacheOperator> for TestGlobalServices {
fn get(&self) -> CacheOperator {
match std::thread::current().name() {
None => panic!("CacheOperator is not init"),
Some(name) => match self.cache_operator.lock().get(name) {
None => panic!("CacheOperator is not init, while in test '{}'", name),
Some(cache_operator) => cache_operator.clone(),
},
}
}

fn init(&self, value: CacheOperator) -> Result<()> {
match std::thread::current().name() {
None => panic!("thread name is none"),
Some(name) => match self.cache_operator.lock().entry(name.to_string()) {
Entry::Vacant(v) => v.insert(value),
Entry::Occupied(_v) => panic!("CacheOperator set twice in test[{:?}]", name),
},
};

Ok(())
}
}

impl SingletonImpl<Arc<CacheManager>> for TestGlobalServices {
fn get(&self) -> Arc<CacheManager> {
match std::thread::current().name() {
Expand Down

0 comments on commit f081659

Please sign in to comment.