Skip to content

Commit

Permalink
Expose SortingColumn in parquet files (#3103)
Browse files Browse the repository at this point in the history
* Expose SortingColumn from parquet file

* fix formatting issues

* empty commit

* fix PR comments

* formatting fix

* add parquet round trip test

* fix clippy error

* update the test based on PR comment

Co-authored-by: askoa <askoa@local>
  • Loading branch information
askoa and askoa authored Nov 15, 2022
1 parent 8bb2917 commit 371ec57
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 2 deletions.
21 changes: 19 additions & 2 deletions parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::sync::Arc;

use crate::format::{
BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation,
RowGroup,
RowGroup, SortingColumn,
};

use crate::basic::{ColumnOrder, Compression, Encoding, Type};
Expand Down Expand Up @@ -229,6 +229,7 @@ pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;
pub struct RowGroupMetaData {
columns: Vec<ColumnChunkMetaData>,
num_rows: i64,
sorting_columns: Option<Vec<SortingColumn>>,
total_byte_size: i64,
schema_descr: SchemaDescPtr,
page_offset_index: Option<Vec<Vec<PageLocation>>>,
Expand Down Expand Up @@ -260,6 +261,11 @@ impl RowGroupMetaData {
self.num_rows
}

/// Returns the sort ordering of the rows in this row group, if one was recorded.
///
/// Returns `None` when the row-group metadata carried no `sorting_columns`
/// entry; otherwise borrows the stored `Vec<SortingColumn>`.
pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
self.sorting_columns.as_ref()
}

/// Total byte size of all uncompressed column data in this row group.
pub fn total_byte_size(&self) -> i64 {
self.total_byte_size
Expand Down Expand Up @@ -303,9 +309,11 @@ impl RowGroupMetaData {
let cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
columns.push(cc);
}
let sorting_columns = rg.sorting_columns;
Ok(RowGroupMetaData {
columns,
num_rows,
sorting_columns,
total_byte_size,
schema_descr,
page_offset_index: None,
Expand All @@ -318,7 +326,7 @@ impl RowGroupMetaData {
columns: self.columns().iter().map(|v| v.to_thrift()).collect(),
total_byte_size: self.total_byte_size,
num_rows: self.num_rows,
sorting_columns: None,
sorting_columns: self.sorting_columns().cloned(),
file_offset: None,
total_compressed_size: None,
ordinal: None,
Expand All @@ -331,6 +339,7 @@ pub struct RowGroupMetaDataBuilder {
columns: Vec<ColumnChunkMetaData>,
schema_descr: SchemaDescPtr,
num_rows: i64,
sorting_columns: Option<Vec<SortingColumn>>,
total_byte_size: i64,
page_offset_index: Option<Vec<Vec<PageLocation>>>,
}
Expand All @@ -342,6 +351,7 @@ impl RowGroupMetaDataBuilder {
columns: Vec::with_capacity(schema_descr.num_columns()),
schema_descr,
num_rows: 0,
sorting_columns: None,
total_byte_size: 0,
page_offset_index: None,
}
Expand All @@ -353,6 +363,12 @@ impl RowGroupMetaDataBuilder {
self
}

/// Sets the sorting order for columns
pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> Self {
self.sorting_columns = value;
self
}

/// Sets total size in bytes for this row group.
pub fn set_total_byte_size(mut self, value: i64) -> Self {
self.total_byte_size = value;
Expand Down Expand Up @@ -384,6 +400,7 @@ impl RowGroupMetaDataBuilder {
Ok(RowGroupMetaData {
columns: self.columns,
num_rows: self.num_rows,
sorting_columns: self.sorting_columns,
total_byte_size: self.total_byte_size,
schema_descr: self.schema_descr,
page_offset_index: self.page_offset_index,
Expand Down
16 changes: 16 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ use std::{collections::HashMap, sync::Arc};
use crate::basic::{Compression, Encoding};
use crate::compression::{CodecOptions, CodecOptionsBuilder};
use crate::file::metadata::KeyValue;
use crate::format::SortingColumn;
use crate::schema::types::ColumnPath;

const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
Expand Down Expand Up @@ -121,6 +122,7 @@ pub struct WriterProperties {
pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
}

impl WriterProperties {
Expand Down Expand Up @@ -182,6 +184,11 @@ impl WriterProperties {
self.key_value_metadata.as_ref()
}

/// Returns the sorting columns configured on these writer properties, if any.
///
/// `None` means no sort ordering was supplied via
/// `WriterPropertiesBuilder::set_sorting_columns`.
pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
self.sorting_columns.as_ref()
}

/// Returns encoding for a data page, when dictionary encoding is enabled.
/// This is not configurable.
#[inline]
Expand Down Expand Up @@ -262,6 +269,7 @@ pub struct WriterPropertiesBuilder {
key_value_metadata: Option<Vec<KeyValue>>,
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
}

impl WriterPropertiesBuilder {
Expand All @@ -278,6 +286,7 @@ impl WriterPropertiesBuilder {
key_value_metadata: None,
default_column_properties: ColumnProperties::new(),
column_properties: HashMap::new(),
sorting_columns: None,
}
}

Expand All @@ -294,6 +303,7 @@ impl WriterPropertiesBuilder {
key_value_metadata: self.key_value_metadata,
default_column_properties: self.default_column_properties,
column_properties: self.column_properties,
sorting_columns: self.sorting_columns,
}
}

Expand Down Expand Up @@ -370,6 +380,12 @@ impl WriterPropertiesBuilder {
self
}

/// Sets sorting order of rows in the row group if any
pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> Self {
self.sorting_columns = value;
self
}

// ----------------------------------------------------------------------
// Setters for any column (global)

Expand Down
61 changes: 61 additions & 0 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
.set_column_metadata(column_chunks)
.set_total_byte_size(self.total_bytes_written as i64)
.set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
.set_sorting_columns(self.props.sorting_columns().cloned())
.build()?;

let metadata = Arc::new(row_group_metadata);
Expand Down Expand Up @@ -653,6 +654,7 @@ mod tests {
reader::{FileReader, SerializedFileReader, SerializedPageReader},
statistics::{from_thrift, to_thrift, Statistics},
};
use crate::format::SortingColumn;
use crate::record::RowAccessor;
use crate::schema::types::{ColumnDescriptor, ColumnPath};
use crate::util::memory::ByteBufferPtr;
Expand Down Expand Up @@ -844,6 +846,65 @@ mod tests {
assert_eq!(read_field, &field);
}

#[test]
fn test_file_writer_with_sorting_columns_metadata() {
    // Round-trip check: sorting columns set on WriterProperties must come
    // back unchanged from the row-group metadata after writing the file and
    // reading it again.
    let file = tempfile::tempfile().unwrap();

    // Two-column INT32 schema; the column values themselves are irrelevant
    // here — only the row-group metadata is under test.
    let schema = Arc::new(
        types::Type::group_type_builder("schema")
            .with_fields(&mut vec![
                Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                ),
                Arc::new(
                    types::Type::primitive_type_builder("col2", Type::INT32)
                        .build()
                        .unwrap(),
                ),
            ])
            .build()
            .unwrap(),
    );
    // Declare the rows as sorted by the first column, ascending, nulls first.
    let expected_result = Some(vec![SortingColumn {
        column_idx: 0,
        descending: false,
        nulls_first: true,
    }]);
    let props = Arc::new(
        WriterProperties::builder()
            .set_key_value_metadata(Some(vec![KeyValue::new(
                "key".to_string(),
                "value".to_string(),
            )]))
            .set_sorting_columns(expected_result.clone())
            .build(),
    );
    let mut writer =
        SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().expect("get row group writer");

    // Open and immediately close one column writer per schema column so the
    // (empty) row group can be finalized without writing any values.
    let col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer.close().unwrap();

    let col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer.close().unwrap();

    row_group_writer.close().unwrap();
    writer.close().unwrap();

    // Re-open the file and collect the sorting columns of every row group.
    let reader = SerializedFileReader::new(file).unwrap();
    let result: Vec<Option<&Vec<SortingColumn>>> = reader
        .metadata()
        .row_groups()
        .iter()
        .map(|f| f.sorting_columns())
        .collect();
    // Validate that the sorting columns read back match the ones written above.
    assert_eq!(expected_result.as_ref(), result[0]);
}

#[test]
fn test_file_writer_empty_row_groups() {
let file = tempfile::tempfile().unwrap();
Expand Down

0 comments on commit 371ec57

Please sign in to comment.