Commit

refactor copy into
SkyFan2002 committed Jul 17, 2023
1 parent 60a4d3c commit b564f78
Showing 13 changed files with 202 additions and 552 deletions.
348 changes: 61 additions & 287 deletions src/query/service/src/interpreters/interpreter_copy.rs

Large diffs are not rendered by default.
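The interpreter diff is not rendered here, so the shape of the consolidated plan has to be inferred from the call sites below. A minimal sketch of what the refactor appears to converge on — field names taken only from the accesses visible in this commit, not from the actual `common_sql::executor` definitions, which may differ — is:

```rust
// Sketch only: reconstructed from the field accesses in this commit
// (copy.catalog_name, copy.table_info, copy.stage_table_info, copy.files,
// plan.required_source_schema, plan.required_values_schema, plan.values_consts,
// plan.write_mode, copy.source). The real types live in common_sql::executor.
pub enum CopyIntoTableSource {
    // Data comes from a query plan, e.g. COPY INTO t FROM (SELECT ...).
    Query(Box<PhysicalPlan>),
    // Data is read directly from stage files.
    Stage(DataSourcePlan),
}

pub struct CopyIntoTable {
    pub catalog_name: String,
    pub table_info: TableInfo,
    pub stage_table_info: StageTableInfo,
    pub files: Vec<StageFileInfo>,
    pub required_source_schema: DataSchemaRef,
    pub required_values_schema: DataSchemaRef,
    pub values_consts: Vec<Scalar>,
    pub write_mode: CopyIntoTableMode,
    pub source: CopyIntoTableSource,
}
```

Folding `DistributedCopyIntoTableFromStage` and `CopyIntoTableFromQuery` into this single node is what lets the builders below accept one `&CopyIntoTable` instead of matching on the old `CopyPlanType` wrapper.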

96 changes: 34 additions & 62 deletions src/query/service/src/pipelines/builders/copy.rs
@@ -29,8 +29,7 @@ use common_meta_app::principal::StageInfo;
use common_meta_app::schema::TableCopiedFileInfo;
use common_meta_app::schema::UpsertTableCopiedFileReq;
use common_pipeline_core::Pipeline;
use common_sql::executor::CopyIntoTableFromQuery;
use common_sql::executor::DistributedCopyIntoTableFromStage;
use common_sql::executor::CopyIntoTable;
use common_sql::plans::CopyIntoTableMode;
use common_sql::plans::CopyIntoTablePlan;
use common_storage::common_metrics::copy::metrics_inc_copy_purge_files_cost_milliseconds;
@@ -47,48 +46,18 @@ use crate::pipelines::processors::transforms::TransformAddConstColumns;
use crate::pipelines::processors::TransformCastSchema;
use crate::sessions::QueryContext;

pub enum CopyPlanType {
CopyIntoTablePlanOption(CopyIntoTablePlan),
DistributedCopyIntoTableFromStage(DistributedCopyIntoTableFromStage),
// also distributed plan, but we think the real distributed part is the query
// so no "distributed" prefix here.
CopyIntoTableFromQuery(CopyIntoTableFromQuery),
}

pub fn build_append_data_pipeline(
ctx: Arc<QueryContext>,
main_pipeline: &mut Pipeline,
plan: CopyPlanType,
plan: &CopyIntoTable,
source_schema: Arc<DataSchema>,
to_table: Arc<dyn Table>,
) -> Result<()> {
let plan_required_source_schema: DataSchemaRef;
let plan_required_values_schema: DataSchemaRef;
let plan_values_consts: Vec<Scalar>;
let plan_write_mode: CopyIntoTableMode;

match plan {
CopyPlanType::CopyIntoTablePlanOption(plan) => {
plan_required_source_schema = plan.required_source_schema;
plan_required_values_schema = plan.required_values_schema;
plan_values_consts = plan.values_consts;
plan_write_mode = plan.write_mode;
}
CopyPlanType::DistributedCopyIntoTableFromStage(plan) => {
plan_required_source_schema = plan.required_source_schema;
plan_required_values_schema = plan.required_values_schema;
plan_values_consts = plan.values_consts;
plan_write_mode = plan.write_mode;
}
CopyPlanType::CopyIntoTableFromQuery(plan) => {
plan_required_source_schema = plan.required_source_schema;
plan_required_values_schema = plan.required_values_schema;
plan_values_consts = plan.values_consts;
plan_write_mode = plan.write_mode;
}
}

if source_schema != plan_required_source_schema {
let plan_required_source_schema = &plan.required_source_schema;
let plan_values_consts = &plan.values_consts;
let plan_required_values_schema = &plan.required_values_schema;
let plan_write_mode = &plan.write_mode;
if &source_schema != plan_required_source_schema {
// only parquet need cast
let func_ctx = ctx.get_function_context()?;
main_pipeline.add_transform(|transform_input_port, transform_output_port| {
@@ -118,56 +87,59 @@ pub fn build_append_data_pipeline(
ctx,
main_pipeline,
to_table.clone(),
plan_required_values_schema,
plan_required_values_schema.clone(),
AppendMode::Copy,
)?,
CopyIntoTableMode::Replace => {}
CopyIntoTableMode::Copy => build_append2table_without_commit_pipeline(
ctx,
main_pipeline,
to_table.clone(),
plan_required_values_schema,
plan_required_values_schema.clone(),
AppendMode::Copy,
)?,
}
Ok(())
}

#[allow(clippy::too_many_arguments)]
pub fn build_commit_data_pipeline(
ctx: Arc<QueryContext>,
pub async fn build_commit_data_pipeline(
ctx: &Arc<QueryContext>,
main_pipeline: &mut Pipeline,
stage_info: StageInfo,
to_table: Arc<dyn Table>,
files: Vec<StageFileInfo>,
copy_force_option: bool,
copy_purge_option: bool,
insert_overwrite_option: bool,
plan: &CopyIntoTablePlan,
files: &[StageFileInfo],
) -> Result<()> {
let to_table = ctx
.get_table(&plan.catalog_name, &plan.database_name, &plan.table_name)
.await?;
// Source node will do:
// 1. commit
// 2. purge
// commit
let copied_files_meta_req = build_upsert_copied_files_to_meta_req(
ctx.clone(),
to_table.clone(),
stage_info.clone(),
files.clone(),
copy_force_option,
to_table.as_ref(),
&plan.stage_table_info.stage_info,
files,
plan.force,
)?;

to_table.commit_insertion(
ctx.clone(),
main_pipeline,
copied_files_meta_req,
insert_overwrite_option,
plan.write_mode.is_overwrite(),
)?;

// set on_finished callback.
set_copy_on_finished(ctx, files, copy_purge_option, stage_info, main_pipeline)?;
set_copy_on_finished(
ctx.clone(),
files.to_vec(),
plan.stage_table_info.stage_info.copy_options.purge,
plan.stage_table_info.stage_info.clone(),
main_pipeline,
)?;
Ok(())
}
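Since `build_commit_data_pipeline` now resolves the target table itself (`ctx.get_table(...).await`) and reads force, purge, and overwrite straight from the plan, it becomes `async` and its call site shrinks. A hypothetical call from the copy interpreter (variable names illustrative, not the exact interpreter code) would look like:

```rust
// Hypothetical call site sketch: the commit step is driven entirely by the plan,
// so the caller passes only the plan and the copied files, and must await it.
build_commit_data_pipeline(&ctx, &mut main_pipeline, &copy_plan, &files).await?;
```

The previously required arguments — `stage_info`, `to_table`, `copy_force_option`, `copy_purge_option`, `insert_overwrite_option` — are now derived inside the function from `plan.force`, `plan.stage_table_info.stage_info.copy_options.purge`, and `plan.write_mode.is_overwrite()`.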

pub fn set_copy_on_finished(
ctx: Arc<QueryContext>,
files: Vec<StageFileInfo>,
@@ -222,13 +194,13 @@ pub fn set_copy_on_finished(

pub fn build_upsert_copied_files_to_meta_req(
ctx: Arc<QueryContext>,
to_table: Arc<dyn Table>,
stage_info: StageInfo,
copied_files: Vec<StageFileInfo>,
to_table: &dyn Table,
stage_info: &StageInfo,
copied_files: &[StageFileInfo],
force: bool,
) -> Result<Option<UpsertTableCopiedFileReq>> {
let mut copied_file_tree = BTreeMap::new();
for file in &copied_files {
for file in copied_files {
// Short the etag to 7 bytes for less space in metasrv.
let short_etag = file.etag.clone().map(|mut v| {
v.truncate(7);
Expand Down Expand Up @@ -274,7 +246,7 @@ fn fill_const_columns(
pipeline: &mut Pipeline,
input_schema: DataSchemaRef,
output_schema: DataSchemaRef,
const_values: Vec<Scalar>,
const_values: &[Scalar],
) -> Result<()> {
pipeline.add_transform(|transform_input_port, transform_output_port| {
TransformAddConstColumns::try_create(
Expand All @@ -283,7 +255,7 @@ fn fill_const_columns(
transform_output_port,
input_schema.clone(),
output_schema.clone(),
const_values.clone(),
const_values.to_vec(),
)
})?;
Ok(())
1 change: 0 additions & 1 deletion src/query/service/src/pipelines/builders/mod.rs
@@ -19,7 +19,6 @@ pub use copy::build_append_data_pipeline;
pub use copy::build_commit_data_pipeline;
pub use copy::build_upsert_copied_files_to_meta_req;
pub use copy::set_copy_on_finished;
pub use copy::CopyPlanType;
pub use table::build_append2table_with_commit_pipeline;
pub use table::build_append2table_without_commit_pipeline;
pub use table::build_fill_missing_columns_pipeline;
80 changes: 36 additions & 44 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -14,6 +14,7 @@

use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Instant;

use async_channel::Receiver;
use common_catalog::table::AppendMode;
@@ -54,10 +55,10 @@ use common_sql::executor::AggregateExpand;
use common_sql::executor::AggregateFinal;
use common_sql::executor::AggregateFunctionDesc;
use common_sql::executor::AggregatePartial;
use common_sql::executor::CopyIntoTableFromQuery;
use common_sql::executor::CopyIntoTable;
use common_sql::executor::CopyIntoTableSource;
use common_sql::executor::DeleteFinal;
use common_sql::executor::DeletePartial;
use common_sql::executor::DistributedCopyIntoTableFromStage;
use common_sql::executor::DistributedInsertSelect;
use common_sql::executor::EvalScalar;
use common_sql::executor::ExchangeSink;
@@ -86,6 +87,7 @@ use common_storages_fuse::operations::TransformSerializeBlock;
use common_storages_fuse::FuseTable;
use common_storages_stage::StageTable;
use petgraph::matrix_graph::Zero;
use tracing::info;

use super::processors::transforms::FrameBound;
use super::processors::transforms::WindowFunctionInfo;
@@ -94,7 +96,6 @@ use crate::api::DefaultExchangeInjector;
use crate::api::ExchangeInjector;
use crate::pipelines::builders::build_append_data_pipeline;
use crate::pipelines::builders::build_fill_missing_columns_pipeline;
use crate::pipelines::builders::CopyPlanType;
use crate::pipelines::processors::transforms::build_partition_bucket;
use crate::pipelines::processors::transforms::AggregateInjector;
use crate::pipelines::processors::transforms::FinalSingleStateAggregator;
@@ -176,6 +177,11 @@ impl PipelineBuilder {
})
}

fn set_status(&self, status: &str) {
self.ctx.set_status_info(status);
info!(status);
}

fn build_pipeline(&mut self, plan: &PhysicalPlan) -> Result<()> {
match plan {
PhysicalPlan::TableScan(scan) => self.build_table_scan(scan),
@@ -206,55 +212,41 @@ impl PipelineBuilder {
PhysicalPlan::DeletePartial(delete) => self.build_delete_partial(delete),
PhysicalPlan::DeleteFinal(delete) => self.build_delete_final(delete),
PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join),
PhysicalPlan::DistributedCopyIntoTableFromStage(distributed_plan) => {
self.build_distributed_copy_into_table_from_stage(distributed_plan)
}
PhysicalPlan::CopyIntoTableFromQuery(copy_plan) => {
self.build_copy_into_table_from_query(copy_plan)
}
PhysicalPlan::CopyIntoTable(copy) => self.build_copy_into_table(copy),
}
}

fn build_copy_into_table_from_query(
&mut self,
copy_plan: &CopyIntoTableFromQuery,
) -> Result<()> {
self.build_pipeline(&copy_plan.input)?;
let catalog = self.ctx.get_catalog(&copy_plan.catalog_name)?;
let to_table = catalog.get_table_by_info(&copy_plan.table_info)?;
fn build_copy_into_table(&mut self, copy: &CopyIntoTable) -> Result<()> {
let catalog = self.ctx.get_catalog(&copy.catalog_name)?;
let to_table = catalog.get_table_by_info(&copy.table_info)?;
let file_sizes: u64 = copy.files.iter().map(|f| f.size).sum();
match &copy.source {
CopyIntoTableSource::Query(input) => {
self.build_pipeline(input)?;
}
CopyIntoTableSource::Stage(source) => {
let stage_table = StageTable::try_create(copy.stage_table_info.clone())?;
stage_table.set_block_thresholds(to_table.get_block_thresholds());
stage_table.read_data(self.ctx.clone(), source, &mut self.main_pipeline)?;
}
}
self.set_status(&format!(
"Copy begin to append data: {} files, size_in_bytes:{} into table",
copy.files.len(),
file_sizes
));
let start = Instant::now();
build_append_data_pipeline(
self.ctx.clone(),
&mut self.main_pipeline,
CopyPlanType::CopyIntoTableFromQuery(copy_plan.clone()),
copy_plan.required_source_schema.clone(),
to_table,
)?;
Ok(())
}

fn build_distributed_copy_into_table_from_stage(
&mut self,
distributed_plan: &DistributedCopyIntoTableFromStage,
) -> Result<()> {
let catalog = self.ctx.get_catalog(&distributed_plan.catalog_name)?;
let to_table = catalog.get_table_by_info(&distributed_plan.table_info)?;
let stage_table_info = distributed_plan.stage_table_info.clone();
let stage_table = StageTable::try_create(stage_table_info)?;
stage_table.set_block_thresholds(distributed_plan.thresholds);
let ctx = self.ctx.clone();
let table_ctx: Arc<dyn TableContext> = ctx.clone();

stage_table.read_data(table_ctx, &distributed_plan.source, &mut self.main_pipeline)?;

// append data
build_append_data_pipeline(
ctx,
&mut self.main_pipeline,
CopyPlanType::DistributedCopyIntoTableFromStage(distributed_plan.clone()),
distributed_plan.required_source_schema.clone(),
copy,
copy.required_source_schema.clone(),
to_table,
)?;

self.set_status(&format!(
"Copy append data finished, cost:{} secs",
start.elapsed().as_secs()
));
Ok(())
}

16 changes: 7 additions & 9 deletions src/query/service/src/schedulers/fragments/fragmenter.rs
@@ -16,7 +16,8 @@ use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_sql::executor::DistributedCopyIntoTableFromStage;
use common_sql::executor::CopyIntoTable;
use common_sql::executor::CopyIntoTableSource;
use common_sql::executor::FragmentKind;

use crate::api::BroadcastExchange;
@@ -139,14 +140,11 @@ impl PhysicalPlanReplacer for Fragmenter {
Ok(PhysicalPlan::TableScan(plan.clone()))
}

fn replace_copy_into_table(
&mut self,
plan: &DistributedCopyIntoTableFromStage,
) -> Result<PhysicalPlan> {
self.state = State::SelectLeaf;
Ok(PhysicalPlan::DistributedCopyIntoTableFromStage(Box::new(
plan.clone(),
)))
fn replace_copy_into_table(&mut self, plan: &CopyIntoTable) -> Result<PhysicalPlan> {
if let CopyIntoTableSource::Stage(_) = plan.source {
self.state = State::SelectLeaf;
}
Ok(PhysicalPlan::CopyIntoTable(Box::new(plan.clone())))
}
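With the two physical nodes merged, the fragmenter now distinguishes the cases by the plan's source rather than by its type: only a stage-backed copy marks the fragment as a source leaf, while a query-backed copy leaves the state untouched. This matches the old comment that for the query form "the real distributed part is the query" — distribution is presumably handled by the query input underneath the copy node rather than by the copy node itself.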

fn replace_delete_partial(
