From b45866d8525c174cde125b49d813903f216e89ca Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 27 Sep 2021 06:44:06 +0000 Subject: [PATCH 1/5] Moved compression allocation one level up. --- src/io/parquet/write/binary/basic.rs | 10 +++++-- src/io/parquet/write/binary/nested.rs | 8 +++--- src/io/parquet/write/boolean/basic.rs | 10 +++++-- src/io/parquet/write/boolean/nested.rs | 8 +++--- src/io/parquet/write/dictionary.rs | 10 +++++-- src/io/parquet/write/primitive/basic.rs | 10 +++++-- src/io/parquet/write/primitive/nested.rs | 8 +++--- src/io/parquet/write/utf8/basic.rs | 10 +++++-- src/io/parquet/write/utf8/nested.rs | 8 +++--- src/io/parquet/write/utils.rs | 34 +++++++++++------------- 10 files changed, 75 insertions(+), 41 deletions(-) diff --git a/src/io/parquet/write/binary/basic.rs b/src/io/parquet/write/binary/basic.rs index b8803700bb5..759bab09fa3 100644 --- a/src/io/parquet/write/binary/basic.rs +++ b/src/io/parquet/write/binary/basic.rs @@ -79,7 +79,13 @@ pub fn array_to_page( let uncompressed_page_size = buffer.len(); - let buffer = utils::compress(buffer, options, definition_levels_byte_length)?; + let mut compressed_buffer = vec![]; + let _was_compressed = utils::compress( + &mut buffer, + &mut compressed_buffer, + options, + definition_levels_byte_length, + )?; let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) @@ -88,7 +94,7 @@ pub fn array_to_page( }; utils::build_plain_page( - buffer, + compressed_buffer, array.len(), array.null_count(), uncompressed_page_size, diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index cf92180a3c9..3310cb32157 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -35,8 +35,10 @@ where let uncompressed_page_size = buffer.len(); - let buffer = utils::compress( - buffer, + let mut compressed_buffer = vec![]; + let _was_compressed = utils::compress( + &mut buffer, + &mut compressed_buffer, options, definition_levels_byte_length + repetition_levels_byte_length, )?; @@ -48,7 +50,7 @@ where }; utils::build_plain_page( - buffer, + compressed_buffer, levels::num_values(nested.offsets()), array.null_count(), uncompressed_page_size, diff --git a/src/io/parquet/write/boolean/basic.rs b/src/io/parquet/write/boolean/basic.rs index 604aa49d3cf..8c00570b3b9 100644 --- a/src/io/parquet/write/boolean/basic.rs +++ b/src/io/parquet/write/boolean/basic.rs @@ -62,7 +62,13 @@ pub fn array_to_page( let uncompressed_page_size = buffer.len(); - let buffer = utils::compress(buffer, options, definition_levels_byte_length)?; + let mut compressed_buffer = vec![]; + let _was_compressed = utils::compress( + &mut buffer, + &mut compressed_buffer, + options, + definition_levels_byte_length, + )?; let statistics = if options.write_statistics { Some(build_statistics(array)) @@ -71,7 +77,7 @@ pub fn array_to_page( }; utils::build_plain_page( - buffer, + compressed_buffer, array.len(), array.null_count(), uncompressed_page_size, diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index b9726f93b65..619f9cfe76b 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -34,8 +34,10 @@ where let uncompressed_page_size = buffer.len(); - let buffer = utils::compress( - buffer, + let mut compressed_buffer = vec![]; + let _was_compressed = utils::compress( + &mut buffer, + &mut compressed_buffer, options, definition_levels_byte_length + repetition_levels_byte_length, )?; @@ -47,7 +49,7 @@ where }; utils::build_plain_page( - buffer, + compressed_buffer, levels::num_values(nested.offsets()), array.null_count(), uncompressed_page_size, diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index 8a221099f5f..5fd861d991f 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -96,10 +96,16 @@ fn encode_keys( let uncompressed_page_size = buffer.len(); - let buffer = utils::compress(buffer, options, definition_levels_byte_length)?; + let mut compressed_buffer = vec![]; + let _was_compressed = utils::compress( + &mut buffer, + &mut compressed_buffer, + options, + definition_levels_byte_length, + )?; utils::build_plain_page( - buffer, + compressed_buffer, array.len(), array.null_count(), uncompressed_page_size, diff --git a/src/io/parquet/write/primitive/basic.rs b/src/io/parquet/write/primitive/basic.rs index a86f246c7b9..8e553a8a8f5 100644 --- a/src/io/parquet/write/primitive/basic.rs +++ b/src/io/parquet/write/primitive/basic.rs @@ -67,7 +67,13 @@ where let uncompressed_page_size = buffer.len(); - let buffer = utils::compress(buffer, options, definition_levels_byte_length)?; + let mut compressed_buffer = vec![]; + let _was_compressed = utils::compress( + &mut buffer, + &mut compressed_buffer, + options, + definition_levels_byte_length, + )?; let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) @@ -76,7 +82,7 @@ where }; utils::build_plain_page( - buffer, + compressed_buffer, array.len(), array.null_count(), uncompressed_page_size, diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 43483a81499..f6d4e75dedc 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -40,8 +40,10 @@ where let uncompressed_page_size = buffer.len(); - let buffer = utils::compress( - buffer, + let mut compressed_buffer = vec![]; + let _was_compressed = utils::compress( + &mut buffer, + &mut compressed_buffer, options, definition_levels_byte_length + repetition_levels_byte_length, )?; @@ -53,7 +55,7 @@ where }; utils::build_plain_page( - buffer, + compressed_buffer, levels::num_values(nested.offsets()), array.null_count(), uncompressed_page_size, diff --git a/src/io/parquet/write/utf8/basic.rs b/src/io/parquet/write/utf8/basic.rs index ab1c074f213..da04d28601d 100644 --- a/src/io/parquet/write/utf8/basic.rs +++ b/src/io/parquet/write/utf8/basic.rs @@ -78,7 +78,13 @@ pub fn array_to_page( let uncompressed_page_size = buffer.len(); - let buffer = utils::compress(buffer, options, definition_levels_byte_length)?; + let mut compressed_buffer = vec![]; + let _was_compressed = utils::compress( + &mut buffer, + &mut compressed_buffer, + options, + definition_levels_byte_length, + )?; let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) @@ -87,7 +93,7 @@ pub fn array_to_page( }; utils::build_plain_page( - buffer, + compressed_buffer, array.len(), array.null_count(), uncompressed_page_size, diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index d9b7dafbd0e..4cd70f681a0 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -35,8 +35,10 @@ where let uncompressed_page_size = buffer.len(); - let buffer = utils::compress( - buffer, + let mut compressed_buffer = vec![]; + let _was_compressed = utils::compress( + &mut buffer, + &mut compressed_buffer, options, definition_levels_byte_length + repetition_levels_byte_length, )?; @@ -48,7 +50,7 @@ where }; utils::build_plain_page( - buffer, + compressed_buffer, levels::num_values(nested.offsets()), array.null_count(), uncompressed_page_size, diff --git a/src/io/parquet/write/utils.rs b/src/io/parquet/write/utils.rs index 8a96a3e6bff..6bf1faa70fb 100644 --- a/src/io/parquet/write/utils.rs +++ b/src/io/parquet/write/utils.rs @@ -59,7 +59,7 @@ pub fn write_def_levels( #[allow(clippy::too_many_arguments)] pub fn build_plain_page( - buffer: Vec, + compressed_buffer: Vec, len: usize, null_count: usize, uncompressed_page_size: usize, @@ -82,7 +82,7 @@ pub fn build_plain_page( Ok(CompressedDataPage::new( header, - buffer, + compressed_buffer, options.compression, uncompressed_page_size, None, @@ -103,7 +103,7 @@ pub fn build_plain_page( Ok(CompressedDataPage::new( header, - buffer, + compressed_buffer, options.compression, uncompressed_page_size, None, @@ -113,33 +113,29 @@ pub fn build_plain_page( } } +/// Compresses `buffer` into `compressed_buffer` according to the parquet specification. +/// Returns whether the buffer was compressed or swapped. pub fn compress( - mut buffer: Vec, + buffer: &mut Vec, + compressed_buffer: &mut Vec, options: WriteOptions, levels_byte_length: usize, -) -> Result> { +) -> Result { let codec = create_codec(&options.compression)?; Ok(if let Some(mut codec) = codec { match options.version { Version::V1 => { - // todo: remove this allocation by extending `buffer` directly. - // needs refactoring `compress`'s API. - let mut tmp = vec![]; - codec.compress(&buffer, &mut tmp)?; - tmp + codec.compress(buffer, compressed_buffer)?; } Version::V2 => { - // todo: remove this allocation by extending `buffer` directly. - // needs refactoring `compress`'s API. - let mut tmp = vec![]; - codec.compress(&buffer[levels_byte_length..], &mut tmp)?; - buffer.truncate(levels_byte_length); - buffer.extend_from_slice(&tmp); - buffer + compressed_buffer.extend_from_slice(&buffer[..levels_byte_length]); + codec.compress(&buffer[levels_byte_length..], compressed_buffer)?; } - } + }; + true } else { - buffer + std::mem::swap(buffer, compressed_buffer); + false }) } From 81e04616a7afc3fab77a60af926aeec0ef788b19 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 28 Sep 2021 06:04:33 +0000 Subject: [PATCH 2/5] Decoupled encoding from compression. --- Cargo.toml | 6 ++- src/io/parquet/mod.rs | 6 +++ src/io/parquet/read/binary/basic.rs | 9 ++-- src/io/parquet/read/binary/dictionary.rs | 9 ++-- src/io/parquet/read/binary/nested.rs | 10 ++-- src/io/parquet/read/boolean/basic.rs | 14 ++---- src/io/parquet/read/boolean/nested.rs | 10 ++-- src/io/parquet/read/fixed_size_binary.rs | 9 ++-- src/io/parquet/read/mod.rs | 12 ++--- src/io/parquet/read/primitive/dictionary.rs | 9 ++-- src/io/parquet/read/primitive/mod.rs | 21 +++----- src/io/parquet/write/binary/basic.rs | 17 ++----- src/io/parquet/write/binary/nested.rs | 17 ++----- src/io/parquet/write/boolean/basic.rs | 17 ++----- src/io/parquet/write/boolean/nested.rs | 17 ++----- src/io/parquet/write/dictionary.rs | 38 +++++---------- src/io/parquet/write/fixed_len_bytes.rs | 19 +------- src/io/parquet/write/mod.rs | 20 ++++---- src/io/parquet/write/primitive/basic.rs | 17 ++----- src/io/parquet/write/primitive/nested.rs | 17 ++----- src/io/parquet/write/record_batch.rs | 17 +++++-- src/io/parquet/write/utf8/basic.rs | 17 ++----- src/io/parquet/write/utf8/nested.rs | 17 ++----- src/io/parquet/write/utils.rs | 53 +++------------------ tests/it/io/parquet/mod.rs | 11 ++++- 25 files changed, 130 insertions(+), 279 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 20a11dc9061..823f60ef313 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,9 @@ futures = { version = "0.3", optional = true } # for faster hashing ahash = { version = "0.7", optional = true } -parquet2 = { version = "0.5.2", optional = true, default_features = false, features = ["stream"] } +#parquet2 = { version = "0.5.2", optional = true, default_features = false, features = ["stream"] } +#parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "reuse_compress", optional = true, default_features = false, features = ["stream"] } +parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] } avro-rs = { version = "0.13", optional = true, default_features = false } @@ -100,7 +102,7 @@ full = [ "io_avro", "regex", "merge_sort", - "compute", + #"compute", # parses timezones used in timestamp conversions "chrono-tz" ] diff --git a/src/io/parquet/mod.rs b/src/io/parquet/mod.rs index 6c01e6e6bc4..ad9f28f3ae9 100644 --- a/src/io/parquet/mod.rs +++ b/src/io/parquet/mod.rs @@ -11,3 +11,9 @@ impl From for ArrowError { ArrowError::External("".to_string(), Box::new(error)) } } + +impl From for parquet2::error::ParquetError { + fn from(error: ArrowError) -> Self { + parquet2::error::ParquetError::General(error.to_string()) + } +} diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index 44d52857e99..655e77144eb 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -3,7 +3,7 @@ use parquet2::{ encoding::{delta_length_byte_array, hybrid_rle, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::{BinaryPageDict, DataPage}, - read::StreamingIterator, + FallibleStreamingIterator, }; use crate::{ @@ -308,17 +308,16 @@ pub fn iter_to_array( where ArrowError: From, O: Offset, - E: Clone, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(0); let mut offsets = MutableBuffer::::with_capacity(1 + capacity); offsets.push(O::default()); let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? { extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, metadata.descriptor(), &mut offsets, &mut values, diff --git a/src/io/parquet/read/binary/dictionary.rs b/src/io/parquet/read/binary/dictionary.rs index b32616069fc..6e650974f42 100644 --- a/src/io/parquet/read/binary/dictionary.rs +++ b/src/io/parquet/read/binary/dictionary.rs @@ -4,7 +4,7 @@ use parquet2::{ encoding::{hybrid_rle, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::{BinaryPageDict, DataPage}, - read::StreamingIterator, + FallibleStreamingIterator, }; use super::super::utils as other_utils; @@ -133,17 +133,16 @@ where ArrowError: From, O: Offset, K: DictionaryKey, - E: Clone, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut indices = MutableBuffer::::with_capacity(capacity); let mut values = MutableBuffer::::with_capacity(0); let mut offsets = MutableBuffer::::with_capacity(1 + capacity); let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? { extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, metadata.descriptor(), &mut indices, &mut offsets, diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index 211a79ef958..0482074c480 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/binary/nested.rs @@ -4,7 +4,8 @@ use parquet2::{ encoding::{hybrid_rle::HybridRleDecoder, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::DataPage, - read::{levels::get_bit_width, StreamingIterator}, + read::levels::get_bit_width, + FallibleStreamingIterator, }; use super::super::nested_utils::*; @@ -153,8 +154,7 @@ pub fn iter_to_array( where O: Offset, ArrowError: From, - E: Clone, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(0); @@ -164,9 +164,9 @@ where let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? { extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, metadata.descriptor(), is_nullable, &mut nested, diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index 914b2921b9a..128caaac8e9 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -12,7 +12,7 @@ use parquet2::{ encoding::{hybrid_rle, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::DataPage, - read::StreamingIterator, + FallibleStreamingIterator, }; pub(super) fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) { @@ -71,19 +71,13 @@ fn read_optional( pub fn iter_to_array(mut iter: I, metadata: &ColumnChunkMetaData) -> Result where ArrowError: From, - E: Clone, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut values = MutableBitmap::with_capacity(capacity); let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next() { - extend_from_page( - page.as_ref().map_err(|x| x.clone())?, - metadata.descriptor(), - &mut values, - &mut validity, - )? + while let Some(page) = iter.next()? { + extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)? } Ok(BooleanArray::from_data( diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index fc48477ea88..5d99bbb2f7d 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -4,7 +4,8 @@ use parquet2::{ encoding::{hybrid_rle::HybridRleDecoder, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::DataPage, - read::{levels::get_bit_width, StreamingIterator}, + read::levels::get_bit_width, + FallibleStreamingIterator, }; use super::super::nested_utils::*; @@ -137,8 +138,7 @@ pub fn iter_to_array( ) -> Result> where ArrowError: From, - E: Clone, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut values = MutableBitmap::with_capacity(capacity); @@ -146,9 +146,9 @@ where let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? { extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, metadata.descriptor(), is_nullable, &mut nested, diff --git a/src/io/parquet/read/fixed_size_binary.rs b/src/io/parquet/read/fixed_size_binary.rs index 2a02524785b..9552b2b1082 100644 --- a/src/io/parquet/read/fixed_size_binary.rs +++ b/src/io/parquet/read/fixed_size_binary.rs @@ -2,7 +2,7 @@ use futures::{pin_mut, Stream, StreamExt}; use parquet2::{ encoding::{hybrid_rle, Encoding}, page::{DataPage, FixedLenByteArrayPageDict}, - read::StreamingIterator, + FallibleStreamingIterator, }; use super::{ColumnChunkMetaData, ColumnDescriptor}; @@ -134,17 +134,16 @@ pub fn iter_to_array( ) -> Result where ArrowError: From, - E: Clone, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let size = *FixedSizeBinaryArray::get_size(&data_type) as usize; let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(capacity * size); let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? { extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, size, metadata.descriptor(), &mut values, diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index bb05a744633..d28b7f30a85 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -8,18 +8,20 @@ use std::{ use futures::{AsyncRead, AsyncSeek, Stream}; pub use parquet2::{ error::ParquetError, + fallible_streaming_iterator, metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData}, page::{CompressedDataPage, DataPage, DataPageHeader}, read::{ decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream, - read_metadata as _read_metadata, read_metadata_async as _read_metadata_async, - streaming_iterator, Decompressor, PageFilter, PageIterator, StreamingIterator, + read_metadata as _read_metadata, read_metadata_async as _read_metadata_async, Decompressor, + PageFilter, PageIterator, }, schema::types::{ LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, TimeUnit as ParquetTimeUnit, TimestampType, }, types::int96_to_i64_ns, + FallibleStreamingIterator, }; use crate::{ @@ -82,7 +84,7 @@ pub async fn read_metadata_async( fn dict_read< K: DictionaryKey, - I: StreamingIterator>, + I: FallibleStreamingIterator, >( iter: &mut I, metadata: &ColumnChunkMetaData, @@ -164,9 +166,7 @@ fn dict_read< } /// Converts an iterator of [`DataPage`] into a single [`Array`]. -pub fn page_iter_to_array< - I: StreamingIterator>, ->( +pub fn page_iter_to_array>( iter: &mut I, metadata: &ColumnChunkMetaData, data_type: DataType, diff --git a/src/io/parquet/read/primitive/dictionary.rs b/src/io/parquet/read/primitive/dictionary.rs index 962ca34d6ef..6a8caeb17d2 100644 --- a/src/io/parquet/read/primitive/dictionary.rs +++ b/src/io/parquet/read/primitive/dictionary.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use parquet2::{ encoding::{hybrid_rle, Encoding}, page::{DataPage, PrimitivePageDict}, - read::StreamingIterator, types::NativeType, + FallibleStreamingIterator, }; use super::super::utils; @@ -135,18 +135,17 @@ where ArrowError: From, T: NativeType, K: DictionaryKey, - E: Clone, A: ArrowNativeType, F: Copy + Fn(T) -> A, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut indices = MutableBuffer::::with_capacity(capacity); let mut values = MutableBuffer::::with_capacity(capacity); let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? { extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, metadata.descriptor(), &mut indices, &mut values, diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index 3ea354e656f..748c1285224 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -6,7 +6,7 @@ mod utils; use std::sync::Arc; use futures::{pin_mut, Stream, StreamExt}; -use parquet2::{page::DataPage, read::StreamingIterator, types::NativeType}; +use parquet2::{page::DataPage, types::NativeType, FallibleStreamingIterator}; use super::nested_utils::*; use super::{ColumnChunkMetaData, ColumnDescriptor}; @@ -30,22 +30,15 @@ pub fn iter_to_array( where ArrowError: From, T: NativeType, - E: Clone, A: ArrowNativeType, F: Copy + Fn(T) -> A, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(capacity); let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next() { - basic::extend_from_page( - page.as_ref().map_err(|x| x.clone())?, - metadata.descriptor(), - &mut values, - &mut validity, - op, - )? + while let Some(page) = iter.next()? { + basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity, op)? } let data_type = match data_type { @@ -114,7 +107,7 @@ where E: Clone, A: ArrowNativeType, F: Copy + Fn(T) -> A, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(capacity); @@ -122,9 +115,9 @@ where let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? { nested::extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, metadata.descriptor(), is_nullable, &mut nested, diff --git a/src/io/parquet/write/binary/basic.rs b/src/io/parquet/write/binary/basic.rs index 759bab09fa3..7a672dd6fe6 100644 --- a/src/io/parquet/write/binary/basic.rs +++ b/src/io/parquet/write/binary/basic.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::{delta_bitpacked, Encoding}, metadata::ColumnDescriptor, - page::CompressedDataPage, + page::DataPage, statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics}, write::WriteOptions, }; @@ -44,7 +44,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, encoding: Encoding, -) -> Result { +) -> Result { let validity = array.validity(); let is_optional = is_type_nullable(descriptor.type_()); @@ -77,16 +77,6 @@ pub fn array_to_page( } } - let uncompressed_page_size = buffer.len(); - - let mut compressed_buffer = vec![]; - let _was_compressed = utils::compress( - &mut buffer, - &mut compressed_buffer, - options, - definition_levels_byte_length, - )?; - let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) } else { @@ -94,10 +84,9 @@ pub fn array_to_page( }; utils::build_plain_page( - compressed_buffer, + buffer, array.len(), array.null_count(), - uncompressed_page_size, 0, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index 3310cb32157..9161741c4cb 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -1,5 +1,5 @@ use parquet2::{ - encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions, + encoding::Encoding, metadata::ColumnDescriptor, page::DataPage, write::WriteOptions, }; use super::super::{levels, utils}; @@ -15,7 +15,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, nested: levels::NestedInfo, -) -> Result +) -> Result where OO: Offset, O: Offset, @@ -33,16 +33,6 @@ where encode_plain(array, is_optional, &mut buffer); - let uncompressed_page_size = buffer.len(); - - let mut compressed_buffer = vec![]; - let _was_compressed = utils::compress( - &mut buffer, - &mut compressed_buffer, - options, - definition_levels_byte_length + repetition_levels_byte_length, - )?; - let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) } else { @@ -50,10 +40,9 @@ where }; utils::build_plain_page( - compressed_buffer, + buffer, levels::num_values(nested.offsets()), array.null_count(), - uncompressed_page_size, repetition_levels_byte_length, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/boolean/basic.rs b/src/io/parquet/write/boolean/basic.rs index 8c00570b3b9..f9046d6d585 100644 --- a/src/io/parquet/write/boolean/basic.rs +++ b/src/io/parquet/write/boolean/basic.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::{hybrid_rle::bitpacked_encode, Encoding}, metadata::ColumnDescriptor, - page::CompressedDataPage, + page::DataPage, statistics::{serialize_statistics, BooleanStatistics, ParquetStatistics, Statistics}, write::WriteOptions, }; @@ -42,7 +42,7 @@ pub fn array_to_page( array: &BooleanArray, options: WriteOptions, descriptor: ColumnDescriptor, -) -> Result { +) -> Result { let is_optional = is_type_nullable(descriptor.type_()); let validity = array.validity(); @@ -60,16 +60,6 @@ pub fn array_to_page( encode_plain(array, is_optional, &mut buffer)?; - let uncompressed_page_size = buffer.len(); - - let mut compressed_buffer = vec![]; - let _was_compressed = utils::compress( - &mut buffer, - &mut compressed_buffer, - options, - definition_levels_byte_length, - )?; - let statistics = if options.write_statistics { Some(build_statistics(array)) } else { @@ -77,10 +67,9 @@ pub fn array_to_page( }; utils::build_plain_page( - compressed_buffer, + buffer, array.len(), array.null_count(), - uncompressed_page_size, 0, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index 619f9cfe76b..427c7a05925 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -1,5 +1,5 @@ use parquet2::{ - encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions, + encoding::Encoding, metadata::ColumnDescriptor, page::DataPage, write::WriteOptions, }; use super::super::{levels, utils}; @@ -15,7 +15,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, nested: levels::NestedInfo, -) -> Result +) -> Result where O: Offset, { @@ -32,16 +32,6 @@ where encode_plain(array, is_optional, &mut buffer)?; - let uncompressed_page_size = buffer.len(); - - let mut compressed_buffer = vec![]; - let _was_compressed = utils::compress( - &mut buffer, - &mut compressed_buffer, - options, - definition_levels_byte_length + repetition_levels_byte_length, - )?; - let statistics = if options.write_statistics { Some(build_statistics(array)) } else { @@ -49,10 +39,9 @@ where }; utils::build_plain_page( - compressed_buffer, + buffer, levels::num_values(nested.offsets()), array.null_count(), - uncompressed_page_size, repetition_levels_byte_length, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index 5fd861d991f..d5df74e5bd6 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::{hybrid_rle::encode_u32, Encoding}, metadata::ColumnDescriptor, - page::{CompressedDictPage, CompressedPage}, + page::{EncodedDictPage, EncodedPage}, write::{DynIter, WriteOptions}, }; @@ -21,7 +21,7 @@ fn encode_keys( validity: Option<&Bitmap>, descriptor: ColumnDescriptor, options: WriteOptions, -) -> Result { +) -> Result { let is_optional = is_type_nullable(descriptor.type_()); let mut buffer = vec![]; @@ -94,21 +94,10 @@ fn encode_keys( encode_u32(&mut buffer, keys, num_bits)?; } - let uncompressed_page_size = buffer.len(); - - let mut compressed_buffer = vec![]; - let _was_compressed = utils::compress( - &mut buffer, - &mut compressed_buffer, - options, - definition_levels_byte_length, - )?; - utils::build_plain_page( - compressed_buffer, + buffer, array.len(), array.null_count(), - uncompressed_page_size, 0, definition_levels_byte_length, None, @@ -116,7 +105,7 @@ fn encode_keys( options, Encoding::RleDictionary, ) - .map(CompressedPage::Data) + .map(EncodedPage::Data) } macro_rules! dyn_prim { @@ -125,9 +114,7 @@ macro_rules! dyn_prim { let mut buffer = vec![]; primitive_encode_plain::<$from, $to>(values, false, &mut buffer); - let buffer = utils::compress(buffer, $options, 0)?; - - CompressedPage::Dict(CompressedDictPage::new(buffer, values.len())) + EncodedDictPage::new(buffer, values.len()) }}; } @@ -136,7 +123,7 @@ pub fn array_to_pages( descriptor: ColumnDescriptor, options: WriteOptions, encoding: Encoding, -) -> Result>> +) -> Result>> where PrimitiveArray: std::fmt::Display, { @@ -163,32 +150,28 @@ where let mut buffer = vec![]; utf8_encode_plain::(values, false, &mut buffer); - let buffer = utils::compress(buffer, options, 0)?; - CompressedPage::Dict(CompressedDictPage::new(buffer, values.len())) + EncodedDictPage::new(buffer, values.len()) } DataType::LargeUtf8 => { let values = array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; utf8_encode_plain::(values, false, &mut buffer); - let buffer = utils::compress(buffer, options, 0)?; - CompressedPage::Dict(CompressedDictPage::new(buffer, values.len())) + EncodedDictPage::new(buffer, values.len()) } DataType::Binary => { let values = array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; binary_encode_plain::(values, false, &mut buffer); - let buffer = utils::compress(buffer, options, 0)?; - CompressedPage::Dict(CompressedDictPage::new(buffer, values.len())) + EncodedDictPage::new(buffer, values.len()) } DataType::LargeBinary => { let values = array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; binary_encode_plain::(values, false, &mut buffer); - let buffer = utils::compress(buffer, options, 0)?; - CompressedPage::Dict(CompressedDictPage::new(buffer, values.len())) + EncodedDictPage::new(buffer, values.len()) } other => { return Err(ArrowError::NotYetImplemented(format!( @@ -197,6 +180,7 @@ where ))) } }; + let dict_page = EncodedPage::Dict(dict_page); // write DataPage pointing to DictPage let data_page = diff --git a/src/io/parquet/write/fixed_len_bytes.rs b/src/io/parquet/write/fixed_len_bytes.rs index 0f64b2f5214..304a79d52f8 100644 --- a/src/io/parquet/write/fixed_len_bytes.rs +++ b/src/io/parquet/write/fixed_len_bytes.rs @@ -1,8 +1,7 @@ use parquet2::{ - compression::create_codec, encoding::Encoding, metadata::ColumnDescriptor, - page::CompressedDataPage, + page::DataPage, statistics::{deserialize_statistics, serialize_statistics, ParquetStatistics}, write::WriteOptions, }; @@ -18,7 +17,7 @@ pub fn array_to_page( array: &FixedSizeBinaryArray, options: WriteOptions, descriptor: ColumnDescriptor, -) -> Result { +) -> Result { let is_optional = is_type_nullable(descriptor.type_()); let validity = array.validity(); @@ -45,19 +44,6 @@ pub fn array_to_page( buffer.extend_from_slice(array.values()); } - let uncompressed_page_size = buffer.len(); - - let codec = create_codec(&options.compression)?; - let buffer = if let Some(mut codec) = codec { - // todo: remove this allocation by extending `buffer` directly. - // needs refactoring `compress`'s API. - let mut tmp = vec![]; - codec.compress(&buffer, &mut tmp)?; - tmp - } else { - buffer - }; - let statistics = if options.write_statistics { build_statistics(array, descriptor.clone()) } else { @@ -68,7 +54,6 @@ pub fn array_to_page( buffer, array.len(), array.null_count(), - uncompressed_page_size, 0, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index b40fccac59e..fdc3d886652 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -24,13 +24,17 @@ use crate::io::parquet::write::levels::NestedInfo; use crate::types::days_ms; use crate::types::NativeType; +use parquet2::page::DataPage; pub use parquet2::{ compression::Compression, encoding::Encoding, metadata::{ColumnDescriptor, KeyValue, SchemaDescriptor}, - page::{CompressedDataPage, CompressedPage}, + page::{CompressedDataPage, CompressedPage, EncodedPage}, schema::types::ParquetType, - write::{write_file as parquet_write_file, DynIter, RowGroupIter, Version, WriteOptions}, + write::{ + write_file as parquet_write_file, Compressor, DynIter, DynStreamingIterator, RowGroupIter, + Version, WriteOptions, + }, }; pub use record_batch::RowGroupIterator; use schema::schema_to_metadata_key; @@ -104,13 +108,13 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { ) } -/// Returns an iterator of compressed pages, +/// Returns an iterator of [`EncodedPage`]. pub fn array_to_pages( array: Arc, descriptor: ColumnDescriptor, options: WriteOptions, encoding: Encoding, -) -> Result>> { +) -> Result>> { match array.data_type() { DataType::Dictionary(key_type, _) => { with_match_dictionary_key_type!(key_type.as_ref(), |$T| { @@ -133,7 +137,7 @@ pub fn array_to_page( descriptor: ColumnDescriptor, options: WriteOptions, encoding: Encoding, -) -> Result { +) -> Result { let data_type = array.data_type(); if !can_encode(data_type, encoding) { return Err(ArrowError::InvalidArgumentError(format!( @@ -319,7 +323,7 @@ pub fn array_to_page( other ))), } - .map(CompressedPage::Data) + .map(EncodedPage::Data) } macro_rules! dyn_nested_prim { @@ -341,7 +345,7 @@ fn list_array_to_page( values: &dyn Array, descriptor: ColumnDescriptor, options: WriteOptions, -) -> Result { +) -> Result { use DataType::*; let is_optional = is_type_nullable(descriptor.type_()); let nested = NestedInfo::new(offsets, validity, is_optional); @@ -420,7 +424,7 @@ fn nested_array_to_page( array: &dyn Array, descriptor: ColumnDescriptor, options: WriteOptions, -) -> Result { +) -> Result { match array.data_type() { DataType::List(_) => { let array = array.as_any().downcast_ref::>().unwrap(); diff --git a/src/io/parquet/write/primitive/basic.rs b/src/io/parquet/write/primitive/basic.rs index 8e553a8a8f5..9b9deb16d0b 100644 --- a/src/io/parquet/write/primitive/basic.rs +++ b/src/io/parquet/write/primitive/basic.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::Encoding, metadata::ColumnDescriptor, - page::CompressedDataPage, + page::DataPage, statistics::{serialize_statistics, ParquetStatistics, PrimitiveStatistics, Statistics}, types::NativeType, write::WriteOptions, @@ -42,7 +42,7 @@ pub fn array_to_page( array: &PrimitiveArray, options: WriteOptions, descriptor: ColumnDescriptor, -) -> Result +) -> Result where T: ArrowNativeType, R: NativeType, @@ -65,16 +65,6 @@ where encode_plain(array, is_optional, &mut buffer); - let uncompressed_page_size = buffer.len(); - - let mut compressed_buffer = vec![]; - let _was_compressed = utils::compress( - &mut buffer, - &mut compressed_buffer, - options, - definition_levels_byte_length, - )?; - let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) } else { @@ -82,10 +72,9 @@ where }; utils::build_plain_page( - compressed_buffer, + buffer, array.len(), array.null_count(), - uncompressed_page_size, 0, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index f6d4e75dedc..5be103d08b9 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -1,5 +1,5 @@ use parquet2::{ - encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, types::NativeType, + encoding::Encoding, metadata::ColumnDescriptor, page::DataPage, types::NativeType, write::WriteOptions, }; @@ -18,7 +18,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, nested: levels::NestedInfo, -) -> Result +) -> Result where T: ArrowNativeType, R: NativeType, @@ -38,16 +38,6 @@ where encode_plain(array, is_optional, &mut buffer); - let uncompressed_page_size = buffer.len(); - - let mut compressed_buffer = vec![]; - let _was_compressed = utils::compress( - &mut buffer, - &mut compressed_buffer, - options, - definition_levels_byte_length + repetition_levels_byte_length, - )?; - let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) } else { @@ -55,10 +45,9 @@ where }; utils::build_plain_page( - compressed_buffer, + buffer, levels::num_values(nested.offsets()), array.null_count(), - uncompressed_page_size, repetition_levels_byte_length, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/record_batch.rs b/src/io/parquet/write/record_batch.rs index 3af63a3d9be..bfa599bfd07 100644 --- a/src/io/parquet/write/record_batch.rs +++ b/src/io/parquet/write/record_batch.rs @@ -1,6 +1,9 @@ +use parquet2::write::Compressor; +use parquet2::FallibleStreamingIterator; + use super::{ - array_to_pages, to_parquet_schema, DynIter, Encoding, RowGroupIter, SchemaDescriptor, - WriteOptions, + array_to_pages, to_parquet_schema, DynIter, DynStreamingIterator, Encoding, RowGroupIter, + SchemaDescriptor, WriteOptions, }; use crate::{ datatypes::Schema, @@ -59,8 +62,14 @@ impl>> Iterator for RowGroupIterator { .into_iter() .zip(self.parquet_schema.columns().to_vec().into_iter()) .zip(encodings.into_iter()) - .map(move |((array, type_), encoding)| { - array_to_pages(array, type_, options, encoding) + .map(move |((array, descriptor), encoding)| { + array_to_pages(array, descriptor, options, encoding).map(move |pages| { + let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); + let compressed_pages = + Compressor::new(encoded_pages, options.compression, vec![]) + .map_err(ArrowError::from); + DynStreamingIterator::new(compressed_pages) + }) }), )) }) diff --git a/src/io/parquet/write/utf8/basic.rs b/src/io/parquet/write/utf8/basic.rs index da04d28601d..f1e8fd3d24c 100644 --- a/src/io/parquet/write/utf8/basic.rs +++ b/src/io/parquet/write/utf8/basic.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::Encoding, metadata::ColumnDescriptor, - page::CompressedDataPage, + page::DataPage, statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics}, write::WriteOptions, }; @@ -43,7 +43,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, encoding: Encoding, -) -> Result { +) -> Result { let validity = array.validity(); let is_optional = is_type_nullable(descriptor.type_()); @@ -76,16 +76,6 @@ pub fn array_to_page( } } - let uncompressed_page_size = buffer.len(); - - let mut compressed_buffer = vec![]; - let _was_compressed = utils::compress( - &mut buffer, - &mut compressed_buffer, - options, - definition_levels_byte_length, - )?; - let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) } else { @@ -93,10 +83,9 @@ pub fn array_to_page( }; utils::build_plain_page( - compressed_buffer, + buffer, array.len(), array.null_count(), - uncompressed_page_size, 0, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index 4cd70f681a0..cb87fabf31f 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -1,5 +1,5 @@ use parquet2::{ - encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions, + encoding::Encoding, metadata::ColumnDescriptor, page::DataPage, write::WriteOptions, }; use super::super::{levels, utils}; @@ -15,7 +15,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, nested: levels::NestedInfo, -) -> Result +) -> Result where OO: Offset, O: Offset, @@ -33,16 +33,6 @@ where encode_plain(array, is_optional, &mut buffer); - let uncompressed_page_size = buffer.len(); - - let mut compressed_buffer = vec![]; - let _was_compressed = utils::compress( - &mut buffer, - &mut compressed_buffer, - options, - definition_levels_byte_length + repetition_levels_byte_length, - )?; - let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) } else { @@ -50,10 +40,9 @@ where }; utils::build_plain_page( - compressed_buffer, + buffer, levels::num_values(nested.offsets()), array.null_count(), - uncompressed_page_size, repetition_levels_byte_length, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/utils.rs b/src/io/parquet/write/utils.rs index 6bf1faa70fb..6857bbc533f 100644 --- a/src/io/parquet/write/utils.rs +++ b/src/io/parquet/write/utils.rs @@ -1,10 +1,10 @@ use crate::bitmap::Bitmap; use parquet2::{ - compression::{create_codec, Compression}, + compression::Compression, encoding::{hybrid_rle::encode_bool, Encoding}, metadata::ColumnDescriptor, - page::{CompressedDataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2}, + page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2}, statistics::ParquetStatistics, write::WriteOptions, }; @@ -59,17 +59,16 @@ pub fn write_def_levels( #[allow(clippy::too_many_arguments)] pub fn build_plain_page( - compressed_buffer: Vec, + buffer: Vec, len: usize, null_count: usize, - uncompressed_page_size: usize, repetition_levels_byte_length: usize, definition_levels_byte_length: usize, statistics: Option, descriptor: ColumnDescriptor, options: WriteOptions, encoding: Encoding, -) -> Result { +) -> Result { match options.version { Version::V1 => { let header = DataPageHeader::V1(DataPageHeaderV1 { @@ -80,14 +79,7 @@ pub fn build_plain_page( statistics, }); - Ok(CompressedDataPage::new( - header, - compressed_buffer, - options.compression, - uncompressed_page_size, - None, - descriptor, - )) + Ok(DataPage::new(header, buffer, None, descriptor)) } Version::V2 => { let header = DataPageHeader::V2(DataPageHeaderV2 { @@ -101,44 +93,11 @@ pub fn build_plain_page( statistics, }); - Ok(CompressedDataPage::new( - header, - compressed_buffer, - options.compression, - uncompressed_page_size, - None, - descriptor, - )) + Ok(DataPage::new(header, buffer, None, descriptor)) } } } -/// Compresses `buffer` into `compressed_buffer` according to the parquet specification. -/// Returns whether the buffer was compressed or swapped. -pub fn compress( - buffer: &mut Vec, - compressed_buffer: &mut Vec, - options: WriteOptions, - levels_byte_length: usize, -) -> Result { - let codec = create_codec(&options.compression)?; - Ok(if let Some(mut codec) = codec { - match options.version { - Version::V1 => { - codec.compress(buffer, compressed_buffer)?; - } - Version::V2 => { - compressed_buffer.extend_from_slice(&buffer[..levels_byte_length]); - codec.compress(&buffer[levels_byte_length..], compressed_buffer)?; - } - }; - true - } else { - std::mem::swap(buffer, compressed_buffer); - false - }) -} - /// Auxiliary iterator adapter to declare the size hint of an iterator. pub(super) struct ExactSizedIter> { iter: I, diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 97a2445a0ba..cac44296792 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1,6 +1,7 @@ use std::io::{Cursor, Read, Seek}; use std::sync::Arc; +use arrow2::error::ArrowError; use arrow2::{ array::*, bitmap::Bitmap, buffer::Buffer, datatypes::*, error::Result, io::parquet::read::statistics::*, io::parquet::read::*, io::parquet::write::*, @@ -492,13 +493,19 @@ fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result .columns() .iter() .zip(descritors.clone()) - .map(|(array, type_)| { + .map(|(array, descriptor)| { let encoding = if let DataType::Dictionary(_, _) = array.data_type() { Encoding::RleDictionary } else { Encoding::Plain }; - array_to_pages(array.clone(), type_, options, encoding) + array_to_pages(array.clone(), descriptor, options, encoding).map(|pages| { + let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); + let compressed_pages = + Compressor::new(encoded_pages, options.compression, vec![]) + .map_err(ArrowError::from); + DynStreamingIterator::new(compressed_pages) + }) }); let iterator = DynIter::new(iterator); Ok(iterator) From d3946e86402fcabc5451447ec5d5dc54137d752f Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Thu, 14 Oct 2021 19:53:56 +0000 Subject: [PATCH 3/5] Temp --- Cargo.toml | 6 +++--- examples/parquet_read.rs | 3 ++- examples/parquet_read_parallel.rs | 8 +++---- examples/parquet_write.rs | 31 +++++++++++++++++----------- src/io/parquet/read/mod.rs | 4 ++-- src/io/parquet/write/mod.rs | 7 +++---- src/io/parquet/write/record_batch.rs | 16 +++++++------- tests/it/io/parquet/mod.rs | 2 +- 8 files changed, 42 insertions(+), 35 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 823f60ef313..1b97c689c61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,8 +62,8 @@ futures = { version = "0.3", optional = true } ahash = { version = "0.7", optional = true } #parquet2 = { version = "0.5.2", optional = true, default_features = false, features = ["stream"] } -#parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "reuse_compress", optional = true, default_features = false, features = ["stream"] } -parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] } +parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "reuse_compress", optional = true, default_features = false, features = ["stream"] } +#parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] } avro-rs = { version = "0.13", optional = true, default_features = false } @@ -102,7 +102,7 @@ full = [ "io_avro", "regex", "merge_sort", - #"compute", + "compute", # parses timezones used in timestamp conversions "chrono-tz" ] diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs index 1e0183ba1e3..8ab3ccdac2c 100644 --- a/examples/parquet_read.rs +++ b/examples/parquet_read.rs @@ -1,11 +1,12 @@ use std::fs::File; +use std::io::BufReader; use arrow2::io::parquet::read; use arrow2::{array::Array, error::Result}; fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result> { // Open a file, a common operation in Rust - let mut file = File::open(path)?; + let mut file = BufReader::new(File::open(path)?); // Read the files' metadata. This has a small IO cost because it requires seeking to the end // of the file to read its footer. diff --git a/examples/parquet_read_parallel.rs b/examples/parquet_read_parallel.rs index defaf6bb411..9dead6c43ac 100644 --- a/examples/parquet_read_parallel.rs +++ b/examples/parquet_read_parallel.rs @@ -47,16 +47,14 @@ fn parallel_read(path: &str) -> Result>> { let metadata_consumer = file_metadata.clone(); let arrow_schema_consumer = arrow_schema.clone(); let child = thread::spawn(move || { - let (column, row_group, iter) = rx_consumer.recv().unwrap(); + let (column, row_group, pages) = rx_consumer.recv().unwrap(); let start = SystemTime::now(); println!("consumer start - {} {}", column, row_group); let metadata = metadata_consumer.row_groups[row_group].column(column); let data_type = arrow_schema_consumer.fields()[column].data_type().clone(); - let pages = iter - .into_iter() - .map(|x| x.and_then(|x| read::decompress(x, &mut vec![]))); - let mut pages = read::streaming_iterator::convert(pages); + let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]); + let array = read::page_iter_to_array(&mut pages, metadata, data_type); println!( "consumer end - {:?}: {} {}", diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs index d6df8d736c2..888fce79242 100644 --- a/examples/parquet_write.rs +++ b/examples/parquet_write.rs @@ -1,13 +1,15 @@ use std::fs::File; use std::iter::once; +use arrow2::error::ArrowError; use arrow2::io::parquet::write::to_parquet_schema; use arrow2::{ array::{Array, Int32Array}, datatypes::{Field, Schema}, error::Result, io::parquet::write::{ - array_to_page, write_file, Compression, DynIter, Encoding, Version, WriteOptions, + array_to_pages, write_file, Compression, Compressor, DynIter, DynStreamingIterator, + Encoding, FallibleStreamingIterator, Version, WriteOptions, }, }; @@ -24,17 +26,22 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()> // map arrow fields to parquet fields let parquet_schema = to_parquet_schema(&schema)?; - // Declare the row group iterator. This must be an iterator of iterators of iterators: - // * first iterator of row groups - // * second iterator of column chunks - // * third iterator of pages - // an array can be divided in multiple pages via `.slice(offset, length)` (`O(1)`). - // All column chunks within a row group MUST have the same length. - let row_groups = once(Result::Ok(DynIter::new(once(Ok(DynIter::new( - once(array) - .zip(parquet_schema.columns().to_vec().into_iter()) - .map(|(array, descriptor)| array_to_page(array, descriptor, options, encoding)), - )))))); + let descriptor = parquet_schema.columns()[0].clone(); + + // Declare the row group iterator. This must be an iterator of iterators of streaming iterators + // * first iterator over row groups + let row_groups = once(Result::Ok(DynIter::new( + // * second iterator over column chunks (we assume no struct arrays -> `once` column) + once( + // * third iterator over (compressed) pages; dictionary encoding may lead to multiple pages per array. + array_to_pages(array, descriptor, options, encoding).map(move |pages| { + let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); + let compressed_pages = Compressor::new(encoded_pages, options.compression, vec![]) + .map_err(ArrowError::from); + DynStreamingIterator::new(compressed_pages) + }), + ), + ))); // Create a new empty file let mut file = File::create(path)?; diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index d28b7f30a85..1f6198c1e69 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -13,8 +13,8 @@ pub use parquet2::{ page::{CompressedDataPage, DataPage, DataPageHeader}, read::{ decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream, - read_metadata as _read_metadata, read_metadata_async as _read_metadata_async, Decompressor, - PageFilter, PageIterator, + read_metadata as _read_metadata, read_metadata_async as _read_metadata_async, + BasicDecompressor, Decompressor, PageFilter, PageIterator, }, schema::types::{ LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index fdc3d886652..e1a6aa6b5e9 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -12,8 +12,6 @@ mod utils; pub mod stream; -use std::sync::Arc; - use crate::array::*; use crate::bitmap::Bitmap; use crate::buffer::{Buffer, MutableBuffer}; @@ -35,6 +33,7 @@ pub use parquet2::{ write_file as parquet_write_file, Compressor, DynIter, DynStreamingIterator, RowGroupIter, Version, WriteOptions, }, + FallibleStreamingIterator, }; pub use record_batch::RowGroupIterator; use schema::schema_to_metadata_key; @@ -110,7 +109,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { /// Returns an iterator of [`EncodedPage`]. pub fn array_to_pages( - array: Arc, + array: &dyn Array, descriptor: ColumnDescriptor, options: WriteOptions, encoding: Encoding, @@ -126,7 +125,7 @@ pub fn array_to_pages( ) }) } - _ => array_to_page(array.as_ref(), descriptor, options, encoding) + _ => array_to_page(array, descriptor, options, encoding) .map(|page| DynIter::new(std::iter::once(Ok(page)))), } } diff --git a/src/io/parquet/write/record_batch.rs b/src/io/parquet/write/record_batch.rs index bfa599bfd07..b6cb940799c 100644 --- a/src/io/parquet/write/record_batch.rs +++ b/src/io/parquet/write/record_batch.rs @@ -63,13 +63,15 @@ impl>> Iterator for RowGroupIterator { .zip(self.parquet_schema.columns().to_vec().into_iter()) .zip(encodings.into_iter()) .map(move |((array, descriptor), encoding)| { - array_to_pages(array, descriptor, options, encoding).map(move |pages| { - let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); - let compressed_pages = - Compressor::new(encoded_pages, options.compression, vec![]) - .map_err(ArrowError::from); - DynStreamingIterator::new(compressed_pages) - }) + array_to_pages(array.as_ref(), descriptor, options, encoding).map( + move |pages| { + let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); + let compressed_pages = + Compressor::new(encoded_pages, options.compression, vec![]) + .map_err(ArrowError::from); + DynStreamingIterator::new(compressed_pages) + }, + ) }), )) }) diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index cac44296792..e82135fd812 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -499,7 +499,7 @@ fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result } else { Encoding::Plain }; - array_to_pages(array.clone(), descriptor, options, encoding).map(|pages| { + array_to_pages(array.as_ref(), descriptor, options, encoding).map(|pages| { let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); let compressed_pages = Compressor::new(encoded_pages, options.compression, vec![]) From 4ae27b01575af6f2ba61dc93108e274355becff4 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 17 Oct 2021 05:24:51 +0000 Subject: [PATCH 4/5] Bumped parquet --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 1b97c689c61..ead71c38aee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,7 +62,7 @@ futures = { version = "0.3", optional = true } ahash = { version = "0.7", optional = true } #parquet2 = { version = "0.5.2", optional = true, default_features = false, features = ["stream"] } -parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "reuse_compress", optional = true, default_features = false, features = ["stream"] } +parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "simplify_codec", optional = true, default_features = false, features = ["stream"] } #parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] } avro-rs = { version = "0.13", optional = true, default_features = false } From 56c98c794001f5f8c4a24219cd13c9bd218612d5 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 18 Oct 2021 05:26:08 +0000 Subject: [PATCH 5/5] Bumped to release parquet2 --- Cargo.toml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ead71c38aee..73642d0f39c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,9 +61,7 @@ futures = { version = "0.3", optional = true } # for faster hashing ahash = { version = "0.7", optional = true } -#parquet2 = { version = "0.5.2", optional = true, default_features = false, features = ["stream"] } -parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "simplify_codec", optional = true, default_features = false, features = ["stream"] } -#parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] } +parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] } avro-rs = { version = "0.13", optional = true, default_features = false }