Skip to content

Commit

Permalink
fix: stream illegal after source rename (databendlabs#15843)
Browse files Browse the repository at this point in the history
* feat(meta): support use id get table/db_name

add get_table_name_by_id and get_db_name_by_id

* remove extras codes

* stream source table and database can support rename

* fix

* make lint

* fix

* fix review commend

* add test

* fix review commend

* fix review commend

* fix

* fix

* fix test

---------

Co-authored-by: taichong <[email protected]>
  • Loading branch information
zhyass and TCeason authored Jun 21, 2024
1 parent 7b9394b commit 24a3041
Show file tree
Hide file tree
Showing 25 changed files with 398 additions and 169 deletions.
2 changes: 2 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ pub trait SchemaApi: Send + Sync {

async fn get_db_name_by_id(&self, db_id: MetaId) -> Result<String, KVAppError>;

async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<Option<String>, MetaError>;

async fn get_table_copied_file_info(
&self,
req: GetTableCopiedFileReq,
Expand Down
16 changes: 16 additions & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2433,6 +2433,22 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
Ok(db_names)
}

#[logcall::logcall("debug")]
#[minitrace::trace]
async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<Option<String>, MetaError> {
debug!(req :? =(&table_id); "SchemaApi: {}", func_name!());

let table_id_to_name_key = TableIdToName { table_id };

let seq_table_name = self.get_pb(&table_id_to_name_key).await?;

debug!(ident :% =(&table_id_to_name_key); "get_table_name_by_id");

let table_name = seq_table_name.map(|s| s.data.table_name);

Ok(table_name)
}

#[logcall::logcall("debug")]
#[minitrace::trace]
async fn drop_table_by_id(&self, req: DropTableByIdReq) -> Result<DropTableReply, KVAppError> {
Expand Down
36 changes: 36 additions & 0 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ impl SchemaApiTestSuite {
.drop_table_without_tableid_to_name(&b.build().await)
.await?;

suite.get_table_name_by_id(&b.build().await).await?;
suite.get_db_name_by_id(&b.build().await).await?;
suite.test_sequence(&b.build().await).await?;

Expand Down Expand Up @@ -5434,6 +5435,41 @@ impl SchemaApiTestSuite {
Ok(())
}

#[minitrace::trace]
async fn get_table_name_by_id<MT>(&self, mt: &MT) -> anyhow::Result<()>
where MT: SchemaApi + kvapi::AsKVApi<Error = MetaError> {
let tenant_name = "tenant1";
let db_name = "db1";
let tbl_name = "tb2";

let mut util = Util::new(mt, tenant_name, db_name, tbl_name, "eng1");
let table_id;

info!("--- prepare db and table");
{
util.create_db().await?;
let (tid, _table_meta) = util.create_table().await?;
table_id = tid;
}

info!("--- get_table_name_by_id ");
{
info!("--- get_table_name_by_id ");
{
let got = mt.get_table_name_by_id(table_id).await?;
assert!(got.is_some());
assert_eq!(tbl_name.to_owned(), got.unwrap());
}

info!("--- get_table_name_by_id with not exists table_id");
{
let got = mt.get_table_name_by_id(1024).await?;
assert!(got.is_none());
}
}
Ok(())
}

#[minitrace::trace]
async fn get_db_name_by_id<MT: SchemaApi>(&self, mt: &MT) -> anyhow::Result<()> {
let tenant_name = "tenant1";
Expand Down
11 changes: 7 additions & 4 deletions src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,20 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
&self,
tenant: &Tenant,
table_ids: &[MetaId],
) -> databend_common_exception::Result<Vec<Option<String>>>;
) -> Result<Vec<Option<String>>>;

// Mget the db name by meta id.
async fn get_db_name_by_id(&self, db_ids: MetaId) -> databend_common_exception::Result<String>;
// Get the db name by meta id.
async fn get_db_name_by_id(&self, db_ids: MetaId) -> Result<String>;

// Mget the dbs name by meta ids.
async fn mget_database_names_by_ids(
&self,
tenant: &Tenant,
db_ids: &[MetaId],
) -> databend_common_exception::Result<Vec<Option<String>>>;
) -> Result<Vec<Option<String>>>;

// Get the table name by meta id.
async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<Option<String>>;

// Get one table by db and table name.
async fn get_table(
Expand Down
8 changes: 6 additions & 2 deletions src/query/catalog/src/catalog/session_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,12 @@ impl Catalog for SessionCatalog {
self.inner.mget_table_names_by_ids(tenant, table_ids).await
}

// Mget the db name by meta id.
async fn get_db_name_by_id(&self, db_id: MetaId) -> databend_common_exception::Result<String> {
async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<Option<String>> {
self.inner.get_table_name_by_id(table_id).await
}

// Get the db name by meta id.
async fn get_db_name_by_id(&self, db_id: MetaId) -> Result<String> {
self.inner.get_db_name_by_id(db_id).await
}

Expand Down
26 changes: 17 additions & 9 deletions src/query/ee/src/stream/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use databend_common_base::base::GlobalInstance;
use databend_common_catalog::table::Table;
use databend_common_catalog::table::TableExt;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand All @@ -37,11 +38,11 @@ use databend_enterprise_stream_handler::StreamHandler;
use databend_enterprise_stream_handler::StreamHandlerWrapper;
use databend_storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING;
use databend_storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING_BEGIN_VER;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_NAME;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_MODE;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_NAME;
use databend_storages_common_table_meta::table::OPT_KEY_SOURCE_DATABASE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_SOURCE_TABLE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_VER;

pub struct RealStreamHandler {}
Expand Down Expand Up @@ -107,14 +108,21 @@ impl StreamHandler for RealStreamHandler {
.await?;
table.check_changes_valid(&plan.table_database, &plan.table_name, change_desc.seq)?;

let db_id = table
.get_table_info()
.options()
.get(OPT_KEY_DATABASE_ID)
.ok_or_else(|| {
ErrorCode::Internal(format!(
"Invalid fuse table, table option {} not found",
OPT_KEY_DATABASE_ID
))
})?;

let mut options = BTreeMap::new();
options.insert(OPT_KEY_MODE.to_string(), change_desc.mode.to_string());
options.insert(OPT_KEY_TABLE_NAME.to_string(), plan.table_name.clone());
options.insert(
OPT_KEY_DATABASE_NAME.to_string(),
plan.table_database.clone(),
);
options.insert(OPT_KEY_TABLE_ID.to_string(), table_id.to_string());
options.insert(OPT_KEY_SOURCE_DATABASE_ID.to_owned(), db_id.to_string());
options.insert(OPT_KEY_SOURCE_TABLE_ID.to_string(), table_id.to_string());
options.insert(OPT_KEY_TABLE_VER.to_string(), change_desc.seq.to_string());
if let Some(snapshot_loc) = change_desc.location {
options.insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), snapshot_loc);
Expand Down
10 changes: 10 additions & 0 deletions src/query/service/src/catalogs/default/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,16 @@ impl Catalog for DatabaseCatalog {
Ok(tables)
}

#[async_backtrace::framed]
async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<Option<String>> {
let res = self.immutable_catalog.get_table_name_by_id(table_id).await;

match res {
Ok(Some(x)) => Ok(Some(x)),
Ok(None) | Err(_) => self.mutable_catalog.get_table_name_by_id(table_id).await,
}
}

#[async_backtrace::framed]
async fn get_db_name_by_id(&self, db_id: MetaId) -> Result<String> {
let res = self.immutable_catalog.get_db_name_by_id(db_id).await;
Expand Down
12 changes: 10 additions & 2 deletions src/query/service/src/catalogs/default/immutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl Catalog for ImmutableCatalog {
&self,
_tenant: &Tenant,
table_ids: &[MetaId],
) -> databend_common_exception::Result<Vec<Option<String>>> {
) -> Result<Vec<Option<String>>> {
let mut table_name = Vec::with_capacity(table_ids.len());
for id in table_ids {
if let Some(table) = self.sys_db_meta.get_by_id(id) {
Expand All @@ -219,7 +219,7 @@ impl Catalog for ImmutableCatalog {
Ok(table_name)
}

async fn get_db_name_by_id(&self, db_id: MetaId) -> databend_common_exception::Result<String> {
async fn get_db_name_by_id(&self, db_id: MetaId) -> Result<String> {
if self.sys_db.get_db_info().ident.db_id == db_id {
Ok("system".to_string())
} else if self.info_schema_db.get_db_info().ident.db_id == db_id {
Expand Down Expand Up @@ -248,6 +248,14 @@ impl Catalog for ImmutableCatalog {
Ok(res)
}

async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<Option<String>> {
let table_name = self
.sys_db_meta
.get_by_id(&table_id)
.map(|v| v.name().to_string());
Ok(table_name)
}

#[async_backtrace::framed]
async fn get_table(
&self,
Expand Down
15 changes: 9 additions & 6 deletions src/query/service/src/catalogs/default/mutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,7 @@ impl Catalog for MutableCatalog {
}

#[async_backtrace::framed]
async fn get_table_meta_by_id(
&self,
table_id: MetaId,
) -> databend_common_exception::Result<Option<SeqV<TableMeta>>> {
async fn get_table_meta_by_id(&self, table_id: MetaId) -> Result<Option<SeqV<TableMeta>>> {
let res = self.ctx.meta.get_table_by_id(table_id).await?;
Ok(res)
}
Expand All @@ -379,13 +376,13 @@ impl Catalog for MutableCatalog {
&self,
_tenant: &Tenant,
table_ids: &[MetaId],
) -> databend_common_exception::Result<Vec<Option<String>>> {
) -> Result<Vec<Option<String>>> {
let res = self.ctx.meta.mget_table_names_by_ids(table_ids).await?;
Ok(res)
}

#[async_backtrace::framed]
async fn get_db_name_by_id(&self, db_id: MetaId) -> databend_common_exception::Result<String> {
async fn get_db_name_by_id(&self, db_id: MetaId) -> Result<String> {
let res = self.ctx.meta.get_db_name_by_id(db_id).await?;
Ok(res)
}
Expand All @@ -399,6 +396,12 @@ impl Catalog for MutableCatalog {
Ok(res)
}

#[async_backtrace::framed]
async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<Option<String>> {
let res = self.ctx.meta.get_table_name_by_id(table_id).await?;
Ok(res)
}

#[async_backtrace::framed]
async fn get_table(
&self,
Expand Down
25 changes: 25 additions & 0 deletions src/query/service/src/interpreters/common/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashSet;
use std::sync::Arc;

use chrono::Utc;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_license::license::Feature;
use databend_common_license::license_manager::get_license_manager;
Expand All @@ -30,7 +31,11 @@ use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_common_storages_stream::stream_table::StreamTable;
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_NAME;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use databend_storages_common_table_meta::table::OPT_KEY_SOURCE_DATABASE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_NAME;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_VER;

use crate::sessions::QueryContext;
Expand Down Expand Up @@ -64,6 +69,26 @@ pub async fn dml_build_update_stream_req(
options.insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), snapshot_loc);
}

// To be compatible with older versions, set source database id.
if options.get(OPT_KEY_SOURCE_DATABASE_ID).is_none() {
let source_db_id = inner_fuse
.get_table_info()
.options()
.get(OPT_KEY_DATABASE_ID)
.ok_or_else(|| {
ErrorCode::Internal(format!(
"Invalid fuse table, table option {} not found",
OPT_KEY_DATABASE_ID
))
})?;
options.insert(
OPT_KEY_SOURCE_DATABASE_ID.to_owned(),
source_db_id.to_string(),
);
options.remove(OPT_KEY_DATABASE_NAME);
options.remove(OPT_KEY_TABLE_NAME);
}

reqs.push(UpdateStreamMetaReq {
stream_id: stream_info.ident.table_id,
seq: MatchSeq::Exact(stream_info.ident.seq),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use databend_common_ast::parser::Dialect;
use databend_common_catalog::catalog::Catalog;
use databend_common_catalog::table::Table;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand All @@ -30,6 +31,7 @@ use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_common_storages_view::view_table::QUERY;
use databend_common_storages_view::view_table::VIEW_ENGINE;
use databend_storages_common_table_meta::table::is_internal_opt_key;
use databend_storages_common_table_meta::table::StreamMode;
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_ATTACHED_DATA_URI;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_ATTACHED_READ_ONLY;
Expand Down Expand Up @@ -86,7 +88,13 @@ impl Interpreter for ShowCreateTableInterpreter {
.unwrap_or(false),
};

let create_query = Self::show_create_query(&self.plan.database, table.as_ref(), &settings)?;
let create_query = Self::show_create_query(
catalog.as_ref(),
&self.plan.database,
table.as_ref(),
&settings,
)
.await?;

let block = DataBlock::new(
vec![
Expand All @@ -107,13 +115,14 @@ impl Interpreter for ShowCreateTableInterpreter {
}

impl ShowCreateTableInterpreter {
pub fn show_create_query(
pub async fn show_create_query(
catalog: &dyn Catalog,
database: &str,
table: &dyn Table,
settings: &ShowCreateQuerySettings,
) -> Result<String> {
match table.engine() {
STREAM_ENGINE => Self::show_create_stream_query(table),
STREAM_ENGINE => Self::show_create_stream_query(catalog, table).await,
VIEW_ENGINE => Self::show_create_view_query(table, database),
_ => match table.options().get(OPT_KEY_STORAGE_PREFIX) {
Some(_) => Ok(Self::show_attach_table_query(table, database)),
Expand Down Expand Up @@ -283,15 +292,23 @@ impl ShowCreateTableInterpreter {
Ok(view_create_sql)
}

fn show_create_stream_query(table: &dyn Table) -> Result<String> {
async fn show_create_stream_query(catalog: &dyn Catalog, table: &dyn Table) -> Result<String> {
let stream_table = StreamTable::try_from_table(table)?;
let source_database_name = stream_table.source_database_name(catalog).await?;
let source_table_name = stream_table.source_table_name(catalog).await?;
let mode = stream_table.mode();

let mut create_sql = format!(
"CREATE STREAM `{}` ON TABLE `{}`.`{}`",
stream_table.name(),
stream_table.source_table_database(),
stream_table.source_table_name()
source_database_name,
source_table_name
);

if matches!(mode, StreamMode::Standard) {
create_sql.push_str(" APPEND_ONLY = false");
}

let comment = stream_table.get_table_info().meta.comment.clone();
if !comment.is_empty() {
create_sql.push_str(format!(" COMMENT = '{}'", comment).as_str());
Expand Down
Loading

0 comments on commit 24a3041

Please sign in to comment.