Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Define DataMaskIdIdent with TIdent #15331

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 37 additions & 38 deletions src/meta/api/src/data_mask_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use databend_common_meta_app::app_error::DatamaskAlreadyExists;
use databend_common_meta_app::app_error::UnknownDatamask;
use databend_common_meta_app::data_mask::CreateDatamaskReply;
use databend_common_meta_app::data_mask::CreateDatamaskReq;
use databend_common_meta_app::data_mask::DataMaskIdIdent;
use databend_common_meta_app::data_mask::DataMaskNameIdent;
use databend_common_meta_app::data_mask::DatamaskId;
use databend_common_meta_app::data_mask::DatamaskMeta;
use databend_common_meta_app::data_mask::DropDatamaskReply;
use databend_common_meta_app::data_mask::DropDatamaskReq;
Expand All @@ -32,9 +32,11 @@ use databend_common_meta_app::id_generator::IdGenerator;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::schema::TableId;
use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_types::ConditionResult::Eq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::TxnCondition;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnRequest;
Expand All @@ -46,10 +48,12 @@ use crate::fetch_id;
use crate::get_pb_value;
use crate::get_u64_value;
use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
use crate::send_txn;
use crate::serialize_struct;
use crate::serialize_u64;
use crate::txn_backoff::txn_backoff;
use crate::txn_cond_eq_seq;
use crate::txn_cond_seq;
use crate::txn_op_del;
use crate::txn_op_put;
Expand All @@ -64,15 +68,15 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
) -> Result<CreateDatamaskReply, KVAppError> {
debug!(req :? =(&req); "DatamaskApi: {}", func_name!());

let name_key = &req.name;
let name_ident = &req.name;

let mut trials = txn_backoff(None, func_name!());
let id = loop {
trials.next().unwrap()?.await;

// Get db mask by name to ensure absence
let (seq, id) = get_u64_value(self, name_key).await?;
debug!(seq = seq, id = id, name_key :? =(name_key); "create_data_mask");
let (seq, id) = get_u64_value(self, name_ident).await?;
debug!(seq = seq, id = id, name_key :? =(name_ident); "create_data_mask");

let mut condition = vec![];
let mut if_then = vec![];
Expand All @@ -82,7 +86,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
CreateOption::Create => {
return Err(KVAppError::AppError(AppError::DatamaskAlreadyExists(
DatamaskAlreadyExists::new(
name_key.name(),
name_ident.name(),
format!("create data mask: {}", req.name.display()),
),
)));
Expand All @@ -91,7 +95,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
CreateOption::CreateOrReplace => {
construct_drop_mask_policy_operations(
self,
name_key,
name_ident,
false,
false,
func_name!(),
Expand All @@ -109,22 +113,23 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
// data mask name -> data mask table id list

let id = fetch_id(self, IdGenerator::data_mask_id()).await?;
let id_key = DatamaskId { id };
let id_list_key = MaskPolicyTableIdListIdent::new_from(name_key.clone());

let id_ident = DataMaskIdIdent::new(name_ident.tenant(), id);
let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone());

debug!(
id :? =(&id_key),
name_key :? =(name_key);
id :? =(&id_ident),
name_key :? =(name_ident);
"new datamask id"
);

{
let meta: DatamaskMeta = req.clone().into();
let id_list = MaskpolicyTableIdList::default();
condition.push(txn_cond_seq(name_key, Eq, seq));
condition.push(txn_cond_seq(name_ident, Eq, seq));
if_then.extend( vec![
txn_op_put(name_key, serialize_u64(id)?), // name -> db_id
txn_op_put(&id_key, serialize_struct(&meta)?), // id -> meta
txn_op_put(name_ident, serialize_u64(id)?), // name -> db_id
txn_op_put(&id_ident, serialize_struct(&meta)?), // id -> meta
txn_op_put(&id_list_key, serialize_struct(&id_list)?), /* data mask name -> id_list */
]);

Expand All @@ -137,8 +142,8 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
name :? =(name_key),
id :? =(&id_key),
name :? =(name_ident),
id :? =(&id_ident),
succ = succ;
"create_data_mask"
);
Expand Down Expand Up @@ -217,35 +222,28 @@ async fn get_data_mask_or_err(
msg: impl Display,
) -> Result<(u64, u64, u64, DatamaskMeta), KVAppError> {
let (id_seq, id) = get_u64_value(kv_api, name_key).await?;
data_mask_has_to_exist(id_seq, name_key, &msg)?;
assert_data_mask_exist(id_seq, name_key, &msg)?;

let id_key = DatamaskId { id };
let id_ident = DataMaskIdIdent::new(name_key.tenant(), id);

let (data_mask_seq, data_mask) = get_pb_value(kv_api, &id_key).await?;
data_mask_has_to_exist(data_mask_seq, name_key, msg)?;
let seq_v = kv_api.get_pb(&id_ident).await?;
assert_data_mask_exist(seq_v.seq(), name_key, msg)?;

Ok((
id_seq,
id,
data_mask_seq,
// Safe unwrap(): data_mask_seq > 0 implies data_mask is not None.
data_mask.unwrap(),
))
// Safe unwrap(): data_mask_seq > 0 implies data_mask is not None.
Ok((id_seq, id, seq_v.seq(), seq_v.unwrap().data))
}

/// Return OK if a db_id or db_meta exists by checking the seq.
///
/// Otherwise returns UnknownDatamask error
pub fn data_mask_has_to_exist(
pub fn assert_data_mask_exist(
seq: u64,
name_ident: &DataMaskNameIdent,
msg: impl Display,
) -> Result<(), KVAppError> {
) -> Result<(), AppError> {
if seq == 0 {
debug!(seq = seq, name_ident :? =(name_ident); "data mask does not exist");

Err(KVAppError::AppError(AppError::UnknownDatamask(
UnknownDatamask::new(name_ident.name(), format!("{}: {}", msg, name_ident.name())),
Err(AppError::UnknownDatamask(UnknownDatamask::new(
name_ident.name(),
format!("{}: {}", msg, name_ident.data_mask_name()),
)))
} else {
Ok(())
Expand Down Expand Up @@ -319,20 +317,21 @@ async fn construct_drop_mask_policy_operations(
return Err(err);
}
};
let id_key = DatamaskId { id };

condition.push(txn_cond_seq(&id_key, Eq, data_mask_seq));
if_then.push(txn_op_del(&id_key));
let id_ident = DataMaskIdIdent::new(name_key.tenant(), id);

condition.push(txn_cond_eq_seq(&id_ident, data_mask_seq));
if_then.push(txn_op_del(&id_ident));

if if_delete {
condition.push(txn_cond_seq(name_key, Eq, id_seq));
condition.push(txn_cond_eq_seq(name_key, id_seq));
if_then.push(txn_op_del(name_key));
clear_table_column_mask_policy(kv_api, name_key, condition, if_then).await?;
}

debug!(
name :? =(name_key),
id :? =(&DatamaskId { id }),
id :? =id,
ctx = ctx;
"construct_drop_mask_policy_operations"
);
Expand Down
10 changes: 7 additions & 3 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchema;
use databend_common_meta_app::data_mask::CreateDatamaskReq;
use databend_common_meta_app::data_mask::DataMaskIdIdent;
use databend_common_meta_app::data_mask::DataMaskNameIdent;
use databend_common_meta_app::data_mask::DatamaskId;
use databend_common_meta_app::data_mask::DatamaskMeta;
use databend_common_meta_app::data_mask::DropDatamaskReq;
use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
Expand Down Expand Up @@ -3090,7 +3090,9 @@ impl SchemaApiTestSuite {
};
mt.create_data_mask(req).await?;
let old_id: u64 = get_kv_u64_data(mt.as_kv_api(), &name).await?;
let id_key = DatamaskId { id: old_id };

let id_key = DataMaskIdIdent::new(&tenant, old_id);

let meta: DatamaskMeta = get_kv_data(mt.as_kv_api(), &id_key).await?;
assert_eq!(meta.comment, Some("before".to_string()));

Expand All @@ -3111,7 +3113,9 @@ impl SchemaApiTestSuite {

let id: u64 = get_kv_u64_data(mt.as_kv_api(), &name).await?;
assert_ne!(old_id, id);
let id_key = DatamaskId { id };

let id_key = DataMaskIdIdent::new(&tenant, id);

let meta: DatamaskMeta = get_kv_data(mt.as_kv_api(), &id_key).await?;
assert_eq!(meta.comment, Some("after".to_string()));
}
Expand Down
90 changes: 90 additions & 0 deletions src/meta/app/src/data_mask/data_mask_id_ident.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::tenant_key::ident::TIdent;
use crate::tenant_key::raw::TIdentRaw;

pub type DataMaskIdIdent = TIdent<Resource, u64>;
pub type DataMaskIdIdentRaw = TIdentRaw<Resource, u64>;

pub use kvapi_impl::Resource;

impl DataMaskIdIdent {
pub fn data_mask_id(&self) -> u64 {
*self.name()
}
}

impl DataMaskIdIdentRaw {
pub fn data_mask_id(&self) -> u64 {
*self.name()
}
}

mod kvapi_impl {

use databend_common_meta_kvapi::kvapi;

use crate::data_mask::DatamaskMeta;
use crate::tenant_key::resource::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_datamask_by_id";
const TYPE: &'static str = "DataMaskIdIdent";
const HAS_TENANT: bool = false;
type ValueType = DatamaskMeta;
}

impl kvapi::Value for DatamaskMeta {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
[]
}
}

// // Use these error types to replace usage of ErrorCode if possible.
// impl From<ExistError<Resource>> for ErrorCode {
// impl From<UnknownError<Resource>> for ErrorCode {
}

#[cfg(test)]
mod tests {
use databend_common_meta_kvapi::kvapi::Key;

use super::DataMaskIdIdent;
use crate::tenant::Tenant;

#[test]
fn test_data_mask_id_ident() {
let tenant = Tenant::new_literal("dummy");
let ident = DataMaskIdIdent::new(tenant, 3);

let key = ident.to_string_key();
assert_eq!(key, "__fd_datamask_by_id/3");

assert_eq!(ident, DataMaskIdIdent::from_str_key(&key).unwrap());
}

#[test]
fn test_data_mask_id_ident_with_key_space() {
// TODO(xp): implement this test
// let tenant = Tenant::new_literal("test");
// let ident = DataMaskIdIdent::new(tenant, 3);
//
// let key = ident.to_string_key();
// assert_eq!(key, "__fd_background_job_by_id/3");
//
// assert_eq!(ident, DataMaskIdIdent::from_str_key(&key).unwrap());
}
}
20 changes: 17 additions & 3 deletions src/meta/app/src/data_mask/data_mask_name_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,42 @@
// limitations under the License.

use crate::tenant_key::ident::TIdent;
use crate::tenant_key::raw::TIdentRaw;

pub type DataMaskNameIdent = TIdent<Resource>;
pub type DataMaskNameIdentRaw = TIdentRaw<Resource>;

pub use kvapi_impl::Resource;

impl DataMaskNameIdent {
pub fn data_mask_name(&self) -> &str {
self.name()
}
}

impl DataMaskNameIdentRaw {
pub fn data_mask_name(&self) -> &str {
self.name()
}
}

mod kvapi_impl {

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::Key;

use crate::data_mask::DatamaskId;
use crate::data_mask::DataMaskIdIdent;
use crate::tenant_key::resource::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_datamask";
const TYPE: &'static str = "DataMaskNameIdent";
const HAS_TENANT: bool = true;
type ValueType = DatamaskId;
type ValueType = DataMaskIdIdent;
}

impl kvapi::Value for DatamaskId {
impl kvapi::Value for DataMaskIdIdent {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
[self.to_string_key()]
}
Expand Down
Loading
Loading