diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs index 37c733fc578..3abbb6f5239 100644 --- a/src/io/avro/read/deserialize.rs +++ b/src/io/avro/read/deserialize.rs @@ -228,6 +228,41 @@ fn deserialize_value<'a>( .unwrap(); array.push(Some(value)) } + PrimitiveType::Int128 => { + let avro_inner = match avro_field { + AvroSchema::Bytes(_) | AvroSchema::Fixed(_) => avro_field, + AvroSchema::Union(u) => match &u.as_slice() { + &[e, AvroSchema::Null] | &[AvroSchema::Null, e] => e, + _ => unreachable!(), + }, + _ => unreachable!(), + }; + let len = match avro_inner { + AvroSchema::Bytes(_) => { + util::zigzag_i64(&mut block)?.try_into().map_err(|_| { + ArrowError::ExternalFormat( + "Avro format contains a non-usize number of bytes".to_string(), + ) + })? + } + AvroSchema::Fixed(b) => b.size, + _ => unreachable!(), + }; + if len > 16 { + return Err(ArrowError::ExternalFormat( + "Avro decimal bytes return more than 16 bytes".to_string(), + )); + } + let mut bytes = [0u8; 16]; + bytes[..len].copy_from_slice(&block[..len]); + block = &block[len..]; + let data = i128::from_be_bytes(bytes) >> (8 * (16 - len)); + let array = array + .as_mut_any() + .downcast_mut::>() + .unwrap(); + array.push(Some(data as i128)) + } _ => unreachable!(), }, PhysicalType::Utf8 => { @@ -353,6 +388,28 @@ fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) -> PrimitiveType::MonthDayNano => { block = &block[12..]; } + PrimitiveType::Int128 => { + let avro_inner = match avro_field { + AvroSchema::Bytes(_) | AvroSchema::Fixed(_) => avro_field, + AvroSchema::Union(u) => match &u.as_slice() { + &[e, AvroSchema::Null] | &[AvroSchema::Null, e] => e, + _ => unreachable!(), + }, + _ => unreachable!(), + }; + let len = match avro_inner { + AvroSchema::Bytes(_) => { + util::zigzag_i64(&mut block)?.try_into().map_err(|_| { + ArrowError::ExternalFormat( + "Avro format contains a non-usize number of bytes".to_string(), + ) + })? + } + AvroSchema::Fixed(b) => b.size, + _ => unreachable!(), + }; + block = &block[len..]; + } _ => unreachable!(), }, PhysicalType::Utf8 | PhysicalType::Binary => { diff --git a/src/io/avro/write/schema.rs b/src/io/avro/write/schema.rs index 950a87ad217..23e0949145a 100644 --- a/src/io/avro/write/schema.rs +++ b/src/io/avro/write/schema.rs @@ -1,5 +1,6 @@ use avro_schema::{ - Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Schema as AvroSchema, + BytesLogical, Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, + Schema as AvroSchema, }; use crate::datatypes::*; @@ -54,6 +55,7 @@ fn _type_to_schema(data_type: &DataType) -> Result { AvroSchema::Fixed(fixed) } DataType::FixedSizeBinary(size) => AvroSchema::Fixed(Fixed::new("", *size)), + DataType::Decimal(p, s) => AvroSchema::Bytes(Some(BytesLogical::Decimal(*p, *s))), other => { return Err(ArrowError::NotYetImplemented(format!( "write {:?} to avro", diff --git a/src/io/avro/write/serialize.rs b/src/io/avro/write/serialize.rs index 02659a3018b..1cce28cc578 100644 --- a/src/io/avro/write/serialize.rs +++ b/src/io/avro/write/serialize.rs @@ -236,6 +236,40 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria vec![], )) } + (PhysicalType::Primitive(PrimitiveType::Int128), AvroSchema::Bytes(_)) => { + let values = array + .as_any() + .downcast_ref::>() + .unwrap(); + Box::new(BufStreamingIterator::new( + values.values().iter(), + |x, buf| { + let len = ((x.leading_zeros() / 8) - ((x.leading_zeros() / 8) % 2)) as usize; + util::zigzag_encode((16 - len) as i64, buf).unwrap(); + buf.extend_from_slice(&x.to_be_bytes()[len..]); + }, + vec![], + )) + } + (PhysicalType::Primitive(PrimitiveType::Int128), AvroSchema::Union(_)) => { + let values = array + .as_any() + .downcast_ref::>() + .unwrap(); + Box::new(BufStreamingIterator::new( + values.iter(), + |x, buf| { + util::zigzag_encode(x.is_some() as i64, buf).unwrap(); + if let Some(x) = x { + let len = + ((x.leading_zeros() / 8) - ((x.leading_zeros() / 8) % 2)) as usize; + util::zigzag_encode((16 - len) as i64, buf).unwrap(); + buf.extend_from_slice(&x.to_be_bytes()[len..]); + } + }, + vec![], + )) + } (PhysicalType::Primitive(PrimitiveType::MonthDayNano), AvroSchema::Fixed(_)) => { let values = array .as_any() diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs index 46897ecceee..5efd42518b6 100644 --- a/tests/it/io/avro/read.rs +++ b/tests/it/io/avro/read.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use arrow2::chunk::Chunk; use avro_rs::types::{Record, Value}; use avro_rs::{Codec, Writer}; -use avro_rs::{Days, Duration, Millis, Months, Schema as AvroSchema}; +use avro_rs::{Days, Decimal, Duration, Millis, Months, Schema as AvroSchema}; use arrow2::array::*; use arrow2::datatypes::*; @@ -47,7 +47,8 @@ pub(super) fn schema() -> (AvroSchema, Schema) { "type": "enum", "name": "", "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] - }} + }}, + {"name": "decimal", "type": {"type": "bytes", "logicalType": "decimal", "precision": 18, "scale": 5}} ] } "#; @@ -76,6 +77,7 @@ pub(super) fn schema() -> (AvroSchema, Schema) { DataType::Dictionary(i32::KEY_TYPE, Box::new(DataType::Utf8), false), false, ), + Field::new("decimal", DataType::Decimal(18, 5), false), ]); (AvroSchema::parse_str(raw_schema).unwrap(), schema) @@ -109,6 +111,10 @@ pub(super) fn data() -> Chunk> { Int32Array::from_slice([1, 0]), Arc::new(Utf8Array::::from_slice(["SPADES", "HEARTS"])), )), + Arc::new( + PrimitiveArray::::from_slice([12345678i128, -12345678i128]) + .to(DataType::Decimal(18, 5)), + ), ]; Chunk::try_new(columns).unwrap() @@ -142,6 +148,10 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result, avro_rs:: Value::Record(vec![("e".to_string(), Value::Double(1.0f64))]), ); record.put("enum", Value::Enum(1, "HEARTS".to_string())); + record.put( + "decimal", + Value::Decimal(Decimal::from(&[0u8, 188u8, 97u8, 78u8])), + ); record.put( "duration", Value::Duration(Duration::new(Months::new(1), Days::new(1), Millis::new(1))), @@ -170,6 +180,12 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result, avro_rs:: Value::Record(vec![("e".to_string(), Value::Double(2.0f64))]), ); record.put("enum", Value::Enum(0, "SPADES".to_string())); + record.put( + "decimal", + Value::Decimal(Decimal::from(&[ + 255u8, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 67, 158, 178, + ])), + ); writer.append(record)?; Ok(writer.into_inner().unwrap()) } @@ -260,6 +276,6 @@ fn test_projected(projection: Vec) -> Result<()> { #[test] fn read_projected() -> Result<()> { test_projected(vec![ - true, false, false, false, false, false, false, false, false, false, false, + true, false, false, false, false, false, false, false, false, false, false, false, ]) }