From 3d8750ba526b17d9f6af167fe8fbb4f0380f2187 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Fri, 17 Dec 2021 16:46:50 +0000
Subject: [PATCH 1/5] Added Avro metadata write
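
The block and header framing below leans on the variable-length zigzag
integers added in `src/io/avro/write/util.rs`. For reference, a standalone,
runnable sketch of that encoding (illustrative only; it mirrors the logic in
the diff below, it is not part of the patch itself):

```rust
/// Zigzag + varint encoding as used by Avro (see util.rs in this patch).
fn zigzag_encode(n: i64, out: &mut Vec<u8>) {
    // zigzag maps small magnitudes to small unsigned values: 0, -1, 1, -2 -> 0, 1, 2, 3
    let mut z = ((n << 1) ^ (n >> 63)) as u64;
    // varint: 7 bits per byte, high bit set while more bytes follow
    loop {
        if z <= 0x7F {
            out.push((z & 0x7F) as u8);
            break;
        } else {
            out.push((0x80 | (z & 0x7F)) as u8);
            z >>= 7;
        }
    }
}

fn main() {
    let mut buf = vec![];
    zigzag_encode(-2, &mut buf);
    assert_eq!(buf, [3]); // -2 zigzags to 3, which fits in a single byte
}
```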
---
 examples/avro_write.rs             |  54 ++++++++++
 src/io/avro/mod.rs                 |  11 +-
 src/io/avro/read/block.rs          |   2 +-
 src/io/avro/read/mod.rs            |   9 +-
 src/io/avro/read/schema.rs         |  22 ++--
 src/io/avro/read_async/metadata.rs |   2 +-
 src/io/avro/write/block.rs         |  70 +++++++++++++
 src/io/avro/write/header.rs        |  30 ++++++
 src/io/avro/write/mod.rs           |  68 ++++++++++++
 src/io/avro/write/schema.rs        |  55 ++++++++++
 src/io/avro/write/serialize.rs     | 163 +++++++++++++++++++++++++++++
 src/io/avro/write/util.rs          |  26 +++++
 src/io/csv/write/mod.rs            |   3 +-
 src/io/csv/write/serialize.rs      |   2 +-
 src/io/{csv/write => }/iterator.rs |   0
 src/io/mod.rs                      |   3 +
 16 files changed, 497 insertions(+), 23 deletions(-)
 create mode 100644 examples/avro_write.rs
 create mode 100644 src/io/avro/write/block.rs
 create mode 100644 src/io/avro/write/header.rs
 create mode 100644 src/io/avro/write/mod.rs
 create mode 100644 src/io/avro/write/schema.rs
 create mode 100644 src/io/avro/write/serialize.rs
 create mode 100644 src/io/avro/write/util.rs
 rename src/io/{csv/write => }/iterator.rs (100%)

diff --git a/examples/avro_write.rs b/examples/avro_write.rs
new file mode 100644
index 00000000000..c83e3d85b3d
--- /dev/null
+++ b/examples/avro_write.rs
@@ -0,0 +1,54 @@
+use std::{fs::File, sync::Arc};
+
+use arrow2::{
+    array::{Array, Int32Array},
+    datatypes::{Field, Schema},
+    error::Result,
+    io::avro::write,
+    record_batch::RecordBatch,
+};
+
+fn main() -> Result<()> {
+    use std::env;
+    let args: Vec<String> = env::args().collect();
+
+    let path = &args[1];
+
+    let array = Int32Array::from(&[
+        Some(0),
+        Some(1),
+        Some(2),
+        Some(3),
+        Some(4),
+        Some(5),
+        Some(6),
+    ]);
+    let field = Field::new("c1", array.data_type().clone(), true);
+    let schema = Schema::new(vec![field]);
+
+    let avro_schema = write::to_avro_schema(&schema)?;
+
+    let mut file = File::create(path)?;
+
+    let compression = None;
+
+    write::write_metadata(&mut file, &avro_schema, compression)?;
+
+    let serializer = write::new_serializer(&array, avro_schema.fields()[0]);
+    let mut block = write::Block::new(array.len(), vec![]);
+
+    write::serialize(&mut vec![serializer], &mut block)?;
+
+    let mut compressed_block = write::CompressedBlock::default();
+
+    if let Some(compression) = compression {
+        write::compress(&block, &mut compressed_block, compression)?;
+    } else {
+        compressed_block.number_of_rows = block.number_of_rows;
+        std::mem::swap(&mut compressed_block.data, &mut block.data);
+    }
+
+    write::write_block(&mut file, &compressed_block)?;
+
+    Ok(())
+}
diff --git a/src/io/avro/mod.rs b/src/io/avro/mod.rs
index d5a69ab3557..0dc4b8909e6 100644
--- a/src/io/avro/mod.rs
+++ b/src/io/avro/mod.rs
@@ -6,6 +6,16 @@ pub mod read;
 #[cfg(feature = "io_avro_async")]
 #[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
 pub mod read_async;
+pub mod write;
+
+/// Valid compressions
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub enum Compression {
+    /// Deflate
+    Deflate,
+    /// Snappy
+    Snappy,
+}
 
 // macros that can operate in sync and async code.
 macro_rules! avro_decode {
@@ -13,7 +23,6 @@ macro_rules! avro_decode {
         {
             let mut i = 0u64;
             let mut buf = [0u8; 1];
-
             let mut j = 0;
             loop {
                 if j > 9 {
diff --git a/src/io/avro/read/block.rs b/src/io/avro/read/block.rs
index 69bc0e3ffb6..40f30ac3d06 100644
--- a/src/io/avro/read/block.rs
+++ b/src/io/avro/read/block.rs
@@ -24,7 +24,7 @@ fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
     Ok((rows as usize, bytes as usize))
 }
 
-/// Reads a block from the file into `buf`.
+/// Reads a block from the `reader` into `buf`.
 /// # Panic
 /// Panics iff the block marker does not equal to the file's marker
 fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs
index fea56e8e030..6f33fa70a71 100644
--- a/src/io/avro/read/mod.rs
+++ b/src/io/avro/read/mod.rs
@@ -24,14 +24,7 @@ use crate::datatypes::Schema;
 use crate::error::Result;
 use crate::record_batch::RecordBatch;
 
-/// Valid compressions
-#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
-pub enum Compression {
-    /// Deflate
-    Deflate,
-    /// Snappy
-    Snappy,
-}
+use super::Compression;
 
 /// Reads the avro metadata from `reader` into a [`Schema`], [`Compression`] and magic marker.
 #[allow(clippy::type_complexity)]
diff --git a/src/io/avro/read/schema.rs b/src/io/avro/read/schema.rs
index 7ab286f244a..420187988ef 100644
--- a/src/io/avro/read/schema.rs
+++ b/src/io/avro/read/schema.rs
@@ -56,6 +56,7 @@ fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
     props
 }
 
+/// Maps an Avro Schema into a [`Schema`].
 pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
     let mut schema_fields = vec![];
     match schema {
@@ -65,22 +66,25 @@ pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
                 &field.schema,
                 Some(&field.name),
                 false,
-                Some(&external_props(&field.schema)),
+                Some(external_props(&field.schema)),
             )?)
             }
         }
-        schema => schema_fields.push(schema_to_field(schema, Some(""), false, None)?),
-    }
-
-    let schema = Schema::new(schema_fields);
-    Ok(schema)
+        other => {
+            return Err(ArrowError::OutOfSpec(format!(
+                "An avro Schema must be of type Record - it is of type {:?}",
+                other
+            )))
+        }
+    };
+    Ok(Schema::new(schema_fields))
 }
 
 fn schema_to_field(
     schema: &AvroSchema,
     name: Option<&str>,
     mut nullable: bool,
-    props: Option<&BTreeMap<String, String>>,
+    props: Option<BTreeMap<String, String>>,
 ) -> Result<Field> {
     let data_type = match schema {
         AvroSchema::Null => DataType::Null,
@@ -169,7 +173,7 @@ fn schema_to_field(
                         &field.schema,
                         Some(&format!("{}.{}", name, field.name)),
                         false,
-                        Some(&props),
+                        Some(props),
                     )
                 })
                 .collect();
@@ -198,6 +202,6 @@ fn schema_to_field(
     let name = name.unwrap_or_default();
 
     let mut field = Field::new(name, data_type, nullable);
-    field.set_metadata(props.cloned());
+    field.set_metadata(props);
     Ok(field)
 }
diff --git a/src/io/avro/read_async/metadata.rs b/src/io/avro/read_async/metadata.rs
index 3040799aa5d..1fb3526d91e 100644
--- a/src/io/avro/read_async/metadata.rs
+++ b/src/io/avro/read_async/metadata.rs
@@ -10,7 +10,7 @@ use crate::error::{ArrowError, Result};
 
 use super::super::read::convert_schema;
 use super::super::read::deserialize_header;
-use super::super::read::Compression;
+use super::super::Compression;
 use super::super::{read_header, read_metadata};
 use super::utils::zigzag_i64;
 
diff --git a/src/io/avro/write/block.rs b/src/io/avro/write/block.rs
new file mode 100644
index 00000000000..9251ca24448
--- /dev/null
+++ b/src/io/avro/write/block.rs
@@ -0,0 +1,70 @@
+use std::io::Write;
+
+use crate::{error::Result, io::avro::Compression};
+
+use super::{util::zigzag_encode, SYNC_NUMBER};
+
+/// A compressed Avro block.
+#[derive(Debug, Clone, Default, PartialEq)]
+pub struct CompressedBlock {
+    /// The number of rows
+    pub number_of_rows: usize,
+    /// The compressed data
+    pub data: Vec<u8>,
+}
+
+impl CompressedBlock {
+    /// Creates a new CompressedBlock
+    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
+        Self {
+            number_of_rows,
+            data,
+        }
+    }
+}
+
+/// An uncompressed Avro block.
+#[derive(Debug, Clone, Default, PartialEq)]
+pub struct Block {
+    /// The number of rows
+    pub number_of_rows: usize,
+    /// The uncompressed data
+    pub data: Vec<u8>,
+}
+
+impl Block {
+    /// Creates a new Block
+    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
+        Self {
+            number_of_rows,
+            data,
+        }
+    }
+}
+
+/// Writes a [`CompressedBlock`] to `writer`.
+pub fn write_block<W: Write>(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()> {
+    // write size and rows
+    zigzag_encode(compressed_block.number_of_rows as i64, writer)?;
+    zigzag_encode(compressed_block.data.len() as i64, writer)?;
+
+    writer.write_all(&compressed_block.data)?;
+
+    writer.write_all(&SYNC_NUMBER)?;
+
+    Ok(())
+}
+
+/// Compresses a [`Block`] into a [`CompressedBlock`].
+pub fn compress(
+    block: &Block,
+    compressed_block: &mut CompressedBlock,
+    compression: Compression,
+) -> Result<()> {
+    match compression {
+        Compression::Deflate => todo!(),
+        Compression::Snappy => todo!(),
+    }
+    compressed_block.number_of_rows = block.number_of_rows;
+    Ok(())
+}
diff --git a/src/io/avro/write/header.rs b/src/io/avro/write/header.rs
new file mode 100644
index 00000000000..8d21cc0ad63
--- /dev/null
+++ b/src/io/avro/write/header.rs
@@ -0,0 +1,30 @@
+use std::collections::HashMap;
+
+use avro_schema::Schema;
+use serde_json;
+
+use crate::error::{ArrowError, Result};
+
+use super::Compression;
+
+/// Serializes a [`Schema`] and an optional [`Compression`] into an avro header.
+pub(crate) fn serialize_header(
+    schema: &Schema,
+    compression: Option<Compression>,
+) -> Result<HashMap<String, Vec<u8>>> {
+    let schema =
+        serde_json::to_string(schema).map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;
+
+    let mut header = HashMap::<String, Vec<u8>>::default();
+
+    header.insert("avro.schema".to_string(), schema.into_bytes());
+    if let Some(compression) = compression {
+        let value = match compression {
+            Compression::Snappy => b"snappy".to_vec(),
+            Compression::Deflate => b"deflate".to_vec(),
+        };
+        header.insert("avro.codec".to_string(), value);
+    };
+
+    Ok(header)
+}
diff --git a/src/io/avro/write/mod.rs b/src/io/avro/write/mod.rs
new file mode 100644
index 00000000000..74ce4c82bc5
--- /dev/null
+++ b/src/io/avro/write/mod.rs
@@ -0,0 +1,68 @@
+//! APIs to write to Avro format.
+use std::io::Write;
+
+use avro_schema::Schema as AvroSchema;
+
+use crate::error::Result;
+
+use super::Compression;
+
+mod header;
+use header::serialize_header;
+mod schema;
+pub use schema::to_avro_schema;
+mod serialize;
+pub use serialize::{can_serialize, new_serializer, BoxSerializer};
+mod block;
+pub use block::*;
+mod util;
+
+const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
+
+/// Writes Avro's metadata to `writer`.
+pub fn write_metadata<W: Write>(
+    writer: &mut W,
+    schema: &AvroSchema,
+    compression: Option<Compression>,
+) -> Result<()> {
+    // * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
+    let avro_magic = [b'O', b'b', b'j', 1u8];
+    writer.write_all(&avro_magic)?;
+
+    // * file metadata, including the schema.
+    let header = serialize_header(schema, compression)?;
+    util::zigzag_encode(header.len() as i64, writer)?;
+    for (name, item) in header {
+        util::write_binary(name.as_bytes(), writer)?;
+        util::write_binary(&item, writer)?;
+    }
+    writer.write_all(&[0])?;
+
+    // The 16-byte, randomly-generated sync marker for this file.
+    writer.write_all(&SYNC_NUMBER)?;
+
+    Ok(())
+}
+
+/// Consumes a set of [`BoxSerializer`]s into a [`Block`].
+/// # Panics
+/// Panics iff the number of items in any of the serializers is not equal to the number of rows
+/// declared in the `block`.
+pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) -> Result<()> {
+    let Block {
+        data,
+        number_of_rows,
+    } = block;
+
+    data.clear(); // restart it
+
+    // _the_ transpose (columns -> rows)
+    for _ in 0..*number_of_rows {
+        for serializer in &mut *serializers {
+            let item_data = serializer.next().unwrap();
+            data.write_all(item_data)?;
+        }
+    }
+    Ok(())
+}
diff --git a/src/io/avro/write/schema.rs b/src/io/avro/write/schema.rs
new file mode 100644
index 00000000000..54e77401c28
--- /dev/null
+++ b/src/io/avro/write/schema.rs
@@ -0,0 +1,55 @@
+use avro_schema::{
+    Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Record, Schema as AvroSchema,
+};
+
+use crate::datatypes::*;
+use crate::error::{ArrowError, Result};
+
+/// Converts a [`Schema`] into an avro [`AvroSchema::Record`].
+pub fn to_avro_schema(schema: &Schema) -> Result<AvroSchema> {
+    let fields = schema
+        .fields
+        .iter()
+        .map(|field| field_to_field(field))
+        .collect::<Result<Vec<_>>>()?;
+    Ok(avro_schema::Schema::Record(Record::new("", fields)))
+}
+
+fn field_to_field(field: &Field) -> Result<AvroField> {
+    let schema = type_to_schema(field.data_type())?;
+    Ok(AvroField::new(field.name(), schema))
+}
+
+fn type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
+    Ok(match data_type.to_logical_type() {
+        DataType::Null => AvroSchema::Null,
+        DataType::Boolean => AvroSchema::Int(None),
+        DataType::Int64 => AvroSchema::Long(None),
+        DataType::Float32 => AvroSchema::Float,
+        DataType::Float64 => AvroSchema::Double,
+        DataType::Binary => AvroSchema::Bytes(None),
+        DataType::Utf8 => AvroSchema::String(None),
+        DataType::List(inner) => AvroSchema::Array(Box::new(type_to_schema(inner.data_type())?)),
+        DataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)),
+        DataType::Time32(TimeUnit::Millisecond) => AvroSchema::Int(Some(IntLogical::Time)),
+        DataType::Time64(TimeUnit::Microsecond) => AvroSchema::Long(Some(LongLogical::Time)),
+        DataType::Timestamp(TimeUnit::Millisecond, None) => {
+            AvroSchema::Long(Some(LongLogical::LocalTimestampMillis))
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, None) => {
+            AvroSchema::Long(Some(LongLogical::LocalTimestampMicros))
+        }
+        DataType::Interval(IntervalUnit::MonthDayNano) => {
+            let mut fixed = Fixed::new("", 12);
+            fixed.logical = Some(FixedLogical::Duration);
+            AvroSchema::Fixed(fixed)
+        }
+        DataType::FixedSizeBinary(size) => AvroSchema::Fixed(Fixed::new("", *size)),
+        other => {
+            return Err(ArrowError::NotYetImplemented(format!(
+                "write {:?} to avro",
+                other
+            )))
+        }
+    })
+}
diff --git a/src/io/avro/write/serialize.rs b/src/io/avro/write/serialize.rs
new file mode 100644
index 00000000000..7e7b2b58ecb
--- /dev/null
+++ b/src/io/avro/write/serialize.rs
@@ -0,0 +1,163 @@
+use avro_schema::Schema as AvroSchema;
+
+use crate::{array::*, datatypes::DataType};
+
+use super::super::super::iterator::*;
+use super::util;
+
+/// A type alias for a boxed [`StreamingIterator`], used to write arrays into avro rows
+/// (i.e. a column -> row transposition of types known at run-time)
+pub type BoxSerializer<'a> = Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>;
+
+/// Creates a [`StreamingIterator`] trait object that presents items from `array`
+/// encoded according to `schema`.
+/// # Panics
+/// This function panics iff the `data_type` is not supported (use [`can_serialize`] to check)
+/// # Implementation
+/// This function performs minimal CPU work: it dynamically dispatches based on the schema
+/// and arrow type.
+pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSerializer<'a> {
+    let data_type = array.data_type().to_logical_type();
+
+    match (data_type, schema) {
+        (DataType::Boolean, AvroSchema::Boolean) => {
+            let values = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values_iter(),
+                |x, buf| {
+                    buf.push(x as u8);
+                },
+                vec![],
+            ))
+        }
+        (DataType::Boolean, AvroSchema::Union(_)) => {
+            let values = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        buf.push(x as u8);
+                    }
+                },
+                vec![],
+            ))
+        }
+        (DataType::Utf8, AvroSchema::Union(_)) => {
+            let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        util::zigzag_encode(x.len() as i64, buf).unwrap();
+                        buf.extend_from_slice(x.as_bytes());
+                    }
+                },
+                vec![],
+            ))
+        }
+        (DataType::Utf8, AvroSchema::String(_)) => {
+            let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values_iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.len() as i64, buf).unwrap();
+                    buf.extend_from_slice(x.as_bytes());
+                },
+                vec![],
+            ))
+        }
+        (DataType::Binary, AvroSchema::Union(_)) => {
+            let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        util::zigzag_encode(x.len() as i64, buf).unwrap();
+                        buf.extend_from_slice(x);
+                    }
+                },
+                vec![],
+            ))
+        }
+        (DataType::Binary, AvroSchema::Bytes(_)) => {
+            let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values_iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.len() as i64, buf).unwrap();
+                    buf.extend_from_slice(x);
+                },
+                vec![],
+            ))
+        }
+
+        (DataType::Int32, AvroSchema::Union(_)) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<i32>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        util::zigzag_encode(*x as i64, buf).unwrap();
+                    }
+                },
+                vec![],
+            ))
+        }
+        (DataType::Int32, AvroSchema::Int(_)) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<i32>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values().iter(),
+                |x, buf| {
+                    util::zigzag_encode(*x as i64, buf).unwrap();
+                },
+                vec![],
+            ))
+        }
+        (DataType::Int64, AvroSchema::Union(_)) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<i64>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        util::zigzag_encode(*x, buf).unwrap();
+                    }
+                },
+                vec![],
+            ))
+        }
+        (DataType::Int64, AvroSchema::Long(_)) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<i64>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values().iter(),
+                |x, buf| {
+                    util::zigzag_encode(*x, buf).unwrap();
+                },
+                vec![],
+            ))
+        }
+        _ => todo!(),
+    }
+}
+
+/// Whether [`new_serializer`] supports `data_type`.
+pub fn can_serialize(data_type: &DataType) -> bool {
+    use DataType::*;
+    matches!(data_type, Boolean | Int32 | Int64 | Utf8 | Binary)
+}
diff --git a/src/io/avro/write/util.rs b/src/io/avro/write/util.rs
new file mode 100644
index 00000000000..52d13c026e9
--- /dev/null
+++ b/src/io/avro/write/util.rs
@@ -0,0 +1,26 @@
+use crate::error::Result;
+
+#[inline]
+pub fn zigzag_encode<W: std::io::Write>(n: i64, writer: &mut W) -> Result<()> {
+    _zigzag_encode(((n << 1) ^ (n >> 63)) as u64, writer)
+}
+
+#[inline]
+fn _zigzag_encode<W: std::io::Write>(mut z: u64, writer: &mut W) -> Result<()> {
+    loop {
+        if z <= 0x7F {
+            writer.write_all(&[(z & 0x7F) as u8])?;
+            break;
+        } else {
+            writer.write_all(&[(0x80 | (z & 0x7F)) as u8])?;
+            z >>= 7;
+        }
+    }
+    Ok(())
+}
+
+pub(crate) fn write_binary<W: std::io::Write>(bytes: &[u8], writer: &mut W) -> Result<()> {
+    zigzag_encode(bytes.len() as i64, writer)?;
+    writer.write_all(bytes)?;
+    Ok(())
+}
diff --git a/src/io/csv/write/mod.rs b/src/io/csv/write/mod.rs
index e74bc150c16..5aa44c91d31 100644
--- a/src/io/csv/write/mod.rs
+++ b/src/io/csv/write/mod.rs
@@ -1,8 +1,7 @@
 //! APIs to write to CSV
-mod iterator;
 mod serialize;
 
-use iterator::StreamingIterator;
+use super::super::iterator::StreamingIterator;
 
 use std::io::Write;
 
diff --git a/src/io/csv/write/serialize.rs b/src/io/csv/write/serialize.rs
index 41086388010..6c37e0eb24f 100644
--- a/src/io/csv/write/serialize.rs
+++ b/src/io/csv/write/serialize.rs
@@ -10,7 +10,7 @@ use crate::{
     error::Result,
 };
 
-use super::iterator::{BufStreamingIterator, StreamingIterator};
+use super::super::super::iterator::{BufStreamingIterator, StreamingIterator};
 use crate::array::{DictionaryArray, DictionaryKey, Offset};
 
 use std::any::Any;
diff --git a/src/io/csv/write/iterator.rs b/src/io/iterator.rs
similarity index 100%
rename from src/io/csv/write/iterator.rs
rename to src/io/iterator.rs
diff --git a/src/io/mod.rs b/src/io/mod.rs
index a8d68d66951..e406de9847c 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -42,3 +42,6 @@ pub mod avro;
 #[cfg(feature = "io_print")]
 #[cfg_attr(docsrs, doc(cfg(feature = "io_print")))]
 pub mod print;
+
+#[cfg(any(feature = "io_csv_write", feature = "io_avro"))]
+mod iterator;

From a9b7e26d1acc44ea43178aceca03ccd254af139f Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Mon, 20 Dec 2021 07:05:26 +0000
Subject: [PATCH 2/5] Finished first iteration of avro writer.
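
With this change, the write path is: convert the Arrow schema to avro fields,
write the file header, serialize each array into a row-oriented block,
optionally compress it, and write the block. A minimal uncompressed sketch of
that sequence (not a verbatim program from this patch; see the updated
`examples/avro_write.rs` for the real one):

```rust
use arrow2::{
    array::Int32Array,
    datatypes::{Field, Schema},
    error::Result,
    io::avro::write,
};

fn main() -> Result<()> {
    let array = Int32Array::from_slice([1, 2, 3]);
    let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), false)]);

    // schema -> avro fields -> file header
    let avro_fields = write::to_avro_schema(&schema)?;
    let mut file = vec![]; // any std::io::Write
    write::write_metadata(&mut file, avro_fields.clone(), None)?;

    // columns -> rows: one serializer per array, transposed into a block
    let mut serializers = vec![write::new_serializer(&array, &avro_fields[0].schema)];
    let mut block = write::Block::new(array.len(), vec![]);
    write::serialize(&mut serializers, &mut block)?;

    // no compression: move the data into a CompressedBlock as-is
    let mut compressed = write::CompressedBlock::default();
    compressed.number_of_rows = block.number_of_rows;
    std::mem::swap(&mut compressed.data, &mut block.data);

    write::write_block(&mut file, &compressed)?;
    Ok(())
}
```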
---
 Cargo.toml                     |  2 +-
 examples/avro_write.rs         |  9 ++--
 src/io/avro/read/mod.rs        |  2 +-
 src/io/avro/write/mod.rs       |  9 ++--
 src/io/avro/write/schema.rs    | 29 +++++++----
 src/io/avro/write/serialize.rs | 49 ++++++++++++----
 tests/it/io/avro/mod.rs        |  1 +
 tests/it/io/avro/read.rs       | 78 +++++++++++++++-------------
 tests/it/io/avro/read_async.rs |  7 +--
 tests/it/io/avro/write.rs      | 95 ++++++++++++++++++++++++++++++++++
 10 files changed, 212 insertions(+), 69 deletions(-)
 create mode 100644 tests/it/io/avro/write.rs

diff --git a/Cargo.toml b/Cargo.toml
index cef7012b1b7..f85f01a0afd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -138,7 +138,7 @@ io_parquet_compression = [
     "parquet2/lz4",
     "parquet2/brotli",
 ]
-io_avro = ["avro-schema", "fallible-streaming-iterator", "serde_json"]
+io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
 io_avro_compression = [
     "libflate",
     "snap",
diff --git a/examples/avro_write.rs b/examples/avro_write.rs
index c83e3d85b3d..866ebe05f9c 100644
--- a/examples/avro_write.rs
+++ b/examples/avro_write.rs
@@ -5,7 +5,6 @@ use arrow2::{
     datatypes::{Field, Schema},
     error::Result,
     io::avro::write,
-    record_batch::RecordBatch,
 };
 
 fn main() -> Result<()> {
@@ -26,18 +25,18 @@ fn main() -> Result<()> {
     let field = Field::new("c1", array.data_type().clone(), true);
     let schema = Schema::new(vec![field]);
 
-    let avro_schema = write::to_avro_schema(&schema)?;
+    let avro_fields = write::to_avro_schema(&schema)?;
 
     let mut file = File::create(path)?;
 
     let compression = None;
 
-    write::write_metadata(&mut file, &avro_schema, compression)?;
+    write::write_metadata(&mut file, avro_fields.clone(), compression)?;
 
-    let serializer = write::new_serializer(&array, avro_schema.fields()[0]);
+    let serializer = write::new_serializer(&array, &avro_fields[0].schema);
     let mut block = write::Block::new(array.len(), vec![]);
 
-    write::serialize(&mut vec![serializer], &mut block)?;
+    write::serialize(&mut [serializer], &mut block)?;
 
     let mut compressed_block = write::CompressedBlock::default();
 
diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs
index 6f33fa70a71..1cb32186fa4 100644
--- a/src/io/avro/read/mod.rs
+++ b/src/io/avro/read/mod.rs
@@ -32,7 +32,7 @@ pub fn read_metadata<R: std::io::Read>(
     reader: &mut R,
 ) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
     let (avro_schema, codec, marker) = util::read_schema(reader)?;
-    let schema = schema::convert_schema(&avro_schema)?;
+    let schema = convert_schema(&avro_schema)?;
 
     let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
         fields.into_iter().map(|x| x.schema).collect()
diff --git a/src/io/avro/write/mod.rs b/src/io/avro/write/mod.rs
index 74ce4c82bc5..3be4fc53103 100644
--- a/src/io/avro/write/mod.rs
+++ b/src/io/avro/write/mod.rs
@@ -1,11 +1,11 @@
 //! APIs to write to Avro format.
 use std::io::Write;
 
-use avro_schema::Schema as AvroSchema;
+use avro_schema::{Field as AvroField, Record, Schema as AvroSchema};
 
 use crate::error::Result;
 
-use super::Compression;
+pub use super::Compression;
 
 mod header;
 use header::serialize_header;
@@ -22,7 +22,7 @@ const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
 /// Writes Avro's metadata to `writer`.
 pub fn write_metadata<W: Write>(
     writer: &mut W,
-    schema: &AvroSchema,
+    fields: Vec<AvroField>,
     compression: Option<Compression>,
 ) -> Result<()> {
     // * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
@@ -30,7 +30,8 @@ pub fn write_metadata<W: Write>(
     let avro_magic = [b'O', b'b', b'j', 1u8];
     writer.write_all(&avro_magic)?;
 
     // * file metadata, including the schema.
-    let header = serialize_header(schema, compression)?;
- let header = serialize_header(schema, compression)?; + let schema = AvroSchema::Record(Record::new("", fields)); + let header = serialize_header(&schema, compression)?; util::zigzag_encode(header.len() as i64, writer)?; for (name, item) in header { diff --git a/src/io/avro/write/schema.rs b/src/io/avro/write/schema.rs index 54e77401c28..fdee06a1ede 100644 --- a/src/io/avro/write/schema.rs +++ b/src/io/avro/write/schema.rs @@ -1,35 +1,46 @@ use avro_schema::{ - Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Record, Schema as AvroSchema, + Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Schema as AvroSchema, }; use crate::datatypes::*; use crate::error::{ArrowError, Result}; /// Converts a [`Schema`] to an avro [`AvroSchema::Record`] with it. -pub fn to_avro_schema(schema: &Schema) -> Result { - let fields = schema +pub fn to_avro_schema(schema: &Schema) -> Result> { + schema .fields .iter() .map(|field| field_to_field(field)) - .collect::>>()?; - Ok(avro_schema::Schema::Record(Record::new("", fields))) + .collect() } fn field_to_field(field: &Field) -> Result { - let schema = type_to_schema(field.data_type())?; + let schema = type_to_schema(field.data_type(), field.is_nullable())?; Ok(AvroField::new(field.name(), schema)) } -fn type_to_schema(data_type: &DataType) -> Result { +fn type_to_schema(data_type: &DataType, is_nullable: bool) -> Result { + Ok(if is_nullable { + AvroSchema::Union(vec![AvroSchema::Null, _type_to_schema(data_type)?]) + } else { + _type_to_schema(data_type)? + }) +} + +fn _type_to_schema(data_type: &DataType) -> Result { Ok(match data_type.to_logical_type() { DataType::Null => AvroSchema::Null, - DataType::Boolean => AvroSchema::Int(None), + DataType::Boolean => AvroSchema::Boolean, + DataType::Int32 => AvroSchema::Int(None), DataType::Int64 => AvroSchema::Long(None), DataType::Float32 => AvroSchema::Float, DataType::Float64 => AvroSchema::Double, DataType::Binary => AvroSchema::Bytes(None), DataType::Utf8 => AvroSchema::String(None), - DataType::List(inner) => AvroSchema::Array(Box::new(type_to_schema(inner.data_type())?)), + DataType::List(inner) => AvroSchema::Array(Box::new(type_to_schema( + inner.data_type(), + inner.is_nullable(), + )?)), DataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)), DataType::Time32(TimeUnit::Millisecond) => AvroSchema::Int(Some(IntLogical::Time)), DataType::Time64(TimeUnit::Microsecond) => AvroSchema::Long(Some(LongLogical::Time)), diff --git a/src/io/avro/write/serialize.rs b/src/io/avro/write/serialize.rs index 7e7b2b58ecb..5a4e7d52541 100644 --- a/src/io/avro/write/serialize.rs +++ b/src/io/avro/write/serialize.rs @@ -1,5 +1,6 @@ use avro_schema::Schema as AvroSchema; +use crate::datatypes::{PhysicalType, PrimitiveType}; use crate::{array::*, datatypes::DataType}; use super::super::super::iterator::*; @@ -17,10 +18,10 @@ pub type BoxSerializer<'a> = Box + 'a + Send /// This function performs minimal CPU work: it dynamically dispatches based on the schema /// and arrow type. 
 pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSerializer<'a> {
-    let data_type = array.data_type().to_logical_type();
+    let data_type = array.data_type().to_physical_type();
 
     match (data_type, schema) {
-        (DataType::Boolean, AvroSchema::Boolean) => {
+        (PhysicalType::Boolean, AvroSchema::Boolean) => {
             let values = array.as_any().downcast_ref::<BooleanArray>().unwrap();
             Box::new(BufStreamingIterator::new(
                 values.values_iter(),
                 |x, buf| {
                     buf.push(x as u8);
                 },
                 vec![],
             ))
         }
-        (DataType::Boolean, AvroSchema::Union(_)) => {
+        (PhysicalType::Boolean, AvroSchema::Union(_)) => {
             let values = array.as_any().downcast_ref::<BooleanArray>().unwrap();
             Box::new(BufStreamingIterator::new(
                 values.iter(),
                 |x, buf| {
                     util::zigzag_encode(x.is_some() as i64, buf).unwrap();
                     if let Some(x) = x {
                         buf.push(x as u8);
                     }
                 },
                 vec![],
             ))
         }
-        (DataType::Utf8, AvroSchema::Union(_)) => {
+        (PhysicalType::Utf8, AvroSchema::Union(_)) => {
             let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
             Box::new(BufStreamingIterator::new(
                 values.iter(),
                 |x, buf| {
                     util::zigzag_encode(x.is_some() as i64, buf).unwrap();
                     if let Some(x) = x {
                         util::zigzag_encode(x.len() as i64, buf).unwrap();
                         buf.extend_from_slice(x.as_bytes());
                     }
                 },
                 vec![],
             ))
         }
-        (DataType::Utf8, AvroSchema::String(_)) => {
+        (PhysicalType::Utf8, AvroSchema::String(_)) => {
             let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
             Box::new(BufStreamingIterator::new(
                 values.values_iter(),
                 |x, buf| {
                     util::zigzag_encode(x.len() as i64, buf).unwrap();
                     buf.extend_from_slice(x.as_bytes());
                 },
                 vec![],
             ))
         }
-        (DataType::Binary, AvroSchema::Union(_)) => {
+        (PhysicalType::Binary, AvroSchema::Union(_)) => {
             let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
             Box::new(BufStreamingIterator::new(
                 values.iter(),
                 |x, buf| {
                     util::zigzag_encode(x.is_some() as i64, buf).unwrap();
                     if let Some(x) = x {
                         util::zigzag_encode(x.len() as i64, buf).unwrap();
                         buf.extend_from_slice(x);
                     }
                 },
                 vec![],
             ))
         }
-        (DataType::Binary, AvroSchema::Bytes(_)) => {
+        (PhysicalType::Binary, AvroSchema::Bytes(_)) => {
             let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
             Box::new(BufStreamingIterator::new(
                 values.values_iter(),
                 |x, buf| {
                     util::zigzag_encode(x.len() as i64, buf).unwrap();
                     buf.extend_from_slice(x);
                 },
                 vec![],
             ))
         }
 
-        (DataType::Int32, AvroSchema::Union(_)) => {
+        (PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Union(_)) => {
             let values = array
                 .as_any()
                 .downcast_ref::<PrimitiveArray<i32>>()
                 .unwrap();
             Box::new(BufStreamingIterator::new(
                 values.iter(),
                 |x, buf| {
                     util::zigzag_encode(x.is_some() as i64, buf).unwrap();
                     if let Some(x) = x {
                         util::zigzag_encode(*x as i64, buf).unwrap();
                     }
                 },
                 vec![],
             ))
         }
-        (DataType::Int32, AvroSchema::Int(_)) => {
+        (PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Int(_)) => {
             let values = array
                 .as_any()
                 .downcast_ref::<PrimitiveArray<i32>>()
                 .unwrap();
             Box::new(BufStreamingIterator::new(
                 values.values().iter(),
                 |x, buf| {
                     util::zigzag_encode(*x as i64, buf).unwrap();
                 },
                 vec![],
             ))
         }
-        (DataType::Int64, AvroSchema::Union(_)) => {
+        (PhysicalType::Primitive(PrimitiveType::Int64), AvroSchema::Union(_)) => {
             let values = array
                 .as_any()
                 .downcast_ref::<PrimitiveArray<i64>>()
                 .unwrap();
             Box::new(BufStreamingIterator::new(
                 values.iter(),
                 |x, buf| {
                     util::zigzag_encode(x.is_some() as i64, buf).unwrap();
                     if let Some(x) = x {
                         util::zigzag_encode(*x, buf).unwrap();
                     }
                 },
                 vec![],
             ))
         }
-        (DataType::Int64, AvroSchema::Long(_)) => {
+        (PhysicalType::Primitive(PrimitiveType::Int64), AvroSchema::Long(_)) => {
             let values = array
                 .as_any()
                 .downcast_ref::<PrimitiveArray<i64>>()
                 .unwrap();
             Box::new(BufStreamingIterator::new(
                 values.values().iter(),
                 |x, buf| {
                     util::zigzag_encode(*x, buf).unwrap();
                 },
                 vec![],
             ))
         }
+        (PhysicalType::Primitive(PrimitiveType::Float32), AvroSchema::Float) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<f32>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values().iter(),
+                |x, buf| {
+                    buf.extend_from_slice(&x.to_le_bytes());
+                },
+                vec![],
+            ))
+        }
+        (PhysicalType::Primitive(PrimitiveType::Float64), AvroSchema::Double) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<f64>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values().iter(),
+                |x, buf| {
+                    buf.extend_from_slice(&x.to_le_bytes());
+                },
+                vec![],
+            ))
+        }
         _ => todo!(),
     }
 }
diff --git a/tests/it/io/avro/mod.rs b/tests/it/io/avro/mod.rs
index ee0459c04a4..29b3c6b003a 100644
--- a/tests/it/io/avro/mod.rs
+++ b/tests/it/io/avro/mod.rs
@@ -3,3 +3,4 @@ mod read;
 
 #[cfg(feature = "io_avro_async")]
 mod read_async;
+mod write;
diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs
index db6e501d851..ba58a6901fe 100644
--- a/tests/it/io/avro/read.rs
+++ b/tests/it/io/avro/read.rs
@@ -10,7 +10,7 @@ use arrow2::error::Result;
 use arrow2::io::avro::read;
 use arrow2::record_batch::RecordBatch;
 
-fn schema() -> (AvroSchema, Schema) {
+pub(super) fn schema() -> (AvroSchema, Schema) {
     let raw_schema = r#"
     {
         "type": "record",
@@ -69,7 +69,35 @@ fn schema() -> (AvroSchema, Schema) {
     (AvroSchema::parse_str(raw_schema).unwrap(), schema)
 }
 
-pub(super) fn write(codec: Codec) -> std::result::Result<(Vec<u8>, RecordBatch), avro_rs::Error> {
+pub(super) fn data() -> RecordBatch {
+    let data = vec![
+        Some(vec![Some(1i32), None, Some(3)]),
+        Some(vec![Some(1i32), None, Some(3)]),
+    ];
+
+    let mut array = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new();
+    array.try_extend(data).unwrap();
+
+    let columns = vec![
+        Arc::new(Int64Array::from_slice([27, 47])) as Arc<dyn Array>,
+        Arc::new(Utf8Array::<i32>::from_slice(["foo", "bar"])) as Arc<dyn Array>,
+        Arc::new(Int32Array::from_slice([1, 1])) as Arc<dyn Array>,
+        Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)) as Arc<dyn Array>,
+        Arc::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])) as Arc<dyn Array>,
+        Arc::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])) as Arc<dyn Array>,
+        Arc::new(BooleanArray::from_slice([true, false])) as Arc<dyn Array>,
+        Arc::new(Utf8Array::<i32>::from([Some("foo"), None])) as Arc<dyn Array>,
+        array.into_arc(),
+        Arc::new(DictionaryArray::<i32>::from_data(
+            Int32Array::from_slice([1, 0]),
+            Arc::new(Utf8Array::<i32>::from_slice(["SPADES", "HEARTS"])),
+        )) as Arc<dyn Array>,
+    ];
+
+    RecordBatch::try_new(Arc::new(schema().1), columns).unwrap()
+}
+
+pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::Error> {
     let (avro, schema) = schema();
     // a writer needs a schema and something to write to
     let mut writer = Writer::with_codec(&avro, Vec::new(), codec);
@@ -118,40 +146,11 @@
     );
     record.put("enum", Value::Enum(0, "SPADES".to_string()));
     writer.append(record)?;
-
-    let data = vec![
-        Some(vec![Some(1i32), None, Some(3)]),
-        Some(vec![Some(1i32), None, Some(3)]),
-    ];
-
-    let mut array = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new();
-    array.try_extend(data).unwrap();
-
-    let columns = vec![
-        Arc::new(Int64Array::from_slice([27, 47])) as Arc<dyn Array>,
-        Arc::new(Utf8Array::<i32>::from_slice(["foo", "bar"])) as Arc<dyn Array>,
-        Arc::new(Int32Array::from_slice([1, 1])) as Arc<dyn Array>,
-        Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)) as Arc<dyn Array>,
-        Arc::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])) as Arc<dyn Array>,
-        Arc::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])) as Arc<dyn Array>,
-        Arc::new(BooleanArray::from_slice([true, false])) as Arc<dyn Array>,
-        Arc::new(Utf8Array::<i32>::from([Some("foo"), None])) as Arc<dyn Array>,
-        array.into_arc(),
-        Arc::new(DictionaryArray::<i32>::from_data(
-            Int32Array::from_slice([1, 0]),
-            Arc::new(Utf8Array::<i32>::from_slice(["SPADES", "HEARTS"])),
-        )) as Arc<dyn Array>,
-    ];
-
-    let expected = RecordBatch::try_new(Arc::new(schema), columns).unwrap();
-
-    Ok((writer.into_inner().unwrap(), expected))
+    Ok(writer.into_inner().unwrap())
 }
 
-fn test(codec: Codec) -> Result<()> {
-    let (data, expected) = write(codec).unwrap();
-
-    let file = &mut &data[..];
+pub(super) fn read_avro(mut avro: &[u8]) -> Result<RecordBatch> {
+    let file = &mut avro;
 
     let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;
 
@@ -161,7 +160,16 @@
         avro_schema,
         Arc::new(schema),
     );
 
-    assert_eq!(reader.next().unwrap().unwrap(), expected);
+    reader.next().unwrap()
+}
+
+fn test(codec: Codec) -> Result<()> {
+    let avro = write_avro(codec).unwrap();
+    let expected = data();
+
+    let result = read_avro(&avro)?;
+
+    assert_eq!(result, expected);
     Ok(())
 }
diff --git a/tests/it/io/avro/read_async.rs b/tests/it/io/avro/read_async.rs
index 3ace4f05ae5..7b6b019b45b 100644
--- a/tests/it/io/avro/read_async.rs
+++ b/tests/it/io/avro/read_async.rs
@@ -8,12 +8,13 @@ use futures::StreamExt;
 use arrow2::error::Result;
 use arrow2::io::avro::read_async::*;
 
-use super::read::write;
+use super::read::{data, write_avro};
 
 async fn test(codec: Codec) -> Result<()> {
-    let (data, expected) = write(codec).unwrap();
+    let avro_data = write_avro(codec).unwrap();
+    let expected = data();
 
-    let mut reader = &mut &data[..];
+    let mut reader = &mut &avro_data[..];
 
     let (_, schema, _, marker) = read_metadata(&mut reader).await?;
     let schema = Arc::new(schema);
diff --git a/tests/it/io/avro/write.rs b/tests/it/io/avro/write.rs
new file mode 100644
index 00000000000..b4e026c9c90
--- /dev/null
+++ b/tests/it/io/avro/write.rs
@@ -0,0 +1,95 @@
+use std::sync::Arc;
+
+use arrow2::array::*;
+use arrow2::datatypes::*;
+use arrow2::error::Result;
+use arrow2::io::avro::write;
+use arrow2::record_batch::RecordBatch;
+
+fn schema() -> Schema {
+    Schema::new(vec![
+        Field::new("a", DataType::Int64, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Int32, false),
+        Field::new("date", DataType::Date32, false),
+        Field::new("d", DataType::Binary, false),
+        Field::new("e", DataType::Float64, false),
+        Field::new("f", DataType::Boolean, false),
+        Field::new("g", DataType::Utf8, true),
+    ])
+}
+
+fn data() -> RecordBatch {
+    let columns = vec![
+        Arc::new(Int64Array::from_slice([27, 47])) as Arc<dyn Array>,
+        Arc::new(Utf8Array::<i32>::from_slice(["foo", "bar"])) as Arc<dyn Array>,
+        Arc::new(Int32Array::from_slice([1, 1])) as Arc<dyn Array>,
+        Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)) as Arc<dyn Array>,
+        Arc::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])) as Arc<dyn Array>,
+        Arc::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])) as Arc<dyn Array>,
+        Arc::new(BooleanArray::from_slice([true, false])) as Arc<dyn Array>,
+        Arc::new(Utf8Array::<i32>::from([Some("foo"), None])) as Arc<dyn Array>,
+    ];
+
+    RecordBatch::try_new(Arc::new(schema()), columns).unwrap()
+}
+
+use super::read::read_avro;
+
+fn write_avro<R: AsRef<dyn Array>>(
+    arrays: &[R],
+    schema: &Schema,
+    compression: Option<write::Compression>,
+) -> Result<Vec<u8>> {
+    let avro_fields = write::to_avro_schema(schema)?;
+
+    let mut serializers = arrays
+        .iter()
+        .map(|x| x.as_ref())
+        .zip(avro_fields.iter())
+        .map(|(array, field)| write::new_serializer(array, &field.schema))
+        .collect::<Vec<_>>();
+    let mut block = write::Block::new(arrays[0].as_ref().len(), vec![]);
+
+    write::serialize(&mut serializers, &mut block)?;
+
+    let mut compressed_block = write::CompressedBlock::default();
+
+    if let Some(compression) = compression {
+        write::compress(&block, &mut compressed_block, compression)?;
+    } else {
+        compressed_block.number_of_rows = block.number_of_rows;
+        std::mem::swap(&mut compressed_block.data, &mut block.data);
+    }
+
+    let mut file = vec![];
+
+    write::write_metadata(&mut file, avro_fields.clone(), compression)?;
+
+    write::write_block(&mut file, &compressed_block)?;
+
+    Ok(file)
+}
+
+fn roundtrip(compression: Option<write::Compression>) -> Result<()> {
+    let expected = data();
+
+    let arrays = expected.columns();
+    let schema = expected.schema();
+
+    let data = write_avro(arrays, schema, compression)?;
+
+    let result = read_avro(&data)?;
+
+    assert_eq!(result.schema(), expected.schema());
+    for (c1, c2) in result.columns().iter().zip(expected.columns().iter()) {
+        assert_eq!(c1, c2);
+    }
+    assert_eq!(result, expected);
+    Ok(())
+}
+
+#[test]
+fn no_compression() -> Result<()> {
+    roundtrip(None)
+}

From 1150bf3b45cf01a0f7e59f21c7c153016aa69ede Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Mon, 20 Dec 2021 07:15:16 +0000
Subject: [PATCH 3/5] Improved example and added guide entry
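
The example's boilerplate is now factored into a reusable `write_avro`
function. Caller-side, usage reduces to the sketch below (the output path is
hypothetical, and `write_avro` refers to the helper defined in the diff that
follows, so this is not self-contained):

```rust
use std::fs::File;

use arrow2::{
    array::{Array, Int32Array},
    datatypes::{Field, Schema},
    error::Result,
};

// assumes the `write_avro` helper from examples/avro_write.rs below is in scope
fn demo() -> Result<()> {
    let array = Int32Array::from(&[Some(0), None, Some(2)]);
    let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);

    let mut file = File::create("out.avro")?; // illustrative path
    write_avro(&mut file, &[(&array) as &dyn Array], &schema, None)
}
```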
---
 examples/avro_write.rs | 58 +++++++++++++++++++++++++-----------------
 guide/src/SUMMARY.md   |  1 +
 2 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/examples/avro_write.rs b/examples/avro_write.rs
index 866ebe05f9c..6d970bd9e01 100644
--- a/examples/avro_write.rs
+++ b/examples/avro_write.rs
@@ -1,4 +1,4 @@
-use std::{fs::File, sync::Arc};
+use std::fs::File;
 
 use arrow2::{
     array::{Array, Int32Array},
@@ -7,6 +7,39 @@ use arrow2::{
     io::avro::write,
 };
 
+fn write_avro<W: std::io::Write>(
+    file: &mut W,
+    arrays: &[&dyn Array],
+    schema: &Schema,
+    compression: Option<write::Compression>,
+) -> Result<()> {
+    let avro_fields = write::to_avro_schema(schema)?;
+
+    let mut serializers = arrays
+        .iter()
+        .zip(avro_fields.iter())
+        .map(|(array, field)| write::new_serializer(*array, &field.schema))
+        .collect::<Vec<_>>();
+    let mut block = write::Block::new(arrays[0].len(), vec![]);
+
+    write::serialize(&mut serializers, &mut block)?;
+
+    let mut compressed_block = write::CompressedBlock::default();
+
+    if let Some(compression) = compression {
+        write::compress(&block, &mut compressed_block, compression)?;
+    } else {
+        compressed_block.number_of_rows = block.number_of_rows;
+        std::mem::swap(&mut compressed_block.data, &mut block.data);
+    }
+
+    write::write_metadata(file, avro_fields.clone(), compression)?;
+
+    write::write_block(file, &compressed_block)?;
+
+    Ok(())
+}
+
 fn main() -> Result<()> {
     use std::env;
     let args: Vec<String> = env::args().collect();
@@ -25,29 +58,8 @@ fn main() -> Result<()> {
     let field = Field::new("c1", array.data_type().clone(), true);
     let schema = Schema::new(vec![field]);
 
-    let avro_fields = write::to_avro_schema(&schema)?;
-
     let mut file = File::create(path)?;
-
-    let compression = None;
-
-    write::write_metadata(&mut file, avro_fields.clone(), compression)?;
-
-    let serializer = write::new_serializer(&array, &avro_fields[0].schema);
-    let mut block = write::Block::new(array.len(), vec![]);
-
-    write::serialize(&mut [serializer], &mut block)?;
-
-    let mut compressed_block = write::CompressedBlock::default();
-
-    if let Some(compression) = compression {
-        write::compress(&block, &mut compressed_block, compression)?;
-    } else {
-        compressed_block.number_of_rows = block.number_of_rows;
-        std::mem::swap(&mut compressed_block.data, &mut block.data);
-    }
-
-    write::write_block(&mut file, &compressed_block)?;
+
+    write_avro(&mut file, &[(&array) as &dyn Array], &schema, None)?;
 
     Ok(())
 }
diff --git a/guide/src/SUMMARY.md b/guide/src/SUMMARY.md
index a66c993096d..9719504ee56 100644
--- a/guide/src/SUMMARY.md
+++ b/guide/src/SUMMARY.md
@@ -17,3 +17,4 @@
   - [Read Arrow stream](./io/ipc_stream_read.md)
   - [Write Arrow](./io/ipc_write.md)
   - [Read Avro](./io/avro_read.md)
+  - [Write Avro](./io/avro_write.md)

From b8b329661bbaacd6a557f2db8779cc25196207a9 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Mon, 20 Dec 2021 07:18:13 +0000
Subject: [PATCH 4/5] Clippy

---
 src/io/avro/write/block.rs  | 3 +--
 src/io/avro/write/schema.rs | 8 ++------
 tests/it/io/avro/read.rs    | 2 +-
 3 files changed, 4 insertions(+), 9 deletions(-)

diff --git a/src/io/avro/write/block.rs b/src/io/avro/write/block.rs
index 9251ca24448..562019f6c56 100644
--- a/src/io/avro/write/block.rs
+++ b/src/io/avro/write/block.rs
@@ -61,10 +61,9 @@ pub fn compress(
     block: &Block,
     compressed_block: &mut CompressedBlock,
     compression: Compression,
 ) -> Result<()> {
+    compressed_block.number_of_rows = block.number_of_rows;
     match compression {
         Compression::Deflate => todo!(),
         Compression::Snappy => todo!(),
     }
-    compressed_block.number_of_rows = block.number_of_rows;
-    Ok(())
 }
diff --git a/src/io/avro/write/schema.rs b/src/io/avro/write/schema.rs
index fdee06a1ede..b8cb5917a10 100644
--- a/src/io/avro/write/schema.rs
+++ b/src/io/avro/write/schema.rs
@@ -5,13 +5,9 @@ use avro_schema::{
 use crate::datatypes::*;
 use crate::error::{ArrowError, Result};
 
-/// Converts a [`Schema`] into an avro [`AvroSchema::Record`].
+/// Converts a [`Schema`] into a vector of [`AvroField`].
 pub fn to_avro_schema(schema: &Schema) -> Result<Vec<AvroField>> {
-    schema
-        .fields
-        .iter()
-        .map(|field| field_to_field(field))
-        .collect()
+    schema.fields.iter().map(field_to_field).collect()
 }
 
 fn field_to_field(field: &Field) -> Result<AvroField> {
diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs
index ba58a6901fe..d55e10332d3 100644
--- a/tests/it/io/avro/read.rs
+++ b/tests/it/io/avro/read.rs
@@ -98,7 +98,7 @@ pub(super) fn data() -> RecordBatch {
 }
 
 pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::Error> {
-    let (avro, schema) = schema();
+    let (avro, _) = schema();
     // a writer needs a schema and something to write to
     let mut writer = Writer::with_codec(&avro, Vec::new(), codec);
 

From 38289605105f450dd62682df047916558b058022 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Tue, 21 Dec 2021 16:54:49 +0000
Subject: [PATCH 5/5] Added guide.

---
 guide/src/io/avro_write.md | 8 ++++++++
 1 file changed, 8 insertions(+)
 create mode 100644 guide/src/io/avro_write.md

diff --git a/guide/src/io/avro_write.md b/guide/src/io/avro_write.md
new file mode 100644
index 00000000000..ca84219356e
--- /dev/null
+++ b/guide/src/io/avro_write.md
@@ -0,0 +1,8 @@
+# Avro write
+
+You can use this crate to write to Apache Avro.
+Below is an example, which you can run when this crate is compiled with feature `io_avro`.
+
+```rust
+{{#include ../../../examples/avro_write.rs}}
+```