diff --git a/src/common/storage/src/stage.rs b/src/common/storage/src/stage.rs index 25b1c37b149aa..dba71b23b8db1 100644 --- a/src/common/storage/src/stage.rs +++ b/src/common/storage/src/stage.rs @@ -108,7 +108,6 @@ impl StageFilesInfo { &self, operator: &Operator, thread_num: usize, - first_only: bool, max_files: Option, ) -> Result> { if self.path == STDIN_FD { @@ -118,7 +117,7 @@ impl StageFilesInfo { let max_files = max_files.unwrap_or(usize::MAX); if let Some(files) = &self.files { let file_infos = self - .stat_concurrent(operator, thread_num, first_only, max_files, files) + .stat_concurrent(operator, thread_num, max_files, files) .await?; let mut res = Vec::with_capacity(file_infos.len()); @@ -136,24 +135,21 @@ impl StageFilesInfo { Ok(res) } else { let pattern = self.get_pattern()?; - StageFilesInfo::list_files_with_pattern( - operator, &self.path, pattern, first_only, max_files, - ) - .await + StageFilesInfo::list_files_with_pattern(operator, &self.path, pattern, max_files).await } } #[async_backtrace::framed] pub async fn first_file(&self, operator: &Operator) -> Result { // We only fetch first file. - let mut files = self.list(operator, 1, true, None).await?; + let mut files = self.list(operator, 1, Some(1)).await?; files .pop() .ok_or_else(|| ErrorCode::BadArguments("no file found")) } pub fn blocking_first_file(&self, operator: &Operator) -> Result { - let mut files = self.blocking_list(operator, true, None)?; + let mut files = self.blocking_list(operator, Some(1))?; files .pop() .ok_or_else(|| ErrorCode::BadArguments("no file found")) @@ -162,11 +158,9 @@ impl StageFilesInfo { pub fn blocking_list( &self, operator: &Operator, - first_only: bool, max_files: Option, ) -> Result> { let max_files = max_files.unwrap_or(usize::MAX); - let mut limit = 0; if let Some(files) = &self.files { let mut res = Vec::new(); for file in files { @@ -183,18 +177,14 @@ impl StageFilesInfo { "{full_path} is not a file" ))); } - if first_only { - break; - } - limit += 1; - if limit == max_files { + if res.len() == max_files { return Ok(res); } } Ok(res) } else { let pattern = self.get_pattern()?; - blocking_list_files_with_pattern(operator, &self.path, pattern, first_only, max_files) + blocking_list_files_with_pattern(operator, &self.path, pattern, max_files) } } @@ -203,7 +193,6 @@ impl StageFilesInfo { operator: &Operator, path: &str, pattern: Option, - first_only: bool, max_files: usize, ) -> Result> { if path == STDIN_FD { @@ -226,16 +215,15 @@ impl StageFilesInfo { .recursive(true) .metakey(StageFileInfo::meta_query()) .await?; - let mut limit: usize = 0; + + if files.len() == max_files { + return Ok(files); + } while let Some(obj) = lister.try_next().await? { let meta = obj.metadata(); if check_file(&obj.path()[prefix_len..], meta.mode(), &pattern) { files.push(StageFileInfo::new(obj.path().to_string(), meta)); - if first_only { - return Ok(files); - } - limit += 1; - if limit == max_files { + if files.len() == max_files { return Ok(files); } } @@ -249,11 +237,10 @@ impl StageFilesInfo { &self, operator: &Operator, thread_num: usize, - first_only: bool, max_files: usize, files: &[String], ) -> Result>> { - if first_only { + if max_files == 1 { let Some(file) = files.first() else { return Ok(vec![]); }; @@ -302,7 +289,6 @@ fn blocking_list_files_with_pattern( operator: &Operator, path: &str, pattern: Option, - first_only: bool, max_files: usize, ) -> Result> { if path == STDIN_FD { @@ -326,17 +312,15 @@ fn blocking_list_files_with_pattern( .recursive(true) .metakey(StageFileInfo::meta_query()) .call()?; - let mut limit = 0; + if files.len() == max_files { + return Ok(files); + } for obj in list { let obj = obj?; let meta = obj.metadata(); if check_file(&obj.path()[prefix_len..], meta.mode(), &pattern) { files.push(StageFileInfo::new(obj.path().to_string(), meta)); - if first_only { - return Ok(files); - } - limit += 1; - if limit == max_files { + if files.len() == max_files { return Ok(files); } } diff --git a/src/query/catalog/src/plan/datasource/datasource_info/stage.rs b/src/query/catalog/src/plan/datasource/datasource_info/stage.rs index ab96ba9451f9b..7e78932ba8711 100644 --- a/src/query/catalog/src/plan/datasource/datasource_info/stage.rs +++ b/src/query/catalog/src/plan/datasource/datasource_info/stage.rs @@ -70,7 +70,7 @@ pub async fn list_stage_files( ) -> Result> { let op = init_stage_operator(stage_info)?; let infos = files_info - .list(&op, thread_num, false, max_files) + .list(&op, thread_num, max_files) .await? .into_iter() .collect::>(); diff --git a/src/query/service/src/interpreters/interpreter_user_stage_remove.rs b/src/query/service/src/interpreters/interpreter_user_stage_remove.rs index 2bae6c9b2f1e7..7908961616a92 100644 --- a/src/query/service/src/interpreters/interpreter_user_stage_remove.rs +++ b/src/query/service/src/interpreters/interpreter_user_stage_remove.rs @@ -69,7 +69,7 @@ impl Interpreter for RemoveUserStageInterpreter { pattern, }; let files: Vec = files_info - .list(&op, thread_num, false, None) + .list(&op, thread_num, None) .await? .into_iter() .map(|file_with_meta| file_with_meta.path) diff --git a/src/query/service/src/table_functions/list_stage/list_stage_table.rs b/src/query/service/src/table_functions/list_stage/list_stage_table.rs index c595fdb584599..92b9d55d17a8a 100644 --- a/src/query/service/src/table_functions/list_stage/list_stage_table.rs +++ b/src/query/service/src/table_functions/list_stage/list_stage_table.rs @@ -210,7 +210,7 @@ impl AsyncSource for ListStagesSource { pattern: self.args_parsed.files_info.pattern.clone(), }; - let files = files_info.list(&op, thread_num, false, None).await?; + let files = files_info.list(&op, thread_num, None).await?; let names: Vec = files.iter().map(|file| file.path.to_string()).collect(); diff --git a/src/query/sql/src/planner/plans/copy_into_table.rs b/src/query/sql/src/planner/plans/copy_into_table.rs index 64aa11464537b..c82b29ddd2863 100644 --- a/src/query/sql/src/planner/plans/copy_into_table.rs +++ b/src/query/sql/src/planner/plans/copy_into_table.rs @@ -127,21 +127,19 @@ impl CopyIntoTablePlan { if self.force { stage_table_info .files_info - .blocking_list(&operator, false, max_files) + .blocking_list(&operator, max_files) } else { - stage_table_info - .files_info - .blocking_list(&operator, false, None) + stage_table_info.files_info.blocking_list(&operator, None) } } else if self.force { stage_table_info .files_info - .list(&operator, thread_num, false, max_files) + .list(&operator, thread_num, max_files) .await } else { stage_table_info .files_info - .list(&operator, thread_num, false, None) + .list(&operator, thread_num, None) .await }?; diff --git a/src/query/sql/src/planner/plans/scalar_expr.rs b/src/query/sql/src/planner/plans/scalar_expr.rs index 06a96ba2f618d..fb09ef0e4b11a 100644 --- a/src/query/sql/src/planner/plans/scalar_expr.rs +++ b/src/query/sql/src/planner/plans/scalar_expr.rs @@ -607,7 +607,7 @@ fn hash_column_set(columns: &ColumnSet, state: &mut H) { columns.iter().for_each(|c| c.hash(state)); } -/// UDFCall includes server & lambda call +/// UDFCall includes script & lambda call #[derive(Clone, Debug, Educe)] #[educe(PartialEq, Eq, Hash)] pub struct UDFCall { diff --git a/src/query/storages/parquet/src/parquet_rs/parquet_table/partition.rs b/src/query/storages/parquet/src/parquet_rs/parquet_table/partition.rs index 21045fc106890..02676833ff23c 100644 --- a/src/query/storages/parquet/src/parquet_rs/parquet_table/partition.rs +++ b/src/query/storages/parquet/src/parquet_rs/parquet_table/partition.rs @@ -60,7 +60,7 @@ impl ParquetRSTable { .collect::>(), None => self .files_info - .list(&self.operator, thread_num, false, None) + .list(&self.operator, thread_num, None) .await? .into_iter() .map(|f| (f.path, f.size)) diff --git a/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs b/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs index e4554ef0ac32f..d2f610ef01736 100644 --- a/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs +++ b/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs @@ -274,7 +274,7 @@ impl Table for ParquetRSTable { .collect::>(), None => self .files_info - .list(&self.operator, thread_num, false, None) + .list(&self.operator, thread_num, None) .await? .into_iter() .map(|f| (f.path, f.size))