Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for async parquet.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 10, 2021
1 parent bfb4910 commit e36876b
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 19 deletions.
45 changes: 45 additions & 0 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures::{pin_mut, Stream, StreamExt};
use parquet2::{
encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
Expand Down Expand Up @@ -353,3 +354,47 @@ where
_ => unreachable!(),
})
}

/// Converts an async [`Stream`] of parquet [`DataPage`]s into a binary/utf8 [`Array`].
///
/// `data_type` selects the concrete array: [`BinaryArray`] for (Large)Binary,
/// [`Utf8Array`] for (Large)Utf8. Any other `data_type` is a caller bug
/// (`unreachable!`).
///
/// # Errors
/// Returns an error when a page cannot be read or decoded.
pub async fn stream_to_array<O, I, E>(
    pages: I,
    metadata: &ColumnChunkMetaData,
    data_type: &DataType,
) -> Result<Box<dyn Array>>
where
    ArrowError: From<E>,
    O: Offset,
    E: Clone,
    I: Stream<Item = std::result::Result<DataPage, E>>,
{
    // `num_values` counts nulls too, so it is an upper bound for the number of slots.
    let capacity = metadata.num_values() as usize;
    // byte length of the values is unknown upfront; start empty and let it grow
    let mut values = MutableBuffer::<u8>::with_capacity(0);
    let mut offsets = MutableBuffer::<O>::with_capacity(1 + capacity);
    offsets.push(O::default());
    let mut validity = MutableBitmap::with_capacity(capacity);

    pin_mut!(pages); // needed for iteration

    while let Some(page) = pages.next().await {
        // take ownership and convert the error via `ArrowError: From<E>`,
        // avoiding the clone that `as_ref().map_err(|x| x.clone())` would incur
        let page = page?;
        extend_from_page(
            &page,
            metadata.descriptor(),
            &mut offsets,
            &mut values,
            &mut validity,
        )?
    }

    Ok(match data_type {
        DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data(
            offsets.into(),
            values.into(),
            validity.into(),
        )),
        DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data(
            offsets.into(),
            values.into(),
            validity.into(),
        )),
        _ => unreachable!(),
    })
}
1 change: 1 addition & 0 deletions src/io/parquet/read/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ mod basic;
mod nested;

pub use basic::iter_to_array;
pub use basic::stream_to_array;
pub use nested::iter_to_array as iter_to_array_nested;
26 changes: 26 additions & 0 deletions src/io/parquet/read/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::{
};

use super::super::utils;

use futures::{pin_mut, Stream, StreamExt};
use parquet2::{
encoding::{hybrid_rle, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
Expand Down Expand Up @@ -86,6 +88,30 @@ where
Ok(BooleanArray::from_data(values.into(), validity.into()))
}

/// Converts an async [`Stream`] of parquet [`DataPage`]s into a [`BooleanArray`].
///
/// # Errors
/// Returns an error when a page cannot be read or decoded.
pub async fn stream_to_array<I, E>(pages: I, metadata: &ColumnChunkMetaData) -> Result<BooleanArray>
where
    ArrowError: From<E>,
    E: Clone,
    I: Stream<Item = std::result::Result<DataPage, E>>,
{
    // `num_values` counts nulls too, so it is an upper bound for the number of slots.
    let capacity = metadata.num_values() as usize;
    let mut values = MutableBitmap::with_capacity(capacity);
    let mut validity = MutableBitmap::with_capacity(capacity);

    pin_mut!(pages); // needed for iteration

    while let Some(page) = pages.next().await {
        // take ownership and convert the error via `ArrowError: From<E>`,
        // avoiding the clone that `as_ref().map_err(|x| x.clone())` would incur
        let page = page?;
        extend_from_page(&page, metadata.descriptor(), &mut values, &mut validity)?
    }

    Ok(BooleanArray::from_data(values.into(), validity.into()))
}

fn extend_from_page(
page: &DataPage,
descriptor: &ColumnDescriptor,
Expand Down
1 change: 1 addition & 0 deletions src/io/parquet/read/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ mod basic;
mod nested;

pub use basic::iter_to_array;
pub use basic::stream_to_array;
pub use nested::iter_to_array as iter_to_array_nested;
92 changes: 73 additions & 19 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,6 @@
use std::io::{Read, Seek};

mod binary;
mod boolean;
mod fixed_size_binary;
mod nested_utils;
mod primitive;
mod record_batch;
pub mod schema;
pub mod statistics;
mod utils;

use crate::{
array::Array,
datatypes::{DataType, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
};

pub use record_batch::RecordReader;
pub use schema::{get_schema, is_type_nullable, FileMetaData};

use futures::Stream;
pub use parquet2::{
error::ParquetError,
metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData},
Expand All @@ -34,6 +16,25 @@ pub use parquet2::{
types::int96_to_i64_ns,
};

use crate::{
array::Array,
datatypes::{DataType, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
};

mod binary;
mod boolean;
mod fixed_size_binary;
mod nested_utils;
mod primitive;
mod record_batch;
pub mod schema;
pub mod statistics;
mod utils;

pub use record_batch::RecordReader;
pub use schema::{get_schema, is_type_nullable, FileMetaData};

/// Creates a new iterator of compressed pages.
pub fn get_page_iterator<'b, RR: Read + Seek>(
metadata: &FileMetaData,
Expand Down Expand Up @@ -163,6 +164,59 @@ pub fn page_iter_to_array<
}
}

/// Converts an async [`Stream`] of compressed [`DataPage`]s into an [`Array`]
/// of `data_type`, dispatching on the logical type to the per-physical-type
/// implementations.
///
/// # Errors
/// Errors when a page cannot be read or decoded.
pub async fn page_stream_to_array<I: Stream<Item = std::result::Result<DataPage, ParquetError>>>(
    pages: I,
    metadata: &ColumnChunkMetaData,
    data_type: DataType,
) -> Result<Box<dyn Array>> {
    use DataType::*;
    match data_type {
        // INT32
        UInt8 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u8).await,
        UInt16 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u16).await,
        UInt32 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u32).await,
        Int8 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as i8).await,
        Int16 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as i16).await,
        Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => {
            // physical and logical representations coincide: identity conversion
            primitive::stream_to_array(pages, metadata, data_type, |x: i32| x).await
        }

        // nanosecond timestamps may be stored as INT96 (legacy) or INT64
        Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() {
            ParquetType::PrimitiveType { physical_type, .. } => match physical_type {
                PhysicalType::Int96 => {
                    primitive::stream_to_array(
                        pages,
                        metadata,
                        DataType::Timestamp(TimeUnit::Nanosecond, None),
                        int96_to_i64_ns,
                    )
                    .await
                }
                _ => primitive::stream_to_array(pages, metadata, data_type, |x: i64| x).await,
            },
            _ => unreachable!(),
        },

        // INT64
        Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => {
            primitive::stream_to_array(pages, metadata, data_type, |x: i64| x).await
        }
        UInt64 => primitive::stream_to_array(pages, metadata, data_type, |x: i64| x as u64).await,

        Float32 => primitive::stream_to_array(pages, metadata, data_type, |x: f32| x).await,
        Float64 => primitive::stream_to_array(pages, metadata, data_type, |x: f64| x).await,

        Boolean => Ok(Box::new(boolean::stream_to_array(pages, metadata).await?)),

        Binary | Utf8 => binary::stream_to_array::<i32, _, _>(pages, metadata, &data_type).await,
        LargeBinary | LargeUtf8 => {
            binary::stream_to_array::<i64, _, _>(pages, metadata, &data_type).await
        }
        _ => todo!(),
    }
}

#[cfg(test)]
mod tests {
use std::fs::File;
Expand Down
43 changes: 43 additions & 0 deletions src/io/parquet/read/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod utils;

use std::sync::Arc;

use futures::{pin_mut, Stream, StreamExt};
use parquet2::{page::DataPage, read::StreamingIterator, types::NativeType};

use super::nested_utils::*;
Expand Down Expand Up @@ -59,6 +60,48 @@ where
)))
}

/// Converts an async [`Stream`] of parquet [`DataPage`]s into a [`PrimitiveArray`]
/// of `data_type`, mapping each parquet-native value through `op`.
///
/// For dictionary types, the array is built from the dictionary's value type.
///
/// # Errors
/// Returns an error when a page cannot be read or decoded.
pub async fn stream_to_array<T, A, I, E, F>(
    pages: I,
    metadata: &ColumnChunkMetaData,
    data_type: DataType,
    op: F,
) -> Result<Box<dyn Array>>
where
    ArrowError: From<E>,
    T: NativeType,
    E: Clone,
    A: ArrowNativeType,
    F: Copy + Fn(T) -> A,
    I: Stream<Item = std::result::Result<DataPage, E>>,
{
    // `num_values` counts nulls too, so it is an upper bound for the number of slots.
    let capacity = metadata.num_values() as usize;
    let mut values = MutableBuffer::<A>::with_capacity(capacity);
    let mut validity = MutableBitmap::with_capacity(capacity);

    pin_mut!(pages); // needed for iteration

    while let Some(page) = pages.next().await {
        // take ownership and convert the error via `ArrowError: From<E>`,
        // avoiding the clone that `as_ref().map_err(|x| x.clone())` would incur
        let page = page?;
        basic::extend_from_page(
            &page,
            metadata.descriptor(),
            &mut values,
            &mut validity,
            op,
        )?
    }

    // dictionary-encoded columns are materialized as plain arrays of the value type
    let data_type = match data_type {
        DataType::Dictionary(_, values) => values.as_ref().clone(),
        _ => data_type,
    };

    Ok(Box::new(PrimitiveArray::from_data(
        data_type,
        values.into(),
        validity.into(),
    )))
}

pub fn iter_to_array_nested<T, A, I, E, F>(
mut iter: I,
metadata: &ColumnChunkMetaData,
Expand Down

0 comments on commit e36876b

Please sign in to comment.