Skip to content

Commit

Permalink
Merge pull request #2355 from dantengsky/feature-remote-backend
Browse files Browse the repository at this point in the history
ISSUE-2361: remote backend for fuse table
  • Loading branch information
databend-bot authored Oct 21, 2021
2 parents 85a25b8 + b8b128e commit 2b9cd07
Show file tree
Hide file tree
Showing 37 changed files with 260 additions and 119 deletions.
9 changes: 9 additions & 0 deletions common/datavalues/src/data_value_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ impl DataValue {
}
DataType::Boolean => try_build_array! {values},
DataType::String => try_build_array! {String, values},
DataType::Date16 => {
try_build_array! {PrimitiveArrayBuilder, u16, UInt16, values}
}
DataType::Date32 => {
try_build_array! {PrimitiveArrayBuilder, i32, Int32, values}
}
DataType::DateTime32(_) => {
try_build_array! {PrimitiveArrayBuilder, u32, UInt32, values}
}
other => Result::Err(ErrorCode::BadDataValueType(format!(
"Unexpected type:{} for DataValue List",
other
Expand Down
1 change: 1 addition & 0 deletions common/exception/src/exception.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ build_exceptions! {

ConcurrentSnapshotInstall(2404),
IllegalSnapshot(2405),
TableVersionMissMatch(2406),

// KVSrv server error

Expand Down
11 changes: 6 additions & 5 deletions common/meta/api/src/meta_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
use std::sync::Arc;

use common_exception::Result;
use common_meta_types::CommitTableReply;
use common_meta_types::CreateDatabaseReply;
use common_meta_types::CreateTableReply;
use common_meta_types::DatabaseInfo;
use common_meta_types::MetaId;
use common_meta_types::MetaVersion;
use common_meta_types::TableInfo;
use common_meta_types::UpsertTableOptionReply;
use common_planners::CreateDatabasePlan;
use common_planners::CreateTablePlan;
use common_planners::DropDatabasePlan;
Expand Down Expand Up @@ -56,12 +56,13 @@ pub trait MetaApi: Send + Sync {
table_version: Option<MetaVersion>,
) -> Result<Arc<TableInfo>>;

async fn commit_table(
async fn upsert_table_option(
&self,
table_id: MetaId,
new_table_version: MetaVersion,
new_snapshot_location: String,
) -> Result<CommitTableReply>;
table_version: MetaVersion,
option_key: String,
option_value: String,
) -> Result<UpsertTableOptionReply>;

fn name(&self) -> String;
}
12 changes: 7 additions & 5 deletions common/meta/embedded/src/meta_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ use common_exception::Result;
use common_meta_api::MetaApi;
use common_meta_raft_store::state_machine::AppliedState;
use common_meta_types::Cmd;
use common_meta_types::CommitTableReply;
use common_meta_types::CreateDatabaseReply;
use common_meta_types::CreateTableReply;
use common_meta_types::DatabaseInfo;
use common_meta_types::MetaId;
use common_meta_types::MetaVersion;
use common_meta_types::Table;
use common_meta_types::TableInfo;
use common_meta_types::UpsertTableOptionReply;
use common_planners::CreateDatabasePlan;
use common_planners::CreateTablePlan;
use common_planners::DropDatabasePlan;
Expand Down Expand Up @@ -123,6 +123,7 @@ impl MetaApi for MetaEmbedded {

let table = Table {
table_id: 0,
table_version: 0,
table_name: table_name.to_string(),
database_id: 0, // this field is unused during the creation of table
db_name: db_name.to_string(),
Expand Down Expand Up @@ -260,12 +261,13 @@ impl MetaApi for MetaEmbedded {
Ok(Arc::new(rst))
}

async fn commit_table(
async fn upsert_table_option(
&self,
_table_id: MetaId,
_new_table_version: MetaVersion,
_new_snapshot_location: String,
) -> Result<CommitTableReply> {
_table_version: MetaVersion,
_option_key: String,
_option_value: String,
) -> Result<UpsertTableOptionReply> {
todo!()
}

Expand Down
15 changes: 15 additions & 0 deletions common/meta/flight/src/flight_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use common_meta_types::MetaVersion;
use common_meta_types::PrefixListReply;
use common_meta_types::TableInfo;
use common_meta_types::UpsertKVActionReply;
use common_meta_types::UpsertTableOptionReply;
use common_planners::CreateDatabasePlan;
use common_planners::CreateTablePlan;
use common_planners::DropDatabasePlan;
Expand Down Expand Up @@ -71,6 +72,7 @@ pub enum MetaFlightAction {
GetTableExt(GetTableExtReq),
GetTables(GetTablesAction),
GetDatabases(GetDatabasesAction),
CommitTable(UpsertTableOptionReq),

// general purpose kv
UpsertKV(UpsertKVAction),
Expand Down Expand Up @@ -254,6 +256,19 @@ action_declare!(
MetaFlightAction::GetTableExt
);

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct UpsertTableOptionReq {
pub table_id: MetaId,
pub table_version: MetaVersion,
pub option_key: String,
pub option_value: String,
}
action_declare!(
UpsertTableOptionReq,
UpsertTableOptionReply,
MetaFlightAction::CommitTable
);

// - get tables
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct GetTablesAction {
Expand Down
22 changes: 15 additions & 7 deletions common/meta/flight/src/impls/meta_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
use std::sync::Arc;

use common_meta_api::MetaApi;
use common_meta_types::CommitTableReply;
use common_meta_types::CreateDatabaseReply;
use common_meta_types::CreateTableReply;
use common_meta_types::DatabaseInfo;
use common_meta_types::MetaId;
use common_meta_types::MetaVersion;
use common_meta_types::TableInfo;
use common_meta_types::UpsertTableOptionReply;
use common_planners::CreateDatabasePlan;
use common_planners::CreateTablePlan;
use common_planners::DropDatabasePlan;
Expand All @@ -38,6 +38,7 @@ use crate::GetTableAction;
use crate::GetTableExtReq;
use crate::GetTablesAction;
use crate::MetaFlightClient;
use crate::UpsertTableOptionReq;

#[async_trait::async_trait]
impl MetaApi for MetaFlightClient {
Expand Down Expand Up @@ -101,13 +102,20 @@ impl MetaApi for MetaFlightClient {
self.do_action(GetTableExtReq { tbl_id, tbl_ver }).await
}

async fn commit_table(
async fn upsert_table_option(
&self,
_table_id: MetaId,
_new_table_version: MetaVersion,
_new_snapshot_location: String,
) -> common_exception::Result<CommitTableReply> {
todo!()
table_id: MetaId,
table_version: MetaVersion,
option_key: String,
option_value: String,
) -> common_exception::Result<UpsertTableOptionReply> {
self.do_action(UpsertTableOptionReq {
table_id,
table_version,
option_key,
option_value,
})
.await
}

fn name(&self) -> String {
Expand Down
1 change: 1 addition & 0 deletions common/meta/raft-store/src/state_machine/sm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ impl StateMachine {
let table_id = self.incr_seq(SEQ_TABLE_ID).await?;
let table = Table {
table_id,
table_version: 0,
table_name: table_name.to_string(),
database_id: dbi.database_id,
db_name: db_name.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion common/meta/types/src/commit_table_reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
// limitations under the License.
//

pub type CommitTableReply = ();
pub type UpsertTableOptionReply = ();
2 changes: 1 addition & 1 deletion common/meta/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub use cluster::Node;
pub use cluster::NodeInfo;
pub use cluster::Slot;
pub use cmd::Cmd;
pub use commit_table_reply::CommitTableReply;
pub use commit_table_reply::UpsertTableOptionReply;
pub use common_meta_sled_store::KVMeta;
pub use common_meta_sled_store::KVValue;
pub use common_meta_sled_store::SeqValue;
Expand Down
2 changes: 2 additions & 0 deletions common/meta/types/src/table_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use crate::MetaVersion;
pub struct Table {
pub table_id: u64,

pub table_version: u64,

/// name of this table
pub table_name: String,

Expand Down
1 change: 1 addition & 0 deletions metasrv/src/executor/action_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl ActionHandler {
MetaFlightAction::GetTable(a) => s.serialize(self.handle(a).await?),
MetaFlightAction::GetTables(a) => s.serialize(self.handle(a).await?),
MetaFlightAction::GetTableExt(a) => s.serialize(self.handle(a).await?),
MetaFlightAction::CommitTable(a) => s.serialize(self.handle(a).await?),
}
}
}
21 changes: 20 additions & 1 deletion metasrv/src/executor/meta_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use common_meta_flight::GetDatabasesAction;
use common_meta_flight::GetTableAction;
use common_meta_flight::GetTableExtReq;
use common_meta_flight::GetTablesAction;
use common_meta_flight::UpsertTableOptionReq;
use common_meta_raft_store::state_machine::AppliedState;
use common_meta_types::Cmd::CreateDatabase;
use common_meta_types::Cmd::CreateTable;
Expand All @@ -41,6 +42,7 @@ use common_meta_types::DatabaseInfo;
use common_meta_types::LogEntry;
use common_meta_types::Table;
use common_meta_types::TableInfo;
use common_meta_types::UpsertTableOptionReply;
use log::info;

use crate::executor::action_handler::RequestHandler;
Expand Down Expand Up @@ -156,6 +158,7 @@ impl RequestHandler<CreateTableAction> for ActionHandler {

let table = Table {
table_id: 0,
table_version: 0,
table_name: table_name.to_string(),
database_id: 0, // this field is unused during the creation of table
db_name: db_name.to_string(),
Expand Down Expand Up @@ -277,7 +280,7 @@ impl RequestHandler<GetTableAction> for ActionHandler {
let rst = TableInfo {
database_id: db.database_id,
table_id: table.table_id,
version: 0, // placeholder, not yet implemented in meta service
version: table.table_version,
db: db_name.clone(),
name: table_name.clone(),
schema: Arc::new(arrow_schema.into()),
Expand Down Expand Up @@ -348,3 +351,19 @@ impl RequestHandler<GetTablesAction> for ActionHandler {
Ok(res)
}
}
#[async_trait::async_trait]
impl RequestHandler<UpsertTableOptionReq> for ActionHandler {
async fn handle(
&self,
req: UpsertTableOptionReq,
) -> common_exception::Result<UpsertTableOptionReply> {
self.meta_node
.upsert_table_opt(
req.table_id,
req.table_version,
req.option_key,
req.option_value,
)
.await
}
}
29 changes: 29 additions & 0 deletions metasrv/src/meta_service/raftmeta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,35 @@ impl MetaNode {
sm.get_table(tid)
}

#[tracing::instrument(level = "debug", skip(self))]
pub async fn upsert_table_opt(
&self,
table_id: u64,
table_version: u64,
opt_key: String,
opt_value: String,
) -> common_exception::Result<()> {
// non-consensus modification, tobe fixed latter
let mut sm = self.sto.state_machine.write().await;
if let Some(tbl) = sm.tables.get_mut(&table_id) {
if tbl.table_version != table_version {
Err(ErrorCode::TableVersionMissMatch(format!(
"targeting version {}, current version {}",
table_version, tbl.table_version,
)))
} else {
tbl.table_options.insert(opt_key, opt_value);
tbl.table_version += 1;
Ok(())
}
} else {
Err(ErrorCode::UnknownTable(format!(
"unknown table of id {}",
table_id
)))
}
}

#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_kv(&self, key: &str) -> common_exception::Result<Option<SeqValue<KVValue>>> {
// inconsistent get: from local state machine
Expand Down
9 changes: 5 additions & 4 deletions query/src/catalogs/backends/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
use std::sync::Arc;

use common_exception::Result;
use common_meta_types::CommitTableReply;
use common_meta_types::CreateDatabaseReply;
use common_meta_types::CreateTableReply;
use common_meta_types::DatabaseInfo;
use common_meta_types::MetaId;
use common_meta_types::MetaVersion;
use common_meta_types::TableInfo;
use common_meta_types::UpsertTableOptionReply;
use common_planners::CreateDatabasePlan;
use common_planners::CreateTablePlan;
use common_planners::DropDatabasePlan;
Expand Down Expand Up @@ -55,12 +55,13 @@ pub trait MetaApiSync: Send + Sync {
table_version: Option<MetaVersion>,
) -> Result<Arc<TableInfo>>;

fn commit_table(
fn upsert_table_option(
&self,
table_id: MetaId,
new_table_version: MetaVersion,
new_snapshot_location: String,
) -> Result<CommitTableReply>;
table_option_key: String,
table_option_value: String,
) -> Result<UpsertTableOptionReply>;

fn name(&self) -> String;
}
Loading

0 comments on commit 2b9cd07

Please sign in to comment.