diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs
index 81eaed5488a7..aadc340b46c9 100644
--- a/datafusion/src/datasource/listing/table.rs
+++ b/datafusion/src/datasource/listing/table.rs
@@ -224,8 +224,9 @@ impl TableProvider for ListingTable {
 }
 
 impl ListingTable {
-    /// Get the list of files for a scan. The list is grouped to let the execution plan
-    /// know how the files should be distributed to different threads / executors.
+    /// Get the list of files for a scan as well as the file level statistics.
+    /// The list is grouped to let the execution plan know how the files should
+    /// be distributed to different threads / executors.
     async fn list_files_for_scan<'a>(
         &'a self,
         filters: &'a [Expr],
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index efea47b1d437..9f4f77f7ea28 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -37,22 +37,21 @@ use crate::scalar::ScalarValue;
 use futures::StreamExt;
 use std::pin::Pin;
 
-/// Get all files as well as the summary statistic
-/// if the optional `limit` is provided, includes only sufficient files
-/// needed to read up to `limit` number of rows
+/// Get all files as well as the file level summary statistics (no statistics for partition columns).
+/// If the optional `limit` is provided, includes only as many files as needed
+/// to read up to `limit` rows.
 /// TODO fix case where `num_rows` and `total_byte_size` are not defined (stat should be None instead of Some(0))
-/// TODO check that stats for partition columns are correct
 pub async fn get_statistics_with_limit(
     all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
-    schema: SchemaRef,
+    file_schema: SchemaRef,
     limit: Option<usize>,
 ) -> Result<(Vec<PartitionedFile>, Statistics)> {
     let mut result_files = vec![];
 
     let mut total_byte_size = 0;
-    let mut null_counts = vec![0; schema.fields().len()];
+    let mut null_counts = vec![0; file_schema.fields().len()];
     let mut has_statistics = false;
-    let (mut max_values, mut min_values) = create_max_min_accs(&schema);
+    let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
 
     let mut num_rows = 0;
     let mut is_exact = true;
@@ -105,7 +104,7 @@ pub async fn get_statistics_with_limit(
 
     let column_stats = if has_statistics {
         Some(get_col_stats(
-            &*schema,
+            &*file_schema,
             null_counts,
             &mut max_values,
             &mut min_values,
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index 64788b9bc046..cd30c6f7e05b 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -108,7 +108,7 @@ impl PhysicalPlanConfig {
                     DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
                     false,
                 ));
-                // TODO provide accurate stat for partition column
+                // TODO provide accurate stat for partition column (#1186)
                 table_cols_stats.push(ColumnStatistics::default())
             }
         }
diff --git a/datafusion/tests/path_partition.rs b/datafusion/tests/path_partition.rs
index 74179c4f3f1e..789511065fc8 100644
--- a/datafusion/tests/path_partition.rs
+++ b/datafusion/tests/path_partition.rs
@@ -31,6 +31,7 @@ use datafusion::{
         },
     },
     error::{DataFusionError, Result},
+    physical_plan::ColumnStatistics,
     prelude::ExecutionContext,
     test_util::{arrow_test_data, parquet_test_data},
 };
@@ -157,6 +158,7 @@ async fn parquet_multiple_partitions() -> Result<()> {
         ],
         &["year", "month", "day"],
         "",
+        "alltypes_plain.parquet",
     )
     .await;
 
@@ -185,6 +187,65 @@ async fn parquet_multiple_partitions() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn parquet_statistics() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+
+    register_partitioned_alltypes_parquet(
+        &mut ctx,
+        &[
+            "year=2021/month=09/day=09/file.parquet",
+            "year=2021/month=10/day=09/file.parquet",
+            "year=2021/month=10/day=28/file.parquet",
+        ],
+        &["year", "month", "day"],
+        "",
+        // This is the only file we found in the test set with
+        // actual stats. It has 1 column / 1 row.
+        "single_nan.parquet",
+    )
+    .await;
+
+    //// NO PROJECTION ////
+    let logical_plan = ctx.sql("SELECT * FROM t").await?.to_logical_plan();
+
+    let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
+    assert_eq!(physical_plan.schema().fields().len(), 4);
+
+    let stat_cols = physical_plan
+        .statistics()
+        .column_statistics
+        .expect("col stats should be defined");
+    assert_eq!(stat_cols.len(), 4);
+    // stats for the first col are read from the parquet file
+    assert_eq!(stat_cols[0].null_count, Some(3));
+    // TODO assert partition column (1,2,3) stats once implemented (#1186)
+    assert_eq!(stat_cols[1], ColumnStatistics::default());
+    assert_eq!(stat_cols[2], ColumnStatistics::default());
+    assert_eq!(stat_cols[3], ColumnStatistics::default());
+
+    //// WITH PROJECTION ////
+    let logical_plan = ctx
+        .sql("SELECT mycol, day FROM t WHERE day='28'")
+        .await?
+        .to_logical_plan();
+
+    let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
+    assert_eq!(physical_plan.schema().fields().len(), 2);
+
+    let stat_cols = physical_plan
+        .statistics()
+        .column_statistics
+        .expect("col stats should be defined");
+    assert_eq!(stat_cols.len(), 2);
+    // stats for the first col are read from the parquet file
+    assert_eq!(stat_cols[0].null_count, Some(1));
+    // TODO assert partition column stats once implemented (#1186)
+    assert_eq!(stat_cols[1], ColumnStatistics::default());
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn parquet_overlapping_columns() -> Result<()> {
     let mut ctx = ExecutionContext::new();
@@ -199,6 +260,7 @@
         ],
         &["id"],
         "",
+        "alltypes_plain.parquet",
     )
     .await;
 
@@ -237,14 +299,16 @@ async fn register_partitioned_alltypes_parquet(
     store_paths: &[&str],
     partition_cols: &[&str],
     table_path: &str,
+    source_file: &str,
 ) {
     let testdata = parquet_test_data();
-    let parquet_file_path = format!("{}/alltypes_plain.parquet", testdata);
+    let parquet_file_path = format!("{}/{}", testdata, source_file);
     let object_store =
         MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths);
 
     let mut options = ListingOptions::new(Arc::new(ParquetFormat::default()));
     options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
+    options.collect_stat = true;
 
     let file_schema = options
        .infer_schema(Arc::clone(&object_store), store_paths[0])
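For reviewers: the snippet below is a minimal, self-contained sketch of the accumulation pattern that `get_statistics_with_limit` applies over the `file_schema` columns. It sums per-file row and null counts until the optional `limit` is covered, and drops exactness when a file has no known row count or when files are skipped. The `FileStats` struct and `accumulate_with_limit` helper are hypothetical names for illustration, not DataFusion APIs; the real function additionally tracks min/max accumulators and total byte size.

```rust
// Simplified sketch: hypothetical types, not DataFusion's actual API.
#[derive(Clone, Debug, Default)]
struct FileStats {
    num_rows: Option<usize>,
    null_counts: Vec<usize>,
}

/// Keep files (in order) until their known row counts cover `limit`,
/// summing per-column null counts along the way. Returns the kept file
/// paths, the aggregated stats, and whether those stats are exact.
fn accumulate_with_limit(
    files: &[(&str, FileStats)],
    num_cols: usize,
    limit: Option<usize>,
) -> (Vec<String>, FileStats, bool) {
    let mut kept = Vec::new();
    let mut total_rows = 0usize;
    let mut null_counts = vec![0usize; num_cols];
    let mut is_exact = true;

    for (path, stats) in files {
        kept.push((*path).to_owned());
        match stats.num_rows {
            Some(n) => total_rows += n,
            // A file with an unknown row count makes the aggregate inexact.
            None => is_exact = false,
        }
        for (acc, n) in null_counts.iter_mut().zip(&stats.null_counts) {
            *acc += *n;
        }
        // Stop once the selected files are known to cover `limit` rows.
        if limit.map_or(false, |l| total_rows >= l) {
            break;
        }
    }
    // Skipped files mean the totals describe only a subset of the table.
    if kept.len() < files.len() {
        is_exact = false;
    }
    (
        kept,
        FileStats { num_rows: Some(total_rows), null_counts },
        is_exact,
    )
}

fn main() {
    let files = [
        ("f1.parquet", FileStats { num_rows: Some(10), null_counts: vec![3] }),
        ("f2.parquet", FileStats { num_rows: Some(10), null_counts: vec![1] }),
    ];
    let (kept, stats, exact) = accumulate_with_limit(&files, 1, Some(5));
    // Only f1 is needed to cover 5 rows, so the totals are inexact.
    assert_eq!(kept, vec!["f1.parquet".to_owned()]);
    assert_eq!(stats.null_counts, vec![3]);
    assert!(!exact);
}
```

In the patch itself this accumulation now runs over `file_schema` only, which is why the partition columns still report `ColumnStatistics::default()` until accurate partition-column stats land (#1186).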