From 49951467deb6ea6d8c1d5286ea68e6b55b6794a7 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Thu, 17 Mar 2022 18:29:38 +0100 Subject: [PATCH] Added basic support to read indexes (#100) --- examples/read_metadata.rs | 20 ++++++++- guide/src/README.md | 15 ++++++- src/error.rs | 6 +++ src/indexes/mod.rs | 60 +++++++++++++++++++++++++++ src/lib.rs | 1 + src/metadata/column_chunk_metadata.rs | 5 +++ 6 files changed, 104 insertions(+), 3 deletions(-) create mode 100644 src/indexes/mod.rs diff --git a/examples/read_metadata.rs b/examples/read_metadata.rs index 18aa8a819..157b14e87 100644 --- a/examples/read_metadata.rs +++ b/examples/read_metadata.rs @@ -1,7 +1,8 @@ +use parquet2::bloom_filter; use parquet2::error::Result; +use parquet2::indexes; // ANCHOR: deserialize -use parquet2::bloom_filter; use parquet2::encoding::Encoding; use parquet2::metadata::ColumnDescriptor; use parquet2::page::{split_buffer, DataPage}; @@ -60,6 +61,23 @@ fn main() -> Result<()> { let column_metadata = metadata.row_groups[row_group].column(column); // ANCHOR_END: column_metadata + // ANCHOR: column_index + // read the column index + let index = indexes::read_column(&mut reader, column_metadata.column_chunk())?; + if let Some(index) = index { + // these are the minimum and maximum within each page, which can be used + // to skip pages. 
+ println!("{index:?}"); + } + + // read the offset index containing page locations + let maybe_pages = indexes::read_page_locations(&mut reader, column_metadata.column_chunk())?; + if let Some(pages) = maybe_pages { + // there are page locations in the file + println!("{pages:?}"); + } + // ANCHOR_END: column_index + // ANCHOR: statistics if let Some(maybe_stats) = column_metadata.statistics() { let stats = maybe_stats?; diff --git a/guide/src/README.md b/guide/src/README.md index 2099940c0..05ebe39dc 100644 --- a/guide/src/README.md +++ b/guide/src/README.md @@ -113,11 +113,22 @@ which can be downcasted via its `Statistics::physical_type()`: ## Bloom filters -The metadata of columns can contain bloom filter bitsets that -can be used to pushdown filter operations. +The column metadata may contain bloom filter bitsets that can be used to push down +filter operations to row groups. This crate offers the necessary functionality to check whether an item is not in a column chunk: ```rust,no_run,noplayground {{#include ../../examples/read_metadata.rs:bloom_filter}} ``` + +## Column and page indexes + +The column metadata may contain column and page indexes that can be used to push down filters +when reading (IO) pages. + +This crate offers the necessary functionality to read the column index and the page locations of a column chunk: + +```rust,no_run,noplayground +{{#include ../../examples/read_metadata.rs:column_index}} +``` diff --git a/src/error.rs b/src/error.rs index fb7d4dd79..fa0038401 100644 --- a/src/error.rs +++ b/src/error.rs @@ -70,6 +70,12 @@ impl From for ParquetError { } } +impl From for ParquetError { + fn from(e: std::num::TryFromIntError) -> ParquetError { + ParquetError::OutOfSpec(format!("Number must be zero or positive: {}", e)) + } +} + /// A specialized `Result` for Parquet errors. 
pub type Result = std::result::Result; diff --git a/src/indexes/mod.rs b/src/indexes/mod.rs new file mode 100644 index 000000000..6934a6f5b --- /dev/null +++ b/src/indexes/mod.rs @@ -0,0 +1,60 @@ +use std::convert::TryInto; +use std::io::{Cursor, Read, Seek, SeekFrom}; + +use parquet_format_async_temp::{ + thrift::protocol::TCompactInputProtocol, ColumnChunk, ColumnIndex, OffsetIndex, PageLocation, +}; + +use crate::error::ParquetError; + +/// Read the [`ColumnIndex`] from the [`ColumnChunk`], if available. +pub fn read_column( + reader: &mut R, + chunk: &ColumnChunk, +) -> Result, ParquetError> { + let (offset, length): (u64, usize) = if let Some(offset) = chunk.column_index_offset { + let length = chunk.column_index_length.ok_or_else(|| { + ParquetError::OutOfSpec( + "The column length must exist if column offset exists".to_string(), + ) + })?; + (offset.try_into()?, length.try_into()?) + } else { + return Ok(None); + }; + + reader.seek(SeekFrom::Start(offset))?; + let mut data = vec![0; length]; + reader.read_exact(&mut data)?; + + let mut d = Cursor::new(&data); + let mut prot = TCompactInputProtocol::new(&mut d); + Ok(Some(ColumnIndex::read_from_in_protocol(&mut prot)?)) +} + +/// Read [`PageLocation`]s from the [`ColumnChunk`], if available. +pub fn read_page_locations( + reader: &mut R, + chunk: &ColumnChunk, +) -> Result>, ParquetError> { + let (offset, length): (u64, usize) = if let Some(offset) = chunk.offset_index_offset { + let length = chunk.offset_index_length.ok_or_else(|| { + ParquetError::OutOfSpec( + "The column length must exist if column offset exists".to_string(), + ) + })?; + (offset.try_into()?, length.try_into()?) 
+ } else { + return Ok(None); + }; + + reader.seek(SeekFrom::Start(offset))?; + let mut data = vec![0; length]; + reader.read_exact(&mut data)?; + + let mut d = Cursor::new(&data); + let mut prot = TCompactInputProtocol::new(&mut d); + let offset = OffsetIndex::read_from_in_protocol(&mut prot)?; + + Ok(Some(offset.page_locations)) +} diff --git a/src/lib.rs b/src/lib.rs index 11b7528e9..a5de7fe78 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ pub mod error; pub mod bloom_filter; pub mod compression; pub mod encoding; +pub mod indexes; pub mod metadata; pub mod page; mod parquet_bridge; diff --git a/src/metadata/column_chunk_metadata.rs b/src/metadata/column_chunk_metadata.rs index cf3178079..7d5b3afbb 100644 --- a/src/metadata/column_chunk_metadata.rs +++ b/src/metadata/column_chunk_metadata.rs @@ -41,6 +41,11 @@ impl ColumnChunkMetaData { self.column_chunk.file_offset } + /// Returns this column's [`ColumnChunk`] + pub fn column_chunk(&self) -> &ColumnChunk { + &self.column_chunk + } + // The column's metadata fn column_metadata(&self) -> &ColumnMetaData { self.column_chunk.meta_data.as_ref().unwrap()