Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Surfaced errors in reading from avro #558

Merged
merged 1 commit into from
Oct 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ csv = { version = "^1.1", optional = true }
regex = { version = "^1.3", optional = true }
lazy_static = { version = "^1.4", optional = true }
streaming-iterator = { version = "0.1", optional = true }
fallible-streaming-iterator = { version = "0.1", optional = true }

serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
Expand Down Expand Up @@ -119,7 +120,7 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]
io_avro = ["avro-rs", "streaming-iterator", "serde_json", "libflate"]
io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json", "libflate"]
# io_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["io_json", "serde_derive", "hex"]
Expand Down Expand Up @@ -244,3 +245,7 @@ harness = false
[[bench]]
name = "iter_list"
harness = false

[[bench]]
name = "avro_read"
harness = false
83 changes: 83 additions & 0 deletions benches/avro_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::io::Cursor;
use std::sync::Arc;

use avro_rs::types::Record;
use criterion::*;

use arrow2::error::Result;
use arrow2::io::avro::read;
use avro_rs::*;
use avro_rs::{Codec, Schema as AvroSchema};

/// Builds the avro schema used by these benchmarks: a record named `test`
/// with a single string field `a`.
fn schema() -> AvroSchema {
// NOTE(review): the literal's internal whitespace is irrelevant to
// `parse_str` (JSON), but it is kept verbatim here.
let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "string"}
]
}
"#;
AvroSchema::parse_str(raw_schema).unwrap()
}

/// Writes `size` records, each with a single string field `"a"` set to
/// `"foo"`, into an in-memory avro file and returns its raw bytes.
///
/// When `has_codec` is true the file is deflate-compressed, otherwise it is
/// written uncompressed.
fn write(size: usize, has_codec: bool) -> Result<Vec<u8>> {
    let avro = schema();
    // a writer needs a schema and something to write to;
    // choose the codec via an expression instead of a deferred `let mut` assignment
    let mut writer = if has_codec {
        Writer::with_codec(&avro, Vec::new(), Codec::Deflate)
    } else {
        Writer::new(&avro, Vec::new())
    };

    (0..size).for_each(|_| {
        // `unwrap` is acceptable here: this is benchmark setup and the schema
        // is known to contain field "a".
        let mut record = Record::new(writer.schema()).unwrap();
        record.put("a", "foo");
        writer.append(record).unwrap();
    });

    Ok(writer.into_inner().unwrap())
}

/// Decodes every record batch from the avro file in `buffer` and asserts that
/// exactly `size` rows were read.
fn read_batch(buffer: &[u8], size: usize) -> Result<()> {
    let mut cursor = Cursor::new(buffer);

    let (avro_schema, schema, codec, marker) = read::read_metadata(&mut cursor)?;

    // build the read pipeline step by step: raw blocks -> decompressed blocks -> batches
    let blocks = read::BlockStreamIterator::new(&mut cursor, marker);
    let decompressor = read::Decompressor::new(blocks, codec);
    let reader = read::Reader::new(decompressor, avro_schema, Arc::new(schema));

    let mut total_rows = 0;
    for batch in reader {
        total_rows += batch?.num_rows();
    }
    assert_eq!(total_rows, size);
    Ok(())
}

/// Registers the avro read benchmark: decodes uncompressed files of
/// 2^10 ..= 2^20 rows (stepping the exponent by 2) and reports throughput
/// in elements per second.
fn add_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("avro_read");

    for exp in (10..=20).step_by(2) {
        let rows = 1usize << exp;
        // generate the input once, outside the measured closure
        let data = write(rows, false).unwrap();

        group.throughput(Throughput::Elements(rows as u64));

        group.bench_with_input(BenchmarkId::new("utf8", exp), &data, |b, data| {
            b.iter(|| read_batch(data, rows).unwrap())
        });
    }
}

// criterion entry points: collect the benchmark functions and generate `main`.
criterion_group!(benches, add_benchmark);
criterion_main!(benches);
51 changes: 26 additions & 25 deletions src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::io::Read;
use std::sync::Arc;

use avro_rs::{Codec, Schema as AvroSchema};
use fallible_streaming_iterator::FallibleStreamingIterator;
use libflate::deflate::Decoder;
use streaming_iterator::StreamingIterator;

mod deserialize;
mod nested;
Expand Down Expand Up @@ -68,17 +68,19 @@ fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16])
Ok(rows)
}

fn decompress_block(buf: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
/// Decompresses an avro block.
/// Returns whether the buffers were swapped.
fn decompress_block(block: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
match codec {
Codec::Null => {
std::mem::swap(buf, decompress);
Ok(false)
std::mem::swap(block, decompress);
Ok(true)
}
Codec::Deflate => {
decompress.clear();
let mut decoder = Decoder::new(&buf[..]);
let mut decoder = Decoder::new(&block[..]);
decoder.read_to_end(decompress)?;
Ok(true)
Ok(false)
}
}
}
Expand Down Expand Up @@ -106,13 +108,14 @@ impl<'a, R: Read> BlockStreamIterator<'a, R> {
}
}

impl<'a, R: Read> StreamingIterator for BlockStreamIterator<'a, R> {
impl<'a, R: Read> FallibleStreamingIterator for BlockStreamIterator<'a, R> {
type Error = ArrowError;
type Item = (Vec<u8>, usize);

fn advance(&mut self) {
fn advance(&mut self) -> Result<()> {
let (buf, rows) = &mut self.buf;
// todo: surface this error
*rows = read_block(self.reader, buf, self.file_marker).unwrap();
*rows = read_block(self.reader, buf, self.file_marker)?;
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
Expand Down Expand Up @@ -144,17 +147,18 @@ impl<'a, R: Read> Decompressor<'a, R> {
}
}

impl<'a, R: Read> StreamingIterator for Decompressor<'a, R> {
impl<'a, R: Read> FallibleStreamingIterator for Decompressor<'a, R> {
type Error = ArrowError;
type Item = (Vec<u8>, usize);

fn advance(&mut self) {
fn advance(&mut self) -> Result<()> {
if self.was_swapped {
std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
}
self.blocks.advance();
self.was_swapped =
decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec).unwrap();
self.blocks.advance()?;
self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?;
self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
Expand Down Expand Up @@ -192,15 +196,12 @@ impl<'a, R: Read> Iterator for Reader<'a, R> {
type Item = Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
if let Some((data, rows)) = self.iter.next() {
Some(deserialize::deserialize(
data,
*rows,
self.schema.clone(),
&self.avro_schemas,
))
} else {
None
}
let schema = self.schema.clone();
let avro_schemas = &self.avro_schemas;

self.iter.next().transpose().map(|x| {
let (data, rows) = x?;
deserialize::deserialize(data, *rows, schema, avro_schemas)
})
}
}