[fix] wrong schema passed to get_statistics_with_limit
rdettai committed Oct 28, 2021
1 parent f8edb38 · commit c5cfcfb
Showing 4 changed files with 76 additions and 12 deletions.
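The bug in a nutshell: the per-file statistics gathered while listing files only cover the columns physically stored in the files, but the accumulators in `get_statistics_with_limit` were sized from the full table schema, which also carries the partition columns appended later by the listing table. Below is a minimal, self-contained sketch of the column-wise merging under that assumption; `ColStats` and `merge_stats` are illustrative stand-ins, not the real DataFusion `ColumnStatistics` type or API.

```rust
// Stand-in for per-column statistics; the real code also merges
// min/max values and row counts the same way, column by column.
#[derive(Clone, Debug, Default, PartialEq)]
struct ColStats {
    null_count: Option<usize>,
}

/// Merge per-file column statistics: one accumulator per *file* column.
fn merge_stats(file_cols: usize, per_file: &[Vec<ColStats>]) -> Vec<ColStats> {
    let mut acc = vec![ColStats::default(); file_cols];
    for file in per_file {
        for (a, s) in acc.iter_mut().zip(file) {
            a.null_count = Some(a.null_count.unwrap_or(0) + s.null_count.unwrap_or(0));
        }
    }
    acc
}

fn main() {
    // Three partitioned copies of single_nan.parquet: 1 file column, 1 null each.
    let per_file = vec![vec![ColStats { null_count: Some(1) }]; 3];
    let merged = merge_stats(1, &per_file);
    assert_eq!(merged[0].null_count, Some(3));
    // Before the fix, the accumulators were sized from the *table* schema
    // (file column + year/month/day partition columns = 4 slots), even though
    // the files never report statistics for the partition columns.
}
```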
5 changes: 3 additions & 2 deletions datafusion/src/datasource/listing/table.rs
@@ -224,8 +224,9 @@ impl TableProvider for ListingTable {
}

impl ListingTable {
/// Get the list of files for a scan. The list is grouped to let the execution plan
/// know how the files should be distributed to different threads / executors.
/// Get the list of files for a scan as well as the file level statistics.
/// The list is grouped to let the execution plan know how the files should
/// be distributed to different threads / executors.
async fn list_files_for_scan<'a>(
&'a self,
filters: &'a [Expr],
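The doc comment above mentions that the file list is grouped so the execution plan can distribute work. A hypothetical sketch of that kind of grouping (round-robin is used here for brevity; the actual DataFusion implementation may chunk the list differently):

```rust
// Split files into `target_partitions` groups so each thread/executor
// receives one group. Illustrative only, not the DataFusion source.
fn split_files<T>(files: Vec<T>, target_partitions: usize) -> Vec<Vec<T>> {
    let n = target_partitions.max(1);
    let mut groups: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    for (i, f) in files.into_iter().enumerate() {
        groups[i % n].push(f);
    }
    groups.retain(|g| !g.is_empty());
    groups
}

fn main() {
    let groups = split_files(vec!["a", "b", "c", "d", "e"], 2);
    assert_eq!(groups, vec![vec!["a", "c", "e"], vec!["b", "d"]]);
}
```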
15 changes: 7 additions & 8 deletions datafusion/src/datasource/mod.rs
@@ -37,22 +37,21 @@ use crate::scalar::ScalarValue;
use futures::StreamExt;
use std::pin::Pin;

/// Get all files as well as the summary statistic
/// if the optional `limit` is provided, includes only sufficient files
/// needed to read up to `limit` number of rows
/// Get all files as well as the file level summary statistics (no statistics for partition columns).
/// If the optional `limit` is provided, includes only sufficient files
/// needed to read up to `limit` number of rows.
/// TODO fix case where `num_rows` and `total_byte_size` are not defined (stat should be None instead of Some(0))
/// TODO check that stats for partition columns are correct
pub async fn get_statistics_with_limit(
all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
schema: SchemaRef,
file_schema: SchemaRef,
limit: Option<usize>,
) -> Result<(Vec<PartitionedFile>, Statistics)> {
let mut result_files = vec![];

let mut total_byte_size = 0;
let mut null_counts = vec![0; schema.fields().len()];
let mut null_counts = vec![0; file_schema.fields().len()];
let mut has_statistics = false;
let (mut max_values, mut min_values) = create_max_min_accs(&schema);
let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);

let mut num_rows = 0;
let mut is_exact = true;
@@ -105,7 +104,7 @@ pub async fn get_statistics_with_limit(

let column_stats = if has_statistics {
Some(get_col_stats(
&*schema,
&*file_schema,
null_counts,
&mut max_values,
&mut min_values,
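The rename from `schema` to `file_schema` matters because the table schema is the file schema plus the appended partition columns (stored as `Utf8` in this version of DataFusion). A small sketch of that relationship, assuming the arrow crate and that single_nan.parquet exposes a single nullable Float64 column named `mycol`:

```rust
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // What the Parquet files themselves contain.
    let file_schema = Schema::new(vec![Field::new("mycol", DataType::Float64, true)]);

    // What the listing table exposes: the file schema plus one Utf8 field
    // per partition column, appended when building the physical plan.
    let table_schema = Schema::new(vec![
        Field::new("mycol", DataType::Float64, true),
        Field::new("year", DataType::Utf8, false),
        Field::new("month", DataType::Utf8, false),
        Field::new("day", DataType::Utf8, false),
    ]);

    // Per-file statistics only cover the file columns, so the accumulators
    // must be sized from file_schema, not table_schema.
    assert_eq!(file_schema.fields().len(), 1);
    assert_eq!(table_schema.fields().len(), 4);
}
```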
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/file_format/mod.rs
@@ -108,7 +108,7 @@ impl PhysicalPlanConfig {
DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
false,
));
// TODO provide accurate stat for partition column
// TODO provide accurate stat for partition column (#1186)
table_cols_stats.push(ColumnStatistics::default())
}
}
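For partition columns, `ColumnStatistics::default()` means "unknown": every field defaults to `None`. That is why the new tests below expect default stats for the partition columns until #1186 lands. A self-contained stand-in mirroring the shape of the struct (the real one lives in `datafusion::physical_plan` and uses `ScalarValue` for min/max; `String` replaces it here to keep the sketch dependency-free):

```rust
// Shape of DataFusion's ColumnStatistics, approximated for illustration.
#[derive(Clone, Debug, Default, PartialEq)]
struct ColumnStatistics {
    null_count: Option<usize>,
    max_value: Option<String>,
    min_value: Option<String>,
    distinct_count: Option<usize>,
}

fn main() {
    // The derived Default is all-None, i.e. "no information about this column".
    assert_eq!(
        ColumnStatistics::default(),
        ColumnStatistics {
            null_count: None,
            max_value: None,
            min_value: None,
            distinct_count: None,
        }
    );
}
```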
66 changes: 65 additions & 1 deletion datafusion/tests/path_partition.rs
@@ -31,6 +31,7 @@ use datafusion::{
},
},
error::{DataFusionError, Result},
physical_plan::ColumnStatistics,
prelude::ExecutionContext,
test_util::{arrow_test_data, parquet_test_data},
};
@@ -157,6 +158,7 @@ async fn parquet_multiple_partitions() -> Result<()> {
],
&["year", "month", "day"],
"",
"alltypes_plain.parquet",
)
.await;

@@ -185,6 +187,65 @@ async fn parquet_multiple_partitions() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn parquet_statistics() -> Result<()> {
let mut ctx = ExecutionContext::new();

register_partitioned_alltypes_parquet(
&mut ctx,
&[
"year=2021/month=09/day=09/file.parquet",
"year=2021/month=10/day=09/file.parquet",
"year=2021/month=10/day=28/file.parquet",
],
&["year", "month", "day"],
"",
// This is the only file we found in the test set with
// actual stats. It has 1 column / 1 row.
"single_nan.parquet",
)
.await;

//// NO PROJECTION ////
let logical_plan = ctx.sql("SELECT * FROM t").await?.to_logical_plan();

let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
assert_eq!(physical_plan.schema().fields().len(), 4);

let stat_cols = physical_plan
.statistics()
.column_statistics
.expect("col stats should be defined");
assert_eq!(stat_cols.len(), 4);
// stats for the first col are read from the parquet file
assert_eq!(stat_cols[0].null_count, Some(3));
// TODO assert partition column (1,2,3) stats once implemented (#1186)
assert_eq!(stat_cols[1], ColumnStatistics::default());
assert_eq!(stat_cols[2], ColumnStatistics::default());
assert_eq!(stat_cols[3], ColumnStatistics::default());

//// WITH PROJECTION ////
let logical_plan = ctx
.sql("SELECT mycol, day FROM t WHERE day='28'")
.await?
.to_logical_plan();

let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
assert_eq!(physical_plan.schema().fields().len(), 2);

let stat_cols = physical_plan
.statistics()
.column_statistics
.expect("col stats should be defined");
assert_eq!(stat_cols.len(), 2);
// stats for the first col are read from the parquet file
assert_eq!(stat_cols[0].null_count, Some(1));
// TODO assert partition column stats once implemented (#1186)
assert_eq!(stat_cols[1], ColumnStatistics::default());

Ok(())
}

#[tokio::test]
async fn parquet_overlapping_columns() -> Result<()> {
let mut ctx = ExecutionContext::new();
@@ -199,6 +260,7 @@ async fn parquet_overlapping_columns() -> Result<()> {
],
&["id"],
"",
"alltypes_plain.parquet",
)
.await;

@@ -237,14 +299,16 @@ async fn register_partitioned_alltypes_parquet(
store_paths: &[&str],
partition_cols: &[&str],
table_path: &str,
source_file: &str,
) {
let testdata = parquet_test_data();
let parquet_file_path = format!("{}/alltypes_plain.parquet", testdata);
let parquet_file_path = format!("{}/{}", testdata, source_file);
let object_store =
MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths);

let mut options = ListingOptions::new(Arc::new(ParquetFormat::default()));
options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
options.collect_stat = true;

let file_schema = options
.infer_schema(Arc::clone(&object_store), store_paths[0])
