Skip to content

Commit

Permalink
[fix] parquet rowgroup pruning not kicking in
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Oct 28, 2021
1 parent 8dfb604 commit 1c7b0e7
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
38 changes: 25 additions & 13 deletions datafusion/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
const FILE_MODIFIED_COLUMN_NAME: &str = "_df_part_file_modified_";

/// Check whether the given expression can be resolved using only the columns `col_names`
/// Check whether the given expression can be resolved using only the columns `col_names`.
/// This means that if this function returns true:
/// - the table provider can filter the table partition values with this expression
/// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
/// was performed
pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
match expr {
// leaf
Expand Down Expand Up @@ -153,24 +157,22 @@ pub fn split_files(

/// Discover the partitions on the given path and prune out files
/// that belong to irrelevant partitions using `filters` expressions.
/// Assumes that `filters` only contains expressions that can be resolved
/// using partitioning columns only.
/// `filters` might contain expressions that can be resolved only at the
/// file level (e.g. Parquet row group pruning).
///
/// TODO for tables with many files (10k+), it will usually be more efficient
/// to first list the folders relative to the first partition dimension,
/// prune those, then list only the contents of the remaining folders.
pub async fn pruned_partition_list(
store: &dyn ObjectStore,
table_path: &str,
applicable_filters: &[Expr],
filters: &[Expr],
file_extension: &str,
table_partition_cols: &[String],
) -> Result<PartitionedFileStream> {
// if no partition col => simply list all the files
// if partition but no filter => parse the partition values while listing all the files
// otherwise => parse the partition values and serde them as a RecordBatch to filter them
// if no partition col => simply list all the files
if table_partition_cols.is_empty() {
Ok(Box::pin(
return Ok(Box::pin(
store
.list_file_with_suffix(table_path, file_extension)
.await?
Expand All @@ -180,9 +182,17 @@ pub async fn pruned_partition_list(
file_meta: f?,
})
}),
))
} else if applicable_filters.is_empty() {
let stream_path = table_path.to_owned();
));
}

let applicable_filters: Vec<_> = filters
.iter()
.filter(|f| expr_applicable_for_cols(table_partition_cols, f))
.collect();
let stream_path = table_path.to_owned();
if applicable_filters.is_empty() {
// parse the partition values while listing all the files
// TODO we might avoid parsing the partition values if they are not used in any projection
let table_partition_cols_stream = table_partition_cols.to_vec();
Ok(Box::pin(
store
Expand Down Expand Up @@ -217,8 +227,8 @@ pub async fn pruned_partition_list(
}),
))
} else {
// parse the partition values and serde them as a RecordBatch to filter them
// TODO avoid collecting but have a streaming memory table instead
let stream_path = table_path.to_owned();
let batches: Vec<RecordBatch> = store
.list_file_with_suffix(table_path, file_extension)
.await?
Expand Down Expand Up @@ -501,10 +511,12 @@ mod tests {
]);
let filter1 = Expr::eq(col("part1"), lit("p1v2"));
let filter2 = Expr::eq(col("part2"), lit("p2v1"));
// filter3 cannot be resolved at partition pruning because it references `other`, which is not a partition column
let filter3 = Expr::eq(col("part2"), col("other"));
let pruned = pruned_partition_list(
store.as_ref(),
"tablepath/",
&[filter1, filter2],
&[filter1, filter2, filter3],
".parquet",
&[String::from("part1"), String::from("part2")],
)
Expand Down
5 changes: 4 additions & 1 deletion datafusion/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,12 @@ impl TableProvider for ListingTable {
filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
if expr_applicable_for_cols(&self.options.table_partition_cols, filter) {
// if the filter can be handled by partition pruning, it is exact
Ok(TableProviderFilterPushDown::Exact)
} else {
Ok(TableProviderFilterPushDown::Unsupported)
// otherwise, we still might be able to handle the filter with file
// level mechanisms such as Parquet row group pruning.
Ok(TableProviderFilterPushDown::Inexact)
}
}
}
Expand Down

0 comments on commit 1c7b0e7

Please sign in to comment.