Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fix serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
thanhtm1 committed Feb 15, 2022
1 parent e8f8c4d commit c617e56
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
7 changes: 3 additions & 4 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,10 @@ fn deserialize_value<'a>(
"Avro decimal bytes return more than 16 bytes".to_string(),
));
}
let value = &block[..len];
block = &block[len..];
let mut bytes = [0u8; 16];
bytes[..len].copy_from_slice(value);
let data = u128::from_be_bytes(bytes) >> (8 * (16 - len));
bytes[..len].copy_from_slice(&block[..len]);
block = &block[len..];
let data = i128::from_be_bytes(bytes) >> (8 * (16 - len));
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i128>>()
Expand Down
4 changes: 2 additions & 2 deletions src/io/avro/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
Box::new(BufStreamingIterator::new(
values.values().iter(),
|x, buf| {
let len = (x.leading_zeros() / 8) as usize;
let len = ((x.leading_zeros() / 8) - ((x.leading_zeros() / 8) % 2)) as usize;
util::zigzag_encode((16 - len) as i64, buf).unwrap();
buf.extend_from_slice(&x.to_be_bytes()[len..]);
},
Expand All @@ -261,7 +261,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
let len = (x.leading_zeros() / 8) as usize;
let len = ((x.leading_zeros() / 8) - ((x.leading_zeros() / 8) % 2)) as usize;
util::zigzag_encode((16 - len) as i64, buf).unwrap();
buf.extend_from_slice(&x.to_be_bytes()[len..]);
}
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
// values.
let mut bytes = [0u8; 16];
bytes[..n].copy_from_slice(value);
(u128::from_be_bytes(bytes) >> (8 * (16 - n))) as i128
i128::from_be_bytes(bytes) >> (8 * (16 - n))
})
.collect::<Vec<_>>();
let validity = array.validity().cloned();
Expand Down
8 changes: 5 additions & 3 deletions tests/it/io/avro/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub(super) fn data() -> Chunk<Arc<dyn Array>> {
Arc::new(Utf8Array::<i32>::from_slice(["SPADES", "HEARTS"])),
)),
Arc::new(
PrimitiveArray::<i128>::from_slice([12345678i128, 23456781i128])
PrimitiveArray::<i128>::from_slice([12345678i128, -12345678i128])
.to(DataType::Decimal(18, 5)),
),
];
Expand Down Expand Up @@ -150,7 +150,7 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::
record.put("enum", Value::Enum(1, "HEARTS".to_string()));
record.put(
"decimal",
Value::Decimal(Decimal::from(&[188u8, 97u8, 78u8])),
Value::Decimal(Decimal::from(&[0u8, 188u8, 97u8, 78u8])),
);
record.put(
"duration",
Expand Down Expand Up @@ -182,7 +182,9 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::
record.put("enum", Value::Enum(0, "SPADES".to_string()));
record.put(
"decimal",
Value::Decimal(Decimal::from(&[1u8, 101u8, 236u8, 13u8])),
Value::Decimal(Decimal::from(&[
255u8, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 67, 158, 178,
])),
);
writer.append(record)?;
Ok(writer.into_inner().unwrap())
Expand Down

0 comments on commit c617e56

Please sign in to comment.