Add asynchronous reading of FixedSizeBinary parquet columns.

* fixed_size_binary.rs: adds `stream_to_array`, which pre-allocates the value
  and validity buffers from `metadata.num_values()`, passes every page yielded
  by the stream through `extend_from_page`, and builds a `FixedSizeBinaryArray`
  of type `DataType::FixedSizeBinary(size)`. A page error is cloned and
  propagated with `?`.
* mod.rs: `page_stream_to_array` dispatches `FixedSizeBinary(size)` to the new
  function and returns `ArrowError::NotYetImplemented` for any other type
  instead of reaching `todo!()`.

NOTE(review): the copy this patch was recovered from had its line breaks
collapsed and every `<...>` span stripped. The generic arguments (`<I, E>`,
`Result<FixedSizeBinaryArray>`, `From<E>`, `Stream<Item = ...>`,
`MutableBuffer::<u8>`, `stream_to_array::<i64, _, _>`), the
`LargeBinary | LargeUtf8` arm pattern and all indentation were reconstructed
from context; confirm against the upstream commit before applying.

diff --git a/src/io/parquet/read/fixed_size_binary.rs b/src/io/parquet/read/fixed_size_binary.rs
index e92dd0619b..6ea5e78250 100644
--- a/src/io/parquet/read/fixed_size_binary.rs
+++ b/src/io/parquet/read/fixed_size_binary.rs
@@ -1,3 +1,4 @@
+use futures::{pin_mut, Stream, StreamExt};
 use parquet2::{
     encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
     page::{DataPage, DataPageHeader, DataPageHeaderExt, FixedLenByteArrayPageDict},
@@ -159,6 +160,39 @@ where
     ))
 }
 
+pub async fn stream_to_array<I, E>(
+    pages: I,
+    size: i32,
+    metadata: &ColumnChunkMetaData,
+) -> Result<FixedSizeBinaryArray>
+where
+    ArrowError: From<E>,
+    E: Clone,
+    I: Stream<Item = std::result::Result<DataPage, E>>,
+{
+    let capacity = metadata.num_values() as usize;
+    let mut values = MutableBuffer::<u8>::with_capacity(capacity * size as usize);
+    let mut validity = MutableBitmap::with_capacity(capacity);
+
+    pin_mut!(pages); // needed for iteration
+
+    while let Some(page) = pages.next().await {
+        extend_from_page(
+            page.as_ref().map_err(|x| x.clone())?,
+            size,
+            metadata.descriptor(),
+            &mut values,
+            &mut validity,
+        )?
+    }
+
+    Ok(FixedSizeBinaryArray::from_data(
+        DataType::FixedSizeBinary(size),
+        values.into(),
+        validity.into(),
+    ))
+}
+
 pub(crate) fn extend_from_page(
     page: &DataPage,
     size: i32,
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index 18b27c5341..0a80092886 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -243,7 +243,13 @@ pub async fn page_stream_to_array
         LargeBinary | LargeUtf8 => {
             binary::stream_to_array::<i64, _, _>(pages, metadata, &data_type).await
         }
-        _ => todo!(),
+        FixedSizeBinary(size) => Ok(Box::new(
+            fixed_size_binary::stream_to_array(pages, size, metadata).await?,
+        )),
+        other => Err(ArrowError::NotYetImplemented(format!(
+            "Async conversion of {:?}",
+            other
+        ))),
     }
 }
 