This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Simplified reading parquet #532

Merged: 5 commits, Oct 18, 2021
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -61,7 +61,7 @@ futures = { version = "0.3", optional = true }
# for faster hashing
ahash = { version = "0.7", optional = true }

parquet2 = { version = "0.5.2", optional = true, default_features = false, features = ["stream"] }
parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] }

avro-rs = { version = "0.13", optional = true, default_features = false }

3 changes: 2 additions & 1 deletion examples/parquet_read.rs
@@ -1,11 +1,12 @@
use std::fs::File;
use std::io::BufReader;

use arrow2::io::parquet::read;
use arrow2::{array::Array, error::Result};

fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<dyn Array>> {
// Open a file, a common operation in Rust
let mut file = File::open(path)?;
let mut file = BufReader::new(File::open(path)?);

// Read the file's metadata. This has a small IO cost because it requires seeking to the end
// of the file to read its footer.
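The only functional change in this example is wrapping the `File` in a `BufReader`: decoding pages issues many small reads, and buffering batches them into fewer, larger reads. A minimal std-only sketch of the pattern:

use std::fs::File;
use std::io::BufReader;

// Buffered opening: `BufReader` batches the many small reads a page
// decoder performs into fewer, larger reads of the underlying file.
fn open_buffered(path: &str) -> std::io::Result<BufReader<File>> {
    Ok(BufReader::new(File::open(path)?))
}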
8 changes: 3 additions & 5 deletions examples/parquet_read_parallel.rs
@@ -47,16 +47,14 @@ fn parallel_read(path: &str) -> Result<Vec<Box<dyn Array>>> {
let metadata_consumer = file_metadata.clone();
let arrow_schema_consumer = arrow_schema.clone();
let child = thread::spawn(move || {
let (column, row_group, iter) = rx_consumer.recv().unwrap();
let (column, row_group, pages) = rx_consumer.recv().unwrap();
let start = SystemTime::now();
println!("consumer start - {} {}", column, row_group);
let metadata = metadata_consumer.row_groups[row_group].column(column);
let data_type = arrow_schema_consumer.fields()[column].data_type().clone();

let pages = iter
.into_iter()
.map(|x| x.and_then(|x| read::decompress(x, &mut vec![])));
let mut pages = read::streaming_iterator::convert(pages);
let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);

let array = read::page_iter_to_array(&mut pages, metadata, data_type);
println!(
"consumer end - {:?}: {} {}",
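After this change the consumer reduces to two calls: wrap the received compressed pages in a `BasicDecompressor` (a `FallibleStreamingIterator` of decompressed pages that reuses one scratch buffer) and pass it to `page_iter_to_array`. A sketch of that path, assuming the channel carries a `Vec` of fallible compressed pages as in this example:

use arrow2::{
    array::Array,
    datatypes::DataType,
    error::Result,
    io::parquet::read::{self, ColumnChunkMetaData, CompressedDataPage, ParquetError},
};

// Decode one column chunk from already-fetched compressed pages.
fn decode_column_chunk(
    pages: Vec<std::result::Result<CompressedDataPage, ParquetError>>,
    metadata: &ColumnChunkMetaData,
    data_type: DataType,
) -> Result<Box<dyn Array>> {
    // `vec![]` is the scratch buffer that `BasicDecompressor` reuses across pages.
    let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);
    read::page_iter_to_array(&mut pages, metadata, data_type)
}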
31 changes: 19 additions & 12 deletions examples/parquet_write.rs
@@ -1,13 +1,15 @@
use std::fs::File;
use std::iter::once;

use arrow2::error::ArrowError;
use arrow2::io::parquet::write::to_parquet_schema;
use arrow2::{
array::{Array, Int32Array},
datatypes::{Field, Schema},
error::Result,
io::parquet::write::{
array_to_page, write_file, Compression, DynIter, Encoding, Version, WriteOptions,
array_to_pages, write_file, Compression, Compressor, DynIter, DynStreamingIterator,
Encoding, FallibleStreamingIterator, Version, WriteOptions,
},
};

@@ -24,17 +26,22 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()>
// map arrow fields to parquet fields
let parquet_schema = to_parquet_schema(&schema)?;

// Declare the row group iterator. This must be an iterator of iterators of iterators:
// * first iterator of row groups
// * second iterator of column chunks
// * third iterator of pages
// an array can be divided in multiple pages via `.slice(offset, length)` (`O(1)`).
// All column chunks within a row group MUST have the same length.
let row_groups = once(Result::Ok(DynIter::new(once(Ok(DynIter::new(
once(array)
.zip(parquet_schema.columns().to_vec().into_iter())
.map(|(array, descriptor)| array_to_page(array, descriptor, options, encoding)),
))))));
let descriptor = parquet_schema.columns()[0].clone();

// Declare the row group iterator. This must be an iterator of iterators of streaming iterators
// * first iterator over row groups
let row_groups = once(Result::Ok(DynIter::new(
// * second iterator over column chunks (we assume no struct arrays -> `once` column)
once(
// * third iterator over (compressed) pages; dictionary encoding may lead to multiple pages per array.
array_to_pages(array, descriptor, options, encoding).map(move |pages| {
let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
let compressed_pages = Compressor::new(encoded_pages, options.compression, vec![])
.map_err(ArrowError::from);
DynStreamingIterator::new(compressed_pages)
}),
),
)));

// Create a new empty file
let mut file = File::create(path)?;
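Combinators aside, the nesting the comments describe has this shape. A std-only toy, with the fallible and streaming (borrow-based) aspects omitted for clarity; the names are illustrative, not arrow2 APIs:

// file: iterator of row groups
// row group: iterator of column chunks
// column chunk: iterator of (compressed) pages
fn toy_shape() -> impl Iterator<Item = impl Iterator<Item = impl Iterator<Item = Vec<u8>>>> {
    std::iter::once(               // one row group
        std::iter::once(           // one column chunk
            vec![vec![0u8; 16]].into_iter(), // its pages
        ),
    )
}

fn main() {
    for row_group in toy_shape() {
        for column in row_group {
            for page in column {
                println!("page of {} bytes", page.len());
            }
        }
    }
}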
6 changes: 6 additions & 0 deletions src/io/parquet/mod.rs
@@ -11,3 +11,9 @@ impl From<parquet2::error::ParquetError> for ArrowError {
ArrowError::External("".to_string(), Box::new(error))
}
}

impl From<ArrowError> for parquet2::error::ParquetError {
fn from(error: ArrowError) -> Self {
parquet2::error::ParquetError::General(error.to_string())
}
}
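With `From` now implemented in both directions, `?` converts errors across the arrow2/parquet2 boundary either way; the new write path relies on this, since `Compressor` drives arrow-produced pages inside a parquet2-typed iterator. A self-contained toy of the pattern, with stand-in error types:

// Stand-ins for ArrowError and ParquetError, to show the conversion only.
#[derive(Debug)]
struct ToyArrowError(String);
#[derive(Debug)]
struct ToyParquetError(String);

impl From<ToyParquetError> for ToyArrowError {
    fn from(e: ToyParquetError) -> Self {
        ToyArrowError(format!("parquet: {}", e.0))
    }
}

impl From<ToyArrowError> for ToyParquetError {
    fn from(e: ToyArrowError) -> Self {
        ToyParquetError(e.0)
    }
}

// `?` converts ToyParquetError -> ToyArrowError (the read path)...
fn read_to_arrow() -> Result<(), ToyArrowError> {
    decompress_page()?;
    Ok(())
}
fn decompress_page() -> Result<(), ToyParquetError> {
    Ok(())
}

// ...and ToyArrowError -> ToyParquetError (what the write path needs).
fn compress_pages() -> Result<(), ToyParquetError> {
    encode_array()?;
    Ok(())
}
fn encode_array() -> Result<(), ToyArrowError> {
    Ok(())
}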
9 changes: 4 additions & 5 deletions src/io/parquet/read/binary/basic.rs
@@ -3,7 +3,7 @@ use parquet2::{
encoding::{delta_length_byte_array, hybrid_rle, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
page::{BinaryPageDict, DataPage},
read::StreamingIterator,
FallibleStreamingIterator,
};

use crate::{
@@ -308,17 +308,16 @@ pub fn iter_to_array<O, I, E>(
where
ArrowError: From<E>,
O: Offset,
E: Clone,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(0);
let mut offsets = MutableBuffer::<O>::with_capacity(1 + capacity);
offsets.push(O::default());
let mut validity = MutableBitmap::with_capacity(capacity);
while let Some(page) = iter.next() {
while let Some(page) = iter.next()? {
extend_from_page(
page.as_ref().map_err(|x| x.clone())?,
page,
metadata.descriptor(),
&mut offsets,
&mut values,
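The same substitution repeats in each reader below. `FallibleStreamingIterator::next` returns `Result<Option<&Item>, Error>`, so a single `?` both advances the iterator and surfaces errors, and because the page is borrowed the old `E: Clone` bound and per-page `map_err` disappear. A simplified, self-contained restatement of the trait and of the new loop (the real trait is the `fallible_streaming_iterator` crate's, re-exported through parquet2):

// Simplified restatement of the trait; illustrative, not the crate's full API.
trait FallibleStreamingIterator {
    type Item: ?Sized;
    type Error;
    fn advance(&mut self) -> Result<(), Self::Error>;
    fn get(&self) -> Option<&Self::Item>;
    fn next(&mut self) -> Result<Option<&Self::Item>, Self::Error> {
        self.advance()?;
        Ok(self.get())
    }
}

// A toy page source that may fail while advancing (e.g. on decompression).
struct Pages {
    inner: std::vec::IntoIter<Result<String, String>>,
    current: Option<String>,
}

impl FallibleStreamingIterator for Pages {
    type Item = String;
    type Error = String;
    fn advance(&mut self) -> Result<(), String> {
        self.current = self.inner.next().transpose()?;
        Ok(())
    }
    fn get(&self) -> Option<&String> {
        self.current.as_ref()
    }
}

fn consume(mut pages: Pages) -> Result<usize, String> {
    let mut n = 0;
    // the pattern adopted throughout this PR: `next()?` advances,
    // propagates errors, and lends out a borrowed page.
    while let Some(_page) = pages.next()? {
        n += 1;
    }
    Ok(n)
}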
9 changes: 4 additions & 5 deletions src/io/parquet/read/binary/dictionary.rs
@@ -4,7 +4,7 @@ use parquet2::{
encoding::{hybrid_rle, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
page::{BinaryPageDict, DataPage},
read::StreamingIterator,
FallibleStreamingIterator,
};

use super::super::utils as other_utils;
@@ -133,17 +133,16 @@ where
ArrowError: From<E>,
O: Offset,
K: DictionaryKey,
E: Clone,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let capacity = metadata.num_values() as usize;
let mut indices = MutableBuffer::<K>::with_capacity(capacity);
let mut values = MutableBuffer::<u8>::with_capacity(0);
let mut offsets = MutableBuffer::<O>::with_capacity(1 + capacity);
let mut validity = MutableBitmap::with_capacity(capacity);
while let Some(page) = iter.next() {
while let Some(page) = iter.next()? {
extend_from_page(
page.as_ref().map_err(|x| x.clone())?,
page,
metadata.descriptor(),
&mut indices,
&mut offsets,
10 changes: 5 additions & 5 deletions src/io/parquet/read/binary/nested.rs
@@ -4,7 +4,8 @@ use parquet2::{
encoding::{hybrid_rle::HybridRleDecoder, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
page::DataPage,
read::{levels::get_bit_width, StreamingIterator},
read::levels::get_bit_width,
FallibleStreamingIterator,
};

use super::super::nested_utils::*;
@@ -153,8 +154,7 @@ pub fn iter_to_array<O, I, E>(
where
O: Offset,
ArrowError: From<E>,
E: Clone,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(0);
@@ -164,9 +164,9 @@ where

let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity);

while let Some(page) = iter.next() {
while let Some(page) = iter.next()? {
extend_from_page(
page.as_ref().map_err(|x| x.clone())?,
page,
metadata.descriptor(),
is_nullable,
&mut nested,
14 changes: 4 additions & 10 deletions src/io/parquet/read/boolean/basic.rs
@@ -12,7 +12,7 @@
encoding::{hybrid_rle, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
page::DataPage,
read::StreamingIterator,
FallibleStreamingIterator,
};

pub(super) fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) {
@@ -71,19 +71,13 @@ fn read_optional(
pub fn iter_to_array<I, E>(mut iter: I, metadata: &ColumnChunkMetaData) -> Result<BooleanArray>
where
ArrowError: From<E>,
E: Clone,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBitmap::with_capacity(capacity);
let mut validity = MutableBitmap::with_capacity(capacity);
while let Some(page) = iter.next() {
extend_from_page(
page.as_ref().map_err(|x| x.clone())?,
metadata.descriptor(),
&mut values,
&mut validity,
)?
while let Some(page) = iter.next()? {
extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)?
}

Ok(BooleanArray::from_data(
10 changes: 5 additions & 5 deletions src/io/parquet/read/boolean/nested.rs
@@ -4,7 +4,8 @@ use parquet2::{
encoding::{hybrid_rle::HybridRleDecoder, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
page::DataPage,
read::{levels::get_bit_width, StreamingIterator},
read::levels::get_bit_width,
FallibleStreamingIterator,
};

use super::super::nested_utils::*;
@@ -137,18 +138,17 @@ pub fn iter_to_array<I, E>(
) -> Result<Box<dyn Array>>
where
ArrowError: From<E>,
E: Clone,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBitmap::with_capacity(capacity);
let mut validity = MutableBitmap::with_capacity(capacity);

let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity);

while let Some(page) = iter.next() {
while let Some(page) = iter.next()? {
extend_from_page(
page.as_ref().map_err(|x| x.clone())?,
page,
metadata.descriptor(),
is_nullable,
&mut nested,
9 changes: 4 additions & 5 deletions src/io/parquet/read/fixed_size_binary.rs
@@ -2,7 +2,7 @@ use futures::{pin_mut, Stream, StreamExt};
use parquet2::{
encoding::{hybrid_rle, Encoding},
page::{DataPage, FixedLenByteArrayPageDict},
read::StreamingIterator,
FallibleStreamingIterator,
};

use super::{ColumnChunkMetaData, ColumnDescriptor};
@@ -134,17 +134,16 @@ pub fn iter_to_array<I, E>(
) -> Result<FixedSizeBinaryArray>
where
ArrowError: From<E>,
E: Clone,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;

let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(capacity * size);
let mut validity = MutableBitmap::with_capacity(capacity);
while let Some(page) = iter.next() {
while let Some(page) = iter.next()? {
extend_from_page(
page.as_ref().map_err(|x| x.clone())?,
page,
size,
metadata.descriptor(),
&mut values,
10 changes: 5 additions & 5 deletions src/io/parquet/read/mod.rs
@@ -8,18 +8,20 @@ use std::{
use futures::{AsyncRead, AsyncSeek, Stream};
pub use parquet2::{
error::ParquetError,
fallible_streaming_iterator,
metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData},
page::{CompressedDataPage, DataPage, DataPageHeader},
read::{
decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream,
read_metadata as _read_metadata, read_metadata_async as _read_metadata_async,
streaming_iterator, Decompressor, PageFilter, PageIterator, StreamingIterator,
BasicDecompressor, Decompressor, PageFilter, PageIterator,
},
schema::types::{
LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
TimeUnit as ParquetTimeUnit, TimestampType,
},
types::int96_to_i64_ns,
FallibleStreamingIterator,
};

use crate::{
@@ -82,7 +84,7 @@ pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(

fn dict_read<
K: DictionaryKey,
I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>,
>(
iter: &mut I,
metadata: &ColumnChunkMetaData,
@@ -164,9 +166,7 @@ fn dict_read<
}

/// Converts an iterator of [`DataPage`] into a single [`Array`].
pub fn page_iter_to_array<
I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
>(
pub fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>>(
iter: &mut I,
metadata: &ColumnChunkMetaData,
data_type: DataType,
9 changes: 4 additions & 5 deletions src/io/parquet/read/primitive/dictionary.rs
@@ -3,8 +3,8 @@ use std::sync::Arc;
use parquet2::{
encoding::{hybrid_rle, Encoding},
page::{DataPage, PrimitivePageDict},
read::StreamingIterator,
types::NativeType,
FallibleStreamingIterator,
};

use super::super::utils;
@@ -135,18 +135,17 @@
ArrowError: From<E>,
T: NativeType,
K: DictionaryKey,
E: Clone,
A: ArrowNativeType,
F: Copy + Fn(T) -> A,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let capacity = metadata.num_values() as usize;
let mut indices = MutableBuffer::<K>::with_capacity(capacity);
let mut values = MutableBuffer::<A>::with_capacity(capacity);
let mut validity = MutableBitmap::with_capacity(capacity);
while let Some(page) = iter.next() {
while let Some(page) = iter.next()? {
extend_from_page(
page.as_ref().map_err(|x| x.clone())?,
page,
metadata.descriptor(),
&mut indices,
&mut values,