diff --git a/Cargo.toml b/Cargo.toml index 986c30a0f9c..34fc565bc33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,9 @@ futures = { version = "0.3", optional = true } # for faster hashing ahash = { version = "0.7", optional = true } -parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] } +#parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] } +parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "struct", optional = true, default_features = false, features = ["stream"] } +#parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] } avro-rs = { version = "0.13", optional = true, default_features = false } diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs index 8ab3ccdac2c..4caa97d60ee 100644 --- a/examples/parquet_read.rs +++ b/examples/parquet_read.rs @@ -4,38 +4,32 @@ use std::io::BufReader; use arrow2::io::parquet::read; use arrow2::{array::Array, error::Result}; -fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result> { +fn read_field(path: &str, row_group: usize, field: usize) -> Result> { // Open a file, a common operation in Rust let mut file = BufReader::new(File::open(path)?); // Read the files' metadata. This has a small IO cost because it requires seeking to the end // of the file to read its footer. - let file_metadata = read::read_metadata(&mut file)?; + let metadata = read::read_metadata(&mut file)?; // Convert the files' metadata into an arrow schema. This is CPU-only and amounts to // parse thrift if the arrow format is available on a key, or infering the arrow schema from // the parquet's physical, converted and logical types. - let arrow_schema = read::get_schema(&file_metadata)?; + let arrow_schema = read::get_schema(&metadata)?; - // get the columns' metadata - let metadata = file_metadata.row_groups[row_group].column(column); - - // Construct an iterator over pages. This binds `file` to this iterator, and each iteration - // is IO intensive as it will read a compressed page into memory. There is almost no CPU work - // on this operation - let pages = read::get_page_iterator(metadata, &mut file, None, vec![])?; + // Created an iterator of column chunks. Each iteration + // yields an iterator of compressed pages. There is almost no CPU work in iterating. + let columns = read::get_column_iterator(&mut file, &metadata, row_group, field, None, vec![]); // get the columns' logical type - let data_type = arrow_schema.fields()[column].data_type().clone(); + let data_type = arrow_schema.fields()[field].data_type().clone(); - // This is the actual work. In this case, pages are read (by calling `iter.next()`) and are - // immediately decompressed, decoded, deserialized to arrow and deallocated. - // This uses a combination of IO and CPU. At this point, `array` is the arrow-corresponding - // array of the parquets' physical type. - // `Decompressor` re-uses an internal buffer for de-compression, thereby maximizing memory re-use. - let mut pages = read::Decompressor::new(pages, vec![]); + // This is the actual work. In this case, pages are read and + // decompressed, decoded and deserialized to arrow. + // Because `columns` is an iterator, it uses a combination of IO and CPU. 
+ let (array, _, _) = read::column_iter_to_array(columns, data_type, vec![])?; - read::page_iter_to_array(&mut pages, metadata, data_type) + Ok(array) } fn main() -> Result<()> { @@ -43,10 +37,10 @@ fn main() -> Result<()> { let args: Vec = env::args().collect(); let file_path = &args[1]; - let column = args[2].parse::().unwrap(); + let field = args[2].parse::().unwrap(); let row_group = args[3].parse::().unwrap(); - let array = read_column_chunk(file_path, row_group, column)?; + let array = read_field(file_path, row_group, field)?; println!("{}", array); Ok(()) } diff --git a/examples/parquet_read_parallel.rs b/examples/parquet_read_parallel.rs index 9dead6c43ac..532a735894e 100644 --- a/examples/parquet_read_parallel.rs +++ b/examples/parquet_read_parallel.rs @@ -5,77 +5,98 @@ use std::sync::Arc; use std::thread; use std::time::SystemTime; -use arrow2::{array::Array, error::Result, io::parquet::read}; +use arrow2::{ + array::Array, error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator, + record_batch::RecordBatch, +}; -fn parallel_read(path: &str) -> Result>> { - // prepare a channel to send serialized records from threads +fn parallel_read(path: &str, row_group: usize) -> Result { + // prepare a channel to send compressed pages across threads. let (tx, rx) = unbounded(); let mut file = File::open(path)?; let file_metadata = read::read_metadata(&mut file)?; let arrow_schema = Arc::new(read::get_schema(&file_metadata)?); - let file_metadata = Arc::new(file_metadata); - let start = SystemTime::now(); // spawn a thread to produce `Vec` (IO bounded) - let producer_metadata = file_metadata.clone(); - let child = thread::spawn(move || { - for column in 0..producer_metadata.schema().num_columns() { - for row_group in 0..producer_metadata.row_groups.len() { - let start = SystemTime::now(); - let column_metadata = producer_metadata.row_groups[row_group].column(column); - println!("produce start: {} {}", column, row_group); - let pages = read::get_page_iterator(column_metadata, &mut file, None, vec![]) - .unwrap() - .collect::>(); - println!( - "produce end - {:?}: {} {}", - start.elapsed().unwrap(), - column, - row_group - ); - tx.send((column, row_group, pages)).unwrap(); - } - } - }); - - let mut children = Vec::new(); - // use 3 consumers of to decompress, decode and deserialize. 
- for _ in 0..3 { - let rx_consumer = rx.clone(); - let metadata_consumer = file_metadata.clone(); - let arrow_schema_consumer = arrow_schema.clone(); - let child = thread::spawn(move || { - let (column, row_group, pages) = rx_consumer.recv().unwrap(); + let producer = thread::spawn(move || { + for field in 0..file_metadata.schema().fields().len() { let start = SystemTime::now(); - println!("consumer start - {} {}", column, row_group); - let metadata = metadata_consumer.row_groups[row_group].column(column); - let data_type = arrow_schema_consumer.fields()[column].data_type().clone(); - let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]); + let mut columns = read::get_column_iterator( + &mut file, + &file_metadata, + row_group, + field, + None, + vec![], + ); + + println!("produce start - field: {}", field); + + while let read::State::Some(mut new_iter) = columns.advance().unwrap() { + if let Some((pages, metadata)) = new_iter.get() { + let pages = pages.collect::>(); - let array = read::page_iter_to_array(&mut pages, metadata, data_type); + tx.send((field, metadata.clone(), pages)).unwrap(); + } + columns = new_iter; + } println!( - "consumer end - {:?}: {} {}", + "produce end - {:?}: {} {}", start.elapsed().unwrap(), - column, + field, row_group ); - array - }); - children.push(child); - } + } + }); + + // use 2 consumers for CPU-intensive to decompress, decode and deserialize. + #[allow(clippy::needless_collect)] // we need to collect to parallelize + let consumers = (0..2) + .map(|i| { + let rx_consumer = rx.clone(); + let arrow_schema_consumer = arrow_schema.clone(); + thread::spawn(move || { + let mut arrays = vec![]; + while let Ok((field, metadata, pages)) = rx_consumer.recv() { + let start = SystemTime::now(); + let data_type = arrow_schema_consumer.fields()[field].data_type().clone(); + println!("consumer {} start - {}", i, field); + + let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]); + + let array = read::page_iter_to_array(&mut pages, &metadata, data_type); + println!( + "consumer {} end - {:?}: {}", + i, + start.elapsed().unwrap(), + field + ); + + arrays.push((field, array)) + } + arrays + }) + }) + .collect::>(); - child.join().expect("child thread panicked"); + producer.join().expect("producer thread panicked"); - let arrays = children + // collect all columns (join threads) + let mut columns = consumers .into_iter() .map(|x| x.join().unwrap()) - .collect::>>()?; + .flatten() + .map(|x| Ok((x.0, x.1?))) + .collect::)>>>()?; + // order may not be the same + columns.sort_unstable_by_key(|x| x.0); + let columns = columns.into_iter().map(|x| x.1.into()).collect(); println!("Finished - {:?}", start.elapsed().unwrap()); - Ok(arrays) + RecordBatch::try_new(arrow_schema, columns) } fn main() -> Result<()> { @@ -83,8 +104,8 @@ fn main() -> Result<()> { let args: Vec = env::args().collect(); let file_path = &args[1]; - let arrays = parallel_read(file_path)?; - for array in arrays { + let batch = parallel_read(file_path, 0)?; + for array in batch.columns() { println!("{}", array) } Ok(()) diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index 4aef907c614..6ab6931175d 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -184,6 +184,32 @@ def case_nested(size): ) +def case_struct(size): + string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"] + boolean = [True, None, False, False, None, True, None, None, True, True] + struct_fields = [ + ("f1", 
pa.utf8()), + ("f2", pa.bool_()), + ] + fields = [ + pa.field( + "struct", + pa.struct(struct_fields), + ) + ] + schema = pa.schema(fields) + return ( + { + "struct": pa.StructArray.from_arrays( + [pa.array(string * size), pa.array(boolean * size)], + fields=struct_fields, + ), + }, + schema, + f"struct_nullable_{size*10}.parquet", + ) + + def write_pyarrow( case, size: int, @@ -228,7 +254,7 @@ def write_pyarrow( ) -for case in [case_basic_nullable, case_basic_required, case_nested]: +for case in [case_basic_nullable, case_basic_required, case_nested, case_struct]: for version in [1, 2]: for use_dict in [True, False]: write_pyarrow(case, 1, version, use_dict, False, False)
diff --git a/src/io/parquet/read/README.md b/src/io/parquet/read/README.md
new file mode 100644
index 00000000000..30f8e5e194d
--- /dev/null
+++ b/src/io/parquet/read/README.md
@@ -0,0 +1,33 @@
+## Observations
+
+### LSB equivalence between definition levels and bitmaps
+
+* When the maximum repetition level is 0 and the maximum definition level is 1,
+  the RLE-encoded definition levels correspond exactly to Arrow's bitmap and can be
+  memcopied without further transformations.
+
+* Reading a parquet nested field can be done by reading each individual primitive
+  column and "building" the nested struct in arrow.
+
+## Nested parquet groups are deserialized recursively
+
+Rows of nested parquet groups are encoded in the repetition and definition levels.
+In arrow, they correspond to:
+* list's offsets and validity
+* struct's validity
+
+An implementation that leverages this observation:
+
+When nested types are observed in a parquet column, we recurse over the struct to gather
+whether the type is a Struct or List and whether it is required or optional, which we store
+in a `Vec`. `Nested` is a trait that can process definition and repetition
+levels depending on the type and nullability.
+
+When processing pages, we process the definition and repetition levels into `Vec`.
+
+When we finish a column chunk, we recursively pop `Vec` as we are building the `StructArray`
+or `ListArray`.
+
+With this approach, the only difference vs flat is that we cannot leverage the bitmap
+optimization, and instead need to deserialize the repetition and definition
+levels to `i32`.
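To make the first observation above concrete, here is a minimal, self-contained sketch (not this crate's API; `def_levels_to_validity` is a hypothetical helper) showing that, for a flat optional column where the maximum definition level is 1, the decoded definition levels already are an Arrow validity bitmap, bit for bit:

```rust
/// Packs the definition levels of a flat optional column (max repetition level 0,
/// max definition level 1) into an LSB-first bitmap, i.e. Arrow's validity layout.
fn def_levels_to_validity(def_levels: &[u32]) -> Vec<u8> {
    let mut bitmap = vec![0u8; (def_levels.len() + 7) / 8];
    for (i, &level) in def_levels.iter().enumerate() {
        // level 1 means "value present", level 0 means "null"
        if level == 1 {
            bitmap[i / 8] |= 1u8 << (i % 8);
        }
    }
    bitmap
}

fn main() {
    // values [Some, None, Some, Some] are encoded as definition levels [1, 0, 1, 1]
    let validity = def_levels_to_validity(&[1, 0, 1, 1]);
    assert_eq!(validity, vec![0b0000_1101]);
}
```

Because the bit-packed levels and the bitmap share this LSB layout, the flat path can memcopy the buffer without materializing the levels at all; that is exactly the optimization the nested path gives up, since it must inspect the levels to rebuild list offsets and struct validity.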
diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index 5a88b772522..5d7d0ced562 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -1,21 +1,17 @@ -use futures::{pin_mut, Stream, StreamExt}; use parquet2::{ encoding::{delta_length_byte_array, hybrid_rle, Encoding}, - metadata::{ColumnChunkMetaData, ColumnDescriptor}, + metadata::ColumnDescriptor, page::{BinaryPageDict, DataPage}, - FallibleStreamingIterator, }; use crate::{ - array::{Array, Offset}, + array::Offset, bitmap::{utils::BitmapIter, MutableBitmap}, buffer::MutableBuffer, - datatypes::DataType, - error::{ArrowError, Result}, + error::Result, }; use super::super::utils; -use super::utils::finish_array; /// Assumptions: No rep levels #[allow(clippy::too_many_arguments)] @@ -233,7 +229,7 @@ pub(super) fn read_plain_required( } } -fn extend_from_page( +pub(super) fn extend_from_page( page: &DataPage, descriptor: &ColumnDescriptor, offsets: &mut MutableBuffer, @@ -300,63 +296,3 @@ fn extend_from_page( }; Ok(()) } - -pub fn iter_to_array( - mut iter: I, - metadata: &ColumnChunkMetaData, - data_type: &DataType, -) -> Result> -where - ArrowError: From, - O: Offset, - I: FallibleStreamingIterator, -{ - let capacity = metadata.num_values() as usize; - let mut values = MutableBuffer::::with_capacity(0); - let mut offsets = MutableBuffer::::with_capacity(1 + capacity); - offsets.push(O::default()); - let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next()? { - extend_from_page( - page, - metadata.descriptor(), - &mut offsets, - &mut values, - &mut validity, - )? - } - - Ok(finish_array(data_type.clone(), offsets, values, validity)) -} - -pub async fn stream_to_array( - pages: I, - metadata: &ColumnChunkMetaData, - data_type: &DataType, -) -> Result> -where - ArrowError: From, - O: Offset, - E: Clone, - I: Stream>, -{ - let capacity = metadata.num_values() as usize; - let mut values = MutableBuffer::::with_capacity(0); - let mut offsets = MutableBuffer::::with_capacity(1 + capacity); - offsets.push(O::default()); - let mut validity = MutableBitmap::with_capacity(capacity); - - pin_mut!(pages); // needed for iteration - - while let Some(page) = pages.next().await { - extend_from_page( - page.as_ref().map_err(|x| x.clone())?, - metadata.descriptor(), - &mut offsets, - &mut values, - &mut validity, - )? 
- } - - Ok(finish_array(data_type.clone(), offsets, values, validity)) -} diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs index e37090c94f7..fad5c0dcd89 100644 --- a/src/io/parquet/read/binary/mod.rs +++ b/src/io/parquet/read/binary/mod.rs @@ -1,9 +1,97 @@ +use futures::{pin_mut, Stream, StreamExt}; +use parquet2::{metadata::ColumnChunkMetaData, page::DataPage, FallibleStreamingIterator}; + +use crate::{ + array::{Array, Offset}, + bitmap::MutableBitmap, + buffer::MutableBuffer, + datatypes::DataType, + error::{ArrowError, Result}, + io::parquet::read::binary::utils::finish_array, +}; + mod basic; mod dictionary; mod nested; mod utils; -pub use basic::iter_to_array; -pub use basic::stream_to_array; pub use dictionary::iter_to_array as iter_to_dict_array; -pub use nested::iter_to_array as iter_to_array_nested; + +use super::nested_utils::Nested; + +pub fn iter_to_array( + mut iter: I, + metadata: &ColumnChunkMetaData, + data_type: DataType, + nested: &mut Vec>, +) -> Result> +where + O: Offset, + ArrowError: From, + I: FallibleStreamingIterator, +{ + let capacity = metadata.num_values() as usize; + let mut values = MutableBuffer::::with_capacity(0); + let mut offsets = MutableBuffer::::with_capacity(1 + capacity); + offsets.push(O::default()); + let mut validity = MutableBitmap::with_capacity(capacity); + + let is_nullable = nested.pop().unwrap().is_nullable(); + + if nested.is_empty() { + while let Some(page) = iter.next()? { + basic::extend_from_page( + page, + metadata.descriptor(), + &mut offsets, + &mut values, + &mut validity, + )? + } + } else { + while let Some(page) = iter.next()? { + nested::extend_from_page( + page, + metadata.descriptor(), + is_nullable, + nested, + &mut offsets, + &mut values, + &mut validity, + )? + } + } + Ok(utils::finish_array(data_type, offsets, values, validity)) +} + +pub async fn stream_to_array( + pages: I, + metadata: &ColumnChunkMetaData, + data_type: &DataType, +) -> Result> +where + ArrowError: From, + O: Offset, + E: Clone, + I: Stream>, +{ + let capacity = metadata.num_values() as usize; + let mut values = MutableBuffer::::with_capacity(0); + let mut offsets = MutableBuffer::::with_capacity(1 + capacity); + offsets.push(O::default()); + let mut validity = MutableBitmap::with_capacity(capacity); + + pin_mut!(pages); // needed for iteration + + while let Some(page) = pages.next().await { + basic::extend_from_page( + page.as_ref().map_err(|x| x.clone())?, + metadata.descriptor(), + &mut offsets, + &mut values, + &mut validity, + )? 
+ } + + Ok(finish_array(data_type.clone(), offsets, values, validity)) +} diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index 8640d4c84b3..3c57aab9a5f 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/binary/nested.rs @@ -1,24 +1,15 @@ -use std::sync::Arc; - use parquet2::{ encoding::{hybrid_rle::HybridRleDecoder, Encoding}, - metadata::{ColumnChunkMetaData, ColumnDescriptor}, + metadata::ColumnDescriptor, page::DataPage, read::levels::get_bit_width, - FallibleStreamingIterator, }; use super::super::nested_utils::*; use super::super::utils; use super::basic::read_plain_required; -use super::utils::finish_array; -use crate::{ - array::{Array, Offset}, - bitmap::MutableBitmap, - buffer::MutableBuffer, - datatypes::DataType, - error::{ArrowError, Result}, -}; + +use crate::{array::Offset, bitmap::MutableBitmap, buffer::MutableBuffer, error::Result}; fn read_values<'a, O, D, G>( def_levels: D, @@ -101,7 +92,7 @@ fn read( } } -fn extend_from_page( +pub(super) fn extend_from_page( page: &DataPage, descriptor: &ColumnDescriptor, is_nullable: bool, @@ -146,38 +137,3 @@ fn extend_from_page( } Ok(()) } - -pub fn iter_to_array( - mut iter: I, - metadata: &ColumnChunkMetaData, - data_type: DataType, -) -> Result<(Arc, Vec>)> -where - O: Offset, - ArrowError: From, - I: FallibleStreamingIterator, -{ - let capacity = metadata.num_values() as usize; - let mut values = MutableBuffer::::with_capacity(0); - let mut offsets = MutableBuffer::::with_capacity(1 + capacity); - offsets.push(O::default()); - let mut validity = MutableBitmap::with_capacity(capacity); - - let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity); - - while let Some(page) = iter.next()? { - extend_from_page( - page, - metadata.descriptor(), - is_nullable, - &mut nested, - &mut offsets, - &mut values, - &mut validity, - )? - } - - let values = finish_array(data_type, offsets, values, validity).into(); - - Ok((values, nested)) -} diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index 128caaac8e9..e52f42594b1 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -12,7 +12,6 @@ use parquet2::{ encoding::{hybrid_rle, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::DataPage, - FallibleStreamingIterator, }; pub(super) fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) { @@ -68,25 +67,6 @@ fn read_optional( } } -pub fn iter_to_array(mut iter: I, metadata: &ColumnChunkMetaData) -> Result -where - ArrowError: From, - I: FallibleStreamingIterator, -{ - let capacity = metadata.num_values() as usize; - let mut values = MutableBitmap::with_capacity(capacity); - let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next()? { - extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)? 
- } - - Ok(BooleanArray::from_data( - DataType::Boolean, - values.into(), - validity.into(), - )) -} - pub async fn stream_to_array(pages: I, metadata: &ColumnChunkMetaData) -> Result where ArrowError: From, @@ -115,7 +95,7 @@ where )) } -fn extend_from_page( +pub(super) fn extend_from_page( page: &DataPage, descriptor: &ColumnDescriptor, values: &mut MutableBitmap, diff --git a/src/io/parquet/read/boolean/mod.rs b/src/io/parquet/read/boolean/mod.rs index 912f1bb026b..ad6ef57cdee 100644 --- a/src/io/parquet/read/boolean/mod.rs +++ b/src/io/parquet/read/boolean/mod.rs @@ -1,6 +1,55 @@ +use crate::{ + array::{Array, BooleanArray}, + bitmap::MutableBitmap, + datatypes::DataType, + error::{ArrowError, Result}, +}; + +use parquet2::{metadata::ColumnChunkMetaData, page::DataPage, FallibleStreamingIterator}; + mod basic; mod nested; -pub use basic::iter_to_array; pub use basic::stream_to_array; -pub use nested::iter_to_array as iter_to_array_nested; + +use super::nested_utils::Nested; + +pub fn iter_to_array( + mut iter: I, + metadata: &ColumnChunkMetaData, + data_type: DataType, + nested: &mut Vec>, +) -> Result> +where + ArrowError: From, + I: FallibleStreamingIterator, +{ + let capacity = metadata.num_values() as usize; + let mut values = MutableBitmap::with_capacity(capacity); + let mut validity = MutableBitmap::with_capacity(capacity); + + let is_nullable = nested.pop().unwrap().is_nullable(); + + if nested.is_empty() { + while let Some(page) = iter.next()? { + basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)? + } + } else { + while let Some(page) = iter.next()? { + nested::extend_from_page( + page, + metadata.descriptor(), + is_nullable, + nested, + &mut values, + &mut validity, + )? + } + } + + Ok(Box::new(BooleanArray::from_data( + data_type, + values.into(), + validity.into(), + ))) +} diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index ac89cac8d4f..072e9c05fb7 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -1,21 +1,16 @@ -use std::sync::Arc; - use parquet2::{ encoding::{hybrid_rle::HybridRleDecoder, Encoding}, - metadata::{ColumnChunkMetaData, ColumnDescriptor}, + metadata::ColumnDescriptor, page::DataPage, read::levels::get_bit_width, - FallibleStreamingIterator, }; use super::super::nested_utils::*; use super::super::utils; use super::basic::read_required; use crate::{ - array::{Array, BooleanArray}, bitmap::{utils::BitmapIter, MutableBitmap}, - datatypes::DataType, - error::{ArrowError, Result}, + error::Result, }; fn read_values( @@ -65,7 +60,11 @@ fn read( get_bit_width(def_level_encoding.1), additional, ); - let new_values = BitmapIter::new(values_buffer, 0, additional); + + // don't know how many values there is: using the max possible + let num_valid_values = additional.min(values_buffer.len() * 8); + + let new_values = BitmapIter::new(values_buffer, 0, num_valid_values); read_values(def_levels, max_def_level, new_values, values, validity) } else { read_required(values_buffer, additional, values) @@ -87,7 +86,7 @@ fn read( } } -fn extend_from_page( +pub(super) fn extend_from_page( page: &DataPage, descriptor: &ColumnDescriptor, is_nullable: bool, @@ -130,38 +129,3 @@ fn extend_from_page( } Ok(()) } - -pub fn iter_to_array( - mut iter: I, - metadata: &ColumnChunkMetaData, - data_type: DataType, -) -> Result<(Arc, Vec>)> -where - ArrowError: From, - I: FallibleStreamingIterator, -{ - let capacity = metadata.num_values() as usize; - let mut values = 
MutableBitmap::with_capacity(capacity); - let mut validity = MutableBitmap::with_capacity(capacity); - - let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity); - - while let Some(page) = iter.next()? { - extend_from_page( - page, - metadata.descriptor(), - is_nullable, - &mut nested, - &mut values, - &mut validity, - )? - } - - let values = Arc::new(BooleanArray::from_data( - data_type, - values.into(), - validity.into(), - )); - - Ok((values, nested)) -} diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 673a23b727b..7d1470df474 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -14,9 +14,10 @@ pub use parquet2::{ metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData}, page::{CompressedDataPage, DataPage, DataPageHeader}, read::{ - decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream, - read_metadata as _read_metadata, read_metadata_async as _read_metadata_async, - BasicDecompressor, Decompressor, PageFilter, PageIterator, + decompress, get_column_iterator, get_page_iterator as _get_page_iterator, + get_page_stream as _get_page_stream, read_metadata as _read_metadata, + read_metadata_async as _read_metadata_async, BasicDecompressor, ColumnChunkIter, + Decompressor, MutStreamingIterator, PageFilter, PageIterator, State, }, schema::types::{ LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, @@ -27,10 +28,10 @@ pub use parquet2::{ }; use crate::{ - array::{Array, DictionaryKey, NullArray, PrimitiveArray}, + array::{Array, DictionaryKey, NullArray, PrimitiveArray, StructArray}, datatypes::{DataType, IntervalUnit, TimeUnit}, error::{ArrowError, Result}, - io::parquet::read::nested_utils::create_list, + io::parquet::read::nested_utils::{create_list, init_nested}, }; mod binary; @@ -50,12 +51,12 @@ pub use schema::{get_schema, FileMetaData}; use self::nested_utils::Nested; /// Creates a new iterator of compressed pages. -pub fn get_page_iterator<'b, RR: Read + Seek>( +pub fn get_page_iterator( column_metadata: &ColumnChunkMetaData, - reader: &'b mut RR, + reader: R, pages_filter: Option, buffer: Vec, -) -> Result> { +) -> Result> { Ok(_get_page_iterator( column_metadata, reader, @@ -170,65 +171,27 @@ fn dict_read< } } -fn page_iter_to_array_nested< - I: FallibleStreamingIterator, ->( - iter: &mut I, - metadata: &ColumnChunkMetaData, - data_type: DataType, -) -> Result<(Arc, Vec>)> { - use DataType::*; - match data_type { - UInt8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u8), - UInt16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u16), - UInt32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u32), - Int8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i8), - Int16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i16), - Int32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i32), - - Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() { - ParquetType::PrimitiveType { physical_type, .. 
} => match physical_type { - PhysicalType::Int96 => primitive::iter_to_array_nested( - iter, - metadata, - DataType::Timestamp(TimeUnit::Nanosecond, None), - int96_to_i64_ns, - ), - _ => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x), - }, - _ => unreachable!(), - }, - - // INT64 - Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => { - primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x) - } - UInt64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x as u64), - - Float32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f32| x), - Float64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f64| x), - - Boolean => boolean::iter_to_array_nested(iter, metadata, data_type), - - Binary | Utf8 => binary::iter_to_array_nested::(iter, metadata, data_type), - LargeBinary | LargeUtf8 => { - binary::iter_to_array_nested::(iter, metadata, data_type) - } - List(ref inner) => { - let (values, mut nested) = - page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?; - Ok((create_list(data_type, &mut nested, values)?.into(), nested)) +fn column_datatype(data_type: &DataType, column: usize) -> DataType { + use crate::datatypes::PhysicalType::*; + match data_type.to_physical_type() { + Null | Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8 + | LargeUtf8 | Dictionary(_) | List | LargeList | FixedSizeList => data_type.clone(), + Struct => { + // todo: this won't work for nested structs because we need to flatten the column ids + if let DataType::Struct(v) = data_type { + v[column].data_type().clone() + } else { + unreachable!() + } } - other => Err(ArrowError::NotYetImplemented(format!( - "Reading {:?} from parquet still not implemented", - other - ))), + Union => todo!(), + Map => todo!(), } } -/// Converts an iterator of [`DataPage`] into a single [`Array`]. 
-pub fn page_iter_to_array>( +fn page_iter_to_array>( iter: &mut I, + nested: &mut Vec>, metadata: &ColumnChunkMetaData, data_type: DataType, ) -> Result> { @@ -238,14 +201,16 @@ pub fn page_iter_to_array primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as u8), - UInt16 => primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as u16), - UInt32 => primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as u32), - Int8 => primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as i8), - Int16 => primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as i16), + + Boolean => boolean::iter_to_array(iter, metadata, data_type, nested), + + UInt8 => primitive::iter_to_array(iter, metadata, data_type, nested, |x: i32| x as u8), + UInt16 => primitive::iter_to_array(iter, metadata, data_type, nested, |x: i32| x as u16), + UInt32 => primitive::iter_to_array(iter, metadata, data_type, nested, |x: i32| x as u32), + Int8 => primitive::iter_to_array(iter, metadata, data_type, nested, |x: i32| x as i8), + Int16 => primitive::iter_to_array(iter, metadata, data_type, nested, |x: i32| x as i16), Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => { - primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as i32) + primitive::iter_to_array(iter, metadata, data_type, nested, |x: i32| x as i32) } Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() { @@ -254,36 +219,24 @@ pub fn page_iter_to_array primitive::iter_to_array(iter, metadata, data_type, |x: i64| x), + _ => primitive::iter_to_array(iter, metadata, data_type, nested, |x: i64| x), }, _ => unreachable!(), }, - // INT64 - Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => { - primitive::iter_to_array(iter, metadata, data_type, |x: i64| x) - } - UInt64 => primitive::iter_to_array(iter, metadata, data_type, |x: i64| x as u64), - - Float32 => primitive::iter_to_array(iter, metadata, data_type, |x: f32| x), - Float64 => primitive::iter_to_array(iter, metadata, data_type, |x: f64| x), - - Boolean => Ok(Box::new(boolean::iter_to_array(iter, metadata)?)), - - Binary | Utf8 => binary::iter_to_array::(iter, metadata, &data_type), - LargeBinary | LargeUtf8 => binary::iter_to_array::(iter, metadata, &data_type), FixedSizeBinary(_) => Ok(Box::new(fixed_size_binary::iter_to_array( iter, data_type, metadata, )?)), Decimal(_, _) => match metadata.descriptor().type_() { ParquetType::PrimitiveType { physical_type, .. 
} => match physical_type { PhysicalType::Int32 => { - primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as i128) + primitive::iter_to_array(iter, metadata, data_type, nested, |x: i32| x as i128) } PhysicalType::Int64 => { - primitive::iter_to_array(iter, metadata, data_type, |x: i64| x as i128) + primitive::iter_to_array(iter, metadata, data_type, nested, |x: i64| x as i128) } PhysicalType::FixedLenByteArray(n) => { if *n > 16 { @@ -320,21 +273,34 @@ pub fn page_iter_to_array unreachable!(), }, - List(ref inner) => { - let (values, mut nested) = - page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?; - create_list(data_type, &mut nested, values) + + // INT64 + Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => { + primitive::iter_to_array(iter, metadata, data_type, nested, |x: i64| x) } - LargeList(ref inner) => { - let (values, mut nested) = - page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?; - create_list(data_type, &mut nested, values) + UInt64 => primitive::iter_to_array(iter, metadata, data_type, nested, |x: i64| x as u64), + + Float32 => primitive::iter_to_array(iter, metadata, data_type, nested, |x: f32| x), + Float64 => primitive::iter_to_array(iter, metadata, data_type, nested, |x: f64| x), + + Binary | Utf8 => binary::iter_to_array::(iter, metadata, data_type, nested), + LargeBinary | LargeUtf8 => { + binary::iter_to_array::(iter, metadata, data_type, nested) } Dictionary(key_type, _) => match_integer_type!(key_type, |$T| { dict_read::<$T, _>(iter, metadata, data_type) }), + List(ref inner) => { + let values = page_iter_to_array(iter, nested, metadata, inner.data_type().clone())?; + create_list(data_type, nested, values.into()) + } + LargeList(ref inner) => { + let values = page_iter_to_array(iter, nested, metadata, inner.data_type().clone())?; + create_list(data_type, nested, values.into()) + } + other => Err(ArrowError::NotYetImplemented(format!( "Reading {:?} from parquet still not implemented", other @@ -342,6 +308,66 @@ pub fn page_iter_to_array( + mut columns: I, + data_type: DataType, + mut buffer: Vec, +) -> Result<(Box, Vec, Vec)> +where + II: Iterator>, + I: ColumnChunkIter, +{ + let mut nested_info = vec![]; + init_nested(columns.field(), 0, &mut nested_info); + + let mut arrays = vec![]; + let page_buffer; + let mut column = 0; + loop { + match columns.advance()? { + State::Some(mut new_iter) => { + let data_type = column_datatype(&data_type, column); + if let Some((pages, metadata)) = new_iter.get() { + let mut iterator = BasicDecompressor::new(pages, buffer); + + let array = + page_iter_to_array(&mut iterator, &mut nested_info, metadata, data_type)?; + buffer = iterator.into_inner(); + arrays.push(array) + } + column += 1; + columns = new_iter; + } + State::Finished(b) => { + page_buffer = b; + break; + } + } + } + + use crate::datatypes::PhysicalType::*; + Ok(match data_type.to_physical_type() { + Null | Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8 + | LargeUtf8 | List | LargeList | FixedSizeList | Dictionary(_) => { + (arrays.pop().unwrap(), page_buffer, buffer) + } + Struct => ( + Box::new(StructArray::from_data( + data_type, + arrays.into_iter().map(|x| x.into()).collect(), + None, + )), + page_buffer, + buffer, + ), + Union => todo!(), + Map => todo!(), + }) +} + /// Converts an async stream of [`DataPage`] into a single [`Array`]. 
pub async fn page_stream_to_array>>( pages: I, diff --git a/src/io/parquet/read/nested_utils.rs b/src/io/parquet/read/nested_utils.rs index e3bc873b62e..2a13ba97ef3 100644 --- a/src/io/parquet/read/nested_utils.rs +++ b/src/io/parquet/read/nested_utils.rs @@ -25,6 +25,40 @@ pub trait Nested: std::fmt::Debug { fn is_nullable(&self) -> bool; } +#[derive(Debug, Default)] +pub struct NestedPrimitive { + is_nullable: bool, +} + +impl NestedPrimitive { + pub fn new(is_nullable: bool) -> Self { + Self { is_nullable } + } +} + +impl Nested for NestedPrimitive { + fn inner(&mut self) -> (Buffer, Option) { + (Default::default(), Default::default()) + } + + #[inline] + fn last_offset(&self) -> i64 { + 0 + } + + fn is_nullable(&self) -> bool { + self.is_nullable + } + + fn push(&mut self, _value: i64, _is_valid: bool) {} + + fn offsets(&mut self) -> &[i64] { + &[] + } + + fn close(&mut self, _length: i64) {} +} + #[derive(Debug, Default)] pub struct NestedOptional { pub validity: MutableBitmap, @@ -180,45 +214,32 @@ pub fn extend_offsets( }); } -pub fn is_nullable(type_: &ParquetType, container: &mut Vec) { - match type_ { +pub fn init_nested(field: &ParquetType, capacity: usize, container: &mut Vec>) { + match field { ParquetType::PrimitiveType { basic_info, .. } => { - container.push(super::schema::is_nullable(basic_info)); + container.push( + Box::new(NestedPrimitive::new(super::schema::is_nullable(basic_info))) + as Box, + ); } ParquetType::GroupType { basic_info, fields, .. } => { if basic_info.repetition() != &Repetition::Repeated { - container.push(super::schema::is_nullable(basic_info)); + let item = if super::schema::is_nullable(basic_info) { + Box::new(NestedOptional::with_capacity(capacity)) as Box + } else { + Box::new(NestedValid::with_capacity(capacity)) as Box + }; + container.push(item); } for field in fields { - is_nullable(field, container) + init_nested(field, capacity, container) } } } } -pub fn init_nested(base_type: &ParquetType, capacity: usize) -> (Vec>, bool) { - let mut nullable = Vec::new(); - is_nullable(base_type, &mut nullable); - // the primitive's nullability is the last on the list - let is_nullable = nullable.pop().unwrap(); - - ( - nullable - .iter() - .map(|is_nullable| { - if *is_nullable { - Box::new(NestedOptional::with_capacity(capacity)) as Box - } else { - Box::new(NestedValid::with_capacity(capacity)) as Box - } - }) - .collect(), - is_nullable, - ) -} - pub fn create_list( data_type: DataType, nested: &mut Vec>, diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index 7839988127a..bb7b5dc325f 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -3,8 +3,6 @@ mod dictionary; mod nested; mod utils; -use std::sync::Arc; - use futures::{pin_mut, Stream, StreamExt}; use parquet2::{page::DataPage, types::NativeType, FallibleStreamingIterator}; @@ -21,38 +19,6 @@ use crate::{ pub use dictionary::iter_to_array as iter_to_dict_array; -pub fn iter_to_array( - mut iter: I, - metadata: &ColumnChunkMetaData, - data_type: DataType, - op: F, -) -> Result> -where - ArrowError: From, - T: NativeType, - A: ArrowNativeType, - F: Copy + Fn(T) -> A, - I: FallibleStreamingIterator, -{ - let capacity = metadata.num_values() as usize; - let mut values = MutableBuffer::::with_capacity(capacity); - let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next()? { - basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity, op)? 
- } - - let data_type = match data_type { - DataType::Dictionary(_, values) => values.as_ref().clone(), - _ => data_type, - }; - - Ok(Box::new(PrimitiveArray::from_data( - data_type, - values.into(), - validity.into(), - ))) -} - pub async fn stream_to_array( pages: I, metadata: &ColumnChunkMetaData, @@ -95,12 +61,13 @@ where ))) } -pub fn iter_to_array_nested( +pub fn iter_to_array( mut iter: I, metadata: &ColumnChunkMetaData, data_type: DataType, + nested: &mut Vec>, op: F, -) -> Result<(Arc, Vec>)> +) -> Result> where ArrowError: From, T: NativeType, @@ -113,24 +80,34 @@ where let mut values = MutableBuffer::::with_capacity(capacity); let mut validity = MutableBitmap::with_capacity(capacity); - let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity); - - while let Some(page) = iter.next()? { - nested::extend_from_page( - page, - metadata.descriptor(), - is_nullable, - &mut nested, - &mut values, - &mut validity, - op, - )? + let is_nullable = nested.pop().unwrap().is_nullable(); + + if nested.is_empty() { + while let Some(page) = iter.next()? { + basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity, op)? + } + } else { + while let Some(page) = iter.next()? { + nested::extend_from_page( + page, + metadata.descriptor(), + is_nullable, + nested, + &mut values, + &mut validity, + op, + )? + } } - let values = Arc::new(PrimitiveArray::::from_data( + let data_type = match data_type { + DataType::Dictionary(_, values) => values.as_ref().clone(), + _ => data_type, + }; + + Ok(Box::new(PrimitiveArray::from_data( data_type, values.into(), validity.into(), - )); - Ok((values, nested)) + ))) } diff --git a/src/io/parquet/read/record_batch.rs b/src/io/parquet/read/record_batch.rs index a49195fd8bd..d13da3f898f 100644 --- a/src/io/parquet/read/record_batch.rs +++ b/src/io/parquet/read/record_batch.rs @@ -1,6 +1,5 @@ use std::{ io::{Read, Seek}, - rc::Rc, sync::Arc, }; @@ -11,8 +10,8 @@ use crate::{ }; use super::{ - get_page_iterator, get_schema, page_iter_to_array, read_metadata, Decompressor, FileMetaData, - PageFilter, RowGroupMetaData, + column_iter_to_array, get_column_iterator, get_schema, read_metadata, FileMetaData, PageFilter, + RowGroupMetaData, }; type GroupFilter = Arc bool>; @@ -21,12 +20,12 @@ type GroupFilter = Arc bool>; pub struct RecordReader { reader: R, schema: Arc, - indices: Rc>, + indices: Vec, buffer: Vec, decompress_buffer: Vec, groups_filter: Option, pages_filter: Option, - metadata: Rc, + metadata: FileMetaData, current_group: usize, remaining_rows: usize, } @@ -65,7 +64,10 @@ impl RecordReader { if let Some(projection) = &projection { if indices.len() != projection.len() { - return Err(ArrowError::InvalidArgumentError("While reading parquet, some columns in the projection do not exist in the file".to_string())); + return Err(ArrowError::InvalidArgumentError( + "While reading parquet, some columns in the projection do not exist in the file" + .to_string(), + )); } } @@ -77,10 +79,10 @@ impl RecordReader { Ok(Self { reader, schema, - indices: Rc::new(indices), + indices, groups_filter, pages_filter, - metadata: Rc::new(metadata), + metadata, current_group: 0, buffer: vec![], decompress_buffer: vec![], @@ -95,7 +97,7 @@ impl RecordReader { /// Returns parquet's [`FileMetaData`]. 
pub fn metadata(&self) -> &FileMetaData { - self.metadata.as_ref() + &self.metadata } /// Sets the groups filter @@ -120,7 +122,7 @@ impl Iterator for RecordReader { } let row_group = self.current_group; - let metadata = self.metadata.clone(); + let metadata = &self.metadata; let group = &metadata.row_groups[row_group]; if let Some(groups_filter) = self.groups_filter.as_ref() { if !(groups_filter)(row_group, group) { @@ -128,7 +130,6 @@ impl Iterator for RecordReader { return self.next(); } } - let columns_meta = group.columns(); // todo: avoid these clones. let schema = self.schema().clone(); @@ -138,21 +139,19 @@ impl Iterator for RecordReader { let a = schema.fields().iter().enumerate().try_fold( (b1, b2, Vec::with_capacity(schema.fields().len())), - |(b1, b2, mut columns), (column, field)| { - // column according to the file's indexing - let column = self.indices[column]; - let column_metadata = &columns_meta[column]; - let pages = get_page_iterator( - column_metadata, + |(b1, b2, mut columns), (field_index, field)| { + let field_index = self.indices[field_index]; // project into the original schema + let column_iter = get_column_iterator( &mut self.reader, + &self.metadata, + row_group, + field_index, self.pages_filter.clone(), b1, - )?; + ); - let mut pages = Decompressor::new(pages, b2); - - let array = - page_iter_to_array(&mut pages, column_metadata, field.data_type().clone())?; + let (array, b1, b2) = + column_iter_to_array(column_iter, field.data_type().clone(), b2)?; let array = if array.len() > remaining_rows { array.slice(0, remaining_rows) @@ -161,7 +160,6 @@ impl Iterator for RecordReader { }; columns.push(array.into()); - let (b1, b2) = pages.into_buffers(); Result::Ok((b1, b2, columns)) }, ); diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 083c9d3f114..0c6710b7c63 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -533,6 +533,63 @@ pub fn pyarrow_nested_nullable_statistics(column: usize) -> Option Box { + match column { + 0 => { + let string = [ + Some("Hello"), + None, + Some("aa"), + Some(""), + None, + Some("abc"), + None, + None, + Some("def"), + Some("aaa"), + ]; + let boolean = [ + Some(true), + None, + Some(false), + Some(false), + None, + Some(true), + None, + None, + Some(true), + Some(true), + ]; + let values = vec![ + Arc::new(Utf8Array::::from(string)) as Arc, + Arc::new(BooleanArray::from(boolean)) as Arc, + ]; + let fields = vec![ + Field::new("f1", DataType::Utf8, true), + Field::new("f2", DataType::Boolean, true), + ]; + Box::new(StructArray::from_data( + DataType::Struct(fields), + values, + None, + )) + } + _ => todo!(), + } +} + +pub fn pyarrow_struct_statistics(column: usize) -> Option> { + match column { + 0 => Some(Box::new(Utf8Statistics { + distinct_count: None, + null_count: Some(1), + min_value: Some("".to_string()), + max_value: Some("def".to_string()), + })), + _ => todo!(), + } +} + /// Round-trip with parquet using the same integration files used for IPC integration tests. 
fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result> { let options = WriteOptions { diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 037601a4380..9b53a870ce8 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -35,6 +35,7 @@ fn test_pyarrow_integration( ("basic", true) => pyarrow_required(column), ("basic", false) => pyarrow_nullable(column), ("nested", false) => pyarrow_nested_nullable(column), + ("struct", false) => pyarrow_struct(column), _ => unreachable!(), }; @@ -42,6 +43,7 @@ fn test_pyarrow_integration( ("basic", true) => pyarrow_required_statistics(column), ("basic", false) => pyarrow_nullable_statistics(column), ("nested", false) => pyarrow_nested_nullable_statistics(column), + ("struct", false) => pyarrow_struct_statistics(column), _ => unreachable!(), }; @@ -312,6 +314,11 @@ fn v2_decimal_26_required() -> Result<()> { test_pyarrow_integration(8, 2, "basic", false, true) } +#[test] +fn v1_struct_optional() -> Result<()> { + test_pyarrow_integration(0, 1, "struct", false, false) +} + #[test] fn all_types() -> Result<()> { let path = "testing/parquet-testing/data/alltypes_plain.parquet";
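For completeness, a usage sketch of reading the new struct case end-to-end with the column-chunk API, mirroring `examples/parquet_read.rs` above; the fixture path and the field index are assumptions, not part of this change:

```rust
use std::fs::File;
use std::io::BufReader;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn main() -> Result<()> {
    // assumed location of the file produced by parquet_integration/write_parquet.py
    let mut file = BufReader::new(File::open("fixtures/pyarrow3/v1/struct_nullable_10.parquet")?);

    let metadata = read::read_metadata(&mut file)?;
    let schema = read::get_schema(&metadata)?;

    // the struct case writes a single arrow field named "struct"
    let field = 0;
    let data_type = schema.fields()[field].data_type().clone();

    // iterate over all parquet column chunks (f1 and f2) of that field in row group 0
    let columns = read::get_column_iterator(&mut file, &metadata, 0, field, None, vec![]);

    // decompress, decode and deserialize the chunks into a single StructArray
    let (array, _, _) = read::column_iter_to_array(columns, data_type, vec![])?;
    println!("{}", array);
    Ok(())
}
```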