This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Added support to write to Avro (#690)
jorgecarleitao authored Dec 22, 2021
1 parent 1ec0827 commit 45e72e9
Showing 23 changed files with 697 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -138,7 +138,7 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]
io_avro = ["avro-schema", "fallible-streaming-iterator", "serde_json"]
io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
io_avro_compression = [
"libflate",
"snap",
65 changes: 65 additions & 0 deletions examples/avro_write.rs
@@ -0,0 +1,65 @@
use std::fs::File;

use arrow2::{
array::{Array, Int32Array},
datatypes::{Field, Schema},
error::Result,
io::avro::write,
};

fn write_avro<W: std::io::Write>(
file: &mut W,
arrays: &[&dyn Array],
schema: &Schema,
compression: Option<write::Compression>,
) -> Result<()> {
let avro_fields = write::to_avro_schema(schema)?;

let mut serializers = arrays
.iter()
.zip(avro_fields.iter())
.map(|(array, field)| write::new_serializer(*array, &field.schema))
.collect::<Vec<_>>();
let mut block = write::Block::new(arrays[0].len(), vec![]);

write::serialize(&mut serializers, &mut block)?;

let mut compressed_block = write::CompressedBlock::default();

if let Some(compression) = compression {
write::compress(&block, &mut compressed_block, compression)?;
} else {
compressed_block.number_of_rows = block.number_of_rows;
std::mem::swap(&mut compressed_block.data, &mut block.data);
}

write::write_metadata(file, avro_fields.clone(), compression)?;

write::write_block(file, &compressed_block)?;

Ok(())
}

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();

let path = &args[1];

let array = Int32Array::from(&[
Some(0),
Some(1),
Some(2),
Some(3),
Some(4),
Some(5),
Some(6),
]);
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::new(vec![field]);

let mut file = File::create(path)?;
write_avro(&mut file, &[(&array) as &dyn Array], &schema, None)?;

Ok(())
}
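As a quick sanity check (not part of this commit), the file written above can be read back with the crate's own `io::avro::read::read_metadata`, whose signature appears further down in this diff. A minimal sketch; the helper name `inspect` is hypothetical:

```rust
use std::fs::File;

use arrow2::{error::Result, io::avro::read};

// Hypothetical helper: read back the metadata written by `write_avro` above.
fn inspect(path: &str) -> Result<()> {
    let mut file = File::open(path)?;
    // (avro schemas, arrow schema, optional compression, 16-byte sync marker)
    let (_avro_schemas, schema, compression, _marker) = read::read_metadata(&mut file)?;
    println!("arrow schema: {:?}", schema);
    println!("compression: {:?}", compression);
    Ok(())
}
```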
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
@@ -17,3 +17,4 @@
- [Read Arrow stream](./io/ipc_stream_read.md)
- [Write Arrow](./io/ipc_write.md)
- [Read Avro](./io/avro_read.md)
+- [Write Avro](./io/avro_write.md)
8 changes: 8 additions & 0 deletions guide/src/io/avro_write.md
@@ -0,0 +1,8 @@
# Avro write

You can use this crate to write to Apache Avro.
Below is an example, which you can run when this crate is compiled with the `io_avro` feature.

```rust
{{#include ../../../examples/avro_write.rs}}
```
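For reference (not stated in the commit), the example presumably runs with a command along the lines of `cargo run --example avro_write --features io_avro -- /tmp/test.avro`, where the trailing argument is the output path read by the example's `main`.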
11 changes: 10 additions & 1 deletion src/io/avro/mod.rs
@@ -6,14 +6,23 @@ pub mod read;
#[cfg(feature = "io_avro_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod read_async;
+pub mod write;

+/// Valid compressions
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub enum Compression {
+    /// Deflate
+    Deflate,
+    /// Snappy
+    Snappy,
+}

// macros that can operate in sync and async code.
macro_rules! avro_decode {
($reader:ident $($_await:tt)*) => {
{
let mut i = 0u64;
let mut buf = [0u8; 1];

let mut j = 0;
loop {
if j > 9 {
2 changes: 1 addition & 1 deletion src/io/avro/read/block.rs
@@ -24,7 +24,7 @@ fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
Ok((rows as usize, bytes as usize))
}

-/// Reads a block from the file into `buf`.
+/// Reads a block from the `reader` into `buf`.
/// # Panic
/// Panics iff the block marker does not equal the file's marker
fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
11 changes: 2 additions & 9 deletions src/io/avro/read/mod.rs
@@ -24,22 +24,15 @@ use crate::datatypes::Schema;
use crate::error::Result;
use crate::record_batch::RecordBatch;

-/// Valid compressions
-#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
-pub enum Compression {
-    /// Deflate
-    Deflate,
-    /// Snappy
-    Snappy,
-}
+use super::Compression;

/// Reads the avro metadata from `reader` into a [`Schema`], [`Compression`] and magic marker.
#[allow(clippy::type_complexity)]
pub fn read_metadata<R: std::io::Read>(
reader: &mut R,
) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
let (avro_schema, codec, marker) = util::read_schema(reader)?;
-    let schema = schema::convert_schema(&avro_schema)?;
+    let schema = convert_schema(&avro_schema)?;

let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
fields.into_iter().map(|x| x.schema).collect()
22 changes: 13 additions & 9 deletions src/io/avro/read/schema.rs
@@ -56,6 +56,7 @@ fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
props
}

/// Maps an Avro Schema into a [`Schema`].
pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
let mut schema_fields = vec![];
match schema {
@@ -65,22 +66,25 @@ pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
&field.schema,
Some(&field.name),
false,
Some(&external_props(&field.schema)),
Some(external_props(&field.schema)),
)?)
}
}
-        schema => schema_fields.push(schema_to_field(schema, Some(""), false, None)?),
-    }
-
-    let schema = Schema::new(schema_fields);
-    Ok(schema)
+        other => {
+            return Err(ArrowError::OutOfSpec(format!(
+                "An avro Schema must be of type Record - it is of type {:?}",
+                other
+            )))
+        }
+    };
+    Ok(Schema::new(schema_fields))
}

fn schema_to_field(
schema: &AvroSchema,
name: Option<&str>,
mut nullable: bool,
props: Option<&BTreeMap<String, String>>,
props: Option<BTreeMap<String, String>>,
) -> Result<Field> {
let data_type = match schema {
AvroSchema::Null => DataType::Null,
@@ -169,7 +173,7 @@ fn schema_to_field(
&field.schema,
Some(&format!("{}.{}", name, field.name)),
false,
Some(&props),
Some(props),
)
})
.collect();
@@ -198,6 +202,6 @@ fn schema_to_field(
let name = name.unwrap_or_default();

let mut field = Field::new(name, data_type, nullable);
-    field.set_metadata(props.cloned());
+    field.set_metadata(props);
Ok(field)
}
2 changes: 1 addition & 1 deletion src/io/avro/read_async/metadata.rs
@@ -10,7 +10,7 @@ use crate::error::{ArrowError, Result};

use super::super::read::convert_schema;
use super::super::read::deserialize_header;
-use super::super::read::Compression;
+use super::super::Compression;
use super::super::{read_header, read_metadata};
use super::utils::zigzag_i64;

69 changes: 69 additions & 0 deletions src/io/avro/write/block.rs
@@ -0,0 +1,69 @@
use std::io::Write;

use crate::{error::Result, io::avro::Compression};

use super::{util::zigzag_encode, SYNC_NUMBER};

/// A compressed Avro block.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CompressedBlock {
/// The number of rows
pub number_of_rows: usize,
/// The compressed data
pub data: Vec<u8>,
}

impl CompressedBlock {
/// Creates a new CompressedBlock
pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
Self {
number_of_rows,
data,
}
}
}

/// An uncompressed Avro block.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Block {
/// The number of rows
pub number_of_rows: usize,
/// The uncompressed data
pub data: Vec<u8>,
}

impl Block {
/// Creates a new Block
pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
Self {
number_of_rows,
data,
}
}
}

/// Writes a [`CompressedBlock`] to `writer`
pub fn write_block<W: Write>(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()> {
// write size and rows
zigzag_encode(compressed_block.number_of_rows as i64, writer)?;
zigzag_encode(compressed_block.data.len() as i64, writer)?;

writer.write_all(&compressed_block.data)?;

writer.write_all(&SYNC_NUMBER)?;

Ok(())
}

/// Compresses a [`Block`] into a [`CompressedBlock`].
pub fn compress(
block: &Block,
compressed_block: &mut CompressedBlock,
compression: Compression,
) -> Result<()> {
compressed_block.number_of_rows = block.number_of_rows;
match compression {
Compression::Deflate => todo!(),
Compression::Snappy => todo!(),
}
}
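Both compression arms are left as `todo!()` in this commit. For illustration only, here is a sketch of how the `Deflate` arm might be filled in with the `libflate` crate, which `Cargo.toml` already lists under the `io_avro_compression` feature; the helper name and its placement are assumptions, not part of the commit:

```rust
use std::io::Write;

use libflate::deflate::Encoder;

// Sketch only: deflate-compress `block.data` into `compressed_block.data`.
fn deflate(block: &Block, compressed_block: &mut CompressedBlock) -> std::io::Result<()> {
    compressed_block.number_of_rows = block.number_of_rows;
    // reuse the existing allocation as the encoder's sink
    compressed_block.data.clear();
    let mut encoder = Encoder::new(std::mem::take(&mut compressed_block.data));
    encoder.write_all(&block.data)?;
    compressed_block.data = encoder.finish().into_result()?;
    Ok(())
}
```

The `Snappy` arm would additionally need to append the 4-byte, big-endian CRC32 of the uncompressed data, as the Avro specification requires for that codec.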
30 changes: 30 additions & 0 deletions src/io/avro/write/header.rs
@@ -0,0 +1,30 @@
use std::collections::HashMap;

use avro_schema::Schema;
use serde_json;

use crate::error::{ArrowError, Result};

use super::Compression;

/// Serializes a [`Schema`] and an optional [`Compression`] into an Avro header.
pub(crate) fn serialize_header(
schema: &Schema,
compression: Option<Compression>,
) -> Result<HashMap<String, Vec<u8>>> {
let schema =
serde_json::to_string(schema).map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;

let mut header = HashMap::<String, Vec<u8>>::default();

header.insert("avro.schema".to_string(), schema.into_bytes());
if let Some(compression) = compression {
let value = match compression {
Compression::Snappy => b"snappy".to_vec(),
Compression::Deflate => b"deflate".to_vec(),
};
header.insert("avro.codec".to_string(), value);
};

Ok(header)
}
69 changes: 69 additions & 0 deletions src/io/avro/write/mod.rs
@@ -0,0 +1,69 @@
//! APIs to write to Avro format.
use std::io::Write;

use avro_schema::{Field as AvroField, Record, Schema as AvroSchema};

use crate::error::Result;

pub use super::Compression;

mod header;
use header::serialize_header;
mod schema;
pub use schema::to_avro_schema;
mod serialize;
pub use serialize::{can_serialize, new_serializer, BoxSerializer};
mod block;
pub use block::*;
mod util;

const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];

/// Writes Avro's metadata to `writer`.
pub fn write_metadata<W: std::io::Write>(
writer: &mut W,
fields: Vec<AvroField>,
compression: Option<Compression>,
) -> Result<()> {
// * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
let avro_magic = [b'O', b'b', b'j', 1u8];
writer.write_all(&avro_magic)?;

// * file metadata, including the schema.
let schema = AvroSchema::Record(Record::new("", fields));
let header = serialize_header(&schema, compression)?;

util::zigzag_encode(header.len() as i64, writer)?;
for (name, item) in header {
util::write_binary(name.as_bytes(), writer)?;
util::write_binary(&item, writer)?;
}
writer.write_all(&[0])?;

// The 16-byte, randomly-generated sync marker for this file.
writer.write_all(&SYNC_NUMBER)?;

Ok(())
}

/// Consumes a set of [`BoxSerializer`] into a [`Block`].
/// # Panics
/// Panics iff the number of items in any of the serializers is not equal to the number of rows
/// declared in the `block`.
pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) -> Result<()> {
let Block {
data,
number_of_rows,
} = block;

data.clear(); // restart it

// _the_ transpose (columns -> rows)
for _ in 0..*number_of_rows {
for serializer in &mut *serializers {
let item_data = serializer.next().unwrap();
data.write_all(item_data)?;
}
}
Ok(())
}
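The `util` module used by `write_metadata` above is not part of the rendered diff. For reference, Avro encodes lengths and counts as zigzag-encoded variable-length integers; below is a minimal sketch of what `zigzag_encode` presumably does, with the name taken from the calls above and the implementation assumed from the Avro specification:

```rust
use std::io::Write;

// Sketch: zigzag + varint encoding of an `i64`, per the Avro specification.
fn zigzag_encode<W: Write>(n: i64, writer: &mut W) -> std::io::Result<()> {
    // zigzag interleaves negatives and positives (0, -1, 1, -2, ... -> 0, 1, 2, 3, ...)
    let mut z = ((n << 1) ^ (n >> 63)) as u64;
    loop {
        // emit the low 7 bits; the high bit signals that another byte follows
        let mut byte = (z & 0x7F) as u8;
        z >>= 7;
        if z != 0 {
            byte |= 0x80;
        }
        writer.write_all(&[byte])?;
        if z == 0 {
            return Ok(());
        }
    }
}
```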
(The remaining changed files in this commit are not shown.)
