From d968bde36e236f29adc57d04894acee6d712b5a4 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 25 Aug 2021 07:03:13 +0000 Subject: [PATCH] Simplified API to get iterator --- README.md | 3 ++- examples/s3/src/main.rs | 4 +++- integration-tests/src/read/mod.rs | 13 ++++--------- parquet-tools/src/lib/dump.rs | 3 ++- src/read/mod.rs | 17 ++++++++--------- src/read/page_stream.rs | 7 ++----- 6 files changed, 21 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index ff816d566..2458c38a4 100644 --- a/README.md +++ b/README.md @@ -102,7 +102,8 @@ distribute work across threads. E.g. ```rust let handles = vec![]; for column in columns { - let compressed_pages = get_page_iterator(&metadata, row_group, column, &mut file, file)?.collect()?; + let column_meta = metadata.row_groups[row_group].column(column); + let compressed_pages = get_page_iterator(column_meta, &mut file, file)?.collect()?; // each compressed_page has a buffer; cloning is expensive(!). We move it so that the memory // is released at the end of the processing. handles.push(thread::spawn move { diff --git a/examples/s3/src/main.rs b/examples/s3/src/main.rs index 9c6cf5182..64aa41025 100644 --- a/examples/s3/src/main.rs +++ b/examples/s3/src/main.rs @@ -49,9 +49,11 @@ async fn main() -> Result<()> { // * first row group // * first column + let column_metadata = metadata.row_groups[row_group].column(column); + // * do not skip any pages let pages = - get_page_stream(&metadata, 0, 0, &mut reader, vec![], Arc::new(|_, _| true)).await?; + get_page_stream(&column_metadata, &mut reader, vec![], Arc::new(|_, _| true)).await?; pin_mut!(pages); // needed for iteration diff --git a/integration-tests/src/read/mod.rs b/integration-tests/src/read/mod.rs index ac276dee4..5419b2d32 100644 --- a/integration-tests/src/read/mod.rs +++ b/integration-tests/src/read/mod.rs @@ -97,20 +97,15 @@ pub(crate) mod tests { column: usize, ) -> Result<(Array, Option>)> { let metadata = read_metadata(reader)?; - let descriptor = metadata.row_groups[row_group] - .column(column) - .descriptor() - .clone(); + let column_meta = metadata.row_groups[row_group].column(column); + let descriptor = column_meta.descriptor().clone(); - let iterator = get_page_iterator(&metadata, row_group, column, reader, None, vec![])?; + let iterator = get_page_iterator(&column_meta, reader, None, vec![])?; let buffer = vec![]; let mut iterator = Decompressor::new(iterator, buffer); - let statistics = metadata.row_groups[row_group] - .column(column) - .statistics() - .transpose()?; + let statistics = column_meta.statistics().transpose()?; let page = iterator.next().unwrap().as_ref().unwrap(); diff --git a/parquet-tools/src/lib/dump.rs b/parquet-tools/src/lib/dump.rs index 7de32288d..59339963e 100644 --- a/parquet-tools/src/lib/dump.rs +++ b/parquet-tools/src/lib/dump.rs @@ -47,7 +47,8 @@ where writeln!(writer, "{}", SEPARATOR)?; for column in &columns { - let iter = get_page_iterator(&metadata, i, *column, &mut file)?; + let column_meta = group.column(column); + let iter = get_page_iterator(column_meta, &mut file)?; for (page_ind, page) in iter.enumerate() { let page = page?; writeln!( diff --git a/src/read/mod.rs b/src/read/mod.rs index 00e8b5f1f..56102bf9f 100644 --- a/src/read/mod.rs +++ b/src/read/mod.rs @@ -19,7 +19,7 @@ pub use stream::read_metadata as read_metadata_async; use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; -use crate::metadata::RowGroupMetaData; +use crate::metadata::{ColumnChunkMetaData, RowGroupMetaData}; use crate::{error::Result, metadata::FileMetaData}; pub use page_iterator::{PageFilter, PageIterator}; @@ -44,16 +44,13 @@ pub fn filter_row_groups( } pub fn get_page_iterator<'a, RR: Read + Seek>( - metadata: &FileMetaData, - row_group: usize, - column: usize, + column_metadata: &ColumnChunkMetaData, reader: &'a mut RR, pages_filter: Option, buffer: Vec, ) -> Result> { let pages_filter = pages_filter.unwrap_or_else(|| Arc::new(|_, _| true)); - let column_metadata = metadata.row_groups[row_group].column(column); let (col_start, _) = column_metadata.byte_range(); reader.seek(SeekFrom::Start(col_start))?; Ok(PageIterator::new( @@ -84,8 +81,9 @@ mod tests { let row_group = 0; let column = 0; + let column_metadata = metadata.row_groups[row_group].column(column); let buffer = vec![]; - let mut iter = get_page_iterator(&metadata, row_group, column, &mut file, None, buffer)?; + let mut iter = get_page_iterator(column_metadata, &mut file, None, buffer)?; let page = iter.next().unwrap().unwrap(); assert_eq!(page.num_values(), 8); @@ -102,9 +100,9 @@ mod tests { let row_group = 0; let column = 0; + let column_metadata = metadata.row_groups[row_group].column(column); let buffer = vec![0]; - let mut iterator = - get_page_iterator(&metadata, row_group, column, &mut file, None, buffer)?; + let mut iterator = get_page_iterator(column_metadata, &mut file, None, buffer)?; let page = iterator.next().unwrap().unwrap(); iterator.reuse_buffer(page.buffer); @@ -125,8 +123,9 @@ mod tests { let row_group = 0; let column = 0; + let column_metadata = metadata.row_groups[row_group].column(column); let buffer = vec![1]; - let iterator = get_page_iterator(&metadata, row_group, column, &mut file, None, buffer)?; + let iterator = get_page_iterator(column_metadata, &mut file, None, buffer)?; let buffer = vec![]; let mut iterator = Decompressor::new(iterator, buffer); diff --git a/src/read/page_stream.rs b/src/read/page_stream.rs index 51a82eda8..3d97e39c3 100644 --- a/src/read/page_stream.rs +++ b/src/read/page_stream.rs @@ -6,7 +6,7 @@ use parquet_format_async_temp::thrift::protocol::TCompactInputStreamProtocol; use crate::compression::Compression; use crate::error::Result; -use crate::metadata::{ColumnDescriptor, FileMetaData}; +use crate::metadata::{ColumnChunkMetaData, ColumnDescriptor}; use crate::page::{CompressedDataPage, ParquetPageHeader}; use super::page_iterator::{finish_page, get_page_header, FinishedPage}; @@ -14,14 +14,11 @@ use super::PageFilter; /// Returns a stream of compressed data pages pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>( - metadata: &'a FileMetaData, - row_group: usize, - column: usize, + column_metadata: &'a ColumnChunkMetaData, reader: &'a mut RR, buffer: Vec, pages_filter: PageFilter, ) -> Result> + 'a> { - let column_metadata = metadata.row_groups[row_group].column(column); let (col_start, _) = column_metadata.byte_range(); reader.seek(SeekFrom::Start(col_start)).await?; Ok(_get_page_stream(