Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: replace NonEmptyString with Tenant #15103

Merged
merged 4 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/bendpy/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_exception::Result;
use databend_common_meta_app::principal::GrantObject;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::principal::UserPrivilegeSet;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_types::NonEmptyString;
use databend_common_users::UserApiProvider;
use databend_query::sessions::QueryContext;
Expand Down Expand Up @@ -57,13 +58,14 @@ impl PySessionContext {
};

let tenant = NonEmptyString::new(tenant).unwrap();
let tenant = Tenant::new_nonempty(tenant);

let config = GlobalConfig::instance();
UserApiProvider::try_create_simple(config.meta.to_meta_grpc_client_conf(), &tenant)
.await
.unwrap();

session.set_current_tenant(tenant.to_string());
session.set_current_tenant(tenant);

let mut user = UserInfo::new_no_auth("root", "%");
user.grants.grant_privileges(
Expand Down
2 changes: 1 addition & 1 deletion src/bendpy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn databend(_py: Python, m: &PyModule) -> PyResult<()> {
GlobalServices::init(&conf).await.unwrap();

// init oss license manager
OssLicenseManager::init(conf.query.tenant_id.to_string()).unwrap();
OssLicenseManager::init(conf.query.tenant_id.name().to_string()).unwrap();
ClusterDiscovery::instance()
.register_to_metastore(&conf)
.await
Expand Down
2 changes: 1 addition & 1 deletion src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn precheck_services(conf: &InnerConfig) -> Result<()> {
traces_sample_rate,
..Default::default()
})));
sentry::configure_scope(|scope| scope.set_tag("tenant", tenant));
sentry::configure_scope(|scope| scope.set_tag("tenant", tenant.name()));
sentry::configure_scope(|scope| scope.set_tag("cluster_id", cluster_id));
sentry::configure_scope(|scope| scope.set_tag("address", flight_addr));
}
Expand Down
2 changes: 1 addition & 1 deletion src/binaries/query/oss_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ async fn main_entrypoint() -> Result<()> {

init_services(&conf).await?;
// init oss license manager
OssLicenseManager::init(conf.query.tenant_id.to_string())?;
OssLicenseManager::init(conf.query.tenant_id.name().to_string())?;
start_services(&conf).await
}
8 changes: 2 additions & 6 deletions src/meta/api/src/crud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use databend_common_meta_kvapi::kvapi::ValueWithName;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MatchSeqExt;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::NonEmptyString;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::With;
Expand Down Expand Up @@ -58,13 +57,10 @@ pub struct CrudMgr<R> {

impl<R> CrudMgr<R> {
/// Create a new `CrudMgr` instance providing CRUD access for a key space defined by `R`: [`TenantResource`].
pub fn create(
kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>,
tenant: &NonEmptyString,
) -> Self {
pub fn create(kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>, tenant: &Tenant) -> Self {
CrudMgr {
kv_api,
tenant: Tenant::new_nonempty(tenant.clone()),
tenant: tenant.clone(),
_p: Default::default(),
}
}
Expand Down
43 changes: 27 additions & 16 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use databend_common_meta_app::app_error::StreamVersionMismatched;
use databend_common_meta_app::app_error::TableAlreadyExists;
use databend_common_meta_app::app_error::TableLockExpired;
use databend_common_meta_app::app_error::TableVersionMismatched;
use databend_common_meta_app::app_error::TenantIsEmpty;
use databend_common_meta_app::app_error::UndropDbHasNoHistory;
use databend_common_meta_app::app_error::UndropDbWithNoDropTime;
use databend_common_meta_app::app_error::UndropTableAlreadyExists;
Expand Down Expand Up @@ -180,6 +181,8 @@ use databend_common_meta_app::share::ShareGrantObject;
use databend_common_meta_app::share::ShareNameIdent;
use databend_common_meta_app::share::ShareSpec;
use databend_common_meta_app::share::ShareTableInfoMap;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
Expand All @@ -193,6 +196,7 @@ use databend_common_meta_types::MatchSeqExt;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaId;
use databend_common_meta_types::MetaNetworkError;
use databend_common_meta_types::NonEmptyString;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnCondition;
Expand Down Expand Up @@ -933,7 +937,10 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
return Err(KVAppError::AppError(AppError::IndexAlreadyExists(
IndexAlreadyExists::new(
&tenant_index.index_name,
format!("create index with tenant: {}", tenant_index.tenant),
format!(
"create index with tenant: {}",
tenant_index.tenant.display()
),
),
)));
}
Expand Down Expand Up @@ -3892,8 +3899,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
} else {
Err(KVAppError::AppError(AppError::CatalogAlreadyExists(
CatalogAlreadyExists::new(
&name_key.catalog_name,
format!("create catalog: tenant: {}", name_key.tenant),
name_key.name(),
format!("create catalog: tenant: {}", name_key.tenant_name()),
),
)))
};
Expand Down Expand Up @@ -3972,8 +3979,12 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
loop {
trials.next().unwrap()?.await;

let res =
get_catalog_or_err(self, name_key, format!("drop_catalog: {}", &name_key)).await;
let res = get_catalog_or_err(
self,
name_key,
format!("drop_catalog: {}", name_key.display()),
)
.await;

let (_, catalog_id, catalog_meta_seq, _) = match res {
Ok(x) => x,
Expand Down Expand Up @@ -4041,11 +4052,11 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
) -> Result<Vec<Arc<CatalogInfo>>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let name_key = CatalogNameIdent {
tenant: req.tenant,
// Using a empty catalog to to list all
catalog_name: "".to_string(),
};
let tenant = Tenant::new_nonempty(
NonEmptyString::new(req.tenant)
.map_err(|_e| AppError::from(TenantIsEmpty::new("SchemaApi::list_catalogs")))?,
);
let name_key = CatalogNameIdent::new(tenant, "");

// Pairs of catalog-name and catalog_id with seq
let (tenant_catalog_names, catalog_ids) = list_u64_value(self, &name_key).await?;
Expand Down Expand Up @@ -4076,10 +4087,10 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
catalog_id: catalog_ids[i],
}
.into(),
name_ident: CatalogNameIdent {
tenant: name_key.tenant.clone(),
catalog_name: tenant_catalog_names[i].catalog_name.clone(),
}
name_ident: CatalogNameIdent::new(
name_key.tenant().clone(),
tenant_catalog_names[i].name(),
)
.into(),
meta: catalog_meta,
};
Expand Down Expand Up @@ -5494,8 +5505,8 @@ pub fn catalog_has_to_exist(

Err(KVAppError::AppError(AppError::UnknownCatalog(
UnknownCatalog::new(
&catalog_name_ident.catalog_name,
format!("{}: {}", msg, catalog_name_ident),
catalog_name_ident.name(),
format!("{}: {}", msg, catalog_name_ident.display()),
),
)))
} else {
Expand Down
20 changes: 8 additions & 12 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1419,16 +1419,17 @@ impl SchemaApiTestSuite {

#[minitrace::trace]
async fn catalog_create_get_list_drop<MT: SchemaApi>(&self, mt: &MT) -> anyhow::Result<()> {
let tenant = "tenant1";
let tenant_name = "tenant1";
let tenant = Tenant::new_literal(tenant_name);

let catalog_name = "catalog1";

let ident = CatalogNameIdent::new(tenant.clone(), catalog_name);

info!("--- create catalog1");
let req = CreateCatalogReq {
if_not_exists: false,
name_ident: CatalogNameIdent {
tenant: tenant.to_string(),
catalog_name: catalog_name.to_string(),
},
name_ident: ident.clone(),
meta: CatalogMeta {
catalog_option: CatalogOption::Iceberg(IcebergCatalogOption {
storage_params: Box::new(StorageParams::S3(StorageS3Config {
Expand All @@ -1443,9 +1444,7 @@ impl SchemaApiTestSuite {
let res = mt.create_catalog(req).await?;
info!("create catalog res: {:?}", res);

let got = mt
.get_catalog(GetCatalogReq::new(tenant, catalog_name))
.await?;
let got = mt.get_catalog(GetCatalogReq::new(ident.clone())).await?;
assert_eq!(got.id.catalog_id, res.catalog_id);
assert_eq!(got.name_ident.tenant, "tenant1");
assert_eq!(got.name_ident.catalog_name, "catalog1");
Expand All @@ -1458,10 +1457,7 @@ impl SchemaApiTestSuite {
let _ = mt
.drop_catalog(DropCatalogReq {
if_exists: false,
name_ident: CatalogNameIdent {
tenant: tenant.to_string(),
catalog_name: catalog_name.to_string(),
},
name_ident: ident.clone(),
})
.await?;

Expand Down
1 change: 0 additions & 1 deletion src/meta/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ anyerror = { workspace = true }
chrono = { workspace = true }
chrono-tz = { workspace = true }
cron = "0.12.0"
derive_more = { workspace = true }
enumflags2 = { workspace = true }
hex = "0.4.3"
itertools = { workspace = true }
Expand Down
30 changes: 30 additions & 0 deletions src/meta/app/src/app_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,26 @@ pub trait AppErrorMessage: Display {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, thiserror::Error)]
#[error("Tenant is empty when: `{context}`")]
pub struct TenantIsEmpty {
context: String,
}

impl TenantIsEmpty {
pub fn new(context: impl ToString) -> Self {
Self {
context: context.to_string(),
}
}
}

impl From<TenantIsEmpty> for ErrorCode {
fn from(err: TenantIsEmpty) -> Self {
ErrorCode::TenantIsEmpty(err.to_string())
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, thiserror::Error)]
#[error("DatabaseAlreadyExists: `{db_name}` while `{context}`")]
pub struct DatabaseAlreadyExists {
Expand Down Expand Up @@ -890,6 +910,9 @@ impl VirtualColumnNotFound {
/// The application does not get expected result but there is nothing wrong with meta-service.
#[derive(thiserror::Error, serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum AppError {
#[error(transparent)]
TenantIsEmpty(#[from] TenantIsEmpty),

#[error(transparent)]
TableVersionMismatched(#[from] TableVersionMismatched),

Expand Down Expand Up @@ -1051,6 +1074,12 @@ pub enum AppError {
MultiStatementTxnCommitFailed(#[from] MultiStmtTxnCommitFailed),
}

impl AppErrorMessage for TenantIsEmpty {
fn message(&self) -> String {
self.to_string()
}
}

impl AppErrorMessage for UnknownBackgroundJob {
fn message(&self) -> String {
format!("Unknown background job '{}'", self.name)
Expand Down Expand Up @@ -1380,6 +1409,7 @@ impl AppErrorMessage for VirtualColumnAlreadyExists {
impl From<AppError> for ErrorCode {
fn from(app_err: AppError) -> Self {
match app_err {
AppError::TenantIsEmpty(err) => ErrorCode::TenantIsEmpty(err.message()),
AppError::UnknownDatabase(err) => ErrorCode::UnknownDatabase(err.message()),
AppError::UnknownDatabaseId(err) => ErrorCode::UnknownDatabaseId(err.message()),
AppError::UnknownTableId(err) => ErrorCode::UnknownTableId(err.message()),
Expand Down
16 changes: 10 additions & 6 deletions src/meta/app/src/principal/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ use chrono::Utc;
use databend_common_expression::types::DataType;
use databend_common_meta_kvapi::kvapi::Key;

use crate::tenant::Tenant;

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UdfName {
pub tenant: String,
pub tenant: Tenant,
pub name: String,
}

impl UdfName {
pub fn new(tenant: impl ToString, name: impl ToString) -> Self {
pub fn new(tenant: &Tenant, name: impl ToString) -> Self {
Self {
tenant: tenant.to_string(),
tenant: tenant.clone(),
name: name.to_string(),
}
}
Expand Down Expand Up @@ -219,23 +221,25 @@ mod kv_api_impl {

/// It belongs to a tenant
fn parent(&self) -> Option<String> {
Some(Tenant::new(&self.tenant).to_string_key())
Some(self.tenant.to_string_key())
}

fn to_string_key(&self) -> String {
kvapi::KeyBuilder::new_prefixed(Self::PREFIX)
.push_str(&self.tenant)
.push_str(self.tenant.name())
.push_str(&self.name)
.done()
}

fn from_str_key(s: &str) -> Result<Self, kvapi::KeyError> {
let mut p = kvapi::KeyParser::new_prefixed(s, Self::PREFIX)?;

let tenant = p.next_str()?;
let tenant = p.next_nonempty()?;
let name = p.next_str()?;
p.done()?;

let tenant = Tenant::new_nonempty(tenant);

Ok(UdfName { tenant, name })
}
}
Expand Down
Loading
Loading