Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed error in reading chunked parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jun 27, 2022
1 parent 88f05bb commit 3a025da
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
12 changes: 6 additions & 6 deletions src/io/parquet/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ impl<'a> OptionalPageValidity<'a> {
}
}

/// Number of items remaining
pub fn len(&self) -> usize {
self.iter.len()
+ self
Expand Down Expand Up @@ -300,10 +301,9 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<It
) {
let limit = limit.unwrap_or(usize::MAX);

// todo: remove `consumed_here` and compute next limit from `consumed`
let mut consumed_here = 0;
while consumed_here < limit {
let run = page_validity.next_limited(limit);
let mut remaining = limit;
while remaining > 0 {
let run = page_validity.next_limited(remaining);
let run = if let Some(run) = run { run } else { break };

match run {
Expand All @@ -325,7 +325,7 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<It
}
validity.extend_from_slice(values, offset, length);

consumed_here += length;
remaining -= length;
}
FilteredHybridEncoded::Repeated { is_set, length } => {
validity.extend_constant(length, is_set);
Expand All @@ -335,7 +335,7 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<It
pushable.extend_constant(length, T::default());
}

consumed_here += length;
remaining -= length;
}
FilteredHybridEncoded::Skipped(valids) => for _ in values_iter.by_ref().take(valids) {},
};
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Iterator for RowGroupDeserializer {
})
})
.collect::<Result<Vec<_>>>()
.map(Chunk::new);
.and_then(Chunk::try_new);
self.remaining_rows = self.remaining_rows.saturating_sub(
chunk
.as_ref()
Expand Down

0 comments on commit 3a025da

Please sign in to comment.