diff --git a/src/metadata/row_metadata.rs b/src/metadata/row_metadata.rs
index 0470028a2..29b0b889d 100644
--- a/src/metadata/row_metadata.rs
+++ b/src/metadata/row_metadata.rs
@@ -1,7 +1,7 @@
 use parquet_format_async_temp::RowGroup;
 
 use super::{column_chunk_metadata::ColumnChunkMetaData, schema_descriptor::SchemaDescriptor};
-use crate::error::Result;
+use crate::{error::Result, write::ColumnOffsetsMetadata};
 
 /// Metadata for a row group.
 #[derive(Debug, Clone)]
@@ -77,12 +77,21 @@ impl RowGroupMetaData {
 
     /// Method to convert to Thrift.
     pub fn into_thrift(self) -> RowGroup {
+        // file_offset: position of this row group's first page, taken from its first column chunk.
+        let file_offset = self
+            .columns
+            .iter()
+            .map(|c| {
+                ColumnOffsetsMetadata::from_column_chunk_metadata(c).calc_row_group_file_offset()
+            })
+            .next()
+            .unwrap_or(None);
         RowGroup {
             columns: self.columns.into_iter().map(|v| v.into_thrift()).collect(),
             total_byte_size: self.total_byte_size,
             num_rows: self.num_rows,
             sorting_columns: None,
-            file_offset: None,
+            file_offset,
             total_compressed_size: None,
             ordinal: None,
         }
diff --git a/src/write/mod.rs b/src/write/mod.rs
index 67b76a608..1e324180d 100644
--- a/src/write/mod.rs
+++ b/src/write/mod.rs
@@ -17,6 +17,8 @@
 pub use compression::{compress, Compressor};
 
 pub use file::FileWriter;
 
+pub use row_group::ColumnOffsetsMetadata;
+
 use crate::compression::Compression;
 use crate::page::CompressedPage;
diff --git a/src/write/row_group.rs b/src/write/row_group.rs
index 8e4746f06..e38af9817 100644
--- a/src/write/row_group.rs
+++ b/src/write/row_group.rs
@@ -1,12 +1,12 @@
 use std::io::Write;
 
 use futures::AsyncWrite;
-use parquet_format_async_temp::RowGroup;
+use parquet_format_async_temp::{ColumnChunk, RowGroup};
 
 use crate::{
     compression::Compression,
     error::{ParquetError, Result},
-    metadata::ColumnDescriptor,
+    metadata::{ColumnChunkMetaData, ColumnDescriptor},
     page::CompressedPage,
 };
 
@@ -15,6 +15,49 @@ use super::{
     DynIter, DynStreamingIterator,
 };
 
+/// Offsets of a column chunk's pages, used to derive a row group's `file_offset`.
+pub struct ColumnOffsetsMetadata {
+    /// Offset of the dictionary page, if the chunk has one.
+    pub dictionary_page_offset: Option<i64>,
+    /// Offset of the chunk's first data page.
+    pub data_page_offset: Option<i64>,
+}
+
+impl ColumnOffsetsMetadata {
+    /// Extracts the page offsets from a Thrift-level [`ColumnChunk`].
+    pub fn from_column_chunk(column_chunk: &ColumnChunk) -> ColumnOffsetsMetadata {
+        ColumnOffsetsMetadata {
+            dictionary_page_offset: column_chunk
+                .meta_data
+                .as_ref()
+                .map(|meta| meta.dictionary_page_offset)
+                .unwrap_or(None),
+            data_page_offset: column_chunk
+                .meta_data
+                .as_ref()
+                .map(|meta| meta.data_page_offset),
+        }
+    }
+
+    /// Extracts the page offsets from a parsed [`ColumnChunkMetaData`].
+    pub fn from_column_chunk_metadata(
+        column_chunk_metadata: &ColumnChunkMetaData,
+    ) -> ColumnOffsetsMetadata {
+        ColumnOffsetsMetadata {
+            dictionary_page_offset: column_chunk_metadata.dictionary_page_offset(),
+            data_page_offset: Some(column_chunk_metadata.data_page_offset()),
+        }
+    }
+
+    /// Returns the row group `file_offset`: the dictionary page offset when set
+    /// (> 0), otherwise the first data page offset.
+    pub fn calc_row_group_file_offset(&self) -> Option<i64> {
+        self.dictionary_page_offset
+            .filter(|x| *x > 0_i64)
+            .or(self.data_page_offset)
+    }
+}
+
 pub fn write_row_group<
     'a,
     W,
@@ -46,6 +89,15 @@ where
     let bytes_written = offset - initial;
 
     // compute row group stats
+    // file_offset: position of this row group's first page, from the first column chunk.
+    let file_offset = columns
+        .iter()
+        .next()
+        .map(|column_chunk| {
+            ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()
+        })
+        .unwrap_or(None);
+
     let total_byte_size = columns
         .iter()
         .map(|c| c.meta_data.as_ref().unwrap().total_compressed_size)
@@ -57,7 +109,7 @@
             total_byte_size,
             num_rows: num_rows as i64,
             sorting_columns: None,
-            file_offset: None,
+            file_offset,
             total_compressed_size: None,
             ordinal: None,
         },
@@ -95,6 +147,15 @@ where
     let bytes_written = offset - initial;
 
     // compute row group stats
+    // file_offset: position of this row group's first page, from the first column chunk.
+    let file_offset = columns
+        .iter()
+        .next()
+        .map(|column_chunk| {
+            ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()
+        })
+        .unwrap_or(None);
+
     let total_byte_size = columns
         .iter()
         .map(|c| c.meta_data.as_ref().unwrap().total_compressed_size)
@@ -106,7 +167,7 @@
             total_byte_size,
             num_rows: num_rows as i64,
             sorting_columns: None,
-            file_offset: None,
+            file_offset,
             total_compressed_size: None,
             ordinal: None,
         },