Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to read and write Decimal128 to Avro #837

Merged
merged 5 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,43 @@ fn deserialize_value<'a>(
.unwrap();
array.push(Some(value))
}
PrimitiveType::Int128 => {
    // Decodes an Avro decimal (stored as `bytes` or `fixed`, optionally wrapped in a
    // two-branch union with `null`) into an `i128` and pushes it onto the mutable array.

    // Unwrap a nullable union down to the schema that actually carries the payload.
    let avro_inner = match avro_field {
        AvroSchema::Bytes(_) | AvroSchema::Fixed(_) => avro_field,
        AvroSchema::Union(u) => match &u.as_slice() {
            &[e, AvroSchema::Null] | &[AvroSchema::Null, e] => e,
            _ => unreachable!(),
        },
        _ => unreachable!(),
    };
    // `bytes` carries a zigzag-encoded length prefix in the block; `fixed` takes its
    // length from the schema.
    let len = match avro_inner {
        AvroSchema::Bytes(_) => {
            util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
                ArrowError::ExternalFormat(
                    "Avro format contains a non-usize number of bytes".to_string(),
                )
            })?
        }
        AvroSchema::Fixed(b) => b.size,
        _ => unreachable!(),
    };
    if len > 16 {
        return Err(ArrowError::ExternalFormat(
            "Avro decimal bytes return more than 16 bytes".to_string(),
        ));
    }
    // The declared length comes from the file: report truncated data as an error
    // instead of panicking on an out-of-bounds slice.
    if block.len() < len {
        return Err(ArrowError::ExternalFormat(
            "Avro decimal declares more bytes than the block contains".to_string(),
        ));
    }
    let value = &block[..len];
    block = &block[len..];
    // Place the big-endian payload in the most significant bytes, then use an
    // arithmetic right shift to move it down while sign-extending it.
    let mut bytes = [0u8; 16];
    bytes[..len].copy_from_slice(value);
    let data = if len == 0 {
        // Shifting an i128 by 128 bits overflows (a panic in debug builds); an empty
        // payload is decoded as zero.
        0
    } else {
        i128::from_be_bytes(bytes) >> (8 * (16 - len))
    };
    let array = array
        .as_mut_any()
        .downcast_mut::<MutablePrimitiveArray<i128>>()
        .unwrap();
    array.push(Some(data))
}
_ => unreachable!(),
},
PhysicalType::Utf8 => {
Expand Down
4 changes: 3 additions & 1 deletion src/io/avro/write/schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use avro_schema::{
Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Schema as AvroSchema,
BytesLogical, Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical,
Schema as AvroSchema,
};

use crate::datatypes::*;
Expand Down Expand Up @@ -54,6 +55,7 @@ fn _type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
AvroSchema::Fixed(fixed)
}
DataType::FixedSizeBinary(size) => AvroSchema::Fixed(Fixed::new("", *size)),
// Arrow decimals are written as Avro `bytes` tagged with the `decimal` logical type;
// `p` and `s` (presumably precision and scale — confirm against `DataType::Decimal`)
// are forwarded unchanged.
DataType::Decimal(p, s) => AvroSchema::Bytes(Some(BytesLogical::Decimal(*p, *s))),
other => {
return Err(ArrowError::NotYetImplemented(format!(
"write {:?} to avro",
Expand Down
33 changes: 33 additions & 0 deletions src/io/avro/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,39 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(PhysicalType::Primitive(PrimitiveType::Int128), AvroSchema::Bytes(_)) => {
    // Serializes non-nullable `i128` values as Avro `bytes`: a zigzag-encoded length
    // followed by the minimal big-endian two's-complement representation.
    let values = array
        .as_any()
        .downcast_ref::<PrimitiveArray<i128>>()
        .unwrap();
    Box::new(BufStreamingIterator::new(
        values.values().iter(),
        |x, buf| {
            // Number of leading bits that merely repeat the sign bit (1..=128).
            let sign_run = if x.is_negative() {
                x.leading_ones()
            } else {
                x.leading_zeros()
            };
            // Keep one sign bit plus the significant bits, rounded up to whole bytes.
            // Stripping by `leading_zeros` alone would drop the sign bit, so e.g. 128
            // would be written as 0x80 and read back as -128.
            let size = ((136 - sign_run) / 8) as usize;
            util::zigzag_encode(size as i64, buf).unwrap();
            buf.extend_from_slice(&x.to_be_bytes()[16 - size..]);
        },
        vec![],
    ))
}
(PhysicalType::Primitive(PrimitiveType::Int128), AvroSchema::Union(_)) => {
    // Serializes nullable `i128` values as an Avro union: a zigzag-encoded branch
    // index, then (for non-null values) a zigzag-encoded length and the minimal
    // big-endian two's-complement representation.
    let values = array
        .as_any()
        .downcast_ref::<PrimitiveArray<i128>>()
        .unwrap();
    Box::new(BufStreamingIterator::new(
        values.iter(),
        |x, buf| {
            // Branch index: 0 for a null slot, 1 for a value.
            // NOTE(review): this presumes `null` is the first union branch — confirm
            // against the schema produced by the writer.
            util::zigzag_encode(x.is_some() as i64, buf).unwrap();
            if let Some(x) = x {
                // Number of leading bits that merely repeat the sign bit (1..=128).
                let sign_run = if x.is_negative() {
                    x.leading_ones()
                } else {
                    x.leading_zeros()
                };
                // Keep one sign bit plus the significant bits, rounded up to whole
                // bytes. Stripping by `leading_zeros` alone would drop the sign bit,
                // so e.g. 128 would be written as 0x80 and read back as -128.
                let size = ((136 - sign_run) / 8) as usize;
                util::zigzag_encode(size as i64, buf).unwrap();
                buf.extend_from_slice(&x.to_be_bytes()[16 - size..]);
            }
        },
        vec![],
    ))
}
(PhysicalType::Primitive(PrimitiveType::MonthDayNano), AvroSchema::Fixed(_)) => {
let values = array
.as_any()
Expand Down