Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to write FixedSizeBinary to Avro (#1118)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jun 28, 2022
1 parent 7b27f5e commit d0f0e20
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/array/fixed_size_binary/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl MutableArray for MutableFixedSizeBinaryArray {
}

fn push_null(&mut self) {
self.values.resize(self.values.len() + self.size, 0);
self.push::<&[u8]>(None);
}

fn shrink_to_fit(&mut self) {
Expand Down
34 changes: 34 additions & 0 deletions src/io/avro/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,29 @@ fn binary_optional<O: Offset>(array: &BinaryArray<O>) -> BoxSerializer {
))
}

fn fixed_size_binary_required(array: &FixedSizeBinaryArray) -> BoxSerializer {
Box::new(BufStreamingIterator::new(
array.values_iter(),
|x, buf| {
buf.extend_from_slice(x);
},
vec![],
))
}

fn fixed_size_binary_optional(array: &FixedSizeBinaryArray) -> BoxSerializer {
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.extend_from_slice(x);
}
},
vec![],
))
}

fn list_required<'a, O: Offset>(array: &'a ListArray<O>, schema: &AvroSchema) -> BoxSerializer<'a> {
let mut inner = new_serializer(array.values().as_ref(), schema);
let lengths = array
Expand Down Expand Up @@ -218,12 +241,18 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
(PhysicalType::LargeBinary, AvroSchema::Union(_)) => {
binary_optional::<i64>(array.as_any().downcast_ref().unwrap())
}
(PhysicalType::FixedSizeBinary, AvroSchema::Union(_)) => {
fixed_size_binary_optional(array.as_any().downcast_ref().unwrap())
}
(PhysicalType::Binary, AvroSchema::Bytes(_)) => {
binary_required::<i32>(array.as_any().downcast_ref().unwrap())
}
(PhysicalType::LargeBinary, AvroSchema::Bytes(_)) => {
binary_required::<i64>(array.as_any().downcast_ref().unwrap())
}
(PhysicalType::FixedSizeBinary, AvroSchema::Fixed(_)) => {
fixed_size_binary_required(array.as_any().downcast_ref().unwrap())
}

(PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Union(_)) => {
let values = array
Expand Down Expand Up @@ -446,6 +475,7 @@ pub fn can_serialize(data_type: &DataType) -> bool {
match data_type.to_logical_type() {
List(inner) => return can_serialize(&inner.data_type),
LargeList(inner) => return can_serialize(&inner.data_type),
Struct(inner) => return inner.iter().all(|inner| can_serialize(&inner.data_type)),
_ => {}
};

Expand All @@ -454,8 +484,12 @@ pub fn can_serialize(data_type: &DataType) -> bool {
Boolean
| Int32
| Int64
| Float32
| Float64
| Decimal(_, _)
| Utf8
| Binary
| FixedSizeBinary(_)
| LargeUtf8
| LargeBinary
| Interval(IntervalUnit::MonthDayNano)
Expand Down
1 change: 0 additions & 1 deletion src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
}
State::OptionalDictionary(page_validity, page_values) => {
println!("optional_dict");
let dict_values = page_values.dict.values();
let dict_offsets = page_values.dict.offsets();

Expand Down
58 changes: 32 additions & 26 deletions tests/it/io/avro/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,45 +236,51 @@ fn read_without_codec() -> Result<()> {
test(Codec::Null)
}

#[cfg(feature = "io_avro_compression")]
#[test]
fn read_deflate() -> Result<()> {
test(Codec::Deflate)
}

#[cfg(feature = "io_avro_compression")]
#[test]
fn read_snappy() -> Result<()> {
test(Codec::Snappy)
}

fn test_projected(projection: Vec<bool>) -> Result<()> {
let avro = write_avro(Codec::Null).unwrap();
#[test]
fn test_projected() -> Result<()> {
let expected = data();
let expected = expected
.into_arrays()
.into_iter()
.zip(projection.iter())
.filter_map(|x| if *x.1 { Some(x.0) } else { None })
.collect();
let expected = Chunk::new(expected);
let (_, expected_schema) = schema();
let expected_fields = expected_schema
.fields
.into_iter()
.zip(projection.iter())
.filter_map(|x| if *x.1 { Some(x.0) } else { None })
.collect::<Vec<_>>();
let expected_schema = Schema::from(expected_fields);

let (result, schema) = read_avro(&avro, Some(projection))?;
let avro = write_avro(Codec::Null).unwrap();

assert_eq!(schema, expected_schema);
assert_eq!(result, expected);
Ok(())
}
for i in 0..expected_schema.fields.len() {
let mut projection = vec![false; expected_schema.fields.len()];
projection[i] = true;

#[test]
fn read_projected() -> Result<()> {
test_projected(vec![
true, false, false, false, false, false, false, false, false, false, false, false,
])
let expected = expected
.clone()
.into_arrays()
.into_iter()
.zip(projection.iter())
.filter_map(|x| if *x.1 { Some(x.0) } else { None })
.collect();
let expected = Chunk::new(expected);

let expected_fields = expected_schema
.clone()
.fields
.into_iter()
.zip(projection.iter())
.filter_map(|x| if *x.1 { Some(x.0) } else { None })
.collect::<Vec<_>>();
let expected_schema = Schema::from(expected_fields);

let (result, schema) = read_avro(&avro, Some(projection))?;

assert_eq!(schema, expected_schema);
assert_eq!(result, expected);
}
Ok(())
}
6 changes: 6 additions & 0 deletions tests/it/io/avro/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub(super) fn schema() -> Schema {
Field::new("date nullable", DataType::Date32, true),
Field::new("binary", DataType::Binary, false),
Field::new("binary nullable", DataType::Binary, true),
Field::new("fs binary", DataType::FixedSizeBinary(3), false),
Field::new("fs binary nullable", DataType::FixedSizeBinary(3), true),
Field::new("float32", DataType::Float32, false),
Field::new("float32 nullable", DataType::Float32, true),
Field::new("float64", DataType::Float64, false),
Expand Down Expand Up @@ -63,6 +65,8 @@ pub(super) fn data() -> Chunk<Box<dyn Array>> {
Box::new(Int32Array::from([Some(1), None]).to(DataType::Date32)),
Box::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])),
Box::new(BinaryArray::<i32>::from([Some(b"foo"), None])),
Box::new(FixedSizeBinaryArray::from_slice([[1, 2, 3], [1, 2, 3]])),
Box::new(FixedSizeBinaryArray::from([Some([1, 2, 3]), None])),
Box::new(PrimitiveArray::<f32>::from_slice([1.0, 2.0])),
Box::new(PrimitiveArray::<f32>::from([Some(1.0), None])),
Box::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])),
Expand Down Expand Up @@ -162,11 +166,13 @@ fn no_compression() -> Result<()> {
roundtrip(None)
}

#[cfg(feature = "io_avro_compression")]
#[test]
fn snappy() -> Result<()> {
roundtrip(Some(write::Compression::Snappy))
}

#[cfg(feature = "io_avro_compression")]
#[test]
fn deflate() -> Result<()> {
roundtrip(Some(write::Compression::Deflate))
Expand Down

0 comments on commit d0f0e20

Please sign in to comment.