diff --git a/Cargo.toml b/Cargo.toml
index 0df68fc2cd3..783300ca679 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,7 @@ csv = { version = "^1.1", optional = true }
 regex = { version = "^1.3", optional = true }
 lazy_static = { version = "^1.4", optional = true }
 streaming-iterator = { version = "0.1", optional = true }
+fallible-streaming-iterator = { version = "0.1", optional = true }
 serde = { version = "^1.0", features = ["rc"], optional = true }
 serde_derive = { version = "^1.0", optional = true }
@@ -119,7 +120,7 @@ io_parquet_compression = [
     "parquet2/lz4",
     "parquet2/brotli",
 ]
-io_avro = ["avro-rs", "streaming-iterator", "serde_json", "libflate"]
+io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json", "libflate"]
 # io_json: its dependencies + error handling
 # serde_derive: there is some derive around
 io_json_integration = ["io_json", "serde_derive", "hex"]
@@ -244,3 +245,7 @@ harness = false
 [[bench]]
 name = "iter_list"
 harness = false
+
+[[bench]]
+name = "avro_read"
+harness = false
diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs
index 069c40393c2..a0669507e13 100644
--- a/src/io/avro/read/mod.rs
+++ b/src/io/avro/read/mod.rs
@@ -4,8 +4,8 @@ use std::io::Read;
 use std::sync::Arc;
 
 use avro_rs::{Codec, Schema as AvroSchema};
+use fallible_streaming_iterator::FallibleStreamingIterator;
 use libflate::deflate::Decoder;
-use streaming_iterator::StreamingIterator;
 
 mod deserialize;
 mod nested;
@@ -68,17 +68,19 @@ fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16])
     Ok(rows)
 }
 
-fn decompress_block(buf: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
+/// Decompresses an avro block.
+/// Returns whether the buffers were swapped.
+fn decompress_block(block: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
     match codec {
         Codec::Null => {
-            std::mem::swap(buf, decompress);
-            Ok(false)
+            std::mem::swap(block, decompress);
+            Ok(true)
         }
         Codec::Deflate => {
             decompress.clear();
-            let mut decoder = Decoder::new(&buf[..]);
+            let mut decoder = Decoder::new(&block[..]);
             decoder.read_to_end(decompress)?;
-            Ok(true)
+            Ok(false)
         }
     }
 }
@@ -106,13 +108,14 @@ impl<'a, R: Read> BlockStreamIterator<'a, R> {
     }
 }
 
-impl<'a, R: Read> StreamingIterator for BlockStreamIterator<'a, R> {
+impl<'a, R: Read> FallibleStreamingIterator for BlockStreamIterator<'a, R> {
+    type Error = ArrowError;
     type Item = (Vec<u8>, usize);
 
-    fn advance(&mut self) {
+    fn advance(&mut self) -> Result<()> {
         let (buf, rows) = &mut self.buf;
-        // todo: surface this error
-        *rows = read_block(self.reader, buf, self.file_marker).unwrap();
+        *rows = read_block(self.reader, buf, self.file_marker)?;
+        Ok(())
     }
 
     fn get(&self) -> Option<&Self::Item> {
@@ -144,17 +147,18 @@ impl<'a, R: Read> Decompressor<'a, R> {
     }
 }
 
-impl<'a, R: Read> StreamingIterator for Decompressor<'a, R> {
+impl<'a, R: Read> FallibleStreamingIterator for Decompressor<'a, R> {
+    type Error = ArrowError;
     type Item = (Vec<u8>, usize);
 
-    fn advance(&mut self) {
+    fn advance(&mut self) -> Result<()> {
         if self.was_swapped {
             std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
         }
-        self.blocks.advance();
-        self.was_swapped =
-            decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec).unwrap();
+        self.blocks.advance()?;
+        self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?;
         self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
+        Ok(())
     }
 
     fn get(&self) -> Option<&Self::Item> {
@@ -192,15 +196,12 @@ impl<'a, R: Read> Iterator for Reader<'a, R> {
     type Item = Result<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        if let Some((data, rows)) = self.iter.next() {
-            Some(deserialize::deserialize(
-                data,
-                *rows,
-                self.schema.clone(),
-                &self.avro_schemas,
-            ))
-        } else {
-            None
-        }
+        let schema = self.schema.clone();
+        let avro_schemas = &self.avro_schemas;
+
+        self.iter.next().transpose().map(|x| {
+            let (data, rows) = x?;
+            deserialize::deserialize(data, *rows, schema, avro_schemas)
+        })
     }
 }
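
For context, here is a minimal, self-contained sketch (not part of the diff) of the `FallibleStreamingIterator` contract that the new `BlockStreamIterator` and `Decompressor` impls follow: a fallible `advance`, a borrowing `get`, and a provided `next` built from the two, so `?` replaces the previous `.unwrap()` calls. The `CountUpTo` type is hypothetical and used purely for illustration.

```rust
use fallible_streaming_iterator::FallibleStreamingIterator;

/// Hypothetical iterator that "streams" the numbers 1..=max by mutating its own
/// state, mirroring how the avro iterators above reuse an internal buffer.
struct CountUpTo {
    current: u32,
    max: u32,
    done: bool,
}

impl FallibleStreamingIterator for CountUpTo {
    type Item = u32;
    type Error = std::io::Error;

    // All fallible work happens here; errors are returned instead of unwrapped,
    // which is what this diff changes for `read_block`/`decompress_block`.
    fn advance(&mut self) -> Result<(), Self::Error> {
        if self.current == self.max {
            self.done = true;
        } else {
            self.current += 1;
        }
        Ok(())
    }

    // Borrows the current item; `None` means the iterator is exhausted.
    fn get(&self) -> Option<&Self::Item> {
        if self.done {
            None
        } else {
            Some(&self.current)
        }
    }
}

fn main() -> Result<(), std::io::Error> {
    let mut iter = CountUpTo { current: 0, max: 3, done: false };
    // The trait's provided `next` is `advance()?` followed by `get()`;
    // `Reader::next` in the diff consumes it the same way via `transpose()`.
    while let Some(value) = iter.next()? {
        println!("{}", value);
    }
    Ok(())
}
```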