Skip to content

Commit

Permalink
refactor: remove count-table for tenant. It is not used at all (#15134)
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer authored Mar 30, 2024
1 parent b721dd0 commit 88a884e
Show file tree
Hide file tree
Showing 14 changed files with 13 additions and 427 deletions.
4 changes: 0 additions & 4 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
use std::sync::Arc;

use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CountTablesReply;
use databend_common_meta_app::schema::CountTablesReq;
use databend_common_meta_app::schema::CreateCatalogReply;
use databend_common_meta_app::schema::CreateCatalogReq;
use databend_common_meta_app::schema::CreateDatabaseReply;
Expand Down Expand Up @@ -272,8 +270,6 @@ pub trait SchemaApi: Send + Sync {
req: GcDroppedTableReq,
) -> Result<GcDroppedTableResp, KVAppError>;

async fn count_tables(&self, req: CountTablesReq) -> Result<CountTablesReply, KVAppError>;

async fn list_lock_revisions(
&self,
req: ListLockRevReq,
Expand Down
193 changes: 0 additions & 193 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ use databend_common_meta_app::schema::CatalogIdToName;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CatalogMeta;
use databend_common_meta_app::schema::CatalogNameIdent;
use databend_common_meta_app::schema::CountTablesKey;
use databend_common_meta_app::schema::CountTablesReply;
use databend_common_meta_app::schema::CountTablesReq;
use databend_common_meta_app::schema::CreateCatalogReply;
use databend_common_meta_app::schema::CreateCatalogReq;
use databend_common_meta_app::schema::CreateDatabaseReply;
Expand Down Expand Up @@ -204,7 +201,6 @@ use databend_common_meta_types::TxnGetRequest;
use databend_common_meta_types::TxnGetResponse;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::UpsertKV;
use futures::TryStreamExt;
use log::debug;
use log::error;
Expand Down Expand Up @@ -1521,7 +1517,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let tenant = req.name_ident.tenant();
let tenant_dbname_tbname = &req.name_ident;
let tenant_dbname = req.name_ident.db_name_ident();

Expand Down Expand Up @@ -1554,15 +1549,11 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
table_name: req.name_ident.table_name.clone(),
};

// fixed
let key_table_count = CountTablesKey::new(tenant.name());

// The keys of values to re-fetch for every retry in this txn.
let keys = vec![
key_dbid.to_string_key(),
key_dbid_tbname.to_string_key(),
key_table_id_list.to_string_key(),
key_table_count.to_string_key(),
];

// Initialize required key-values
Expand All @@ -1574,21 +1565,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
.collect::<Vec<_>>()
};

// Initialize table count if needed
assert_eq!(data[3].key, key_table_count.to_string_key());
if data[3].value.is_none() {
init_table_count(self, &key_table_count).await?;

// Re-fetch
data = {
let values = self.mget_kv(&keys).await?;
keys.iter()
.zip(values.into_iter())
.map(|(k, v)| TxnGetResponse::new(k, v.map(pb::SeqV::from)))
.collect::<Vec<_>>()
};
}

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;
Expand Down Expand Up @@ -1630,7 +1606,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

let mut condition = vec![];
let mut if_then = vec![];
let mut tb_count = None;

let opt = {
let d = data.remove(0);
Expand Down Expand Up @@ -1660,7 +1635,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
db_id.data,
false,
false,
&mut tb_count,
&mut condition,
&mut if_then,
)
Expand All @@ -1680,14 +1654,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
v.unwrap_or_default()
};

let mut tb_count = {
let d = data.remove(0);
let (k, v) = deserialize_id_get_response::<CountTablesKey>(d)?;
assert_eq!(key_table_count, k);

v.unwrap_or_default()
};

// Table id is unique and does not need to re-generate in every loop.
if key_table_id.is_none() {
let id = fetch_id(self, IdGenerator::table_id()).await?;
Expand Down Expand Up @@ -1733,15 +1699,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_op_put(&key_table_id_to_name, serialize_struct(&key_dbid_tbname)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
]);

// tb_id_seq is 0 means that is a create operation, in this case need to update table count
if tb_id_seq == 0 {
tb_count.data.0 += 1;
// update table count atomically
condition.push(txn_cond_seq(&key_table_count, Eq, tb_count.seq));
// _fd_table_count/tenant -> tb_count
if_then.push(txn_op_put(&key_table_count, serialize_u64(tb_count.data)?));
}

let txn_req = TxnRequest {
condition,
if_then,
Expand Down Expand Up @@ -1811,9 +1768,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

let tenant_dbname_tbname = &req.name_ident;
let tenant_dbname = req.name_ident.db_name_ident();
let mut tbcount_found = false;
let mut tb_count = 0;
let mut tb_count_seq;

let mut trials = txn_backoff(None, func_name!());
loop {
Expand Down Expand Up @@ -1883,22 +1837,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let tbid = TableId { table_id };
let (tb_meta_seq, tb_meta): (_, Option<TableMeta>) = get_pb_value(self, &tbid).await?;

// get current table count from _fd_table_count/tenant
let tb_count_key = CountTablesKey {
tenant: tenant_dbname.tenant.name().to_string(),
};
(tb_count_seq, tb_count) = {
let (seq, count) = get_u64_value(self, &tb_count_key).await?;
if seq > 0 {
(seq, count)
} else if !tbcount_found {
// only count_tables for the first time.
tbcount_found = true;
(0, count_tables(self, &tb_count_key).await?)
} else {
(0, tb_count)
}
};
// add drop_on time on table meta
// (db_id, table_name) -> table_id

Expand Down Expand Up @@ -1928,8 +1866,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_cond_seq(&dbid_tbname, Eq, tb_id_seq),
// table is not changed
txn_cond_seq(&tbid, Eq, tb_meta_seq),
// update table count atomically
txn_cond_seq(&tb_count_key, Eq, tb_count_seq),
],
if_then: vec![
// Changing a table in a db has to update the seq of db_meta,
Expand All @@ -1938,7 +1874,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_op_put(&dbid_tbname, serialize_u64(table_id)?), /* (tenant, db_id, tb_name) -> tb_id */
// txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list)?)?, // _fd_table_id_list/db_id/table_name -> tb_id_list
txn_op_put(&tbid, serialize_struct(&tb_meta)?), /* (tenant, db_id, tb_id) -> tb_meta */
txn_op_put(&tb_count_key, serialize_u64(tb_count + 1)?), /* _fd_table_count/tenant -> tb_count */
],
else_then: vec![],
};
Expand Down Expand Up @@ -2564,7 +2499,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let table_id = req.tb_id;
debug!(req :? =(&table_id); "SchemaApi: {}", func_name!());

let mut tb_count = None;
let tenant = &req.tenant;

let mut trials = txn_backoff(None, func_name!());
Expand All @@ -2582,7 +2516,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
req.db_id,
req.if_exists,
true,
&mut tb_count,
&mut condition,
&mut if_then,
)
Expand Down Expand Up @@ -3584,65 +3517,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
Ok(GcDroppedTableResp {})
}

/// Get the count of tables for one tenant.
///
/// Accept tenant name and returns the count of tables for the tenant.
///
/// It get the count from kv space first,
/// if not found, it will compute the count by listing all databases and table ids.
#[logcall::logcall("debug")]
#[minitrace::trace]
async fn count_tables(&self, req: CountTablesReq) -> Result<CountTablesReply, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let key = CountTablesKey {
tenant: req.tenant.to_string_key(),
};

let count = loop {
let (seq, cnt) = {
// get the count from kv space first
let (seq, c) = get_u64_value(self, &key).await?;
if seq > 0 {
// if seq > 0, we can get the count directly
break c;
}

// if not, we should compute the count from by listing all databases and table ids

// this line of codes will only be executed once,
// because if `send_txn` failed, it means another txn will put the count value into the kv space,
// and then the next loop will get the count value through `get_u64_value`.
(0, count_tables(self, &key).await?)
};

let key = CountTablesKey {
tenant: req.tenant.clone(),
};

let txn_req = TxnRequest {
// table count should not be changed.
condition: vec![txn_cond_seq(&key, Eq, seq)],
if_then: vec![txn_op_put(&key, serialize_u64(cnt)?)],
else_then: vec![],
};

let (succ, _) = send_txn(self, txn_req).await?;
// if txn succeeds, count can be returned safely
if succ {
break cnt;
}
};

debug!(
tenant = &req.tenant,
count = count;
"count tables for a tenant"
);

Ok(CountTablesReply { count })
}

#[minitrace::trace]
async fn list_lock_revisions(
&self,
Expand Down Expand Up @@ -4275,7 +4149,6 @@ async fn construct_drop_table_txn_operations(
db_id: u64,
if_exists: bool,
if_delete: bool,
tb_count_opt: &mut Option<u64>,
condition: &mut Vec<TxnCondition>,
if_then: &mut Vec<TxnOp>,
) -> Result<(Option<(Vec<ShareSpec>, Vec<ShareTableInfoMap>)>, u64), KVAppError> {
Expand Down Expand Up @@ -4322,24 +4195,6 @@ async fn construct_drop_table_txn_operations(
};
}

// get current table count from _fd_table_count/<tenant>
let tb_count_key = CountTablesKey {
tenant: tenant.name().to_string(),
};
let (tb_count_seq, tb_count) = {
let (seq, count) = get_u64_value(kv_api, &tb_count_key).await?;
if seq > 0 {
(seq, count)
} else if tb_count_opt.is_none() {
// only count_tables for the first time.
let count = count_tables(kv_api, &tb_count_key).await?;
*tb_count_opt = Some(count);
(0, count)
} else {
(0, tb_count_opt.unwrap())
}
};

let (db_meta_seq, db_meta) = get_db_by_id_or_err(kv_api, db_id, "drop_table_by_id").await?;

// cannot operate on shared database
Expand Down Expand Up @@ -4385,11 +4240,6 @@ async fn construct_drop_table_txn_operations(
condition.push(txn_cond_seq(&dbid_tbname, Eq, tb_id_seq));
// (db_id, tb_name) -> tb_id
if_then.push(txn_op_del(&dbid_tbname));

// update table count atomically
condition.push(txn_cond_seq(&tb_count_key, Eq, tb_count_seq));
// _fd_table_count/tenant -> tb_count
if_then.push(txn_op_put(&tb_count_key, serialize_u64(tb_count - 1)?));
}

// remove table from share
Expand Down Expand Up @@ -4771,49 +4621,6 @@ fn table_has_to_not_exist(
}
}

/// Initialize count of tables for one tenant.
async fn init_table_count(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
key: &CountTablesKey,
) -> Result<(), KVAppError> {
let n = count_tables(kv_api, key).await?;

kv_api
.upsert_kv(UpsertKV::insert(key.to_string_key(), &serialize_u64(n)?))
.await?;

Ok(())
}

/// Get the count of tables for one tenant by listing databases and table ids.
///
/// It returns (seq, `u64` value).
/// If the count value is not in the kv space, (0, `u64` value) is returned.
async fn count_tables(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
key: &CountTablesKey,
) -> Result<u64, KVAppError> {
// For backward compatibility:
// If the table count of a tenant is not found in kv space,,
// we should compute the count by listing all tables of the tenant.
let databases = kv_api
.list_databases(ListDatabaseReq {
tenant: Tenant::new_or_err(&key.tenant, func_name!())?,
filter: None,
})
.await?;
let mut count = 0;
for db in databases.into_iter() {
let dbid_tbname = DBIdTableName {
db_id: db.ident.db_id,
table_name: "".to_string(),
};
let (_, ids) = list_u64_value(kv_api, &dbid_tbname).await?;
count += ids.len() as u64;
}
Ok(count)
}

async fn get_share_table_info_map(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
table_meta: &TableMeta,
Expand Down
Loading

0 comments on commit 88a884e

Please sign in to comment.