Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
support decimal256(76,0)
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason committed Mar 1, 2023
1 parent 2512564 commit b82d43a
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 22 deletions.
4 changes: 4 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
pa.field("decimal256_18", pa.decimal256(18, 0)),
pa.field("decimal256_26", pa.decimal256(26, 0)),
pa.field("decimal256_39", pa.decimal256(39, 0)),
pa.field("decimal256_76", pa.decimal256(76, 0)),
pa.field("timestamp_us", pa.timestamp("us")),
pa.field("timestamp_s", pa.timestamp("s")),
pa.field("emoji", pa.utf8()),
Expand All @@ -58,6 +59,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
"decimal256_18": decimal,
"decimal256_26": decimal,
"decimal256_39": decimal,
"decimal256_76": decimal,
"timestamp_us": int64,
"timestamp_s": int64,
"emoji": emoji,
Expand Down Expand Up @@ -94,6 +96,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
pa.field("decimal256_18", pa.decimal256(18, 0), nullable=False),
pa.field("decimal256_26", pa.decimal256(26, 0), nullable=False),
pa.field("decimal256_39", pa.decimal256(39, 0), nullable=False),
pa.field("decimal256_76", pa.decimal256(76, 0), nullable=False),
]
schema = pa.schema(fields)

Expand All @@ -112,6 +115,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
"decimal256_18": decimal,
"decimal256_26": decimal,
"decimal256_39": decimal,
"decimal256_76": decimal,
},
schema,
f"basic_required_10.parquet",
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>(
let values = array
.values()
.chunks_exact(n)
.map(|value: &[u8]| super::super::convert_i256(value, n))
.map(super::super::convert_i256)
.collect::<Vec<_>>();
let validity = array.validity().cloned();

Expand Down
13 changes: 9 additions & 4 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,14 @@ fn convert_i128(value: &[u8], n: usize) -> i128 {
i128::from_be_bytes(bytes) >> (8 * (16 - n))
}

fn convert_i256(value: &[u8], n: usize) -> i256 {
fn convert_i256(value: &[u8]) -> i256 {
let mut bytes = [0u8; 32];
bytes[..n].copy_from_slice(value);

i256(i256::from_be_bytes(bytes).0 >> (8 * (32 - n)))
let mut neg_bytes = [255u8; 32];
if value[0] >= 128 {
neg_bytes[32 - value.len()..].copy_from_slice(value);
i256::from_be_bytes(neg_bytes)
} else {
bytes[32 - value.len()..].copy_from_slice(value);
i256::from_be_bytes(bytes)
}
}
3 changes: 0 additions & 3 deletions src/io/parquet/read/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,6 @@ pub fn to_deserializer<'a>(

(columns, types)
} else {
for (meta, chunk) in columns.clone() {
println!("the meta is {:?},\nthe chunk is {:?}", meta, chunk);
}
let (columns, types): (Vec<_>, Vec<_>) = columns
.into_iter()
.map(|(column_meta, chunk)| {
Expand Down
8 changes: 4 additions & 4 deletions src/io/parquet/read/statistics/fixlen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics};

use crate::array::*;
use crate::error::Result;
use crate::io::parquet::read::convert_i256;
use crate::types::{days_ms, i256};

use super::super::{convert_days_ms, convert_i128, convert_i256};
use super::super::{convert_days_ms, convert_i128};

pub(super) fn push_i128(
from: Option<&dyn ParquetStatistics>,
Expand Down Expand Up @@ -61,7 +62,6 @@ pub(super) fn push_i256_with_i128(

pub(super) fn push_i256(
from: Option<&dyn ParquetStatistics>,
n: usize,
min: &mut dyn MutableArray,
max: &mut dyn MutableArray,
) -> Result<()> {
Expand All @@ -75,8 +75,8 @@ pub(super) fn push_i256(
.unwrap();
let from = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert_i256(x, n))));
max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert_i256(x, n))));
min.push(from.and_then(|s| s.min_value.as_deref().map(convert_i256)));
max.push(from.and_then(|s| s.max_value.as_deref().map(convert_i256)));

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ fn push(
ParquetPhysicalType::FixedLenByteArray(n) if *n > 32 => Err(Error::NotYetImplemented(
format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"),
)),
ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i256(from, *n, min, max),
ParquetPhysicalType::FixedLenByteArray(_) => fixlen::push_i256(from, min, max),
_ => unreachable!(),
},
Binary => binary::push::<i32>(from, min, max),
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ pub fn array_to_page_simple(
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
} else {
let size = decimal_length_from_precision(precision);
let size = 32;
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<i256>>()
Expand All @@ -532,7 +532,7 @@ pub fn array_to_page_simple(
};
let mut values = Vec::<u8>::with_capacity(size * array.len());
array.values().iter().for_each(|x| {
let bytes = &x.to_be_bytes()[32 - size..];
let bytes = &x.to_be_bytes();
values.extend_from_slice(bytes)
});
let array = FixedSizeBinaryArray::new(
Expand Down
3 changes: 1 addition & 2 deletions src/io/parquet/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,9 @@ pub fn to_parquet_type(field: &Field) -> Result<ParquetType> {
None,
)?)
} else {
let len = decimal_length_from_precision(precision);
Ok(ParquetType::try_from_primitive(
name,
PhysicalType::FixedLenByteArray(len),
PhysicalType::FixedLenByteArray(32),
repetition,
None,
None,
Expand Down
10 changes: 5 additions & 5 deletions src/types/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,22 +577,22 @@ impl NativeType for i256 {
let mut bytes = [0u8; 32];
let (a, b) = self.0.into_words();

let b = b.to_be_bytes();
let a = a.to_be_bytes();
(0..16).for_each(|i| {
bytes[i] = b[i];
bytes[i] = a[i];
});

let a = a.to_be_bytes();
let b = b.to_be_bytes();
(0..16).for_each(|i| {
bytes[i + 16] = a[i];
bytes[i + 16] = b[i];
});

bytes
}

#[inline]
fn from_be_bytes(bytes: Self::Bytes) -> Self {
let (b, a) = bytes.split_at(16);
let (a, b) = bytes.split_at(16);
let a: [u8; 16] = a.try_into().unwrap();
let b: [u8; 16] = b.try_into().unwrap();
let a = i128::from_be_bytes(a);
Expand Down
24 changes: 24 additions & 0 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,13 @@ pub fn pyarrow_nullable(column: &str) -> Box<dyn Array> {
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(39, 0)))
}
"decimal256_76" => {
let values = i64_values
.iter()
.map(|x| x.map(|x| i256(x.as_i256())))
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(76, 0)))
}
"timestamp_us" => Box::new(
PrimitiveArray::<i64>::from(i64_values)
.to(DataType::Timestamp(TimeUnit::Microsecond, None)),
Expand Down Expand Up @@ -684,6 +691,16 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics {
Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(39, 0)),
),
},
"decimal256_76" => Statistics {
distinct_count: UInt64Array::from([None]).boxed(),
null_count: UInt64Array::from([Some(3)]).boxed(),
min_value: Box::new(
Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(76, 0)),
),
max_value: Box::new(
Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(76, 0)),
),
},
"timestamp_us" => Statistics {
distinct_count: UInt64Array::from([None]).boxed(),
null_count: UInt64Array::from([Some(3)]).boxed(),
Expand Down Expand Up @@ -792,6 +809,13 @@ pub fn pyarrow_required(column: &str) -> Box<dyn Array> {
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(39, 0)))
}
"decimal256_76" => {
let values = i64_values
.iter()
.map(|x| x.map(|x| i256(x.as_i256())))
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(76, 0)))
}
_ => unreachable!(),
}
}
Expand Down
25 changes: 25 additions & 0 deletions tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,16 @@ fn v1_decimal256_39_required() -> Result<()> {
test_pyarrow_integration("decimal256_39", 1, "basic", false, true, None)
}

#[test]
fn v1_decimal256_76_nullable() -> Result<()> {
test_pyarrow_integration("decimal256_76", 1, "basic", false, false, None)
}

#[test]
fn v1_decimal256_76_required() -> Result<()> {
test_pyarrow_integration("decimal256_76", 1, "basic", false, true, None)
}

#[test]
fn v2_decimal_9_nullable() -> Result<()> {
test_pyarrow_integration("decimal_9", 2, "basic", false, false, None)
Expand Down Expand Up @@ -496,6 +506,11 @@ fn v2_decimal256_39_nullable() -> Result<()> {
test_pyarrow_integration("decimal256_39", 2, "basic", false, false, None)
}

#[test]
fn v2_decimal256_76_nullable() -> Result<()> {
test_pyarrow_integration("decimal256_76", 2, "basic", false, false, None)
}

#[test]
fn v1_timestamp_us_nullable() -> Result<()> {
test_pyarrow_integration("timestamp_us", 1, "basic", false, false, None)
Expand Down Expand Up @@ -566,6 +581,16 @@ fn v2_decimal256_39_required_dict() -> Result<()> {
test_pyarrow_integration("decimal256_39", 2, "basic", true, true, None)
}

#[test]
fn v2_decimal256_76_required() -> Result<()> {
test_pyarrow_integration("decimal256_76", 2, "basic", false, true, None)
}

#[test]
fn v2_decimal256_76_required_dict() -> Result<()> {
test_pyarrow_integration("decimal256_76", 2, "basic", true, true, None)
}

#[test]
fn v1_struct_required_optional() -> Result<()> {
test_pyarrow_integration("struct", 1, "struct", false, false, None)
Expand Down
44 changes: 44 additions & 0 deletions tests/it/io/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,28 @@ fn decimal256_39_required_v1() -> Result<()> {
)
}

#[test]
fn decimal256_76_optional_v1() -> Result<()> {
round_trip(
"decimal256_76",
"nullable",
Version::V1,
CompressionOptions::Uncompressed,
vec![Encoding::Plain],
)
}

#[test]
fn decimal256_76_required_v1() -> Result<()> {
round_trip(
"decimal256_76",
"required",
Version::V1,
CompressionOptions::Uncompressed,
vec![Encoding::Plain],
)
}

#[test]
fn decimal_9_optional_v2() -> Result<()> {
round_trip(
Expand Down Expand Up @@ -781,6 +803,28 @@ fn decimal256_39_required_v2() -> Result<()> {
)
}

#[test]
fn decimal256_76_optional_v2() -> Result<()> {
round_trip(
"decimal256_76",
"nullable",
Version::V2,
CompressionOptions::Uncompressed,
vec![Encoding::Plain],
)
}

#[test]
fn decimal256_76_required_v2() -> Result<()> {
round_trip(
"decimal256_76",
"required",
Version::V2,
CompressionOptions::Uncompressed,
vec![Encoding::Plain],
)
}

#[test]
fn struct_v1() -> Result<()> {
round_trip(
Expand Down

0 comments on commit b82d43a

Please sign in to comment.