Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
FixedSizeBinary.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 11, 2021
1 parent 5cf9bd2 commit df104cd
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 1 deletion.
34 changes: 34 additions & 0 deletions src/io/parquet/read/fixed_size_binary.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures::{pin_mut, Stream, StreamExt};
use parquet2::{
encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
page::{DataPage, DataPageHeader, DataPageHeaderExt, FixedLenByteArrayPageDict},
Expand Down Expand Up @@ -159,6 +160,39 @@ where
))
}

pub async fn stream_to_array<I, E>(
pages: I,
size: i32,
metadata: &ColumnChunkMetaData,
) -> Result<FixedSizeBinaryArray>
where
ArrowError: From<E>,
E: Clone,
I: Stream<Item = std::result::Result<DataPage, E>>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(capacity * size as usize);
let mut validity = MutableBitmap::with_capacity(capacity);

pin_mut!(pages); // needed for iteration

while let Some(page) = pages.next().await {
extend_from_page(
page.as_ref().map_err(|x| x.clone())?,
size,
metadata.descriptor(),
&mut values,
&mut validity,
)?
}

Ok(FixedSizeBinaryArray::from_data(
DataType::FixedSizeBinary(size),
values.into(),
validity.into(),
))
}

pub(crate) fn extend_from_page(
page: &DataPage,
size: i32,
Expand Down
8 changes: 7 additions & 1 deletion src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,13 @@ pub async fn page_stream_to_array<I: Stream<Item = std::result::Result<DataPage,
LargeBinary | LargeUtf8 => {
binary::stream_to_array::<i64, _, _>(pages, metadata, &data_type).await
}
_ => todo!(),
FixedSizeBinary(size) => Ok(Box::new(
fixed_size_binary::stream_to_array(pages, size, metadata).await?,
)),
other => Err(ArrowError::NotYetImplemented(format!(
"Async conversion of {:?}",
other
))),
}
}

Expand Down

0 comments on commit df104cd

Please sign in to comment.