Skip to content

Commit

Permalink
feat(ddl): Support add stored computed column (databendlabs#17347)
Browse files Browse the repository at this point in the history
* feat(ddl): Support add stored computed column

* fix

* fix tests

* add comments
  • Loading branch information
b41sh authored Jan 23, 2025
1 parent bfe0c3e commit 7515db6
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 182 deletions.
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/common/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ pub fn check_referenced_computed_columns(
Ok(expr) => {
if expr.data_type() != f.data_type() {
return Err(ErrorCode::ColumnReferencedByComputedColumn(format!(
"expected computed column expression have type {}, but `{}` has type {}.",
"expected computed column expression have type {}, but got type {}, may caused by modify column `{}`.",
f.data_type(),
column,
expr.data_type(),
column,
)));
}
}
Expand Down
51 changes: 44 additions & 7 deletions src/query/service/src/interpreters/interpreter_table_add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use databend_common_catalog::table::Table;
use databend_common_catalog::table::TableExt;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::ComputedExpr;
use databend_common_license::license::Feature::ComputedColumn;
use databend_common_license::license_manager::LicenseManagerSwitch;
use databend_common_meta_app::schema::DatabaseType;
Expand All @@ -39,6 +40,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use log::info;

use crate::interpreters::interpreter_table_create::is_valid_column;
use crate::interpreters::interpreter_table_modify_column::build_select_insert_plan;
use crate::interpreters::Interpreter;
use crate::interpreters::MutationInterpreter;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -76,7 +78,7 @@ impl Interpreter for AddTableColumnInterpreter {
// check mutability
tbl.check_mutable()?;

let table_info = tbl.get_table_info();
let mut table_info = tbl.get_table_info().clone();
let engine = table_info.engine();
if matches!(engine, VIEW_ENGINE | STREAM_ENGINE) {
return Err(ErrorCode::TableEngineNotSupported(format!(
Expand All @@ -92,7 +94,6 @@ impl Interpreter for AddTableColumnInterpreter {
}

let catalog = self.ctx.get_catalog(catalog_name).await?;
let mut new_table_meta = table_info.meta.clone();
let field = self.plan.field.clone();
if field.computed_expr().is_some() {
LicenseManagerSwitch::instance()
Expand All @@ -105,11 +106,47 @@ impl Interpreter for AddTableColumnInterpreter {
is_valid_column(field.name())?;
let index = match &self.plan.option {
AddColumnOption::First => 0,
AddColumnOption::After(name) => new_table_meta.schema.index_of(name)? + 1,
AddColumnOption::End => new_table_meta.schema.num_fields(),
AddColumnOption::After(name) => table_info.meta.schema.index_of(name)? + 1,
AddColumnOption::End => table_info.meta.schema.num_fields(),
};
new_table_meta.add_column(&field, &self.plan.comment, index)?;
table_info
.meta
.add_column(&field, &self.plan.comment, index)?;

// if the new column is a stored computed field,
// need rebuild the table to generate stored computed column.
if let Some(ComputedExpr::Stored(_)) = field.computed_expr {
let fuse_table = FuseTable::try_from_table(tbl.as_ref())?;
let prev_snapshot_id = fuse_table
.read_table_snapshot()
.await
.map_or(None, |v| v.map(|snapshot| snapshot.snapshot_id));

// computed columns will generated from other columns.
let new_schema = table_info.meta.schema.remove_computed_fields();
let query_fields = new_schema
.fields()
.iter()
.map(|field| format!("`{}`", field.name))
.collect::<Vec<_>>()
.join(", ");

let sql = format!(
"SELECT {} FROM `{}`.`{}`",
query_fields, self.plan.database, self.plan.table
);

return build_select_insert_plan(
self.ctx.clone(),
sql,
table_info.clone(),
new_schema.into(),
prev_snapshot_id,
)
.await;
}

let mut new_table_meta = table_info.meta.clone();
let _ = generate_new_snapshot(self.ctx.as_ref(), tbl.as_ref(), &mut new_table_meta).await?;
let table_id = table_info.ident.table_id;
let table_version = table_info.ident.seq;
Expand All @@ -120,14 +157,14 @@ impl Interpreter for AddTableColumnInterpreter {
new_table_meta,
};

let _resp = catalog.update_single_table_meta(req, table_info).await?;
let _resp = catalog.update_single_table_meta(req, &table_info).await?;

// If the column is not deterministic, update to refresh the value with default expr.
if !self.plan.is_deterministic {
self.ctx
.evict_table_from_cache(catalog_name, db_name, tbl_name)?;
let query = format!(
"update `{}`.`{}` set `{}` = {};",
"UPDATE `{}`.`{}` SET `{}` = {};",
db_name,
tbl_name,
field.name(),
Expand Down
Loading

0 comments on commit 7515db6

Please sign in to comment.