Commit fd255f8

refactor: refine replace into pruning for table with cluster keys (#12147)

* refactor: refine replace into pruning

* partition rows (WIP)

* partition by leftmost cluster key (see the sketch below)

* more metrics

* add new setting enable_replace_into_partitioning

* refine merge_into_mutator

* only un-compact the segment info when necessary

* minor gc

* chore

* adjust metric

* fix typos
dantengsky authored Jul 24, 2023
1 parent ec26e8d commit fd255f8
Showing 11 changed files with 381 additions and 58 deletions.
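
The thrust of the change, per the commit bullets above: rows flowing into a REPLACE INTO are grouped by the leftmost cluster key, so each group can later be pruned independently against segment- and block-level statistics. A minimal, self-contained sketch of that grouping idea — every type and function below is a hypothetical stand-in, not Databend's implementation (which may bucket by key ranges rather than exact values):

    use std::collections::BTreeMap;

    // Hypothetical row: (leftmost cluster-key value, payload).
    type Row = (i64, &'static str);

    // Group rows by the leftmost cluster key; each group later carries its
    // own min/max stats and key hashes, so it can be pruned independently.
    fn partition_by_leftmost_key(rows: Vec<Row>) -> BTreeMap<i64, Vec<Row>> {
        let mut parts: BTreeMap<i64, Vec<Row>> = BTreeMap::new();
        for row in rows {
            parts.entry(row.0).or_default().push(row);
        }
        parts
    }

    fn main() {
        let rows = vec![(2, "b"), (1, "a"), (2, "c")];
        for (key, part) in partition_by_leftmost_key(rows) {
            println!("partition key={key}: {} row(s)", part.len());
        }
    }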
2 changes: 1 addition & 1 deletion src/query/expression/src/block.rs
@@ -360,7 +360,7 @@ impl DataBlock {
         }

         Ok(Self {
-            columns: self.columns.clone(),
+            columns: self.columns,
             num_rows: self.num_rows,
             meta,
         })
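
For context on the one-line change above: the method consumes `self`, so the columns vector can be moved instead of cloned. A toy illustration with a hypothetical `Block` type (not the real `DataBlock`):

    struct Block {
        columns: Vec<String>,
        num_rows: usize,
    }

    impl Block {
        // Because `self` is taken by value, `columns` can be moved out
        // directly; the `.clone()` removed in the diff was never needed.
        fn add_meta(self) -> Block {
            Block {
                columns: self.columns, // move, not clone
                num_rows: self.num_rows,
            }
        }
    }

    fn main() {
        let b = Block { columns: vec!["c1".into()], num_rows: 1 };
        let b2 = b.add_meta();
        println!("{} column(s), {} row(s)", b2.columns.len(), b2.num_rows);
    }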
@@ -13,6 +13,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System
 | 'enable_distributed_copy_into' | '0' | '0' | 'SESSION' | 'Enable distributed execution of copy into.' | 'UInt64' |
 | 'enable_dphyp' | '1' | '1' | 'SESSION' | 'Enables dphyp join order algorithm.' | 'UInt64' |
 | 'enable_query_result_cache' | '0' | '0' | 'SESSION' | 'Enables caching query results to improve performance for identical queries.' | 'UInt64' |
+| 'enable_replace_into_partitioning' | '1' | '1' | 'SESSION' | 'Enables partitioning for replace-into statement (if table has cluster keys).' | 'UInt64' |
 | 'enable_runtime_filter' | '0' | '0' | 'SESSION' | 'Enables runtime filter optimization for JOIN.' | 'UInt64' |
 | 'enable_table_lock' | '1' | '1' | 'SESSION' | 'Enables table lock if necessary (enabled by default).' | 'UInt64' |
 | 'flight_client_timeout' | '60' | '60' | 'SESSION' | 'Sets the maximum time in seconds that a flight client request can be processed.' | 'UInt64' |
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
@@ -346,6 +346,12 @@ impl DefaultSettings {
             possible_values: None,
             display_in_show_settings: true,
         }),
+        ("enable_replace_into_partitioning", DefaultSettingValue {
+            value: UserSettingValue::UInt64(1),
+            desc: "Enables partitioning for replace-into statement (if table has cluster keys).",
+            possible_values: None,
+            display_in_show_settings: true,
+        }),
     ]);

     Ok(Arc::new(DefaultSettings {
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
@@ -406,4 +406,11 @@ impl Settings {
     pub fn set_use_parquet2(&self, val: bool) -> Result<()> {
         self.try_set_u64("use_parquet2", u64::from(val))
     }
+
+    pub fn get_enable_replace_into_partitioning(&self) -> Result<bool> {
+        Ok(self.try_get_u64("enable_replace_into_partitioning")? != 0)
+    }
+    pub fn set_enable_replace_into_partitioning(&self, val: bool) -> Result<()> {
+        self.try_set_u64("enable_replace_into_partitioning", u64::from(val))
+    }
 }
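
A sketch of the on/off semantics behind the getter and setter above. The mock `Settings` below is hypothetical; the real ones route through `try_get_u64`/`try_set_u64` as the diff shows:

    // Hypothetical mock: a u64-backed boolean, matching the UInt64 setting.
    struct Settings {
        enable_replace_into_partitioning: u64,
    }

    impl Settings {
        fn get_enable_replace_into_partitioning(&self) -> bool {
            // Mirrors `try_get_u64(...)? != 0`: any non-zero value enables it.
            self.enable_replace_into_partitioning != 0
        }
    }

    fn main() {
        let settings = Settings { enable_replace_into_partitioning: 1 };
        if settings.get_enable_replace_into_partitioning() {
            println!("partition REPLACE INTO input by the leftmost cluster key");
        } else {
            println!("process the input as a single partition");
        }
    }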
15 changes: 15 additions & 0 deletions src/query/storages/fuse/src/metrics/fuse_metrics.rs
@@ -228,3 +228,18 @@ pub fn metrics_inc_replace_whole_block_deletion(c: u64) {
 pub fn metrics_inc_replace_block_of_zero_row_deleted(c: u64) {
     increment_gauge!(key!("replace_into_block_of_zero_row_deleted"), c as f64);
 }
+
+pub fn metrics_inc_replace_original_row_number(c: u64) {
+    increment_gauge!(key!("replace_into_original_row_number"), c as f64);
+}
+
+pub fn metrics_inc_replace_row_number_after_table_level_pruning(c: u64) {
+    increment_gauge!(
+        key!("replace_into_row_number_after_table_level_pruning"),
+        c as f64
+    );
+}
+
+pub fn metrics_inc_replace_partition_number(c: u64) {
+    increment_gauge!(key!("replace_into_partition_number"), c as f64);
+}
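
The three new gauges let an operator read pruning effectiveness as a funnel: rows in, rows left after table-level pruning, and the resulting partition count. A hedged sketch of where they might fire — the call sites and numbers below are invented; only the function names come from the diff:

    // Stubs standing in for the real gauge-backed helpers above.
    fn metrics_inc_replace_original_row_number(c: u64) {
        println!("replace_into_original_row_number += {c}");
    }
    fn metrics_inc_replace_row_number_after_table_level_pruning(c: u64) {
        println!("replace_into_row_number_after_table_level_pruning += {c}");
    }
    fn metrics_inc_replace_partition_number(c: u64) {
        println!("replace_into_partition_number += {c}");
    }

    fn main() {
        // Hypothetical funnel for one REPLACE INTO batch.
        metrics_inc_replace_original_row_number(1_000);
        // Only rows overlapping the table-level range index can conflict.
        metrics_inc_replace_row_number_after_table_level_pruning(120);
        // Survivors grouped by the leftmost cluster key.
        metrics_inc_replace_partition_number(4);
    }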
19 changes: 12 additions & 7 deletions src/query/storages/fuse/src/operations/replace.rs
@@ -152,17 +152,22 @@ impl FuseTable {
             Arc::new(TableSnapshot::new_empty_snapshot(schema.as_ref().clone()))
         });

-        let empty_table = base_snapshot.segments.is_empty();
-        let replace_into_processor =
-            ReplaceIntoProcessor::create(on_conflicts.clone(), empty_table);
+        let table_is_empty = base_snapshot.segments.is_empty();
+        let table_level_range_index = base_snapshot.summary.col_stats.clone();
+        let cluster_keys = self.cluster_keys(ctx.clone());
+        let replace_into_processor = ReplaceIntoProcessor::create(
+            ctx.as_ref(),
+            on_conflicts.clone(),
+            cluster_keys,
+            schema.as_ref(),
+            table_is_empty,
+            table_level_range_index,
+        )?;

         pipeline.add_pipe(replace_into_processor.into_pipe());

         // 3. connect to broadcast processor and append transform

-        let base_snapshot = self.read_table_snapshot().await?.unwrap_or_else(|| {
-            Arc::new(TableSnapshot::new_empty_snapshot(schema.as_ref().clone()))
-        });
-
         let max_threads = ctx.get_settings().get_max_threads()?;
         let segment_partition_num =
             std::cmp::min(base_snapshot.segments.len(), max_threads as usize);
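
The processor now receives `table_level_range_index` — per-column min/max over the whole table — so input rows that cannot possibly collide with existing data are filtered before any segment is read. A simplified sketch of that overlap test, with plain integer ranges standing in for Databend's column statistics:

    // Hypothetical stand-in for the table-level range index of one column.
    struct ColumnRange {
        min: i64,
        max: i64,
    }

    // If the on-conflict key falls outside the table's range, the row is a
    // plain insert: no existing row can match it, so no deletion is queued.
    fn may_conflict(range: &ColumnRange, key: i64) -> bool {
        range.min <= key && key <= range.max
    }

    fn main() {
        let table_range = ColumnRange { min: 100, max: 200 };
        for key in [50, 150, 250] {
            println!("key {key}: may conflict = {}", may_conflict(&table_range, key));
        }
    }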
@@ -21,9 +21,15 @@ use common_expression::BlockMetaInfoDowncast;
 use common_expression::DataBlock;
 use common_expression::Scalar;

+// This mod needs to be refactored, since it is no longer intended to back the
+// implementation of the `MERGE INTO` statement in the future.
+//
+// Unfortunately, distributed `replace-into` is being implemented in parallel;
+// to avoid potentially heavy merge conflicts, the refactoring is postponed.
+
 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
 pub enum MergeIntoOperation {
-    Delete(DeletionByColumn),
+    Delete(Vec<DeletionByColumn>),
     None,
 }
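
With this change, a single `Delete` operation carries one `DeletionByColumn` per partition, each with its own key hashes and column min/max, instead of one undifferentiated set. A sketch of the shape — the field types below are guesses for illustration; only the names come from the diff:

    use std::collections::HashSet;

    // Assumed field types, for illustration only.
    struct DeletionByColumn {
        columns_min_max: Vec<(i64, i64)>, // per-column (min, max)
        key_hashes: HashSet<u128>,        // hashes of on-conflict keys
    }

    enum MergeIntoOperation {
        Delete(Vec<DeletionByColumn>),
        None,
    }

    fn main() {
        // Two partitions; downstream, each is pruned against segment stats
        // independently, so a segment untouched by a partition is skipped.
        let op = MergeIntoOperation::Delete(vec![
            DeletionByColumn { columns_min_max: vec![(1, 10)], key_hashes: HashSet::from([1u128]) },
            DeletionByColumn { columns_min_max: vec![(50, 60)], key_hashes: HashSet::from([2u128]) },
        ]);
        match op {
            MergeIntoOperation::Delete(parts) => println!("{} partition(s)", parts.len()),
            MergeIntoOperation::None => println!("no-op"),
        }
    }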
@@ -14,6 +14,8 @@

 use std::hash::Hasher;

+use common_exception::ErrorCode;
+use common_exception::Result;
 use common_expression::types::decimal::DecimalScalar;
 use common_expression::types::AnyType;
 use common_expression::types::DecimalSize;
@@ -23,16 +25,29 @@ use common_expression::Value;
 use siphasher::sip128;
 use siphasher::sip128::Hasher128;

-pub fn row_hash_of_columns<'a>(column_values: &'a [&'a Value<AnyType>], row_idx: usize) -> u128 {
+pub(crate) trait RowScalarValue {
+    fn row_scalar(&self, idx: usize) -> Result<ScalarRef>;
+}
+
+impl RowScalarValue for Value<AnyType> {
+    fn row_scalar(&self, idx: usize) -> Result<ScalarRef> {
+        match self {
+            Value::Scalar(v) => Ok(v.as_ref()),
+            Value::Column(c) => c.index(idx).ok_or_else(|| {
+                ErrorCode::Internal(format!(
+                    "index out of range while getting row scalar value from column. idx {}, len {}",
+                    idx,
+                    c.len()
+                ))
+            }),
+        }
+    }
+}
+
+pub fn row_hash_of_columns(column_values: &[&Value<AnyType>], row_idx: usize) -> Result<u128> {
     let mut sip = sip128::SipHasher24::new();
     for col in column_values {
-        let value = match col {
-            Value::Scalar(v) => v.as_ref(),
-            Value::Column(c) => c
-                .index(row_idx)
-                .expect("column index out of range (calculate columns hash)"),
-        };
-
+        let value = col.row_scalar(row_idx)?;
         match value {
             ScalarRef::Number(v) => match v {
                 NumberScalar::UInt8(v) => sip.write_u8(v),
@@ -70,5 +85,5 @@ pub fn row_hash_of_columns<'a>(column_values: &'a [&'a Value<AnyType>], row_idx: usize) -> u128 {
         }
     }
-    sip.finish128().as_u128()
+    Ok(sip.finish128().as_u128())
 }
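
The hashing helper now propagates failure (`Result<u128>`) instead of panicking on an out-of-range index. A usage sketch with a simplified value type; the real `Value<AnyType>`/`ScalarRef` machinery and the SipHasher24 128-bit digest are replaced by toy stand-ins:

    use std::collections::HashSet;

    // Simplified stand-in for Value<AnyType>: either one scalar broadcast
    // to every row, or a column of per-row scalars.
    enum Value {
        Scalar(u64),
        Column(Vec<u64>),
    }

    impl Value {
        // Mirrors the new RowScalarValue trait: indexing can fail.
        fn row_scalar(&self, idx: usize) -> Result<u64, String> {
            match self {
                Value::Scalar(v) => Ok(*v),
                Value::Column(c) => c.get(idx).copied()
                    .ok_or_else(|| format!("index {idx} out of range, len {}", c.len())),
            }
        }
    }

    fn row_hash_of_columns(cols: &[&Value], row: usize) -> Result<u128, String> {
        // A toy accumulator in place of SipHasher24's 128-bit digest.
        let mut h: u128 = 0;
        for col in cols {
            h = h.wrapping_mul(31).wrapping_add(col.row_scalar(row)? as u128);
        }
        Ok(h)
    }

    fn main() -> Result<(), String> {
        let on_conflict_keys = Value::Column(vec![7, 8, 9]);
        let flag = Value::Scalar(1); // broadcast scalar: same value for every row
        let mut key_hashes = HashSet::new();
        for row in 0..3 {
            key_hashes.insert(row_hash_of_columns(&[&on_conflict_keys, &flag], row)?);
        }
        println!("{} distinct key hashes", key_hashes.len());
        Ok(())
    }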
@@ -177,41 +177,57 @@ impl MergeIntoOperationAggregator {
 // aggregate mutations (currently, deletion only)
 impl MergeIntoOperationAggregator {
     #[async_backtrace::framed]
-    pub async fn accumulate(&mut self, merge_action: MergeIntoOperation) -> Result<()> {
+    pub async fn accumulate(&mut self, merge_into_operation: MergeIntoOperation) -> Result<()> {
         let aggregation_ctx = &self.aggregation_ctx;
-        match merge_action {
-            MergeIntoOperation::Delete(DeletionByColumn {
-                columns_min_max,
-                key_hashes,
-            }) => {
+        match merge_into_operation {
+            MergeIntoOperation::Delete(partitions) => {
                 for (segment_index, (path, ver)) in &aggregation_ctx.segment_locations {
+                    // segment level
                     let load_param = LoadParams {
                         location: path.clone(),
                         len_hint: None,
                         ver: *ver,
                         put_cache: true,
                     };
                     // for typical configuration, segment cache is enabled, thus after the first loop, we are reading from cache
-                    let segment_info = aggregation_ctx.segment_reader.read(&load_param).await?;
-                    let segment_info: SegmentInfo = segment_info.as_ref().try_into()?;
-
-                    // segment level
-                    if aggregation_ctx.overlapped(&segment_info.summary.col_stats, &columns_min_max)
+                    let compact_segment_info =
+                        aggregation_ctx.segment_reader.read(&load_param).await?;
+                    let mut segment_info: Option<SegmentInfo> = None;
+
+                    for DeletionByColumn {
+                        columns_min_max,
+                        key_hashes,
+                    } in &partitions
                     {
-                        // block level
-                        let mut num_blocks_mutated = 0;
-                        for (block_index, block_meta) in segment_info.blocks.iter().enumerate() {
-                            if aggregation_ctx.overlapped(&block_meta.col_stats, &columns_min_max) {
-                                num_blocks_mutated += 1;
-                                self.deletion_accumulator.add_block_deletion(
-                                    *segment_index,
-                                    block_index,
-                                    &key_hashes,
-                                )
-                            }
-                        }
-                        metrics_inc_replace_block_number_after_pruning(num_blocks_mutated);
+                        if aggregation_ctx
+                            .overlapped(&compact_segment_info.summary.col_stats, columns_min_max)
+                        {
+                            let seg = match &segment_info {
+                                None => {
+                                    // un-compact the segment if necessary
+                                    segment_info = Some(compact_segment_info.as_ref().try_into()?);
+                                    segment_info.as_ref().unwrap()
+                                }
+                                Some(v) => v,
+                            };
+
+                            // block level
+                            for (block_index, block_meta) in seg.blocks.iter().enumerate() {
+                                if aggregation_ctx
+                                    .overlapped(&block_meta.col_stats, columns_min_max)
+                                {
+                                    self.deletion_accumulator.add_block_deletion(
+                                        *segment_index,
+                                        block_index,
+                                        key_hashes,
+                                    )
+                                }
+                            }
+                        }
                     }
+
+                    metrics_inc_replace_block_number_after_pruning(
+                        self.deletion_accumulator.deletions.len() as u64,
+                    );
                 }
             }
             MergeIntoOperation::None => {}
@@ -338,7 +354,7 @@ impl AggregationContext {

         let mut bitmap = MutableBitmap::new();
         for row in 0..num_rows {
-            let hash = row_hash_of_columns(&columns, row);
+            let hash = row_hash_of_columns(&columns, row)?;
             bitmap.push(!deleted_key_hashes.contains(&hash));
         }
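
The key trick in the rewritten `accumulate` above: the segment stays in its compact (cheap) form, and is only converted — "un-compacted" — the first time some partition overlaps its summary statistics, since in the common case most segments are pruned away. The Option-based lazy-init pattern in isolation, with placeholder types:

    // Placeholder types: a cheap compact form and an expensive expanded form.
    struct CompactSegment { summary_min: i64, summary_max: i64 }
    struct Segment { blocks: Vec<(i64, i64)> } // per-block (min, max)

    fn expand(_c: &CompactSegment) -> Segment {
        // Stands in for the potentially costly try_into() conversion.
        Segment { blocks: vec![(0, 5), (6, 10)] }
    }

    fn overlaps(min: i64, max: i64, lo: i64, hi: i64) -> bool {
        !(hi < min || lo > max)
    }

    fn main() {
        let compact = CompactSegment { summary_min: 0, summary_max: 10 };
        let partitions = [(2i64, 3i64), (100, 200)];
        let mut segment: Option<Segment> = None; // expanded at most once

        for (lo, hi) in partitions {
            if overlaps(compact.summary_min, compact.summary_max, lo, hi) {
                let seg = segment.get_or_insert_with(|| expand(&compact));
                let hits = seg
                    .blocks
                    .iter()
                    .filter(|(bmin, bmax)| overlaps(*bmin, *bmax, lo, hi))
                    .count();
                println!("partition [{lo},{hi}] overlaps {hits} block(s)");
            }
        }
    }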

1 comment on commit fd255f8

vercel bot commented on fd255f8 (Jul 24, 2023):

Successfully deployed to the following URLs:

databend โ€“ ./

databend-git-main-databend.vercel.app
databend-databend.vercel.app
databend.rs
databend.vercel.app
