
refactor(query): use scan table physical plan for copy into table from stage #15016

Merged · 10 commits · Mar 20, 2024
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

4 changes: 0 additions & 4 deletions src/query/catalog/src/table.rs
@@ -318,10 +318,6 @@ pub trait Table: Sync + Send {
         }
     }

-    fn set_block_thresholds(&self, _thresholds: BlockThresholds) {
-        unimplemented!()
-    }
-
     #[async_backtrace::framed]
     async fn compact_segments(
         &self,
4 changes: 4 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -27,6 +27,7 @@ use databend_common_base::base::ProgressValues;
 use databend_common_base::runtime::profile::Profile;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
+use databend_common_expression::BlockThresholds;
 use databend_common_expression::DataBlock;
 use databend_common_expression::Expr;
 use databend_common_expression::FunctionContext;
@@ -268,4 +269,7 @@ pub trait TableContext: Send + Sync {

     fn has_bloom_runtime_filters(&self, id: usize) -> bool;
     fn txn_mgr(&self) -> TxnManagerRef;
+
+    fn get_read_block_thresholds(&self) -> BlockThresholds;
+    fn set_read_block_thresholds(&self, _thresholds: BlockThresholds);
 }
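
Side note on the new trait methods: they let the pipeline builder hand the target table's block thresholds to the stage scan through the shared context rather than through a `StageTable` handle. Below is a minimal self-contained sketch of that pattern, with a simplified stand-in for `BlockThresholds` and std's `RwLock` (databend's `QueryContext` uses parking_lot's lock, as the query_ctx.rs hunk further down shows); it is illustrative, not databend code.

```rust
use std::sync::{Arc, RwLock};

// Illustrative stand-in for databend's BlockThresholds (fields are assumptions).
#[derive(Clone, Copy, Debug, Default)]
pub struct BlockThresholds {
    pub max_rows_per_block: usize,
    pub max_bytes_per_block: usize,
}

// Minimal context that holds thresholds behind a shared lock, mirroring the
// get/set pair added to TableContext.
pub struct Ctx {
    block_thresholds: Arc<RwLock<BlockThresholds>>,
}

impl Ctx {
    pub fn get_read_block_thresholds(&self) -> BlockThresholds {
        *self.block_thresholds.read().unwrap()
    }

    pub fn set_read_block_thresholds(&self, thresholds: BlockThresholds) {
        *self.block_thresholds.write().unwrap() = thresholds;
    }
}

fn main() {
    let ctx = Ctx {
        block_thresholds: Arc::new(RwLock::new(BlockThresholds::default())),
    };
    // Builder side: record the target table's thresholds in the context ...
    ctx.set_read_block_thresholds(BlockThresholds {
        max_rows_per_block: 1_000_000,
        max_bytes_per_block: 100 * 1024 * 1024,
    });
    // ... scan side: read them back when producing blocks.
    println!("{:?}", ctx.get_read_block_thresholds());
}
```
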
55 changes: 35 additions & 20 deletions src/query/service/src/interpreters/interpreter_copy_into_table.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::collections::BTreeMap;
 use std::sync::Arc;

 use databend_common_exception::Result;
@@ -27,6 +28,7 @@ use databend_common_sql::executor::physical_plans::CopyIntoTableSource;
 use databend_common_sql::executor::physical_plans::Exchange;
 use databend_common_sql::executor::physical_plans::FragmentKind;
 use databend_common_sql::executor::physical_plans::Project;
+use databend_common_sql::executor::physical_plans::TableScan;
 use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
 use databend_common_sql::executor::PhysicalPlan;
 use databend_common_storage::StageFileInfo;
@@ -94,7 +96,6 @@
         &self,
         plan: &CopyIntoTablePlan,
     ) -> Result<(PhysicalPlan, Vec<UpdateStreamMetaReq>)> {
-        let mut next_plan_id = 0;
         let to_table = self
             .ctx
             .get_table(
@@ -109,36 +110,46 @@
             update_stream_meta_reqs = update_stream_meta;
             let query_physical_plan = Box::new(query_interpreter.build_physical_plan().await?);

-            let current_plan_id = query_physical_plan.get_id();
-            next_plan_id = current_plan_id + 2;
             let result_columns = query_interpreter.get_result_columns();
             CopyIntoTableSource::Query(Box::new(PhysicalPlan::Project(
                 Project::from_columns_binding(
-                    current_plan_id + 1,
+                    0,
                     query_physical_plan,
                     result_columns,
                     query_interpreter.get_ignore_result(),
                 )?,
             )))
         } else {
             let stage_table = StageTable::try_create(plan.stage_table_info.clone())?;
-            let read_source_plan = Box::new(
-                stage_table
-                    .read_plan_with_catalog(
-                        self.ctx.clone(),
-                        plan.catalog_info.catalog_name().to_string(),
-                        None,
-                        None,
-                        false,
-                        false,
-                    )
-                    .await?,
-            );
-            CopyIntoTableSource::Stage(read_source_plan)
+
+            let data_source_plan = stage_table
+                .read_plan_with_catalog(
+                    self.ctx.clone(),
+                    plan.catalog_info.catalog_name().to_string(),
+                    None,
+                    None,
+                    false,
+                    false,
+                )
+                .await?;
+
+            let mut name_mapping = BTreeMap::new();
+            for (idx, field) in data_source_plan.schema().fields.iter().enumerate() {
+                name_mapping.insert(field.name.clone(), idx);
+            }
+
+            CopyIntoTableSource::Stage(Box::new(PhysicalPlan::TableScan(TableScan {
+                plan_id: 0,
+                name_mapping,
+                stat_info: None,
+                table_index: None,
+                internal_column: None,
+                source: Box::new(data_source_plan),
+            })))
         };

         let mut root = PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable {
-            plan_id: next_plan_id,
+            plan_id: 0,
             catalog_info: plan.catalog_info.clone(),
             required_values_schema: plan.required_values_schema.clone(),
             values_consts: plan.values_consts.clone(),
@@ -151,17 +162,21 @@

             source,
         }));
-        next_plan_id += 1;

         if plan.enable_distributed {
             root = PhysicalPlan::Exchange(Exchange {
-                plan_id: next_plan_id,
+                plan_id: 0,
                 input: Box::new(root),
                 kind: FragmentKind::Merge,
                 keys: Vec::new(),
                 allow_adjust_parallelism: true,
                 ignore_exchange: false,
             });
         }

+        let mut next_plan_id = 0;
+        root.adjust_plan_id(&mut next_plan_id);
+
         Ok((root, update_stream_meta_reqs))
     }
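
Two details in this hunk are worth calling out. First, the stage scan's `name_mapping` is simply each schema field's name mapped to its ordinal position. A self-contained sketch of that construction (the `Field` type here is a simplified assumption, not databend's actual field type):

```rust
use std::collections::BTreeMap;

struct Field {
    name: String,
}

// Map each column name to its position in the scanned schema, as the
// interpreter now does when wrapping the stage source in a TableScan.
fn build_name_mapping(fields: &[Field]) -> BTreeMap<String, usize> {
    fields
        .iter()
        .enumerate()
        .map(|(idx, field)| (field.name.clone(), idx))
        .collect()
}
```

Second, every node is now created with `plan_id: 0` and a single `adjust_plan_id` pass renumbers the finished tree, replacing the fragile `next_plan_id` arithmetic. A minimal sketch of that pattern over a hypothetical three-variant plan (the real traversal order is whatever `PhysicalPlan::adjust_plan_id` defines; this sketch numbers parents before children):

```rust
enum Plan {
    TableScan { plan_id: u32 },
    CopyIntoTable { plan_id: u32, input: Box<Plan> },
    Exchange { plan_id: u32, input: Box<Plan> },
}

impl Plan {
    // Hand out sequential IDs in one walk over the completed tree.
    fn adjust_plan_id(&mut self, next_id: &mut u32) {
        match self {
            Plan::TableScan { plan_id } => {
                *plan_id = *next_id;
                *next_id += 1;
            }
            Plan::CopyIntoTable { plan_id, input } | Plan::Exchange { plan_id, input } => {
                *plan_id = *next_id;
                *next_id += 1;
                input.adjust_plan_id(next_id);
            }
        }
    }
}

fn main() {
    let mut root = Plan::Exchange {
        plan_id: 0,
        input: Box::new(Plan::CopyIntoTable {
            plan_id: 0,
            input: Box::new(Plan::TableScan { plan_id: 0 }),
        }),
    };
    // Same call shape as the diff: build first, then renumber once.
    let mut next_plan_id = 0;
    root.adjust_plan_id(&mut next_plan_id);
    assert_eq!(next_plan_id, 3); // nodes numbered 0, 1, 2 from the root down
}
```
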

@@ -31,7 +31,6 @@ use databend_common_sql::executor::physical_plans::CopyIntoTable;
 use databend_common_sql::executor::physical_plans::CopyIntoTableSource;
 use databend_common_sql::plans::CopyIntoTableMode;
 use databend_common_storage::StageFileInfo;
-use databend_common_storages_stage::StageTable;
 use log::debug;
 use log::info;
@@ -51,10 +50,11 @@ impl PipelineBuilder {
                 self.build_pipeline(input)?;
                 input.output_schema()?
             }
-            CopyIntoTableSource::Stage(source) => {
-                let stage_table = StageTable::try_create(copy.stage_table_info.clone())?;
-                stage_table.set_block_thresholds(to_table.get_block_thresholds());
-                stage_table.read_data(self.ctx.clone(), source, &mut self.main_pipeline, false)?;
+            CopyIntoTableSource::Stage(input) => {
+                self.ctx
+                    .set_read_block_thresholds(to_table.get_block_thresholds());
+
+                self.build_pipeline(input)?;
                 copy.required_source_schema.clone()
             }
         };
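
With the stage source expressed as a physical plan, both arms of this match reduce to the same recursive `build_pipeline` call; only the threshold hand-off differs. An illustrative reduction with hypothetical simplified types (not databend's actual enums):

```rust
enum Plan {
    TableScan,
    Project(Box<Plan>),
}

enum CopyIntoTableSource {
    Query(Box<Plan>),
    Stage(Box<Plan>),
}

fn build_pipeline(plan: &Plan) {
    match plan {
        Plan::TableScan => println!("add source pipe"),
        Plan::Project(input) => {
            build_pipeline(input);
            println!("add projection transform");
        }
    }
}

// Either variant now just hands its child plan to the generic builder.
fn build_copy_source(source: &CopyIntoTableSource) {
    match source {
        CopyIntoTableSource::Query(input) | CopyIntoTableSource::Stage(input) => {
            build_pipeline(input)
        }
    }
}

fn main() {
    build_copy_source(&CopyIntoTableSource::Stage(Box::new(Plan::TableScan)));
}
```
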
8 changes: 4 additions & 4 deletions src/query/service/src/pipelines/builders/builder_join.rs
@@ -96,13 +96,13 @@ impl PipelineBuilder {

     pub(crate) fn build_join(&mut self, join: &HashJoin) -> Result<()> {
         // for merge into target table as build side.
-        let (merge_into_build_table_index, merge_into_is_distributed) =
+        let (enable_merge_into_optimization, merge_into_is_distributed) =
             self.merge_into_get_optimization_flag(join);

         let state = self.build_join_state(
             join,
-            merge_into_build_table_index,
             merge_into_is_distributed,
+            enable_merge_into_optimization,
         )?;
         self.expand_build_side_pipeline(&join.build, join, state.clone())?;
         self.build_join_probe(join, state)
@@ -111,17 +111,17 @@ impl PipelineBuilder {
     fn build_join_state(
         &mut self,
         join: &HashJoin,
-        merge_into_target_table_index: IndexType,
         merge_into_is_distributed: bool,
+        enable_merge_into_optimization: bool,
     ) -> Result<Arc<HashJoinState>> {
         HashJoinState::try_create(
             self.ctx.clone(),
             join.build.output_schema()?,
             &join.build_projections,
             HashJoinDesc::create(join)?,
             &join.probe_to_build,
-            merge_into_target_table_index,
             merge_into_is_distributed,
+            enable_merge_into_optimization,
         )
     }

@@ -14,27 +14,22 @@

 use databend_common_sql::executor::physical_plans::HashJoin;
 use databend_common_sql::executor::PhysicalPlan;
-use databend_common_sql::IndexType;
-use databend_common_sql::DUMMY_TABLE_INDEX;
 use databend_common_storages_fuse::operations::need_reserve_block_info;

 use crate::pipelines::PipelineBuilder;

 impl PipelineBuilder {
-    pub(crate) fn merge_into_get_optimization_flag(&self, join: &HashJoin) -> (IndexType, bool) {
+    pub(crate) fn merge_into_get_optimization_flag(&self, join: &HashJoin) -> (bool, bool) {
         // for merge into target table as build side.
-        let (merge_into_build_table_index, merge_into_is_distributed) =
-            if let PhysicalPlan::TableScan(scan) = &*join.build {
-                let (need_block_info, is_distributed) =
-                    need_reserve_block_info(self.ctx.clone(), scan.table_index);
-                if need_block_info {
-                    (scan.table_index, is_distributed)
-                } else {
-                    (DUMMY_TABLE_INDEX, false)
-                }
-            } else {
-                (DUMMY_TABLE_INDEX, false)
-            };
-        (merge_into_build_table_index, merge_into_is_distributed)
+        match &*join.build {
+            PhysicalPlan::TableScan(scan) => match scan.table_index {
+                None | Some(databend_common_sql::DUMMY_TABLE_INDEX) => (false, false),
+                Some(table_index) => match need_reserve_block_info(self.ctx.clone(), table_index) {
+                    (true, is_distributed) => (true, is_distributed),
+                    _ => (false, false),
+                },
+            },
+            _ => (false, false),
+        }
     }
 }
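
The signature change is the point here: the old code smuggled the on/off decision through a sentinel table index (`DUMMY_TABLE_INDEX`), which every downstream consumer had to remember to compare against. Returning an explicit `bool` makes the disabled state impossible to mistake for a real table. A before/after sketch with hypothetical simplified types (the sentinel value is an assumption for illustration):

```rust
const DUMMY_TABLE_INDEX: usize = usize::MAX;

// Before: a sentinel index doubles as the enable flag.
fn flag_old(scan_table_index: usize, is_distributed: bool, need_block_info: bool) -> (usize, bool) {
    if need_block_info {
        (scan_table_index, is_distributed)
    } else {
        (DUMMY_TABLE_INDEX, false)
    }
}

// After: the flag is an explicit bool and the index is an Option, so
// "optimization disabled" cannot be confused with a usable table index.
fn flag_new(scan_table_index: Option<usize>, is_distributed: bool, need_block_info: bool) -> (bool, bool) {
    match scan_table_index {
        None | Some(DUMMY_TABLE_INDEX) => (false, false),
        Some(_) if need_block_info => (true, is_distributed),
        Some(_) => (false, false),
    }
}

fn main() {
    assert_eq!(flag_new(Some(3), true, true), (true, true));
    assert_eq!(flag_new(Some(DUMMY_TABLE_INDEX), true, true), (false, false));
    assert_eq!(flag_new(None, true, true), (false, false));
}
```
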
@@ -35,7 +35,6 @@ use databend_common_hashtable::HashJoinHashMap;
 use databend_common_hashtable::HashtableKeyable;
 use databend_common_sql::plans::JoinType;
 use databend_common_sql::ColumnSet;
-use databend_common_sql::IndexType;
 use ethnum::U256;
 use parking_lot::RwLock;
@@ -130,8 +129,8 @@ impl HashJoinState {
         build_projections: &ColumnSet,
         hash_join_desc: HashJoinDesc,
         probe_to_build: &[(usize, (bool, bool))],
-        merge_into_target_table_index: IndexType,
         merge_into_is_distributed: bool,
+        enable_merge_into_optimization: bool,
     ) -> Result<Arc<HashJoinState>> {
         if matches!(
             hash_join_desc.join_type,
@@ -161,10 +160,12 @@
             _continue_build_dummy_receiver,
             partition_id: AtomicI8::new(-2),
             enable_spill,
-            merge_into_state: MergeIntoState::try_create_merge_into_state(
-                merge_into_target_table_index,
-                merge_into_is_distributed,
-            ),
+            merge_into_state: match enable_merge_into_optimization {
+                false => None,
+                true => Some(MergeIntoState::create_merge_into_state(
+                    merge_into_is_distributed,
+                )),
+            },
         }))
     }

@@ -25,8 +25,6 @@ use databend_common_expression::DataBlock;
 use databend_common_hashtable::MergeIntoBlockInfoIndex;
 use databend_common_hashtable::RowPtr;
 use databend_common_sql::plans::JoinType;
-use databend_common_sql::IndexType;
-use databend_common_sql::DUMMY_TABLE_INDEX;
 use databend_common_storages_fuse::operations::BlockMetaIndex;
 use log::info;
@@ -59,21 +57,14 @@ pub struct MergeIntoState {
 }

 impl MergeIntoState {
-    pub(crate) fn try_create_merge_into_state(
-        merge_into_target_table_index: IndexType,
-        merge_into_is_distributed: bool,
-    ) -> Option<SyncUnsafeCell<Self>> {
-        if merge_into_target_table_index != DUMMY_TABLE_INDEX {
-            Some(SyncUnsafeCell::new(MergeIntoState {
-                merge_into_is_distributed,
-                block_info_index: Default::default(),
-                matched: Vec::new(),
-                atomic_pointer: MatchedPtr(std::ptr::null_mut()),
-                chunk_offsets: Vec::with_capacity(100),
-            }))
-        } else {
-            None
-        }
+    pub(crate) fn create_merge_into_state(is_distributed: bool) -> SyncUnsafeCell<Self> {
+        SyncUnsafeCell::new(MergeIntoState {
+            merge_into_is_distributed: is_distributed,
+            block_info_index: Default::default(),
+            matched: Vec::new(),
+            atomic_pointer: MatchedPtr(std::ptr::null_mut()),
+            chunk_offsets: Vec::with_capacity(100),
+        })
     }
 }

18 changes: 5 additions & 13 deletions src/query/service/src/schedulers/fragments/plan_fragment.rs
@@ -426,16 +426,10 @@

         let mut data_sources = HashMap::new();

-        let mut collect_data_source = |plan: &PhysicalPlan| match plan {
-            PhysicalPlan::TableScan(scan) => {
+        let mut collect_data_source = |plan: &PhysicalPlan| {
+            if let PhysicalPlan::TableScan(scan) = plan {
                 data_sources.insert(scan.plan_id, *scan.source.clone());
             }
-            PhysicalPlan::CopyIntoTable(copy) => {
-                if let Some(stage) = copy.source.as_stage().cloned() {
-                    data_sources.insert(copy.plan_id, *stage);
-                }
-            }
-            _ => {}
         };

         PhysicalPlan::traverse(
@@ -481,12 +475,10 @@ impl PhysicalPlanReplacer for ReplaceReadSource {
                     ..plan.clone()
                 })))
             }
-            CopyIntoTableSource::Stage(_) => {
-                let source = self.sources.remove(&plan.plan_id).ok_or_else(|| {
-                    ErrorCode::Internal("Cannot find data source for copy into plan")
-                })?;
+            CopyIntoTableSource::Stage(v) => {
+                let input = self.replace(v)?;
                 Ok(PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable {
-                    source: CopyIntoTableSource::Stage(Box::new(source)),
+                    source: CopyIntoTableSource::Stage(Box::new(input)),
                     ..plan.clone()
                 })))
             }
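
Because the stage read source now lives in an ordinary `TableScan` node, the fragment machinery needs no `CopyIntoTable` special case: `collect_data_source` picks up the stage scan like any other scan, and `ReplaceReadSource` simply recurses into the `Stage` input. A compact sketch of that replacer shape over a hypothetical mini plan (not databend's actual `PhysicalPlanReplacer` trait):

```rust
use std::collections::HashMap;

#[derive(Clone)]
enum Plan {
    TableScan { plan_id: u32, source: String },
    CopyIntoTable { input: Box<Plan> },
}

struct ReplaceReadSource {
    sources: HashMap<u32, String>,
}

impl ReplaceReadSource {
    fn replace(&mut self, plan: &Plan) -> Plan {
        match plan {
            // Swap in the fragment-local source when one was collected for this scan.
            Plan::TableScan { plan_id, source } => Plan::TableScan {
                plan_id: *plan_id,
                source: self.sources.remove(plan_id).unwrap_or_else(|| source.clone()),
            },
            // No stage special case any more: just recurse into the child plan.
            Plan::CopyIntoTable { input } => Plan::CopyIntoTable {
                input: Box::new(self.replace(input)),
            },
        }
    }
}
```
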
11 changes: 11 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -56,6 +56,7 @@ use databend_common_config::DATABEND_COMMIT_VERSION;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::date_helper::TzFactory;
+use databend_common_expression::BlockThresholds;
 use databend_common_expression::DataBlock;
 use databend_common_expression::Expr;
 use databend_common_expression::FunctionContext;
@@ -121,6 +122,7 @@ pub struct QueryContext {
     version: String,
     mysql_version: String,
     clickhouse_version: String,
+    block_threshold: Arc<RwLock<BlockThresholds>>,
     partition_queue: Arc<RwLock<VecDeque<PartInfoPtr>>>,
     shared: Arc<QueryContextShared>,
     query_settings: Arc<Settings>,
@@ -148,6 +150,7 @@
             query_settings,
             fragment_id: Arc::new(AtomicUsize::new(0)),
             inserted_segment_locs: Arc::new(RwLock::new(HashSet::new())),
+            block_threshold: Arc::new(RwLock::new(BlockThresholds::default())),
         })
     }
@@ -1045,6 +1048,14 @@
     fn txn_mgr(&self) -> TxnManagerRef {
         self.shared.session.session_ctx.txn_mgr()
     }
+
+    fn get_read_block_thresholds(&self) -> BlockThresholds {
+        *self.block_threshold.read()
+    }
+
+    fn set_read_block_thresholds(&self, thresholds: BlockThresholds) {
+        *self.block_threshold.write() = thresholds;
+    }
 }

 impl TrySpawn for QueryContext {