Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
feat(parquet): add support for decimal256 read/write in parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason committed Feb 21, 2023
1 parent 615300b commit 0b3a4ab
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 0 deletions.
32 changes: 32 additions & 0 deletions src/io/parquet/read/deserialize/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use parquet2::{
types::int96_to_i64_ns,
};

use crate::types::i256;
use crate::{
array::{Array, DictionaryKey, MutablePrimitiveArray, PrimitiveArray},
datatypes::{DataType, IntervalUnit, TimeUnit},
Expand Down Expand Up @@ -228,6 +229,37 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>(

Box::new(arrays) as _
}
// An `i256` holds at most 32 bytes; wider fixed-length arrays cannot be decoded into it.
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => {
    return Err(Error::NotYetImplemented(format!(
        "Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"
    )))
}
// Decodes each `n`-byte fixed-length value of the column into an `i256`.
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) => {
    let n = *n;

    // The pages must be read with the column's own width `n`; the chunking below
    // and `convert_i256` both rely on every value being exactly `n` bytes long.
    let pages = fixed_size_binary::Iter::new(
        pages,
        DataType::FixedSizeBinary(n),
        num_rows,
        chunk_size,
    );

    let pages = pages.map(move |maybe_array| {
        let array = maybe_array?;
        // One `n`-byte chunk per row, converted to the 256-bit native type.
        let values = array
            .values()
            .chunks_exact(n)
            .map(|value: &[u8]| super::super::convert_i256(value, n))
            .collect::<Vec<_>>();
        let validity = array.validity().cloned();

        PrimitiveArray::<i256>::try_new(data_type.clone(), values.into(), validity)
    });

    let arrays = pages.map(|x| x.map(|x| x.boxed()));

    Box::new(arrays) as _
}
(PhysicalType::Int32, Date64) => dyn_iter(iden(primitive::IntegerIter::new(
pages,
data_type,
Expand Down
8 changes: 8 additions & 0 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub use parquet2::{

use crate::{array::Array, error::Result};

use crate::types::{i256, NativeType};
pub use deserialize::{
column_iter_to_arrays, create_list, get_page_iterator, init_nested, n_columns, InitNested,
NestedArrayIter, NestedState, StructIterator,
Expand Down Expand Up @@ -81,3 +82,10 @@ fn convert_i128(value: &[u8], n: usize) -> i128 {
bytes[..n].copy_from_slice(value);
i128::from_be_bytes(bytes) >> (8 * (16 - n))
}

/// Decodes an `n`-byte big-endian two's-complement integer into an [`i256`].
///
/// The bytes of `value` are placed in the least-significant end of a 32-byte
/// buffer and the remaining high-order bytes are filled with the sign of the
/// input, so that narrower encodings keep their numeric value (the same result
/// `convert_i128` obtains with its arithmetic right shift).
///
/// # Panics
///
/// Panics if `n > 32` or if `value.len() != n`.
fn convert_i256(value: &[u8], n: usize) -> i256 {
    // The most significant bit of the first byte is the sign bit; an empty input decodes to zero.
    let is_negative = value.first().map_or(false, |byte| byte & 0x80 != 0);
    let fill = if is_negative { u8::MAX } else { 0 };

    // Pre-filling with the sign byte performs the sign extension.
    let mut bytes = [fill; 32];
    bytes[32 - n..].copy_from_slice(value);

    i256::from_be_bytes(bytes)
}
29 changes: 29 additions & 0 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::array::*;
use crate::datatypes::*;
use crate::error::{Error, Result};
use crate::types::days_ms;
use crate::types::i256;
use crate::types::NativeType;

pub use nested::write_rep_and_def;
Expand Down Expand Up @@ -463,6 +464,34 @@ pub fn array_to_page_simple(

fixed_len_bytes::array_to_page(array, options, type_, statistics)
}
// Writes a Decimal256 array as a page of fixed-length, big-endian byte values.
DataType::Decimal256(_, _) => {
    // Number of bytes emitted per value; it is also the width of the
    // intermediate `FixedSizeBinary` array built below.
    // NOTE(review): only the `size` low-order bytes are kept, so values that do
    // not fit in 128 bits are truncated — confirm whether 32 bytes were intended.
    let size = 16;
    let array = array
        .as_any()
        .downcast_ref::<PrimitiveArray<i256>>()
        .unwrap();
    let mut values = Vec::<u8>::with_capacity(size * array.len());
    array.values().iter().for_each(|x| {
        // `to_be_bytes` yields the full-width representation; keep exactly the
        // trailing (least-significant) `size` bytes so each row occupies `size` bytes.
        let bytes = x.to_be_bytes();
        values.extend_from_slice(&bytes[bytes.len() - size..])
    });
    let array = FixedSizeBinaryArray::new(
        DataType::FixedSizeBinary(size),
        values.into(),
        array.validity().cloned(),
    );
    let statistics = if options.write_statistics {
        let stats = fixed_len_bytes::build_statistics(&array, type_.clone());
        Some(stats)
    } else {
        None
    };

    fixed_len_bytes::array_to_page(&array, options, type_, statistics)
}
DataType::Decimal(precision, _) => {
let type_ = type_;
let precision = *precision;
Expand Down
8 changes: 8 additions & 0 deletions src/io/parquet/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ pub fn to_parquet_type(field: &Field) -> Result<ParquetType> {
None,
)?)
}
// Decimal256 fields are declared as a primitive 16-byte fixed-length byte array,
// with the three trailing optional arguments left unset.
// NOTE(review): 16 bytes cannot represent every 256-bit value, and no decimal
// precision/scale annotation is attached here — confirm both are intended.
DataType::Decimal256(_, _) => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::FixedLenByteArray(16),
repetition,
None,
None,
None,
)?),
DataType::Interval(_) => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::FixedLenByteArray(12),
Expand Down

0 comments on commit 0b3a4ab

Please sign in to comment.