This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added read Decimal from parquet #489

Merged
merged 3 commits on Oct 6, 2021
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -60,7 +60,7 @@ futures = { version = "0.3", optional = true }
# for faster hashing
ahash = { version = "0.7", optional = true }

parquet2 = { version = "0.5", optional = true, default_features = false, features = ["stream"] }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2.git", rev = "e6bd6723c876c21667c74fb75db181d99d646bc2", optional = true, default_features = false, features = ["stream"] }

avro-rs = { version = "0.13", optional = true, default_features = false }

16 changes: 16 additions & 0 deletions parquet_integration/write_parquet.py
@@ -1,6 +1,7 @@
import pyarrow as pa
import pyarrow.parquet
import os
from decimal import Decimal

PYARROW_PATH = "fixtures/pyarrow3"

@@ -11,6 +12,7 @@ def case_basic_nullable(size=1):
string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
boolean = [True, None, False, False, None, True, None, None, True, True]
string_large = ["ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCD😃🌚🕳👊"] * 10
decimal = [Decimal(e) if e is not None else None for e in int64]

fields = [
pa.field("int64", pa.int64()),
@@ -20,6 +22,10 @@ def case_basic_nullable(size=1):
pa.field("date", pa.timestamp("ms")),
pa.field("uint32", pa.uint32()),
pa.field("string_large", pa.utf8()),
# decimal testing
pa.field("decimal_9", pa.decimal128(9,0)),
pa.field("decimal_18", pa.decimal128(18,0)),
pa.field("decimal_26", pa.decimal128(26,0)),
]
schema = pa.schema(fields)

@@ -32,6 +38,9 @@ def case_basic_nullable(size=1):
"date": int64 * size,
"uint32": int64 * size,
"string_large": string_large * size,
"decimal_9": decimal * size,
"decimal_18": decimal * size,
"decimal_26": decimal * size,
},
schema,
f"basic_nullable_{size*10}.parquet",
@@ -43,6 +52,7 @@ def case_basic_required(size=1):
float64 = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
string = ["Hello", "bbb", "aa", "", "bbb", "abc", "bbb", "bbb", "def", "aaa"]
boolean = [True, True, False, False, False, True, True, True, True, True]
decimal = [Decimal(e) for e in int64]

fields = [
pa.field("int64", pa.int64(), nullable=False),
@@ -57,6 +67,9 @@
nullable=False,
),
pa.field("uint32", pa.uint32(), nullable=False),
pa.field("decimal_9", pa.decimal128(9,0), nullable=False),
pa.field("decimal_18", pa.decimal128(18,0), nullable=False),
pa.field("decimal_26", pa.decimal128(26,0), nullable=False),
]
schema = pa.schema(fields)

@@ -68,6 +81,9 @@
"bool": boolean * size,
"date": int64 * size,
"uint32": int64 * size,
"decimal_9": decimal * size,
"decimal_18": decimal * size,
"decimal_26": decimal * size,
},
schema,
f"basic_required_{size*10}.parquet",
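A note on the fixture precisions: the Parquet format allows DECIMAL to annotate INT32 (precision up to 9), INT64 (precision up to 18), or FIXED_LEN_BYTE_ARRAY above that, so decimal_9, decimal_18, and decimal_26 exercise all three physical paths the new reader code has to handle. A sketch of that mapping, with an enum and helper of our own invention rather than anything from the codebase:

enum DecimalPhysical {
    Int32,
    Int64,
    FixedLenByteArray,
}

// Ranges follow the Parquet logical-type spec for DECIMAL.
fn physical_for_precision(precision: usize) -> DecimalPhysical {
    match precision {
        1..=9 => DecimalPhysical::Int32,
        10..=18 => DecimalPhysical::Int64,
        _ => DecimalPhysical::FixedLenByteArray,
    }
}

fn main() {
    assert!(matches!(physical_for_precision(9), DecimalPhysical::Int32));
    assert!(matches!(physical_for_precision(18), DecimalPhysical::Int64));
    assert!(matches!(physical_for_precision(26), DecimalPhysical::FixedLenByteArray));
}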
53 changes: 43 additions & 10 deletions src/io/parquet/read/mod.rs
@@ -1,8 +1,5 @@
//! APIs to read from Parquet format.
use std::{
io::{Read, Seek},
sync::Arc,
};
use std::{convert::TryInto, io::{Read, Seek}, sync::Arc};

use futures::{AsyncRead, AsyncSeek, Stream};
pub use parquet2::{
@@ -21,11 +18,7 @@ pub use parquet2::{
types::int96_to_i64_ns,
};

use crate::{
array::{Array, DictionaryKey},
datatypes::{DataType, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
};
use crate::{array::{Array, DictionaryKey, PrimitiveArray}, datatypes::{DataType, IntervalUnit, TimeUnit}, error::{ArrowError, Result}};

mod binary;
mod boolean;
@@ -211,7 +204,47 @@ pub fn page_iter_to_array<
FixedSizeBinary(_) => Ok(Box::new(fixed_size_binary::iter_to_array(
iter, data_type, metadata,
)?)),

Decimal(_, _) => match metadata.descriptor().type_() {
    ParquetType::PrimitiveType { physical_type, .. } => match physical_type {
        PhysicalType::Int32 => primitive::iter_to_array(
            iter,
            metadata,
            data_type,
            |x: i32| x as i128,
        ),
        PhysicalType::Int64 => primitive::iter_to_array(
            iter,
            metadata,
            data_type,
            |x: i64| x as i128,
        ),
        PhysicalType::FixedLenByteArray(n) => {
            if *n > 16 {
                Err(ArrowError::NotYetImplemented(format!(
                    "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
                    n
                )))
            } else {
                let paddings = (0..(16 - *n)).map(|_| 0u8).collect::<Vec<_>>();
                fixed_size_binary::iter_to_array(iter, DataType::FixedSizeBinary(*n), metadata)
                    .map(|e| {
                        let a = e
                            .into_iter()
                            .map(|v| {
                                v.and_then(|v1| {
                                    [paddings.as_slice(), v1]
                                        .concat()
                                        .try_into()
                                        .map(|pad16| i128::from_be_bytes(pad16))
                                        .ok()
                                })
                            })
                            .collect::<Vec<_>>();
                        Box::new(PrimitiveArray::<i128>::from(a).to(data_type)) as Box<dyn Array>
                    })
            }
        }
        _ => unreachable!(),
    },
    _ => unreachable!(),
},
List(ref inner) => match inner.data_type() {
UInt8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u8),
UInt16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u16),
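The FixedLenByteArray branch above left-pads each stored big-endian value to 16 bytes before reinterpreting it as an i128. A standalone sketch of that decoding step, with a helper name of our own; note the zero-padding assumes non-negative values (true for the fixtures in this PR), whereas a sign-aware version would pad with 0xff when the most significant bit is set:

use std::convert::TryInto;

// Zero-pad `raw` (at most 16 big-endian bytes) to 16 bytes and
// reinterpret the result as an i128.
fn decimal_from_be_bytes(raw: &[u8]) -> Option<i128> {
    if raw.len() > 16 {
        return None;
    }
    let padding = vec![0u8; 16 - raw.len()];
    [padding.as_slice(), raw]
        .concat()
        .try_into()
        .map(i128::from_be_bytes)
        .ok()
}

fn main() {
    // 0x12345678 stored as a 4-byte FIXED_LEN_BYTE_ARRAY.
    assert_eq!(
        decimal_from_be_bytes(&[0x12, 0x34, 0x56, 0x78]),
        Some(0x12345678)
    );
}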
3 changes: 3 additions & 0 deletions src/io/parquet/read/schema/convert.rs
@@ -167,6 +167,9 @@ pub fn from_int64(
ParquetTimeUnit::MICROS(_) => DataType::Time64(TimeUnit::Microsecond),
ParquetTimeUnit::NANOS(_) => DataType::Time64(TimeUnit::Nanosecond),
},
(Some(PrimitiveConvertedType::Decimal(precision, scale)), _) => {
DataType::Decimal(*precision as usize, *scale as usize)
}
(c, l) => {
return Err(ArrowError::NotYetImplemented(format!(
"The conversion of (Int64, {:?}, {:?}) to arrow still not implemented",
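For reference on the pair being forwarded here: a DataType::Decimal(precision, scale) column carries unscaled integers, and the logical value is the stored integer scaled by 10^-scale. A minimal worked example:

fn main() {
    // DECIMAL(9, 2): the unscaled integer 12345 represents 123.45.
    let unscaled: i128 = 12345;
    let scale: u32 = 2;
    let value = unscaled as f64 / 10f64.powi(scale as i32);
    assert!((value - 123.45).abs() < 1e-9);
}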
5 changes: 4 additions & 1 deletion src/io/parquet/read/schema/metadata.rs
@@ -120,7 +120,10 @@ mod tests {
"bool",
"date",
"uint32",
"string_large"
"string_large",
"decimal_9",
"decimal_18",
"decimal_26"
]
);
Ok(())
105 changes: 105 additions & 0 deletions src/io/parquet/read/statistics/fixlen.rs
@@ -0,0 +1,105 @@
use std::convert::{TryFrom, TryInto};

use super::super::schema;
use super::primitive::PrimitiveStatistics;
use crate::datatypes::DataType;
use crate::error::{ArrowError, Result};
use parquet2::schema::types::ParquetType;
use parquet2::{
schema::types::PhysicalType,
statistics::{
FixedLenStatistics as ParquetFixedLenStatistics, Statistics as ParquetStatistics,
},
};

use super::Statistics;

#[derive(Debug, Clone, PartialEq)]
pub struct FixedLenStatistics {
pub null_count: Option<i64>,
pub distinct_count: Option<i64>,
pub min_value: Option<Vec<u8>>,
pub max_value: Option<Vec<u8>>,
pub data_type: DataType,
}

impl Statistics for FixedLenStatistics {
fn data_type(&self) -> &DataType {
&self.data_type
}
}

impl From<&ParquetFixedLenStatistics> for FixedLenStatistics {
fn from(stats: &ParquetFixedLenStatistics) -> Self {
let byte_lens = match stats.physical_type() {
PhysicalType::FixedLenByteArray(size) => *size,
_ => unreachable!(),
};
Self {
null_count: stats.null_count,
distinct_count: stats.distinct_count,
min_value: stats.min_value.clone(),
max_value: stats.max_value.clone(),
data_type: DataType::FixedSizeBinary(byte_lens),
}
}
}

impl TryFrom<(&ParquetFixedLenStatistics, DataType)> for PrimitiveStatistics<i128> {
type Error = ArrowError;
fn try_from((stats, data_type): (&ParquetFixedLenStatistics, DataType)) -> Result<Self> {
let byte_lens = match stats.physical_type() {
PhysicalType::FixedLenByteArray(size) => *size,
_ => unreachable!(),
};
if byte_lens > 16 {
Err(ArrowError::Other(format!(
"Can't deserialize i128 from Fixed Len Byte array with lengtg {:?}",
byte_lens
)))
} else {
let paddings = (0..(16 - byte_lens)).map(|_| 0u8).collect::<Vec<_>>();
let max_value = stats.max_value.as_ref().and_then(|value| {
[paddings.as_slice(), value]
.concat()
.try_into()
.map(|v| i128::from_be_bytes(v))
.ok()
});

let min_value = stats.min_value.as_ref().and_then(|value| {
[paddings.as_slice(), value]
.concat()
.try_into()
.map(|v| i128::from_be_bytes(v))
.ok()
});
Ok(Self {
data_type,
null_count: stats.null_count,
distinct_count: stats.distinct_count,
max_value,
min_value,
})
}
}
}

pub(super) fn statistics_from_fix_len(
stats: &ParquetFixedLenStatistics,
type_: &ParquetType,
) -> Result<Box<dyn Statistics>> {
let data_type = schema::to_data_type(type_)?.unwrap();

use DataType::*;
Ok(match data_type {
Decimal(_, _) => Box::new(PrimitiveStatistics::<i128>::try_from((stats, data_type))?),
FixedSizeBinary(_) => Box::new(FixedLenStatistics::from(stats)),
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Can't read {:?} from parquet",
other
)))
}
})
}
6 changes: 6 additions & 0 deletions src/io/parquet/read/statistics/mod.rs
@@ -13,6 +13,8 @@ mod binary;
pub use binary::*;
mod boolean;
pub use boolean::*;
mod fixlen;
pub use fixlen::*;

/// Trait representing a deserialized parquet statistics into arrow.
pub trait Statistics: std::fmt::Debug {
@@ -70,6 +72,10 @@ pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result<Box<dyn S
DataType::Float64,
))))
}
PhysicalType::FixedLenByteArray(_) => {
let stats = stats.as_any().downcast_ref().unwrap();
fixlen::statistics_from_fix_len(stats, stats.descriptor.type_())
}
_ => Err(ArrowError::NotYetImplemented(
"Reading Fixed-len array statistics is not yet supported".to_string(),
)),
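The new FixedLenByteArray arm above uses the usual as_any downcast to recover the concrete statistics struct from the &dyn ParquetStatistics trait object. A self-contained sketch of that pattern, with illustrative types rather than the crate's own:

use std::any::Any;

trait Stats {
    fn as_any(&self) -> &dyn Any;
}

struct FixedLenStats {
    byte_width: i32,
}

impl Stats for FixedLenStats {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let stats: Box<dyn Stats> = Box::new(FixedLenStats { byte_width: 16 });
    // Recover the concrete type behind the trait object.
    let concrete = stats.as_any().downcast_ref::<FixedLenStats>().unwrap();
    assert_eq!(concrete.byte_width, 16);
}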
2 changes: 2 additions & 0 deletions src/io/parquet/read/statistics/primitive.rs
@@ -54,6 +54,7 @@ pub(super) fn statistics_from_i32(
UInt32 => Box::new(PrimitiveStatistics::<u32>::from((stats, data_type))),
Int8 => Box::new(PrimitiveStatistics::<i8>::from((stats, data_type))),
Int16 => Box::new(PrimitiveStatistics::<i16>::from((stats, data_type))),
Decimal(_, _) => Box::new(PrimitiveStatistics::<i128>::from((stats, data_type))),
_ => Box::new(PrimitiveStatistics::<i32>::from((stats, data_type))),
})
}
@@ -69,6 +70,7 @@ pub(super) fn statistics_from_i64(
UInt64 => {
Box::new(PrimitiveStatistics::<u64>::from((stats, data_type))) as Box<dyn Statistics>
}
Decimal(_, _) => Box::new(PrimitiveStatistics::<i128>::from((stats, data_type))),
_ => Box::new(PrimitiveStatistics::<i64>::from((stats, data_type))),
})
}
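The two new Decimal arms route i32 and i64 page statistics into PrimitiveStatistics::<i128>. Widening a signed integer in Rust sign-extends, so negative minima and maxima survive the conversion:

fn main() {
    let min_i32: i32 = -12345;
    let min_i64: i64 = -1_234_567_890_123;
    assert_eq!(min_i32 as i128, -12345);
    assert_eq!(min_i64 as i128, -1_234_567_890_123);
}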
43 changes: 39 additions & 4 deletions src/io/parquet/write/fixed_len_bytes.rs
@@ -1,9 +1,13 @@
use parquet2::{
compression::create_codec, encoding::Encoding, metadata::ColumnDescriptor,
page::CompressedDataPage, write::WriteOptions,
compression::create_codec,
encoding::Encoding,
metadata::ColumnDescriptor,
page::CompressedDataPage,
statistics::{serialize_statistics, deserialize_statistics, ParquetStatistics},
write::WriteOptions,
};

use super::utils;
use super::{binary::ord_binary, utils};
use crate::{
array::{Array, FixedSizeBinaryArray},
error::Result,
@@ -54,16 +58,47 @@ pub fn array_to_page(
buffer
};

let statistics = if options.write_statistics {
build_statistics(array, descriptor.clone())
} else {
None
};

utils::build_plain_page(
buffer,
array.len(),
array.null_count(),
uncompressed_page_size,
0,
definition_levels_byte_length,
None,
statistics,
descriptor,
options,
Encoding::Plain,
)
}

pub(super) fn build_statistics(
array: &FixedSizeBinaryArray,
descriptor: ColumnDescriptor,
) -> Option<ParquetStatistics> {
let pq_statistics = &ParquetStatistics {
max: None,
min: None,
null_count: Some(array.null_count() as i64),
distinct_count: None,
max_value: array
.iter()
.flatten()
.max_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec()),
min_value: array
.iter()
.flatten()
.min_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec()),
};
deserialize_statistics(pq_statistics, descriptor)
    .map(|e| serialize_statistics(&*e))
    .ok()
}
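Two notes on build_statistics above: min and max are chosen by byte-wise comparison over the non-null values, and the hand-assembled ParquetStatistics is round-tripped through parquet2's deserialize_statistics/serialize_statistics pair, leaving the final encoding to the library. A minimal sketch of the min/max selection, assuming ord_binary compares lexicographically:

use std::cmp::Ordering;

// Stand-in for the crate's ord_binary, assumed lexicographic.
fn ord_binary(a: &[u8], b: &[u8]) -> Ordering {
    a.cmp(b)
}

fn main() {
    let values: Vec<Option<&[u8]>> =
        vec![Some(&b"bbb"[..]), None, Some(&b"aa"[..]), Some(&b"def"[..])];
    // `flatten` skips the nulls, as in build_statistics.
    let max = values.iter().flatten().max_by(|x, y| ord_binary(x, y));
    let min = values.iter().flatten().min_by(|x, y| ord_binary(x, y));
    assert_eq!(max.map(|x| x.to_vec()), Some(b"def".to_vec()));
    assert_eq!(min.map(|x| x.to_vec()), Some(b"aa".to_vec()));
}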