Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: replace String with Tenant #15119

Merged
merged 6 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/binaries/metabench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use databend_common_meta_app::schema::TableCopiedFileInfo;
use databend_common_meta_app::schema::TableCopiedFileNameIdent;
use databend_common_meta_app::schema::TableNameIdent;
use databend_common_meta_app::schema::UpsertTableOptionReq;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_client::ClientHandle;
use databend_common_meta_client::MetaGrpcClient;
use databend_common_meta_kvapi::kvapi::KVApi;
Expand Down Expand Up @@ -177,7 +178,7 @@ async fn benchmark_upsert(client: &Arc<ClientHandle>, prefix: u64, client_num: u
}

async fn benchmark_table(client: &Arc<ClientHandle>, prefix: u64, client_num: u64, i: u64) {
let tenant = || format!("tenant-{}-{}", prefix, client_num);
let tenant = || Tenant::new_literal(&format!("tenant-{}-{}", prefix, client_num));
let db_name = || format!("db-{}-{}", prefix, client_num);
let table_name = || format!("table-{}-{}", prefix, client_num);

Expand Down Expand Up @@ -215,7 +216,7 @@ async fn benchmark_table(client: &Arc<ClientHandle>, prefix: u64, client_num: u6
print_res(i, "create_table", &res);

let res = client
.get_table(GetTableReq::new(tenant(), db_name(), table_name()))
.get_table(GetTableReq::new(&tenant(), db_name(), table_name()))
.await;

print_res(i, "get_table", &res);
Expand Down Expand Up @@ -256,12 +257,12 @@ async fn benchmark_table(client: &Arc<ClientHandle>, prefix: u64, client_num: u6
}

async fn benchmark_get_table(client: &Arc<ClientHandle>, prefix: u64, client_num: u64, i: u64) {
let tenant = || format!("tenant-{}-{}", prefix, client_num);
let tenant = || Tenant::new_literal(&format!("tenant-{}-{}", prefix, client_num));
let db_name = || format!("db-{}-{}", prefix, client_num);
let table_name = || format!("table-{}-{}", prefix, client_num);

let res = client
.get_table(GetTableReq::new(tenant(), db_name(), table_name()))
.get_table(GetTableReq::new(&tenant(), db_name(), table_name()))
.await;

print_res(i, "get_table", &res);
Expand Down
7 changes: 7 additions & 0 deletions src/meta/api/src/kv_app_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::any::type_name;

use databend_common_exception::ErrorCode;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::app_error::TenantIsEmpty;
use databend_common_meta_stoerr::MetaStorageError;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::MetaAPIError;
Expand Down Expand Up @@ -100,6 +101,12 @@ impl From<InvalidReply> for KVAppError {
}
}

impl From<TenantIsEmpty> for KVAppError {
fn from(value: TenantIsEmpty) -> Self {
KVAppError::AppError(AppError::from(value))
}
}

impl TryInto<MetaAPIError> for KVAppError {
type Error = InvalidReply;

Expand Down
69 changes: 37 additions & 32 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
return Err(KVAppError::AppError(AppError::DatabaseAlreadyExists(
DatabaseAlreadyExists::new(
&name_key.db_name,
format!("create db: tenant: {}", name_key.tenant),
format!("create db: tenant: {}", name_key.tenant.name()),
),
)));
}
Expand Down Expand Up @@ -1555,7 +1555,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
};

// fixed
let key_table_count = CountTablesKey::new(tenant);
let key_table_count = CountTablesKey::new(tenant.name());

// The keys of values to re-fetch for every retry in this txn.
let keys = vec![
Expand Down Expand Up @@ -1621,7 +1621,10 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// cannot operate on shared database
if let Some(from_share) = db_meta.data.from_share {
return Err(KVAppError::AppError(AppError::ShareHasNoGrantedPrivilege(
ShareHasNoGrantedPrivilege::new(&from_share.tenant, &from_share.share_name),
ShareHasNoGrantedPrivilege::new(
from_share.tenant.name(),
&from_share.share_name,
),
)));
}

Expand Down Expand Up @@ -1652,7 +1655,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
construct_drop_table_txn_operations(
self,
req.name_ident.table_name.clone(),
req.name_ident.tenant.clone(),
&req.name_ident.tenant,
*id.data,
db_id.data,
false,
Expand Down Expand Up @@ -1824,7 +1827,10 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// cannot operate on shared database
if let Some(from_share) = db_meta.from_share {
return Err(KVAppError::AppError(AppError::ShareHasNoGrantedPrivilege(
ShareHasNoGrantedPrivilege::new(&from_share.tenant, &from_share.share_name),
ShareHasNoGrantedPrivilege::new(
from_share.tenant.name(),
&from_share.share_name,
),
)));
}

Expand Down Expand Up @@ -1879,7 +1885,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

// get current table count from _fd_table_count/tenant
let tb_count_key = CountTablesKey {
tenant: tenant_dbname.tenant.clone(),
tenant: tenant_dbname.tenant.name().to_string(),
};
(tb_count_seq, tb_count) = {
let (seq, count) = get_u64_value(self, &tb_count_key).await?;
Expand Down Expand Up @@ -1978,7 +1984,10 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// cannot operate on shared database
if let Some(from_share) = db_meta.from_share {
return Err(KVAppError::AppError(AppError::ShareHasNoGrantedPrivilege(
ShareHasNoGrantedPrivilege::new(&from_share.tenant, &from_share.share_name),
ShareHasNoGrantedPrivilege::new(
from_share.tenant.name(),
&from_share.share_name,
),
)));
}

Expand Down Expand Up @@ -2228,7 +2237,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
name: tenant_dbname_tbname.table_name.clone(),
// Safe unwrap() because: tb_meta_seq > 0
meta: tb_meta.unwrap(),
tenant: req.tenant.clone(),
tenant: req.tenant.name().to_string(),
db_type,
};

Expand Down Expand Up @@ -2353,7 +2362,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
),
name: table_id_list_key.table_name.clone(),
meta: tb_meta,
tenant: tenant_dbname.tenant.clone(),
tenant: tenant_dbname.tenant.name().to_string(),
db_type,
};

Expand Down Expand Up @@ -2568,7 +2577,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let opt = construct_drop_table_txn_operations(
self,
req.table_name.clone(),
req.tenant.clone(),
&req.tenant,
table_id,
req.db_id,
req.if_exists,
Expand All @@ -2592,7 +2601,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
tenant :% =(&tenant),
tenant :% =(tenant.display()),
id :? =(&table_id),
succ = succ;
"drop_table_by_id"
Expand Down Expand Up @@ -4261,7 +4270,7 @@ async fn construct_drop_index_txn_operations(
async fn construct_drop_table_txn_operations(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
table_name: String,
tenant: String,
tenant: &Tenant,
table_id: u64,
db_id: u64,
if_exists: bool,
Expand Down Expand Up @@ -4315,7 +4324,7 @@ async fn construct_drop_table_txn_operations(

// get current table count from _fd_table_count/<tenant>
let tb_count_key = CountTablesKey {
tenant: tenant.clone(),
tenant: tenant.name().to_string(),
};
let (tb_count_seq, tb_count) = {
let (seq, count) = get_u64_value(kv_api, &tb_count_key).await?;
Expand All @@ -4336,13 +4345,13 @@ async fn construct_drop_table_txn_operations(
// cannot operate on shared database
if let Some(from_share) = db_meta.from_share {
return Err(KVAppError::AppError(AppError::ShareHasNoGrantedPrivilege(
ShareHasNoGrantedPrivilege::new(&from_share.tenant, &from_share.share_name),
ShareHasNoGrantedPrivilege::new(from_share.tenant.name(), &from_share.share_name),
)));
}

debug!(
ident :% =(&tbid),
tenant :% =(&tenant);
tenant :% =(tenant.display());
"drop table by id"
);

Expand Down Expand Up @@ -4387,15 +4396,8 @@ async fn construct_drop_table_txn_operations(
let mut spec_vec = Vec::with_capacity(db_meta.shared_by.len());
let mut mut_share_table_info = Vec::with_capacity(db_meta.shared_by.len());
for share_id in &db_meta.shared_by {
let res = remove_table_from_share(
kv_api,
*share_id,
table_id,
tenant.clone(),
condition,
if_then,
)
.await;
let res =
remove_table_from_share(kv_api, *share_id, table_id, tenant, condition, if_then).await;

match res {
Ok((share_name, share_meta, share_table_info)) => {
Expand All @@ -4409,7 +4411,9 @@ async fn construct_drop_table_txn_operations(
KVAppError::AppError(AppError::UnknownShareId(_)) => {
error!(
"UnknownShareId {} when drop_table_by_id tenant:{} table_id:{} shared by",
share_id, tenant, table_id
share_id,
tenant.name(),
table_id
);
}
_ => return Err(e),
Expand Down Expand Up @@ -4794,7 +4798,7 @@ async fn count_tables(
// we should compute the count by listing all tables of the tenant.
let databases = kv_api
.list_databases(ListDatabaseReq {
tenant: key.tenant.clone(),
tenant: Tenant::new_or_err(&key.tenant, func_name!())?,
filter: None,
})
.await?;
Expand Down Expand Up @@ -5015,7 +5019,7 @@ async fn batch_filter_table_info(
),
name: table_name.clone(),
meta: tb_meta,
tenant: tenant_dbname.tenant.clone(),
tenant: tenant_dbname.tenant.name().to_string(),
db_type: DatabaseType::NormalDB,
};

Expand Down Expand Up @@ -5202,7 +5206,7 @@ pub(crate) async fn get_index_or_err(
async fn gc_dropped_db_by_id(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
db_id: u64,
tenant: String,
tenant: Tenant,
db_name: String,
) -> Result<(), KVAppError> {
// List tables by tenant, db_id, table_name.
Expand Down Expand Up @@ -5307,7 +5311,7 @@ async fn gc_dropped_db_by_id(

async fn gc_dropped_table_by_id(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
tenant: String,
tenant: Tenant,
db_id: u64,
table_id: u64,
table_name: String,
Expand Down Expand Up @@ -5399,13 +5403,14 @@ async fn gc_dropped_table_data(

async fn gc_dropped_table_index(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
tenant: &str,
tenant: &Tenant,
table_id: u64,
if_then: &mut Vec<TxnOp>,
) -> Result<(), KVAppError> {
// Get index id list by `prefix_list` "<prefix>/<tenant>"
// Get index id list by `prefix_list` "<prefix>/<tenant>/"
let prefix_key = kvapi::KeyBuilder::new_prefixed(IndexNameIdent::PREFIX)
.push_str(tenant)
.push_str(tenant.name())
.push_raw("")
.done();

let id_list = kv_api.prefix_list_kv(&prefix_key).await?;
Expand Down
Loading
Loading