diff --git a/rust/arrow/src/array/null.rs b/rust/arrow/src/array/null.rs
index 08c7cf1f21ef8..ed3a0ad7d6852 100644
--- a/rust/arrow/src/array/null.rs
+++ b/rust/arrow/src/array/null.rs
@@ -52,6 +52,12 @@ impl NullArray {
         let array_data = ArrayData::builder(DataType::Null).len(length).build();
         NullArray::from(array_data)
     }
+
+    /// Create a new null array of the specified length and type
+    pub fn new_with_type(length: usize, data_type: DataType) -> Self {
+        let array_data = ArrayData::builder(data_type).len(length).build();
+        NullArray::from(array_data)
+    }
 }
 
 impl Array for NullArray {
@@ -147,6 +153,15 @@ mod tests {
         assert_eq!(array2.offset(), 8);
     }
 
+    #[test]
+    fn test_null_array_new_with_type() {
+        let length = 10;
+        let data_type = DataType::Int8;
+        let array = NullArray::new_with_type(length, data_type.clone());
+        assert_eq!(array.len(), length);
+        assert_eq!(array.data_type(), &data_type);
+    }
+
     #[test]
     fn test_debug_null_array() {
         let array = NullArray::new(1024 * 1024);
diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index 3e60d03eb288e..8630306170c3e 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -30,6 +30,8 @@ use crate::logical_plan::Expr;
 use crate::physical_plan::parquet::ParquetExec;
 use crate::physical_plan::ExecutionPlan;
 
+use super::datasource::TableProviderFilterPushDown;
+
 /// Table-based representation of a `ParquetFile`.
 pub struct ParquetTable {
     path: String,
@@ -41,7 +43,7 @@ impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path.
     pub fn try_new(path: &str, max_concurrency: usize) -> Result<Self> {
-        let parquet_exec = ParquetExec::try_from_path(path, None, 0, 1)?;
+        let parquet_exec = ParquetExec::try_from_path(path, None, None, 0, 1)?;
         let schema = parquet_exec.schema();
         Ok(Self {
             path: path.to_string(),
@@ -67,17 +69,26 @@ impl TableProvider for ParquetTable {
         self.schema.clone()
     }
 
+    fn supports_filter_pushdown(
+        &self,
+        _filter: &Expr,
+    ) -> Result<TableProviderFilterPushDown> {
+        Ok(TableProviderFilterPushDown::Inexact)
+    }
+
     /// Scan the file(s), using the provided projection, and return one BatchIterator per
     /// partition.
     fn scan(
         &self,
         projection: &Option<Vec<usize>>,
         batch_size: usize,
-        _filters: &[Expr],
+        filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let predicate = combine_filters(filters);
         Ok(Arc::new(ParquetExec::try_from_path(
             &self.path,
             projection.clone(),
+            predicate,
             batch_size,
             self.max_concurrency,
         )?))
@@ -88,6 +99,22 @@ impl TableProvider for ParquetTable {
     }
 }
 
+/// Combines an array of filter expressions into a single filter expression
+/// consisting of the input filter expressions joined with logical AND.
+/// Returns None if the filters array is empty.
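+///
+/// A sketch of the expected behaviour (the column names are illustrative):
+/// ```ignore
+/// use crate::logical_plan::{col, lit};
+/// // yields Some(c1 > 5 AND c2 <= 10)
+/// let combined = combine_filters(&[col("c1").gt(lit(5)), col("c2").lt_eq(lit(10))]);
+/// ```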
+fn combine_filters(filters: &[Expr]) -> Option<Expr> {
+    if filters.is_empty() {
+        return None;
+    }
+    let combined_filter = filters
+        .iter()
+        .skip(1)
+        .fold(filters[0].clone(), |acc, filter| {
+            crate::logical_plan::and(acc, filter.clone())
+        });
+    Some(combined_filter)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -333,4 +360,29 @@ mod tests {
             .expect("should have received at least one batch")
             .map_err(|e| e.into())
     }
+
+    #[test]
+    fn combine_zero_filters() {
+        let result = combine_filters(&[]);
+        assert_eq!(result, None);
+    }
+
+    #[test]
+    fn combine_one_filter() {
+        use crate::logical_plan::{binary_expr, col, lit, Operator};
+        let filter = binary_expr(col("c1"), Operator::Lt, lit(1));
+        let result = combine_filters(&[filter.clone()]);
+        assert_eq!(result, Some(filter));
+    }
+
+    #[test]
+    fn combine_multiple_filters() {
+        use crate::logical_plan::{and, binary_expr, col, lit, Operator};
+        let filter1 = binary_expr(col("c1"), Operator::Lt, lit(1));
+        let filter2 = binary_expr(col("c2"), Operator::Lt, lit(2));
+        let filter3 = binary_expr(col("c3"), Operator::Lt, lit(3));
+        let result =
+            combine_filters(&[filter1.clone(), filter2.clone(), filter3.clone()]);
+        assert_eq!(result, Some(and(and(filter1, filter2), filter3)));
+    }
 }
diff --git a/rust/datafusion/src/logical_plan/operators.rs b/rust/datafusion/src/logical_plan/operators.rs
index b59462f73df34..dac48d40b4809 100644
--- a/rust/datafusion/src/logical_plan/operators.rs
+++ b/rust/datafusion/src/logical_plan/operators.rs
@@ -20,7 +20,7 @@ use std::{fmt, ops};
 use super::{binary_expr, Expr};
 
 /// Operators applied to expressions
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 pub enum Operator {
     /// Expressions are equal
     Eq,
diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs
index 75661c6f723a9..b9e67a43d7e38 100644
--- a/rust/datafusion/src/optimizer/utils.rs
+++ b/rust/datafusion/src/optimizer/utils.rs
@@ -332,7 +332,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &Vec<Expr>) -> Result<Expr>
     match expr {
         Expr::BinaryExpr { op, .. } => Ok(Expr::BinaryExpr {
             left: Box::new(expressions[0].clone()),
-            op: op.clone(),
+            op: *op,
             right: Box::new(expressions[1].clone()),
         }),
         Expr::IsNull(_) => Ok(Expr::IsNull(Box::new(expressions[0].clone()))),
diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs
index a92a35992a42a..af821a54fc7df 100644
--- a/rust/datafusion/src/physical_plan/parquet.rs
+++ b/rust/datafusion/src/physical_plan/parquet.rs
@@ -17,20 +17,40 @@
 //! Execution plan for reading Parquet files
 
-use std::any::Any;
 use std::fmt;
 use std::fs::File;
 use std::sync::Arc;
 use std::task::{Context, Poll};
+use std::{
+    any::Any,
+    collections::{HashMap, HashSet},
+};
 
-use super::{RecordBatchStream, SendableRecordBatchStream};
-use crate::error::{DataFusionError, Result};
+use super::{
+    planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream,
+    SendableRecordBatchStream,
+};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::{common, Partitioning};
-use arrow::datatypes::{Schema, SchemaRef};
+use crate::{
+    error::{DataFusionError, Result},
+    execution::context::ExecutionContextState,
+    logical_plan::{Expr, Operator},
+    optimizer::utils,
+    prelude::ExecutionConfig,
+};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use parquet::file::reader::SerializedFileReader;
+use arrow::{
+    array::{make_array, ArrayData, ArrayRef, BooleanArray, BooleanBufferBuilder},
+    buffer::MutableBuffer,
+    datatypes::{DataType, Field, Schema, SchemaRef},
+};
+use parquet::file::{
+    metadata::RowGroupMetaData,
+    reader::{FileReader, SerializedFileReader},
+    statistics::Statistics as ParquetStatistics,
+};
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
@@ -54,6 +74,8 @@ pub struct ParquetExec {
     batch_size: usize,
     /// Statistics for the data set (sum of statistics for all partitions)
     statistics: Statistics,
+    /// Optional predicate builder
+    predicate_builder: Option<RowGroupPredicateBuilder>,
 }
 
 /// Represents one partition of a Parquet data set and this currently means one Parquet file.
@@ -79,6 +101,7 @@ impl ParquetExec {
     pub fn try_from_path(
         path: &str,
         projection: Option<Vec<usize>>,
+        predicate: Option<Expr>,
         batch_size: usize,
         max_concurrency: usize,
     ) -> Result<Self> {
@@ -96,7 +119,13 @@
             .iter()
             .map(|filename| filename.as_str())
             .collect::<Vec<&str>>();
-        Self::try_from_files(&filenames, projection, batch_size, max_concurrency)
+        Self::try_from_files(
+            &filenames,
+            projection,
+            predicate,
+            batch_size,
+            max_concurrency,
+        )
     }
 }
 
@@ -105,6 +134,7 @@
     pub fn try_from_files(
         filenames: &[&str],
        projection: Option<Vec<usize>>,
+        predicate: Option<Expr>,
         batch_size: usize,
         max_concurrency: usize,
     ) -> Result<Self> {
@@ -156,8 +186,17 @@
             )));
         }
         let schema = schemas[0].clone();
+        let predicate_builder = predicate.and_then(|predicate_expr| {
+            RowGroupPredicateBuilder::try_new(&predicate_expr, schema.clone()).ok()
+        });
 
-        Ok(Self::new(partitions, schema, projection, batch_size))
+        Ok(Self::new(
+            partitions,
+            schema,
+            projection,
+            predicate_builder,
+            batch_size,
+        ))
     }
 
     /// Create a new Parquet reader execution plan with provided partitions and schema
@@ -165,6 +204,7 @@
     pub fn new(
         partitions: Vec<ParquetPartition>,
         schema: Schema,
         projection: Option<Vec<usize>>,
+        predicate_builder: Option<RowGroupPredicateBuilder>,
         batch_size: usize,
     ) -> Self {
         let projection = match projection {
@@ -199,6 +239,7 @@
             partitions,
             schema: Arc::new(projected_schema),
             projection,
+            predicate_builder,
             batch_size,
             statistics,
         }
@@ -237,6 +278,458 @@ impl ParquetPartition {
     }
 }
 
+/// Predicate builder used for generating predicate functions that filter row group metadata
+#[derive(Debug, Clone)]
+pub struct RowGroupPredicateBuilder {
+    parquet_schema: Schema,
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl RowGroupPredicateBuilder {
+    /// Try to create a new instance of `RowGroupPredicateBuilder`.
+    /// This will translate the filter expression into a statistics predicate expression
+    /// (for example (column / 2) = 4 becomes (column_min / 2) <= 4 && 4 <= (column_max / 2)),
+    /// then convert it to a DataFusion PhysicalExpression and cache it for later use by
+    /// build_row_group_predicate.
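+    ///
+    /// A sketch of typical usage (schema, expression, and metadata are illustrative):
+    /// ```ignore
+    /// // c1 > 5 is rewritten to the statistics predicate c1_max > 5
+    /// let expr = col("c1").gt(lit(5));
+    /// let builder = RowGroupPredicateBuilder::try_new(&expr, schema)?;
+    /// let predicate = builder.build_row_group_predicate(metadata.row_groups());
+    /// ```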
+    pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
+        let logical_predicate_expr =
+            build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?;
+        // build physical predicate expression
+        let stat_fields = stat_column_req
+            .iter()
+            .map(|(_, _, f)| f.clone())
+            .collect::<Vec<_>>();
+        let stat_schema = Schema::new(stat_fields);
+        let execution_context_state = ExecutionContextState {
+            datasources: HashMap::new(),
+            scalar_functions: HashMap::new(),
+            var_provider: HashMap::new(),
+            aggregate_functions: HashMap::new(),
+            config: ExecutionConfig::new(),
+        };
+        let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr(
+            &logical_predicate_expr,
+            &stat_schema,
+            &execution_context_state,
+        )?;
+        Ok(Self {
+            parquet_schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter row group metadata.
+    /// This function takes a list of all row groups as a parameter,
+    /// so that DataFusion's physical expressions can be re-used by
+    /// generating a RecordBatch, containing statistics arrays,
+    /// on which the physical predicate expression is executed to generate a row group filter array.
+    /// The generated filter array is then used in the returned closure to filter row groups.
+    pub fn build_row_group_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build statistics record batch
+        let predicate_result = build_statistics_record_batch(
+            row_group_metadata,
+            &self.parquet_schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| {
+            // execute predicate expression
+            self.predicate_expr.evaluate(&statistics_batch)
+        })
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            // row group filter array could not be built
+            // return a closure which will not filter out any row groups
+            _ => return Box::new(|_r, _i| true),
+        };
+
+        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
+        match predicate_array {
+            // return row group predicate function
+            Some(array) => {
+                // when the result of the predicate expression for a row group is null / undefined,
+                // e.g. due to missing statistics, this row group can't be filtered out,
+                // so replace with true
+                let predicate_values =
+                    array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<bool>>();
+                Box::new(move |_, i| predicate_values[i])
+            }
+            // predicate result is not a BooleanArray
+            // return a closure which will not filter out any row groups
+            _ => Box::new(|_r, _i| true),
+        }
+    }
+}
+
+/// Build a RecordBatch from a list of RowGroupMetaData structs,
+/// creating arrays, one for each statistics column,
+/// as requested in the stat_column_req parameter.
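+///
+/// For example (illustrative), for the predicate `c1 > 5` and two row groups with
+/// maximum values 10 and 20, the resulting batch contains a single `c1_max`
+/// column with one row per row group:
+///
+/// ```text
+/// c1_max
+/// ------
+/// 10
+/// 20
+/// ```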
+fn build_statistics_record_batch(
+    row_groups: &[RowGroupMetaData],
+    parquet_schema: &Schema,
+    stat_column_req: &Vec<(String, StatisticsType, Field)>,
+) -> Result<RecordBatch> {
+    let mut fields = Vec::<Field>::new();
+    let mut arrays = Vec::<ArrayRef>::new();
+    for (column_name, statistics_type, stat_field) in stat_column_req {
+        if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) {
+            let statistics = row_groups
+                .iter()
+                .map(|g| g.column(column_index).statistics())
+                .collect::<Vec<_>>();
+            let array = build_statistics_array(
+                &statistics,
+                *statistics_type,
+                stat_field.data_type(),
+            );
+            fields.push(stat_field.clone());
+            arrays.push(array);
+        }
+    }
+    let schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(schema, arrays)
+        .map_err(|err| DataFusionError::Plan(err.to_string()))
+}
+
+struct StatisticsExpressionBuilder<'a> {
+    column_name: String,
+    column_expr: &'a Expr,
+    scalar_expr: &'a Expr,
+    parquet_field: &'a Field,
+    stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    reverse_operator: bool,
+}
+
+impl<'a> StatisticsExpressionBuilder<'a> {
+    fn try_new(
+        left: &'a Expr,
+        right: &'a Expr,
+        parquet_schema: &'a Schema,
+        stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    ) -> Result<Self> {
+        // find column name; input could be a more complicated expression
+        let mut left_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(left, &mut left_columns)?;
+        let mut right_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(right, &mut right_columns)?;
+        let (column_expr, scalar_expr, column_names, reverse_operator) =
+            match (left_columns.len(), right_columns.len()) {
+                (1, 0) => (left, right, left_columns, false),
+                (0, 1) => (right, left, right_columns, true),
+                _ => {
+                    // expressions involving more than one column are not supported
+                    return Err(DataFusionError::Plan(
+                        "Multi-column expressions are not currently supported"
+                            .to_string(),
+                    ));
+                }
+            };
+        let column_name = column_names.iter().next().unwrap().clone();
+        let field = match parquet_schema.column_with_name(&column_name) {
+            Some((_, f)) => f,
+            _ => {
+                // field not found in parquet schema
+                return Err(DataFusionError::Plan(
+                    "Field not found in parquet schema".to_string(),
+                ));
+            }
+        };
+
+        Ok(Self {
+            column_name,
+            column_expr,
+            scalar_expr,
+            parquet_field: field,
+            stat_column_req,
+            reverse_operator,
+        })
+    }
+
+    fn correct_operator(&self, op: Operator) -> Operator {
+        if !self.reverse_operator {
+            return op;
+        }
+
+        match op {
+            Operator::Lt => Operator::Gt,
+            Operator::Gt => Operator::Lt,
+            Operator::LtEq => Operator::GtEq,
+            Operator::GtEq => Operator::LtEq,
+            _ => op,
+        }
+    }
+
+    fn scalar_expr(&self) -> &Expr {
+        self.scalar_expr
+    }
+
+    fn is_stat_column_missing(&self, statistics_type: StatisticsType) -> bool {
+        self.stat_column_req
+            .iter()
+            .filter(|(c, t, _f)| c == &self.column_name && t == &statistics_type)
+            .count()
+            == 0
+    }
+
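+    /// Build the rewritten column expression for a statistics column:
+    /// e.g. for column `c1` and suffix `min`, references to `c1` are rewritten
+    /// into references to a new `c1_min` column, and the required statistics
+    /// column is registered in `stat_column_req` on first use.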
+    fn stat_column_expr(
+        &mut self,
+        stat_type: StatisticsType,
+        suffix: &str,
+    ) -> Result<Expr> {
+        let stat_column_name = format!("{}_{}", self.column_name, suffix);
+        let stat_field = Field::new(
+            stat_column_name.as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        if self.is_stat_column_missing(stat_type) {
+            // only add statistics column if not previously added
+            self.stat_column_req
+                .push((self.column_name.clone(), stat_type, stat_field));
+        }
+        rewrite_column_expr(
+            self.column_expr,
+            self.column_name.as_str(),
+            stat_column_name.as_str(),
+        )
+    }
+
+    fn min_column_expr(&mut self) -> Result<Expr> {
+        self.stat_column_expr(StatisticsType::Min, "min")
+    }
+
+    fn max_column_expr(&mut self) -> Result<Expr> {
+        self.stat_column_expr(StatisticsType::Max, "max")
+    }
+}
+
+/// Replaces all references to a column with the old name by the new name in an expression
+fn rewrite_column_expr(
+    expr: &Expr,
+    column_old_name: &str,
+    column_new_name: &str,
+) -> Result<Expr> {
+    let expressions = utils::expr_sub_expressions(&expr)?;
+    let expressions = expressions
+        .iter()
+        .map(|e| rewrite_column_expr(e, column_old_name, column_new_name))
+        .collect::<Result<Vec<_>>>()?;
+
+    if let Expr::Column(name) = expr {
+        if name == column_old_name {
+            return Ok(Expr::Column(column_new_name.to_string()));
+        }
+    }
+    utils::rewrite_expression(&expr, &expressions)
+}
+
+/// Translate a logical filter expression into a parquet statistics predicate expression
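+///
+/// For example (borrowing the cases from the tests below), `c1 = 5` becomes
+/// `c1_min <= 5 AND 5 <= c1_max`, and in `c1 < 1 AND c2 > c3` the unsupported
+/// multi-column comparison is replaced with `TRUE`, giving
+/// `c1_min < 1 AND TRUE`, which can still prune row groups on `c1`.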
+fn build_predicate_expression(
+    expr: &Expr,
+    parquet_schema: &Schema,
+    stat_column_req: &mut Vec<(String, StatisticsType, Field)>,
+) -> Result<Expr> {
+    use crate::logical_plan;
+    // predicate expression can only be a binary expression
+    let (left, op, right) = match expr {
+        Expr::BinaryExpr { left, op, right } => (left, *op, right),
+        _ => {
+            // unsupported expression - replace with TRUE
+            // this can still be useful when multiple conditions are joined using AND
+            // such as: column > 10 AND TRUE
+            return Ok(logical_plan::lit(true));
+        }
+    };
+
+    if op == Operator::And || op == Operator::Or {
+        let left_expr =
+            build_predicate_expression(left, parquet_schema, stat_column_req)?;
+        let right_expr =
+            build_predicate_expression(right, parquet_schema, stat_column_req)?;
+        return Ok(logical_plan::binary_expr(left_expr, op, right_expr));
+    }
+
+    let expr_builder = StatisticsExpressionBuilder::try_new(
+        left,
+        right,
+        parquet_schema,
+        stat_column_req,
+    );
+    let mut expr_builder = match expr_builder {
+        Ok(builder) => builder,
+        // allow partial failure in predicate expression generation
+        // this can still produce a useful predicate when multiple conditions are joined using AND
+        Err(_) => {
+            return Ok(logical_plan::lit(true));
+        }
+    };
+    let corrected_op = expr_builder.correct_operator(op);
+    let statistics_expr = match corrected_op {
+        Operator::Eq => {
+            // column = literal => (min, max) = literal => min <= literal && literal <= max
+            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
+            let min_column_expr = expr_builder.min_column_expr()?;
+            let max_column_expr = expr_builder.max_column_expr()?;
+            min_column_expr
+                .lt_eq(expr_builder.scalar_expr().clone())
+                .and(expr_builder.scalar_expr().lt_eq(max_column_expr))
+        }
+        Operator::Gt => {
+            // column > literal => (min, max) > literal => max > literal
+            expr_builder
+                .max_column_expr()?
+                .gt(expr_builder.scalar_expr().clone())
+        }
+        Operator::GtEq => {
+            // column >= literal => (min, max) >= literal => max >= literal
+            expr_builder
+                .max_column_expr()?
+                .gt_eq(expr_builder.scalar_expr().clone())
+        }
+        Operator::Lt => {
+            // column < literal => (min, max) < literal => min < literal
+            expr_builder
+                .min_column_expr()?
+                .lt(expr_builder.scalar_expr().clone())
+        }
+        Operator::LtEq => {
+            // column <= literal => (min, max) <= literal => min <= literal
+            expr_builder
+                .min_column_expr()?
+                .lt_eq(expr_builder.scalar_expr().clone())
+        }
+        // other expressions are not supported
+        _ => logical_plan::lit(true),
+    };
+    Ok(statistics_expr)
+}
+
+#[derive(Debug, Copy, Clone, PartialEq)]
+enum StatisticsType {
+    Min,
+    Max,
+}
+
+fn build_null_array(data_type: &DataType, length: usize) -> ArrayRef {
+    Arc::new(arrow::array::NullArray::new_with_type(
+        length,
+        data_type.clone(),
+    ))
+}
+
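+/// Build an array of min or max values from the given parquet statistics,
+/// one element per row group; positions with missing or unsupported statistics
+/// become null, and the whole array is a null array when no row group has
+/// usable statistics for the column.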
+fn build_statistics_array(
+    statistics: &[Option<&ParquetStatistics>],
+    statistics_type: StatisticsType,
+    data_type: &DataType,
+) -> ArrayRef {
+    let statistics_count = statistics.len();
+    let first_group_stats = statistics.iter().find(|s| s.is_some());
+    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
+        // found the first row group with statistics defined
+        statistics
+    } else {
+        // no row group has statistics defined
+        return build_null_array(data_type, statistics_count);
+    };
+
+    let (data_size, arrow_type) = match first_group_stats {
+        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
+        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
+        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
+        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
+        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
+            (0, DataType::Utf8)
+        }
+        _ => {
+            // type of statistics not supported
+            return build_null_array(data_type, statistics_count);
+        }
+    };
+
+    let statistics = statistics.iter().map(|s| {
+        s.filter(|s| s.has_min_max_set())
+            .map(|s| match statistics_type {
+                StatisticsType::Min => s.min_bytes(),
+                StatisticsType::Max => s.max_bytes(),
+            })
+    });
+
+    if arrow_type == DataType::Utf8 {
+        let data_size = statistics
+            .clone()
+            .map(|x| x.map(|b| b.len()).unwrap_or(0))
+            .sum();
+        let mut builder =
+            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
+        let string_statistics =
+            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
+        for maybe_string in string_statistics {
+            match maybe_string {
+                Some(string_value) => builder.append_value(string_value).unwrap(),
+                None => builder.append_null().unwrap(),
+            };
+        }
+        return Arc::new(builder.finish());
+    }
+
+    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
+    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
+    let mut null_count = 0;
+    for s in statistics {
+        if let Some(stat_data) = s {
+            bitmap_builder.append(true);
+            data_buffer.extend_from_slice(stat_data);
+        } else {
+            bitmap_builder.append(false);
+            data_buffer.resize(data_buffer.len() + data_size);
+            null_count += 1;
+        }
+    }
+
+    let mut builder = ArrayData::builder(arrow_type)
+        .len(statistics_count)
+        .add_buffer(data_buffer.into());
+    if null_count > 0 {
+        builder = builder.null_bit_buffer(bitmap_builder.finish());
+    }
+    let array_data = builder.build();
+    let statistics_array = make_array(array_data);
+    if statistics_array.data_type() == data_type {
+        return statistics_array;
+    }
+    // cast the statistics array to the required data type
+    arrow::compute::cast(&statistics_array, data_type)
+        .unwrap_or_else(|_| build_null_array(data_type, statistics_count))
+}
+
 #[async_trait]
 impl ExecutionPlan for ParquetExec {
     /// Return a reference to Any that can be used for downcasting
@@ -282,10 +775,17 @@ impl ExecutionPlan for ParquetExec {
         let filenames = self.partitions[partition].filenames.clone();
         let projection = self.projection.clone();
+        let predicate_builder = self.predicate_builder.clone();
         let batch_size = self.batch_size;
 
         task::spawn_blocking(move || {
-            if let Err(e) = read_files(&filenames, &projection, batch_size, response_tx) {
+            if let Err(e) = read_files(
+                &filenames,
+                &projection,
+                &predicate_builder,
+                batch_size,
+                response_tx,
+            ) {
                 println!("Parquet reader thread terminated due to error: {:?}", e);
             }
         });
@@ -310,13 +810,19 @@ fn send_result(
 fn read_files(
     filenames: &[String],
     projection: &[usize],
+    predicate_builder: &Option<RowGroupPredicateBuilder>,
     batch_size: usize,
     response_tx: Sender<ArrowResult<Option<RecordBatch>>>,
 ) -> Result<()> {
     for filename in filenames {
         let file = File::open(&filename)?;
-        let file_reader = Arc::new(SerializedFileReader::new(file)?);
-        let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+        let mut file_reader = SerializedFileReader::new(file)?;
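+        // when a predicate builder was created from the filter expression, derive a
+        // row group predicate from this file's metadata statistics and prune
+        // non-matching row groups before any column data is read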
+        if let Some(predicate_builder) = predicate_builder {
+            let row_group_predicate = predicate_builder
+                .build_row_group_predicate(file_reader.metadata().row_groups());
+            file_reader.filter_row_groups(&row_group_predicate);
+        }
+        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
         let mut batch_reader = arrow_reader
             .get_record_reader_by_columns(projection.to_owned(), batch_size)?;
         loop {
@@ -389,7 +895,10 @@ impl RecordBatchStream for ParquetStream {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use arrow::array::{Int32Array, StringArray};
     use futures::StreamExt;
+    use parquet::basic::Type as PhysicalType;
+    use parquet::schema::types::SchemaDescPtr;
 
     #[test]
     fn test_split_files() {
@@ -432,7 +941,7 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let filename = format!("{}/alltypes_plain.parquet", testdata);
         let parquet_exec =
-            ParquetExec::try_from_path(&filename, Some(vec![0, 1, 2]), 1024, 4)?;
+            ParquetExec::try_from_path(&filename, Some(vec![0, 1, 2]), None, 1024, 4)?;
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
 
         let mut results = parquet_exec.execute(0).await?;
@@ -457,4 +966,479 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn build_statistics_array_int32() {
+        // build row group metadata array
+        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
+        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
+        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
+        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
+
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
+        let int32_array = statistics_array
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
+        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
+
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
+        let int32_array = statistics_array
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
+        // here the first max value is None and not the Some(10) value which was actually set
+        // because the min value is None
+        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    }
+
+    #[test]
+    fn build_statistics_array_utf8() {
+        // build row group metadata array
+        let s1 = ParquetStatistics::byte_array(None, Some("10".into()), None, 0, false);
+        let s2 = ParquetStatistics::byte_array(
+            Some("2".into()),
+            Some("20".into()),
+            None,
+            0,
+            false,
+        );
+        let s3 = ParquetStatistics::byte_array(
+            Some("3".into()),
+            Some("30".into()),
+            None,
+            0,
+            false,
+        );
+        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
+
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Utf8);
+        let string_array = statistics_array
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let string_vec = string_array.into_iter().collect::<Vec<_>>();
+        assert_eq!(string_vec, vec![None, Some("2"), Some("3")]);
+
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Utf8);
+        let string_array = statistics_array
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let string_vec = string_array.into_iter().collect::<Vec<_>>();
+        // here the first max value is None and not the Some("10") value which was actually set
+        // because the min value is None
+        assert_eq!(string_vec, vec![None, Some("20"), Some("30")]);
+    }
+
+    #[test]
+    fn build_statistics_array_empty_stats() {
+        let data_type = DataType::Int32;
+        let statistics = vec![];
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Min, &data_type);
+        assert_eq!(statistics_array.len(), 0);
+
+        let statistics = vec![None, None];
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Min, &data_type);
+        assert_eq!(statistics_array.len(), statistics.len());
+        assert_eq!(statistics_array.data_type(), &data_type);
+        for i in 0..statistics_array.len() {
+            assert_eq!(statistics_array.is_null(i), true);
+            assert_eq!(statistics_array.is_valid(i), false);
+        }
+    }
+
+    #[test]
+    fn build_statistics_array_unsupported_type() {
+        // boolean is not currently a supported type for statistics
+        let s1 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false);
+        let s2 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false);
+        let statistics = vec![Some(&s1), Some(&s2)];
+        let data_type = DataType::Boolean;
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Min, &data_type);
+        assert_eq!(statistics_array.len(), statistics.len());
+        assert_eq!(statistics_array.data_type(), &data_type);
+        for i in 0..statistics_array.len() {
+            assert_eq!(statistics_array.is_null(i), true);
+            assert_eq!(statistics_array.is_valid(i), false);
+        }
+    }
+
+    #[test]
+    fn row_group_predicate_eq() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let expected_expr = "#c1_min LtEq Int32(1) And Int32(1) LtEq #c1_max";
+
+        // test column on the left
+        let expr = col("c1").eq(lit(1));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        // test column on the right
+        let expr = lit(1).eq(col("c1"));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_gt() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let expected_expr = "#c1_max Gt Int32(1)";
+
+        // test column on the left
+        let expr = col("c1").gt(lit(1));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        // test column on the right
+        let expr = lit(1).lt(col("c1"));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_gt_eq() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let expected_expr = "#c1_max GtEq Int32(1)";
+
+        // test column on the left
+        let expr = col("c1").gt_eq(lit(1));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+        // test column on the right
+        let expr = lit(1).lt_eq(col("c1"));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_lt() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let expected_expr = "#c1_min Lt Int32(1)";
+
+        // test column on the left
+        let expr = col("c1").lt(lit(1));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        // test column on the right
+        let expr = lit(1).gt(col("c1"));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_lt_eq() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let expected_expr = "#c1_min LtEq Int32(1)";
+
+        // test column on the left
+        let expr = col("c1").lt_eq(lit(1));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+        // test column on the right
+        let expr = lit(1).gt_eq(col("c1"));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_and() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Int32, false),
+            Field::new("c3", DataType::Int32, false),
+        ]);
+        // test the AND operator joining the supported c1 < 1 expression and the unsupported c2 > c3 expression
+        let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
+        let expected_expr = "#c1_min Lt Int32(1) And Boolean(true)";
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_or() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Int32, false),
+        ]);
+        // test the OR operator joining the supported c1 < 1 expression and the unsupported c2 % 2 expression
+        let expr = col("c1").lt(lit(1)).or(col("c2").modulus(lit(2)));
+        let expected_expr = "#c1_min Lt Int32(1) Or Boolean(true)";
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_stat_column_req() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Int32, false),
+        ]);
+        let mut stat_column_req = vec![];
+        // c1 < 1 and (c2 = 2 or c2 = 3)
+        let expr = col("c1")
+            .lt(lit(1))
+            .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
+        let expected_expr = "#c1_min Lt Int32(1) And #c2_min LtEq Int32(2) And Int32(2) LtEq #c2_max Or #c2_min LtEq Int32(3) And Int32(3) LtEq #c2_max";
+        let predicate_expr =
+            build_predicate_expression(&expr, &schema, &mut stat_column_req)?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+        // c1 < 1 should add c1_min
+        let c1_min_field = Field::new("c1_min", DataType::Int32, false);
+        assert_eq!(
+            stat_column_req[0],
+            ("c1".to_owned(), StatisticsType::Min, c1_min_field)
+        );
+        // c2 = 2 should add c2_min and c2_max
+        let c2_min_field = Field::new("c2_min", DataType::Int32, false);
+        assert_eq!(
+            stat_column_req[1],
+            ("c2".to_owned(), StatisticsType::Min, c2_min_field)
+        );
+        let c2_max_field = Field::new("c2_max", DataType::Int32, false);
+        assert_eq!(
+            stat_column_req[2],
+            ("c2".to_owned(), StatisticsType::Max, c2_max_field)
+        );
+        // c2 = 3 shouldn't add any new statistics fields
+        assert_eq!(stat_column_req.len(), 3);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_builder_simple_expr() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        // c1 > 15 => c1_max > 15
+        let expr = col("c1").gt(lit(15));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?;
+
+        let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]);
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
+        );
+        let row_group_metadata = vec![rgm1, rgm2];
+        let row_group_predicate =
+            predicate_builder.build_row_group_predicate(&row_group_metadata);
+        let row_group_filter = row_group_metadata
+            .iter()
+            .enumerate()
+            .map(|(i, g)| row_group_predicate(g, i))
+            .collect::<Vec<bool>>();
+        assert_eq!(row_group_filter, vec![false, true]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_builder_missing_stats() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        // c1 > 15 => c1_max > 15
+        let expr = col("c1").gt(lit(15));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?;
+
+        let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]);
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::int32(None, None, None, 0, false)],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
+        );
+        let row_group_metadata = vec![rgm1, rgm2];
+        let row_group_predicate =
+            predicate_builder.build_row_group_predicate(&row_group_metadata);
+        let row_group_filter = row_group_metadata
+            .iter()
+            .enumerate()
+            .map(|(i, g)| row_group_predicate(g, i))
+            .collect::<Vec<bool>>();
+        // missing statistics for the first row group mean that the result of the predicate expression
+        // is null / undefined, so the first row group can't be filtered out
+        assert_eq!(row_group_filter, vec![true, true]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_builder_partial_expr() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        // test row group predicate with a partially supported expression
+        // c1 > 15 AND c2 % 2 => c1_max > 15 AND true
+        let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2)));
+        let schema = Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Int32, false),
+        ]);
+        let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema.clone())?;
+
+        let schema_descr = get_test_schema_descr(vec![
+            ("c1", PhysicalType::INT32),
+            ("c2", PhysicalType::INT32),
+        ]);
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+            ],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
+                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
+            ],
+        );
+        let row_group_metadata = vec![rgm1, rgm2];
+        let row_group_predicate =
+            predicate_builder.build_row_group_predicate(&row_group_metadata);
+        let row_group_filter = row_group_metadata
+            .iter()
+            .enumerate()
+            .map(|(i, g)| row_group_predicate(g, i))
+            .collect::<Vec<bool>>();
+        // the first row group is still filtered out because the predicate expression can be partially evaluated
+        // when conditions are joined using AND
+        assert_eq!(row_group_filter, vec![false, true]);
+
+        // if conditions in the predicate are joined with OR and an unsupported expression is used,
+        // this bypasses the entire predicate expression and no row groups are filtered out
+        let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
+        let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?;
+        let row_group_predicate =
+            predicate_builder.build_row_group_predicate(&row_group_metadata);
+        let row_group_filter = row_group_metadata
+            .iter()
+            .enumerate()
+            .map(|(i, g)| row_group_predicate(g, i))
+            .collect::<Vec<bool>>();
+        assert_eq!(row_group_filter, vec![true, true]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_builder_unsupported_type() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        // test row group predicate with an unsupported statistics type (boolean)
+        // where a null array is generated for some statistics columns
+        // c1 > 15 AND c2 = true => c1_max > 15 AND null
+        let expr = col("c1").gt(lit(15)).and(col("c2").eq(lit(true)));
+        let schema = Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Boolean, false),
+        ]);
+        let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?;
+
+        let schema_descr = get_test_schema_descr(vec![
+            ("c1", PhysicalType::INT32),
+            ("c2", PhysicalType::BOOLEAN),
+        ]);
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+                ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
+            ],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
+                ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
+            ],
+        );
+        let row_group_metadata = vec![rgm1, rgm2];
+        let row_group_predicate =
+            predicate_builder.build_row_group_predicate(&row_group_metadata);
+        let row_group_filter = row_group_metadata
+            .iter()
+            .enumerate()
+            .map(|(i, g)| row_group_predicate(g, i))
+            .collect::<Vec<bool>>();
+        // no row group is filtered out because the predicate expression can't be evaluated
+        // when a null array is generated for a statistics column,
+        // because the null values propagate to the end result, making the predicate result undefined
+        assert_eq!(row_group_filter, vec![true, true]);
+
+        Ok(())
+    }
+
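+    /// Build a RowGroupMetaData struct with the given column statistics (test helper)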
+    fn get_row_group_meta_data(
+        schema_descr: &SchemaDescPtr,
+        column_statistics: Vec<ParquetStatistics>,
+    ) -> RowGroupMetaData {
+        use parquet::file::metadata::ColumnChunkMetaData;
+        let mut columns = vec![];
+        for (i, s) in column_statistics.iter().enumerate() {
+            let column = ColumnChunkMetaData::builder(schema_descr.column(i))
+                .set_statistics(s.clone())
+                .build()
+                .unwrap();
+            columns.push(column);
+        }
+        RowGroupMetaData::builder(schema_descr.clone())
+            .set_num_rows(1000)
+            .set_total_byte_size(2000)
+            .set_column_metadata(columns)
+            .build()
+            .unwrap()
+    }
+
+    fn get_test_schema_descr(fields: Vec<(&str, PhysicalType)>) -> SchemaDescPtr {
+        use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
+        let mut schema_fields = fields
+            .iter()
+            .map(|(n, t)| {
+                Arc::new(SchemaType::primitive_type_builder(n, *t).build().unwrap())
+            })
+            .collect::<Vec<_>>();
+        let schema = SchemaType::group_type_builder("schema")
+            .with_fields(&mut schema_fields)
+            .build()
+            .unwrap();
+
+        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
+    }
 }
diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs
index 6af2c485b720b..6ba0d2606a236 100644
--- a/rust/datafusion/src/physical_plan/planner.rs
+++ b/rust/datafusion/src/physical_plan/planner.rs
@@ -475,7 +475,7 @@ impl DefaultPhysicalPlanner {
             Expr::BinaryExpr { left, op, right } => {
                 let lhs = self.create_physical_expr(left, input_schema, ctx_state)?;
                 let rhs = self.create_physical_expr(right, input_schema, ctx_state)?;
-                binary(lhs, op.clone(), rhs, input_schema)
+                binary(lhs, *op, rhs, input_schema)
             }
             Expr::Case {
                 expr,
diff --git a/rust/datafusion/src/sql/utils.rs b/rust/datafusion/src/sql/utils.rs
index ce8b4d1e01f9b..976e2c574d9f3 100644
--- a/rust/datafusion/src/sql/utils.rs
+++ b/rust/datafusion/src/sql/utils.rs
@@ -305,7 +305,7 @@ where
         }),
         Expr::BinaryExpr { left, right, op } => Ok(Expr::BinaryExpr {
             left: Box::new(clone_with_replacement(&**left, replacement_fn)?),
-            op: op.clone(),
+            op: *op,
             right: Box::new(clone_with_replacement(&**right, replacement_fn)?),
         }),
         Expr::Case {
diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs
index f456e655a59e6..71b84cd981b09 100644
--- a/rust/parquet/src/arrow/array_reader.rs
+++ b/rust/parquet/src/arrow/array_reader.rs
@@ -435,16 +435,11 @@ where
     }
 
     fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        // Try to initialized column reader
+        // Try to initialize column reader
         if self.column_reader.is_none() {
-            let init_result = self.next_column_reader()?;
-            if !init_result {
-                return Err(general_err!("No page left!"));
-            }
+            self.next_column_reader()?;
         }
 
-        assert!(self.column_reader.is_some());
-
         let mut data_buffer: Vec<T::T> = Vec::with_capacity(batch_size);
         data_buffer.resize_with(batch_size, T::T::default);
 
@@ -466,7 +461,7 @@ where
 
         let mut num_read = 0;
 
-        while num_read < batch_size {
+        while self.column_reader.is_some() && num_read < batch_size {
             let num_to_read = batch_size - num_read;
             let cur_data_buf = &mut data_buffer[num_read..];
             let cur_def_levels_buf =
@@ -2127,6 +2122,37 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_complex_array_reader_no_pages() {
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL BYTE_ARRAY leaf (UTF8);
+            }
+        }
+        ";
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+        let column_desc = schema.column(0);
+        let pages: Vec<Vec<Page>> = Vec::new();
+        let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
+
+        let converter = Utf8Converter::new(Utf8ArrayConverter {});
+        let mut array_reader =
+            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
+                Box::new(page_iterator),
+                column_desc,
+                converter,
+                None,
+            )
+            .unwrap();
+
+        let values_per_page = 100; // this value is arbitrary in this test - the result should always be an array of length 0
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), 0);
+    }
+
     #[test]
     fn test_complex_array_reader_def_and_rep_levels() {
         // Construct column schema
diff --git a/rust/parquet/src/file/serialized_reader.rs b/rust/parquet/src/file/serialized_reader.rs
index 663412d40870f..9fbff41918605 100644
--- a/rust/parquet/src/file/serialized_reader.rs
+++ b/rust/parquet/src/file/serialized_reader.rs
@@ -137,6 +137,24 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
             metadata,
         })
     }
+
+    /// Filters row group metadata to only those row groups
+    /// for which the predicate function returns true
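+    ///
+    /// A sketch of intended use (the predicate is illustrative):
+    /// ```ignore
+    /// // keep only row groups whose metadata reports at least one row
+    /// reader.filter_row_groups(&|metadata, _index| metadata.num_rows() > 0);
+    /// ```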
+    pub fn filter_row_groups(
+        &mut self,
+        predicate: &dyn Fn(&RowGroupMetaData, usize) -> bool,
+    ) {
+        let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
+        for (i, row_group_metadata) in self.metadata.row_groups().iter().enumerate() {
+            if predicate(row_group_metadata, i) {
+                filtered_row_groups.push(row_group_metadata.clone());
+            }
+        }
+        self.metadata = ParquetMetaData::new(
+            self.metadata.file_metadata().clone(),
+            filtered_row_groups,
+        );
+    }
 }
 
 impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
@@ -737,4 +755,21 @@ mod tests {
             Some("foo.baz.Foobaz$Event".to_owned())
         );
     }
+
+    #[test]
+    fn test_file_reader_filter_row_groups() -> Result<()> {
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let mut reader = SerializedFileReader::new(test_file)?;
+
+        // test initial number of row groups
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        // test filtering out all row groups
+        reader.filter_row_groups(&|_, _| false);
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 0);
+
+        Ok(())
+    }
 }