Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to write Avro async #736

Merged
merged 1 commit into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/io/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ pub mod read;
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod read_async;
pub mod write;
#[cfg(feature = "io_avro_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod write_async;

/// Valid compressions
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
Expand Down
36 changes: 23 additions & 13 deletions src/io/avro/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,35 @@ use crate::error::Result;
pub use super::Compression;

mod header;
use header::serialize_header;
pub(super) use header::serialize_header;
mod schema;
pub use schema::to_avro_schema;
mod serialize;
pub use serialize::{can_serialize, new_serializer, BoxSerializer};
mod block;
pub use block::*;
mod compress;
mod util;
pub(super) mod util;
pub use compress::compress;

pub use super::{Block, CompressedBlock};

const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
pub(super) const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
// * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
pub(super) const AVRO_MAGIC: [u8; 4] = [b'O', b'b', b'j', 1u8];

/// Writes Avro's metadata to `writer`.
pub fn write_metadata<W: std::io::Write>(
writer: &mut W,
fields: Vec<AvroField>,
compression: Option<Compression>,
) -> Result<()> {
// * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
let avro_magic = [b'O', b'b', b'j', 1u8];
writer.write_all(&avro_magic)?;
writer.write_all(&AVRO_MAGIC)?;

// * file metadata, including the schema.
let schema = AvroSchema::Record(Record::new("", fields));
let header = serialize_header(&schema, compression)?;

util::zigzag_encode(header.len() as i64, writer)?;
for (name, item) in header {
util::write_binary(name.as_bytes(), writer)?;
util::write_binary(&item, writer)?;
}
writer.write_all(&[0])?;
write_schema(writer, &schema, compression)?;

// The 16-byte, randomly-generated sync marker for this file.
writer.write_all(&SYNC_NUMBER)?;
Expand Down Expand Up @@ -68,3 +62,19 @@ pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) {
}
}
}

/// Encodes the Avro file header into `writer`.
///
/// The header is serialized as an Avro map: a zigzag-encoded entry count,
/// followed by length-prefixed `(key, value)` pairs, terminated by a zero byte.
pub(super) fn write_schema<W: std::io::Write>(
    writer: &mut W,
    schema: &AvroSchema,
    compression: Option<Compression>,
) -> Result<()> {
    let metadata = serialize_header(schema, compression)?;

    // number of entries in the metadata map
    util::zigzag_encode(metadata.len() as i64, writer)?;
    metadata.into_iter().try_for_each(|(key, value)| {
        util::write_binary(key.as_bytes(), writer)?;
        util::write_binary(&value, writer)
    })?;
    // a zero byte closes the map
    writer.write_all(&[0])?;
    Ok(())
}
26 changes: 26 additions & 0 deletions src/io/avro/write_async/block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use futures::{AsyncWrite, AsyncWriteExt};

use crate::error::Result;

use super::super::write::{util::zigzag_encode, SYNC_NUMBER};
use super::super::CompressedBlock;

/// Writes a [`CompressedBlock`] to `writer`
pub async fn write_block<W>(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()>
where
    W: AsyncWrite + Unpin,
{
    // The block prefix is two zigzag-encoded varints (row count, then payload
    // length in bytes). `zigzag_encode` needs a sync `Write` target, so both
    // are staged in a small in-memory buffer and flushed in one await.
    let mut prefix = Vec::with_capacity(10);
    zigzag_encode(compressed_block.number_of_rows as i64, &mut prefix)?;
    zigzag_encode(compressed_block.data.len() as i64, &mut prefix)?;
    writer.write_all(&prefix).await?;

    // the (possibly compressed) payload, then the file's 16-byte sync marker
    writer.write_all(&compressed_block.data).await?;
    writer.write_all(&SYNC_NUMBER).await?;

    Ok(())
}
38 changes: 38 additions & 0 deletions src/io/avro/write_async/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//! Async write Avro
mod block;
pub use block::write_block;

use avro_schema::{Field as AvroField, Record, Schema as AvroSchema};
use futures::{AsyncWrite, AsyncWriteExt};

use crate::error::Result;

use super::{
write::{write_schema, AVRO_MAGIC, SYNC_NUMBER},
Compression,
};

/// Writes Avro's metadata to `writer`.
pub async fn write_metadata<W>(
    writer: &mut W,
    fields: Vec<AvroField>,
    compression: Option<Compression>,
) -> Result<()>
where
    W: AsyncWrite + Unpin,
{
    // magic bytes: ASCII 'O', 'b', 'j' followed by the version byte 1
    writer.write_all(&AVRO_MAGIC).await?;

    // File metadata (including the schema) is produced with the shared sync
    // encoder into an in-memory buffer, then flushed asynchronously.
    let record = AvroSchema::Record(Record::new("", fields));
    let mut header = vec![];
    write_schema(&mut header, &record, compression)?;
    writer.write_all(&header).await?;

    // The 16-byte sync marker that closes the header.
    writer.write_all(&SYNC_NUMBER).await?;

    Ok(())
}
2 changes: 2 additions & 0 deletions tests/it/io/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ mod read;
#[cfg(feature = "io_avro_async")]
mod read_async;
mod write;
#[cfg(feature = "io_avro_async")]
mod write_async;
27 changes: 19 additions & 8 deletions tests/it/io/avro/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use arrow2::array::*;
use arrow2::chunk::Chunk;
use arrow2::datatypes::*;
use arrow2::error::Result;
use arrow2::io::avro::write;
use arrow2::io::avro::{write, CompressedBlock};
use arrow2::types::months_days_ns;

fn schema() -> Schema {
use super::read::read_avro;

pub(super) fn schema() -> Schema {
Schema::from(vec![
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Utf8, false),
Expand All @@ -21,7 +23,7 @@ fn schema() -> Schema {
])
}

fn data() -> Chunk<Arc<dyn Array>> {
pub(super) fn data() -> Chunk<Arc<dyn Array>> {
let columns = vec![
Arc::new(Int64Array::from_slice([27, 47])) as Arc<dyn Array>,
Arc::new(Utf8Array::<i32>::from_slice(["foo", "bar"])) as Arc<dyn Array>,
Expand All @@ -40,13 +42,11 @@ fn data() -> Chunk<Arc<dyn Array>> {
Chunk::try_new(columns).unwrap()
}

use super::read::read_avro;

fn write_avro<R: AsRef<dyn Array>>(
pub(super) fn serialize_to_block<R: AsRef<dyn Array>>(
columns: &Chunk<R>,
schema: &Schema,
compression: Option<write::Compression>,
) -> Result<Vec<u8>> {
) -> Result<CompressedBlock> {
let avro_fields = write::to_avro_schema(schema)?;

let mut serializers = columns
Expand All @@ -64,9 +64,20 @@ fn write_avro<R: AsRef<dyn Array>>(

write::compress(&mut block, &mut compressed_block, compression)?;

Ok(compressed_block)
}

fn write_avro<R: AsRef<dyn Array>>(
columns: &Chunk<R>,
schema: &Schema,
compression: Option<write::Compression>,
) -> Result<Vec<u8>> {
let compressed_block = serialize_to_block(columns, schema, compression)?;

let avro_fields = write::to_avro_schema(schema)?;
let mut file = vec![];

write::write_metadata(&mut file, avro_fields.clone(), compression)?;
write::write_metadata(&mut file, avro_fields, compression)?;

write::write_block(&mut file, &compressed_block)?;

Expand Down
47 changes: 47 additions & 0 deletions tests/it/io/avro/write_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use arrow2::array::*;
use arrow2::chunk::Chunk;
use arrow2::datatypes::*;
use arrow2::error::Result;
use arrow2::io::avro::write;
use arrow2::io::avro::write_async;

use super::read::read_avro;
use super::write::{data, schema, serialize_to_block};

/// Serializes `columns` into an in-memory Avro file using the async writer.
///
/// Mirrors the sync test helper: the CPU-heavy serialization/compression is
/// done synchronously (usually on a different thread pool), and only the
/// writes are awaited.
async fn write_avro<R: AsRef<dyn Array>>(
    columns: &Chunk<R>,
    schema: &Schema,
    compression: Option<write::Compression>,
) -> Result<Vec<u8>> {
    // usually done on a different thread pool
    let compressed_block = serialize_to_block(columns, schema, compression)?;

    let avro_fields = write::to_avro_schema(schema)?;
    let mut file = vec![];

    // `avro_fields` is moved, not cloned: it is not used afterwards
    // (consistent with the sync `write_avro` helper).
    write_async::write_metadata(&mut file, avro_fields, compression).await?;

    write_async::write_block(&mut file, &compressed_block).await?;

    Ok(file)
}

/// Writes `data()` with the async writer, reads it back with the sync reader,
/// and checks that schema and every column survive the round trip.
async fn roundtrip(compression: Option<write::Compression>) -> Result<()> {
    let expected = data();
    let expected_schema = schema();

    let file = write_avro(&expected, &expected_schema, compression).await?;
    let (result, read_schema) = read_avro(&file)?;

    assert_eq!(expected_schema, read_schema);
    result
        .columns()
        .iter()
        .zip(expected.columns().iter())
        .for_each(|(actual, expected)| assert_eq!(actual.as_ref(), expected.as_ref()));
    Ok(())
}

// Round-trips an uncompressed Avro file through the async writer.
#[tokio::test]
async fn no_compression() -> Result<()> {
    roundtrip(None).await
}