refactor copy into
SkyFan2002 committed Jul 17, 2023
1 parent 929c6be commit b3a6add
Showing 9 changed files with 103 additions and 309 deletions.
220 changes: 19 additions & 201 deletions src/query/service/src/interpreters/interpreter_copy.rs
@@ -13,11 +13,9 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Instant;

use common_catalog::plan::StageTableInfo;
use common_catalog::table::AppendMode;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::infer_table_schema;
use common_expression::BlockThresholds;
@@ -28,29 +26,22 @@ use common_meta_app::principal::StageInfo;
use common_pipeline_core::Pipeline;
use common_sql::executor::table_read_plan::ToReadDataSourcePlan;
use common_sql::executor::CopyIntoTable;
use common_sql::executor::CopyIntoTableFromQuery;
use common_sql::executor::CopyIntoTableSource;
use common_sql::executor::Exchange;
use common_sql::executor::FragmentKind;
use common_sql::executor::PhysicalPlan;
use common_sql::plans::CopyIntoTableMode;
use common_sql::plans::CopyIntoTablePlan;
use common_storage::StageFileInfo;
use common_storage::StageFilesInfo;
use common_storages_stage::StageTable;
use itertools::Merge;
use tracing::info;

use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreter;
use crate::pipelines::builders::build_append2table_with_commit_pipeline;
use crate::pipelines::builders::build_append_data_pipeline;
use crate::pipelines::builders::build_commit_data_pipeline;
use crate::pipelines::builders::set_copy_on_finished;
use crate::pipelines::builders::CopyPlanType;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_distributed_pipeline;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
@@ -149,18 +140,21 @@ impl CopyInterpreter {
}

#[async_backtrace::framed]
async fn build_physical_plan(&self, plan: &CopyIntoTablePlan) -> Result<PhysicalPlan> {
async fn build_physical_plan(
&self,
plan: &CopyIntoTablePlan,
) -> Result<(PhysicalPlan, Vec<StageFileInfo>)> {
let to_table = self
.ctx
.get_table(&plan.catalog_name, &plan.database_name, &plan.table_name)
.await?;
let files = plan.collect_files(self.ctx.as_ref()).await?;
let source = if let Some(ref query) = plan.query {
let (select_interpreter, _) = self.build_query(plan.query.as_ref().unwrap()).await?;
CopyIntoTableSource::Query(select_interpreter.build_physical_plan().await?)
let (select_interpreter, _) = self.build_query(query).await?;
CopyIntoTableSource::Query(Box::new(select_interpreter.build_physical_plan().await?))
} else {
let stage_table_info = StageTableInfo {
files_to_copy: Some(files),
files_to_copy: Some(files.clone()),
..plan.stage_table_info.clone()
};
let stage_table = StageTable::try_create(stage_table_info)?;
Expand Down Expand Up @@ -189,7 +183,7 @@ impl CopyInterpreter {
write_mode: plan.write_mode,
validation_mode: plan.validation_mode.clone(),

files,
files: files.clone(),
source,
}));
if plan.enable_distributed {
@@ -200,7 +194,7 @@
keys: Vec::new(),
});
}
Ok(root)
Ok((root, files))
}
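
Two details in this function are easy to miss: `build_physical_plan` now returns the collected stage files alongside the plan, so its caller can pass the same list to the commit step without re-collecting them, and the query branch wraps its physical plan in `Box::new(...)`, implying that `CopyIntoTableSource::Query` now holds a `Box<PhysicalPlan>`. A minimal, self-contained sketch of both patterns follows; `FileInfo`, `PlanNode`, and `Source` are hypothetical stand-ins, not databend's types:

```rust
// Hypothetical stand-ins for databend's StageFileInfo, PhysicalPlan and
// CopyIntoTableSource; only the shape of the pattern mirrors the diff.
#[derive(Debug, Clone)]
struct FileInfo {
    path: String,
    size: u64,
}

struct PlanNode {
    // Pretend this is a large plan node: boxing the Query variant keeps
    // the enum small and permits a recursive plan tree.
    payload: [u8; 256],
    child: Option<Box<PlanNode>>,
}

enum Source {
    Stage(Vec<FileInfo>),
    Query(Box<PlanNode>),
}

// Build the plan once and return the collected files alongside it, so the
// caller can thread the same list into the commit step later.
fn build_physical_plan(files: Vec<FileInfo>) -> (Source, Vec<FileInfo>) {
    let root = PlanNode { payload: [0; 256], child: None };
    (Source::Query(Box::new(root)), files)
}

fn main() {
    let files = vec![FileInfo { path: "stage/a.csv".into(), size: 42 }];
    let (source, files) = build_physical_plan(files);
    let total: u64 = files.iter().map(|f| f.size).sum();
    println!("{:?} -> {} bytes total", files, total);
    // The boxed variant stays pointer-sized even though PlanNode is large.
    println!("size_of::<Source>() = {}", std::mem::size_of::<Source>());
    match source {
        Source::Query(node) => {
            println!("query source, payload = {} bytes, child: {}",
                node.payload.len(), node.child.is_some());
        }
        Source::Stage(fs) => println!("stage source, {} files", fs.len()),
    }
}
```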

#[async_backtrace::framed]
Expand Down Expand Up @@ -234,190 +228,6 @@ impl CopyInterpreter {

Ok(())
}

/// Build a COPY pipeline in standalone mode.
#[async_backtrace::framed]
async fn build_local_copy_into_table_pipeline(
&self,
plan: &CopyIntoTablePlan,
) -> Result<PipelineBuildResult> {
let catalog = plan.catalog_name.as_str();
let database = plan.database_name.as_str();
let table = plan.table_name.as_str();

let ctx = self.ctx.clone();
let to_table = ctx.get_table(catalog, database, table).await?;

let mut build_res;
let source_schema;
let files;

match &plan.query {
None => {
files = plan.collect_files(self.ctx.as_ref()).await?;
source_schema = plan.required_source_schema.clone();
build_res = PipelineBuildResult::create();
if files.is_empty() {
return Ok(build_res);
}
self.build_read_stage_table_data_pipeline(
&mut build_res.main_pipeline,
plan,
to_table.get_block_thresholds(),
files.clone(),
)
.await?;
}
Some(query) => {
files = plan
.stage_table_info
.files_to_copy
.clone()
.ok_or(ErrorCode::Internal("files_to_copy should not be None"))?;

let (select_interpreter, query_source_schema) = self.build_query(query).await?;
let plan = select_interpreter.build_physical_plan().await?;
build_res = select_interpreter.build_pipeline(plan).await?;
source_schema = query_source_schema;
}
}

let file_sizes: u64 = files.iter().map(|f| f.size).sum();

// Append data.
{
self.set_status(&format!(
"Copy begin to append data: {} files, size_in_bytes:{} into table",
files.len(),
file_sizes
));

let start = Instant::now();
build_append_data_pipeline(
ctx.clone(),
&mut build_res.main_pipeline,
CopyPlanType::CopyIntoTablePlanOption(plan.clone()),
source_schema,
to_table.clone(),
)?;

// Perf
{
self.set_status(&format!(
"Copy append data finished, cost:{} secs",
start.elapsed().as_secs()
));
}
}

// Commit data.
{
// if it's replace mode, don't commit, because COPY is the source of replace.
match plan.write_mode {
CopyIntoTableMode::Replace => set_copy_on_finished(
ctx,
files,
plan.stage_table_info.stage_info.copy_options.purge,
plan.stage_table_info.stage_info.clone(),
&mut build_res.main_pipeline,
)?,
_ => {
// commit.
build_commit_data_pipeline(
ctx.clone(),
&mut build_res.main_pipeline,
plan.stage_table_info.stage_info.clone(),
to_table,
files,
plan.force,
plan.stage_table_info.stage_info.copy_options.purge,
plan.write_mode.is_overwrite(),
)?
}
}
}

Ok(build_res)
}

/// Build distributed pipeline from source node id.
#[async_backtrace::framed]
async fn build_cluster_copy_into_table_pipeline(
&self,
distributed_plan: &CopyPlanType,
) -> Result<PipelineBuildResult> {
let (
catalog_name,
database_name,
table_name,
stage_info,
files,
force,
purge,
is_overwrite,
);
let mut build_res = match distributed_plan {
CopyPlanType::CopyIntoTableFromStage(plan) => {
catalog_name = plan.catalog_name.clone();
database_name = plan.database_name.clone();
table_name = plan.table_name.clone();
stage_info = plan.stage_table_info.stage_info.clone();
files = plan.files.clone();
force = plan.force;
purge = plan.stage_table_info.stage_info.copy_options.purge;
is_overwrite = plan.write_mode.is_overwrite();
// add exchange plan node to enable distributed
// TODO(leiysky): we reuse the id of exchange here,
// which is not correct. We should generate a new id for insert.
let exchange_plan = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(PhysicalPlan::CopyIntoTable(Box::new(plan.clone()))),
kind: FragmentKind::Merge,
keys: Vec::new(),
});

build_distributed_pipeline(&self.ctx, &exchange_plan, false).await?
}
CopyPlanType::CopyIntoTableFromQuery(plan) => {
catalog_name = plan.catalog_name.clone();
database_name = plan.database_name.clone();
table_name = plan.table_name.clone();
stage_info = plan.stage_table_info.stage_info.clone();
files = plan.files.clone();
force = plan.force;
purge = plan.stage_table_info.stage_info.copy_options.purge;
is_overwrite = plan.write_mode.is_overwrite();
// add exchange plan node to enable distributed
// TODO(leiysky): we reuse the id of exchange here,
// which is not correct. We should generate a new id
let exchange_plan = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(PhysicalPlan::CopyIntoTableFromQuery(Box::new(plan.clone()))),
kind: FragmentKind::Merge,
keys: Vec::new(),
});
build_distributed_pipeline(&self.ctx, &exchange_plan, false).await?
}
_ => unreachable!(),
};
let to_table = self
.ctx
.get_table(&catalog_name, &database_name, &table_name)
.await?;

// commit.
build_commit_data_pipeline(
self.ctx.clone(),
&mut build_res.main_pipeline,
stage_info,
to_table,
files,
force,
purge,
is_overwrite,
)?;
Ok(build_res)
}
}

#[async_trait::async_trait]
@@ -435,8 +245,16 @@ impl Interpreter for CopyInterpreter {

match &self.plan {
CopyPlan::IntoTable(plan) => {
let plan = self.build_physical_plan(plan).await?;
build_query_pipeline_without_render_result_set(&self.ctx, &plan, false).await
let (physical_plan, files) = self.build_physical_plan(plan).await?;
let mut build_res = build_query_pipeline_without_render_result_set(
&self.ctx,
&physical_plan,
false,
)
.await?;
build_commit_data_pipeline(&self.ctx, &mut build_res.main_pipeline, plan, &files)
.await?;
Ok(build_res)
}
CopyPlan::IntoStage {
stage, from, path, ..
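
The final hunk shows the payoff: `execute2` now drives one path for `COPY INTO <table>`. It builds a single physical plan (with the exchange node already added when `enable_distributed` is set), builds the query pipeline from it, then appends the commit stage to that same pipeline using the files returned by `build_physical_plan`. A rough sketch of the control flow with simplified stand-in types; the function names echo the diff, but these are not databend's signatures:

```rust
// Stand-ins for PhysicalPlan, PipelineBuildResult, StageFileInfo and the
// pipeline builders named in the diff; shapes only, not real signatures.
#[derive(Debug, Clone)]
struct FileInfo {
    path: String,
}

struct PhysicalPlan {
    distributed: bool,
}

#[derive(Debug, Default)]
struct Pipeline {
    stages: Vec<String>,
}

fn build_physical_plan(distributed: bool, files: Vec<FileInfo>) -> (PhysicalPlan, Vec<FileInfo>) {
    // The real method also wraps the plan in an Exchange node when
    // plan.enable_distributed is set.
    (PhysicalPlan { distributed }, files)
}

// Stand-in for build_query_pipeline_without_render_result_set.
fn build_query_pipeline(plan: &PhysicalPlan) -> Pipeline {
    let mut p = Pipeline::default();
    let stage = if plan.distributed { "exchange(merge) -> copy-into" } else { "copy-into" };
    p.stages.push(stage.to_string());
    p
}

// Stand-in for build_commit_data_pipeline: the commit (or purge) step is
// appended to the same main pipeline instead of living in a separate path.
fn build_commit_stage(pipeline: &mut Pipeline, files: &[FileInfo]) {
    pipeline.stages.push(format!("commit {} file(s)", files.len()));
}

fn main() {
    let files = vec![FileInfo { path: "stage/a.csv".into() }];
    let (plan, files) = build_physical_plan(true, files);
    let mut build_res = build_query_pipeline(&plan);
    build_commit_stage(&mut build_res, &files);
    println!("{:?}: {:?}", files, build_res.stages);
}
```

Collapsing the standalone and cluster variants into this one flow is what lets the commit delete `build_local_copy_into_table_pipeline` and `build_cluster_copy_into_table_pipeline` wholesale, which accounts for the 19 additions against 201 deletions in this file.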
