diff --git a/src/query/service/src/interpreters/common/file.rs b/src/query/service/src/interpreters/common/file.rs
new file mode 100644
index 0000000000000..11a8e89224d2b
--- /dev/null
+++ b/src/query/service/src/interpreters/common/file.rs
@@ -0,0 +1,115 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io;
+use std::sync::Arc;
+
+use chrono::TimeZone;
+use chrono::Utc;
+use common_catalog::table_context::TableContext;
+use common_exception::Result;
+use common_meta_types::StageFile;
+use common_meta_types::UserStageInfo;
+use common_storages_factory::stage::StageTable;
+use futures_util::TryStreamExt;
+use tracing::debug;
+use tracing::warn;
+
+use crate::sessions::QueryContext;
+
+pub async fn stat_file(
+    ctx: &Arc<QueryContext>,
+    stage: &UserStageInfo,
+    path: &str,
+) -> Result<StageFile> {
+    let table_ctx: Arc<dyn TableContext> = ctx.clone();
+    let op = StageTable::get_op(&table_ctx, stage)?;
+    let meta = op.object(path).metadata().await?;
+    Ok(StageFile {
+        path: path.to_string(),
+        size: meta.content_length(),
+        md5: meta.content_md5().map(str::to_string),
+        last_modified: meta
+            .last_modified()
+            .map_or(Utc::now(), |t| Utc.timestamp(t.unix_timestamp(), 0)),
+        creator: None,
+        etag: meta.etag().map(str::to_string),
+    })
+}
+
+/// List files from DAL recursively.
+///
+/// - If the input path is a dir, we list it recursively.
+/// - If it is a plain file, we return just that file.
+/// - If it does not exist, we return an empty list.
+///
+/// TODO(@xuanwo): return a stream instead.
+pub async fn list_files(
+    ctx: &Arc<QueryContext>,
+    stage: &UserStageInfo,
+    path: &str,
+) -> Result<Vec<StageFile>> {
+    let table_ctx: Arc<dyn TableContext> = ctx.clone();
+    let op = StageTable::get_op(&table_ctx, stage)?;
+    let mut files = Vec::new();
+
+    // - If the path itself is a dir, remember it so we can walk it below.
+    // - If it is a file, record it directly and skip the walk.
+    // - A missing path yields nothing; any other error is propagated.
+    let dir_path = match op.object(path).metadata().await {
+        Ok(meta) if meta.mode().is_dir() => Some(path.to_string()),
+        Ok(meta) if !meta.mode().is_dir() => {
+            files.push((path.to_string(), meta));
+
+            None
+        }
+        Err(e) if e.kind() == io::ErrorKind::NotFound => None,
+        Err(e) => return Err(e.into()),
+        _ => None,
+    };
+
+    // Check that the dir is valid, then list it recursively.
+    if let Some(dir) = dir_path {
+        match op.object(&dir).metadata().await {
+            Ok(_) => {
+                let mut ds = op.batch().walk_top_down(&dir)?;
+                while let Some(de) = ds.try_next().await? {
+                    if de.mode().is_file() {
+                        let path = de.path().to_string();
+                        let meta = de.metadata().await;
+                        files.push((path, meta));
+                    }
+                }
+            }
+            Err(e) => warn!("ignore listing {path}/, because: {:?}", e),
+        };
+    }
+
+    let matched_files = files
+        .into_iter()
+        .map(|(name, meta)| StageFile {
+            path: name,
+            size: meta.content_length(),
+            md5: meta.content_md5().map(str::to_string),
+            last_modified: meta
+                .last_modified()
+                .map_or(Utc::now(), |t| Utc.timestamp(t.unix_timestamp(), 0)),
+            creator: None,
+            etag: meta.etag().map(str::to_string),
+        })
+        .collect::<Vec<StageFile>>();
+
+    debug!("listed files: {:?}", matched_files);
+    Ok(matched_files)
+}
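Reviewer note: a minimal usage sketch for the new stage helpers, not part of the diff. The stage value and the `dir/` / `dir/part-0.csv` paths are hypothetical; only `list_files`, `stat_file`, and the `StageFile` fields come from the code above.

```rust
use std::sync::Arc;

use common_exception::Result;
use common_meta_types::UserStageInfo;

use crate::interpreters::common::list_files;
use crate::interpreters::common::stat_file;
use crate::sessions::QueryContext;

/// Walk a stage directory, then stat a single object (illustrative paths).
async fn show_stage_contents(ctx: &Arc<QueryContext>, stage: &UserStageInfo) -> Result<()> {
    // A directory path is listed recursively; a plain file path yields itself.
    for file in list_files(ctx, stage, "dir/").await? {
        println!("{} ({} bytes)", file.path, file.size);
    }
    // stat_file resolves one object's metadata into a StageFile.
    let one = stat_file(ctx, stage, "dir/part-0.csv").await?;
    println!("md5: {:?}, etag: {:?}", one.md5, one.etag);
    Ok(())
}
```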
diff --git a/src/query/service/src/interpreters/common/grant.rs b/src/query/service/src/interpreters/common/grant.rs
new file mode 100644
index 0000000000000..fee8ed6f18ba4
--- /dev/null
+++ b/src/query/service/src/interpreters/common/grant.rs
@@ -0,0 +1,58 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_catalog::table_context::TableContext;
+use common_exception::Result;
+use common_meta_types::GrantObject;
+
+use crate::sessions::QueryContext;
+
+pub async fn validate_grant_object_exists(
+    ctx: &Arc<QueryContext>,
+    object: &GrantObject,
+) -> Result<()> {
+    let tenant = ctx.get_tenant();
+
+    match &object {
+        GrantObject::Table(catalog_name, database_name, table_name) => {
+            let catalog = ctx.get_catalog(catalog_name)?;
+            if !catalog
+                .exists_table(tenant.as_str(), database_name, table_name)
+                .await?
+            {
+                return Err(common_exception::ErrorCode::UnknownTable(format!(
+                    "table {}.{} does not exist",
+                    database_name, table_name,
+                )));
+            }
+        }
+        GrantObject::Database(catalog_name, database_name) => {
+            let catalog = ctx.get_catalog(catalog_name)?;
+            if !catalog
+                .exists_database(tenant.as_str(), database_name)
+                .await?
+            {
+                return Err(common_exception::ErrorCode::UnknownDatabase(format!(
+                    "database {} does not exist",
+                    database_name,
+                )));
+            }
+        }
+        GrantObject::Global => (),
+    }
+
+    Ok(())
+}
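Reviewer note: a hedged sketch of how the extracted grant check is meant to be used; the catalog, database, and table names below are made up for illustration.

```rust
use std::sync::Arc;

use common_exception::Result;
use common_meta_types::GrantObject;

use crate::interpreters::common::validate_grant_object_exists;
use crate::sessions::QueryContext;

/// Fail a GRANT early when its target object is missing.
async fn check_grant_target(ctx: &Arc<QueryContext>) -> Result<()> {
    // Hypothetical catalog/database/table names.
    let object = GrantObject::Table(
        "default".to_string(),
        "db1".to_string(),
        "orders".to_string(),
    );
    // Surfaces UnknownTable / UnknownDatabase instead of a late grant failure.
    validate_grant_object_exists(ctx, &object).await
}
```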
diff --git a/src/query/service/src/interpreters/common/mod.rs b/src/query/service/src/interpreters/common/mod.rs
new file mode 100644
index 0000000000000..cce0cfe54066c
--- /dev/null
+++ b/src/query/service/src/interpreters/common/mod.rs
@@ -0,0 +1,22 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod file;
+mod grant;
+mod table;
+
+pub use file::list_files;
+pub use file::stat_file;
+pub use grant::validate_grant_object_exists;
+pub use table::append2table;
diff --git a/src/query/service/src/interpreters/common/table.rs b/src/query/service/src/interpreters/common/table.rs
new file mode 100644
index 0000000000000..15aa555cd59e8
--- /dev/null
+++ b/src/query/service/src/interpreters/common/table.rs
@@ -0,0 +1,86 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_base::base::GlobalIORuntime;
+use common_catalog::table::Table;
+use common_catalog::table_context::TableContext;
+use common_datavalues::DataSchemaRef;
+use common_exception::Result;
+use common_pipeline_core::Pipeline;
+
+use crate::pipelines::processors::TransformAddOn;
+use crate::pipelines::PipelineBuildResult;
+use crate::sessions::QueryContext;
+
+fn fill_missing_columns(
+    ctx: Arc<QueryContext>,
+    source_schema: &DataSchemaRef,
+    target_schema: &DataSchemaRef,
+    pipeline: &mut Pipeline,
+) -> Result<()> {
+    let need_fill_missing_columns = target_schema != source_schema;
+    if need_fill_missing_columns {
+        pipeline.add_transform(|transform_input_port, transform_output_port| {
+            TransformAddOn::try_create(
+                transform_input_port,
+                transform_output_port,
+                source_schema.clone(),
+                target_schema.clone(),
+                ctx.clone(),
+            )
+        })?;
+    }
+    Ok(())
+}
+
+pub fn append2table(
+    ctx: Arc<QueryContext>,
+    table: Arc<dyn Table>,
+    source_schema: DataSchemaRef,
+    build_res: &mut PipelineBuildResult,
+    overwrite: bool,
+    need_commit: bool,
+) -> Result<()> {
+    fill_missing_columns(
+        ctx.clone(),
+        &source_schema,
+        &table.schema(),
+        &mut build_res.main_pipeline,
+    )?;
+
+    table.append_data(ctx.clone(), &mut build_res.main_pipeline, false)?;
+
+    if need_commit {
+        build_res.main_pipeline.set_on_finished(move |may_error| {
+            // Capture the outer variables by value so the closure owns them.
+            let overwrite = overwrite;
+            let ctx = ctx.clone();
+            let table = table.clone();
+
+            if may_error.is_none() {
+                let append_entries = ctx.consume_precommit_blocks();
+                // The commit must run on the global runtime to avoid the "dispatch dropped without returning error" issue in tower.
+                return GlobalIORuntime::instance().block_on(async move {
+                    table.commit_insertion(ctx, append_entries, overwrite).await
+                });
+            }
+
+            Err(may_error.as_ref().unwrap().clone())
+        });
+    }
+
+    Ok(())
+}
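Reviewer note: a sketch of a typical `append2table` call site, assuming the caller already built a `PipelineBuildResult` and holds a table handle; it is illustrative, not code from this PR. Passing `need_commit: true` defers `commit_insertion` to the pipeline's `on_finished` hook on the global I/O runtime, as the comment in `table.rs` explains.

```rust
use std::sync::Arc;

use common_catalog::table::Table;
use common_datavalues::DataSchemaRef;
use common_exception::Result;

use crate::interpreters::common::append2table;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;

/// Wire a built pipeline into a table: fill missing columns, append the
/// blocks, and commit once the pipeline finishes.
fn finish_insert(
    ctx: Arc<QueryContext>,
    table: Arc<dyn Table>,
    source_schema: DataSchemaRef,
    mut build_res: PipelineBuildResult,
) -> Result<PipelineBuildResult> {
    append2table(
        ctx,
        table,
        source_schema,
        &mut build_res,
        false, // overwrite: append rather than truncate
        true,  // need_commit: commit_insertion runs in on_finished
    )?;
    Ok(build_res)
}
```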
diff --git a/src/query/service/src/interpreters/interpreter_common.rs b/src/query/service/src/interpreters/interpreter_common.rs
deleted file mode 100644
index 28ca6c390eb8a..0000000000000
--- a/src/query/service/src/interpreters/interpreter_common.rs
+++ /dev/null
@@ -1,231 +0,0 @@
-// Copyright 2021 Datafuse Labs.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::io;
-use std::sync::Arc;
-
-use chrono::TimeZone;
-use chrono::Utc;
-use common_base::base::GlobalIORuntime;
-use common_datavalues::DataSchemaRef;
-use common_exception::Result;
-use common_meta_types::GrantObject;
-use common_meta_types::StageFile;
-use common_meta_types::UserStageInfo;
-use common_pipeline_core::Pipeline;
-use futures::TryStreamExt;
-use tracing::debug;
-use tracing::warn;
-
-use crate::pipelines::executor::ExecutorSettings;
-use crate::pipelines::executor::PipelineCompleteExecutor;
-use crate::pipelines::processors::TransformAddOn;
-use crate::pipelines::PipelineBuildResult;
-use crate::sessions::QueryContext;
-use crate::sessions::TableContext;
-use crate::storages::stage::StageTable;
-use crate::storages::Table;
-
-pub fn fill_missing_columns(
-    ctx: Arc<QueryContext>,
-    source_schema: &DataSchemaRef,
-    target_schema: &DataSchemaRef,
-    pipeline: &mut Pipeline,
-) -> Result<()> {
-    let need_fill_missing_columns = target_schema != source_schema;
-    if need_fill_missing_columns {
-        pipeline.add_transform(|transform_input_port, transform_output_port| {
-            TransformAddOn::try_create(
-                transform_input_port,
-                transform_output_port,
-                source_schema.clone(),
-                target_schema.clone(),
-                ctx.clone(),
-            )
-        })?;
-    }
-    Ok(())
-}
-
-pub fn append2table(
-    ctx: Arc<QueryContext>,
-    table: Arc<dyn Table>,
-    source_schema: DataSchemaRef,
-    build_res: &mut PipelineBuildResult,
-    overwrite: bool,
-    need_commit: bool,
-) -> Result<()> {
-    fill_missing_columns(
-        ctx.clone(),
-        &source_schema,
-        &table.schema(),
-        &mut build_res.main_pipeline,
-    )?;
-
-    table.append_data(ctx.clone(), &mut build_res.main_pipeline, false)?;
-
-    if need_commit {
-        build_res.main_pipeline.set_on_finished(move |may_error| {
-            // capture out variable
-            let overwrite = overwrite;
-            let ctx = ctx.clone();
-            let table = table.clone();
-
-            if may_error.is_none() {
-                let append_entries = ctx.consume_precommit_blocks();
-                // We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
-                return GlobalIORuntime::instance().block_on(async move {
-                    table.commit_insertion(ctx, append_entries, overwrite).await
-                });
-            }
-
-            Err(may_error.as_ref().unwrap().clone())
-        });
-    }
-
-    Ok(())
-}
-
-pub fn execute_pipeline(ctx: Arc<QueryContext>, mut res: PipelineBuildResult) -> Result<()> {
-    let executor_settings = ExecutorSettings::try_create(&ctx.get_settings())?;
-    res.set_max_threads(ctx.get_settings().get_max_threads()? as usize);
-    let mut pipelines = res.sources_pipelines;
-    pipelines.push(res.main_pipeline);
-    let executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
-    ctx.set_executor(Arc::downgrade(&executor.get_inner()));
-    executor.execute()
-}
-
-pub async fn validate_grant_object_exists(
-    ctx: &Arc<QueryContext>,
-    object: &GrantObject,
-) -> Result<()> {
-    let tenant = ctx.get_tenant();
-
-    match &object {
-        GrantObject::Table(catalog_name, database_name, table_name) => {
-            let catalog = ctx.get_catalog(catalog_name)?;
-            if !catalog
-                .exists_table(tenant.as_str(), database_name, table_name)
-                .await?
-            {
-                return Err(common_exception::ErrorCode::UnknownTable(format!(
-                    "table {}.{} not exists",
-                    database_name, table_name,
-                )));
-            }
-        }
-        GrantObject::Database(catalog_name, database_name) => {
-            let catalog = ctx.get_catalog(catalog_name)?;
-            if !catalog
-                .exists_database(tenant.as_str(), database_name)
-                .await?
-            {
-                return Err(common_exception::ErrorCode::UnknownDatabase(format!(
-                    "database {} not exists",
-                    database_name,
-                )));
-            }
-        }
-        GrantObject::Global => (),
-    }
-
-    Ok(())
-}
-
-pub async fn stat_file(
-    ctx: &Arc<QueryContext>,
-    stage: &UserStageInfo,
-    path: &str,
-) -> Result<StageFile> {
-    let table_ctx: Arc<dyn TableContext> = ctx.clone();
-    let op = StageTable::get_op(&table_ctx, stage)?;
-    let meta = op.object(path).metadata().await?;
-    Ok(StageFile {
-        path: path.to_string(),
-        size: meta.content_length(),
-        md5: meta.content_md5().map(str::to_string),
-        last_modified: meta
-            .last_modified()
-            .map_or(Utc::now(), |t| Utc.timestamp(t.unix_timestamp(), 0)),
-        creator: None,
-        etag: meta.etag().map(str::to_string),
-    })
-}
-
-/// List files from DAL in recursive way.
-///
-/// - If input path is a dir, we will list it recursively.
-/// - Or, we will append the file itself, and try to list `path/`.
-/// - If not exist, we will try to list `path/` too.
-///
-/// TODO(@xuanwo): return a stream instead.
-pub async fn list_files(
-    ctx: &Arc<QueryContext>,
-    stage: &UserStageInfo,
-    path: &str,
-) -> Result<Vec<StageFile>> {
-    let table_ctx: Arc<dyn TableContext> = ctx.clone();
-    let op = StageTable::get_op(&table_ctx, stage)?;
-    let mut files = Vec::new();
-
-    // - If the path itself is a dir, return directly.
-    // - Otherwise, return a path suffix by `/`
-    // - If other errors happen, we will ignore them by returning None.
-    let dir_path = match op.object(path).metadata().await {
-        Ok(meta) if meta.mode().is_dir() => Some(path.to_string()),
-        Ok(meta) if !meta.mode().is_dir() => {
-            files.push((path.to_string(), meta));
-
-            None
-        }
-        Err(e) if e.kind() == io::ErrorKind::NotFound => None,
-        Err(e) => return Err(e.into()),
-        _ => None,
-    };
-
-    // Check the if this dir valid and list it recursively.
-    if let Some(dir) = dir_path {
-        match op.object(&dir).metadata().await {
-            Ok(_) => {
-                let mut ds = op.batch().walk_top_down(&dir)?;
-                while let Some(de) = ds.try_next().await? {
-                    if de.mode().is_file() {
-                        let path = de.path().to_string();
-                        let meta = de.metadata().await;
-                        files.push((path, meta));
-                    }
-                }
-            }
-            Err(e) => warn!("ignore listing {path}/, because: {:?}", e),
-        };
-    }
-
-    let matched_files = files
-        .into_iter()
-        .map(|(name, meta)| StageFile {
-            path: name,
-            size: meta.content_length(),
-            md5: meta.content_md5().map(str::to_string),
-            last_modified: meta
-                .last_modified()
-                .map_or(Utc::now(), |t| Utc.timestamp(t.unix_timestamp(), 0)),
-            creator: None,
-            etag: meta.etag().map(str::to_string),
-        })
-        .collect::<Vec<StageFile>>();
-
-    debug!("listed files: {:?}", matched_files);
-    Ok(matched_files)
-}
diff --git a/src/query/service/src/interpreters/interpreter_copy_v2.rs b/src/query/service/src/interpreters/interpreter_copy_v2.rs
index 08ccd6396cbc2..9957024182f5d 100644
--- a/src/query/service/src/interpreters/interpreter_copy_v2.rs
+++ b/src/query/service/src/interpreters/interpreter_copy_v2.rs
@@ -30,10 +30,10 @@ use common_planner::ReadDataSourcePlan;
 use common_planner::SourceInfo;
 use regex::Regex;
 
-use super::append2table;
 use crate::catalogs::Catalog;
-use crate::interpreters::interpreter_common::list_files;
-use crate::interpreters::interpreter_common::stat_file;
+use crate::interpreters::common::append2table;
+use crate::interpreters::common::list_files;
+use crate::interpreters::common::stat_file;
 use crate::interpreters::Interpreter;
 use crate::interpreters::SelectInterpreterV2;
 use crate::pipelines::PipelineBuildResult;
diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs
index 323fb16afa2ca..af58ea5e87b28 100644
--- a/src/query/service/src/interpreters/interpreter_insert_v2.rs
+++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs
@@ -39,8 +39,8 @@ use common_sql::MetadataRef;
 use parking_lot::Mutex;
 use parking_lot::RwLock;
 
-use super::interpreter_common::append2table;
 use super::plan_schedulers::build_schedule_pipeline;
+use crate::interpreters::common::append2table;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterPtr;
 use crate::pipelines::PipelineBuildResult;
diff --git a/src/query/service/src/interpreters/interpreter_list.rs b/src/query/service/src/interpreters/interpreter_list.rs
index fffddbc7fb102..5ab9ebf28deb5 100644
--- a/src/query/service/src/interpreters/interpreter_list.rs
+++ b/src/query/service/src/interpreters/interpreter_list.rs
@@ -23,7 +23,7 @@ use common_exception::Result;
 use common_planner::plans::ListPlan;
 use regex::Regex;
 
-use crate::interpreters::interpreter_common::list_files;
+use crate::interpreters::common::list_files;
 use crate::interpreters::Interpreter;
 use crate::pipelines::PipelineBuildResult;
 use crate::sessions::QueryContext;
diff --git a/src/query/service/src/interpreters/interpreter_privilege_grant.rs b/src/query/service/src/interpreters/interpreter_privilege_grant.rs
index c93fd1863af05..ed57a49966ee9 100644
--- a/src/query/service/src/interpreters/interpreter_privilege_grant.rs
+++ b/src/query/service/src/interpreters/interpreter_privilege_grant.rs
@@ -21,7 +21,7 @@ use common_meta_types::UserPrivilegeSet;
 use common_planner::plans::GrantPrivilegePlan;
 use common_users::UserApiProvider;
 
-use crate::interpreters::interpreter_common::validate_grant_object_exists;
+use crate::interpreters::common::validate_grant_object_exists;
 use crate::interpreters::Interpreter;
 use crate::pipelines::PipelineBuildResult;
 use crate::sessions::QueryContext;
diff --git a/src/query/service/src/interpreters/interpreter_privilege_revoke.rs b/src/query/service/src/interpreters/interpreter_privilege_revoke.rs
index a47a7abd3466d..f7ca857daadd2 100644
--- a/src/query/service/src/interpreters/interpreter_privilege_revoke.rs
+++ b/src/query/service/src/interpreters/interpreter_privilege_revoke.rs
@@ -19,7 +19,7 @@ use common_meta_types::PrincipalIdentity;
 use common_planner::plans::RevokePrivilegePlan;
 use common_users::UserApiProvider;
 
-use crate::interpreters::interpreter_common::validate_grant_object_exists;
+use crate::interpreters::common::validate_grant_object_exists;
 use crate::interpreters::Interpreter;
 use crate::pipelines::PipelineBuildResult;
 use crate::sessions::QueryContext;
diff --git a/src/query/service/src/interpreters/interpreter_user_stage_remove.rs b/src/query/service/src/interpreters/interpreter_user_stage_remove.rs
index 67bdcccba42c8..01c5b302ab269 100644
--- a/src/query/service/src/interpreters/interpreter_user_stage_remove.rs
+++ b/src/query/service/src/interpreters/interpreter_user_stage_remove.rs
@@ -20,7 +20,7 @@ use common_exception::Result;
 use common_planner::plans::RemoveStagePlan;
 use regex::Regex;
 
-use crate::interpreters::interpreter_common::list_files;
+use crate::interpreters::common::list_files;
 use crate::interpreters::Interpreter;
 use crate::pipelines::PipelineBuildResult;
 use crate::sessions::QueryContext;
diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs
index f4519ee15dff9..2827501c19756 100644
--- a/src/query/service/src/interpreters/mod.rs
+++ b/src/query/service/src/interpreters/mod.rs
@@ -14,13 +14,13 @@
 mod access;
 // mod async_insert_queue_v2;
+mod common;
 mod fragments;
 mod interpreter;
 mod interpreter_call;
 mod interpreter_cluster_key_alter;
 mod interpreter_cluster_key_drop;
 mod interpreter_clustering_history;
-mod interpreter_common;
 mod interpreter_copy_v2;
 mod interpreter_database_create;
 mod interpreter_database_drop;
@@ -81,6 +81,7 @@ mod interpreter_view_drop;
 mod plan_schedulers;
 
 pub use access::ManagementModeAccess;
+pub use common::append2table;
 pub use fragments::QueryFragmentAction;
 pub use fragments::QueryFragmentActions;
 pub use fragments::QueryFragmentsActions;
@@ -90,9 +91,6 @@ pub use interpreter_call::CallInterpreter;
 pub use interpreter_cluster_key_alter::AlterTableClusterKeyInterpreter;
 pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
 pub use interpreter_clustering_history::InterpreterClusteringHistory;
-pub use interpreter_common::append2table;
-pub use interpreter_common::execute_pipeline;
-pub use interpreter_common::fill_missing_columns;
 pub use interpreter_database_create::CreateDatabaseInterpreter;
 pub use interpreter_database_drop::DropDatabaseInterpreter;
 pub use interpreter_database_rename::RenameDatabaseInterpreter;
diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs
index 1beaba9e86f20..cfbd643803de8 100644
--- a/src/query/service/src/pipelines/pipeline_builder.rs
+++ b/src/query/service/src/pipelines/pipeline_builder.rs
@@ -35,7 +35,6 @@ use common_sql::evaluator::CompoundChunkOperator;
 use common_sql::executor::AggregateFunctionDesc;
 use common_sql::executor::PhysicalScalar;
 
-use crate::interpreters::fill_missing_columns;
 use crate::pipelines::processors::port::InputPort;
 use crate::pipelines::processors::transforms::HashJoinDesc;
 use crate::pipelines::processors::transforms::RightSemiAntiJoinCompactor;
@@ -51,6 +50,7 @@ use crate::pipelines::processors::RightJoinCompactor;
 use crate::pipelines::processors::SinkBuildHashTable;
 use crate::pipelines::processors::Sinker;
 use crate::pipelines::processors::SortMergeCompactor;
+use crate::pipelines::processors::TransformAddOn;
 use crate::pipelines::processors::TransformAggregator;
 use crate::pipelines::processors::TransformCastSchema;
 use crate::pipelines::processors::TransformHashJoinProbe;
@@ -624,13 +624,24 @@ impl PipelineBuilder {
             .get_catalog(&insert_select.catalog)?
             .get_table_by_info(&insert_select.table_info)?;
 
-        fill_missing_columns(
-            self.ctx.clone(),
-            insert_schema,
-            &table.schema(),
-            &mut self.main_pipeline,
-        )?;
-
+        // Fill missing columns.
+        {
+            let source_schema = insert_schema;
+            let target_schema = &table.schema();
+            if source_schema != target_schema {
+                self.main_pipeline.add_transform(
+                    |transform_input_port, transform_output_port| {
+                        TransformAddOn::try_create(
+                            transform_input_port,
+                            transform_output_port,
+                            source_schema.clone(),
+                            target_schema.clone(),
+                            self.ctx.clone(),
+                        )
+                    },
+                )?;
+            }
+        }
         table.append_data(self.ctx.clone(), &mut self.main_pipeline, true)?;
 
         Ok(())
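Reviewer note: the inlined block above duplicates what `fill_missing_columns` did, so `pipeline_builder` no longer depends on the interpreters module. As a conceptual model only — `TransformAddOn`'s real implementation operates on data blocks flowing between ports, not on strings:

```rust
/// Conceptual model of "fill missing columns": extend a narrower source
/// schema to the target schema, producing defaults for absent columns.
fn conceptual_fill(source_cols: &[&str], target_cols: &[&str]) -> Vec<String> {
    target_cols
        .iter()
        .map(|col| {
            if source_cols.contains(col) {
                format!("{col}: taken from the source block")
            } else {
                format!("{col}: filled with the column default")
            }
        })
        .collect()
}
```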
diff --git a/src/query/service/tests/it/storages/fuse/table_test_fixture.rs b/src/query/service/tests/it/storages/fuse/table_test_fixture.rs
index 89f82786aa76b..57d6abe22c5ef 100644
--- a/src/query/service/tests/it/storages/fuse/table_test_fixture.rs
+++ b/src/query/service/tests/it/storages/fuse/table_test_fixture.rs
@@ -30,10 +30,11 @@ use common_storage::StorageParams;
 use common_storages_fuse::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
 use common_streams::SendableDataBlockStream;
 use databend_query::interpreters::append2table;
-use databend_query::interpreters::execute_pipeline;
 use databend_query::interpreters::CreateTableInterpreterV2;
 use databend_query::interpreters::Interpreter;
 use databend_query::interpreters::InterpreterFactory;
+use databend_query::pipelines::executor::ExecutorSettings;
+use databend_query::pipelines::executor::PipelineCompleteExecutor;
 use databend_query::pipelines::processors::BlocksSource;
 use databend_query::pipelines::PipelineBuildResult;
 use databend_query::sessions::QueryContext;
@@ -415,6 +416,16 @@ pub async fn execute_query(ctx: Arc<QueryContext>, query: &str) -> Result<SendableDataBlockStream>
+fn execute_pipeline(ctx: Arc<QueryContext>, mut res: PipelineBuildResult) -> Result<()> {
+    let executor_settings = ExecutorSettings::try_create(&ctx.get_settings())?;
+    res.set_max_threads(ctx.get_settings().get_max_threads()? as usize);
+    let mut pipelines = res.sources_pipelines;
+    pipelines.push(res.main_pipeline);
+    let executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
+    ctx.set_executor(Arc::downgrade(&executor.get_inner()));
+    executor.execute()
+}
+
 pub async fn execute_command(ctx: Arc<QueryContext>, query: &str) -> Result<()> {
     let res = execute_query(ctx, query).await?;
     res.try_collect::<Vec<DataBlock>>().await?;
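Reviewer note: a hedged sketch of how the relocated `execute_pipeline` helper composes with the fixture's other helpers in a test. `create_query_context` mirrors the existing test helper of that name; its exact signature, and the `DataBlock` import path, are assumptions.

```rust
use common_datablocks::DataBlock;
use common_exception::Result;
use futures::TryStreamExt;

/// Sketch: execute_command drives DDL through the interpreter (which uses
/// execute_pipeline internally), then execute_query inspects the result.
async fn smoke_test() -> Result<()> {
    let (_guard, ctx) = create_query_context().await?;
    execute_command(ctx.clone(), "create table t(a int)").await?;
    let blocks: Vec<DataBlock> = execute_query(ctx, "select a from t")
        .await?
        .try_collect()
        .await?;
    // A freshly created table should contain no rows.
    assert!(blocks.iter().all(|b| b.num_rows() == 0));
    Ok(())
}
```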