Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Surfaced avro errors in reading.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Oct 30, 2021
1 parent 5fc843d commit d46105a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 26 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ csv = { version = "^1.1", optional = true }
regex = { version = "^1.3", optional = true }
lazy_static = { version = "^1.4", optional = true }
streaming-iterator = { version = "0.1", optional = true }
fallible-streaming-iterator = { version = "0.1", optional = true }

serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
Expand Down Expand Up @@ -119,7 +120,7 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]
io_avro = ["avro-rs", "streaming-iterator", "serde_json", "libflate"]
io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json", "libflate"]
# io_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["io_json", "serde_derive", "hex"]
Expand Down Expand Up @@ -244,3 +245,7 @@ harness = false
[[bench]]
name = "iter_list"
harness = false

[[bench]]
name = "avro_read"
harness = false
51 changes: 26 additions & 25 deletions src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::io::Read;
use std::sync::Arc;

use avro_rs::{Codec, Schema as AvroSchema};
use fallible_streaming_iterator::FallibleStreamingIterator;
use libflate::deflate::Decoder;
use streaming_iterator::StreamingIterator;

mod deserialize;
mod nested;
Expand Down Expand Up @@ -68,17 +68,19 @@ fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16])
Ok(rows)
}

fn decompress_block(buf: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
/// Decompresses an avro block.
/// Returns whether the buffers where swapped.
fn decompress_block(block: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
match codec {
Codec::Null => {
std::mem::swap(buf, decompress);
Ok(false)
std::mem::swap(block, decompress);
Ok(true)
}
Codec::Deflate => {
decompress.clear();
let mut decoder = Decoder::new(&buf[..]);
let mut decoder = Decoder::new(&block[..]);
decoder.read_to_end(decompress)?;
Ok(true)
Ok(false)
}
}
}
Expand Down Expand Up @@ -106,13 +108,14 @@ impl<'a, R: Read> BlockStreamIterator<'a, R> {
}
}

impl<'a, R: Read> StreamingIterator for BlockStreamIterator<'a, R> {
impl<'a, R: Read> FallibleStreamingIterator for BlockStreamIterator<'a, R> {
type Error = ArrowError;
type Item = (Vec<u8>, usize);

fn advance(&mut self) {
fn advance(&mut self) -> Result<()> {
let (buf, rows) = &mut self.buf;
// todo: surface this error
*rows = read_block(self.reader, buf, self.file_marker).unwrap();
*rows = read_block(self.reader, buf, self.file_marker)?;
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
Expand Down Expand Up @@ -144,17 +147,18 @@ impl<'a, R: Read> Decompressor<'a, R> {
}
}

impl<'a, R: Read> StreamingIterator for Decompressor<'a, R> {
impl<'a, R: Read> FallibleStreamingIterator for Decompressor<'a, R> {
type Error = ArrowError;
type Item = (Vec<u8>, usize);

fn advance(&mut self) {
fn advance(&mut self) -> Result<()> {
if self.was_swapped {
std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
}
self.blocks.advance();
self.was_swapped =
decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec).unwrap();
self.blocks.advance()?;
self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?;
self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
Expand Down Expand Up @@ -192,15 +196,12 @@ impl<'a, R: Read> Iterator for Reader<'a, R> {
type Item = Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
if let Some((data, rows)) = self.iter.next() {
Some(deserialize::deserialize(
data,
*rows,
self.schema.clone(),
&self.avro_schemas,
))
} else {
None
}
let schema = self.schema.clone();
let avro_schemas = &self.avro_schemas;

self.iter.next().transpose().map(|x| {
let (data, rows) = x?;
deserialize::deserialize(data, *rows, schema, avro_schemas)
})
}
}

0 comments on commit d46105a

Please sign in to comment.