From 45e72e98c8857007ca8679e68837ff0eb50f551d Mon Sep 17 00:00:00 2001
From: Jorge Leitao
Date: Wed, 22 Dec 2021 06:40:48 +0000
Subject: [PATCH] Added support to write to Avro (#690)

---
 Cargo.toml                         |   2 +-
 examples/avro_write.rs             |  65 ++++++++++
 guide/src/SUMMARY.md               |   1 +
 guide/src/io/avro_write.md         |   8 ++
 src/io/avro/mod.rs                 |  11 +-
 src/io/avro/read/block.rs          |   2 +-
 src/io/avro/read/mod.rs            |  11 +-
 src/io/avro/read/schema.rs         |  22 ++--
 src/io/avro/read_async/metadata.rs |   2 +-
 src/io/avro/write/block.rs         |  69 +++++++++++
 src/io/avro/write/header.rs        |  30 +++++
 src/io/avro/write/mod.rs           |  69 +++++++++++
 src/io/avro/write/schema.rs        |  62 ++++++++++
 src/io/avro/write/serialize.rs     | 190 +++++++++++++++++++++++++++++
 src/io/avro/write/util.rs          |  26 ++++
 src/io/csv/write/mod.rs            |   3 +-
 src/io/csv/write/serialize.rs      |   2 +-
 src/io/{csv/write => }/iterator.rs |   0
 src/io/mod.rs                      |   3 +
 tests/it/io/avro/mod.rs            |   1 +
 tests/it/io/avro/read.rs           |  80 ++++++------
 tests/it/io/avro/read_async.rs     |   7 +-
 tests/it/io/avro/write.rs          |  95 +++++++++++++++
 23 files changed, 697 insertions(+), 64 deletions(-)
 create mode 100644 examples/avro_write.rs
 create mode 100644 guide/src/io/avro_write.md
 create mode 100644 src/io/avro/write/block.rs
 create mode 100644 src/io/avro/write/header.rs
 create mode 100644 src/io/avro/write/mod.rs
 create mode 100644 src/io/avro/write/schema.rs
 create mode 100644 src/io/avro/write/serialize.rs
 create mode 100644 src/io/avro/write/util.rs
 rename src/io/{csv/write => }/iterator.rs (100%)
 create mode 100644 tests/it/io/avro/write.rs

diff --git a/Cargo.toml b/Cargo.toml
index cef7012b1b7..f85f01a0afd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -138,7 +138,7 @@ io_parquet_compression = [
     "parquet2/lz4",
     "parquet2/brotli",
 ]
-io_avro = ["avro-schema", "fallible-streaming-iterator", "serde_json"]
+io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
 io_avro_compression = [
     "libflate",
     "snap",
diff --git a/examples/avro_write.rs b/examples/avro_write.rs
new file mode 100644
index 00000000000..6d970bd9e01
--- /dev/null
+++ b/examples/avro_write.rs
@@ -0,0 +1,65 @@
+use std::fs::File;
+
+use arrow2::{
+    array::{Array, Int32Array},
+    datatypes::{Field, Schema},
+    error::Result,
+    io::avro::write,
+};
+
+fn write_avro<W: std::io::Write>(
+    file: &mut W,
+    arrays: &[&dyn Array],
+    schema: &Schema,
+    compression: Option<write::Compression>,
+) -> Result<()> {
+    let avro_fields = write::to_avro_schema(schema)?;
+
+    let mut serializers = arrays
+        .iter()
+        .zip(avro_fields.iter())
+        .map(|(array, field)| write::new_serializer(*array, &field.schema))
+        .collect::<Vec<_>>();
+    let mut block = write::Block::new(arrays[0].len(), vec![]);
+
+    write::serialize(&mut serializers, &mut block)?;
+
+    let mut compressed_block = write::CompressedBlock::default();
+
+    if let Some(compression) = compression {
+        write::compress(&block, &mut compressed_block, compression)?;
+    } else {
+        compressed_block.number_of_rows = block.number_of_rows;
+        std::mem::swap(&mut compressed_block.data, &mut block.data);
+    }
+
+    write::write_metadata(file, avro_fields.clone(), compression)?;
+
+    write::write_block(file, &compressed_block)?;
+
+    Ok(())
+}
+
+fn main() -> Result<()> {
+    use std::env;
+    let args: Vec<String> = env::args().collect();
+
+    let path = &args[1];
+
+    let array = Int32Array::from(&[
+        Some(0),
+        Some(1),
+        Some(2),
+        Some(3),
+        Some(4),
+        Some(5),
+        Some(6),
+    ]);
+    let field = Field::new("c1", array.data_type().clone(), true);
+    let schema = Schema::new(vec![field]);
+
+    let mut file = File::create(path)?;
+    write_avro(&mut file, &[(&array) as &dyn Array], &schema, None)?;
+
+    Ok(())
+}
diff --git a/guide/src/SUMMARY.md b/guide/src/SUMMARY.md
index a66c993096d..9719504ee56 100644
--- a/guide/src/SUMMARY.md
+++ b/guide/src/SUMMARY.md
@@ -17,3 +17,4 @@
   - [Read Arrow stream](./io/ipc_stream_read.md)
   - [Write Arrow](./io/ipc_write.md)
   - [Read Avro](./io/avro_read.md)
+  - [Write Avro](./io/avro_write.md)
diff --git a/guide/src/io/avro_write.md b/guide/src/io/avro_write.md
new file mode 100644
index 00000000000..ca84219356e
--- /dev/null
+++ b/guide/src/io/avro_write.md
@@ -0,0 +1,8 @@
+# Avro write
+
+You can use this crate to write to Apache Avro.
+Below is an example, which you can run when this crate is compiled with the feature `io_avro`.
+
+```rust
+{{#include ../../../examples/avro_write.rs}}
+```
diff --git a/src/io/avro/mod.rs b/src/io/avro/mod.rs
index d5a69ab3557..0dc4b8909e6 100644
--- a/src/io/avro/mod.rs
+++ b/src/io/avro/mod.rs
@@ -6,6 +6,16 @@ pub mod read;
 #[cfg(feature = "io_avro_async")]
 #[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
 pub mod read_async;
+pub mod write;
+
+/// Valid compressions
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub enum Compression {
+    /// Deflate
+    Deflate,
+    /// Snappy
+    Snappy,
+}
 
 // macros that can operate in sync and async code.
 macro_rules! avro_decode {
@@ -13,7 +23,6 @@ macro_rules! avro_decode {
         {
             let mut i = 0u64;
             let mut buf = [0u8; 1];
-
             let mut j = 0;
             loop {
                 if j > 9 {
diff --git a/src/io/avro/read/block.rs b/src/io/avro/read/block.rs
index 69bc0e3ffb6..40f30ac3d06 100644
--- a/src/io/avro/read/block.rs
+++ b/src/io/avro/read/block.rs
@@ -24,7 +24,7 @@ fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
     Ok((rows as usize, bytes as usize))
 }
 
-/// Reads a block from the file into `buf`.
+/// Reads a block from the `reader` into `buf`.
 /// # Panic
 /// Panics iff the block marker does not equal to the file's marker
 fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs
index fea56e8e030..1cb32186fa4 100644
--- a/src/io/avro/read/mod.rs
+++ b/src/io/avro/read/mod.rs
@@ -24,14 +24,7 @@ use crate::datatypes::Schema;
 use crate::error::Result;
 use crate::record_batch::RecordBatch;
 
-/// Valid compressions
-#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
-pub enum Compression {
-    /// Deflate
-    Deflate,
-    /// Snappy
-    Snappy,
-}
+use super::Compression;
 
 /// Reads the avro metadata from `reader` into a [`Schema`], [`Compression`] and magic marker.
 #[allow(clippy::type_complexity)]
@@ -39,7 +32,7 @@ pub fn read_metadata<R: std::io::Read>(
     reader: &mut R,
 ) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
     let (avro_schema, codec, marker) = util::read_schema(reader)?;
-    let schema = schema::convert_schema(&avro_schema)?;
+    let schema = convert_schema(&avro_schema)?;
 
     let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
         fields.into_iter().map(|x| x.schema).collect()
diff --git a/src/io/avro/read/schema.rs b/src/io/avro/read/schema.rs
index 7ab286f244a..420187988ef 100644
--- a/src/io/avro/read/schema.rs
+++ b/src/io/avro/read/schema.rs
@@ -56,6 +56,7 @@ fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
     props
 }
 
+/// Maps an Avro Schema into a [`Schema`].
 pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
     let mut schema_fields = vec![];
     match schema {
@@ -65,22 +66,25 @@ pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
                     &field.schema,
                     Some(&field.name),
                     false,
-                    Some(&external_props(&field.schema)),
+                    Some(external_props(&field.schema)),
                 )?)
             }
         }
-        schema => schema_fields.push(schema_to_field(schema, Some(""), false, None)?),
-    }
-
-    let schema = Schema::new(schema_fields);
-    Ok(schema)
+        other => {
+            return Err(ArrowError::OutOfSpec(format!(
+                "An avro Schema must be of type Record - it is of type {:?}",
+                other
+            )))
+        }
+    };
+    Ok(Schema::new(schema_fields))
 }
 
 fn schema_to_field(
     schema: &AvroSchema,
     name: Option<&str>,
     mut nullable: bool,
-    props: Option<&BTreeMap<String, String>>,
+    props: Option<BTreeMap<String, String>>,
 ) -> Result<Field> {
     let data_type = match schema {
         AvroSchema::Null => DataType::Null,
@@ -169,7 +173,7 @@ fn schema_to_field(
                         &field.schema,
                         Some(&format!("{}.{}", name, field.name)),
                         false,
-                        Some(&props),
+                        Some(props),
                     )
                 })
                 .collect();
@@ -198,6 +202,6 @@ fn schema_to_field(
     let name = name.unwrap_or_default();
 
     let mut field = Field::new(name, data_type, nullable);
-    field.set_metadata(props.cloned());
+    field.set_metadata(props);
     Ok(field)
 }
diff --git a/src/io/avro/read_async/metadata.rs b/src/io/avro/read_async/metadata.rs
index 3040799aa5d..1fb3526d91e 100644
--- a/src/io/avro/read_async/metadata.rs
+++ b/src/io/avro/read_async/metadata.rs
@@ -10,7 +10,7 @@ use crate::error::{ArrowError, Result};
 
 use super::super::read::convert_schema;
 use super::super::read::deserialize_header;
-use super::super::read::Compression;
+use super::super::Compression;
 use super::super::{read_header, read_metadata};
 use super::utils::zigzag_i64;
diff --git a/src/io/avro/write/block.rs b/src/io/avro/write/block.rs
new file mode 100644
index 00000000000..562019f6c56
--- /dev/null
+++ b/src/io/avro/write/block.rs
@@ -0,0 +1,69 @@
+use std::io::Write;
+
+use crate::{error::Result, io::avro::Compression};
+
+use super::{util::zigzag_encode, SYNC_NUMBER};
+
+/// A compressed Avro block.
+#[derive(Debug, Clone, Default, PartialEq)]
+pub struct CompressedBlock {
+    /// The number of rows
+    pub number_of_rows: usize,
+    /// The compressed data
+    pub data: Vec<u8>,
+}
+
+impl CompressedBlock {
+    /// Creates a new CompressedBlock
+    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
+        Self {
+            number_of_rows,
+            data,
+        }
+    }
+}
+
+/// An uncompressed Avro block.
+#[derive(Debug, Clone, Default, PartialEq)]
+pub struct Block {
+    /// The number of rows
+    pub number_of_rows: usize,
+    /// The uncompressed data
+    pub data: Vec<u8>,
+}
+
+impl Block {
+    /// Creates a new Block
+    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
+        Self {
+            number_of_rows,
+            data,
+        }
+    }
+}
+
+/// Writes a [`CompressedBlock`] to `writer`
+pub fn write_block<W: Write>(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()> {
+    // write size and rows
+    zigzag_encode(compressed_block.number_of_rows as i64, writer)?;
+    zigzag_encode(compressed_block.data.len() as i64, writer)?;
+
+    writer.write_all(&compressed_block.data)?;
+
+    writer.write_all(&SYNC_NUMBER)?;
+
+    Ok(())
+}
+
+/// Compresses a [`Block`] into a [`CompressedBlock`].
+pub fn compress(
+    block: &Block,
+    compressed_block: &mut CompressedBlock,
+    compression: Compression,
+) -> Result<()> {
+    compressed_block.number_of_rows = block.number_of_rows;
+    match compression {
+        Compression::Deflate => todo!(),
+        Compression::Snappy => todo!(),
+    }
+}
diff --git a/src/io/avro/write/header.rs b/src/io/avro/write/header.rs
new file mode 100644
index 00000000000..8d21cc0ad63
--- /dev/null
+++ b/src/io/avro/write/header.rs
@@ -0,0 +1,30 @@
+use std::collections::HashMap;
+
+use avro_schema::Schema;
+use serde_json;
+
+use crate::error::{ArrowError, Result};
+
+use super::Compression;
+
+/// Serializes a [`Schema`] and an optional [`Compression`] into an Avro header.
+pub(crate) fn serialize_header(
+    schema: &Schema,
+    compression: Option<Compression>,
+) -> Result<HashMap<String, Vec<u8>>> {
+    let schema =
+        serde_json::to_string(schema).map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;
+
+    let mut header = HashMap::<String, Vec<u8>>::default();
+
+    header.insert("avro.schema".to_string(), schema.into_bytes());
+    if let Some(compression) = compression {
+        let value = match compression {
+            Compression::Snappy => b"snappy".to_vec(),
+            Compression::Deflate => b"deflate".to_vec(),
+        };
+        header.insert("avro.codec".to_string(), value);
+    };
+
+    Ok(header)
+}
diff --git a/src/io/avro/write/mod.rs b/src/io/avro/write/mod.rs
new file mode 100644
index 00000000000..3be4fc53103
--- /dev/null
+++ b/src/io/avro/write/mod.rs
@@ -0,0 +1,69 @@
+//! APIs to write to Avro format.
+use std::io::Write;
+
+use avro_schema::{Field as AvroField, Record, Schema as AvroSchema};
+
+use crate::error::Result;
+
+pub use super::Compression;
+
+mod header;
+use header::serialize_header;
+mod schema;
+pub use schema::to_avro_schema;
+mod serialize;
+pub use serialize::{can_serialize, new_serializer, BoxSerializer};
+mod block;
+pub use block::*;
+mod util;
+
+const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
+
+/// Writes Avro's metadata to `writer`.
+pub fn write_metadata<W: Write>(
+    writer: &mut W,
+    fields: Vec<AvroField>,
+    compression: Option<Compression>,
+) -> Result<()> {
+    // * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
+    let avro_magic = [b'O', b'b', b'j', 1u8];
+    writer.write_all(&avro_magic)?;
+
+    // * file metadata, including the schema.
+    let schema = AvroSchema::Record(Record::new("", fields));
+    let header = serialize_header(&schema, compression)?;
+
+    util::zigzag_encode(header.len() as i64, writer)?;
+    for (name, item) in header {
+        util::write_binary(name.as_bytes(), writer)?;
+        util::write_binary(&item, writer)?;
+    }
+    writer.write_all(&[0])?;
+
+    // The 16-byte, randomly-generated sync marker for this file.
+    writer.write_all(&SYNC_NUMBER)?;
+
+    Ok(())
+}
+
+/// Consumes a set of [`BoxSerializer`] into a [`Block`].
+/// # Panics
+/// Panics iff the number of items in any of the serializers is not equal to the number of rows
+/// declared in the `block`.
+pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) -> Result<()> {
+    let Block {
+        data,
+        number_of_rows,
+    } = block;
+
+    data.clear(); // restart it
+
+    // _the_ transpose (columns -> rows)
+    for _ in 0..*number_of_rows {
+        for serializer in &mut *serializers {
+            let item_data = serializer.next().unwrap();
+            data.write_all(item_data)?;
+        }
+    }
+    Ok(())
+}
diff --git a/src/io/avro/write/schema.rs b/src/io/avro/write/schema.rs
new file mode 100644
index 00000000000..b8cb5917a10
--- /dev/null
+++ b/src/io/avro/write/schema.rs
@@ -0,0 +1,62 @@
+use avro_schema::{
+    Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Schema as AvroSchema,
+};
+
+use crate::datatypes::*;
+use crate::error::{ArrowError, Result};
+
+/// Converts a [`Schema`] into a vector of [`AvroField`].
+pub fn to_avro_schema(schema: &Schema) -> Result<Vec<AvroField>> {
+    schema.fields.iter().map(field_to_field).collect()
+}
+
+fn field_to_field(field: &Field) -> Result<AvroField> {
+    let schema = type_to_schema(field.data_type(), field.is_nullable())?;
+    Ok(AvroField::new(field.name(), schema))
+}
+
+fn type_to_schema(data_type: &DataType, is_nullable: bool) -> Result<AvroSchema> {
+    Ok(if is_nullable {
+        AvroSchema::Union(vec![AvroSchema::Null, _type_to_schema(data_type)?])
+    } else {
+        _type_to_schema(data_type)?
+    })
+}
+
+fn _type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
+    Ok(match data_type.to_logical_type() {
+        DataType::Null => AvroSchema::Null,
+        DataType::Boolean => AvroSchema::Boolean,
+        DataType::Int32 => AvroSchema::Int(None),
+        DataType::Int64 => AvroSchema::Long(None),
+        DataType::Float32 => AvroSchema::Float,
+        DataType::Float64 => AvroSchema::Double,
+        DataType::Binary => AvroSchema::Bytes(None),
+        DataType::Utf8 => AvroSchema::String(None),
+        DataType::List(inner) => AvroSchema::Array(Box::new(type_to_schema(
+            inner.data_type(),
+            inner.is_nullable(),
+        )?)),
+        DataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)),
+        DataType::Time32(TimeUnit::Millisecond) => AvroSchema::Int(Some(IntLogical::Time)),
+        DataType::Time64(TimeUnit::Microsecond) => AvroSchema::Long(Some(LongLogical::Time)),
+        DataType::Timestamp(TimeUnit::Millisecond, None) => {
+            AvroSchema::Long(Some(LongLogical::LocalTimestampMillis))
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, None) => {
+            AvroSchema::Long(Some(LongLogical::LocalTimestampMicros))
+        }
+        DataType::Interval(IntervalUnit::MonthDayNano) => {
+            let mut fixed = Fixed::new("", 12);
+            fixed.logical = Some(FixedLogical::Duration);
+            AvroSchema::Fixed(fixed)
+        }
+        DataType::FixedSizeBinary(size) => AvroSchema::Fixed(Fixed::new("", *size)),
+        other => {
+            return Err(ArrowError::NotYetImplemented(format!(
+                "write {:?} to avro",
+                other
+            )))
+        }
+    })
+}
diff --git a/src/io/avro/write/serialize.rs b/src/io/avro/write/serialize.rs
new file mode 100644
index 00000000000..5a4e7d52541
--- /dev/null
+++ b/src/io/avro/write/serialize.rs
@@ -0,0 +1,190 @@
+use avro_schema::Schema as AvroSchema;
+
+use crate::datatypes::{PhysicalType, PrimitiveType};
+use crate::{array::*, datatypes::DataType};
+
+use super::super::super::iterator::*;
+use super::util;
+
+/// A type alias for a boxed [`StreamingIterator`], used to write arrays into avro rows
+/// (i.e. a column -> row transposition of types known at run-time)
+pub type BoxSerializer<'a> = Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>;
+
+/// Creates a [`StreamingIterator`] trait object that presents items from `array`
+/// encoded according to `schema`.
+/// # Panic
+/// This function panics iff the `data_type` is not supported (use [`can_serialize`] to check)
+/// # Implementation
+/// This function performs minimal CPU work: it dynamically dispatches based on the schema
+/// and arrow type.
+pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSerializer<'a> {
+    let data_type = array.data_type().to_physical_type();
+
+    match (data_type, schema) {
+        (PhysicalType::Boolean, AvroSchema::Boolean) => {
+            let values = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values_iter(),
+                |x, buf| {
+                    buf.push(x as u8);
+                },
+                vec![],
+            ))
+        }
+        (PhysicalType::Boolean, AvroSchema::Union(_)) => {
+            let values = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        buf.push(x as u8);
+                    }
+                },
+                vec![],
+            ))
+        }
+        (PhysicalType::Utf8, AvroSchema::Union(_)) => {
+            let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        util::zigzag_encode(x.len() as i64, buf).unwrap();
+                        buf.extend_from_slice(x.as_bytes());
+                    }
+                },
+                vec![],
+            ))
+        }
+        (PhysicalType::Utf8, AvroSchema::String(_)) => {
+            let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values_iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.len() as i64, buf).unwrap();
+                    buf.extend_from_slice(x.as_bytes());
+                },
+                vec![],
+            ))
+        }
+        (PhysicalType::Binary, AvroSchema::Union(_)) => {
+            let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        util::zigzag_encode(x.len() as i64, buf).unwrap();
+                        buf.extend_from_slice(x);
+                    }
+                },
+                vec![],
+            ))
+        }
+        (PhysicalType::Binary, AvroSchema::Bytes(_)) => {
+            let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values_iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.len() as i64, buf).unwrap();
+                    buf.extend_from_slice(x);
+                },
+                vec![],
+            ))
+        }
+
+        (PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Union(_)) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<i32>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        util::zigzag_encode(*x as i64, buf).unwrap();
+                    }
+                },
+                vec![],
+            ))
+        }
+        (PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Int(_)) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<i32>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values().iter(),
+                |x, buf| {
+                    util::zigzag_encode(*x as i64, buf).unwrap();
+                },
+                vec![],
+            ))
+        }
+        (PhysicalType::Primitive(PrimitiveType::Int64), AvroSchema::Union(_)) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<i64>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        util::zigzag_encode(*x, buf).unwrap();
+                    }
+                },
+                vec![],
+            ))
+        }
+        (PhysicalType::Primitive(PrimitiveType::Int64), AvroSchema::Long(_)) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<i64>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values().iter(),
+                |x, buf| {
+                    util::zigzag_encode(*x, buf).unwrap();
+                },
+                vec![],
+            ))
+        }
+        (PhysicalType::Primitive(PrimitiveType::Float32), AvroSchema::Float) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<f32>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values().iter(),
+                |x, buf| {
+                    buf.extend_from_slice(&x.to_le_bytes());
+                },
+                vec![],
+            ))
+        }
+        (PhysicalType::Primitive(PrimitiveType::Float64), AvroSchema::Double) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<f64>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values().iter(),
+                |x, buf| {
+                    buf.extend_from_slice(&x.to_le_bytes());
+                },
+                vec![],
+            ))
+        }
+        _ => todo!(),
+    }
+}
+
+/// Whether [`new_serializer`] supports `data_type`.
+pub fn can_serialize(data_type: &DataType) -> bool {
+    use DataType::*;
+    matches!(data_type, Boolean | Int32 | Int64 | Utf8 | Binary)
+}
diff --git a/src/io/avro/write/util.rs b/src/io/avro/write/util.rs
new file mode 100644
index 00000000000..52d13c026e9
--- /dev/null
+++ b/src/io/avro/write/util.rs
@@ -0,0 +1,26 @@
+use crate::error::Result;
+
+#[inline]
+pub fn zigzag_encode<W: std::io::Write>(n: i64, writer: &mut W) -> Result<()> {
+    _zigzag_encode(((n << 1) ^ (n >> 63)) as u64, writer)
+}
+
+#[inline]
+fn _zigzag_encode<W: std::io::Write>(mut z: u64, writer: &mut W) -> Result<()> {
+    loop {
+        if z <= 0x7F {
+            writer.write_all(&[(z & 0x7F) as u8])?;
+            break;
+        } else {
+            writer.write_all(&[(0x80 | (z & 0x7F)) as u8])?;
+            z >>= 7;
+        }
+    }
+    Ok(())
+}
+
+pub(crate) fn write_binary<W: std::io::Write>(bytes: &[u8], writer: &mut W) -> Result<()> {
+    zigzag_encode(bytes.len() as i64, writer)?;
+    writer.write_all(bytes)?;
+    Ok(())
+}
diff --git a/src/io/csv/write/mod.rs b/src/io/csv/write/mod.rs
index e74bc150c16..5aa44c91d31 100644
--- a/src/io/csv/write/mod.rs
+++ b/src/io/csv/write/mod.rs
@@ -1,8 +1,7 @@
 //! APIs to write to CSV
-mod iterator;
 mod serialize;
 
-use iterator::StreamingIterator;
+use super::super::iterator::StreamingIterator;
 
 use std::io::Write;
 
diff --git a/src/io/csv/write/serialize.rs b/src/io/csv/write/serialize.rs
index 41086388010..6c37e0eb24f 100644
--- a/src/io/csv/write/serialize.rs
+++ b/src/io/csv/write/serialize.rs
@@ -10,7 +10,7 @@ use crate::{
     error::Result,
 };
 
-use super::iterator::{BufStreamingIterator, StreamingIterator};
+use super::super::super::iterator::{BufStreamingIterator, StreamingIterator};
 use crate::array::{DictionaryArray, DictionaryKey, Offset};
 
 use std::any::Any;
diff --git a/src/io/csv/write/iterator.rs b/src/io/iterator.rs
similarity index 100%
rename from src/io/csv/write/iterator.rs
rename to src/io/iterator.rs
diff --git a/src/io/mod.rs b/src/io/mod.rs
index a8d68d66951..e406de9847c 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -42,3 +42,6 @@ pub mod avro;
 #[cfg(feature = "io_print")]
 #[cfg_attr(docsrs, doc(cfg(feature = "io_print")))]
 pub mod print;
+
+#[cfg(any(feature = "io_csv_write", feature = "io_avro"))]
+mod iterator;
diff --git a/tests/it/io/avro/mod.rs b/tests/it/io/avro/mod.rs
index ee0459c04a4..29b3c6b003a 100644
--- a/tests/it/io/avro/mod.rs
+++ b/tests/it/io/avro/mod.rs
@@ -3,3 +3,4 @@ mod read;
 
 #[cfg(feature = "io_avro_async")]
 mod read_async;
+mod write;
diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs
index db6e501d851..d55e10332d3 100644
--- a/tests/it/io/avro/read.rs
+++ b/tests/it/io/avro/read.rs
@@ -10,7 +10,7 @@ use arrow2::error::Result;
 use arrow2::io::avro::read;
 use arrow2::record_batch::RecordBatch;
 
-fn schema() -> (AvroSchema, Schema) {
+pub(super) fn schema() -> (AvroSchema, Schema) {
     let raw_schema = r#"
 {
     "type": "record",
@@ -69,8 +69,36 @@ fn schema() -> (AvroSchema, Schema) {
     (AvroSchema::parse_str(raw_schema).unwrap(), schema)
 }
 
-pub(super) fn write(codec: Codec) -> std::result::Result<(Vec<u8>, RecordBatch), avro_rs::Error> {
-    let (avro, schema) = schema();
+pub(super) fn data() -> RecordBatch {
+    let data = vec![
+        Some(vec![Some(1i32), None, Some(3)]),
+        Some(vec![Some(1i32), None, Some(3)]),
+    ];
+
+    let mut array = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new();
+    array.try_extend(data).unwrap();
+
+    let columns = vec![
+        Arc::new(Int64Array::from_slice([27, 47])) as Arc<dyn Array>,
+        Arc::new(Utf8Array::<i32>::from_slice(["foo", "bar"])) as Arc<dyn Array>,
+        Arc::new(Int32Array::from_slice([1, 1])) as Arc<dyn Array>,
+        Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)) as Arc<dyn Array>,
+        Arc::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])) as Arc<dyn Array>,
+        Arc::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])) as Arc<dyn Array>,
+        Arc::new(BooleanArray::from_slice([true, false])) as Arc<dyn Array>,
+        Arc::new(Utf8Array::<i32>::from([Some("foo"), None])) as Arc<dyn Array>,
+        array.into_arc(),
+        Arc::new(DictionaryArray::<i32>::from_data(
+            Int32Array::from_slice([1, 0]),
+            Arc::new(Utf8Array::<i32>::from_slice(["SPADES", "HEARTS"])),
+        )) as Arc<dyn Array>,
+    ];
+
+    RecordBatch::try_new(Arc::new(schema().1), columns).unwrap()
+}
+
+pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::Error> {
+    let (avro, _) = schema();
     // a writer needs a schema and something to write to
     let mut writer = Writer::with_codec(&avro, Vec::new(), codec);
@@ -118,40 +146,11 @@ pub(super) fn write(codec: Codec) -> std::result::Result<(Vec<u8>, RecordBatch),
     );
     record.put("enum", Value::Enum(0, "SPADES".to_string()));
     writer.append(record)?;
-
-    let data = vec![
-        Some(vec![Some(1i32), None, Some(3)]),
-        Some(vec![Some(1i32), None, Some(3)]),
-    ];
-
-    let mut array = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new();
-    array.try_extend(data).unwrap();
-
-    let columns = vec![
-        Arc::new(Int64Array::from_slice([27, 47])) as Arc<dyn Array>,
-        Arc::new(Utf8Array::<i32>::from_slice(["foo", "bar"])) as Arc<dyn Array>,
-        Arc::new(Int32Array::from_slice([1, 1])) as Arc<dyn Array>,
-        Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)) as Arc<dyn Array>,
-        Arc::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])) as Arc<dyn Array>,
-        Arc::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])) as Arc<dyn Array>,
-        Arc::new(BooleanArray::from_slice([true, false])) as Arc<dyn Array>,
-        Arc::new(Utf8Array::<i32>::from([Some("foo"), None])) as Arc<dyn Array>,
-        array.into_arc(),
-        Arc::new(DictionaryArray::<i32>::from_data(
-            Int32Array::from_slice([1, 0]),
-            Arc::new(Utf8Array::<i32>::from_slice(["SPADES", "HEARTS"])),
-        )) as Arc<dyn Array>,
-    ];
-
-    let expected = RecordBatch::try_new(Arc::new(schema), columns).unwrap();
-
-    Ok((writer.into_inner().unwrap(), expected))
+    Ok(writer.into_inner().unwrap())
 }
 
-fn test(codec: Codec) -> Result<()> {
-    let (data, expected) = write(codec).unwrap();
-
-    let file = &mut &data[..];
+pub(super) fn read_avro(mut avro: &[u8]) -> Result<RecordBatch> {
+    let file = &mut avro;
 
     let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;
 
@@ -161,7 +160,16 @@ fn test(codec: Codec) -> Result<()> {
         Arc::new(schema),
     );
 
-    assert_eq!(reader.next().unwrap().unwrap(), expected);
+    reader.next().unwrap()
+}
+
+fn test(codec: Codec) -> Result<()> {
+    let avro = write_avro(codec).unwrap();
+    let expected = data();
+
+    let result = read_avro(&avro)?;
+
+    assert_eq!(result, expected);
     Ok(())
 }
diff --git a/tests/it/io/avro/read_async.rs b/tests/it/io/avro/read_async.rs
index 3ace4f05ae5..7b6b019b45b 100644
--- a/tests/it/io/avro/read_async.rs
+++ b/tests/it/io/avro/read_async.rs
@@ -8,12 +8,13 @@ use futures::StreamExt;
 use arrow2::error::Result;
 use arrow2::io::avro::read_async::*;
 
-use super::read::write;
+use super::read::{data, write_avro};
 
 async fn test(codec: Codec) -> Result<()> {
-    let (data, expected) = write(codec).unwrap();
+    let avro_data = write_avro(codec).unwrap();
+    let expected = data();
 
-    let mut reader = &mut &data[..];
+    let mut reader = &mut &avro_data[..];
 
     let (_, schema, _, marker) = read_metadata(&mut reader).await?;
     let schema = Arc::new(schema);
diff --git a/tests/it/io/avro/write.rs b/tests/it/io/avro/write.rs
new file mode 100644
index 00000000000..b4e026c9c90
--- /dev/null
+++ b/tests/it/io/avro/write.rs
@@ -0,0 +1,95 @@
+use std::sync::Arc;
+
+use arrow2::array::*;
+use arrow2::datatypes::*;
+use arrow2::error::Result;
+use arrow2::io::avro::write;
+use arrow2::record_batch::RecordBatch;
+
+fn schema() -> Schema {
+    Schema::new(vec![
+        Field::new("a", DataType::Int64, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Int32, false),
+        Field::new("date", DataType::Date32, false),
+        Field::new("d", DataType::Binary, false),
+        Field::new("e", DataType::Float64, false),
+        Field::new("f", DataType::Boolean, false),
+        Field::new("g", DataType::Utf8, true),
+    ])
+}
+
+fn data() -> RecordBatch {
+    let columns = vec![
+        Arc::new(Int64Array::from_slice([27, 47])) as Arc<dyn Array>,
+        Arc::new(Utf8Array::<i32>::from_slice(["foo", "bar"])) as Arc<dyn Array>,
+        Arc::new(Int32Array::from_slice([1, 1])) as Arc<dyn Array>,
+        Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)) as Arc<dyn Array>,
+        Arc::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])) as Arc<dyn Array>,
+        Arc::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])) as Arc<dyn Array>,
+        Arc::new(BooleanArray::from_slice([true, false])) as Arc<dyn Array>,
+        Arc::new(Utf8Array::<i32>::from([Some("foo"), None])) as Arc<dyn Array>,
+    ];
+
+    RecordBatch::try_new(Arc::new(schema()), columns).unwrap()
+}
+
+use super::read::read_avro;
+
+fn write_avro<R: AsRef<dyn Array>>(
+    arrays: &[R],
+    schema: &Schema,
+    compression: Option<write::Compression>,
+) -> Result<Vec<u8>> {
+    let avro_fields = write::to_avro_schema(schema)?;
+
+    let mut serializers = arrays
+        .iter()
+        .map(|x| x.as_ref())
+        .zip(avro_fields.iter())
+        .map(|(array, field)| write::new_serializer(array, &field.schema))
+        .collect::<Vec<_>>();
+    let mut block = write::Block::new(arrays[0].as_ref().len(), vec![]);
+
+    write::serialize(&mut serializers, &mut block)?;
+
+    let mut compressed_block = write::CompressedBlock::default();
+
+    if let Some(compression) = compression {
+        write::compress(&block, &mut compressed_block, compression)?;
+    } else {
+        compressed_block.number_of_rows = block.number_of_rows;
+        std::mem::swap(&mut compressed_block.data, &mut block.data);
+    }
+
+    let mut file = vec![];
+
+    write::write_metadata(&mut file, avro_fields.clone(), compression)?;
+
+    write::write_block(&mut file, &compressed_block)?;
+
+    Ok(file)
+}
+
+fn roundtrip(compression: Option<write::Compression>) -> Result<()> {
+    let expected = data();
+
+    let arrays = expected.columns();
+    let schema = expected.schema();
+
+    let data = write_avro(arrays, schema, compression)?;
+
+    let result = read_avro(&data)?;
+
+    assert_eq!(result.schema(), expected.schema());
+    for (c1, c2) in result.columns().iter().zip(expected.columns().iter()) {
+        assert_eq!(c1, c2);
+    }
+    assert_eq!(result, expected);
+    Ok(())
+}
+
+#[test]
+fn no_compression() -> Result<()> {
+    roundtrip(None)
+}
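
For reference, a file produced by `examples/avro_write.rs` can be read back with the crate's existing `io::avro::read` API, mirroring the round-trip test above. A minimal sketch, assuming a file already written at the hypothetical path `test.avro` (the path and the printed output are illustrative assumptions, not part of this patch):

```rust
use std::fs::File;
use std::sync::Arc;

use arrow2::error::Result;
use arrow2::io::avro::read;

fn main() -> Result<()> {
    // open the file produced by the write example ("test.avro" is an assumed path)
    let mut file = File::open("test.avro")?;

    // read the Avro field schemas, the Arrow schema, the codec and the sync marker
    let (avro_schema, schema, codec, file_marker) = read::read_metadata(&mut file)?;

    // iterate over blocks, decompressing and deserializing them into record batches
    let reader = read::Reader::new(
        read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
        avro_schema,
        Arc::new(schema),
    );

    for maybe_batch in reader {
        let batch = maybe_batch?;
        println!("read a batch with {} rows", batch.num_rows());
    }
    Ok(())
}
```

Note that the codec returned by `read_metadata` is whatever was stored in the `avro.codec` header entry written by `write_metadata`, so compressed and uncompressed files go through the same read path.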