diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index 5ae93116a0f..d0dd6cb26f8 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/binary/nested.rs @@ -65,8 +65,6 @@ fn read( match (rep_level_encoding.0, def_level_encoding.0) { (Encoding::Rle, Encoding::Rle) => { - let rep_levels = - HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), additional); if is_nullable { let def_levels = HybridRleDecoder::new( def_levels, @@ -79,6 +77,8 @@ fn read( read_plain_required(values_buffer, additional, values) } + let rep_levels = + HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), additional); let def_levels = HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), additional); diff --git a/src/io/parquet/read/boolean/mod.rs b/src/io/parquet/read/boolean/mod.rs index 423ca16800a..ab4a16b21a9 100644 --- a/src/io/parquet/read/boolean/mod.rs +++ b/src/io/parquet/read/boolean/mod.rs @@ -1,46 +1,17 @@ +mod basic; +mod nested; + use std::sync::Arc; use crate::{ - array::{Array, BooleanArray}, - bitmap::MutableBitmap, - datatypes::DataType, + array::Array, + datatypes::{DataType, Field}, error::Result, }; -use parquet2::{metadata::ColumnDescriptor, page::DataPage}; - -mod basic; -mod nested; - use self::basic::BooleanArrayIterator; - -use super::{nested_utils::Nested, DataPages}; - -fn page_to_array_nested( - page: &DataPage, - descriptor: &ColumnDescriptor, - data_type: DataType, - nested: &mut Vec>, - is_nullable: bool, -) -> Result { - let capacity = page.num_values() as usize; - let mut values = MutableBitmap::with_capacity(capacity); - let mut validity = MutableBitmap::with_capacity(capacity); - nested::extend_from_page( - page, - descriptor, - is_nullable, - nested, - &mut values, - &mut validity, - )?; - - Ok(BooleanArray::from_data( - data_type, - values.into(), - validity.into(), - )) -} +use self::nested::ArrayIterator; +use super::{nested_utils::NestedState, DataPages}; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays<'a, I: 'a>( @@ -57,3 +28,20 @@ where .map(|x| x.map(|x| Arc::new(x) as Arc)), ) } + +/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] +pub fn iter_to_arrays_nested<'a, I: 'a>( + iter: I, + field: Field, + chunk_size: usize, +) -> Box)>> + 'a> +where + I: DataPages, +{ + Box::new(ArrayIterator::new(iter, field, chunk_size).map(|x| { + x.map(|(nested, array)| { + let values = Arc::new(array) as Arc; + (nested, values) + }) + })) +} diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index bf3d4125759..420904a3ab8 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -1,135 +1,246 @@ +use std::collections::VecDeque; + use parquet2::{ encoding::{hybrid_rle::HybridRleDecoder, Encoding}, - metadata::ColumnDescriptor, page::DataPage, read::levels::get_bit_width, + schema::Repetition, }; -use super::super::nested_utils::*; -use super::super::utils; use crate::{ + array::BooleanArray, bitmap::{utils::BitmapIter, MutableBitmap}, + datatypes::{DataType, Field}, error::Result, + io::parquet::read::{utils::Decoder, DataPages}, }; -fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) { - // in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap. - values.extend_from_slice(buffer, 0, additional); -} +use super::super::nested_utils::*; +use super::super::utils; -fn read_values( - def_levels: D, +// The state of an optional DataPage with a boolean physical type +#[derive(Debug)] +struct Optional<'a> { + values: BitmapIter<'a>, + definition_levels: HybridRleDecoder<'a>, max_def: u32, - mut new_values: G, - values: &mut MutableBitmap, - validity: &mut MutableBitmap, -) where - D: Iterator, - G: Iterator, -{ - def_levels.for_each(|def| { - if def == max_def { - values.push(new_values.next().unwrap()); - validity.push(true); - } else if def == max_def - 1 { - values.push(false); - validity.push(false); +} + +impl<'a> Optional<'a> { + pub fn new(page: &'a DataPage) -> Self { + let (_, def_levels, values_buffer, _) = utils::split_buffer(page, page.descriptor()); + + // in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap. + // note that `values_buffer` contains only non-null values. + // thus, at this point, it is not known how many values this buffer contains + // values_len is the upper bound. The actual number depends on how many nulls there is. + let values_len = values_buffer.len() * 8; + let values = BitmapIter::new(values_buffer, 0, values_len); + + let max_def = page.descriptor().max_def_level(); + + Self { + values, + definition_levels: HybridRleDecoder::new( + def_levels, + get_bit_width(max_def), + page.num_values(), + ), + max_def: max_def as u32, } - }); + } } -#[allow(clippy::too_many_arguments)] -fn read( - rep_levels: &[u8], - def_levels: &[u8], - values_buffer: &[u8], - additional: usize, - rep_level_encoding: (&Encoding, i16), - def_level_encoding: (&Encoding, i16), - is_nullable: bool, - nested: &mut Vec>, - values: &mut MutableBitmap, - validity: &mut MutableBitmap, -) { - let max_rep_level = rep_level_encoding.1 as u32; - let max_def_level = def_level_encoding.1 as u32; - - match (rep_level_encoding.0, def_level_encoding.0) { - (Encoding::Rle, Encoding::Rle) => { - if is_nullable { - let def_levels = HybridRleDecoder::new( - def_levels, - get_bit_width(def_level_encoding.1), - additional, - ); +// The state of a required DataPage with a boolean physical type +#[derive(Debug)] +struct Required<'a> { + values: &'a [u8], + // invariant: offset <= length; + offset: usize, + length: usize, +} + +impl<'a> Required<'a> { + pub fn new(page: &'a DataPage) -> Self { + Self { + values: page.buffer(), + offset: 0, + length: page.num_values(), + } + } +} + +// The state of a `DataPage` of `Boolean` parquet boolean type +#[derive(Debug)] +enum State<'a> { + Optional(Optional<'a>), + Required(Required<'a>), +} + +impl<'a> State<'a> { + pub fn len(&self) -> usize { + match self { + State::Optional(page) => page.definition_levels.size_hint().0, + State::Required(page) => page.length - page.offset, + } + } +} + +impl<'a> utils::PageState<'a> for State<'a> { + fn len(&self) -> usize { + self.len() + } +} + +fn build_state(page: &DataPage) -> Result { + let is_optional = + page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; + + match (page.encoding(), is_optional) { + (Encoding::Plain, true) => Ok(State::Optional(Optional::new(page))), + (Encoding::Plain, false) => Ok(State::Required(Required::new(page))), + _ => Err(utils::not_implemented( + &page.encoding(), + is_optional, + false, + "any", + "Boolean", + )), + } +} + +#[derive(Default)] +struct BooleanDecoder {} - // don't know how many values there is: using the max possible - let num_valid_values = additional.min(values_buffer.len() * 8); +impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder { + type State = State<'a>; - let new_values = BitmapIter::new(values_buffer, 0, num_valid_values); - read_values(def_levels, max_def_level, new_values, values, validity) - } else { - read_required(values_buffer, additional, values) + fn with_capacity(&self, capacity: usize) -> MutableBitmap { + MutableBitmap::with_capacity(capacity) + } + + fn extend_from_state( + state: &mut State, + values: &mut MutableBitmap, + validity: &mut MutableBitmap, + required: usize, + ) { + match state { + State::Optional(page) => read_optional_values( + page.definition_levels.by_ref(), + page.max_def, + page.values.by_ref(), + values, + validity, + required, + ), + State::Required(page) => { + values.extend_from_slice(page.values, page.offset, required); + page.offset += required; } + } + } +} - let rep_levels = - HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), additional); - let def_levels = - HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), additional); +/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays +#[derive(Debug)] +pub struct ArrayIterator { + iter: I, + field: Field, + // invariant: items.len() == nested.len() + items: VecDeque<(MutableBitmap, MutableBitmap)>, + nested: VecDeque, + chunk_size: usize, +} - extend_offsets( - rep_levels, - def_levels, - is_nullable, - max_rep_level, - max_def_level, - nested, - ) +impl ArrayIterator { + pub fn new(iter: I, field: Field, chunk_size: usize) -> Self { + Self { + iter, + field, + items: VecDeque::new(), + nested: VecDeque::new(), + chunk_size, } - _ => todo!(), } } -pub(super) fn extend_from_page( - page: &DataPage, - descriptor: &ColumnDescriptor, - is_nullable: bool, - nested: &mut Vec>, - values: &mut MutableBitmap, - validity: &mut MutableBitmap, -) -> Result<()> { - let additional = page.num_values(); - - let (rep_levels, def_levels, values_buffer, version) = utils::split_buffer(page, descriptor); - - match (&page.encoding(), page.dictionary_page()) { - (Encoding::Plain, None) => read( - rep_levels, - def_levels, - values_buffer, - additional, - ( - &page.repetition_level_encoding(), - descriptor.max_rep_level(), - ), - ( - &page.definition_level_encoding(), - descriptor.max_def_level(), - ), - is_nullable, - nested, - values, - validity, - ), - _ => { - return Err(utils::not_implemented( - &page.encoding(), - is_nullable, - page.dictionary_page().is_some(), - version, - "primitive", - )) +impl Iterator for ArrayIterator { + type Item = Result<(NestedState, BooleanArray)>; + + fn next(&mut self) -> Option { + // back[a1, a2, a3, ...]front + if self.items.len() > 1 { + let nested = self.nested.pop_back().unwrap(); + let (values, validity) = self.items.pop_back().unwrap(); + let array = BooleanArray::from_data(DataType::Boolean, values.into(), validity.into()); + return Some(Ok((nested, array))); + } + match ( + self.nested.pop_back(), + self.items.pop_back(), + self.iter.next(), + ) { + (_, _, Err(e)) => Some(Err(e.into())), + (None, None, Ok(None)) => None, + (state, p_state, Ok(Some(page))) => { + // the invariant + assert_eq!(state.is_some(), p_state.is_some()); + + // there is a new page => consume the page from the start + let mut nested_page = NestedPage::new(page); + + // read next chunk from `nested_page` and get number of values to read + let maybe_nested = extend_offsets1( + &mut nested_page, + state, + &self.field, + &mut self.nested, + self.chunk_size, + ); + let nested = match maybe_nested { + Ok(nested) => nested, + Err(e) => return Some(Err(e)), + }; + // at this point we know whether there were enough rows in `page` + // to fill chunk_size or not (`nested.is_some()`) + // irrespectively, we need to consume the values from the page + + let maybe_page = build_state(page); + let page = match maybe_page { + Ok(page) => page, + Err(e) => return Some(Err(e)), + }; + + let maybe_array = extend_from_new_page::( + page, + p_state, + &mut self.items, + &nested, + &self.nested, + &BooleanDecoder::default(), + ); + let state = match maybe_array { + Ok(s) => s, + Err(e) => return Some(Err(e)), + }; + match nested { + Some(p_state) => Some(Ok(( + p_state, + BooleanArray::from_data(DataType::Boolean, state.0.into(), state.1.into()), + ))), + None => self.next(), + } + } + (Some(nested), Some((values, validity)), Ok(None)) => { + // we have a populated item and no more pages + // the only case where an item's length may be smaller than chunk_size + let array = + BooleanArray::from_data(DataType::Boolean, values.into(), validity.into()); + Some(Ok((nested, array))) + } + (Some(_), None, _) => unreachable!(), + (None, Some(_), _) => unreachable!(), } } - Ok(()) } diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 257141c93ea..eed913a8d37 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -31,10 +31,7 @@ use crate::{ array::{Array, BinaryArray, DictionaryKey, NullArray, PrimitiveArray, StructArray, Utf8Array}, datatypes::{DataType, Field, IntervalUnit, TimeUnit}, error::{ArrowError, Result}, - io::parquet::read::{ - nested_utils::{create_list, init_nested}, - primitive::read_item, - }, + io::parquet::read::{nested_utils::create_list, primitive::read_item}, }; mod binary; @@ -54,8 +51,6 @@ pub use row_group::*; pub(crate) use schema::is_type_nullable; pub use schema::{get_schema, FileMetaData}; -use self::nested_utils::Nested; - pub trait DataPages: FallibleStreamingIterator {} impl> DataPages for I {} @@ -278,20 +273,17 @@ fn column_datatype(data_type: &DataType, column: usize) -> DataType { } } -fn page_iter_to_arrays< - 'a, - I: 'a + FallibleStreamingIterator, ->( +fn page_iter_to_arrays<'a, I: 'a + DataPages>( iter: I, metadata: &ColumnChunkMetaData, - data_type: DataType, + field: Field, chunk_size: usize, ) -> Result>> + 'a>> { use DataType::*; let is_optional = metadata.descriptor().max_def_level() != metadata.descriptor().max_rep_level(); let type_ = metadata.descriptor().type_(); - match data_type.to_logical_type() { + match field.data_type.to_logical_type() { /*Null => Ok(Box::new(NullArray::from_data( data_type, metadata.num_values() as usize, @@ -299,13 +291,13 @@ fn page_iter_to_arrays< Boolean => Ok(boolean::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, )), UInt8 => Ok(primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i32| x as u8, @@ -313,7 +305,7 @@ fn page_iter_to_arrays< UInt16 => Ok(primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i32| x as u16, @@ -321,7 +313,7 @@ fn page_iter_to_arrays< UInt32 => Ok(primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i32| x as u32, @@ -329,7 +321,7 @@ fn page_iter_to_arrays< Int8 => Ok(primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i32| x as i8, @@ -337,7 +329,7 @@ fn page_iter_to_arrays< Int16 => Ok(primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i32| x as i16, @@ -346,7 +338,7 @@ fn page_iter_to_arrays< Ok(primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i32| x as i32, @@ -371,7 +363,7 @@ fn page_iter_to_arrays< ParquetTimeUnit::MILLIS(_) => primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i64| x * 1_000_000, @@ -379,7 +371,7 @@ fn page_iter_to_arrays< ParquetTimeUnit::MICROS(_) => primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i64| x * 1_000, @@ -387,7 +379,7 @@ fn page_iter_to_arrays< ParquetTimeUnit::NANOS(_) => primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i64| x, @@ -396,7 +388,7 @@ fn page_iter_to_arrays< _ => Ok(primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i64| x, @@ -406,8 +398,13 @@ fn page_iter_to_arrays< }, FixedSizeBinary(_) => Ok(Box::new( - fixed_size_binary::BinaryArrayIterator::new(iter, data_type, chunk_size, is_optional) - .map(|x| x.map(|x| Arc::new(x) as _)), + fixed_size_binary::BinaryArrayIterator::new( + iter, + field.data_type, + chunk_size, + is_optional, + ) + .map(|x| x.map(|x| Arc::new(x) as _)), )), Decimal(_, _) => match type_ { @@ -415,7 +412,7 @@ fn page_iter_to_arrays< PhysicalType::Int32 => primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i32| x as i128, @@ -423,7 +420,7 @@ fn page_iter_to_arrays< PhysicalType::Int64 => primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i64| x as i128, @@ -462,7 +459,7 @@ fn page_iter_to_arrays< let validity = array.validity().cloned(); Ok(PrimitiveArray::::from_data( - data_type.clone(), + field.data_type.clone(), values.into(), validity, )) @@ -482,7 +479,7 @@ fn page_iter_to_arrays< Ok(primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i64| x as i64, @@ -491,7 +488,7 @@ fn page_iter_to_arrays< UInt64 => Ok(primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: i64| x as u64, @@ -500,7 +497,7 @@ fn page_iter_to_arrays< Float32 => Ok(primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: f32| x, @@ -508,7 +505,7 @@ fn page_iter_to_arrays< Float64 => Ok(primitive::iter_to_arrays( iter, is_optional, - data_type, + field.data_type, chunk_size, read_item, |x: f64| x, @@ -517,37 +514,34 @@ fn page_iter_to_arrays< Binary => Ok(binary::iter_to_arrays::, _>( iter, is_optional, - data_type, + field.data_type, chunk_size, )), LargeBinary => Ok(binary::iter_to_arrays::, _>( iter, is_optional, - data_type, + field.data_type, chunk_size, )), Utf8 => Ok(binary::iter_to_arrays::, _>( iter, is_optional, - data_type, + field.data_type, chunk_size, )), LargeUtf8 => Ok(binary::iter_to_arrays::, _>( iter, is_optional, - data_type, + field.data_type, chunk_size, )), Dictionary(key_type, _, _) => match_integer_type!(key_type, |$K| { - dict_read::<$K, _>(iter, is_optional, type_, data_type, chunk_size) + dict_read::<$K, _>(iter, is_optional, type_, field.data_type, chunk_size) }), + List(_) => page_iter_to_arrays_nested(iter, field, chunk_size), /* - List(ref inner) => { - let values = page_iter_to_array(iter, nested, metadata, inner.data_type().clone()); - create_list(data_type, nested, values.into()) - } LargeList(ref inner) => { let values = page_iter_to_array(iter, nested, metadata, inner.data_type().clone()); create_list(data_type, nested, values.into()) @@ -584,6 +578,22 @@ fn finish_array(data_type: DataType, arrays: &mut VecDeque>) -> B } } +fn page_iter_to_arrays_nested<'a, I: 'a + DataPages>( + iter: I, + field: Field, + chunk_size: usize, +) -> Result>> + 'a>> { + let iter = boolean::iter_to_arrays_nested(iter, field.clone(), chunk_size); + + let iter = iter.map(move |x| { + let (mut nested, array) = x?; + let _ = nested.nested.pop().unwrap(); // the primitive + create_list(field.data_type().clone(), &mut nested, array) + }); + + Ok(Box::new(iter)) +} + /* /// Returns an iterator of [`Array`] built from an iterator of column chunks. It also returns /// the two buffers used to decompress and deserialize pages (to be re-used). diff --git a/src/io/parquet/read/nested_utils.rs b/src/io/parquet/read/nested_utils.rs index 54a4f483084..fbee8238277 100644 --- a/src/io/parquet/read/nested_utils.rs +++ b/src/io/parquet/read/nested_utils.rs @@ -1,4 +1,8 @@ -use std::sync::Arc; +use std::{collections::VecDeque, sync::Arc}; + +use parquet2::{ + encoding::hybrid_rle::HybridRleDecoder, page::DataPage, read::levels::get_bit_width, +}; use crate::{ array::{Array, ListArray}, @@ -8,6 +12,8 @@ use crate::{ error::{ArrowError, Result}, }; +use super::utils::{split_buffer, Decoder, Pushable}; + /// trait describing deserialized repetition and definition levels pub trait Nested: std::fmt::Debug { fn inner(&mut self) -> (Buffer, Option); @@ -21,16 +27,26 @@ pub trait Nested: std::fmt::Debug { fn close(&mut self, length: i64); fn is_nullable(&self) -> bool; + + /// number of rows + fn len(&self) -> usize; + + /// number of values associated to the primitive type this nested tracks + fn num_values(&self) -> usize; } #[derive(Debug, Default)] pub struct NestedPrimitive { is_nullable: bool, + length: usize, } impl NestedPrimitive { pub fn new(is_nullable: bool) -> Self { - Self { is_nullable } + Self { + is_nullable, + length: 0, + } } } @@ -48,13 +64,23 @@ impl Nested for NestedPrimitive { self.is_nullable } - fn push(&mut self, _value: i64, _is_valid: bool) {} + fn push(&mut self, _value: i64, _is_valid: bool) { + self.length += 1 + } fn offsets(&mut self) -> &[i64] { &[] } fn close(&mut self, _length: i64) {} + + fn len(&self) -> usize { + self.length + } + + fn num_values(&self) -> usize { + self.length + } } #[derive(Debug, Default)] @@ -91,6 +117,14 @@ impl Nested for NestedOptional { fn close(&mut self, length: i64) { self.offsets.push(length) } + + fn len(&self) -> usize { + self.offsets.len().saturating_sub(1) + } + + fn num_values(&self) -> usize { + self.offsets.last().copied().unwrap_or(0) as usize + } } impl NestedOptional { @@ -132,6 +166,14 @@ impl Nested for NestedValid { fn close(&mut self, length: i64) { self.offsets.push(length) } + + fn len(&self) -> usize { + self.offsets.len().saturating_sub(1) + } + + fn num_values(&self) -> usize { + self.offsets.last().copied().unwrap_or(0) as usize + } } impl NestedValid { @@ -141,6 +183,35 @@ impl NestedValid { } } +pub(super) fn read_optional_values( + def_levels: D, + max_def: u32, + mut new_values: G, + values: &mut P, + validity: &mut MutableBitmap, + mut remaining: usize, +) where + D: Iterator, + G: Iterator, + C: Default, + P: Pushable, +{ + for def in def_levels { + if def == max_def { + values.push(new_values.next().unwrap()); + validity.push(true); + remaining -= 1; + } else if def == max_def - 1 { + values.push(C::default()); + validity.push(false); + remaining -= 1; + } + if remaining == 0 { + break; + } + } +} + pub fn extend_offsets( rep_levels: R, def_levels: D, @@ -212,7 +283,7 @@ pub fn extend_offsets( }); } -pub fn init_nested(field: &Field, capacity: usize, container: &mut Vec>) { +fn init_nested_recursive(field: &Field, capacity: usize, container: &mut Vec>) { let is_nullable = field.is_nullable; use crate::datatypes::PhysicalType::*; @@ -231,7 +302,7 @@ pub fn init_nested(field: &Field, capacity: usize, container: &mut Vec { - init_nested(inner.as_ref(), capacity, container) + init_nested_recursive(inner.as_ref(), capacity, container) } _ => unreachable!(), }; @@ -241,7 +312,7 @@ pub fn init_nested(field: &Field, capacity: usize, container: &mut Vec NestedState { + let mut container = vec![]; + init_nested_recursive(field, capacity, &mut container); + NestedState::new(container) +} + pub fn create_list( data_type: DataType, - nested: &mut Vec>, + nested: &mut NestedState, values: Arc, -) -> Result> { +) -> Result> { Ok(match data_type { DataType::List(_) => { - let (offsets, validity) = nested.pop().unwrap().inner(); + let (offsets, validity) = nested.nested.pop().unwrap().inner(); let offsets = Buffer::::from_trusted_len_iter(offsets.iter().map(|x| *x as i32)); - Box::new(ListArray::::from_data( + Arc::new(ListArray::::from_data( data_type, offsets, values, validity, )) } DataType::LargeList(_) => { - let (offsets, validity) = nested.pop().unwrap().inner(); + let (offsets, validity) = nested.nested.pop().unwrap().inner(); - Box::new(ListArray::::from_data( + Arc::new(ListArray::::from_data( data_type, offsets, values, validity, )) } @@ -279,3 +356,245 @@ pub fn create_list( } }) } + +pub struct NestedPage<'a> { + repetitions: HybridRleDecoder<'a>, + max_rep_level: u32, + definitions: HybridRleDecoder<'a>, + max_def_level: u32, +} + +impl<'a> NestedPage<'a> { + pub fn new(page: &'a DataPage) -> Self { + let (rep_levels, def_levels, _, _) = split_buffer(page, page.descriptor()); + + let max_rep_level = page.descriptor().max_rep_level(); + let max_def_level = page.descriptor().max_def_level(); + + Self { + repetitions: HybridRleDecoder::new( + rep_levels, + get_bit_width(max_rep_level), + page.num_values(), + ), + max_rep_level: max_rep_level as u32, + definitions: HybridRleDecoder::new( + def_levels, + get_bit_width(max_def_level), + page.num_values(), + ), + max_def_level: max_def_level as u32, + } + } + + // number of values (!= number of rows) + pub fn len(&self) -> usize { + self.repetitions.size_hint().0 + } +} + +#[derive(Debug)] +pub struct NestedState { + pub nested: Vec>, +} + +impl NestedState { + pub fn new(nested: Vec>) -> Self { + Self { nested } + } + + /// The number of rows in this state + pub fn len(&self) -> usize { + // outermost is the number of rows + self.nested[0].len() + } + + /// The number of values associated with the primitive type + pub fn num_values(&self) -> usize { + self.nested[0].num_values() + } + + /// Whether the primitive is optional + pub fn is_optional(&self) -> bool { + self.nested.last().unwrap().is_nullable() + } + + pub fn depth(&self) -> usize { + // outermost is the number of rows + self.nested.len() + } +} + +pub(super) fn extend_from_new_page<'a, T: Decoder<'a, C, P>, C: Default, P: Pushable>( + mut page: T::State, + state: Option<(P, MutableBitmap)>, + items: &mut VecDeque<(P, MutableBitmap)>, + nested_state: &Option, + nested: &VecDeque, + decoder: &T, +) -> Result<(P, MutableBitmap)> { + let needed = nested_state + .as_ref() + .map(|x| x.num_values()) + // unwrap is fine because either there is a state or the state is in nested + .unwrap_or_else(|| nested.back().unwrap().num_values()); + + let (mut values, mut validity) = if let Some((values, validity)) = state { + // there is a already a state => it must be incomplete... + debug_assert!( + values.len() < needed, + "the temp array is expected to be incomplete" + ); + (values, validity) + } else { + // there is no state => initialize it + ( + decoder.with_capacity(needed), + MutableBitmap::with_capacity(needed), + ) + }; + + let remaining = needed - values.len(); + + // extend the current state + T::extend_from_state(&mut page, &mut values, &mut validity, remaining); + + // the number of values required is always fulfilled because + // dremel assigns one (rep, def) to each value and we request + // items that complete a row + assert_eq!(values.len(), remaining); + + for nest in nested { + let num_values = nest.num_values(); + let mut values = decoder.with_capacity(num_values); + let mut validity = MutableBitmap::with_capacity(num_values); + T::extend_from_state(&mut page, &mut values, &mut validity, num_values); + items.push_back((values, validity)); + } + + assert_eq!(items.len(), nested.len()); + + // and return this item + Ok((values, validity)) +} + +/// Extends `state` by consuming `page`, optionally extending `items` if `page` +/// has less items than `chunk_size` +pub fn extend_offsets1<'a>( + page: &mut NestedPage<'a>, + state: Option, + field: &Field, + items: &mut VecDeque, + chunk_size: usize, +) -> Result> { + let mut nested = if let Some(nested) = state { + // there is a already a state => it must be incomplete... + debug_assert!( + nested.len() < chunk_size, + "the temp array is expected to be incomplete" + ); + nested + } else { + // there is no state => initialize it + init_nested(field, chunk_size) + }; + + let remaining = chunk_size - nested.len(); + + // extend the current state + extend_offsets2(page, &mut nested, remaining); + + if nested.len() < chunk_size { + // the whole page was consumed and we still do not have enough items + // => push the values to `items` so that it can be continued later + items.push_back(nested); + // and indicate that there is no item available + return Ok(None); + } + + while page.len() > 0 { + let mut nested = init_nested(field, chunk_size); + extend_offsets2(page, &mut nested, chunk_size); + items.push_back(nested) + } + + // and return + Ok(Some(nested)) +} + +fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, additional: usize) { + let is_optional = nested.is_optional(); + let mut values_count = vec![0; nested.depth()]; + let mut prev_def: u32 = 0; + let mut is_first = true; + + let max_def = page.max_def_level; + let max_rep = page.max_rep_level; + + let mut iter = page.repetitions.by_ref().zip(page.definitions.by_ref()); + + let mut rows = 0; + while rows < additional { + // unwrap is ok because by definition there has to be a closing statement + let (rep, def) = iter.next().unwrap(); + if rep == 0 { + rows += 1 + } + + let mut closures = max_rep - rep; + if prev_def <= 1 { + closures = 1; + }; + if is_first { + // close on first run to ensure offsets start with 0. + closures = max_rep; + is_first = false; + } + + nested + .nested + .iter_mut() + .zip(values_count.iter()) + .enumerate() + .skip(rep as usize) + .take((rep + closures) as usize) + .for_each(|(depth, (nested, length))| { + let is_null = (def - rep) as usize == depth && depth == rep as usize; + nested.push(*length, !is_null); + }); + + values_count + .iter_mut() + .enumerate() + .for_each(|(depth, values)| { + if depth == 1 { + if def == max_def || (is_optional && def == max_def - 1) { + *values += 1 + } + } else if depth == 0 { + let a = nested + .nested + .get(depth + 1) + .map(|x| x.is_nullable()) + .unwrap_or_default(); // todo: cumsum this + let condition = rep == 1 + || rep == 0 + && def >= max_def.saturating_sub((a as u32) + (!is_optional as u32)); + + if condition { + *values += 1; + } + } + }); + prev_def = def; + } + + // close validities + nested + .nested + .iter_mut() + .zip(values_count.iter()) + .for_each(|(nested, length)| { + nested.close(*length); + }); +} diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index 8f71f5dcff3..1f58bcf0389 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -72,14 +72,14 @@ pub(super) fn get_iterators( page_iter_to_arrays( pages, column_meta, - field.data_type().clone(), + field.clone(), chunk_size .unwrap_or(usize::MAX) - .min(column_meta.num_values() as usize), + .min(row_group.num_rows() as usize), ) }) }) - // todo: generalize for nested + // todo: generalize for struct type .next() .unwrap() }) diff --git a/src/io/parquet/read/utils.rs b/src/io/parquet/read/utils.rs index c4c01c464e5..2e90115bded 100644 --- a/src/io/parquet/read/utils.rs +++ b/src/io/parquet/read/utils.rs @@ -233,8 +233,6 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable, I: Iterator( diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 83f7cf6a5fa..c2cb5eb7125 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -160,6 +160,8 @@ pub fn pyarrow_nested_nullable(column: usize) -> Box { let validity = Some(Bitmap::from([ true, false, true, true, true, true, false, true, ])); + // [0, 2, 2, 5, 8, 8, 11, 11, 12] + // [[a1, a2], None, [a3, a4, a5], [a6, a7, a8], [], [a9, a10, a11], None, [a12]] let data_type = DataType::List(Box::new(field)); Box::new(ListArray::::from_data( data_type, offsets, values, validity,