diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs index 0a58e9dcbd2..4fae152bfa3 100644 --- a/examples/parquet_read.rs +++ b/examples/parquet_read.rs @@ -1,46 +1,23 @@ use std::fs::File; -use std::io::BufReader; +use std::time::SystemTime; +use arrow2::error::Result; use arrow2::io::parquet::read; -use arrow2::{array::Array, error::Result}; - -fn read_field(path: &str, row_group: usize, field: usize) -> Result> { - // Open a file, a common operation in Rust - let mut file = BufReader::new(File::open(path)?); - - // Read the files' metadata. This has a small IO cost because it requires seeking to the end - // of the file to read its footer. - let metadata = read::read_metadata(&mut file)?; - - // Convert the files' metadata into an arrow schema. This is CPU-only and amounts to - // parse thrift if the arrow format is available on a key, or infering the arrow schema from - // the parquet's physical, converted and logical types. - let arrow_schema = read::get_schema(&metadata)?; - - // Created an iterator of column chunks. Each iteration - // yields an iterator of compressed pages. There is almost no CPU work in iterating. - let columns = read::get_column_iterator(&mut file, &metadata, row_group, field, None, vec![]); - - // get the columns' field - let field = &arrow_schema.fields[field]; - - // This is the actual work. In this case, pages are read and - // decompressed, decoded and deserialized to arrow. - // Because `columns` is an iterator, it uses a combination of IO and CPU. - let (array, _, _) = read::column_iter_to_array(columns, field, vec![])?; - - Ok(array) -} fn main() -> Result<()> { use std::env; let args: Vec = env::args().collect(); let file_path = &args[1]; - let field = args[2].parse::().unwrap(); - let row_group = args[3].parse::().unwrap(); - let array = read_field(file_path, row_group, field)?; - println!("{:?}", array); + let reader = File::open(file_path)?; + let reader = read::FileReader::try_new(reader, None, None, None, None)?; + + let start = SystemTime::now(); + for maybe_chunk in reader { + let columns = maybe_chunk?; + assert!(!columns.is_empty()); + } + println!("took: {} ms", start.elapsed().unwrap().as_millis()); Ok(()) } diff --git a/examples/parquet_read_parallel.rs b/examples/parquet_read_parallel.rs deleted file mode 100644 index da7dde95698..00000000000 --- a/examples/parquet_read_parallel.rs +++ /dev/null @@ -1,115 +0,0 @@ -use std::fs::File; -use std::sync::Arc; -use std::thread; -use std::time::SystemTime; - -use crossbeam_channel::unbounded; - -use arrow2::{ - array::Array, chunk::Chunk, error::Result, io::parquet::read, - io::parquet::read::MutStreamingIterator, -}; - -fn parallel_read(path: &str, row_group: usize) -> Result>> { - // prepare a channel to send compressed pages across threads. 
- let (tx, rx) = unbounded(); - - let mut file = File::open(path)?; - let file_metadata = read::read_metadata(&mut file)?; - let arrow_schema = Arc::new(read::get_schema(&file_metadata)?); - - let start = SystemTime::now(); - // spawn a thread to produce `Vec` (IO bounded) - let producer = thread::spawn(move || { - for (field_i, field) in file_metadata.schema().fields().iter().enumerate() { - let start = SystemTime::now(); - - let mut columns = read::get_column_iterator( - &mut file, - &file_metadata, - row_group, - field_i, - None, - vec![], - ); - - println!("produce start - field: {}", field_i); - - let mut column_chunks = vec![]; - while let read::State::Some(mut new_iter) = columns.advance().unwrap() { - if let Some((pages, metadata)) = new_iter.get() { - let pages = pages.collect::>(); - - column_chunks.push((pages, metadata.clone())); - } - columns = new_iter; - } - // todo: create API to allow sending each column (and not column chunks) to be processed in parallel - tx.send((field_i, field.clone(), column_chunks)).unwrap(); - println!( - "produce end - {:?}: {} {}", - start.elapsed().unwrap(), - field_i, - row_group - ); - } - }); - - // use 2 consumers for CPU-intensive to decompress, decode and deserialize. - #[allow(clippy::needless_collect)] // we need to collect to parallelize - let consumers = (0..2) - .map(|i| { - let rx_consumer = rx.clone(); - let arrow_schema_consumer = arrow_schema.clone(); - thread::spawn(move || { - let mut arrays = vec![]; - while let Ok((field_i, parquet_field, column_chunks)) = rx_consumer.recv() { - let start = SystemTime::now(); - let field = &arrow_schema_consumer.fields[field_i]; - println!("consumer {} start - {}", i, field_i); - - let columns = read::ReadColumnIterator::new(parquet_field, column_chunks); - - let array = read::column_iter_to_array(columns, field, vec![]).map(|x| x.0); - println!( - "consumer {} end - {:?}: {}", - i, - start.elapsed().unwrap(), - field_i - ); - - arrays.push((field_i, array)) - } - arrays - }) - }) - .collect::>(); - - producer.join().expect("producer thread panicked"); - - // collect all columns (join threads) - let mut columns = consumers - .into_iter() - .map(|x| x.join().unwrap()) - .flatten() - .map(|x| Ok((x.0, x.1?))) - .collect::)>>>()?; - // order may not be the same - columns.sort_unstable_by_key(|x| x.0); - let columns = columns.into_iter().map(|x| x.1.into()).collect(); - println!("Finished - {:?}", start.elapsed().unwrap()); - - Chunk::try_new(columns) -} - -fn main() -> Result<()> { - use std::env; - let args: Vec = env::args().collect(); - let file_path = &args[1]; - - let start = SystemTime::now(); - let batch = parallel_read(file_path, 0)?; - assert!(!batch.is_empty()); - println!("took: {} ms", start.elapsed().unwrap().as_millis()); - Ok(()) -} diff --git a/examples/parquet_read_parallel/src/main.rs b/examples/parquet_read_parallel/src/main.rs index 827343ee0f2..39b20fd32e5 100644 --- a/examples/parquet_read_parallel/src/main.rs +++ b/examples/parquet_read_parallel/src/main.rs @@ -6,61 +6,28 @@ use std::time::SystemTime; use rayon::prelude::*; -use arrow2::{ - array::Array, chunk::Chunk, error::Result, io::parquet::read, - io::parquet::read::MutStreamingIterator, -}; +use arrow2::{array::Array, chunk::Chunk, error::Result, io::parquet::read}; fn parallel_read(path: &str, row_group: usize) -> Result>> { let mut file = BufReader::new(File::open(path)?); - let file_metadata = read::read_metadata(&mut file)?; - let schema = read::get_schema(&file_metadata)?; + let metadata = 
read::read_metadata(&mut file)?; + let schema = read::get_schema(&metadata)?; - // IO-bounded - let columns = file_metadata - .schema() - .fields() - .iter() - .enumerate() - .map(|(field_i, field)| { - let start = SystemTime::now(); - println!("read start - field: {}", field_i); - let mut columns = read::get_column_iterator( - &mut file, - &file_metadata, - row_group, - field_i, - None, - vec![], - ); - - let mut column_chunks = vec![]; - while let read::State::Some(mut new_iter) = columns.advance().unwrap() { - if let Some((pages, metadata)) = new_iter.get() { - let pages = pages.collect::>(); - - column_chunks.push((pages, metadata.clone())); - } - columns = new_iter; - } - println!( - "read end - {:?}: {} {}", - start.elapsed().unwrap(), - field_i, - row_group - ); - (field_i, field.clone(), column_chunks) - }) - .collect::>(); + // read (IO-bounded) all columns into memory (use a subset of the fields to project) + let columns = read::read_columns( + &mut file, + &metadata.row_groups[row_group], + schema.fields, + None, + )?; // CPU-bounded let columns = columns .into_par_iter() - .map(|(field_i, parquet_field, column_chunks)| { - let columns = read::ReadColumnIterator::new(parquet_field, column_chunks); - let field = &schema.fields()[field_i]; - - read::column_iter_to_array(columns, field, vec![]).map(|x| x.0.into()) + .map(|mut iter| { + // when chunk_size != None, `iter` must be iterated multiple times to get all the chunks, + // and some synchronization is required to output a single `Chunk` per iterator + iter.next().unwrap() }) .collect::>>()?; diff --git a/examples/parquet_read_record.rs b/examples/parquet_read_record.rs deleted file mode 100644 index 4fae152bfa3..00000000000 --- a/examples/parquet_read_record.rs +++ /dev/null @@ -1,23 +0,0 @@ -use std::fs::File; -use std::time::SystemTime; - -use arrow2::error::Result; -use arrow2::io::parquet::read; - -fn main() -> Result<()> { - use std::env; - let args: Vec = env::args().collect(); - - let file_path = &args[1]; - - let reader = File::open(file_path)?; - let reader = read::FileReader::try_new(reader, None, None, None, None)?; - - let start = SystemTime::now(); - for maybe_chunk in reader { - let columns = maybe_chunk?; - assert!(!columns.is_empty()); - } - println!("took: {} ms", start.elapsed().unwrap().as_millis()); - Ok(()) -} diff --git a/src/io/parquet/read/binary/dictionary.rs b/src/io/parquet/read/binary/dictionary.rs index 7aa2fb734ad..c47ed4c4b73 100644 --- a/src/io/parquet/read/binary/dictionary.rs +++ b/src/io/parquet/read/binary/dictionary.rs @@ -14,6 +14,7 @@ use crate::{ use super::super::dictionary::*; use super::super::utils; use super::super::utils::Decoder; +use super::super::ArrayIter; use super::super::DataPages; /// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation @@ -163,11 +164,7 @@ where } /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays<'a, K, O, I>( - iter: I, - data_type: DataType, - chunk_size: usize, -) -> Box>> + 'a> +pub fn iter_to_arrays<'a, K, O, I>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a> where I: 'a + DataPages, O: Offset, diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs index c21f008bc70..6ed55565fcb 100644 --- a/src/io/parquet/read/binary/mod.rs +++ b/src/io/parquet/read/binary/mod.rs @@ -1,11 +1,3 @@ -use std::sync::Arc; - -use crate::{ - array::{Array, Offset}, - datatypes::{DataType, Field}, - error::Result, -}; - mod basic; mod 
dictionary; mod nested; @@ -13,18 +5,21 @@ mod utils; pub use dictionary::iter_to_arrays as iter_to_dict_arrays; -use self::basic::TraitBinaryArray; +use std::sync::Arc; +use crate::{ + array::{Array, Offset}, + datatypes::{DataType, Field}, +}; + +use self::basic::TraitBinaryArray; use self::nested::ArrayIterator; -use super::{nested_utils::NestedState, DataPages}; +use super::ArrayIter; +use super::{nested_utils::NestedArrayIter, DataPages}; use basic::BinaryArrayIterator; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays<'a, O, A, I>( - iter: I, - data_type: DataType, - chunk_size: usize, -) -> Box>> + 'a> +pub fn iter_to_arrays<'a, O, A, I>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a> where I: 'a + DataPages, A: TraitBinaryArray, @@ -42,7 +37,7 @@ pub fn iter_to_arrays_nested<'a, O, A, I>( field: Field, data_type: DataType, chunk_size: usize, -) -> Box)>> + 'a> +) -> NestedArrayIter<'a> where I: 'a + DataPages, A: TraitBinaryArray, diff --git a/src/io/parquet/read/boolean/mod.rs b/src/io/parquet/read/boolean/mod.rs index 575764e9086..dc8e386b9bc 100644 --- a/src/io/parquet/read/boolean/mod.rs +++ b/src/io/parquet/read/boolean/mod.rs @@ -6,19 +6,15 @@ use std::sync::Arc; use crate::{ array::Array, datatypes::{DataType, Field}, - error::Result, }; use self::basic::BooleanArrayIterator; use self::nested::ArrayIterator; -use super::{nested_utils::NestedState, DataPages}; +use super::ArrayIter; +use super::{nested_utils::NestedArrayIter, DataPages}; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays<'a, I: 'a>( - iter: I, - data_type: DataType, - chunk_size: usize, -) -> Box>> + 'a> +pub fn iter_to_arrays<'a, I: 'a>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a> where I: DataPages, { @@ -33,7 +29,7 @@ pub fn iter_to_arrays_nested<'a, I: 'a>( iter: I, field: Field, chunk_size: usize, -) -> Box)>> + 'a> +) -> NestedArrayIter<'a> where I: DataPages, { diff --git a/src/io/parquet/read/file.rs b/src/io/parquet/read/file.rs index 42ace4bbf44..420e5fc7158 100644 --- a/src/io/parquet/read/file.rs +++ b/src/io/parquet/read/file.rs @@ -1,37 +1,43 @@ use std::io::{Read, Seek}; use std::sync::Arc; -use parquet2::schema::types::ParquetType; - use crate::array::Array; use crate::chunk::Chunk; use crate::datatypes::Schema; -use crate::io::parquet::read::get_iterators; +use crate::io::parquet::read::read_columns; use crate::{ datatypes::Field, error::{ArrowError, Result}, }; -use super::{get_schema, read_metadata, FileMetaData, RowGroupMetaData, RowGroupReader}; +use super::{get_schema, read_metadata, FileMetaData, RowGroupDeserializer, RowGroupMetaData}; type GroupFilter = Arc bool>; -/// Single threaded iterator row groups of a paquet file. +/// An iterator of [`Chunk`] coming from row groups of a parquet file. +/// +/// This can be thought of as a flattened chain of [`Iterator`] - each row group is sequentially +/// mapped to an [`Iterator`] and each iterator is iterated upon until either the limit +/// or the last iterator ends. +/// +/// # Implementation +/// Note that because each row group is read by [`RowGroupReader`] (IO-bounded) and decoded by +/// [`RowGroupDeserializer`] (CPU-bounded), advancing this iterator mixes IO and CPU work. pub struct FileReader { - reader: R, - schema: Arc, - parquet_fields: Vec, - groups_filter: Option, + row_groups: RowGroupReader, metadata: FileMetaData, - current_group: usize, - chunk_size: Option, remaining_rows: usize, - current_row_group: Option, + current_row_group: Option, } impl FileReader { /// Creates a new [`FileReader`] by reading the metadata from `reader` and constructing /// Arrow's schema from it.
+ /// + /// # Error + /// This function errors iff: + /// * reading the metadata from the reader fails + /// * it is not possible to derive an arrow schema from the parquet file + /// * the projection contains columns that do not exist pub fn try_new( mut reader: R, projection: Option<&[usize]>, @@ -44,28 +50,22 @@ impl FileReader { let schema = get_schema(&metadata)?; let schema_metadata = schema.metadata; - let (fields, parquet_fields): (Vec, Vec) = - if let Some(projection) = &projection { - schema - .fields - .into_iter() - .zip(metadata.schema().fields().iter().cloned()) - .enumerate() - .filter_map(|(index, f)| { - if projection.iter().any(|&i| i == index) { - Some(f) - } else { - None - } - }) - .unzip() - } else { - schema - .fields - .into_iter() - .zip(metadata.schema().fields().iter().cloned()) - .unzip() - }; + let fields: Vec = if let Some(projection) = &projection { + schema + .fields + .into_iter() + .enumerate() + .filter_map(|(index, f)| { + if projection.iter().any(|&i| i == index) { + Some(f) + } else { + None + } + }) + .collect() + } else { + schema.fields.into_iter().collect() + }; if let Some(projection) = &projection { if fields.len() != projection.len() { @@ -76,27 +76,31 @@ impl FileReader { } } - let schema = Arc::new(Schema { + let schema = Schema { fields, metadata: schema_metadata, - }); + }; - Ok(Self { + let row_groups = RowGroupReader::new( reader, schema, - parquet_fields, groups_filter, - metadata, - current_group: 0, + metadata.row_groups.clone(), chunk_size, + limit, + ); + + Ok(Self { + row_groups, + metadata, remaining_rows: limit.unwrap_or(usize::MAX), current_row_group: None, }) } - /// Returns the [`Schema`] - pub fn schema(&self) -> &Arc { - &self.schema + /// Returns the derived arrow [`Schema`] of the file + pub fn schema(&self) -> &Schema { + &self.row_groups.schema } /// Returns parquet's [`FileMetaData`]. 
@@ -106,49 +110,19 @@ impl FileReader { /// Sets the groups filter pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) { - self.groups_filter = Some(groups_filter); + self.row_groups.set_groups_filter(groups_filter); } - fn next_row_group(&mut self) -> Result> { - if self.schema.fields.is_empty() { - return Ok(None); - } - if self.current_group == self.metadata.row_groups.len() { - // reached the last row group - return Ok(None); - }; - if self.remaining_rows == 0 { - // reached the limit - return Ok(None); - } - - let current_row_group = self.current_group; - let row_group = &self.metadata.row_groups[current_row_group]; - if let Some(groups_filter) = self.groups_filter.as_ref() { - if !(groups_filter)(current_row_group, row_group) { - self.current_group += 1; - return self.next_row_group(); - } - } - self.current_group += 1; - - let column_chunks = get_iterators( - &mut self.reader, - &self.parquet_fields, - row_group, - self.schema.fields.clone(), - self.chunk_size, - )?; + fn next_row_group(&mut self) -> Result> { + let result = self.row_groups.next().transpose()?; - let result = RowGroupReader::new( - column_chunks, - row_group.num_rows() as usize, - Some(self.remaining_rows), + self.remaining_rows = self.remaining_rows.saturating_sub( + result + .as_ref() + .map(|x| x.num_rows()) + .unwrap_or(self.remaining_rows), ); - self.remaining_rows = self - .remaining_rows - .saturating_sub(row_group.num_rows() as usize); - Ok(Some(result)) + Ok(result) } } @@ -156,9 +130,6 @@ impl Iterator for FileReader { type Item = Result>>; fn next(&mut self) -> Option { - if self.schema.fields.is_empty() { - return None; - } if self.remaining_rows == 0 { // reached the limit return None; @@ -166,9 +137,11 @@ impl Iterator for FileReader { if let Some(row_group) = &mut self.current_row_group { match row_group.next() { + // no more chunks in the current row group => try a new one None => match self.next_row_group() { Ok(Some(row_group)) => { self.current_row_group = Some(row_group); + // new found => pull again self.next() } Ok(None) => { @@ -194,3 +167,99 @@ impl Iterator for FileReader { } } } + +/// An [`Iterator`] from row groups of a parquet file. +/// +/// # Implementation +/// Advancing this iterator is IO-bounded - each iteration reads all the column chunks from the file +/// to memory and attaches [`RowGroupDeserializer`] to them so that they can be iterated in chunks. 
+pub struct RowGroupReader { + reader: R, + schema: Schema, + groups_filter: Option, + row_groups: Vec, + chunk_size: Option, + remaining_rows: usize, + current_group: usize, +} + +impl RowGroupReader { + pub fn new( + reader: R, + schema: Schema, + groups_filter: Option, + row_groups: Vec, + chunk_size: Option, + limit: Option, + ) -> Self { + Self { + reader, + schema, + groups_filter, + row_groups, + chunk_size, + remaining_rows: limit.unwrap_or(usize::MAX), + current_group: 0, + } + } + + /// Sets the groups filter + pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) { + self.groups_filter = Some(groups_filter); + } + + #[inline] + fn _next(&mut self) -> Result> { + if self.schema.fields.is_empty() { + return Ok(None); + } + if self.current_group == self.row_groups.len() { + // reached the last row group + return Ok(None); + }; + if self.remaining_rows == 0 { + // reached the limit + return Ok(None); + } + + let current_row_group = self.current_group; + let row_group = &self.row_groups[current_row_group]; + if let Some(groups_filter) = self.groups_filter.as_ref() { + if !(groups_filter)(current_row_group, row_group) { + self.current_group += 1; + return self._next(); + } + } + self.current_group += 1; + + let column_chunks = read_columns( + &mut self.reader, + row_group, + self.schema.fields.clone(), + self.chunk_size, + )?; + + let result = RowGroupDeserializer::new( + column_chunks, + row_group.num_rows() as usize, + Some(self.remaining_rows), + ); + self.remaining_rows = self + .remaining_rows + .saturating_sub(row_group.num_rows() as usize); + Ok(Some(result)) + } +} + +impl Iterator for RowGroupReader { + type Item = Result; + + fn next(&mut self) -> Option { + self._next().transpose() + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.row_groups.len() - self.current_group; + (len, Some(len)) + } +} diff --git a/src/io/parquet/read/fixed_size_binary/dictionary.rs b/src/io/parquet/read/fixed_size_binary/dictionary.rs index 02a080421ce..b1dd9e4ef44 100644 --- a/src/io/parquet/read/fixed_size_binary/dictionary.rs +++ b/src/io/parquet/read/fixed_size_binary/dictionary.rs @@ -12,6 +12,7 @@ use crate::{ use super::super::dictionary::*; use super::super::utils; use super::super::utils::Decoder; +use super::super::ArrayIter; use super::super::DataPages; /// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation @@ -139,11 +140,7 @@ where } /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays<'a, K, I>( - iter: I, - data_type: DataType, - chunk_size: usize, -) -> Box>> + 'a> +pub fn iter_to_arrays<'a, K, I>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a> where I: 'a + DataPages, K: DictionaryKey, diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 0f564d3cf71..cb706234536 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -46,13 +46,19 @@ pub mod schema; pub mod statistics; mod utils; -pub use file::FileReader; +pub use file::{FileReader, RowGroupReader}; pub use row_group::*; pub(crate) use schema::is_type_nullable; pub use schema::{get_schema, FileMetaData}; -pub trait DataPages: FallibleStreamingIterator {} -impl> DataPages for I {} +pub trait DataPages: + FallibleStreamingIterator + Send + Sync +{ +} +impl + Send + Sync> DataPages + for I +{ +} /// Creates a new iterator of compressed pages. 
pub fn get_page_iterator( @@ -86,7 +92,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( type_: &ParquetType, data_type: DataType, chunk_size: usize, -) -> Result>> + 'a>> { +) -> Result> { use DataType::*; let values_data_type = if let Dictionary(_, v, _) = &data_type { v.as_ref() @@ -207,7 +213,7 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( type_: &ParquetType, field: Field, chunk_size: usize, -) -> Result>> + 'a>> { +) -> Result> { use DataType::*; match field.data_type.to_logical_type() { Null => Ok(null::iter_to_arrays(pages, field.data_type, chunk_size)), @@ -440,7 +446,7 @@ fn page_iter_to_arrays_nested<'a, I: 'a + DataPages>( field: Field, data_type: DataType, chunk_size: usize, -) -> Result>> + 'a>> { +) -> Result> { use DataType::*; let iter = match data_type { Boolean => boolean::iter_to_arrays_nested(pages, field.clone(), chunk_size), @@ -592,22 +598,19 @@ fn page_iter_to_arrays_nested<'a, I: 'a + DataPages>( Ok(Box::new(iter)) } -struct StructIterator { - iters: Vec>>>>, +struct StructIterator<'a> { + iters: Vec>, fields: Vec, } -impl StructIterator { - pub fn new( - iters: Vec>>>>, - fields: Vec, - ) -> Self { +impl<'a> StructIterator<'a> { + pub fn new(iters: Vec>, fields: Vec) -> Self { assert_eq!(iters.len(), fields.len()); Self { iters, fields } } } -impl Iterator for StructIterator { +impl<'a> Iterator for StructIterator<'a> { type Item = Result>; fn next(&mut self) -> Option { @@ -660,7 +663,7 @@ pub fn column_iter_to_arrays<'a, I: 'static>( types: Vec<&ParquetType>, field: &Field, chunk_size: usize, -) -> Result>> + 'a>> +) -> Result> where I: DataPages, { @@ -680,3 +683,5 @@ where iters.pop().unwrap() }) } + +pub type ArrayIter<'a> = Box>> + Send + Sync + 'a>; diff --git a/src/io/parquet/read/nested_utils.rs b/src/io/parquet/read/nested_utils.rs index bbb22602aad..d4ce5417c0c 100644 --- a/src/io/parquet/read/nested_utils.rs +++ b/src/io/parquet/read/nested_utils.rs @@ -18,7 +18,7 @@ use super::{ }; /// trait describing deserialized repetition and definition levels -pub trait Nested: std::fmt::Debug { +pub trait Nested: std::fmt::Debug + Send + Sync { fn inner(&mut self) -> (Buffer, Option); fn last_offset(&self) -> i64; @@ -635,3 +635,6 @@ where (None, Some(_), _) => unreachable!(), } } + +pub type NestedArrayIter<'a> = + Box)>> + Send + Sync + 'a>; diff --git a/src/io/parquet/read/null.rs b/src/io/parquet/read/null.rs index e0ae164096e..5bfb11d135b 100644 --- a/src/io/parquet/read/null.rs +++ b/src/io/parquet/read/null.rs @@ -1,18 +1,15 @@ use std::sync::Arc; -use super::DataPages; use crate::{ array::{Array, NullArray}, datatypes::DataType, - error::Result, }; +use super::ArrayIter; +use super::DataPages; + /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays<'a, I>( - mut iter: I, - data_type: DataType, - chunk_size: usize, -) -> Box>> + 'a> +pub fn iter_to_arrays<'a, I>(mut iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a> where I: 'a + DataPages, { diff --git a/src/io/parquet/read/primitive/dictionary.rs b/src/io/parquet/read/primitive/dictionary.rs index 0012c6a403f..f2fc336139c 100644 --- a/src/io/parquet/read/primitive/dictionary.rs +++ b/src/io/parquet/read/primitive/dictionary.rs @@ -13,6 +13,7 @@ use crate::{ use super::super::dictionary::*; use super::super::utils; use super::super::utils::Decoder; +use super::super::ArrayIter; use super::super::DataPages; /// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays @@ -160,13 +161,13 @@ pub fn iter_to_arrays<'a, K, I, T, P, 
F>( data_type: DataType, chunk_size: usize, op: F, -) -> Box>> + 'a> +) -> ArrayIter<'a> where I: 'a + DataPages, K: DictionaryKey, T: NativeType, P: ParquetNativeType, - F: 'a + Copy + Fn(P) -> T, + F: 'a + Copy + Send + Sync + Fn(P) -> T, { Box::new( ArrayIterator::::new(iter, data_type, chunk_size, op) diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index 05979b8edc8..6604c6005ac 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -8,13 +8,14 @@ pub use utils::read_item; use std::sync::Arc; -use super::{nested_utils::*, DataPages}; use crate::{ array::Array, datatypes::{DataType, Field}, - error::Result, }; +use super::ArrayIter; +use super::{nested_utils::*, DataPages}; + use basic::PrimitiveArrayIterator; use nested::ArrayIterator; @@ -25,13 +26,13 @@ pub fn iter_to_arrays<'a, I, T, P, G, F>( chunk_size: usize, op1: G, op2: F, -) -> Box>> + 'a> +) -> ArrayIter<'a> where I: 'a + DataPages, T: crate::types::NativeType, P: parquet2::types::NativeType, - G: 'a + Copy + for<'b> Fn(&'b [u8]) -> P, - F: 'a + Copy + Fn(P) -> T, + G: 'a + Copy + Send + Sync + for<'b> Fn(&'b [u8]) -> P, + F: 'a + Copy + Send + Sync + Fn(P) -> T, { Box::new( PrimitiveArrayIterator::::new(iter, data_type, chunk_size, op1, op2) @@ -47,13 +48,13 @@ pub fn iter_to_arrays_nested<'a, I, T, P, G, F>( chunk_size: usize, op1: G, op2: F, -) -> Box)>> + 'a> +) -> NestedArrayIter<'a> where I: 'a + DataPages, T: crate::types::NativeType, P: parquet2::types::NativeType, - G: 'a + Copy + for<'b> Fn(&'b [u8]) -> P, - F: 'a + Copy + Fn(P) -> T, + G: 'a + Copy + Send + Sync + for<'b> Fn(&'b [u8]) -> P, + F: 'a + Copy + Send + Sync + Fn(P) -> T, { Box::new( ArrayIterator::::new(iter, field, data_type, chunk_size, op1, op2).map( diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index 1de42cce250..272d1f08f5e 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -6,7 +6,6 @@ use std::{ use parquet2::{ metadata::ColumnChunkMetaData, read::{BasicDecompressor, PageIterator}, - schema::types::ParquetType, }; use crate::{ @@ -14,19 +13,29 @@ use crate::{ io::parquet::read::column_iter_to_arrays, }; +use super::ArrayIter; use super::RowGroupMetaData; -pub struct RowGroupReader { +/// An [`Iterator`] of [`Chunk`] that (dynamically) adapts a vector of iterators of [`Array`] into +/// an iterator of [`Chunk`]. +/// +/// This struct advances each of the iterators individually and combines the +/// results into a single [`Chunk`]. +/// +/// # Implementation +/// Advancing this iterator is CPU-bounded. +pub struct RowGroupDeserializer { + num_rows: usize, remaining_rows: usize, - column_chunks: Vec>>>>, + column_chunks: Vec>, } +/// Returns all the column metadata in `row_group` associated to `field_name`.
fn get_field_columns<'a>( - row_group: &'a RowGroupMetaData, + columns: &'a [ColumnChunkMetaData], field_name: &str, ) -> Vec<&'a ColumnChunkMetaData> { - row_group - .columns() + columns .iter() .enumerate() .filter(|x| x.1.descriptor().path_in_schema()[0] == field_name) @@ -35,12 +44,12 @@ } /// Reads all columns that are part of the parquet field `field_name` -pub fn read_columns<'a, R: Read + Seek>( +pub(super) fn _read_columns<'a, R: Read + Seek>( reader: &mut R, - row_group: &'a RowGroupMetaData, + columns: &'a [ColumnChunkMetaData], field_name: &str, ) -> Result)>> { - get_field_columns(row_group, field_name) + get_field_columns(columns, field_name) .into_iter() .map(|meta| { let (start, len) = meta.byte_range(); @@ -52,22 +61,26 @@ .collect() } -pub(super) fn get_iterators<'a, R: Read + Seek>( +/// Returns a vector of iterators of [`Array`] corresponding to the top level parquet fields whose +/// names match the names in `fields`. +/// +/// This operation is IO-bounded `O(C)` where C is the number of columns in the row group - +/// it reads into memory all the columns in the row group associated with the requested fields. +pub fn read_columns<'a, R: Read + Seek>( reader: &mut R, - parquet_fields: &[ParquetType], row_group: &RowGroupMetaData, fields: Vec, chunk_size: Option, -) -> Result>> + 'a>>> { +) -> Result>> { let chunk_size = chunk_size .unwrap_or(usize::MAX) .min(row_group.num_rows() as usize); // reads all the necessary columns for all fields from the row group // This operation is IO-bounded `O(C)` where C is the number of columns in the row group - let columns = parquet_fields + let columns = fields .iter() - .map(|parquet_field| read_columns(reader, row_group, parquet_field.name())) + .map(|field| _read_columns(reader, row_group.columns(), &field.name)) .collect::>>()?; columns @@ -97,20 +110,31 @@ .collect() } -impl RowGroupReader { +impl RowGroupDeserializer { + /// Creates a new [`RowGroupDeserializer`]. + /// + /// # Panic + /// This function panics iff any of the `column_chunks` + /// do not return arrays of equal length.
pub fn new( - column_chunks: Vec>>>>, + column_chunks: Vec>, num_rows: usize, limit: Option, ) -> Self { Self { + num_rows, remaining_rows: limit.unwrap_or(usize::MAX).min(num_rows), column_chunks, } } + + /// Returns the number of rows on this row group + pub fn num_rows(&self) -> usize { + self.num_rows + } } -impl Iterator for RowGroupReader { +impl Iterator for RowGroupDeserializer { type Item = Result>>; fn next(&mut self) -> Option { diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index c90023ba6df..c8e1495214b 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -659,7 +659,7 @@ fn integration_write(schema: &Schema, batches: &[Chunk>]) -> Resu Ok(writer.into_inner()) } -type IntegrationRead = (Arc, Vec>>); +type IntegrationRead = (Schema, Vec>>); fn integration_read(data: &[u8]) -> Result { let reader = Cursor::new(data); @@ -683,7 +683,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> { let (read_schema, read_batches) = integration_read(&data)?; - assert_eq!(&schema, read_schema.as_ref()); + assert_eq!(schema, read_schema); assert_eq!(batches, read_batches); Ok(()) @@ -748,7 +748,7 @@ fn arrow_type() -> Result<()> { let (new_schema, new_batches) = integration_read(&r)?; - assert_eq!(new_schema.as_ref(), &schema); + assert_eq!(new_schema, schema); assert_eq!(new_batches, vec![batch]); Ok(()) }
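For reference, below is a minimal sketch (not part of this patch) of how the new lower-level API introduced here fits together: `read_columns` performs the IO-bounded read of a row group's column chunks, and `RowGroupDeserializer` performs the CPU-bounded decoding into chunks of arrow arrays, mirroring what `FileReader`/`RowGroupReader` do internally. The file path is a placeholder.

```rust
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn main() -> Result<()> {
    // placeholder path
    let mut reader = File::open("example.parquet")?;

    // IO-bounded: read the footer and derive the arrow schema
    let metadata = read::read_metadata(&mut reader)?;
    let schema = read::get_schema(&metadata)?;

    // IO-bounded: read all column chunks of the first row group into memory
    let row_group = &metadata.row_groups[0];
    let column_chunks = read::read_columns(&mut reader, row_group, schema.fields, None)?;

    // CPU-bounded: decompress, decode and deserialize into `Chunk`s of arrow arrays
    let deserializer =
        read::RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
    for maybe_chunk in deserializer {
        let chunk = maybe_chunk?;
        assert!(!chunk.is_empty());
    }
    Ok(())
}
```

The higher-level `FileReader` shown in `examples/parquet_read.rs` wraps this same sequence one row group at a time, applying the optional projection, group filter, chunk size and row limit.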