-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support user defined ParquetAccessPlan
in ParquetExec
, validation to ParquetAccessPlan::select
#10813
Support user defined ParquetAccessPlan
in ParquetExec
, validation to ParquetAccessPlan::select
#10813
Changes from 1 commit
38fa45a
bdb51b2
6a602de
baae282
7799d65
69509a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use datafusion_common::{internal_err, Result}; | ||
use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; | ||
use parquet::file::metadata::RowGroupMetaData; | ||
|
||
|
@@ -182,6 +183,11 @@ impl ParquetAccessPlan { | |
/// is returned for *all* the rows in the row groups that are not skipped. | ||
/// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`]. | ||
/// | ||
/// # Errors | ||
/// | ||
/// Returns an error if the specified row selection does not specify | ||
/// the same number of rows as in `row_group_metadata`. | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// | ||
/// # Example: No Selections | ||
/// | ||
/// Given an access plan like this | ||
|
@@ -228,7 +234,7 @@ impl ParquetAccessPlan { | |
pub fn into_overall_row_selection( | ||
self, | ||
row_group_meta_data: &[RowGroupMetaData], | ||
) -> Option<RowSelection> { | ||
) -> Result<Option<RowSelection>> { | ||
assert_eq!(row_group_meta_data.len(), self.row_groups.len()); | ||
// Intuition: entire row groups are filtered out using | ||
// `row_group_indexes` which come from Skip and Scan. An overall | ||
|
@@ -239,7 +245,32 @@ impl ParquetAccessPlan { | |
.iter() | ||
.any(|rg| matches!(rg, RowGroupAccess::Selection(_))) | ||
{ | ||
return None; | ||
return Ok(None); | ||
} | ||
|
||
// validate all Selections | ||
for (idx, (rg, rg_meta)) in self | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is new checking added as users can pass in |
||
.row_groups | ||
.iter() | ||
.zip(row_group_meta_data.iter()) | ||
.enumerate() | ||
{ | ||
let RowGroupAccess::Selection(selection) = rg else { | ||
continue; | ||
}; | ||
let rows_in_selection = selection | ||
.iter() | ||
.map(|selection| selection.row_count) | ||
.sum::<usize>(); | ||
|
||
let row_group_row_count = rg_meta.num_rows(); | ||
if rows_in_selection as i64 != row_group_row_count { | ||
return internal_err!( | ||
"Invalid ParquetAccessPlan Selection. Row group {idx} has {row_group_row_count} rows \ | ||
but selection only specifies {rows_in_selection} rows. \ | ||
Selection: {selection:?}" | ||
); | ||
} | ||
} | ||
|
||
let total_selection: RowSelection = self | ||
|
@@ -261,7 +292,7 @@ impl ParquetAccessPlan { | |
}) | ||
.collect(); | ||
|
||
Some(total_selection) | ||
Ok(Some(total_selection)) | ||
} | ||
|
||
/// Return an iterator over the row group indexes that should be scanned | ||
|
@@ -305,6 +336,7 @@ impl ParquetAccessPlan { | |
#[cfg(test)] | ||
mod test { | ||
use super::*; | ||
use datafusion_common::assert_contains; | ||
use parquet::basic::LogicalType; | ||
use parquet::file::metadata::ColumnChunkMetaData; | ||
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor}; | ||
|
@@ -320,7 +352,9 @@ mod test { | |
]); | ||
|
||
let row_group_indexes = access_plan.row_group_indexes(); | ||
let row_selection = access_plan.into_overall_row_selection(row_group_metadata()); | ||
let row_selection = access_plan | ||
.into_overall_row_selection(row_group_metadata()) | ||
.unwrap(); | ||
|
||
// scan all row groups, no selection | ||
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]); | ||
|
@@ -337,7 +371,9 @@ mod test { | |
]); | ||
|
||
let row_group_indexes = access_plan.row_group_indexes(); | ||
let row_selection = access_plan.into_overall_row_selection(row_group_metadata()); | ||
let row_selection = access_plan | ||
.into_overall_row_selection(row_group_metadata()) | ||
.unwrap(); | ||
|
||
// skip all row groups, no selection | ||
assert_eq!(row_group_indexes, vec![] as Vec<usize>); | ||
|
@@ -348,14 +384,22 @@ mod test { | |
let access_plan = ParquetAccessPlan::new(vec![ | ||
RowGroupAccess::Scan, | ||
RowGroupAccess::Selection( | ||
vec![RowSelector::select(5), RowSelector::skip(7)].into(), | ||
// select / skip all 20 rows in row group 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: to be consistent with L427 in this file, it would be better to call it as |
||
vec![ | ||
RowSelector::select(5), | ||
RowSelector::skip(7), | ||
RowSelector::select(8), | ||
] | ||
.into(), | ||
), | ||
RowGroupAccess::Skip, | ||
RowGroupAccess::Skip, | ||
]); | ||
|
||
let row_group_indexes = access_plan.row_group_indexes(); | ||
let row_selection = access_plan.into_overall_row_selection(row_group_metadata()); | ||
let row_selection = access_plan | ||
.into_overall_row_selection(row_group_metadata()) | ||
.unwrap(); | ||
|
||
assert_eq!(row_group_indexes, vec![0, 1]); | ||
assert_eq!( | ||
|
@@ -366,7 +410,8 @@ mod test { | |
RowSelector::select(10), | ||
// selectors from the second row group | ||
RowSelector::select(5), | ||
RowSelector::skip(7) | ||
RowSelector::skip(7), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. turns out that some of the existing unit tests were actually invalid. However, I think the issues are actually test problem, not actual code problems. All the actual parquet reader tests passed |
||
RowSelector::select(8) | ||
] | ||
.into() | ||
) | ||
|
@@ -379,13 +424,21 @@ mod test { | |
RowGroupAccess::Skip, | ||
RowGroupAccess::Scan, | ||
RowGroupAccess::Selection( | ||
vec![RowSelector::select(5), RowSelector::skip(7)].into(), | ||
// specify all 30 rows in row group 1 | ||
vec![ | ||
RowSelector::select(5), | ||
RowSelector::skip(7), | ||
RowSelector::select(18), | ||
] | ||
.into(), | ||
), | ||
RowGroupAccess::Scan, | ||
]); | ||
|
||
let row_group_indexes = access_plan.row_group_indexes(); | ||
let row_selection = access_plan.into_overall_row_selection(row_group_metadata()); | ||
let row_selection = access_plan | ||
.into_overall_row_selection(row_group_metadata()) | ||
.unwrap(); | ||
|
||
assert_eq!(row_group_indexes, vec![1, 2, 3]); | ||
assert_eq!( | ||
|
@@ -397,6 +450,7 @@ mod test { | |
// selectors from the third row group | ||
RowSelector::select(5), | ||
RowSelector::skip(7), | ||
RowSelector::select(18), | ||
// select the entire fourth row group | ||
RowSelector::select(40), | ||
] | ||
|
@@ -405,6 +459,53 @@ mod test { | |
); | ||
} | ||
|
||
#[test] | ||
fn test_invalid_too_few() { | ||
let access_plan = ParquetAccessPlan::new(vec![ | ||
RowGroupAccess::Scan, | ||
// select 12 rows, but row group 1 has 20 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: -> I think the select is referred to as selection, which the following code also includes a skip. |
||
RowGroupAccess::Selection( | ||
vec![RowSelector::select(5), RowSelector::skip(7)].into(), | ||
), | ||
RowGroupAccess::Scan, | ||
RowGroupAccess::Scan, | ||
]); | ||
|
||
let row_group_indexes = access_plan.row_group_indexes(); | ||
let err = access_plan | ||
.into_overall_row_selection(row_group_metadata()) | ||
.unwrap_err() | ||
.to_string(); | ||
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]); | ||
assert_contains!(err, "Internal error: Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 12 rows"); | ||
} | ||
|
||
#[test] | ||
fn test_invalid_too_many() { | ||
let access_plan = ParquetAccessPlan::new(vec![ | ||
RowGroupAccess::Scan, | ||
// select 22 rows, but row group 1 has only 20 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto. |
||
RowGroupAccess::Selection( | ||
vec![ | ||
RowSelector::select(10), | ||
RowSelector::skip(2), | ||
RowSelector::select(10), | ||
] | ||
.into(), | ||
), | ||
RowGroupAccess::Scan, | ||
RowGroupAccess::Scan, | ||
]); | ||
|
||
let row_group_indexes = access_plan.row_group_indexes(); | ||
let err = access_plan | ||
.into_overall_row_selection(row_group_metadata()) | ||
.unwrap_err() | ||
.to_string(); | ||
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]); | ||
assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows"); | ||
} | ||
|
||
static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = OnceLock::new(); | ||
|
||
/// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -145,6 +145,52 @@ pub use writer::plan_to_parquet; | |
/// custom reader is used, it supplies the metadata directly and this parameter | ||
/// is ignored. [`ParquetExecBuilder::with_metadata_size_hint`] for more details. | ||
/// | ||
/// * User provided [`ParquetAccessPlan`]s to skip row groups and/or pages | ||
/// based on external information. See "Implementing External Indexes" below | ||
/// | ||
/// # Implementing External Indexes | ||
/// | ||
/// It is possible to restrict the row groups and selections within those row | ||
/// groups that the ParquetExec will consider by providing an initial | ||
/// [`ParquetAccessPlan`] as `extensions` on [`PartitionedFile`]. This can be | ||
/// used to implement external indexes on top of parquet files and select only | ||
/// portions of the files. | ||
/// | ||
/// The `ParquetExec` will try and further reduce any provided | ||
/// `ParquetAccessPlan` further based on the contents of `ParquetMetadata` and | ||
Comment on lines
+159
to
+160
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: there are two
|
||
/// other settings. | ||
/// | ||
/// ## Example of providing a ParquetAccessPlan | ||
/// | ||
/// ``` | ||
/// # use std::sync::Arc; | ||
/// # use arrow_schema::{Schema, SchemaRef}; | ||
/// # use datafusion::datasource::listing::PartitionedFile; | ||
/// # use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan; | ||
/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; | ||
/// # use datafusion_execution::object_store::ObjectStoreUrl; | ||
/// # fn schema() -> SchemaRef { | ||
/// # Arc::new(Schema::empty()) | ||
/// # } | ||
/// // create an access plan to scan row group 0, 1 and 3 and skip row groups 2 and 4 | ||
/// let mut access_plan = ParquetAccessPlan::new_all(5); | ||
/// access_plan.skip(2); | ||
/// access_plan.skip(4); | ||
/// // provide the plan as extension to the FileScanConfig | ||
/// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234) | ||
/// .with_extensions(Arc::new(access_plan)); | ||
/// // create a ParquetExec to scan this file | ||
/// let file_scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema()) | ||
/// .with_file(partitioned_file); | ||
/// // this parquet exec will not even try to read row groups 2 and 4. Additional | ||
/// // pruning based on predicates may also happen | ||
/// let exec = ParquetExec::builder(file_scan_config).build(); | ||
/// ``` | ||
/// | ||
/// For a complete example, see the [`parquet_index_advanced` example]). | ||
/// | ||
/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_index_advanced.rs | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will be added in #10701 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW #10701 (example of how to use this API) is ready for review |
||
/// | ||
/// # Execution Overview | ||
/// | ||
/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ use crate::datasource::physical_plan::{ | |
use crate::datasource::schema_adapter::SchemaAdapterFactory; | ||
use crate::physical_optimizer::pruning::PruningPredicate; | ||
use arrow_schema::{ArrowError, SchemaRef}; | ||
use datafusion_common::{exec_err, Result}; | ||
use datafusion_physical_expr_common::physical_expr::PhysicalExpr; | ||
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; | ||
use futures::{StreamExt, TryStreamExt}; | ||
|
@@ -60,11 +61,10 @@ pub(super) struct ParquetOpener { | |
impl FileOpener for ParquetOpener { | ||
fn open(&self, file_meta: FileMeta) -> datafusion_common::Result<FileOpenFuture> { | ||
let file_range = file_meta.range.clone(); | ||
let file_metrics = ParquetFileMetrics::new( | ||
self.partition_index, | ||
file_meta.location().as_ref(), | ||
&self.metrics, | ||
); | ||
let extensions = file_meta.extensions.clone(); | ||
let file_name = file_meta.location().to_string(); | ||
let file_metrics = | ||
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); | ||
|
||
let reader: Box<dyn AsyncFileReader> = | ||
self.parquet_file_reader_factory.create_reader( | ||
|
@@ -139,7 +139,8 @@ impl FileOpener for ParquetOpener { | |
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); | ||
let rg_metadata = file_metadata.row_groups(); | ||
// track which row groups to actually read | ||
let access_plan = ParquetAccessPlan::new_all(rg_metadata.len()); | ||
let access_plan = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the required plumbing, which I am quite pleased with -- it is quite straightforward now |
||
create_initial_plan(&file_name, extensions, rg_metadata.len())?; | ||
let mut row_groups = RowGroupAccessPlanFilter::new(access_plan); | ||
// if there is a range restricting what parts of the file to read | ||
if let Some(range) = file_range.as_ref() { | ||
|
@@ -186,7 +187,7 @@ impl FileOpener for ParquetOpener { | |
|
||
let row_group_indexes = access_plan.row_group_indexes(); | ||
if let Some(row_selection) = | ||
access_plan.into_overall_row_selection(rg_metadata) | ||
access_plan.into_overall_row_selection(rg_metadata)? | ||
{ | ||
builder = builder.with_row_selection(row_selection); | ||
} | ||
|
@@ -212,3 +213,34 @@ impl FileOpener for ParquetOpener { | |
})) | ||
} | ||
} | ||
|
||
/// Return the initial [`ParquetAccessPlan`] | ||
/// | ||
/// If the user has supplied one as an extension, use that | ||
/// otherwise return a plan that scans all row groups | ||
/// | ||
/// Returns an error is an invalid `ParquetAccessPlan` is provided | ||
/// | ||
/// Note: path is only used for error messages | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
fn create_initial_plan( | ||
file_name: &str, | ||
extensions: Option<Arc<dyn std::any::Any + Send + Sync>>, | ||
row_group_count: usize, | ||
) -> Result<ParquetAccessPlan> { | ||
if let Some(extensions) = extensions { | ||
if let Some(access_plan) = extensions.downcast_ref::<ParquetAccessPlan>() { | ||
let plan_len = access_plan.len(); | ||
if plan_len != row_group_count { | ||
return exec_err!( | ||
"Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}" | ||
); | ||
} | ||
|
||
// check row group count matches the plan | ||
return Ok(access_plan.clone()); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: is it better to add a logging in the else branch? |
||
} | ||
|
||
// default to scanning all row groups | ||
Ok(ParquetAccessPlan::new_all(row_group_count)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since users can now provide a
ParquetAccessPlan
it is important to do validation on the contents.While technically we could avoid doing this validation when the selections came from the page pruning, I think it would be a good check to have to catch future bugs rather than subtle wrong results so I chose to always validate