From 61b93b4eba26670f84af85ff05258d1afc09f40a Mon Sep 17 00:00:00 2001 From: Kornelijus Survila Date: Tue, 12 Apr 2022 20:47:35 -0600 Subject: [PATCH] Auto-detect page compression This removes the explicit compression setting in `WriteOptions`. It was somewhat confusing as the pages still needed to be appropriately compressed, and limiting as it forced the same value for the whole file. --- src/page/mod.rs | 7 +++++++ src/page/page_dict/mod.rs | 9 ++++++++- src/write/column_chunk.rs | 21 ++++++++++++++++----- src/write/compression.rs | 1 + src/write/file.rs | 1 - src/write/mod.rs | 3 --- src/write/page.rs | 26 ++++++++++++++++---------- src/write/row_group.rs | 7 ++----- src/write/stream.rs | 1 - tests/it/write/indexes.rs | 3 +-- tests/it/write/mod.rs | 9 +++------ 11 files changed, 54 insertions(+), 34 deletions(-) diff --git a/src/page/mod.rs b/src/page/mod.rs index 6cbb57608..7baf66478 100644 --- a/src/page/mod.rs +++ b/src/page/mod.rs @@ -277,6 +277,13 @@ impl CompressedPage { } } + pub(crate) fn compression(&self) -> Compression { + match self { + CompressedPage::Data(page) => page.compression(), + CompressedPage::Dict(page) => page.compression(), + } + } + pub(crate) fn num_values(&self) -> usize { match self { CompressedPage::Data(page) => page.num_values(), diff --git a/src/page/page_dict/mod.rs b/src/page/page_dict/mod.rs index ed4f6e5a7..d116e5898 100644 --- a/src/page/page_dict/mod.rs +++ b/src/page/page_dict/mod.rs @@ -36,18 +36,25 @@ impl EncodedDictPage { #[derive(Debug)] pub struct CompressedDictPage { pub(crate) buffer: Vec, + compression: Compression, pub(crate) num_values: usize, pub(crate) uncompressed_page_size: usize, } impl CompressedDictPage { - pub fn new(buffer: Vec, uncompressed_page_size: usize, num_values: usize) -> Self { + pub fn new(buffer: Vec, compression: Compression, uncompressed_page_size: usize, num_values: usize) -> Self { Self { buffer, + compression, uncompressed_page_size, num_values, } } + + /// The compression of the data in this page. + pub fn compression(&self) -> Compression { + self.compression + } } pub fn read_dict_page( diff --git a/src/write/column_chunk.rs b/src/write/column_chunk.rs index 0a63c03ca..02bbe2296 100644 --- a/src/write/column_chunk.rs +++ b/src/write/column_chunk.rs @@ -25,7 +25,6 @@ pub fn write_column_chunk<'a, W, E>( writer: &mut W, mut offset: u64, descriptor: &ColumnDescriptor, - compression: Compression, mut compressed_pages: DynStreamingIterator<'a, CompressedPage, E>, ) -> Result<(ColumnChunk, Vec, u64)> where @@ -45,7 +44,7 @@ where } let mut bytes_written = offset - initial; - let column_chunk = build_column_chunk(&specs, descriptor, compression)?; + let column_chunk = build_column_chunk(&specs, descriptor)?; // write metadata let mut protocol = TCompactOutputProtocol::new(writer); @@ -63,7 +62,6 @@ pub async fn write_column_chunk_async( writer: &mut W, mut offset: u64, descriptor: &ColumnDescriptor, - compression: Compression, mut compressed_pages: DynStreamingIterator<'_, CompressedPage, E>, ) -> Result<(ColumnChunk, Vec, u64)> where @@ -81,7 +79,7 @@ where } let mut bytes_written = offset - initial; - let column_chunk = build_column_chunk(&specs, descriptor, compression)?; + let column_chunk = build_column_chunk(&specs, descriptor)?; // write metadata let mut protocol = TCompactOutputStreamProtocol::new(writer); @@ -99,10 +97,23 @@ where fn build_column_chunk( specs: &[PageWriteSpec], descriptor: &ColumnDescriptor, - compression: Compression, ) -> Result { // compute stats to build header at the end of the chunk + let compression = specs + .iter() + .map(|spec| spec.compression) + .collect::>(); + if compression.len() > 1 { + return Err(crate::error::Error::OutOfSpec( + "All pages within a column chunk must be compressed with the same codec".to_string(), + )); + } + let compression = compression + .into_iter() + .next() + .unwrap_or(Compression::Uncompressed); + // SPEC: the total compressed size is the total compressed size of each page + the header size let total_compressed_size = specs .iter() diff --git a/src/write/compression.rs b/src/write/compression.rs index 9821fce46..2af9e8469 100644 --- a/src/write/compression.rs +++ b/src/write/compression.rs @@ -69,6 +69,7 @@ fn compress_dict( } Ok(CompressedDictPage::new( compressed_buffer, + compression, uncompressed_page_size, num_values, )) diff --git a/src/write/file.rs b/src/write/file.rs index e800fc71b..6a3309350 100644 --- a/src/write/file.rs +++ b/src/write/file.rs @@ -110,7 +110,6 @@ impl FileWriter { &mut self.writer, self.offset, self.schema.columns(), - self.options.compression, row_group, ordinal, )?; diff --git a/src/write/mod.rs b/src/write/mod.rs index dd41e4f44..e9e6dce57 100644 --- a/src/write/mod.rs +++ b/src/write/mod.rs @@ -20,7 +20,6 @@ pub use file::FileWriter; pub use row_group::ColumnOffsetsMetadata; -use crate::compression::Compression; use crate::page::CompressedPage; pub type RowGroupIter<'a, E> = @@ -31,8 +30,6 @@ pub type RowGroupIter<'a, E> = pub struct WriteOptions { /// Whether to write statistics, including indexes pub write_statistics: bool, - /// Whether to use compression - pub compression: Compression, /// Which Parquet version to use pub version: Version, } diff --git a/src/write/page.rs b/src/write/page.rs index fbc38d585..7e5d50d11 100644 --- a/src/write/page.rs +++ b/src/write/page.rs @@ -8,6 +8,7 @@ use parquet_format_async_temp::thrift::protocol::{ }; use parquet_format_async_temp::{DictionaryPageHeader, Encoding, PageType}; +use crate::compression::Compression; use crate::error::{Error, Result}; use crate::page::{ CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, ParquetPageHeader, @@ -44,6 +45,7 @@ pub struct PageWriteSpec { pub header_size: u64, pub offset: u64, pub bytes_written: u64, + pub compression: Compression, pub statistics: Option>, } @@ -84,6 +86,7 @@ pub fn write_page( header_size, offset, bytes_written, + compression: compressed_page.compression(), statistics, num_rows: selected_rows.map(|x| x.last().unwrap().length), num_values, @@ -127,6 +130,7 @@ pub async fn write_page_async( header_size, offset, bytes_written, + compression: compressed_page.compression(), statistics, num_rows: selected_rows.map(|x| x.last().unwrap().length), num_values, @@ -210,21 +214,23 @@ mod tests { #[test] fn dict_too_large() { - let page = CompressedDictPage { - buffer: vec![], - uncompressed_page_size: i32::MAX as usize + 1, - num_values: 100, - }; + let page = CompressedDictPage::new( + vec![], + Compression::Uncompressed, + i32::MAX as usize + 1, + 100, + ); assert!(assemble_dict_page_header(&page).is_err()); } #[test] fn dict_too_many_values() { - let page = CompressedDictPage { - buffer: vec![], - uncompressed_page_size: 0, - num_values: i32::MAX as usize + 1, - }; + let page = CompressedDictPage::new( + vec![], + Compression::Uncompressed, + 0, + i32::MAX as usize + 1, + ); assert!(assemble_dict_page_header(&page).is_err()); } } diff --git a/src/write/row_group.rs b/src/write/row_group.rs index 351733f6e..4cf3cc9a1 100644 --- a/src/write/row_group.rs +++ b/src/write/row_group.rs @@ -4,7 +4,6 @@ use futures::AsyncWrite; use parquet_format_async_temp::{ColumnChunk, RowGroup}; use crate::{ - compression::Compression, error::{Error, Result}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::CompressedPage, @@ -81,7 +80,6 @@ pub fn write_row_group< writer: &mut W, mut offset: u64, descriptors: &[ColumnDescriptor], - compression: Compression, columns: DynIter<'a, std::result::Result, E>>, ordinal: usize, ) -> Result<(RowGroup, Vec>, u64)> @@ -96,7 +94,7 @@ where let columns = column_iter .map(|(descriptor, page_iter)| { let (column, page_specs, size) = - write_column_chunk(writer, offset, descriptor, compression, page_iter?)?; + write_column_chunk(writer, offset, descriptor, page_iter?)?; offset += size; Ok((column, page_specs)) }) @@ -147,7 +145,6 @@ pub async fn write_row_group_async< writer: &mut W, mut offset: u64, descriptors: &[ColumnDescriptor], - compression: Compression, columns: DynIter<'a, std::result::Result, E>>, ) -> Result<(RowGroup, Vec>, u64)> where @@ -161,7 +158,7 @@ where let mut columns = vec![]; for (descriptor, page_iter) in column_iter { let (column, page_specs, size) = - write_column_chunk_async(writer, offset, descriptor, compression, page_iter?).await?; + write_column_chunk_async(writer, offset, descriptor, page_iter?).await?; offset += size; columns.push((column, page_specs)); } diff --git a/src/write/stream.rs b/src/write/stream.rs index 6e7f8c301..37160e5f5 100644 --- a/src/write/stream.rs +++ b/src/write/stream.rs @@ -106,7 +106,6 @@ impl FileStreamer { &mut self.writer, self.offset, self.schema.columns(), - self.options.compression, row_group, ) .await?; diff --git a/tests/it/write/indexes.rs b/tests/it/write/indexes.rs index b423951f0..a1f5216b1 100644 --- a/tests/it/write/indexes.rs +++ b/tests/it/write/indexes.rs @@ -25,7 +25,6 @@ fn write_file() -> Result> { let options = WriteOptions { write_statistics: true, - compression: Compression::Uncompressed, version: Version::V1, }; @@ -44,7 +43,7 @@ fn write_file() -> Result> { let pages = DynStreamingIterator::new(Compressor::new( DynIter::new(pages.into_iter()), - options.compression, + Compression::Uncompressed, vec![], )); let columns = std::iter::once(Ok(pages)); diff --git a/tests/it/write/mod.rs b/tests/it/write/mod.rs index d8ff76ccd..5d1570f46 100644 --- a/tests/it/write/mod.rs +++ b/tests/it/write/mod.rs @@ -55,7 +55,6 @@ fn test_column(column: &str, compression: Compression) -> Result<()> { let options = WriteOptions { write_statistics: true, - compression, version: Version::V1, }; @@ -83,7 +82,7 @@ fn test_column(column: &str, compression: Compression) -> Result<()> { &options, &a[0].descriptor, ))), - options.compression, + compression, vec![], )); let columns = std::iter::once(Ok(pages)); @@ -187,7 +186,6 @@ fn basic() -> Result<()> { let options = WriteOptions { write_statistics: false, - compression: Compression::Uncompressed, version: Version::V1, }; @@ -205,7 +203,7 @@ fn basic() -> Result<()> { &options, &schema.columns()[0].descriptor, ))), - options.compression, + Compression::Uncompressed, vec![], )); let columns = std::iter::once(Ok(pages)); @@ -237,7 +235,6 @@ async fn test_column_async(column: &str) -> Result<()> { let options = WriteOptions { write_statistics: true, - compression: Compression::Uncompressed, version: Version::V1, }; @@ -264,7 +261,7 @@ async fn test_column_async(column: &str) -> Result<()> { &options, &a[0].descriptor, ))), - options.compression, + Compression::Uncompressed, vec![], )); let columns = std::iter::once(Ok(pages));