Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh committed Jan 22, 2025
1 parent 6d9560f commit 212ea30
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 99 deletions.
68 changes: 67 additions & 1 deletion src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3556,6 +3556,72 @@ pub fn alter_database_action(i: Input) -> IResult<AlterDatabaseAction> {
)(i)
}

/// Parse one column definition for `ALTER TABLE ... MODIFY COLUMN`:
/// `<name> <type> [NULL | NOT NULL] [DEFAULT <expr>] [COMMENT '<comment>']`.
///
/// Nullability and `DEFAULT` clauses may appear in any order and any number of
/// times after the type. A nullability constraint that contradicts one already
/// carried by the parsed type name (e.g. `INT NOT NULL NULL`) fails the parse.
pub fn modify_column_type(i: Input) -> IResult<ColumnDefinition> {
    #[derive(Clone)]
    enum ColumnConstraint {
        Nullable(bool),
        DefaultExpr(Box<Expr>),
    }

    let nullable = alt((
        value(ColumnConstraint::Nullable(true), rule! { NULL }),
        value(ColumnConstraint::Nullable(false), rule! { NOT ~ ^NULL }),
    ));
    // A single parser needs no one-element `alt` wrapper.
    let expr = map(
        rule! {
            DEFAULT ~ ^#subexpr(NOT_PREC)
        },
        |(_, default_expr)| ColumnConstraint::DefaultExpr(Box::new(default_expr)),
    );

    let comment = map(
        rule! {
            COMMENT ~ #literal_string
        },
        |(_, comment)| comment,
    );

    map_res(
        rule! {
            #ident
            ~ #type_name
            ~ ( #nullable | #expr )*
            ~ ( #comment )?
            : "`<column name> <type> [DEFAULT <expr>] [COMMENT '<comment>']`"
        },
        |(name, data_type, constraints, comment)| {
            let mut def = ColumnDefinition {
                name,
                data_type,
                expr: None,
                comment,
            };
            for constraint in constraints {
                match constraint {
                    ColumnConstraint::Nullable(nullable) => {
                        // `NULL` on a type already marked `NOT NULL` (or the
                        // reverse) is contradictory; reject it outright.
                        if (nullable && matches!(def.data_type, TypeName::NotNull(_)))
                            || (!nullable && matches!(def.data_type, TypeName::Nullable(_)))
                        {
                            return Err(nom::Err::Failure(ErrorKind::Other(
                                "ambiguous NOT NULL constraint",
                            )));
                        }
                        if nullable {
                            def.data_type = def.data_type.wrap_nullable();
                        } else {
                            def.data_type = def.data_type.wrap_not_null();
                        }
                    }
                    ColumnConstraint::DefaultExpr(default_expr) => {
                        def.expr = Some(ColumnExpr::Default(default_expr))
                    }
                }
            }
            Ok(def)
        },
    )(i)
}

pub fn modify_column_action(i: Input) -> IResult<ModifyColumnAction> {
let set_mask_policy = map(
rule! {
Expand All @@ -3582,7 +3648,7 @@ pub fn modify_column_action(i: Input) -> IResult<ModifyColumnAction> {

let modify_column_type = map(
rule! {
#column_def ~ ("," ~ COLUMN? ~ #column_def)*
#modify_column_type ~ ("," ~ COLUMN? ~ #modify_column_type)*
},
|(column_def, column_def_vec)| {
let mut column_defs = vec![column_def];
Expand Down
18 changes: 6 additions & 12 deletions src/query/service/src/interpreters/interpreter_table_add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,27 +113,21 @@ impl Interpreter for AddTableColumnInterpreter {
.meta
.add_column(&field, &self.plan.comment, index)?;

if let Some(ComputedExpr::Stored(stored_expr)) = field.computed_expr {
// if the new column is a stored computed field,
// need to rebuild the table to generate the stored computed column.
if let Some(ComputedExpr::Stored(_)) = field.computed_expr {
let fuse_table = FuseTable::try_from_table(tbl.as_ref())?;
let prev_snapshot_id = fuse_table
.read_table_snapshot()
.await
.map_or(None, |v| v.map(|snapshot| snapshot.snapshot_id));

let new_schema = table_info.meta.schema.remove_virtual_computed_fields();
let field_index = new_schema.index_of(&field.name)?;

// Computed columns will be generated from the other columns.
let new_schema = table_info.meta.schema.remove_computed_fields();
let query_fields = new_schema
.fields()
.iter()
.enumerate()
.map(|(index, field)| {
if index == field_index {
format!("{} AS `{}`", stored_expr, field.name)
} else {
format!("`{}`", field.name)
}
})
.map(|field| format!("`{}`", field.name))
.collect::<Vec<_>>()
.join(", ");

Expand Down
134 changes: 50 additions & 84 deletions src/query/service/src/interpreters/interpreter_table_modify_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

Expand Down Expand Up @@ -148,7 +147,6 @@ impl ModifyTableColumnInterpreter {
let schema = table.schema().as_ref().clone();
let table_info = table.get_table_info();
let mut new_schema = schema.clone();

// first check default expr before lock table
for (field, _comment) in field_and_comments {
if let Some((i, old_field)) = schema.column_with_name(&field.name) {
Expand All @@ -158,14 +156,17 @@ impl ModifyTableColumnInterpreter {
let _ = new_schema.drop_column(&field.name);
let _ = new_schema.add_column(field, i);
} else {
new_schema.fields[i] = field.clone();
// The new field doesn't carry a `column_id`; assigning the whole field directly would lose the `column_id`.
new_schema.fields[i].data_type = field.data_type.clone();
new_schema.fields[i].default_expr = field.default_expr.clone();
new_schema.fields[i].computed_expr = field.computed_expr.clone();
}
if let Some(default_expr) = &field.default_expr {
let default_expr = default_expr.to_string();
new_schema.fields[i].default_expr = Some(default_expr);
let _ = field_default_value(self.ctx.clone(), &new_schema.fields[i])?;
}
if old_field.data_type != field.data_type && field.computed_expr.is_none() {
if old_field.data_type != field.data_type {
// Check if this column is referenced by computed columns.
let data_schema = DataSchema::from(&new_schema);
check_referenced_computed_columns(
Expand Down Expand Up @@ -264,88 +265,24 @@ impl ModifyTableColumnInterpreter {
return Ok(PipelineBuildResult::create());
}

// if alter column from string to binary in parquet, we don't need to rebuild table
let is_alter_column_string_to_binary = table.storage_format_as_parquet()
&& schema
.fields()
.iter()
.zip(new_schema.fields())
.all(|(old_field, new_field)| {
fn is_string_to_binary(old_ty: &TableDataType, new_ty: &TableDataType) -> bool {
match (old_ty, new_ty) {
(TableDataType::String, TableDataType::Binary) => true,
(TableDataType::Nullable(old_ty), TableDataType::Nullable(new_ty)) => {
is_string_to_binary(old_ty, new_ty)
}
(TableDataType::Map(old_ty), TableDataType::Map(new_ty)) => {
is_string_to_binary(old_ty, new_ty)
}
(TableDataType::Array(old_ty), TableDataType::Array(new_ty)) => {
is_string_to_binary(old_ty, new_ty)
}
(
TableDataType::Tuple {
fields_type: old_tys,
..
},
TableDataType::Tuple {
fields_type: new_tys,
..
},
) => {
old_tys.len() == new_tys.len()
&& old_tys
.iter()
.zip(new_tys)
.all(|(old_ty, new_ty)| is_string_to_binary(old_ty, new_ty))
}
_ => false,
}
}

let TableField {
name: old_name,
default_expr: old_default_expr,
data_type: old_data_type,
column_id: old_column_id,
computed_expr: old_computed_expr,
} = old_field;
let TableField {
name: new_name,
default_expr: new_default_expr,
data_type: new_data_type,
column_id: new_column_id,
computed_expr: new_computed_expr,
} = new_field;
old_name == new_name
&& old_default_expr == new_default_expr
&& old_column_id == new_column_id
&& old_computed_expr == new_computed_expr
&& (old_data_type == new_data_type
|| is_string_to_binary(&old_field.data_type, &new_field.data_type))
});

let new_schema_without_virtual_fields = new_schema.remove_virtual_computed_fields();
let mut modified_field_indices = HashSet::new();
let mut modified_stored_field_indices = HashMap::new();
let new_schema_without_computed_fields = new_schema.remove_computed_fields();
for (field, _) in field_and_comments {
if let Some(ComputedExpr::Virtual(_)) = field.computed_expr {
continue;
}
let field_index = new_schema_without_virtual_fields.index_of(&field.name)?;
if let Some(ComputedExpr::Stored(stored_expr)) = &field.computed_expr {
modified_stored_field_indices.insert(field_index, stored_expr.clone());
let field_index = new_schema_without_computed_fields.index_of(&field.name)?;
let old_field = schema.field_with_name(&field.name)?;
let is_alter_column_string_to_binary =
is_string_to_binary(&old_field.data_type, &field.data_type);
// if alter column from string to binary in parquet, we don't need to rebuild table
if (table.storage_format_as_parquet() && is_alter_column_string_to_binary)
|| old_field.data_type.remove_nullable() == field.data_type.remove_nullable()
{
continue;
}
modified_field_indices.insert(field_index);
}

table_info.meta.schema = new_schema.clone().into();

if is_alter_column_string_to_binary
&& modified_field_indices.is_empty()
&& modified_stored_field_indices.is_empty()
{
if modified_field_indices.is_empty() {
let table_id = table_info.ident.table_id;
let table_version = table_info.ident.seq;

Expand All @@ -363,14 +300,12 @@ impl ModifyTableColumnInterpreter {
}

// construct sql for selecting data from old table
let query_fields = new_schema_without_virtual_fields
let query_fields = new_schema_without_computed_fields
.fields()
.iter()
.enumerate()
.map(|(index, field)| {
if let Some(stored_expr) = modified_stored_field_indices.remove(&index) {
format!("{} AS `{}`", stored_expr, field.name)
} else if modified_field_indices.contains(&index) {
if modified_field_indices.contains(&index) {
let old_field = schema.field_with_name(&field.name).unwrap();
// If the column type is Tuple or Array(Tuple), the difference in the number of leaf columns may cause
// the auto cast to fail.
Expand Down Expand Up @@ -511,7 +446,7 @@ impl ModifyTableColumnInterpreter {
self.ctx.clone(),
sql,
table_info,
new_schema.into(),
new_schema_without_computed_fields.into(),
prev_snapshot_id,
)
.await
Expand Down Expand Up @@ -675,6 +610,38 @@ impl Interpreter for ModifyTableColumnInterpreter {
}
}

/// Whether converting `old_ty` into `new_ty` amounts to exactly a
/// `String` -> `Binary` change, possibly nested inside `Nullable`, `Map`,
/// `Array`, or `Tuple` wrappers that are otherwise identical in shape.
fn is_string_to_binary(old_ty: &TableDataType, new_ty: &TableDataType) -> bool {
    match (old_ty, new_ty) {
        // Base case: a plain string column becoming a binary column.
        (TableDataType::String, TableDataType::Binary) => true,
        // Matching single-child wrappers: recurse into the inner type.
        (TableDataType::Nullable(old_inner), TableDataType::Nullable(new_inner))
        | (TableDataType::Map(old_inner), TableDataType::Map(new_inner))
        | (TableDataType::Array(old_inner), TableDataType::Array(new_inner)) => {
            is_string_to_binary(old_inner, new_inner)
        }
        // Tuples are compared field-wise; differing arity never qualifies.
        (
            TableDataType::Tuple {
                fields_type: old_fields,
                ..
            },
            TableDataType::Tuple {
                fields_type: new_fields,
                ..
            },
        ) => {
            old_fields.len() == new_fields.len()
                && old_fields
                    .iter()
                    .zip(new_fields.iter())
                    .all(|(o, n)| is_string_to_binary(o, n))
        }
        _ => false,
    }
}

pub(crate) async fn build_select_insert_plan(
ctx: Arc<QueryContext>,
sql: String,
Expand All @@ -685,7 +652,6 @@ pub(crate) async fn build_select_insert_plan(
// 1. build plan by sql
let mut planner = Planner::new(ctx.clone());
let (plan, _extras) = planner.plan_sql(&sql).await?;

let select_schema = plan.schema();

// 2. build physical plan by plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ SELECT * FROM `05_0028_at_t0` order by a
statement ok
DROP TABLE IF EXISTS `05_0028_at_t0`

statement ok
DROP TABLE IF EXISTS `05_0028_at_t0_3`

statement ok
DROP TABLE IF EXISTS `05_0028_at_t0_4`

statement ok
CREATE TABLE `05_0028_at_t0_3`(c int not null)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,34 @@ statement ok
create table t1(a string null default 'a', b string null as (concat(a, '-', c)) stored, c string null default 'c', d string null as (reverse(a)) virtual)

statement ok
alter table t1 add column f string null as (lower(c)) virtual
insert into t1 values ('A1', 'C1'), ('A2', 'C2')

query TTTT
select * from t1
----
A1 A1-C1 C1 1A
A2 A2-C2 C2 2A

statement ok
alter table t1 add column e string null as (lower(c)) virtual

query TTTTT
select * from t1
----
A1 A1-C1 C1 1A c1
A2 A2-C2 C2 2A c2

statement ok
alter table t1 add column f string null as (concat(lower(c), '-', a)) stored

query TTTTTT
select * from t1
----
A1 A1-C1 C1 1A c1 c1-A1
A2 A2-C2 C2 2A c2 c2-A2

statement error 1065
alter table t1 add column e string null as (upper(c)) stored
alter table t1 add column g string null as (concat(a, '-', b)) stored

statement ok
alter table t1 drop column b
Expand All @@ -42,6 +66,9 @@ alter table t1 drop column a
statement ok
alter table t1 drop column d

statement ok
alter table t1 drop column f

statement ok
alter table t1 rename column a to x

Expand Down

0 comments on commit 212ea30

Please sign in to comment.