Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Bumped parquet2
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 17, 2022
1 parent 7396e7e commit 99a91b2
Show file tree
Hide file tree
Showing 16 changed files with 133 additions and 85 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ futures = { version = "0.3", optional = true }
async-stream = { version = "0.3.2", optional = true }

# parquet support
parquet2 = { version = "0.15.0", optional = true, default_features = false, features = ["async"] }
parquet2 = { version = "0.16", optional = true, default_features = false, features = ["async"] }

# avro support
avro-schema = { version = "0.3", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ impl From<parquet2::error::Error> for Error {

impl From<Error> for parquet2::error::Error {
fn from(error: Error) -> Self {
parquet2::error::Error::General(error.to_string())
parquet2::error::Error::OutOfSpec(error.to_string())
}
}
37 changes: 24 additions & 13 deletions src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
bitmap::{Bitmap, MutableBitmap},
buffer::Buffer,
datatypes::DataType,
error::Result,
error::{Error, Result},
};

use super::super::utils::{
Expand Down Expand Up @@ -51,13 +51,13 @@ impl<'a> Delta<'a> {
pub fn try_new(page: &'a DataPage) -> Result<Self> {
let (_, _, values) = split_buffer(page)?;

let mut lengths_iter = delta_length_byte_array::Decoder::new(values);
let mut lengths_iter = delta_length_byte_array::Decoder::try_new(values)?;

#[allow(clippy::needless_collect)] // we need to consume it to get the values
let lengths = lengths_iter
.by_ref()
.map(|x| x as usize)
.collect::<Vec<_>>();
.map(|x| x.map(|x| x as usize).map_err(Error::from))
.collect::<Result<Vec<_>>>()?;

let values = lengths_iter.into_values();
Ok(Self {
Expand Down Expand Up @@ -405,20 +405,26 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
State::OptionalDictionary(page_validity, page_values) => {
let page_dict = &page_values.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();
utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
&mut page_values.values.by_ref().map(op),
&mut page_values
.values
.by_ref()
.map(|index| page_dict[index.unwrap() as usize].as_ref()),
)
}
State::RequiredDictionary(page) => {
let page_dict = &page.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();

for x in page.values.by_ref().map(op).take(additional) {
for x in page
.values
.by_ref()
.map(|index| page_dict[index.unwrap() as usize].as_ref())
.take(additional)
{
values.push(x)
}
}
Expand All @@ -442,21 +448,26 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
State::FilteredRequiredDictionary(page) => {
let page_dict = &page.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();

for x in page.values.by_ref().map(op).take(additional) {
for x in page
.values
.by_ref()
.map(|index| page_dict[index.unwrap() as usize].as_ref())
.take(additional)
{
values.push(x)
}
}
State::FilteredOptionalDictionary(page_validity, page_values) => {
let page_dict = &page_values.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();
utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
&mut page_values.values.by_ref().map(op),
&mut page_values
.values
.by_ref()
.map(|index| page_dict[index.unwrap() as usize].as_ref()),
)
}
}
Expand Down
17 changes: 12 additions & 5 deletions src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
)
}

fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) {
fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) -> Result<()> {
let (values, validity) = decoded;
match state {
State::Optional(page) => {
Expand All @@ -104,18 +104,25 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
}
State::RequiredDictionary(page) => {
let dict_values = &page.dict;
let op = move |index: u32| dict_values[index as usize].as_ref();
let item = page.values.next().map(op).unwrap_or_default();
let item = page
.values
.next()
.map(|index| dict_values[index.unwrap() as usize].as_ref())
.unwrap_or_default();
values.push(item);
}
State::OptionalDictionary(page) => {
let dict_values = &page.dict;
let op = move |index: u32| dict_values[index as usize].as_ref();
let item = page.values.next().map(op).unwrap_or_default();
let item = page
.values
.next()
.map(|index| dict_values[index.unwrap() as usize].as_ref())
.unwrap_or_default();
values.push(item);
validity.push(true);
}
}
Ok(())
}

fn push_null(&self, decoded: &mut Self::DecodedState) {
Expand Down
3 changes: 2 additions & 1 deletion src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder {
)
}

fn push_valid(&self, state: &mut State, decoded: &mut Self::DecodedState) {
fn push_valid(&self, state: &mut State, decoded: &mut Self::DecodedState) -> Result<()> {
let (values, validity) = decoded;
match state {
State::Optional(page_values) => {
Expand All @@ -95,6 +95,7 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder {
values.push(value);
}
}
Ok(())
}

fn push_null(&self, decoded: &mut Self::DecodedState) {
Expand Down
12 changes: 8 additions & 4 deletions src/io/parquet/read/deserialize/dictionary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ where
Some(remaining),
values,
&mut page.values.by_ref().map(|x| {
let x: usize = x.try_into().unwrap();
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
Expand All @@ -176,7 +177,8 @@ where
page.values
.by_ref()
.map(|x| {
let x: usize = x.try_into().unwrap();
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
let x: K = match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
Expand All @@ -195,7 +197,8 @@ where
Some(remaining),
values,
&mut page_values.by_ref().map(|x| {
let x: usize = x.try_into().unwrap();
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
let x: K = match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
Expand All @@ -211,7 +214,8 @@ where
page.values
.by_ref()
.map(|x| {
let x: usize = x.try_into().unwrap();
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
let x: K = match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
Expand Down
7 changes: 4 additions & 3 deletions src/io/parquet/read/deserialize/dictionary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder<K> {
)
}

fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) {
fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) -> Result<()> {
let (values, validity) = decoded;
match state {
State::Optional(page_values) => {
let key = page_values.next();
let key = page_values.next().transpose()?;
// todo: convert unwrap to error
let key = match K::try_from(key.unwrap_or_default() as usize) {
Ok(key) => key,
Expand All @@ -124,14 +124,15 @@ impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder<K> {
validity.push(true);
}
State::Required(page_values) => {
let key = page_values.values.next();
let key = page_values.values.next().transpose()?;
let key = match K::try_from(key.unwrap_or_default() as usize) {
Ok(key) => key,
Err(_) => todo!(),
};
values.push(key);
}
}
Ok(())
}

fn push_null(&self, decoded: &mut Self::DecodedState) {
Expand Down
37 changes: 18 additions & 19 deletions src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,27 +227,26 @@ impl<'a> Decoder<'a> for BinaryDecoder {
values.push(x)
}
}
State::OptionalDictionary(page) => {
let op = |index: u32| {
let index = index as usize;
State::OptionalDictionary(page) => extend_from_decoder(
validity,
&mut page.validity,
Some(remaining),
values,
page.values.by_ref().map(|index| {
let index = index.unwrap() as usize;
&page.dict[index * self.size..(index + 1) * self.size]
};

extend_from_decoder(
validity,
&mut page.validity,
Some(remaining),
values,
page.values.by_ref().map(op),
)
}
}),
),
State::RequiredDictionary(page) => {
let op = |index: u32| {
let index = index as usize;
&page.dict[index * self.size..(index + 1) * self.size]
};

for x in page.values.by_ref().map(op).take(remaining) {
for x in page
.values
.by_ref()
.map(|index| {
let index = index.unwrap() as usize;
&page.dict[index * self.size..(index + 1) * self.size]
})
.take(remaining)
{
values.push(x)
}
}
Expand Down
25 changes: 17 additions & 8 deletions src/io/parquet/read/deserialize/nested_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ pub(super) trait NestedDecoder<'a> {
/// Initializes a new state
fn with_capacity(&self, capacity: usize) -> Self::DecodedState;

fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState);
fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) -> Result<()>;
fn push_null(&self, decoded: &mut Self::DecodedState);

fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary;
Expand Down Expand Up @@ -309,9 +309,9 @@ impl<'a> NestedPage<'a> {
let max_def_level = page.descriptor.max_def_level;

let reps =
HybridRleDecoder::new(rep_levels, get_bit_width(max_rep_level), page.num_values());
HybridRleDecoder::try_new(rep_levels, get_bit_width(max_rep_level), page.num_values())?;
let defs =
HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values());
HybridRleDecoder::try_new(def_levels, get_bit_width(max_def_level), page.num_values())?;

let iter = reps.zip(defs).peekable();

Expand Down Expand Up @@ -377,7 +377,7 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
&mut decoded,
decoder,
additional,
);
)?;
*remaining -= nested.len() - existing;
items.push_back((nested, decoded));

Expand All @@ -393,7 +393,7 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
&mut decoded,
decoder,
additional,
);
)?;
*remaining -= nested.len();
items.push_back((nested, decoded));
}
Expand All @@ -407,7 +407,7 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(
decoded: &mut D::DecodedState,
decoder: &D,
additional: usize,
) {
) -> Result<()> {
let mut values_count = vec![0; nested.len()];

for (depth, nest) in nested.iter().enumerate().skip(1) {
Expand All @@ -431,6 +431,8 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(

let mut rows = 0;
while let Some((rep, def)) = page.iter.next() {
let rep = rep?;
let def = def?;
if rep == 0 {
rows += 1;
}
Expand All @@ -455,20 +457,27 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(
// the leaf / primitive
let is_valid = (def != cum_sum[depth]) || !nest.is_nullable();
if right_level && is_valid {
decoder.push_valid(values_state, decoded);
decoder.push_valid(values_state, decoded)?;
} else {
decoder.push_null(decoded);
}
}
}
}

let next_rep = page.iter.peek().map(|x| x.0).unwrap_or(0);
let next_rep = *page
.iter
.peek()
.map(|x| x.0.as_ref())
.transpose()
.unwrap() // todo: fix this
.unwrap_or(&0);

if next_rep == 0 && rows == additional {
break;
}
}
Ok(())
}

#[inline]
Expand Down
10 changes: 8 additions & 2 deletions src/io/parquet/read/deserialize/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,18 @@ where
page_validity,
Some(remaining),
values,
&mut page_values.values.by_ref().map(op1),
&mut page_values.values.by_ref().map(|x| x.unwrap()).map(op1),
)
}
State::RequiredDictionary(page) => {
let op1 = |index: u32| page.dict[index as usize];
values.extend(page.values.by_ref().map(op1).take(remaining));
values.extend(
page.values
.by_ref()
.map(|x| x.unwrap())
.map(op1)
.take(remaining),
);
}
State::FilteredRequired(page) => {
values.extend(
Expand Down
Loading

0 comments on commit 99a91b2

Please sign in to comment.