diff --git a/Cargo.toml b/Cargo.toml
index 1fc9dbfe777..f87d6dc9757 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -69,7 +69,7 @@ ahash = { version = "0.7", optional = true }
 
 # parquet support
 #parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] }
-parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "write_indexes", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "40caedeebda6641443b7fc99f0adeb38e6bec518", optional = true, default_features = false, features = ["stream"] }
 
 # avro support
 avro-schema = { version = "0.2", optional = true }
diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs
index 1db294567c0..5cbf433255e 100644
--- a/src/io/parquet/read/deserialize/binary/basic.rs
+++ b/src/io/parquet/read/deserialize/binary/basic.rs
@@ -61,15 +61,19 @@ fn read_delta_optional<O: Offset>(
 #[derive(Debug)]
 pub(super) struct Required<'a> {
-    pub values: BinaryIter<'a>,
+    pub values: std::iter::Take<std::iter::Skip<BinaryIter<'a>>>,
+    // tracked here because `BinaryIter` has no size_hint.
     pub remaining: usize,
 }
 
 impl<'a> Required<'a> {
     pub fn new(page: &'a DataPage) -> Self {
+        let values = BinaryIter::new(page.buffer());
+        let (offset, length) = page.rows.unwrap_or((0, page.num_values()));
+
         Self {
-            values: BinaryIter::new(page.buffer()),
-            remaining: page.num_values(),
+            values: values.skip(offset).take(length),
+            remaining: length,
         }
     }
 }
 
@@ -220,7 +224,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
                 page_values,
             ),
             State::Required(page) => {
-                page.remaining -= additional;
+                // saturate: a page sliced by the index may hold fewer than `additional` values
+                page.remaining = page.remaining.saturating_sub(additional);
                 for x in page.values.by_ref().take(additional) {
                     values.push(x)
                 }
diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs
index ff5880918fc..0728d2f8952 100644
--- a/src/io/parquet/read/deserialize/primitive/basic.rs
+++ b/src/io/parquet/read/deserialize/primitive/basic.rs
@@ -17,6 +17,29 @@
 use super::super::utils;
 use super::super::utils::OptionalPageValidity;
 use super::super::DataPages;
 
+#[derive(Debug)]
+pub(super) struct RequiredValues<'a> {
+    pub values: std::iter::Take<std::iter::Skip<std::slice::ChunksExact<'a, u8>>>,
+}
+
+impl<'a> RequiredValues<'a> {
+    pub fn new<P: ParquetNativeType>(page: &'a DataPage) -> Self {
+        let (_, _, values) = utils::split_buffer(page);
+        assert_eq!(values.len() % std::mem::size_of::<P>(), 0);
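+        // plain encoding stores native values back-to-back, `size_of::<P>()` bytes each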
+        let values = values.chunks_exact(std::mem::size_of::<P>());
+
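+        // `page.rows` holds the (offset, length) row interval selected through the
+        // page index, if any; with no selection, the whole page is decoded.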
+        let (offset, length) = page.rows.unwrap_or((0, page.num_values()));
+        Self {
+            values: values.skip(offset).take(length),
+        }
+    }
+
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.values.size_hint().0
+    }
+}
+
 #[derive(Debug)]
 pub(super) struct Values<'a> {
     pub values: std::slice::ChunksExact<'a, u8>,
@@ -73,7 +96,7 @@ where
     P: ParquetNativeType,
 {
     Optional(OptionalPageValidity<'a>, Values<'a>),
-    Required(Values<'a>),
+    Required(RequiredValues<'a>),
     RequiredDictionary(ValuesDictionary<'a, P>),
     OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a, P>),
 }
@@ -158,7 +181,7 @@ where
             Ok(State::Optional(validity, values))
         }
-        (Encoding::Plain, _, false) => Ok(State::Required(Values::new::<P>(page))),
+        (Encoding::Plain, _, false) => Ok(State::Required(RequiredValues::new::<P>(page))),
         _ => Err(utils::not_implemented(
             &page.encoding(),
             is_optional,
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index 4d72be1a102..62de86dd348 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -25,8 +25,8 @@ pub use parquet2::{
         decompress, get_column_iterator, get_page_iterator as _get_page_iterator,
         get_page_stream as _get_page_stream, read_columns_indexes as _read_columns_indexes,
         read_metadata as _read_metadata, read_metadata_async as _read_metadata_async,
-        BasicDecompressor, ColumnChunkIter, Decompressor, MutStreamingIterator, PageFilter,
-        PageReader, ReadColumnIterator, State,
+        read_pages_locations, BasicDecompressor, ColumnChunkIter, Decompressor,
+        MutStreamingIterator, PageFilter, PageReader, ReadColumnIterator, State,
     },
     schema::types::{
         LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs
index f8cc41a82fe..b53e60f271f 100644
--- a/src/io/parquet/read/row_group.rs
+++ b/src/io/parquet/read/row_group.rs
@@ -183,9 +183,7 @@ pub fn to_deserializer<'a>(
         .map(|(column_meta, chunk)| {
             let pages = PageReader::new(
                 std::io::Cursor::new(chunk),
-                column_meta.num_values(),
-                column_meta.compression(),
-                column_meta.descriptor().descriptor.clone(),
+                column_meta,
                 Arc::new(|_, _| true),
                 vec![],
             );
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index 8c6c2fc1284..2c1a4da9e79 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -1,10 +1,13 @@
 use std::io::{Cursor, Read, Seek};
 use std::sync::Arc;
 
+use arrow2::error::ArrowError;
 use arrow2::{
     array::*, bitmap::Bitmap, buffer::Buffer, chunk::Chunk, datatypes::*, error::Result,
     io::parquet::read::statistics::*, io::parquet::read::*, io::parquet::write::*,
 };
+use parquet2::indexes::{compute_rows, select_pages};
+use parquet2::read::IndexedPageReader;
 
 use crate::io::ipc::read_gzip_json;
 
@@ -830,3 +833,123 @@ fn arrow_type() -> Result<()> {
     assert_eq!(new_batches, vec![batch]);
     Ok(())
 }
+
+/// Returns two sets of pages holding the same total number of rows, distributed
+/// un-evenly across pages.
+fn pages() -> Result<(Vec<EncodedPage>, Vec<EncodedPage>, Schema)> {
+    // create pages with different numbers of rows
+    let array11 = PrimitiveArray::<i64>::from_slice([1, 2, 3, 4, 5]);
+    let array12 = PrimitiveArray::<i64>::from_slice([6]);
+    let array21 = Utf8Array::<i32>::from_slice(["a", "b", "c"]);
+    let array22 = Utf8Array::<i32>::from_slice(["d", "e", "f"]);
+
+    let schema = Schema::from(vec![
+        Field::new("a1", DataType::Int64, false),
+        Field::new("a2", DataType::Utf8, false),
+    ]);
+
+    let parquet_schema = to_parquet_schema(&schema)?;
+
+    let options = WriteOptions {
+        write_statistics: true,
+        compression: Compression::Uncompressed,
+        version: Version::V1,
+    };
+
+    let pages1 = vec![
+        array_to_page(
+            &array11,
+            parquet_schema.columns()[0].descriptor.clone(),
+            options,
+            Encoding::Plain,
+        )?,
+        array_to_page(
+            &array12,
+            parquet_schema.columns()[0].descriptor.clone(),
+            options,
+            Encoding::Plain,
+        )?,
+    ];
+    let pages2 = vec![
+        array_to_page(
+            &array21,
+            parquet_schema.columns()[1].descriptor.clone(),
+            options,
+            Encoding::Plain,
+        )?,
+        array_to_page(
+            &array22,
+            parquet_schema.columns()[1].descriptor.clone(),
+            options,
+            Encoding::Plain,
+        )?,
+    ];
+
+    Ok((pages1, pages2, schema))
+}
+
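+// For the data written by `pages()`, the first column is laid out as pages of 5 and
+// 1 rows. Keeping only its second page (the `&[false, true]` mask below) yields the
+// row interval [5, 6), which intersects only the second page of the second column
+// (rows [3, 6)), so a single page is fetched and only row 5 ("f") is decoded from it.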
+/// Tests that a column chunk can be read selectively via the page index: row
+/// intervals computed from one column's index select which pages of another
+/// column are read and deserialized.
+#[test]
+fn read_with_indexes() -> Result<()> {
+    let (pages1, pages2, schema) = pages()?;
+
+    let options = WriteOptions {
+        write_statistics: true,
+        compression: Compression::Uncompressed,
+        version: Version::V1,
+    };
+
+    let to_compressed = |pages: Vec<EncodedPage>| {
+        let encoded_pages = DynIter::new(pages.into_iter().map(Ok));
+        let compressed_pages =
+            Compressor::new(encoded_pages, options.compression, vec![]).map_err(ArrowError::from);
+        Result::Ok(DynStreamingIterator::new(compressed_pages))
+    };
+
+    let row_group = DynIter::new(vec![to_compressed(pages1), to_compressed(pages2)].into_iter());
+
+    let writer = vec![];
+    let mut writer = FileWriter::try_new(writer, schema, options)?;
+
+    writer.start()?;
+    writer.write(row_group)?;
+    let (_, data) = writer.end(None)?;
+
+    let mut reader = Cursor::new(data);
+
+    let metadata = read_metadata(&mut reader)?;
+
+    let schema = infer_schema(&metadata)?;
+
+    let row_group = &metadata.row_groups[0];
+
+    let pages = read_pages_locations(&mut reader, row_group.columns())?;
+
+    // say we concluded from the indexes that we only need the "6" from the first
+    // column, i.e. only its second page:
+    let _indexes = read_columns_indexes(&mut reader, row_group.columns(), &schema.fields)?;
+    let intervals = compute_rows(&[false, true], &pages[0], row_group.num_rows() as u64)?;
+
+    // based on the intervals from c1, we compute which pages from the second column are required:
+    let pages = select_pages(&intervals, &pages[1], row_group.num_rows() as u64)?;
+
+    // and read only those pages:
+    let c1 = &metadata.row_groups[0].columns()[1];
+
+    let pages = IndexedPageReader::new(reader, c1, pages, vec![], vec![]);
+    let pages = BasicDecompressor::new(pages, vec![]);
+
+    let arrays = column_iter_to_arrays(
+        vec![pages],
+        vec![&c1.descriptor().descriptor.primitive_type],
+        schema.fields[1].clone(),
+        row_group.num_rows() as usize,
+    )?;
+
+    let arrays = arrays.collect::<Result<Vec<_>>>()?;
+
+    // only the selected interval survives: "f" is row 5 of the second column
+    assert_eq!(
+        arrays,
+        vec![Arc::new(Utf8Array::<i32>::from_slice(["f"])) as Arc<dyn Array>]
+    );
+    Ok(())
+}