diff --git a/polars/polars-io/src/parquet/mmap.rs b/polars/polars-io/src/parquet/mmap.rs
new file mode 100644
index 000000000000..2ecdab50c019
--- /dev/null
+++ b/polars/polars-io/src/parquet/mmap.rs
@@ -0,0 +1,66 @@
+use super::*;
+use arrow::datatypes::Field;
+use arrow::io::parquet::read::{
+    column_iter_to_arrays, ArrayIter, BasicDecompressor, ColumnChunkMetaData, PageReader,
+};
+
+// TODO! make public in arrow2?
+pub(super) fn get_field_columns<'a>(
+    columns: &'a [ColumnChunkMetaData],
+    field_name: &str,
+) -> Vec<&'a ColumnChunkMetaData> {
+    columns
+        .iter()
+        .filter(|x| x.descriptor().path_in_schema[0] == field_name)
+        .collect()
+}
+
+/// Memory-maps all columns that are part of the parquet field `field_name`.
+pub(super) fn mmap_columns<'a>(
+    file: &'a [u8],
+    columns: &'a [ColumnChunkMetaData],
+    field_name: &str,
+) -> Vec<(&'a ColumnChunkMetaData, &'a [u8])> {
+    get_field_columns(columns, field_name)
+        .into_iter()
+        .map(|meta| _mmap_single_column(file, meta))
+        .collect()
+}
+
+fn _mmap_single_column<'a>(
+    file: &'a [u8],
+    meta: &'a ColumnChunkMetaData,
+) -> (&'a ColumnChunkMetaData, &'a [u8]) {
+    let (start, len) = meta.byte_range();
+    let chunk = &file[start as usize..(start + len) as usize];
+    (meta, chunk)
+}
+
+// Similar to the arrow2 deserializer, except this accepts a slice instead of a vec.
+// This allows us to memory map.
+pub(super) fn to_deserializer<'a>(
+    columns: Vec<(&ColumnChunkMetaData, &'a [u8])>,
+    field: Field,
+    num_rows: usize,
+    chunk_size: Option<usize>,
+) -> ArrowResult<ArrayIter<'a>> {
+    let chunk_size = chunk_size.unwrap_or(usize::MAX).min(num_rows);
+
+    let (columns, types): (Vec<_>, Vec<_>) = columns
+        .into_iter()
+        .map(|(column_meta, chunk)| {
+            let pages = PageReader::new(
+                std::io::Cursor::new(chunk),
+                column_meta,
+                std::sync::Arc::new(|_, _| true),
+                vec![],
+            );
+            (
+                BasicDecompressor::new(pages, vec![]),
+                &column_meta.descriptor().descriptor.primitive_type,
+            )
+        })
+        .unzip();
+
+    column_iter_to_arrays(columns, types, field, Some(chunk_size))
+}
diff --git a/polars/polars-io/src/parquet/mod.rs b/polars/polars-io/src/parquet/mod.rs
index 90fb9bebd416..de0e12bca760 100644
--- a/polars/polars-io/src/parquet/mod.rs
+++ b/polars/polars-io/src/parquet/mod.rs
@@ -14,6 +14,7 @@
 //! }
 //! ```
 //!
+pub(super) mod mmap;
 pub mod predicates;
 mod read;
 mod read_impl;
diff --git a/polars/polars-io/src/parquet/read.rs b/polars/polars-io/src/parquet/read.rs
index f79311e99178..6e0a7942e136 100644
--- a/polars/polars-io/src/parquet/read.rs
+++ b/polars/polars-io/src/parquet/read.rs
@@ -95,6 +95,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
 }
 
 impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
+    /// Create a new [`ParquetReader`] from an existing `Reader`.
     fn new(reader: R) -> Self {
         ParquetReader {
             reader,
diff --git a/polars/polars-io/src/parquet/read_impl.rs b/polars/polars-io/src/parquet/read_impl.rs
index 7d5ffad23d05..1a2553f5d2b6 100644
--- a/polars/polars-io/src/parquet/read_impl.rs
+++ b/polars/polars-io/src/parquet/read_impl.rs
@@ -1,24 +1,47 @@
 use crate::aggregations::{apply_aggregations, ScanAggregation};
 use crate::mmap::{MmapBytesReader, ReaderBytes};
+use crate::parquet::mmap;
+use crate::parquet::mmap::mmap_columns;
 use crate::parquet::predicates::collect_statistics;
 use crate::predicates::{apply_predicate, arrow_schema_to_empty_df, PhysicalIoExpr};
 use crate::utils::apply_projection;
 use crate::RowCount;
 use arrow::array::new_empty_array;
 use arrow::io::parquet::read;
-use arrow::io::parquet::read::{to_deserializer, ArrayIter, FileMetaData};
+use arrow::io::parquet::read::{ArrayIter, FileMetaData};
 use polars_core::prelude::*;
 use polars_core::utils::accumulate_dataframes_vertical;
 use polars_core::POOL;
 use rayon::prelude::*;
 use std::borrow::Cow;
 use std::convert::TryFrom;
-use std::io::Cursor;
 use std::ops::Deref;
 use std::sync::Arc;
 
-fn array_iter_to_series(iter: ArrayIter, field: &ArrowField) -> Result<Series> {
-    let chunks = iter.collect::<ArrowResult<Vec<_>>>()?;
+fn array_iter_to_series(
+    iter: ArrayIter,
+    field: &ArrowField,
+    num_rows: Option<usize>,
+) -> Result<Series> {
+    let mut total_count = 0;
+    let chunks = match num_rows {
+        None => iter.collect::<ArrowResult<Vec<_>>>()?,
+        Some(n) => {
+            let mut out = Vec::with_capacity(2);
+
+            for arr in iter {
+                let arr = arr?;
+                let len = arr.len();
+                out.push(arr);
+
+                total_count += len;
+                if total_count >= n {
+                    break;
+                }
+            }
+            out
+        }
+    };
     if chunks.is_empty() {
         let arr = new_empty_array(field.data_type.clone());
         Series::try_from((field.name.as_str(), arr))
@@ -29,20 +52,16 @@ fn array_iter_to_series(iter: ArrayIter, field: &ArrowField) -> Result<Series> {
 
 #[allow(clippy::too_many_arguments)]
 pub fn read_parquet<R: MmapBytesReader>(
-    reader: R,
+    mut reader: R,
     limit: usize,
     projection: Option<&[usize]>,
     schema: &ArrowSchema,
     metadata: Option<FileMetaData>,
     predicate: Option<Arc<dyn PhysicalIoExpr>>,
     aggregate: Option<&[ScanAggregation]>,
-    parallel: bool,
+    mut parallel: bool,
     row_count: Option<RowCount>,
 ) -> Result<DataFrame> {
-    let reader = ReaderBytes::from(&reader);
-    let bytes = reader.deref();
-    let mut reader = Cursor::new(bytes);
-
     let file_metadata = metadata
         .map(Ok)
         .unwrap_or_else(|| read::read_metadata(&mut reader))?;
@@ -52,10 +71,17 @@ pub fn read_parquet<R: MmapBytesReader>(
         .map(Cow::Borrowed)
         .unwrap_or_else(|| Cow::Owned((0usize..schema.fields.len()).collect::<Vec<_>>()));
 
+    if projection.len() == 1 {
+        parallel = false;
+    }
+
     let mut dfs = Vec::with_capacity(row_group_len);
 
     let mut remaining_rows = limit;
 
+    let reader = ReaderBytes::from(&reader);
+    let bytes = reader.deref();
+
     let mut previous_row_count = 0;
     for rg in 0..row_group_len {
         let md = &file_metadata.row_groups[rg];
@@ -87,17 +113,20 @@ pub fn read_parquet<R: MmapBytesReader>(
                 projection
                     .par_iter()
                     .map(|column_i| {
-                        let mut reader = Cursor::new(bytes);
                         let field = &schema.fields[*column_i];
-                        let columns = read::read_columns(&mut reader, md.columns(), &field.name)?;
-                        let iter = to_deserializer(
+                        let columns = mmap_columns(bytes, md.columns(), &field.name);
+                        let iter = mmap::to_deserializer(
                             columns,
                             field.clone(),
                             remaining_rows,
                             Some(chunk_size),
                         )?;
 
-                        array_iter_to_series(iter, field)
+                        if remaining_rows < md.num_rows() {
+                            array_iter_to_series(iter, field, Some(remaining_rows))
+                        } else {
+                            array_iter_to_series(iter, field, None)
+                        }
                     })
                     .collect::<Result<Vec<_>>>()
             })?
@@ -106,11 +135,19 @@ pub fn read_parquet<R: MmapBytesReader>(
                 .iter()
                 .map(|column_i| {
                     let field = &schema.fields[*column_i];
-                    let columns = read::read_columns(&mut reader, md.columns(), &field.name)?;
-                    let iter =
-                        to_deserializer(columns, field.clone(), remaining_rows, Some(chunk_size))?;
-
-                    array_iter_to_series(iter, field)
+                    let columns = mmap_columns(bytes, md.columns(), &field.name);
+                    let iter = mmap::to_deserializer(
+                        columns,
+                        field.clone(),
+                        remaining_rows,
+                        Some(chunk_size),
+                    )?;
+
+                    if remaining_rows < md.num_rows() {
+                        array_iter_to_series(iter, field, Some(remaining_rows))
+                    } else {
+                        array_iter_to_series(iter, field, None)
+                    }
                })
                .collect::<Result<Vec<_>>>()?
     };
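
The patch combines two ideas: each column chunk is borrowed directly out of the file bytes instead of being re-read through a `Cursor`, and deserialization stops as soon as `remaining_rows` worth of values has been collected. A standalone sketch of both is below; the `memmap2` crate, the `data.parquet` path, and the fabricated byte offsets are illustrative stand-ins for `ReaderBytes` and `ColumnChunkMetaData::byte_range()`, not part of the patch.

```rust
use memmap2::Mmap;
use std::fs::File;

fn main() -> std::io::Result<()> {
    let file = File::open("data.parquet")?;
    // SAFETY: the file must not be truncated or mutated while mapped.
    let mmap = unsafe { Mmap::map(&file)? };
    let bytes: &[u8] = &mmap;

    // (1) The `_mmap_single_column` idea: borrow a column chunk's byte
    // range from the mapping; nothing is copied. The offsets here are
    // made up; the patch gets them from `ColumnChunkMetaData::byte_range()`.
    let start = bytes.len().min(4);
    let len = bytes.len().saturating_sub(start).min(128);
    let chunk: &[u8] = &bytes[start..start + len];
    println!("borrowed {} bytes at offset {}", chunk.len(), start);

    // (2) The `array_iter_to_series(.., num_rows)` idea: stop pulling
    // decoded chunks as soon as the row limit is reached, so later
    // chunks are never materialized.
    let decoded_chunks = vec![vec![0u8; 40], vec![1u8; 40], vec![2u8; 40]];
    let limit = 60;
    let (mut total, mut out) = (0, Vec::new());
    for decoded in decoded_chunks {
        total += decoded.len();
        out.push(decoded);
        if total >= limit {
            break; // the third chunk is never touched
        }
    }
    assert_eq!(out.len(), 2);
    Ok(())
}
```

Because `to_deserializer` now takes `(&ColumnChunkMetaData, &[u8])` pairs rather than owned buffers, each rayon task in the parallel branch of `read_parquet` decodes its column straight from the shared byte slice, with no per-column copy and no `Cursor` to reposition per column.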