From 68ee80ba9361cabfed4d305d78c088a4e82e336b Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 20 Mar 2022 17:51:41 +0000 Subject: [PATCH] Migrate to latest parquet2 --- Cargo.toml | 5 +- arrow-parquet-integration-testing/src/main.rs | 3 +- benches/write_parquet.rs | 3 +- examples/parquet_write.rs | 3 +- examples/parquet_write_parallel/src/main.rs | 3 +- src/doc/lib.md | 3 +- src/error.rs | 6 + .../parquet/read/deserialize/binary/basic.rs | 2 +- .../parquet/read/deserialize/binary/nested.rs | 2 +- .../parquet/read/deserialize/boolean/basic.rs | 2 +- .../read/deserialize/boolean/nested.rs | 2 +- src/io/parquet/read/deserialize/dictionary.rs | 2 +- .../deserialize/fixed_size_binary/basic.rs | 2 +- src/io/parquet/read/deserialize/mod.rs | 7 +- .../parquet/read/deserialize/nested_utils.rs | 6 +- .../read/deserialize/primitive/basic.rs | 2 +- .../read/deserialize/primitive/nested.rs | 2 +- src/io/parquet/read/deserialize/simple.rs | 22 +- src/io/parquet/read/deserialize/utils.rs | 2 +- src/io/parquet/read/indexes/binary.rs | 43 ++++ src/io/parquet/read/indexes/boolean.rs | 21 ++ .../parquet/read/indexes/fixed_len_binary.rs | 58 +++++ src/io/parquet/read/indexes/mod.rs | 141 ++++++++++++ src/io/parquet/read/indexes/primitive.rs | 206 ++++++++++++++++++ src/io/parquet/read/mod.rs | 24 +- src/io/parquet/read/row_group.rs | 10 +- src/io/parquet/read/schema/convert.rs | 102 ++++----- src/io/parquet/read/schema/mod.rs | 4 - src/io/parquet/read/statistics/primitive.rs | 27 +-- src/io/parquet/write/binary/basic.rs | 18 +- src/io/parquet/write/binary/mod.rs | 1 + src/io/parquet/write/binary/nested.rs | 14 +- src/io/parquet/write/boolean/basic.rs | 11 +- src/io/parquet/write/boolean/nested.rs | 11 +- src/io/parquet/write/dictionary.rs | 95 +++++--- src/io/parquet/write/file.rs | 8 +- src/io/parquet/write/fixed_len_bytes.rs | 29 ++- src/io/parquet/write/mod.rs | 52 ++--- src/io/parquet/write/primitive/basic.rs | 16 +- src/io/parquet/write/primitive/mod.rs | 1 + src/io/parquet/write/primitive/nested.rs | 11 +- src/io/parquet/write/row_group.rs | 32 ++- src/io/parquet/write/sink.rs | 3 +- src/io/parquet/write/utf8/basic.rs | 18 +- src/io/parquet/write/utf8/mod.rs | 1 + src/io/parquet/write/utf8/nested.rs | 13 +- src/io/parquet/write/utils.rs | 60 ++--- tests/it/io/parquet/mod.rs | 23 +- tests/it/io/parquet/write.rs | 7 +- 49 files changed, 792 insertions(+), 347 deletions(-) create mode 100644 src/io/parquet/read/indexes/binary.rs create mode 100644 src/io/parquet/read/indexes/boolean.rs create mode 100644 src/io/parquet/read/indexes/fixed_len_binary.rs create mode 100644 src/io/parquet/read/indexes/mod.rs create mode 100644 src/io/parquet/read/indexes/primitive.rs diff --git a/Cargo.toml b/Cargo.toml index 97542deea9f..ee47f3cac67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,7 +53,7 @@ hex = { version = "^0.4", optional = true } # for IPC compression lz4 = { version = "1.23.1", optional = true } -zstd = { version = "0.10", optional = true } +zstd = { version = "0.11", optional = true } rand = { version = "0.8", optional = true } @@ -68,7 +68,8 @@ futures = { version = "0.3", optional = true } ahash = { version = "0.7", optional = true } # parquet support -parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] } +#parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] } +parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "write_indexes", optional = true, default_features = false, features = ["stream"] } # avro support avro-schema = { version = "0.2", optional = true } diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs index 787474cce7d..a9f5c649954 100644 --- a/arrow-parquet-integration-testing/src/main.rs +++ b/arrow-parquet-integration-testing/src/main.rs @@ -196,8 +196,7 @@ fn main() -> Result<()> { writer.start()?; for group in row_groups { - let (group, len) = group?; - writer.write(group, len)?; + writer.write(group?)?; } let _ = writer.end(None)?; diff --git a/benches/write_parquet.rs b/benches/write_parquet.rs index 32b264bfe53..42cf8deec49 100644 --- a/benches/write_parquet.rs +++ b/benches/write_parquet.rs @@ -34,8 +34,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> { writer.start()?; for group in row_groups { - let (group, len) = group?; - writer.write(group, len)?; + writer.write(group?)?; } let _ = writer.end(None)?; Ok(()) diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs index df7939563bb..f11e2ec4f29 100644 --- a/examples/parquet_write.rs +++ b/examples/parquet_write.rs @@ -30,8 +30,7 @@ fn write_batch(path: &str, schema: Schema, columns: Chunk>) -> Re writer.start()?; for group in row_groups { - let (group, len) = group?; - writer.write(group, len)?; + writer.write(group?)?; } let _size = writer.end(None)?; Ok(()) diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs index e997f11ce4b..2af167e6279 100644 --- a/examples/parquet_write_parallel/src/main.rs +++ b/examples/parquet_write_parallel/src/main.rs @@ -99,8 +99,7 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()> // Write the file. writer.start()?; for group in row_groups { - let (group, len) = group?; - writer.write(group, len)?; + writer.write(group?)?; } let _size = writer.end(None)?; diff --git a/src/doc/lib.md b/src/doc/lib.md index 9638ff47480..08108b50932 100644 --- a/src/doc/lib.md +++ b/src/doc/lib.md @@ -62,8 +62,7 @@ fn main() -> Result<()> { // Write the file. writer.start()?; for group in row_groups { - let (group, len) = group?; - writer.write(group, len)?; + writer.write(group?)?; } let _ = writer.end(None)?; Ok(()) diff --git a/src/error.rs b/src/error.rs index 22faa164c35..1ee610085a4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -52,6 +52,12 @@ impl From for ArrowError { } } +impl From for ArrowError { + fn from(error: std::string::FromUtf8Error) -> Self { + ArrowError::External("".to_string(), Box::new(error)) + } +} + impl From for ArrowError { fn from(error: simdutf8::basic::Utf8Error) -> Self { ArrowError::External("".to_string(), Box::new(error)) diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs index 10d3ec07c49..c2a38dacd3f 100644 --- a/src/io/parquet/read/deserialize/binary/basic.rs +++ b/src/io/parquet/read/deserialize/binary/basic.rs @@ -180,7 +180,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = - page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index 53a86523614..51ed5ba1b74 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -42,7 +42,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = - page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::Plain, None, true) => { diff --git a/src/io/parquet/read/deserialize/boolean/basic.rs b/src/io/parquet/read/deserialize/boolean/basic.rs index 705b6d0747a..5f920f78d3e 100644 --- a/src/io/parquet/read/deserialize/boolean/basic.rs +++ b/src/io/parquet/read/deserialize/boolean/basic.rs @@ -89,7 +89,7 @@ impl<'a> Decoder<'a> for BooleanDecoder { fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = - page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; match (page.encoding(), is_optional) { (Encoding::Plain, true) => Ok(State::Optional(Optional::new(page))), diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index 276283f9080..2fbbf963529 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -65,7 +65,7 @@ impl<'a> Decoder<'a> for BooleanDecoder { fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = - page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; match (page.encoding(), is_optional) { (Encoding::Plain, true) => { diff --git a/src/io/parquet/read/deserialize/dictionary.rs b/src/io/parquet/read/deserialize/dictionary.rs index 96aa1b8034d..f3c68ad2d9a 100644 --- a/src/io/parquet/read/deserialize/dictionary.rs +++ b/src/io/parquet/read/deserialize/dictionary.rs @@ -131,7 +131,7 @@ where fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = - page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; match (page.encoding(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, false) => { diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs index 260fafa7eb3..866c7b87176 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs @@ -121,7 +121,7 @@ impl<'a> Decoder<'a> for BinaryDecoder { fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = - page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::Plain, None, true) => Ok(State::Optional(Optional::new(page, self.size))), diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index bb688a12f89..0d5706354c0 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -17,6 +17,7 @@ use crate::{ }; use self::nested_utils::{InitNested, NestedArrayIter, NestedState}; +use parquet2::schema::types::PrimitiveType; use simple::page_iter_to_arrays; use super::*; @@ -27,7 +28,7 @@ pub fn get_page_iterator( reader: R, pages_filter: Option, buffer: Vec, -) -> Result> { +) -> Result> { Ok(_get_page_iterator( column_metadata, reader, @@ -76,7 +77,7 @@ fn create_list( fn columns_to_iter_recursive<'a, I: 'a>( mut columns: Vec, - mut types: Vec<&ParquetType>, + mut types: Vec<&PrimitiveType>, field: Field, mut init: Vec, chunk_size: usize, @@ -238,7 +239,7 @@ fn field_to_init(field: &Field) -> Vec { /// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.data_type`. pub fn column_iter_to_arrays<'a, I: 'a>( columns: Vec, - types: Vec<&ParquetType>, + types: Vec<&PrimitiveType>, field: Field, chunk_size: usize, ) -> Result> diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index cb1f977cd1f..8a7dafb30fb 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -283,8 +283,8 @@ impl<'a> NestedPage<'a> { pub fn new(page: &'a DataPage) -> Self { let (rep_levels, def_levels, _) = split_buffer(page); - let max_rep_level = page.descriptor().max_rep_level(); - let max_def_level = page.descriptor().max_def_level(); + let max_rep_level = page.descriptor.max_rep_level; + let max_def_level = page.descriptor.max_def_level; let reps = HybridRleDecoder::new(rep_levels, get_bit_width(max_rep_level), page.num_values()); @@ -451,7 +451,7 @@ impl<'a> Optional<'a> { pub fn new(page: &'a DataPage) -> Self { let (_, def_levels, _) = split_buffer(page); - let max_def = page.descriptor().max_def_level(); + let max_def = page.descriptor.max_def_level; Self { definition_levels: HybridRleDecoder::new( diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs index ed6d30419f9..ff5880918fc 100644 --- a/src/io/parquet/read/deserialize/primitive/basic.rs +++ b/src/io/parquet/read/deserialize/primitive/basic.rs @@ -137,7 +137,7 @@ where fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = - page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index a16217c9b18..40be69abc33 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -81,7 +81,7 @@ where fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = - page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index 9544f16bff1..2063eb5990d 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use parquet2::{ schema::types::{ - LogicalType, ParquetType, PhysicalType, TimeUnit as ParquetTimeUnit, TimestampType, + LogicalType, PhysicalType, PrimitiveType, TimeUnit as ParquetTimeUnit, TimestampType, }, types::int96_to_i64_ns, }; @@ -60,24 +60,14 @@ where /// of [`DataType`] `data_type` and `chunk_size`. pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( pages: I, - type_: &ParquetType, + type_: &PrimitiveType, data_type: DataType, chunk_size: usize, ) -> Result> { use DataType::*; - let (physical_type, logical_type) = if let ParquetType::PrimitiveType { - physical_type, - logical_type, - .. - } = type_ - { - (physical_type, logical_type) - } else { - return Err(ArrowError::InvalidArgumentError( - "page_iter_to_arrays can only be called with a parquet primitive type".into(), - )); - }; + let physical_type = &type_.physical_type; + let logical_type = &type_.logical_type; Ok(match data_type.to_logical_type() { Null => null::iter_to_arrays(pages, data_type, chunk_size), @@ -276,7 +266,7 @@ fn timestamp<'a, I: 'a + DataPages>( Ok(match (unit, time_unit) { (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000)), (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000)), - (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x * 1_000_000_000)), + (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000_000)), (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => dyn_iter(iden(iter)), (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000)), @@ -348,7 +338,7 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>( pages, data_type, chunk_size, - |x: i64| x * 1_000_000_000, + |x: i64| x / 1_000_000_000, )) } diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index f9e9cfe20be..af4de4a4615 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -55,7 +55,7 @@ pub fn not_implemented( #[inline] pub fn split_buffer(page: &DataPage) -> (&[u8], &[u8], &[u8]) { - _split_buffer(page, page.descriptor()) + _split_buffer(page) } /// A private trait representing structs that can receive elements. diff --git a/src/io/parquet/read/indexes/binary.rs b/src/io/parquet/read/indexes/binary.rs new file mode 100644 index 00000000000..f67e94d86c3 --- /dev/null +++ b/src/io/parquet/read/indexes/binary.rs @@ -0,0 +1,43 @@ +use parquet2::indexes::PageIndex; + +use crate::{ + array::{Array, BinaryArray, PrimitiveArray, Utf8Array}, + datatypes::{DataType, PhysicalType}, + error::ArrowError, + trusted_len::TrustedLen, +}; + +use super::ColumnIndex; + +pub fn deserialize( + indexes: &[PageIndex>], + data_type: &DataType, +) -> Result { + Ok(ColumnIndex { + min: deserialize_binary_iter(indexes.iter().map(|index| index.min.as_ref()), data_type)?, + max: deserialize_binary_iter(indexes.iter().map(|index| index.max.as_ref()), data_type)?, + null_count: PrimitiveArray::from_trusted_len_iter( + indexes + .iter() + .map(|index| index.null_count.map(|x| x as u64)), + ), + }) +} + +fn deserialize_binary_iter<'a, I: TrustedLen>>>( + iter: I, + data_type: &DataType, +) -> Result, ArrowError> { + match data_type.to_physical_type() { + PhysicalType::LargeBinary => Ok(Box::new(BinaryArray::::from_iter(iter))), + PhysicalType::Utf8 => { + let iter = iter.map(|x| x.map(|x| std::str::from_utf8(x)).transpose()); + Ok(Box::new(Utf8Array::::try_from_trusted_len_iter(iter)?)) + } + PhysicalType::LargeUtf8 => { + let iter = iter.map(|x| x.map(|x| std::str::from_utf8(x)).transpose()); + Ok(Box::new(Utf8Array::::try_from_trusted_len_iter(iter)?)) + } + _ => Ok(Box::new(BinaryArray::::from_iter(iter))), + } +} diff --git a/src/io/parquet/read/indexes/boolean.rs b/src/io/parquet/read/indexes/boolean.rs new file mode 100644 index 00000000000..501c9e63a64 --- /dev/null +++ b/src/io/parquet/read/indexes/boolean.rs @@ -0,0 +1,21 @@ +use parquet2::indexes::PageIndex; + +use crate::array::{BooleanArray, PrimitiveArray}; + +use super::ColumnIndex; + +pub fn deserialize(indexes: &[PageIndex]) -> ColumnIndex { + ColumnIndex { + min: Box::new(BooleanArray::from_trusted_len_iter( + indexes.iter().map(|index| index.min), + )), + max: Box::new(BooleanArray::from_trusted_len_iter( + indexes.iter().map(|index| index.max), + )), + null_count: PrimitiveArray::from_trusted_len_iter( + indexes + .iter() + .map(|index| index.null_count.map(|x| x as u64)), + ), + } +} diff --git a/src/io/parquet/read/indexes/fixed_len_binary.rs b/src/io/parquet/read/indexes/fixed_len_binary.rs new file mode 100644 index 00000000000..c4499814d12 --- /dev/null +++ b/src/io/parquet/read/indexes/fixed_len_binary.rs @@ -0,0 +1,58 @@ +use parquet2::indexes::PageIndex; + +use crate::{ + array::{Array, FixedSizeBinaryArray, MutableFixedSizeBinaryArray, PrimitiveArray}, + datatypes::{DataType, PhysicalType, PrimitiveType}, + trusted_len::TrustedLen, +}; + +use super::ColumnIndex; + +pub fn deserialize(indexes: &[PageIndex>], data_type: DataType) -> ColumnIndex { + ColumnIndex { + min: deserialize_binary_iter( + indexes.iter().map(|index| index.min.as_ref()), + data_type.clone(), + ), + max: deserialize_binary_iter(indexes.iter().map(|index| index.max.as_ref()), data_type), + null_count: PrimitiveArray::from_trusted_len_iter( + indexes + .iter() + .map(|index| index.null_count.map(|x| x as u64)), + ), + } +} + +fn deserialize_binary_iter<'a, I: TrustedLen>>>( + iter: I, + data_type: DataType, +) -> Box { + match data_type.to_physical_type() { + PhysicalType::Primitive(PrimitiveType::Int128) => { + Box::new(PrimitiveArray::from_trusted_len_iter(iter.map(|v| { + v.map(|x| { + // Copy the fixed-size byte value to the start of a 16 byte stack + // allocated buffer, then use an arithmetic right shift to fill in + // MSBs, which accounts for leading 1's in negative (two's complement) + // values. + let n = x.len(); + let mut bytes = [0u8; 16]; + bytes[..n].copy_from_slice(x); + i128::from_be_bytes(bytes) >> (8 * (16 - n)) + }) + }))) + } + _ => { + let mut a = MutableFixedSizeBinaryArray::from_data( + data_type, + Vec::with_capacity(iter.size_hint().0), + None, + ); + for item in iter { + a.push(item); + } + let a: FixedSizeBinaryArray = a.into(); + Box::new(a) + } + } +} diff --git a/src/io/parquet/read/indexes/mod.rs b/src/io/parquet/read/indexes/mod.rs new file mode 100644 index 00000000000..329fed1a3ff --- /dev/null +++ b/src/io/parquet/read/indexes/mod.rs @@ -0,0 +1,141 @@ +use parquet2::indexes::{ + BooleanIndex, ByteIndex, FixedLenByteIndex, Index as ParquetIndex, NativeIndex, +}; +use parquet2::metadata::ColumnChunkMetaData; +use parquet2::read::read_columns_indexes as _read_columns_indexes; +use parquet2::schema::types::PhysicalType as ParquetPhysicalType; + +mod binary; +mod boolean; +mod fixed_len_binary; +mod primitive; + +use std::io::{Read, Seek}; + +use crate::datatypes::Field; +use crate::{ + array::{Array, UInt64Array}, + datatypes::DataType, + error::ArrowError, +}; + +/// Arrow-deserialized [`ColumnIndex`] containing the minimum and maximum value +/// of every page from the column. +/// # Invariants +/// The minimum and maximum are guaranteed to have the same logical type. +#[derive(Debug, PartialEq)] +pub struct ColumnIndex { + /// The minimum values in the pages + pub min: Box, + /// The maximum values in the pages + pub max: Box, + /// The number of null values in the pages + pub null_count: UInt64Array, +} + +impl ColumnIndex { + /// The [`DataType`] of the column index. + pub fn data_type(&self) -> &DataType { + self.min.data_type() + } +} + +/// Given a sequence of [`ParquetIndex`] representing the page indexes of each column in the +/// parquet file, returns the page-level statistics as arrow's arrays, as a vector of [`ColumnIndex`]. +/// +/// This function maps timestamps, decimal types, etc. accordingly. +/// # Implementation +/// This function is CPU-bounded but `O(P)` where `P` is the total number of pages in all columns. +/// # Error +/// This function errors iff the value is not deserializable to arrow (e.g. invalid utf-8) +fn deserialize( + indexes: &[Box], + data_types: Vec, +) -> Result, ArrowError> { + indexes + .iter() + .zip(data_types.into_iter()) + .map(|(index, data_type)| match index.physical_type() { + ParquetPhysicalType::Boolean => { + let index = index.as_any().downcast_ref::().unwrap(); + Ok(boolean::deserialize(&index.indexes)) + } + ParquetPhysicalType::Int32 => { + let index = index.as_any().downcast_ref::>().unwrap(); + Ok(primitive::deserialize_i32(&index.indexes, data_type)) + } + ParquetPhysicalType::Int64 => { + let index = index.as_any().downcast_ref::>().unwrap(); + Ok(primitive::deserialize_i64( + &index.indexes, + &index.primitive_type, + data_type, + )) + } + ParquetPhysicalType::Int96 => { + let index = index + .as_any() + .downcast_ref::>() + .unwrap(); + Ok(primitive::deserialize_i96(&index.indexes, data_type)) + } + ParquetPhysicalType::Float => { + let index = index.as_any().downcast_ref::>().unwrap(); + Ok(primitive::deserialize_id(&index.indexes, data_type)) + } + ParquetPhysicalType::Double => { + let index = index.as_any().downcast_ref::>().unwrap(); + Ok(primitive::deserialize_id(&index.indexes, data_type)) + } + ParquetPhysicalType::ByteArray => { + let index = index.as_any().downcast_ref::().unwrap(); + binary::deserialize(&index.indexes, &data_type) + } + ParquetPhysicalType::FixedLenByteArray(_) => { + let index = index.as_any().downcast_ref::().unwrap(); + Ok(fixed_len_binary::deserialize(&index.indexes, data_type)) + } + }) + .collect() +} + +// recursive function to get the corresponding leaf data_types corresponding to the +// parquet columns +fn populate_dt(data_type: &DataType, container: &mut Vec) { + match data_type.to_logical_type() { + DataType::List(inner) => populate_dt(&inner.data_type, container), + DataType::LargeList(inner) => populate_dt(&inner.data_type, container), + DataType::Dictionary(_, inner, _) => populate_dt(inner, container), + DataType::Struct(fields) => fields + .iter() + .for_each(|f| populate_dt(&f.data_type, container)), + _ => container.push(data_type.clone()), + } +} + +/// Reads the column indexes from the reader assuming a valid set of derived Arrow fields +/// for all parquet the columns in the file. +/// +/// This function is expected to be used to filter out parquet pages. +/// +/// # Implementation +/// This function is IO-bounded and calls `reader.read_exact` exactly once. +/// # Error +/// Errors iff the indexes can't be read or their deserialization to arrow is incorrect (e.g. invalid utf-8) +pub fn read_columns_indexes( + reader: &mut R, + chunks: &[ColumnChunkMetaData], + fields: &[Field], +) -> Result, ArrowError> { + let indexes = _read_columns_indexes(reader, chunks)?; + + // map arrow fields to the corresponding columns in parquet taking into account + // that fields may be nested but parquet column indexes are only leaf columns + let mut data_types = vec![]; + fields + .iter() + .map(|f| &f.data_type) + .for_each(|d| populate_dt(d, &mut data_types)); + + deserialize(&indexes, data_types) +} diff --git a/src/io/parquet/read/indexes/primitive.rs b/src/io/parquet/read/indexes/primitive.rs new file mode 100644 index 00000000000..59686f9265a --- /dev/null +++ b/src/io/parquet/read/indexes/primitive.rs @@ -0,0 +1,206 @@ +use parquet2::indexes::PageIndex; +use parquet2::schema::types::{ + LogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit, TimestampType, +}; +use parquet2::types::int96_to_i64_ns; + +use crate::array::{Array, MutablePrimitiveArray, PrimitiveArray}; +use crate::datatypes::{DataType, TimeUnit}; +use crate::trusted_len::TrustedLen; +use crate::types::NativeType; + +use super::ColumnIndex; + +#[inline] +fn deserialize_int32>>( + iter: I, + data_type: DataType, +) -> Box { + use DataType::*; + match data_type.to_logical_type() { + UInt8 => Box::new( + PrimitiveArray::::from_trusted_len_iter(iter.map(|x| x.map(|x| x as u8))) + .to(data_type), + ) as _, + UInt16 => Box::new( + PrimitiveArray::::from_trusted_len_iter(iter.map(|x| x.map(|x| x as u16))) + .to(data_type), + ), + UInt32 => Box::new( + PrimitiveArray::::from_trusted_len_iter(iter.map(|x| x.map(|x| x as u32))) + .to(data_type), + ), + Decimal(_, _) => Box::new( + PrimitiveArray::::from_trusted_len_iter(iter.map(|x| x.map(|x| x as i128))) + .to(data_type), + ), + _ => Box::new(PrimitiveArray::::from_trusted_len_iter(iter).to(data_type)), + } +} + +#[inline] +fn timestamp( + array: &mut MutablePrimitiveArray, + time_unit: TimeUnit, + logical_type: &Option, +) { + let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type { + unit + } else { + return; + }; + + match (unit, time_unit) { + (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => array + .values_mut_slice() + .iter_mut() + .for_each(|x| *x /= 1_000), + (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => array + .values_mut_slice() + .iter_mut() + .for_each(|x| *x /= 1_000_000), + (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => array + .values_mut_slice() + .iter_mut() + .for_each(|x| *x /= 1_000_000_000), + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => {} + (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => array + .values_mut_slice() + .iter_mut() + .for_each(|x| *x /= 1_000), + (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => array + .values_mut_slice() + .iter_mut() + .for_each(|x| *x /= 1_000_000), + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => array + .values_mut_slice() + .iter_mut() + .for_each(|x| *x *= 1_000), + (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => {} + (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => array + .values_mut_slice() + .iter_mut() + .for_each(|x| *x /= 1_000), + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => array + .values_mut_slice() + .iter_mut() + .for_each(|x| *x *= 1_000_000), + (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => array + .values_mut_slice() + .iter_mut() + .for_each(|x| *x /= 1_000), + (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => {} + } +} + +#[inline] +fn deserialize_int64>>( + iter: I, + primitive_type: &PrimitiveType, + data_type: DataType, +) -> Box { + use DataType::*; + match data_type.to_logical_type() { + UInt64 => Box::new( + PrimitiveArray::::from_trusted_len_iter(iter.map(|x| x.map(|x| x as u64))) + .to(data_type), + ) as _, + Decimal(_, _) => Box::new( + PrimitiveArray::::from_trusted_len_iter(iter.map(|x| x.map(|x| x as i128))) + .to(data_type), + ) as _, + Timestamp(time_unit, _) => { + let mut array = + MutablePrimitiveArray::::from_trusted_len_iter(iter).to(data_type.clone()); + + timestamp(&mut array, *time_unit, &primitive_type.logical_type); + + let array: PrimitiveArray = array.into(); + + Box::new(array) + } + _ => Box::new(PrimitiveArray::::from_trusted_len_iter(iter).to(data_type)), + } +} + +#[inline] +fn deserialize_int96>>( + iter: I, + data_type: DataType, +) -> Box { + Box::new( + PrimitiveArray::::from_trusted_len_iter(iter.map(|x| x.map(int96_to_i64_ns))) + .to(data_type), + ) +} + +#[inline] +fn deserialize_id_s>>( + iter: I, + data_type: DataType, +) -> Box { + Box::new(PrimitiveArray::::from_trusted_len_iter(iter).to(data_type)) +} + +pub fn deserialize_i32(indexes: &[PageIndex], data_type: DataType) -> ColumnIndex { + ColumnIndex { + min: deserialize_int32(indexes.iter().map(|index| index.min), data_type.clone()), + max: deserialize_int32(indexes.iter().map(|index| index.max), data_type), + null_count: PrimitiveArray::from_trusted_len_iter( + indexes + .iter() + .map(|index| index.null_count.map(|x| x as u64)), + ), + } +} + +pub fn deserialize_i64( + indexes: &[PageIndex], + primitive_type: &PrimitiveType, + data_type: DataType, +) -> ColumnIndex { + ColumnIndex { + min: deserialize_int64( + indexes.iter().map(|index| index.min), + primitive_type, + data_type.clone(), + ), + max: deserialize_int64( + indexes.iter().map(|index| index.max), + primitive_type, + data_type, + ), + null_count: PrimitiveArray::from_trusted_len_iter( + indexes + .iter() + .map(|index| index.null_count.map(|x| x as u64)), + ), + } +} + +pub fn deserialize_i96(indexes: &[PageIndex<[u32; 3]>], data_type: DataType) -> ColumnIndex { + ColumnIndex { + min: deserialize_int96(indexes.iter().map(|index| index.min), data_type.clone()), + max: deserialize_int96(indexes.iter().map(|index| index.max), data_type), + null_count: PrimitiveArray::from_trusted_len_iter( + indexes + .iter() + .map(|index| index.null_count.map(|x| x as u64)), + ), + } +} + +pub fn deserialize_id(indexes: &[PageIndex], data_type: DataType) -> ColumnIndex { + ColumnIndex { + min: deserialize_id_s(indexes.iter().map(|index| index.min), data_type.clone()), + max: deserialize_id_s(indexes.iter().map(|index| index.max), data_type), + null_count: PrimitiveArray::from_trusted_len_iter( + indexes + .iter() + .map(|index| index.null_count.map(|x| x as u64)), + ), + } +} diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 14bcbef3c02..4d72be1a102 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -3,10 +3,16 @@ mod deserialize; mod file; +mod indexes; mod row_group; pub mod schema; pub mod statistics; +use std::{ + io::{Read, Seek}, + sync::Arc, +}; + use futures::{AsyncRead, AsyncSeek}; // re-exports of parquet2's relevant APIs @@ -17,9 +23,10 @@ pub use parquet2::{ page::{CompressedDataPage, DataPage, DataPageHeader}, read::{ decompress, get_column_iterator, get_page_iterator as _get_page_iterator, - get_page_stream as _get_page_stream, read_metadata as _read_metadata, - read_metadata_async as _read_metadata_async, BasicDecompressor, ColumnChunkIter, - Decompressor, MutStreamingIterator, PageFilter, PageIterator, ReadColumnIterator, State, + get_page_stream as _get_page_stream, read_columns_indexes as _read_columns_indexes, + read_metadata as _read_metadata, read_metadata_async as _read_metadata_async, + BasicDecompressor, ColumnChunkIter, Decompressor, MutStreamingIterator, PageFilter, + PageReader, ReadColumnIterator, State, }, schema::types::{ LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, @@ -29,19 +36,14 @@ pub use parquet2::{ FallibleStreamingIterator, }; +use crate::{array::Array, error::Result}; + pub use deserialize::{column_iter_to_arrays, get_page_iterator}; pub use file::{FileReader, RowGroupReader}; +pub use indexes::{read_columns_indexes, ColumnIndex}; pub use row_group::*; -pub(crate) use schema::is_type_nullable; pub use schema::{infer_schema, FileMetaData}; -use std::{ - io::{Read, Seek}, - sync::Arc, -}; - -use crate::{array::Array, error::Result}; - /// Trait describing a [`FallibleStreamingIterator`] of [`DataPage`] pub trait DataPages: FallibleStreamingIterator + Send + Sync diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index f83a65eadbe..f8cc41a82fe 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -9,7 +9,7 @@ use futures::{ }; use parquet2::{ metadata::ColumnChunkMetaData, - read::{BasicDecompressor, PageIterator}, + read::{BasicDecompressor, PageReader}, }; use crate::{ @@ -95,7 +95,7 @@ pub(super) fn get_field_columns<'a>( ) -> Vec<&'a ColumnChunkMetaData> { columns .iter() - .filter(|x| x.descriptor().path_in_schema()[0] == field_name) + .filter(|x| x.descriptor().path_in_schema[0] == field_name) .collect() } @@ -181,17 +181,17 @@ pub fn to_deserializer<'a>( let (columns, types): (Vec<_>, Vec<_>) = columns .into_iter() .map(|(column_meta, chunk)| { - let pages = PageIterator::new( + let pages = PageReader::new( std::io::Cursor::new(chunk), column_meta.num_values(), column_meta.compression(), - column_meta.descriptor().clone(), + column_meta.descriptor().descriptor.clone(), Arc::new(|_, _| true), vec![], ); ( BasicDecompressor::new(pages, vec![]), - column_meta.descriptor().type_(), + &column_meta.descriptor().descriptor.primitive_type, ) }) .unzip(); diff --git a/src/io/parquet/read/schema/convert.rs b/src/io/parquet/read/schema/convert.rs index ae2d66a1b9b..f1506314095 100644 --- a/src/io/parquet/read/schema/convert.rs +++ b/src/io/parquet/read/schema/convert.rs @@ -1,8 +1,8 @@ //! This module has a single entry point, [`parquet_to_arrow_schema`]. use parquet2::schema::{ types::{ - BasicTypeInfo, GroupConvertedType, LogicalType, ParquetType, PhysicalType, - PrimitiveConvertedType, TimeUnit as ParquetTimeUnit, TimestampType, + FieldInfo, GroupConvertedType, LogicalType, ParquetType, PhysicalType, + PrimitiveConvertedType, PrimitiveType, TimeUnit as ParquetTimeUnit, TimestampType, }, Repetition, }; @@ -166,41 +166,40 @@ fn from_fixed_len_byte_array( } /// Maps a [`PhysicalType`] with optional metadata to a [`DataType`] -fn to_primitive_type_inner( - physical_type: &PhysicalType, - logical_type: &Option, - converted_type: &Option, -) -> DataType { - match physical_type { +fn to_primitive_type_inner(primitive_type: &PrimitiveType) -> DataType { + match primitive_type.physical_type { PhysicalType::Boolean => DataType::Boolean, - PhysicalType::Int32 => from_int32(logical_type, converted_type), - PhysicalType::Int64 => from_int64(logical_type, converted_type), + PhysicalType::Int32 => { + from_int32(&primitive_type.logical_type, &primitive_type.converted_type) + } + PhysicalType::Int64 => { + from_int64(&primitive_type.logical_type, &primitive_type.converted_type) + } PhysicalType::Int96 => DataType::Timestamp(TimeUnit::Nanosecond, None), PhysicalType::Float => DataType::Float32, PhysicalType::Double => DataType::Float64, - PhysicalType::ByteArray => from_byte_array(logical_type, converted_type), - PhysicalType::FixedLenByteArray(length) => { - from_fixed_len_byte_array(length, logical_type, converted_type) + PhysicalType::ByteArray => { + from_byte_array(&primitive_type.logical_type, &primitive_type.converted_type) } + PhysicalType::FixedLenByteArray(length) => from_fixed_len_byte_array( + &length, + &primitive_type.logical_type, + &primitive_type.converted_type, + ), } } /// Entry point for converting parquet primitive type to arrow type. /// /// This function takes care of repetition. -fn to_primitive_type( - basic_info: &BasicTypeInfo, - physical_type: &PhysicalType, - logical_type: &Option, - converted_type: &Option, -) -> DataType { - let base_type = to_primitive_type_inner(physical_type, logical_type, converted_type); +fn to_primitive_type(primitive_type: &PrimitiveType) -> DataType { + let base_type = to_primitive_type_inner(primitive_type); - if basic_info.repetition() == &Repetition::Repeated { + if primitive_type.field_info.repetition == Repetition::Repeated { DataType::List(Box::new(Field::new( - basic_info.name(), + &primitive_type.field_info.name, base_type, - is_nullable(basic_info), + is_nullable(&primitive_type.field_info), ))) } else { base_type @@ -236,18 +235,18 @@ fn to_struct(fields: &[ParquetType]) -> Option { /// /// This function takes care of logical type and repetition. fn to_group_type( - basic_info: &BasicTypeInfo, + field_info: &FieldInfo, logical_type: &Option, converted_type: &Option, fields: &[ParquetType], parent_name: &str, ) -> Option { debug_assert!(!fields.is_empty()); - if basic_info.repetition() == &Repetition::Repeated { + if field_info.repetition == Repetition::Repeated { Some(DataType::List(Box::new(Field::new( - basic_info.name(), + &field_info.name, to_struct(fields)?, - is_nullable(basic_info), + is_nullable(field_info), )))) } else { non_repeated_group(logical_type, converted_type, fields, parent_name) @@ -255,8 +254,8 @@ fn to_group_type( } /// Checks whether this schema is nullable. -pub(crate) fn is_nullable(basic_info: &BasicTypeInfo) -> bool { - match basic_info.repetition() { +pub(crate) fn is_nullable(field_info: &FieldInfo) -> bool { + match field_info.repetition { Repetition::Optional => true, Repetition::Repeated => true, Repetition::Required => false, @@ -268,9 +267,9 @@ pub(crate) fn is_nullable(basic_info: &BasicTypeInfo) -> bool { /// i.e. if it is a column-less group type. fn to_field(type_: &ParquetType) -> Option { Some(Field::new( - type_.get_basic_info().name(), + &type_.get_field_info().name, to_data_type(type_)?, - is_nullable(type_.get_basic_info()), + is_nullable(type_.get_field_info()), )) } @@ -282,16 +281,7 @@ fn to_list(fields: &[ParquetType], parent_name: &str) -> Option { let item = fields.first().unwrap(); let item_type = match item { - ParquetType::PrimitiveType { - physical_type, - logical_type, - converted_type, - .. - } => Some(to_primitive_type_inner( - physical_type, - logical_type, - converted_type, - )), + ParquetType::PrimitiveType(primitive) => Some(to_primitive_type_inner(primitive)), ParquetType::GroupType { fields, .. } => { if fields.len() == 1 && item.name() != "array" @@ -312,17 +302,17 @@ fn to_list(fields: &[ParquetType], parent_name: &str) -> Option { // Without this step, the child incorrectly inherits the parent's optionality let (list_item_name, item_is_optional) = match item { ParquetType::GroupType { - basic_info, fields, .. - } if basic_info.name() == "list" && fields.len() == 1 => { + field_info, fields, .. + } if field_info.name == "list" && fields.len() == 1 => { let field = fields.first().unwrap(); ( - field.name(), - field.get_basic_info().repetition() != &Repetition::Required, + &field.get_field_info().name, + field.get_field_info().repetition != Repetition::Required, ) } _ => ( - item.name(), - item.get_basic_info().repetition() != &Repetition::Required, + &item.get_field_info().name, + item.get_field_info().repetition != Repetition::Required, ), }; @@ -344,19 +334,9 @@ fn to_list(fields: &[ParquetType], parent_name: &str) -> Option { /// conversion, the result is Ok(None). pub(crate) fn to_data_type(type_: &ParquetType) -> Option { match type_ { - ParquetType::PrimitiveType { - basic_info, - physical_type, - logical_type, - converted_type, - } => Some(to_primitive_type( - basic_info, - physical_type, - logical_type, - converted_type, - )), + ParquetType::PrimitiveType(primitive) => Some(to_primitive_type(primitive)), ParquetType::GroupType { - basic_info, + field_info, logical_type, converted_type, fields, @@ -365,11 +345,11 @@ pub(crate) fn to_data_type(type_: &ParquetType) -> Option { None } else { to_group_type( - basic_info, + field_info, logical_type, converted_type, fields, - basic_info.name(), + &field_info.name, ) } } diff --git a/src/io/parquet/read/schema/mod.rs b/src/io/parquet/read/schema/mod.rs index 0c7e7d4d665..17147fb03b5 100644 --- a/src/io/parquet/read/schema/mod.rs +++ b/src/io/parquet/read/schema/mod.rs @@ -28,7 +28,3 @@ pub fn infer_schema(file_metadata: &FileMetaData) -> Result { Schema { fields, metadata } })) } - -pub(crate) fn is_type_nullable(type_: &ParquetType) -> bool { - is_nullable(type_.get_basic_info()) -} diff --git a/src/io/parquet/read/statistics/primitive.rs b/src/io/parquet/read/statistics/primitive.rs index 91a630692df..e76eb4089fa 100644 --- a/src/io/parquet/read/statistics/primitive.rs +++ b/src/io/parquet/read/statistics/primitive.rs @@ -1,14 +1,16 @@ -use crate::datatypes::TimeUnit; -use crate::{datatypes::DataType, types::NativeType}; +use std::any::Any; + use parquet2::schema::types::{ - LogicalType, ParquetType, TimeUnit as ParquetTimeUnit, TimestampType, + LogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit, TimestampType, }; use parquet2::statistics::PrimitiveStatistics as ParquetPrimitiveStatistics; use parquet2::types::NativeType as ParquetNativeType; -use std::any::Any; -use super::Statistics; +use crate::datatypes::TimeUnit; use crate::error::Result; +use crate::{datatypes::DataType, types::NativeType}; + +use super::Statistics; /// Arrow-deserialized parquet Statistics of a primitive type #[derive(Debug, Clone, PartialEq)] @@ -74,14 +76,9 @@ pub(super) fn statistics_from_i32( }) } -fn timestamp(type_: &ParquetType, time_unit: TimeUnit, x: i64) -> i64 { - let logical_type = if let ParquetType::PrimitiveType { logical_type, .. } = type_ { - logical_type - } else { - unreachable!() - }; - - let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type { +fn timestamp(type_: &PrimitiveType, time_unit: TimeUnit, x: i64) -> i64 { + let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = &type_.logical_type + { unit } else { return x; @@ -121,10 +118,10 @@ pub(super) fn statistics_from_i64( distinct_count: stats.distinct_count, min_value: stats .min_value - .map(|x| timestamp(stats.descriptor.type_(), time_unit, x)), + .map(|x| timestamp(&stats.primitive_type, time_unit, x)), max_value: stats .max_value - .map(|x| timestamp(stats.descriptor.type_(), time_unit, x)), + .map(|x| timestamp(&stats.primitive_type, time_unit, x)), }), Decimal(_, _) => Box::new(PrimitiveStatistics::::from((stats, data_type))), _ => Box::new(PrimitiveStatistics::::from((stats, data_type))), diff --git a/src/io/parquet/write/binary/basic.rs b/src/io/parquet/write/binary/basic.rs index 7a7c4cd805e..5d67dbf90f8 100644 --- a/src/io/parquet/write/binary/basic.rs +++ b/src/io/parquet/write/binary/basic.rs @@ -1,7 +1,8 @@ use parquet2::{ encoding::{delta_bitpacked, Encoding}, - metadata::ColumnDescriptor, + metadata::Descriptor, page::DataPage, + schema::types::PrimitiveType, statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics}, write::WriteOptions, }; @@ -11,7 +12,7 @@ use crate::{ array::{Array, BinaryArray, Offset}, bitmap::Bitmap, error::{ArrowError, Result}, - io::parquet::read::is_type_nullable, + io::parquet::read::schema::is_nullable, }; pub(crate) fn encode_plain( @@ -42,11 +43,11 @@ pub(crate) fn encode_plain( pub fn array_to_page( array: &BinaryArray, options: WriteOptions, - descriptor: ColumnDescriptor, + descriptor: Descriptor, encoding: Encoding, ) -> Result { let validity = array.validity(); - let is_optional = is_type_nullable(descriptor.type_()); + let is_optional = is_nullable(&descriptor.primitive_type.field_info); let mut buffer = vec![]; utils::write_def_levels( @@ -78,7 +79,7 @@ pub fn array_to_page( } let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.clone())) + Some(build_statistics(array, descriptor.primitive_type.clone())) } else { None }; @@ -86,6 +87,7 @@ pub fn array_to_page( utils::build_plain_page( buffer, array.len(), + array.len(), array.null_count(), 0, definition_levels_byte_length, @@ -96,12 +98,12 @@ pub fn array_to_page( ) } -pub(super) fn build_statistics( +pub(crate) fn build_statistics( array: &BinaryArray, - descriptor: ColumnDescriptor, + primitive_type: PrimitiveType, ) -> ParquetStatistics { let statistics = &BinaryStatistics { - descriptor, + primitive_type, null_count: Some(array.null_count() as i64), distinct_count: None, max_value: array diff --git a/src/io/parquet/write/binary/mod.rs b/src/io/parquet/write/binary/mod.rs index 8d9e94cd0fb..e229572b14a 100644 --- a/src/io/parquet/write/binary/mod.rs +++ b/src/io/parquet/write/binary/mod.rs @@ -2,6 +2,7 @@ mod basic; mod nested; pub use basic::array_to_page; +pub(crate) use basic::build_statistics; pub(crate) use basic::encode_plain; pub(super) use basic::{encode_delta, ord_binary}; pub use nested::array_to_page as nested_array_to_page; diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index 9161741c4cb..941a910ac3a 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -1,26 +1,25 @@ -use parquet2::{ - encoding::Encoding, metadata::ColumnDescriptor, page::DataPage, write::WriteOptions, -}; +use parquet2::metadata::Descriptor; +use parquet2::{encoding::Encoding, page::DataPage, write::WriteOptions}; use super::super::{levels, utils}; use super::basic::{build_statistics, encode_plain}; +use crate::io::parquet::read::schema::is_nullable; use crate::{ array::{Array, BinaryArray, Offset}, error::Result, - io::parquet::read::is_type_nullable, }; pub fn array_to_page( array: &BinaryArray, options: WriteOptions, - descriptor: ColumnDescriptor, + descriptor: Descriptor, nested: levels::NestedInfo, ) -> Result where OO: Offset, O: Offset, { - let is_optional = is_type_nullable(descriptor.type_()); + let is_optional = is_nullable(&descriptor.primitive_type.field_info); let validity = array.validity(); @@ -34,7 +33,7 @@ where encode_plain(array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.clone())) + Some(build_statistics(array, descriptor.primitive_type.clone())) } else { None }; @@ -42,6 +41,7 @@ where utils::build_plain_page( buffer, levels::num_values(nested.offsets()), + nested.offsets().len().saturating_sub(1), array.null_count(), repetition_levels_byte_length, definition_levels_byte_length, diff --git a/src/io/parquet/write/boolean/basic.rs b/src/io/parquet/write/boolean/basic.rs index f9046d6d585..e70a0de769d 100644 --- a/src/io/parquet/write/boolean/basic.rs +++ b/src/io/parquet/write/boolean/basic.rs @@ -1,14 +1,14 @@ use parquet2::{ encoding::{hybrid_rle::bitpacked_encode, Encoding}, - metadata::ColumnDescriptor, + metadata::Descriptor, page::DataPage, statistics::{serialize_statistics, BooleanStatistics, ParquetStatistics, Statistics}, write::WriteOptions, }; use super::super::utils; -use crate::error::Result; -use crate::{array::*, io::parquet::read::is_type_nullable}; +use crate::array::*; +use crate::{error::Result, io::parquet::read::schema::is_nullable}; fn encode(iterator: impl Iterator, buffer: &mut Vec) -> Result<()> { // encode values using bitpacking @@ -41,9 +41,9 @@ pub(super) fn encode_plain( pub fn array_to_page( array: &BooleanArray, options: WriteOptions, - descriptor: ColumnDescriptor, + descriptor: Descriptor, ) -> Result { - let is_optional = is_type_nullable(descriptor.type_()); + let is_optional = is_nullable(&descriptor.primitive_type.field_info); let validity = array.validity(); @@ -69,6 +69,7 @@ pub fn array_to_page( utils::build_plain_page( buffer, array.len(), + array.len(), array.null_count(), 0, definition_levels_byte_length, diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index 427c7a05925..d758bd5097e 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -1,25 +1,23 @@ -use parquet2::{ - encoding::Encoding, metadata::ColumnDescriptor, page::DataPage, write::WriteOptions, -}; +use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage, write::WriteOptions}; use super::super::{levels, utils}; use super::basic::{build_statistics, encode_plain}; +use crate::io::parquet::read::schema::is_nullable; use crate::{ array::{Array, BooleanArray, Offset}, error::Result, - io::parquet::read::is_type_nullable, }; pub fn array_to_page( array: &BooleanArray, options: WriteOptions, - descriptor: ColumnDescriptor, + descriptor: Descriptor, nested: levels::NestedInfo, ) -> Result where O: Offset, { - let is_optional = is_type_nullable(descriptor.type_()); + let is_optional = is_nullable(&descriptor.primitive_type.field_info); let validity = array.validity(); @@ -41,6 +39,7 @@ where utils::build_plain_page( buffer, levels::num_values(nested.offsets()), + nested.offsets().len().saturating_sub(1), array.null_count(), repetition_levels_byte_length, definition_levels_byte_length, diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index 521c863aac7..e41f006ca2a 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -1,29 +1,36 @@ use parquet2::{ encoding::{hybrid_rle::encode_u32, Encoding}, - metadata::ColumnDescriptor, + metadata::Descriptor, page::{EncodedDictPage, EncodedPage}, + statistics::ParquetStatistics, write::{DynIter, WriteOptions}, }; +use super::binary::build_statistics as binary_build_statistics; use super::binary::encode_plain as binary_encode_plain; +use super::fixed_len_bytes::build_statistics as fixed_binary_build_statistics; use super::fixed_len_bytes::encode_plain as fixed_binary_encode_plain; +use super::primitive::build_statistics as primitive_build_statistics; use super::primitive::encode_plain as primitive_encode_plain; +use super::utf8::build_statistics as utf8_build_statistics; use super::utf8::encode_plain as utf8_encode_plain; -use crate::array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray}; use crate::bitmap::Bitmap; use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; -use crate::io::parquet::read::is_type_nullable; use crate::io::parquet::write::utils; +use crate::{ + array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray}, + io::parquet::read::schema::is_nullable, +}; fn encode_keys( array: &PrimitiveArray, - // todo: merge this to not discard values' validity validity: Option<&Bitmap>, - descriptor: ColumnDescriptor, + descriptor: Descriptor, + statistics: ParquetStatistics, options: WriteOptions, ) -> Result { - let is_optional = is_type_nullable(descriptor.type_()); + let is_optional = is_nullable(&descriptor.primitive_type.field_info); let mut buffer = vec![]; @@ -94,10 +101,11 @@ fn encode_keys( utils::build_plain_page( buffer, array.len(), + array.len(), array.null_count(), 0, definition_levels_byte_length, - None, + Some(statistics), descriptor, options, Encoding::RleDictionary, @@ -106,74 +114,84 @@ fn encode_keys( } macro_rules! dyn_prim { - ($from:ty, $to:ty, $array:expr, $options:expr) => {{ + ($from:ty, $to:ty, $array:expr, $options:expr, $descriptor:expr) => {{ let values = $array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; primitive_encode_plain::<$from, $to>(values, false, &mut buffer); - EncodedDictPage::new(buffer, values.len()) + ( + EncodedDictPage::new(buffer, values.len()), + primitive_build_statistics::<$from, $to>(values, $descriptor.primitive_type.clone()), + ) }}; } pub fn array_to_pages( array: &DictionaryArray, - descriptor: ColumnDescriptor, + descriptor: Descriptor, options: WriteOptions, encoding: Encoding, ) -> Result>> { + println!("{descriptor:#?}"); match encoding { Encoding::PlainDictionary | Encoding::RleDictionary => { // write DictPage - let dict_page = match array.values().data_type().to_logical_type() { - DataType::Int8 => dyn_prim!(i8, i32, array, options), - DataType::Int16 => dyn_prim!(i16, i32, array, options), + let (dict_page, statistics) = match array.values().data_type().to_logical_type() { + DataType::Int8 => dyn_prim!(i8, i32, array, options, descriptor), + DataType::Int16 => dyn_prim!(i16, i32, array, options, descriptor), DataType::Int32 | DataType::Date32 | DataType::Time32(_) => { - dyn_prim!(i32, i32, array, options) + dyn_prim!(i32, i32, array, options, descriptor) } DataType::Int64 | DataType::Date64 | DataType::Time64(_) | DataType::Timestamp(_, _) - | DataType::Duration(_) => dyn_prim!(i64, i64, array, options), - DataType::UInt8 => dyn_prim!(u8, i32, array, options), - DataType::UInt16 => dyn_prim!(u16, i32, array, options), - DataType::UInt32 => dyn_prim!(u32, i32, array, options), - DataType::UInt64 => dyn_prim!(i64, i64, array, options), - DataType::Float32 => dyn_prim!(f32, f32, array, options), - DataType::Float64 => dyn_prim!(f64, f64, array, options), + | DataType::Duration(_) => dyn_prim!(i64, i64, array, options, descriptor), + DataType::UInt8 => dyn_prim!(u8, i32, array, options, descriptor), + DataType::UInt16 => dyn_prim!(u16, i32, array, options, descriptor), + DataType::UInt32 => dyn_prim!(u32, i32, array, options, descriptor), + DataType::UInt64 => dyn_prim!(i64, i64, array, options, descriptor), + DataType::Float32 => dyn_prim!(f32, f32, array, options, descriptor), + DataType::Float64 => dyn_prim!(f64, f64, array, options, descriptor), DataType::Utf8 => { - let values = array.values().as_any().downcast_ref().unwrap(); + let array = array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; - utf8_encode_plain::(values, false, &mut buffer); - EncodedDictPage::new(buffer, values.len()) + utf8_encode_plain::(array, false, &mut buffer); + let stats = utf8_build_statistics(array, descriptor.primitive_type.clone()); + (EncodedDictPage::new(buffer, array.len()), stats) } DataType::LargeUtf8 => { - let values = array.values().as_any().downcast_ref().unwrap(); + let array = array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; - utf8_encode_plain::(values, false, &mut buffer); - EncodedDictPage::new(buffer, values.len()) + utf8_encode_plain::(array, false, &mut buffer); + let stats = utf8_build_statistics(array, descriptor.primitive_type.clone()); + (EncodedDictPage::new(buffer, array.len()), stats) } DataType::Binary => { - let values = array.values().as_any().downcast_ref().unwrap(); + let array = array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; - binary_encode_plain::(values, false, &mut buffer); - EncodedDictPage::new(buffer, values.len()) + binary_encode_plain::(array, false, &mut buffer); + let stats = binary_build_statistics(array, descriptor.primitive_type.clone()); + (EncodedDictPage::new(buffer, array.len()), stats) } DataType::LargeBinary => { - let values = array.values().as_any().downcast_ref().unwrap(); + let array = array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; - binary_encode_plain::(values, false, &mut buffer); - EncodedDictPage::new(buffer, values.len()) + binary_encode_plain::(array, false, &mut buffer); + let stats = binary_build_statistics(array, descriptor.primitive_type.clone()); + (EncodedDictPage::new(buffer, array.len()), stats) } DataType::FixedSizeBinary(_) => { let mut buffer = vec![]; let array = array.values().as_any().downcast_ref().unwrap(); fixed_binary_encode_plain(array, false, &mut buffer); - EncodedDictPage::new(buffer, array.len()) + let stats = + fixed_binary_build_statistics(array, descriptor.primitive_type.clone()); + (EncodedDictPage::new(buffer, array.len()), stats) } other => { return Err(ArrowError::NotYetImplemented(format!( @@ -185,8 +203,13 @@ pub fn array_to_pages( let dict_page = EncodedPage::Dict(dict_page); // write DataPage pointing to DictPage - let data_page = - encode_keys(array.keys(), array.values().validity(), descriptor, options)?; + let data_page = encode_keys( + array.keys(), + array.values().validity(), + descriptor, + statistics, + options, + )?; let iter = std::iter::once(Ok(dict_page)).chain(std::iter::once(Ok(data_page))); Ok(DynIter::new(Box::new(iter))) diff --git a/src/io/parquet/write/file.rs b/src/io/parquet/write/file.rs index 47f595a1717..2354fead5e7 100644 --- a/src/io/parquet/write/file.rs +++ b/src/io/parquet/write/file.rs @@ -67,12 +67,8 @@ impl FileWriter { } /// Writes a row group to the file. - pub fn write( - &mut self, - row_group: RowGroupIter<'_, ArrowError>, - num_rows: usize, - ) -> Result<()> { - Ok(self.writer.write(row_group, num_rows)?) + pub fn write(&mut self, row_group: RowGroupIter<'_, ArrowError>) -> Result<()> { + Ok(self.writer.write(row_group)?) } /// Writes the footer of the parquet file. Returns the total size of the file. diff --git a/src/io/parquet/write/fixed_len_bytes.rs b/src/io/parquet/write/fixed_len_bytes.rs index e129ab66c46..32bce74d1c8 100644 --- a/src/io/parquet/write/fixed_len_bytes.rs +++ b/src/io/parquet/write/fixed_len_bytes.rs @@ -1,8 +1,9 @@ use parquet2::{ encoding::Encoding, - metadata::ColumnDescriptor, + metadata::Descriptor, page::DataPage, - statistics::{deserialize_statistics, serialize_statistics, ParquetStatistics}, + schema::types::PrimitiveType, + statistics::{serialize_statistics, FixedLenStatistics, ParquetStatistics, Statistics}, write::WriteOptions, }; @@ -10,7 +11,7 @@ use super::{binary::ord_binary, utils}; use crate::{ array::{Array, FixedSizeBinaryArray}, error::Result, - io::parquet::read::is_type_nullable, + io::parquet::read::schema::is_nullable, }; pub(crate) fn encode_plain(array: &FixedSizeBinaryArray, is_optional: bool, buffer: &mut Vec) { @@ -29,9 +30,9 @@ pub(crate) fn encode_plain(array: &FixedSizeBinaryArray, is_optional: bool, buff pub fn array_to_page( array: &FixedSizeBinaryArray, options: WriteOptions, - descriptor: ColumnDescriptor, + descriptor: Descriptor, ) -> Result { - let is_optional = is_type_nullable(descriptor.type_()); + let is_optional = is_nullable(&descriptor.primitive_type.field_info); let validity = array.validity(); let mut buffer = vec![]; @@ -48,7 +49,7 @@ pub fn array_to_page( encode_plain(array, is_optional, &mut buffer); let statistics = if options.write_statistics { - build_statistics(array, descriptor.clone()) + Some(build_statistics(array, descriptor.primitive_type.clone())) } else { None }; @@ -56,6 +57,7 @@ pub fn array_to_page( utils::build_plain_page( buffer, array.len(), + array.len(), array.null_count(), 0, definition_levels_byte_length, @@ -68,11 +70,10 @@ pub fn array_to_page( pub(super) fn build_statistics( array: &FixedSizeBinaryArray, - descriptor: ColumnDescriptor, -) -> Option { - let pq_statistics = &ParquetStatistics { - max: None, - min: None, + primitive_type: PrimitiveType, +) -> ParquetStatistics { + let statistics = &FixedLenStatistics { + primitive_type, null_count: Some(array.null_count() as i64), distinct_count: None, max_value: array @@ -85,8 +86,6 @@ pub(super) fn build_statistics( .flatten() .min_by(|x, y| ord_binary(x, y)) .map(|x| x.to_vec()), - }; - deserialize_statistics(pq_statistics, descriptor) - .map(|e| serialize_statistics(&*e)) - .ok() + } as &dyn Statistics; + serialize_statistics(statistics) } diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 7c31f27fc52..e46dfeb9e89 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -16,7 +16,7 @@ use crate::array::*; use crate::bitmap::Bitmap; use crate::datatypes::*; use crate::error::{ArrowError, Result}; -use crate::io::parquet::read::is_type_nullable; +use crate::io::parquet::read::schema::is_nullable; use crate::io::parquet::write::levels::NestedInfo; use crate::types::days_ms; use crate::types::NativeType; @@ -26,7 +26,7 @@ pub use parquet2::{ compression::Compression, encoding::Encoding, fallible_streaming_iterator, - metadata::{ColumnDescriptor, KeyValue, SchemaDescriptor}, + metadata::{Descriptor, KeyValue, SchemaDescriptor}, page::{CompressedDataPage, CompressedPage, EncodedPage}, schema::types::ParquetType, write::{ @@ -53,11 +53,13 @@ pub(self) fn decimal_length_from_precision(precision: usize) -> usize { /// Creates a parquet [`SchemaDescriptor`] from a [`Schema`]. pub fn to_parquet_schema(schema: &Schema) -> Result { + println!("{:#?}", schema); let parquet_types = schema .fields .iter() .map(to_parquet_type) .collect::>>()?; + println!("{:#?}", parquet_types); Ok(SchemaDescriptor::new("root".to_string(), parquet_types)) } @@ -80,7 +82,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { /// Returns an iterator of [`EncodedPage`]. pub fn array_to_pages( array: &dyn Array, - descriptor: ColumnDescriptor, + descriptor: Descriptor, options: WriteOptions, encoding: Encoding, ) -> Result>> { @@ -103,7 +105,7 @@ pub fn array_to_pages( /// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`. pub fn array_to_page( array: &dyn Array, - descriptor: ColumnDescriptor, + descriptor: Descriptor, options: WriteOptions, encoding: Encoding, ) -> Result { @@ -316,11 +318,11 @@ fn list_array_to_page( offsets: &[O], validity: Option<&Bitmap>, values: &dyn Array, - descriptor: ColumnDescriptor, + descriptor: Descriptor, options: WriteOptions, ) -> Result { use DataType::*; - let is_optional = is_type_nullable(descriptor.type_()); + let is_optional = is_nullable(&descriptor.primitive_type.field_info); let nested = NestedInfo::new(offsets, validity, is_optional); match values.data_type() { @@ -347,47 +349,19 @@ fn list_array_to_page( Utf8 => { let values = values.as_any().downcast_ref().unwrap(); - let is_optional = is_type_nullable(descriptor.type_()); - - utf8::nested_array_to_page::( - values, - options, - descriptor, - NestedInfo::new(offsets, validity, is_optional), - ) + utf8::nested_array_to_page::(values, options, descriptor, nested) } LargeUtf8 => { let values = values.as_any().downcast_ref().unwrap(); - let is_optional = is_type_nullable(descriptor.type_()); - - utf8::nested_array_to_page::( - values, - options, - descriptor, - NestedInfo::new(offsets, validity, is_optional), - ) + utf8::nested_array_to_page::(values, options, descriptor, nested) } Binary => { let values = values.as_any().downcast_ref().unwrap(); - let is_optional = is_type_nullable(descriptor.type_()); - - binary::nested_array_to_page::( - values, - options, - descriptor, - NestedInfo::new(offsets, validity, is_optional), - ) + binary::nested_array_to_page::(values, options, descriptor, nested) } LargeBinary => { let values = values.as_any().downcast_ref().unwrap(); - let is_optional = is_type_nullable(descriptor.type_()); - - binary::nested_array_to_page::( - values, - options, - descriptor, - NestedInfo::new(offsets, validity, is_optional), - ) + binary::nested_array_to_page::(values, options, descriptor, nested) } _ => todo!(), } @@ -395,7 +369,7 @@ fn list_array_to_page( fn nested_array_to_page( array: &dyn Array, - descriptor: ColumnDescriptor, + descriptor: Descriptor, options: WriteOptions, ) -> Result { match array.data_type() { diff --git a/src/io/parquet/write/primitive/basic.rs b/src/io/parquet/write/primitive/basic.rs index 9b9deb16d0b..2b7ea6e08b9 100644 --- a/src/io/parquet/write/primitive/basic.rs +++ b/src/io/parquet/write/primitive/basic.rs @@ -1,7 +1,8 @@ use parquet2::{ encoding::Encoding, - metadata::ColumnDescriptor, + metadata::Descriptor, page::DataPage, + schema::types::PrimitiveType, statistics::{serialize_statistics, ParquetStatistics, PrimitiveStatistics, Statistics}, types::NativeType, write::WriteOptions, @@ -11,7 +12,7 @@ use super::super::utils; use crate::{ array::{Array, PrimitiveArray}, error::Result, - io::parquet::read::is_type_nullable, + io::parquet::read::schema::is_nullable, types::NativeType as ArrowNativeType, }; @@ -41,14 +42,14 @@ where pub fn array_to_page( array: &PrimitiveArray, options: WriteOptions, - descriptor: ColumnDescriptor, + descriptor: Descriptor, ) -> Result where T: ArrowNativeType, R: NativeType, T: num_traits::AsPrimitive, { - let is_optional = is_type_nullable(descriptor.type_()); + let is_optional = is_nullable(&descriptor.primitive_type.field_info); let validity = array.validity(); @@ -66,7 +67,7 @@ where encode_plain(array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.clone())) + Some(build_statistics(array, descriptor.primitive_type.clone())) } else { None }; @@ -74,6 +75,7 @@ where utils::build_plain_page( buffer, array.len(), + array.len(), array.null_count(), 0, definition_levels_byte_length, @@ -86,7 +88,7 @@ where pub fn build_statistics( array: &PrimitiveArray, - descriptor: ColumnDescriptor, + primitive_type: PrimitiveType, ) -> ParquetStatistics where T: ArrowNativeType, @@ -94,7 +96,7 @@ where T: num_traits::AsPrimitive, { let statistics = &PrimitiveStatistics:: { - descriptor, + primitive_type, null_count: Some(array.null_count() as i64), distinct_count: None, max_value: array diff --git a/src/io/parquet/write/primitive/mod.rs b/src/io/parquet/write/primitive/mod.rs index ddeb6541605..eec1d695d1d 100644 --- a/src/io/parquet/write/primitive/mod.rs +++ b/src/io/parquet/write/primitive/mod.rs @@ -2,5 +2,6 @@ mod basic; mod nested; pub use basic::array_to_page; +pub(crate) use basic::build_statistics; pub(crate) use basic::encode_plain; pub use nested::array_to_page as nested_array_to_page; diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 5be103d08b9..ff8d3f1a658 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -1,22 +1,22 @@ use parquet2::{ - encoding::Encoding, metadata::ColumnDescriptor, page::DataPage, types::NativeType, + encoding::Encoding, metadata::Descriptor, page::DataPage, types::NativeType, write::WriteOptions, }; use super::super::levels; use super::super::utils; use super::basic::{build_statistics, encode_plain}; +use crate::io::parquet::read::schema::is_nullable; use crate::{ array::{Array, Offset, PrimitiveArray}, error::Result, - io::parquet::read::is_type_nullable, types::NativeType as ArrowNativeType, }; pub fn array_to_page( array: &PrimitiveArray, options: WriteOptions, - descriptor: ColumnDescriptor, + descriptor: Descriptor, nested: levels::NestedInfo, ) -> Result where @@ -25,7 +25,7 @@ where T: num_traits::AsPrimitive, O: Offset, { - let is_optional = is_type_nullable(descriptor.type_()); + let is_optional = is_nullable(&descriptor.primitive_type.field_info); let validity = array.validity(); @@ -39,7 +39,7 @@ where encode_plain(array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.clone())) + Some(build_statistics(array, descriptor.primitive_type.clone())) } else { None }; @@ -47,6 +47,7 @@ where utils::build_plain_page( buffer, levels::num_values(nested.offsets()), + nested.offsets().len().saturating_sub(1), array.null_count(), repetition_levels_byte_length, definition_levels_byte_length, diff --git a/src/io/parquet/write/row_group.rs b/src/io/parquet/write/row_group.rs index f6076808ac5..5c419640395 100644 --- a/src/io/parquet/write/row_group.rs +++ b/src/io/parquet/write/row_group.rs @@ -28,13 +28,15 @@ pub fn row_group_iter + 'static + Send + Sync>( .zip(columns.into_iter()) .zip(encodings.into_iter()) .map(move |((array, descriptor), encoding)| { - array_to_pages(array.as_ref(), descriptor, options, encoding).map(move |pages| { - let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); - let compressed_pages = - Compressor::new(encoded_pages, options.compression, vec![]) - .map_err(ArrowError::from); - DynStreamingIterator::new(compressed_pages) - }) + array_to_pages(array.as_ref(), descriptor.descriptor, options, encoding).map( + move |pages| { + let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); + let compressed_pages = + Compressor::new(encoded_pages, options.compression, vec![]) + .map_err(ArrowError::from); + DynStreamingIterator::new(compressed_pages) + }, + ) }), ) } @@ -78,23 +80,19 @@ impl + 'static, I: Iterator>>> RowGro impl + 'static + Send + Sync, I: Iterator>>> Iterator for RowGroupIterator { - type Item = Result<(RowGroupIter<'static, ArrowError>, usize)>; + type Item = Result>; fn next(&mut self) -> Option { let options = self.options; self.iter.next().map(|maybe_chunk| { let chunk = maybe_chunk?; - let len = chunk.len(); let encodings = self.encodings.clone(); - Ok(( - row_group_iter( - chunk, - encodings, - self.parquet_schema.columns().to_vec(), - options, - ), - len, + Ok(row_group_iter( + chunk, + encodings, + self.parquet_schema.columns().to_vec(), + options, )) }) } diff --git a/src/io/parquet/write/sink.rs b/src/io/parquet/write/sink.rs index 8906be9431b..47b994840d1 100644 --- a/src/io/parquet/write/sink.rs +++ b/src/io/parquet/write/sink.rs @@ -150,7 +150,6 @@ where fn start_send(self: Pin<&mut Self>, item: Chunk>) -> Result<(), Self::Error> { let this = self.get_mut(); if let Some(mut writer) = this.writer.take() { - let count = item.len(); let rows = crate::io::parquet::write::row_group_iter( item, this.encoding.clone(), @@ -158,7 +157,7 @@ where this.options, ); this.task = Some(Box::pin(async move { - writer.write(rows, count).await?; + writer.write(rows).await?; Ok(Some(writer)) })); Ok(()) diff --git a/src/io/parquet/write/utf8/basic.rs b/src/io/parquet/write/utf8/basic.rs index f1e8fd3d24c..cf45e4e9cd6 100644 --- a/src/io/parquet/write/utf8/basic.rs +++ b/src/io/parquet/write/utf8/basic.rs @@ -1,7 +1,8 @@ use parquet2::{ encoding::Encoding, - metadata::ColumnDescriptor, + metadata::Descriptor, page::DataPage, + schema::types::PrimitiveType, statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics}, write::WriteOptions, }; @@ -11,7 +12,7 @@ use super::super::utils; use crate::{ array::{Array, Offset, Utf8Array}, error::{ArrowError, Result}, - io::parquet::read::is_type_nullable, + io::parquet::read::schema::is_nullable, }; pub(crate) fn encode_plain( @@ -41,11 +42,11 @@ pub(crate) fn encode_plain( pub fn array_to_page( array: &Utf8Array, options: WriteOptions, - descriptor: ColumnDescriptor, + descriptor: Descriptor, encoding: Encoding, ) -> Result { let validity = array.validity(); - let is_optional = is_type_nullable(descriptor.type_()); + let is_optional = is_nullable(&descriptor.primitive_type.field_info); let mut buffer = vec![]; utils::write_def_levels( @@ -77,7 +78,7 @@ pub fn array_to_page( } let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.clone())) + Some(build_statistics(array, descriptor.primitive_type.clone())) } else { None }; @@ -85,6 +86,7 @@ pub fn array_to_page( utils::build_plain_page( buffer, array.len(), + array.len(), array.null_count(), 0, definition_levels_byte_length, @@ -95,12 +97,12 @@ pub fn array_to_page( ) } -pub(super) fn build_statistics( +pub(crate) fn build_statistics( array: &Utf8Array, - descriptor: ColumnDescriptor, + primitive_type: PrimitiveType, ) -> ParquetStatistics { let statistics = &BinaryStatistics { - descriptor, + primitive_type, null_count: Some(array.null_count() as i64), distinct_count: None, max_value: array diff --git a/src/io/parquet/write/utf8/mod.rs b/src/io/parquet/write/utf8/mod.rs index ddeb6541605..eec1d695d1d 100644 --- a/src/io/parquet/write/utf8/mod.rs +++ b/src/io/parquet/write/utf8/mod.rs @@ -2,5 +2,6 @@ mod basic; mod nested; pub use basic::array_to_page; +pub(crate) use basic::build_statistics; pub(crate) use basic::encode_plain; pub use nested::array_to_page as nested_array_to_page; diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index cb87fabf31f..69f395f17d9 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -1,26 +1,24 @@ -use parquet2::{ - encoding::Encoding, metadata::ColumnDescriptor, page::DataPage, write::WriteOptions, -}; +use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage, write::WriteOptions}; use super::super::{levels, utils}; use super::basic::{build_statistics, encode_plain}; +use crate::io::parquet::read::schema::is_nullable; use crate::{ array::{Array, Offset, Utf8Array}, error::Result, - io::parquet::read::is_type_nullable, }; pub fn array_to_page( array: &Utf8Array, options: WriteOptions, - descriptor: ColumnDescriptor, + descriptor: Descriptor, nested: levels::NestedInfo, ) -> Result where OO: Offset, O: Offset, { - let is_optional = is_type_nullable(descriptor.type_()); + let is_optional = is_nullable(&descriptor.primitive_type.field_info); let validity = array.validity(); @@ -34,7 +32,7 @@ where encode_plain(array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.clone())) + Some(build_statistics(array, descriptor.primitive_type.clone())) } else { None }; @@ -42,6 +40,7 @@ where utils::build_plain_page( buffer, levels::num_values(nested.offsets()), + nested.offsets().len().saturating_sub(1), array.null_count(), repetition_levels_byte_length, definition_levels_byte_length, diff --git a/src/io/parquet/write/utils.rs b/src/io/parquet/write/utils.rs index 6857bbc533f..aaebbcbebae 100644 --- a/src/io/parquet/write/utils.rs +++ b/src/io/parquet/write/utils.rs @@ -3,7 +3,7 @@ use crate::bitmap::Bitmap; use parquet2::{ compression::Compression, encoding::{hybrid_rle::encode_bool, Encoding}, - metadata::ColumnDescriptor, + metadata::Descriptor, page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2}, statistics::ParquetStatistics, write::WriteOptions, @@ -60,42 +60,42 @@ pub fn write_def_levels( #[allow(clippy::too_many_arguments)] pub fn build_plain_page( buffer: Vec, - len: usize, + num_values: usize, + num_rows: usize, null_count: usize, repetition_levels_byte_length: usize, definition_levels_byte_length: usize, statistics: Option, - descriptor: ColumnDescriptor, + descriptor: Descriptor, options: WriteOptions, encoding: Encoding, ) -> Result { - match options.version { - Version::V1 => { - let header = DataPageHeader::V1(DataPageHeaderV1 { - num_values: len as i32, - encoding: encoding.into(), - definition_level_encoding: Encoding::Rle.into(), - repetition_level_encoding: Encoding::Rle.into(), - statistics, - }); - - Ok(DataPage::new(header, buffer, None, descriptor)) - } - Version::V2 => { - let header = DataPageHeader::V2(DataPageHeaderV2 { - num_values: len as i32, - encoding: encoding.into(), - num_nulls: null_count as i32, - num_rows: len as i32, - definition_levels_byte_length: definition_levels_byte_length as i32, - repetition_levels_byte_length: repetition_levels_byte_length as i32, - is_compressed: Some(options.compression != Compression::Uncompressed), - statistics, - }); - - Ok(DataPage::new(header, buffer, None, descriptor)) - } - } + let header = match options.version { + Version::V1 => DataPageHeader::V1(DataPageHeaderV1 { + num_values: num_values as i32, + encoding: encoding.into(), + definition_level_encoding: Encoding::Rle.into(), + repetition_level_encoding: Encoding::Rle.into(), + statistics, + }), + Version::V2 => DataPageHeader::V2(DataPageHeaderV2 { + num_values: num_values as i32, + encoding: encoding.into(), + num_nulls: null_count as i32, + num_rows: num_rows as i32, + definition_levels_byte_length: definition_levels_byte_length as i32, + repetition_levels_byte_length: repetition_levels_byte_length as i32, + is_compressed: Some(options.compression != Compression::Uncompressed), + statistics, + }), + }; + Ok(DataPage::new( + header, + buffer, + None, + descriptor, + Some((0, num_rows)), + )) } /// Auxiliary iterator adapter to declare the size hint of an iterator. diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 5e1b59c7488..8c6c2fc1284 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -22,12 +22,18 @@ pub fn read_column( let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; + // verify that we can read indexes + let _indexes = read_columns_indexes( + &mut reader, + metadata.row_groups[0].columns(), + &schema.fields, + )?; + let column = schema .fields .iter() .enumerate() - .filter_map(|(i, f)| if f.name == column { Some(i) } else { None }) - .next() + .find_map(|(i, f)| if f.name == column { Some(i) } else { None }) .unwrap(); let mut reader = FileReader::try_new(reader, Some(&[column]), None, None, None)?; @@ -329,7 +335,7 @@ pub fn pyarrow_nullable(column: &str) -> Box { .collect::>(); Box::new(PrimitiveArray::::from(values)) } - "string_large" => { + "int32_dict" => { let keys = PrimitiveArray::::from([Some(0), Some(1), None, Some(1)]); let values = Arc::new(PrimitiveArray::::from_slice([10, 200])); Box::new(DictionaryArray::::from_data(keys, values)) @@ -413,7 +419,13 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Option> min_value: Some(0), max_value: Some(9), }), - "string_large" => return None, + "int32_dict" => Box::new(PrimitiveStatistics { + data_type: DataType::Dictionary(IntegerType::Int32, Box::new(DataType::Int32), false), + null_count: Some(0), + distinct_count: None, + min_value: Some(10), + max_value: Some(200), + }), "decimal_9" => Box::new(PrimitiveStatistics:: { distinct_count: None, null_count: Some(3), @@ -716,8 +728,7 @@ fn integration_write(schema: &Schema, batches: &[Chunk>]) -> Resu writer.start()?; for group in row_groups { - let (group, len) = group?; - writer.write(group, len)?; + writer.write(group?)?; } let (_size, writer) = writer.end(None)?; diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index c9141f4d515..a068a8335a1 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -49,8 +49,7 @@ fn round_trip( writer.start()?; for group in row_groups { - let (group, len) = group?; - writer.write(group, len)?; + writer.write(group?)?; } let (_size, writer) = writer.end(None)?; @@ -354,7 +353,7 @@ fn utf8_optional_v2_delta() -> Result<()> { #[test] fn i32_optional_v2_dict() -> Result<()> { round_trip( - "string_large", + "int32_dict", true, false, Version::V2, @@ -366,7 +365,7 @@ fn i32_optional_v2_dict() -> Result<()> { #[test] fn i32_optional_v2_dict_compressed() -> Result<()> { round_trip( - "string_large", + "int32_dict", true, false, Version::V2,