From 18cfc091613e4f0fdca7f6cc371c449f059880d6 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 29 Jan 2022 22:52:04 +0000 Subject: [PATCH] Nested --- src/io/parquet/read/binary/basic.rs | 49 ++-- src/io/parquet/read/binary/mod.rs | 27 +- src/io/parquet/read/binary/nested.rs | 246 ++++++++--------- src/io/parquet/read/mod.rs | 258 +++++++++++------- src/io/parquet/read/nested_utils.rs | 112 ++------ src/io/parquet/read/primitive/basic.rs | 24 +- src/io/parquet/read/primitive/mod.rs | 37 ++- src/io/parquet/read/primitive/nested.rs | 339 +++++++++++++++--------- src/io/parquet/read/primitive/utils.rs | 7 - src/io/parquet/read/row_group.rs | 93 ++++--- src/io/parquet/read/utils.rs | 1 + 11 files changed, 672 insertions(+), 521 deletions(-) diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index b088ffc78b4..ce6be424cc7 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -15,7 +15,7 @@ use crate::{ error::Result, }; -use super::super::utils::{extend_from_decoder, next, MaybeNext, OptionalPageValidity}; +use super::super::utils::{extend_from_decoder, next, BinaryIter, MaybeNext, OptionalPageValidity}; use super::super::DataPages; use super::{super::utils, utils::Binary}; @@ -57,33 +57,16 @@ fn read_delta_optional( } */ -struct Optional<'a> { - values: utils::BinaryIter<'a>, - validity: OptionalPageValidity<'a>, -} - -impl<'a> Optional<'a> { - fn new(page: &'a DataPage) -> Self { - let (_, validity_buffer, values_buffer, _) = utils::split_buffer(page, page.descriptor()); - - let values = utils::BinaryIter::new(values_buffer); - - Self { - values, - validity: OptionalPageValidity::new(page), - } - } -} - -struct Required<'a> { - pub values: utils::BinaryIter<'a>, +#[derive(Debug)] +pub(super) struct Required<'a> { + pub values: BinaryIter<'a>, pub remaining: usize, } impl<'a> Required<'a> { - fn new(page: &'a DataPage) -> Self { + pub fn new(page: &'a DataPage) -> Self { Self { - values: utils::BinaryIter::new(page.buffer()), + values: BinaryIter::new(page.buffer()), remaining: page.num_values(), } } @@ -149,7 +132,7 @@ impl<'a> OptionalDictionary<'a> { } enum State<'a> { - Optional(Optional<'a>), + Optional(OptionalPageValidity<'a>, BinaryIter<'a>), Required(Required<'a>), RequiredDictionary(RequiredDictionary<'a>), OptionalDictionary(OptionalDictionary<'a>), @@ -158,7 +141,7 @@ enum State<'a> { impl<'a> utils::PageState<'a> for State<'a> { fn len(&self) -> usize { match self { - State::Optional(state) => state.validity.len(), + State::Optional(validity, _) => validity.len(), State::Required(state) => state.remaining, State::RequiredDictionary(state) => state.remaining, State::OptionalDictionary(state) => state.validity.len(), @@ -210,7 +193,13 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary> for BinaryDecoder page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; match (page.encoding(), page.dictionary_page(), is_optional) { - (Encoding::Plain, None, true) => Ok(State::Optional(Optional::new(page))), + (Encoding::Plain, None, true) => { + let (_, _, values, _) = utils::split_buffer(page, page.descriptor()); + + let values = BinaryIter::new(values); + + Ok(State::Optional(OptionalPageValidity::new(page), values)) + } (Encoding::Plain, None, false) => Ok(State::Required(Required::new(page))), (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { Ok(State::RequiredDictionary(RequiredDictionary::new( @@ -245,12 +234,12 @@ impl<'a, O: Offset> 
utils::Decoder<'a, &'a [u8], Binary> for BinaryDecoder additional: usize, ) { match state { - State::Optional(page) => extend_from_decoder( + State::Optional(page_validity, page_values) => extend_from_decoder( validity, - &mut page.validity, + page_validity, Some(additional), values, - &mut page.values, + page_values, ), State::Required(page) => { page.remaining -= additional; @@ -275,7 +264,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary> for BinaryDecoder } } -fn finish>( +pub(super) fn finish>( data_type: &DataType, values: Binary, validity: MutableBitmap, diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs index 366b50af857..c21f008bc70 100644 --- a/src/io/parquet/read/binary/mod.rs +++ b/src/io/parquet/read/binary/mod.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use crate::{ array::{Array, Offset}, - datatypes::DataType, + datatypes::{DataType, Field}, error::Result, }; @@ -15,7 +15,8 @@ pub use dictionary::iter_to_arrays as iter_to_dict_arrays; use self::basic::TraitBinaryArray; -use super::DataPages; +use self::nested::ArrayIterator; +use super::{nested_utils::NestedState, DataPages}; use basic::BinaryArrayIterator; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] @@ -34,3 +35,25 @@ where .map(|x| x.map(|x| Arc::new(x) as Arc)), ) } + +/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] +pub fn iter_to_arrays_nested<'a, O, A, I>( + iter: I, + field: Field, + data_type: DataType, + chunk_size: usize, +) -> Box)>> + 'a> +where + I: 'a + DataPages, + A: TraitBinaryArray, + O: Offset, +{ + Box::new( + ArrayIterator::::new(iter, field, data_type, chunk_size).map(|x| { + x.map(|(nested, array)| { + let values = Arc::new(array) as Arc; + (nested, values) + }) + }), + ) +} diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index d0dd6cb26f8..ffe810c55b5 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/binary/nested.rs @@ -1,140 +1,144 @@ -use parquet2::{ - encoding::{hybrid_rle::HybridRleDecoder, Encoding}, - metadata::ColumnDescriptor, - page::DataPage, - read::levels::get_bit_width, -}; +use std::collections::VecDeque; -use super::super::utils; -use super::super::utils::Pushable; -use super::{super::nested_utils::*, utils::Binary}; +use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition}; -use crate::{array::Offset, bitmap::MutableBitmap, error::Result}; +use crate::{ + array::Offset, + bitmap::MutableBitmap, + datatypes::{DataType, Field}, + error::Result, + io::parquet::read::{utils::MaybeNext, DataPages}, +}; -fn read_plain_required(buffer: &[u8], additional: usize, values: &mut Binary) { - let values_iterator = utils::BinaryIter::new(buffer); +use super::super::nested_utils::*; +use super::utils::Binary; +use super::{ + super::utils, + basic::{finish, Required, TraitBinaryArray}, +}; - // each value occupies 4 bytes + len declared in 4 bytes => reserve accordingly. 
- values.offsets.reserve(additional); - values.values.reserve(buffer.len() - 4 * additional); - let a = values.values.capacity(); - for value in values_iterator { - values.push(value); - } - debug_assert_eq!(a, values.values.capacity()); +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +enum State<'a> { + Optional(Optional<'a>, utils::BinaryIter<'a>), + Required(Required<'a>), } -fn read_values<'a, O, D, G>( - def_levels: D, - max_def: u32, - mut new_values: G, - values: &mut Binary, - validity: &mut MutableBitmap, -) where - O: Offset, - D: Iterator, - G: Iterator, -{ - def_levels.for_each(|def| { - if def == max_def { - let v = new_values.next().unwrap(); - values.push(v); - validity.push(true); - } else if def == max_def - 1 { - values.push(&[]); - validity.push(false); +impl<'a> utils::PageState<'a> for State<'a> { + fn len(&self) -> usize { + match self { + State::Optional(validity, _) => validity.len(), + State::Required(state) => state.remaining, } - }); + } +} + +#[derive(Debug, Default)] +struct BinaryDecoder { + phantom_o: std::marker::PhantomData, } -#[allow(clippy::too_many_arguments)] -fn read( - rep_levels: &[u8], - def_levels: &[u8], - values_buffer: &[u8], - additional: usize, - rep_level_encoding: (&Encoding, i16), - def_level_encoding: (&Encoding, i16), - is_nullable: bool, - nested: &mut Vec>, - values: &mut Binary, - validity: &mut MutableBitmap, -) { - let max_rep_level = rep_level_encoding.1 as u32; - let max_def_level = def_level_encoding.1 as u32; - - match (rep_level_encoding.0, def_level_encoding.0) { - (Encoding::Rle, Encoding::Rle) => { - if is_nullable { - let def_levels = HybridRleDecoder::new( - def_levels, - get_bit_width(def_level_encoding.1), +impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary> for BinaryDecoder { + type State = State<'a>; + + fn build_state(&self, page: &'a DataPage) -> Result { + let is_optional = + page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; + + match (page.encoding(), page.dictionary_page(), is_optional) { + (Encoding::Plain, None, true) => { + let (_, _, values, _) = utils::split_buffer(page, page.descriptor()); + + let values = utils::BinaryIter::new(values); + + Ok(State::Optional(Optional::new(page), values)) + } + (Encoding::Plain, None, false) => Ok(State::Required(Required::new(page))), + _ => Err(utils::not_implemented( + &page.encoding(), + is_optional, + false, + "any", + "Binary", + )), + } + } + + fn with_capacity(&self, capacity: usize) -> Binary { + Binary::::with_capacity(capacity) + } + + fn extend_from_state( + state: &mut Self::State, + values: &mut Binary, + validity: &mut MutableBitmap, + additional: usize, + ) { + match state { + State::Optional(page_validity, page_values) => { + let max_def = page_validity.max_def(); + read_optional_values( + page_validity.definition_levels.by_ref(), + max_def, + page_values.by_ref(), + values, + validity, additional, - ); - let new_values = utils::BinaryIter::new(values_buffer); - read_values(def_levels, max_def_level, new_values, values, validity) - } else { - read_plain_required(values_buffer, additional, values) + ) } + State::Required(page) => { + page.remaining -= additional; + for x in page.values.by_ref().take(additional) { + values.push(x) + } + } + } + } +} + +pub struct ArrayIterator, I: DataPages> { + iter: I, + data_type: DataType, + field: Field, + items: VecDeque<(Binary, MutableBitmap)>, + nested: VecDeque, + chunk_size: usize, + phantom_a: std::marker::PhantomData, +} - let rep_levels = - 
HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), additional); - let def_levels = - HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), additional); - - extend_offsets( - rep_levels, - def_levels, - is_nullable, - max_rep_level, - max_def_level, - nested, - ) +impl, I: DataPages> ArrayIterator { + pub fn new(iter: I, field: Field, data_type: DataType, chunk_size: usize) -> Self { + Self { + iter, + data_type, + field, + items: VecDeque::new(), + nested: VecDeque::new(), + chunk_size, + phantom_a: Default::default(), } - _ => todo!(), } } -pub(super) fn extend_from_page( - page: &DataPage, - descriptor: &ColumnDescriptor, - is_nullable: bool, - nested: &mut Vec>, - values: &mut Binary, - validity: &mut MutableBitmap, -) -> Result<()> { - let additional = page.num_values(); - - let (rep_levels, def_levels, values_buffer, version) = utils::split_buffer(page, descriptor); - - match (&page.encoding(), page.dictionary_page()) { - (Encoding::Plain, None) => read( - rep_levels, - def_levels, - values_buffer, - additional, - ( - &page.repetition_level_encoding(), - descriptor.max_rep_level(), - ), - ( - &page.definition_level_encoding(), - descriptor.max_def_level(), - ), - is_nullable, - nested, - values, - validity, - ), - _ => { - return Err(utils::not_implemented( - &page.encoding(), - is_nullable, - page.dictionary_page().is_some(), - version, - "primitive", - )) +impl, I: DataPages> Iterator for ArrayIterator { + type Item = Result<(NestedState, A)>; + + fn next(&mut self) -> Option { + let maybe_state = next( + &mut self.iter, + &mut self.items, + &mut self.nested, + &self.field, + self.chunk_size, + &BinaryDecoder::::default(), + ); + match maybe_state { + MaybeNext::Some(Ok((nested, values, validity))) => { + Some(Ok((nested, finish(&self.data_type, values, validity)))) + } + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), } } - Ok(()) } diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index acaedfc8786..280e17fa776 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -246,68 +246,67 @@ fn column_datatype(data_type: &DataType, column: usize) -> DataType { } fn page_iter_to_arrays<'a, I: 'a + DataPages>( - iter: I, - metadata: &ColumnChunkMetaData, + pages: I, + type_: &ParquetType, field: Field, chunk_size: usize, ) -> Result>> + 'a>> { use DataType::*; - let type_ = metadata.descriptor().type_(); match field.data_type.to_logical_type() { /*Null => Ok(Box::new(NullArray::from_data( data_type, metadata.num_values() as usize, ))),*/ - Boolean => Ok(boolean::iter_to_arrays(iter, field.data_type, chunk_size)), + Boolean => Ok(boolean::iter_to_arrays(pages, field.data_type, chunk_size)), UInt8 => Ok(primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, |x: i32| x as u8, )), UInt16 => Ok(primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, |x: i32| x as u16, )), UInt32 => Ok(primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, |x: i32| x as u32, )), Int8 => Ok(primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, |x: i32| x as i8, )), Int16 => Ok(primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, |x: i32| x as i16, )), Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => Ok( - primitive::iter_to_arrays(iter, field.data_type, chunk_size, read_item, |x: i32| { + primitive::iter_to_arrays(pages, 
field.data_type, chunk_size, read_item, |x: i32| { x as i32 }), ), - Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() { + Timestamp(TimeUnit::Nanosecond, None) => match type_ { ParquetType::PrimitiveType { physical_type, logical_type, .. } => match (physical_type, logical_type) { (PhysicalType::Int96, _) => Ok(primitive::iter_to_arrays( - iter, + pages, DataType::Timestamp(TimeUnit::Nanosecond, None), chunk_size, read_item, @@ -315,21 +314,21 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( )), (_, Some(LogicalType::TIMESTAMP(TimestampType { unit, .. }))) => Ok(match unit { ParquetTimeUnit::MILLIS(_) => primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, |x: i64| x * 1_000_000, ), ParquetTimeUnit::MICROS(_) => primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, |x: i64| x * 1_000, ), ParquetTimeUnit::NANOS(_) => primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, @@ -337,7 +336,7 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( ), }), _ => Ok(primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, @@ -348,21 +347,21 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( }, FixedSizeBinary(_) => Ok(Box::new( - fixed_size_binary::BinaryArrayIterator::new(iter, field.data_type, chunk_size) + fixed_size_binary::BinaryArrayIterator::new(pages, field.data_type, chunk_size) .map(|x| x.map(|x| Arc::new(x) as _)), )), Decimal(_, _) => match type_ { ParquetType::PrimitiveType { physical_type, .. } => Ok(match physical_type { PhysicalType::Int32 => primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, |x: i32| x as i128, ), PhysicalType::Int64 => primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, @@ -377,13 +376,13 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( &PhysicalType::FixedLenByteArray(n) => { let n = n as usize; - let iter = fixed_size_binary::BinaryArrayIterator::new( - iter, + let pages = fixed_size_binary::BinaryArrayIterator::new( + pages, DataType::FixedSizeBinary(n), chunk_size, ); - let iter = iter.map(move |maybe_array| { + let pages = pages.map(move |maybe_array| { let array = maybe_array?; let values = array .values() @@ -407,9 +406,9 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( )) }); - let iter = iter.map(|x| x.map(|x| Arc::new(x) as Arc)); + let arrays = pages.map(|x| x.map(|x| Arc::new(x) as Arc)); - Box::new(iter) as _ + Box::new(arrays) as _ } _ => unreachable!(), }), @@ -418,12 +417,12 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( // INT64 Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => Ok( - primitive::iter_to_arrays(iter, field.data_type, chunk_size, read_item, |x: i64| { + primitive::iter_to_arrays(pages, field.data_type, chunk_size, read_item, |x: i64| { x as i64 }), ), UInt64 => Ok(primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, @@ -431,14 +430,14 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( )), Float32 => Ok(primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, |x: f32| x, )), Float64 => Ok(primitive::iter_to_arrays( - iter, + pages, field.data_type, chunk_size, read_item, @@ -446,37 +445,34 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( )), Binary => Ok(binary::iter_to_arrays::, _>( - iter, + pages, field.data_type, chunk_size, )), LargeBinary => Ok(binary::iter_to_arrays::, _>( - iter, + pages, field.data_type, chunk_size, )), Utf8 => 
Ok(binary::iter_to_arrays::, _>( - iter, + pages, field.data_type, chunk_size, )), LargeUtf8 => Ok(binary::iter_to_arrays::, _>( - iter, + pages, field.data_type, chunk_size, )), Dictionary(key_type, _, _) => match_integer_type!(key_type, |$K| { - dict_read::<$K, _>(iter, type_, field.data_type, chunk_size) + dict_read::<$K, _>(pages, type_, field.data_type, chunk_size) }), - List(_) => page_iter_to_arrays_nested(iter, field, chunk_size), - /* - LargeList(ref inner) => { - let values = page_iter_to_array(iter, nested, metadata, inner.data_type().clone()); - create_list(data_type, nested, values.into()) + LargeList(inner) | List(inner) => { + let data_type = inner.data_type.clone(); + page_iter_to_arrays_nested(pages, field, data_type, chunk_size) } - */ other => Err(ArrowError::NotYetImplemented(format!( "Reading {:?} from parquet still not implemented", other @@ -509,11 +505,56 @@ fn finish_array(data_type: DataType, arrays: &mut VecDeque>) -> B } fn page_iter_to_arrays_nested<'a, I: 'a + DataPages>( - iter: I, + pages: I, field: Field, + data_type: DataType, chunk_size: usize, ) -> Result>> + 'a>> { - let iter = boolean::iter_to_arrays_nested(iter, field.clone(), chunk_size); + use DataType::*; + let iter = match data_type { + Boolean => boolean::iter_to_arrays_nested(pages, field.clone(), chunk_size), + Int32 => primitive::iter_to_arrays_nested( + pages, + field.clone(), + data_type, + chunk_size, + read_item, + |x: i32| x, + ), + Int64 => primitive::iter_to_arrays_nested( + pages, + field.clone(), + data_type, + chunk_size, + read_item, + |x: i64| x, + ), + Binary => binary::iter_to_arrays_nested::, _>( + pages, + field.clone(), + data_type, + chunk_size, + ), + LargeBinary => binary::iter_to_arrays_nested::, _>( + pages, + field.clone(), + data_type, + chunk_size, + ), + Utf8 => binary::iter_to_arrays_nested::, _>( + pages, + field.clone(), + data_type, + chunk_size, + ), + LargeUtf8 => binary::iter_to_arrays_nested::, _>( + pages, + field.clone(), + data_type, + chunk_size, + ), + _ => todo!(), + }; let iter = iter.map(move |x| { let (mut nested, array) = x?; @@ -524,60 +565,91 @@ fn page_iter_to_arrays_nested<'a, I: 'a + DataPages>( Ok(Box::new(iter)) } -/* -/// Returns an iterator of [`Array`] built from an iterator of column chunks. It also returns -/// the two buffers used to decompress and deserialize pages (to be re-used). -#[allow(clippy::type_complexity)] -pub fn column_iter_to_arrays( - mut columns: I, - field: &Field, - mut buffer: Vec, - chunk_size: usize, -) -> Result<(impl Iterator>, Vec, Vec)> -where - II: Iterator>, - I: ColumnChunkIter, -{ - let mut nested_info = vec![]; - init_nested(field, 0, &mut nested_info); - - let data_type = field.data_type().clone(); - - let mut arrays = VecDeque::new(); - let page_buffer; - let mut column = 0; - loop { - match columns.advance()? { - State::Some(mut new_iter) => { - let data_type = column_datatype(&data_type, column); - if let Some((pages, metadata)) = new_iter.get() { - let mut iterator = BasicDecompressor::new(pages, buffer); - - let array = page_iter_to_arrays( - &mut iterator, - &mut nested_info, - metadata, - data_type, - chunk_size, - )? - .collect::>>()? 
- .pop() - .unwrap(); - buffer = iterator.into_inner(); - arrays.push_back(array) - } - column += 1; - columns = new_iter; - } - State::Finished(b) => { - page_buffer = b; - break; +struct StructIterator { + iters: Vec>>>>, + fields: Vec, +} + +impl StructIterator { + pub fn new( + iters: Vec>>>>, + fields: Vec, + ) -> Self { + assert_eq!(iters.len(), fields.len()); + Self { iters, fields } + } +} + +impl Iterator for StructIterator { + type Item = Result>; + + fn next(&mut self) -> Option { + let values = self + .iters + .iter_mut() + .map(|iter| iter.next()) + .collect::>>(); + + if values.iter().any(|x| x.is_none()) { + return None; + } + let values = values + .into_iter() + .map(|x| x.unwrap()) + .collect::>>(); + + match values { + Ok(values) => Some(Ok(Arc::new(StructArray::from_data( + DataType::Struct(self.fields.clone()), + values, + None, + )))), + Err(e) => Some(Err(e)), + } + } +} + +fn get_fields(field: &Field) -> Vec { + use crate::datatypes::PhysicalType::*; + match field.data_type.to_physical_type() { + Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 + | Dictionary(_) | LargeUtf8 | List | FixedSizeList | LargeList => { + vec![field.clone()] + } + Struct => { + if let DataType::Struct(fields) = field.data_type.to_logical_type() { + fields.clone() + } else { + unreachable!() } } + _ => todo!(), } +} - let array = finish_array(data_type, &mut arrays); - assert!(arrays.is_empty()); - Ok((array, page_buffer, buffer)) +/// Returns an iterator of [`Array`] built from an iterator of column chunks. +pub fn column_iter_to_arrays<'a, I: 'static>( + columns: Vec, + types: Vec<&ParquetType>, + field: &Field, + chunk_size: usize, +) -> Result>> + 'a>> +where + I: DataPages, +{ + // get fields + let fields = get_fields(field); + + let mut iters = columns + .into_iter() + .zip(types.into_iter()) + .zip(fields.clone().into_iter()) + .map(|((pages, type_), field)| page_iter_to_arrays(pages, type_, field, chunk_size)) + .collect::>>()?; + + Ok(if fields.len() > 1 { + Box::new(StructIterator::new(iters, fields)) + } else { + iters.pop().unwrap() + }) } - */ diff --git a/src/io/parquet/read/nested_utils.rs b/src/io/parquet/read/nested_utils.rs index bd3ba52b9d8..bbb22602aad 100644 --- a/src/io/parquet/read/nested_utils.rs +++ b/src/io/parquet/read/nested_utils.rs @@ -215,77 +215,6 @@ pub(super) fn read_optional_values( } } -pub fn extend_offsets( - rep_levels: R, - def_levels: D, - is_nullable: bool, - max_rep: u32, - max_def: u32, - nested: &mut Vec>, -) where - R: Iterator, - D: Iterator, -{ - let mut values_count = vec![0; nested.len()]; - let mut prev_def: u32 = 0; - let mut is_first = true; - - rep_levels.zip(def_levels).for_each(|(rep, def)| { - let mut closures = max_rep - rep; - if prev_def <= 1 { - closures = 1; - }; - if is_first { - // close on first run to ensure offsets start with 0. 
- closures = max_rep; - is_first = false; - } - - nested - .iter_mut() - .zip(values_count.iter()) - .enumerate() - .skip(rep as usize) - .take((rep + closures) as usize) - .for_each(|(depth, (nested, length))| { - let is_null = (def - rep) as usize == depth && depth == rep as usize; - nested.push(*length, !is_null); - }); - - values_count - .iter_mut() - .enumerate() - .for_each(|(depth, values)| { - if depth == 1 { - if def == max_def || (is_nullable && def == max_def - 1) { - *values += 1 - } - } else if depth == 0 { - let a = nested - .get(depth + 1) - .map(|x| x.is_nullable()) - .unwrap_or_default(); // todo: cumsum this - let condition = rep == 1 - || rep == 0 - && def >= max_def.saturating_sub((a as u32) + (is_nullable as u32)); - - if condition { - *values += 1; - } - } - }); - prev_def = def; - }); - - // close validities - nested - .iter_mut() - .zip(values_count.iter()) - .for_each(|(nested, length)| { - nested.close(*length); - }); -} - fn init_nested_recursive(field: &Field, capacity: usize, container: &mut Vec>) { let is_nullable = field.is_nullable; @@ -417,11 +346,6 @@ impl NestedState { self.nested[0].num_values() } - /// Whether the primitive is optional - pub fn is_optional(&self) -> bool { - self.nested.last().unwrap().is_nullable() - } - pub fn depth(&self) -> usize { // outermost is the number of rows self.nested.len() @@ -526,12 +450,22 @@ pub fn extend_offsets1<'a>( } fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, additional: usize) { - let is_optional = nested.is_optional(); let mut values_count = vec![0; nested.depth()]; let mut prev_def: u32 = 0; let mut is_first = true; - let max_def = page.max_def_level; + let mut def_threshold = page.max_def_level; + let thres = nested + .nested + .iter() + .rev() + .map(|nested| { + let is_nullable = nested.is_nullable(); + def_threshold -= is_nullable as u32; + def_threshold + }) + .collect::>(); + let max_rep = page.max_rep_level; let mut iter = page.repetitions.by_ref().zip(page.definitions.by_ref()); @@ -568,25 +502,15 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi values_count .iter_mut() + .zip(thres.iter()) .enumerate() - .for_each(|(depth, values)| { + .for_each(|(depth, (values, thre))| { if depth == 1 { - if def == max_def || (is_optional && def == max_def - 1) { + if def >= *thre { *values += 1 } - } else if depth == 0 { - let a = nested - .nested - .get(depth + 1) - .map(|x| x.is_nullable()) - .unwrap_or_default(); // todo: cumsum this - let condition = rep == 1 - || rep == 0 - && def >= max_def.saturating_sub((a as u32) + (!is_optional as u32)); - - if condition { - *values += 1; - } + } else if depth == 0 && def >= *thre { + *values += 1; } }); prev_def = def; @@ -611,7 +535,7 @@ pub struct Optional<'a> { impl<'a> Optional<'a> { pub fn new(page: &'a DataPage) -> Self { - let (_, def_levels, values_buffer, _) = split_buffer(page, page.descriptor()); + let (_, def_levels, _, _) = split_buffer(page, page.descriptor()); let max_def = page.descriptor().max_def_level(); diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index d151da71fbe..0755d399667 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -17,14 +17,14 @@ use super::super::utils::OptionalPageValidity; use super::super::DataPages; #[derive(Debug)] -struct Values<'a, T, P, G, F> +pub(super) struct Values<'a, T, P, G, F> where T: NativeType, P: ParquetNativeType, G: for<'b> Fn(&'b [u8]) -> P, F: Fn(P) -> 
T, { - values: std::iter::Map, G>, F>, + pub values: std::iter::Map, G>, F>, phantom: std::marker::PhantomData
, } @@ -35,9 +35,9 @@ where G: for<'b> Fn(&'b [u8]) -> P, F: Fn(P) -> T, { - fn new(page: &'a DataPage, op1: G, op2: F) -> Self { + pub fn new(page: &'a DataPage, op1: G, op2: F) -> Self { let (_, _, values, _) = utils::split_buffer(page, page.descriptor()); - assert_eq!(values.len(), page.num_values() * std::mem::size_of::()); + assert_eq!(values.len() % std::mem::size_of::
<P>(), 0);
         Self {
             phantom: Default::default(),
             values: values
@@ -46,6 +46,11 @@ where
                 .map(op2),
         }
     }
+
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.values.size_hint().0
+    }
 }
 
 #[inline]
@@ -70,7 +75,7 @@ where
 }
 
 #[derive(Debug)]
-struct ValuesDictionary<'a, T, P, F>
+pub(super) struct ValuesDictionary<'a, T, P, F>
 where
     T: NativeType,
     P: ParquetNativeType,
@@ -100,6 +105,11 @@ where
             values,
         }
     }
+
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.values.size_hint().0
+    }
 }
 
 // The state of a `DataPage` of `Primitive` parquet primitive type
@@ -127,8 +137,8 @@ where
     fn len(&self) -> usize {
         match self {
             State::Optional(optional, _) => optional.len(),
-            State::Required(values) => values.values.size_hint().0,
-            State::RequiredDictionary(values) => values.values.size_hint().0,
+            State::Required(values) => values.len(),
+            State::RequiredDictionary(values) => values.len(),
             State::OptionalDictionary(optional, _) => optional.len(),
         }
     }
diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs
index ad00e48c6e2..ad61079eb0f 100644
--- a/src/io/parquet/read/primitive/mod.rs
+++ b/src/io/parquet/read/primitive/mod.rs
@@ -5,13 +5,18 @@ mod utils;
 
 use std::sync::Arc;
 
-use super::ColumnDescriptor;
 use super::{nested_utils::*, DataPages};
-use crate::{array::Array, datatypes::DataType, error::Result};
+use crate::{
+    array::Array,
+    datatypes::{DataType, Field},
+    error::Result,
+};
 
 use basic::PrimitiveArrayIterator;
+use nested::ArrayIterator;
 
 pub use dictionary::iter_to_arrays as iter_to_dict_arrays;
+pub use utils::read_item;
 
 /// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
 pub fn iter_to_arrays<'a, I, T, P, G, F>(
@@ -34,4 +39,30 @@ where
     )
 }
 
-pub use utils::read_item;
+/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
+pub fn iter_to_arrays_nested<'a, I, T, P, G, F>(
+    iter: I,
+    field: Field,
+    data_type: DataType,
+    chunk_size: usize,
+    op1: G,
+    op2: F,
+) -> Box<dyn Iterator<Item = Result<(NestedState, Arc<dyn Array>)>> + 'a>
+where
+    I: 'a + DataPages,
+    T: crate::types::NativeType,
+    P: parquet2::types::NativeType,
+    G: 'a + Copy + for<'b> Fn(&'b [u8]) -> P,
+    F: 'a + Copy + Fn(P) -> T,
+{
+    Box::new(
+        ArrayIterator::<T, I, P, G, F>::new(iter, field, data_type, chunk_size, op1, op2).map(
+            |x| {
+                x.map(|(nested, array)| {
+                    let values = Arc::new(array) as Arc<dyn Array>;
+                    (nested, values)
+                })
+            },
+        ),
+    )
+}
diff --git a/src/io/parquet/read/primitive/nested.rs b/src/io/parquet/read/primitive/nested.rs
index fa7fe704684..442e55a18c7 100644
--- a/src/io/parquet/read/primitive/nested.rs
+++ b/src/io/parquet/read/primitive/nested.rs
@@ -1,151 +1,244 @@
+use std::collections::VecDeque;
+
 use parquet2::{
-    encoding::{hybrid_rle::HybridRleDecoder, Encoding},
-    page::DataPage,
-    read::levels::get_bit_width,
+    encoding::Encoding, page::DataPage, schema::Repetition, types::NativeType as ParquetNativeType,
+};
+
+use crate::{
+    array::PrimitiveArray,
+    bitmap::MutableBitmap,
+    datatypes::{DataType, Field},
+    error::Result,
+    io::parquet::read::utils::MaybeNext,
     types::NativeType,
 };
 
-use super::super::nested_utils::extend_offsets;
-use super::ColumnDescriptor;
-use super::{super::utils, utils::chunks, Nested};
-use crate::{bitmap::MutableBitmap, error::Result, types::NativeType as ArrowNativeType};
-
-fn read_values(
-    def_levels: D,
-    max_def: u32,
-    mut new_values: G,
-    op: F,
-    values: &mut Vec,
-    validity: &mut MutableBitmap,
-) where
+use super::super::nested_utils::*;
+use super::super::utils;
+use super::super::DataPages;
+use super::basic::Values;
+
+// The state of a `DataPage` of `Primitive` parquet primitive type
+#[allow(clippy::large_enum_variant)]
+#[derive(Debug)]
+enum State<'a, T, P, G, F>
+where
     T: NativeType,
-    D: Iterator,
-    G: Iterator,
-    A: ArrowNativeType,
-    F: Fn(T) -> A,
+    P: ParquetNativeType,
+    G: Copy + for<'b> Fn(&'b [u8]) -> P,
+    F: Copy + Fn(P) -> T,
 {
-    def_levels.for_each(|def| {
-        if def == max_def {
-            values.push(op(new_values.next().unwrap()));
-            validity.push(true);
-        } else if def == max_def - 1 {
-            values.push(A::default());
-            validity.push(false);
+    Optional(Optional<'a>, Values<'a, T, P, G, F>),
+    Required(Values<'a, T, P, G, F>),
+    //RequiredDictionary(ValuesDictionary<'a, T, P, F>),
+    //OptionalDictionary(Optional<'a>, ValuesDictionary<'a, T, P, F>),
+}
+
+impl<'a, T, P, G, F> utils::PageState<'a> for State<'a, T, P, G, F>
+where
+    T: NativeType,
+    P: ParquetNativeType,
+    G: Copy + for<'b> Fn(&'b [u8]) -> P,
+    F: Copy + Fn(P) -> T,
+{
+    fn len(&self) -> usize {
+        match self {
+            State::Optional(optional, _) => optional.len(),
+            State::Required(required) => required.len(),
+            //State::RequiredDictionary(required) => required.len(),
+            //State::OptionalDictionary(optional, _) => optional.len(),
         }
-    });
+    }
 }
 
-fn read_values_required(new_values: G, op: F, values: &mut Vec)
+#[derive(Debug)]
+struct PrimitiveDecoder<T, P, G, F>
 where
     T: NativeType,
-    G: Iterator,
-    A: ArrowNativeType,
-    F: Fn(T) -> A,
+    P: ParquetNativeType,
+    G: for<'b> Fn(&'b [u8]) -> P,
+    F: Fn(P) -> T,
 {
-    values.extend(new_values.map(op));
+    phantom: std::marker::PhantomData<T>,
+    phantom_p: std::marker::PhantomData<P>
, + op1: G, + op2: F, } -#[allow(clippy::too_many_arguments)] -fn read( - rep_levels: &[u8], - def_levels: &[u8], - values_buffer: &[u8], - additional: usize, - rep_level_encoding: (&Encoding, i16), - def_level_encoding: (&Encoding, i16), - is_nullable: bool, - nested: &mut Vec>, - values: &mut Vec, - validity: &mut MutableBitmap, - op: F, -) where +impl<'a, T, P, G, F> PrimitiveDecoder +where T: NativeType, - A: ArrowNativeType, - F: Fn(T) -> A, + P: ParquetNativeType, + G: for<'b> Fn(&'b [u8]) -> P, + F: Fn(P) -> T, { - let new_values = chunks(values_buffer); - - let max_rep_level = rep_level_encoding.1 as u32; - let max_def_level = def_level_encoding.1 as u32; - - match (rep_level_encoding.0, def_level_encoding.0) { - (Encoding::Rle, Encoding::Rle) => { - let rep_levels = - HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), additional); - if is_nullable { - let def_levels = HybridRleDecoder::new( - def_levels, - get_bit_width(def_level_encoding.1), - additional, - ); - read_values(def_levels, max_def_level, new_values, op, values, validity) - } else { - read_values_required(new_values, op, values) + #[inline] + fn new(op1: G, op2: F) -> Self { + Self { + phantom: std::marker::PhantomData, + phantom_p: std::marker::PhantomData, + op1, + op2, + } + } +} + +impl<'a, T, P, G, F> utils::Decoder<'a, T, Vec> for PrimitiveDecoder +where + T: NativeType, + P: ParquetNativeType, + G: Copy + for<'b> Fn(&'b [u8]) -> P, + F: Copy + Fn(P) -> T, +{ + type State = State<'a, T, P, G, F>; + + fn build_state(&self, page: &'a DataPage) -> Result { + let is_optional = + page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; + + match (page.encoding(), page.dictionary_page(), is_optional) { + /*(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { + todo!() + } + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { + let dict = dict.as_any().downcast_ref().unwrap(); + Ok(State::OptionalDictionary(OptionalDictionaryPage::new( + page, dict, self.op2, + ))) + }*/ + (Encoding::Plain, None, true) => Ok(State::Optional( + Optional::new(page), + Values::new(page, self.op1, self.op2), + )), + (Encoding::Plain, None, false) => { + Ok(State::Required(Values::new(page, self.op1, self.op2))) } + _ => Err(utils::not_implemented( + &page.encoding(), + is_optional, + false, + "any", + "Primitive", + )), + } + } - let def_levels = - HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), additional); - - extend_offsets( - rep_levels, - def_levels, - is_nullable, - max_rep_level, - max_def_level, - nested, - ) + fn with_capacity(&self, capacity: usize) -> Vec { + Vec::::with_capacity(capacity) + } + + fn extend_from_state( + state: &mut Self::State, + values: &mut Vec, + validity: &mut MutableBitmap, + remaining: usize, + ) { + match state { + State::Optional(page_validity, page_values) => { + let max_def = page_validity.max_def(); + read_optional_values( + page_validity.definition_levels.by_ref(), + max_def, + page_values.values.by_ref(), + values, + validity, + remaining, + ) + } + State::Required(page) => { + values.extend(page.values.by_ref().take(remaining)); + } + //State::OptionalDictionary(page) => todo!(), + //State::RequiredDictionary(page) => todo!(), } - _ => todo!(), } } -pub fn extend_from_page( - page: &DataPage, - descriptor: &ColumnDescriptor, - is_nullable: bool, - nested: &mut Vec>, - values: &mut Vec, - validity: &mut MutableBitmap, - op: F, -) -> Result<()> +fn finish( + data_type: &DataType, + 
values: Vec, + validity: MutableBitmap, +) -> PrimitiveArray { + PrimitiveArray::from_data(data_type.clone(), values.into(), validity.into()) +} + +/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays +#[derive(Debug)] +pub struct ArrayIterator where + I: DataPages, T: NativeType, - A: ArrowNativeType, - F: Fn(T) -> A, + + P: ParquetNativeType, + G: Copy + for<'b> Fn(&'b [u8]) -> P, + F: Copy + Fn(P) -> T, { - let additional = page.num_values(); - - let (rep_levels, def_levels, values_buffer, version) = utils::split_buffer(page, descriptor); - - match (&page.encoding(), page.dictionary_page()) { - (Encoding::Plain, None) => read( - rep_levels, - def_levels, - values_buffer, - additional, - ( - &page.repetition_level_encoding(), - descriptor.max_rep_level(), - ), - ( - &page.definition_level_encoding(), - descriptor.max_def_level(), - ), - is_nullable, - nested, - values, - validity, - op, - ), - _ => { - return Err(utils::not_implemented( - &page.encoding(), - is_nullable, - page.dictionary_page().is_some(), - version, - "primitive", - )) + iter: I, + field: Field, + data_type: DataType, + // invariant: items.len() == nested.len() + items: VecDeque<(Vec, MutableBitmap)>, + nested: VecDeque, + chunk_size: usize, + decoder: PrimitiveDecoder, +} + +impl ArrayIterator +where + I: DataPages, + T: NativeType, + + P: ParquetNativeType, + G: Copy + for<'b> Fn(&'b [u8]) -> P, + F: Copy + Fn(P) -> T, +{ + pub fn new( + iter: I, + field: Field, + data_type: DataType, + chunk_size: usize, + op1: G, + op2: F, + ) -> Self { + Self { + iter, + field, + data_type, + items: VecDeque::new(), + nested: VecDeque::new(), + chunk_size, + decoder: PrimitiveDecoder::new(op1, op2), + } + } +} + +impl Iterator for ArrayIterator +where + I: DataPages, + T: NativeType, + + P: ParquetNativeType, + G: Copy + for<'b> Fn(&'b [u8]) -> P, + F: Copy + Fn(P) -> T, +{ + type Item = Result<(NestedState, PrimitiveArray)>; + + fn next(&mut self) -> Option { + let maybe_state = next( + &mut self.iter, + &mut self.items, + &mut self.nested, + &self.field, + self.chunk_size, + &self.decoder, + ); + match maybe_state { + MaybeNext::Some(Ok((nested, values, validity))) => { + Some(Ok((nested, finish(&self.data_type, values, validity)))) + } + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), } } - Ok(()) } diff --git a/src/io/parquet/read/primitive/utils.rs b/src/io/parquet/read/primitive/utils.rs index f7c3d6d6b50..97dac6f5489 100644 --- a/src/io/parquet/read/primitive/utils.rs +++ b/src/io/parquet/read/primitive/utils.rs @@ -10,10 +10,3 @@ pub fn read_item(chunk: &[u8]) -> T { }; T::from_le_bytes(chunk) } - -#[inline] -pub fn chunks(bytes: &[u8]) -> impl Iterator + '_ { - assert_eq!(bytes.len() % std::mem::size_of::(), 0); - let chunks = bytes.chunks_exact(std::mem::size_of::()); - chunks.map(read_item) -} diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index 1f58bcf0389..1de42cce250 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -11,7 +11,7 @@ use parquet2::{ use crate::{ array::Array, chunk::Chunk, datatypes::Field, error::Result, - io::parquet::read::page_iter_to_arrays, + io::parquet::read::column_iter_to_arrays, }; use super::RowGroupMetaData; @@ -23,66 +23,77 @@ pub struct RowGroupReader { fn get_field_columns<'a>( row_group: &'a RowGroupMetaData, - field: &ParquetType, + field_name: &str, ) -> Vec<&'a ColumnChunkMetaData> { row_group .columns() .iter() .enumerate() - 
.filter(|x| x.1.descriptor().path_in_schema()[0] == field.name()) + .filter(|x| x.1.descriptor().path_in_schema()[0] == field_name) .map(|x| x.1) .collect() } -pub(super) fn get_iterators( +/// Reads all columns that are part of the parquet field `field_name` +pub fn read_columns<'a, R: Read + Seek>( + reader: &mut R, + row_group: &'a RowGroupMetaData, + field_name: &str, +) -> Result)>> { + get_field_columns(row_group, field_name) + .into_iter() + .map(|meta| { + let (start, len) = meta.byte_range(); + reader.seek(std::io::SeekFrom::Start(start))?; + let mut chunk = vec![0; len as usize]; + reader.read_exact(&mut chunk)?; + Ok((meta, chunk)) + }) + .collect() +} + +pub(super) fn get_iterators<'a, R: Read + Seek>( reader: &mut R, parquet_fields: &[ParquetType], row_group: &RowGroupMetaData, fields: Vec, chunk_size: Option, -) -> Result>>>>> { +) -> Result>> + 'a>>> { + let chunk_size = chunk_size + .unwrap_or(usize::MAX) + .min(row_group.num_rows() as usize); + // reads all the necessary columns for all fields from the row group // This operation is IO-bounded `O(C)` where C is the number of columns in the row group - fields + let columns = parquet_fields .iter() - .zip(parquet_fields.iter()) - .map(|(field, parquet_field)| { - let chunks = get_field_columns(row_group, parquet_field) - .into_iter() - .map(|meta| { - let (start, len) = meta.byte_range(); - reader.seek(std::io::SeekFrom::Start(start))?; - let mut chunk = vec![0; len as usize]; - reader.read_exact(&mut chunk)?; - Ok((meta, chunk)) - }); + .map(|parquet_field| read_columns(reader, row_group, parquet_field.name())) + .collect::>>()?; - chunks - .map(|x| { - x.and_then(|(column_meta, chunk)| { - let pages = PageIterator::new( - std::io::Cursor::new(chunk), - column_meta.num_values(), - column_meta.compression(), - column_meta.descriptor().clone(), - Arc::new(|_, _| true), - vec![], - ); - let pages = BasicDecompressor::new(pages, vec![]); - page_iter_to_arrays( - pages, - column_meta, - field.clone(), - chunk_size - .unwrap_or(usize::MAX) - .min(row_group.num_rows() as usize), - ) - }) + columns + .into_iter() + .map(|columns| { + let (pages, types): (Vec<_>, Vec<_>) = columns + .into_iter() + .map(|(column_meta, chunk)| { + let pages = PageIterator::new( + std::io::Cursor::new(chunk), + column_meta.num_values(), + column_meta.compression(), + column_meta.descriptor().clone(), + Arc::new(|_, _| true), + vec![], + ); + ( + BasicDecompressor::new(pages, vec![]), + column_meta.descriptor().type_(), + ) }) - // todo: generalize for struct type - .next() - .unwrap() + .unzip(); + (pages, types) }) + .zip(fields.into_iter()) + .map(|((columns, types), field)| column_iter_to_arrays(columns, types, &field, chunk_size)) .collect() } diff --git a/src/io/parquet/read/utils.rs b/src/io/parquet/read/utils.rs index ae7dbf50ac7..e48d893b75a 100644 --- a/src/io/parquet/read/utils.rs +++ b/src/io/parquet/read/utils.rs @@ -12,6 +12,7 @@ use crate::error::ArrowError; use super::DataPages; +#[derive(Debug)] pub struct BinaryIter<'a> { values: &'a [u8], }
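The hunks above introduce two public entry points: `read_columns` (IO-bound: fetch the raw chunks of every column of one field) and `column_iter_to_arrays` (CPU-bound: decompress, decode and deserialize them). A minimal sketch of how they might be wired together from user code, mirroring `get_iterators` above. The file name, the `read_metadata`/`get_schema` helpers, the re-export paths and the chunk size of 1024 are assumptions for illustration, not part of this patch:

    use std::fs::File;
    use std::sync::Arc;

    use parquet2::read::{BasicDecompressor, PageIterator};
    // assumed re-exports; in this patch the functions live in
    // src/io/parquet/read/row_group.rs and src/io/parquet/read/mod.rs
    use arrow2::io::parquet::read::{
        column_iter_to_arrays, get_schema, read_columns, read_metadata,
    };

    fn read_first_field() -> arrow2::error::Result<()> {
        let mut file = File::open("example.parquet")?; // hypothetical file
        let metadata = read_metadata(&mut file)?;
        let schema = get_schema(&metadata)?;
        let field = schema.fields[0].clone(); // assumed: read the first field
        let row_group = &metadata.row_groups[0];

        // IO-bound step: read the raw column chunks of `field`
        let columns = read_columns(&mut file, row_group, &field.name)?;

        // CPU-bound step: wrap each chunk in a decompressing page iterator
        let (pages, types): (Vec<_>, Vec<_>) = columns
            .into_iter()
            .map(|(meta, chunk)| {
                let pages = PageIterator::new(
                    std::io::Cursor::new(chunk),
                    meta.num_values(),
                    meta.compression(),
                    meta.descriptor().clone(),
                    Arc::new(|_, _| true),
                    vec![],
                );
                (
                    BasicDecompressor::new(pages, vec![]),
                    meta.descriptor().type_(),
                )
            })
            .unzip();

        // deserialize into arrays of at most 1024 rows each
        for maybe_array in column_iter_to_arrays(pages, types, &field, 1024)? {
            println!("{:?}", maybe_array?);
        }
        Ok(())
    }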