refactor: ParquetTable list file in read_partition.
youngsofun committed Feb 3, 2023
1 parent 9eced49 commit ec12a30
Showing 9 changed files with 62 additions and 164 deletions.
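In short: before this change, ParquetTable::create expanded glob paths into a concrete file list up front; after it, creation only touches the first matching file (to infer the Arrow schema) and the full listing is deferred to read_partition. Below is a minimal, runnable sketch of that flow in plain Rust. MockOperator stands in for opendal's Operator; the path/pattern/files fields and the first_file/list methods mirror the StageFilesInfo usage in the hunks below, and everything else is assumed for illustration.

// Stand-in for opendal's Operator: just a list of object paths.
struct MockOperator {
    entries: Vec<String>,
}

// Shape mirrors the StageFilesInfo fields used in the diffs below.
struct StageFilesInfo {
    path: String,
    pattern: Option<String>,
    files: Option<Vec<String>>,
}

impl StageFilesInfo {
    // Full listing; in the refactor this now runs inside read_partition.
    fn list(&self, op: &MockOperator) -> Vec<String> {
        // An explicit file list wins over scanning.
        if let Some(files) = &self.files {
            return files.clone();
        }
        op.entries
            .iter()
            .filter(|p| p.starts_with(&self.path))
            // The real `pattern` is a regex; a suffix check keeps the mock small.
            .filter(|p| self.pattern.as_deref().map_or(true, |pat| p.ends_with(pat)))
            .cloned()
            .collect()
    }

    // Cheap lookup used at table-creation time to infer a schema.
    fn first_file(&self, op: &MockOperator) -> Option<String> {
        self.list(op).into_iter().next()
    }
}

fn main() {
    let op = MockOperator {
        entries: vec!["stage/a.parquet".to_string(), "stage/b.parquet".to_string()],
    };
    let info = StageFilesInfo {
        path: "stage/".to_string(),
        pattern: None,
        files: None,
    };

    // create(): touch a single file, just enough to infer the schema.
    println!("schema file: {:?}", info.first_file(&op));
    // read_partition(): the full listing happens here, not at create().
    println!("all files:   {:?}", info.list(&op));
}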
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/query/catalog/Cargo.toml
@@ -19,6 +19,7 @@ common-meta-types = { path = "../../meta/types" }
 common-pipeline-core = { path = "../pipeline/core" }
 common-settings = { path = "../settings" }
 common-storage = { path = "../../common/storage" }
+opendal = { workspace = true }
 
 async-trait = "0.1.57"
 chrono = { workspace = true }
@@ -18,16 +18,18 @@ use common_arrow::arrow::datatypes::Schema as ArrowSchema;
 use common_expression::TableSchema;
 use common_meta_app::schema::TableInfo;
 use common_meta_types::UserStageInfo;
+use common_storage::StageFilesInfo;
 
 use crate::plan::datasource::datasource_info::parquet_read_options::ParquetReadOptions;
 
 #[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug)]
 pub struct ParquetTableInfo {
-    pub table_info: TableInfo,
-    pub file_locations: Vec<String>,
-    pub arrow_schema: ArrowSchema,
     pub read_options: ParquetReadOptions,
     pub user_stage_info: UserStageInfo,
+    pub files_info: StageFilesInfo,
+
+    pub table_info: TableInfo,
+    pub arrow_schema: ArrowSchema,
 }
 
 impl ParquetTableInfo {
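One design point worth noting in this hunk: ParquetTableInfo derives the serde traits, so whatever it holds ends up in the serialized plan. Storing StageFilesInfo instead of file_locations: Vec<String> means the plan carries the rule for finding files (path, optional pattern, optional explicit list) rather than an expanded listing. A hedged round-trip sketch, assuming the serde and serde_json crates; the field names follow the hunk above, the rest is illustrative.

use serde::{Deserialize, Serialize};

// Field names follow the ParquetTableInfo hunk above; everything else is assumed.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct StageFilesInfo {
    path: String,
    pattern: Option<String>,
    files: Option<Vec<String>>,
}

fn main() -> serde_json::Result<()> {
    let info = StageFilesInfo {
        path: "data/".to_string(),
        pattern: Some(".*[.]parquet".to_string()),
        files: None,
    };

    // The plan serializes the *rule* for finding files, not an expanded listing,
    // so a stage with thousands of objects still yields a small, stable plan.
    let json = serde_json::to_string(&info)?;
    println!("{json}");

    let back: StageFilesInfo = serde_json::from_str(&json)?;
    assert_eq!(info, back);
    Ok(())
}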
29 changes: 12 additions & 17 deletions src/query/sql/src/planner/binder/table.rs
@@ -31,6 +31,7 @@ use common_ast::parser::tokenize_sql;
 use common_ast::Backtrace;
 use common_ast::Dialect;
 use common_catalog::catalog_kind::CATALOG_DEFAULT;
+use common_catalog::plan::ParquetReadOptions;
 use common_catalog::table::ColumnId;
 use common_catalog::table::ColumnStatistics;
 use common_catalog::table::NavigationPoint;
@@ -46,9 +47,8 @@ use common_expression::Scalar;
 use common_functions::scalars::BUILTIN_FUNCTIONS;
 use common_meta_types::StageFileFormatType;
 use common_meta_types::UserStageInfo;
+use common_storage::StageFilesInfo;
 use common_storages_parquet::ParquetTable;
-use common_storages_stage::list_file;
-use common_storages_stage::StageTable;
 use common_storages_view::view_table::QUERY;
 
 use crate::binder::copy::parse_stage_location_v2;
@@ -340,25 +340,20 @@
             }
         };
 
-        let op = StageTable::get_op(&user_stage_info)?;
-
         if matches!(
             user_stage_info.file_format_options.format,
             StageFileFormatType::Parquet
         ) {
-            let files = list_file(&op, &path)
-                .await?
-                .into_iter()
-                .map(|x| x.path)
-                .collect();
+            let files_info = StageFilesInfo {
+                path,
+                pattern: None,
+                files: None,
+            };
+            let read_options = ParquetReadOptions::default();
 
-            let table = ParquetTable::create(
-                op,
-                files,
-                Default::default(),
-                user_stage_info.clone(),
-            )
-            .await?;
+            let table =
+                ParquetTable::create(user_stage_info.clone(), files_info, read_options)
+                    .await?;
 
             let table_alias_name = if let Some(table_alias) = alias {
                 Some(
1 change: 0 additions & 1 deletion src/query/storages/parquet/Cargo.toml
@@ -28,7 +28,6 @@ storages-common-table-meta = { path = "../common/table-meta" }
 async-trait = { version = "0.1.57", package = "async-trait-fn" }
 chrono = { workspace = true }
 futures = "0.3.24"
-glob = "0.3.1"
 opendal = { workspace = true }
 serde = { workspace = true }
 typetag = "0.2.3"
70 changes: 9 additions & 61 deletions src/query/storages/parquet/src/parquet_table/blocking.rs
@@ -21,10 +21,7 @@ use common_catalog::table::Table;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_meta_types::UserStageInfo;
-use glob::Pattern;
-use opendal::raw::get_basename;
-use opendal::raw::get_parent;
-use opendal::ObjectMode;
+use common_storage::StageFilesInfo;
 use opendal::Operator;
 
 use super::table::create_parquet_table_info;
@@ -33,85 +30,36 @@ use crate::ParquetTable;
 impl ParquetTable {
     pub fn blocking_create(
         operator: Operator,
-        maybe_glob_locations: Vec<String>,
         read_options: ParquetReadOptions,
         stage_info: UserStageInfo,
+        files_info: StageFilesInfo,
     ) -> Result<Arc<dyn Table>> {
-        let (file_locations, arrow_schema) =
-            Self::blocking_prepare_metas(maybe_glob_locations, operator.clone())?;
+        let first_file = files_info.blocking_first_file(&operator)?;
+        let arrow_schema = Self::blocking_prepare_metas(&first_file.path, operator.clone())?;
 
         let table_info = create_parquet_table_info(arrow_schema.clone());
 
         Ok(Arc::new(ParquetTable {
-            file_locations,
             table_info,
             arrow_schema,
             operator,
             read_options,
             stage_info,
+            files_info,
         }))
     }
 
-    fn blocking_prepare_metas(
-        paths: Vec<String>,
-        operator: Operator,
-    ) -> Result<(Vec<String>, ArrowSchema)> {
-        let mut files = Vec::with_capacity(paths.len());
-        for maybe_glob_path in paths {
-            let list = Self::blocking_list_files(&maybe_glob_path, &operator)?;
-            files.extend(list);
-        }
-
-        if files.is_empty() {
-            return Err(ErrorCode::BadArguments(
-                "No matched files found for read_parquet",
-            ));
-        }
-
+    fn blocking_prepare_metas(path: &str, operator: Operator) -> Result<ArrowSchema> {
         // Infer schema from the first parquet file.
         // Assume all parquet files have the same schema.
        // If not, throw error during reading.
-        let mut reader = operator.object(&files[0]).blocking_reader()?;
+        let mut reader = operator.object(path).blocking_reader()?;
         let first_meta = pread::read_metadata(&mut reader).map_err(|e| {
-            ErrorCode::Internal(format!(
-                "Read parquet file '{}''s meta error: {}",
-                &files[0], e
-            ))
+            ErrorCode::Internal(format!("Read parquet file '{}''s meta error: {}", path, e))
         })?;
 
         let arrow_schema = pread::infer_schema(&first_meta)?;
 
-        Ok((files, arrow_schema))
-    }
-
-    /// List files from the given path with pattern.
-    ///
-    /// Only support simple patterns (one level): `path/to/dir/*.parquet`.
-    fn blocking_list_files(maybe_glob_path: &str, operator: &Operator) -> Result<Vec<String>> {
-        let basename = get_basename(maybe_glob_path);
-        let pattern = match Pattern::new(basename) {
-            Ok(pattern) => pattern,
-            Err(_) => {
-                // not a Unix shell pattern, push the path directly.
-                return Ok(vec![maybe_glob_path.to_string()]);
-            }
-        };
-
-        let obj = operator.object(get_parent(maybe_glob_path));
-        let mut files = Vec::new();
-        let list = obj.blocking_list()?;
-        for de in list {
-            let de = de?;
-            match de.blocking_mode()? {
-                ObjectMode::FILE => {
-                    if pattern.matches(de.name()) {
-                        files.push(de.path().to_string());
-                    }
-                }
-                _ => continue,
-            }
-        }
-
-        Ok(files)
+        Ok(arrow_schema)
     }
 }
87 changes: 13 additions & 74 deletions src/query/storages/parquet/src/parquet_table/non_blocking.rs
@@ -16,116 +16,55 @@ use std::sync::Arc;
 
 use common_arrow::arrow::datatypes::Schema as ArrowSchema;
 use common_arrow::arrow::io::parquet::read as pread;
-use common_base::base::tokio;
 use common_catalog::plan::ParquetReadOptions;
 use common_catalog::table::Table;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_meta_types::UserStageInfo;
-use futures::TryStreamExt;
-use glob::Pattern;
-use opendal::raw::get_basename;
-use opendal::raw::get_parent;
-use opendal::ObjectMode;
+use common_storage::init_stage_operator;
+use common_storage::StageFilesInfo;
 use opendal::Operator;
 
 use super::table::create_parquet_table_info;
 use crate::ParquetTable;
 
 impl ParquetTable {
     pub async fn create(
-        operator: Operator,
-        maybe_glob_locations: Vec<String>,
-        read_options: ParquetReadOptions,
         stage_info: UserStageInfo,
+        files_info: StageFilesInfo,
+        read_options: ParquetReadOptions,
     ) -> Result<Arc<dyn Table>> {
+        let operator = init_stage_operator(&stage_info)?;
         if operator.metadata().can_blocking() {
-            return Self::blocking_create(operator, maybe_glob_locations, read_options, stage_info);
+            return Self::blocking_create(operator, read_options, stage_info, files_info);
        }
+        let first_file = files_info.first_file(&operator).await?;
 
-        let (file_locations, arrow_schema) =
-            Self::prepare_metas(maybe_glob_locations, operator.clone()).await?;
+        let arrow_schema = Self::prepare_metas(&first_file.path, operator.clone()).await?;
 
         let table_info = create_parquet_table_info(arrow_schema.clone());
 
         Ok(Arc::new(ParquetTable {
-            file_locations,
             table_info,
             arrow_schema,
             operator,
             read_options,
             stage_info,
+            files_info,
        }))
    }
 
-    async fn prepare_metas(
-        paths: Vec<String>,
-        operator: Operator,
-    ) -> Result<(Vec<String>, ArrowSchema)> {
-        let mut handles = Vec::with_capacity(paths.len());
-        for maybe_glob_path in paths {
-            let operator = operator.clone();
-            handles.push(async move {
-                tokio::spawn(async move { Self::list_files(&maybe_glob_path, &operator).await })
-                    .await
-                    .unwrap()
-            });
-        }
-        let files = futures::future::try_join_all(handles)
-            .await?
-            .into_iter()
-            .flatten()
-            .collect::<Vec<_>>();
-
-        if files.is_empty() {
-            return Err(ErrorCode::BadArguments(
-                "No matched files found for read_parquet",
-            ));
-        }
-
+    async fn prepare_metas(path: &str, operator: Operator) -> Result<ArrowSchema> {
         // Infer schema from the first parquet file.
         // Assume all parquet files have the same schema.
         // If not, throw error during reading.
-        let mut reader = operator.object(&files[0]).reader().await?;
+        let mut reader = operator.object(path).reader().await?;
         let first_meta = pread::read_metadata_async(&mut reader).await.map_err(|e| {
-            ErrorCode::Internal(format!(
-                "Read parquet file '{}''s meta error: {}",
-                &files[0], e
-            ))
+            ErrorCode::Internal(format!("Read parquet file '{}''s meta error: {}", path, e))
        })?;
 
         let arrow_schema = pread::infer_schema(&first_meta)?;
 
-        Ok((files, arrow_schema))
-    }
-
-    /// List files from the given path with pattern.
-    ///
-    /// Only support simple patterns (one level): `path/to/dir/*.parquet`.
-    async fn list_files(maybe_glob_path: &str, operator: &Operator) -> Result<Vec<String>> {
-        let basename = get_basename(maybe_glob_path);
-        let pattern = match Pattern::new(basename) {
-            Ok(pattern) => pattern,
-            Err(_) => {
-                // not a Unix shell pattern, push the path directly.
-                return Ok(vec![maybe_glob_path.to_string()]);
-            }
-        };
-
-        let obj = operator.object(get_parent(maybe_glob_path));
-        let mut files = Vec::new();
-        let mut list = obj.list().await?;
-        while let Some(de) = list.try_next().await? {
-            match de.mode().await? {
-                ObjectMode::FILE => {
-                    if pattern.matches(de.name()) {
-                        files.push(de.path().to_string());
-                    }
-                }
-                _ => continue,
-            }
-        }
-
-        Ok(files)
+        Ok(arrow_schema)
     }
 }
11 changes: 10 additions & 1 deletion src/query/storages/parquet/src/parquet_table/partition.rs
@@ -93,12 +93,21 @@
             None
         };
 
+        let file_locations = if self.operator.metadata().can_blocking() {
+            self.files_info.blocking_list(&self.operator, false)
+        } else {
+            self.files_info.list(&self.operator, false).await
+        }?
+        .into_iter()
+        .map(|f| f.path)
+        .collect::<Vec<_>>();
+
         let pruner = PartitionPruner {
             schema,
             row_group_pruner,
             page_pruners,
             operator: self.operator.clone(),
-            locations: self.file_locations.clone(),
+            locations: file_locations,
             columns_to_read,
             column_nodes: projected_column_nodes,
             skip_pruning,
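read_partitions now performs the listing itself, choosing a blocking or an async walk based on the operator's capability. Below is a small sketch of that dispatch shape, assuming only the futures crate for a runtime-free executor; Operator here is a stand-in for opendal's Operator, not its real API.

use futures::executor::block_on;

struct Operator {
    blocking_capable: bool,
    entries: Vec<String>,
}

impl Operator {
    fn can_blocking(&self) -> bool {
        self.blocking_capable
    }

    // Synchronous walk, usable when the backend supports blocking I/O.
    fn blocking_list(&self) -> Vec<String> {
        self.entries.clone()
    }

    // Asynchronous walk for everything else.
    async fn list(&self) -> Vec<String> {
        self.entries.clone()
    }
}

// Mirrors the new read_partitions logic: pick the listing mode by
// capability, then keep only the file paths.
async fn list_locations(op: &Operator) -> Vec<String> {
    if op.can_blocking() {
        op.blocking_list()
    } else {
        op.list().await
    }
}

fn main() {
    let op = Operator {
        blocking_capable: false,
        entries: vec!["stage/a.parquet".to_string(), "stage/b.parquet".to_string()],
    };
    let files = block_on(list_locations(&op));
    println!("{files:?}");
}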