diff --git a/src/io/parquet/read/fixed_size_binary/dictionary.rs b/src/io/parquet/read/fixed_size_binary/dictionary.rs
new file mode 100644
index 00000000000..02a080421ce
--- /dev/null
+++ b/src/io/parquet/read/fixed_size_binary/dictionary.rs
@@ -0,0 +1,155 @@
+use std::{collections::VecDeque, sync::Arc};
+
+use parquet2::page::FixedLenByteArrayPageDict;
+
+use crate::{
+    array::{Array, DictionaryArray, DictionaryKey, FixedSizeBinaryArray, PrimitiveArray},
+    bitmap::MutableBitmap,
+    datatypes::DataType,
+    error::{ArrowError, Result},
+};
+
+use super::super::dictionary::*;
+use super::super::utils;
+use super::super::utils::Decoder;
+use super::super::DataPages;
+
+/// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation
+#[derive(Debug)]
+pub struct ArrayIterator<K, I>
+where
+    I: DataPages,
+    K: DictionaryKey,
+{
+    iter: I,
+    data_type: DataType,
+    values: Dict,
+    items: VecDeque<(Vec<K>, MutableBitmap)>,
+    chunk_size: usize,
+}
+
+impl<K, I> ArrayIterator<K, I>
+where
+    K: DictionaryKey,
+    I: DataPages,
+{
+    fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
+        let data_type = match data_type {
+            DataType::Dictionary(_, values, _) => values.as_ref().clone(),
+            _ => unreachable!(),
+        };
+        Self {
+            iter,
+            data_type,
+            values: Dict::Empty,
+            items: VecDeque::new(),
+            chunk_size,
+        }
+    }
+}
+
+impl<K, I> Iterator for ArrayIterator<K, I>
+where
+    I: DataPages,
+    K: DictionaryKey,
+{
+    type Item = Result<DictionaryArray<K>>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // back[a1, a2, a3, ...]front
+        if self.items.len() > 1 {
+            return self.items.pop_back().map(|(values, validity)| {
+                let keys = finish_key(values, validity);
+                let values = self.values.unwrap();
+                Ok(DictionaryArray::from_data(keys, values))
+            });
+        }
+        match (self.items.pop_back(), self.iter.next()) {
+            (_, Err(e)) => Some(Err(e.into())),
+            (None, Ok(None)) => None,
+            (state, Ok(Some(page))) => {
+                // consume the dictionary page
+                if let Some(dict) = page.dictionary_page() {
+                    let dict = dict
+                        .as_any()
+                        .downcast_ref::<FixedLenByteArrayPageDict>()
+                        .unwrap();
+                    self.values = match &mut self.values {
+                        Dict::Empty => {
+                            let values = dict.values().to_vec();
+
+                            let array = Arc::new(FixedSizeBinaryArray::from_data(
+                                self.data_type.clone(),
+                                values.into(),
+                                None,
+                            )) as _;
+                            Dict::Complete(array)
+                        }
+                        _ => unreachable!(),
+                    };
+                } else {
+                    return Some(Err(ArrowError::nyi(
+                        "dictionary arrays from non-dict-encoded pages",
+                    )));
+                }
+
+                let maybe_array = {
+                    // there is a new page => consume the page from the start
+                    let maybe_page = PrimitiveDecoder::default().build_state(page);
+                    let page = match maybe_page {
+                        Ok(page) => page,
+                        Err(e) => return Some(Err(e)),
+                    };
+
+                    utils::extend_from_new_page::<PrimitiveDecoder<K>, _, _>(
+                        page,
+                        state,
+                        self.chunk_size,
+                        &mut self.items,
+                        &PrimitiveDecoder::default(),
+                    )
+                };
+                match maybe_array {
+                    Ok(Some((values, validity))) => {
+                        let keys = PrimitiveArray::from_data(
+                            K::PRIMITIVE.into(),
+                            values.into(),
+                            validity.into(),
+                        );
+
+                        let values = self.values.unwrap();
+                        Some(Ok(DictionaryArray::from_data(keys, values)))
+                    }
+                    Ok(None) => self.next(),
+                    Err(e) => Some(Err(e)),
+                }
+            }
+            (Some((values, validity)), Ok(None)) => {
+                // we have a populated item and no more pages
+                // the only case where an item's length may be smaller than chunk_size
+                debug_assert!(values.len() <= self.chunk_size);
+
+                let keys = finish_key(values, validity);
+
+                let values = self.values.unwrap();
+                Some(Ok(DictionaryArray::from_data(keys, values)))
+            }
+        }
+    }
+}
+
+/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
+pub fn iter_to_arrays<'a, K, I>(
+    iter: I,
+    data_type: DataType,
+    chunk_size: usize,
+) -> Box<dyn Iterator<Item = Result<Arc<dyn Array>>> + 'a>
+where
+    I: 'a + DataPages,
+    K: DictionaryKey,
+{
+    Box::new(
+        ArrayIterator::<K, I>::new(iter, data_type, chunk_size)
+            .map(|x| x.map(|x| Arc::new(x) as Arc<dyn Array>)),
+    )
+}
diff --git a/src/io/parquet/read/fixed_size_binary/mod.rs b/src/io/parquet/read/fixed_size_binary/mod.rs
index 4acd297d52a..a0f2b3121cc 100644
--- a/src/io/parquet/read/fixed_size_binary/mod.rs
+++ b/src/io/parquet/read/fixed_size_binary/mod.rs
@@ -1,4 +1,6 @@
 mod basic;
+mod dictionary;
 mod utils;
 
 pub use basic::BinaryArrayIterator;
+pub use dictionary::iter_to_arrays as iter_to_dict_arrays;
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index 280e17fa776..a4d36462cd4 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -190,6 +190,9 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
         LargeUtf8 | LargeBinary => {
             binary::iter_to_dict_arrays::<K, i64, _>(iter, data_type, chunk_size)
         }
+        FixedSizeBinary(_) => {
+            fixed_size_binary::iter_to_dict_arrays::<K, _>(iter, data_type, chunk_size)
+        }
         other => {
             return Err(ArrowError::nyi(format!(
                 "Reading dictionaries of type {:?}",
diff --git a/src/scalar/equal.rs b/src/scalar/equal.rs
index 583d36a5554..90f345268ca 100644
--- a/src/scalar/equal.rs
+++ b/src/scalar/equal.rs
@@ -135,6 +135,17 @@ fn equal(lhs: &dyn Scalar, rhs: &dyn Scalar) -> bool {
             let rhs = rhs.as_any().downcast_ref::().unwrap();
             lhs == rhs
         }
+        DataType::FixedSizeBinary(_) => {
+            let lhs = lhs
+                .as_any()
+                .downcast_ref::<FixedSizeBinaryScalar>()
+                .unwrap();
+            let rhs = rhs
+                .as_any()
+                .downcast_ref::<FixedSizeBinaryScalar>()
+                .unwrap();
+            lhs == rhs
+        }
         other => unimplemented!("{:?}", other),
     }
 }
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index 11fe2d619b9..925d84d39c6 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -717,19 +717,28 @@ fn arrow_type() -> Result<()> {
     let array3 = DictionaryArray::from_data(indices.clone(), std::sync::Arc::new(values));
 
     let values = BinaryArray::<i32>::from_slice([b"ab", b"ac"]);
-    let array4 = DictionaryArray::from_data(indices, std::sync::Arc::new(values));
+    let array4 = DictionaryArray::from_data(indices.clone(), std::sync::Arc::new(values));
+
+    let values = FixedSizeBinaryArray::from_data(
+        DataType::FixedSizeBinary(2),
+        vec![b'a', b'b', b'a', b'c'].into(),
+        None,
+    );
+    let array5 = DictionaryArray::from_data(indices, std::sync::Arc::new(values));
 
     let schema = Schema::from(vec![
         Field::new("a1", dt1, true),
         Field::new("a2", array2.data_type().clone(), true),
         Field::new("a3", array3.data_type().clone(), true),
         Field::new("a4", array4.data_type().clone(), true),
+        Field::new("a5", array5.data_type().clone(), true),
     ]);
     let batch = Chunk::try_new(vec![
         Arc::new(array) as Arc<dyn Array>,
         Arc::new(array2),
         Arc::new(array3),
         Arc::new(array4),
+        Arc::new(array5),
     ])?;
 
     let r = integration_write(&schema, &[batch.clone()])?;
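
For context, a minimal construction sketch of the dictionary-of-fixed-size-binary shape this change makes readable from parquet, mirroring the test's `a5` column. The `[0, 1, 0]` keys are illustrative stand-ins (the test reuses its shared `indices` array, defined earlier in the test and not visible in this hunk); the rest uses the same `from_data` calls as the diff:

```rust
use std::sync::Arc;

use arrow2::array::{Array, DictionaryArray, FixedSizeBinaryArray, PrimitiveArray};
use arrow2::datatypes::DataType;

fn main() {
    // Dictionary values: two 2-byte entries ("ab" and "ac") laid out
    // back to back in one buffer, exactly as the test's `array5` builds them.
    let values = FixedSizeBinaryArray::from_data(
        DataType::FixedSizeBinary(2),
        vec![b'a', b'b', b'a', b'c'].into(),
        None,
    );
    // i32 keys indexing into the two dictionary entries; illustrative values.
    let keys = PrimitiveArray::<i32>::from_slice([0, 1, 0]);
    let array = DictionaryArray::<i32>::from_data(keys, Arc::new(values));
    assert_eq!(array.len(), 3);
}
```

When such a column is written to parquet and read back, the new `FixedSizeBinary(_)` arm in `dict_read` routes it to `fixed_size_binary::iter_to_dict_arrays`, which decodes the dictionary page once into a `FixedSizeBinaryArray` and then emits one `DictionaryArray` of keys per chunk of `chunk_size` rows.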