Skip to content

Commit

Permalink
Added file offset (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
barrotsteindev authored Feb 5, 2022
1 parent 2c516c7 commit e8a0c35
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 6 deletions.
12 changes: 10 additions & 2 deletions src/metadata/row_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use parquet_format_async_temp::RowGroup;

use super::{column_chunk_metadata::ColumnChunkMetaData, schema_descriptor::SchemaDescriptor};
use crate::error::Result;
use crate::{error::Result, write::ColumnOffsetsMetadata};

/// Metadata for a row group.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -77,12 +77,20 @@ impl RowGroupMetaData {

/// Method to convert to Thrift.
pub fn into_thrift(self) -> RowGroup {
let file_offset = self
.columns
.iter()
.map(|c| {
ColumnOffsetsMetadata::from_column_chunk_metadata(c).calc_row_group_file_offset()
})
.next()
.unwrap_or(None);
RowGroup {
columns: self.columns.into_iter().map(|v| v.into_thrift()).collect(),
total_byte_size: self.total_byte_size,
num_rows: self.num_rows,
sorting_columns: None,
file_offset: None,
file_offset: file_offset,
total_compressed_size: None,
ordinal: None,
}
Expand Down
2 changes: 2 additions & 0 deletions src/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub use compression::{compress, Compressor};

pub use file::FileWriter;

pub use row_group::ColumnOffsetsMetadata;

use crate::compression::Compression;
use crate::page::CompressedPage;

Expand Down
60 changes: 56 additions & 4 deletions src/write/row_group.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::io::Write;

use futures::AsyncWrite;
use parquet_format_async_temp::RowGroup;
use parquet_format_async_temp::{ColumnChunk, RowGroup};

use crate::{
compression::Compression,
error::{ParquetError, Result},
metadata::ColumnDescriptor,
metadata::{ColumnChunkMetaData, ColumnDescriptor},
page::CompressedPage,
};

Expand All @@ -15,6 +15,42 @@ use super::{
DynIter, DynStreamingIterator,
};

pub struct ColumnOffsetsMetadata {
pub dictionary_page_offset: Option<i64>,
pub data_page_offset: Option<i64>,
}

impl ColumnOffsetsMetadata {
pub fn from_column_chunk(column_chunk: &ColumnChunk) -> ColumnOffsetsMetadata {
ColumnOffsetsMetadata {
dictionary_page_offset: column_chunk
.meta_data
.as_ref()
.map(|meta| meta.dictionary_page_offset)
.unwrap_or(None),
data_page_offset: column_chunk
.meta_data
.as_ref()
.map(|meta| meta.data_page_offset),
}
}

pub fn from_column_chunk_metadata(
column_chunk_metadata: &ColumnChunkMetaData,
) -> ColumnOffsetsMetadata {
ColumnOffsetsMetadata {
dictionary_page_offset: column_chunk_metadata.dictionary_page_offset(),
data_page_offset: Some(column_chunk_metadata.data_page_offset()),
}
}

pub fn calc_row_group_file_offset(&self) -> Option<i64> {
self.dictionary_page_offset
.filter(|x| *x > 0_i64)
.or(self.data_page_offset)
}
}

pub fn write_row_group<
'a,
W,
Expand Down Expand Up @@ -46,6 +82,14 @@ where
let bytes_written = offset - initial;

// compute row group stats
let file_offest = columns
.iter()
.next()
.map(|column_chunk| {
ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()
})
.unwrap_or(None);

let total_byte_size = columns
.iter()
.map(|c| c.meta_data.as_ref().unwrap().total_compressed_size)
Expand All @@ -57,7 +101,7 @@ where
total_byte_size,
num_rows: num_rows as i64,
sorting_columns: None,
file_offset: None,
file_offset: file_offest,
total_compressed_size: None,
ordinal: None,
},
Expand Down Expand Up @@ -95,6 +139,14 @@ where
let bytes_written = offset - initial;

// compute row group stats
let file_offest = columns
.iter()
.next()
.map(|column_chunk| {
ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()
})
.unwrap_or(None);

let total_byte_size = columns
.iter()
.map(|c| c.meta_data.as_ref().unwrap().total_compressed_size)
Expand All @@ -106,7 +158,7 @@ where
total_byte_size,
num_rows: num_rows as i64,
sorting_columns: None,
file_offset: None,
file_offset: file_offest,
total_compressed_size: None,
ordinal: None,
},
Expand Down

0 comments on commit e8a0c35

Please sign in to comment.