diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 32ffaeb9d5cc..e389158a1931 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -39,20 +39,18 @@ use crate::data_type::{ Int64Type, Int96Type, }; use crate::errors::Result; -use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type}; +use crate::schema::types::{ColumnDescriptor, ColumnPath, Type}; /// Create array reader from parquet schema, projection mask, and parquet file reader. pub fn build_array_reader( - parquet_schema: SchemaDescPtr, arrow_schema: SchemaRef, mask: ProjectionMask, - row_groups: Box<dyn RowGroupCollection>, + row_groups: &dyn RowGroupCollection, ) -> Result<Box<dyn ArrayReader>> { - let field = - convert_schema(parquet_schema.as_ref(), mask, Some(arrow_schema.as_ref()))?; + let field = convert_schema(&row_groups.schema(), mask, Some(arrow_schema.as_ref()))?; match &field { - Some(field) => build_reader(field, row_groups.as_ref()), + Some(field) => build_reader(field, row_groups), None => Ok(make_empty_array_reader(row_groups.num_rows())), } } @@ -333,13 +331,8 @@ mod tests { ) .unwrap(); - let array_reader = build_array_reader( - file_reader.metadata().file_metadata().schema_descr_ptr(), - Arc::new(arrow_schema), - mask, - Box::new(file_reader), - ) - .unwrap(); + let array_reader = + build_array_reader(Arc::new(arrow_schema), mask, &file_reader).unwrap(); // Create arrow types let arrow_type = DataType::Struct(vec![Field::new( diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 2acd59dcc24f..d2fa94611906 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -582,13 +582,9 @@ mod tests { let schema = file_metadata.schema_descr_ptr(); let mask = ProjectionMask::leaves(&schema, vec![0]); - let mut array_reader = build_array_reader( - schema, - Arc::new(arrow_schema), - mask, - Box::new(file_reader), - ) - .unwrap(); + let mut array_reader = + build_array_reader(Arc::new(arrow_schema), mask, &file_reader) + .unwrap(); let batch = array_reader.next_batch(100).unwrap(); assert_eq!(batch.data_type(), array_reader.get_data_type()); diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index d7665ef0f6b2..54c45a336a37 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -100,7 +100,7 @@ pub trait ArrayReader: Send { /// A collection of row groups pub trait RowGroupCollection { /// Get schema of parquet file. - fn schema(&self) -> Result<SchemaDescPtr>; + fn schema(&self) -> SchemaDescPtr; /// Get the number of rows in this collection fn num_rows(&self) -> usize; @@ -110,8 +110,8 @@ } impl RowGroupCollection for Arc<dyn FileReader> { - fn schema(&self) -> Result<SchemaDescPtr> { - Ok(self.metadata().file_metadata().schema_descr_ptr()) + fn schema(&self) -> SchemaDescPtr { + self.metadata().file_metadata().schema_descr_ptr() } fn num_rows(&self) -> usize { diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs new file mode 100644 index 000000000000..8945ccde4248 --- /dev/null +++ b/parquet/src/arrow/arrow_reader/filter.rs @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::ProjectionMask; +use arrow::array::BooleanArray; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; + +/// A predicate operating on [`RecordBatch`] +pub trait ArrowPredicate: Send + 'static { + /// Returns the [`ProjectionMask`] that describes the columns required + /// to evaluate this predicate. All projected columns will be provided in the `batch` + /// passed to [`evaluate`](Self::evaluate) + fn projection(&self) -> &ProjectionMask; + + /// Evaluate this predicate for the given [`RecordBatch`] containing the columns + /// identified by [`Self::projection`] + /// + /// Rows that are `true` in the returned [`BooleanArray`] will be returned by the + /// parquet reader, whereas rows that are `false` or `Null` will not be + fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray>; +} + +/// An [`ArrowPredicate`] created from an [`FnMut`] +pub struct ArrowPredicateFn<F> { + f: F, + projection: ProjectionMask, +} + +impl<F> ArrowPredicateFn<F> +where + F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static, +{ + /// Create a new [`ArrowPredicateFn`]. `f` will be passed batches + /// that contain the columns specified in `projection` + /// and must return a [`BooleanArray`] that describes which rows should + /// be passed along + pub fn new(projection: ProjectionMask, f: F) -> Self { + Self { f, projection } + } +} + +impl<F> ArrowPredicate for ArrowPredicateFn<F> +where + F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static, +{ + fn projection(&self) -> &ProjectionMask { + &self.projection + } + + fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> { + (self.f)(batch) + } +} + +/// A [`RowFilter`] allows pushing down a filter predicate to skip IO and decoding +/// +/// This consists of a list of [`ArrowPredicate`] where only the rows that satisfy all +/// of the predicates will be returned. Any [`RowSelection`] will be applied prior +/// to the first predicate, and each predicate in turn will then be used to compute +/// a more refined [`RowSelection`] to use when evaluating the subsequent predicates. +/// +/// Once all predicates have been evaluated, the final [`RowSelection`] is applied +/// to the top-level [`ProjectionMask`] to produce the final output [`RecordBatch`]. +/// +/// This design has a couple of implications: +/// +/// * [`RowFilter`] can be used to skip entire pages, and thus IO, in addition to CPU decode overheads +/// * Columns may be decoded multiple times if they appear in multiple [`ProjectionMask`] +/// * IO will be deferred until needed by a [`ProjectionMask`] +/// +/// As such there is a trade-off between a single large predicate and multiple smaller +/// predicates, which will depend on the shape of the data. Whilst multiple smaller predicates may +/// minimise the amount of data scanned/decoded, they may not be faster overall.
+/// +/// For example, if a predicate that needs a single column of data filters out all but +/// 1% of the rows, applying it as one of the first `ArrowPredicateFn` will likely improve +/// performance significantly. +/// +/// As a counterexample, if a predicate needs several columns of data to evaluate but +/// keeps 99% of the rows, it may be better not to filter the data in parquet and instead +/// apply the filter after the `RecordBatch` has been fully decoded. +/// +/// [`RowSelection`]: [super::selection::RowSelection] +pub struct RowFilter { + /// A list of [`ArrowPredicate`] + pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>, +} + +impl RowFilter { + /// Create a new [`RowFilter`] from an array of [`ArrowPredicate`] + pub fn new(predicates: Vec<Box<dyn ArrowPredicate>>) -> Self { + Self { predicates } + } +} diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader/mod.rs similarity index 96% rename from parquet/src/arrow/arrow_reader.rs rename to parquet/src/arrow/arrow_reader/mod.rs index fd68f4bc02d7..e363919f6516 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -21,6 +21,7 @@ use std::collections::VecDeque; use std::sync::Arc; use arrow::array::Array; +use arrow::compute::prep_null_mask_filter; use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::{RecordBatch, RecordBatchReader}; @@ -36,6 +37,17 @@ use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader}; use crate::file::serialized_reader::ReadOptionsBuilder; use crate::schema::types::SchemaDescriptor; +#[allow(unused)] +mod filter; +#[allow(unused)] +mod selection; + +// TODO: Make these public once stable (#1792) +#[allow(unused_imports)] +pub(crate) use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; +#[allow(unused_imports)] +pub(crate) use selection::{RowSelection, RowSelector}; + /// Arrow reader api. /// With this api, user can get arrow schema from parquet file, and read parquet data /// into arrow arrays.
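To illustrate the API introduced in `filter.rs` above: a minimal sketch of assembling a `RowFilter` from an `ArrowPredicateFn`. This is crate-internal for now, since the types remain `pub(crate)` until #1792, and it assumes a file whose first leaf column is a UTF-8 string column; it uses the same `eq_dyn_utf8_scalar` kernel as the test at the end of this PR.

```rust
use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use crate::arrow::ProjectionMask;
use crate::schema::types::SchemaDescriptor;

/// Keep only the rows where leaf column 0 equals "b" (hypothetical helper)
fn example_filter(schema: &SchemaDescriptor) -> RowFilter {
    // The predicate projects only leaf column 0, so evaluating it fetches
    // and decodes just that column
    let predicate = ArrowPredicateFn::new(
        ProjectionMask::leaves(schema, vec![0]),
        |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "b"),
    );

    // Predicates are applied in order, each refining the RowSelection
    // seen by the next
    RowFilter::new(vec![Box::new(predicate)])
}
```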
@@ -72,41 +84,10 @@ pub trait ArrowReader { ) -> Result<Self::RecordReader>; } -/// [`RowSelection`] allows selecting or skipping a provided number of rows -/// when scanning the parquet file -#[derive(Debug, Clone, Copy)] -pub(crate) struct RowSelection { - /// The number of rows - pub row_count: usize, - - /// If true, skip `row_count` rows - pub skip: bool, -} - -impl RowSelection { - /// Select `row_count` rows - #[allow(unused)] - pub fn select(row_count: usize) -> Self { - Self { - row_count, - skip: false, - } - } - - /// Skip `row_count` rows - #[allow(unused)] - pub fn skip(row_count: usize) -> Self { - Self { - row_count, - skip: true, - } - } -} - #[derive(Debug, Clone, Default)] pub struct ArrowReaderOptions { skip_arrow_metadata: bool, - selection: Option<Vec<RowSelection>>, + selection: Option<RowSelection>, } impl ArrowReaderOptions { @@ -130,12 +111,9 @@ impl ArrowReaderOptions { /// Scan rows from the parquet file according to the provided `selection` /// - /// TODO: Make public once row selection fully implemented (#1792) + /// TODO: Revisit this API, as [`Self`] is provided before the file metadata is available #[allow(unused)] - pub(crate) fn with_row_selection( - self, - selection: impl Into<Vec<RowSelection>>, - ) -> Self { + pub(crate) fn with_row_selection(self, selection: impl Into<RowSelection>) -> Self { Self { selection: Some(selection.into()), ..self @@ -143,6 +121,9 @@ } } +/// An `ArrowReader` that can be used to synchronously read parquet data as [`RecordBatch`] +/// +/// See [`crate::arrow::async_reader`] for an asynchronous interface pub struct ParquetFileArrowReader { file_reader: Arc<dyn FileReader>, @@ -178,21 +159,13 @@ impl ArrowReader for ParquetFileArrowReader { mask: ProjectionMask, batch_size: usize, ) -> Result<ParquetRecordBatchReader> { - let array_reader = build_array_reader( - self.file_reader - .metadata() - .file_metadata() - .schema_descr_ptr(), - Arc::new(self.get_schema()?), - mask, - Box::new(self.file_reader.clone()), - )?; + let array_reader = + build_array_reader(Arc::new(self.get_schema()?), mask, &self.file_reader)?; - let selection = self.options.selection.clone().map(Into::into); Ok(ParquetRecordBatchReader::new( batch_size, array_reader, - selection, + self.options.selection.clone(), )) } } @@ -279,11 +252,13 @@ impl ParquetFileArrowReader { } } +/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`] +/// read from a parquet data source pub struct ParquetRecordBatchReader { batch_size: usize, array_reader: Box<dyn ArrayReader>, schema: SchemaRef, - selection: Option<VecDeque<RowSelection>>, + selection: Option<VecDeque<RowSelector>>, } impl Iterator for ParquetRecordBatchReader { @@ -319,7 +294,7 @@ impl Iterator for ParquetRecordBatchReader { Some(remaining) if remaining != 0 => { // if page row count less than batch_size we must set batch size to page row count. // add check avoid dead loop - selection.push_front(RowSelection::select(remaining)); + selection.push_front(RowSelector::select(remaining)); need_read } _ => front.row_count, @@ -364,22 +339,13 @@ impl RecordBatchReader for ParquetRecordBatchReader { } impl ParquetRecordBatchReader { - pub fn try_new( - batch_size: usize, - array_reader: Box<dyn ArrayReader>, - ) -> Result<Self> { - Ok(Self::new(batch_size, array_reader, None)) - } - /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at /// a time from [`ArrayReader`] based on the configured `selection`.
If `selection` is `None` /// all rows will be returned - /// - /// TODO: Make public once row selection fully implemented (#1792) pub(crate) fn new( batch_size: usize, array_reader: Box<dyn ArrayReader>, - selection: Option<VecDeque<RowSelection>>, + selection: Option<RowSelection>, ) -> Self { let schema = match array_reader.get_data_type() { ArrowType::Struct(ref fields) => Schema::new(fields.clone()), @@ -390,11 +356,41 @@ impl ParquetRecordBatchReader { batch_size, array_reader, schema: Arc::new(schema), - selection, + selection: selection.map(Into::into), } } } +/// Evaluates an [`ArrowPredicate`], returning the [`RowSelection`] of rows it selects +/// +/// If an `input_selection` is provided, the returned [`RowSelection`] will be +/// the conjunction of it and the rows selected by `predicate` +#[allow(unused)] +pub(crate) fn evaluate_predicate( + batch_size: usize, + array_reader: Box<dyn ArrayReader>, + input_selection: Option<RowSelection>, + predicate: &mut dyn ArrowPredicate, +) -> Result<RowSelection> { + let reader = + ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone()); + let mut filters = vec![]; + for maybe_batch in reader { + let filter = predicate.evaluate(maybe_batch?)?; + match filter.null_count() { + 0 => filters.push(filter), + _ => filters.push(prep_null_mask_filter(&filter)), + }; + } + + let raw = RowSelection::from_filters(&filters); + Ok(match input_selection { + Some(selection) => selection.and_then(&raw), + None => raw, + }) +} + #[cfg(test)] mod tests { use bytes::Bytes; @@ -417,7 +413,7 @@ mod tests { use crate::arrow::arrow_reader::{ ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, - ParquetRecordBatchReader, RowSelection, + ParquetRecordBatchReader, RowSelection, RowSelector, }; use crate::arrow::buffer::converter::{ Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter, @@ -893,7 +889,7 @@ /// Encoding encoding: Encoding, //row selections and total selected row count - row_selections: Option<(Vec<RowSelection>, usize)>, + row_selections: Option<(RowSelection, usize)>, } impl Default for TestOptions { @@ -1187,6 +1183,7 @@ let mut without_skip_data = gen_expected_data::<T>(&def_levels, &values); let mut skip_data: Vec<Option<T::T>> = vec![]; + let selections: VecDeque<RowSelector> = selections.into(); for select in selections { if select.skip { without_skip_data.drain(0..select.row_count); @@ -1763,12 +1760,12 @@ /// a `batch_size` and `selection` fn get_expected_batches( column: &RecordBatch, - selection: &[RowSelection], + selection: &RowSelection, batch_size: usize, ) -> Vec<RecordBatch> { let mut expected_batches = vec![]; - let mut selection: VecDeque<_> = selection.iter().cloned().collect(); + let mut selection: VecDeque<_> = selection.clone().into(); let mut row_offset = 0; let mut last_start = None; while row_offset < column.num_rows() && !selection.is_empty() { @@ -1820,7 +1817,7 @@ step_len: usize, total_len: usize, skip_first: bool, - ) -> (Vec<RowSelection>, usize) { + ) -> (RowSelection, usize) { let mut remaining = total_len; let mut skip = skip_first; let mut vec = vec![]; @@ -1831,7 +1828,7 @@ } else { remaining }; - vec.push(RowSelection { + vec.push(RowSelector { row_count: step, skip, }); @@ -1841,7 +1838,7 @@ } skip = !skip; } - (vec, selected_count) + (vec.into(), selected_count) } #[test] @@ -1890,7 +1887,7 @@ fn create_skip_reader( test_file: &File, batch_size: usize, - selections: Vec<RowSelection>, + selections: RowSelection, ) -> ParquetRecordBatchReader { let arrow_reader_options = ArrowReaderOptions::new().with_row_selection(selections); diff --git
a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs new file mode 100644 index 000000000000..8e129f5667ec --- /dev/null +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -0,0 +1,426 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, BooleanArray}; +use arrow::compute::SlicesIterator; +use std::cmp::Ordering; +use std::collections::VecDeque; +use std::ops::Range; + +/// [`RowSelector`] represents a range of rows to scan from a parquet file +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct RowSelector { + /// The number of rows + pub row_count: usize, + + /// If true, skip `row_count` rows + pub skip: bool, +} + +impl RowSelector { + /// Select `row_count` rows + pub fn select(row_count: usize) -> Self { + Self { + row_count, + skip: false, + } + } + + /// Skip `row_count` rows + pub fn skip(row_count: usize) -> Self { + Self { + row_count, + skip: true, + } + } +} + +/// [`RowSelection`] allows selecting or skipping a provided number of rows +/// when scanning the parquet file. 
+/// +/// This is applied prior to reading column data, and can therefore +/// be used to skip the IO needed to fetch data into memory +/// +/// A typical use-case would be using the [`PageIndex`] to filter out rows +/// that don't satisfy a predicate +/// +/// [`PageIndex`]: [crate::file::page_index::index::PageIndex] +#[derive(Debug, Clone, Default, Eq, PartialEq)] +pub struct RowSelection { + selectors: Vec<RowSelector>, +} + +impl RowSelection { + /// Creates a [`RowSelection`] from a slice of [`BooleanArray`] + /// + /// # Panics + /// + /// Panics if any of the [`BooleanArray`] contain nulls + pub fn from_filters(filters: &[BooleanArray]) -> Self { + let mut next_offset = 0; + let total_rows = filters.iter().map(|x| x.len()).sum(); + + let iter = filters.iter().flat_map(|filter| { + let offset = next_offset; + next_offset += filter.len(); + assert_eq!(filter.null_count(), 0); + SlicesIterator::new(filter) + .map(move |(start, end)| start + offset..end + offset) + }); + + Self::from_consecutive_ranges(iter, total_rows) + } + + /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep + fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>( + ranges: I, + total_rows: usize, + ) -> Self { + let mut selectors: Vec<RowSelector> = Vec::with_capacity(ranges.size_hint().0); + let mut last_end = 0; + for range in ranges { + let len = range.end - range.start; + + match range.start.cmp(&last_end) { + Ordering::Equal => match selectors.last_mut() { + Some(last) => last.row_count += len, + None => selectors.push(RowSelector::select(len)), + }, + Ordering::Greater => { + selectors.push(RowSelector::skip(range.start - last_end)); + selectors.push(RowSelector::select(len)) + } + Ordering::Less => panic!("out of order"), + } + last_end = range.end; + } + + if last_end != total_rows { + selectors.push(RowSelector::skip(total_rows - last_end)) + } + + Self { selectors } + } + + /// Splits off the first `row_count` rows from this [`RowSelection`] + pub fn split_off(&mut self, row_count: usize) -> Self { + let mut total_count = 0; + + // Find the index where the selector exceeds the row count + let find = self.selectors.iter().enumerate().find(|(_, selector)| { + total_count += selector.row_count; + total_count > row_count + }); + + let split_idx = match find { + Some((idx, _)) => idx, + None => { + let selectors = std::mem::take(&mut self.selectors); + return Self { selectors }; + } + }; + + let mut remaining = self.selectors.split_off(split_idx); + + // Always present as `split_idx < self.selectors.len()` + let next = remaining.first_mut().unwrap(); + let overflow = total_count - row_count; + + if next.row_count != overflow { + self.selectors.push(RowSelector { + row_count: next.row_count - overflow, + skip: next.skip, + }) + } + next.row_count = overflow; + + std::mem::swap(&mut remaining, &mut self.selectors); + Self { + selectors: remaining, + } + } + + /// Given a [`RowSelection`] computed under `self`, returns the [`RowSelection`] + /// representing their conjunction + /// + /// For example: + /// + /// self: NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY + /// other: YYYYYNNNNYYYYYYYYYYYYY YYNNN + /// + /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYYNNYNNNN + /// + /// + pub fn and_then(&self, other: &Self) -> Self { + let mut selectors = vec![]; + let mut first = self.selectors.iter().cloned().peekable(); + let mut second = other.selectors.iter().cloned().peekable(); + + let mut to_skip = 0; + while let Some(b) = second.peek_mut() { + let a = first.peek_mut().unwrap(); + + if b.row_count == 0 { + second.next().unwrap(); + continue; + } + + if
a.row_count == 0 { + first.next().unwrap(); + continue; + } + + if a.skip { + // Records were skipped when producing second + to_skip += a.row_count; + first.next().unwrap(); + continue; + } + + let skip = b.skip; + let to_process = a.row_count.min(b.row_count); + + a.row_count -= to_process; + b.row_count -= to_process; + + match skip { + true => to_skip += to_process, + false => { + if to_skip != 0 { + selectors.push(RowSelector::skip(to_skip)); + to_skip = 0; + } + selectors.push(RowSelector::select(to_process)) + } + } + } + + for v in first { + if v.row_count != 0 { + assert!(v.skip); + to_skip += v.row_count + } + } + + if to_skip != 0 { + selectors.push(RowSelector::skip(to_skip)); + } + + Self { selectors } + } + + /// Returns `true` if this [`RowSelection`] selects any rows + pub fn selects_any(&self) -> bool { + self.selectors.iter().any(|x| !x.skip) + } +} + +impl From<Vec<RowSelector>> for RowSelection { + fn from(selectors: Vec<RowSelector>) -> Self { + Self { selectors } + } +} + +impl From<RowSelection> for VecDeque<RowSelector> { + fn from(r: RowSelection) -> Self { + r.selectors.into() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::{thread_rng, Rng}; + + #[test] + fn test_from_filters() { + let filters = vec![ + BooleanArray::from(vec![false, false, false, true, true, true, true]), + BooleanArray::from(vec![true, true, false, false, true, true, true]), + BooleanArray::from(vec![false, false, false, false]), + BooleanArray::from(Vec::<bool>::new()), + ]; + + let selection = RowSelection::from_filters(&filters[..1]); + assert!(selection.selects_any()); + assert_eq!( + selection.selectors, + vec![RowSelector::skip(3), RowSelector::select(4)] + ); + + let selection = RowSelection::from_filters(&filters[..2]); + assert!(selection.selects_any()); + assert_eq!( + selection.selectors, + vec![ + RowSelector::skip(3), + RowSelector::select(6), + RowSelector::skip(2), + RowSelector::select(3) + ] + ); + + let selection = RowSelection::from_filters(&filters); + assert!(selection.selects_any()); + assert_eq!( + selection.selectors, + vec![ + RowSelector::skip(3), + RowSelector::select(6), + RowSelector::skip(2), + RowSelector::select(3), + RowSelector::skip(4) + ] + ); + + let selection = RowSelection::from_filters(&filters[2..3]); + assert!(!selection.selects_any()); + assert_eq!(selection.selectors, vec![RowSelector::skip(4)]); + } + + #[test] + fn test_split_off() { + let mut selection = RowSelection::from(vec![ + RowSelector::skip(34), + RowSelector::select(12), + RowSelector::skip(3), + RowSelector::select(35), + ]); + + let split = selection.split_off(34); + assert_eq!(split.selectors, vec![RowSelector::skip(34)]); + assert_eq!( + selection.selectors, + vec![ + RowSelector::select(12), + RowSelector::skip(3), + RowSelector::select(35) + ] + ); + + let split = selection.split_off(5); + assert_eq!(split.selectors, vec![RowSelector::select(5)]); + assert_eq!( + selection.selectors, + vec![ + RowSelector::select(7), + RowSelector::skip(3), + RowSelector::select(35) + ] + ); + + let split = selection.split_off(8); + assert_eq!( + split.selectors, + vec![RowSelector::select(7), RowSelector::skip(1)] + ); + assert_eq!( + selection.selectors, + vec![RowSelector::skip(2), RowSelector::select(35)] + ); + + let split = selection.split_off(200); + assert_eq!( + split.selectors, + vec![RowSelector::skip(2), RowSelector::select(35)] + ); + assert!(selection.selectors.is_empty()); + } + + #[test] + fn test_and() { + let mut a = RowSelection::from(vec![ + RowSelector::skip(12), + RowSelector::select(23), + RowSelector::skip(3), +
RowSelector::select(5), + ]); + + let b = RowSelection::from(vec![ + RowSelector::select(5), + RowSelector::skip(4), + RowSelector::select(15), + RowSelector::skip(4), + ]); + + let mut expected = RowSelection::from(vec![ + RowSelector::skip(12), + RowSelector::select(5), + RowSelector::skip(4), + RowSelector::select(14), + RowSelector::skip(3), + RowSelector::select(1), + RowSelector::skip(4), + ]); + + assert_eq!(a.and_then(&b), expected); + + a.split_off(7); + expected.split_off(7); + assert_eq!(a.and_then(&b), expected); + + let a = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(3)]); + + let b = RowSelection::from(vec![ + RowSelector::select(2), + RowSelector::skip(1), + RowSelector::select(1), + RowSelector::skip(1), + ]); + + assert_eq!( + a.and_then(&b).selectors, + vec![ + RowSelector::select(2), + RowSelector::skip(1), + RowSelector::select(1), + RowSelector::skip(4) + ] + ); + } + + #[test] + fn test_and_fuzz() { + let mut rand = thread_rng(); + for _ in 0..100 { + let a_len = rand.gen_range(10..100); + let a_bools: Vec<_> = (0..a_len).map(|x| rand.gen_bool(0.2)).collect(); + let a = RowSelection::from_filters(&[BooleanArray::from(a_bools.clone())]); + + let b_len: usize = a_bools.iter().map(|x| *x as usize).sum(); + let b_bools: Vec<_> = (0..b_len).map(|x| rand.gen_bool(0.8)).collect(); + let b = RowSelection::from_filters(&[BooleanArray::from(b_bools.clone())]); + + let mut expected_bools = vec![false; a_len]; + + let mut iter_b = b_bools.iter(); + for (idx, b) in a_bools.iter().enumerate() { + if *b && *iter_b.next().unwrap() { + expected_bools[idx] = true; + } + } + + let expected = + RowSelection::from_filters(&[BooleanArray::from(expected_bools)]); + + let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum(); + assert_eq!(a_len, total_rows); + + assert_eq!(a.and_then(&b), expected); + } + } +} diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index 3770ed265223..5c186d7aa769 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -86,6 +86,7 @@ use std::task::{Context, Poll}; use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; +use futures::ready; use futures::stream::Stream; use parquet_format::{PageHeader, PageType}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; @@ -94,7 +95,9 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use crate::arrow::array_reader::{build_array_reader, RowGroupCollection}; -use crate::arrow::arrow_reader::ParquetRecordBatchReader; +use crate::arrow::arrow_reader::{ + evaluate_predicate, ParquetRecordBatchReader, RowFilter, RowSelection, +}; use crate::arrow::schema::parquet_to_arrow_schema; use crate::arrow::ProjectionMask; use crate::basic::Compression; @@ -102,7 +105,7 @@ use crate::column::page::{Page, PageIterator, PageMetadata, PageReader}; use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; use crate::file::footer::{decode_footer, decode_metadata}; -use crate::file::metadata::ParquetMetaData; +use crate::file::metadata::{ParquetMetaData, RowGroupMetaData}; use crate::file::serialized_reader::{decode_page, read_page_header}; use crate::file::FOOTER_SIZE; use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor}; @@ -209,9 +212,13 @@ pub struct ParquetRecordBatchStreamBuilder<T> { row_groups: Option<Vec<usize>>, projection: ProjectionMask, + + filter: Option<RowFilter>, + + selection: Option<RowSelection>, } -impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> { +impl<T: AsyncFileReader + Send + 'static>
ParquetRecordBatchStreamBuilder<T> { /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file pub async fn new(mut input: T) -> Result<Self> { let metadata = input.get_metadata().await?; @@ -228,6 +235,8 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> { batch_size: 1024, row_groups: None, projection: ProjectionMask::all(), + filter: None, + selection: None, }) } @@ -267,6 +276,32 @@ } } + /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their + /// data into memory + /// + /// Row group filtering is applied prior to this, and rows from skipped + /// row groups should not be included in the [`RowSelection`] + /// + /// TODO: Make public once stable (#1792) + #[allow(unused)] + pub(crate) fn with_row_selection(self, selection: RowSelection) -> Self { + Self { + selection: Some(selection), + ..self + } + } + + /// Provide a [`RowFilter`] to skip decoding rows + /// + /// TODO: Make public once stable (#1792) + #[allow(unused)] + pub(crate) fn with_row_filter(self, filter: RowFilter) -> Self { + Self { + filter: Some(filter), + ..self + } + } + /// Build a new [`ParquetRecordBatchStream`] pub fn build(self) -> Result<ParquetRecordBatchStream<T>> { let num_row_groups = self.metadata.row_groups().len(); @@ -285,25 +320,122 @@ None => (0..self.metadata.row_groups().len()).collect(), }; + let reader = ReaderFactory { + input: self.input, + filter: self.filter, + metadata: self.metadata.clone(), + schema: self.schema.clone(), + }; + Ok(ParquetRecordBatchStream { + metadata: self.metadata, + batch_size: self.batch_size, row_groups, projection: self.projection, - batch_size: self.batch_size, - metadata: self.metadata, + selection: self.selection, schema: self.schema, - input: Some(self.input), + reader: Some(reader), state: StreamState::Init, }) } } +type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>; + +/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create +/// [`ParquetRecordBatchReader`] +struct ReaderFactory<T> { + metadata: Arc<ParquetMetaData>, + + schema: SchemaRef, + + input: T, + + filter: Option<RowFilter>, +} + +impl<T> ReaderFactory<T> +where + T: AsyncFileReader + Send, +{ + /// Reads the next row group with the provided `selection`, `projection` and `batch_size` + /// + /// Note: this captures self so that the resulting future has a static lifetime + async fn read_row_group( + mut self, + row_group_idx: usize, + mut selection: Option<RowSelection>, + projection: ProjectionMask, + batch_size: usize, + ) -> ReadResult<T> { + // TODO: calling build_array_reader multiple times is wasteful + let selects_any = |selection: Option<&RowSelection>| { + selection.map(|x| x.selects_any()).unwrap_or(true) + }; + + let meta = self.metadata.row_group(row_group_idx); + let mut row_group = InMemoryRowGroup { + schema: meta.schema_descr_ptr(), + row_count: meta.num_rows() as usize, + column_chunks: vec![None; meta.columns().len()], + }; + + if let Some(filter) = self.filter.as_mut() { + for predicate in filter.predicates.iter_mut() { + if !selects_any(selection.as_ref()) { + return Ok((self, None)); + } + + let predicate_projection = predicate.projection().clone(); + row_group + .fetch( + &mut self.input, + meta, + &predicate_projection, + selection.as_ref(), + ) + .await?; + + let array_reader = build_array_reader( + self.schema.clone(), + predicate_projection, + &row_group, + )?; + + selection = Some(evaluate_predicate( + batch_size, + array_reader, + selection, + predicate.as_mut(), + )?); + } + } + + if !selects_any(selection.as_ref()) { + return Ok((self, None));
+ } + + row_group + .fetch(&mut self.input, meta, &projection, selection.as_ref()) + .await?; + + let reader = ParquetRecordBatchReader::new( + batch_size, + build_array_reader(self.schema.clone(), projection, &row_group)?, + selection, + ); + + Ok((self, Some(reader))) + } +} + enum StreamState<T> { /// At the start of a new row group, or the end of the parquet stream Init, /// Decoding a batch Decoding(ParquetRecordBatchReader), /// Reading data from input - Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>), + Reading(BoxFuture<'static, ReadResult<T>>), /// Error Error, } @@ -319,20 +451,23 @@ impl<T> std::fmt::Debug for StreamState<T> { } } -/// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file +/// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file that can be +/// constructed using [`ParquetRecordBatchStreamBuilder`] pub struct ParquetRecordBatchStream<T> { metadata: Arc<ParquetMetaData>, schema: SchemaRef, - batch_size: usize, + row_groups: VecDeque<usize>, projection: ProjectionMask, - row_groups: VecDeque<usize>, + batch_size: usize, + + selection: Option<RowSelection>, /// This is an option so it can be moved into a future - input: Option<T>, + reader: Option<ReaderFactory<T>>, state: StreamState<T>, } @@ -384,101 +519,40 @@ where None => return Poll::Ready(None), }; - let metadata = self.metadata.clone(); - let mut input = match self.input.take() { - Some(input) => input, - None => { - self.state = StreamState::Error; - return Poll::Ready(Some(Err(general_err!( - "input stream lost" - )))); - } - }; + let reader = self.reader.take().expect("lost reader"); - let projection = self.projection.clone(); - self.state = StreamState::Reading( - async move { - let row_group_metadata = metadata.row_group(row_group_idx); - let mut column_chunks = - vec![None; row_group_metadata.columns().len()]; - - // TODO: Combine consecutive ranges - let fetch_ranges = (0..column_chunks.len()) - .into_iter() - .filter_map(|idx| { - if !projection.leaf_included(idx) { - None - } else { - let column = row_group_metadata.column(idx); - let (start, length) = column.byte_range(); - - Some(start as usize..(start + length) as usize) - } - }) - .collect(); - - let mut chunk_data = - input.get_byte_ranges(fetch_ranges).await?.into_iter(); - - for (idx, chunk) in column_chunks.iter_mut().enumerate() { - if !projection.leaf_included(idx) { - continue; - } - - let column = row_group_metadata.column(idx); - - if let Some(data) = chunk_data.next() { - *chunk = Some(InMemoryColumnChunk { - num_values: column.num_values(), - compression: column.compression(), - physical_type: column.column_type(), - data, - }); - } - } - - Ok(( - input, - InMemoryRowGroup { - schema: metadata.file_metadata().schema_descr_ptr(), - row_count: row_group_metadata.num_rows() as usize, - column_chunks, - }, - )) - } - .boxed(), - ) - } - StreamState::Reading(f) => { - let result = futures::ready!(f.poll_unpin(cx)); - self.state = StreamState::Init; - - let row_group: Box<dyn RowGroupCollection> = match result { - Ok((input, row_group)) => { - self.input = Some(input); - Box::new(row_group) - } - Err(e) => { - self.state = StreamState::Error; - return Poll::Ready(Some(Err(e))); - } - }; + let row_count = + self.metadata.row_group(row_group_idx).num_rows() as usize; - let parquet_schema = self.metadata.file_metadata().schema_descr_ptr(); + let selection = + self.selection.as_mut().map(|s| s.split_off(row_count)); - let array_reader = build_array_reader( - parquet_schema, - self.schema.clone(), - self.projection.clone(), - row_group, - )?; - - let batch_reader = -
ParquetRecordBatchReader::try_new(self.batch_size, array_reader) - .expect("reader"); + let fut = reader + .read_row_group( + row_group_idx, + selection, + self.projection.clone(), + self.batch_size, + ) + .boxed(); - self.state = StreamState::Decoding(batch_reader) + self.state = StreamState::Reading(fut) } + StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) { + Ok((reader_factory, maybe_reader)) => { + self.reader = Some(reader_factory); + match maybe_reader { + // Read records from [`ParquetRecordBatchReader`] + Some(reader) => self.state = StreamState::Decoding(reader), + // All rows skipped, read next row group + None => self.state = StreamState::Init, + } + } + Err(e) => { + self.state = StreamState::Error; + return Poll::Ready(Some(Err(e))); + } + }, StreamState::Error => return Poll::Pending, } } @@ -492,9 +566,56 @@ struct InMemoryRowGroup { row_count: usize, } +impl InMemoryRowGroup { + /// Fetches the necessary column data into memory + async fn fetch<T: AsyncFileReader>( + &mut self, + input: &mut T, + metadata: &RowGroupMetaData, + projection: &ProjectionMask, + _selection: Option<&RowSelection>, + ) -> Result<()> { + // TODO: Use OffsetIndex and selection to prune pages + + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let column = metadata.column(idx); + let (start, length) = column.byte_range(); + start as usize..(start + length) as usize + }) + }) + .collect(); + + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + let column = metadata.column(idx); + + if let Some(data) = chunk_data.next() { + *chunk = Some(InMemoryColumnChunk { + num_values: column.num_values(), + compression: column.compression(), + physical_type: column.column_type(), + data, + }); + } + } + Ok(()) + } +} + impl RowGroupCollection for InMemoryRowGroup { - fn schema(&self) -> Result<SchemaDescPtr> { - Ok(self.schema.clone()) + fn schema(&self) -> SchemaDescPtr { + self.schema.clone() } fn num_rows(&self) -> usize { @@ -685,7 +806,10 @@ impl PageIterator for ColumnChunkIterator { #[cfg(test)] mod tests { use super::*; - use crate::arrow::{ArrowReader, ParquetFileArrowReader}; + use crate::arrow::arrow_reader::ArrowPredicateFn; + use crate::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader}; + use crate::file::footer::parse_metadata; + use arrow::array::{Array, ArrayRef, Int32Array, StringArray}; use arrow::error::Result as ArrowResult; use futures::TryStreamExt; use std::sync::Mutex; @@ -858,4 +982,73 @@ assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE); assert_eq!(second_page.num_values(), 8); } + + #[tokio::test] + async fn test_row_filter() { + let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]); + let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]); + let c = Int32Array::from_iter(0..6); + let data = RecordBatch::try_from_iter([ + ("a", Arc::new(a) as ArrayRef), + ("b", Arc::new(b) as ArrayRef), + ("c", Arc::new(c) as ArrayRef), + ]) + .unwrap(); + + let mut buf = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap(); + writer.write(&data).unwrap(); + writer.close().unwrap(); + + let data: Bytes = buf.into(); + let metadata = parse_metadata(&data).unwrap(); + let parquet_schema =
metadata.file_metadata().schema_descr_ptr(); + + let test = TestReader { + data, + metadata: Arc::new(metadata), + requests: Default::default(), + }; + let requests = test.requests.clone(); + + let a_filter = ArrowPredicateFn::new( + ProjectionMask::leaves(&parquet_schema, vec![0]), + |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "b"), + ); + + let b_filter = ArrowPredicateFn::new( + ProjectionMask::leaves(&parquet_schema, vec![1]), + |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "4"), + ); + + let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]); + + let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]); + let stream = ParquetRecordBatchStreamBuilder::new(test) + .await + .unwrap() + .with_projection(mask.clone()) + .with_batch_size(1024) + .with_row_filter(filter) + .build() + .unwrap(); + + let batches: Vec<_> = stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + assert_eq!(batch.num_columns(), 2); + + let col = batch.column(0); + let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0); + assert_eq!(val, "b"); + + let col = batch.column(1); + let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0); + assert_eq!(val, 3); + + // Should only have made 3 requests + assert_eq!(requests.lock().unwrap().len(), 3); + } }
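As a recap of the selection algebra that `evaluate_predicate` builds on, here is a small crate-internal sketch (again assuming `pub(crate)` access until #1792) showing how a predicate's refinement, which is computed only over previously selected rows, composes back into file-relative coordinates via `and_then`:

```rust
use crate::arrow::arrow_reader::{RowSelection, RowSelector};
use arrow::array::BooleanArray;

fn selection_algebra() {
    // `first` keeps rows 1, 2 and 4 of a 5-row group
    let first = RowSelection::from_filters(&[BooleanArray::from(vec![
        false, true, true, false, true,
    ])]);

    // A later predicate only sees the 3 surviving rows, so its filter
    // has length 3, exactly as produced by `evaluate_predicate`
    let second =
        RowSelection::from_filters(&[BooleanArray::from(vec![true, false, true])]);

    // `and_then` maps the refinement back onto the original 5 rows
    assert_eq!(
        first.and_then(&second),
        RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1), // row 1
            RowSelector::skip(2),
            RowSelector::select(1), // row 4
        ])
    );
}
```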