Skip to content

Commit

Permalink
refactor: define BackgroundJobIdIdent with TIdent, with HAS_TENANT di…
Browse files Browse the repository at this point in the history
…sabled (#15327)
  • Loading branch information
drmingdrmer authored Apr 24, 2024
1 parent 2e12e0b commit 3d18e3e
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 137 deletions.
95 changes: 43 additions & 52 deletions src/meta/api/src/background_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use chrono::Utc;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::app_error::BackgroundJobAlreadyExists;
use databend_common_meta_app::app_error::UnknownBackgroundJob;
use databend_common_meta_app::background::BackgroundJobId;
use databend_common_meta_app::background::BackgroundJobIdIdent;
use databend_common_meta_app::background::BackgroundJobIdent;
use databend_common_meta_app::background::BackgroundJobInfo;
use databend_common_meta_app::background::BackgroundTaskIdent;
Expand Down Expand Up @@ -50,7 +50,9 @@ use databend_common_meta_types::MatchSeq::Any;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::With;
use log::debug;
use minitrace::func_name;

Expand All @@ -60,6 +62,8 @@ use crate::fetch_id;
use crate::get_pb_value;
use crate::get_u64_value;
use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_api::UpsertPB;
use crate::send_txn;
use crate::serialize_struct;
use crate::serialize_u64;
Expand Down Expand Up @@ -103,7 +107,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
}

let id = fetch_id(self, IdGenerator::background_job_id()).await?;
let id_key = BackgroundJobId { id };
let id_key = BackgroundJobIdIdent::new(name_key.tenant(), id);

debug!(
id :? =(&id_key),
Expand Down Expand Up @@ -196,10 +200,10 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {

let name_key = &req.name;

let (id, _, job) =
let (id_ident, _, job) =
get_background_job_or_error(self, name_key, format!("get_: {:?}", name_key)).await?;

Ok(GetBackgroundJobReply { id, info: job })
Ok(GetBackgroundJobReply::new(id_ident, job))
}

#[minitrace::trace]
Expand All @@ -213,17 +217,17 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
let reply = self.prefix_list_kv(&prefix).await?;
let mut res = vec![];
for (k, v) in reply {
let ident = BackgroundJobIdent::from_str_key(k.as_str()).map_err(|e| {
KVAppError::MetaError(MetaError::from(InvalidReply::new(
"list_background_jobs",
&e,
)))
})?;
let ident = BackgroundJobIdent::from_str_key(k.as_str())
.map_err(|e| MetaError::from(InvalidReply::new("list_background_jobs", &e)))?;

let job_id = deserialize_u64(&v.data)?;
let r = get_background_job_by_id(self, &BackgroundJobId { id: job_id.0 }).await?;

let req = BackgroundJobIdIdent::new(&req.tenant, job_id.0);
let seq_info = self.get_pb(&req).await?;

// filter none and get the task info
if let Some(task_info) = r.1 {
res.push((r.0, ident.name().to_string(), task_info));
if let Some(sv) = seq_info {
res.push((sv.seq(), ident.job_name().to_string(), sv.data));
}
}
Ok(res)
Expand Down Expand Up @@ -298,54 +302,43 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
async fn get_background_job_id(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_key: &BackgroundJobIdent,
) -> Result<BackgroundJobId, KVAppError> {
) -> Result<BackgroundJobIdIdent, KVAppError> {
let (id_seq, id) = get_u64_value(kv_api, name_key).await?;
background_job_has_to_exist(id_seq, name_key)?;
Ok(BackgroundJobId { id })
assert_background_job_exist(id_seq, name_key)?;

Ok(BackgroundJobIdIdent::new(name_key.tenant(), id))
}

async fn get_background_job_or_error(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_key: &BackgroundJobIdent,
name_ident: &BackgroundJobIdent,
_msg: impl Display,
) -> Result<(u64, u64, BackgroundJobInfo), KVAppError> {
let id_key = get_background_job_id(kv_api, name_key).await?;
let (id_seq, job_info) = get_pb_value(kv_api, &id_key).await?;
background_job_has_to_exist(id_seq, name_key)?;

Ok((
id_key.id,
id_seq,
// Safe unwrap(): background_job_seq > 0 implies background_job is not None.
job_info.unwrap(),
))
) -> Result<(BackgroundJobIdIdent, u64, BackgroundJobInfo), KVAppError> {
let id_ident = get_background_job_id(kv_api, name_ident).await?;

let (id_seq, job_info) = get_pb_value(kv_api, &id_ident).await?;
assert_background_job_exist(id_seq, name_ident)?;

// Safe unwrap(): background_job_seq > 0 implies background_job is not None.
Ok((id_ident, id_seq, job_info.unwrap()))
}

/// Return OK if a db_id or db_meta exists by checking the seq.
///
/// Otherwise returns UnknownBackgroundJob error
pub fn background_job_has_to_exist(
pub fn assert_background_job_exist(
seq: u64,
name_ident: &BackgroundJobIdent,
) -> Result<(), KVAppError> {
) -> Result<(), AppError> {
if seq == 0 {
debug!(seq = seq, name_ident :? =(name_ident); "background job does not exist");
Err(KVAppError::AppError(AppError::UnknownBackgroundJob(
UnknownBackgroundJob::new(name_ident.name(), format!("{:?}", name_ident)),
)))
let unknown = UnknownBackgroundJob::new(name_ident.job_name(), format!("{:?}", name_ident));
Err(AppError::UnknownBackgroundJob(unknown))
} else {
Ok(())
}
}

async fn get_background_job_by_id(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
id: &BackgroundJobId,
) -> Result<(u64, Option<BackgroundJobInfo>), KVAppError> {
let (seq, res) = get_pb_value(kv_api, id).await?;
Ok((seq, res))
}

async fn get_background_task_by_name(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
id: &BackgroundTaskIdent,
Expand All @@ -360,20 +353,18 @@ async fn update_background_job<F: FnOnce(&mut BackgroundJobInfo) -> bool>(
mutation: F,
) -> Result<UpdateBackgroundJobReply, KVAppError> {
debug!(req :? =(name); "BackgroundApi: {}", func_name!());
let (id, id_val_seq, mut info) =
let (id_ident, id_val_seq, mut info) =
get_background_job_or_error(kv_api, name, "update_background_job").await?;

let should_update = mutation(&mut info);
if !should_update {
return Ok(UpdateBackgroundJobReply { id });
return Ok(UpdateBackgroundJobReply::new(id_ident.clone()));
}
let resp = kv_api
.upsert_kv(UpsertKVReq::new(
BackgroundJobId { id }.to_string_key().as_str(),
MatchSeq::Exact(id_val_seq),
Operation::Update(serialize_struct(&info)?),
None,
))
.await?;

let req = UpsertPB::update(id_ident.clone(), info).with(MatchSeq::Exact(id_val_seq));
let resp = kv_api.upsert_pb(&req).await?;

assert!(resp.is_changed());
Ok(UpdateBackgroundJobReply { id })

Ok(UpdateBackgroundJobReply { id_ident })
}
59 changes: 15 additions & 44 deletions src/meta/app/src/background/background_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use chrono::DateTime;
use chrono::Utc;
use cron::Schedule;

use crate::background::BackgroundJobIdIdent;
use crate::background::BackgroundJobIdent;
use crate::background::BackgroundTaskType;
use crate::principal::UserIdentity;
Expand Down Expand Up @@ -214,11 +215,6 @@ impl BackgroundJobInfo {
}
}

#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct BackgroundJobId {
pub id: u64,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CreateBackgroundJobReq {
pub if_not_exists: bool,
Expand Down Expand Up @@ -259,10 +255,16 @@ impl Display for GetBackgroundJobReq {

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct GetBackgroundJobReply {
pub id: u64,
pub id_ident: BackgroundJobIdIdent,
pub info: BackgroundJobInfo,
}

impl GetBackgroundJobReply {
pub fn new(id_ident: BackgroundJobIdIdent, info: BackgroundJobInfo) -> Self {
Self { id_ident, info }
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UpdateBackgroundJobStatusReq {
pub job_name: BackgroundJobIdent,
Expand Down Expand Up @@ -321,7 +323,13 @@ impl Display for UpdateBackgroundJobReq {

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UpdateBackgroundJobReply {
pub id: u64,
pub id_ident: BackgroundJobIdIdent,
}

impl UpdateBackgroundJobReply {
pub fn new(id_ident: BackgroundJobIdIdent) -> Self {
Self { id_ident }
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -356,40 +364,3 @@ impl Display for ListBackgroundJobsReq {
write!(f, "list_background_job({})", self.tenant.tenant_name())
}
}

mod kvapi_key_impl {
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::KeyBuilder;
use databend_common_meta_kvapi::kvapi::KeyError;
use databend_common_meta_kvapi::kvapi::KeyParser;

use crate::background::background_job::BackgroundJobId;
use crate::background::BackgroundJobInfo;

impl kvapi::KeyCodec for BackgroundJobId {
fn encode_key(&self, b: KeyBuilder) -> KeyBuilder {
b.push_u64(self.id)
}

fn decode_key(parser: &mut KeyParser) -> Result<Self, KeyError> {
let id = parser.next_u64()?;
Ok(BackgroundJobId { id })
}
}

impl kvapi::Key for BackgroundJobId {
const PREFIX: &'static str = "__fd_background_job_by_id";

type ValueType = BackgroundJobInfo;

fn parent(&self) -> Option<String> {
None
}
}

impl kvapi::Value for BackgroundJobInfo {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
[]
}
}
}
90 changes: 90 additions & 0 deletions src/meta/app/src/background/background_job_id_ident.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::tenant_key::ident::TIdent;
use crate::tenant_key::raw::TIdentRaw;

pub type BackgroundJobIdIdent = TIdent<Resource, u64>;
pub type BackgroundJobIdIdentRaw = TIdentRaw<Resource, u64>;

pub use kvapi_impl::Resource;

impl BackgroundJobIdIdent {
pub fn job_id(&self) -> u64 {
*self.name()
}
}

impl BackgroundJobIdIdentRaw {
pub fn job_id(&self) -> u64 {
*self.name()
}
}

mod kvapi_impl {

use databend_common_meta_kvapi::kvapi;

use crate::background::BackgroundJobInfo;
use crate::tenant_key::resource::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_background_job_by_id";
const TYPE: &'static str = "BackgroundJobIdIdent";
const HAS_TENANT: bool = false;
type ValueType = BackgroundJobInfo;
}

impl kvapi::Value for BackgroundJobInfo {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
[]
}
}

// // Use these error types to replace usage of ErrorCode if possible.
// impl From<ExistError<Resource>> for ErrorCode {
// impl From<UnknownError<Resource>> for ErrorCode {
}

#[cfg(test)]
mod tests {
use databend_common_meta_kvapi::kvapi::Key;

use super::BackgroundJobIdIdent;
use crate::tenant::Tenant;

#[test]
fn test_background_job_id_ident() {
let tenant = Tenant::new_literal("dummy");
let ident = BackgroundJobIdIdent::new(tenant, 3);

let key = ident.to_string_key();
assert_eq!(key, "__fd_background_job_by_id/3");

assert_eq!(ident, BackgroundJobIdIdent::from_str_key(&key).unwrap());
}

#[test]
fn test_background_job_id_ident_with_key_space() {
// TODO(xp): implement this test
// let tenant = Tenant::new_literal("test");
// let ident = BackgroundJobIdIdent::new(tenant, 3);
//
// let key = ident.to_string_key();
// assert_eq!(key, "__fd_background_job_by_id/3");
//
// assert_eq!(ident, BackgroundJobIdIdent::from_str_key(&key).unwrap());
}
}
Loading

0 comments on commit 3d18e3e

Please sign in to comment.