Skip to content

Commit

Permalink
chore(query): simplify stage list (#15303)
Browse files: browse the repository at this point in the history
chore(query): simplify list
  • Loading branch information
sundy-li authored Apr 23, 2024
1 parent c60f157 commit 1877668
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 44 deletions.
48 changes: 16 additions & 32 deletions src/common/storage/src/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ impl StageFilesInfo {
&self,
operator: &Operator,
thread_num: usize,
first_only: bool,
max_files: Option<usize>,
) -> Result<Vec<StageFileInfo>> {
if self.path == STDIN_FD {
Expand All @@ -118,7 +117,7 @@ impl StageFilesInfo {
let max_files = max_files.unwrap_or(usize::MAX);
if let Some(files) = &self.files {
let file_infos = self
.stat_concurrent(operator, thread_num, first_only, max_files, files)
.stat_concurrent(operator, thread_num, max_files, files)
.await?;
let mut res = Vec::with_capacity(file_infos.len());

Expand All @@ -136,24 +135,21 @@ impl StageFilesInfo {
Ok(res)
} else {
let pattern = self.get_pattern()?;
StageFilesInfo::list_files_with_pattern(
operator, &self.path, pattern, first_only, max_files,
)
.await
StageFilesInfo::list_files_with_pattern(operator, &self.path, pattern, max_files).await
}
}

#[async_backtrace::framed]
pub async fn first_file(&self, operator: &Operator) -> Result<StageFileInfo> {
// We only fetch the first file.
let mut files = self.list(operator, 1, true, None).await?;
let mut files = self.list(operator, 1, Some(1)).await?;
files
.pop()
.ok_or_else(|| ErrorCode::BadArguments("no file found"))
}

pub fn blocking_first_file(&self, operator: &Operator) -> Result<StageFileInfo> {
let mut files = self.blocking_list(operator, true, None)?;
let mut files = self.blocking_list(operator, Some(1))?;
files
.pop()
.ok_or_else(|| ErrorCode::BadArguments("no file found"))
Expand All @@ -162,11 +158,9 @@ impl StageFilesInfo {
pub fn blocking_list(
&self,
operator: &Operator,
first_only: bool,
max_files: Option<usize>,
) -> Result<Vec<StageFileInfo>> {
let max_files = max_files.unwrap_or(usize::MAX);
let mut limit = 0;
if let Some(files) = &self.files {
let mut res = Vec::new();
for file in files {
Expand All @@ -183,18 +177,14 @@ impl StageFilesInfo {
"{full_path} is not a file"
)));
}
if first_only {
break;
}
limit += 1;
if limit == max_files {
if res.len() == max_files {
return Ok(res);
}
}
Ok(res)
} else {
let pattern = self.get_pattern()?;
blocking_list_files_with_pattern(operator, &self.path, pattern, first_only, max_files)
blocking_list_files_with_pattern(operator, &self.path, pattern, max_files)
}
}

Expand All @@ -203,7 +193,6 @@ impl StageFilesInfo {
operator: &Operator,
path: &str,
pattern: Option<Regex>,
first_only: bool,
max_files: usize,
) -> Result<Vec<StageFileInfo>> {
if path == STDIN_FD {
Expand All @@ -226,16 +215,15 @@ impl StageFilesInfo {
.recursive(true)
.metakey(StageFileInfo::meta_query())
.await?;
let mut limit: usize = 0;

if files.len() == max_files {
return Ok(files);
}
while let Some(obj) = lister.try_next().await? {
let meta = obj.metadata();
if check_file(&obj.path()[prefix_len..], meta.mode(), &pattern) {
files.push(StageFileInfo::new(obj.path().to_string(), meta));
if first_only {
return Ok(files);
}
limit += 1;
if limit == max_files {
if files.len() == max_files {
return Ok(files);
}
}
Expand All @@ -249,11 +237,10 @@ impl StageFilesInfo {
&self,
operator: &Operator,
thread_num: usize,
first_only: bool,
max_files: usize,
files: &[String],
) -> Result<Vec<Result<(String, Metadata)>>> {
if first_only {
if max_files == 1 {
let Some(file) = files.first() else {
return Ok(vec![]);
};
Expand Down Expand Up @@ -302,7 +289,6 @@ fn blocking_list_files_with_pattern(
operator: &Operator,
path: &str,
pattern: Option<Regex>,
first_only: bool,
max_files: usize,
) -> Result<Vec<StageFileInfo>> {
if path == STDIN_FD {
Expand All @@ -326,17 +312,15 @@ fn blocking_list_files_with_pattern(
.recursive(true)
.metakey(StageFileInfo::meta_query())
.call()?;
let mut limit = 0;
if files.len() == max_files {
return Ok(files);
}
for obj in list {
let obj = obj?;
let meta = obj.metadata();
if check_file(&obj.path()[prefix_len..], meta.mode(), &pattern) {
files.push(StageFileInfo::new(obj.path().to_string(), meta));
if first_only {
return Ok(files);
}
limit += 1;
if limit == max_files {
if files.len() == max_files {
return Ok(files);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub async fn list_stage_files(
) -> Result<Vec<StageFileInfo>> {
let op = init_stage_operator(stage_info)?;
let infos = files_info
.list(&op, thread_num, false, max_files)
.list(&op, thread_num, max_files)
.await?
.into_iter()
.collect::<Vec<_>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl Interpreter for RemoveUserStageInterpreter {
pattern,
};
let files: Vec<String> = files_info
.list(&op, thread_num, false, None)
.list(&op, thread_num, None)
.await?
.into_iter()
.map(|file_with_meta| file_with_meta.path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl AsyncSource for ListStagesSource {
pattern: self.args_parsed.files_info.pattern.clone(),
};

let files = files_info.list(&op, thread_num, false, None).await?;
let files = files_info.list(&op, thread_num, None).await?;

let names: Vec<String> = files.iter().map(|file| file.path.to_string()).collect();

Expand Down
10 changes: 4 additions & 6 deletions src/query/sql/src/planner/plans/copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,19 @@ impl CopyIntoTablePlan {
if self.force {
stage_table_info
.files_info
.blocking_list(&operator, false, max_files)
.blocking_list(&operator, max_files)
} else {
stage_table_info
.files_info
.blocking_list(&operator, false, None)
stage_table_info.files_info.blocking_list(&operator, None)
}
} else if self.force {
stage_table_info
.files_info
.list(&operator, thread_num, false, max_files)
.list(&operator, thread_num, max_files)
.await
} else {
stage_table_info
.files_info
.list(&operator, thread_num, false, None)
.list(&operator, thread_num, None)
.await
}?;

Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/plans/scalar_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ fn hash_column_set<H: Hasher>(columns: &ColumnSet, state: &mut H) {
columns.iter().for_each(|c| c.hash(state));
}

/// UDFCall includes server & lambda call
/// UDFCall includes script & lambda call
#[derive(Clone, Debug, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct UDFCall {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl ParquetRSTable {
.collect::<Vec<_>>(),
None => self
.files_info
.list(&self.operator, thread_num, false, None)
.list(&self.operator, thread_num, None)
.await?
.into_iter()
.map(|f| (f.path, f.size))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl Table for ParquetRSTable {
.collect::<Vec<_>>(),
None => self
.files_info
.list(&self.operator, thread_num, false, None)
.list(&self.operator, thread_num, None)
.await?
.into_iter()
.map(|f| (f.path, f.size))
Expand Down

0 comments on commit 1877668

Please sign in to comment.