Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
support decimal256(76,0)
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason committed Mar 1, 2023
1 parent 2512564 commit 05fd442
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 20 deletions.
4 changes: 4 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
pa.field("decimal256_18", pa.decimal256(18, 0)),
pa.field("decimal256_26", pa.decimal256(26, 0)),
pa.field("decimal256_39", pa.decimal256(39, 0)),
pa.field("decimal256_76", pa.decimal256(76, 0)),
pa.field("timestamp_us", pa.timestamp("us")),
pa.field("timestamp_s", pa.timestamp("s")),
pa.field("emoji", pa.utf8()),
Expand All @@ -58,6 +59,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
"decimal256_18": decimal,
"decimal256_26": decimal,
"decimal256_39": decimal,
"decimal256_76": decimal,
"timestamp_us": int64,
"timestamp_s": int64,
"emoji": emoji,
Expand Down Expand Up @@ -94,6 +96,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
pa.field("decimal256_18", pa.decimal256(18, 0), nullable=False),
pa.field("decimal256_26", pa.decimal256(26, 0), nullable=False),
pa.field("decimal256_39", pa.decimal256(39, 0), nullable=False),
pa.field("decimal256_76", pa.decimal256(76, 0), nullable=False),
]
schema = pa.schema(fields)

Expand All @@ -112,6 +115,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
"decimal256_18": decimal,
"decimal256_26": decimal,
"decimal256_39": decimal,
"decimal256_76": decimal,
},
schema,
f"basic_required_10.parquet",
Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl From<PrimitiveType> for DataType {
PrimitiveType::UInt32 => DataType::UInt32,
PrimitiveType::UInt64 => DataType::UInt64,
PrimitiveType::Int128 => DataType::Decimal(32, 32),
PrimitiveType::Int256 => DataType::Decimal256(32, 32),
PrimitiveType::Int256 => DataType::Decimal256(64, 64),
PrimitiveType::Float16 => DataType::Float16,
PrimitiveType::Float32 => DataType::Float32,
PrimitiveType::Float64 => DataType::Float64,
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>(
let values = array
.values()
.chunks_exact(n)
.map(|value: &[u8]| super::super::convert_i256(value, n))
.map(|value: &[u8]| super::super::convert_i256(value))
.collect::<Vec<_>>();
let validity = array.validity().cloned();

Expand Down
13 changes: 9 additions & 4 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,14 @@ fn convert_i128(value: &[u8], n: usize) -> i128 {
i128::from_be_bytes(bytes) >> (8 * (16 - n))
}

fn convert_i256(value: &[u8], n: usize) -> i256 {
fn convert_i256(value: &[u8]) -> i256 {
let mut bytes = [0u8; 32];
bytes[..n].copy_from_slice(value);

i256(i256::from_be_bytes(bytes).0 >> (8 * (32 - n)))
let mut neg_bytes = [255u8; 32];
if value[0] >= 128 {
neg_bytes[32 - value.len()..].copy_from_slice(value);
i256::from_be_bytes(neg_bytes)
} else {
bytes[32 - value.len()..].copy_from_slice(value);
i256::from_be_bytes(bytes)
}
}
8 changes: 4 additions & 4 deletions src/io/parquet/read/statistics/fixlen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics};

use crate::array::*;
use crate::error::Result;
use crate::io::parquet::read::convert_i256;
use crate::types::{days_ms, i256};

use super::super::{convert_days_ms, convert_i128, convert_i256};
use super::super::{convert_days_ms, convert_i128};

pub(super) fn push_i128(
from: Option<&dyn ParquetStatistics>,
Expand Down Expand Up @@ -61,7 +62,6 @@ pub(super) fn push_i256_with_i128(

pub(super) fn push_i256(
from: Option<&dyn ParquetStatistics>,
n: usize,
min: &mut dyn MutableArray,
max: &mut dyn MutableArray,
) -> Result<()> {
Expand All @@ -75,8 +75,8 @@ pub(super) fn push_i256(
.unwrap();
let from = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert_i256(x, n))));
max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert_i256(x, n))));
min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert_i256(x))));
max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert_i256(x))));

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ fn push(
ParquetPhysicalType::FixedLenByteArray(n) if *n > 32 => Err(Error::NotYetImplemented(
format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"),
)),
ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i256(from, *n, min, max),
ParquetPhysicalType::FixedLenByteArray(_) => fixlen::push_i256(from, min, max),
_ => unreachable!(),
},
Binary => binary::push::<i32>(from, min, max),
Expand Down
6 changes: 4 additions & 2 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ pub fn array_to_page_simple(
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
} else {
let size = decimal_length_from_precision(precision);
let size = 32;
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<i256>>()
Expand All @@ -531,8 +531,10 @@ pub fn array_to_page_simple(
None
};
let mut values = Vec::<u8>::with_capacity(size * array.len());
println!("values is {:?}", values);
array.values().iter().for_each(|x| {
let bytes = &x.to_be_bytes()[32 - size..];
let bytes = &x.to_be_bytes();
println!("bytes is {:?}", bytes);
values.extend_from_slice(bytes)
});
let array = FixedSizeBinaryArray::new(
Expand Down
3 changes: 1 addition & 2 deletions src/io/parquet/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,9 @@ pub fn to_parquet_type(field: &Field) -> Result<ParquetType> {
None,
)?)
} else {
let len = decimal_length_from_precision(precision);
Ok(ParquetType::try_from_primitive(
name,
PhysicalType::FixedLenByteArray(len),
PhysicalType::FixedLenByteArray(32),
repetition,
None,
None,
Expand Down
10 changes: 5 additions & 5 deletions src/types/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,22 +577,22 @@ impl NativeType for i256 {
let mut bytes = [0u8; 32];
let (a, b) = self.0.into_words();

let b = b.to_be_bytes();
let a = a.to_be_bytes();
(0..16).for_each(|i| {
bytes[i] = b[i];
bytes[i] = a[i];
});

let a = a.to_be_bytes();
let b = b.to_be_bytes();
(0..16).for_each(|i| {
bytes[i + 16] = a[i];
bytes[i + 16] = b[i];
});

bytes
}

#[inline]
fn from_be_bytes(bytes: Self::Bytes) -> Self {
let (b, a) = bytes.split_at(16);
let (a, b) = bytes.split_at(16);
let a: [u8; 16] = a.try_into().unwrap();
let b: [u8; 16] = b.try_into().unwrap();
let a = i128::from_be_bytes(a);
Expand Down

0 comments on commit 05fd442

Please sign in to comment.