diff --git a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs new file mode 100644 index 0000000000000..b8912b4385670 --- /dev/null +++ b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs @@ -0,0 +1,234 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::DataType; +use databend_common_expression::ColumnId; +use databend_common_expression::Scalar; +use databend_common_expression::TableDataType; +use databend_common_expression::TableSchema; +use databend_common_sql::field_default_value; +use databend_storages_common_table_meta::meta::ClusterKey; +use databend_storages_common_table_meta::meta::ColumnStatistics; +use databend_storages_common_table_meta::meta::Statistics; +use databend_storages_common_table_meta::meta::TableSnapshot; +use log::warn; +use uuid::Uuid; + +use crate::operations::common::ConflictResolveContext; +use crate::operations::common::SnapshotGenerator; +use crate::operations::common::SnapshotMerged; +use crate::statistics::reducers::merge_statistics_mut; + +#[derive(Clone)] +pub struct AppendGenerator { + ctx: Arc, + leaf_default_values: HashMap, + overwrite: bool, + conflict_resolve_ctx: ConflictResolveContext, +} + +impl AppendGenerator { + pub fn new(ctx: Arc, overwrite: bool) -> Self { + AppendGenerator { + ctx, + leaf_default_values: HashMap::new(), + overwrite, + conflict_resolve_ctx: ConflictResolveContext::None, + } + } + + fn check_fill_default(&self, summary: &Statistics) -> Result { + let mut fill_default_values = false; + // check if need to fill default value in statistics + for column_id in self + .conflict_resolve_ctx()? + .0 + .merged_statistics + .col_stats + .keys() + { + if !summary.col_stats.contains_key(column_id) { + fill_default_values = true; + break; + } + } + Ok(fill_default_values) + } +} + +impl AppendGenerator { + fn conflict_resolve_ctx(&self) -> Result<(&SnapshotMerged, &TableSchema)> { + match &self.conflict_resolve_ctx { + ConflictResolveContext::AppendOnly((ctx, schema)) => Ok((ctx, schema.as_ref())), + _ => Err(ErrorCode::Internal( + "conflict_resolve_ctx should only be Appendonly in AppendGenerator", + )), + } + } +} + +#[async_trait::async_trait] +impl SnapshotGenerator for AppendGenerator { + fn set_conflict_resolve_context(&mut self, ctx: ConflictResolveContext) { + self.conflict_resolve_ctx = ctx; + } + + async fn fill_default_values( + &mut self, + schema: TableSchema, + previous: &Option>, + ) -> Result<()> { + if let Some(snapshot) = previous { + if !self.overwrite && self.check_fill_default(&snapshot.summary)? { + let mut default_values = Vec::with_capacity(schema.num_fields()); + for field in schema.fields() { + default_values.push(field_default_value(self.ctx.clone(), field)?); + } + self.leaf_default_values = schema.field_leaf_default_values(&default_values); + } + } + Ok(()) + } + + fn generate_new_snapshot( + &self, + schema: TableSchema, + cluster_key_meta: Option, + previous: Option>, + ) -> Result { + let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?; + if is_column_type_modified(&schema, expected_schema) { + return Err(ErrorCode::UnresolvableConflict(format!( + "schema was changed during insert, expected:{:?}, actual:{:?}", + expected_schema, schema + ))); + } + let mut prev_timestamp = None; + let mut prev_snapshot_id = None; + let mut table_statistics_location = None; + let mut index_info_locations = None; + let mut new_segments = snapshot_merged.merged_segments.clone(); + let mut new_summary = snapshot_merged.merged_statistics.clone(); + + if let Some(snapshot) = &previous { + prev_timestamp = snapshot.timestamp; + prev_snapshot_id = Some((snapshot.snapshot_id, snapshot.format_version)); + table_statistics_location = snapshot.table_statistics_location.clone(); + index_info_locations = snapshot.index_info_locations.clone(); + + if !self.overwrite { + let mut summary = snapshot.summary.clone(); + + let leaf_fields = schema.leaf_fields(); + let column_data_types: HashMap = + HashMap::from_iter(leaf_fields.iter().map(|f| (f.column_id, &f.data_type))); + + if self.check_fill_default(&summary)? { + self.leaf_default_values + .iter() + .for_each(|(col_id, default_value)| { + if let Some(data_type) = column_data_types.get(col_id) { + if !summary.col_stats.contains_key(col_id) { + assert!( + default_value + .as_ref() + .is_value_of_type(&DataType::from(*data_type)), + "default value: {:?} is not of type: {:?}", + default_value, + data_type + ); + if let Some((min, max)) = crate::statistics::scalar_min_max( + &DataType::from(*data_type), + default_value.clone(), + ) { + let (null_count, distinct_of_values) = + if default_value.is_null() { + (summary.row_count, Some(0)) + } else { + (0, Some(1)) + }; + let col_stat = ColumnStatistics::new( + min, + max, + null_count, + 0, + distinct_of_values, + ); + summary.col_stats.insert(*col_id, col_stat); + } + } + } else { + warn!("column id:{} not found in schema, while populating min/max values. the schema is {:?}", col_id, schema); + } + }); + } + + new_segments = snapshot_merged + .merged_segments + .iter() + .chain(snapshot.segments.iter()) + .cloned() + .collect(); + + merge_statistics_mut( + &mut new_summary, + &summary, + cluster_key_meta.clone().map(|v| v.0), + ); + } + } + + // check if need to auto compact + // the algorithm is: if the number of imperfect blocks is greater than the threshold, then auto compact. + // the threshold is set by the setting `auto_compaction_imperfect_blocks_threshold`, default is 50. + let imperfect_count = new_summary.block_count - new_summary.perfect_block_count; + let auto_compaction_imperfect_blocks_threshold = self + .ctx + .get_settings() + .get_auto_compaction_imperfect_blocks_threshold()?; + let auto_compact = imperfect_count >= auto_compaction_imperfect_blocks_threshold; + self.ctx.set_need_compact_after_write(auto_compact); + + Ok(TableSnapshot::new( + Uuid::new_v4(), + &prev_timestamp, + prev_snapshot_id, + schema, + new_summary, + new_segments, + cluster_key_meta, + table_statistics_location, + index_info_locations, + )) + } +} + +fn is_column_type_modified(schema: &TableSchema, expected_schema: &TableSchema) -> bool { + let expected: HashMap<_, _> = expected_schema + .fields() + .iter() + .map(|f| (f.column_id, &f.data_type)) + .collect(); + schema.fields().iter().any(|f| { + expected + .get(&f.column_id) + .is_some_and(|ty| **ty != f.data_type) + }) +} diff --git a/src/query/storages/fuse/src/operations/common/generators/conflict_resolve_context.rs b/src/query/storages/fuse/src/operations/common/generators/conflict_resolve_context.rs new file mode 100644 index 0000000000000..4f647bfbff9b4 --- /dev/null +++ b/src/query/storages/fuse/src/operations/common/generators/conflict_resolve_context.rs @@ -0,0 +1,164 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::collections::HashSet; +use std::ops::Range; + +use databend_common_expression::TableSchemaRef; +use databend_storages_common_table_meta::meta::Location; +use databend_storages_common_table_meta::meta::Statistics; +use databend_storages_common_table_meta::meta::TableSnapshot; + +#[allow(clippy::large_enum_variant)] +#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq)] +pub enum ConflictResolveContext { + None, + AppendOnly((SnapshotMerged, TableSchemaRef)), + ModifiedSegmentExistsInLatest(SnapshotChanges), +} + +impl ConflictResolveContext { + pub fn is_latest_snapshot_append_only( + base: &TableSnapshot, + latest: &TableSnapshot, + ) -> Option> { + let base_segments = &base.segments; + let latest_segments = &latest.segments; + + let base_segments_len = base_segments.len(); + let latest_segments_len = latest_segments.len(); + + if latest_segments_len >= base_segments_len + && base_segments[0..base_segments_len] + == latest_segments[(latest_segments_len - base_segments_len)..latest_segments_len] + { + Some(0..(latest_segments_len - base_segments_len)) + } else { + None + } + } + + pub fn is_modified_segments_exists_in_latest( + base: &TableSnapshot, + latest: &TableSnapshot, + replaced_segments: &HashMap, + removed_segments: &[usize], + ) -> Option<(Vec, HashMap)> { + let latest_segments = latest + .segments + .iter() + .enumerate() + .map(|(i, x)| (x, i)) + .collect::>(); + let mut removed = Vec::with_capacity(removed_segments.len()); + for removed_segment in removed_segments { + let removed_segment = &base.segments[*removed_segment]; + if let Some(position) = latest_segments.get(removed_segment) { + removed.push(*position); + } else { + return None; + } + } + + let mut replaced = HashMap::with_capacity(replaced_segments.len()); + for (position, location) in replaced_segments { + let origin_segment = &base.segments[*position]; + if let Some(position) = latest_segments.get(origin_segment) { + replaced.insert(*position, location.clone()); + } else { + return None; + } + } + Some((removed, replaced)) + } + + pub fn merge_segments( + mut base_segments: Vec, + appended_segments: Vec, + replaced_segments: HashMap, + removed_segment_indexes: Vec, + ) -> Vec { + replaced_segments + .into_iter() + .for_each(|(k, v)| base_segments[k] = v); + + let mut blanks = removed_segment_indexes; + blanks.sort_unstable(); + let mut merged_segments = + Vec::with_capacity(base_segments.len() + appended_segments.len() - blanks.len()); + if !blanks.is_empty() { + let mut last = 0; + for blank in blanks { + merged_segments.extend_from_slice(&base_segments[last..blank]); + last = blank + 1; + } + merged_segments.extend_from_slice(&base_segments[last..]); + } else { + merged_segments = base_segments; + } + + appended_segments + .into_iter() + .chain(merged_segments) + .collect() + } +} + +#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq, Default)] +pub struct SnapshotChanges { + pub appended_segments: Vec, + pub replaced_segments: HashMap, + pub removed_segment_indexes: Vec, + + pub merged_statistics: Statistics, + pub removed_statistics: Statistics, +} + +impl SnapshotChanges { + pub fn check_intersect(&self, other: &SnapshotChanges) -> bool { + if Self::is_slice_intersect(&self.appended_segments, &other.appended_segments) { + return true; + } + for o in &other.replaced_segments { + if self.replaced_segments.contains_key(o.0) { + return true; + } + } + if Self::is_slice_intersect( + &self.removed_segment_indexes, + &other.removed_segment_indexes, + ) { + return true; + } + false + } + + fn is_slice_intersect(l: &[T], r: &[T]) -> bool { + let (l, r) = if l.len() > r.len() { (l, r) } else { (r, l) }; + let l = l.iter().collect::>(); + for x in r { + if l.contains(x) { + return true; + } + } + false + } +} + +#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq)] +pub struct SnapshotMerged { + pub merged_segments: Vec, + pub merged_statistics: Statistics, +} diff --git a/src/query/storages/fuse/src/operations/common/generators/mod.rs b/src/query/storages/fuse/src/operations/common/generators/mod.rs new file mode 100644 index 0000000000000..c296b4318cf69 --- /dev/null +++ b/src/query/storages/fuse/src/operations/common/generators/mod.rs @@ -0,0 +1,25 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod append_generator; +mod conflict_resolve_context; +mod mutation_generator; +mod snapshot_generator; + +pub use append_generator::AppendGenerator; +pub use conflict_resolve_context::ConflictResolveContext; +pub use conflict_resolve_context::SnapshotChanges; +pub use conflict_resolve_context::SnapshotMerged; +pub use mutation_generator::MutationGenerator; +pub use snapshot_generator::SnapshotGenerator; diff --git a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs new file mode 100644 index 0000000000000..b67be8cfddf00 --- /dev/null +++ b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs @@ -0,0 +1,111 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::TableSchema; +use databend_common_metrics::storage::*; +use databend_storages_common_table_meta::meta::ClusterKey; +use databend_storages_common_table_meta::meta::TableSnapshot; +use log::info; +use uuid::Uuid; + +use crate::operations::common::ConflictResolveContext; +use crate::operations::common::SnapshotGenerator; +use crate::statistics::merge_statistics; +use crate::statistics::reducers::deduct_statistics_mut; + +#[derive(Clone)] +pub struct MutationGenerator { + base_snapshot: Arc, + conflict_resolve_ctx: ConflictResolveContext, +} + +impl MutationGenerator { + pub fn new(base_snapshot: Arc) -> Self { + MutationGenerator { + base_snapshot, + conflict_resolve_ctx: ConflictResolveContext::None, + } + } +} + +impl SnapshotGenerator for MutationGenerator { + fn generate_new_snapshot( + &self, + schema: TableSchema, + cluster_key_meta: Option, + previous: Option>, + ) -> Result { + let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0); + + let previous = + previous.unwrap_or_else(|| Arc::new(TableSnapshot::new_empty_snapshot(schema.clone()))); + match &self.conflict_resolve_ctx { + ConflictResolveContext::AppendOnly(_) | ConflictResolveContext::None => { + return Err(ErrorCode::Internal( + "conflict_resolve_ctx should not be AppendOnly in MutationGenerator", + )); + } + ConflictResolveContext::ModifiedSegmentExistsInLatest(ctx) => { + if let Some((removed, replaced)) = + ConflictResolveContext::is_modified_segments_exists_in_latest( + &self.base_snapshot, + &previous, + &ctx.replaced_segments, + &ctx.removed_segment_indexes, + ) + { + info!("resolvable conflicts detected"); + metrics_inc_commit_mutation_modified_segment_exists_in_latest(); + let new_segments = ConflictResolveContext::merge_segments( + previous.segments.clone(), + ctx.appended_segments.clone(), + replaced, + removed, + ); + let mut new_summary = merge_statistics( + &ctx.merged_statistics, + &previous.summary, + default_cluster_key_id, + ); + deduct_statistics_mut(&mut new_summary, &ctx.removed_statistics); + let new_snapshot = TableSnapshot::new( + Uuid::new_v4(), + &previous.timestamp, + Some((previous.snapshot_id, previous.format_version)), + schema, + new_summary, + new_segments, + cluster_key_meta, + previous.table_statistics_location.clone(), + previous.index_info_locations.clone(), + ); + return Ok(new_snapshot); + } + } + } + metrics_inc_commit_mutation_unresolvable_conflict(); + Err(ErrorCode::UnresolvableConflict(format!( + "conflict resolve context:{:?}", + self.conflict_resolve_ctx + ))) + } + + fn set_conflict_resolve_context(&mut self, ctx: ConflictResolveContext) { + self.conflict_resolve_ctx = ctx; + } +} diff --git a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs new file mode 100644 index 0000000000000..5807cfa825f10 --- /dev/null +++ b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs @@ -0,0 +1,42 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_expression::TableSchema; +use databend_storages_common_table_meta::meta::ClusterKey; +use databend_storages_common_table_meta::meta::TableSnapshot; + +use crate::operations::common::ConflictResolveContext; + +#[async_trait::async_trait] +pub trait SnapshotGenerator { + fn set_conflict_resolve_context(&mut self, ctx: ConflictResolveContext); + + async fn fill_default_values( + &mut self, + _schema: TableSchema, + _snapshot: &Option>, + ) -> Result<()> { + Ok(()) + } + + fn generate_new_snapshot( + &self, + schema: TableSchema, + cluster_key_meta: Option, + previous: Option>, + ) -> Result; +} diff --git a/src/query/storages/fuse/src/operations/common/abort_operation.rs b/src/query/storages/fuse/src/operations/common/meta/abort_operation.rs similarity index 100% rename from src/query/storages/fuse/src/operations/common/abort_operation.rs rename to src/query/storages/fuse/src/operations/common/meta/abort_operation.rs diff --git a/src/query/storages/fuse/src/operations/common/meta/mod.rs b/src/query/storages/fuse/src/operations/common/meta/mod.rs index 0fa2e0b041572..252cdff8b14b7 100644 --- a/src/query/storages/fuse/src/operations/common/meta/mod.rs +++ b/src/query/storages/fuse/src/operations/common/meta/mod.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod abort_operation; mod commit_meta; mod mutation_log; +pub use abort_operation::AbortOperation; pub use commit_meta::*; pub use mutation_log::*; diff --git a/src/query/storages/fuse/src/operations/common/mod.rs b/src/query/storages/fuse/src/operations/common/mod.rs index 950d885d2e977..640fa9161f03e 100644 --- a/src/query/storages/fuse/src/operations/common/mod.rs +++ b/src/query/storages/fuse/src/operations/common/mod.rs @@ -12,12 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod abort_operation; +mod generators; mod meta; mod processors; -mod snapshot_generator; -pub use abort_operation::AbortOperation; +pub use generators::*; pub use meta::*; pub use processors::*; -pub use snapshot_generator::*; diff --git a/src/query/storages/fuse/src/operations/common/snapshot_generator.rs b/src/query/storages/fuse/src/operations/common/snapshot_generator.rs deleted file mode 100644 index b7f8358252cfa..0000000000000 --- a/src/query/storages/fuse/src/operations/common/snapshot_generator.rs +++ /dev/null @@ -1,490 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::collections::HashSet; -use std::ops::Range; -use std::sync::Arc; - -use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::types::DataType; -use databend_common_expression::ColumnId; -use databend_common_expression::Scalar; -use databend_common_expression::TableDataType; -use databend_common_expression::TableSchema; -use databend_common_expression::TableSchemaRef; -use databend_common_metrics::storage::*; -use databend_common_sql::field_default_value; -use databend_storages_common_table_meta::meta::ClusterKey; -use databend_storages_common_table_meta::meta::ColumnStatistics; -use databend_storages_common_table_meta::meta::Location; -use databend_storages_common_table_meta::meta::Statistics; -use databend_storages_common_table_meta::meta::TableSnapshot; -use log::info; -use log::warn; -use uuid::Uuid; - -use crate::statistics::merge_statistics; -use crate::statistics::reducers::deduct_statistics_mut; -use crate::statistics::reducers::merge_statistics_mut; - -#[async_trait::async_trait] -pub trait SnapshotGenerator { - fn set_conflict_resolve_context(&mut self, ctx: ConflictResolveContext); - - async fn fill_default_values( - &mut self, - _schema: TableSchema, - _snapshot: &Option>, - ) -> Result<()> { - Ok(()) - } - - fn generate_new_snapshot( - &self, - schema: TableSchema, - cluster_key_meta: Option, - previous: Option>, - ) -> Result; -} - -#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq, Default)] -pub struct SnapshotChanges { - pub appended_segments: Vec, - pub replaced_segments: HashMap, - pub removed_segment_indexes: Vec, - - pub merged_statistics: Statistics, - pub removed_statistics: Statistics, -} - -impl SnapshotChanges { - pub fn check_intersect(&self, other: &SnapshotChanges) -> bool { - if Self::is_slice_intersect(&self.appended_segments, &other.appended_segments) { - return true; - } - for o in &other.replaced_segments { - if self.replaced_segments.contains_key(o.0) { - return true; - } - } - if Self::is_slice_intersect( - &self.removed_segment_indexes, - &other.removed_segment_indexes, - ) { - return true; - } - false - } - - fn is_slice_intersect(l: &[T], r: &[T]) -> bool { - let (l, r) = if l.len() > r.len() { (l, r) } else { (r, l) }; - let l = l.iter().collect::>(); - for x in r { - if l.contains(x) { - return true; - } - } - false - } -} - -#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq)] -pub struct SnapshotMerged { - pub merged_segments: Vec, - pub merged_statistics: Statistics, -} - -#[allow(clippy::large_enum_variant)] -#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq)] -pub enum ConflictResolveContext { - AppendOnly((SnapshotMerged, TableSchemaRef)), - ModifiedSegmentExistsInLatest(SnapshotChanges), -} - -impl ConflictResolveContext { - pub fn is_latest_snapshot_append_only( - base: &TableSnapshot, - latest: &TableSnapshot, - ) -> Option> { - let base_segments = &base.segments; - let latest_segments = &latest.segments; - - let base_segments_len = base_segments.len(); - let latest_segments_len = latest_segments.len(); - - if latest_segments_len >= base_segments_len - && base_segments[0..base_segments_len] - == latest_segments[(latest_segments_len - base_segments_len)..latest_segments_len] - { - Some(0..(latest_segments_len - base_segments_len)) - } else { - None - } - } - - pub fn is_modified_segments_exists_in_latest( - base: &TableSnapshot, - latest: &TableSnapshot, - replaced_segments: &HashMap, - removed_segments: &[usize], - ) -> Option<(Vec, HashMap)> { - let latest_segments = latest - .segments - .iter() - .enumerate() - .map(|(i, x)| (x, i)) - .collect::>(); - let mut removed = Vec::with_capacity(removed_segments.len()); - for removed_segment in removed_segments { - let removed_segment = &base.segments[*removed_segment]; - if let Some(position) = latest_segments.get(removed_segment) { - removed.push(*position); - } else { - return None; - } - } - - let mut replaced = HashMap::with_capacity(replaced_segments.len()); - for (position, location) in replaced_segments { - let origin_segment = &base.segments[*position]; - if let Some(position) = latest_segments.get(origin_segment) { - replaced.insert(*position, location.clone()); - } else { - return None; - } - } - Some((removed, replaced)) - } - - pub fn merge_segments( - mut base_segments: Vec, - appended_segments: Vec, - replaced_segments: HashMap, - removed_segment_indexes: Vec, - ) -> Vec { - replaced_segments - .into_iter() - .for_each(|(k, v)| base_segments[k] = v); - - let mut blanks = removed_segment_indexes; - blanks.sort_unstable(); - let mut merged_segments = - Vec::with_capacity(base_segments.len() + appended_segments.len() - blanks.len()); - if !blanks.is_empty() { - let mut last = 0; - for blank in blanks { - merged_segments.extend_from_slice(&base_segments[last..blank]); - last = blank + 1; - } - merged_segments.extend_from_slice(&base_segments[last..]); - } else { - merged_segments = base_segments; - } - - appended_segments - .into_iter() - .chain(merged_segments) - .collect() - } -} - -#[derive(Clone)] -pub struct MutationGenerator { - base_snapshot: Arc, - conflict_resolve_ctx: Option, -} - -impl MutationGenerator { - pub fn new(base_snapshot: Arc) -> Self { - MutationGenerator { - base_snapshot, - conflict_resolve_ctx: None, - } - } -} - -impl SnapshotGenerator for MutationGenerator { - fn generate_new_snapshot( - &self, - schema: TableSchema, - cluster_key_meta: Option, - previous: Option>, - ) -> Result { - let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0); - - let previous = - previous.unwrap_or_else(|| Arc::new(TableSnapshot::new_empty_snapshot(schema.clone()))); - let ctx = self - .conflict_resolve_ctx - .as_ref() - .ok_or_else(|| ErrorCode::Internal("conflict_solve_ctx not set"))?; - match ctx { - ConflictResolveContext::AppendOnly(_) => { - return Err(ErrorCode::Internal( - "conflict_resolve_ctx should not be AppendOnly in MutationGenerator", - )); - } - ConflictResolveContext::ModifiedSegmentExistsInLatest(ctx) => { - if let Some((removed, replaced)) = - ConflictResolveContext::is_modified_segments_exists_in_latest( - &self.base_snapshot, - &previous, - &ctx.replaced_segments, - &ctx.removed_segment_indexes, - ) - { - info!("resolvable conflicts detected"); - metrics_inc_commit_mutation_modified_segment_exists_in_latest(); - let new_segments = ConflictResolveContext::merge_segments( - previous.segments.clone(), - ctx.appended_segments.clone(), - replaced, - removed, - ); - let mut new_summary = merge_statistics( - &ctx.merged_statistics, - &previous.summary, - default_cluster_key_id, - ); - deduct_statistics_mut(&mut new_summary, &ctx.removed_statistics); - let new_snapshot = TableSnapshot::new( - Uuid::new_v4(), - &previous.timestamp, - Some((previous.snapshot_id, previous.format_version)), - schema, - new_summary, - new_segments, - cluster_key_meta, - previous.table_statistics_location.clone(), - previous.index_info_locations.clone(), - ); - return Ok(new_snapshot); - } - } - } - metrics_inc_commit_mutation_unresolvable_conflict(); - Err(ErrorCode::UnresolvableConflict(format!( - "conflict resolve context:{:?}", - ctx - ))) - } - - fn set_conflict_resolve_context(&mut self, ctx: ConflictResolveContext) { - self.conflict_resolve_ctx = Some(ctx); - } -} - -#[derive(Clone)] -pub struct AppendGenerator { - ctx: Arc, - leaf_default_values: HashMap, - overwrite: bool, - conflict_resolve_ctx: Option, -} - -impl AppendGenerator { - pub fn new(ctx: Arc, overwrite: bool) -> Self { - AppendGenerator { - ctx, - leaf_default_values: HashMap::new(), - overwrite, - conflict_resolve_ctx: None, - } - } - - fn check_fill_default(&self, summary: &Statistics) -> Result { - let mut fill_default_values = false; - // check if need to fill default value in statistics - for column_id in self - .conflict_resolve_ctx()? - .0 - .merged_statistics - .col_stats - .keys() - { - if !summary.col_stats.contains_key(column_id) { - fill_default_values = true; - break; - } - } - Ok(fill_default_values) - } -} - -impl AppendGenerator { - fn conflict_resolve_ctx(&self) -> Result<(&SnapshotMerged, &TableSchema)> { - let ctx = self - .conflict_resolve_ctx - .as_ref() - .ok_or_else(|| ErrorCode::Internal("conflict_solve_ctx not set"))?; - match ctx { - ConflictResolveContext::AppendOnly((ctx, schema)) => Ok((ctx, schema.as_ref())), - _ => Err(ErrorCode::Internal( - "conflict_resolve_ctx should only be Appendonly in AppendGenerator", - )), - } - } -} - -#[async_trait::async_trait] -impl SnapshotGenerator for AppendGenerator { - fn set_conflict_resolve_context(&mut self, ctx: ConflictResolveContext) { - self.conflict_resolve_ctx = Some(ctx); - } - - async fn fill_default_values( - &mut self, - schema: TableSchema, - previous: &Option>, - ) -> Result<()> { - if let Some(snapshot) = previous { - if !self.overwrite && self.check_fill_default(&snapshot.summary)? { - let mut default_values = Vec::with_capacity(schema.num_fields()); - for field in schema.fields() { - default_values.push(field_default_value(self.ctx.clone(), field)?); - } - self.leaf_default_values = schema.field_leaf_default_values(&default_values); - } - } - Ok(()) - } - - fn generate_new_snapshot( - &self, - schema: TableSchema, - cluster_key_meta: Option, - previous: Option>, - ) -> Result { - let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?; - if is_column_type_modified(&schema, expected_schema) { - return Err(ErrorCode::UnresolvableConflict(format!( - "schema was changed during insert, expected:{:?}, actual:{:?}", - expected_schema, schema - ))); - } - let mut prev_timestamp = None; - let mut prev_snapshot_id = None; - let mut table_statistics_location = None; - let mut index_info_locations = None; - let mut new_segments = snapshot_merged.merged_segments.clone(); - let mut new_summary = snapshot_merged.merged_statistics.clone(); - - if let Some(snapshot) = &previous { - prev_timestamp = snapshot.timestamp; - prev_snapshot_id = Some((snapshot.snapshot_id, snapshot.format_version)); - table_statistics_location = snapshot.table_statistics_location.clone(); - index_info_locations = snapshot.index_info_locations.clone(); - - if !self.overwrite { - let mut summary = snapshot.summary.clone(); - - let leaf_fields = schema.leaf_fields(); - let column_data_types: HashMap = - HashMap::from_iter(leaf_fields.iter().map(|f| (f.column_id, &f.data_type))); - - if self.check_fill_default(&summary)? { - self.leaf_default_values - .iter() - .for_each(|(col_id, default_value)| { - if let Some(data_type) = column_data_types.get(col_id) { - if !summary.col_stats.contains_key(col_id) { - assert!( - default_value - .as_ref() - .is_value_of_type(&DataType::from(*data_type)), - "default value: {:?} is not of type: {:?}", - default_value, - data_type - ); - if let Some((min, max)) = crate::statistics::scalar_min_max( - &DataType::from(*data_type), - default_value.clone(), - ) { - let (null_count, distinct_of_values) = - if default_value.is_null() { - (summary.row_count, Some(0)) - } else { - (0, Some(1)) - }; - let col_stat = ColumnStatistics::new( - min, - max, - null_count, - 0, - distinct_of_values, - ); - summary.col_stats.insert(*col_id, col_stat); - } - } - } else { - warn!("column id:{} not found in schema, while populating min/max values. the schema is {:?}", col_id, schema); - } - }); - } - - new_segments = snapshot_merged - .merged_segments - .iter() - .chain(snapshot.segments.iter()) - .cloned() - .collect(); - - merge_statistics_mut( - &mut new_summary, - &summary, - cluster_key_meta.clone().map(|v| v.0), - ); - } - } - - // check if need to auto compact - // the algorithm is: if the number of imperfect blocks is greater than the threshold, then auto compact. - // the threshold is set by the setting `auto_compaction_imperfect_blocks_threshold`, default is 50. - let imperfect_count = new_summary.block_count - new_summary.perfect_block_count; - let auto_compaction_imperfect_blocks_threshold = self - .ctx - .get_settings() - .get_auto_compaction_imperfect_blocks_threshold()?; - let auto_compact = imperfect_count >= auto_compaction_imperfect_blocks_threshold; - self.ctx.set_need_compact_after_write(auto_compact); - - Ok(TableSnapshot::new( - Uuid::new_v4(), - &prev_timestamp, - prev_snapshot_id, - schema, - new_summary, - new_segments, - cluster_key_meta, - table_statistics_location, - index_info_locations, - )) - } -} - -fn is_column_type_modified(schema: &TableSchema, expected_schema: &TableSchema) -> bool { - let expected: HashMap<_, _> = expected_schema - .fields() - .iter() - .map(|f| (f.column_id, &f.data_type)) - .collect(); - schema.fields().iter().any(|f| { - expected - .get(&f.column_id) - .is_some_and(|ty| **ty != f.data_type) - }) -}