From 0fe5ac07577c0e1f3fb7f2d42a697a00170a65ba Mon Sep 17 00:00:00 2001
From: Ritchie Vink <ritchie46@gmail.com>
Date: Mon, 27 Jun 2022 08:52:35 +0200
Subject: [PATCH 1/2] prevent double copy due to mmap and stop early with
 n_rows

---
 polars/polars-io/src/parquet/read.rs            | 26 ++++++++-
 polars/polars-io/src/parquet/read_impl.rs       | 56 +++++++++++++++----
 .../physical_plan/executors/scan/parquet.rs     |  2 +-
 3 files changed, 70 insertions(+), 14 deletions(-)

diff --git a/polars/polars-io/src/parquet/read.rs b/polars/polars-io/src/parquet/read.rs
index f79311e99178..8dfcd416f626 100644
--- a/polars/polars-io/src/parquet/read.rs
+++ b/polars/polars-io/src/parquet/read.rs
@@ -6,13 +6,16 @@ use crate::prelude::*;
 use crate::RowCount;
 use arrow::io::parquet::read;
 use polars_core::prelude::*;
+use std::fs::File;
 use std::io::{Read, Seek};
+use std::path::Path;
 use std::sync::Arc;
 
 /// Read Apache parquet format into a DataFrame.
 #[must_use]
-pub struct ParquetReader<R: Read + Seek> {
+pub struct ParquetReader<'a, R: Read + Seek> {
     reader: R,
+    path: Option<&'a Path>,
     rechunk: bool,
     n_rows: Option<usize>,
     columns: Option<Vec<String>>,
@@ -21,7 +24,18 @@ pub struct ParquetReader<'a, R: Read + Seek> {
     row_count: Option<RowCount>,
 }
 
-impl<R: MmapBytesReader> ParquetReader<R> {
+impl<'a> ParquetReader<'a, File> {
+    /// Create a new [`ParquetReader`] from a known `path`.
+    /// Prefer `from_path` over `new`, as it is faster.
+    pub fn from_path(path: &'a Path) -> Result<Self> {
+        let file = std::fs::File::open(path)?;
+        let mut out = Self::new(file);
+        out.path = Some(path);
+        Ok(out)
+    }
+}
+
+impl<R: MmapBytesReader> ParquetReader<'_, R> {
     #[cfg(feature = "lazy")]
     // todo! hoist to lazy crate
     pub fn finish_with_scan_ops(
@@ -37,6 +51,7 @@ impl<R: MmapBytesReader> ParquetReader<'_, R> {
         let rechunk = self.rechunk;
         read_parquet(
             self.reader,
+            self.path,
             self.n_rows.unwrap_or(usize::MAX),
             projection,
             &schema,
@@ -94,10 +109,14 @@ impl<R: MmapBytesReader> ParquetReader<'_, R> {
     }
 }
 
-impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
+impl<R: MmapBytesReader> SerReader<R> for ParquetReader<'_, R> {
+    /// Create a new [`ParquetReader`] for an existing `Reader`.
+    /// If reading from a file, prefer [`ParquetReader::from_path`];
+    /// it is faster.
     fn new(reader: R) -> Self {
         ParquetReader {
             reader,
+            path: None,
             rechunk: false,
             n_rows: None,
             columns: None,
@@ -122,6 +141,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<'_, R> {
 
         read_parquet(
             self.reader,
+            self.path,
             self.n_rows.unwrap_or(usize::MAX),
             self.projection.as_deref(),
             &schema,
diff --git a/polars/polars-io/src/parquet/read_impl.rs b/polars/polars-io/src/parquet/read_impl.rs
index 7d5ffad23d05..65ca6d4698be 100644
--- a/polars/polars-io/src/parquet/read_impl.rs
+++ b/polars/polars-io/src/parquet/read_impl.rs
@@ -15,10 +15,33 @@ use std::borrow::Cow;
 use std::convert::TryFrom;
 use std::io::Cursor;
 use std::ops::Deref;
+use std::path::Path;
 use std::sync::Arc;
 
-fn array_iter_to_series(iter: ArrayIter, field: &ArrowField) -> Result<Series> {
-    let chunks = iter.collect::<Result<Vec<_>>>()?;
+fn array_iter_to_series(
+    iter: ArrayIter,
+    field: &ArrowField,
+    num_rows: Option<usize>,
+) -> Result<Series> {
+    let mut total_count = 0;
+    let chunks = match num_rows {
+        None => iter.collect::<Result<Vec<_>>>()?,
+        Some(n) => {
+            let mut out = Vec::with_capacity(2);
+
+            for arr in iter {
+                let arr = arr?;
+                let len = arr.len();
+                out.push(arr);
+
+                total_count += len;
+                if total_count >= n {
+                    break;
+                }
+            }
+            out
+        }
+    };
     if chunks.is_empty() {
         let arr = new_empty_array(field.data_type.clone());
         Series::try_from((field.name.as_str(), arr))
@@ -29,19 +52,18 @@
 #[allow(clippy::too_many_arguments)]
 pub fn read_parquet<R: MmapBytesReader>(
-    reader: R,
+    mut reader: R,
+    path: Option<&Path>,
     limit: usize,
     projection: Option<&[usize]>,
     schema: &ArrowSchema,
     metadata: Option<FileMetaData>,
     predicate: Option<Arc<dyn PhysicalIoExpr>>,
     aggregate: Option<&[ScanAggregation]>,
-    parallel: bool,
+    mut parallel: bool,
     row_count: Option<RowCount>,
 ) -> Result<DataFrame> {
-    let reader = ReaderBytes::from(&reader);
-    let bytes = reader.deref();
-    let mut reader = Cursor::new(bytes);
+    let file = reader.to_file();
 
     let file_metadata = metadata
         .map(Ok)
         .unwrap_or_else(|| read::read_metadata(&mut reader))?;
@@ -52,6 +74,10 @@
         .map(Cow::Borrowed)
         .unwrap_or_else(|| Cow::Owned((0usize..schema.fields.len()).collect::<Vec<_>>()));
 
+    if projection.len() == 1 || path.is_none() {
+        parallel = false;
+    }
+
     let mut dfs = Vec::with_capacity(row_group_len);
 
     let mut remaining_rows = limit;
@@ -83,11 +109,13 @@
         let chunk_size = md.num_rows() as usize;
 
         let columns = if parallel {
+            let file_name = path.unwrap();
+
             POOL.install(|| {
                 projection
                     .par_iter()
                     .map(|column_i| {
-                        let mut reader = Cursor::new(bytes);
+                        let mut reader = std::fs::File::open(file_name).unwrap();
                         let field = &schema.fields[*column_i];
                         let columns = read::read_columns(&mut reader, md.columns(), &field.name)?;
                         let iter = to_deserializer(
@@ -97,7 +125,11 @@
                             Some(chunk_size),
                         )?;
 
-                        array_iter_to_series(iter, field)
+                        if remaining_rows < md.num_rows() {
+                            array_iter_to_series(iter, field, Some(remaining_rows))
+                        } else {
+                            array_iter_to_series(iter, field, None)
+                        }
                     })
                     .collect::<Result<Vec<_>>>()
             })?
@@ -110,7 +142,11 @@
                     let field = &schema.fields[*column_i];
                     let columns = read::read_columns(&mut reader, md.columns(), &field.name)?;
                     let iter =
                         to_deserializer(columns, field.clone(), remaining_rows, Some(chunk_size))?;
 
-                    array_iter_to_series(iter, field)
+                    if remaining_rows < md.num_rows() {
+                        array_iter_to_series(iter, field, Some(remaining_rows))
+                    } else {
+                        array_iter_to_series(iter, field, None)
+                    }
                 })
                 .collect::<Result<Vec<_>>>()?
         };
diff --git a/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs
index 990f9e110311..0e188bf9e2b0 100644
--- a/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -36,7 +36,7 @@ impl ParquetExec {
             &self.aggregate,
         );
 
-        ParquetReader::new(file)
+        ParquetReader::from_path(self.path.as_path())?
             .with_n_rows(n_rows)
             .read_parallel(self.options.parallel)
             .with_row_count(std::mem::take(&mut self.options.row_count))

From 4b8ec86d738dff431121139efb79d433be6e8f89 Mon Sep 17 00:00:00 2001
From: Ritchie Vink <ritchie46@gmail.com>
Date: Mon, 27 Jun 2022 09:10:53 +0200
Subject: [PATCH 2/2] improve parquet reading performance

---
 polars/polars-io/src/parquet/mmap.rs            | 66 +++++++++++++++++++
 polars/polars-io/src/parquet/mod.rs             |  1 +
 polars/polars-io/src/parquet/read.rs            | 27 ++------
 polars/polars-io/src/parquet/read_impl.rs       | 31 ++++-----
 .../physical_plan/executors/scan/parquet.rs     |  2 +-
 5 files changed, 88 insertions(+), 39 deletions(-)
 create mode 100644 polars/polars-io/src/parquet/mmap.rs

diff --git a/polars/polars-io/src/parquet/mmap.rs b/polars/polars-io/src/parquet/mmap.rs
new file mode 100644
index 000000000000..2ecdab50c019
--- /dev/null
+++ b/polars/polars-io/src/parquet/mmap.rs
@@ -0,0 +1,66 @@
+use super::*;
+use arrow::datatypes::Field;
+use arrow::io::parquet::read::{
+    column_iter_to_arrays, ArrayIter, BasicDecompressor, ColumnChunkMetaData, PageReader,
+};
+
+// TODO! make public in arrow2?
+pub(super) fn get_field_columns<'a>(
+    columns: &'a [ColumnChunkMetaData],
+    field_name: &str,
+) -> Vec<&'a ColumnChunkMetaData> {
+    columns
+        .iter()
+        .filter(|x| x.descriptor().path_in_schema[0] == field_name)
+        .collect()
+}
+
+/// memory maps all columns that are part of the parquet field `field_name`
+pub(super) fn mmap_columns<'a>(
+    file: &'a [u8],
+    columns: &'a [ColumnChunkMetaData],
+    field_name: &str,
+) -> Vec<(&'a ColumnChunkMetaData, &'a [u8])> {
+    get_field_columns(columns, field_name)
+        .into_iter()
+        .map(|meta| _mmap_single_column(file, meta))
+        .collect()
+}
+
+fn _mmap_single_column<'a>(
+    file: &'a [u8],
+    meta: &'a ColumnChunkMetaData,
+) -> (&'a ColumnChunkMetaData, &'a [u8]) {
+    let (start, len) = meta.byte_range();
+    let chunk = &file[start as usize..(start + len) as usize];
+    (meta, chunk)
+}
+
+// similar to the arrow2 serializer, except this accepts a slice instead of a vec.
+// this allows us to memory map
+pub(super) fn to_deserializer<'a>(
+    columns: Vec<(&ColumnChunkMetaData, &'a [u8])>,
+    field: Field,
+    num_rows: usize,
+    chunk_size: Option<usize>,
+) -> ArrowResult<ArrayIter<'a>> {
+    let chunk_size = chunk_size.unwrap_or(usize::MAX).min(num_rows);
+
+    let (columns, types): (Vec<_>, Vec<_>) = columns
+        .into_iter()
+        .map(|(column_meta, chunk)| {
+            let pages = PageReader::new(
+                std::io::Cursor::new(chunk),
+                column_meta,
+                std::sync::Arc::new(|_, _| true),
+                vec![],
+            );
+            (
+                BasicDecompressor::new(pages, vec![]),
+                &column_meta.descriptor().descriptor.primitive_type,
+            )
+        })
+        .unzip();
+
+    column_iter_to_arrays(columns, types, field, Some(chunk_size))
+}
diff --git a/polars/polars-io/src/parquet/mod.rs b/polars/polars-io/src/parquet/mod.rs
index 90fb9bebd416..de0e12bca760 100644
--- a/polars/polars-io/src/parquet/mod.rs
+++ b/polars/polars-io/src/parquet/mod.rs
@@ -14,6 +14,7 @@
 //! }
 //! ```
 //!
+pub(super) mod mmap;
 pub mod predicates;
 mod read;
 mod read_impl;
diff --git a/polars/polars-io/src/parquet/read.rs b/polars/polars-io/src/parquet/read.rs
index 8dfcd416f626..6e0a7942e136 100644
--- a/polars/polars-io/src/parquet/read.rs
+++ b/polars/polars-io/src/parquet/read.rs
@@ -6,16 +6,13 @@ use crate::prelude::*;
 use crate::RowCount;
 use arrow::io::parquet::read;
 use polars_core::prelude::*;
-use std::fs::File;
 use std::io::{Read, Seek};
-use std::path::Path;
 use std::sync::Arc;
 
 /// Read Apache parquet format into a DataFrame.
 #[must_use]
-pub struct ParquetReader<'a, R: Read + Seek> {
+pub struct ParquetReader<R: Read + Seek> {
     reader: R,
-    path: Option<&'a Path>,
     rechunk: bool,
     n_rows: Option<usize>,
     columns: Option<Vec<String>>,
@@ -24,18 +21,7 @@ pub struct ParquetReader<R: Read + Seek> {
     row_count: Option<RowCount>,
 }
 
-impl<'a> ParquetReader<'a, File> {
-    /// Create a new [`ParquetReader`] from a known `path`.
-    /// Prefer `from_path` over `new`, as it is faster.
-    pub fn from_path(path: &'a Path) -> Result<Self> {
-        let file = std::fs::File::open(path)?;
-        let mut out = Self::new(file);
-        out.path = Some(path);
-        Ok(out)
-    }
-}
-
-impl<R: MmapBytesReader> ParquetReader<'_, R> {
+impl<R: MmapBytesReader> ParquetReader<R> {
     #[cfg(feature = "lazy")]
     // todo! hoist to lazy crate
     pub fn finish_with_scan_ops(
@@ -51,7 +37,6 @@ impl<R: MmapBytesReader> ParquetReader<R> {
         let rechunk = self.rechunk;
         read_parquet(
             self.reader,
-            self.path,
             self.n_rows.unwrap_or(usize::MAX),
             projection,
             &schema,
@@ -109,14 +94,11 @@ impl<R: MmapBytesReader> ParquetReader<R> {
     }
 }
 
-impl<R: MmapBytesReader> SerReader<R> for ParquetReader<'_, R> {
-    /// Create a new [`ParquetReader`] for an existing `Reader`.
-    /// If reading from a file, prefer [`ParquetReader::from_path`];
-    /// it is faster.
+impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
+    /// Create a new [`ParquetReader`] from an existing `Reader`.
     fn new(reader: R) -> Self {
         ParquetReader {
             reader,
-            path: None,
             rechunk: false,
             n_rows: None,
             columns: None,
@@ -141,7 +123,6 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
 
         read_parquet(
             self.reader,
-            self.path,
             self.n_rows.unwrap_or(usize::MAX),
             self.projection.as_deref(),
             &schema,
diff --git a/polars/polars-io/src/parquet/read_impl.rs b/polars/polars-io/src/parquet/read_impl.rs
index 65ca6d4698be..1a2553f5d2b6 100644
--- a/polars/polars-io/src/parquet/read_impl.rs
+++ b/polars/polars-io/src/parquet/read_impl.rs
@@ -1,21 +1,21 @@
 use crate::aggregations::{apply_aggregations, ScanAggregation};
 use crate::mmap::{MmapBytesReader, ReaderBytes};
+use crate::parquet::mmap;
+use crate::parquet::mmap::mmap_columns;
 use crate::parquet::predicates::collect_statistics;
 use crate::predicates::{apply_predicate, arrow_schema_to_empty_df, PhysicalIoExpr};
 use crate::utils::apply_projection;
 use crate::RowCount;
 use arrow::array::new_empty_array;
 use arrow::io::parquet::read;
-use arrow::io::parquet::read::{to_deserializer, ArrayIter, FileMetaData};
+use arrow::io::parquet::read::{ArrayIter, FileMetaData};
 use polars_core::prelude::*;
 use polars_core::utils::accumulate_dataframes_vertical;
 use polars_core::POOL;
 use rayon::prelude::*;
 use std::borrow::Cow;
 use std::convert::TryFrom;
-use std::io::Cursor;
 use std::ops::Deref;
-use std::path::Path;
 use std::sync::Arc;
 
 fn array_iter_to_series(
@@ -53,7 +53,6 @@
 #[allow(clippy::too_many_arguments)]
 pub fn read_parquet<R: MmapBytesReader>(
     mut reader: R,
-    path: Option<&Path>,
     limit: usize,
     projection: Option<&[usize]>,
     schema: &ArrowSchema,
@@ -63,8 +62,6 @@
     mut parallel: bool,
     row_count: Option<RowCount>,
 ) -> Result<DataFrame> {
-    let file = reader.to_file();
-
     let file_metadata = metadata
         .map(Ok)
         .unwrap_or_else(|| read::read_metadata(&mut reader))?;
@@ -74,7 +71,7 @@
         .map(Cow::Borrowed)
         .unwrap_or_else(|| Cow::Owned((0usize..schema.fields.len()).collect::<Vec<_>>()));
 
-    if projection.len() == 1 || path.is_none() {
+    if projection.len() == 1 {
         parallel = false;
     }
 
@@ -82,6 +79,9 @@
 
     let mut remaining_rows = limit;
 
+    let reader = ReaderBytes::from(&reader);
+    let bytes = reader.deref();
+
     let mut previous_row_count = 0;
     for rg in 0..row_group_len {
         let md = &file_metadata.row_groups[rg];
@@ -109,16 +109,13 @@
         let chunk_size = md.num_rows() as usize;
 
         let columns = if parallel {
-            let file_name = path.unwrap();
-
             POOL.install(|| {
                 projection
                     .par_iter()
                     .map(|column_i| {
-                        let mut reader = std::fs::File::open(file_name).unwrap();
                         let field = &schema.fields[*column_i];
-                        let columns = read::read_columns(&mut reader, md.columns(), &field.name)?;
-                        let iter = to_deserializer(
+                        let columns = mmap_columns(bytes, md.columns(), &field.name);
+                        let iter = mmap::to_deserializer(
                             columns,
                             field.clone(),
                             remaining_rows,
@@ -138,9 +135,13 @@
             .iter()
             .map(|column_i| {
                 let field = &schema.fields[*column_i];
-                let columns = read::read_columns(&mut reader, md.columns(), &field.name)?;
-                let iter =
-                    to_deserializer(columns, field.clone(), remaining_rows, Some(chunk_size))?;
+                let columns = mmap_columns(bytes, md.columns(), &field.name);
+                let iter = mmap::to_deserializer(
+                    columns,
+                    field.clone(),
+                    remaining_rows,
+                    Some(chunk_size),
+                )?;
 
                 if remaining_rows < md.num_rows() {
                     array_iter_to_series(iter, field, Some(remaining_rows))
diff --git a/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs
index 0e188bf9e2b0..990f9e110311 100644
--- a/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -36,7 +36,7 @@ impl ParquetExec {
             &self.aggregate,
        );
 
-        ParquetReader::from_path(self.path.as_path())?
+        ParquetReader::new(file)
             .with_n_rows(n_rows)
             .read_parallel(self.options.parallel)
             .with_row_count(std::mem::take(&mut self.options.row_count))
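
Usage sketch (reviewer note, not part of the patches). After patch 2, `read_parquet`
memory-maps the source bytes itself via `ReaderBytes`, so callers go back to the plain
`ParquetReader::new` constructor and no path has to be threaded through; `n_rows` now
also stops column deserialization early instead of decoding whole row groups. A minimal
example, assuming a local `data.parquet` file (hypothetical name) and the builder
methods visible in the diffs above (`with_n_rows`, `read_parallel`, `finish`):

    use polars_core::prelude::*;
    use polars_io::prelude::*;
    use std::fs::File;

    fn main() -> Result<()> {
        let file = File::open("data.parquet")?;
        let df = ParquetReader::new(file)
            // stop deserializing early once ~100 rows have been collected
            .with_n_rows(Some(100))
            // decode the projected columns of each row group in parallel
            .read_parallel(true)
            .finish()?;
        println!("{:?}", df.shape());
        Ok(())
    }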