Prune Parquet RowGroup in a single call to PruningPredicate::prune, update StatisticsExtractor API #10802
```diff
@@ -25,7 +25,7 @@ use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::{
-    parquet::{RequestedStatistics, StatisticsConverter},
+    parquet::StatisticsConverter,
     {FileScanConfig, ParquetExec},
 };
 use datafusion::datasource::TableProvider;
```
```diff
@@ -518,21 +518,17 @@ impl ParquetMetadataIndexBuilder {

         // extract the parquet statistics from the file's footer
         let metadata = reader.metadata();
+        let row_groups = metadata.row_groups();

         // Extract the min/max values for each row group from the statistics
-        let row_counts = StatisticsConverter::row_counts(reader.metadata())?;
-        let value_column_mins = StatisticsConverter::try_new(
+        let converter = StatisticsConverter::try_new(
             "value",
-            RequestedStatistics::Min,
             reader.schema(),
-        )?
-        .extract(reader.metadata())?;
-        let value_column_maxes = StatisticsConverter::try_new(
-            "value",
-            RequestedStatistics::Max,
-            reader.schema(),
-        )?
-        .extract(reader.metadata())?;
+            reader.parquet_schema(),
+        )?;
+        let row_counts = StatisticsConverter::row_group_row_counts(row_groups.iter())?;
+        let value_column_mins = converter.row_group_mins(row_groups.iter())?;
+        let value_column_maxes = converter.row_group_maxes(row_groups.iter())?;

         // In a real system you would have to handle nulls, which represent
         // unknown statistics. All statistics are known in this example
```

Review comment: This looks like a user-facing change; should it be ok at this stage?

Reply: Yes, it is a user-facing change, but we haven't released a version of datafusion yet that had …

Review comment: ❤️
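For reference, the post-change flow above can also be written as a small standalone program. This is a minimal sketch assuming the API at this commit (module paths may differ in other DataFusion versions); the file path and the "value" column name are placeholders:

```rust
use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
use datafusion::error::Result;
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
use std::fs::File;

fn extract_row_group_stats() -> Result<()> {
    // Placeholder path: any parquet file containing a "value" column
    let file = File::open("data.parquet")?;
    let reader = ArrowReaderBuilder::try_new(file)?;
    let metadata = reader.metadata();
    let row_groups = metadata.row_groups();

    // Build the converter once per column...
    let converter =
        StatisticsConverter::try_new("value", reader.schema(), reader.parquet_schema())?;

    // ...then pull each statistic as one array with one entry per row group
    let row_counts = StatisticsConverter::row_group_row_counts(row_groups.iter())?;
    let mins = converter.row_group_mins(row_groups.iter())?;
    let maxes = converter.row_group_maxes(row_groups.iter())?;

    assert_eq!(mins.len(), metadata.num_row_groups());
    println!("rows={row_counts:?} mins={mins:?} maxes={maxes:?}");
    Ok(())
}
```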
```diff
@@ -24,9 +24,7 @@ use arrow_schema::{
     Field, Schema,
 };
 use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
-use datafusion::datasource::physical_plan::parquet::{
-    RequestedStatistics, StatisticsConverter,
-};
+use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
 use parquet::arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter};
 use parquet::file::properties::WriterProperties;
 use std::sync::Arc;
```
```diff
@@ -159,41 +157,26 @@ fn criterion_benchmark(c: &mut Criterion) {
     let file = file.reopen().unwrap();
     let reader = ArrowReaderBuilder::try_new(file).unwrap();
     let metadata = reader.metadata();
+    let row_groups = metadata.row_groups();

     let mut group =
         c.benchmark_group(format!("Extract statistics for {}", dtype.clone()));
     group.bench_function(
         BenchmarkId::new("extract_statistics", dtype.clone()),
         |b| {
             b.iter(|| {
-                let _ = StatisticsConverter::try_new(
-                    "col",
-                    RequestedStatistics::Min,
-                    reader.schema(),
-                )
-                .unwrap()
-                .extract(metadata)
-                .unwrap();
-
-                let _ = StatisticsConverter::try_new(
-                    "col",
-                    RequestedStatistics::Max,
-                    reader.schema(),
-                )
-                .unwrap()
-                .extract(reader.metadata())
-                .unwrap();
-
-                let _ = StatisticsConverter::try_new(
+                let converter = StatisticsConverter::try_new(
                     "col",
-                    RequestedStatistics::NullCount,
                     reader.schema(),
+                    reader.parquet_schema(),
                 )
-                .unwrap()
-                .extract(reader.metadata())
-                .unwrap();
-
-                let _ = StatisticsConverter::row_counts(reader.metadata()).unwrap();
+                .unwrap();
+                let _ = converter.row_group_mins(row_groups.iter()).unwrap();
+                let _ = converter.row_group_maxes(row_groups.iter()).unwrap();
+                let _ = converter.row_group_null_counts(row_groups.iter()).unwrap();
+                let _ = StatisticsConverter::row_group_row_counts(row_groups.iter())
+                    .unwrap();
             })
         },
     );
```

Review comment: This is more clear than using the enum, IMO :)
```diff
@@ -17,26 +17,23 @@

 use arrow::{array::ArrayRef, datatypes::Schema};
 use arrow_array::BooleanArray;
 use arrow_schema::FieldRef;
-use datafusion_common::{Column, ScalarValue};
+use datafusion_common::{Column, Result, ScalarValue};
 use parquet::basic::Type;
 use parquet::data_type::Decimal;
-use parquet::file::metadata::ColumnChunkMetaData;
 use parquet::schema::types::SchemaDescriptor;
 use parquet::{
     arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
     bloom_filter::Sbbf,
     file::metadata::RowGroupMetaData,
 };
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;

 use crate::datasource::listing::FileRange;
-use crate::datasource::physical_plan::parquet::statistics::{
-    max_statistics, min_statistics, parquet_column,
-};
+use crate::datasource::physical_plan::parquet::statistics::parquet_column;
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

-use super::ParquetFileMetrics;
+use super::{ParquetFileMetrics, StatisticsConverter};

 /// Tracks which RowGroups within a parquet file should be scanned.
 ///
```
```diff
@@ -136,32 +133,35 @@ impl RowGroupSet {
         metrics: &ParquetFileMetrics,
     ) {
         assert_eq!(groups.len(), self.len());
-        for (idx, metadata) in groups.iter().enumerate() {
-            if !self.should_scan(idx) {
-                continue;
-            }
-            let pruning_stats = RowGroupPruningStatistics {
-                parquet_schema,
-                row_group_metadata: metadata,
-                arrow_schema,
-            };
-            match predicate.prune(&pruning_stats) {
-                Ok(values) => {
-                    // NB: false means don't scan row group
-                    if !values[0] {
-                        metrics.row_groups_pruned_statistics.add(1);
-                        self.do_not_scan(idx);
-                        continue;
-                    }
-                }
-                // stats filter array could not be built
-                // don't prune this row group
-                Err(e) => {
-                    log::debug!("Error evaluating row group predicate values {e}");
-                    metrics.predicate_evaluation_errors.add(1);
-                }
-            }
-            metrics.row_groups_matched_statistics.add(1);
-        }
+        // Indexes of row groups still to scan
+        let indexes = self.indexes();
+        let row_group_metadatas = indexes.iter().map(|&i| &groups[i]).collect::<Vec<_>>();
+
+        let pruning_stats = RowGroupPruningStatistics {
+            parquet_schema,
+            row_group_metadatas,
+            arrow_schema,
+        };
+
+        // try to prune the row groups
+        match predicate.prune(&pruning_stats) {
+            Ok(values) => {
+                // values[i] is false means the predicate could not be true for row group i
+                for (idx, &value) in indexes.iter().zip(values.iter()) {
+                    if !value {
+                        metrics.row_groups_pruned_statistics.add(1);
+                        self.do_not_scan(*idx);
+                    } else {
+                        metrics.row_groups_matched_statistics.add(1);
+                    }
+                }
+            }
+            // stats filter array could not be built
+            // Can't prune.
+            Err(e) => {
+                log::debug!("Error evaluating row group predicate values {e}");
+                metrics.predicate_evaluation_errors.add(1);
+            }
+        }
     }
```

Review comment: Here is the change to prune all row groups with one call to PruningPredicate::prune.
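To see the new control flow in isolation: PruningPredicate::prune is called once and returns one boolean per container (here, per remaining row group), and the caller maps those booleans back to row group indexes. The sketch below uses ScanSet, a hypothetical stand-in for the RowGroupSet bookkeeping defined in this file:

```rust
/// Hypothetical stand-in for RowGroupSet: tracks which row groups to scan
struct ScanSet {
    should_scan: Vec<bool>,
}

impl ScanSet {
    /// Indexes of row groups still scheduled for scanning
    fn indexes(&self) -> Vec<usize> {
        self.should_scan
            .iter()
            .enumerate()
            .filter_map(|(i, &scan)| scan.then_some(i))
            .collect()
    }

    /// Apply one-call prune results: values[i] == false means the predicate
    /// can never be true for row group indexes[i], so skip it
    fn apply_prune_results(&mut self, indexes: &[usize], values: &[bool]) {
        for (&idx, &keep) in indexes.iter().zip(values.iter()) {
            if !keep {
                self.should_scan[idx] = false;
            }
        }
    }
}

fn main() {
    let mut set = ScanSet { should_scan: vec![true; 4] };
    let indexes = set.indexes(); // [0, 1, 2, 3]
    // Pretend predicate.prune(&stats) returned one bool per remaining group
    let values = vec![true, false, true, false];
    set.apply_prune_results(&indexes, &values);
    assert_eq!(set.indexes(), vec![0, 2]);
}
```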
```diff
@@ -360,49 +360,55 @@ impl PruningStatistics for BloomFilterStatistics {
     }
 }

-/// Wraps [`RowGroupMetaData`] in a way that implements [`PruningStatistics`]
-///
-/// Note: This should be implemented for an array of [`RowGroupMetaData`] instead
-/// of per row-group
+/// Wraps a slice of [`RowGroupMetaData`] in a way that implements [`PruningStatistics`]
 struct RowGroupPruningStatistics<'a> {
     parquet_schema: &'a SchemaDescriptor,
-    row_group_metadata: &'a RowGroupMetaData,
+    row_group_metadatas: Vec<&'a RowGroupMetaData>,
     arrow_schema: &'a Schema,
 }

 impl<'a> RowGroupPruningStatistics<'a> {
-    /// Lookups up the parquet column by name
-    fn column(&self, name: &str) -> Option<(&ColumnChunkMetaData, &FieldRef)> {
-        let (idx, field) = parquet_column(self.parquet_schema, self.arrow_schema, name)?;
-        Some((self.row_group_metadata.column(idx), field))
+    /// Return an iterator over the row group metadata
+    fn metadata_iter(&'a self) -> impl Iterator<Item = &'a RowGroupMetaData> + 'a {
+        self.row_group_metadatas.iter().copied()
+    }
+
+    fn statistics_converter<'b>(
+        &'a self,
+        column: &'b Column,
+    ) -> Result<StatisticsConverter<'a>> {
+        StatisticsConverter::try_new(&column.name, self.arrow_schema, self.parquet_schema)
     }
 }

 impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
-        let (column, field) = self.column(&column.name)?;
-        min_statistics(field.data_type(), std::iter::once(column.statistics())).ok()
+        self.statistics_converter(column)
+            .and_then(|c| c.row_group_mins(self.metadata_iter()))
+            .ok()
     }

     fn max_values(&self, column: &Column) -> Option<ArrayRef> {
-        let (column, field) = self.column(&column.name)?;
-        max_statistics(field.data_type(), std::iter::once(column.statistics())).ok()
+        self.statistics_converter(column)
+            .and_then(|c| c.row_group_maxes(self.metadata_iter()))
+            .ok()
     }

     fn num_containers(&self) -> usize {
-        1
+        self.row_group_metadatas.len()
     }

     fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
-        let (c, _) = self.column(&column.name)?;
-        let scalar = ScalarValue::UInt64(Some(c.statistics()?.null_count()));
-        scalar.to_array().ok()
+        self.statistics_converter(column)
+            .and_then(|c| c.row_group_null_counts(self.metadata_iter()))
+            .ok()
     }

-    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
-        let (c, _) = self.column(&column.name)?;
-        let scalar = ScalarValue::UInt64(Some(c.num_values() as u64));
-        scalar.to_array().ok()
+    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        // row counts are the same for all columns in a row group
+        StatisticsConverter::row_group_row_counts(self.metadata_iter())
+            .ok()
+            .map(|counts| Arc::new(counts) as ArrayRef)
     }

     fn contained(
```
```diff
@@ -429,6 +435,7 @@ mod tests {
     use parquet::arrow::async_reader::ParquetObjectReader;
     use parquet::basic::LogicalType;
     use parquet::data_type::{ByteArray, FixedLenByteArray};
+    use parquet::file::metadata::ColumnChunkMetaData;
    use parquet::{
         basic::Type as PhysicalType, file::statistics::Statistics as ParquetStatistics,
         schema::types::SchemaDescPtr,
```
Review comment: This is a pretty good example of how the statistics API changed. FYI @NGA-TRAN
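To summarize the change the comment refers to, here is a hedged before/after sketch reconstructed from the removed and added lines in this diff; the "before" calls are left in comments because RequestedStatistics no longer exists after this PR:

```rust
use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
use datafusion::error::Result;
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
use std::fs::File;

fn api_change_summary() -> Result<()> {
    let reader = ArrowReaderBuilder::try_new(File::open("data.parquet")?)?;
    let row_groups = reader.metadata().row_groups();

    // Before this PR: one try_new + extract per statistic, selected by enum:
    //     let mins = StatisticsConverter::try_new(
    //         "col", RequestedStatistics::Min, reader.schema())?
    //     .extract(reader.metadata())?;

    // After this PR: build one converter per column, then call a named
    // accessor per statistic, each returning one value per row group
    let converter =
        StatisticsConverter::try_new("col", reader.schema(), reader.parquet_schema())?;
    let _mins = converter.row_group_mins(row_groups.iter())?;
    let _maxes = converter.row_group_maxes(row_groups.iter())?;
    let _null_counts = converter.row_group_null_counts(row_groups.iter())?;
    Ok(())
}
```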