Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jan 30, 2022
1 parent 955ce20 commit ebe7261
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 43 deletions.
10 changes: 6 additions & 4 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::VecDeque;
use std::default::Default;

use parquet2::{
encoding::{delta_length_byte_array, hybrid_rle, Encoding},
encoding::{hybrid_rle, Encoding},
page::{BinaryPageDict, DataPage},
schema::Repetition,
};
Expand All @@ -19,6 +19,7 @@ use super::super::utils::{extend_from_decoder, next, MaybeNext, OptionalPageVali
use super::super::DataPages;
use super::{super::utils, utils::Binary};

/*
fn read_delta_optional<O: Offset>(
validity_buffer: &[u8],
values_buffer: &[u8],
Expand Down Expand Up @@ -54,6 +55,7 @@ fn read_delta_optional<O: Offset>(
let new_values = values_iterator.into_values();
values.extend_from_slice(new_values);
}
*/

struct Optional<'a> {
values: utils::BinaryIter<'a>,
Expand All @@ -68,7 +70,7 @@ impl<'a> Optional<'a> {

Self {
values,
validity: OptionalPageValidity::new(validity_buffer, page.num_values()),
validity: OptionalPageValidity::new(page),
}
}
}
Expand Down Expand Up @@ -135,13 +137,13 @@ struct OptionalDictionary<'a> {

impl<'a> OptionalDictionary<'a> {
fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let (_, validity_buffer, values_buffer, _) = utils::split_buffer(page, page.descriptor());
let (_, _, values_buffer, _) = utils::split_buffer(page, page.descriptor());

let values = values_iter1(values_buffer, dict, page.num_values());

Self {
values,
validity: OptionalPageValidity::new(validity_buffer, page.num_values()),
validity: OptionalPageValidity::new(page),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub use dictionary::iter_to_arrays as iter_to_dict_arrays;

use self::basic::TraitBinaryArray;

use super::{nested_utils::Nested, DataPages};
use super::DataPages;
use basic::BinaryArrayIterator;

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ struct Optional<'a> {

impl<'a> Optional<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, validity_buffer, values_buffer, _) = split_buffer(page, page.descriptor());
let (_, _, values_buffer, _) = split_buffer(page, page.descriptor());

Self {
values: values_iter(values_buffer),
validity: OptionalPageValidity::new(validity_buffer, page.num_values()),
validity: OptionalPageValidity::new(page),
}
}
}
Expand Down
19 changes: 11 additions & 8 deletions src/io/parquet/read/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,17 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder {
required: usize,
) {
match state {
State::Optional(page_validity, page_values) => read_optional_values(
page_validity.definition_levels.by_ref(),
page_validity.max_def(),
page_values.by_ref(),
values,
validity,
required,
),
State::Optional(page_validity, page_values) => {
let max_def = page_validity.max_def();
read_optional_values(
page_validity.definition_levels.by_ref(),
max_def,
page_values.by_ref(),
values,
validity,
required,
)
}
State::Required(page) => {
values.extend_from_slice(page.values, page.offset, required);
page.offset += required;
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ where
K: DictionaryKey,
{
fn new(page: &'a DataPage) -> Self {
let (_, validity_buffer, indices_buffer, _) = utils::split_buffer(page, page.descriptor());
let (_, _, indices_buffer, _) = utils::split_buffer(page, page.descriptor());

let values = values_iter1(indices_buffer, page.num_values());

Self {
values,
validity: OptionalPageValidity::new(validity_buffer, page.num_values()),
validity: OptionalPageValidity::new(page),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/io/parquet/read/fixed_size_binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ struct Optional<'a> {

impl<'a> Optional<'a> {
fn new(page: &'a DataPage, size: usize) -> Self {
let (_, validity_buffer, values_buffer, _) = split_buffer(page, page.descriptor());
let (_, _, values_buffer, _) = split_buffer(page, page.descriptor());

let values = values_buffer.chunks_exact(size);

Self {
values,
validity: OptionalPageValidity::new(validity_buffer, page.num_values()),
validity: OptionalPageValidity::new(page),
}
}
}
Expand Down Expand Up @@ -100,13 +100,13 @@ struct OptionalDictionary<'a> {

impl<'a> OptionalDictionary<'a> {
fn new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Self {
let (_, validity_buffer, values_buffer, _) = split_buffer(page, page.descriptor());
let (_, _, values_buffer, _) = split_buffer(page, page.descriptor());

let values = values_iter1(values_buffer, dict, page.num_values());

Self {
values,
validity: OptionalPageValidity::new(validity_buffer, page.num_values()),
validity: OptionalPageValidity::new(page),
}
}
}
Expand Down
29 changes: 10 additions & 19 deletions src/io/parquet/read/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
};

use super::super::utils;
use super::super::utils::{split_buffer, OptionalPageValidity};
use super::super::utils::OptionalPageValidity;
use super::super::DataPages;

#[derive(Debug)]
Expand All @@ -35,10 +35,12 @@ where
G: for<'b> Fn(&'b [u8]) -> P,
F: Fn(P) -> T,
{
fn new(data: &'a [u8], op1: G, op2: F) -> Self {
fn new(page: &'a DataPage, op1: G, op2: F) -> Self {
let (_, _, values, _) = utils::split_buffer(page, page.descriptor());
assert_eq!(values.len(), page.num_values() * std::mem::size_of::<T>());
Self {
phantom: Default::default(),
values: data
values: values
.chunks_exact(std::mem::size_of::<P>())
.map(op1)
.map(op2),
Expand Down Expand Up @@ -190,32 +192,21 @@ where
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
let dict = dict.as_any().downcast_ref().unwrap();

let (_, validity_buffer, values_buffer, _) =
utils::split_buffer(page, page.descriptor());
let (_, _, values_buffer, _) = utils::split_buffer(page, page.descriptor());

Ok(State::OptionalDictionary(
OptionalPageValidity::new(validity_buffer, page.num_values()),
OptionalPageValidity::new(page),
ValuesDictionary::new(values_buffer, page.num_values(), dict, self.op2),
))
}
(Encoding::Plain, None, true) => {
let (_, validity_buffer, values_buffer, _) = split_buffer(page, page.descriptor());

let validity = OptionalPageValidity::new(validity_buffer, page.num_values());
let values = Values::new(values_buffer, self.op1, self.op2);
let validity = OptionalPageValidity::new(page);
let values = Values::new(page, self.op1, self.op2);

Ok(State::Optional(validity, values))
}
(Encoding::Plain, None, false) => {
assert_eq!(
page.buffer().len(),
page.num_values() * std::mem::size_of::<T>()
);
Ok(State::Required(Values::new(
page.buffer(),
self.op1,
self.op2,
)))
Ok(State::Required(Values::new(page, self.op1, self.op2)))
}
_ => Err(utils::not_implemented(
&page.encoding(),
Expand Down
9 changes: 6 additions & 3 deletions src/io/parquet/read/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use parquet2::metadata::ColumnDescriptor;
use parquet2::page::{split_buffer as _split_buffer, DataPage, DataPageHeader};
use streaming_iterator::{convert, Convert, StreamingIterator};

use crate::array::DictionaryKey;
use crate::bitmap::utils::BitmapIter;
use crate::bitmap::MutableBitmap;
use crate::error::ArrowError;
Expand Down Expand Up @@ -144,13 +143,15 @@ pub struct OptionalPageValidity<'a> {

impl<'a> OptionalPageValidity<'a> {
#[inline]
pub fn new(validity: &'a [u8], length: usize) -> Self {
pub fn new(page: &'a DataPage) -> Self {
let (_, validity, _, _) = split_buffer(page, page.descriptor());

let validity = convert(hybrid_rle::Decoder::new(validity, 1));
Self {
validity,
run_offset: 0,
consumed: 0,
length,
length: page.num_values(),
}
}

Expand Down Expand Up @@ -238,6 +239,7 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<It
}
}

/*
pub(super) fn read_dict_optional<K>(
validity_buffer: &[u8],
indices_buffer: &[u8],
Expand All @@ -260,6 +262,7 @@ pub(super) fn read_dict_optional<K>(
extend_from_decoder(validity, &mut page_validity, None, indices, indices_iter)
}
*/

/// The state of a partially deserialized page
pub(super) trait PageState<'a> {
Expand Down

0 comments on commit ebe7261

Please sign in to comment.