Prepared for struct array.
jorgecarleitao committed Oct 22, 2021
1 parent 6cdcc05 commit ed86d4d
Showing 3 changed files with 75 additions and 41 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
@@ -61,7 +61,9 @@ futures = { version = "0.3", optional = true }
# for faster hashing
ahash = { version = "0.7", optional = true }

parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] }
#parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "struct", optional = true, default_features = false, features = ["stream"] }
#parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] }

avro-rs = { version = "0.13", optional = true, default_features = false }

60 changes: 53 additions & 7 deletions src/io/parquet/read/mod.rs
@@ -6,15 +6,17 @@ use std::{
};

use futures::{AsyncRead, AsyncSeek, Stream};
use parquet2::read::MutStreamingIterator;
pub use parquet2::{
error::ParquetError,
fallible_streaming_iterator,
metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData},
page::{CompressedDataPage, DataPage, DataPageHeader},
read::{
decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream,
read_metadata as _read_metadata, read_metadata_async as _read_metadata_async,
BasicDecompressor, Decompressor, PageFilter, PageIterator,
decompress, get_column_iterator, get_page_iterator as _get_page_iterator,
get_page_stream as _get_page_stream, read_metadata as _read_metadata,
read_metadata_async as _read_metadata_async, BasicDecompressor, Decompressor, PageFilter,
PageIterator, State,
},
schema::types::{
LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
@@ -45,12 +47,12 @@ pub(crate) use schema::is_type_nullable;
pub use schema::{get_schema, FileMetaData};

/// Creates a new iterator of compressed pages.
pub fn get_page_iterator<'b, RR: Read + Seek>(
pub fn get_page_iterator<R: Read + Seek>(
column_metadata: &ColumnChunkMetaData,
reader: &'b mut RR,
reader: R,
pages_filter: Option<PageFilter>,
buffer: Vec<u8>,
) -> Result<PageIterator<'b, RR>> {
) -> Result<PageIterator<R>> {
Ok(_get_page_iterator(
column_metadata,
reader,
@@ -165,8 +167,52 @@ fn dict_read<
}
}

/// Returns an Array built from an iterator of column chunks
pub fn column_iter_to_array<II, I>(
    mut columns: I,
    data_type: DataType,
    mut buffer: Vec<u8>,
) -> Result<(Box<dyn Array>, Vec<u8>, Vec<u8>)>
where
    II: Iterator<Item = std::result::Result<CompressedDataPage, ParquetError>>,
    I: MutStreamingIterator<Item = (II, ColumnChunkMetaData), Error = ParquetError>,
{
    let mut arrays = vec![];
    let page_buffer;
    loop {
        match columns.advance()? {
            State::Some(mut new_iter) => {
                if let Some((pages, metadata)) = new_iter.get() {
                    let data_type = schema::to_data_type(metadata.descriptor().type_())?.unwrap();
                    let mut iterator = BasicDecompressor::new(pages, buffer);
                    let array = page_iter_to_array(&mut iterator, metadata, data_type)?;
                    buffer = iterator.into_inner();
                    arrays.push(array)
                }
                columns = new_iter;
            }
            State::Finished(b) => {
                page_buffer = b;
                break;
            }
        }
    }

    use crate::datatypes::PhysicalType::*;
    Ok(match data_type.to_physical_type() {
        Null => todo!(),
        Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8 | LargeUtf8
        | List | LargeList | FixedSizeList | Dictionary(_) => {
            (arrays.pop().unwrap(), page_buffer, buffer)
        }
        Struct => todo!(),
        Union => todo!(),
        Map => todo!(),
    })
}
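
As a minimal sketch (not part of this commit) of how the new public entry point can be driven for a single field: it assumes the `get_column_iterator` signature used in `record_batch.rs` below (reader, file metadata, row-group index, field index, optional page filter, scratch buffer), and `read_single_field` is a hypothetical helper.

use arrow2::array::Array;
use arrow2::datatypes::DataType;
use arrow2::error::Result;
use arrow2::io::parquet::read::{column_iter_to_array, get_column_iterator, FileMetaData};
use std::io::{Read, Seek};

// Hypothetical helper: deserialize one field of one row group into a single Array.
fn read_single_field<R: Read + Seek>(
    reader: &mut R,
    metadata: &FileMetaData,
    row_group: usize,
    field_index: usize,
    data_type: DataType,
) -> Result<Box<dyn Array>> {
    // Iterator over the column chunks backing this field: no page filter,
    // and an empty scratch buffer for the compressed pages.
    let column_iter = get_column_iterator(reader, metadata, row_group, field_index, None, vec![]);

    // Decompress and deserialize every chunk into one Array; the two returned
    // Vec<u8>s are the recovered scratch buffers, handed back for reuse.
    let (array, _pages_buffer, _decompress_buffer) =
        column_iter_to_array(column_iter, data_type, vec![])?;
    Ok(array)
}

The recovered buffers correspond to `page_buffer` and `buffer` above, which the updated `record_batch.rs` threads through its fold.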

/// Converts an iterator of [`DataPage`] into a single [`Array`].
pub fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>>(
fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>>(
iter: &mut I,
metadata: &ColumnChunkMetaData,
data_type: DataType,
52 changes: 19 additions & 33 deletions src/io/parquet/read/record_batch.rs
@@ -1,18 +1,17 @@
use std::{
io::{Read, Seek},
rc::Rc,
sync::Arc,
};

use crate::{
datatypes::{Field, Schema},
error::{ArrowError, Result},
error::Result,
record_batch::RecordBatch,
};

use super::{
get_page_iterator, get_schema, page_iter_to_array, read_metadata, Decompressor, FileMetaData,
PageFilter, RowGroupMetaData,
column_iter_to_array, get_column_iterator, get_schema, read_metadata, FileMetaData, PageFilter,
RowGroupMetaData,
};

type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;
@@ -21,12 +20,11 @@ type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;
pub struct RecordReader<R: Read + Seek> {
reader: R,
schema: Arc<Schema>,
indices: Rc<Vec<usize>>,
buffer: Vec<u8>,
decompress_buffer: Vec<u8>,
groups_filter: Option<GroupFilter>,
pages_filter: Option<PageFilter>,
metadata: Rc<FileMetaData>,
metadata: FileMetaData,
current_group: usize,
remaining_rows: usize,
}
@@ -46,29 +44,23 @@ impl<R: Read + Seek> RecordReader<R> {
let schema = get_schema(&metadata)?;

let schema_metadata = schema.metadata;
let (indices, fields): (Vec<usize>, Vec<Field>) = if let Some(projection) = &projection {
let fields: Vec<Field> = if let Some(projection) = &projection {
schema
.fields
.into_iter()
.enumerate()
.filter_map(|(index, f)| {
if projection.iter().any(|&i| i == index) {
Some((index, f))
Some(f)
} else {
None
}
})
.unzip()
.collect()
} else {
schema.fields.into_iter().enumerate().unzip()
schema.fields.into_iter().collect()
};

if let Some(projection) = &projection {
if indices.len() != projection.len() {
return Err(ArrowError::InvalidArgumentError("While reading parquet, some columns in the projection do not exist in the file".to_string()));
}
}

let schema = Arc::new(Schema {
fields,
metadata: schema_metadata,
@@ -77,10 +69,9 @@ impl<R: Read + Seek> RecordReader<R> {
Ok(Self {
reader,
schema,
indices: Rc::new(indices),
groups_filter,
pages_filter,
metadata: Rc::new(metadata),
metadata,
current_group: 0,
buffer: vec![],
decompress_buffer: vec![],
@@ -95,7 +86,7 @@ impl<R: Read + Seek> RecordReader<R> {

/// Returns parquet's [`FileMetaData`].
pub fn metadata(&self) -> &FileMetaData {
self.metadata.as_ref()
&self.metadata
}

/// Sets the groups filter
@@ -120,15 +111,14 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
}

let row_group = self.current_group;
let metadata = self.metadata.clone();
let metadata = &self.metadata;
let group = &metadata.row_groups[row_group];
if let Some(groups_filter) = self.groups_filter.as_ref() {
if !(groups_filter)(row_group, group) {
self.current_group += 1;
return self.next();
}
}
let columns_meta = group.columns();

// todo: avoid these clones.
let schema = self.schema().clone();
@@ -138,21 +128,18 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {

let a = schema.fields().iter().enumerate().try_fold(
(b1, b2, Vec::with_capacity(schema.fields().len())),
|(b1, b2, mut columns), (column, field)| {
// column according to the file's indexing
let column = self.indices[column];
let column_metadata = &columns_meta[column];
let pages = get_page_iterator(
column_metadata,
|(b1, b2, mut columns), (field_index, field)| {
let column_iter = get_column_iterator(
&mut self.reader,
&self.metadata,
row_group,
field_index,
self.pages_filter.clone(),
b1,
)?;

let mut pages = Decompressor::new(pages, b2);
);

let array =
page_iter_to_array(&mut pages, column_metadata, field.data_type().clone())?;
let (array, b1, b2) =
column_iter_to_array(column_iter, field.data_type().clone(), b2)?;

let array = if array.len() > remaining_rows {
array.slice(0, remaining_rows)
@@ -161,7 +148,6 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
};

columns.push(array.into());
let (b1, b2) = pages.into_buffers();
Result::Ok((b1, b2, columns))
},
);
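
The fold above threads two scratch buffers through every column: `b1` holds compressed pages and `b2` decompressed pages, and both are recovered from `column_iter_to_array` so their allocations are reused. Below is a minimal sketch of the same pattern written as a plain loop, assuming `get_column_iterator` and `column_iter_to_array` behave exactly as called in this diff; `read_row_group` is a hypothetical helper and `fields` is assumed to be the file's full field list, so the enumeration index doubles as the parquet field index.

use arrow2::array::Array;
use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::parquet::read::{column_iter_to_array, get_column_iterator, FileMetaData};
use std::io::{Read, Seek};

// Hypothetical helper: read every field of one row group, recycling both buffers.
fn read_row_group<R: Read + Seek>(
    reader: &mut R,
    metadata: &FileMetaData,
    row_group: usize,
    fields: &[Field],
) -> Result<Vec<Box<dyn Array>>> {
    let (mut b1, mut b2): (Vec<u8>, Vec<u8>) = (vec![], vec![]);
    let mut columns = Vec::with_capacity(fields.len());
    for (field_index, field) in fields.iter().enumerate() {
        // b1 is consumed by the column iterator, b2 by the decompressor...
        let column_iter =
            get_column_iterator(&mut *reader, metadata, row_group, field_index, None, b1);
        let (array, new_b1, new_b2) =
            column_iter_to_array(column_iter, field.data_type().clone(), b2)?;
        // ...and both come back so the next field reuses the same allocations.
        b1 = new_b1;
        b2 = new_b2;
        columns.push(array);
    }
    Ok(columns)
}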
