Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Optimized write nulls to Avro (#1119)
Browse files (browse the repository at this point in the history)
  • Loading branch information
jorgecarleitao authored Jun 28, 2022
1 parent bd85904 commit 29de18e
Showing 1 changed file with 39 additions and 13 deletions.
52 changes: 39 additions & 13 deletions src/io/avro/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use crate::{array::*, datatypes::DataType};
use super::super::super::iterator::*;
use super::util;

// Zigzag representation of false and true respectively, i.e. of the integers
// 0 and 1: zigzag maps 0 -> 0 and 1 -> 2, and each fits in a single byte.
// One of these bytes is pushed in front of every optional value to mark
// whether a value follows (`IS_VALID`) or the slot is null (`IS_NULL`), which
// avoids a call to `util::zigzag_encode` for the validity marker.
const IS_NULL: u8 = 0;
const IS_VALID: u8 = 2;

/// A type alias for a boxed [`StreamingIterator`], used to write arrays into avro rows
/// (i.e. a column -> row transposition of types known at run-time).
///
/// Every item yielded by the iterator is a byte slice (`[u8]`). The boxed
/// iterator may borrow data for the lifetime `'a` and is required to be
/// `Send + Sync`.
pub type BoxSerializer<'a> = Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>;
Expand All @@ -27,10 +31,12 @@ fn utf8_optional<O: Offset>(array: &Utf8Array<O>) -> BoxSerializer {
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.push(IS_VALID);
util::zigzag_encode(x.len() as i64, buf).unwrap();
buf.extend_from_slice(x.as_bytes());
} else {
buf.push(IS_NULL);
}
},
vec![],
Expand All @@ -52,10 +58,12 @@ fn binary_optional<O: Offset>(array: &BinaryArray<O>) -> BoxSerializer {
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.push(IS_VALID);
util::zigzag_encode(x.len() as i64, buf).unwrap();
buf.extend_from_slice(x);
} else {
buf.push(IS_NULL);
}
},
vec![],
Expand All @@ -76,9 +84,11 @@ fn fixed_size_binary_optional(array: &FixedSizeBinaryArray) -> BoxSerializer {
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.push(IS_VALID);
buf.extend_from_slice(x);
} else {
buf.push(IS_NULL);
}
},
vec![],
Expand Down Expand Up @@ -121,8 +131,8 @@ fn list_optional<'a, O: Offset>(array: &'a ListArray<O>, schema: &AvroSchema) ->
Box::new(BufStreamingIterator::new(
lengths,
move |length, buf| {
util::zigzag_encode(length.is_some() as i64, buf).unwrap();
if let Some(length) = length {
buf.push(IS_VALID);
util::zigzag_encode(length, buf).unwrap();
let mut rows = 0;
while let Some(item) = inner.next() {
Expand All @@ -133,6 +143,8 @@ fn list_optional<'a, O: Offset>(array: &'a ListArray<O>, schema: &AvroSchema) ->
break;
}
}
} else {
buf.push(IS_NULL);
}
},
vec![],
Expand Down Expand Up @@ -173,12 +185,13 @@ fn struct_optional<'a>(array: &'a StructArray, schema: &Record) -> BoxSerializer
Box::new(BufStreamingIterator::new(
iterator,
move |maybe, buf| {
util::zigzag_encode(maybe.is_some() as i64, buf).unwrap();
if maybe.is_some() {
buf.push(IS_VALID);
inner
.iter_mut()
.for_each(|item| buf.extend_from_slice(item.next().unwrap()))
} else {
buf.push(IS_NULL);
// skip the item
inner.iter_mut().for_each(|item| {
let _ = item.next().unwrap();
Expand Down Expand Up @@ -215,9 +228,10 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.push(x as u8);
buf.extend_from_slice(&[IS_VALID, x as u8]);
} else {
buf.push(IS_NULL);
}
},
vec![],
Expand Down Expand Up @@ -262,9 +276,11 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.push(IS_VALID);
util::zigzag_encode(*x as i64, buf).unwrap();
} else {
buf.push(IS_NULL);
}
},
vec![],
Expand All @@ -291,9 +307,11 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.push(IS_VALID);
util::zigzag_encode(*x, buf).unwrap();
} else {
buf.push(IS_NULL);
}
},
vec![],
Expand All @@ -320,9 +338,11 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.push(IS_VALID);
buf.extend(x.to_le_bytes())
} else {
buf.push(IS_NULL);
}
},
vec![],
Expand All @@ -349,9 +369,11 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.push(IS_VALID);
buf.extend(x.to_le_bytes())
} else {
buf.push(IS_NULL);
}
},
vec![],
Expand Down Expand Up @@ -393,12 +415,14 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.push(IS_VALID);
let len =
((x.leading_zeros() / 8) - ((x.leading_zeros() / 8) % 2)) as usize;
util::zigzag_encode((16 - len) as i64, buf).unwrap();
buf.extend_from_slice(&x.to_be_bytes()[len..]);
} else {
buf.push(IS_NULL);
}
},
vec![],
Expand All @@ -423,9 +447,11 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.push(IS_VALID);
interval_write(x, buf)
} else {
buf.push(IS_NULL);
}
},
vec![],
Expand Down

0 comments on commit 29de18e

Please sign in to comment.