Skip to content

Commit

Permalink
Simplified API to get iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 25, 2021
1 parent 450c99d commit d968bde
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 26 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ distribute work across threads. E.g.
```rust
let handles = vec![];
for column in columns {
let compressed_pages = get_page_iterator(&metadata, row_group, column, &mut file, file)?.collect()?;
let column_meta = metadata.row_groups[row_group].column(column);
let compressed_pages = get_page_iterator(column_meta, &mut file, file)?.collect()?;
// each compressed_page has a buffer; cloning is expensive(!). We move it so that the memory
// is released at the end of the processing.
handles.push(thread::spawn(move || {
Expand Down
4 changes: 3 additions & 1 deletion examples/s3/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ async fn main() -> Result<()> {

// * first row group
// * first column
let column_metadata = metadata.row_groups[row_group].column(column);

// * do not skip any pages
let pages =
get_page_stream(&metadata, 0, 0, &mut reader, vec![], Arc::new(|_, _| true)).await?;
get_page_stream(&column_metadata, &mut reader, vec![], Arc::new(|_, _| true)).await?;

pin_mut!(pages); // needed for iteration

Expand Down
13 changes: 4 additions & 9 deletions integration-tests/src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,15 @@ pub(crate) mod tests {
column: usize,
) -> Result<(Array, Option<std::sync::Arc<dyn Statistics>>)> {
let metadata = read_metadata(reader)?;
let descriptor = metadata.row_groups[row_group]
.column(column)
.descriptor()
.clone();
let column_meta = metadata.row_groups[row_group].column(column);
let descriptor = column_meta.descriptor().clone();

let iterator = get_page_iterator(&metadata, row_group, column, reader, None, vec![])?;
let iterator = get_page_iterator(&column_meta, reader, None, vec![])?;

let buffer = vec![];
let mut iterator = Decompressor::new(iterator, buffer);

let statistics = metadata.row_groups[row_group]
.column(column)
.statistics()
.transpose()?;
let statistics = column_meta.statistics().transpose()?;

let page = iterator.next().unwrap().as_ref().unwrap();

Expand Down
3 changes: 2 additions & 1 deletion parquet-tools/src/lib/dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ where
writeln!(writer, "{}", SEPARATOR)?;

for column in &columns {
let iter = get_page_iterator(&metadata, i, *column, &mut file)?;
let column_meta = group.column(column);
let iter = get_page_iterator(column_meta, &mut file)?;
for (page_ind, page) in iter.enumerate() {
let page = page?;
writeln!(
Expand Down
17 changes: 8 additions & 9 deletions src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub use stream::read_metadata as read_metadata_async;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use crate::metadata::RowGroupMetaData;
use crate::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use crate::{error::Result, metadata::FileMetaData};

pub use page_iterator::{PageFilter, PageIterator};
Expand All @@ -44,16 +44,13 @@ pub fn filter_row_groups(
}

pub fn get_page_iterator<'a, RR: Read + Seek>(
metadata: &FileMetaData,
row_group: usize,
column: usize,
column_metadata: &ColumnChunkMetaData,
reader: &'a mut RR,
pages_filter: Option<PageFilter>,
buffer: Vec<u8>,
) -> Result<PageIterator<'a, RR>> {
let pages_filter = pages_filter.unwrap_or_else(|| Arc::new(|_, _| true));

let column_metadata = metadata.row_groups[row_group].column(column);
let (col_start, _) = column_metadata.byte_range();
reader.seek(SeekFrom::Start(col_start))?;
Ok(PageIterator::new(
Expand Down Expand Up @@ -84,8 +81,9 @@ mod tests {

let row_group = 0;
let column = 0;
let column_metadata = metadata.row_groups[row_group].column(column);
let buffer = vec![];
let mut iter = get_page_iterator(&metadata, row_group, column, &mut file, None, buffer)?;
let mut iter = get_page_iterator(column_metadata, &mut file, None, buffer)?;

let page = iter.next().unwrap().unwrap();
assert_eq!(page.num_values(), 8);
Expand All @@ -102,9 +100,9 @@ mod tests {

let row_group = 0;
let column = 0;
let column_metadata = metadata.row_groups[row_group].column(column);
let buffer = vec![0];
let mut iterator =
get_page_iterator(&metadata, row_group, column, &mut file, None, buffer)?;
let mut iterator = get_page_iterator(column_metadata, &mut file, None, buffer)?;

let page = iterator.next().unwrap().unwrap();
iterator.reuse_buffer(page.buffer);
Expand All @@ -125,8 +123,9 @@ mod tests {

let row_group = 0;
let column = 0;
let column_metadata = metadata.row_groups[row_group].column(column);
let buffer = vec![1];
let iterator = get_page_iterator(&metadata, row_group, column, &mut file, None, buffer)?;
let iterator = get_page_iterator(column_metadata, &mut file, None, buffer)?;

let buffer = vec![];
let mut iterator = Decompressor::new(iterator, buffer);
Expand Down
7 changes: 2 additions & 5 deletions src/read/page_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,19 @@ use parquet_format_async_temp::thrift::protocol::TCompactInputStreamProtocol;

use crate::compression::Compression;
use crate::error::Result;
use crate::metadata::{ColumnDescriptor, FileMetaData};
use crate::metadata::{ColumnChunkMetaData, ColumnDescriptor};
use crate::page::{CompressedDataPage, ParquetPageHeader};

use super::page_iterator::{finish_page, get_page_header, FinishedPage};
use super::PageFilter;

/// Returns a stream of compressed data pages
pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>(
metadata: &'a FileMetaData,
row_group: usize,
column: usize,
column_metadata: &'a ColumnChunkMetaData,
reader: &'a mut RR,
buffer: Vec<u8>,
pages_filter: PageFilter,
) -> Result<impl Stream<Item = Result<CompressedDataPage>> + 'a> {
let column_metadata = metadata.row_groups[row_group].column(column);
let (col_start, _) = column_metadata.byte_range();
reader.seek(SeekFrom::Start(col_start)).await?;
Ok(_get_page_stream(
Expand Down

0 comments on commit d968bde

Please sign in to comment.