Commit 3306bf3

refactor codes

zhyass committed Mar 19, 2024
1 parent 9b47de7 commit 3306bf3

Showing 9 changed files with 580 additions and 494 deletions.
@@ -0,0 +1,234 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::ColumnId;
use databend_common_expression::Scalar;
use databend_common_expression::TableDataType;
use databend_common_expression::TableSchema;
use databend_common_sql::field_default_value;
use databend_storages_common_table_meta::meta::ClusterKey;
use databend_storages_common_table_meta::meta::ColumnStatistics;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;
use log::warn;
use uuid::Uuid;

use crate::operations::common::ConflictResolveContext;
use crate::operations::common::SnapshotGenerator;
use crate::operations::common::SnapshotMerged;
use crate::statistics::reducers::merge_statistics_mut;

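/// Snapshot generator for append-style writes (INSERT / INSERT OVERWRITE):
/// it merges the newly written segments and statistics with the previous
/// snapshot, filling in column statistics from default values where needed.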
#[derive(Clone)]
pub struct AppendGenerator {
ctx: Arc<dyn TableContext>,
leaf_default_values: HashMap<ColumnId, Scalar>,
overwrite: bool,
conflict_resolve_ctx: ConflictResolveContext,
}

impl AppendGenerator {
pub fn new(ctx: Arc<dyn TableContext>, overwrite: bool) -> Self {
AppendGenerator {
ctx,
leaf_default_values: HashMap::new(),
overwrite,
conflict_resolve_ctx: ConflictResolveContext::None,
}
}

    fn check_fill_default(&self, summary: &Statistics) -> Result<bool> {
        // Check whether any column tracked by the merged statistics is
        // missing from the given summary; if so, its statistics must be
        // back-filled from the column's default value.
        let fill_default_values = self
            .conflict_resolve_ctx()?
            .0
            .merged_statistics
            .col_stats
            .keys()
            .any(|column_id| !summary.col_stats.contains_key(column_id));
        Ok(fill_default_values)
    }
}

impl AppendGenerator {
fn conflict_resolve_ctx(&self) -> Result<(&SnapshotMerged, &TableSchema)> {
match &self.conflict_resolve_ctx {
ConflictResolveContext::AppendOnly((ctx, schema)) => Ok((ctx, schema.as_ref())),
_ => Err(ErrorCode::Internal(
"conflict_resolve_ctx should only be Appendonly in AppendGenerator",
)),
}
}
}

#[async_trait::async_trait]
impl SnapshotGenerator for AppendGenerator {
fn set_conflict_resolve_context(&mut self, ctx: ConflictResolveContext) {
self.conflict_resolve_ctx = ctx;
}

async fn fill_default_values(
&mut self,
schema: TableSchema,
previous: &Option<Arc<TableSnapshot>>,
) -> Result<()> {
if let Some(snapshot) = previous {
if !self.overwrite && self.check_fill_default(&snapshot.summary)? {
let mut default_values = Vec::with_capacity(schema.num_fields());
for field in schema.fields() {
default_values.push(field_default_value(self.ctx.clone(), field)?);
}
self.leaf_default_values = schema.field_leaf_default_values(&default_values);
}
}
Ok(())
}

fn generate_new_snapshot(
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
) -> Result<TableSnapshot> {
let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?;
if is_column_type_modified(&schema, expected_schema) {
return Err(ErrorCode::UnresolvableConflict(format!(
"schema was changed during insert, expected:{:?}, actual:{:?}",
expected_schema, schema
)));
}
let mut prev_timestamp = None;
let mut prev_snapshot_id = None;
let mut table_statistics_location = None;
let mut index_info_locations = None;
let mut new_segments = snapshot_merged.merged_segments.clone();
let mut new_summary = snapshot_merged.merged_statistics.clone();

if let Some(snapshot) = &previous {
prev_timestamp = snapshot.timestamp;
prev_snapshot_id = Some((snapshot.snapshot_id, snapshot.format_version));
table_statistics_location = snapshot.table_statistics_location.clone();
index_info_locations = snapshot.index_info_locations.clone();

if !self.overwrite {
let mut summary = snapshot.summary.clone();

let leaf_fields = schema.leaf_fields();
let column_data_types: HashMap<ColumnId, &TableDataType> =
HashMap::from_iter(leaf_fields.iter().map(|f| (f.column_id, &f.data_type)));

if self.check_fill_default(&summary)? {
self.leaf_default_values
.iter()
.for_each(|(col_id, default_value)| {
if let Some(data_type) = column_data_types.get(col_id) {
if !summary.col_stats.contains_key(col_id) {
assert!(
default_value
.as_ref()
.is_value_of_type(&DataType::from(*data_type)),
"default value: {:?} is not of type: {:?}",
default_value,
data_type
);
if let Some((min, max)) = crate::statistics::scalar_min_max(
&DataType::from(*data_type),
default_value.clone(),
) {
let (null_count, distinct_of_values) =
if default_value.is_null() {
(summary.row_count, Some(0))
} else {
(0, Some(1))
};
let col_stat = ColumnStatistics::new(
min,
max,
null_count,
0,
distinct_of_values,
);
summary.col_stats.insert(*col_id, col_stat);
}
}
} else {
warn!("column id:{} not found in schema, while populating min/max values. the schema is {:?}", col_id, schema);
}
});
}

new_segments = snapshot_merged
.merged_segments
.iter()
.chain(snapshot.segments.iter())
.cloned()
.collect();

merge_statistics_mut(
&mut new_summary,
&summary,
cluster_key_meta.clone().map(|v| v.0),
);
}
}

        // Check whether auto compaction should be triggered after this write:
        // if the number of imperfect blocks reaches the threshold, request a
        // compaction. The threshold is controlled by the setting
        // `auto_compaction_imperfect_blocks_threshold` (default: 50).
let imperfect_count = new_summary.block_count - new_summary.perfect_block_count;
let auto_compaction_imperfect_blocks_threshold = self
.ctx
.get_settings()
.get_auto_compaction_imperfect_blocks_threshold()?;
let auto_compact = imperfect_count >= auto_compaction_imperfect_blocks_threshold;
self.ctx.set_need_compact_after_write(auto_compact);

Ok(TableSnapshot::new(
Uuid::new_v4(),
&prev_timestamp,
prev_snapshot_id,
schema,
new_summary,
new_segments,
cluster_key_meta,
table_statistics_location,
index_info_locations,
))
}
}
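
// Typical driving sequence, inferred from the trait implementation above
// (illustrative, not prescribed by this commit):
//   1. AppendGenerator::new(ctx, overwrite)
//   2. set_conflict_resolve_context(ConflictResolveContext::AppendOnly(..))
//   3. fill_default_values(schema, &previous).await
//   4. generate_new_snapshot(schema, cluster_key_meta, previous)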

fn is_column_type_modified(schema: &TableSchema, expected_schema: &TableSchema) -> bool {
let expected: HashMap<_, _> = expected_schema
.fields()
.iter()
.map(|f| (f.column_id, &f.data_type))
.collect();
schema.fields().iter().any(|f| {
expected
.get(&f.column_id)
.is_some_and(|ty| **ty != f.data_type)
})
}
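
// Note (inferred from the lookup above): a column id present only in
// `schema` (e.g. a column added after the insert was planned) is not treated
// as modified, because `expected.get(&f.column_id)` returns `None`; only a
// changed data type for an existing column id triggers the conflict.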
@@ -0,0 +1,164 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::HashSet;
use std::ops::Range;

use databend_common_expression::TableSchemaRef;
use databend_storages_common_table_meta::meta::Location;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;

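/// How a commit resolves conflicts with concurrently committed snapshots:
/// `AppendOnly` carries the merged segments/statistics plus the schema the
/// write was planned against, while `ModifiedSegmentExistsInLatest` carries
/// the exact segment-level changes so they can be re-applied onto the
/// latest snapshot.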
#[allow(clippy::large_enum_variant)]
#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq)]
pub enum ConflictResolveContext {
None,
AppendOnly((SnapshotMerged, TableSchemaRef)),
ModifiedSegmentExistsInLatest(SnapshotChanges),
}

impl ConflictResolveContext {
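    /// Returns `Some(range)` when `latest` has only appended segments on top
    /// of `base`, i.e. `base.segments` is a suffix of `latest.segments`
    /// (newly committed segments are placed at the head of the list). The
    /// range covers the appended segments in `latest`. Illustrative example:
    /// base = [s1, s2] and latest = [s3, s1, s2] yields Some(0..1), while
    /// latest = [s1, s3, s2] yields None.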
pub fn is_latest_snapshot_append_only(
base: &TableSnapshot,
latest: &TableSnapshot,
) -> Option<Range<usize>> {
let base_segments = &base.segments;
let latest_segments = &latest.segments;

let base_segments_len = base_segments.len();
let latest_segments_len = latest_segments.len();

if latest_segments_len >= base_segments_len
&& base_segments[0..base_segments_len]
== latest_segments[(latest_segments_len - base_segments_len)..latest_segments_len]
{
Some(0..(latest_segments_len - base_segments_len))
} else {
None
}
}

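    /// Re-indexes the removed and replaced segments, recorded as positions
    /// in `base`, onto positions in `latest`. Returns `None` if any affected
    /// segment is no longer present in `latest`, meaning a concurrent commit
    /// has already touched it and the conflict cannot be resolved this way.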
pub fn is_modified_segments_exists_in_latest(
base: &TableSnapshot,
latest: &TableSnapshot,
replaced_segments: &HashMap<usize, Location>,
removed_segments: &[usize],
) -> Option<(Vec<usize>, HashMap<usize, Location>)> {
let latest_segments = latest
.segments
.iter()
.enumerate()
.map(|(i, x)| (x, i))
.collect::<HashMap<_, usize>>();
let mut removed = Vec::with_capacity(removed_segments.len());
for removed_segment in removed_segments {
let removed_segment = &base.segments[*removed_segment];
if let Some(position) = latest_segments.get(removed_segment) {
removed.push(*position);
} else {
return None;
}
}

let mut replaced = HashMap::with_capacity(replaced_segments.len());
for (position, location) in replaced_segments {
let origin_segment = &base.segments[*position];
if let Some(position) = latest_segments.get(origin_segment) {
replaced.insert(*position, location.clone());
} else {
return None;
}
}
Some((removed, replaced))
}

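    /// Applies `replaced_segments` and `removed_segment_indexes` to
    /// `base_segments`, then places `appended_segments` in front of the
    /// surviving base segments. Illustrative example: base = [a, b, c],
    /// replaced = {1: b2}, removed = [2], appended = [d] produces
    /// [d, a, b2].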
pub fn merge_segments(
mut base_segments: Vec<Location>,
appended_segments: Vec<Location>,
replaced_segments: HashMap<usize, Location>,
removed_segment_indexes: Vec<usize>,
) -> Vec<Location> {
replaced_segments
.into_iter()
.for_each(|(k, v)| base_segments[k] = v);

let mut blanks = removed_segment_indexes;
blanks.sort_unstable();
        let merged_segments = if blanks.is_empty() {
            base_segments
        } else {
            // Copy the runs of base segments between the removed positions.
            let mut merged = Vec::with_capacity(base_segments.len() - blanks.len());
            let mut last = 0;
            for blank in blanks {
                merged.extend_from_slice(&base_segments[last..blank]);
                last = blank + 1;
            }
            merged.extend_from_slice(&base_segments[last..]);
            merged
        };

appended_segments
.into_iter()
.chain(merged_segments)
.collect()
}
}

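/// The segment-level changes made by a transaction that mutated existing
/// data, with replaced and removed segments addressed by their positions in
/// the base snapshot.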
#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq, Default)]
pub struct SnapshotChanges {
pub appended_segments: Vec<Location>,
pub replaced_segments: HashMap<usize, Location>,
pub removed_segment_indexes: Vec<usize>,

pub merged_statistics: Statistics,
pub removed_statistics: Statistics,
}

impl SnapshotChanges {
    pub fn check_intersect(&self, other: &SnapshotChanges) -> bool {
        // Two sets of changes conflict if they appended the same segment,
        // replaced the same base segment, or removed the same base segment.
        if Self::is_slice_intersect(&self.appended_segments, &other.appended_segments) {
            return true;
        }
        if other
            .replaced_segments
            .keys()
            .any(|k| self.replaced_segments.contains_key(k))
        {
            return true;
        }
        Self::is_slice_intersect(
            &self.removed_segment_indexes,
            &other.removed_segment_indexes,
        )
    }

    fn is_slice_intersect<T: Eq + std::hash::Hash>(l: &[T], r: &[T]) -> bool {
        // Hash the longer slice, then probe it with elements of the shorter.
        let (longer, shorter) = if l.len() > r.len() { (l, r) } else { (r, l) };
        let set = longer.iter().collect::<HashSet<_>>();
        shorter.iter().any(|x| set.contains(x))
    }
}

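/// The output of an append-only transaction: the segments it wrote together
/// with their aggregated statistics.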
#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq)]
pub struct SnapshotMerged {
pub merged_segments: Vec<Location>,
pub merged_statistics: Statistics,
}
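
// A minimal illustrative test of `merge_segments` (not part of this commit).
// It assumes `Location` is the `(path, format_version)` tuple alias defined
// in databend_storages_common_table_meta, with a u64 format version.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn merge_segments_replaces_removes_then_prepends_appended() {
        let loc = |p: &str| (p.to_string(), 0u64);
        let base = vec![loc("a"), loc("b"), loc("c")];
        let appended = vec![loc("d")];
        let replaced = HashMap::from([(1usize, loc("b2"))]);
        let removed = vec![2usize];

        let merged = ConflictResolveContext::merge_segments(base, appended, replaced, removed);
        // Appended segments come first, followed by the surviving base
        // segments with the replacement applied and index 2 removed.
        assert_eq!(merged, vec![loc("d"), loc("a"), loc("b2")]);
    }
}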