diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 30ab9a339b54..6481211d3f23 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -297,6 +297,11 @@ config_namespace! {
 
         /// Should DataFusion support recursive CTEs
         pub enable_recursive_ctes: bool, default = true
+
+        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
+        /// statistics into the same file groups.
+        /// Currently experimental
+        pub split_file_groups_by_statistics: bool, default = false
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 5ee0f7186703..fdb89a264951 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -150,6 +150,7 @@ pub(crate) mod test_util {
             object_meta: meta,
             partition_values: vec![],
             range: None,
+            statistics: None,
             extensions: None,
         }]];
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 9dfd18f1881e..60a0dce25a13 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -376,6 +376,7 @@ pub async fn pruned_partition_list<'a>(
                 object_meta,
                 partition_values: partition_values.clone(),
                 range: None,
+                statistics: None,
                 extensions: None,
             })
         }));
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index b8c279c8a7f1..d0361d7b32c1 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -24,7 +24,7 @@ mod url;
 use crate::error::Result;
 use chrono::TimeZone;
-use datafusion_common::ScalarValue;
+use datafusion_common::{ScalarValue, Statistics};
 use futures::Stream;
 use object_store::{path::Path, ObjectMeta};
 use std::pin::Pin;
@@ -67,6 +67,11 @@ pub struct PartitionedFile {
     pub partition_values: Vec<ScalarValue>,
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
+    /// Optional statistics that describe the data in this file if known.
+    ///
+    /// DataFusion relies on these statistics for planning (in particular to sort file groups),
+    /// so if they are incorrect, incorrect answers may result.
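+    ///
+    /// A minimal sketch of attaching statistics by hand (`file_schema` here is an
+    /// assumed variable holding the file's `Schema`; how real min/max values are
+    /// obtained depends on the file format):
+    ///
+    /// ```ignore
+    /// let mut file = PartitionedFile::new("a.parquet".to_string(), 1024);
+    /// // Unknown statistics are valid but enable no sort-related optimizations.
+    /// file.statistics = Some(Statistics::new_unknown(&file_schema));
+    /// ```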
+    pub statistics: Option<Statistics>,
     /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
 }
@@ -83,6 +88,7 @@ impl PartitionedFile {
             },
             partition_values: vec![],
             range: None,
+            statistics: None,
             extensions: None,
         }
     }
@@ -98,7 +104,8 @@ impl PartitionedFile {
                 version: None,
             },
             partition_values: vec![],
-            range: None,
+            range: Some(FileRange { start, end }),
+            statistics: None,
             extensions: None,
         }
         .with_range(start, end)
@@ -128,6 +135,7 @@ impl From<ObjectMeta> for PartitionedFile {
             object_meta,
             partition_values: vec![],
             range: None,
+            statistics: None,
             extensions: None,
         }
     }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 6ee19828f1d4..4b1994a1797c 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -739,16 +739,43 @@ impl TableProvider for ListingTable {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let (partitioned_file_lists, statistics) =
+        let (mut partitioned_file_lists, statistics) =
             self.list_files_for_scan(state, filters, limit).await?;
 
+        let projected_schema = project_schema(&self.schema(), projection)?;
+
         // if no files need to be read, return an `EmptyExec`
         if partitioned_file_lists.is_empty() {
-            let schema = self.schema();
-            let projected_schema = project_schema(&schema, projection)?;
             return Ok(Arc::new(EmptyExec::new(projected_schema)));
         }
 
+        let output_ordering = self.try_create_output_ordering()?;
+        match state
+            .config_options()
+            .execution
+            .split_file_groups_by_statistics
+            .then(|| {
+                output_ordering.first().map(|output_ordering| {
+                    FileScanConfig::split_groups_by_statistics(
+                        &self.table_schema,
+                        &partitioned_file_lists,
+                        output_ordering,
+                    )
+                })
+            })
+            .flatten()
+        {
+            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
+            Some(Ok(new_groups)) => {
+                if new_groups.len() <= self.options.target_partitions {
+                    partitioned_file_lists = new_groups;
+                } else {
+                    log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
+                }
+            }
+            None => {} // no ordering required
+        };
+
         // extract types of partition columns
         let table_partition_cols = self
             .options
@@ -772,6 +799,7 @@ impl TableProvider for ListingTable {
         } else {
             return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
         };
+
         // create the execution plan
         self.options
             .format
@@ -784,7 +812,7 @@ impl TableProvider for ListingTable {
                     statistics,
                     projection: projection.cloned(),
                     limit,
-                    output_ordering: self.try_create_output_ordering()?,
+                    output_ordering,
                     table_partition_cols,
                 },
                 filters.as_ref(),
@@ -937,10 +965,11 @@ impl ListingTable {
         // collect the statistics if required by the config
         let files = file_list
             .map(|part_file| async {
-                let part_file = part_file?;
+                let mut part_file = part_file?;
                 if self.options.collect_stat {
                     let statistics =
                         self.do_collect_statistics(ctx, &store, &part_file).await?;
+                    part_file.statistics = Some(statistics.clone());
                     Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
                 } else {
                     Ok((part_file, Statistics::new_unknown(&self.file_schema)))
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 1ea411cb6f59..4de7eb136f22 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -22,7 +22,9 @@ use std::{
     borrow::Cow, collections::HashMap, fmt::Debug, marker::PhantomData, sync::Arc, vec,
 };
 
-use super::{get_projected_output_ordering, FileGroupPartitioner};
+use super::{
+    get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupPartitioner,
+};
 use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
 use crate::{error::Result, scalar::ScalarValue};
@@ -33,7 +35,7 @@ use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::stats::Precision;
 use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics};
-use datafusion_physical_expr::LexOrdering;
+use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 
 use log::warn;
@@ -138,12 +140,14 @@ impl FileScanConfig {
             column_statistics: table_cols_stats,
         };
 
-        let table_schema = Arc::new(
+        let projected_schema = Arc::new(
             Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()),
         );
+
         let projected_output_ordering =
-            get_projected_output_ordering(self, &table_schema);
-        (table_schema, table_stats, projected_output_ordering)
+            get_projected_output_ordering(self, &projected_schema);
+
+        (projected_schema, table_stats, projected_output_ordering)
     }
 
     #[allow(unused)] // Only used by avro
@@ -194,6 +198,71 @@ impl FileScanConfig {
             .with_repartition_file_min_size(repartition_file_min_size)
             .repartition_file_groups(&file_groups)
     }
+
+    /// Attempts to do a bin-packing on files into file groups, such that any two files
+    /// in a file group are ordered and non-overlapping with respect to their statistics.
+    /// It will produce the smallest number of file groups possible.
+    pub fn split_groups_by_statistics(
+        table_schema: &SchemaRef,
+        file_groups: &[Vec<PartitionedFile>],
+        sort_order: &[PhysicalSortExpr],
+    ) -> Result<Vec<Vec<PartitionedFile>>> {
+        let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
+        // First Fit:
+        // * Choose the first file group that a file can be placed into.
+        // * If it fits into no existing file groups, create a new one.
+        //
+        // By sorting files by min values and then applying first-fit bin packing,
+        // we can produce the smallest number of file groups such that
+        // files within a group are in order and non-overlapping.
+        //
+        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
+        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
+
+        if flattened_files.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let statistics = MinMaxStatistics::new_from_files(
+            sort_order,
+            table_schema,
+            None,
+            flattened_files.iter().copied(),
+        )
+        .map_err(|e| {
+            e.context("construct min/max statistics for split_groups_by_statistics")
+        })?;
+
+        let indices_sorted_by_min = statistics.min_values_sorted();
+        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
+
+        for (idx, min) in indices_sorted_by_min {
+            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
+                // If our file is non-overlapping and comes _after_ the last file,
+                // it fits in this file group.
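+                //
+                // Example: files sorted by min with (min, max) = (0, 5), (3, 8), (6, 10):
+                // (0, 5) opens the first group; (3, 8) overlaps it (3 <= 5), so it
+                // opens a second group; (6, 10) fits back into the first (6 > 5).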
+                min > statistics.max(
+                    *group
+                        .last()
+                        .expect("groups should be nonempty at construction"),
+                )
+            });
+            match file_group_to_insert {
+                Some(group) => group.push(idx),
+                None => file_groups_indices.push(vec![idx]),
+            }
+        }
+
+        // Assemble indices back into groups of PartitionedFiles
+        Ok(file_groups_indices
+            .into_iter()
+            .map(|file_group_indices| {
+                file_group_indices
+                    .into_iter()
+                    .map(|idx| flattened_files[idx].clone())
+                    .collect()
+            })
+            .collect())
+    }
 }
 
 /// A helper that projects partition columns into the file record batches.
@@ -770,6 +839,277 @@ mod tests {
         assert_eq!(projection.fields(), schema.fields());
     }
 
+    #[test]
+    fn test_split_groups_by_statistics() -> Result<()> {
+        use chrono::TimeZone;
+        use datafusion_common::DFSchema;
+        use datafusion_expr::execution_props::ExecutionProps;
+        use object_store::{path::Path, ObjectMeta};
+
+        struct File {
+            name: &'static str,
+            date: &'static str,
+            statistics: Vec<Option<(f64, f64)>>,
+        }
+        impl File {
+            fn new(
+                name: &'static str,
+                date: &'static str,
+                statistics: Vec<Option<(f64, f64)>>,
+            ) -> Self {
+                Self {
+                    name,
+                    date,
+                    statistics,
+                }
+            }
+        }
+
+        struct TestCase {
+            name: &'static str,
+            file_schema: Schema,
+            files: Vec<File>,
+            sort: Vec<datafusion_expr::Expr>,
+            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
+        }
+
+        use datafusion_expr::col;
+        let cases = vec![
+            TestCase {
+                name: "test sort",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
+            },
+            // same input but file '2' is in the middle
+            // test that we still order correctly
+            TestCase {
+                name: "test sort with files ordered differently",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
+            },
+            TestCase {
+                name: "reverse sort",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                ],
+                sort: vec![col("value").sort(false, true)],
+                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
+            },
+            // reject nullable sort columns
+            TestCase {
+                name: "no nullable sort columns",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    true, // should fail because nullable
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column")
+            },
+            TestCase {
+                name: "all three non-overlapping",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
+                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec!["0", "1", "2"]]),
+            },
+            TestCase {
+                name: "all three overlapping",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
+            },
+            TestCase {
+                name: "empty input",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![]),
+            },
+            TestCase {
+                name: "one file missing statistics",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("2", "2023-01-02", vec![None]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
+            },
+        ];
+
+        for case in cases {
+            let table_schema = Arc::new(Schema::new(
+                case.file_schema
+                    .fields()
+                    .clone()
+                    .into_iter()
+                    .cloned()
+                    .chain(Some(Arc::new(Field::new(
+                        "date".to_string(),
+                        DataType::Utf8,
+                        false,
+                    ))))
+                    .collect::<Vec<_>>(),
+            ));
+            let sort_order = case
+                .sort
+                .into_iter()
+                .map(|expr| {
+                    crate::physical_planner::create_physical_sort_expr(
+                        &expr,
+                        &DFSchema::try_from(table_schema.as_ref().clone())?,
+                        &ExecutionProps::default(),
+                    )
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            let partitioned_files =
+                case.files.into_iter().map(From::from).collect::<Vec<_>>();
+            let result = FileScanConfig::split_groups_by_statistics(
+                &table_schema,
+                &[partitioned_files.clone()],
+                &sort_order,
+            );
+            let results_by_name = result
+                .as_ref()
+                .map(|file_groups| {
+                    file_groups
+                        .iter()
+                        .map(|file_group| {
+                            file_group
+                                .iter()
+                                .map(|file| {
+                                    partitioned_files
+                                        .iter()
+                                        .find_map(|f| {
+                                            if f.object_meta == file.object_meta {
+                                                Some(
+                                                    f.object_meta
+                                                        .location
+                                                        .as_ref()
+                                                        .rsplit('/')
+                                                        .next()
+                                                        .unwrap()
+                                                        .trim_end_matches(".parquet"),
+                                                )
+                                            } else {
+                                                None
+                                            }
+                                        })
+                                        .unwrap()
+                                })
+                                .collect::<Vec<_>>()
+                        })
+                        .collect::<Vec<_>>()
+                })
+                .map_err(|e| e.to_string().leak() as &'static str);
+
+            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
+        }
+
+        return Ok(());
+
+        impl From<File> for PartitionedFile {
+            fn from(file: File) -> Self {
+                PartitionedFile {
+                    object_meta: ObjectMeta {
+                        location: Path::from(format!(
+                            "data/date={}/{}.parquet",
+                            file.date, file.name
+                        )),
+                        last_modified: chrono::Utc.timestamp_nanos(0),
+                        size: 0,
+                        e_tag: None,
+                        version: None,
+                    },
+                    partition_values: vec![ScalarValue::from(file.date)],
+                    range: None,
+                    statistics: Some(Statistics {
+                        num_rows: Precision::Absent,
+                        total_byte_size: Precision::Absent,
+                        column_statistics: file
+                            .statistics
+                            .into_iter()
+                            .map(|stats| {
+                                stats
+                                    .map(|(min, max)| ColumnStatistics {
+                                        min_value: Precision::Exact(ScalarValue::from(
+                                            min,
+                                        )),
+                                        max_value: Precision::Exact(ScalarValue::from(
+                                            max,
+                                        )),
+                                        ..Default::default()
+                                    })
+                                    .unwrap_or_default()
+                            })
+                            .collect::<Vec<_>>(),
+                    }),
+                    extensions: None,
+                }
+            }
+        }
+    }
+
     // sets default for configs that play no role in projections
     fn config_for_projection(
         file_schema: SchemaRef,
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index ddb8d032f3d8..c450774572db 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -26,6 +26,7 @@ mod file_stream;
 mod json;
 #[cfg(feature = "parquet")]
 pub mod parquet;
+mod statistics;
 
 pub(crate) use self::csv::plan_to_csv;
 pub(crate) use self::json::plan_to_json;
@@ -451,11 +452,6 @@ fn get_projected_output_ordering(
 ) -> Vec<Vec<PhysicalSortExpr>> {
     let mut all_orderings = vec![];
     for output_ordering in &base_config.output_ordering {
-        if base_config.file_groups.iter().any(|group| group.len() > 1) {
-            debug!("Skipping specified output ordering {:?}. Some file group had more than one file: {:?}",
-                   base_config.output_ordering[0], base_config.file_groups);
-            return vec![];
-        }
         let mut new_ordering = vec![];
         for PhysicalSortExpr { expr, options } in output_ordering {
             if let Some(col) = expr.as_any().downcast_ref::<Column>() {
@@ -473,11 +469,45 @@ fn get_projected_output_ordering(
             // since rest of the orderings are violated
             break;
         }
+
         // do not push empty entries
         // otherwise we may have `Some(vec![])` at the output ordering.
-        if !new_ordering.is_empty() {
-            all_orderings.push(new_ordering);
+        if new_ordering.is_empty() {
+            continue;
+        }
+
+        // Check if any file groups are not sorted
+        if base_config.file_groups.iter().any(|group| {
+            if group.len() <= 1 {
+                // File groups with <= 1 files are always sorted
+                return false;
+            }
+
+            let statistics = match statistics::MinMaxStatistics::new_from_files(
+                &new_ordering,
+                projected_schema,
+                base_config.projection.as_deref(),
+                group,
+            ) {
+                Ok(statistics) => statistics,
+                Err(e) => {
+                    log::trace!("Error fetching statistics for file group: {e}");
+                    // we can't prove that it's ordered, so we have to reject it
+                    return true;
+                }
+            };
+
+            !statistics.is_sorted()
+        }) {
+            debug!(
+                "Skipping specified output ordering {:?}. \
+                Some file groups couldn't be determined to be sorted: {:?}",
+                base_config.output_ordering[0], base_config.file_groups
+            );
+            continue;
         }
+
+        all_orderings.push(new_ordering);
     }
     all_orderings
 }
@@ -861,6 +891,7 @@ mod tests {
             object_meta,
             partition_values: vec![],
             range: None,
+            statistics: None,
             extensions: None,
         }
     }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 73fb82980fc4..b286b0f746e2 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -1538,6 +1538,7 @@ mod tests {
             object_meta: meta.clone(),
             partition_values: vec![],
             range: Some(FileRange { start, end }),
+            statistics: None,
             extensions: None,
         }
     }
@@ -1639,6 +1640,7 @@ mod tests {
                 ),
             ],
             range: None,
+            statistics: None,
             extensions: None,
         };
@@ -1733,6 +1735,7 @@ mod tests {
             },
             partition_values: vec![],
             range: None,
+            statistics: None,
             extensions: None,
         };
diff --git a/datafusion/core/src/datasource/physical_plan/statistics.rs b/datafusion/core/src/datasource/physical_plan/statistics.rs
new file mode 100644
index 000000000000..e1c61ec1a712
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/statistics.rs
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+ *
+ * Use statistics to optimize physical planning.
+ *
+ * Currently, this module houses code to sort file groups if they are non-overlapping with
+ * respect to the required sort order. See [`MinMaxStatistics`]
+ *
+*/
+
+use std::sync::Arc;
+
+use arrow::{
+    compute::SortColumn,
+    row::{Row, Rows},
+};
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
+
+use crate::datasource::listing::PartitionedFile;
+
+/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
+/// The min/max values are ordered by [`Self::sort_order`].
+/// Furthermore, any columns that are reversed in the sort order have their min/max values swapped.
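+///
+/// For example, under a descending sort on column `a`, a file whose `a` values
+/// span `[1, 10]` is stored with `10` in its min row and `1` in its max row, so
+/// that plain `Row` comparisons agree with the required sort order.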
+pub(crate) struct MinMaxStatistics {
+    min_by_sort_order: Rows,
+    max_by_sort_order: Rows,
+    sort_order: Vec<PhysicalSortExpr>,
+}
+
+impl MinMaxStatistics {
+    /// Sort order used to sort the statistics
+    #[allow(unused)]
+    pub fn sort_order(&self) -> &[PhysicalSortExpr] {
+        &self.sort_order
+    }
+
+    /// Min value at index
+    #[allow(unused)]
+    pub fn min(&self, idx: usize) -> Row {
+        self.min_by_sort_order.row(idx)
+    }
+
+    /// Max value at index
+    pub fn max(&self, idx: usize) -> Row {
+        self.max_by_sort_order.row(idx)
+    }
+
+    pub fn new_from_files<'a>(
+        projected_sort_order: &[PhysicalSortExpr], // Sort order with respect to projected schema
+        projected_schema: &SchemaRef,              // Projected schema
+        projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns)
+        files: impl IntoIterator<Item = &'a PartitionedFile>,
+    ) -> Result<Self> {
+        use datafusion_common::ScalarValue;
+
+        let statistics_and_partition_values = files
+            .into_iter()
+            .map(|file| {
+                file.statistics
+                    .as_ref()
+                    .zip(Some(file.partition_values.as_slice()))
+            })
+            .collect::<Option<Vec<_>>>()
+            .ok_or_else(|| {
+                DataFusionError::Plan("Parquet file missing statistics".to_string())
+            })?;
+
+        // Helper function to get min/max statistics for a given column of projected_schema
+        let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> {
+            Ok(statistics_and_partition_values
+                .iter()
+                .map(|(s, pv)| {
+                    if i < s.column_statistics.len() {
+                        s.column_statistics[i]
+                            .min_value
+                            .get_value()
+                            .cloned()
+                            .zip(s.column_statistics[i].max_value.get_value().cloned())
+                            .ok_or_else(|| {
+                                DataFusionError::Plan("statistics not found".to_string())
+                            })
+                    } else {
+                        let partition_value = &pv[i - s.column_statistics.len()];
+                        Ok((partition_value.clone(), partition_value.clone()))
+                    }
+                })
+                .collect::<Result<Vec<_>>>()?
+                .into_iter()
+                .unzip())
+        };
+
+        let sort_columns = sort_columns_from_physical_sort_exprs(projected_sort_order)
+            .ok_or(DataFusionError::Plan(
+                "sort expression must be on column".to_string(),
+            ))?;
+
+        // Project the schema & sort order down to just the relevant columns
+        let min_max_schema = Arc::new(
+            projected_schema
+                .project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
+        );
+        let min_max_sort_order = sort_columns
+            .iter()
+            .zip(projected_sort_order.iter())
+            .enumerate()
+            .map(|(i, (col, sort))| PhysicalSortExpr {
+                expr: Arc::new(Column::new(col.name(), i)),
+                options: sort.options,
+            })
+            .collect::<Vec<_>>();
+
+        let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
+            .iter()
+            .map(|c| {
+                // Reverse the projection to get the index of the column in the full statistics
+                // The file statistics contain _every_ column, but the sort column's index()
+                // refers to the index in projected_schema
+                let i = projection.map(|p| p[c.index()]).unwrap_or(c.index());
+
+                let (min, max) = get_min_max(i).map_err(|e| {
+                    e.context(format!("get min/max for column: '{}'", c.name()))
+                })?;
+                Ok((
+                    ScalarValue::iter_to_array(min)?,
+                    ScalarValue::iter_to_array(max)?,
+                ))
+            })
+            .collect::<Result<Vec<_>>>()
+            .map_err(|e| e.context("collect min/max values"))?
+            .into_iter()
+            .unzip();
+
+        Self::new(
+            &min_max_sort_order,
+            &min_max_schema,
+            RecordBatch::try_new(Arc::clone(&min_max_schema), min_values).map_err(
+                |e| {
+                    DataFusionError::ArrowError(e, Some("\ncreate min batch".to_string()))
+                },
+            )?,
+            RecordBatch::try_new(Arc::clone(&min_max_schema), max_values).map_err(
+                |e| {
+                    DataFusionError::ArrowError(e, Some("\ncreate max batch".to_string()))
+                },
+            )?,
+        )
+    }
+
+    pub fn new(
+        sort_order: &[PhysicalSortExpr],
+        schema: &SchemaRef,
+        min_values: RecordBatch,
+        max_values: RecordBatch,
+    ) -> Result<Self> {
+        use arrow::row::*;
+
+        let sort_fields = sort_order
+            .iter()
+            .map(|expr| {
+                expr.expr
+                    .data_type(schema)
+                    .map(|data_type| SortField::new_with_options(data_type, expr.options))
+            })
+            .collect::<Result<Vec<_>>>()
+            .map_err(|e| e.context("create sort fields"))?;
+        let converter = RowConverter::new(sort_fields)?;
+
+        let sort_columns = sort_columns_from_physical_sort_exprs(sort_order).ok_or(
+            DataFusionError::Plan("sort expression must be on column".to_string()),
+        )?;
+
+        // swap min/max if they're reversed in the ordering
+        let (new_min_cols, new_max_cols): (Vec<_>, Vec<_>) = sort_order
+            .iter()
+            .zip(sort_columns.iter().copied())
+            .map(|(sort_expr, column)| {
+                if sort_expr.options.descending {
+                    max_values
+                        .column_by_name(column.name())
+                        .zip(min_values.column_by_name(column.name()))
+                } else {
+                    min_values
+                        .column_by_name(column.name())
+                        .zip(max_values.column_by_name(column.name()))
+                }
+                .ok_or_else(|| {
+                    DataFusionError::Plan(format!(
+                        "missing column in MinMaxStatistics::new: '{}'",
+                        column.name()
+                    ))
+                })
+            })
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .unzip();
+
+        let [min, max] = [new_min_cols, new_max_cols].map(|cols| {
+            let values = RecordBatch::try_new(
+                min_values.schema(),
+                cols.into_iter().cloned().collect(),
+            )?;
+            let sorting_columns = sort_order
+                .iter()
+                .zip(sort_columns.iter().copied())
+                .map(|(sort_expr, column)| {
+                    let schema = values.schema();
+
+                    let idx = schema.index_of(column.name())?;
+                    let field = schema.field(idx);
+
+                    // check that sort columns are non-nullable
+                    if field.is_nullable() {
+                        return Err(DataFusionError::Plan(
+                            "cannot sort by nullable column".to_string(),
+                        ));
+                    }
+
+                    Ok(SortColumn {
+                        values: Arc::clone(values.column(idx)),
+                        options: Some(sort_expr.options),
+                    })
+                })
+                .collect::<Result<Vec<_>>>()
+                .map_err(|e| e.context("create sorting columns"))?;
+            converter
+                .convert_columns(
+                    &sorting_columns
+                        .into_iter()
+                        .map(|c| c.values)
+                        .collect::<Vec<_>>(),
+                )
+                .map_err(|e| {
+                    DataFusionError::ArrowError(e, Some("convert columns".to_string()))
+                })
+        });
+
+        Ok(Self {
+            min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
+            max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
+            sort_order: sort_order.to_vec(),
+        })
+    }
+
+    /// Return a sorted list of the min statistics together with the original indices
+    pub fn min_values_sorted(&self) -> Vec<(usize, Row<'_>)> {
+        let mut sort: Vec<_> = self.min_by_sort_order.iter().enumerate().collect();
+        sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
+        sort
+    }
+
+    /// Check if the min/max statistics are in order and non-overlapping
+    pub fn is_sorted(&self) -> bool {
+        self.max_by_sort_order
+            .iter()
+            .zip(self.min_by_sort_order.iter().skip(1))
+            .all(|(max, next_min)| max < next_min)
+    }
+}
+
+fn sort_columns_from_physical_sort_exprs(
+    sort_order: &[PhysicalSortExpr],
+) -> Option<Vec<&Column>> {
+    sort_order
+        .iter()
+        .map(|expr| {
+            expr.expr
+                .as_any()
+                .downcast_ref::<Column>()
+        })
+        .collect::<Option<Vec<_>>>()
+}
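+
+// Worked example of the invariant `is_sorted` checks (illustrative values only):
+// with min rows [1, 5, 9] and max rows [4, 8, 12] across three files, every max
+// is below the next file's min (4 < 5, 8 < 9), so the files are ordered and
+// non-overlapping; if the second file instead spanned [3, 8], the comparison
+// 4 < 3 fails and the ordering is rejected.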
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 8113d799a184..f949058769ef 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -151,6 +151,7 @@ impl TestParquetFile {
                 object_meta: self.object_meta.clone(),
                 partition_values: vec![],
                 range: None,
+                statistics: None,
                 extensions: None,
             }]],
             statistics: Statistics::new_unknown(&self.schema),
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 4bacc80579ed..e4f4d229c416 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -69,6 +69,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
             object_meta: meta,
             partition_values: vec![],
             range: None,
+            statistics: None,
             extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
         })
         .collect();
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index ccaa65b7ee5f..8f42f21834cc 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -62,6 +62,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
         object_meta: meta,
         partition_values: vec![],
         range: None,
+        statistics: None,
         extensions: None,
     };
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 5bffdc3af774..ffebbd4ff3e5 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1690,6 +1690,7 @@ message PartitionedFile {
   uint64 last_modified_ns = 3;
   repeated ScalarValue partition_values = 4;
   FileRange range = 5;
+  Statistics statistics = 6;
 }
 
 message FileRange {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 0fb6f4623745..e28ac6b835ca 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -17451,6 +17451,9 @@ impl serde::Serialize for PartitionedFile {
         if self.range.is_some() {
             len += 1;
         }
+        if self.statistics.is_some() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion.PartitionedFile", len)?;
         if !self.path.is_empty() {
             struct_ser.serialize_field("path", &self.path)?;
@@ -17469,6 +17472,9 @@ impl serde::Serialize for PartitionedFile {
         if let Some(v) = self.range.as_ref() {
             struct_ser.serialize_field("range", v)?;
         }
+        if let Some(v) = self.statistics.as_ref() {
+            struct_ser.serialize_field("statistics", v)?;
+        }
         struct_ser.end()
     }
 }
@@ -17486,6 +17492,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
             "partition_values",
             "partitionValues",
             "range",
+            "statistics",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -17495,6 +17502,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
             LastModifiedNs,
             PartitionValues,
             Range,
+            Statistics,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -17521,6 +17529,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
                             "lastModifiedNs" | "last_modified_ns" => Ok(GeneratedField::LastModifiedNs),
                             "partitionValues" | "partition_values" => Ok(GeneratedField::PartitionValues),
                             "range" => Ok(GeneratedField::Range),
+                            "statistics" => Ok(GeneratedField::Statistics),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -17545,6 +17554,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
                 let mut last_modified_ns__ = None;
                 let mut partition_values__ = None;
                 let mut range__ = None;
+                let mut statistics__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::Path => {
@@ -17581,6 +17591,12 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
                             }
                             range__ = map_.next_value()?;
                         }
+                        GeneratedField::Statistics => {
+                            if statistics__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("statistics"));
+                            }
+                            statistics__ = map_.next_value()?;
+                        }
                     }
                 }
                 Ok(PartitionedFile {
@@ -17589,6 +17605,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
                     last_modified_ns: last_modified_ns__.unwrap_or_default(),
                     partition_values: partition_values__.unwrap_or_default(),
                     range: range__,
+                    statistics: statistics__,
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index d0210eb7cfd3..75339e68a8fe 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2691,6 +2691,8 @@ pub struct PartitionedFile {
     pub partition_values: ::prost::alloc::vec::Vec<ScalarValue>,
     #[prost(message, optional, tag = "5")]
     pub range: ::core::option::Option<FileRange>,
+    #[prost(message, optional, tag = "6")]
+    pub statistics: ::core::option::Option<Statistics>,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 6184332ea581..bca317e6d213 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -537,6 +537,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
             range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
+            statistics: val.statistics.as_ref().map(|v| v.try_into()).transpose()?,
             extensions: None,
         })
     }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index aa6121bebc34..205a60df6e9e 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -617,6 +617,7 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
             range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
+            statistics: pf.statistics.as_ref().map(|s| s.into()),
         })
     }
 }
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 8f4b1a3816a3..4fb3d03fe57f 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -200,6 +200,7 @@ datafusion.execution.planning_concurrency 13
 datafusion.execution.soft_max_rows_per_output_file 50000000
 datafusion.execution.sort_in_place_threshold_bytes 1048576
 datafusion.execution.sort_spill_reservation_bytes 10485760
+datafusion.execution.split_file_groups_by_statistics false
 datafusion.execution.target_partitions 7
 datafusion.execution.time_zone +00:00
 datafusion.explain.logical_plan_only false
@@ -278,6 +279,7 @@ datafusion.execution.planning_concurrency 13 Fan-out during initial physical pla
 datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
 datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
 datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
+datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental
 datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system
 datafusion.execution.time_zone +00:00 The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour
 datafusion.explain.logical_plan_only false When set to true, the explain statement will only print logical plans
diff --git a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt
new file mode 100644
index 000000000000..f7a81f08456f
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt
@@ -0,0 +1,262 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# TESTS FOR SORTED PARQUET FILES
+
+# Set 2 partitions for deterministic output plans
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+# Collect statistics -- used for sorting files
+statement ok
+set datafusion.execution.collect_statistics = true;
+
+# Enable split_file_groups_by_statistics since it's currently disabled by default
+statement ok
+set datafusion.execution.split_file_groups_by_statistics = true;
+
+# Create a table as a data source
+statement ok
+CREATE TABLE src_table (
+  int_col INT,
+  descending_col INT,
+  string_col TEXT,
+  bigint_col BIGINT,
+  date_col DATE,
+  overlapping_col INT,
+  constant_col INT
+) AS VALUES
+-- first file
+(1, 3, 'aaa', 100, 1, 0, 0),
+(2, 2, 'bbb', 200, 2, 1, 0),
+(3, 1, 'ccc', 300, 3, 2, 0),
+-- second file
+(4, 6, 'ddd', 400, 4, 0, 0),
+(5, 5, 'eee', 500, 5, 1, 0),
+(6, 4, 'fff', 600, 6, 2, 0),
+-- third file
+(7, 9, 'ggg', 700, 7, 3, 0),
+(8, 8, 'hhh', 800, 8, 4, 0),
+(9, 7, 'iii', 900, 9, 5, 0);
+
+# Setup 3 files, in particular more files than there are partitions
+
+# File 1:
+query I
+COPY (SELECT * FROM src_table ORDER BY int_col LIMIT 3)
+TO 'test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet'
+STORED AS PARQUET;
+----
+3
+
+# File 2:
+query I
+COPY (SELECT * FROM src_table WHERE int_col > 3 ORDER BY int_col LIMIT 3)
+TO 'test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet'
+STORED AS PARQUET;
+----
+3
+
+# Add another file to the directory underlying test_table
+query I
+COPY (SELECT * FROM src_table WHERE int_col > 6 ORDER BY int_col LIMIT 3)
+TO 'test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet'
+STORED AS PARQUET;
+----
+3
+
+
+# Create a table from generated parquet files:
+statement ok
+CREATE EXTERNAL TABLE test_table (
+  partition_col TEXT NOT NULL,
+  int_col INT NOT NULL,
+  descending_col INT NOT NULL,
+  string_col TEXT NOT NULL,
+  bigint_col BIGINT NOT NULL,
+  date_col DATE NOT NULL,
+  overlapping_col INT NOT NULL,
+  constant_col INT NOT NULL
+)
+STORED AS PARQUET
+PARTITIONED BY (partition_col)
+WITH ORDER (int_col ASC NULLS LAST, bigint_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table';
+
+# Order by numeric columns
+# This is to exercise file group sorting, which uses file-level statistics
+# DataFusion doesn't currently support string column statistics
+# This should not require a sort.
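+# (The three files' int_col ranges are disjoint -- [1, 3], [4, 6], [7, 9] --
+# so first-fit packing can place them all into a single ordered file group.)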
+query TT
+EXPLAIN SELECT int_col, bigint_col
+FROM test_table
+ORDER BY int_col, bigint_col;
+----
+logical_plan
+01)Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
+02)--TableScan: test_table projection=[int_col, bigint_col]
+physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[int_col, bigint_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST]
+
+# Another planning test, but project on a column with unsupported statistics
+# We should be able to ignore this and look at only the relevant statistics
+query TT
+EXPLAIN SELECT string_col
+FROM test_table
+ORDER BY int_col, bigint_col;
+----
+logical_plan
+01)Projection: test_table.string_col
+02)--Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
+03)----Projection: test_table.string_col, test_table.int_col, test_table.bigint_col
+04)------TableScan: test_table projection=[int_col, string_col, bigint_col]
+physical_plan
+01)ProjectionExec: expr=[string_col@1 as string_col]
+02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[int_col, string_col, bigint_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST]
+
+# Clean up & recreate but sort on descending column
+statement ok
+DROP TABLE test_table;
+
+statement ok
+CREATE EXTERNAL TABLE test_table (
+  partition_col TEXT NOT NULL,
+  int_col INT NOT NULL,
+  descending_col INT NOT NULL,
+  string_col TEXT NOT NULL,
+  bigint_col BIGINT NOT NULL,
+  date_col DATE NOT NULL,
+  overlapping_col INT NOT NULL,
+  constant_col INT NOT NULL
+)
+STORED AS PARQUET
+PARTITIONED BY (partition_col)
+WITH ORDER (descending_col DESC NULLS LAST, bigint_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table';
+
+# Query order by descending_col
+# This should order the files like [C, B, A]
+query TT
+EXPLAIN SELECT descending_col, bigint_col
+FROM test_table
+ORDER BY descending_col DESC NULLS LAST, bigint_col ASC NULLS LAST;
+----
+logical_plan
+01)Sort: test_table.descending_col DESC NULLS LAST, test_table.bigint_col ASC NULLS LAST
+02)--TableScan: test_table projection=[descending_col, bigint_col]
+physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet]]}, projection=[descending_col, bigint_col], output_ordering=[descending_col@0 DESC NULLS LAST, bigint_col@1 ASC NULLS LAST]
+
+# Clean up & re-create with partition columns in sort order
+statement ok
+DROP TABLE test_table;
+
+statement ok
+CREATE EXTERNAL TABLE test_table (
+  partition_col TEXT NOT NULL,
+  int_col INT NOT NULL,
+  descending_col INT NOT NULL,
+  string_col TEXT NOT NULL,
+  bigint_col BIGINT NOT NULL,
+  date_col DATE NOT NULL,
+  overlapping_col INT NOT NULL,
+  constant_col INT NOT NULL
+)
+STORED AS PARQUET
+PARTITIONED BY (partition_col)
+WITH ORDER (partition_col ASC NULLS LAST, int_col ASC NULLS LAST, bigint_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table';
+
+# Order with partition column first
+# In particular, the partition column is a string
+# Even though statistics for string columns are not supported,
+# string partition columns are common and we do support sorting file groups on them
+query TT
+EXPLAIN SELECT int_col, bigint_col, partition_col
+FROM test_table
+ORDER BY partition_col, int_col, bigint_col;
+----
+logical_plan
+01)Sort: test_table.partition_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
+02)--TableScan: test_table projection=[int_col, bigint_col, partition_col]
+physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[int_col, bigint_col, partition_col], output_ordering=[partition_col@2 ASC NULLS LAST, int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST]
+
+# Clean up & re-create with overlapping column in sort order
+# This will test the ability to sort files with overlapping statistics
+statement ok
+DROP TABLE test_table;
+
+statement ok
+CREATE EXTERNAL TABLE test_table (
+  partition_col TEXT NOT NULL,
+  int_col INT NOT NULL,
+  descending_col INT NOT NULL,
+  string_col TEXT NOT NULL,
+  bigint_col BIGINT NOT NULL,
+  date_col DATE NOT NULL,
+  overlapping_col INT NOT NULL,
+  constant_col INT NOT NULL
+)
+STORED AS PARQUET
+PARTITIONED BY (partition_col)
+WITH ORDER (overlapping_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table';
+
+query TT
+EXPLAIN SELECT int_col, bigint_col, overlapping_col
+FROM test_table
+ORDER BY overlapping_col;
+----
+logical_plan
+01)Sort: test_table.overlapping_col ASC NULLS LAST
+02)--TableScan: test_table projection=[int_col, bigint_col, overlapping_col]
+physical_plan
+01)SortPreservingMergeExec: [overlapping_col@2 ASC NULLS LAST]
+02)--ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, overlapping_col], output_ordering=[overlapping_col@2 ASC NULLS LAST]
+
+# Clean up & re-create with constant column in sort order
+# This will require a sort because the # of required file groups (3)
+# exceeds the # of target partitions (2)
+statement ok
+DROP TABLE test_table;
+
+statement ok
+CREATE EXTERNAL TABLE test_table (
+  partition_col TEXT NOT NULL,
+  int_col INT NOT NULL,
+  descending_col INT NOT NULL,
+  string_col TEXT NOT NULL,
+  bigint_col BIGINT NOT NULL,
+  date_col DATE NOT NULL,
+  overlapping_col INT NOT NULL,
+  constant_col INT NOT NULL
+)
+STORED AS PARQUET
+PARTITIONED BY (partition_col)
+WITH ORDER (constant_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table';
+
+query TT
+EXPLAIN SELECT constant_col
+FROM test_table
+ORDER BY constant_col;
+----
+logical_plan
+01)Sort: test_table.constant_col ASC NULLS LAST
+02)--TableScan: test_table projection=[constant_col]
+physical_plan
+01)SortPreservingMergeExec: [constant_col@0 ASC NULLS LAST]
+02)--SortExec: expr=[constant_col@0 ASC NULLS LAST], preserve_partitioning=[true]
+03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col]
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index 11ddb91ad391..50b08e7793f0 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -93,6 +93,7 @@ pub async fn from_substrait_rel(
                     },
                     partition_values: vec![],
                     range: None,
+                    statistics: None,
                     extensions: None,
                 };
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 3ee3778177c4..822a2891894d 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -84,6 +84,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption |
 | datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). |
 | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs |
+| datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental |
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
 | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |