Feature: Auto compact (re-cluster) for multiple table insertion statement #16437
To implement auto compact (re-cluster) for multiple table insertion statements, you can use the `hook_compact` function:

```rust
pub async fn hook_compact(
    ctx: Arc<QueryContext>,
    pipeline: &mut Pipeline,
    compact_target: CompactTargetTableDescription,
    trace_ctx: CompactHookTraceCtx,
    lock_opt: LockTableOption,
) {
    let op_name = trace_ctx.operation_name.clone();
    if let Err(e) = do_hook_compact(ctx, pipeline, compact_target, trace_ctx, lock_opt).await {
        // Auto compaction is best-effort: failures are logged and ignored.
        info!("compact hook ({}) with error (ignored): {}", op_name, e);
    }
}

async fn do_hook_compact(
    ctx: Arc<QueryContext>,
    pipeline: &mut Pipeline,
    compact_target: CompactTargetTableDescription,
    trace_ctx: CompactHookTraceCtx,
    lock_opt: LockTableOption,
) -> Result<()> {
    if pipeline.is_empty() {
        return Ok(());
    }

    pipeline.set_on_finished(move |info: &ExecutionInfo| {
        // For insertions, compact only when the hint says there are blocks to
        // compact; for other mutations, bound the work by the segment limit.
        let compaction_limits = match compact_target.mutation_kind {
            MutationKind::Insert => {
                let compaction_num_block_hint = ctx.get_compaction_num_block_hint();
                info!(
                    "hint number of blocks need to be compacted {}",
                    compaction_num_block_hint
                );
                if compaction_num_block_hint == 0 {
                    return Ok(());
                }
                CompactionLimits {
                    segment_limit: None,
                    block_limit: Some(compaction_num_block_hint as usize),
                }
            }
            _ => {
                let auto_compaction_segments_limit =
                    ctx.get_settings().get_auto_compaction_segments_limit()?;
                CompactionLimits {
                    segment_limit: Some(auto_compaction_segments_limit as usize),
                    block_limit: None,
                }
            }
        };

        metrics_inc_compact_hook_main_operation_time_ms(
            &trace_ctx.operation_name,
            trace_ctx.start.elapsed().as_millis() as u64,
        );

        let compact_start_at = Instant::now();
        if info.res.is_ok() {
            // Run the optimization job only if the main operation succeeded.
            info!(
                "execute {} finished successfully. running table optimization job.",
                &trace_ctx.operation_name
            );
            match GlobalIORuntime::instance()
                .block_on(compact_table(ctx, compact_target, compaction_limits, lock_opt))
            {
                Ok(_) => {
                    info!(
                        "execute {} finished successfully. table optimization job finished.",
                        &trace_ctx.operation_name
                    );
                }
                Err(e) => {
                    info!(
                        "execute {} finished successfully. table optimization job failed. {:?}",
                        &trace_ctx.operation_name, e
                    );
                }
            }
        }
        metrics_inc_compact_hook_compact_time_ms(
            &trace_ctx.operation_name,
            compact_start_at.elapsed().as_millis() as u64,
        );

        Ok(())
    });

    Ok(())
}

async fn compact_table(
    ctx: Arc<QueryContext>,
    compact_target: CompactTargetTableDescription,
    compaction_limits: CompactionLimits,
    lock_opt: LockTableOption,
) -> Result<()> {
    let table = ctx
        .get_table(
            &compact_target.catalog,
            &compact_target.database,
            &compact_target.table,
        )
        .await?;

    // Re-cluster if the table defines cluster keys; otherwise compact blocks.
    let do_recluster = !table.cluster_keys(ctx.clone()).is_empty();
    ctx.evict_table_from_cache(
        &compact_target.catalog,
        &compact_target.database,
        &compact_target.table,
    )?;

    let mut build_res = if do_recluster {
        let recluster = RelOperator::Recluster(Recluster {
            catalog: compact_target.catalog,
            database: compact_target.database,
            table: compact_target.table,
            filters: None,
            limit: compaction_limits.segment_limit,
        });
        let s_expr = SExpr::create_leaf(Arc::new(recluster));
        let recluster_interpreter =
            ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
        recluster_interpreter.execute2().await?
    } else {
        let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock {
            catalog: compact_target.catalog,
            database: compact_target.database,
            table: compact_target.table,
            limit: compaction_limits,
        });
        let s_expr = SExpr::create_leaf(Arc::new(compact_block));
        let compact_interpreter =
            OptimizeCompactBlockInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
        compact_interpreter.execute2().await?
    };

    if build_res.main_pipeline.is_empty() {
        return Ok(());
    }

    let settings = ctx.get_settings();
    build_res.set_max_threads(settings.get_max_threads()? as usize);
    let settings = ExecutorSettings::try_create(ctx.clone())?;

    if build_res.main_pipeline.is_complete_pipeline()? {
        let mut pipelines = build_res.sources_pipelines;
        pipelines.push(build_res.main_pipeline);
        let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;

        // Keep the original write progress, and clear previously generated
        // segment locations before running the optimization pipeline.
        let progress_value = ctx.get_write_progress_value();
        ctx.clear_segment_locations()?;
        ctx.set_executor(complete_executor.get_inner())?;
        complete_executor.execute()?;
        drop(complete_executor);

        // Restore the write progress recorded before compaction.
        ctx.get_write_progress().set(&progress_value);
    }
    Ok(())
}
```

This code sets up a hook to perform compaction and re-clustering after the insertion pipeline finishes: the `on_finished` callback derives the compaction limits, then `compact_table` re-clusters the table if it has cluster keys and compacts its blocks otherwise.
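For the multiple-table insertion case, the same hook can be attached once per target table. The following is a minimal sketch, not existing Databend code: it assumes the insert plan can enumerate a `CompactTargetTableDescription` per written table and that multiple `set_on_finished` callbacks on the pipeline compose rather than replace one another.

```rust
// Hypothetical helper, written against the hook_compact signature above; the
// `targets` parameter and the operation name are illustrative assumptions.
async fn hook_compact_all_targets(
    ctx: Arc<QueryContext>,
    pipeline: &mut Pipeline,
    targets: Vec<CompactTargetTableDescription>,
    lock_opt: LockTableOption,
) {
    for compact_target in targets {
        let trace_ctx = CompactHookTraceCtx {
            start: Instant::now(),
            operation_name: "insert_multi_table".to_owned(),
        };
        // hook_compact logs and ignores errors, so a failure while setting up
        // compaction for one table does not block the remaining tables.
        hook_compact(
            ctx.clone(),
            pipeline,
            compact_target,
            trace_ctx,
            lock_opt.clone(),
        )
        .await;
    }
}
```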
Additionally, you can refer to the following test script:

```
statement ok
drop table if exists t_compact_0

statement ok
set enable_distributed_compact = 1

statement ok
create table t_compact_0 (a int not null) row_per_block=5 block_per_segment=5

statement ok
insert into t_compact_0 select 50 - number from numbers(100)

statement ok
insert into t_compact_0 select 50 - number from numbers(100)

statement ok
insert into t_compact_0 select 50 - number from numbers(100)

query II
select count(),sum(a) from t_compact_0
----
300 150

statement ok
alter table t_compact_0 set options(row_per_block=10,block_per_segment=10)

statement ok
optimize table t_compact_0 compact

query I
select count() from fuse_snapshot('default', 't_compact_0')
----
4

query II
select count(),sum(a) from t_compact_0
----
300 150

statement ok
alter table t_compact_0 cluster by(abs(a))

statement ok
optimize table t_compact_0 compact

query I
select count() from fuse_snapshot('default', 't_compact_0')
----
6

query II
select count(),sum(a) from t_compact_0
----
300 150
```

This test script demonstrates creating a table, inserting data, altering table options, and performing compaction and re-clustering operations [2].
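A multiple-table insertion variant of this test could exercise the same hook for every target table. The following sketch is hypothetical: it assumes Databend's `INSERT ALL INTO ... INTO ...` multi-table insert syntax, and the table names are illustrative.

```
statement ok
create table t_src (a int not null)

statement ok
create table t_dst_1 (a int not null)

statement ok
create table t_dst_2 (a int not null)

statement ok
insert into t_src select 50 - number from numbers(100)

# After this statement both target tables should be candidates for the
# auto-compact hook.
statement ok
insert all into t_dst_1 into t_dst_2 select * from t_src
```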
Furthermore, the `build_commit_sink` method of `PipelineBuilder` shows how the commit stage of these mutations is assembled:

```rust
impl PipelineBuilder {
    pub(crate) fn build_commit_sink(&mut self, plan: &PhysicalCommitSink) -> Result<()> {
        self.build_pipeline(&plan.input)?;
        let table = self.ctx.build_table_by_table_info(&plan.table_info, None)?;
        let table = FuseTable::try_from_table(table.as_ref())?;
        let cluster_key_id = table.cluster_key_id();
        self.main_pipeline.try_resize(1)?;

        if plan.merge_meta {
            self.main_pipeline
                .add_accumulating_transformer(|| TransformMergeCommitMeta::create(cluster_key_id));
        } else {
            self.main_pipeline.add_async_accumulating_transformer(|| {
                let base_segments = if matches!(
                    plan.mutation_kind,
                    MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster
                ) {
                    vec![]
                } else {
                    plan.snapshot.segments().to_vec()
                };

                // extract re-cluster related mutations from physical plan
                let recluster_info = plan.recluster_info.clone().unwrap_or_default();

                TableMutationAggregator::create(
                    table,
                    self.ctx.clone(),
                    base_segments,
                    recluster_info.merged_blocks,
                    recluster_info.removed_segment_indexes,
                    recluster_info.removed_statistics,
                    plan.mutation_kind,
                )
            });
        }

        let snapshot_gen = MutationGenerator::new(plan.snapshot.clone(), plan.mutation_kind);
        self.main_pipeline.add_sink(|input| {
            CommitSink::try_create(
                table,
                self.ctx.clone(),
                None,
                plan.update_stream_meta.clone(),
                snapshot_gen.clone(),
                input,
                None,
                None,
                plan.deduplicated_label.clone(),
            )
        })
    }
}
```

This implementation ensures that the pipeline is properly set up to handle the commit operation, including compaction and re-clustering, by adding the necessary transformers and sinks to the pipeline [3].
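For auto compaction, the important detail above is the choice of `base_segments`: insert, compact, and recluster commits start from an empty base segment list and report the segments they produced, while other mutation kinds start from the segments of the current snapshot. Below is a self-contained illustration of that branch with simplified stand-in types (not the actual Databend definitions):

```rust
// Simplified stand-in for Databend's MutationKind, for illustration only.
#[derive(Clone, Copy, Debug, PartialEq)]
enum MutationKind {
    Insert,
    Compact,
    Recluster,
    Delete,
}

// Mirrors the matches! branch in build_commit_sink.
fn base_segments(kind: MutationKind, snapshot_segments: &[String]) -> Vec<String> {
    if matches!(
        kind,
        MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster
    ) {
        // These commits contribute a fresh segment list of their own.
        vec![]
    } else {
        // Other mutations are applied against the snapshot's existing segments.
        snapshot_segments.to_vec()
    }
}

fn main() {
    let snapshot = vec!["seg-0".to_string(), "seg-1".to_string()];
    assert!(base_segments(MutationKind::Insert, &snapshot).is_empty());
    assert_eq!(base_segments(MutationKind::Delete, &snapshot), snapshot);
}
```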
Summary
Auto compact (re-cluster) for multiple-table insertion.