Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Support nested decimal read write (#1553)
Browse files Browse the repository at this point in the history
* add decimal and decimal256 supports to array_to_page_nested

* add decimal and decimal256 supports to array_to_page_nested

* support nested decimal 256

* fix reviewer comments

* add tests

* fix tests
  • Loading branch information
ariesdevil authored Aug 29, 2023
1 parent ba6a882 commit f609d0c
Show file tree
Hide file tree
Showing 9 changed files with 609 additions and 36 deletions.
6 changes: 6 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
[""],
]

decimal_nullable = [[Decimal(n) if n is not None else None for n in sublist] if sublist is not None else None for sublist in items_nullable]

list_struct_nullable = [
[{"a": "a"}, {"a": "b"}],
None,
Expand Down Expand Up @@ -222,6 +224,8 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
pa.field("list_bool", pa.list_(pa.bool_())),
pa.field("list_utf8", pa.list_(pa.utf8())),
pa.field("list_large_binary", pa.list_(pa.large_binary())),
pa.field("list_decimal", pa.list_(pa.decimal128(9, 0))),
pa.field("list_decimal256", pa.list_(pa.decimal256(9, 0))),
pa.field("list_nested_i64", pa.list_(pa.list_(pa.int64()))),
pa.field("list_nested_inner_required_i64", pa.list_(pa.list_(pa.int64()))),
pa.field(
Expand Down Expand Up @@ -251,6 +255,8 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
"list_bool": boolean,
"list_utf8": string,
"list_large_binary": string,
"list_decimal": decimal_nullable,
"list_decimal256": decimal_nullable,
"list_nested_i64": items_nested,
"list_nested_inner_required_i64": items_required_nested,
"list_nested_inner_required_required_i64": items_required_nested_2,
Expand Down
34 changes: 17 additions & 17 deletions src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ use super::super::utils::{
use super::super::Pages;
use super::utils::FixedSizeBinary;

type Dict = Vec<u8>;
pub(super) type Dict = Vec<u8>;

#[derive(Debug)]
struct Optional<'a> {
values: std::slice::ChunksExact<'a, u8>,
validity: OptionalPageValidity<'a>,
pub(super) struct Optional<'a> {
pub(super) values: std::slice::ChunksExact<'a, u8>,
pub(super) validity: OptionalPageValidity<'a>,
}

impl<'a> Optional<'a> {
fn try_new(page: &'a DataPage, size: usize) -> Result<Self> {
pub(super) fn try_new(page: &'a DataPage, size: usize) -> Result<Self> {
let (_, _, values) = split_buffer(page)?;

let values = values.chunks_exact(size);
Expand All @@ -41,12 +41,12 @@ impl<'a> Optional<'a> {
}

#[derive(Debug)]
struct Required<'a> {
pub(super) struct Required<'a> {
pub values: std::slice::ChunksExact<'a, u8>,
}

impl<'a> Required<'a> {
fn new(page: &'a DataPage, size: usize) -> Self {
pub(super) fn new(page: &'a DataPage, size: usize) -> Self {
let values = page.buffer();
assert_eq!(values.len() % size, 0);
let values = values.chunks_exact(size);
Expand All @@ -60,7 +60,7 @@ impl<'a> Required<'a> {
}

#[derive(Debug)]
struct FilteredRequired<'a> {
pub(super) struct FilteredRequired<'a> {
pub values: SliceFilteredIter<std::slice::ChunksExact<'a, u8>>,
}

Expand All @@ -83,13 +83,13 @@ impl<'a> FilteredRequired<'a> {
}

#[derive(Debug)]
struct RequiredDictionary<'a> {
pub(super) struct RequiredDictionary<'a> {
pub values: hybrid_rle::HybridRleDecoder<'a>,
dict: &'a Dict,
pub dict: &'a Dict,
}

impl<'a> RequiredDictionary<'a> {
fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result<Self> {
pub(super) fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result<Self> {
let values = dict_indices_decoder(page)?;

Ok(Self { dict, values })
Expand All @@ -102,14 +102,14 @@ impl<'a> RequiredDictionary<'a> {
}

#[derive(Debug)]
struct OptionalDictionary<'a> {
values: hybrid_rle::HybridRleDecoder<'a>,
validity: OptionalPageValidity<'a>,
dict: &'a Dict,
pub(super) struct OptionalDictionary<'a> {
pub(super) values: hybrid_rle::HybridRleDecoder<'a>,
pub(super) validity: OptionalPageValidity<'a>,
pub(super) dict: &'a Dict,
}

impl<'a> OptionalDictionary<'a> {
fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result<Self> {
pub(super) fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result<Self> {
let values = dict_indices_decoder(page)?;

Ok(Self {
Expand Down Expand Up @@ -267,7 +267,7 @@ impl<'a> Decoder<'a> for BinaryDecoder {
}
}

fn finish(
pub fn finish(
data_type: &DataType,
values: FixedSizeBinary,
validity: MutableBitmap,
Expand Down
2 changes: 2 additions & 0 deletions src/io/parquet/read/deserialize/fixed_size_binary/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod basic;
mod dictionary;
mod nested;
mod utils;

pub use basic::Iter;
pub use dictionary::{DictIter, NestedDictIter};
pub use nested::NestedIter;
189 changes: 189 additions & 0 deletions src/io/parquet/read/deserialize/fixed_size_binary/nested.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::collections::VecDeque;

use parquet2::{
encoding::Encoding,
page::{DataPage, DictPage},
schema::Repetition,
};

use super::super::utils::{not_implemented, MaybeNext, PageState};
use super::utils::FixedSizeBinary;
use crate::array::FixedSizeBinaryArray;
use crate::io::parquet::read::deserialize::fixed_size_binary::basic::{
finish, Dict, Optional, OptionalDictionary, Required, RequiredDictionary,
};
use crate::io::parquet::read::deserialize::nested_utils::{next, NestedDecoder};
use crate::io::parquet::read::deserialize::utils::Pushable;
use crate::io::parquet::read::{InitNested, NestedState};
use crate::{bitmap::MutableBitmap, datatypes::DataType, error::Result, io::parquet::read::Pages};

#[derive(Debug)]
enum State<'a> {
Optional(Optional<'a>),
Required(Required<'a>),
RequiredDictionary(RequiredDictionary<'a>),
OptionalDictionary(OptionalDictionary<'a>),
}

impl<'a> PageState<'a> for State<'a> {
fn len(&self) -> usize {
match self {
State::Optional(state) => state.validity.len(),
State::Required(state) => state.len(),
State::RequiredDictionary(state) => state.len(),
State::OptionalDictionary(state) => state.validity.len(),
}
}
}

#[derive(Debug, Default)]
struct BinaryDecoder {
size: usize,
}

impl<'a> NestedDecoder<'a> for BinaryDecoder {
type State = State<'a>;
type Dictionary = Dict;
type DecodedState = (FixedSizeBinary, MutableBitmap);

fn build_state(
&self,
page: &'a DataPage,
dict: Option<&'a Self::Dictionary>,
) -> Result<Self::State> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
let is_filtered = page.selected_rows().is_some();

match (page.encoding(), dict, is_optional, is_filtered) {
(Encoding::Plain, _, true, false) => {
Ok(State::Optional(Optional::try_new(page, self.size)?))
}
(Encoding::Plain, _, false, false) => {
Ok(State::Required(Required::new(page, self.size)))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
RequiredDictionary::try_new(page, dict).map(State::RequiredDictionary)
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
OptionalDictionary::try_new(page, dict).map(State::OptionalDictionary)
}
_ => Err(not_implemented(page)),
}
}

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
FixedSizeBinary::with_capacity(capacity, self.size),
MutableBitmap::with_capacity(capacity),
)
}

fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) -> Result<()> {
let (values, validity) = decoded;
match state {
State::Optional(page) => {
let value = page.values.by_ref().next().unwrap_or_default();
values.push(value);
validity.push(true);
}
State::Required(page) => {
let value = page.values.by_ref().next().unwrap_or_default();
values.push(value);
}
State::RequiredDictionary(page) => {
let item = page
.values
.by_ref()
.next()
.map(|index| {
let index = index.unwrap() as usize;
&page.dict[index * self.size..(index + 1) * self.size]
})
.unwrap_or_default();
values.push(item);
}
State::OptionalDictionary(page) => {
let item = page
.values
.by_ref()
.next()
.map(|index| {
let index = index.unwrap() as usize;
&page.dict[index * self.size..(index + 1) * self.size]
})
.unwrap_or_default();
values.push(item);
validity.push(true);
}
}
Ok(())
}

fn push_null(&self, decoded: &mut Self::DecodedState) {
let (values, validity) = decoded;
values.push_null();
validity.push(false);
}

fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary {
page.buffer.clone()
}
}

pub struct NestedIter<I: Pages> {
iter: I,
data_type: DataType,
size: usize,
init: Vec<InitNested>,
items: VecDeque<(NestedState, (FixedSizeBinary, MutableBitmap))>,
dict: Option<Dict>,
chunk_size: Option<usize>,
remaining: usize,
}

impl<I: Pages> NestedIter<I> {
pub fn new(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
num_rows: usize,
chunk_size: Option<usize>,
) -> Self {
let size = FixedSizeBinaryArray::get_size(&data_type);
Self {
iter,
data_type,
size,
init,
items: VecDeque::new(),
dict: None,
chunk_size,
remaining: num_rows,
}
}
}

impl<I: Pages> Iterator for NestedIter<I> {
type Item = Result<(NestedState, FixedSizeBinaryArray)>;

fn next(&mut self) -> Option<Self::Item> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.dict,
&mut self.remaining,
&self.init,
self.chunk_size,
&BinaryDecoder { size: self.size },
);
match maybe_state {
MaybeNext::Some(Ok((nested, decoded))) => {
Some(Ok((nested, finish(&self.data_type, decoded.0, decoded.1))))
}
MaybeNext::Some(Err(e)) => Some(Err(e)),
MaybeNext::None => None,
MaybeNext::More => self.next(),
}
}
}
Loading

0 comments on commit f609d0c

Please sign in to comment.