Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refine replace into pruning for table with cluster keys #12147

Merged
merged 12 commits into from
Jul 24, 2023
2 changes: 1 addition & 1 deletion src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl DataBlock {
}

Ok(Self {
columns: self.columns.clone(),
columns: self.columns,
num_rows: self.num_rows,
meta,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System
| 'enable_distributed_copy_into' | '0' | '0' | 'SESSION' | 'Enable distributed execution of copy into.' | 'UInt64' |
| 'enable_dphyp' | '1' | '1' | 'SESSION' | 'Enables dphyp join order algorithm.' | 'UInt64' |
| 'enable_query_result_cache' | '0' | '0' | 'SESSION' | 'Enables caching query results to improve performance for identical queries.' | 'UInt64' |
| 'enable_replace_into_partitioning' | '1' | '1' | 'SESSION' | 'Enables partitioning for replace-into statement (if table has cluster keys).' | 'UInt64' |
| 'enable_runtime_filter' | '0' | '0' | 'SESSION' | 'Enables runtime filter optimization for JOIN.' | 'UInt64' |
| 'enable_table_lock' | '1' | '1' | 'SESSION' | 'Enables table lock if necessary (enabled by default).' | 'UInt64' |
| 'flight_client_timeout' | '60' | '60' | 'SESSION' | 'Sets the maximum time in seconds that a flight client request can be processed.' | 'UInt64' |
Expand Down
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,12 @@ impl DefaultSettings {
possible_values: None,
display_in_show_settings: true,
}),
("enable_replace_into_partitioning", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
desc: "Enables partitioning for replace-into statement (if table has cluster keys).",
possible_values: None,
display_in_show_settings: true,
}),
]);

Ok(Arc::new(DefaultSettings {
Expand Down
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,4 +406,11 @@ impl Settings {
/// Turns the `use_parquet2` setting on or off.
///
/// The boolean is persisted as a `u64` flag (1 = enabled, 0 = disabled).
pub fn set_use_parquet2(&self, val: bool) -> Result<()> {
    let flag: u64 = if val { 1 } else { 0 };
    self.try_set_u64("use_parquet2", flag)
}

/// Reads the `enable_replace_into_partitioning` setting.
///
/// The setting is stored as a `u64`; any non-zero value means enabled.
/// Errors from the underlying settings lookup are propagated unchanged.
pub fn get_enable_replace_into_partitioning(&self) -> Result<bool> {
    self.try_get_u64("enable_replace_into_partitioning")
        .map(|flag| flag != 0)
}
/// Writes the `enable_replace_into_partitioning` setting.
///
/// The boolean is persisted as a `u64` flag (1 = enabled, 0 = disabled).
pub fn set_enable_replace_into_partitioning(&self, val: bool) -> Result<()> {
    let stored = u64::from(val);
    self.try_set_u64("enable_replace_into_partitioning", stored)
}
}
15 changes: 15 additions & 0 deletions src/query/storages/fuse/src/metrics/fuse_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,18 @@ pub fn metrics_inc_replace_whole_block_deletion(c: u64) {
/// Bumps the `replace_into_block_of_zero_row_deleted` gauge by `c`.
pub fn metrics_inc_replace_block_of_zero_row_deleted(c: u64) {
    let delta = c as f64;
    increment_gauge!(key!("replace_into_block_of_zero_row_deleted"), delta);
}

/// Bumps the `replace_into_original_row_number` gauge by `c`
/// (rows seen before any pruning was applied).
pub fn metrics_inc_replace_original_row_number(c: u64) {
    let delta = c as f64;
    increment_gauge!(key!("replace_into_original_row_number"), delta);
}

/// Bumps the `replace_into_row_number_after_table_level_pruning` gauge by `c`
/// (rows that survived table-level range-index pruning).
pub fn metrics_inc_replace_row_number_after_table_level_pruning(c: u64) {
    let delta = c as f64;
    increment_gauge!(key!("replace_into_row_number_after_table_level_pruning"), delta);
}

/// Bumps the `replace_into_partition_number` gauge by `c`.
pub fn metrics_inc_replace_partition_number(c: u64) {
    let delta = c as f64;
    increment_gauge!(key!("replace_into_partition_number"), delta);
}
19 changes: 12 additions & 7 deletions src/query/storages/fuse/src/operations/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,17 +152,22 @@ impl FuseTable {
Arc::new(TableSnapshot::new_empty_snapshot(schema.as_ref().clone()))
});

let empty_table = base_snapshot.segments.is_empty();
let replace_into_processor =
ReplaceIntoProcessor::create(on_conflicts.clone(), empty_table);
let table_is_empty = base_snapshot.segments.is_empty();
let table_level_range_index = base_snapshot.summary.col_stats.clone();
let cluster_keys = self.cluster_keys(ctx.clone());
let replace_into_processor = ReplaceIntoProcessor::create(
ctx.as_ref(),
on_conflicts.clone(),
cluster_keys,
schema.as_ref(),
table_is_empty,
table_level_range_index,
)?;

pipeline.add_pipe(replace_into_processor.into_pipe());

// 3. connect to broadcast processor and append transform

let base_snapshot = self.read_table_snapshot().await?.unwrap_or_else(|| {
Arc::new(TableSnapshot::new_empty_snapshot(schema.as_ref().clone()))
});

let max_threads = ctx.get_settings().get_max_threads()?;
let segment_partition_num =
std::cmp::min(base_snapshot.segments.len(), max_threads as usize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ use common_expression::BlockMetaInfoDowncast;
use common_expression::DataBlock;
use common_expression::Scalar;

// This mod needs to be refactored, since it is no longer aimed at being
// used in the implementation of the `MERGE INTO` statement in the future.
//
// Unfortunately, distributed `replace-into` is being implemented in parallel;
// to avoid potentially heavy merge conflicts, the refactoring is postponed.

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub enum MergeIntoOperation {
Delete(DeletionByColumn),
Delete(Vec<DeletionByColumn>),
None,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::hash::Hasher;

use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::decimal::DecimalScalar;
use common_expression::types::AnyType;
use common_expression::types::DecimalSize;
Expand All @@ -23,16 +25,29 @@ use common_expression::Value;
use siphasher::sip128;
use siphasher::sip128::Hasher128;

pub fn row_hash_of_columns<'a>(column_values: &'a [&'a Value<AnyType>], row_idx: usize) -> u128 {
/// Row-wise scalar access over a value that may be either a single scalar
/// or a whole column.
pub(crate) trait RowScalarValue {
    /// Returns the scalar occupying row `idx`; implementations may error
    /// when `idx` is out of range (see the `Value<AnyType>` impl below).
    fn row_scalar(&self, idx: usize) -> Result<ScalarRef>;
}

impl RowScalarValue for Value<AnyType> {
    /// Fetches the scalar at row `idx`.
    ///
    /// A `Value::Scalar` ignores the index and always yields the same scalar;
    /// a `Value::Column` is indexed, returning an `Internal` error when `idx`
    /// falls outside the column length.
    fn row_scalar(&self, idx: usize) -> Result<ScalarRef> {
        match self {
            Value::Scalar(scalar) => Ok(scalar.as_ref()),
            Value::Column(column) => match column.index(idx) {
                Some(cell) => Ok(cell),
                None => Err(ErrorCode::Internal(format!(
                    "index out of range while getting row scalar value from column. idx {}, len {}",
                    idx,
                    column.len()
                ))),
            },
        }
    }
}

pub fn row_hash_of_columns(column_values: &[&Value<AnyType>], row_idx: usize) -> Result<u128> {
let mut sip = sip128::SipHasher24::new();
for col in column_values {
let value = match col {
Value::Scalar(v) => v.as_ref(),
Value::Column(c) => c
.index(row_idx)
.expect("column index out of range (calculate columns hash)"),
};

let value = col.row_scalar(row_idx)?;
match value {
ScalarRef::Number(v) => match v {
NumberScalar::UInt8(v) => sip.write_u8(v),
Expand Down Expand Up @@ -70,5 +85,5 @@ pub fn row_hash_of_columns<'a>(column_values: &'a [&'a Value<AnyType>], row_idx:
}
}
}
sip.finish128().as_u128()
Ok(sip.finish128().as_u128())
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,41 +177,57 @@ impl MergeIntoOperationAggregator {
// aggregate mutations (currently, deletion only)
impl MergeIntoOperationAggregator {
#[async_backtrace::framed]
pub async fn accumulate(&mut self, merge_action: MergeIntoOperation) -> Result<()> {
pub async fn accumulate(&mut self, merge_into_operation: MergeIntoOperation) -> Result<()> {
let aggregation_ctx = &self.aggregation_ctx;
match merge_action {
MergeIntoOperation::Delete(DeletionByColumn {
columns_min_max,
key_hashes,
}) => {
match merge_into_operation {
MergeIntoOperation::Delete(partitions) => {
for (segment_index, (path, ver)) in &aggregation_ctx.segment_locations {
// segment level
let load_param = LoadParams {
location: path.clone(),
len_hint: None,
ver: *ver,
put_cache: true,
};
// for typical configuration, segment cache is enabled, thus after the first loop, we are reading from cache
let segment_info = aggregation_ctx.segment_reader.read(&load_param).await?;
let segment_info: SegmentInfo = segment_info.as_ref().try_into()?;

// segment level
if aggregation_ctx.overlapped(&segment_info.summary.col_stats, &columns_min_max)
let compact_segment_info =
aggregation_ctx.segment_reader.read(&load_param).await?;
let mut segment_info: Option<SegmentInfo> = None;

for DeletionByColumn {
columns_min_max,
key_hashes,
} in &partitions
{
// block level
let mut num_blocks_mutated = 0;
for (block_index, block_meta) in segment_info.blocks.iter().enumerate() {
if aggregation_ctx.overlapped(&block_meta.col_stats, &columns_min_max) {
num_blocks_mutated += 1;
self.deletion_accumulator.add_block_deletion(
*segment_index,
block_index,
&key_hashes,
)
if aggregation_ctx
.overlapped(&compact_segment_info.summary.col_stats, columns_min_max)
{
let seg = match &segment_info {
None => {
// un-compact the segment if necessary
segment_info = Some(compact_segment_info.as_ref().try_into()?);
segment_info.as_ref().unwrap()
}
Some(v) => v,
};

// block level
for (block_index, block_meta) in seg.blocks.iter().enumerate() {
if aggregation_ctx
.overlapped(&block_meta.col_stats, columns_min_max)
{
self.deletion_accumulator.add_block_deletion(
*segment_index,
block_index,
key_hashes,
)
}
}
}
metrics_inc_replace_block_number_after_pruning(num_blocks_mutated);
}

metrics_inc_replace_block_number_after_pruning(
self.deletion_accumulator.deletions.len() as u64,
);
}
}
MergeIntoOperation::None => {}
Expand Down Expand Up @@ -338,7 +354,7 @@ impl AggregationContext {

let mut bitmap = MutableBitmap::new();
for row in 0..num_rows {
let hash = row_hash_of_columns(&columns, row);
let hash = row_hash_of_columns(&columns, row)?;
bitmap.push(!deleted_key_hashes.contains(&hash));
}

Expand Down
Loading