diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs
index ac35cca7ee19e..c31d033f3658b 100644
--- a/src/query/catalog/src/table.rs
+++ b/src/query/catalog/src/table.rs
@@ -240,6 +240,23 @@ pub trait Table: Sync + Send {
         )))
     }
 
+    async fn update(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        filter: Option<RemoteExpr<String>>,
+        col_indices: Vec<usize>,
+        update_list: Vec<(usize, RemoteExpr<String>)>,
+        pipeline: &mut Pipeline,
+    ) -> Result<()> {
+        let (_, _, _, _, _) = (ctx, filter, col_indices, update_list, pipeline);
+
+        Err(ErrorCode::Unimplemented(format!(
+            "table {}, of engine type {}, does not support UPDATE",
+            self.name(),
+            self.get_table_info().engine(),
+        )))
+    }
+
     fn get_block_compact_thresholds(&self) -> BlockCompactThresholds {
         BlockCompactThresholds {
             max_rows_per_block: 1000 * 1000,
diff --git a/src/query/service/src/interpreters/interpreter_delete.rs b/src/query/service/src/interpreters/interpreter_delete.rs
index e4144b6cf5a3f..31f71a4a54601 100644
--- a/src/query/service/src/interpreters/interpreter_delete.rs
+++ b/src/query/service/src/interpreters/interpreter_delete.rs
@@ -17,7 +17,6 @@ use std::sync::Arc;
 use common_exception::Result;
 use common_expression::DataSchemaRef;
 use common_pipeline_core::Pipeline;
-use common_sql::plans::DeletePlan;
 
 use crate::interpreters::Interpreter;
 use crate::pipelines::executor::ExecutorSettings;
@@ -25,6 +24,7 @@ use crate::pipelines::executor::PipelineCompleteExecutor;
 use crate::pipelines::PipelineBuildResult;
 use crate::sessions::QueryContext;
 use crate::sessions::TableContext;
+use crate::sql::plans::DeletePlan;
 use crate::sql::plans::ScalarExpr;
 
 /// interprets DeletePlan
@@ -34,7 +34,7 @@ pub struct DeleteInterpreter {
 }
 
 impl DeleteInterpreter {
-    /// Create the DelectInterpreter from DelectPlan
+    /// Create the DeleteInterpreter from DeletePlan
    pub fn try_create(ctx: Arc<QueryContext>, plan: DeletePlan) -> Result<Self> {
         Ok(DeleteInterpreter { ctx, plan })
     }
 }
@@ -47,7 +47,7 @@ impl Interpreter for DeleteInterpreter {
         "DeleteInterpreter"
     }
 
-    /// Get the schema of SelectPlan
+    /// Get the schema of DeletePlan
     fn schema(&self) -> DataSchemaRef {
         self.plan.schema()
     }
@@ -59,6 +59,7 @@ impl Interpreter for DeleteInterpreter {
         let db_name = self.plan.database_name.as_str();
         let tbl_name = self.plan.table_name.as_str();
         let tbl = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;
+
         let (filter, col_indices) = if let Some(scalar) = &self.plan.selection {
             let filter = scalar.as_expr()?.as_remote_expr();
             let col_indices = scalar.used_columns().into_iter().collect();
diff --git a/src/query/service/src/interpreters/interpreter_factory.rs b/src/query/service/src/interpreters/interpreter_factory.rs
index 942c1b1487065..302322bfd5eb7 100644
--- a/src/query/service/src/interpreters/interpreter_factory.rs
+++ b/src/query/service/src/interpreters/interpreter_factory.rs
@@ -15,7 +15,6 @@
 use std::sync::Arc;
 
 use common_ast::ast::ExplainKind;
-use common_exception::ErrorCode;
 use common_exception::Result;
 use tracing::error;
 
@@ -35,6 +34,7 @@ use crate::interpreters::CreateShareInterpreter;
 use crate::interpreters::DropShareInterpreter;
 use crate::interpreters::DropUserInterpreter;
 use crate::interpreters::SetRoleInterpreter;
+use crate::interpreters::UpdateInterpreter;
 use crate::sessions::QueryContext;
 use crate::sql::plans::Plan;
 
@@ -200,9 +200,10 @@ impl InterpreterFactory {
                 *delete.clone(),
             )?)),
 
-            Plan::Update(_update) => Err(ErrorCode::Unimplemented(
-                "Unimplement for update".to_string(),
-            )),
+            Plan::Update(update) => Ok(Arc::new(UpdateInterpreter::try_create(
+                ctx,
+                *update.clone(),
+            )?)),
 
             // Roles
             Plan::CreateRole(create_role) => Ok(Arc::new(CreateRoleInterpreter::try_create(
diff --git a/src/query/service/src/interpreters/interpreter_update.rs b/src/query/service/src/interpreters/interpreter_update.rs
new file mode 100644
index 0000000000000..d9a7603f0da21
--- /dev/null
+++ b/src/query/service/src/interpreters/interpreter_update.rs
@@ -0,0 +1,161 @@
+// Copyright 2021 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_exception::ErrorCode;
+use common_exception::Result;
+use common_expression::types::DataType;
+use common_expression::DataSchema;
+use common_expression::DataSchemaRef;
+use common_pipeline_core::Pipeline;
+use common_sql::plans::BoundColumnRef;
+use common_sql::plans::CastExpr;
+use common_sql::plans::FunctionCall;
+use common_sql::BindContext;
+use common_sql::ColumnBinding;
+use common_sql::Scalar;
+use common_sql::Visibility;
+
+use crate::interpreters::Interpreter;
+use crate::pipelines::executor::ExecutorSettings;
+use crate::pipelines::executor::PipelineCompleteExecutor;
+use crate::pipelines::PipelineBuildResult;
+use crate::sessions::QueryContext;
+use crate::sessions::TableContext;
+use crate::sql::plans::ScalarExpr;
+use crate::sql::plans::UpdatePlan;
+
+/// interprets UpdatePlan
+pub struct UpdateInterpreter {
+    ctx: Arc<QueryContext>,
+    plan: UpdatePlan,
+}
+
+impl UpdateInterpreter {
+    /// Create the UpdateInterpreter from UpdatePlan
+    pub fn try_create(ctx: Arc<QueryContext>, plan: UpdatePlan) -> Result<Self> {
+        Ok(UpdateInterpreter { ctx, plan })
+    }
+}
+
+#[async_trait::async_trait]
+impl Interpreter for UpdateInterpreter {
+    /// Get the name of current interpreter
+    fn name(&self) -> &str {
+        "UpdateInterpreter"
+    }
+
+    /// Get the schema of UpdatePlan
+    fn schema(&self) -> DataSchemaRef {
+        self.plan.schema()
+    }
+
+    #[tracing::instrument(level = "debug", name = "update_interpreter_execute", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
+    async fn execute2(&self) -> Result<PipelineBuildResult> {
+        // TODO check privilege
+        let catalog_name = self.plan.catalog.as_str();
+        let db_name = self.plan.database.as_str();
+        let tbl_name = self.plan.table.as_str();
+        let tbl = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;
+
+        let (filter, col_indices) = if let Some(scalar) = &self.plan.selection {
+            let filter = scalar.as_expr()?.as_remote_expr();
+            let col_indices = scalar.used_columns().into_iter().collect();
+            (Some(filter), col_indices)
+        } else {
+            (None, vec![])
+        };
+
+        let predicate = Scalar::BoundColumnRef(BoundColumnRef {
+            column: ColumnBinding {
+                database_name: None,
+                table_name: None,
+                column_name: "_predicate".to_string(),
+                index: tbl.schema().num_fields(),
+                data_type: Box::new(DataType::Boolean),
+                visibility: Visibility::Visible,
+            },
+        });
+
+        let schema: DataSchema = tbl.schema().into();
+        let update_list = self.plan.update_list.iter().try_fold(
+            Vec::with_capacity(self.plan.update_list.len()),
+            |mut acc, (id, scalar)| {
+                let field = schema.field(*id);
+                let left = Scalar::CastExpr(CastExpr {
+                    argument: Box::new(scalar.clone()),
+                    from_type: Box::new(scalar.data_type()),
+                    target_type: Box::new(field.data_type().clone()),
+                });
+                let scalar = if col_indices.is_empty() {
+                    // The condition is always true.
+                    // Replace column to the result of the following expression:
+                    // CAST(expression, type)
+                    left
+                } else {
+                    // Replace column to the result of the following expression:
+                    // if(condition, CAST(expression, type), column)
+                    let mut right = None;
+                    for column_binding in self.plan.bind_context.columns.iter() {
+                        if BindContext::match_column_binding(
+                            Some(db_name),
+                            Some(tbl_name),
+                            field.name(),
+                            column_binding,
+                        ) {
+                            right = Some(Scalar::BoundColumnRef(BoundColumnRef {
+                                column: column_binding.clone(),
+                            }));
+                            break;
+                        }
+                    }
+                    let right = right.ok_or_else(|| ErrorCode::Internal("It's a bug"))?;
+                    let return_type = right.data_type();
+                    Scalar::FunctionCall(FunctionCall {
+                        params: vec![],
+                        arguments: vec![predicate.clone(), left, right],
+                        func_name: "if".to_string(),
+                        return_type: Box::new(return_type),
+                    })
+                };
+                acc.push((*id, scalar.as_expr()?.as_remote_expr()));
+                Ok::<_, ErrorCode>(acc)
+            },
+        )?;
+
+        let mut pipeline = Pipeline::create();
+        tbl.update(
+            self.ctx.clone(),
+            filter,
+            col_indices,
+            update_list,
+            &mut pipeline,
+        )
+        .await?;
+        if !pipeline.pipes.is_empty() {
+            let settings = self.ctx.get_settings();
+            pipeline.set_max_threads(settings.get_max_threads()? as usize);
+            let query_id = self.ctx.get_id();
+            let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
+            let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
+
+            self.ctx.set_executor(Arc::downgrade(&executor.get_inner()));
+            executor.execute()?;
+            drop(executor);
+        }
+
+        Ok(PipelineBuildResult::create())
+    }
+}
diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs
index 9174a381e6ca8..f484bb563d165 100644
--- a/src/query/service/src/interpreters/mod.rs
+++ b/src/query/service/src/interpreters/mod.rs
@@ -70,6 +70,7 @@ mod interpreter_table_show_create;
 mod interpreter_table_truncate;
 mod interpreter_table_undrop;
 mod interpreter_unsetting;
+mod interpreter_update;
 mod interpreter_use_database;
 mod interpreter_user_alter;
 mod interpreter_user_create;
@@ -135,6 +136,7 @@ pub use interpreter_table_show_create::ShowCreateTableInterpreter;
 pub use interpreter_table_truncate::TruncateTableInterpreter;
 pub use interpreter_table_undrop::UndropTableInterpreter;
 pub use interpreter_unsetting::UnSettingInterpreter;
+pub use interpreter_update::UpdateInterpreter;
 pub use interpreter_use_database::UseDatabaseInterpreter;
 pub use interpreter_user_alter::AlterUserInterpreter;
 pub use interpreter_user_create::CreateUserInterpreter;
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/deletion.rs
similarity index 100%
rename from src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs
rename to src/query/service/tests/it/storages/fuse/operations/mutation/deletion.rs
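A note on what `execute2` builds above: for each assignment in `update_list` the interpreter rewrites the target column as `if(_predicate, CAST(expr AS column_type), column)`, falling back to a bare `CAST` when there is no WHERE clause. A toy model of that rewrite rule, using hypothetical types rather than Databend's actual `Scalar` AST:

```rust
// Illustrative only: a toy expression enum modelling the per-column rewrite.
#[derive(Debug, Clone)]
enum Expr {
    Column(String),
    Cast { expr: Box<Expr>, target: String },
    If { cond: Box<Expr>, then: Box<Expr>, els: Box<Expr> },
}

/// `UPDATE t SET c = e WHERE p` rewrites column `c` as
/// `if(_predicate, CAST(e AS type_of(c)), c)`; with no WHERE clause the
/// predicate is constant-true, so a plain `CAST(e AS type_of(c))` suffices.
fn rewrite_update_column(col: &str, value: Expr, target_ty: &str, has_filter: bool) -> Expr {
    let cast = Expr::Cast { expr: Box::new(value), target: target_ty.to_string() };
    if !has_filter {
        cast
    } else {
        Expr::If {
            cond: Box::new(Expr::Column("_predicate".to_string())),
            then: Box::new(cast),
            els: Box::new(Expr::Column(col.to_string())),
        }
    }
}

fn main() {
    let rewritten = rewrite_update_column("a", Expr::Column("b".into()), "INT", true);
    println!("{rewritten:?}");
}
```

The `_predicate` column is bound at index `tbl.schema().num_fields()`, one past the table's last field, so the downstream source can append the evaluated filter as an extra boolean column without clashing with real columns.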
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs
index 634e6cc25d6bb..639ff7ebea084 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs
@@ -13,8 +13,8 @@
 // limitations under the License.
 
 mod block_compact_mutator;
-mod deletion_mutator;
+mod deletion;
 mod recluster_mutator;
 mod segments_compact_mutator;
 
-pub use deletion_mutator::do_deletion;
+pub use deletion::do_deletion;
diff --git a/src/query/sql/src/planner/binder/delete.rs b/src/query/sql/src/planner/binder/delete.rs
index 09fc0bef0dd76..f505f9cb0407d 100644
--- a/src/query/sql/src/planner/binder/delete.rs
+++ b/src/query/sql/src/planner/binder/delete.rs
@@ -65,14 +65,6 @@
             &[],
         );
 
-        let table = self
-            .ctx
-            .get_table(&catalog_name, &database_name, &table_name)
-            .await?;
-
-        let tbl_info = table.get_table_info();
-        let table_id = tbl_info.ident;
-
         let selection = if let Some(expr) = filter {
             let (scalar, _) = scalar_binder.bind(expr).await?;
             Some(scalar)
@@ -84,8 +76,6 @@
             catalog_name,
             database_name,
             table_name,
-            table_id,
-            metadata: self.metadata.clone(),
             selection,
         };
         Ok(Plan::Delete(Box::new(plan)))
diff --git a/src/query/sql/src/planner/binder/update.rs b/src/query/sql/src/planner/binder/update.rs
index 1965d729d32c7..358386c3cd682 100644
--- a/src/query/sql/src/planner/binder/update.rs
+++ b/src/query/sql/src/planner/binder/update.rs
@@ -23,6 +23,7 @@ use crate::binder::Binder;
 use crate::binder::ScalarBinder;
 use crate::normalize_identifier;
 use crate::plans::Plan;
+use crate::plans::Scalar;
 use crate::plans::UpdatePlan;
 use crate::BindContext;
 
@@ -67,7 +68,6 @@
             .ctx
             .get_table(&catalog_name, &database_name, &table_name)
             .await?;
-        let table_id = table.get_id();
 
         let mut scalar_binder = ScalarBinder::new(
             &context,
@@ -88,12 +88,23 @@
                 )));
             }
 
+            // TODO(zhyass): selection and update_list support subquery.
             let (scalar, _) = scalar_binder.bind(&update_expr.expr).await?;
+            if matches!(scalar, Scalar::SubqueryExpr(_)) {
+                return Err(ErrorCode::Internal(
+                    "Update does not support subquery temporarily",
+                ));
+            }
             update_columns.insert(index, scalar);
         }
 
         let push_downs = if let Some(expr) = selection {
             let (scalar, _) = scalar_binder.bind(expr).await?;
+            if matches!(scalar, Scalar::SubqueryExpr(_)) {
+                return Err(ErrorCode::Internal(
+                    "Update does not support subquery temporarily",
+                ));
+            }
             Some(scalar)
         } else {
             None
@@ -103,9 +114,9 @@
             catalog: catalog_name,
             database: database_name,
             table: table_name,
-            table_id,
             update_list: update_columns,
             selection: push_downs,
+            bind_context: Box::new(context.clone()),
         };
         Ok(Plan::Update(Box::new(plan)))
     }
diff --git a/src/query/sql/src/planner/mod.rs b/src/query/sql/src/planner/mod.rs
index cada12f16f57a..e417a7dc8db82 100644
--- a/src/query/sql/src/planner/mod.rs
+++ b/src/query/sql/src/planner/mod.rs
@@ -34,6 +34,7 @@ pub use expression_parser::parse_exprs;
 pub use expression_parser::parse_to_remote_string_exprs;
 pub use metadata::*;
 pub use planner::Planner;
+pub use plans::Scalar;
 pub use plans::ScalarExpr;
 pub use semantic::normalize_identifier;
 pub use semantic::validate_function_arg;
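The two `matches!` guards added to the binder above only reject a subquery that sits at the top level of the bound expression; the TODO notes that real subquery support is deferred. As a sketch only, over a toy scalar type (not Databend's `Scalar`), a recursive guard that would also catch nested subqueries such as `a = (SELECT ...) + 1` could look like:

```rust
// Hypothetical sketch: recursively reject subqueries anywhere in the tree.
enum Scalar {
    Subquery,
    FunctionCall(Vec<Scalar>),
    Cast(Box<Scalar>),
    Column(usize),
}

fn contains_subquery(s: &Scalar) -> bool {
    match s {
        Scalar::Subquery => true,
        Scalar::FunctionCall(args) => args.iter().any(contains_subquery),
        Scalar::Cast(inner) => contains_subquery(inner),
        Scalar::Column(_) => false,
    }
}
```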
diff --git a/src/query/sql/src/planner/plans/delete.rs b/src/query/sql/src/planner/plans/delete.rs
index 0d1f6090b9726..6ad0eccaf909d 100644
--- a/src/query/sql/src/planner/plans/delete.rs
+++ b/src/query/sql/src/planner/plans/delete.rs
@@ -16,18 +16,14 @@ use std::sync::Arc;
 
 use common_expression::DataSchema;
 use common_expression::DataSchemaRef;
-use common_meta_app::schema::TableIdent;
 
 use crate::plans::Scalar;
-use crate::MetadataRef;
 
 #[derive(Clone, Debug)]
 pub struct DeletePlan {
     pub catalog_name: String,
     pub database_name: String,
     pub table_name: String,
-    pub table_id: TableIdent,
-    pub metadata: MetadataRef,
     pub selection: Option<Scalar>,
 }
diff --git a/src/query/sql/src/planner/plans/update.rs b/src/query/sql/src/planner/plans/update.rs
index 768c9ed12c979..7ce2469b55689 100644
--- a/src/query/sql/src/planner/plans/update.rs
+++ b/src/query/sql/src/planner/plans/update.rs
@@ -13,17 +13,26 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::sync::Arc;
 
-use common_meta_types::MetaId;
+use common_expression::DataSchema;
+use common_expression::DataSchemaRef;
 
 use crate::plans::Scalar;
+use crate::BindContext;
 
 #[derive(Clone, Debug)]
 pub struct UpdatePlan {
     pub catalog: String,
     pub database: String,
     pub table: String,
-    pub table_id: MetaId,
     pub update_list: HashMap<usize, Scalar>,
     pub selection: Option<Scalar>,
+    pub bind_context: Box<BindContext>,
+}
+
+impl UpdatePlan {
+    pub fn schema(&self) -> DataSchemaRef {
+        Arc::new(DataSchema::empty())
+    }
 }
diff --git a/src/query/storages/common/pruner/src/topn_pruner.rs b/src/query/storages/common/pruner/src/topn_pruner.rs
index f4ea31246fd8b..b29712ca1c051 100644
--- a/src/query/storages/common/pruner/src/topn_pruner.rs
+++ b/src/query/storages/common/pruner/src/topn_pruner.rs
@@ -22,7 +22,7 @@ use common_expression::TableSchemaRef;
 use storages_common_table_meta::meta::BlockMeta;
 use storages_common_table_meta::meta::ColumnStatistics;
 
-#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
 pub struct BlockMetaIndex {
     pub segment_idx: usize,
     pub block_idx: usize,
diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs
index f8762404c926d..31dc18d520871 100644
--- a/src/query/storages/fuse/src/fuse_table.rs
+++ b/src/query/storages/fuse/src/fuse_table.rs
@@ -36,7 +36,6 @@ use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::BlockCompactThresholds;
 use common_expression::DataBlock;
-// use common_sql::ExpressionParser;
 use common_expression::RemoteExpr;
 use common_meta_app::schema::DatabaseType;
 use common_meta_app::schema::TableInfo;
@@ -561,6 +560,18 @@ impl Table for FuseTable {
         self.do_delete(ctx, filter, col_indices, pipeline).await
     }
 
+    async fn update(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        filter: Option<RemoteExpr<String>>,
+        col_indices: Vec<usize>,
+        update_list: Vec<(usize, RemoteExpr<String>)>,
+        pipeline: &mut Pipeline,
+    ) -> Result<()> {
+        self.do_update(ctx, filter, col_indices, update_list, pipeline)
+            .await
+    }
+
     fn get_block_compact_thresholds(&self) -> BlockCompactThresholds {
         let max_rows_per_block = self.get_option(FUSE_OPT_KEY_ROW_PER_BLOCK, DEFAULT_ROW_PER_BLOCK);
         let min_rows_per_block = (max_rows_per_block as f64 * 0.8) as usize;
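Before the FUSE-specific pipeline wiring below, note the dispatch pattern: `Table::update` ships a default body that returns `ErrorCode::Unimplemented`, and an engine opts in by overriding it, exactly as `FuseTable` does above by delegating to `do_update`. A minimal sketch of the pattern with deliberately simplified signatures (the `String` error type and `FuseLike` struct are illustrative, not Databend's):

```rust
type Result<T> = std::result::Result<T, String>;

trait Table {
    fn name(&self) -> &str;
    fn engine(&self) -> &str;
    // Default: engines that do not override this reject UPDATE.
    fn update(&self, _predicate: Option<String>) -> Result<()> {
        Err(format!(
            "table {}, of engine type {}, does not support UPDATE",
            self.name(),
            self.engine()
        ))
    }
}

struct FuseLike;

impl Table for FuseLike {
    fn name(&self) -> &str { "t" }
    fn engine(&self) -> &str { "FUSE" }
    fn update(&self, predicate: Option<String>) -> Result<()> {
        // A real engine would build its mutation pipeline here.
        println!("updating rows where {predicate:?}");
        Ok(())
    }
}
```

This keeps the `Table` trait the single dispatch point: the interpreter calls `tbl.update(...)` without knowing which engine it hit.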
diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs
index 4484cc7b27009..94aecc8f15eda 100644
--- a/src/query/storages/fuse/src/operations/delete.rs
+++ b/src/query/storages/fuse/src/operations/delete.rs
@@ -39,10 +39,12 @@ use common_sql::evaluator::BlockOperator;
 use storages_common_table_meta::meta::Location;
 use storages_common_table_meta::meta::TableSnapshot;
 
-use crate::operations::mutation::DeletionPartInfo;
-use crate::operations::mutation::DeletionSource;
-use crate::operations::mutation::DeletionTransform;
+use crate::operations::mutation::MutationAction;
+use crate::operations::mutation::MutationPartInfo;
 use crate::operations::mutation::MutationSink;
+use crate::operations::mutation::MutationSource;
+use crate::operations::mutation::MutationTransform;
+use crate::operations::mutation::SerializeDataTransform;
 use crate::pipelines::processors::port::InputPort;
 use crate::pipelines::processors::port::OutputPort;
 use crate::pipelines::Pipe;
@@ -53,13 +55,13 @@ use crate::FuseTable;
 
 impl FuseTable {
     /// The flow of Pipeline is as follows:
-    /// +---------------+
-    /// |DeletionSource1| ------
-    /// +---------------+      |      +-----------------+      +------------+
-    /// |     ...       | ...  | ---> |DeletionTransform| ---> |MutationSink|
-    /// +---------------+      |      +-----------------+      +------------+
-    /// |DeletionSourceN| ------
-    /// +---------------+
+    /// +---------------+      +-----------------------+
+    /// |MutationSource1| ---> |SerializeDataTransform1| ------
+    /// +---------------+      +-----------------------+      |      +-----------------+      +------------+
+    /// |      ...      | ---> |          ...          |  ... | ---> |MutationTransform| ---> |MutationSink|
+    /// +---------------+      +-----------------------+      |      +-----------------+      +------------+
+    /// |MutationSourceN| ---> |SerializeDataTransformN| ------
+    /// +---------------+      +-----------------------+
     pub async fn do_delete(
         &self,
         ctx: Arc<dyn TableContext>,
@@ -105,7 +107,7 @@ impl FuseTable {
         // if the `filter_expr` is of "constant" nullary :
         //   for the whole block, whether all of the rows should be kept or dropped,
         //   we can just return from here, without accessing the block data
-        if self.try_eval_const(ctx.clone(), &self.table_info.schema(), &filter_expr)? {
+        if self.try_eval_const(ctx.clone(), &self.schema(), &filter_expr)? {
             let progress_values = ProgressValues {
                 rows: snapshot.summary.row_count as usize,
                 bytes: snapshot.summary.uncompressed_byte_size as usize,
@@ -123,7 +125,18 @@ impl FuseTable {
         self.try_add_deletion_source(ctx.clone(), &filter_expr, col_indices, &snapshot, pipeline)
             .await?;
 
-        self.try_add_deletion_transform(ctx.clone(), snapshot.segments.clone(), pipeline)?;
+        let cluster_stats_gen = self.cluster_stats_gen(ctx.clone())?;
+        pipeline.add_transform(|input, output| {
+            SerializeDataTransform::try_create(
+                ctx.clone(),
+                input,
+                output,
+                self,
+                cluster_stats_gen.clone(),
+            )
+        })?;
+
+        self.try_add_mutation_transform(ctx.clone(), snapshot.segments.clone(), pipeline)?;
 
         pipeline.add_sink(|input| {
             MutationSink::try_create(self, ctx.clone(), snapshot.clone(), input)
@@ -131,7 +144,7 @@ impl FuseTable {
         Ok(())
     }
 
-    fn try_eval_const(
+    pub fn try_eval_const(
         &self,
         ctx: Arc<dyn TableContext>,
         schema: &TableSchema,
@@ -176,9 +189,69 @@ impl FuseTable {
         pipeline: &mut Pipeline,
     ) -> Result<()> {
         let projection = Projection::Columns(col_indices.clone());
+        self.mutation_block_pruning(
+            ctx.clone(),
+            vec![filter.clone()],
+            projection.clone(),
+            base_snapshot,
+        )
+        .await?;
+
+        let block_reader = self.create_block_reader(projection)?;
+        let schema = block_reader.schema();
+        let filter =
+            Arc::new(filter.as_expr(&BUILTIN_FUNCTIONS).map(|expr| {
+                expr.project_column_ref(|name| schema.column_with_name(name).unwrap().0)
+            }));
+
+        let all_col_ids = self.all_the_columns_ids();
+        let remain_col_ids: Vec<usize> = all_col_ids
+            .into_iter()
+            .filter(|id| !col_indices.contains(id))
+            .collect();
+        let mut source_col_ids = col_indices;
+        let remain_reader = if remain_col_ids.is_empty() {
+            Arc::new(None)
+        } else {
+            source_col_ids.extend_from_slice(&remain_col_ids);
+            Arc::new(Some(
+                (*self.create_block_reader(Projection::Columns(remain_col_ids))?).clone(),
+            ))
+        };
+
+        // resort the block.
+        let mut projection = (0..source_col_ids.len()).collect::<Vec<usize>>();
+        projection.sort_by_key(|&i| source_col_ids[i]);
+        let ops = vec![BlockOperator::Project { projection }];
+
+        let max_threads = ctx.get_settings().get_max_threads()? as usize;
+        // Add source pipe.
+        pipeline.add_source(
+            |output| {
+                MutationSource::try_create(
+                    ctx.clone(),
+                    MutationAction::Deletion,
+                    output,
+                    filter.clone(),
+                    block_reader.clone(),
+                    remain_reader.clone(),
+                    ops.clone(),
+                )
+            },
+            max_threads,
+        )
+    }
+
+    pub async fn mutation_block_pruning(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        filters: Vec<RemoteExpr<String>>,
+        projection: Projection,
+        base_snapshot: &TableSnapshot,
+    ) -> Result<()> {
         let push_down = Some(PushDownInfo {
-            projection: Some(projection.clone()),
-            filters: vec![filter.clone()],
+            projection: Some(projection),
+            filters,
             ..PushDownInfo::default()
         });
 
@@ -212,43 +285,13 @@ impl FuseTable {
         index_stats
             .into_iter()
             .zip(inner_parts.partitions.into_iter())
-            .map(|((a, b), c)| DeletionPartInfo::create(a, b, c))
+            .map(|((a, b), c)| MutationPartInfo::create(a, b, c))
             .collect(),
         );
-        ctx.try_set_partitions(parts)?;
-
-        let block_reader = self.create_block_reader(projection.clone())?;
-        let all_col_ids = self.all_the_columns_ids();
-        let remain_col_ids: Vec<usize> = all_col_ids
-            .into_iter()
-            .filter(|id| !col_indices.contains(id))
-            .collect();
-        let remain_reader = if remain_col_ids.is_empty() {
-            Arc::new(None)
-        } else {
-            Arc::new(Some(
-                (*self.create_block_reader(Projection::Columns(remain_col_ids))?).clone(),
-            ))
-        };
-
-        let max_threads = ctx.get_settings().get_max_threads()? as usize;
-        // Add source pipe.
-        pipeline.add_source(
-            |output| {
-                DeletionSource::try_create(
-                    ctx.clone(),
-                    output,
-                    self,
-                    block_reader.clone(),
-                    Arc::new(filter.clone()),
-                    remain_reader.clone(),
-                )
-            },
-            max_threads,
-        )
+        ctx.try_set_partitions(parts)
     }
 
-    fn try_add_deletion_transform(
+    pub fn try_add_mutation_transform(
         &self,
         ctx: Arc<dyn TableContext>,
         base_segments: Vec<Location>,
@@ -266,7 +309,7 @@ impl FuseTable {
             inputs_port.push(InputPort::create());
         }
         let output_port = OutputPort::create();
-        let processor = DeletionTransform::try_create(
+        let processor = MutationTransform::try_create(
             ctx,
             self.schema(),
             inputs_port.clone(),
diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs
index 9a1be3b4683d8..7f93420821a50 100644
--- a/src/query/storages/fuse/src/operations/mod.rs
+++ b/src/query/storages/fuse/src/operations/mod.rs
@@ -26,6 +26,7 @@ mod read_data;
 mod read_partitions;
 mod recluster;
 mod truncate;
+mod update;
 
 mod fuse_source;
 mod read;
diff --git a/src/query/storages/fuse/src/operations/mutation/compact/compact_transform.rs b/src/query/storages/fuse/src/operations/mutation/compact/compact_transform.rs
index 619ea3ff83bae..bc8635229f5cf 100644
--- a/src/query/storages/fuse/src/operations/mutation/compact/compact_transform.rs
+++ b/src/query/storages/fuse/src/operations/mutation/compact/compact_transform.rs
@@ -46,6 +46,7 @@ use crate::io::TableMetaLocationGenerator;
 use crate::io::WriteSettings;
 use crate::metrics::*;
 use crate::operations::mutation::AbortOperation;
+use crate::operations::mutation::SerializeState;
 use crate::pipelines::processors::port::InputPort;
 use crate::pipelines::processors::port::OutputPort;
 use crate::pipelines::processors::processor::Event;
@@ -54,13 +55,6 @@ use crate::pipelines::processors::Processor;
 use crate::statistics::reduce_block_statistics;
 use crate::statistics::reducers::reduce_block_metas;
 
-struct SerializeState {
-    block_data: Vec<u8>,
-    block_location: String,
-    index_data: Option<Vec<u8>>,
-    index_location: Option<String>,
-}
-
 enum State {
     Consume,
     ReadBlocks,
diff --git a/src/query/storages/fuse/src/operations/mutation/compact/merge_segments_transform.rs b/src/query/storages/fuse/src/operations/mutation/compact/merge_segments_transform.rs
index c36283f7d1843..71090b708d04f 100644
--- a/src/query/storages/fuse/src/operations/mutation/compact/merge_segments_transform.rs
+++ b/src/query/storages/fuse/src/operations/mutation/compact/merge_segments_transform.rs
@@ -27,7 +27,7 @@ use storages_common_table_meta::meta::Versioned;
 use crate::operations::mutation::compact::CompactSinkMeta;
 use crate::operations::mutation::AbortOperation;
 use crate::operations::mutation::BlockCompactMutator;
-use crate::operations::mutation::MutationMeta;
+use crate::operations::mutation::MutationSinkMeta;
 use crate::pipelines::processors::port::InputPort;
 use crate::pipelines::processors::port::OutputPort;
 use crate::pipelines::processors::processor::Event;
@@ -172,7 +172,7 @@ impl Processor for MergeSegmentsTransform {
                     .sorted_by_key(|&(_, r)| *r)
                     .map(|(l, _)| l)
                     .collect();
-                let meta = MutationMeta::create(
+                let meta = MutationSinkMeta::create(
                     merged_segments,
                     std::mem::take(&mut self.merged_statistics),
                     std::mem::take(&mut self.abort_operation),
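One small but useful move in the two compact files above: the private `SerializeState` struct is deleted from `compact_transform.rs` and imported from `operations::mutation` instead, so the deletion/update pipeline can share it. Reconstructed from the deleted lines (the `Vec<u8>` payload types are an assumption; the flattened diff lost the generic parameters), the shared struct is:

```rust
// State carried between "serialize the block" and "write it to storage":
// the parquet bytes plus the optional bloom-index bytes and their targets.
pub struct SerializeState {
    pub block_data: Vec<u8>,
    pub block_location: String,
    pub index_data: Option<Vec<u8>>,
    pub index_location: Option<String>,
}
```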
diff --git a/src/query/storages/fuse/src/operations/mutation/deletion/deletion_meta.rs b/src/query/storages/fuse/src/operations/mutation/deletion/deletion_meta.rs
deleted file mode 100644
index 0dc9ec0949d55..0000000000000
--- a/src/query/storages/fuse/src/operations/mutation/deletion/deletion_meta.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright 2022 Datafuse Labs.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::any::Any;
-use std::sync::Arc;
-
-use common_exception::ErrorCode;
-use common_exception::Result;
-use common_expression::BlockMetaInfo;
-use common_expression::BlockMetaInfoPtr;
-use storages_common_pruner::BlockMetaIndex;
-use storages_common_table_meta::meta::BlockMeta;
-
-#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
-pub enum Deletion {
-    DoNothing,
-    Replaced(Arc<BlockMeta>),
-    Deleted,
-}
-
-#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
-pub struct DeletionSourceMeta {
-    pub index: BlockMetaIndex,
-    pub op: Deletion,
-}
-
-#[typetag::serde(name = "deletion_source_meta")]
-impl BlockMetaInfo for DeletionSourceMeta {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn as_mut_any(&mut self) -> &mut dyn Any {
-        self
-    }
-
-    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
-        Box::new(self.clone())
-    }
-
-    fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
-        match info.as_any().downcast_ref::<DeletionSourceMeta>() {
-            None => false,
-            Some(other) => self == other,
-        }
-    }
-}
-
-impl DeletionSourceMeta {
-    pub fn create(index: BlockMetaIndex, op: Deletion) -> BlockMetaInfoPtr {
-        Box::new(DeletionSourceMeta { index, op })
-    }
-
-    pub fn from_meta(info: &BlockMetaInfoPtr) -> Result<&DeletionSourceMeta> {
-        match info.as_any().downcast_ref::<DeletionSourceMeta>() {
-            Some(part_ref) => Ok(part_ref),
-            None => Err(ErrorCode::Internal(
-                "Cannot downcast from BlockMetaInfo to DeletionSourceMeta.",
-            )),
-        }
-    }
-}
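The 430-line source removed next combined reading, filtering, serializing, and writing in a single processor. Its control flow, reduced to a plain state machine (a sketch of the transitions, not the removed code itself):

```rust
// States mirror the enum in the deleted file; transitions simplified.
enum State {
    ReadData,      // async: fetch column chunks for a part
    FilterData,    // sync: evaluate the filter, decide the mutation
    ReadRemain,    // async: fetch the untouched columns
    MergeRemain,   // sync: stitch the remaining columns back on
    NeedSerialize, // sync: rebuild stats and parquet bytes
    Serialized,    // async: write block + bloom index to storage
    Generated,     // sync: emit Deletion::{DoNothing, Replaced, Deleted}
    Output,
    Finish,
}

fn next(state: State, rows_left: bool, has_remain: bool) -> State {
    match state {
        State::ReadData => State::FilterData,
        State::FilterData if !rows_left => State::Generated, // whole block deleted
        State::FilterData if has_remain => State::ReadRemain,
        State::FilterData => State::NeedSerialize,
        State::ReadRemain => State::MergeRemain,
        State::MergeRemain => State::NeedSerialize,
        State::NeedSerialize => State::Serialized,
        State::Serialized => State::Generated,
        State::Generated => State::Output,
        State::Output | State::Finish => State::Finish,
    }
}
```

The refactor splits this machine in two: `MutationSource` keeps the read/filter half, and the new `SerializeDataTransform` (whose implementation is outside this section of the diff) takes over serialization and writing.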
diff --git a/src/query/storages/fuse/src/operations/mutation/deletion/deletion_source.rs b/src/query/storages/fuse/src/operations/mutation/deletion/deletion_source.rs
deleted file mode 100644
index 2ed98ec6f4a69..0000000000000
--- a/src/query/storages/fuse/src/operations/mutation/deletion/deletion_source.rs
+++ /dev/null
@@ -1,430 +0,0 @@
-// Copyright 2021 Datafuse Labs.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::any::Any;
-use std::ops::Not;
-use std::sync::Arc;
-
-use common_catalog::plan::PartInfoPtr;
-use common_catalog::table_context::TableContext;
-use common_exception::ErrorCode;
-use common_exception::Result;
-use common_expression::types::AnyType;
-use common_expression::Column;
-use common_expression::DataBlock;
-use common_expression::DataSchema;
-use common_expression::Evaluator;
-use common_expression::RemoteExpr;
-use common_expression::TableSchemaRef;
-use common_expression::TableSchemaRefExt;
-use common_expression::Value;
-use common_functions::scalars::BUILTIN_FUNCTIONS;
-use opendal::Operator;
-use storages_common_blocks::blocks_to_parquet;
-use storages_common_pruner::BlockMetaIndex;
-use storages_common_table_meta::meta::BlockMeta;
-use storages_common_table_meta::meta::ClusterStatistics;
-use storages_common_table_meta::table::TableCompression;
-
-use super::deletion_meta::Deletion;
-use super::deletion_meta::DeletionSourceMeta;
-use super::deletion_part::DeletionPartInfo;
-use crate::fuse_part::FusePartInfo;
-use crate::io::write_data;
-use crate::io::BlockReader;
-use crate::io::ReadSettings;
-use crate::io::TableMetaLocationGenerator;
-use crate::operations::util;
-use crate::operations::BloomIndexState;
-use crate::pipelines::processors::port::OutputPort;
-use crate::pipelines::processors::processor::Event;
-use crate::pipelines::processors::processor::ProcessorPtr;
-use crate::pipelines::processors::Processor;
-use crate::statistics::gen_columns_statistics;
-use crate::statistics::ClusterStatsGenerator;
-use crate::FuseTable;
-use crate::Table;
-
-type DataChunks = Vec<(usize, Vec<u8>)>;
-
-struct SerializeState {
-    block_data: Vec<u8>,
-    block_location: String,
-    index_data: Option<Vec<u8>>,
-    index_location: Option<String>,
-}
-
-enum State {
-    ReadData(Option<PartInfoPtr>),
-    FilterData(PartInfoPtr, DataChunks),
-    ReadRemain {
-        part: PartInfoPtr,
-        data_block: DataBlock,
-        filter: Value<AnyType>,
-    },
-    MergeRemain {
-        part: PartInfoPtr,
-        chunks: DataChunks,
-        data_block: DataBlock,
-        filter: Value<AnyType>,
-    },
-    NeedSerialize(DataBlock),
-    Serialized(SerializeState, Arc<BlockMeta>),
-    Generated(Deletion),
-    Output(Option<PartInfoPtr>, DataBlock),
-    Finish,
-}
-
-pub struct DeletionSource {
-    state: State,
-    ctx: Arc<dyn TableContext>,
-    output: Arc<OutputPort>,
-    location_gen: TableMetaLocationGenerator,
-    dal: Operator,
-    block_reader: Arc<BlockReader>,
-    filter: Arc<RemoteExpr<String>>,
-    remain_reader: Arc<Option<BlockReader>>,
-
-    source_schema: TableSchemaRef,
-    output_schema: TableSchemaRef,
-    index: BlockMetaIndex,
-    cluster_stats_gen: ClusterStatsGenerator,
-    origin_stats: Option<ClusterStatistics>,
-    table_compression: TableCompression,
-}
-
-impl DeletionSource {
-    pub fn try_create(
-        ctx: Arc<dyn TableContext>,
-        output: Arc<OutputPort>,
-        table: &FuseTable,
-        block_reader: Arc<BlockReader>,
-        filter: Arc<RemoteExpr<String>>,
-        remain_reader: Arc<Option<BlockReader>>,
-    ) -> Result<ProcessorPtr> {
-        let mut srouce_fields = block_reader.schema().fields().clone();
-        if let Some(remain_reader) = remain_reader.as_ref() {
-            srouce_fields.extend_from_slice(remain_reader.schema().fields());
-        }
-        let source_schema = TableSchemaRefExt::create(srouce_fields);
-        Ok(ProcessorPtr::create(Box::new(DeletionSource {
-            state: State::ReadData(None),
-            ctx: ctx.clone(),
-            output,
-            location_gen: table.meta_location_generator().clone(),
-            dal: table.get_operator(),
-            block_reader,
-            filter,
-            remain_reader,
-            source_schema,
-            output_schema: table.schema(),
-            index: BlockMetaIndex {
-                segment_idx: 0,
-                block_idx: 0,
-            },
-            cluster_stats_gen: table.cluster_stats_gen(ctx)?,
-            origin_stats: None,
-            table_compression: table.table_compression,
-        })))
-    }
-}
-
-#[async_trait::async_trait]
-impl Processor for DeletionSource {
-    fn name(&self) -> String {
-        "DeletionSource".to_string()
-    }
-
-    fn as_any(&mut self) -> &mut dyn Any {
-        self
-    }
-
-    fn event(&mut self) -> Result<Event> {
-        if matches!(self.state, State::ReadData(None)) {
-            self.state = match self.ctx.try_get_part() {
-                None => State::Finish,
-                Some(part) => State::ReadData(Some(part)),
-            }
-        }
-
-        if matches!(self.state, State::Finish) {
-            self.output.finish();
-            return Ok(Event::Finished);
-        }
-
-        if self.output.is_finished() {
-            return Ok(Event::Finished);
-        }
-
-        if !self.output.can_push() {
-            return Ok(Event::NeedConsume);
-        }
-
-        if matches!(self.state, State::Output(_, _)) {
-            if let State::Output(part, data_block) =
-                std::mem::replace(&mut self.state, State::Finish)
-            {
-                self.state = match part {
-                    None => State::Finish,
-                    Some(part) => State::ReadData(Some(part)),
-                };
-
-                self.output.push_data(Ok(data_block));
-                return Ok(Event::NeedConsume);
-            }
-        }
-
-        if matches!(
-            self.state,
-            State::ReadData(_) | State::ReadRemain { .. } | State::Serialized(_, _)
-        ) {
-            Ok(Event::Async)
-        } else {
-            Ok(Event::Sync)
-        }
-    }
-
-    fn process(&mut self) -> Result<()> {
-        match std::mem::replace(&mut self.state, State::Finish) {
-            State::FilterData(part, chunks) => {
-                let data_block = self
-                    .block_reader
-                    .deserialize_parquet_chunks(part.clone(), chunks)?;
-
-                let func_ctx = self.ctx.try_get_function_context()?;
-                let evaluator = Evaluator::new(&data_block, func_ctx, &BUILTIN_FUNCTIONS);
-                let expr = self
-                    .filter
-                    .as_expr(&BUILTIN_FUNCTIONS)
-                    .unwrap()
-                    .project_column_ref(|name| self.source_schema.index_of(name).unwrap());
-                let res = evaluator.run(&expr).map_err(|(_, e)| {
-                    ErrorCode::Internal(format!("eval try eval const failed: {}.", e))
-                })?;
-                let predicates = DataBlock::cast_to_nonull_boolean(&res).ok_or_else(|| {
-                    ErrorCode::BadArguments(
-                        "Result of filter expression cannot be converted to boolean.",
-                    )
-                })?;
-
-                let predicate_col = predicates.into_column().unwrap();
-                let filter = Value::Column(Column::Boolean(predicate_col.not()));
-                if !DataBlock::filter_exists(&filter)? {
-                    // all the rows should be removed.
-                    self.state = State::Generated(Deletion::Deleted);
-                } else {
-                    let num_rows = data_block.num_rows();
-                    let data_block = data_block.filter(&filter)?;
-                    if data_block.num_rows() == num_rows {
-                        // none of the rows should be removed.
-                        self.state = State::Generated(Deletion::DoNothing);
-                    } else if self.remain_reader.is_none() {
-                        let src_schema = self.block_reader.data_schema();
-                        let dest_schema = self.output_schema.clone().into();
-                        let block = data_block.resort(&src_schema, &dest_schema)?;
-                        self.state = State::NeedSerialize(block);
-                    } else {
-                        self.state = State::ReadRemain {
-                            part,
-                            data_block,
-                            filter,
-                        }
-                    }
-                }
-            }
-            State::MergeRemain {
-                part,
-                chunks,
-                mut data_block,
-                filter,
-            } => {
-                let mut fields = self.block_reader.data_fields();
-                let merged = if chunks.is_empty() {
-                    data_block
-                } else if let Some(remain_reader) = self.remain_reader.as_ref() {
-                    let mut remain_fields = remain_reader.data_fields();
-                    fields.append(&mut remain_fields);
-                    let remain_block = remain_reader.deserialize_parquet_chunks(part, chunks)?;
-                    let remain_block = remain_block.filter(&filter)?;
-                    for col in remain_block.columns() {
-                        data_block.add_column(col.clone());
-                    }
-                    data_block
-                } else {
-                    return Err(ErrorCode::Internal("It's a bug. Need remain reader"));
-                };
-
-                let src_schema = DataSchema::new(fields);
-                let dest_schema = self.output_schema.clone().into();
-                let block = merged.resort(&src_schema, &dest_schema)?;
-                self.state = State::NeedSerialize(block);
-            }
-            State::NeedSerialize(block) => {
-                let cluster_stats = self
-                    .cluster_stats_gen
-                    .gen_with_origin_stats(&block, std::mem::take(&mut self.origin_stats))?;
-
-                let row_count = block.num_rows() as u64;
-                let block_size = block.memory_size() as u64;
-                let (block_location, block_id) = self.location_gen.gen_block_location();
-
-                // build block index.
-                let location = self.location_gen.block_bloom_index_location(&block_id);
-                let bloom_index_state = BloomIndexState::try_create(
-                    self.ctx.clone(),
-                    self.source_schema.clone(),
-                    &block,
-                    location,
-                )?;
-                let column_distinct_count = bloom_index_state
-                    .as_ref()
-                    .map(|i| i.column_distinct_count.clone());
-                let col_stats = gen_columns_statistics(&block, column_distinct_count)?;
-
-                // serialize data block.
-                let mut block_data = Vec::with_capacity(100 * 1024 * 1024);
-                let schema = self.source_schema.clone();
-                let (file_size, meta_data) = blocks_to_parquet(
-                    &schema,
-                    vec![block],
-                    &mut block_data,
-                    self.table_compression,
-                )?;
-                let col_metas = util::column_metas(&meta_data)?;
-
-                let (index_data, index_location, index_size) =
-                    if let Some(bloom_index_state) = bloom_index_state {
-                        (
-                            Some(bloom_index_state.data.clone()),
-                            Some(bloom_index_state.location.clone()),
-                            bloom_index_state.size,
-                        )
-                    } else {
-                        (None, None, 0u64)
-                    };
-
-                // new block meta.
-                let new_meta = Arc::new(BlockMeta::new(
-                    row_count,
-                    block_size,
-                    file_size,
-                    col_stats,
-                    col_metas,
-                    cluster_stats,
-                    block_location.clone(),
-                    index_location.clone(),
-                    index_size,
-                    self.table_compression.into(),
-                ));
-
-                self.state = State::Serialized(
-                    SerializeState {
-                        block_data,
-                        block_location: block_location.0,
-                        index_data,
-                        index_location: index_location.map(|l| l.0),
-                    },
-                    new_meta,
-                );
-            }
-            State::Generated(op) => {
-                let meta = DeletionSourceMeta::create(self.index.clone(), op);
-                let new_part = self.ctx.try_get_part();
-                self.state = State::Output(new_part, DataBlock::empty_with_meta(meta));
-            }
-            _ => return Err(ErrorCode::Internal("It's a bug.")),
-        }
-        Ok(())
-    }
-
-    async fn async_process(&mut self) -> Result<()> {
-        match std::mem::replace(&mut self.state, State::Finish) {
-            State::ReadData(Some(part)) => {
-                let settings = ReadSettings::from_ctx(&self.ctx)?;
-                let deletion_part = DeletionPartInfo::from_part(&part)?;
-                self.index = deletion_part.index.clone();
-                self.origin_stats = deletion_part.cluster_stats.clone();
-                let part = deletion_part.inner_part.clone();
-                let fuse_part = FusePartInfo::from_part(&part)?;
-
-                let read_res = self
-                    .block_reader
-                    .read_columns_data_by_merge_io(
-                        &settings,
-                        &fuse_part.location,
-                        &fuse_part.columns_meta,
-                    )
-                    .await?;
-                let chunks = read_res
-                    .columns_chunks()?
-                    .into_iter()
-                    .map(|(column_idx, column_chunk)| (column_idx, column_chunk.to_vec()))
-                    .collect::<Vec<_>>();
-
-                self.state = State::FilterData(part, chunks);
-            }
-            State::ReadRemain {
-                part,
-                data_block,
-                filter,
-            } => {
-                if let Some(remain_reader) = self.remain_reader.as_ref() {
-                    let fuse_part = FusePartInfo::from_part(&part)?;
-
-                    let settings = ReadSettings::from_ctx(&self.ctx)?;
-                    let read_res = remain_reader
-                        .read_columns_data_by_merge_io(
-                            &settings,
-                            &fuse_part.location,
-                            &fuse_part.columns_meta,
-                        )
-                        .await?;
-                    let chunks = read_res
-                        .columns_chunks()?
-                        .into_iter()
-                        .map(|(column_idx, column_chunk)| (column_idx, column_chunk.to_vec()))
-                        .collect::<Vec<_>>();
-
-                    self.state = State::MergeRemain {
-                        part,
-                        chunks,
-                        data_block,
-                        filter,
-                    };
-                } else {
-                    return Err(ErrorCode::Internal("It's a bug. No remain reader"));
-                }
-            }
-            State::Serialized(serialize_state, block_meta) => {
-                // write block data.
-                write_data(
-                    &serialize_state.block_data,
-                    &self.dal,
-                    &serialize_state.block_location,
-                )
-                .await?;
-                // write index data.
-                if let (Some(index_data), Some(index_location)) =
-                    (serialize_state.index_data, serialize_state.index_location)
-                {
-                    write_data(&index_data, &self.dal, &index_location).await?;
-                }
-
-                self.state = State::Generated(Deletion::Replaced(block_meta));
-            }
-            _ => return Err(ErrorCode::Internal("It's a bug.")),
-        }
-        Ok(())
-    }
-}
diff --git a/src/query/storages/fuse/src/operations/mutation/deletion/mod.rs b/src/query/storages/fuse/src/operations/mutation/deletion/mod.rs
deleted file mode 100644
index c6c06846a3f71..0000000000000
--- a/src/query/storages/fuse/src/operations/mutation/deletion/mod.rs
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright 2022 Datafuse Labs.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-mod deletion_meta;
-mod deletion_part;
-mod deletion_source;
-mod deletion_transform;
-
-pub use deletion_meta::Deletion;
-pub use deletion_part::DeletionPartInfo;
-pub use deletion_source::DeletionSource;
-pub use deletion_transform::DeletionTransform;
diff --git a/src/query/storages/fuse/src/operations/mutation/mod.rs b/src/query/storages/fuse/src/operations/mutation/mod.rs
index d127ac1d6587d..4e541a7d72613 100644
--- a/src/query/storages/fuse/src/operations/mutation/mod.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mod.rs
@@ -15,10 +15,13 @@
 pub mod abort_operation;
 pub mod base_mutator;
 mod compact;
-mod deletion;
 pub mod mutation_meta;
+mod mutation_part;
 pub mod mutation_sink;
+mod mutation_source;
+mod mutation_transform;
 pub mod recluster_mutator;
+mod serialize_data_transform;
 
 pub use abort_operation::AbortOperation;
 pub use base_mutator::BaseMutator;
@@ -29,9 +32,15 @@ pub use compact::MergeSegmentsTransform;
 pub use compact::SegmentCompactMutator;
 pub use compact::SegmentCompactionState;
 pub use compact::SegmentCompactor;
-pub use deletion::DeletionPartInfo;
-pub use deletion::DeletionSource;
-pub use deletion::DeletionTransform;
-pub use mutation_meta::MutationMeta;
+pub use mutation_meta::Mutation;
+pub use mutation_meta::MutationSinkMeta;
+pub use mutation_meta::MutationTransformMeta;
+pub use mutation_meta::SerializeDataMeta;
+pub use mutation_part::MutationPartInfo;
 pub use mutation_sink::MutationSink;
+pub use mutation_source::MutationAction;
+pub use mutation_source::MutationSource;
+pub use mutation_transform::MutationTransform;
 pub use recluster_mutator::ReclusterMutator;
+pub use serialize_data_transform::SerializeDataTransform;
+pub use serialize_data_transform::SerializeState;
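The meta types re-exported above travel between pipeline stages as type-erased `BlockMetaInfo` values: `SerializeDataMeta` from source to serializer, `MutationTransformMeta` into the transform, and `MutationSinkMeta` into the sink. Each follows the same `create`/`from_meta` boxing-and-downcasting convention, shown here in miniature with plain `std::any` (the real code below also derives serde and registers a `typetag` name so the meta can cross process boundaries):

```rust
use std::any::Any;

struct SinkMeta {
    segments: Vec<String>,
}

// Box the concrete meta into a type-erased pointer for the pipeline.
fn create(segments: Vec<String>) -> Box<dyn Any> {
    Box::new(SinkMeta { segments })
}

// Recover the concrete type at the receiving stage, or report a bug.
fn from_meta(info: &Box<dyn Any>) -> Result<&SinkMeta, String> {
    info.downcast_ref::<SinkMeta>()
        .ok_or_else(|| "Cannot downcast to SinkMeta".to_string())
}
```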
diff --git a/src/query/storages/fuse/src/operations/mutation/mutation_meta.rs b/src/query/storages/fuse/src/operations/mutation/mutation_meta.rs
index 48b2a03f38efd..f079cb816e3c2 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutation_meta.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutation_meta.rs
@@ -13,25 +13,128 @@
 // limitations under the License.
 
 use std::any::Any;
+use std::sync::Arc;
 
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::BlockMetaInfo;
 use common_expression::BlockMetaInfoPtr;
+use storages_common_pruner::BlockMetaIndex;
+use storages_common_table_meta::meta::BlockMeta;
+use storages_common_table_meta::meta::ClusterStatistics;
 use storages_common_table_meta::meta::Location;
 use storages_common_table_meta::meta::Statistics;
 
 use crate::operations::mutation::AbortOperation;
 
 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
-pub struct MutationMeta {
+pub struct SerializeDataMeta {
+    pub index: BlockMetaIndex,
+    pub cluster_stats: Option<ClusterStatistics>,
+}
+
+#[typetag::serde(name = "serialize_data_meta")]
+impl BlockMetaInfo for SerializeDataMeta {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn as_mut_any(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
+        Box::new(self.clone())
+    }
+
+    fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
+        match info.as_any().downcast_ref::<SerializeDataMeta>() {
+            None => false,
+            Some(other) => self == other,
+        }
+    }
+}
+
+impl SerializeDataMeta {
+    pub fn create(
+        index: BlockMetaIndex,
+        cluster_stats: Option<ClusterStatistics>,
+    ) -> BlockMetaInfoPtr {
+        Box::new(SerializeDataMeta {
+            index,
+            cluster_stats,
+        })
+    }
+
+    pub fn from_meta(info: &BlockMetaInfoPtr) -> Result<&SerializeDataMeta> {
+        match info.as_any().downcast_ref::<SerializeDataMeta>() {
+            Some(part_ref) => Ok(part_ref),
+            None => Err(ErrorCode::Internal(
+                "Cannot downcast from BlockMetaInfo to SerializeDataMeta.",
+            )),
+        }
+    }
+}
+
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
+pub enum Mutation {
+    DoNothing,
+    Replaced(Arc<BlockMeta>),
+    Deleted,
+}
+
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
+pub struct MutationTransformMeta {
+    pub index: BlockMetaIndex,
+    pub op: Mutation,
+}
+
+#[typetag::serde(name = "mutation_transform_meta")]
+impl BlockMetaInfo for MutationTransformMeta {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn as_mut_any(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
+        Box::new(self.clone())
+    }
+
+    fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
+        match info.as_any().downcast_ref::<MutationTransformMeta>() {
+            None => false,
+            Some(other) => self == other,
+        }
+    }
+}
+
+impl MutationTransformMeta {
+    pub fn create(index: BlockMetaIndex, op: Mutation) -> BlockMetaInfoPtr {
+        Box::new(MutationTransformMeta { index, op })
+    }
+
+    pub fn from_meta(info: &BlockMetaInfoPtr) -> Result<&MutationTransformMeta> {
+        match info.as_any().downcast_ref::<MutationTransformMeta>() {
+            Some(part_ref) => Ok(part_ref),
+            None => Err(ErrorCode::Internal(
+                "Cannot downcast from BlockMetaInfo to MutationTransformMeta.",
+            )),
+        }
+    }
+}
+
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
+pub struct MutationSinkMeta {
     pub segments: Vec<Location>,
     pub summary: Statistics,
     pub abort_operation: AbortOperation,
 }
 
-#[typetag::serde(name = "mutation_meta")]
-impl BlockMetaInfo for MutationMeta {
+#[typetag::serde(name = "mutation_sink_meta")]
+impl BlockMetaInfo for MutationSinkMeta {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -45,31 +148,31 @@ impl BlockMetaInfo for MutationMeta {
     }
 
     fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
-        match info.as_any().downcast_ref::<MutationMeta>() {
+        match info.as_any().downcast_ref::<MutationSinkMeta>() {
             None => false,
             Some(other) => self == other,
         }
     }
 }
 
-impl MutationMeta {
+impl MutationSinkMeta {
     pub fn create(
         segments: Vec<Location>,
         summary: Statistics,
         abort_operation: AbortOperation,
     ) -> BlockMetaInfoPtr {
-        Box::new(MutationMeta {
+        Box::new(MutationSinkMeta {
             segments,
             summary,
             abort_operation,
         })
     }
 
-    pub fn from_meta(info: &BlockMetaInfoPtr) -> Result<&MutationMeta> {
-        match info.as_any().downcast_ref::<MutationMeta>() {
+    pub fn from_meta(info: &BlockMetaInfoPtr) -> Result<&MutationSinkMeta> {
+        match info.as_any().downcast_ref::<MutationSinkMeta>() {
             Some(part_ref) => Ok(part_ref),
             None => Err(ErrorCode::Internal(
-                "Cannot downcast from ChunkMetaInfo to MutationMeta.",
+                "Cannot downcast from BlockMetaInfo to MutationSinkMeta.",
             )),
         }
     }
diff --git a/src/query/storages/fuse/src/operations/mutation/deletion/deletion_part.rs b/src/query/storages/fuse/src/operations/mutation/mutation_part.rs
similarity index 79%
rename from src/query/storages/fuse/src/operations/mutation/deletion/deletion_part.rs
rename to src/query/storages/fuse/src/operations/mutation/mutation_part.rs
index 0f284b9341a6f..2f9aff13e622a 100644
--- a/src/query/storages/fuse/src/operations/mutation/deletion/deletion_part.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutation_part.rs
@@ -23,20 +23,20 @@ use storages_common_pruner::BlockMetaIndex;
 use storages_common_table_meta::meta::ClusterStatistics;
 
 #[derive(serde::Serialize, serde::Deserialize, PartialEq)]
-pub struct DeletionPartInfo {
+pub struct MutationPartInfo {
     pub index: BlockMetaIndex,
     pub cluster_stats: Option<ClusterStatistics>,
     pub inner_part: PartInfoPtr,
 }
 
-#[typetag::serde(name = "deletion")]
-impl PartInfo for DeletionPartInfo {
+#[typetag::serde(name = "mutation")]
+impl PartInfo for MutationPartInfo {
     fn as_any(&self) -> &dyn Any {
         self
     }
 
     fn equals(&self, info: &Box<dyn PartInfo>) -> bool {
-        match info.as_any().downcast_ref::<DeletionPartInfo>() {
+        match info.as_any().downcast_ref::<MutationPartInfo>() {
             None => false,
             Some(other) => self == other,
         }
@@ -47,24 +47,24 @@ impl PartInfo for MutationPartInfo {
     }
 }
 
-impl DeletionPartInfo {
+impl MutationPartInfo {
     pub fn create(
         index: BlockMetaIndex,
         cluster_stats: Option<ClusterStatistics>,
         inner_part: PartInfoPtr,
     ) -> PartInfoPtr {
-        Arc::new(Box::new(DeletionPartInfo {
+        Arc::new(Box::new(MutationPartInfo {
             index,
             cluster_stats,
             inner_part,
         }))
     }
 
-    pub fn from_part(info: &PartInfoPtr) -> Result<&DeletionPartInfo> {
-        match info.as_any().downcast_ref::<DeletionPartInfo>() {
+    pub fn from_part(info: &PartInfoPtr) -> Result<&MutationPartInfo> {
+        match info.as_any().downcast_ref::<MutationPartInfo>() {
             Some(part_ref) => Ok(part_ref),
             None => Err(ErrorCode::Internal(
-                "Cannot downcast from PartInfo to DeletionPartInfo.",
+                "Cannot downcast from PartInfo to MutationPartInfo.",
             )),
         }
     }
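`MutationPartInfo` is the same wrapper the deletion path already used, renamed for shared use: it threads the pruning result and the block's original cluster statistics through `try_set_partitions` alongside the inner FUSE part, so the source can put the rewritten block back into the right segment slot. Flattened to plain data, it is roughly (hypothetical simplified field types, for illustration only):

```rust
// Conceptual shape of a mutation part; `Inner` stands in for the real
// FUSE part actually read from storage.
struct MutationPart<Inner> {
    segment_idx: usize,              // from BlockMetaIndex
    block_idx: usize,                // from BlockMetaIndex
    cluster_stats: Option<Vec<i64>>, // original cluster-key stats, simplified
    inner: Inner,
}
```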
diff --git a/src/query/storages/fuse/src/operations/mutation/mutation_sink.rs b/src/query/storages/fuse/src/operations/mutation/mutation_sink.rs
index f3066f9a659d6..ca2fb511212a0 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutation_sink.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutation_sink.rs
@@ -15,8 +15,6 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use common_base::base::Progress;
-use common_base::base::ProgressValues;
 use common_catalog::table::Table;
 use common_catalog::table::TableExt;
 use common_catalog::table_context::TableContext;
@@ -37,7 +35,7 @@ use crate::metrics::metrics_inc_commit_mutation_unresolvable_conflict;
 use crate::operations::commit::Conflict;
 use crate::operations::commit::MutatorConflictDetector;
 use crate::operations::mutation::AbortOperation;
-use crate::operations::mutation::MutationMeta;
+use crate::operations::mutation::MutationSinkMeta;
 use crate::pipelines::processors::port::InputPort;
 use crate::pipelines::processors::processor::Event;
 use crate::pipelines::processors::processor::ProcessorPtr;
@@ -65,7 +63,6 @@ pub struct MutationSink {
     ctx: Arc<dyn TableContext>,
     dal: Operator,
     location_gen: TableMetaLocationGenerator,
-    scan_progress: Arc<Progress>,
 
     table: Arc<dyn Table>,
     base_snapshot: Arc<TableSnapshot>,
@@ -87,13 +84,11 @@ impl MutationSink {
         base_snapshot: Arc<TableSnapshot>,
         input: Arc<InputPort>,
     ) -> Result<ProcessorPtr> {
-        let scan_progress = ctx.get_scan_progress();
         Ok(ProcessorPtr::create(Box::new(MutationSink {
             state: State::None,
             ctx,
             dal: table.get_operator(),
             location_gen: table.meta_location_generator.clone(),
-            scan_progress,
             table: Arc::new(table.clone()),
             base_snapshot,
             merged_segments: vec![],
@@ -158,23 +153,7 @@ impl Processor for MutationSink {
     fn process(&mut self) -> Result<()> {
         match std::mem::replace(&mut self.state, State::None) {
             State::ReadMeta(input_meta) => {
-                let meta = MutationMeta::from_meta(&input_meta)?;
-
-                let affect_rows = self
-                    .base_snapshot
-                    .summary
-                    .row_count
-                    .abs_diff(meta.summary.row_count);
-                let affect_bytes = self
-                    .base_snapshot
-                    .summary
-                    .uncompressed_byte_size
-                    .abs_diff(meta.summary.uncompressed_byte_size);
-                let progress_values = ProgressValues {
-                    rows: affect_rows as usize,
-                    bytes: affect_bytes as usize,
-                };
-                self.scan_progress.incr(&progress_values);
+                let meta = MutationSinkMeta::from_meta(&input_meta)?;
 
                 self.merged_segments = meta.segments.clone();
                 self.merged_statistics = meta.summary.clone();
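Note what the sink loses above: the rows/bytes progress accounting. It moves into `MutationSource` below, which counts affected rows at the point where the predicate is evaluated, rather than diffing snapshot summaries after commit (a diff of summaries under-counts UPDATE, which changes rows without changing the row count). The counting logic, extracted and simplified:

```rust
// Mirrors the match in mutation_source.rs below; Value/Bitmap simplified.
enum Value {
    Scalar(bool),
    Column(Vec<bool>),
}

fn affect_rows(predicates: &Value, num_rows: usize) -> usize {
    match predicates {
        Value::Scalar(v) => if *v { num_rows } else { 0 },
        // bitmap.len() - bitmap.unset_bits() in the real code
        Value::Column(bits) => bits.iter().filter(|b| **b).count(),
    }
}
```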
+ +use std::any::Any; +use std::ops::Not; +use std::sync::Arc; + +use common_base::base::Progress; +use common_base::base::ProgressValues; +use common_catalog::plan::PartInfoPtr; +use common_catalog::table_context::TableContext; +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::types::AnyType; +use common_expression::types::DataType; +use common_expression::BlockEntry; +use common_expression::Column; +use common_expression::DataBlock; +use common_expression::Evaluator; +use common_expression::Expr; +use common_expression::Value; +use common_functions::scalars::BUILTIN_FUNCTIONS; +use common_sql::evaluator::BlockOperator; +use storages_common_pruner::BlockMetaIndex; +use storages_common_table_meta::meta::ClusterStatistics; + +use crate::fuse_part::FusePartInfo; +use crate::io::BlockReader; +use crate::io::ReadSettings; +use crate::operations::mutation::MutationPartInfo; +use crate::operations::mutation::SerializeDataMeta; +use crate::pipelines::processors::port::OutputPort; +use crate::pipelines::processors::processor::Event; +use crate::pipelines::processors::processor::ProcessorPtr; +use crate::pipelines::processors::Processor; + +type DataChunks = Vec<(usize, Vec)>; + +pub enum MutationAction { + Deletion, + Update, +} + +enum State { + ReadData(Option), + FilterData(PartInfoPtr, DataChunks), + ReadRemain { + part: PartInfoPtr, + data_block: DataBlock, + filter: Value, + }, + MergeRemain { + part: PartInfoPtr, + chunks: DataChunks, + data_block: DataBlock, + filter: Value, + }, + PerformOperator(DataBlock), + Output(Option, DataBlock), + Finish, +} + +pub struct MutationSource { + state: State, + output: Arc, + scan_progress: Arc, + + ctx: Arc, + filter: Arc>, + block_reader: Arc, + remain_reader: Arc>, + operators: Vec, + action: MutationAction, + + index: BlockMetaIndex, + origin_stats: Option, +} + +impl MutationSource { + #![allow(clippy::too_many_arguments)] + pub fn try_create( + ctx: Arc, + action: MutationAction, + output: Arc, + filter: Arc>, + block_reader: Arc, + remain_reader: Arc>, + operators: Vec, + ) -> Result { + let scan_progress = ctx.get_scan_progress(); + Ok(ProcessorPtr::create(Box::new(MutationSource { + state: State::ReadData(None), + output, + scan_progress, + ctx: ctx.clone(), + filter, + block_reader, + remain_reader, + operators, + action, + index: BlockMetaIndex::default(), + origin_stats: None, + }))) + } +} + +#[async_trait::async_trait] +impl Processor for MutationSource { + fn name(&self) -> String { + "MutationSource".to_string() + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if matches!(self.state, State::ReadData(None)) { + self.state = match self.ctx.try_get_part() { + None => State::Finish, + Some(part) => State::ReadData(Some(part)), + } + } + + if matches!(self.state, State::Finish) { + self.output.finish(); + return Ok(Event::Finished); + } + + if self.output.is_finished() { + return Ok(Event::Finished); + } + + if !self.output.can_push() { + return Ok(Event::NeedConsume); + } + + if matches!(self.state, State::Output(_, _)) { + if let State::Output(part, data_block) = + std::mem::replace(&mut self.state, State::Finish) + { + self.state = match part { + None => State::Finish, + Some(part) => State::ReadData(Some(part)), + }; + + self.output.push_data(Ok(data_block)); + return Ok(Event::NeedConsume); + } + } + + if matches!(self.state, State::ReadData(_) | State::ReadRemain { .. 
}) { + Ok(Event::Async) + } else { + Ok(Event::Sync) + } + } + + fn process(&mut self) -> Result<()> { + match std::mem::replace(&mut self.state, State::Finish) { + State::FilterData(part, chunks) => { + let mut data_block = self + .block_reader + .deserialize_parquet_chunks(part.clone(), chunks)?; + let num_rows = data_block.num_rows(); + + if let Some(filter) = self.filter.as_ref() { + let func_ctx = self.ctx.try_get_function_context()?; + let evaluator = Evaluator::new(&data_block, func_ctx, &BUILTIN_FUNCTIONS); + + let res = evaluator.run(filter).map_err(|(_, e)| { + ErrorCode::Internal(format!("eval filter failed: {}.", e)) + })?; + let predicates = DataBlock::cast_to_nonull_boolean(&res).ok_or_else(|| { + ErrorCode::BadArguments( + "Result of filter expression cannot be converted to boolean.", + ) + })?; + + let affect_rows = match &predicates { + Value::Scalar(v) => { + if *v { + num_rows + } else { + 0 + } + } + Value::Column(bitmap) => bitmap.len() - bitmap.unset_bits(), + }; + + if affect_rows != 0 { + let progress_values = ProgressValues { + rows: affect_rows, + bytes: 0, + }; + self.scan_progress.incr(&progress_values); + + match self.action { + MutationAction::Deletion => { + if affect_rows == num_rows { + // all the rows should be removed. + let meta = SerializeDataMeta::create( + self.index.clone(), + self.origin_stats.clone(), + ); + self.state = State::Output( + self.ctx.try_get_part(), + DataBlock::empty_with_meta(meta), + ); + } else { + let predicate_col = predicates.into_column().unwrap(); + let filter = + Value::Column(Column::Boolean(predicate_col.not())); + data_block = data_block.filter(&filter)?; + if self.remain_reader.is_none() { + self.state = State::PerformOperator(data_block); + } else { + self.state = State::ReadRemain { + part, + data_block, + filter, + } + } + } + } + MutationAction::Update => { + let filter = Value::upcast(predicates); + if self.remain_reader.is_none() { + data_block.add_column(BlockEntry { + data_type: DataType::Boolean, + value: filter, + }); + self.state = State::PerformOperator(data_block); + } else { + self.state = State::ReadRemain { + part, + data_block, + filter, + }; + } + } + } + } else { + // Do nothing. + self.state = State::Output(self.ctx.try_get_part(), DataBlock::empty()); + } + } else { + let progress_values = ProgressValues { + rows: num_rows, + bytes: 0, + }; + self.scan_progress.incr(&progress_values); + self.state = State::PerformOperator(data_block); + } + } + State::MergeRemain { + part, + chunks, + mut data_block, + filter, + } => { + if let Some(remain_reader) = self.remain_reader.as_ref() { + let remain_block = remain_reader.deserialize_parquet_chunks(part, chunks)?; + + match self.action { + MutationAction::Deletion => { + let remain_block = remain_block.filter(&filter)?; + for col in remain_block.columns() { + data_block.add_column(col.clone()); + } + } + MutationAction::Update => { + for col in remain_block.columns() { + data_block.add_column(col.clone()); + } + data_block.add_column(BlockEntry { + data_type: DataType::Boolean, + value: filter, + }); + } + } + } else { + return Err(ErrorCode::Internal("It's a bug. 
+            State::PerformOperator(data_block) => {
+                let func_ctx = self.ctx.try_get_function_context()?;
+                let block = self
+                    .operators
+                    .iter()
+                    .try_fold(data_block, |input, op| op.execute(&func_ctx, input))?;
+                let meta = SerializeDataMeta::create(self.index.clone(), self.origin_stats.clone());
+                self.state = State::Output(self.ctx.try_get_part(), block.add_meta(Some(meta))?);
+            }
+            _ => return Err(ErrorCode::Internal("It's a bug.")),
+        }
+        Ok(())
+    }
+
+    async fn async_process(&mut self) -> Result<()> {
+        match std::mem::replace(&mut self.state, State::Finish) {
+            State::ReadData(Some(part)) => {
+                let settings = ReadSettings::from_ctx(&self.ctx)?;
+                let part = MutationPartInfo::from_part(&part)?;
+                self.index = part.index.clone();
+                self.origin_stats = part.cluster_stats.clone();
+                let inner_part = part.inner_part.clone();
+                let fuse_part = FusePartInfo::from_part(&inner_part)?;
+
+                let read_res = self
+                    .block_reader
+                    .read_columns_data_by_merge_io(
+                        &settings,
+                        &fuse_part.location,
+                        &fuse_part.columns_meta,
+                    )
+                    .await?;
+                let chunks = read_res
+                    .columns_chunks()?
+                    .into_iter()
+                    .map(|(column_idx, column_chunk)| (column_idx, column_chunk.to_vec()))
+                    .collect::<Vec<_>>();
+                self.state = State::FilterData(inner_part, chunks);
+            }
+            State::ReadRemain {
+                part,
+                data_block,
+                filter,
+            } => {
+                if let Some(remain_reader) = self.remain_reader.as_ref() {
+                    let fuse_part = FusePartInfo::from_part(&part)?;
+
+                    let settings = ReadSettings::from_ctx(&self.ctx)?;
+                    let read_res = remain_reader
+                        .read_columns_data_by_merge_io(
+                            &settings,
+                            &fuse_part.location,
+                            &fuse_part.columns_meta,
+                        )
+                        .await?;
+                    let chunks = read_res
+                        .columns_chunks()?
+                        .into_iter()
+                        .map(|(column_idx, column_chunk)| (column_idx, column_chunk.to_vec()))
+                        .collect::<Vec<_>>();
+
+                    self.state = State::MergeRemain {
+                        part,
+                        chunks,
+                        data_block,
+                        filter,
+                    };
+                } else {
+                    return Err(ErrorCode::Internal("It's a bug. No remain reader"));
+                }
+            }
+            _ => return Err(ErrorCode::Internal("It's a bug.")),
+        }
+        Ok(())
+    }
+}
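Note on the source above: MutationSource drives both DELETE and UPDATE from one row filter. Deletion keeps the rows the predicate does not match (hence the predicate_col.not() above), while update keeps every row and appends the predicate as the `_predicate` boolean column so downstream operators can pick old or new values per row. A minimal sketch of that dispatch, using plain Rust stand-ins rather than Databend's types:

    // Illustrative only: `predicate` stands in for the evaluated filter column.
    pub enum Action {
        Deletion,
        Update,
    }

    /// How many rows of a block survive into the rewritten block.
    fn rows_kept(action: &Action, predicate: &[bool]) -> usize {
        match action {
            // DELETE keeps the rows the predicate does NOT match.
            Action::Deletion => predicate.iter().filter(|hit| !**hit).count(),
            // UPDATE keeps every row; the predicate travels along as `_predicate`.
            Action::Update => predicate.len(),
        }
    }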
diff --git a/src/query/storages/fuse/src/operations/mutation/deletion/deletion_transform.rs b/src/query/storages/fuse/src/operations/mutation/mutation_transform.rs
similarity index 93%
rename from src/query/storages/fuse/src/operations/mutation/deletion/deletion_transform.rs
rename to src/query/storages/fuse/src/operations/mutation/mutation_transform.rs
index 3ceaf7a53fc34..69ddfe5632f39 100644
--- a/src/query/storages/fuse/src/operations/mutation/deletion/deletion_transform.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutation_transform.rs
@@ -34,10 +34,10 @@ use storages_common_table_meta::meta::Statistics;
 use crate::io::try_join_futures_with_vec;
 use crate::io::SegmentsIO;
 use crate::io::TableMetaLocationGenerator;
-use crate::operations::mutation::deletion::deletion_meta::DeletionSourceMeta;
-use crate::operations::mutation::deletion::Deletion;
 use crate::operations::mutation::AbortOperation;
-use crate::operations::mutation::MutationMeta;
+use crate::operations::mutation::Mutation;
+use crate::operations::mutation::MutationSinkMeta;
+use crate::operations::mutation::MutationTransformMeta;
 use crate::pipelines::processors::port::InputPort;
 use crate::pipelines::processors::port::OutputPort;
 use crate::pipelines::processors::processor::Event;
@@ -46,7 +46,7 @@ use crate::pipelines::processors::Processor;
 use crate::statistics::reducers::merge_statistics_mut;
 use crate::statistics::reducers::reduce_block_metas;
 
-type DeletionMap = HashMap<usize, (Vec<(usize, Arc<BlockMeta>)>, Vec<usize>)>;
+type MutationMap = HashMap<usize, (Vec<(usize, Arc<BlockMeta>)>, Vec<usize>)>;
 
 struct SerializedData {
     data: Vec<u8>,
@@ -70,7 +70,7 @@ enum State {
     },
 }
 
-pub struct DeletionTransform {
+pub struct MutationTransform {
     state: State,
     ctx: Arc<dyn TableContext>,
     schema: TableSchemaRef,
@@ -82,13 +82,13 @@ pub struct DeletionTransform {
     abort_operation: AbortOperation,
 
     inputs: Vec<Arc<InputPort>>,
-    input_metas: DeletionMap,
+    input_metas: MutationMap,
     cur_input_index: usize,
    output: Arc<OutputPort>,
    output_data: Option<DataBlock>,
 }
 
-impl DeletionTransform {
+impl MutationTransform {
     #[allow(clippy::too_many_arguments)]
     pub fn try_create(
         ctx: Arc<dyn TableContext>,
@@ -100,7 +100,7 @@ impl DeletionTransform {
         base_segments: Vec<Location>,
         thresholds: BlockCompactThresholds,
     ) -> Result<ProcessorPtr> {
-        Ok(ProcessorPtr::create(Box::new(DeletionTransform {
+        Ok(ProcessorPtr::create(Box::new(MutationTransform {
             state: State::None,
             ctx,
             schema,
@@ -165,7 +165,7 @@ impl DeletionTransform {
         try_join_futures_with_vec(
             self.ctx.clone(),
             handles,
-            "deletion-write-segments-worker".to_owned(),
+            "mutation-write-segments-worker".to_owned(),
         )
         .await?
         .into_iter()
@@ -175,9 +175,9 @@ impl DeletionTransform {
 }
 
 #[async_trait::async_trait]
-impl Processor for DeletionTransform {
+impl Processor for MutationTransform {
     fn name(&self) -> String {
-        "DeletionTransform".to_string()
+        "MutationTransform".to_string()
     }
 
     fn as_any(&mut self) -> &mut dyn Any {
@@ -233,22 +233,22 @@ impl Processor for DeletionTransform {
                 .get_meta()
                 .cloned()
                 .ok_or_else(|| ErrorCode::Internal("No block meta. It's a bug"))?;
-            let meta = DeletionSourceMeta::from_meta(&input_meta)?;
+            let meta = MutationTransformMeta::from_meta(&input_meta)?;
             match &meta.op {
-                Deletion::Replaced(block_meta) => {
+                Mutation::Replaced(block_meta) => {
                     self.input_metas
                         .entry(meta.index.segment_idx)
                         .and_modify(|v| v.0.push((meta.index.block_idx, block_meta.clone())))
                         .or_insert((vec![(meta.index.block_idx, block_meta.clone())], vec![]));
                     self.abort_operation.add_block(block_meta);
                 }
-                Deletion::Deleted => {
+                Mutation::Deleted => {
                     self.input_metas
                         .entry(meta.index.segment_idx)
                         .and_modify(|v| v.1.push(meta.index.block_idx))
                         .or_insert((vec![], vec![meta.index.block_idx]));
                 }
-                Deletion::DoNothing => (),
+                Mutation::DoNothing => (),
             }
         }
         State::GenerateSegments(segment_infos) => {
@@ -310,7 +310,7 @@ impl Processor for DeletionTransform {
             };
         }
         State::Output { segments, summary } => {
-            let meta = MutationMeta::create(
+            let meta = MutationSinkMeta::create(
                 segments,
                 summary,
                 std::mem::take(&mut self.abort_operation),
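Note on the rename above: MutationTransform folds each upstream block's outcome into a per-segment map, where a replaced block carries its new BlockMeta and a deleted block records only its index. The entry()/and_modify()/or_insert() calls are the whole accumulation. A self-contained sketch (the usize indices and the placeholder BlockMeta are assumptions; the real aliases live outside this hunk):

    use std::collections::HashMap;

    #[derive(Clone)]
    struct BlockMeta; // placeholder for storages_common_table_meta::meta::BlockMeta

    // Per segment: (replaced blocks with their new meta, deleted block indices).
    type MutationMap = HashMap<usize, (Vec<(usize, BlockMeta)>, Vec<usize>)>;

    fn record_replaced(map: &mut MutationMap, seg: usize, blk: usize, meta: BlockMeta) {
        map.entry(seg)
            .and_modify(|v| v.0.push((blk, meta.clone())))
            .or_insert((vec![(blk, meta)], vec![]));
    }

    fn record_deleted(map: &mut MutationMap, seg: usize, blk: usize) {
        map.entry(seg)
            .and_modify(|v| v.1.push(blk))
            .or_insert((vec![], vec![blk]));
    }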
diff --git a/src/query/storages/fuse/src/operations/mutation/serialize_data_transform.rs b/src/query/storages/fuse/src/operations/mutation/serialize_data_transform.rs
new file mode 100644
index 0000000000000..b453d18d32fde
--- /dev/null
+++ b/src/query/storages/fuse/src/operations/mutation/serialize_data_transform.rs
@@ -0,0 +1,264 @@
+// Copyright 2023 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use common_catalog::table::Table;
+use common_catalog::table_context::TableContext;
+use common_exception::ErrorCode;
+use common_exception::Result;
+use common_expression::DataBlock;
+use common_expression::TableSchemaRef;
+use common_pipeline_core::processors::port::InputPort;
+use common_pipeline_core::processors::processor::ProcessorPtr;
+use opendal::Operator;
+use storages_common_blocks::blocks_to_parquet;
+use storages_common_pruner::BlockMetaIndex;
+use storages_common_table_meta::meta::BlockMeta;
+use storages_common_table_meta::meta::ClusterStatistics;
+use storages_common_table_meta::table::TableCompression;
+
+use crate::io::write_data;
+use crate::io::TableMetaLocationGenerator;
+use crate::operations::mutation::Mutation;
+use crate::operations::mutation::MutationTransformMeta;
+use crate::operations::mutation::SerializeDataMeta;
+use crate::operations::util;
+use crate::operations::BloomIndexState;
+use crate::pipelines::processors::port::OutputPort;
+use crate::pipelines::processors::processor::Event;
+use crate::pipelines::processors::Processor;
+use crate::statistics::gen_columns_statistics;
+use crate::statistics::ClusterStatsGenerator;
+use crate::FuseTable;
+
+pub struct SerializeState {
+    pub block_data: Vec<u8>,
+    pub block_location: String,
+    pub index_data: Option<Vec<u8>>,
+    pub index_location: Option<String>,
+}
+
+enum State {
+    Consume,
+    NeedSerialize(DataBlock),
+    Serialized(SerializeState, Arc<BlockMeta>),
+    Output(Mutation),
+}
+
+pub struct SerializeDataTransform {
+    state: State,
+    ctx: Arc<dyn TableContext>,
+    input: Arc<InputPort>,
+    output: Arc<OutputPort>,
+    output_data: Option<DataBlock>,
+
+    location_gen: TableMetaLocationGenerator,
+    dal: Operator,
+    cluster_stats_gen: ClusterStatsGenerator,
+
+    schema: TableSchemaRef,
+    index: BlockMetaIndex,
+    origin_stats: Option<ClusterStatistics>,
+    table_compression: TableCompression,
+}
+
+impl SerializeDataTransform {
+    pub fn try_create(
+        ctx: Arc<dyn TableContext>,
+        input: Arc<InputPort>,
+        output: Arc<OutputPort>,
+        table: &FuseTable,
+        cluster_stats_gen: ClusterStatsGenerator,
+    ) -> Result<ProcessorPtr> {
+        Ok(ProcessorPtr::create(Box::new(SerializeDataTransform {
+            state: State::Consume,
+            ctx: ctx.clone(),
+            input,
+            output,
+            output_data: None,
+            location_gen: table.meta_location_generator().clone(),
+            dal: table.get_operator(),
+            cluster_stats_gen,
+            schema: table.schema(),
+            index: BlockMetaIndex::default(),
+            origin_stats: None,
+            table_compression: table.table_compression,
+        })))
+    }
+}
+
+#[async_trait::async_trait]
+impl Processor for SerializeDataTransform {
+    fn name(&self) -> String {
+        "SerializeDataTransform".to_string()
+    }
+
+    fn as_any(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn event(&mut self) -> Result<Event> {
+        if matches!(self.state, State::NeedSerialize(_) | State::Output(_)) {
+            return Ok(Event::Sync);
+        }
+
+        if matches!(self.state, State::Serialized(_, _)) {
+            return Ok(Event::Async);
+        }
+
+        if self.output.is_finished() {
+            return Ok(Event::Finished);
+        }
+
+        if !self.output.can_push() {
+            return Ok(Event::NeedConsume);
+        }
+
+        if let Some(data_block) = self.output_data.take() {
+            self.output.push_data(Ok(data_block));
+            return Ok(Event::NeedConsume);
+        }
+
+        if self.input.is_finished() {
+            self.output.finish();
+            return Ok(Event::Finished);
+        }
+
+        if !self.input.has_data() {
+            self.input.set_need_data();
+            return Ok(Event::NeedData);
+        }
+
+        let mut input_data = self.input.pull_data().unwrap()?;
+        let meta = input_data.take_meta();
+        if let Some(meta) = meta {
+            let meta = SerializeDataMeta::from_meta(&meta)?;
+            self.index = meta.index.clone();
+            self.origin_stats = meta.cluster_stats.clone();
+            if input_data.is_empty() {
+                self.state = State::Output(Mutation::Deleted);
+            } else {
+                self.state = State::NeedSerialize(input_data);
+            }
+        } else {
+            self.state = State::Output(Mutation::DoNothing);
+        }
+        Ok(Event::Sync)
+    }
+
+    fn process(&mut self) -> Result<()> {
+        match std::mem::replace(&mut self.state, State::Consume) {
+            State::NeedSerialize(block) => {
+                let cluster_stats = self
+                    .cluster_stats_gen
+                    .gen_with_origin_stats(&block, std::mem::take(&mut self.origin_stats))?;
+
+                let row_count = block.num_rows() as u64;
+                let block_size = block.memory_size() as u64;
+                let (block_location, block_id) = self.location_gen.gen_block_location();
+
+                // build block index.
+                let location = self.location_gen.block_bloom_index_location(&block_id);
+                let bloom_index_state = BloomIndexState::try_create(
+                    self.ctx.clone(),
+                    self.schema.clone(),
+                    &block,
+                    location,
+                )?;
+                let column_distinct_count = bloom_index_state
+                    .as_ref()
+                    .map(|i| i.column_distinct_count.clone());
+                let col_stats = gen_columns_statistics(&block, column_distinct_count)?;
+
+                // serialize data block.
+                let mut block_data = Vec::with_capacity(100 * 1024 * 1024);
+                let schema = self.schema.clone();
+                let (file_size, meta_data) = blocks_to_parquet(
+                    &schema,
+                    vec![block],
+                    &mut block_data,
+                    self.table_compression,
+                )?;
+                let col_metas = util::column_metas(&meta_data)?;
+
+                let (index_data, index_location, index_size) =
+                    if let Some(bloom_index_state) = bloom_index_state {
+                        (
+                            Some(bloom_index_state.data.clone()),
+                            Some(bloom_index_state.location.clone()),
+                            bloom_index_state.size,
+                        )
+                    } else {
+                        (None, None, 0u64)
+                    };
+
+                // new block meta.
+                let new_meta = Arc::new(BlockMeta::new(
+                    row_count,
+                    block_size,
+                    file_size,
+                    col_stats,
+                    col_metas,
+                    cluster_stats,
+                    block_location.clone(),
+                    index_location.clone(),
+                    index_size,
+                    self.table_compression.into(),
+                ));
+
+                self.state = State::Serialized(
+                    SerializeState {
+                        block_data,
+                        block_location: block_location.0,
+                        index_data,
+                        index_location: index_location.map(|l| l.0),
+                    },
+                    new_meta,
+                );
+            }
+            State::Output(op) => {
+                let meta = MutationTransformMeta::create(self.index.clone(), op);
+                self.output_data = Some(DataBlock::empty_with_meta(meta));
+            }
+            _ => return Err(ErrorCode::Internal("It's a bug.")),
+        }
+        Ok(())
+    }
+
+    async fn async_process(&mut self) -> Result<()> {
+        match std::mem::replace(&mut self.state, State::Consume) {
+            State::Serialized(serialize_state, block_meta) => {
+                // write block data.
+                write_data(
+                    &serialize_state.block_data,
+                    &self.dal,
+                    &serialize_state.block_location,
+                )
+                .await?;
+                // write index data.
+                if let (Some(index_data), Some(index_location)) =
+                    (serialize_state.index_data, serialize_state.index_location)
+                {
+                    write_data(&index_data, &self.dal, &index_location).await?;
+                }
+
+                self.state = State::Output(Mutation::Replaced(block_meta));
+            }
+            _ => return Err(ErrorCode::Internal("It's a bug.")),
+        }
+        Ok(())
+    }
+}
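Note on the new file above: SerializeDataTransform splits persisting a rewritten block into a CPU phase and an I/O phase. process() (sync) encodes the block to parquet plus an optional bloom index while in State::NeedSerialize; async_process() writes the encoded bytes in State::Serialized, and only then is the block reported as Mutation::Replaced. A toy version of that two-phase shape, with illustrative names rather than the real signatures:

    enum Step {
        NeedSerialize(String),       // stand-in for a DataBlock awaiting encoding
        Serialized(Vec<u8>, String), // encoded bytes plus their target location
        Done,
    }

    // Sync, CPU-bound: encode the block. Mirrors process() above.
    fn process(step: Step) -> Step {
        match step {
            Step::NeedSerialize(block) => Step::Serialized(block.into_bytes(), "block/loc".into()),
            other => other,
        }
    }

    // Async, I/O-bound: persist the encoded bytes. Mirrors async_process() above.
    async fn async_process(step: Step) -> Step {
        match step {
            Step::Serialized(bytes, location) => {
                // a storage write such as write_data(&bytes, &dal, &location) goes here
                drop((bytes, location));
                Step::Done
            }
            other => other,
        }
    }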
diff --git a/src/query/storages/fuse/src/operations/read/mod.rs b/src/query/storages/fuse/src/operations/read/mod.rs
index 3279f6c5efc10..6b0f87d325276 100644
--- a/src/query/storages/fuse/src/operations/read/mod.rs
+++ b/src/query/storages/fuse/src/operations/read/mod.rs
@@ -21,3 +21,4 @@ mod parquet_data_source_deserializer;
 mod parquet_data_source_reader;
 
 pub use fuse_source::build_fuse_parquet_source_pipeline;
+pub use parquet_data_source::DataSourceMeta;
diff --git a/src/query/storages/fuse/src/operations/update.rs b/src/query/storages/fuse/src/operations/update.rs
new file mode 100644
index 0000000000000..1cc62a69898e0
--- /dev/null
+++ b/src/query/storages/fuse/src/operations/update.rs
@@ -0,0 +1,209 @@
+// Copyright 2023 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeMap;
+use std::sync::Arc;
+
+use common_catalog::plan::Projection;
+use common_catalog::table::Table;
+use common_catalog::table_context::TableContext;
+use common_exception::Result;
+use common_expression::RemoteExpr;
+use common_expression::TableDataType;
+use common_expression::TableField;
+use common_expression::TableSchema;
+use common_functions::scalars::BUILTIN_FUNCTIONS;
+use common_sql::evaluator::BlockOperator;
+use storages_common_table_meta::meta::TableSnapshot;
+
+use crate::operations::mutation::MutationAction;
+use crate::operations::mutation::MutationSink;
+use crate::operations::mutation::MutationSource;
+use crate::operations::mutation::SerializeDataTransform;
+use crate::pipelines::Pipeline;
+use crate::statistics::ClusterStatsGenerator;
+use crate::FuseTable;
+
+impl FuseTable {
+    /// UPDATE column = expression WHERE condition
+    /// The pipeline flow is the same as that of deletion.
+    pub async fn do_update(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        filter: Option<RemoteExpr<String>>,
+        col_indices: Vec<usize>,
+        update_list: Vec<(usize, RemoteExpr<String>)>,
+        pipeline: &mut Pipeline,
+    ) -> Result<()> {
+        let snapshot_opt = self.read_table_snapshot().await?;
+
+        // Check if the table is empty.
+        let snapshot = if let Some(val) = snapshot_opt {
+            val
+        } else {
+            // No snapshot, nothing to update.
+            return Ok(());
+        };
+
+        if snapshot.summary.row_count == 0 {
+            // Empty snapshot, nothing to update.
+            return Ok(());
+        }
+
+        let mut filter = filter;
+        if col_indices.is_empty() && filter.is_some() {
+            let filter_expr = filter.clone().unwrap();
+            if !self.try_eval_const(ctx.clone(), &self.schema(), &filter_expr)? {
+                // The condition is always false, do nothing.
+                return Ok(());
+            }
+            // The condition is always true, so drop the filter and update all rows.
+            filter = None;
+        }
+
+        self.try_add_update_source(
+            ctx.clone(),
+            filter,
+            col_indices,
+            update_list,
+            &snapshot,
+            pipeline,
+        )
+        .await?;
+
+        // TODO(zhyass): support cluster stats generator.
+        pipeline.add_transform(|input, output| {
+            SerializeDataTransform::try_create(
+                ctx.clone(),
+                input,
+                output,
+                self,
+                ClusterStatsGenerator::default(),
+            )
+        })?;
+
+        self.try_add_mutation_transform(ctx.clone(), snapshot.segments.clone(), pipeline)?;
+
+        pipeline.add_sink(|input| {
+            MutationSink::try_create(self, ctx.clone(), snapshot.clone(), input)
+        })?;
+        Ok(())
+    }
+
+    async fn try_add_update_source(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        filter: Option<RemoteExpr<String>>,
+        col_indices: Vec<usize>,
+        update_list: Vec<(usize, RemoteExpr<String>)>,
+        base_snapshot: &TableSnapshot,
+        pipeline: &mut Pipeline,
+    ) -> Result<()> {
+        let all_col_ids = self.all_the_columns_ids();
+        let schema = self.schema();
+
+        let mut offset_map = BTreeMap::new();
+        let mut remain_reader = None;
+        let mut pos = 0;
+        let (projection, input_schema) = if col_indices.is_empty() {
+            all_col_ids.iter().for_each(|&id| {
+                offset_map.insert(id, pos);
+                pos += 1;
+            });
+
+            (Projection::Columns(all_col_ids), schema.clone())
+        } else {
+            col_indices.iter().for_each(|&id| {
+                offset_map.insert(id, pos);
+                pos += 1;
+            });
+
+            let mut fields: Vec<TableField> = col_indices
+                .iter()
+                .map(|idx| schema.fields()[*idx].clone())
+                .collect();
+
+            let remain_col_ids: Vec<usize> = all_col_ids
+                .into_iter()
+                .filter(|id| !col_indices.contains(id))
+                .collect();
+            if !remain_col_ids.is_empty() {
+                remain_col_ids.iter().for_each(|&id| {
+                    offset_map.insert(id, pos);
+                    pos += 1;
+                });
+
+                let reader = self.create_block_reader(Projection::Columns(remain_col_ids))?;
+                fields.extend_from_slice(reader.schema().fields());
+                remain_reader = Some((*reader).clone());
+            }
+
+            fields.push(TableField::new("_predicate", TableDataType::Boolean));
+            pos += 1;
+
+            (
+                Projection::Columns(col_indices.clone()),
+                Arc::new(TableSchema::new(fields)),
+            )
+        };
+
+        let mut ops = Vec::with_capacity(update_list.len() + 1);
+        for (id, remote_expr) in update_list.into_iter() {
+            let expr = remote_expr
+                .as_expr(&BUILTIN_FUNCTIONS)
+                .unwrap()
+                .project_column_ref(|name| input_schema.index_of(name).unwrap());
+            ops.push(BlockOperator::Map { expr });
+            offset_map.insert(id, pos);
+            pos += 1;
+        }
+        ops.push(BlockOperator::Project {
+            projection: offset_map.values().cloned().collect(),
+        });
+
+        let block_reader = self.create_block_reader(projection.clone())?;
+        let remain_reader = Arc::new(remain_reader);
+        let (filter_expr, filters) = if let Some(remote_expr) = filter {
+            let schema = block_reader.schema();
+            (
+                Arc::new(remote_expr.as_expr(&BUILTIN_FUNCTIONS).map(|expr| {
+                    expr.project_column_ref(|name| schema.column_with_name(name).unwrap().0)
+                })),
+                vec![remote_expr],
+            )
+        } else {
+            (Arc::new(None), vec![])
+        };
+
+        self.mutation_block_purning(ctx.clone(), filters, projection, base_snapshot)
+            .await?;
+
+        let max_threads = ctx.get_settings().get_max_threads()? as usize;
+        // Add the source pipe.
+        pipeline.add_source(
+            |output| {
+                MutationSource::try_create(
+                    ctx.clone(),
+                    MutationAction::Update,
+                    output,
+                    filter_expr.clone(),
+                    block_reader.clone(),
+                    remain_reader.clone(),
+                    ops.clone(),
+                )
+            },
+            max_threads,
+        )
+    }
+}
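Note on try_add_update_source above: the working block is laid out as the filter's columns first, then the remaining columns, then the appended `_predicate` boolean, then one output column per update expression; the final BlockOperator::Project restores schema order via the BTreeMap (keys are column ids, so values() iterates in schema order). A worked example for a hypothetical three-column table where the WHERE clause reads column 1 and column 0 is updated, using the same arithmetic:

    use std::collections::BTreeMap;

    // Working-block layout: [col1, col0, col2, _predicate, new_col0].
    fn main() {
        let col_indices = vec![1usize]; // columns the filter needs, read first
        let remain_col_ids = vec![0usize, 2]; // everything else, via remain_reader
        let mut offset_map = BTreeMap::new();
        let mut pos = 0;
        for &id in col_indices.iter().chain(remain_col_ids.iter()) {
            offset_map.insert(id, pos);
            pos += 1;
        }
        pos += 1; // the appended `_predicate` boolean sits at offset 3
        offset_map.insert(0, pos); // the update expression's output replaces col 0
        // BTreeMap iterates in key (schema) order, so Project restores col0..col2.
        assert_eq!(offset_map.values().copied().collect::<Vec<_>>(), vec![4, 0, 2]);
    }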
diff --git a/tests/sqllogictests/suites/base/03_dml/03_0035_update b/tests/sqllogictests/suites/base/03_dml/03_0035_update
new file mode 100644
index 0000000000000..ae0e732c46fc4
--- /dev/null
+++ b/tests/sqllogictests/suites/base/03_dml/03_0035_update
@@ -0,0 +1,75 @@
+statement ok
+DROP DATABASE IF EXISTS db1
+
+statement ok
+CREATE DATABASE db1
+
+statement ok
+USE db1
+
+statement ok
+CREATE TABLE IF NOT EXISTS t1(a Int, b Date)
+
+statement ok
+INSERT INTO t1 VALUES(1, '2022-12-30')
+
+statement ok
+INSERT INTO t1 VALUES(2, '2023-01-01')
+
+statement ok
+UPDATE t1 SET a = 3 WHERE b > '2022-12-31'
+
+query IT
+SELECT * FROM t1 ORDER BY b
+----
+1 2022-12-30
+3 2023-01-01
+
+statement ok
+UPDATE t1 SET a = 2, b = '2022-12-31' WHERE b > '2022-12-31'
+
+query IT
+SELECT * FROM t1 ORDER BY b
+----
+1 2022-12-30
+2 2022-12-31
+
+statement ok
+UPDATE t1 SET a = 3 WHERE false
+
+query B
+SELECT count(*) = 0 FROM t1 WHERE a = 3
+----
+1
+
+statement ok
+UPDATE t1 SET a = 3 WHERE true
+
+query B
+SELECT count(*) = 2 FROM t1 WHERE a = 3
+----
+1
+
+statement error 1006
+UPDATE t1 SET a = 3, a = 4 WHERE b > '2022-12-31'
+
+statement ok
+CREATE TABLE IF NOT EXISTS t2(a Int, b Date)
+
+statement ok
+INSERT INTO t2 VALUES(1, '2022-12-30')
+
+statement ok
+INSERT INTO t2 VALUES(2, '2023-01-01')
+
+statement error 1001
+UPDATE t1 SET a = 2 WHERE a IN (SELECT a FROM t2 WHERE b > '2022-12-31')
+
+statement ok
+DROP TABLE t1 ALL
+
+statement ok
+DROP TABLE t2 ALL
+
+statement ok
+DROP DATABASE db1
diff --git a/tests/sqllogictests/suites/duckdb/issues/monetdb/test_correlated_update.test b/tests/sqllogictests/suites/duckdb/issues/monetdb/test_correlated_update.test
index 0e7c704c3cc3c..b160bd0673db8 100644
--- a/tests/sqllogictests/suites/duckdb/issues/monetdb/test_correlated_update.test
+++ b/tests/sqllogictests/suites/duckdb/issues/monetdb/test_correlated_update.test
@@ -16,7 +16,7 @@ insert into t1284791a values (1,'1')
 statement ok
 insert into t1284791b values (1,'2')
 
-statement error 1002
+statement error 1001
 update t1284791a set val1 = (select val2 from t1284791b where id1 = id2) where id1 in (select id2 from t1284791b)
 
 query IT
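Note on the WHERE false / WHERE true cases in 03_0035_update: they exercise do_update's constant-predicate fast path. A filter that references no columns must be constant, so it is folded with try_eval_const; constant false makes the whole statement a no-op (no pipeline is built), and constant true drops the filter so every row is rewritten. A compressed sketch of that decision, with illustrative names only:

    // Outcome of folding a constant WHERE clause, mirroring FuseTable::do_update.
    enum ConstWhere {
        NoOp,    // always false: do_update returns early
        AllRows, // always true: filter becomes None, every row is rewritten
    }

    fn fold_constant_where(value: bool) -> ConstWhere {
        if value { ConstWhere::AllRows } else { ConstWhere::NoOp }
    }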