From 29554cc8288f24c6f0a99ba5973b51a910dddb87 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 24 Jul 2021 19:55:18 +0000 Subject: [PATCH] Added support for dictionary-encoding. --- .github/workflows/integration-parquet.yml | 2 +- Cargo.toml | 2 +- arrow-parquet-integration-testing/main.py | 3 +- .../main_spark.py | 49 +++-- arrow-parquet-integration-testing/src/main.rs | 1 + src/io/parquet/mod.rs | 26 ++- src/io/parquet/read/binary/basic.rs | 51 +++-- src/io/parquet/read/binary/nested.rs | 11 +- src/io/parquet/read/boolean/basic.rs | 11 +- src/io/parquet/read/boolean/nested.rs | 11 +- src/io/parquet/read/fixed_size_binary.rs | 11 +- src/io/parquet/read/mod.rs | 24 ++- src/io/parquet/read/primitive/basic.rs | 49 +++-- src/io/parquet/read/primitive/dictionary.rs | 201 ++++++++++++++++++ src/io/parquet/read/primitive/mod.rs | 17 +- src/io/parquet/read/primitive/nested.rs | 12 +- src/io/parquet/read/statistics/mod.rs | 6 + src/io/parquet/write/binary/basic.rs | 6 +- src/io/parquet/write/binary/mod.rs | 1 + src/io/parquet/write/binary/nested.rs | 4 +- src/io/parquet/write/boolean/basic.rs | 4 +- src/io/parquet/write/boolean/nested.rs | 4 +- src/io/parquet/write/dictionary.rs | 196 +++++++++++++++++ src/io/parquet/write/fixed_len_bytes.rs | 6 +- src/io/parquet/write/mod.rs | 86 ++++++-- src/io/parquet/write/primitive/basic.rs | 6 +- src/io/parquet/write/primitive/mod.rs | 1 + src/io/parquet/write/primitive/nested.rs | 4 +- src/io/parquet/write/record_batch.rs | 9 +- src/io/parquet/write/utf8/basic.rs | 6 +- src/io/parquet/write/utf8/mod.rs | 1 + src/io/parquet/write/utf8/nested.rs | 4 +- src/io/parquet/write/utils.rs | 20 +- 33 files changed, 683 insertions(+), 162 deletions(-) create mode 100644 src/io/parquet/read/primitive/dictionary.rs create mode 100644 src/io/parquet/write/dictionary.rs diff --git a/.github/workflows/integration-parquet.yml b/.github/workflows/integration-parquet.yml index 23d2a27eafd..0bfa715ba32 100644 --- a/.github/workflows/integration-parquet.yml +++ b/.github/workflows/integration-parquet.yml @@ -48,5 +48,5 @@ jobs: pip install --upgrade pip pip install pyarrow pyspark python main.py - # test delta encoding against spark (pyarrow does not support it) + # test against spark python main_spark.py diff --git a/Cargo.toml b/Cargo.toml index 55eeab15e41..f8a3e2b5d03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ futures = { version = "0.3", optional = true } # for faster hashing ahash = { version = "0.7", optional = true } -parquet2 = { version = "0.1", optional = true } +parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "ad20b74b60f495f5dde4717985020b022fe99fb2", optional = true } [dev-dependencies] rand = "0.8" diff --git a/arrow-parquet-integration-testing/main.py b/arrow-parquet-integration-testing/main.py index 41e7cdefde9..62a477d398a 100644 --- a/arrow-parquet-integration-testing/main.py +++ b/arrow-parquet-integration-testing/main.py @@ -65,7 +65,8 @@ def variations(): "generated_datetime", "generated_decimal", "generated_interval", - # requires writing Dictionary + # see https://issues.apache.org/jira/browse/ARROW-13486 and + # https://issues.apache.org/jira/browse/ARROW-13487 # "generated_dictionary", # requires writing Struct # "generated_duplicate_fieldnames", diff --git a/arrow-parquet-integration-testing/main_spark.py b/arrow-parquet-integration-testing/main_spark.py index 3268d0613d4..a51dcbaf65c 100644 --- a/arrow-parquet-integration-testing/main_spark.py +++ b/arrow-parquet-integration-testing/main_spark.py @@ 
-6,27 +6,40 @@ from main import _prepare, _expected -_file = "generated_primitive" -_version = "2" -_encoding = "delta" -column = ("utf8_nullable", 24) -expected = _expected(_file) -expected = next(c for i, c in enumerate(expected) if i == column[1]) -expected = expected.combine_chunks().tolist() +def test(file: str, version: str, column, encoding: str): + """ + Tests that pyspark can read a parquet file written by arrow2. -path = _prepare(_file, _version, _encoding, [column[1]]) + In arrow2: read IPC, write parquet + In pyarrow: read (same) IPC to Python + In pyspark: read (written) parquet to Python + assert that they are equal + """ + # write parquet + path = _prepare(file, version, encoding, [column[1]]) -spark = pyspark.sql.SparkSession.builder.config( - # see https://stackoverflow.com/a/62024670/931303 - "spark.sql.parquet.enableVectorizedReader", - "false", -).getOrCreate() + # read IPC to Python + expected = _expected(file) + expected = next(c for i, c in enumerate(expected) if i == column[1]) + expected = expected.combine_chunks().tolist() -df = spark.read.parquet(path) + # read parquet to Python + spark = pyspark.sql.SparkSession.builder.config( + # see https://stackoverflow.com/a/62024670/931303 + "spark.sql.parquet.enableVectorizedReader", + "false", + ).getOrCreate() -r = df.select(column[0]).collect() -os.remove(path) + result = spark.read.parquet(path).select(column[0]).collect() + result = [r[column[0]] for r in result] + os.remove(path) -result = [r[column[0]] for r in r] -assert expected == result + # assert equality + assert expected == result + + +test("generated_primitive", "2", ("utf8_nullable", 24), "delta") + +test("generated_dictionary", "1", ("dict0", 0), "") +test("generated_dictionary", "2", ("dict0", 0), "") diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs index 7304eb0dd47..f41bca23015 100644 --- a/arrow-parquet-integration-testing/src/main.rs +++ b/arrow-parquet-integration-testing/src/main.rs @@ -162,6 +162,7 @@ fn main() -> Result<()> { .fields() .iter() .map(|x| match x.data_type() { + DataType::Dictionary(_, _) => Encoding::RleDictionary, DataType::Utf8 | DataType::LargeUtf8 => { if utf8_encoding == "delta" { Encoding::DeltaLengthByteArray diff --git a/src/io/parquet/mod.rs b/src/io/parquet/mod.rs index 1150fec4db3..b8d08cb672a 100644 --- a/src/io/parquet/mod.rs +++ b/src/io/parquet/mod.rs @@ -250,14 +250,19 @@ mod tests { .iter() .map(|x| x.map(|x| x as u32)) .collect::>(); - Box::new(PrimitiveArray::::from(values).to(DataType::UInt32)) + Box::new(PrimitiveArray::::from(values)) + } + 6 => { + let keys = PrimitiveArray::::from([Some(0), Some(1), None, Some(1)]); + let values = Arc::new(PrimitiveArray::::from_slice([10, 200])); + Box::new(DictionaryArray::::from_data(keys, values)) } _ => unreachable!(), } } - pub fn pyarrow_nullable_statistics(column: usize) -> Box { - match column { + pub fn pyarrow_nullable_statistics(column: usize) -> Option> { + Some(match column { 0 => Box::new(PrimitiveStatistics:: { data_type: DataType::Int64, distinct_count: None, @@ -300,8 +305,9 @@ mod tests { min_value: Some(0), max_value: Some(9), }), + 6 => return None, _ => unreachable!(), - } + }) } // these values match the values in `integration` @@ -331,8 +337,8 @@ mod tests { } } - pub fn pyarrow_required_statistics(column: usize) -> Box { - match column { + pub fn pyarrow_required_statistics(column: usize) -> Option> { + Some(match column { 0 => Box::new(PrimitiveStatistics:: { data_type: DataType::Int64, 
null_count: Some(0), @@ -353,11 +359,11 @@ mod tests { max_value: Some("def".to_string()), }), _ => unreachable!(), - } + }) } - pub fn pyarrow_nested_nullable_statistics(column: usize) -> Box { - match column { + pub fn pyarrow_nested_nullable_statistics(column: usize) -> Option> { + Some(match column { 3 => Box::new(PrimitiveStatistics:: { data_type: DataType::Int16, distinct_count: None, @@ -390,7 +396,7 @@ mod tests { min_value: Some(0), max_value: Some(9), }), - } + }) } } diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index 535741798a7..469df2125a4 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -1,7 +1,8 @@ use parquet2::{ encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, - read::{levels, BinaryPageDict, Page, PageHeader, StreamingIterator}, + page::{BinaryPageDict, DataPage, DataPageHeader}, + read::{levels, StreamingIterator}, }; use crate::{ @@ -202,7 +203,7 @@ pub(super) fn read_plain_required( } fn extend_from_page( - page: &Page, + page: &DataPage, descriptor: &ColumnDescriptor, offsets: &mut MutableBuffer, values: &mut MutableBuffer, @@ -212,22 +213,24 @@ fn extend_from_page( assert!(descriptor.max_def_level() <= 1); let is_optional = descriptor.max_def_level() == 1; match page.header() { - PageHeader::V1(header) => { + DataPageHeader::V1(header) => { assert_eq!(header.definition_level_encoding, Encoding::Rle); let (_, validity_buffer, values_buffer) = levels::split_buffer_v1(page.buffer(), false, is_optional); match (&page.encoding(), page.dictionary_page(), is_optional) { - (Encoding::PlainDictionary, Some(dict), true) => read_dict_buffer::( - validity_buffer, - values_buffer, - page.num_values() as u32, - dict.as_any().downcast_ref().unwrap(), - offsets, - values, - validity, - ), + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { + read_dict_buffer::( + validity_buffer, + values_buffer, + page.num_values() as u32, + dict.as_any().downcast_ref().unwrap(), + offsets, + values, + validity, + ) + } (Encoding::DeltaLengthByteArray, None, true) => read_delta_optional::( validity_buffer, values_buffer, @@ -258,22 +261,24 @@ fn extend_from_page( } } } - PageHeader::V2(header) => { + DataPageHeader::V2(header) => { let def_level_buffer_length = header.definition_levels_byte_length as usize; let (_, validity_buffer, values_buffer) = levels::split_buffer_v2(page.buffer(), 0, def_level_buffer_length); match (page.encoding(), page.dictionary_page(), is_optional) { - (Encoding::PlainDictionary, Some(dict), true) => read_dict_buffer::( - validity_buffer, - values_buffer, - page.num_values() as u32, - dict.as_any().downcast_ref().unwrap(), - offsets, - values, - validity, - ), + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { + read_dict_buffer::( + validity_buffer, + values_buffer, + page.num_values() as u32, + dict.as_any().downcast_ref().unwrap(), + offsets, + values, + validity, + ) + } (Encoding::Plain, None, true) => read_plain_optional::( validity_buffer, values_buffer, @@ -317,7 +322,7 @@ where ArrowError: From, O: Offset, E: Clone, - I: StreamingIterator>, + I: StreamingIterator>, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(0); diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index d3835a521d8..4582f532ddf 100644 --- a/src/io/parquet/read/binary/nested.rs +++ 
b/src/io/parquet/read/binary/nested.rs @@ -3,9 +3,10 @@ use std::sync::Arc; use parquet2::{ encoding::Encoding, metadata::{ColumnChunkMetaData, ColumnDescriptor}, + page::{DataPage, DataPageHeader}, read::{ levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder}, - Page, PageHeader, StreamingIterator, + StreamingIterator, }, }; @@ -108,7 +109,7 @@ fn read( } fn extend_from_page( - page: &Page, + page: &DataPage, descriptor: &ColumnDescriptor, is_nullable: bool, nested: &mut Vec>, @@ -119,7 +120,7 @@ fn extend_from_page( let additional = page.num_values(); match page.header() { - PageHeader::V1(header) => { + DataPageHeader::V1(header) => { assert_eq!(header.definition_level_encoding, Encoding::Rle); assert_eq!(header.repetition_level_encoding, Encoding::Rle); @@ -161,7 +162,7 @@ fn extend_from_page( } } } - PageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) { + DataPageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) { (Encoding::Plain, None) => { let def_level_buffer_length = header.definition_levels_byte_length as usize; let rep_level_buffer_length = header.repetition_levels_byte_length as usize; @@ -207,7 +208,7 @@ where O: Offset, ArrowError: From, E: Clone, - I: StreamingIterator>, + I: StreamingIterator>, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(0); diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index 29f959e49e3..055d9061657 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -8,7 +8,8 @@ use super::super::utils; use parquet2::{ encoding::{hybrid_rle, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, - read::{levels, Page, PageHeader, StreamingIterator}, + page::{DataPage, DataPageHeader}, + read::{levels, StreamingIterator}, }; pub(super) fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) { @@ -68,7 +69,7 @@ pub fn iter_to_array(mut iter: I, metadata: &ColumnChunkMetaData) -> Resul where ArrowError: From, E: Clone, - I: StreamingIterator>, + I: StreamingIterator>, { let capacity = metadata.num_values() as usize; let mut values = MutableBitmap::with_capacity(capacity); @@ -86,7 +87,7 @@ where } fn extend_from_page( - page: &Page, + page: &DataPage, descriptor: &ColumnDescriptor, values: &mut MutableBitmap, validity: &mut MutableBitmap, @@ -95,7 +96,7 @@ fn extend_from_page( assert!(descriptor.max_def_level() <= 1); let is_optional = descriptor.max_def_level() == 1; match page.header() { - PageHeader::V1(header) => { + DataPageHeader::V1(header) => { assert_eq!(header.definition_level_encoding, Encoding::Rle); match (&page.encoding(), page.dictionary_page(), is_optional) { @@ -124,7 +125,7 @@ fn extend_from_page( } } } - PageHeader::V2(header) => { + DataPageHeader::V2(header) => { let def_level_buffer_length = header.definition_levels_byte_length as usize; match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::Plain, None, true) => { diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index 154d08852f3..43bfd5c501c 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -3,9 +3,10 @@ use std::sync::Arc; use parquet2::{ encoding::Encoding, metadata::{ColumnChunkMetaData, ColumnDescriptor}, + page::{DataPage, DataPageHeader}, read::{ levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder}, - Page, PageHeader, StreamingIterator, + 
StreamingIterator, }, }; @@ -95,7 +96,7 @@ fn read( } fn extend_from_page( - page: &Page, + page: &DataPage, descriptor: &ColumnDescriptor, is_nullable: bool, nested: &mut Vec>, @@ -105,7 +106,7 @@ fn extend_from_page( let additional = page.num_values(); match page.header() { - PageHeader::V1(header) => { + DataPageHeader::V1(header) => { assert_eq!(header.definition_level_encoding, Encoding::Rle); assert_eq!(header.repetition_level_encoding, Encoding::Rle); @@ -146,7 +147,7 @@ fn extend_from_page( } } } - PageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) { + DataPageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) { (Encoding::Plain, None) => { let def_level_buffer_length = header.definition_levels_byte_length as usize; let rep_level_buffer_length = header.repetition_levels_byte_length as usize; @@ -190,7 +191,7 @@ pub fn iter_to_array( where ArrowError: From, E: Clone, - I: StreamingIterator>, + I: StreamingIterator>, { let capacity = metadata.num_values() as usize; let mut values = MutableBitmap::with_capacity(capacity); diff --git a/src/io/parquet/read/fixed_size_binary.rs b/src/io/parquet/read/fixed_size_binary.rs index 9408aaa7d2a..cb87661d2e7 100644 --- a/src/io/parquet/read/fixed_size_binary.rs +++ b/src/io/parquet/read/fixed_size_binary.rs @@ -1,6 +1,7 @@ use parquet2::{ encoding::{bitpacking, hybrid_rle, uleb128, Encoding}, - read::{levels, FixedLenByteArrayPageDict, Page, PageHeader, StreamingIterator}, + page::{DataPage, DataPageHeader, FixedLenByteArrayPageDict}, + read::{levels, StreamingIterator}, }; use super::{ColumnChunkMetaData, ColumnDescriptor}; @@ -136,7 +137,7 @@ pub fn iter_to_array( where ArrowError: From, E: Clone, - I: StreamingIterator>, + I: StreamingIterator>, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(capacity * size as usize); @@ -159,7 +160,7 @@ where } pub(crate) fn extend_from_page( - page: &Page, + page: &DataPage, size: i32, descriptor: &ColumnDescriptor, values: &mut MutableBuffer, @@ -169,7 +170,7 @@ pub(crate) fn extend_from_page( assert!(descriptor.max_def_level() <= 1); let is_optional = descriptor.max_def_level() == 1; match page.header() { - PageHeader::V1(header) => { + DataPageHeader::V1(header) => { assert_eq!(header.definition_level_encoding, Encoding::Rle); let (_, validity_buffer, values_buffer) = @@ -207,7 +208,7 @@ pub(crate) fn extend_from_page( } } } - PageHeader::V2(header) => { + DataPageHeader::V2(header) => { let def_level_buffer_length = header.definition_levels_byte_length as usize; match (page.encoding(), page.dictionary_page(), is_optional) { diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index b8bd83dead4..1a57fd1d30b 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -22,10 +22,10 @@ pub use schema::{get_schema, is_type_nullable, FileMetaData}; pub use parquet2::{ error::ParquetError, metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData}, + page::{CompressedDataPage, DataPage, DataPageHeader}, read::{ decompress, get_page_iterator as _get_page_iterator, read_metadata as _read_metadata, - streaming_iterator, CompressedPage, Decompressor, Page, PageHeader, PageIterator, - StreamingIterator, + streaming_iterator, Decompressor, PageIterator, StreamingIterator, }, schema::{ types::{LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType}, @@ -52,7 +52,9 @@ pub fn read_metadata(reader: &mut R) -> Result { Ok(_read_metadata(reader)?) 
} -pub fn page_iter_to_array>>( +pub fn page_iter_to_array< + I: StreamingIterator>, +>( iter: &mut I, metadata: &ColumnChunkMetaData, data_type: DataType, @@ -140,6 +142,20 @@ pub fn page_iter_to_array match key.as_ref() { + Int32 => match values.as_ref() { + Int32 => primitive::iter_to_dict_array::( + iter, + metadata, + data_type, + |x: i32| x as i32, + ), + _ => todo!(), + }, + _ => todo!(), + }, + other => Err(ArrowError::NotYetImplemented(format!( "The conversion of {:?} to arrow still not implemented", other @@ -194,7 +210,7 @@ mod tests { }; assert_eq!(expected.as_ref(), array.as_ref()); - assert_eq!(expected_statistics.as_ref(), statistics.unwrap().as_ref()); + assert_eq!(expected_statistics, statistics); Ok(()) } diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index 9cfaa7ced25..102c958a3ee 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -1,6 +1,7 @@ use parquet2::{ encoding::{bitpacking, hybrid_rle, uleb128, Encoding}, - read::{levels, Page, PageHeader, PrimitivePageDict}, + page::{DataPage, DataPageHeader, PrimitivePageDict}, + read::levels, types::NativeType, }; @@ -142,7 +143,7 @@ fn read_required( } pub fn extend_from_page( - page: &Page, + page: &DataPage, descriptor: &ColumnDescriptor, values: &mut MutableBuffer, validity: &mut MutableBitmap, @@ -158,22 +159,24 @@ where assert_eq!(descriptor.max_rep_level(), 0); let is_optional = descriptor.max_def_level() == 1; match page.header() { - PageHeader::V1(header) => { + DataPageHeader::V1(header) => { assert_eq!(header.definition_level_encoding, Encoding::Rle); let (_, validity_buffer, values_buffer) = levels::split_buffer_v1(page.buffer(), false, is_optional); match (&page.encoding(), page.dictionary_page(), is_optional) { - (Encoding::PlainDictionary, Some(dict), true) => read_dict_buffer_optional( - validity_buffer, - values_buffer, - additional, - dict.as_any().downcast_ref().unwrap(), - values, - validity, - op, - ), + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { + read_dict_buffer_optional( + validity_buffer, + values_buffer, + additional, + dict.as_any().downcast_ref().unwrap(), + values, + validity, + op, + ) + } (Encoding::Plain, None, true) => read_nullable( validity_buffer, values_buffer, @@ -196,21 +199,23 @@ where } } } - PageHeader::V2(header) => { + DataPageHeader::V2(header) => { let def_level_buffer_length = header.definition_levels_byte_length as usize; let (_, validity_buffer, values_buffer) = levels::split_buffer_v2(page.buffer(), 0, def_level_buffer_length); match (&page.encoding(), page.dictionary_page(), is_optional) { - (Encoding::PlainDictionary, Some(dict), true) => read_dict_buffer_optional( - validity_buffer, - values_buffer, - additional, - dict.as_any().downcast_ref().unwrap(), - values, - validity, - op, - ), + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { + read_dict_buffer_optional( + validity_buffer, + values_buffer, + additional, + dict.as_any().downcast_ref().unwrap(), + values, + validity, + op, + ) + } (Encoding::Plain, None, true) => read_nullable( validity_buffer, values_buffer, diff --git a/src/io/parquet/read/primitive/dictionary.rs b/src/io/parquet/read/primitive/dictionary.rs new file mode 100644 index 00000000000..8c864cfb133 --- /dev/null +++ b/src/io/parquet/read/primitive/dictionary.rs @@ -0,0 +1,201 @@ +use std::sync::Arc; + +use parquet2::{ + encoding::{bitpacking, hybrid_rle, uleb128}, + page::{DataPage, DataPageHeader, 
PrimitivePageDict},
+    read::{levels, StreamingIterator},
+    schema::Encoding,
+    types::NativeType,
+};
+
+use super::super::utils as other_utils;
+use super::{ColumnChunkMetaData, ColumnDescriptor};
+use crate::{
+    array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray},
+    bitmap::{utils::BitmapIter, MutableBitmap},
+    buffer::MutableBuffer,
+    datatypes::DataType,
+    error::{ArrowError, Result},
+    types::NativeType as ArrowNativeType,
+};
+
+fn read_dict_optional<K, T, A, F>(
+    validity_buffer: &[u8],
+    indices_buffer: &[u8],
+    additional: usize,
+    dict: &PrimitivePageDict<T>,
+    indices: &mut MutableBuffer<K>,
+    values: &mut MutableBuffer<A>,
+    validity: &mut MutableBitmap,
+    op: F,
+) where
+    K: DictionaryKey,
+    T: NativeType,
+    A: ArrowNativeType,
+    F: Fn(T) -> A,
+{
+    let dict_values = dict.values();
+    values.extend_from_trusted_len_iter(dict_values.iter().map(|x| op(*x)));
+
+    // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
+    // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
+    let bit_width = indices_buffer[0];
+    let indices_buffer = &indices_buffer[1..];
+
+    let (_, consumed) = uleb128::decode(indices_buffer);
+    let indices_buffer = &indices_buffer[consumed..];
+
+    let non_null_indices_len = indices_buffer.len() * 8 / bit_width as usize;
+
+    let mut new_indices = bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len);
+
+    let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);
+
+    for run in validity_iterator {
+        match run {
+            hybrid_rle::HybridEncoded::Bitpacked(packed) => {
+                let remaining = additional - indices.len();
+                let len = std::cmp::min(packed.len() * 8, remaining);
+                for is_valid in BitmapIter::new(packed, 0, len) {
+                    let value = if is_valid {
+                        K::from_u32(new_indices.next().unwrap()).unwrap()
+                    } else {
+                        K::default()
+                    };
+                    indices.push(value);
+                }
+                validity.extend_from_slice(packed, 0, len);
+            }
+            hybrid_rle::HybridEncoded::Rle(value, additional) => {
+                let is_set = value[0] == 1;
+                validity.extend_constant(additional, is_set);
+                if is_set {
+                    (0..additional).for_each(|_| {
+                        let index = K::from_u32(new_indices.next().unwrap()).unwrap();
+                        indices.push(index)
+                    })
+                } else {
+                    indices.extend_constant(additional, K::default())
+                }
+            }
+        }
+    }
+}
+
+fn extend_from_page<K, T, A, F>(
+    page: &DataPage,
+    descriptor: &ColumnDescriptor,
+    indices: &mut MutableBuffer<K>,
+    values: &mut MutableBuffer<A>,
+    validity: &mut MutableBitmap,
+    op: F,
+) -> Result<()>
+where
+    K: DictionaryKey,
+    T: NativeType,
+    A: ArrowNativeType,
+    F: Fn(T) -> A,
+{
+    let additional = page.num_values();
+
+    assert_eq!(descriptor.max_rep_level(), 0);
+    let is_optional = descriptor.max_def_level() == 1;
+    match page.header() {
+        DataPageHeader::V1(header) => {
+            assert_eq!(header.definition_level_encoding, Encoding::Rle);
+
+            let (_, validity_buffer, values_buffer) =
+                levels::split_buffer_v1(page.buffer(), false, is_optional);
+
+            match (&page.encoding(), page.dictionary_page(), is_optional) {
+                (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
+                    read_dict_optional(
+                        validity_buffer,
+                        values_buffer,
+                        additional,
+                        dict.as_any().downcast_ref().unwrap(),
+                        indices,
+                        values,
+                        validity,
+                        op,
+                    )
+                }
+                _ => {
+                    return Err(other_utils::not_implemented(
+                        &page.encoding(),
+                        is_optional,
+                        page.dictionary_page().is_some(),
+                        "V1",
+                        "primitive",
+                    ))
+                }
+            }
+        }
+        DataPageHeader::V2(header) => {
+            let def_level_buffer_length =
header.definition_levels_byte_length as usize; + + let (_, validity_buffer, values_buffer) = + levels::split_buffer_v2(page.buffer(), 0, def_level_buffer_length); + match (&page.encoding(), page.dictionary_page(), is_optional) { + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { + read_dict_optional( + validity_buffer, + values_buffer, + additional, + dict.as_any().downcast_ref().unwrap(), + indices, + values, + validity, + op, + ) + } + _ => { + return Err(other_utils::not_implemented( + &page.encoding(), + is_optional, + page.dictionary_page().is_some(), + "V2", + "primitive", + )) + } + } + } + }; + Ok(()) +} + +pub fn iter_to_array( + mut iter: I, + metadata: &ColumnChunkMetaData, + data_type: DataType, + op: F, +) -> Result> +where + ArrowError: From, + T: NativeType, + K: DictionaryKey, + E: Clone, + A: ArrowNativeType, + F: Copy + Fn(T) -> A, + I: StreamingIterator>, +{ + let capacity = metadata.num_values() as usize; + let mut indices = MutableBuffer::::with_capacity(capacity); + let mut values = MutableBuffer::::with_capacity(capacity); + let mut validity = MutableBitmap::with_capacity(capacity); + while let Some(page) = iter.next() { + extend_from_page( + page.as_ref().map_err(|x| x.clone())?, + metadata.descriptor(), + &mut indices, + &mut values, + &mut validity, + op, + )? + } + + let keys = PrimitiveArray::from_data(K::DATA_TYPE, indices.into(), validity.into()); + let data_type = DictionaryArray::::get_child(&data_type).clone(); + let values = Arc::new(PrimitiveArray::from_data(data_type, values.into(), None)); + Ok(Box::new(DictionaryArray::::from_data(keys, values))) +} diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index 0771b739896..de4332b8074 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -1,13 +1,11 @@ mod basic; +mod dictionary; mod nested; mod utils; use std::sync::Arc; -use parquet2::{ - read::{Page, StreamingIterator}, - types::NativeType, -}; +use parquet2::{page::DataPage, read::StreamingIterator, types::NativeType}; use super::nested_utils::*; use super::{ColumnChunkMetaData, ColumnDescriptor}; @@ -20,6 +18,8 @@ use crate::{ types::NativeType as ArrowNativeType, }; +pub use dictionary::iter_to_array as iter_to_dict_array; + pub fn iter_to_array( mut iter: I, metadata: &ColumnChunkMetaData, @@ -32,7 +32,7 @@ where E: Clone, A: ArrowNativeType, F: Copy + Fn(T) -> A, - I: StreamingIterator>, + I: StreamingIterator>, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(capacity); @@ -47,6 +47,11 @@ where )? 
} + let data_type = match data_type { + DataType::Dictionary(_, values) => values.as_ref().clone(), + _ => data_type, + }; + Ok(Box::new(PrimitiveArray::from_data( data_type, values.into(), @@ -66,7 +71,7 @@ where E: Clone, A: ArrowNativeType, F: Copy + Fn(T) -> A, - I: StreamingIterator>, + I: StreamingIterator>, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(capacity); diff --git a/src/io/parquet/read/primitive/nested.rs b/src/io/parquet/read/primitive/nested.rs index acba41b9bb6..39f5ae0d8ef 100644 --- a/src/io/parquet/read/primitive/nested.rs +++ b/src/io/parquet/read/primitive/nested.rs @@ -1,9 +1,7 @@ use parquet2::{ encoding::Encoding, - read::{ - levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder}, - Page, PageHeader, - }, + page::{DataPage, DataPageHeader}, + read::levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder}, types::NativeType, }; @@ -112,7 +110,7 @@ fn read( } pub fn extend_from_page( - page: &Page, + page: &DataPage, descriptor: &ColumnDescriptor, is_nullable: bool, nested: &mut Vec>, @@ -128,7 +126,7 @@ where let additional = page.num_values(); match page.header() { - PageHeader::V1(header) => { + DataPageHeader::V1(header) => { assert_eq!(header.definition_level_encoding, Encoding::Rle); assert_eq!(header.repetition_level_encoding, Encoding::Rle); @@ -170,7 +168,7 @@ where } } } - PageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) { + DataPageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) { (Encoding::Plain, None) => { let def_level_buffer_length = header.definition_levels_byte_length as usize; let rep_level_buffer_length = header.repetition_levels_byte_length as usize; diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index c217ba48d9d..975ec99582c 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -24,6 +24,12 @@ impl PartialEq for &dyn Statistics { } } +impl PartialEq for Box { + fn eq(&self, other: &Self) -> bool { + self.data_type() == other.data_type() + } +} + pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result> { match stats.physical_type() { PhysicalType::Int32 => { diff --git a/src/io/parquet/write/binary/basic.rs b/src/io/parquet/write/binary/basic.rs index b9da6ce5cfe..4752d72e1d8 100644 --- a/src/io/parquet/write/binary/basic.rs +++ b/src/io/parquet/write/binary/basic.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::{delta_bitpacked, Encoding}, metadata::ColumnDescriptor, - read::CompressedPage, + page::CompressedDataPage, statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics}, write::WriteOptions, }; @@ -14,7 +14,7 @@ use crate::{ io::parquet::read::is_type_nullable, }; -pub(super) fn encode_plain( +pub(crate) fn encode_plain( array: &BinaryArray, is_optional: bool, buffer: &mut Vec, @@ -44,7 +44,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, encoding: Encoding, -) -> Result { +) -> Result { let validity = array.validity(); let is_optional = is_type_nullable(descriptor.type_()); diff --git a/src/io/parquet/write/binary/mod.rs b/src/io/parquet/write/binary/mod.rs index f3c414b3190..8d9e94cd0fb 100644 --- a/src/io/parquet/write/binary/mod.rs +++ b/src/io/parquet/write/binary/mod.rs @@ -2,5 +2,6 @@ mod basic; mod nested; pub use basic::array_to_page; +pub(crate) use basic::encode_plain; pub(super) use basic::{encode_delta, ord_binary}; pub use nested::array_to_page 
as nested_array_to_page; diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index 6cefd9b45f0..52a1ee50674 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -1,5 +1,5 @@ use parquet2::schema::Encoding; -use parquet2::{metadata::ColumnDescriptor, read::CompressedPage, write::WriteOptions}; +use parquet2::{metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions}; use super::super::{levels, utils}; use super::basic::{build_statistics, encode_plain}; @@ -14,7 +14,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, nested: levels::NestedInfo, -) -> Result +) -> Result where OO: Offset, O: Offset, diff --git a/src/io/parquet/write/boolean/basic.rs b/src/io/parquet/write/boolean/basic.rs index ca1fca96633..927cc17044e 100644 --- a/src/io/parquet/write/boolean/basic.rs +++ b/src/io/parquet/write/boolean/basic.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::hybrid_rle::bitpacked_encode, metadata::ColumnDescriptor, - read::CompressedPage, + page::CompressedDataPage, schema::Encoding, statistics::{serialize_statistics, BooleanStatistics, ParquetStatistics, Statistics}, write::WriteOptions, @@ -43,7 +43,7 @@ pub fn array_to_page( array: &BooleanArray, options: WriteOptions, descriptor: ColumnDescriptor, -) -> Result { +) -> Result { let is_optional = is_type_nullable(descriptor.type_()); let validity = array.validity(); diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index 0bc61705418..aa172533235 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -1,5 +1,5 @@ use parquet2::schema::Encoding; -use parquet2::{metadata::ColumnDescriptor, read::CompressedPage, write::WriteOptions}; +use parquet2::{metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions}; use super::super::{levels, utils}; use super::basic::{build_statistics, encode_plain}; @@ -14,7 +14,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, nested: levels::NestedInfo, -) -> Result +) -> Result where O: Offset, { diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs new file mode 100644 index 00000000000..450ef4f7af7 --- /dev/null +++ b/src/io/parquet/write/dictionary.rs @@ -0,0 +1,196 @@ +use parquet2::encoding::hybrid_rle::encode_u32; +use parquet2::page::{CompressedDictPage, CompressedPage}; +use parquet2::schema::Encoding; +use parquet2::write::DynIter; +use parquet2::{metadata::ColumnDescriptor, write::WriteOptions}; + +use super::binary::encode_plain as binary_encode_plain; +use super::primitive::encode_plain as primitive_encode_plain; +use super::utf8::encode_plain as utf8_encode_plain; +use crate::array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray}; +use crate::bitmap::Bitmap; +use crate::datatypes::DataType; +use crate::error::{ArrowError, Result}; +use crate::io::parquet::read::is_type_nullable; +use crate::io::parquet::write::utils; + +fn encode_keys( + array: &PrimitiveArray, + // todo: merge this to not discard values' validity + validity: &Option, + descriptor: ColumnDescriptor, + options: WriteOptions, +) -> Result { + let is_optional = is_type_nullable(descriptor.type_()); + + let mut buffer = vec![]; + + if let Some(validity) = validity { + let projected_val = array.iter().map(|x| { + x.map(|x| validity.get_bit(x.to_usize().unwrap())) + .unwrap_or(false) + }); + let projected_val = 
Bitmap::from_trusted_len_iter(projected_val); + + utils::write_def_levels( + &mut buffer, + is_optional, + &Some(projected_val), + array.len(), + options.version, + )?; + } else { + utils::write_def_levels( + &mut buffer, + is_optional, + array.validity(), + array.len(), + options.version, + )?; + } + + let definition_levels_byte_length = buffer.len(); + + // encode indices + // compute the required number of bits + if let Some(validity) = validity { + let keys = array + .iter() + .flatten() + .map(|x| { + let index = x.to_usize().unwrap(); + // discard indices whose values are null, since they are part of the def levels. + if validity.get_bit(index) { + Some(index as u32) + } else { + None + } + }) + .flatten(); + let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64) as u8; + + let keys = utils::ExactSizedIter::new(keys, array.len() - array.null_count()); + + // num_bits as a single byte + buffer.push(num_bits); + + // followed by the encoded indices. + encode_u32(&mut buffer, keys, num_bits)?; + } else { + let keys = array.iter().flatten().map(|x| x.to_usize().unwrap() as u32); + let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64) as u8; + + let keys = utils::ExactSizedIter::new(keys, array.len() - array.null_count()); + + // num_bits as a single byte + buffer.push(num_bits); + + // followed by the encoded indices. + encode_u32(&mut buffer, keys, num_bits)?; + } + + let uncompressed_page_size = buffer.len(); + + let buffer = utils::compress(buffer, options, definition_levels_byte_length)?; + + utils::build_plain_page( + buffer, + array.len(), + array.null_count(), + uncompressed_page_size, + 0, + definition_levels_byte_length, + None, + descriptor, + options, + Encoding::PlainDictionary, + ) + .map(CompressedPage::Data) +} + +macro_rules! 
dyn_prim {
+    ($from:ty, $to:ty, $array:expr) => {{
+        let values = $array.values().as_any().downcast_ref().unwrap();
+
+        let mut buffer = vec![];
+        primitive_encode_plain::<$from, $to>(values, false, &mut buffer);
+
+        CompressedPage::Dict(CompressedDictPage::new(buffer, values.len()))
+    }};
+}
+
+pub fn array_to_pages<K: DictionaryKey>(
+    array: &DictionaryArray<K>,
+    descriptor: ColumnDescriptor,
+    options: WriteOptions,
+    encoding: Encoding,
+) -> Result<DynIter<'static, Result<CompressedPage>>>
+where
+    PrimitiveArray<K>: std::fmt::Display,
+{
+    match encoding {
+        Encoding::PlainDictionary | Encoding::RleDictionary => {
+            // write DictPage
+            let dict_page = match array.values().data_type() {
+                DataType::Int8 => dyn_prim!(i8, i32, array),
+                DataType::Int16 => dyn_prim!(i16, i32, array),
+                DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
+                    dyn_prim!(i32, i32, array)
+                }
+                DataType::Int64
+                | DataType::Date64
+                | DataType::Time64(_)
+                | DataType::Timestamp(_, _)
+                | DataType::Duration(_) => dyn_prim!(i64, i64, array),
+                DataType::UInt8 => dyn_prim!(u8, i32, array),
+                DataType::UInt16 => dyn_prim!(u16, i32, array),
+                DataType::UInt32 => dyn_prim!(u32, i32, array),
+                DataType::UInt64 => dyn_prim!(u64, i64, array),
+                DataType::Utf8 => {
+                    let values = array.values().as_any().downcast_ref().unwrap();
+
+                    let mut buffer = vec![];
+                    utf8_encode_plain::<i32>(values, false, &mut buffer);
+                    CompressedPage::Dict(CompressedDictPage::new(buffer, values.len()))
+                }
+                DataType::LargeUtf8 => {
+                    let values = array.values().as_any().downcast_ref().unwrap();
+
+                    let mut buffer = vec![];
+                    utf8_encode_plain::<i64>(values, false, &mut buffer);
+                    CompressedPage::Dict(CompressedDictPage::new(buffer, values.len()))
+                }
+                DataType::Binary => {
+                    let values = array.values().as_any().downcast_ref().unwrap();
+
+                    let mut buffer = vec![];
+                    binary_encode_plain::<i32>(values, false, &mut buffer);
+                    CompressedPage::Dict(CompressedDictPage::new(buffer, values.len()))
+                }
+                DataType::LargeBinary => {
+                    let values = array.values().as_any().downcast_ref().unwrap();
+
+                    let mut buffer = vec![];
+                    binary_encode_plain::<i64>(values, false, &mut buffer);
+                    CompressedPage::Dict(CompressedDictPage::new(buffer, values.len()))
+                }
+                other => {
+                    return Err(ArrowError::NotYetImplemented(format!(
+                        "Writing dictionary arrays to parquet is not yet implemented for data type {:?}",
+                        other
+                    )))
+                }
+            };
+
+            // write DataPage pointing to DictPage
+            let data_page =
+                encode_keys(array.keys(), array.values().validity(), descriptor, options)?;
+
+            let iter = std::iter::once(Ok(dict_page)).chain(std::iter::once(Ok(data_page)));
+            Ok(DynIter::new(Box::new(iter)))
+        }
+        _ => Err(ArrowError::NotYetImplemented(
+            "Dictionary arrays only support dictionary encoding".to_string(),
+        )),
+    }
+}
diff --git a/src/io/parquet/write/fixed_len_bytes.rs b/src/io/parquet/write/fixed_len_bytes.rs
index 3e4d011bd1d..1fd5260b445 100644
--- a/src/io/parquet/write/fixed_len_bytes.rs
+++ b/src/io/parquet/write/fixed_len_bytes.rs
@@ -1,6 +1,6 @@
 use parquet2::{
-    compression::create_codec, metadata::ColumnDescriptor, read::CompressedPage, schema::Encoding,
-    write::WriteOptions,
+    compression::create_codec, metadata::ColumnDescriptor, page::CompressedDataPage,
+    schema::Encoding, write::WriteOptions,
 };
 
 use super::utils;
@@ -14,7 +14,7 @@ pub fn array_to_page(
     array: &FixedSizeBinaryArray,
     options: WriteOptions,
     descriptor: ColumnDescriptor,
-) -> Result<CompressedPage> {
+) -> Result<CompressedDataPage> {
     let is_optional = is_type_nullable(descriptor.type_());
 
     let validity = array.validity();
diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs
index
612c9f7b0f0..22d26b18cad 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -1,5 +1,6 @@ mod binary; mod boolean; +mod dictionary; mod fixed_len_bytes; mod levels; mod primitive; @@ -10,6 +11,8 @@ mod utils; pub mod stream; +use std::sync::Arc; + use crate::array::*; use crate::bitmap::Bitmap; use crate::buffer::{Buffer, MutableBuffer}; @@ -23,7 +26,7 @@ use crate::types::NativeType; use parquet2::metadata::ColumnDescriptor; pub use parquet2::{ compression::CompressionCodec, - read::CompressedPage, + page::{CompressedDataPage, CompressedPage}, schema::types::ParquetType, schema::Encoding, write::{DynIter, RowGroupIter}, @@ -99,9 +102,54 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { Encoding::DeltaLengthByteArray, DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8, ) + | (Encoding::RleDictionary, DataType::Dictionary(_, _)) + | (Encoding::PlainDictionary, DataType::Dictionary(_, _)) ) } +/// Returns an iterator of compressed pages, +pub fn array_to_pages( + array: Arc, + descriptor: ColumnDescriptor, + options: WriteOptions, + encoding: Encoding, +) -> Result>> { + match array.data_type() { + DataType::Dictionary(key, _) => match key.as_ref() { + DataType::Int8 => dictionary::array_to_pages::( + array.as_any().downcast_ref().unwrap(), + descriptor, + options, + encoding, + ), + DataType::Int16 => dictionary::array_to_pages::( + array.as_any().downcast_ref().unwrap(), + descriptor, + options, + encoding, + ), + DataType::Int32 => dictionary::array_to_pages::( + array.as_any().downcast_ref().unwrap(), + descriptor, + options, + encoding, + ), + DataType::Int64 => dictionary::array_to_pages::( + array.as_any().downcast_ref().unwrap(), + descriptor, + options, + encoding, + ), + other => Err(ArrowError::NotYetImplemented(format!( + "Writing parquet pages for data type {:?}", + other + ))), + }, + _ => array_to_page(array.as_ref(), descriptor, options, encoding) + .map(|page| DynIter::new(std::iter::once(Ok(page)))), + } +} + pub fn array_to_page( array: &dyn Array, descriptor: ColumnDescriptor, @@ -295,6 +343,7 @@ pub fn array_to_page( other ))), } + .map(CompressedPage::Data) } macro_rules! 
dyn_nested_prim { @@ -316,7 +365,7 @@ fn list_array_to_page( values: &dyn Array, descriptor: ColumnDescriptor, options: WriteOptions, -) -> Result { +) -> Result { use DataType::*; let is_optional = is_type_nullable(descriptor.type_()); let nested = NestedInfo::new(offsets, validity, is_optional); @@ -395,7 +444,7 @@ fn nested_array_to_page( array: &dyn Array, descriptor: ColumnDescriptor, options: WriteOptions, -) -> Result { +) -> Result { match array.data_type() { DataType::List(_) => { let array = array.as_any().downcast_ref::>().unwrap(); @@ -438,7 +487,7 @@ fn nested_array_to_page( mod tests { use super::*; - use crate::error::Result; + use crate::{error::Result, record_batch::RecordBatch}; use std::io::Cursor; use super::super::tests::*; @@ -467,6 +516,7 @@ mod tests { pyarrow_required_statistics(column), ) }; + let array: Arc = array.into(); let field = Field::new("a1", array.data_type().clone(), nullable); let schema = Schema::new(vec![field]); @@ -479,15 +529,13 @@ mod tests { let parquet_schema = to_parquet_schema(&schema)?; - // one row group - // one column chunk - // one page + let iter = vec![RecordBatch::try_new( + Arc::new(schema.clone()), + vec![array.clone()], + )]; + let row_groups = - std::iter::once(Result::Ok(DynIter::new(std::iter::once(Ok(DynIter::new( - std::iter::once(array.as_ref()) - .zip(parquet_schema.columns().to_vec().into_iter()) - .map(|(array, descriptor)| array_to_page(array, descriptor, options, encoding)), - )))))); + RowGroupIterator::try_new(iter.into_iter(), &schema, options, vec![encoding])?; let mut writer = Cursor::new(vec![]); write_file( @@ -503,7 +551,7 @@ mod tests { let (result, stats) = read_column(&mut Cursor::new(data), 0, 0)?; assert_eq!(array.as_ref(), result.as_ref()); - assert_eq!(statistics.as_ref(), stats.unwrap().as_ref()); + assert_eq!(statistics.as_ref(), stats.as_ref()); Ok(()) } @@ -794,4 +842,16 @@ mod tests { Encoding::DeltaLengthByteArray, ) } + + #[test] + fn test_i32_optional_v2_dict() -> Result<()> { + round_trip( + 6, + true, + false, + Version::V2, + CompressionCodec::Uncompressed, + Encoding::RleDictionary, + ) + } } diff --git a/src/io/parquet/write/primitive/basic.rs b/src/io/parquet/write/primitive/basic.rs index 94be5d72e61..c13a8f9bee8 100644 --- a/src/io/parquet/write/primitive/basic.rs +++ b/src/io/parquet/write/primitive/basic.rs @@ -1,6 +1,6 @@ use parquet2::{ metadata::ColumnDescriptor, - read::CompressedPage, + page::CompressedDataPage, schema::Encoding, statistics::{serialize_statistics, ParquetStatistics, PrimitiveStatistics, Statistics}, types::NativeType, @@ -15,7 +15,7 @@ use crate::{ types::NativeType as ArrowNativeType, }; -pub(super) fn encode_plain(array: &PrimitiveArray, is_optional: bool, buffer: &mut Vec) +pub(crate) fn encode_plain(array: &PrimitiveArray, is_optional: bool, buffer: &mut Vec) where T: ArrowNativeType, R: NativeType, @@ -42,7 +42,7 @@ pub fn array_to_page( array: &PrimitiveArray, options: WriteOptions, descriptor: ColumnDescriptor, -) -> Result +) -> Result where T: ArrowNativeType, R: NativeType, diff --git a/src/io/parquet/write/primitive/mod.rs b/src/io/parquet/write/primitive/mod.rs index 280e2ff9efb..ddeb6541605 100644 --- a/src/io/parquet/write/primitive/mod.rs +++ b/src/io/parquet/write/primitive/mod.rs @@ -2,4 +2,5 @@ mod basic; mod nested; pub use basic::array_to_page; +pub(crate) use basic::encode_plain; pub use nested::array_to_page as nested_array_to_page; diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 
fafed4ac2c2..05bfb5858b6 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -1,6 +1,6 @@ use parquet2::schema::Encoding; use parquet2::{ - metadata::ColumnDescriptor, read::CompressedPage, types::NativeType, write::WriteOptions, + metadata::ColumnDescriptor, page::CompressedDataPage, types::NativeType, write::WriteOptions, }; use super::super::levels; @@ -18,7 +18,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, nested: levels::NestedInfo, -) -> Result +) -> Result where T: ArrowNativeType, R: NativeType, diff --git a/src/io/parquet/write/record_batch.rs b/src/io/parquet/write/record_batch.rs index 117cdc009d6..3db125f815e 100644 --- a/src/io/parquet/write/record_batch.rs +++ b/src/io/parquet/write/record_batch.rs @@ -1,5 +1,5 @@ use super::{ - array_to_page, to_parquet_schema, DynIter, Encoding, RowGroupIter, SchemaDescriptor, + array_to_pages, to_parquet_schema, DynIter, Encoding, RowGroupIter, SchemaDescriptor, WriteOptions, }; use crate::{ @@ -59,12 +59,7 @@ impl>> Iterator for RowGroupIterator { .zip(self.parquet_schema.columns().to_vec().into_iter()) .zip(encodings.into_iter()) .map(move |((array, type_), encoding)| { - Ok(DynIter::new(std::iter::once(array_to_page( - array.as_ref(), - type_, - options, - encoding, - )))) + array_to_pages(array, type_, options, encoding) }), )) }) diff --git a/src/io/parquet/write/utf8/basic.rs b/src/io/parquet/write/utf8/basic.rs index af501f1847b..ab1c074f213 100644 --- a/src/io/parquet/write/utf8/basic.rs +++ b/src/io/parquet/write/utf8/basic.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::Encoding, metadata::ColumnDescriptor, - read::CompressedPage, + page::CompressedDataPage, statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics}, write::WriteOptions, }; @@ -14,7 +14,7 @@ use crate::{ io::parquet::read::is_type_nullable, }; -pub(super) fn encode_plain( +pub(crate) fn encode_plain( array: &Utf8Array, is_optional: bool, buffer: &mut Vec, @@ -43,7 +43,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, encoding: Encoding, -) -> Result { +) -> Result { let validity = array.validity(); let is_optional = is_type_nullable(descriptor.type_()); diff --git a/src/io/parquet/write/utf8/mod.rs b/src/io/parquet/write/utf8/mod.rs index 280e2ff9efb..ddeb6541605 100644 --- a/src/io/parquet/write/utf8/mod.rs +++ b/src/io/parquet/write/utf8/mod.rs @@ -2,4 +2,5 @@ mod basic; mod nested; pub use basic::array_to_page; +pub(crate) use basic::encode_plain; pub use nested::array_to_page as nested_array_to_page; diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index efc75b8aa8d..af213989acd 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -1,5 +1,5 @@ use parquet2::schema::Encoding; -use parquet2::{metadata::ColumnDescriptor, read::CompressedPage, write::WriteOptions}; +use parquet2::{metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions}; use super::super::{levels, utils}; use super::basic::{build_statistics, encode_plain}; @@ -14,7 +14,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, nested: levels::NestedInfo, -) -> Result +) -> Result where OO: Offset, O: Offset, diff --git a/src/io/parquet/write/utils.rs b/src/io/parquet/write/utils.rs index bdf247f6712..33c8a89854d 100644 --- a/src/io/parquet/write/utils.rs +++ b/src/io/parquet/write/utils.rs @@ -4,8 +4,8 @@ use parquet2::{ 
compression::create_codec, encoding::{hybrid_rle::encode_bool, Encoding}, metadata::ColumnDescriptor, - read::{CompressedPage, PageHeader}, - schema::{CompressionCodec, DataPageHeader, DataPageHeaderV2}, + page::{CompressedDataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2}, + schema::CompressionCodec, statistics::ParquetStatistics, write::WriteOptions, }; @@ -70,10 +70,10 @@ pub fn build_plain_page( descriptor: ColumnDescriptor, options: WriteOptions, encoding: Encoding, -) -> Result { +) -> Result { match options.version { Version::V1 => { - let header = PageHeader::V1(DataPageHeader { + let header = DataPageHeader::V1(DataPageHeaderV1 { num_values: len as i32, encoding, definition_level_encoding: Encoding::Rle, @@ -81,7 +81,7 @@ pub fn build_plain_page( statistics, }); - Ok(CompressedPage::new( + Ok(CompressedDataPage::new( header, buffer, options.compression, @@ -91,7 +91,7 @@ pub fn build_plain_page( )) } Version::V2 => { - let header = PageHeader::V2(DataPageHeaderV2 { + let header = DataPageHeader::V2(DataPageHeaderV2 { num_values: len as i32, encoding, num_nulls: null_count as i32, @@ -102,7 +102,7 @@ pub fn build_plain_page( statistics, }); - Ok(CompressedPage::new( + Ok(CompressedDataPage::new( header, buffer, options.compression, @@ -184,3 +184,9 @@ impl> Iterator for ExactSizedIter { (self.remaining, Some(self.remaining)) } } + +/// Returns the number of bits needed to bitpack `max` +#[inline] +pub fn get_bit_width(max: u64) -> u32 { + 64 - max.leading_zeros() +}