Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed reading parquet binary dict page #791

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,14 @@ fn read_dict_buffer<O: Offset>(
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) {
let length = values.len() + additional;

let values_iterator = values_iter(indices_buffer, dict, additional);

let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);

extend_from_decoder(
validity,
&mut validity_iterator,
length,
additional,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parameter mix-up: `extend_from_decoder` expects the additional length, but this call was passing the total length (`values.len() + additional`).

values,
values_iterator,
);
Expand All @@ -69,12 +67,11 @@ fn read_dict_required<O: Offset>(
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) {
debug_assert_eq!(0, validity.len());
let values_iterator = values_iter(indices_buffer, dict, additional);

for value in values_iterator {
values.push(value);
}
validity.extend_constant(additional, true);
}

/// Newtype over a mutable borrow of a `Vec<O>` (with `O: Offset`); the wrapped vector is
/// exposed as the public field `.0`.
struct Offsets<'a, O: Offset>(pub &'a mut Vec<O>);
Expand Down Expand Up @@ -192,7 +189,7 @@ pub(super) fn extend_from_page<O: Offset>(

let (_, validity_buffer, values_buffer, version) = utils::split_buffer(page, descriptor);

match (&page.encoding(), page.dictionary_page(), is_optional) {
match dbg!((&page.encoding(), page.dictionary_page(), is_optional)) {
danburkert marked this conversation as resolved.
Show resolved Hide resolved
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
read_dict_buffer::<O>(
validity_buffer,
Expand Down
7 changes: 4 additions & 3 deletions src/io/parquet/read/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ where
ArrowError: From<E>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let is_nullable = nested.pop().unwrap().is_nullable();
let capacity = metadata.num_values() as usize;
let mut values = Binary::<O>::with_capacity(capacity);
let mut validity = MutableBitmap::with_capacity(capacity);

let is_nullable = nested.pop().unwrap().is_nullable();
let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable));

if nested.is_empty() {
while let Some(page) = iter.next()? {
Expand All @@ -53,6 +52,8 @@ where
)?
}
}
debug_assert_eq!(values.len(), capacity);
debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable));
Ok(utils::finish_array(data_type, values, validity))
}

Expand Down
7 changes: 4 additions & 3 deletions src/io/parquet/read/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ where
ArrowError: From<E>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let is_nullable = nested.pop().unwrap().is_nullable();
let capacity = metadata.num_values() as usize;
let mut values = MutableBitmap::with_capacity(capacity);
let mut validity = MutableBitmap::with_capacity(capacity);

let is_nullable = nested.pop().unwrap().is_nullable();
let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable));

if nested.is_empty() {
while let Some(page) = iter.next()? {
Expand All @@ -46,6 +45,8 @@ where
)?
}
}
debug_assert_eq!(values.len(), capacity);
debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable));

Ok(Box::new(BooleanArray::from_data(
data_type,
Expand Down
11 changes: 6 additions & 5 deletions src/io/parquet/read/fixed_size_binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,12 @@ pub(crate) fn read_dict_required(
values: &mut FixedSizeBinary,
validity: &mut MutableBitmap,
) {
let size = values.size;
debug_assert!(validity.is_empty());

let values_iter = values_iter(indices_buffer, dict.values(), values.size, additional);

for value in values_iter {
values.push(value);
}
validity.extend_constant(additional * size, true);
}

pub(crate) fn read_optional(
Expand Down Expand Up @@ -114,14 +112,17 @@ where
ArrowError: From<E>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let is_nullable = metadata.descriptor().max_def_level() == 1;
let size = FixedSizeBinaryArray::get_size(&data_type);

let capacity = metadata.num_values() as usize;
let mut values = FixedSizeBinary::with_capacity(capacity, size);
let mut validity = MutableBitmap::with_capacity(capacity);
let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable));

while let Some(page) = iter.next()? {
extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)?
}
debug_assert_eq!(values.len(), capacity);
debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable));

Ok(FixedSizeBinaryArray::from_data(
data_type,
Expand Down
5 changes: 5 additions & 0 deletions src/io/parquet/read/fixed_size_binary/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ impl FixedSizeBinary {
self.values
.resize(self.values.len() + additional * self.size, 0);
}

/// Returns the number of fixed-size items currently stored, computed as the length of
/// the `values` buffer divided by the per-item `size`.
///
/// Takes `&self` because it only reads state; callers holding a mutable binding can
/// still call it unchanged.
///
/// # Panics
///
/// Panics if `self.size` is zero (integer division by zero).
#[inline]
pub fn len(&self) -> usize {
    self.values.len() / self.size
}
}

impl Pushable<&[u8]> for FixedSizeBinary {
Expand Down
10 changes: 3 additions & 7 deletions src/io/parquet/read/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,17 @@ fn read_dict_buffer_optional<T, A, F>(
A: ArrowNativeType,
F: Fn(T) -> A,
{
let length = additional + values.len();

let values_iterator = values_iter(indices_buffer, dict.values(), additional, op);

let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);

extend_from_decoder(
validity,
&mut validity_iterator,
length,
additional,
values,
values_iterator,
)
);
}

fn read_dict_buffer_required<T, A, F>(
Expand All @@ -73,11 +71,9 @@ fn read_dict_buffer_required<T, A, F>(
A: ArrowNativeType,
F: Fn(T) -> A,
{
debug_assert_eq!(0, validity.len());
let values_iterator = values_iter(indices_buffer, dict.values(), additional, op);

values.extend(values_iterator);

validity.extend_constant(additional, true);
}

fn read_nullable<T, A, F>(
Expand Down
7 changes: 4 additions & 3 deletions src/io/parquet/read/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,10 @@ where
F: Copy + Fn(T) -> A,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let is_nullable = nested.pop().unwrap().is_nullable();
let capacity = metadata.num_values() as usize;
let mut values = Vec::<A>::with_capacity(capacity);
let mut validity = MutableBitmap::with_capacity(capacity);

let is_nullable = nested.pop().unwrap().is_nullable();
let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable));

if nested.is_empty() {
while let Some(page) = iter.next()? {
Expand All @@ -98,6 +97,8 @@ where
)?
}
}
debug_assert_eq!(values.len(), capacity);
debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable));

let data_type = match data_type {
DataType::Dictionary(_, values, _) => values.as_ref().clone(),
Expand Down
17 changes: 9 additions & 8 deletions src/io/parquet/read/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,13 @@ pub(super) fn extend_from_decoder<'a, T: Default, C: Pushable<T>, I: Iterator<It
values: &mut C,
mut values_iter: I,
) {
let remaining = page_length;
let mut remaining = page_length;
for run in decoder {
match run {
hybrid_rle::HybridEncoded::Bitpacked(pack) => {
// compute the length of the pack
let pack_size = pack.len() * 8;
let pack_remaining = page_length;
let length = std::cmp::min(pack_size, pack_remaining);

let additional = remaining.min(length);
let additional = pack_size.min(remaining);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

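Using the full page length instead of the remaining length to cap each bit-packed run is one of the bugs: `remaining` was never decremented, so every run was bounded by `page_length` rather than by what was left of the page.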


// extend validity
validity.extend_from_slice(pack, 0, additional);
Expand All @@ -140,13 +137,13 @@ pub(super) fn extend_from_decoder<'a, T: Default, C: Pushable<T>, I: Iterator<It
values.push_null()
};
}

remaining -= additional;
}
hybrid_rle::HybridEncoded::Rle(value, length) => {
hybrid_rle::HybridEncoded::Rle(value, additional) => {
let is_set = value[0] == 1;

// extend validity
let length = length;
let additional = remaining.min(length);
validity.extend_constant(additional, is_set);

// extend values
Expand All @@ -155,9 +152,13 @@ pub(super) fn extend_from_decoder<'a, T: Default, C: Pushable<T>, I: Iterator<It
} else {
values.extend_constant(additional, T::default());
}

remaining -= additional;
}
}
}

debug_assert_eq!(remaining, 0);
}

pub(super) fn read_dict_optional<K>(
Expand Down