Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for async read of Avro #620

Merged
merged 1 commit into from
Dec 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ avro-rs = { version = "0.13", optional = true, default_features = false }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
# async avro
async-stream = { version = "0.3.2", optional = true }

# for division/remainder optimization at runtime
strength_reduce = { version = "0.2", optional = true }
Expand Down Expand Up @@ -141,7 +143,7 @@ io_avro_compression = [
"libflate",
"snap",
]
io_avro_async = ["io_avro", "futures"]
io_avro_async = ["io_avro", "futures", "async-stream"]
# io_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["io_json", "serde_derive", "hex"]
Expand Down
42 changes: 42 additions & 0 deletions examples/avro_read_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::sync::Arc;

use futures::pin_mut;
use futures::StreamExt;
use tokio::fs::File;
use tokio_util::compat::*;

use arrow2::error::Result;
use arrow2::io::avro::read::{decompress_block, deserialize};
use arrow2::io::avro::read_async::*;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();

let file_path = &args[1];

let mut reader = File::open(file_path).await?.compat();

let (avro_schemas, schema, compression, marker) = read_metadata(&mut reader).await?;
let schema = Arc::new(schema);
let avro_schemas = Arc::new(avro_schemas);

let blocks = block_stream(&mut reader, marker).await;

pin_mut!(blocks);
while let Some((mut block, rows)) = blocks.next().await.transpose()? {
// the content here is blocking. In general this should run on spawn_blocking
let schema = schema.clone();
let avro_schemas = avro_schemas.clone();
let handle = tokio::task::spawn_blocking(move || {
let mut decompressed = vec![];
decompress_block(&mut block, &mut decompressed, compression)?;
deserialize(&decompressed, rows, schema, &avro_schemas)
});
let batch = handle.await.unwrap()?;
assert!(batch.num_rows() > 0);
}

Ok(())
}
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
- [Read Arrow](./io/ipc_read.md)
- [Read Arrow stream](./io/ipc_stream_read.md)
- [Write Arrow](./io/ipc_write.md)
- [Read Avro](./io/avro_read.md)
1 change: 1 addition & 0 deletions guide/src/io/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ This crate offers optional features that enable interoperability with different
* CSV (`io_csv`)
* Parquet (`io_parquet`)
* Json (`io_json`)
* Avro (`io_avro` and `io_avro_async`)

In this section you can find a guide and examples for each one of them.
11 changes: 11 additions & 0 deletions guide/src/io/avro_read.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Avro read

When compiled with feature `io_avro_async`, you can use this crate to read Avro files
asynchronously.

```rust
{{#include ../../../examples/avro_read_async.rs}}
```

Note how both decompression and deserialization is performed on a separate thread pool to not
block (see also [here](https://ryhl.io/blog/async-what-is-blocking/)).
1 change: 1 addition & 0 deletions src/io/avro/read/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16])
return Ok(0);
};

buf.clear();
buf.resize(bytes, 0);
reader.read_exact(buf)?;

Expand Down
18 changes: 9 additions & 9 deletions src/io/avro/read/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,33 @@ use crate::error::{ArrowError, Result};
use super::BlockStreamIterator;
use super::Compression;

/// Decompresses an avro block.
/// Decompresses an Avro block.
/// Returns whether the buffers where swapped.
fn decompress_block(
pub fn decompress_block(
block: &mut Vec<u8>,
decompress: &mut Vec<u8>,
decompressed: &mut Vec<u8>,
compression: Option<Compression>,
) -> Result<bool> {
match compression {
None => {
std::mem::swap(block, decompress);
std::mem::swap(block, decompressed);
Ok(true)
}
#[cfg(feature = "io_avro_compression")]
Some(Compression::Deflate) => {
decompress.clear();
decompressed.clear();
let mut decoder = libflate::deflate::Decoder::new(&block[..]);
decoder.read_to_end(decompress)?;
decoder.read_to_end(decompressed)?;
Ok(false)
}
#[cfg(feature = "io_avro_compression")]
Some(Compression::Snappy) => {
let len = snap::raw::decompress_len(&block[..block.len() - 4])
.map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?;
decompress.clear();
decompress.resize(len, 0);
decompressed.clear();
decompressed.resize(len, 0);
snap::raw::Decoder::new()
.decompress(&block[..block.len() - 4], decompress)
.decompress(&block[..block.len() - 4], decompressed)
.map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?;
Ok(false)
}
Expand Down
1 change: 1 addition & 0 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ fn deserialize_value<'a>(
Ok(block)
}

/// Deserializes an Avro block into a [`RecordBatch`].
pub fn deserialize(
mut block: &[u8],
rows: usize,
Expand Down
6 changes: 4 additions & 2 deletions src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ use fallible_streaming_iterator::FallibleStreamingIterator;
mod block;
mod decompress;
pub use block::BlockStreamIterator;
pub use decompress::Decompressor;
pub use decompress::{decompress_block, Decompressor};
mod deserialize;
pub use deserialize::deserialize;
mod header;
mod nested;
mod schema;
mod util;

pub(super) use header::deserialize_header;
pub(super) use schema::convert_schema;

use crate::datatypes::Schema;
use crate::error::Result;
Expand Down Expand Up @@ -80,7 +82,7 @@ impl<R: Read> Iterator for Reader<R> {

self.iter.next().transpose().map(|x| {
let (data, rows) = x?;
deserialize::deserialize(data, *rows, schema, avro_schemas)
deserialize(data, *rows, schema, avro_schemas)
})
}
}
2 changes: 1 addition & 1 deletion src/io/avro/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datatypes::*;
use crate::error::{ArrowError, Result};

/// Returns the fully qualified name for a field
pub fn aliased(name: &str, namespace: Option<&str>, default_namespace: Option<&str>) -> String {
fn aliased(name: &str, namespace: Option<&str>, default_namespace: Option<&str>) -> String {
if name.contains('.') {
name.to_string()
} else {
Expand Down
67 changes: 67 additions & 0 deletions src/io/avro/read_async/block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! APIs to read from Avro format to arrow.
use async_stream::try_stream;
use futures::AsyncRead;
use futures::AsyncReadExt;
use futures::Stream;

use crate::error::{ArrowError, Result};

use super::utils::zigzag_i64;

async fn read_size<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<(usize, usize)> {
let rows = match zigzag_i64(reader).await {
Ok(a) => a,
Err(ArrowError::Io(io_err)) => {
if let std::io::ErrorKind::UnexpectedEof = io_err.kind() {
// end
return Ok((0, 0));
} else {
return Err(ArrowError::Io(io_err));
}
}
Err(other) => return Err(other),
};
let bytes = zigzag_i64(reader).await?;
Ok((rows as usize, bytes as usize))
}

/// Reads a block from the file into `buf`.
/// # Panic
/// Panics iff the block marker does not equal to the file's marker
async fn read_block<R: AsyncRead + Unpin + Send>(
reader: &mut R,
buf: &mut Vec<u8>,
file_marker: [u8; 16],
) -> Result<usize> {
let (rows, bytes) = read_size(reader).await?;
if rows == 0 {
return Ok(0);
};

buf.clear();
buf.resize(bytes, 0);
reader.read_exact(buf).await?;

let mut marker = [0u8; 16];
reader.read_exact(&mut marker).await?;

assert!(!(marker != file_marker));
Ok(rows)
}

/// Returns a fallible [`Stream`] of Avro blocks bound to `reader`
pub async fn block_stream<R: AsyncRead + Unpin + Send>(
reader: &mut R,
file_marker: [u8; 16],
) -> impl Stream<Item = Result<(Vec<u8>, usize)>> + '_ {
try_stream! {
loop {
let mut buffer = vec![];
let rows = read_block(reader, &mut buffer, file_marker).await?;
if rows == 0 {
break
}
yield (buffer, rows)
}
}
}
Empty file removed src/io/avro/read_async/header.rs
Empty file.
60 changes: 60 additions & 0 deletions src/io/avro/read_async/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! Async Avro
use std::collections::HashMap;

use avro_rs::Schema as AvroSchema;
use futures::AsyncRead;
use futures::AsyncReadExt;

use crate::datatypes::Schema;
use crate::error::{ArrowError, Result};

use super::super::read::convert_schema;
use super::super::read::deserialize_header;
use super::super::read::Compression;
use super::super::{read_header, read_metadata};
use super::utils::zigzag_i64;

/// Reads Avro's metadata from `reader` into a [`AvroSchema`], [`Compression`] and magic marker.
#[allow(clippy::type_complexity)]
async fn read_metadata_async<R: AsyncRead + Unpin + Send>(
reader: &mut R,
) -> Result<(AvroSchema, Option<Compression>, [u8; 16])> {
read_metadata!(reader.await)
}

/// Reads the avro metadata from `reader` into a [`AvroSchema`], [`Compression`] and magic marker.
#[allow(clippy::type_complexity)]
pub async fn read_metadata<R: AsyncRead + Unpin + Send>(
reader: &mut R,
) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
let (avro_schema, codec, marker) = read_metadata_async(reader).await?;
let schema = convert_schema(&avro_schema)?;

let avro_schema = if let AvroSchema::Record { fields, .. } = avro_schema {
fields.into_iter().map(|x| x.schema).collect()
} else {
panic!()
};

Ok((avro_schema, schema, codec, marker))
}

/// Reads the file marker asynchronously
async fn read_file_marker<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<[u8; 16]> {
let mut marker = [0u8; 16];
reader.read_exact(&mut marker).await?;
Ok(marker)
}

async fn _read_binary<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<Vec<u8>> {
let len: usize = zigzag_i64(reader).await? as usize;
let mut buf = vec![0u8; len];
reader.read_exact(&mut buf).await?;
Ok(buf)
}

async fn read_header<R: AsyncRead + Unpin + Send>(
reader: &mut R,
) -> Result<HashMap<String, Vec<u8>>> {
read_header!(reader.await)
}
55 changes: 5 additions & 50 deletions src/io/avro/read_async/mod.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,8 @@
//! Async Avro
use std::collections::HashMap;

use avro_rs::Schema;
use futures::AsyncRead;
use futures::AsyncReadExt;
mod block;
mod metadata;
pub(self) mod utils;

use crate::error::{ArrowError, Result};

use super::read::deserialize_header;
use super::read::Compression;
use super::{avro_decode, read_header, read_metadata};

/// Reads Avro's metadata from `reader` into a [`Schema`], [`Compression`] and magic marker.
#[allow(clippy::type_complexity)]
pub async fn read_metadata_async<R: AsyncRead + Unpin + Send>(
reader: &mut R,
) -> Result<(Schema, Option<Compression>, [u8; 16])> {
read_metadata!(reader.await)
}

/// Reads the file marker asynchronously
async fn read_file_marker<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<[u8; 16]> {
let mut marker = [0u8; 16];
reader.read_exact(&mut marker).await?;
Ok(marker)
}

async fn zigzag_i64<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<i64> {
let z = decode_variable(reader).await?;
Ok(if z & 0x1 == 0 {
(z >> 1) as i64
} else {
!(z >> 1) as i64
})
}

async fn decode_variable<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<u64> {
avro_decode!(reader.await)
}

async fn _read_binary<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<Vec<u8>> {
let len: usize = zigzag_i64(reader).await? as usize;
let mut buf = vec![0u8; len];
reader.read_exact(&mut buf).await?;
Ok(buf)
}

async fn read_header<R: AsyncRead + Unpin + Send>(
reader: &mut R,
) -> Result<HashMap<String, Vec<u8>>> {
read_header!(reader.await)
}
pub use block::block_stream;
pub use metadata::read_metadata;
19 changes: 19 additions & 0 deletions src/io/avro/read_async/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use futures::AsyncRead;
use futures::AsyncReadExt;

use crate::error::{ArrowError, Result};

use super::super::avro_decode;

pub async fn zigzag_i64<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<i64> {
let z = decode_variable(reader).await?;
Ok(if z & 0x1 == 0 {
(z >> 1) as i64
} else {
!(z >> 1) as i64
})
}

async fn decode_variable<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<u64> {
avro_decode!(reader.await)
}
Loading