diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs index 951633a4471cc..209474572cce8 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs @@ -21,7 +21,6 @@ use common_pipeline_core::processors::port::InputPort; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::Processor; - #[async_trait::async_trait] pub trait AsyncAccumulatingTransform: Send { const NAME: &'static str; diff --git a/src/query/service/src/interpreters/interpreter_copy.rs b/src/query/service/src/interpreters/interpreter_copy.rs index 02211a1431ea4..8922f41764759 100644 --- a/src/query/service/src/interpreters/interpreter_copy.rs +++ b/src/query/service/src/interpreters/interpreter_copy.rs @@ -13,11 +13,9 @@ // limitations under the License. use std::sync::Arc; -use std::time::Instant; use common_catalog::plan::StageTableInfo; use common_catalog::table::AppendMode; -use common_exception::ErrorCode; use common_exception::Result; use common_expression::infer_table_schema; use common_expression::BlockThresholds; @@ -27,12 +25,11 @@ use common_expression::DataSchemaRefExt; use common_meta_app::principal::StageInfo; use common_pipeline_core::Pipeline; use common_sql::executor::table_read_plan::ToReadDataSourcePlan; -use common_sql::executor::CopyIntoTableFromQuery; -use common_sql::executor::DistributedCopyIntoTableFromStage; +use common_sql::executor::CopyIntoTable; +use common_sql::executor::CopyIntoTableSource; use common_sql::executor::Exchange; use common_sql::executor::FragmentKind; use common_sql::executor::PhysicalPlan; -use common_sql::plans::CopyIntoTableMode; use common_sql::plans::CopyIntoTablePlan; use common_storage::StageFileInfo; use common_storage::StageFilesInfo; @@ -44,12 +41,9 @@ use crate::interpreters::common::check_deduplicate_label; use crate::interpreters::Interpreter; use crate::interpreters::SelectInterpreter; use crate::pipelines::builders::build_append2table_with_commit_pipeline; -use crate::pipelines::builders::build_append_data_pipeline; use crate::pipelines::builders::build_commit_data_pipeline; -use crate::pipelines::builders::set_copy_on_finished; -use crate::pipelines::builders::CopyPlanType; use crate::pipelines::PipelineBuildResult; -use crate::schedulers::build_distributed_pipeline; +use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; use crate::sessions::TableContext; use crate::sql::plans::CopyPlan; @@ -147,28 +141,36 @@ impl CopyInterpreter { } #[async_backtrace::framed] - async fn try_transform_copy_plan_from_local_to_distributed( + pub async fn build_physical_plan( &self, plan: &CopyIntoTablePlan, - ) -> Result> { - let ctx = self.ctx.clone(); - let to_table = ctx + ) -> Result<(PhysicalPlan, Vec)> { + let to_table = self + .ctx .get_table( plan.catalog_info.catalog_name(), &plan.database_name, &plan.table_name, ) .await?; - let table_ctx: Arc = self.ctx.clone(); - let files = plan.collect_files(&table_ctx).await?; - if files.is_empty() { - return Ok(None); - } - let mut stage_table_info = plan.stage_table_info.clone(); - stage_table_info.files_to_copy = Some(files.clone()); - let stage_table = StageTable::try_create(stage_table_info.clone())?; - if 
plan.query.is_none() { - let read_source_plan = { + let files = plan.collect_files(self.ctx.as_ref()).await?; + let source = if let Some(ref query) = plan.query { + let (select_interpreter, query_source_schema) = self.build_query(query).await?; + let plan_query = select_interpreter.build_physical_plan().await?; + let result_columns = select_interpreter.get_result_columns(); + CopyIntoTableSource::Query(Box::new(common_sql::executor::QuerySource { + plan: plan_query, + ignore_result: select_interpreter.get_ignore_result(), + result_columns, + query_source_schema, + })) + } else { + let stage_table_info = StageTableInfo { + files_to_copy: Some(files.clone()), + ..plan.stage_table_info.clone() + }; + let stage_table = StageTable::try_create(stage_table_info)?; + let read_source_plan = Box::new( stage_table .read_plan_with_catalog( self.ctx.clone(), @@ -177,64 +179,34 @@ impl CopyInterpreter { None, false, ) - .await? - }; + .await?, + ); + CopyIntoTableSource::Stage(read_source_plan) + }; - if read_source_plan.parts.len() <= 1 { - return Ok(None); - } - Ok(Some(CopyPlanType::DistributedCopyIntoTableFromStage( - DistributedCopyIntoTableFromStage { - // TODO(leiysky): we reuse the id of exchange here, - // which is not correct. We should generate a new id for insert. - plan_id: 0, - catalog_info: plan.catalog_info.clone(), - database_name: plan.database_name.clone(), - table_name: plan.table_name.clone(), - required_values_schema: plan.required_values_schema.clone(), - values_consts: plan.values_consts.clone(), - required_source_schema: plan.required_source_schema.clone(), - stage_table_info: plan.stage_table_info.clone(), - source: Box::new(read_source_plan), - files, - table_info: to_table.get_table_info().clone(), - force: plan.force, - write_mode: plan.write_mode, - thresholds: to_table.get_block_thresholds(), - validation_mode: plan.validation_mode.clone(), - }, - ))) - } else { - // plan query must exist, we can use unwarp directly. - let (select_interpreter, query_source_schema) = - self.build_query(plan.query.as_ref().unwrap()).await?; - let plan_query = select_interpreter.build_physical_plan().await?; - Ok(Some(CopyPlanType::CopyIntoTableFromQuery( - CopyIntoTableFromQuery { - // add exchange plan node to enable distributed - // TODO(leiysky): we reuse the id of exchange here, - // which is not correct. 
We should generate a new id - plan_id: 0, - ignore_result: select_interpreter.get_ignore_result(), - catalog_info: plan.catalog_info.clone(), - database_name: plan.database_name.clone(), - table_name: plan.table_name.clone(), - required_source_schema: plan.required_source_schema.clone(), - values_consts: plan.values_consts.clone(), - required_values_schema: plan.required_values_schema.clone(), - result_columns: select_interpreter.get_result_columns(), - query_source_schema, - write_mode: plan.write_mode, - validation_mode: plan.validation_mode.clone(), - force: plan.force, - stage_table_info: plan.stage_table_info.clone(), - local_node_id: self.ctx.get_cluster().local_id.clone(), - input: Box::new(plan_query), - files: plan.collect_files(&table_ctx).await?, - table_info: to_table.get_table_info().clone(), - }, - ))) + let mut root = PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable { + catalog_info: plan.catalog_info.clone(), + required_values_schema: plan.required_values_schema.clone(), + values_consts: plan.values_consts.clone(), + required_source_schema: plan.required_source_schema.clone(), + stage_table_info: plan.stage_table_info.clone(), + table_info: to_table.get_table_info().clone(), + force: plan.force, + write_mode: plan.write_mode, + validation_mode: plan.validation_mode.clone(), + + files: files.clone(), + source, + })); + if plan.enable_distributed { + root = PhysicalPlan::Exchange(Exchange { + plan_id: 0, + input: Box::new(root), + kind: FragmentKind::Merge, + keys: Vec::new(), + }); } + Ok((root, files)) } #[async_backtrace::framed] @@ -268,195 +240,6 @@ impl CopyInterpreter { Ok(()) } - - /// Build a COPY pipeline in standalone mode. - #[async_backtrace::framed] - async fn build_local_copy_into_table_pipeline( - &self, - plan: &CopyIntoTablePlan, - ) -> Result { - let catalog = plan.catalog_info.catalog_name(); - let database = plan.database_name.as_str(); - let table = plan.table_name.as_str(); - - let ctx = self.ctx.clone(); - let to_table = ctx.get_table(catalog, database, table).await?; - - let mut build_res; - let source_schema; - let files; - - match &plan.query { - None => { - let table_ctx: Arc = self.ctx.clone(); - - files = plan.collect_files(&table_ctx).await?; - source_schema = plan.required_source_schema.clone(); - build_res = PipelineBuildResult::create(); - if !files.is_empty() { - self.build_read_stage_table_data_pipeline( - &mut build_res.main_pipeline, - plan, - to_table.get_block_thresholds(), - files.clone(), - ) - .await?; - } else { - return Ok(build_res); - } - } - Some(query) => { - files = plan - .stage_table_info - .files_to_copy - .clone() - .ok_or(ErrorCode::Internal("files_to_copy should not be None"))?; - - let (select_interpreter, query_source_schema) = self.build_query(query).await?; - let plan = select_interpreter.build_physical_plan().await?; - build_res = select_interpreter.build_pipeline(plan).await?; - source_schema = query_source_schema; - } - } - - let file_sizes: u64 = files.iter().map(|f| f.size).sum(); - - // Append data. - { - self.set_status(&format!( - "Copy begin to append data: {} files, size_in_bytes:{} into table", - files.len(), - file_sizes - )); - - let start = Instant::now(); - build_append_data_pipeline( - ctx.clone(), - &mut build_res.main_pipeline, - CopyPlanType::CopyIntoTablePlanOption(plan.clone()), - source_schema, - to_table.clone(), - )?; - - // Perf - { - self.set_status(&format!( - "Copy append data finished, cost:{} secs", - start.elapsed().as_secs() - )); - } - } - - // Commit data. 
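The `CopyIntoTableSource` and `QuerySource` types consumed by the new `build_physical_plan` above are introduced elsewhere in this PR (in `common_sql::executor`) rather than in this hunk. As a reading aid, the sketch below reconstructs their approximate shape purely from how `build_physical_plan` and `build_copy_into_table` in this diff use them; the exact field types and any extra fields are assumptions.

```rust
// Approximate shape, inferred from usage in this diff only; the real
// definitions live in common_sql::executor and may carry more fields.
pub enum CopyIntoTableSource {
    /// COPY INTO ... FROM (SELECT ...): the planned query plus what is needed
    /// to render and cast its result before appending.
    Query(Box<QuerySource>),
    /// COPY INTO ... FROM @stage: a read plan over the files to copy.
    Stage(Box<DataSourcePlan>),
}

pub struct QuerySource {
    pub plan: PhysicalPlan,
    pub ignore_result: bool,
    pub result_columns: Vec<ColumnBinding>,
    pub query_source_schema: DataSchemaRef,
}
```

With the source folded into the `CopyIntoTable` node itself, `plan.enable_distributed` only decides whether to wrap that single node in an `Exchange` with `FragmentKind::Merge`, instead of choosing between two dedicated distributed plan variants as before.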
- { - // if it's replace mode, don't commit, because COPY is the source of replace. - match plan.write_mode { - CopyIntoTableMode::Replace => set_copy_on_finished( - ctx, - files, - plan.stage_table_info.stage_info.copy_options.purge, - plan.stage_table_info.stage_info.clone(), - &mut build_res.main_pipeline, - )?, - _ => { - // commit. - build_commit_data_pipeline( - ctx.clone(), - &mut build_res.main_pipeline, - plan.stage_table_info.stage_info.clone(), - to_table, - files, - plan.force, - plan.stage_table_info.stage_info.copy_options.purge, - plan.write_mode.is_overwrite(), - )? - } - } - } - - Ok(build_res) - } - - /// Build distributed pipeline from source node id. - #[async_backtrace::framed] - async fn build_cluster_copy_into_table_pipeline( - &self, - distributed_plan: &CopyPlanType, - ) -> Result { - let ( - catalog_info, - database_name, - table_name, - stage_info, - files, - force, - purge, - is_overwrite, - ); - let mut build_res = match distributed_plan { - CopyPlanType::DistributedCopyIntoTableFromStage(plan) => { - catalog_info = plan.catalog_info.clone(); - database_name = plan.database_name.clone(); - table_name = plan.table_name.clone(); - stage_info = plan.stage_table_info.stage_info.clone(); - files = plan.files.clone(); - force = plan.force; - purge = plan.stage_table_info.stage_info.copy_options.purge; - is_overwrite = plan.write_mode.is_overwrite(); - // add exchange plan node to enable distributed - // TODO(leiysky): we reuse the id of exchange here, - // which is not correct. We should generate a new id for insert. - let exchange_plan = PhysicalPlan::Exchange(Exchange { - plan_id: 0, - input: Box::new(PhysicalPlan::DistributedCopyIntoTableFromStage(Box::new( - plan.clone(), - ))), - kind: FragmentKind::Merge, - keys: Vec::new(), - }); - - build_distributed_pipeline(&self.ctx, &exchange_plan, false).await? - } - CopyPlanType::CopyIntoTableFromQuery(plan) => { - catalog_info = plan.catalog_info.clone(); - database_name = plan.database_name.clone(); - table_name = plan.table_name.clone(); - stage_info = plan.stage_table_info.stage_info.clone(); - files = plan.files.clone(); - force = plan.force; - purge = plan.stage_table_info.stage_info.copy_options.purge; - is_overwrite = plan.write_mode.is_overwrite(); - // add exchange plan node to enable distributed - // TODO(leiysky): we reuse the id of exchange here, - // which is not correct. We should generate a new id - let exchange_plan = PhysicalPlan::Exchange(Exchange { - plan_id: 0, - input: Box::new(PhysicalPlan::CopyIntoTableFromQuery(Box::new(plan.clone()))), - kind: FragmentKind::Merge, - keys: Vec::new(), - }); - build_distributed_pipeline(&self.ctx, &exchange_plan, false).await? - } - _ => unreachable!(), - }; - let to_table = self - .ctx - .get_table(catalog_info.catalog_name(), &database_name, &table_name) - .await?; - - // commit. 
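The block removed above is where the old local pipeline skipped the commit for `CopyIntoTableMode::Replace`, since a COPY that feeds a REPLACE must not commit its own copied-file metadata. After this refactor that responsibility moves to the replace interpreter: `CopyInterpreter::build_physical_plan` hands back the collected files, and `ReplaceInterpreter` installs only the purge callback. The snippet below simply condenses the corresponding lines from `interpreter_replace.rs` later in this diff.

```rust
// Condensed from ReplaceInterpreter::execute2 in this same patch: when the
// REPLACE source is a staged COPY, no commit pipeline is built here; only the
// purge-on-finish callback is installed, and the commit is owned by the
// replace mutation itself.
let (physical_plan, purge_info) = self.build_physical_plan().await?;
let mut pipeline =
    build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false).await?;
if let Some((files, stage_info)) = purge_info {
    set_copy_on_finished(
        self.ctx.clone(),
        files,
        stage_info.copy_options.purge,
        stage_info,
        &mut pipeline.main_pipeline,
    )?;
}
```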
- build_commit_data_pipeline( - self.ctx.clone(), - &mut build_res.main_pipeline, - stage_info, - to_table, - files, - force, - purge, - is_overwrite, - )?; - Ok(build_res) - } } #[async_trait::async_trait] @@ -476,22 +259,16 @@ impl Interpreter for CopyInterpreter { match &self.plan { CopyPlan::IntoTable(plan) => { - if plan.enable_distributed { - let distributed_plan_op = self - .try_transform_copy_plan_from_local_to_distributed(plan) - .await?; - if let Some(distributed_plan) = distributed_plan_op { - let build_res = self - .build_cluster_copy_into_table_pipeline(&distributed_plan) - .await?; - - Ok(build_res) - } else { - self.build_local_copy_into_table_pipeline(plan).await - } - } else { - self.build_local_copy_into_table_pipeline(plan).await - } + let (physical_plan, files) = self.build_physical_plan(plan).await?; + let mut build_res = build_query_pipeline_without_render_result_set( + &self.ctx, + &physical_plan, + false, + ) + .await?; + build_commit_data_pipeline(&self.ctx, &mut build_res.main_pipeline, plan, &files) + .await?; + Ok(build_res) } CopyPlan::IntoStage { stage, from, path, .. diff --git a/src/query/service/src/interpreters/interpreter_delete.rs b/src/query/service/src/interpreters/interpreter_delete.rs index 147a30cb72fa6..1106073302a6c 100644 --- a/src/query/service/src/interpreters/interpreter_delete.rs +++ b/src/query/service/src/interpreters/interpreter_delete.rs @@ -31,10 +31,11 @@ use common_meta_app::schema::CatalogInfo; use common_meta_app::schema::TableInfo; use common_sql::binder::ColumnBindingBuilder; use common_sql::executor::cast_expr_to_non_null_boolean; -use common_sql::executor::DeleteFinal; use common_sql::executor::DeletePartial; use common_sql::executor::Exchange; use common_sql::executor::FragmentKind; +use common_sql::executor::MutationAggregate; +use common_sql::executor::MutationKind; use common_sql::executor::PhysicalPlan; use common_sql::optimizer::CascadesOptimizer; use common_sql::optimizer::DPhpy; @@ -298,12 +299,15 @@ impl DeleteInterpreter { }); } - Ok(PhysicalPlan::DeleteFinal(Box::new(DeleteFinal { - input: Box::new(root), - snapshot, - table_info, - catalog_info, - }))) + Ok(PhysicalPlan::MutationAggregate(Box::new( + MutationAggregate { + input: Box::new(root), + snapshot, + table_info, + catalog_info, + mutation_kind: MutationKind::Delete, + }, + ))) } } diff --git a/src/query/service/src/interpreters/interpreter_insert.rs b/src/query/service/src/interpreters/interpreter_insert.rs index a3539cb69bd95..44143902b1784 100644 --- a/src/query/service/src/interpreters/interpreter_insert.rs +++ b/src/query/service/src/interpreters/interpreter_insert.rs @@ -12,28 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. 
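`MutationAggregate` and `MutationKind`, which replace `DeleteFinal` in the delete interpreter hunk above and are reused with `MutationKind::Replace` further down, are defined in `common_sql::executor` outside this diff. Their approximate shape, reconstructed from the fields this patch reads and writes, is sketched below; the variant list and derives are assumptions.

```rust
// Rough shape implied by this patch: the delete and replace interpreters both
// build this node, and PipelineBuilder::build_mutation_aggregate forwards
// mutation_kind to FuseTable::chain_mutation_pipes.
#[derive(Clone, Copy)]
pub enum MutationKind {
    Delete,
    Replace,
    // upstream may define further kinds
}

pub struct MutationAggregate {
    pub input: Box<PhysicalPlan>,
    pub snapshot: TableSnapshot,
    pub table_info: TableInfo,
    pub catalog_info: CatalogInfo,
    pub mutation_kind: MutationKind,
}
```

Collapsing the two finishing nodes into one keeps the mutation-aggregator and commit-sink wiring in a single place regardless of which mutation produced the segments.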
-use std::collections::VecDeque; -use std::io::BufRead; -use std::io::Cursor; -use std::ops::Not; use std::str::FromStr; use std::sync::Arc; -use aho_corasick::AhoCorasick; -use common_ast::parser::parse_comma_separated_exprs; -use common_ast::parser::tokenize_sql; use common_catalog::table::AppendMode; use common_exception::ErrorCode; use common_exception::Result; -use common_expression::ColumnBuilder; -use common_expression::DataBlock; use common_expression::DataSchema; -use common_expression::DataSchemaRef; -use common_formats::FastFieldDecoderValues; -use common_io::cursor_ext::ReadBytesExt; -use common_io::cursor_ext::ReadCheckPointExt; use common_meta_app::principal::StageFileFormatType; -use common_pipeline_sources::AsyncSource; use common_pipeline_sources::AsyncSourcer; use common_sql::executor::DistributedInsertSelect; use common_sql::executor::PhysicalPlan; @@ -41,13 +27,8 @@ use common_sql::executor::PhysicalPlanBuilder; use common_sql::plans::Insert; use common_sql::plans::InsertInputSource; use common_sql::plans::Plan; -use common_sql::BindContext; -use common_sql::Metadata; -use common_sql::MetadataRef; use common_sql::NameResolutionContext; -use once_cell::sync::Lazy; use parking_lot::Mutex; -use parking_lot::RwLock; use crate::interpreters::common::check_deduplicate_label; use crate::interpreters::Interpreter; @@ -56,15 +37,11 @@ use crate::pipelines::builders::build_append2table_with_commit_pipeline; use crate::pipelines::processors::transforms::TransformRuntimeCastSchema; use crate::pipelines::PipelineBuildResult; use crate::pipelines::SourcePipeBuilder; +use crate::pipelines::ValueSource; use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; use crate::sessions::TableContext; -// Pre-generate the positions of `(`, `'` and `\` -static PATTERNS: &[&str] = &["(", "'", "\\"]; - -static INSERT_TOKEN_FINDER: Lazy = Lazy::new(|| AhoCorasick::new(PATTERNS).unwrap()); - pub struct InsertInterpreter { ctx: Arc, plan: Insert, @@ -294,261 +271,3 @@ impl Interpreter for InsertInterpreter { Ok(()) } } - -pub struct ValueSource { - data: String, - ctx: Arc, - name_resolution_ctx: NameResolutionContext, - bind_context: BindContext, - schema: DataSchemaRef, - metadata: MetadataRef, - is_finished: bool, -} - -#[async_trait::async_trait] -impl AsyncSource for ValueSource { - const NAME: &'static str = "ValueSource"; - const SKIP_EMPTY_DATA_BLOCK: bool = true; - - #[async_trait::unboxed_simple] - #[async_backtrace::framed] - async fn generate(&mut self) -> Result> { - if self.is_finished { - return Ok(None); - } - - // Use the number of '(' to estimate the number of rows - let mut estimated_rows = 0; - let mut positions = VecDeque::new(); - for mat in INSERT_TOKEN_FINDER.find_iter(&self.data) { - if mat.pattern() == 0.into() { - estimated_rows += 1; - continue; - } - positions.push_back(mat.start()); - } - - let mut reader = Cursor::new(self.data.as_bytes()); - let block = self - .read(estimated_rows, &mut reader, &mut positions) - .await?; - self.is_finished = true; - Ok(Some(block)) - } -} - -impl ValueSource { - pub fn new( - data: String, - ctx: Arc, - name_resolution_ctx: NameResolutionContext, - schema: DataSchemaRef, - ) -> Self { - let bind_context = BindContext::new(); - let metadata = Arc::new(RwLock::new(Metadata::default())); - - Self { - data, - ctx, - name_resolution_ctx, - schema, - bind_context, - metadata, - is_finished: false, - } - } - - #[async_backtrace::framed] - pub async fn read>( - &self, - estimated_rows: usize, - 
reader: &mut Cursor, - positions: &mut VecDeque, - ) -> Result { - let mut columns = self - .schema - .fields() - .iter() - .map(|f| ColumnBuilder::with_capacity(f.data_type(), estimated_rows)) - .collect::>(); - - let mut bind_context = self.bind_context.clone(); - - let format = self.ctx.get_format_settings()?; - let field_decoder = FastFieldDecoderValues::create_for_insert(format); - - for row in 0.. { - let _ = reader.ignore_white_spaces(); - if reader.eof() { - break; - } - // Not the first row - if row != 0 { - reader.must_ignore_byte(b',')?; - } - - self.parse_next_row( - &field_decoder, - reader, - &mut columns, - positions, - &mut bind_context, - self.metadata.clone(), - ) - .await?; - } - - let columns = columns - .into_iter() - .map(|col| col.build()) - .collect::>(); - Ok(DataBlock::new_from_columns(columns)) - } - - /// Parse single row value, like ('111', 222, 1 + 1) - #[async_backtrace::framed] - async fn parse_next_row>( - &self, - field_decoder: &FastFieldDecoderValues, - reader: &mut Cursor, - columns: &mut [ColumnBuilder], - positions: &mut VecDeque, - bind_context: &mut BindContext, - metadata: MetadataRef, - ) -> Result<()> { - let _ = reader.ignore_white_spaces(); - let col_size = columns.len(); - let start_pos_of_row = reader.checkpoint(); - - // Start of the row --- '(' - if !reader.ignore_byte(b'(') { - return Err(ErrorCode::BadDataValueType( - "Must start with parentheses".to_string(), - )); - } - // Ignore the positions in the previous row. - while let Some(pos) = positions.front() { - if *pos < start_pos_of_row as usize { - positions.pop_front(); - } else { - break; - } - } - - for col_idx in 0..col_size { - let _ = reader.ignore_white_spaces(); - let col_end = if col_idx + 1 == col_size { b')' } else { b',' }; - - let col = columns - .get_mut(col_idx) - .ok_or_else(|| ErrorCode::Internal("ColumnBuilder is None"))?; - - let (need_fallback, pop_count) = field_decoder - .read_field(col, reader, positions) - .map(|_| { - let _ = reader.ignore_white_spaces(); - let need_fallback = reader.ignore_byte(col_end).not(); - (need_fallback, col_idx + 1) - }) - .unwrap_or((true, col_idx)); - - // ColumnBuilder and expr-parser both will eat the end ')' of the row. - if need_fallback { - for col in columns.iter_mut().take(pop_count) { - col.pop(); - } - // rollback to start position of the row - reader.rollback(start_pos_of_row + 1); - skip_to_next_row(reader, 1)?; - let end_pos_of_row = reader.position(); - - // Parse from expression and append all columns. 
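The fallback branch that follows this comment re-parses a row as SQL once the fast field decoder gives up, for example when a field is an expression such as `1 + 1` rather than a plain literal, and it relies on `skip_to_next_row` at the end of this block to find where the offending row ends. A small usage sketch, assuming `skip_to_next_row` from this file is in scope:

```rust
use std::io::Cursor;
use common_exception::Result;

// Illustrative only: how parse_next_row drives skip_to_next_row.
fn demo() -> Result<()> {
    // Two VALUES rows; note the quoted ')' inside the first one.
    let data = "(1, 'a,b)', 2), (3, 'c', 4)";
    let mut reader = Cursor::new(data.as_bytes());
    reader.set_position(1); // just past the '(' that opens the first row
    skip_to_next_row(&mut reader, 1)?; // parenthesis balance starts at 1
    // The cursor now sits right after the ')' closing the first row, so the
    // remaining input is ", (3, 'c', 4)"; parse_next_row records this position,
    // rewinds, and re-parses the skipped span with parse_comma_separated_exprs.
    Ok(())
}
```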
- reader.set_position(start_pos_of_row); - let row_len = end_pos_of_row - start_pos_of_row; - let buf = &reader.remaining_slice()[..row_len as usize]; - - let sql = std::str::from_utf8(buf).unwrap(); - let settings = self.ctx.get_settings(); - let sql_dialect = settings.get_sql_dialect()?; - let tokens = tokenize_sql(sql)?; - let exprs = parse_comma_separated_exprs(&tokens[1..tokens.len()], sql_dialect)?; - - let values = bind_context - .exprs_to_scalar( - exprs, - &self.schema, - self.ctx.clone(), - &self.name_resolution_ctx, - metadata, - ) - .await?; - - for (col, scalar) in columns.iter_mut().zip(values) { - col.push(scalar.as_ref()); - } - reader.set_position(end_pos_of_row); - return Ok(()); - } - } - - Ok(()) - } -} - -// Values |(xxx), (yyy), (zzz) -pub fn skip_to_next_row>(reader: &mut Cursor, mut balance: i32) -> Result<()> { - let _ = reader.ignore_white_spaces(); - - let mut quoted = false; - let mut escaped = false; - - while balance > 0 { - let buffer = reader.remaining_slice(); - if buffer.is_empty() { - break; - } - - let size = buffer.len(); - - let it = buffer - .iter() - .position(|&c| c == b'(' || c == b')' || c == b'\\' || c == b'\''); - - if let Some(it) = it { - let c = buffer[it]; - reader.consume(it + 1); - - if it == 0 && escaped { - escaped = false; - continue; - } - escaped = false; - - match c { - b'\\' => { - escaped = true; - continue; - } - b'\'' => { - quoted ^= true; - continue; - } - b')' => { - if !quoted { - balance -= 1; - } - } - b'(' => { - if !quoted { - balance += 1; - } - } - _ => {} - } - } else { - escaped = false; - reader.consume(size); - } - } - Ok(()) -} diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs index 537130dac08b5..7cdb141c752d3 100644 --- a/src/query/service/src/interpreters/interpreter_replace.rs +++ b/src/query/service/src/interpreters/interpreter_replace.rs @@ -19,34 +19,44 @@ use common_base::runtime::GlobalIORuntime; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; -use common_expression::DataSchema; use common_expression::DataSchemaRef; -use common_pipeline_sources::AsyncSourcer; +use common_meta_app::principal::StageInfo; +use common_sql::executor::AsyncSourcerPlan; +use common_sql::executor::Deduplicate; +use common_sql::executor::Exchange; +use common_sql::executor::MutationAggregate; +use common_sql::executor::MutationKind; +use common_sql::executor::OnConflictField; +use common_sql::executor::PhysicalPlan; +use common_sql::executor::ReplaceInto; +use common_sql::executor::SelectCtx; +use common_sql::plans::CopyPlan; use common_sql::plans::InsertInputSource; use common_sql::plans::OptimizeTableAction; use common_sql::plans::OptimizeTablePlan; use common_sql::plans::Plan; use common_sql::plans::Replace; -use common_sql::NameResolutionContext; +use common_storage::StageFileInfo; +use common_storages_factory::Table; +use common_storages_fuse::FuseTable; use log::info; +use storages_common_table_meta::meta::TableSnapshot; use crate::interpreters::common::check_deduplicate_label; use crate::interpreters::common::metrics_inc_replace_execution_time_ms; use crate::interpreters::common::metrics_inc_replace_mutation_time_ms; use crate::interpreters::interpreter_copy::CopyInterpreter; -use crate::interpreters::interpreter_insert::ValueSource; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterPtr; use crate::interpreters::OptimizeTableInterpreter; use 
crate::interpreters::SelectInterpreter; -use crate::pipelines::builders::build_fill_missing_columns_pipeline; +use crate::pipelines::builders::set_copy_on_finished; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; -use crate::pipelines::processors::TransformCastSchema; use crate::pipelines::PipelineBuildResult; +use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; -#[allow(dead_code)] pub struct ReplaceInterpreter { ctx: Arc, plan: Replace, @@ -71,42 +81,32 @@ impl Interpreter for ReplaceInterpreter { } self.check_on_conflicts()?; + let start = Instant::now(); + + // replace + let (physical_plan, purge_info) = self.build_physical_plan().await?; + let mut pipeline = + build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false) + .await?; + // purge + if let Some((files, stage_info)) = purge_info { + set_copy_on_finished( + self.ctx.clone(), + files, + stage_info.copy_options.purge, + stage_info, + &mut pipeline.main_pipeline, + )?; + } + + // recluster let plan = &self.plan; let table = self .ctx .get_table(&plan.catalog, &plan.database, &plan.table) .await?; - let has_cluster_key = !table.cluster_keys(self.ctx.clone()).is_empty(); - - let mut pipeline = self - .connect_input_source(self.ctx.clone(), &self.plan.source, self.plan.schema()) - .await?; - - if pipeline.main_pipeline.is_empty() { - return Ok(pipeline); - } - - build_fill_missing_columns_pipeline( - self.ctx.clone(), - &mut pipeline.main_pipeline, - table.clone(), - self.plan.schema(), - )?; - - let on_conflict_fields = plan.on_conflict_fields.clone(); - - let start = Instant::now(); - - table - .replace_into( - self.ctx.clone(), - &mut pipeline.main_pipeline, - on_conflict_fields, - ) - .await?; - if !pipeline.main_pipeline.is_empty() && has_cluster_key && self.ctx.get_settings().get_enable_auto_reclustering()? @@ -117,59 +117,187 @@ impl Interpreter for ReplaceInterpreter { let table = self.plan.table.to_string(); pipeline.main_pipeline.set_on_finished(move |err| { metrics_inc_replace_mutation_time_ms(start.elapsed().as_millis() as u64); - if err.is_none() { - info!("execute replace into finished successfully. running table optimization job."); - match GlobalIORuntime::instance().block_on({ - async move { - ctx.evict_table_from_cache(&catalog, &database, &table)?; - let optimize_interpreter = OptimizeTableInterpreter::try_create(ctx.clone(), - OptimizeTablePlan { - catalog, - database, - table, - action: OptimizeTableAction::CompactBlocks, - limit: None, - } - )?; - - let mut build_res = optimize_interpreter.execute2().await?; - - if build_res.main_pipeline.is_empty() { - return Ok(()); - } - - let settings = ctx.get_settings(); - let query_id = ctx.get_id(); - build_res.set_max_threads(settings.get_max_threads()? as usize); - let settings = ExecutorSettings::try_create(&settings, query_id)?; - - if build_res.main_pipeline.is_complete_pipeline()? { - let mut pipelines = build_res.sources_pipelines; - pipelines.push(build_res.main_pipeline); - - let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?; - - ctx.set_executor(complete_executor.get_inner())?; - complete_executor.execute()?; + if err.is_none() { + info!("execute replace into finished successfully. 
running table optimization job."); + match GlobalIORuntime::instance().block_on({ + async move { + ctx.evict_table_from_cache(&catalog, &database, &table)?; + let optimize_interpreter = OptimizeTableInterpreter::try_create(ctx.clone(), + OptimizeTablePlan { + catalog, + database, + table, + action: OptimizeTableAction::CompactBlocks, + limit: None, + } + )?; + + let mut build_res = optimize_interpreter.execute2().await?; + + if build_res.main_pipeline.is_empty() { + return Ok(()); + } + + let settings = ctx.get_settings(); + let query_id = ctx.get_id(); + build_res.set_max_threads(settings.get_max_threads()? as usize); + let settings = ExecutorSettings::try_create(&settings, query_id)?; + + if build_res.main_pipeline.is_complete_pipeline()? { + let mut pipelines = build_res.sources_pipelines; + pipelines.push(build_res.main_pipeline); + + let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?; + + ctx.set_executor(complete_executor.get_inner())?; + complete_executor.execute()?; + } + Ok(()) } - Ok(()) - } - }) { - Ok(_) => { - info!("execute replace into finished successfully. table optimization job finished."); + }) { + Ok(_) => { + info!("execute replace into finished successfully. table optimization job finished."); + } + Err(e) => { info!("execute replace into finished successfully. table optimization job failed. {:?}", e)} } - Err(e) => { info!("execute replace into finished successfully. table optimization job failed. {:?}", e)} } - } - metrics_inc_replace_execution_time_ms(start.elapsed().as_millis() as u64); - Ok(()) - }); + metrics_inc_replace_execution_time_ms(start.elapsed().as_millis() as u64); + Ok(()) + }); } Ok(pipeline) } } impl ReplaceInterpreter { + async fn build_physical_plan( + &self, + ) -> Result<(Box, Option<(Vec, StageInfo)>)> { + let plan = &self.plan; + let table = self + .ctx + .get_table(&plan.catalog, &plan.database, &plan.table) + .await?; + let catalog = self.ctx.get_catalog(&plan.catalog).await?; + let schema = table.schema(); + let mut on_conflicts = Vec::with_capacity(plan.on_conflict_fields.len()); + for f in &plan.on_conflict_fields { + let field_name = f.name(); + let (field_index, _) = match schema.column_with_name(field_name) { + Some(idx) => idx, + None => { + return Err(ErrorCode::Internal( + "not expected, on conflict field not found (after binding)", + )); + } + }; + on_conflicts.push(OnConflictField { + table_field: f.clone(), + field_index, + }) + } + let fuse_table = + table + .as_any() + .downcast_ref::() + .ok_or(ErrorCode::Unimplemented(format!( + "table {}, engine type {}, does not support REPLACE INTO", + table.name(), + table.get_table_info().engine(), + )))?; + let table_info = fuse_table.get_table_info(); + let base_snapshot = fuse_table.read_table_snapshot().await?.unwrap_or_else(|| { + Arc::new(TableSnapshot::new_empty_snapshot(schema.as_ref().clone())) + }); + + let is_multi_node = !self.ctx.get_cluster().is_empty(); + let is_value_source = matches!(self.plan.source, InsertInputSource::Values(_)); + let is_distributed = is_multi_node + && !is_value_source + && self.ctx.get_settings().get_enable_distributed_replace()?; + let table_is_empty = base_snapshot.segments.is_empty(); + let table_level_range_index = base_snapshot.summary.col_stats.clone(); + let mut purge_info = None; + let (mut root, select_ctx) = self + .connect_input_source( + self.ctx.clone(), + &self.plan.source, + self.plan.schema(), + &mut purge_info, + ) + .await?; + // remove top exchange + if let PhysicalPlan::Exchange(Exchange { 
input, .. }) = root.as_ref() { + root = input.clone(); + } + if is_distributed { + root = Box::new(PhysicalPlan::Exchange(Exchange { + plan_id: 0, + input: root, + kind: common_sql::executor::FragmentKind::Expansive, + keys: vec![], + })); + } + + let max_num_pruning_columns = self + .ctx + .get_settings() + .get_replace_into_bloom_pruning_max_column_number()?; + let bloom_filter_column_indexes = if !table.cluster_keys(self.ctx.clone()).is_empty() { + fuse_table + .choose_bloom_filter_columns(&on_conflicts, max_num_pruning_columns) + .await? + } else { + vec![] + }; + + root = Box::new(PhysicalPlan::Deduplicate(Deduplicate { + input: root, + on_conflicts: on_conflicts.clone(), + bloom_filter_column_indexes: bloom_filter_column_indexes.clone(), + table_is_empty, + table_info: table_info.clone(), + catalog_info: catalog.info(), + select_ctx, + table_schema: plan.schema.clone(), + table_level_range_index, + need_insert: true, + })); + root = Box::new(PhysicalPlan::ReplaceInto(ReplaceInto { + input: root, + block_thresholds: fuse_table.get_block_thresholds(), + table_info: table_info.clone(), + catalog_info: catalog.info(), + on_conflicts, + bloom_filter_column_indexes, + segments: base_snapshot + .segments + .clone() + .into_iter() + .enumerate() + .collect(), + need_insert: true, + })); + if is_distributed { + root = Box::new(PhysicalPlan::Exchange(Exchange { + plan_id: 0, + input: root, + kind: common_sql::executor::FragmentKind::Merge, + keys: vec![], + })); + } + root = Box::new(PhysicalPlan::MutationAggregate(Box::new( + MutationAggregate { + input: root, + snapshot: (*base_snapshot).clone(), + table_info: table_info.clone(), + catalog_info: catalog.info(), + mutation_kind: MutationKind::Replace, + }, + ))); + Ok((root, purge_info)) + } + fn check_on_conflicts(&self) -> Result<()> { if self.plan.on_conflict_fields.is_empty() { Err(ErrorCode::BadArguments( @@ -185,21 +313,32 @@ impl ReplaceInterpreter { ctx: Arc, source: &'a InsertInputSource, schema: DataSchemaRef, - ) -> Result { + purge_info: &mut Option<(Vec, StageInfo)>, + ) -> Result<(Box, Option)> { match source { - InsertInputSource::Values(data) => { - self.connect_value_source(ctx.clone(), schema.clone(), data) - } + InsertInputSource::Values(data) => self + .connect_value_source(schema.clone(), data) + .map(|x| (x, None)), InsertInputSource::SelectPlan(plan) => { - self.connect_query_plan_source(ctx.clone(), schema.clone(), plan) - .await + self.connect_query_plan_source(ctx.clone(), plan).await } InsertInputSource::Stage(plan) => match *plan.clone() { - Plan::Copy(copy_plan) => { - let interpreter = CopyInterpreter::try_create(ctx.clone(), *copy_plan.clone())?; - interpreter.execute2().await - } + Plan::Copy(copy_plan) => match copy_plan.as_ref() { + CopyPlan::IntoTable(copy_into_table_plan) => { + let interpreter = + CopyInterpreter::try_create(ctx.clone(), *copy_plan.clone())?; + let (physical_plan, files) = interpreter + .build_physical_plan(copy_into_table_plan) + .await?; + *purge_info = Some(( + files, + copy_into_table_plan.stage_table_info.stage_info.clone(), + )); + Ok((Box::new(physical_plan), None)) + } + _ => unreachable!("plan in InsertInputSource::Stage must be CopyIntoTable"), + }, _ => unreachable!("plan in InsertInputSource::Stag must be Copy"), }, _ => Err(ErrorCode::Unimplemented( @@ -210,35 +349,21 @@ impl ReplaceInterpreter { fn connect_value_source( &self, - ctx: Arc, schema: DataSchemaRef, value_data: &str, - ) -> Result { - let mut build_res = PipelineBuildResult::create(); - let settings = 
ctx.get_settings(); - build_res.main_pipeline.add_source( - |output| { - let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; - let inner = ValueSource::new( - value_data.to_string(), - ctx.clone(), - name_resolution_ctx, - schema.clone(), - ); - AsyncSourcer::create(ctx.clone(), output, inner) - }, - 1, - )?; - Ok(build_res) + ) -> Result> { + Ok(Box::new(PhysicalPlan::AsyncSourcer(AsyncSourcerPlan { + value_data: value_data.to_string(), + schema, + }))) } #[async_backtrace::framed] async fn connect_query_plan_source<'a>( &'a self, ctx: Arc, - self_schema: DataSchemaRef, query_plan: &Plan, - ) -> Result { + ) -> Result<(Box, Option)> { let (s_expr, metadata, bind_context, formatted_ast) = match query_plan { Plan::Query { s_expr, @@ -259,42 +384,14 @@ impl ReplaceInterpreter { false, )?; - let mut build_res = select_interpreter.execute2().await?; - - let select_schema = query_plan.schema(); - let target_schema = self_schema; - if self.check_schema_cast(query_plan)? { - let func_ctx = ctx.get_function_context()?; - build_res.main_pipeline.add_transform( - |transform_input_port, transform_output_port| { - TransformCastSchema::try_create( - transform_input_port, - transform_output_port, - select_schema.clone(), - target_schema.clone(), - func_ctx.clone(), - ) - }, - )?; - } - - Ok(build_res) - } - - // TODO duplicated - fn check_schema_cast(&self, plan: &Plan) -> Result { - let output_schema = &self.plan.schema; - let select_schema = plan.schema(); - - // validate schema - if select_schema.fields().len() < output_schema.fields().len() { - return Err(ErrorCode::BadArguments( - "Fields in select statement is less than expected", - )); - } - - // check if cast needed - let cast_needed = select_schema != DataSchema::from(output_schema.as_ref()).into(); - Ok(cast_needed) + let physical_plan = select_interpreter + .build_physical_plan() + .await + .map(Box::new)?; + let select_ctx = SelectCtx { + select_column_bindings: bind_context.columns.clone(), + select_schema: query_plan.schema(), + }; + Ok((physical_plan, Some(select_ctx))) } } diff --git a/src/query/service/src/pipelines/builders/copy.rs b/src/query/service/src/pipelines/builders/copy.rs index e41c08ef14a4b..5a584500913c6 100644 --- a/src/query/service/src/pipelines/builders/copy.rs +++ b/src/query/service/src/pipelines/builders/copy.rs @@ -29,8 +29,7 @@ use common_meta_app::principal::StageInfo; use common_meta_app::schema::TableCopiedFileInfo; use common_meta_app::schema::UpsertTableCopiedFileReq; use common_pipeline_core::Pipeline; -use common_sql::executor::CopyIntoTableFromQuery; -use common_sql::executor::DistributedCopyIntoTableFromStage; +use common_sql::executor::CopyIntoTable; use common_sql::plans::CopyIntoTableMode; use common_sql::plans::CopyIntoTablePlan; use common_storage::common_metrics::copy::metrics_inc_copy_purge_files_cost_milliseconds; @@ -47,48 +46,18 @@ use crate::pipelines::processors::transforms::TransformAddConstColumns; use crate::pipelines::processors::TransformCastSchema; use crate::sessions::QueryContext; -pub enum CopyPlanType { - CopyIntoTablePlanOption(CopyIntoTablePlan), - DistributedCopyIntoTableFromStage(DistributedCopyIntoTableFromStage), - // also distributed plan, but we think the real distributed part is the query - // so no "distributed" prefix here. 
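Tying the replace-interpreter changes above together: the plan nodes are assembled bottom-up, which makes the final nesting easy to lose track of. The tree below summarizes the shape produced by `ReplaceInterpreter::build_physical_plan`, with the exchange nodes that are only added in the distributed case (multi-node, non-VALUES source, and the `enable_distributed_replace` setting) shown in brackets. It is commentary in the style of the pipeline diagrams used elsewhere in this patch, not code from it.

```rust
// MutationAggregate (MutationKind::Replace)
//   └── [Exchange, FragmentKind::Merge]
//         └── ReplaceInto
//               └── Deduplicate
//                     └── [Exchange, FragmentKind::Expansive]
//                           └── one of:
//                                 AsyncSourcer   -- REPLACE ... VALUES
//                                 <query plan>   -- REPLACE ... SELECT (top exchange stripped)
//                                 CopyIntoTable  -- REPLACE ... FROM @stage, built via CopyInterpreter
```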
- CopyIntoTableFromQuery(CopyIntoTableFromQuery), -} - pub fn build_append_data_pipeline( ctx: Arc, main_pipeline: &mut Pipeline, - plan: CopyPlanType, + plan: &CopyIntoTable, source_schema: Arc, to_table: Arc, ) -> Result<()> { - let plan_required_source_schema: DataSchemaRef; - let plan_required_values_schema: DataSchemaRef; - let plan_values_consts: Vec; - let plan_write_mode: CopyIntoTableMode; - - match plan { - CopyPlanType::CopyIntoTablePlanOption(plan) => { - plan_required_source_schema = plan.required_source_schema; - plan_required_values_schema = plan.required_values_schema; - plan_values_consts = plan.values_consts; - plan_write_mode = plan.write_mode; - } - CopyPlanType::DistributedCopyIntoTableFromStage(plan) => { - plan_required_source_schema = plan.required_source_schema; - plan_required_values_schema = plan.required_values_schema; - plan_values_consts = plan.values_consts; - plan_write_mode = plan.write_mode; - } - CopyPlanType::CopyIntoTableFromQuery(plan) => { - plan_required_source_schema = plan.required_source_schema; - plan_required_values_schema = plan.required_values_schema; - plan_values_consts = plan.values_consts; - plan_write_mode = plan.write_mode; - } - } - - if source_schema != plan_required_source_schema { + let plan_required_source_schema = &plan.required_source_schema; + let plan_values_consts = &plan.values_consts; + let plan_required_values_schema = &plan.required_values_schema; + let plan_write_mode = &plan.write_mode; + if &source_schema != plan_required_source_schema { // only parquet need cast let func_ctx = ctx.get_function_context()?; main_pipeline.add_transform(|transform_input_port, transform_output_port| { @@ -118,7 +87,7 @@ pub fn build_append_data_pipeline( ctx, main_pipeline, to_table.clone(), - plan_required_values_schema, + plan_required_values_schema.clone(), AppendMode::Copy, )?, CopyIntoTableMode::Replace => {} @@ -126,49 +95,56 @@ pub fn build_append_data_pipeline( ctx, main_pipeline, to_table.clone(), - plan_required_values_schema, + plan_required_values_schema.clone(), AppendMode::Copy, )?, } Ok(()) } -#[allow(clippy::too_many_arguments)] -pub fn build_commit_data_pipeline( - ctx: Arc, +pub async fn build_commit_data_pipeline( + ctx: &Arc, main_pipeline: &mut Pipeline, - stage_info: StageInfo, - to_table: Arc, - files: Vec, - copy_force_option: bool, - copy_purge_option: bool, - insert_overwrite_option: bool, + plan: &CopyIntoTablePlan, + files: &[StageFileInfo], ) -> Result<()> { + let to_table = ctx + .get_table( + plan.catalog_info.catalog_name(), + &plan.database_name, + &plan.table_name, + ) + .await?; // Source node will do: // 1. commit // 2. purge // commit let copied_files_meta_req = build_upsert_copied_files_to_meta_req( ctx.clone(), - to_table.clone(), - stage_info.clone(), - files.clone(), - copy_force_option, + to_table.as_ref(), + &plan.stage_table_info.stage_info, + files, + plan.force, )?; to_table.commit_insertion( ctx.clone(), main_pipeline, copied_files_meta_req, - insert_overwrite_option, + plan.write_mode.is_overwrite(), None, )?; // set on_finished callback. 
- set_copy_on_finished(ctx, files, copy_purge_option, stage_info, main_pipeline)?; + set_copy_on_finished( + ctx.clone(), + files.to_vec(), + plan.stage_table_info.stage_info.copy_options.purge, + plan.stage_table_info.stage_info.clone(), + main_pipeline, + )?; Ok(()) } - pub fn set_copy_on_finished( ctx: Arc, files: Vec, @@ -223,13 +199,13 @@ pub fn set_copy_on_finished( pub fn build_upsert_copied_files_to_meta_req( ctx: Arc, - to_table: Arc, - stage_info: StageInfo, - copied_files: Vec, + to_table: &dyn Table, + stage_info: &StageInfo, + copied_files: &[StageFileInfo], force: bool, ) -> Result> { let mut copied_file_tree = BTreeMap::new(); - for file in &copied_files { + for file in copied_files { // Short the etag to 7 bytes for less space in metasrv. let short_etag = file.etag.clone().map(|mut v| { v.truncate(7); @@ -275,7 +251,7 @@ fn fill_const_columns( pipeline: &mut Pipeline, input_schema: DataSchemaRef, output_schema: DataSchemaRef, - const_values: Vec, + const_values: &[Scalar], ) -> Result<()> { pipeline.add_transform(|transform_input_port, transform_output_port| { TransformAddConstColumns::try_create( @@ -284,7 +260,7 @@ fn fill_const_columns( transform_output_port, input_schema.clone(), output_schema.clone(), - const_values.clone(), + const_values.to_vec(), ) })?; Ok(()) diff --git a/src/query/service/src/pipelines/builders/mod.rs b/src/query/service/src/pipelines/builders/mod.rs index 0829d266f1507..36bc0cb58a1b7 100644 --- a/src/query/service/src/pipelines/builders/mod.rs +++ b/src/query/service/src/pipelines/builders/mod.rs @@ -19,7 +19,6 @@ pub use copy::build_append_data_pipeline; pub use copy::build_commit_data_pipeline; pub use copy::build_upsert_copied_files_to_meta_req; pub use copy::set_copy_on_finished; -pub use copy::CopyPlanType; pub use table::build_append2table_with_commit_pipeline; pub use table::build_append2table_without_commit_pipeline; pub use table::build_fill_missing_columns_pipeline; diff --git a/src/query/service/src/pipelines/mod.rs b/src/query/service/src/pipelines/mod.rs index 249c98d4e631e..266e1c6e482f0 100644 --- a/src/query/service/src/pipelines/mod.rs +++ b/src/query/service/src/pipelines/mod.rs @@ -28,3 +28,4 @@ pub use pipe::TransformPipeBuilder; pub use pipeline::Pipeline; pub use pipeline_build_res::PipelineBuildResult; pub use pipeline_builder::PipelineBuilder; +pub use pipeline_builder::ValueSource; diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 0ea45ff6469ab..537c66679d3fc 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -13,12 +13,20 @@ // limitations under the License. 
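`ValueSource` now lives in the pipeline builder (re-exported just above) so that the new `AsyncSourcer` physical-plan node can be turned into a source pipe at build time instead of being wired up directly by the insert and replace interpreters. The node itself, along with `SelectCtx`, is defined in `common_sql::executor` outside this diff; the sketch below is their approximate shape as implied by how this patch constructs and consumes them.

```rust
// Shapes inferred from usage: ReplaceInterpreter builds these values, and
// build_async_sourcer / build_deduplicate below read exactly these fields.
pub struct AsyncSourcerPlan {
    pub value_data: String,    // the raw VALUES text handed to ValueSource
    pub schema: DataSchemaRef, // target schema ValueSource decodes into
}

pub struct SelectCtx {
    pub select_column_bindings: Vec<ColumnBinding>,
    pub select_schema: DataSchemaRef,
}
```

`SelectCtx` is what lets `build_deduplicate` render the query result and insert a `TransformCastSchema` only when the select schema differs from the target table schema.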
use std::collections::HashMap; +use std::collections::VecDeque; use std::convert::TryFrom; +use std::io::BufRead; +use std::io::Cursor; +use std::ops::Not; use std::sync::Arc; use std::sync::Mutex; use std::time::Instant; +use aho_corasick::AhoCorasick; use async_channel::Receiver; +use common_ast::parser::parse_comma_separated_exprs; +use common_ast::parser::tokenize_sql; +use common_base::base::tokio::sync::Semaphore; use common_catalog::table::AppendMode; use common_exception::ErrorCode; use common_exception::Result; @@ -28,27 +36,36 @@ use common_expression::types::NumberDataType; use common_expression::with_hash_method; use common_expression::with_mappedhash_method; use common_expression::with_number_mapped_type; +use common_expression::ColumnBuilder; use common_expression::DataBlock; +use common_expression::DataSchema; use common_expression::DataSchemaRef; use common_expression::FunctionContext; use common_expression::HashMethodKind; use common_expression::SortColumnDescription; +use common_formats::FastFieldDecoderValues; use common_functions::aggregates::AggregateFunctionFactory; use common_functions::aggregates::AggregateFunctionRef; use common_functions::BUILTIN_FUNCTIONS; +use common_io::cursor_ext::ReadBytesExt; +use common_io::cursor_ext::ReadCheckPointExt; use common_pipeline_core::pipe::Pipe; use common_pipeline_core::pipe::PipeItem; use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::processors::Processor; use common_pipeline_sinks::EmptySink; use common_pipeline_sinks::Sinker; use common_pipeline_sinks::UnionReceiveSink; +use common_pipeline_sources::AsyncSource; +use common_pipeline_sources::AsyncSourcer; use common_pipeline_sources::OneBlockSource; use common_pipeline_transforms::processors::profile_wrapper::ProcessorProfileWrapper; use common_pipeline_transforms::processors::profile_wrapper::ProfileStub; use common_pipeline_transforms::processors::profile_wrapper::TransformProfileWrapper; use common_pipeline_transforms::processors::transforms::build_full_sort_pipeline; +use common_pipeline_transforms::processors::transforms::create_dummy_item; use common_pipeline_transforms::processors::transforms::Transformer; use common_profile::SharedProcessorProfiles; use common_sql::evaluator::BlockOperator; @@ -57,12 +74,13 @@ use common_sql::executor::AggregateExpand; use common_sql::executor::AggregateFinal; use common_sql::executor::AggregateFunctionDesc; use common_sql::executor::AggregatePartial; +use common_sql::executor::AsyncSourcerPlan; use common_sql::executor::ConstantTableScan; -use common_sql::executor::CopyIntoTableFromQuery; +use common_sql::executor::CopyIntoTable; +use common_sql::executor::CopyIntoTableSource; use common_sql::executor::CteScan; -use common_sql::executor::DeleteFinal; +use common_sql::executor::Deduplicate; use common_sql::executor::DeletePartial; -use common_sql::executor::DistributedCopyIntoTableFromStage; use common_sql::executor::DistributedInsertSelect; use common_sql::executor::EvalScalar; use common_sql::executor::ExchangeSink; @@ -72,26 +90,38 @@ use common_sql::executor::HashJoin; use common_sql::executor::Lambda; use common_sql::executor::Limit; use common_sql::executor::MaterializedCte; +use common_sql::executor::MutationAggregate; use common_sql::executor::PhysicalPlan; use common_sql::executor::Project; use common_sql::executor::ProjectSet; use common_sql::executor::RangeJoin; +use 
common_sql::executor::ReplaceInto; use common_sql::executor::RowFetch; use common_sql::executor::RuntimeFilterSource; +use common_sql::executor::SelectCtx; use common_sql::executor::Sort; use common_sql::executor::TableScan; use common_sql::executor::UnionAll; use common_sql::executor::Window; +use common_sql::BindContext; use common_sql::ColumnBinding; use common_sql::IndexType; +use common_sql::Metadata; +use common_sql::MetadataRef; +use common_sql::NameResolutionContext; use common_storage::DataOperator; use common_storages_factory::Table; use common_storages_fuse::operations::build_row_fetcher_pipeline; +use common_storages_fuse::operations::common::TransformSerializeSegment; +use common_storages_fuse::operations::replace_into::BroadcastProcessor; +use common_storages_fuse::operations::replace_into::ReplaceIntoProcessor; +use common_storages_fuse::operations::replace_into::UnbranchedReplaceIntoProcessor; use common_storages_fuse::operations::FillInternalColumnProcessor; -use common_storages_fuse::operations::MutationKind; use common_storages_fuse::operations::TransformSerializeBlock; use common_storages_fuse::FuseTable; use common_storages_stage::StageTable; +use once_cell::sync::Lazy; +use parking_lot::RwLock; use super::processors::transforms::FrameBound; use super::processors::transforms::WindowFunctionInfo; @@ -100,7 +130,6 @@ use crate::api::DefaultExchangeInjector; use crate::api::ExchangeInjector; use crate::pipelines::builders::build_append_data_pipeline; use crate::pipelines::builders::build_fill_missing_columns_pipeline; -use crate::pipelines::builders::CopyPlanType; use crate::pipelines::processors::transforms::build_partition_bucket; use crate::pipelines::processors::transforms::AggregateInjector; use crate::pipelines::processors::transforms::FinalSingleStateAggregator; @@ -220,75 +249,340 @@ impl PipelineBuilder { self.build_runtime_filter_source(runtime_filter_source) } PhysicalPlan::DeletePartial(delete) => self.build_delete_partial(delete), - PhysicalPlan::DeleteFinal(delete) => self.build_delete_final(delete), + PhysicalPlan::MutationAggregate(plan) => self.build_mutation_aggregate(plan), PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join), - PhysicalPlan::DistributedCopyIntoTableFromStage(distributed_plan) => { - self.build_distributed_copy_into_table_from_stage(distributed_plan) - } - PhysicalPlan::CopyIntoTableFromQuery(copy_plan) => { - self.build_copy_into_table_from_query(copy_plan) - } PhysicalPlan::MaterializedCte(materialized_cte) => { self.build_materialized_cte(materialized_cte) } + PhysicalPlan::CopyIntoTable(copy) => self.build_copy_into_table(copy), + PhysicalPlan::AsyncSourcer(async_sourcer) => self.build_async_sourcer(async_sourcer), + PhysicalPlan::Deduplicate(deduplicate) => self.build_deduplicate(deduplicate), + PhysicalPlan::ReplaceInto(replace) => self.build_replace_into(replace), } } - fn build_copy_into_table_from_query( - &mut self, - copy_plan: &CopyIntoTableFromQuery, - ) -> Result<()> { - self.build_pipeline(©_plan.input)?; - // render_result for query - PipelineBuilder::render_result_set( - &self.ctx.get_function_context()?, - copy_plan.input.output_schema()?, - ©_plan.result_columns, + fn check_schema_cast( + select_schema: Arc, + output_schema: Arc, + ) -> Result { + // validate schema + if select_schema.fields().len() < output_schema.fields().len() { + return Err(ErrorCode::BadArguments( + "Fields in select statement is less than expected", + )); + } + + // check if cast needed + let cast_needed = select_schema != 
output_schema; + Ok(cast_needed) + } + + fn build_deduplicate(&mut self, deduplicate: &Deduplicate) -> Result<()> { + let Deduplicate { + input, + on_conflicts, + bloom_filter_column_indexes, + table_is_empty, + table_info, + catalog_info, + select_ctx, + table_level_range_index, + table_schema, + need_insert, + } = deduplicate; + let tbl = self + .ctx + .build_table_by_table_info(catalog_info, table_info, None)?; + let table = FuseTable::try_from_table(tbl.as_ref())?; + let target_schema: Arc = Arc::new(table_schema.clone().into()); + self.build_pipeline(input)?; + if let Some(SelectCtx { + select_column_bindings, + select_schema, + }) = select_ctx + { + PipelineBuilder::render_result_set( + &self.ctx.get_function_context()?, + input.output_schema()?, + select_column_bindings, + &mut self.main_pipeline, + false, + )?; + if Self::check_schema_cast(select_schema.clone(), target_schema.clone())? { + let func_ctx = self.ctx.get_function_context()?; + self.main_pipeline.add_transform( + |transform_input_port, transform_output_port| { + TransformCastSchema::try_create( + transform_input_port, + transform_output_port, + select_schema.clone(), + target_schema.clone(), + func_ctx.clone(), + ) + }, + )?; + } + } + + build_fill_missing_columns_pipeline( + self.ctx.clone(), &mut self.main_pipeline, - copy_plan.ignore_result, + tbl.clone(), + target_schema.clone(), )?; - let to_table = self.ctx.build_table_by_table_info( - ©_plan.catalog_info, - ©_plan.table_info, - None, - )?; - build_append_data_pipeline( + let _ = table.cluster_gen_for_append( self.ctx.clone(), &mut self.main_pipeline, - CopyPlanType::CopyIntoTableFromQuery(copy_plan.clone()), - copy_plan.query_source_schema.clone(), - to_table, + table.get_block_thresholds(), )?; + // 1. resize input to 1, since the UpsertTransform need to de-duplicate inputs "globally" + self.main_pipeline.try_resize(1)?; + + // 2. 
connect with ReplaceIntoProcessor + + // ┌──────────────────────┐ + // │ ├──┐ + // ┌─────────────┐ │ ├──┘ + // │ UpsertSource├─────►│ ReplaceIntoProcessor │ + // └─────────────┘ │ ├──┐ + // │ ├──┘ + // └──────────────────────┘ + // NOTE: here the pipe items of last pipe are arranged in the following order + // (0) -> output_port_append_data + // (1) -> output_port_merge_into_action + // the "downstream" is supposed to be connected with a processor which can process MergeIntoOperations + // in our case, it is the broadcast processor + let cluster_keys = table.cluster_keys(self.ctx.clone()); + if *need_insert { + let replace_into_processor = ReplaceIntoProcessor::create( + self.ctx.as_ref(), + on_conflicts.clone(), + cluster_keys, + bloom_filter_column_indexes.clone(), + table_schema.as_ref(), + *table_is_empty, + table_level_range_index.clone(), + )?; + self.main_pipeline + .add_pipe(replace_into_processor.into_pipe()); + } else { + let replace_into_processor = UnbranchedReplaceIntoProcessor::create( + self.ctx.as_ref(), + on_conflicts.clone(), + cluster_keys, + bloom_filter_column_indexes.clone(), + table_schema.as_ref(), + *table_is_empty, + table_level_range_index.clone(), + )?; + self.main_pipeline + .add_pipe(replace_into_processor.into_pipe()); + } Ok(()) } - fn build_distributed_copy_into_table_from_stage( - &mut self, - distributed_plan: &DistributedCopyIntoTableFromStage, - ) -> Result<()> { - let to_table = self.ctx.build_table_by_table_info( - &distributed_plan.catalog_info, - &distributed_plan.table_info, - None, + fn build_replace_into(&mut self, replace: &ReplaceInto) -> Result<()> { + let ReplaceInto { + input, + block_thresholds, + table_info, + on_conflicts, + bloom_filter_column_indexes, + catalog_info, + segments, + need_insert, + } = replace; + let max_threads = self.ctx.get_settings().get_max_threads()?; + let segment_partition_num = std::cmp::min(segments.len(), max_threads as usize); + let table = self + .ctx + .build_table_by_table_info(catalog_info, table_info, None)?; + let table = FuseTable::try_from_table(table.as_ref())?; + let cluster_stats_gen = + table.get_cluster_stats_gen(self.ctx.clone(), 0, *block_thresholds)?; + self.build_pipeline(input)?; + // connect to broadcast processor and append transform + let serialize_block_transform = TransformSerializeBlock::try_create( + self.ctx.clone(), + InputPort::create(), + OutputPort::create(), + table, + cluster_stats_gen, )?; - let stage_table_info = distributed_plan.stage_table_info.clone(); - let stage_table = StageTable::try_create(stage_table_info)?; - stage_table.set_block_thresholds(distributed_plan.thresholds); - let ctx = self.ctx.clone(); - let table_ctx: Arc = ctx.clone(); + let block_builder = serialize_block_transform.get_block_builder(); - stage_table.read_data(table_ctx, &distributed_plan.source, &mut self.main_pipeline)?; + let serialize_segment_transform = TransformSerializeSegment::new( + InputPort::create(), + OutputPort::create(), + table, + *block_thresholds, + ); + if !*need_insert { + if segment_partition_num == 0 { + return Ok(()); + } + let broadcast_processor = BroadcastProcessor::new(segment_partition_num); + self.main_pipeline + .add_pipe(Pipe::create(1, segment_partition_num, vec![ + broadcast_processor.into_pipe_item(), + ])); + let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?; + let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize)); - // append data + let merge_into_operation_aggregators = table.merge_into_mutators( + self.ctx.clone(), 
+ segment_partition_num, + block_builder, + on_conflicts.clone(), + bloom_filter_column_indexes.clone(), + segments, + io_request_semaphore, + )?; + self.main_pipeline.add_pipe(Pipe::create( + segment_partition_num, + segment_partition_num, + merge_into_operation_aggregators, + )); + return Ok(()); + } + + if segment_partition_num == 0 { + let dummy_item = create_dummy_item(); + // ┌──────────────────────┐ ┌──────────────────┐ + // │ ├──┬────────►│ SerializeBlock │ + // ┌─────────────┐ │ ├──┘ └──────────────────┘ + // │ UpsertSource├─────►│ ReplaceIntoProcessor │ + // └─────────────┘ │ ├──┐ ┌──────────────────┐ + // │ ├──┴────────►│ DummyTransform │ + // └──────────────────────┘ └──────────────────┘ + // wrap them into pipeline, order matters! + self.main_pipeline.add_pipe(Pipe::create(2, 2, vec![ + serialize_block_transform.into_pipe_item(), + dummy_item, + ])); + } else { + // ┌──────────────────────┐ ┌──────────────────┐ + // │ ├──┬────────►│ SerializeBlock │ + // ┌─────────────┐ │ ├──┘ └──────────────────┘ + // │ UpsertSource├─────►│ ReplaceIntoProcessor │ + // └─────────────┘ │ ├──┐ ┌──────────────────┐ + // │ ├──┴────────►│BroadcastProcessor│ + // └──────────────────────┘ └──────────────────┘ + let broadcast_processor = BroadcastProcessor::new(segment_partition_num); + // wrap them into pipeline, order matters! + self.main_pipeline + .add_pipe(Pipe::create(2, segment_partition_num + 1, vec![ + serialize_block_transform.into_pipe_item(), + broadcast_processor.into_pipe_item(), + ])); + }; + + // 4. connect with MergeIntoOperationAggregators + if segment_partition_num == 0 { + let dummy_item = create_dummy_item(); + self.main_pipeline.add_pipe(Pipe::create(2, 2, vec![ + serialize_segment_transform.into_pipe_item(), + dummy_item, + ])); + } else { + // ┌──────────────────┐ ┌────────────────┐ + // ────►│ SerializeBlock ├──────────────►│SerializeSegment│ + // └──────────────────┘ └────────────────┘ + // + // ┌───────────────────┐ ┌──────────────────────┐ + // ────►│ ├──┬──────────►│MergeIntoOperationAggr│ + // │ ├──┘ └──────────────────────┘ + // │ BroadcastProcessor│ + // │ ├──┐ ┌──────────────────────┐ + // │ ├──┴──────────►│MergeIntoOperationAggr│ + // │ │ └──────────────────────┘ + // │ ├──┐ + // │ ├──┴──────────►┌──────────────────────┐ + // └───────────────────┘ │MergeIntoOperationAggr│ + // └──────────────────────┘ + + let item_size = segment_partition_num + 1; + let mut pipe_items = Vec::with_capacity(item_size); + // setup the dummy transform + pipe_items.push(serialize_segment_transform.into_pipe_item()); + + let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?; + let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize)); + + // setup the merge into operation aggregators + let mut merge_into_operation_aggregators = table.merge_into_mutators( + self.ctx.clone(), + segment_partition_num, + block_builder, + on_conflicts.clone(), + bloom_filter_column_indexes.clone(), + segments, + io_request_semaphore, + )?; + assert_eq!( + segment_partition_num, + merge_into_operation_aggregators.len() + ); + pipe_items.append(&mut merge_into_operation_aggregators); + + // extend the pipeline + assert_eq!(self.main_pipeline.output_len(), item_size); + assert_eq!(pipe_items.len(), item_size); + self.main_pipeline + .add_pipe(Pipe::create(item_size, item_size, pipe_items)); + } + Ok(()) + } + + fn build_async_sourcer(&mut self, async_sourcer: &AsyncSourcerPlan) -> Result<()> { + let settings = self.ctx.get_settings(); + self.main_pipeline.add_source( + 
|output| { + let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; + let inner = ValueSource::new( + async_sourcer.value_data.clone(), + self.ctx.clone(), + name_resolution_ctx, + async_sourcer.schema.clone(), + ); + AsyncSourcer::create(self.ctx.clone(), output, inner) + }, + 1, + )?; + Ok(()) + } + + fn build_copy_into_table(&mut self, copy: &CopyIntoTable) -> Result<()> { + let to_table = + self.ctx + .build_table_by_table_info(©.catalog_info, ©.table_info, None)?; + let source_schema = match ©.source { + CopyIntoTableSource::Query(input) => { + self.build_pipeline(&input.plan)?; + Self::render_result_set( + &self.ctx.get_function_context()?, + input.plan.output_schema()?, + &input.result_columns, + &mut self.main_pipeline, + input.ignore_result, + )?; + input.query_source_schema.clone() + } + CopyIntoTableSource::Stage(source) => { + let stage_table = StageTable::try_create(copy.stage_table_info.clone())?; + stage_table.set_block_thresholds(to_table.get_block_thresholds()); + stage_table.read_data(self.ctx.clone(), source, &mut self.main_pipeline)?; + copy.required_source_schema.clone() + } + }; build_append_data_pipeline( - ctx, + self.ctx.clone(), &mut self.main_pipeline, - CopyPlanType::DistributedCopyIntoTableFromStage(distributed_plan.clone()), - distributed_plan.required_source_schema.clone(), + copy, + source_schema, to_table, )?; - Ok(()) } @@ -334,18 +628,18 @@ impl PipelineBuilder { /// +-----------------------+ +----------+ /// |TableMutationAggregator| ---> |CommitSink| /// +-----------------------+ +----------+ - fn build_delete_final(&mut self, delete: &DeleteFinal) -> Result<()> { - self.build_pipeline(&delete.input)?; + fn build_mutation_aggregate(&mut self, plan: &MutationAggregate) -> Result<()> { + self.build_pipeline(&plan.input)?; let table = self.ctx - .build_table_by_table_info(&delete.catalog_info, &delete.table_info, None)?; + .build_table_by_table_info(&plan.catalog_info, &plan.table_info, None)?; let table = FuseTable::try_from_table(table.as_ref())?; let ctx: Arc = self.ctx.clone(); table.chain_mutation_pipes( &ctx, &mut self.main_pipeline, - Arc::new(delete.snapshot.clone()), - MutationKind::Delete, + Arc::new(plan.snapshot.clone()), + plan.mutation_kind, )?; Ok(()) } @@ -1651,3 +1945,266 @@ impl PipelineBuilder { Ok(()) } } + +// Pre-generate the positions of `(`, `'` and `\` +static PATTERNS: &[&str] = &["(", "'", "\\"]; + +static INSERT_TOKEN_FINDER: Lazy = Lazy::new(|| AhoCorasick::new(PATTERNS).unwrap()); + +pub struct ValueSource { + data: String, + ctx: Arc, + name_resolution_ctx: NameResolutionContext, + bind_context: BindContext, + schema: DataSchemaRef, + metadata: MetadataRef, + is_finished: bool, +} + +#[async_trait::async_trait] +impl AsyncSource for ValueSource { + const NAME: &'static str = "ValueSource"; + const SKIP_EMPTY_DATA_BLOCK: bool = true; + + #[async_trait::unboxed_simple] + #[async_backtrace::framed] + async fn generate(&mut self) -> Result> { + if self.is_finished { + return Ok(None); + } + + // Use the number of '(' to estimate the number of rows + let mut estimated_rows = 0; + let mut positions = VecDeque::new(); + for mat in INSERT_TOKEN_FINDER.find_iter(&self.data) { + if mat.pattern() == 0.into() { + estimated_rows += 1; + continue; + } + positions.push_back(mat.start()); + } + + let mut reader = Cursor::new(self.data.as_bytes()); + let block = self + .read(estimated_rows, &mut reader, &mut positions) + .await?; + self.is_finished = true; + Ok(Some(block)) + } +} + +impl ValueSource { + pub fn new( + 
data: String, + ctx: Arc, + name_resolution_ctx: NameResolutionContext, + schema: DataSchemaRef, + ) -> Self { + let bind_context = BindContext::new(); + let metadata = Arc::new(RwLock::new(Metadata::default())); + + Self { + data, + ctx, + name_resolution_ctx, + schema, + bind_context, + metadata, + is_finished: false, + } + } + + #[async_backtrace::framed] + pub async fn read>( + &self, + estimated_rows: usize, + reader: &mut Cursor, + positions: &mut VecDeque, + ) -> Result { + let mut columns = self + .schema + .fields() + .iter() + .map(|f| ColumnBuilder::with_capacity(f.data_type(), estimated_rows)) + .collect::>(); + + let mut bind_context = self.bind_context.clone(); + + let format = self.ctx.get_format_settings()?; + let field_decoder = FastFieldDecoderValues::create_for_insert(format); + + for row in 0.. { + let _ = reader.ignore_white_spaces(); + if reader.eof() { + break; + } + // Not the first row + if row != 0 { + reader.must_ignore_byte(b',')?; + } + + self.parse_next_row( + &field_decoder, + reader, + &mut columns, + positions, + &mut bind_context, + self.metadata.clone(), + ) + .await?; + } + + let columns = columns + .into_iter() + .map(|col| col.build()) + .collect::>(); + Ok(DataBlock::new_from_columns(columns)) + } + + /// Parse single row value, like ('111', 222, 1 + 1) + #[async_backtrace::framed] + async fn parse_next_row>( + &self, + field_decoder: &FastFieldDecoderValues, + reader: &mut Cursor, + columns: &mut [ColumnBuilder], + positions: &mut VecDeque, + bind_context: &mut BindContext, + metadata: MetadataRef, + ) -> Result<()> { + let _ = reader.ignore_white_spaces(); + let col_size = columns.len(); + let start_pos_of_row = reader.checkpoint(); + + // Start of the row --- '(' + if !reader.ignore_byte(b'(') { + return Err(ErrorCode::BadDataValueType( + "Must start with parentheses".to_string(), + )); + } + // Ignore the positions in the previous row. + while let Some(pos) = positions.front() { + if *pos < start_pos_of_row as usize { + positions.pop_front(); + } else { + break; + } + } + + for col_idx in 0..col_size { + let _ = reader.ignore_white_spaces(); + let col_end = if col_idx + 1 == col_size { b')' } else { b',' }; + + let col = columns + .get_mut(col_idx) + .ok_or_else(|| ErrorCode::Internal("ColumnBuilder is None"))?; + + let (need_fallback, pop_count) = field_decoder + .read_field(col, reader, positions) + .map(|_| { + let _ = reader.ignore_white_spaces(); + let need_fallback = reader.ignore_byte(col_end).not(); + (need_fallback, col_idx + 1) + }) + .unwrap_or((true, col_idx)); + + // ColumnBuilder and expr-parser both will eat the end ')' of the row. + if need_fallback { + for col in columns.iter_mut().take(pop_count) { + col.pop(); + } + // rollback to start position of the row + reader.rollback(start_pos_of_row + 1); + skip_to_next_row(reader, 1)?; + let end_pos_of_row = reader.position(); + + // Parse from expression and append all columns. 
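
Note on the hunk as it continues below: when the fast field decoder cannot consume a cell (for example an expression such as `1 + 1` or a function call), the whole row is rolled back and re-parsed as SQL expressions, which are then bound to scalars and pushed into the column builders. A minimal, self-contained sketch of that two-tier idea (simplified cell types and a per-cell fallback, not the Databend decoder):

```rust
// Fast path handles plain literals; anything else falls back to a slower
// "expression" path. This is only an illustration of the control flow.
#[derive(Debug, PartialEq)]
enum Cell {
    Int(i64),
    Str(String),
}

// Fast path: only plain integers and single-quoted strings.
fn fast_decode(cell: &str) -> Option<Cell> {
    let cell = cell.trim();
    if let Ok(i) = cell.parse::<i64>() {
        return Some(Cell::Int(i));
    }
    if cell.len() >= 2 && cell.starts_with('\'') && cell.ends_with('\'') {
        return Some(Cell::Str(cell[1..cell.len() - 1].to_string()));
    }
    None // e.g. `1 + 1` needs the expression fallback
}

// Fallback path: stands in for the real expression binder; here it only folds
// `a + b + ...` over integers.
fn slow_eval(cell: &str) -> Cell {
    let sum: i64 = cell
        .split('+')
        .filter_map(|p| p.trim().parse::<i64>().ok())
        .sum();
    Cell::Int(sum)
}

fn decode_row(row: &str) -> Vec<Cell> {
    row.split(',')
        .map(|cell| fast_decode(cell).unwrap_or_else(|| slow_eval(cell)))
        .collect()
}

fn main() {
    assert_eq!(decode_row("1, 'a', 1 + 1"), vec![
        Cell::Int(1),
        Cell::Str("a".to_string()),
        Cell::Int(2),
    ]);
}
```
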
+ reader.set_position(start_pos_of_row); + let row_len = end_pos_of_row - start_pos_of_row; + let buf = &reader.remaining_slice()[..row_len as usize]; + + let sql = std::str::from_utf8(buf).unwrap(); + let settings = self.ctx.get_settings(); + let sql_dialect = settings.get_sql_dialect()?; + let tokens = tokenize_sql(sql)?; + let exprs = parse_comma_separated_exprs(&tokens[1..tokens.len()], sql_dialect)?; + + let values = bind_context + .exprs_to_scalar( + exprs, + &self.schema, + self.ctx.clone(), + &self.name_resolution_ctx, + metadata, + ) + .await?; + + for (col, scalar) in columns.iter_mut().zip(values) { + col.push(scalar.as_ref()); + } + reader.set_position(end_pos_of_row); + return Ok(()); + } + } + + Ok(()) + } +} + +// Values |(xxx), (yyy), (zzz) +pub fn skip_to_next_row>(reader: &mut Cursor, mut balance: i32) -> Result<()> { + let _ = reader.ignore_white_spaces(); + + let mut quoted = false; + let mut escaped = false; + + while balance > 0 { + let buffer = reader.remaining_slice(); + if buffer.is_empty() { + break; + } + + let size = buffer.len(); + + let it = buffer + .iter() + .position(|&c| c == b'(' || c == b')' || c == b'\\' || c == b'\''); + + if let Some(it) = it { + let c = buffer[it]; + reader.consume(it + 1); + + if it == 0 && escaped { + escaped = false; + continue; + } + escaped = false; + + match c { + b'\\' => { + escaped = true; + continue; + } + b'\'' => { + quoted ^= true; + continue; + } + b')' => { + if !quoted { + balance -= 1; + } + } + b'(' => { + if !quoted { + balance += 1; + } + } + _ => {} + } + } else { + escaped = false; + reader.consume(size); + } + } + Ok(()) +} diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index 20296cc5d53a0..c4d1be1afcc70 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -16,8 +16,11 @@ use std::sync::Arc; use common_catalog::table_context::TableContext; use common_exception::Result; -use common_sql::executor::DistributedCopyIntoTableFromStage; +use common_sql::executor::CopyIntoTable; +use common_sql::executor::CopyIntoTableSource; use common_sql::executor::FragmentKind; +use common_sql::executor::QuerySource; +use common_sql::executor::ReplaceInto; use crate::api::BroadcastExchange; use crate::api::DataExchange; @@ -48,9 +51,12 @@ pub struct Fragmenter { /// SelectLeaf: visiting a source fragment of select statement. /// /// DeleteLeaf: visiting a source fragment of delete statement. +/// +/// Replace: visiting a fragment that contains a replace into plan. 
enum State { SelectLeaf, DeleteLeaf, + ReplaceInto, Other, } @@ -139,14 +145,37 @@ impl PhysicalPlanReplacer for Fragmenter { Ok(PhysicalPlan::TableScan(plan.clone())) } - fn replace_copy_into_table( + fn replace_replace_into( &mut self, - plan: &DistributedCopyIntoTableFromStage, + plan: &common_sql::executor::ReplaceInto, ) -> Result { - self.state = State::SelectLeaf; - Ok(PhysicalPlan::DistributedCopyIntoTableFromStage(Box::new( - plan.clone(), - ))) + let input = self.replace(&plan.input)?; + self.state = State::ReplaceInto; + + Ok(PhysicalPlan::ReplaceInto(ReplaceInto { + input: Box::new(input), + ..plan.clone() + })) + } + + // TODO(Sky): remove rebudant code + fn replace_copy_into_table(&mut self, plan: &CopyIntoTable) -> Result { + match &plan.source { + CopyIntoTableSource::Stage(_) => { + self.state = State::SelectLeaf; + Ok(PhysicalPlan::CopyIntoTable(Box::new(plan.clone()))) + } + CopyIntoTableSource::Query(query_ctx) => { + let input = self.replace(&query_ctx.plan)?; + Ok(PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable { + source: CopyIntoTableSource::Query(Box::new(QuerySource { + plan: input, + ..*query_ctx.clone() + })), + ..plan.clone() + }))) + } + } } fn replace_delete_partial( @@ -215,6 +244,7 @@ impl PhysicalPlanReplacer for Fragmenter { State::SelectLeaf => FragmentType::Source, State::DeleteLeaf => FragmentType::DeleteLeaf, State::Other => FragmentType::Intermediate, + State::ReplaceInto => FragmentType::ReplaceInto, }; self.state = State::Other; let exchange = Self::get_exchange( diff --git a/src/query/service/src/schedulers/fragments/plan_fragment.rs b/src/query/service/src/schedulers/fragments/plan_fragment.rs index 5094c26197cab..a456f7f7aae23 100644 --- a/src/query/service/src/schedulers/fragments/plan_fragment.rs +++ b/src/query/service/src/schedulers/fragments/plan_fragment.rs @@ -12,14 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use common_catalog::plan::DataSourcePlan; use common_catalog::plan::Partitions; use common_exception::ErrorCode; use common_exception::Result; +use common_sql::executor::CopyIntoTable; +use common_sql::executor::CopyIntoTableSource; +use common_sql::executor::Deduplicate; use common_sql::executor::DeletePartial; -use common_sql::executor::DistributedCopyIntoTableFromStage; +use common_sql::executor::QuerySource; +use common_sql::executor::ReplaceInto; +use common_storages_fuse::TableContext; +use storages_common_table_meta::meta::Location; use crate::api::DataExchange; use crate::schedulers::Fragmenter; @@ -47,6 +54,8 @@ pub enum FragmentType { /// Leaf fragment of a delete plan, which contains /// a `DeletePartial` operator. DeleteLeaf, + /// Intermediate fragment of a replace into plan, which contains a `ReplaceInto` operator. 
+ ReplaceInto, } #[derive(Clone)] @@ -125,6 +134,14 @@ impl PlanFragment { } actions.add_fragment_actions(fragment_actions)?; } + FragmentType::ReplaceInto => { + // Redistribute partitions + let mut fragment_actions = self.redistribute_replace_into(ctx)?; + if let Some(ref exchange) = self.exchange { + fragment_actions.set_exchange(exchange.clone()); + } + actions.add_fragment_actions(fragment_actions)?; + } } Ok(()) @@ -194,6 +211,68 @@ impl PlanFragment { Ok(fragment_actions) } + fn reshuffle( + executors: Vec, + partitions: Vec, + ) -> Result>> { + let num_parts = partitions.len(); + let num_executors = executors.len(); + let mut executors_sorted = executors; + executors_sorted.sort(); + let mut executor_part = HashMap::default(); + // the first num_parts % num_executors get parts_per_node parts + // the remaining get parts_per_node - 1 parts + let parts_per_node = (num_parts + num_executors - 1) / num_executors; + for (idx, executor) in executors_sorted.iter().enumerate() { + let begin = parts_per_node * idx; + let end = num_parts.min(parts_per_node * (idx + 1)); + let parts = partitions[begin..end].to_vec(); + executor_part.insert(executor.clone(), parts); + if end == num_parts && idx < num_executors - 1 { + // reach here only when num_executors > num_parts + executors_sorted[(idx + 1)..].iter().for_each(|executor| { + executor_part.insert(executor.clone(), vec![]); + }); + break; + } + } + + Ok(executor_part) + } + + fn redistribute_replace_into(&self, ctx: Arc) -> Result { + let plan = match &self.plan { + PhysicalPlan::ExchangeSink(plan) => plan, + _ => unreachable!("logic error"), + }; + let plan = match plan.input.as_ref() { + PhysicalPlan::ReplaceInto(plan) => plan, + _ => unreachable!("logic error"), + }; + let partitions = &plan.segments; + let executors = Fragmenter::get_executors(ctx.clone()); + let mut fragment_actions = QueryFragmentActions::create(self.fragment_id); + let partition_reshuffle = Self::reshuffle(executors, partitions.clone())?; + + let local_id = &ctx.get_cluster().local_id; + + for (executor, parts) in partition_reshuffle.iter() { + let mut plan = self.plan.clone(); + let need_insert = executor == local_id; + + let mut replace_replace_into = ReplaceReplaceInto { + partitions: parts.clone(), + need_insert, + }; + plan = replace_replace_into.replace(&plan)?; + + fragment_actions + .add_action(QueryFragmentAction::create(executor.clone(), plan.clone())); + } + + Ok(fragment_actions) + } + fn get_read_source(&self) -> Result { if self.fragment_type != FragmentType::Source { return Err(ErrorCode::Internal( @@ -203,12 +282,14 @@ impl PlanFragment { let mut source = vec![]; - let mut collect_read_source = |plan: &PhysicalPlan| { - if let PhysicalPlan::TableScan(scan) = plan { - source.push(*scan.source.clone()) - } else if let PhysicalPlan::DistributedCopyIntoTableFromStage(distributed_plan) = plan { - source.push(*distributed_plan.source.clone()) + let mut collect_read_source = |plan: &PhysicalPlan| match plan { + PhysicalPlan::TableScan(scan) => source.push(*scan.source.clone()), + PhysicalPlan::CopyIntoTable(copy) => { + if let Some(stage) = copy.source.as_stage().cloned() { + source.push(*stage); + } } + _ => {} }; PhysicalPlan::traverse( @@ -244,16 +325,25 @@ impl PhysicalPlanReplacer for ReplaceReadSource { })) } - fn replace_copy_into_table( - &mut self, - plan: &DistributedCopyIntoTableFromStage, - ) -> Result { - Ok(PhysicalPlan::DistributedCopyIntoTableFromStage(Box::new( - DistributedCopyIntoTableFromStage { - source: Box::new(self.source.clone()), - 
..plan.clone() - }, - ))) + fn replace_copy_into_table(&mut self, plan: &CopyIntoTable) -> Result { + match &plan.source { + CopyIntoTableSource::Query(query_ctx) => { + let input = self.replace(&query_ctx.plan)?; + Ok(PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable { + source: CopyIntoTableSource::Query(Box::new(QuerySource { + plan: input, + ..*query_ctx.clone() + })), + ..plan.clone() + }))) + } + CopyIntoTableSource::Stage(_) => { + Ok(PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable { + source: CopyIntoTableSource::Stage(Box::new(self.source.clone())), + ..plan.clone() + }))) + } + } } } @@ -269,3 +359,30 @@ impl PhysicalPlanReplacer for ReplaceDeletePartial { }))) } } + +struct ReplaceReplaceInto { + pub partitions: Vec<(usize, Location)>, + pub need_insert: bool, +} + +impl PhysicalPlanReplacer for ReplaceReplaceInto { + fn replace_replace_into(&mut self, plan: &ReplaceInto) -> Result { + let input = self.replace(&plan.input)?; + Ok(PhysicalPlan::ReplaceInto(ReplaceInto { + input: Box::new(input), + need_insert: self.need_insert, + segments: self.partitions.clone(), + ..plan.clone() + })) + } + + fn replace_deduplicate(&mut self, plan: &Deduplicate) -> Result { + let input = self.replace(&plan.input)?; + Ok(PhysicalPlan::Deduplicate(Deduplicate { + input: Box::new(input), + need_insert: self.need_insert, + table_is_empty: self.partitions.is_empty(), + ..plan.clone() + })) + } +} diff --git a/src/query/service/tests/it/storages/fuse/operations/replace_into.rs b/src/query/service/tests/it/storages/fuse/operations/replace_into.rs index 9ec8102697858..d708070db1a9b 100644 --- a/src/query/service/tests/it/storages/fuse/operations/replace_into.rs +++ b/src/query/service/tests/it/storages/fuse/operations/replace_into.rs @@ -29,6 +29,7 @@ fn test_partition() -> Result<()> { let segments = (0..number_segment) .map(|idx| (format!("{idx}"), format_version)) .collect::>(); + let segments: Vec<_> = segments.into_iter().enumerate().collect(); for _ in 0..100 { let num_partition: usize = if number_segment == 1 { @@ -42,7 +43,7 @@ fn test_partition() -> Result<()> { assert_eq!(partitions.len(), num_partition); // check segments - let origin = segments.iter().enumerate(); + let origin = &segments; let segment_of_chunks = partitions .iter() .flatten() @@ -50,8 +51,8 @@ fn test_partition() -> Result<()> { .collect::>(); for (origin_idx, origin_location) in origin { - let (seg_idx, seg_location) = segment_of_chunks[origin_idx]; - assert_eq!(origin_idx, *seg_idx); + let (seg_idx, seg_location) = segment_of_chunks[*origin_idx]; + assert_eq!(origin_idx, seg_idx); assert_eq!(origin_location, seg_location); } } diff --git a/src/query/service/tests/it/storages/testdata/settings_table.txt b/src/query/service/tests/it/storages/testdata/settings_table.txt index 799708ee21e92..dfa70176f82e3 100644 --- a/src/query/service/tests/it/storages/testdata/settings_table.txt +++ b/src/query/service/tests/it/storages/testdata/settings_table.txt @@ -11,6 +11,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System | 'enable_bushy_join' | '0' | '0' | 'SESSION' | 'Enables generating a bushy join plan with the optimizer.' | 'UInt64' | | 'enable_cbo' | '1' | '1' | 'SESSION' | 'Enables cost-based optimization.' | 'UInt64' | | 'enable_distributed_copy_into' | '0' | '0' | 'SESSION' | 'Enable distributed execution of copy into.' | 'UInt64' | +| 'enable_distributed_replace_into' | '0' | '0' | 'SESSION' | 'Enable distributed execution of replace into.' 
| 'UInt64' |
| 'enable_dphyp' | '1' | '1' | 'SESSION' | 'Enables dphyp join order algorithm.' | 'UInt64' |
| 'enable_query_result_cache' | '0' | '0' | 'SESSION' | 'Enables caching query results to improve performance for identical queries.' | 'UInt64' |
| 'enable_replace_into_bloom_pruning' | '1' | '1' | 'SESSION' | 'Enables bloom pruning for replace-into statement.' | 'UInt64' |
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index ed041f6dabcf2..87e496fa27ca0 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -328,6 +328,12 @@ impl DefaultSettings {
             possible_values: None,
             display_in_show_settings: true,
         }),
+        ("enable_distributed_replace_into", DefaultSettingValue {
+            value: UserSettingValue::UInt64(0),
+            desc: "Enable distributed execution of replace into.",
+            possible_values: None,
+            display_in_show_settings: true,
+        }),
         ("enable_aggregating_index_scan", DefaultSettingValue {
             value: UserSettingValue::UInt64(1),
             desc: "Enable scanning aggregating index data while querying.",
diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs
index c9b1bfbfd0aa2..732309600b2e6 100644
--- a/src/query/settings/src/settings_getter_setter.rs
+++ b/src/query/settings/src/settings_getter_setter.rs
@@ -383,6 +383,14 @@ impl Settings {
         self.try_set_u64("enable_distributed_copy_into", u64::from(val))
     }

+    pub fn get_enable_distributed_replace(&self) -> Result<bool> {
+        Ok(self.try_get_u64("enable_distributed_replace_into")? != 0)
+    }
+
+    pub fn set_enable_distributed_replace(&self, val: bool) -> Result<()> {
+        self.try_set_u64("enable_distributed_replace_into", u64::from(val))
+    }
+
     pub fn get_enable_aggregating_index_scan(&self) -> Result<bool> {
         Ok(self.try_get_u64("enable_aggregating_index_scan")?
!= 0) } diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index 409218b327a09..ddd2ffef2560f 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -23,16 +23,15 @@ use super::AggregateExpand; use super::AggregateFinal; use super::AggregateFunctionDesc; use super::AggregatePartial; -use super::CopyIntoTableFromQuery; -use super::DeleteFinal; +use super::CopyIntoTable; use super::DeletePartial; -use super::DistributedCopyIntoTableFromStage; use super::EvalScalar; use super::Exchange; use super::Filter; use super::HashJoin; use super::Lambda; use super::Limit; +use super::MutationAggregate; use super::PhysicalPlan; use super::Project; use super::ProjectSet; @@ -184,7 +183,7 @@ fn to_format_tree( PhysicalPlan::DeletePartial(plan) => { delete_partial_to_format_tree(plan.as_ref(), metadata, profs) } - PhysicalPlan::DeleteFinal(plan) => { + PhysicalPlan::MutationAggregate(plan) => { delete_final_to_format_tree(plan.as_ref(), metadata, profs) } PhysicalPlan::ProjectSet(plan) => project_set_to_format_tree(plan, metadata, profs), @@ -193,10 +192,10 @@ fn to_format_tree( runtime_filter_source_to_format_tree(plan, metadata, profs) } PhysicalPlan::RangeJoin(plan) => range_join_to_format_tree(plan, metadata, profs), - PhysicalPlan::DistributedCopyIntoTableFromStage(plan) => { - distributed_copy_into_table_from_stage(plan) - } - PhysicalPlan::CopyIntoTableFromQuery(plan) => copy_into_table_from_query(plan), + PhysicalPlan::CopyIntoTable(plan) => copy_into_table(plan), + PhysicalPlan::AsyncSourcer(_) => Ok(FormatTreeNode::new("AsyncSourcer".to_string())), + PhysicalPlan::Deduplicate(_) => Ok(FormatTreeNode::new("Deduplicate".to_string())), + PhysicalPlan::ReplaceInto(_) => Ok(FormatTreeNode::new("Replace".to_string())), PhysicalPlan::CteScan(plan) => cte_scan_to_format_tree(plan), PhysicalPlan::MaterializedCte(plan) => { materialized_cte_to_format_tree(plan, metadata, profs) @@ -231,25 +230,10 @@ fn append_profile_info( } } -fn distributed_copy_into_table_from_stage( - plan: &DistributedCopyIntoTableFromStage, -) -> Result> { - Ok(FormatTreeNode::new(format!( - "copy into table {}.{}.{} from {:?}", - plan.catalog_info.catalog_name(), - plan.database_name, - plan.table_name, - plan.source - ))) -} - -fn copy_into_table_from_query(plan: &CopyIntoTableFromQuery) -> Result> { +fn copy_into_table(plan: &CopyIntoTable) -> Result> { Ok(FormatTreeNode::new(format!( - "copy into table {}.{}.{} from {:?}", - plan.catalog_info.catalog_name(), - plan.database_name, - plan.table_name, - plan.input + "CopyIntoTable: {}", + plan.table_info ))) } @@ -998,7 +982,7 @@ fn delete_partial_to_format_tree( } fn delete_final_to_format_tree( - plan: &DeleteFinal, + plan: &MutationAggregate, metadata: &MetadataRef, prof_span_set: &SharedProcessorProfiles, ) -> Result> { diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index 30fa77a91b037..66b27c6438daa 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -13,6 +13,7 @@ // limitations under the License. 
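
The `enable_distributed_replace_into` setting added above is a plain u64-backed flag, and its getter and setter must reference the same key string. A minimal, self-contained sketch of that round trip (a HashMap-backed stand-in for illustration, not the actual Settings API):

```rust
use std::collections::HashMap;

// Simplified settings store: booleans are kept as u64 and read back as flags.
#[derive(Default)]
struct Settings {
    values: HashMap<String, u64>,
}

impl Settings {
    fn try_get_u64(&self, key: &str) -> u64 {
        self.values.get(key).copied().unwrap_or(0)
    }

    fn try_set_u64(&mut self, key: &str, v: u64) {
        self.values.insert(key.to_string(), v);
    }

    fn get_enable_distributed_replace(&self) -> bool {
        self.try_get_u64("enable_distributed_replace_into") != 0
    }

    fn set_enable_distributed_replace(&mut self, val: bool) {
        // Must use the same key as the getter; a mismatch silently disables the flag.
        self.try_set_u64("enable_distributed_replace_into", u64::from(val));
    }
}

fn main() {
    let mut settings = Settings::default();
    assert!(!settings.get_enable_distributed_replace());
    settings.set_enable_distributed_replace(true);
    assert!(settings.get_enable_distributed_replace());
}
```
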
use std::collections::BTreeMap; +use std::collections::HashMap; use std::fmt::Display; use std::fmt::Formatter; @@ -26,6 +27,7 @@ use common_expression::types::DataType; use common_expression::types::NumberDataType; use common_expression::BlockThresholds; use common_expression::Column; +use common_expression::ColumnId; use common_expression::DataBlock; use common_expression::DataField; use common_expression::DataSchemaRef; @@ -39,6 +41,9 @@ use common_functions::BUILTIN_FUNCTIONS; use common_meta_app::schema::CatalogInfo; use common_meta_app::schema::TableInfo; use common_storage::StageFileInfo; +use enum_as_inner::EnumAsInner; +use storages_common_table_meta::meta::ColumnStatistics; +use storages_common_table_meta::meta::Location; use storages_common_table_meta::meta::TableSnapshot; use crate::executor::explain::PlanStatsInfo; @@ -840,66 +845,41 @@ impl UnionAll { } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct DistributedCopyIntoTableFromStage { - pub plan_id: u32, +pub struct CopyIntoTable { pub catalog_info: CatalogInfo, - pub database_name: String, - pub table_name: String, - // ... into table() .. -> pub required_values_schema: DataSchemaRef, - // (1, ?, 'a', ?) -> (1, 'a') pub values_consts: Vec, - // (1, ?, 'a', ?) -> (?, ?) pub required_source_schema: DataSchemaRef, - pub write_mode: CopyIntoTableMode, pub validation_mode: ValidationMode, pub force: bool, - pub stage_table_info: StageTableInfo, - pub source: Box, - - pub thresholds: BlockThresholds, pub files: Vec, pub table_info: TableInfo, + + pub source: CopyIntoTableSource, } -impl DistributedCopyIntoTableFromStage { - pub fn output_schema(&self) -> Result { - Ok(DataSchemaRef::default()) - } +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, EnumAsInner)] +pub enum CopyIntoTableSource { + Query(Box), + Stage(Box), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct CopyIntoTableFromQuery { - pub plan_id: u32, - pub catalog_info: CatalogInfo, - pub database_name: String, - pub table_name: String, - - pub required_values_schema: DataSchemaRef, // ... into table() .. -> - pub values_consts: Vec, // (1, ?, 'a', ?) -> (1, 'a') - pub required_source_schema: DataSchemaRef, // (1, ?, 'a', ?) -> (?, ?) 
- // these three fields are used for query result render +pub struct QuerySource { + pub plan: PhysicalPlan, pub query_source_schema: DataSchemaRef, pub ignore_result: bool, pub result_columns: Vec, - - pub write_mode: CopyIntoTableMode, - pub validation_mode: ValidationMode, - pub force: bool, - - pub stage_table_info: StageTableInfo, - pub local_node_id: String, - // after build_query, we will make it as the input - pub input: Box, - pub files: Vec, - pub table_info: TableInfo, } -impl CopyIntoTableFromQuery { +impl CopyIntoTable { pub fn output_schema(&self) -> Result { - Ok(DataSchemaRef::default()) + match &self.source { + CopyIntoTableSource::Query(query_ctx) => Ok(query_ctx.query_source_schema.clone()), + CopyIntoTableSource::Stage(_) => Ok(self.required_values_schema.clone()), + } } } @@ -960,18 +940,74 @@ impl DeletePartial { } } -impl DeleteFinal { +impl MutationAggregate { pub fn output_schema(&self) -> Result { Ok(DataSchemaRef::default()) } } +// TODO(sky): make TableMutationAggregator distributed #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct DeleteFinal { +pub struct MutationAggregate { pub input: Box, pub snapshot: TableSnapshot, pub table_info: TableInfo, pub catalog_info: CatalogInfo, + pub mutation_kind: MutationKind, +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Copy)] +/// This is used by MutationAccumulator, so no compact here. +pub enum MutationKind { + Delete, + Update, + Replace, + Recluster, + Insert, +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct AsyncSourcerPlan { + pub value_data: String, + pub schema: DataSchemaRef, +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct Deduplicate { + pub input: Box, + pub on_conflicts: Vec, + pub bloom_filter_column_indexes: Vec, + pub table_is_empty: bool, + pub table_info: TableInfo, + pub catalog_info: CatalogInfo, + pub table_schema: TableSchemaRef, + pub select_ctx: Option, + pub table_level_range_index: HashMap, + pub need_insert: bool, +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct SelectCtx { + pub select_column_bindings: Vec, + pub select_schema: DataSchemaRef, +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct OnConflictField { + pub table_field: common_expression::TableField, + pub field_index: common_expression::FieldIndex, +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct ReplaceInto { + pub input: Box, + pub block_thresholds: BlockThresholds, + pub table_info: TableInfo, + pub on_conflicts: Vec, + pub bloom_filter_column_indexes: Vec, + pub catalog_info: CatalogInfo, + pub segments: Vec<(usize, Location)>, + pub need_insert: bool, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] @@ -998,7 +1034,7 @@ impl Display for RefreshIndex { } } -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, EnumAsInner)] pub enum PhysicalPlan { TableScan(TableScan), Filter(Filter), @@ -1024,17 +1060,19 @@ pub enum PhysicalPlan { /// For insert into ... select ... 
in cluster DistributedInsertSelect(Box), - /// add distributed copy into table from @stage - DistributedCopyIntoTableFromStage(Box), - /// /// add distributed copy into table from query - CopyIntoTableFromQuery(Box), /// Synthesized by fragmenter ExchangeSource(ExchangeSource), ExchangeSink(ExchangeSink), - /// For distributed delete + /// Delete DeletePartial(Box), - DeleteFinal(Box), + MutationAggregate(Box), + /// Copy into table + CopyIntoTable(Box), + /// Replace + AsyncSourcer(AsyncSourcerPlan), + Deduplicate(Deduplicate), + ReplaceInto(ReplaceInto), } impl PhysicalPlan { @@ -1070,13 +1108,17 @@ impl PhysicalPlan { PhysicalPlan::DistributedInsertSelect(v) => v.plan_id, PhysicalPlan::ExchangeSource(v) => v.plan_id, PhysicalPlan::ExchangeSink(v) => v.plan_id, - PhysicalPlan::DeletePartial(_) | PhysicalPlan::DeleteFinal(_) => unreachable!(), - // for distributed_copy_into_table, planId is useless - PhysicalPlan::DistributedCopyIntoTableFromStage(v) => v.plan_id, - PhysicalPlan::CopyIntoTableFromQuery(v) => v.plan_id, PhysicalPlan::CteScan(v) => v.plan_id, PhysicalPlan::MaterializedCte(v) => v.plan_id, PhysicalPlan::ConstantTableScan(v) => v.plan_id, + PhysicalPlan::DeletePartial(_) + | PhysicalPlan::MutationAggregate(_) + | PhysicalPlan::CopyIntoTable(_) + | PhysicalPlan::AsyncSourcer(_) + | PhysicalPlan::Deduplicate(_) + | PhysicalPlan::ReplaceInto(_) => { + unreachable!() + } } } @@ -1103,13 +1145,15 @@ impl PhysicalPlan { PhysicalPlan::ProjectSet(plan) => plan.output_schema(), PhysicalPlan::RuntimeFilterSource(plan) => plan.output_schema(), PhysicalPlan::DeletePartial(plan) => plan.output_schema(), - PhysicalPlan::DeleteFinal(plan) => plan.output_schema(), + PhysicalPlan::MutationAggregate(plan) => plan.output_schema(), PhysicalPlan::RangeJoin(plan) => plan.output_schema(), - PhysicalPlan::DistributedCopyIntoTableFromStage(plan) => plan.output_schema(), - PhysicalPlan::CopyIntoTableFromQuery(plan) => plan.output_schema(), + PhysicalPlan::CopyIntoTable(plan) => plan.output_schema(), PhysicalPlan::CteScan(plan) => plan.output_schema(), PhysicalPlan::MaterializedCte(plan) => plan.output_schema(), PhysicalPlan::ConstantTableScan(plan) => plan.output_schema(), + PhysicalPlan::AsyncSourcer(_) + | PhysicalPlan::Deduplicate(_) + | PhysicalPlan::ReplaceInto(_) => Ok(DataSchemaRef::default()), } } @@ -1136,12 +1180,12 @@ impl PhysicalPlan { PhysicalPlan::ProjectSet(_) => "Unnest".to_string(), PhysicalPlan::RuntimeFilterSource(_) => "RuntimeFilterSource".to_string(), PhysicalPlan::DeletePartial(_) => "DeletePartial".to_string(), - PhysicalPlan::DeleteFinal(_) => "DeleteFinal".to_string(), + PhysicalPlan::MutationAggregate(_) => "MutationAggregate".to_string(), PhysicalPlan::RangeJoin(_) => "RangeJoin".to_string(), - PhysicalPlan::DistributedCopyIntoTableFromStage(_) => { - "DistributedCopyIntoTableFromStage".to_string() - } - PhysicalPlan::CopyIntoTableFromQuery(_) => "CopyIntoTableFromQuery".to_string(), + PhysicalPlan::CopyIntoTable(_) => "CopyIntoTable".to_string(), + PhysicalPlan::AsyncSourcer(_) => "AsyncSourcer".to_string(), + PhysicalPlan::Deduplicate(_) => "Deduplicate".to_string(), + PhysicalPlan::ReplaceInto(_) => "Replace".to_string(), PhysicalPlan::CteScan(_) => "PhysicalCteScan".to_string(), PhysicalPlan::MaterializedCte(_) => "PhysicalMaterializedCte".to_string(), PhysicalPlan::ConstantTableScan(_) => "PhysicalConstantTableScan".to_string(), @@ -1177,7 +1221,7 @@ impl PhysicalPlan { Box::new(std::iter::once(plan.input.as_ref())) } PhysicalPlan::DeletePartial(_plan) => 
Box::new(std::iter::empty()), - PhysicalPlan::DeleteFinal(plan) => Box::new(std::iter::once(plan.input.as_ref())), + PhysicalPlan::MutationAggregate(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::ProjectSet(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::RuntimeFilterSource(plan) => Box::new( std::iter::once(plan.left_side.as_ref()) @@ -1186,8 +1230,10 @@ impl PhysicalPlan { PhysicalPlan::RangeJoin(plan) => Box::new( std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())), ), - PhysicalPlan::DistributedCopyIntoTableFromStage(_) => Box::new(std::iter::empty()), - PhysicalPlan::CopyIntoTableFromQuery(_) => Box::new(std::iter::empty()), + PhysicalPlan::CopyIntoTable(_) => Box::new(std::iter::empty()), + PhysicalPlan::AsyncSourcer(_) => Box::new(std::iter::empty()), + PhysicalPlan::Deduplicate(plan) => Box::new(std::iter::once(plan.input.as_ref())), + PhysicalPlan::ReplaceInto(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::MaterializedCte(plan) => Box::new( std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())), ), @@ -1210,8 +1256,6 @@ impl PhysicalPlan { PhysicalPlan::DistributedInsertSelect(plan) => plan.input.try_find_single_data_source(), PhysicalPlan::ProjectSet(plan) => plan.input.try_find_single_data_source(), PhysicalPlan::RowFetch(plan) => plan.input.try_find_single_data_source(), - PhysicalPlan::CopyIntoTableFromQuery(plan) => plan.input.try_find_single_data_source(), - PhysicalPlan::DistributedCopyIntoTableFromStage(plan) => Some(&plan.source), PhysicalPlan::RuntimeFilterSource(_) | PhysicalPlan::UnionAll(_) | PhysicalPlan::ExchangeSource(_) @@ -1222,9 +1266,13 @@ impl PhysicalPlan { | PhysicalPlan::AggregateFinal(_) | PhysicalPlan::AggregatePartial(_) | PhysicalPlan::DeletePartial(_) - | PhysicalPlan::DeleteFinal(_) - | PhysicalPlan::CteScan(_) - | PhysicalPlan::ConstantTableScan(_) => None, + | PhysicalPlan::MutationAggregate(_) + | PhysicalPlan::CopyIntoTable(_) + | PhysicalPlan::AsyncSourcer(_) + | PhysicalPlan::Deduplicate(_) + | PhysicalPlan::ReplaceInto(_) + | PhysicalPlan::ConstantTableScan(_) + | PhysicalPlan::CteScan(_) => None, } } } diff --git a/src/query/sql/src/executor/physical_plan_display.rs b/src/query/sql/src/executor/physical_plan_display.rs index 5a3a4fddb8703..92d12ebcaa253 100644 --- a/src/query/sql/src/executor/physical_plan_display.rs +++ b/src/query/sql/src/executor/physical_plan_display.rs @@ -19,12 +19,14 @@ use common_functions::BUILTIN_FUNCTIONS; use itertools::Itertools; use super::AggregateExpand; -use super::CopyIntoTableFromQuery; -use super::DeleteFinal; +use super::AsyncSourcerPlan; +use super::CopyIntoTable; +use super::Deduplicate; use super::DeletePartial; -use super::DistributedCopyIntoTableFromStage; use super::DistributedInsertSelect; +use super::MutationAggregate; use super::ProjectSet; +use super::ReplaceInto; use super::RowFetch; use crate::executor::AggregateFinal; use crate::executor::AggregatePartial; @@ -83,17 +85,15 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> { PhysicalPlan::UnionAll(union_all) => write!(f, "{}", union_all)?, PhysicalPlan::DistributedInsertSelect(insert_select) => write!(f, "{}", insert_select)?, PhysicalPlan::DeletePartial(delete) => write!(f, "{}", delete)?, - PhysicalPlan::DeleteFinal(delete) => write!(f, "{}", delete)?, + PhysicalPlan::MutationAggregate(mutation) => write!(f, "{}", mutation)?, PhysicalPlan::ProjectSet(unnest) => write!(f, "{}", unnest)?, 
PhysicalPlan::Lambda(lambda) => write!(f, "{}", lambda)?, PhysicalPlan::RuntimeFilterSource(plan) => write!(f, "{}", plan)?, PhysicalPlan::RangeJoin(plan) => write!(f, "{}", plan)?, - PhysicalPlan::DistributedCopyIntoTableFromStage(copy_into_table_from_stage) => { - write!(f, "{}", copy_into_table_from_stage)? - } - PhysicalPlan::CopyIntoTableFromQuery(copy_into_table_from_query) => { - write!(f, "{}", copy_into_table_from_query)? - } + PhysicalPlan::CopyIntoTable(copy_into_table) => write!(f, "{}", copy_into_table)?, + PhysicalPlan::AsyncSourcer(async_sourcer) => write!(f, "{}", async_sourcer)?, + PhysicalPlan::Deduplicate(deduplicate) => write!(f, "{}", deduplicate)?, + PhysicalPlan::ReplaceInto(replace) => write!(f, "{}", replace)?, PhysicalPlan::CteScan(cte_scan) => write!(f, "{}", cte_scan)?, PhysicalPlan::MaterializedCte(plan) => write!(f, "{}", plan)?, PhysicalPlan::ConstantTableScan(scan) => write!(f, "{}", scan)?, @@ -400,20 +400,14 @@ impl Display for DeletePartial { } } -impl Display for DeleteFinal { +impl Display for MutationAggregate { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "DeleteFinal") + write!(f, "MutationAggregate") } } -impl Display for DistributedCopyIntoTableFromStage { +impl Display for CopyIntoTable { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "DistributedCopyIntoTableFromStage") - } -} - -impl Display for CopyIntoTableFromQuery { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "CopyIntoTableFromQuery") + write!(f, "CopyIntoTable") } } @@ -439,6 +433,24 @@ impl Display for ProjectSet { } } +impl Display for AsyncSourcerPlan { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "AsyncSourcer") + } +} + +impl Display for Deduplicate { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Deduplicate") + } +} + +impl Display for ReplaceInto { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Replace") + } +} + impl Display for Lambda { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let scalars = self diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index ad009be5f0bd6..0c7b4077eabc1 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -17,10 +17,11 @@ use common_exception::Result; use super::AggregateExpand; use super::AggregateFinal; use super::AggregatePartial; -use super::CopyIntoTableFromQuery; -use super::DeleteFinal; +use super::AsyncSourcerPlan; +use super::CopyIntoTable; +use super::CopyIntoTableSource; +use super::Deduplicate; use super::DeletePartial; -use super::DistributedCopyIntoTableFromStage; use super::DistributedInsertSelect; use super::EvalScalar; use super::Exchange; @@ -30,9 +31,12 @@ use super::Filter; use super::HashJoin; use super::Lambda; use super::Limit; +use super::MutationAggregate; use super::PhysicalPlan; use super::Project; use super::ProjectSet; +use super::QuerySource; +use super::ReplaceInto; use super::RowFetch; use super::Sort; use super::TableScan; @@ -69,14 +73,12 @@ pub trait PhysicalPlanReplacer { PhysicalPlan::Lambda(plan) => self.replace_lambda(plan), PhysicalPlan::RuntimeFilterSource(plan) => self.replace_runtime_filter_source(plan), PhysicalPlan::DeletePartial(plan) => self.replace_delete_partial(plan), - PhysicalPlan::DeleteFinal(plan) => self.replace_delete_final(plan), + PhysicalPlan::MutationAggregate(plan) => 
self.replace_delete_final(plan), PhysicalPlan::RangeJoin(plan) => self.replace_range_join(plan), - PhysicalPlan::DistributedCopyIntoTableFromStage(plan) => { - self.replace_copy_into_table(plan) - } - PhysicalPlan::CopyIntoTableFromQuery(plan) => { - self.replace_copy_into_table_from_query(plan) - } + PhysicalPlan::CopyIntoTable(plan) => self.replace_copy_into_table(plan), + PhysicalPlan::AsyncSourcer(plan) => self.replace_async_sourcer(plan), + PhysicalPlan::Deduplicate(plan) => self.replace_deduplicate(plan), + PhysicalPlan::ReplaceInto(plan) => self.replace_replace_into(plan), PhysicalPlan::MaterializedCte(plan) => self.replace_materialized_cte(plan), PhysicalPlan::ConstantTableScan(plan) => self.replace_constant_table_scan(plan), } @@ -319,26 +321,22 @@ pub trait PhysicalPlanReplacer { })) } - fn replace_copy_into_table( - &mut self, - plan: &DistributedCopyIntoTableFromStage, - ) -> Result { - Ok(PhysicalPlan::DistributedCopyIntoTableFromStage(Box::new( - plan.clone(), - ))) - } - - fn replace_copy_into_table_from_query( - &mut self, - plan: &CopyIntoTableFromQuery, - ) -> Result { - let input = self.replace(&plan.input)?; - Ok(PhysicalPlan::CopyIntoTableFromQuery(Box::new( - CopyIntoTableFromQuery { - input: Box::new(input), - ..plan.clone() - }, - ))) + fn replace_copy_into_table(&mut self, plan: &CopyIntoTable) -> Result { + match &plan.source { + CopyIntoTableSource::Stage(_) => { + Ok(PhysicalPlan::CopyIntoTable(Box::new(plan.clone()))) + } + CopyIntoTableSource::Query(query_ctx) => { + let input = self.replace(&query_ctx.plan)?; + Ok(PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable { + source: CopyIntoTableSource::Query(Box::new(QuerySource { + plan: input, + ..*query_ctx.clone() + })), + ..plan.clone() + }))) + } + } } fn replace_insert_select(&mut self, plan: &DistributedInsertSelect) -> Result { @@ -362,12 +360,34 @@ pub trait PhysicalPlanReplacer { Ok(PhysicalPlan::DeletePartial(Box::new(plan.clone()))) } - fn replace_delete_final(&mut self, plan: &DeleteFinal) -> Result { + fn replace_delete_final(&mut self, plan: &MutationAggregate) -> Result { let input = self.replace(&plan.input)?; - Ok(PhysicalPlan::DeleteFinal(Box::new(DeleteFinal { + Ok(PhysicalPlan::MutationAggregate(Box::new( + MutationAggregate { + input: Box::new(input), + ..plan.clone() + }, + ))) + } + + fn replace_async_sourcer(&mut self, plan: &AsyncSourcerPlan) -> Result { + Ok(PhysicalPlan::AsyncSourcer(plan.clone())) + } + + fn replace_deduplicate(&mut self, plan: &Deduplicate) -> Result { + let input = self.replace(&plan.input)?; + Ok(PhysicalPlan::Deduplicate(Deduplicate { input: Box::new(input), ..plan.clone() - }))) + })) + } + + fn replace_replace_into(&mut self, plan: &ReplaceInto) -> Result { + let input = self.replace(&plan.input)?; + Ok(PhysicalPlan::ReplaceInto(ReplaceInto { + input: Box::new(input), + ..plan.clone() + })) } fn replace_project_set(&mut self, plan: &ProjectSet) -> Result { @@ -418,6 +438,7 @@ impl PhysicalPlan { visit(plan); match plan { PhysicalPlan::TableScan(_) + | PhysicalPlan::AsyncSourcer(_) | PhysicalPlan::CteScan(_) | PhysicalPlan::ConstantTableScan(_) => {} PhysicalPlan::Filter(plan) => { @@ -474,10 +495,12 @@ impl PhysicalPlan { PhysicalPlan::Lambda(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit) } - PhysicalPlan::DistributedCopyIntoTableFromStage(_) => {} - PhysicalPlan::CopyIntoTableFromQuery(plan) => { - Self::traverse(&plan.input, pre_visit, visit, post_visit); - } + PhysicalPlan::CopyIntoTable(plan) => match &plan.source { + 
CopyIntoTableSource::Query(input) => { + Self::traverse(&input.plan, pre_visit, visit, post_visit); + } + CopyIntoTableSource::Stage(_) => {} + }, PhysicalPlan::RuntimeFilterSource(plan) => { Self::traverse(&plan.left_side, pre_visit, visit, post_visit); Self::traverse(&plan.right_side, pre_visit, visit, post_visit); @@ -487,7 +510,13 @@ impl PhysicalPlan { Self::traverse(&plan.right, pre_visit, visit, post_visit); } PhysicalPlan::DeletePartial(_) => {} - PhysicalPlan::DeleteFinal(plan) => { + PhysicalPlan::MutationAggregate(plan) => { + Self::traverse(&plan.input, pre_visit, visit, post_visit); + } + PhysicalPlan::Deduplicate(plan) => { + Self::traverse(&plan.input, pre_visit, visit, post_visit); + } + PhysicalPlan::ReplaceInto(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit); } PhysicalPlan::MaterializedCte(plan) => { diff --git a/src/query/sql/src/executor/profile.rs b/src/query/sql/src/executor/profile.rs index 2b30b9cbf10ea..dd486679720da 100644 --- a/src/query/sql/src/executor/profile.rs +++ b/src/query/sql/src/executor/profile.rs @@ -491,9 +491,12 @@ fn flatten_plan_node_profile( plan_node_profs.push(prof); } PhysicalPlan::MaterializedCte(_) | PhysicalPlan::ConstantTableScan(_) => todo!(), - PhysicalPlan::DeletePartial(_) | PhysicalPlan::DeleteFinal(_) => unreachable!(), - PhysicalPlan::DistributedCopyIntoTableFromStage(_) => unreachable!(), - PhysicalPlan::CopyIntoTableFromQuery(_) => unreachable!(), + PhysicalPlan::DeletePartial(_) + | PhysicalPlan::MutationAggregate(_) + | PhysicalPlan::CopyIntoTable(_) + | PhysicalPlan::AsyncSourcer(_) + | PhysicalPlan::Deduplicate(_) + | PhysicalPlan::ReplaceInto(_) => unreachable!(), } Ok(()) diff --git a/src/query/sql/src/planner/binder/copy.rs b/src/query/sql/src/planner/binder/copy.rs index 310196300929f..0525378bafd22 100644 --- a/src/query/sql/src/planner/binder/copy.rs +++ b/src/query/sql/src/planner/binder/copy.rs @@ -672,7 +672,7 @@ impl<'a> Binder { select_list: &'a [SelectTarget], alias: &Option, ) -> Result { - let need_copy_file_infos = plan.collect_files(&self.ctx).await?; + let need_copy_file_infos = plan.collect_files(self.ctx.as_ref()).await?; if need_copy_file_infos.is_empty() { return Ok(Plan::Copy(Box::new(CopyPlan::NoFileToCopy))); diff --git a/src/query/sql/src/planner/plans/copy.rs b/src/query/sql/src/planner/plans/copy.rs index 9dc6125e6fedb..bad54093a4cfc 100644 --- a/src/query/sql/src/planner/plans/copy.rs +++ b/src/query/sql/src/planner/plans/copy.rs @@ -15,7 +15,6 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::str::FromStr; -use std::sync::Arc; use std::time::Instant; use common_catalog::plan::StageTableInfo; @@ -98,7 +97,7 @@ pub struct CopyIntoTablePlan { } impl CopyIntoTablePlan { - pub async fn collect_files(&self, ctx: &Arc) -> Result> { + pub async fn collect_files(&self, ctx: &dyn TableContext) -> Result> { ctx.set_status_info("begin to list files"); let start = Instant::now(); diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 21e911f73d7c8..d286cf2210f98 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -15,6 +15,7 @@ use std::any::Any; use std::collections::BTreeMap; use std::collections::HashMap; +use std::panic; use std::str; use std::str::FromStr; use std::sync::Arc; @@ -545,12 +546,11 @@ impl Table for FuseTable { #[async_backtrace::framed] async fn replace_into( &self, - ctx: Arc, - pipeline: &mut Pipeline, - on_conflict_fields: Vec, + _ctx: Arc, + _pipeline: &mut 
Pipeline, + _on_conflict_fields: Vec, ) -> Result<()> { - self.build_replace_pipeline(ctx, on_conflict_fields, pipeline) - .await + panic!("deprecated") } fn commit_insertion( diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index bef9b30988da1..0939a8c5f2279 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -34,6 +34,7 @@ use common_meta_types::MatchSeq; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::Pipeline; use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer; +use common_sql::executor::MutationKind; use log::debug; use log::info; use log::warn; @@ -61,7 +62,6 @@ use crate::operations::common::AbortOperation; use crate::operations::common::AppendGenerator; use crate::operations::common::CommitSink; use crate::operations::common::ConflictResolveContext; -use crate::operations::common::MutationKind; use crate::operations::common::TableMutationAggregator; use crate::operations::common::TransformSerializeSegment; use crate::statistics::merge_statistics; diff --git a/src/query/storages/fuse/src/operations/common/mod.rs b/src/query/storages/fuse/src/operations/common/mod.rs index 145c1cd41ffb2..9c986e143f57a 100644 --- a/src/query/storages/fuse/src/operations/common/mod.rs +++ b/src/query/storages/fuse/src/operations/common/mod.rs @@ -20,7 +20,6 @@ mod snapshot_generator; pub use abort_operation::AbortOperation; pub use mutation_accumulator::MutationAccumulator; -pub use mutation_accumulator::MutationKind; pub use mutation_log::*; pub use processors::*; pub use snapshot_generator::*; diff --git a/src/query/storages/fuse/src/operations/common/mutation_accumulator.rs b/src/query/storages/fuse/src/operations/common/mutation_accumulator.rs index e23745c80b411..a484657004fcc 100644 --- a/src/query/storages/fuse/src/operations/common/mutation_accumulator.rs +++ b/src/query/storages/fuse/src/operations/common/mutation_accumulator.rs @@ -23,6 +23,7 @@ use common_catalog::table_context::TableContext; use common_exception::Result; use common_expression::BlockThresholds; use common_expression::TableSchemaRef; +use common_sql::executor::MutationKind; use log::info; use opendal::Operator; use storages_common_table_meta::meta::BlockMeta; @@ -79,16 +80,6 @@ impl BlockMutations { } } -#[derive(Clone, Copy)] -/// This is used by MutationAccumulator, so no compact here. 
-pub enum MutationKind { - Delete, - Update, - Replace, - Recluster, - Insert, -} - pub struct MutationAccumulator { ctx: Arc, schema: TableSchemaRef, diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs index 45e769b4c3642..92a522e77d70d 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs @@ -20,11 +20,11 @@ use common_exception::Result; use common_expression::BlockMetaInfoPtr; use common_expression::DataBlock; use common_pipeline_transforms::processors::transforms::transform_accumulating_async::AsyncAccumulatingTransform; +use common_sql::executor::MutationKind; use log::debug; use storages_common_table_meta::meta::Location; use storages_common_table_meta::meta::Statistics; -use crate::operations::common::mutation_accumulator::MutationKind; use crate::operations::common::CommitMeta; use crate::operations::common::MutationAccumulator; use crate::operations::common::MutationLogs; diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs index 7c26c688769f6..77a50bda99d55 100644 --- a/src/query/storages/fuse/src/operations/mod.rs +++ b/src/query/storages/fuse/src/operations/mod.rs @@ -26,8 +26,8 @@ mod read; mod read_data; mod read_partitions; mod recluster; -mod replace; -mod replace_into; +pub mod replace; +pub mod replace_into; mod revert; mod truncate; mod update; @@ -36,7 +36,6 @@ pub mod util; pub use agg_index_sink::AggIndexSink; pub use common::BlockMetaIndex; pub use common::FillInternalColumnProcessor; -pub use common::MutationKind; pub use common::TransformSerializeBlock; pub use compact::CompactOptions; pub use mutation::BlockCompactMutator; diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 9e7c1ab93bb89..c28edfe16f6eb 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -29,13 +29,13 @@ use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_transforms::processors::transforms::build_merge_sort_pipeline; use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer; use common_sql::evaluator::CompoundBlockOperator; +use common_sql::executor::MutationKind; use log::info; use storages_common_table_meta::meta::BlockMeta; use crate::operations::common::BlockMetaIndex; use crate::operations::common::CommitSink; use crate::operations::common::MutationGenerator; -use crate::operations::common::MutationKind; use crate::operations::common::TableMutationAggregator; use crate::operations::common::TransformSerializeBlock; use crate::operations::common::TransformSerializeSegment; diff --git a/src/query/storages/fuse/src/operations/replace.rs b/src/query/storages/fuse/src/operations/replace.rs index 25dca314735f2..196d78b4bdd51 100644 --- a/src/query/storages/fuse/src/operations/replace.rs +++ b/src/query/storages/fuse/src/operations/replace.rs @@ -17,41 +17,30 @@ use std::sync::Arc; use common_base::base::tokio::sync::Semaphore; use common_catalog::table::Table; use common_catalog::table_context::TableContext; -use common_exception::ErrorCode; use common_exception::Result; use common_expression::FieldIndex; -use common_expression::TableField; -use 
common_pipeline_core::pipe::Pipe; use common_pipeline_core::pipe::PipeItem; -use common_pipeline_core::processors::port::InputPort; -use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::ProcessorPtr; -use common_pipeline_transforms::processors::transforms::create_dummy_item; use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer; -use log::info; +use common_sql::executor::MutationKind; +use common_sql::executor::OnConflictField; use rand::prelude::SliceRandom; use storages_common_index::BloomIndex; use storages_common_table_meta::meta::Location; use storages_common_table_meta::meta::TableSnapshot; -use super::common::MutationKind; use crate::io::BlockBuilder; use crate::io::ReadSettings; use crate::operations::common::CommitSink; use crate::operations::common::MutationGenerator; use crate::operations::common::TableMutationAggregator; -use crate::operations::common::TransformSerializeBlock; -use crate::operations::common::TransformSerializeSegment; use crate::operations::mutation::SegmentIndex; -use crate::operations::replace_into::BroadcastProcessor; use crate::operations::replace_into::MergeIntoOperationAggregator; -use crate::operations::replace_into::OnConflictField; -use crate::operations::replace_into::ReplaceIntoProcessor; use crate::pipelines::Pipeline; use crate::FuseTable; impl FuseTable { - // The pipeline going to be constructed + // The big picture of the replace into pipeline: // // - If table is not empty: // @@ -104,224 +93,18 @@ impl FuseTable { // └─────►│ResizeProcessor(1) ├──────►│TableMutationAggregator├────────►│ CommitSink │ // └───────────────────┘ └───────────────────────┘ └───────────────────┘ - #[async_backtrace::framed] - pub async fn build_replace_pipeline<'a>( - &'a self, - ctx: Arc, - on_conflict_field_identifiers: Vec, - pipeline: &'a mut Pipeline, - ) -> Result<()> { - let schema = self.table_info.schema(); - - let mut on_conflicts = Vec::with_capacity(on_conflict_field_identifiers.len()); - for f in on_conflict_field_identifiers { - let field_name = f.name(); - let (field_index, _) = - schema - .column_with_name(field_name) - .ok_or(ErrorCode::Internal( - "not expected, on conflict field not found (after binding)", - ))?; - on_conflicts.push(OnConflictField { - table_field: f.clone(), - field_index, - }) - } - - let cluster_stats_gen = - self.cluster_gen_for_append(ctx.clone(), pipeline, self.get_block_thresholds())?; - - // 1. resize input to 1, since the UpsertTransform need to de-duplicate inputs "globally" - pipeline.try_resize(1)?; - - // 2. 
connect with ReplaceIntoProcessor - - // ┌──────────────────────┐ - // │ ├──┐ - // ┌─────────────┐ │ ├──┘ - // │ UpsertSource├─────►│ ReplaceIntoProcessor │ - // └─────────────┘ │ ├──┐ - // │ ├──┘ - // └──────────────────────┘ - // NOTE: here the pipe items of last pipe are arranged in the following order - // (0) -> output_port_append_data - // (1) -> output_port_merge_into_action - // the "downstream" is supposed to be connected with a processor which can process MergeIntoOperations - // in our case, it is the broadcast processor - - let base_snapshot = self.read_table_snapshot().await?.unwrap_or_else(|| { - Arc::new(TableSnapshot::new_empty_snapshot(schema.as_ref().clone())) - }); - - let table_is_empty = base_snapshot.segments.is_empty(); - let table_level_range_index = base_snapshot.summary.col_stats.clone(); - let cluster_keys = self.cluster_keys(ctx.clone()); - - // currently, we only try apply bloom filter pruning when the table is clustered - let bloom_filter_column_indexes: Vec = if !cluster_keys.is_empty() - && ctx.get_settings().get_enable_replace_into_bloom_pruning()? - { - let max_num_pruning_columns = ctx - .get_settings() - .get_replace_into_bloom_pruning_max_column_number()?; - self.choose_bloom_filter_columns(&on_conflicts, max_num_pruning_columns) - .await? - } else { - info!("replace-into, bloom filter pruning not enabled."); - vec![] - }; - - info!( - "replace-into, bloom filter field chosen, {:?}", - bloom_filter_column_indexes - .iter() - .map(|idx| (idx, on_conflicts[*idx].table_field.name.clone())) - .collect::>(), - ); - - let replace_into_processor = ReplaceIntoProcessor::create( - ctx.as_ref(), - on_conflicts.clone(), - cluster_keys, - bloom_filter_column_indexes.clone(), - schema.as_ref(), - table_is_empty, - table_level_range_index, - )?; - - pipeline.add_pipe(replace_into_processor.into_pipe()); - - // 3. connect to broadcast processor and append transform - - let max_threads = ctx.get_settings().get_max_threads()?; - let segment_partition_num = - std::cmp::min(base_snapshot.segments.len(), max_threads as usize); - - let serialize_block_transform = TransformSerializeBlock::try_create( - ctx.clone(), - InputPort::create(), - OutputPort::create(), - self, - cluster_stats_gen, - )?; - let block_builder = serialize_block_transform.get_block_builder(); - - let serialize_segment_transform = TransformSerializeSegment::new( - InputPort::create(), - OutputPort::create(), - self, - self.get_block_thresholds(), - ); - - if segment_partition_num == 0 { - let dummy_item = create_dummy_item(); - // ┌──────────────────────┐ ┌──────────────────┐ - // │ ├──┬────────►│ SerializeBlock │ - // ┌─────────────┐ │ ├──┘ └──────────────────┘ - // │ UpsertSource├─────►│ ReplaceIntoProcessor │ - // └─────────────┘ │ ├──┐ ┌──────────────────┐ - // │ ├──┴────────►│ DummyTransform │ - // └──────────────────────┘ └──────────────────┘ - // wrap them into pipeline, order matters! - pipeline.add_pipe(Pipe::create(2, 2, vec![ - serialize_block_transform.into_pipe_item(), - dummy_item, - ])); - } else { - // ┌──────────────────────┐ ┌──────────────────┐ - // │ ├──┬────────►│ SerializeBlock │ - // ┌─────────────┐ │ ├──┘ └──────────────────┘ - // │ UpsertSource├─────►│ ReplaceIntoProcessor │ - // └─────────────┘ │ ├──┐ ┌──────────────────┐ - // │ ├──┴────────►│BroadcastProcessor│ - // └──────────────────────┘ └──────────────────┘ - let broadcast_processor = BroadcastProcessor::new(segment_partition_num); - // wrap them into pipeline, order matters! 
- pipeline.add_pipe(Pipe::create(2, segment_partition_num + 1, vec![ - serialize_block_transform.into_pipe_item(), - broadcast_processor.into_pipe_item(), - ])); - }; - - // 4. connect with MergeIntoOperationAggregators - if segment_partition_num == 0 { - let dummy_item = create_dummy_item(); - pipeline.add_pipe(Pipe::create(2, 2, vec![ - serialize_segment_transform.into_pipe_item(), - dummy_item, - ])); - } else { - // ┌──────────────────┐ ┌────────────────┐ - // ────►│ SerializeBlock ├──────────────►│SerializeSegment│ - // └──────────────────┘ └────────────────┘ - // - // ┌───────────────────┐ ┌──────────────────────┐ - // ────►│ ├──┬──────────►│MergeIntoOperationAggr│ - // │ ├──┘ └──────────────────────┘ - // │ BroadcastProcessor│ - // │ ├──┐ ┌──────────────────────┐ - // │ ├──┴──────────►│MergeIntoOperationAggr│ - // │ │ └──────────────────────┘ - // │ ├──┐ - // │ ├──┴──────────►┌──────────────────────┐ - // └───────────────────┘ │MergeIntoOperationAggr│ - // └──────────────────────┘ - - let item_size = segment_partition_num + 1; - let mut pipe_items = Vec::with_capacity(item_size); - // setup the dummy transform - pipe_items.push(serialize_segment_transform.into_pipe_item()); - - let max_io_request = ctx.get_settings().get_max_storage_io_requests()?; - let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize)); - - // setup the merge into operation aggregators - let mut merge_into_operation_aggregators = self - .merge_into_mutators( - ctx.clone(), - segment_partition_num, - block_builder, - on_conflicts.clone(), - bloom_filter_column_indexes, - &base_snapshot, - io_request_semaphore, - ) - .await?; - assert_eq!( - segment_partition_num, - merge_into_operation_aggregators.len() - ); - pipe_items.append(&mut merge_into_operation_aggregators); - - // extend the pipeline - assert_eq!(pipeline.output_len(), item_size); - assert_eq!(pipe_items.len(), item_size); - pipeline.add_pipe(Pipe::create(item_size, item_size, pipe_items)); - } - - // 5. 
connect with mutation pipes, the TableMutationAggregator, then CommitSink - // - // ┌───────────────────┐ ┌───────────────────────┐ ┌───────────────────┐ - // │ResizeProcessor(1) ├──────►│TableMutationAggregator├────────►│ CommitSink │ - // └───────────────────┘ └───────────────────────┘ └───────────────────┘ - self.chain_mutation_pipes(&ctx, pipeline, base_snapshot, MutationKind::Replace)?; - - Ok(()) - } - - #[async_backtrace::framed] #[allow(clippy::too_many_arguments)] - async fn merge_into_mutators( + pub fn merge_into_mutators( &self, ctx: Arc, num_partition: usize, block_builder: BlockBuilder, on_conflicts: Vec, bloom_filter_column_indexes: Vec, - table_snapshot: &TableSnapshot, + segments: &[(usize, Location)], io_request_semaphore: Arc, ) -> Result> { - let chunks = Self::partition_segments(&table_snapshot.segments, num_partition); + let chunks = Self::partition_segments(segments, num_partition); let read_settings = ReadSettings::from_ctx(&ctx)?; let mut items = Vec::with_capacity(num_partition); for chunk_of_segment_locations in chunks { @@ -343,17 +126,17 @@ impl FuseTable { } pub fn partition_segments( - segments: &[Location], + segments: &[(usize, Location)], num_partition: usize, ) -> Vec> { let chunk_size = segments.len() / num_partition; assert!(chunk_size >= 1); + let mut segments = segments.to_vec(); - let mut indexed_segment = segments.iter().enumerate().collect::>(); - indexed_segment.shuffle(&mut rand::thread_rng()); + segments.shuffle(&mut rand::thread_rng()); let mut chunks = Vec::with_capacity(num_partition); - for chunk in indexed_segment.chunks(chunk_size) { + for chunk in segments.chunks(chunk_size) { let mut segment_chunk = chunk .iter() .map(|(segment_idx, location)| (*segment_idx, (*location).clone())) @@ -415,7 +198,7 @@ impl FuseTable { // choose the bloom filter columns (from on-conflict fields). // columns with larger number of number-of-distinct-values, will be kept, is their types // are supported by bloom index. 
- async fn choose_bloom_filter_columns( + pub async fn choose_bloom_filter_columns( &self, on_conflicts: &[OnConflictField], max_num_columns: u64, diff --git a/src/query/storages/fuse/src/operations/replace_into/mod.rs b/src/query/storages/fuse/src/operations/replace_into/mod.rs index 8a173111eb4f2..42b7de4353360 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mod.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mod.rs @@ -19,9 +19,4 @@ mod processors; pub use processors::BroadcastProcessor; pub use processors::MergeIntoOperationAggregator; pub use processors::ReplaceIntoProcessor; - -#[derive(Clone, Debug)] -pub struct OnConflictField { - pub table_field: common_expression::TableField, - pub field_index: common_expression::FieldIndex, -} +pub use processors::UnbranchedReplaceIntoProcessor; diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs index 2e38ee4f1f864..8a0a07a57c982 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs @@ -34,6 +34,7 @@ use common_expression::FieldIndex; use common_expression::Scalar; use common_expression::TableSchema; use common_sql::evaluator::BlockOperator; +use common_sql::executor::OnConflictField; use log::info; use log::warn; use opendal::Operator; @@ -78,8 +79,6 @@ use crate::operations::replace_into::meta::merge_into_operation_meta::MergeIntoO use crate::operations::replace_into::meta::merge_into_operation_meta::UniqueKeyDigest; use crate::operations::replace_into::mutator::column_hash::row_hash_of_columns; use crate::operations::replace_into::mutator::deletion_accumulator::DeletionAccumulator; -use crate::operations::replace_into::OnConflictField; - struct AggregationContext { segment_locations: AHashMap, // the fields specified in ON CONFLICT clause diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/mutator_replace_into.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/mutator_replace_into.rs index 1fe4a06d04f2b..03854ebf1ecbc 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/mutator_replace_into.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/mutator_replace_into.rs @@ -38,6 +38,7 @@ use common_expression::TableSchema; use common_expression::Value; use common_functions::aggregates::eval_aggr; use common_functions::BUILTIN_FUNCTIONS; +use common_sql::executor::OnConflictField; use log::info; use storages_common_index::BloomIndex; use storages_common_table_meta::meta::ColumnStatistics; @@ -51,7 +52,6 @@ use crate::operations::replace_into::meta::merge_into_operation_meta::MergeIntoO use crate::operations::replace_into::meta::merge_into_operation_meta::UniqueKeyDigest; use crate::operations::replace_into::mutator::column_hash::row_hash_of_columns; use crate::operations::replace_into::mutator::column_hash::RowScalarValue; -use crate::operations::replace_into::OnConflictField; // Replace is somehow a simplified merge_into, which // - do insertion for "matched" branch diff --git a/src/query/storages/fuse/src/operations/replace_into/processors/mod.rs b/src/query/storages/fuse/src/operations/replace_into/processors/mod.rs index 39a8df54fa435..dc4827c29e82b 100644 --- a/src/query/storages/fuse/src/operations/replace_into/processors/mod.rs +++ 
b/src/query/storages/fuse/src/operations/replace_into/processors/mod.rs @@ -16,8 +16,10 @@ mod processor_broadcast; mod processor_replace_into; +mod processor_unbranched_replace_into; mod transform_merge_into_mutation_aggregator; pub use processor_broadcast::BroadcastProcessor; pub use processor_replace_into::ReplaceIntoProcessor; +pub use processor_unbranched_replace_into::UnbranchedReplaceIntoProcessor; pub use transform_merge_into_mutation_aggregator::MergeIntoOperationAggregator; diff --git a/src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs b/src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs index c1faa1710e4e7..ed98b09342be1 100644 --- a/src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs +++ b/src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs @@ -31,12 +31,12 @@ use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::processors::Processor; +use common_sql::executor::OnConflictField; use storages_common_table_meta::meta::ColumnStatistics; use crate::metrics::metrics_inc_replace_block_number_input; use crate::metrics::metrics_inc_replace_process_input_block_time_ms; use crate::operations::replace_into::mutator::mutator_replace_into::ReplaceIntoMutator; -use crate::operations::replace_into::OnConflictField; pub struct ReplaceIntoProcessor { replace_into_mutator: ReplaceIntoMutator, diff --git a/src/query/storages/fuse/src/operations/replace_into/processors/processor_unbranched_replace_into.rs b/src/query/storages/fuse/src/operations/replace_into/processors/processor_unbranched_replace_into.rs new file mode 100644 index 0000000000000..cc1b5c3b13784 --- /dev/null +++ b/src/query/storages/fuse/src/operations/replace_into/processors/processor_unbranched_replace_into.rs @@ -0,0 +1,162 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Instant; + +use common_catalog::table_context::TableContext; +use common_exception::Result; +use common_expression::ColumnId; +use common_expression::DataBlock; +use common_expression::FieldIndex; +use common_expression::RemoteExpr; +use common_expression::TableSchema; +use common_pipeline_core::pipe::Pipe; +use common_pipeline_core::pipe::PipeItem; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; +use common_sql::executor::OnConflictField; +use storages_common_table_meta::meta::ColumnStatistics; + +use crate::metrics::metrics_inc_replace_process_input_block_time_ms; +use crate::operations::replace_into::mutator::mutator_replace_into::ReplaceIntoMutator; + +pub struct UnbranchedReplaceIntoProcessor { + replace_into_mutator: ReplaceIntoMutator, + + // stage data blocks + input_port: Arc, + output_port_merge_into_action: Arc, + + input_data: Option, + output_data_merge_into_action: Option, + + target_table_empty: bool, +} + +impl UnbranchedReplaceIntoProcessor { + pub fn create( + ctx: &dyn TableContext, + on_conflict_fields: Vec, + cluster_keys: Vec>, + bloom_filter_column_indexes: Vec, + table_schema: &TableSchema, + target_table_empty: bool, + table_range_idx: HashMap, + ) -> Result { + let replace_into_mutator = ReplaceIntoMutator::try_create( + ctx, + on_conflict_fields, + cluster_keys, + bloom_filter_column_indexes, + table_schema, + table_range_idx, + )?; + let input_port = InputPort::create(); + let output_port_merge_into_action = OutputPort::create(); + + Ok(Self { + replace_into_mutator, + input_port, + output_port_merge_into_action, + input_data: None, + output_data_merge_into_action: None, + target_table_empty, + }) + } + + pub fn into_pipe(self) -> Pipe { + let pipe_item = self.into_pipe_item(); + Pipe::create(1, 1, vec![pipe_item]) + } + + pub fn into_pipe_item(self) -> PipeItem { + let input = self.input_port.clone(); + let output_port_merge_into_action = self.output_port_merge_into_action.clone(); + let processor_ptr = ProcessorPtr::create(Box::new(self)); + PipeItem::create(processor_ptr, vec![input], vec![ + output_port_merge_into_action, + ]) + } +} + +#[async_trait::async_trait] +impl Processor for UnbranchedReplaceIntoProcessor { + fn name(&self) -> String { + "UnbranchedReplaceIntoProcessor".to_owned() + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + fn event(&mut self) -> Result { + let finished = + self.input_port.is_finished() && self.output_data_merge_into_action.is_none(); + + if finished { + self.output_port_merge_into_action.finish(); + return Ok(Event::Finished); + } + + let mut pushed_something = false; + if self.output_port_merge_into_action.can_push() { + if let Some(data) = self.output_data_merge_into_action.take() { + self.output_port_merge_into_action.push_data(Ok(data)); + pushed_something = true; + } + } + + if pushed_something { + Ok(Event::NeedConsume) + } else { + if self.input_data.is_some() { + return Ok(Event::Sync); + } + + if self.input_port.has_data() { + if self.output_data_merge_into_action.is_none() { + // no pending data (being sent to down streams) + self.input_data = Some(self.input_port.pull_data().unwrap()?); + Ok(Event::Sync) + } else { + // data pending + Ok(Event::NeedConsume) + } + } else { + 
self.input_port.set_need_data(); + Ok(Event::NeedData) + } + } + } + + fn process(&mut self) -> Result<()> { + if let Some(data_block) = self.input_data.take() { + let start = Instant::now(); + let merge_into_action = self.replace_into_mutator.process_input_block(&data_block)?; + metrics_inc_replace_process_input_block_time_ms(start.elapsed().as_millis() as u64); + if !self.target_table_empty { + self.output_data_merge_into_action = + Some(DataBlock::empty_with_meta(Box::new(merge_into_action))); + } + return Ok(()); + } + + Ok(()) + } +} diff --git a/src/query/storages/fuse/src/operations/update.rs b/src/query/storages/fuse/src/operations/update.rs index 002ac2ff1c375..729e54a34c370 100644 --- a/src/query/storages/fuse/src/operations/update.rs +++ b/src/query/storages/fuse/src/operations/update.rs @@ -28,11 +28,11 @@ use common_expression::TableSchema; use common_expression::ROW_ID_COL_NAME; use common_functions::BUILTIN_FUNCTIONS; use common_sql::evaluator::BlockOperator; +use common_sql::executor::MutationKind; use common_sql::plans::PREDICATE_COLUMN_NAME; use log::info; use storages_common_table_meta::meta::TableSnapshot; -use crate::operations::common::MutationKind; use crate::operations::common::TransformSerializeBlock; use crate::operations::mutation::MutationAction; use crate::operations::mutation::MutationSource; diff --git a/src/query/storages/random/src/random_table.rs b/src/query/storages/random/src/random_table.rs index 22cad449c147e..f38b9178c4b06 100644 --- a/src/query/storages/random/src/random_table.rs +++ b/src/query/storages/random/src/random_table.rs @@ -182,6 +182,14 @@ impl Table for RandomTable { ); } + if plan.parts.is_empty() { + let output = OutputPort::create(); + builder.add_source( + output.clone(), + RandomSource::create(ctx.clone(), output, output_schema, 0)?, + ); + } + pipeline.add_pipe(builder.finalize()); Ok(()) } diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0023_replace_into b/tests/sqllogictests/suites/base/09_fuse_engine/09_0023_replace_into index dc221b0f19ddd..2e436da70c052 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0023_replace_into +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0023_replace_into @@ -1,3 +1,6 @@ +statement ok +set enable_distributed_replace_into = 1; + statement ok DROP DATABASE IF EXISTS db_09_0023 diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_10861 b/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_10861 index d79064b26801e..9a8141f92cb29 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_10861 +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_10861 @@ -1,3 +1,6 @@ +statement ok +set enable_distributed_replace_into = 1; + statement ok DROP DATABASE IF EXISTS issue_10861 diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_12126 b/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_12126 index 438b220a5e1dc..271ebb0c43c63 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_12126 +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_12126 @@ -1,3 +1,6 @@ +statement ok +set enable_distributed_replace_into = 1; + statement ok DROP DATABASE IF EXISTS issue_12126 diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_12294 b/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_12294 index 
47a2f9cbc97b8..2144ec2903e24 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_12294 +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_12294 @@ -1,3 +1,6 @@ +statement ok +set enable_distributed_replace_into = 1; + statement ok DROP DATABASE IF EXISTS issue_12294 diff --git a/tests/suites/1_stateful/07_stage_attachment/07_0001_replace_with_stage.result b/tests/suites/1_stateful/07_stage_attachment/07_0001_replace_with_stage.result index 38ee0c8bacf7e..ec104925a6812 100644 --- a/tests/suites/1_stateful/07_stage_attachment/07_0001_replace_with_stage.result +++ b/tests/suites/1_stateful/07_stage_attachment/07_0001_replace_with_stage.result @@ -34,7 +34,6 @@ null 6 'HangZhou' 92 China 9 'Changsha' 91 China 10 'Hong Kong‘ 88 China -133 { "code": 4000, "message": "duplicated data detected in the values being replaced into (only the first one will be described): at row 7, [\"id\":10]" diff --git a/tests/suites/1_stateful/07_stage_attachment/07_0001_replace_with_stage.sh b/tests/suites/1_stateful/07_stage_attachment/07_0001_replace_with_stage.sh index 8e834d164bca5..dd0c4856b0fc4 100755 --- a/tests/suites/1_stateful/07_stage_attachment/07_0001_replace_with_stage.sh +++ b/tests/suites/1_stateful/07_stage_attachment/07_0001_replace_with_stage.sh @@ -49,7 +49,7 @@ echo "select * from sample order by id" | $MYSQL_CLIENT_CONNECT # duplicate value would show error and would not take effect aws --endpoint-url ${STORAGE_S3_ENDPOINT_URL} s3 cp s3://testbucket/admin/data/sample_3_duplicate.csv s3://testbucket/admin/stage/internal/s1/sample4.csv >/dev/null -curl -s -u root: -XPOST "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/query" --header 'Content-Type: application/json' -d '{"sql": "replace into sample (Id, City, Score) ON(Id) values (?,?,?)", "stage_attachment": {"location": "@s1/sample4.csv", "copy_options": {"purge": "true"}}}' | jq -r '.stats.scan_progress.bytes, .error' +curl -s -u root: -XPOST "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/query" --header 'Content-Type: application/json' -d '{"sql": "replace into sample (Id, City, Score) ON(Id) values (?,?,?)", "stage_attachment": {"location": "@s1/sample4.csv", "copy_options": {"purge": "true"}}}' | jq -r '.error' echo "select * from sample order by id" | $MYSQL_CLIENT_CONNECT ### Drop table.
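
Editor's note on the partition_segments change above: the function now receives pre-indexed (segment_index, Location) pairs and shuffles them directly instead of enumerating on the fly. Below is a minimal, standalone sketch of that shuffle-and-chunk step; the Location alias and function signature are simplified stand-ins, and the real function in the patch continues past what the hunk shows (only the portion visible above is reproduced here).

    use rand::prelude::SliceRandom;

    // Hypothetical stand-in for storages_common_table_meta::meta::Location.
    type Location = (String, u64);

    // Shuffle the (segment_index, location) pairs, then split them into
    // roughly equal chunks, one chunk per partition.
    fn partition_segments(
        segments: &[(usize, Location)],
        num_partition: usize,
    ) -> Vec<Vec<(usize, Location)>> {
        // Each partition must receive at least one segment.
        let chunk_size = segments.len() / num_partition;
        assert!(chunk_size >= 1);

        // Shuffle so that segments are spread evenly across partitions.
        let mut segments = segments.to_vec();
        segments.shuffle(&mut rand::thread_rng());

        segments
            .chunks(chunk_size)
            .map(|chunk| chunk.to_vec())
            .collect()
    }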
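
Editor's note on the OnConflictField move: the struct is deleted from replace_into/mod.rs and re-exported from common_sql::executor, so callers now resolve ON CONFLICT identifiers before reaching the fuse operators. The sketch below mirrors the name-to-index lookup that the removed build_replace_pipeline performed; the TableField/OnConflictField types and the resolve_on_conflict_fields helper are simplified, illustrative stand-ins, not the actual API.

    // Simplified stand-ins for common_expression::TableField and the
    // OnConflictField that now lives in common_sql::executor.
    #[derive(Clone, Debug)]
    struct TableField {
        name: String,
    }

    #[derive(Clone, Debug)]
    struct OnConflictField {
        table_field: TableField,
        field_index: usize,
    }

    // Map each ON CONFLICT column identifier to its position in the bound
    // table schema, failing if a column cannot be found.
    fn resolve_on_conflict_fields(
        schema_fields: &[TableField],
        on_conflict_identifiers: &[TableField],
    ) -> Result<Vec<OnConflictField>, String> {
        on_conflict_identifiers
            .iter()
            .map(|f| {
                let field_index = schema_fields
                    .iter()
                    .position(|sf| sf.name == f.name)
                    .ok_or_else(|| {
                        format!("on conflict field `{}` not found (after binding)", f.name)
                    })?;
                Ok(OnConflictField {
                    table_field: f.clone(),
                    field_index,
                })
            })
            .collect()
    }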