Skip to content

Commit

Permalink
feat(ddl): Support add stored computed column
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh committed Jan 21, 2025
1 parent 7969d36 commit 836264c
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 168 deletions.
68 changes: 1 addition & 67 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3556,72 +3556,6 @@ pub fn alter_database_action(i: Input) -> IResult<AlterDatabaseAction> {
)(i)
}

pub fn modify_column_type(i: Input) -> IResult<ColumnDefinition> {
#[derive(Clone)]
enum ColumnConstraint {
Nullable(bool),
DefaultExpr(Box<Expr>),
}

let nullable = alt((
value(ColumnConstraint::Nullable(true), rule! { NULL }),
value(ColumnConstraint::Nullable(false), rule! { NOT ~ ^NULL }),
));
let expr = alt((map(
rule! {
DEFAULT ~ ^#subexpr(NOT_PREC)
},
|(_, default_expr)| ColumnConstraint::DefaultExpr(Box::new(default_expr)),
),));

let comment = map(
rule! {
COMMENT ~ #literal_string
},
|(_, comment)| comment,
);

map_res(
rule! {
#ident
~ #type_name
~ ( #nullable | #expr )*
~ ( #comment )?
: "`<column name> <type> [DEFAULT <expr>] [COMMENT '<comment>']`"
},
|(name, data_type, constraints, comment)| {
let mut def = ColumnDefinition {
name,
data_type,
expr: None,
comment,
};
for constraint in constraints {
match constraint {
ColumnConstraint::Nullable(nullable) => {
if (nullable && matches!(def.data_type, TypeName::NotNull(_)))
|| (!nullable && matches!(def.data_type, TypeName::Nullable(_)))
{
return Err(nom::Err::Failure(ErrorKind::Other(
"ambiguous NOT NULL constraint",
)));
}
if nullable {
def.data_type = def.data_type.wrap_nullable();
} else {
def.data_type = def.data_type.wrap_not_null();
}
}
ColumnConstraint::DefaultExpr(default_expr) => {
def.expr = Some(ColumnExpr::Default(default_expr))
}
}
}
Ok(def)
},
)(i)
}

pub fn modify_column_action(i: Input) -> IResult<ModifyColumnAction> {
let set_mask_policy = map(
rule! {
Expand All @@ -3648,7 +3582,7 @@ pub fn modify_column_action(i: Input) -> IResult<ModifyColumnAction> {

let modify_column_type = map(
rule! {
#modify_column_type ~ ("," ~ COLUMN? ~ #modify_column_type)*
#column_def ~ ("," ~ COLUMN? ~ #column_def)*
},
|(column_def, column_def_vec)| {
let mut column_defs = vec![column_def];
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/common/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ pub fn check_referenced_computed_columns(
Ok(expr) => {
if expr.data_type() != f.data_type() {
return Err(ErrorCode::ColumnReferencedByComputedColumn(format!(
"expected computed column expression have type {}, but `{}` has type {}.",
"expected computed column expression have type {}, but got type {}, may caused by modify column `{}`.",
f.data_type(),
column,
expr.data_type(),
column,
)));
}
}
Expand Down
57 changes: 50 additions & 7 deletions src/query/service/src/interpreters/interpreter_table_add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use databend_common_catalog::table::Table;
use databend_common_catalog::table::TableExt;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::ComputedExpr;
use databend_common_license::license::Feature::ComputedColumn;
use databend_common_license::license_manager::LicenseManagerSwitch;
use databend_common_meta_app::schema::DatabaseType;
Expand All @@ -39,6 +40,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use log::info;

use crate::interpreters::interpreter_table_create::is_valid_column;
use crate::interpreters::interpreter_table_modify_column::build_select_insert_plan;
use crate::interpreters::Interpreter;
use crate::interpreters::MutationInterpreter;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -76,7 +78,7 @@ impl Interpreter for AddTableColumnInterpreter {
// check mutability
tbl.check_mutable()?;

let table_info = tbl.get_table_info();
let mut table_info = tbl.get_table_info().clone();
let engine = table_info.engine();
if matches!(engine, VIEW_ENGINE | STREAM_ENGINE) {
return Err(ErrorCode::TableEngineNotSupported(format!(
Expand All @@ -92,7 +94,6 @@ impl Interpreter for AddTableColumnInterpreter {
}

let catalog = self.ctx.get_catalog(catalog_name).await?;
let mut new_table_meta = table_info.meta.clone();
let field = self.plan.field.clone();
if field.computed_expr().is_some() {
LicenseManagerSwitch::instance()
Expand All @@ -105,11 +106,53 @@ impl Interpreter for AddTableColumnInterpreter {
is_valid_column(field.name())?;
let index = match &self.plan.option {
AddColumnOption::First => 0,
AddColumnOption::After(name) => new_table_meta.schema.index_of(name)? + 1,
AddColumnOption::End => new_table_meta.schema.num_fields(),
AddColumnOption::After(name) => table_info.meta.schema.index_of(name)? + 1,
AddColumnOption::End => table_info.meta.schema.num_fields(),
};
new_table_meta.add_column(&field, &self.plan.comment, index)?;
table_info
.meta
.add_column(&field, &self.plan.comment, index)?;

if let Some(ComputedExpr::Stored(stored_expr)) = field.computed_expr {
let fuse_table = FuseTable::try_from_table(tbl.as_ref())?;
let prev_snapshot_id = fuse_table
.read_table_snapshot()
.await
.map_or(None, |v| v.map(|snapshot| snapshot.snapshot_id));

let new_schema = table_info.meta.schema.remove_virtual_computed_fields();
let field_index = new_schema.index_of(&field.name)?;

let query_fields = new_schema
.fields()
.iter()
.enumerate()
.map(|(index, field)| {
if index == field_index {
format!("{} AS `{}`", stored_expr, field.name)
} else {
format!("`{}`", field.name)
}
})
.collect::<Vec<_>>()
.join(", ");

let sql = format!(
"SELECT {} FROM `{}`.`{}`",
query_fields, self.plan.database, self.plan.table
);

return build_select_insert_plan(
self.ctx.clone(),
sql,
table_info.clone(),
new_schema.into(),
prev_snapshot_id,
)
.await;
}

let mut new_table_meta = table_info.meta.clone();
let _ = generate_new_snapshot(self.ctx.as_ref(), tbl.as_ref(), &mut new_table_meta).await?;
let table_id = table_info.ident.table_id;
let table_version = table_info.ident.seq;
Expand All @@ -120,14 +163,14 @@ impl Interpreter for AddTableColumnInterpreter {
new_table_meta,
};

let _resp = catalog.update_single_table_meta(req, table_info).await?;
let _resp = catalog.update_single_table_meta(req, &table_info).await?;

// If the column is not deterministic, update to refresh the value with default expr.
if !self.plan.is_deterministic {
self.ctx
.evict_table_from_cache(catalog_name, db_name, tbl_name)?;
let query = format!(
"update `{}`.`{}` set `{}` = {};",
"UPDATE `{}`.`{}` SET `{}` = {};",
db_name,
tbl_name,
field.name(),
Expand Down
Loading

0 comments on commit 836264c

Please sign in to comment.