Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Removed un-used
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jan 28, 2022
1 parent a8c1e15 commit 6ef21ba
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 434 deletions.
149 changes: 2 additions & 147 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::default::Default;

use parquet2::{
encoding::{delta_length_byte_array, hybrid_rle, Encoding},
metadata::ColumnDescriptor,
page::{BinaryPageDict, DataPage},
};

Expand All @@ -13,69 +12,12 @@ use crate::{
buffer::Buffer,
datatypes::DataType,
error::Result,
io::parquet::read::{
utils::{extend_from_decoder, Decoder, OptionalPageValidity, Pushable},
DataPages,
},
};

use super::super::DataPages;
use super::super::utils::{extend_from_decoder, Decoder, OptionalPageValidity};
use super::{super::utils, utils::Binary};

/// Builds an iterator over the values of a dictionary-encoded page.
///
/// The indices are decoded from `indices_buffer` and each one is resolved to the byte
/// slice of `dict` delimited by two consecutive dictionary offsets.
///
/// # Panics
/// Indexing panics if `indices_buffer` is empty, or (while iterating) if a decoded index
/// does not address a valid pair of offsets in `dict`.
#[inline]
fn values_iter<'a>(
    indices_buffer: &'a [u8],
    dict: &'a BinaryPageDict,
    additional: usize,
) -> impl Iterator<Item = &'a [u8]> + 'a {
    let bytes = dict.values();
    let offsets = dict.offsets();

    // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
    // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
    let bit_width = u32::from(indices_buffer[0]);
    let encoded_indices = &indices_buffer[1..];

    hybrid_rle::HybridRleDecoder::new(encoded_indices, bit_width, additional).map(move |key| {
        let key = key as usize;
        let start = offsets[key] as usize;
        let end = offsets[key + 1] as usize;
        &bytes[start..end]
    })
}

/// Extends `values` and `validity` with the contents of an optional, dictionary-encoded page.
///
/// The dictionary values come from [`values_iter`]; the page validity is read from
/// `validity_buffer`, and both are handed to [`extend_from_decoder`].
///
/// Assumptions: No rep levels
#[allow(clippy::too_many_arguments)]
fn read_dict_buffer<O: Offset>(
    validity_buffer: &[u8],
    indices_buffer: &[u8],
    additional: usize,
    dict: &BinaryPageDict,
    values: &mut Binary<O>,
    validity: &mut MutableBitmap,
) {
    let dict_values = values_iter(indices_buffer, dict, additional);
    let mut page_validity = OptionalPageValidity::new(validity_buffer, additional);

    extend_from_decoder(validity, &mut page_validity, None, values, dict_values);
}

/// Pushes every value of a required (non-nullable), dictionary-encoded page onto `values`.
///
/// `validity` is only inspected by a debug assertion, which checks that it is empty;
/// nothing is written to it.
#[allow(clippy::too_many_arguments)]
fn read_dict_required<O: Offset>(
    indices_buffer: &[u8],
    additional: usize,
    dict: &BinaryPageDict,
    values: &mut Binary<O>,
    validity: &mut MutableBitmap,
) {
    debug_assert_eq!(0, validity.len());
    values_iter(indices_buffer, dict, additional).for_each(|item| values.push(item));
}

fn read_delta_optional<O: Offset>(
validity_buffer: &[u8],
values_buffer: &[u8],
Expand Down Expand Up @@ -112,93 +54,6 @@ fn read_delta_optional<O: Offset>(
values.extend_from_slice(new_values);
}

/// Extends `values` and `validity` with the contents of an optional, PLAIN-encoded page.
///
/// The values are iterated with `utils::BinaryIter`, the page validity is read from
/// `validity_buffer`, and both are handed to [`extend_from_decoder`].
fn read_plain_optional<O: Offset>(
    validity_buffer: &[u8],
    values_buffer: &[u8],
    additional: usize,
    values: &mut Binary<O>,
    validity: &mut MutableBitmap,
) {
    // values_buffer: first 4 bytes are len, remaining is values
    let plain_values = utils::BinaryIter::new(values_buffer);
    let mut page_validity = OptionalPageValidity::new(validity_buffer, additional);

    extend_from_decoder(validity, &mut page_validity, None, values, plain_values)
}

/// Pushes every value of a required (non-nullable), PLAIN-encoded page onto `values`.
///
/// `buffer` holds the encoded values and `additional` is the number of values the caller
/// expects; it is only used to size the up-front reservations.
pub(super) fn read_plain_required<O: Offset>(
    buffer: &[u8],
    additional: usize,
    values: &mut Binary<O>,
) {
    let values_iterator = utils::BinaryIter::new(buffer);

    // Each value is stored as a 4-byte length prefix followed by its bytes, so the payload
    // is the buffer minus `additional` prefixes. Saturating arithmetic keeps a truncated or
    // corrupt page (`buffer.len() < 4 * additional`) from underflowing, which would panic in
    // debug builds and wrap to a near-`usize::MAX` reservation in release builds.
    values.offsets.reserve(additional);
    let payload_len = buffer.len().saturating_sub(additional.saturating_mul(4));
    values.values.reserve(payload_len);

    let reserved = values.values.capacity();
    for value in values_iterator {
        values.push(value);
    }
    // Debug-only check that the reservation above was sufficient (no reallocation happened).
    debug_assert_eq!(reserved, values.values.capacity());
}

pub(super) fn extend_from_page<O: Offset>(
page: &DataPage,
descriptor: &ColumnDescriptor,
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) -> Result<()> {
let additional = page.num_values();
assert_eq!(descriptor.max_rep_level(), 0);
assert!(descriptor.max_def_level() <= 1);
let is_optional = descriptor.max_def_level() == 1;

let (_, validity_buffer, values_buffer, version) = utils::split_buffer(page, descriptor);

match (&page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
read_dict_buffer::<O>(
validity_buffer,
values_buffer,
additional,
dict.as_any().downcast_ref().unwrap(),
values,
validity,
)
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
read_dict_required::<O>(
values_buffer,
additional,
dict.as_any().downcast_ref().unwrap(),
values,
validity,
)
}
(Encoding::DeltaLengthByteArray, None, true) => {
read_delta_optional::<O>(validity_buffer, values_buffer, additional, values, validity)
}
(Encoding::Plain, _, true) => {
read_plain_optional::<O>(validity_buffer, values_buffer, additional, values, validity)
}
(Encoding::Plain, _, false) => {
read_plain_required::<O>(page.buffer(), page.num_values(), values)
}
_ => {
return Err(utils::not_implemented(
&page.encoding(),
is_optional,
page.dictionary_page().is_some(),
version,
"Binary",
))
}
};
Ok(())
}

struct Optional<'a> {
values: utils::BinaryIter<'a>,
validity: OptionalPageValidity<'a>,
Expand Down
38 changes: 2 additions & 36 deletions src/io/parquet/read/binary/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use std::sync::Arc;

use futures::{pin_mut, Stream, StreamExt};
use parquet2::{metadata::ColumnChunkMetaData, page::DataPage, FallibleStreamingIterator};

use crate::{
array::{Array, Offset},
bitmap::MutableBitmap,
datatypes::DataType,
error::{ArrowError, Result},
io::parquet::read::binary::utils::finish_array,
error::Result,
};

mod basic;
Expand All @@ -18,40 +13,11 @@ mod utils;

pub use dictionary::iter_to_arrays as iter_to_dict_arrays;

use self::{basic::TraitBinaryArray, utils::Binary};
use self::basic::TraitBinaryArray;

use super::{nested_utils::Nested, DataPages};
use basic::BinaryArrayIterator;

/// Consumes a stream of [`DataPage`]s and assembles them into a single binary/utf8 [`Array`].
///
/// Buffers are pre-allocated from `metadata.num_values()`; every page is decoded with
/// `basic::extend_from_page` and the result is built by `finish_array` for `data_type`.
///
/// # Errors
/// Returns the first error yielded by the stream (cloned and converted into an
/// [`ArrowError`]) or the first error reported while decoding a page.
pub async fn stream_to_array<O, I, E>(
    pages: I,
    metadata: &ColumnChunkMetaData,
    data_type: &DataType,
) -> Result<Box<dyn Array>>
where
    ArrowError: From<E>,
    O: Offset,
    E: Clone,
    I: Stream<Item = std::result::Result<DataPage, E>>,
{
    let expected_len = metadata.num_values() as usize;
    let mut values = Binary::<O>::with_capacity(expected_len);
    let mut validity = MutableBitmap::with_capacity(expected_len);

    pin_mut!(pages); // needed for iteration

    while let Some(maybe_page) = pages.next().await {
        // The stream item is only borrowed, so a failed page has its error cloned out.
        let page = maybe_page.as_ref().map_err(|error| error.clone())?;
        basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)?;
    }

    Ok(finish_array(data_type.clone(), values, validity))
}

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays<'a, O, A, I>(
iter: I,
Expand Down
15 changes: 14 additions & 1 deletion src/io/parquet/read/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,24 @@ use parquet2::{
};

use super::super::utils;
use super::basic::read_plain_required;
use super::super::utils::Pushable;
use super::{super::nested_utils::*, utils::Binary};

use crate::{array::Offset, bitmap::MutableBitmap, error::Result};

/// Pushes every value of a required (non-nullable), PLAIN-encoded page onto `values`.
///
/// `buffer` holds the encoded values and `additional` is the number of values the caller
/// expects; it is only used to size the up-front reservations.
fn read_plain_required<O: Offset>(buffer: &[u8], additional: usize, values: &mut Binary<O>) {
    let values_iterator = utils::BinaryIter::new(buffer);

    // Each value is stored as a 4-byte length prefix followed by its bytes, so the payload
    // is the buffer minus `additional` prefixes. Saturating arithmetic keeps a truncated or
    // corrupt page (`buffer.len() < 4 * additional`) from underflowing, which would panic in
    // debug builds and wrap to a near-`usize::MAX` reservation in release builds.
    values.offsets.reserve(additional);
    let payload_len = buffer.len().saturating_sub(additional.saturating_mul(4));
    values.values.reserve(payload_len);

    let reserved = values.values.capacity();
    for value in values_iterator {
        values.push(value);
    }
    // Debug-only check that the reservation above was sufficient (no reallocation happened).
    debug_assert_eq!(reserved, values.values.capacity());
}

fn read_values<'a, O, D, G>(
def_levels: D,
max_def: u32,
Expand Down
29 changes: 1 addition & 28 deletions src/io/parquet/read/binary/utils.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,4 @@
use crate::{
array::{Array, BinaryArray, Offset, Utf8Array},
bitmap::MutableBitmap,
datatypes::DataType,
io::parquet::read::utils::Pushable,
};

/// Turns the accumulated `values` and `validity` into a boxed array of type `data_type`.
///
/// Binary types produce a [`BinaryArray`] and utf8 types a [`Utf8Array`].
///
/// # Panics
/// Reaches `unreachable!()` for any other `data_type`.
pub(super) fn finish_array<O: Offset>(
    data_type: DataType,
    values: Binary<O>,
    validity: MutableBitmap,
) -> Box<dyn Array> {
    if matches!(data_type, DataType::LargeBinary | DataType::Binary) {
        Box::new(BinaryArray::from_data(
            data_type,
            values.offsets.0.into(),
            values.values.into(),
            validity.into(),
        ))
    } else if matches!(data_type, DataType::LargeUtf8 | DataType::Utf8) {
        Box::new(Utf8Array::from_data(
            data_type,
            values.offsets.0.into(),
            values.values.into(),
            validity.into(),
        ))
    } else {
        unreachable!()
    }
}
use crate::{array::Offset, io::parquet::read::utils::Pushable};

/// [`Pushable`] for variable length binary data.
#[derive(Debug)]
Expand Down
13 changes: 5 additions & 8 deletions src/io/parquet/read/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,11 @@ use crate::{
};

use super::super::utils;
use super::super::utils::{extend_from_decoder, split_buffer, Decoder, OptionalPageValidity};
use super::super::utils::{
extend_from_decoder, extend_from_new_page, split_buffer, Decoder, OptionalPageValidity,
};
use super::super::DataPages;

/// Appends `additional` boolean values from a required (non-nullable) PLAIN page to `values`.
pub(super) fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) {
    // in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap.
    let offset = 0;
    values.extend_from_slice(buffer, offset, additional);
}

// The state of an optional DataPage with a boolean physical type
#[derive(Debug)]
struct Optional<'a> {
Expand Down Expand Up @@ -101,7 +98,7 @@ fn build_state(page: &DataPage, is_optional: bool) -> Result<BooleanPageState> {
#[derive(Default)]
struct BooleanDecoder {}

impl<'a> utils::Decoder<'a, bool, MutableBitmap> for BooleanDecoder {
impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder {
type State = BooleanPageState<'a>;
type Array = BooleanArray;

Expand Down Expand Up @@ -183,7 +180,7 @@ impl<I: DataPages> Iterator for BooleanArrayIterator<I> {
Err(e) => return Some(Err(e)),
};

let maybe_array = utils::extend_from_new_page::<BooleanDecoder, _, _>(
let maybe_array = extend_from_new_page::<BooleanDecoder, _, _>(
page,
state,
&self.data_type,
Expand Down
6 changes: 5 additions & 1 deletion src/io/parquet/read/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ use parquet2::{

use super::super::nested_utils::*;
use super::super::utils;
use super::basic::read_required;
use crate::{
bitmap::{utils::BitmapIter, MutableBitmap},
error::Result,
};

/// Appends `additional` boolean values from a required (non-nullable) PLAIN page to `values`.
fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) {
    // in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap.
    let offset = 0;
    values.extend_from_slice(buffer, offset, additional);
}

fn read_values<D, G>(
def_levels: D,
max_def: u32,
Expand Down
Loading

0 comments on commit 6ef21ba

Please sign in to comment.