From e89fe349fa7b2105e203468956f1e29b5faaca32 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Wed, 29 Jun 2022 13:07:17 +0000
Subject: [PATCH 1/3] Improved read performance

---
 src/error.rs                     |  6 ++++++
 src/io/parquet/read/row_group.rs | 19 +++++++++++++------
 2 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/src/error.rs b/src/error.rs
index 3bf4d8b6501..52a8cd9062b 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -62,6 +62,12 @@ impl From for Error {
     }
 }
 
+impl From<std::collections::TryReserveError> for Error {
+    fn from(_: std::collections::TryReserveError) -> Error {
+        Error::Overflow
+    }
+}
+
 impl Display for Error {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs
index 5d9198e6f45..deefaea3644 100644
--- a/src/io/parquet/read/row_group.rs
+++ b/src/io/parquet/read/row_group.rs
@@ -120,10 +120,15 @@ fn _read_single_column<'a, R>(
 where
     R: Read + Seek,
 {
-    let (start, len) = meta.byte_range();
+    let (start, length) = meta.byte_range();
     reader.seek(std::io::SeekFrom::Start(start))?;
-    let mut chunk = vec![0; len as usize];
-    reader.read_exact(&mut chunk)?;
+
+    let mut chunk = vec![];
+    chunk.try_reserve(length as usize)?;
+    reader
+        .by_ref()
+        .take(length as u64)
+        .read_to_end(&mut chunk)?;
     Ok((meta, chunk))
 }
 
@@ -136,10 +141,12 @@ where
     F: Fn() -> BoxFuture<'b, std::io::Result>,
 {
     let mut reader = factory().await?;
-    let (start, len) = meta.byte_range();
+    let (start, length) = meta.byte_range();
     reader.seek(std::io::SeekFrom::Start(start)).await?;
-    let mut chunk = vec![0; len as usize];
-    reader.read_exact(&mut chunk).await?;
+
+    let mut chunk = vec![];
+    chunk.try_reserve(length as usize)?;
+    reader.take(length as u64).read_to_end(&mut chunk).await?;
     Result::Ok((meta, chunk))
 }
 

From 339610eced3fa4d2c7333d52b28079e82a8e2368 Mon Sep 17 00:00:00 2001
From: "Jorge C.
Leitao" Date: Wed, 29 Jun 2022 14:06:50 +0000 Subject: [PATCH 2/3] Improved read of IPC --- src/io/ipc/read/array/binary.rs | 4 +- src/io/ipc/read/array/dictionary.rs | 4 +- src/io/ipc/read/array/fixed_size_binary.rs | 4 +- src/io/ipc/read/array/fixed_size_list.rs | 4 +- src/io/ipc/read/array/list.rs | 6 +-- src/io/ipc/read/array/map.rs | 6 +-- src/io/ipc/read/array/primitive.rs | 4 +- src/io/ipc/read/array/struct_.rs | 4 +- src/io/ipc/read/array/union.rs | 6 +-- src/io/ipc/read/array/utf8.rs | 4 +- src/io/ipc/read/common.rs | 5 +- src/io/ipc/read/deserialize.rs | 4 +- src/io/ipc/read/file_async.rs | 63 ++++++++++++++-------- src/io/ipc/read/mod.rs | 2 - src/io/ipc/read/read_basic.rs | 18 ++++--- src/io/ipc/read/reader.rs | 34 +++++++----- src/io/ipc/read/stream.rs | 56 ++++++++++++------- src/io/ipc/read/stream_async.rs | 45 ++++++++++------ src/io/mod.rs | 6 --- src/io/readbuf.rs | 50 ----------------- 20 files changed, 162 insertions(+), 167 deletions(-) delete mode 100644 src/io/readbuf.rs diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs index 51d3d7d26b8..917f46b4017 100644 --- a/src/io/ipc/read/array/binary.rs +++ b/src/io/ipc/read/array/binary.rs @@ -7,7 +7,7 @@ use crate::datatypes::DataType; use crate::error::{Error, Result}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; #[allow(clippy::too_many_arguments)] pub fn read_binary( @@ -18,7 +18,7 @@ pub fn read_binary( block_offset: u64, is_little_endian: bool, compression: Option, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/array/dictionary.rs b/src/io/ipc/read/array/dictionary.rs index 0b212c45012..63a0cb0b80f 100644 --- a/src/io/ipc/read/array/dictionary.rs +++ b/src/io/ipc/read/array/dictionary.rs @@ -6,7 +6,7 @@ use crate::array::{DictionaryArray, DictionaryKey}; use crate::error::{Error, Result}; use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node, ReadBuffer}; +use super::super::{Compression, IpcBuffer, Node}; use super::{read_primitive, skip_primitive}; #[allow(clippy::too_many_arguments)] @@ -19,7 +19,7 @@ pub fn read_dictionary( block_offset: u64, compression: Option, is_little_endian: bool, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> where Vec: TryInto, diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs index 5dc1b2ab0bf..e4cb0480def 100644 --- a/src/io/ipc/read/array/fixed_size_binary.rs +++ b/src/io/ipc/read/array/fixed_size_binary.rs @@ -6,7 +6,7 @@ use crate::datatypes::DataType; use crate::error::{Error, Result}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; #[allow(clippy::too_many_arguments)] pub fn read_fixed_size_binary( @@ -17,7 +17,7 @@ pub fn read_fixed_size_binary( block_offset: u64, is_little_endian: bool, compression: Option, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs index ac169ed3f67..d8eb9725ed8 100644 --- a/src/io/ipc/read/array/fixed_size_list.rs +++ b/src/io/ipc/read/array/fixed_size_list.rs @@ -8,7 +8,7 @@ 
use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version}; #[allow(clippy::too_many_arguments)] pub fn read_fixed_size_list( @@ -22,7 +22,7 @@ pub fn read_fixed_size_list( is_little_endian: bool, compression: Option, version: Version, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs index bb85f233cf4..44fb17bd2a6 100644 --- a/src/io/ipc/read/array/list.rs +++ b/src/io/ipc/read/array/list.rs @@ -10,9 +10,7 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{ - Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version, -}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version}; #[allow(clippy::too_many_arguments)] pub fn read_list( @@ -26,7 +24,7 @@ pub fn read_list( is_little_endian: bool, compression: Option, version: Version, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> where Vec: TryInto, diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs index 40b4f5c0389..5020a157a0a 100644 --- a/src/io/ipc/read/array/map.rs +++ b/src/io/ipc/read/array/map.rs @@ -9,9 +9,7 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{ - Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version, -}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version}; #[allow(clippy::too_many_arguments)] pub fn read_map( @@ -25,7 +23,7 @@ pub fn read_map( is_little_endian: bool, compression: Option, version: Version, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs index 1e4000123b6..75a62e5a835 100644 --- a/src/io/ipc/read/array/primitive.rs +++ b/src/io/ipc/read/array/primitive.rs @@ -6,7 +6,7 @@ use crate::error::{Error, Result}; use crate::{array::PrimitiveArray, types::NativeType}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; #[allow(clippy::too_many_arguments)] pub fn read_primitive( @@ -17,7 +17,7 @@ pub fn read_primitive( block_offset: u64, is_little_endian: bool, compression: Option, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> where Vec: TryInto, diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs index ae23c0f8829..469689d247b 100644 --- a/src/io/ipc/read/array/struct_.rs +++ b/src/io/ipc/read/array/struct_.rs @@ -8,7 +8,7 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version}; #[allow(clippy::too_many_arguments)] pub fn read_struct( 
@@ -22,7 +22,7 @@ pub fn read_struct( is_little_endian: bool, compression: Option, version: Version, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/array/union.rs b/src/io/ipc/read/array/union.rs index fc907a32395..678d65ded9d 100644 --- a/src/io/ipc/read/array/union.rs +++ b/src/io/ipc/read/array/union.rs @@ -9,9 +9,7 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{ - Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version, -}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version}; #[allow(clippy::too_many_arguments)] pub fn read_union( @@ -25,7 +23,7 @@ pub fn read_union( is_little_endian: bool, compression: Option, version: Version, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs index 10f1260f5ac..af25eaed896 100644 --- a/src/io/ipc/read/array/utf8.rs +++ b/src/io/ipc/read/array/utf8.rs @@ -7,7 +7,7 @@ use crate::datatypes::DataType; use crate::error::{Error, Result}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; #[allow(clippy::too_many_arguments)] pub fn read_utf8( @@ -18,7 +18,7 @@ pub fn read_utf8( block_offset: u64, is_little_endian: bool, compression: Option, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 18ddd8cbb8b..26252dd1831 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -9,7 +9,6 @@ use crate::datatypes::{DataType, Field}; use crate::error::{Error, Result}; use crate::io::ipc::read::OutOfSpecKind; use crate::io::ipc::{IpcField, IpcSchema}; -use crate::io::ReadBuffer; use super::deserialize::{read, skip}; use super::Dictionaries; @@ -86,7 +85,7 @@ pub fn read_record_batch( reader: &mut R, block_offset: u64, file_size: u64, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result>> { assert_eq!(fields.len(), ipc_schema.fields.len()); let buffers = batch @@ -234,7 +233,7 @@ pub fn read_dictionary( reader: &mut R, block_offset: u64, file_size: u64, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result<()> { if batch .is_delta() diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index 3378f99a894..3ed5c9349c7 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -10,7 +10,7 @@ use crate::error::Result; use crate::io::ipc::IpcField; use super::{array::*, Dictionaries}; -use super::{IpcBuffer, Node, ReadBuffer}; +use super::{IpcBuffer, Node}; #[allow(clippy::too_many_arguments)] pub fn read( @@ -24,7 +24,7 @@ pub fn read( is_little_endian: bool, compression: Option, version: MetadataVersion, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> { use PhysicalType::*; let data_type = field.data_type.clone(); diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index cabfdd43b70..100990a6509 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -18,7 +18,6 @@ use 
super::reader::{deserialize_footer, get_serialized_batch}; use super::Dictionaries; use super::FileMetadata; use super::OutOfSpecKind; -use super::ReadBuffer; /// Async reader for Arrow IPC files pub struct FileStream<'a> { @@ -142,9 +141,14 @@ where { let footer_size = read_footer_len(reader).await?; // Read footer - let mut footer = vec![0; footer_size]; reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?; - reader.read_exact(&mut footer).await?; + + let mut footer = vec![]; + footer.try_reserve(footer_size)?; + reader + .take(footer_size as u64) + .read_to_end(&mut footer) + .await?; deserialize_footer(&footer, u64::MAX) } @@ -156,9 +160,9 @@ async fn read_batch( metadata: &FileMetadata, projection: Option<&[usize]>, block: usize, - meta_buffer: &mut ReadBuffer, - block_buffer: &mut ReadBuffer, - scratch: &mut ReadBuffer, + meta_buffer: &mut Vec, + block_buffer: &mut Vec, + scratch: &mut Vec, ) -> Result>> where R: AsyncRead + AsyncSeek + Unpin, @@ -181,10 +185,14 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - meta_buffer.set_len(meta_len); - reader.read_exact(meta_buffer.as_mut()).await?; + meta_buffer.clear(); + meta_buffer.try_reserve(meta_len)?; + (&mut reader) + .take(meta_len as u64) + .read_to_end(meta_buffer) + .await?; - let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer.as_ref()) + let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let batch = get_serialized_batch(&message)?; @@ -195,9 +203,14 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - block_buffer.set_len(block_length); - reader.read_exact(block_buffer.as_mut()).await?; - let mut cursor = std::io::Cursor::new(block_buffer.as_ref()); + block_buffer.clear(); + block_buffer.try_reserve(block_length)?; + reader + .take(block_length as u64) + .read_to_end(block_buffer) + .await?; + + let mut cursor = std::io::Cursor::new(&block_buffer); read_record_batch( batch, @@ -220,14 +233,14 @@ async fn read_dictionaries( fields: &[Field], ipc_schema: &IpcSchema, blocks: &[Block], - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result where R: AsyncRead + AsyncSeek + Unpin, { let mut dictionaries = Default::default(); - let mut data: ReadBuffer = vec![].into(); - let mut buffer: ReadBuffer = vec![].into(); + let mut data: Vec = vec![]; + let mut buffer: Vec = vec![]; for block in blocks { let offset: u64 = block @@ -250,11 +263,15 @@ where .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))? 
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?; - buffer.set_len(length); match header { MessageHeaderRef::DictionaryBatch(batch) => { - reader.read_exact(buffer.as_mut()).await?; - let mut cursor = std::io::Cursor::new(buffer.as_ref()); + buffer.clear(); + buffer.try_reserve(length)?; + (&mut reader) + .take(length as u64) + .read_to_end(&mut buffer) + .await?; + let mut cursor = std::io::Cursor::new(&buffer); read_dictionary( batch, fields, @@ -272,7 +289,7 @@ where Ok(dictionaries) } -async fn read_dictionary_message(mut reader: R, offset: u64, data: &mut ReadBuffer) -> Result<()> +async fn read_dictionary_message(mut reader: R, offset: u64, data: &mut Vec) -> Result<()> where R: AsyncRead + AsyncSeek + Unpin, { @@ -288,8 +305,12 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - data.set_len(footer_size); - reader.read_exact(data.as_mut()).await?; + data.clear(); + data.try_reserve(footer_size)?; + (&mut reader) + .take(footer_size as u64) + .read_to_end(data) + .await?; Ok(()) } diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index f25da7a4a1c..5ffe6426e20 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -27,8 +27,6 @@ pub mod stream_async; #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))] pub mod file_async; -use super::super::ReadBuffer; - pub use common::{read_dictionary, read_record_batch}; pub use reader::{ read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, FileReader, diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs index cc04035eec7..7a5ae859a6a 100644 --- a/src/io/ipc/read/read_basic.rs +++ b/src/io/ipc/read/read_basic.rs @@ -7,7 +7,7 @@ use crate::{bitmap::Bitmap, types::NativeType}; use super::super::compression; use super::super::endianess::is_native_little_endian; -use super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +use super::{Compression, IpcBuffer, Node, OutOfSpecKind}; fn read_swapped( reader: &mut R, @@ -93,7 +93,7 @@ fn read_compressed_buffer( length: usize, is_little_endian: bool, compression: Compression, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> { if is_little_endian != is_native_little_endian() { return Err(Error::NotYetImplemented( @@ -106,8 +106,12 @@ fn read_compressed_buffer( let mut buffer = vec![T::default(); length]; // decompress first - scratch.set_len(buffer_length); - reader.read_exact(scratch.as_mut())?; + scratch.clear(); + scratch.try_reserve(buffer_length)?; + reader + .by_ref() + .take(buffer_length as u64) + .read_to_end(scratch)?; let out_slice = bytemuck::cast_slice_mut(&mut buffer); @@ -117,10 +121,10 @@ fn read_compressed_buffer( match compression { arrow_format::ipc::CompressionType::Lz4Frame => { - compression::decompress_lz4(&scratch.as_ref()[8..], out_slice)?; + compression::decompress_lz4(&scratch[8..], out_slice)?; } arrow_format::ipc::CompressionType::Zstd => { - compression::decompress_zstd(&scratch.as_ref()[8..], out_slice)?; + compression::decompress_zstd(&scratch[8..], out_slice)?; } } Ok(buffer) @@ -133,7 +137,7 @@ pub fn read_buffer( block_offset: u64, is_little_endian: bool, compression: Option, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> { let buf = buf .pop_front() diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index b01d85cf499..776ee684e82 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -13,7 +13,6 @@ use super::common::*; use super::schema::fb_to_schema; use super::Dictionaries; 
use super::OutOfSpecKind; -use super::ReadBuffer; use arrow_format::ipc::planus::ReadAsRoot; /// Metadata of an Arrow IPC file, written in the footer of the file. @@ -45,14 +44,14 @@ pub struct FileReader { dictionaries: Option, current_block: usize, projection: Option<(Vec, HashMap, Schema)>, - data_scratch: ReadBuffer, - message_scratch: ReadBuffer, + data_scratch: Vec, + message_scratch: Vec, } fn read_dictionary_message( reader: &mut R, offset: u64, - data: &mut ReadBuffer, + data: &mut Vec, ) -> Result<()> { let mut message_size: [u8; 4] = [0; 4]; reader.seek(SeekFrom::Start(offset))?; @@ -66,8 +65,13 @@ fn read_dictionary_message( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - data.set_len(message_length); - reader.read_exact(data.as_mut())?; + data.clear(); + data.try_reserve(message_length)?; + reader + .by_ref() + .take(message_length as u64) + .read_to_end(data)?; + Ok(()) } @@ -76,8 +80,8 @@ fn read_dictionary_block( metadata: &FileMetadata, block: &arrow_format::ipc::Block, dictionaries: &mut Dictionaries, - message_scratch: &mut ReadBuffer, - dictionary_scratch: &mut ReadBuffer, + message_scratch: &mut Vec, + dictionary_scratch: &mut Vec, ) -> Result<()> { let offset: u64 = block .offset @@ -120,7 +124,7 @@ fn read_dictionary_block( pub fn read_file_dictionaries( reader: &mut R, metadata: &FileMetadata, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result { let mut dictionaries = Default::default(); @@ -259,8 +263,8 @@ pub fn read_batch( metadata: &FileMetadata, projection: Option<&[usize]>, index: usize, - message_scratch: &mut ReadBuffer, - data_scratch: &mut ReadBuffer, + message_scratch: &mut Vec, + data_scratch: &mut Vec, ) -> Result>> { let block = metadata.blocks[index]; @@ -281,8 +285,12 @@ pub fn read_batch( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - message_scratch.set_len(meta_len); - reader.read_exact(message_scratch.as_mut())?; + message_scratch.clear(); + message_scratch.try_reserve(meta_len)?; + reader + .by_ref() + .take(meta_len as u64) + .read_to_end(message_scratch)?; let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index 5dd7b5cddda..e7d3d590aa4 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -15,7 +15,6 @@ use super::common::*; use super::schema::deserialize_stream_metadata; use super::Dictionaries; use super::OutOfSpecKind; -use super::ReadBuffer; /// Metadata of an Arrow IPC stream, written at the start of the stream #[derive(Debug, Clone)] @@ -44,14 +43,18 @@ pub fn read_stream_metadata(reader: &mut R) -> Result { i32::from_le_bytes(meta_size) }; - let meta_length: usize = meta_length + let length: usize = meta_length .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let mut meta_buffer = vec![0; meta_length]; - reader.read_exact(&mut meta_buffer)?; + let mut buffer = vec![]; + buffer.try_reserve(length as usize)?; + reader + .by_ref() + .take(length as u64) + .read_to_end(&mut buffer)?; - deserialize_stream_metadata(&meta_buffer) + deserialize_stream_metadata(&buffer) } /// Encodes the stream's status after each read. 
@@ -91,10 +94,10 @@ fn read_next( reader: &mut R, metadata: &StreamMetadata, dictionaries: &mut Dictionaries, - message_buffer: &mut ReadBuffer, - data_buffer: &mut ReadBuffer, + message_buffer: &mut Vec, + data_buffer: &mut Vec, projection: &Option<(Vec, HashMap, Schema)>, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> { // determine metadata length let mut meta_length: [u8; 4] = [0; 4]; @@ -131,8 +134,12 @@ fn read_next( return Ok(None); } - message_buffer.set_len(meta_length); - reader.read_exact(message_buffer.as_mut())?; + message_buffer.clear(); + message_buffer.try_reserve(meta_length as usize)?; + reader + .by_ref() + .take(meta_length as u64) + .read_to_end(message_buffer)?; let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; @@ -148,12 +155,16 @@ fn read_next( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - data_buffer.set_len(block_length); match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - reader.read_exact(data_buffer.as_mut())?; + data_buffer.clear(); + data_buffer.try_reserve(block_length as usize)?; + reader + .by_ref() + .take(block_length as u64) + .read_to_end(data_buffer)?; - let file_size = data_buffer.as_ref().len() as u64; + let file_size = data_buffer.len() as u64; let mut reader = std::io::Cursor::new(data_buffer); @@ -180,10 +191,15 @@ fn read_next( } } arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => { - let mut buf = vec![0; block_length]; - reader.read_exact(&mut buf)?; + data_buffer.clear(); + data_buffer.try_reserve(block_length as usize)?; + reader + .by_ref() + .take(block_length as u64) + .read_to_end(data_buffer)?; - let mut dict_reader = std::io::Cursor::new(&buf); + let file_size = data_buffer.len() as u64; + let mut dict_reader = std::io::Cursor::new(&data_buffer); read_dictionary( batch, @@ -192,7 +208,7 @@ fn read_next( dictionaries, &mut dict_reader, 0, - buf.len() as u64, + file_size, scratch, )?; @@ -222,10 +238,10 @@ pub struct StreamReader { metadata: StreamMetadata, dictionaries: Dictionaries, finished: bool, - data_buffer: ReadBuffer, - message_buffer: ReadBuffer, + data_buffer: Vec, + message_buffer: Vec, projection: Option<(Vec, HashMap, Schema)>, - scratch: ReadBuffer, + scratch: Vec, } impl StreamReader { diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index cfbfe5ad3e0..b637dc0a2d8 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -16,7 +16,6 @@ use super::common::{read_dictionary, read_record_batch}; use super::schema::deserialize_stream_metadata; use super::Dictionaries; use super::OutOfSpecKind; -use super::ReadBuffer; use super::StreamMetadata; /// A (private) state of stream messages @@ -25,9 +24,9 @@ struct ReadState { pub metadata: StreamMetadata, pub dictionaries: Dictionaries, /// The internal buffer to read data inside the messages (records and dictionaries) to - pub data_buffer: ReadBuffer, + pub data_buffer: Vec, /// The internal buffer to read messages to - pub message_buffer: ReadBuffer, + pub message_buffer: Vec, } /// The state of an Arrow stream @@ -58,8 +57,12 @@ pub async fn read_stream_metadata_async( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let mut meta_buffer = vec![0; meta_len]; - reader.read_exact(&mut meta_buffer).await?; + let mut meta_buffer = vec![]; + meta_buffer.try_reserve(meta_len as usize)?; + reader + 
.take(meta_len as u64) + .read_to_end(&mut meta_buffer) + .await?; deserialize_stream_metadata(&meta_buffer) } @@ -105,10 +108,11 @@ async fn maybe_next( return Ok(None); } - state.message_buffer.set_len(meta_length); - state - .reader - .read_exact(state.message_buffer.as_mut()) + state.message_buffer.clear(); + state.message_buffer.try_reserve(meta_length as usize)?; + (&mut state.reader) + .take(meta_length as u64) + .read_to_end(&mut state.message_buffer) .await?; let message = arrow_format::ipc::MessageRef::read_as_root(state.message_buffer.as_ref()) @@ -125,11 +129,14 @@ async fn maybe_next( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - state.data_buffer.set_len(block_length); - match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - state.reader.read_exact(state.data_buffer.as_mut()).await?; + state.data_buffer.clear(); + state.data_buffer.try_reserve(block_length as usize)?; + (&mut state.reader) + .take(block_length as u64) + .read_to_end(&mut state.data_buffer) + .await?; read_record_batch( batch, @@ -140,18 +147,22 @@ async fn maybe_next( state.metadata.version, &mut std::io::Cursor::new(&state.data_buffer), 0, - state.data_buffer.as_ref().len() as u64, + state.data_buffer.len() as u64, &mut scratch, ) .map(|chunk| Some(StreamState::Some((state, chunk)))) } arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => { - let mut body = vec![0; block_length]; - state.reader.read_exact(&mut body).await?; + state.data_buffer.clear(); + state.data_buffer.try_reserve(block_length as usize)?; + (&mut state.reader) + .take(block_length as u64) + .read_to_end(&mut state.data_buffer) + .await?; - let file_size = body.len() as u64; + let file_size = state.data_buffer.len() as u64; - let mut dict_reader = std::io::Cursor::new(body); + let mut dict_reader = std::io::Cursor::new(&state.data_buffer); read_dictionary( batch, diff --git a/src/io/mod.rs b/src/io/mod.rs index 95a17ee8bf3..9343d4281ce 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -53,9 +53,3 @@ pub mod print; #[cfg(any(feature = "io_csv_write", feature = "io_avro", feature = "io_json"))] mod iterator; - -#[cfg(feature = "io_ipc")] -mod readbuf; - -#[cfg(feature = "io_ipc")] -pub use readbuf::ReadBuffer; diff --git a/src/io/readbuf.rs b/src/io/readbuf.rs deleted file mode 100644 index d30396a8289..00000000000 --- a/src/io/readbuf.rs +++ /dev/null @@ -1,50 +0,0 @@ -/// A small wrapper around [`Vec`] that allows us to reuse memory once it is initialized. -/// This may improve performance of the [`Read`] trait. -#[derive(Clone, Default)] -pub struct ReadBuffer { - data: Vec, - // length to be read or is read - length: usize, -} - -impl ReadBuffer { - /// Set the minimal length of the [`ReadBuf`]. Contrary to the - /// method on `Vec` this is `safe` because this function guarantees that - /// the underlying data always is initialized. 
- pub fn set_len(&mut self, length: usize) { - if length > self.data.capacity() { - // exponential growing strategy - // benchmark showed it was ~5% faster - // in reading lz4 yellow-trip dataset - self.data = vec![0; length * 2]; - } else if length > self.data.len() { - self.data.resize(length, 0); - } - self.length = length; - } -} - -impl AsRef<[u8]> for ReadBuffer { - fn as_ref(&self) -> &[u8] { - &self.data[..self.length] - } -} - -impl AsMut<[u8]> for ReadBuffer { - fn as_mut(&mut self) -> &mut [u8] { - &mut self.data[..self.length] - } -} - -impl From> for ReadBuffer { - fn from(data: Vec) -> Self { - let length = data.len(); - Self { data, length } - } -} - -impl From for Vec { - fn from(buf: ReadBuffer) -> Self { - buf.data - } -} From 5b3e897be986189308d04683daf04c88a6fa9473 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 29 Jun 2022 14:24:57 +0000 Subject: [PATCH 3/3] Improved reading of bitmaps from IPC --- src/io/ipc/read/array/binary.rs | 1 + src/io/ipc/read/array/boolean.rs | 4 ++++ src/io/ipc/read/array/fixed_size_binary.rs | 1 + src/io/ipc/read/array/fixed_size_list.rs | 1 + src/io/ipc/read/array/list.rs | 1 + src/io/ipc/read/array/map.rs | 1 + src/io/ipc/read/array/primitive.rs | 1 + src/io/ipc/read/array/struct_.rs | 1 + src/io/ipc/read/array/utf8.rs | 1 + src/io/ipc/read/deserialize.rs | 3 ++- src/io/ipc/read/read_basic.rs | 28 +++++++++++++--------- src/io/ipc/read/reader.rs | 9 +++++-- 12 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs index 917f46b4017..f32ca3e8713 100644 --- a/src/io/ipc/read/array/binary.rs +++ b/src/io/ipc/read/array/binary.rs @@ -34,6 +34,7 @@ pub fn read_binary( block_offset, is_little_endian, compression, + scratch, )?; let length: usize = field_node diff --git a/src/io/ipc/read/array/boolean.rs b/src/io/ipc/read/array/boolean.rs index 4bfe9063a57..4440062a299 100644 --- a/src/io/ipc/read/array/boolean.rs +++ b/src/io/ipc/read/array/boolean.rs @@ -8,6 +8,7 @@ use crate::error::{Error, Result}; use super::super::read_basic::*; use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +#[allow(clippy::too_many_arguments)] pub fn read_boolean( field_nodes: &mut VecDeque, data_type: DataType, @@ -16,6 +17,7 @@ pub fn read_boolean( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -36,6 +38,7 @@ pub fn read_boolean( block_offset, is_little_endian, compression, + scratch, )?; let values = read_bitmap( @@ -45,6 +48,7 @@ pub fn read_boolean( block_offset, is_little_endian, compression, + scratch, )?; BooleanArray::try_new(data_type, values, validity) } diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs index e4cb0480def..ffceb00bc94 100644 --- a/src/io/ipc/read/array/fixed_size_binary.rs +++ b/src/io/ipc/read/array/fixed_size_binary.rs @@ -33,6 +33,7 @@ pub fn read_fixed_size_binary( block_offset, is_little_endian, compression, + scratch, )?; let length: usize = field_node diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs index d8eb9725ed8..3ea4ef42b30 100644 --- a/src/io/ipc/read/array/fixed_size_list.rs +++ b/src/io/ipc/read/array/fixed_size_list.rs @@ -38,6 +38,7 @@ pub fn read_fixed_size_list( block_offset, is_little_endian, compression, + scratch, )?; let (field, _) = FixedSizeListArray::get_child_and_size(&data_type); diff 
--git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs index 44fb17bd2a6..e12dc5e02ec 100644 --- a/src/io/ipc/read/array/list.rs +++ b/src/io/ipc/read/array/list.rs @@ -43,6 +43,7 @@ where block_offset, is_little_endian, compression, + scratch, )?; let length: usize = field_node diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs index 5020a157a0a..8a2a5e4a124 100644 --- a/src/io/ipc/read/array/map.rs +++ b/src/io/ipc/read/array/map.rs @@ -39,6 +39,7 @@ pub fn read_map( block_offset, is_little_endian, compression, + scratch, )?; let length: usize = field_node diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs index 75a62e5a835..434aab0bd9c 100644 --- a/src/io/ipc/read/array/primitive.rs +++ b/src/io/ipc/read/array/primitive.rs @@ -36,6 +36,7 @@ where block_offset, is_little_endian, compression, + scratch, )?; let length: usize = field_node diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs index 469689d247b..3bbd777a019 100644 --- a/src/io/ipc/read/array/struct_.rs +++ b/src/io/ipc/read/array/struct_.rs @@ -38,6 +38,7 @@ pub fn read_struct( block_offset, is_little_endian, compression, + scratch, )?; let fields = StructArray::get_fields(&data_type); diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs index af25eaed896..342da0c4de8 100644 --- a/src/io/ipc/read/array/utf8.rs +++ b/src/io/ipc/read/array/utf8.rs @@ -34,6 +34,7 @@ pub fn read_utf8( block_offset, is_little_endian, compression, + scratch, )?; let length: usize = field_node diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index 3ed5c9349c7..780f33e0d59 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -39,6 +39,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, ) .map(|x| x.boxed()), Primitive(primitive) => with_match_primitive_type!(primitive, |$T| { @@ -50,7 +51,7 @@ pub fn read( block_offset, is_little_endian, compression, - scratch + scratch, ) .map(|x| x.boxed()) }), diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs index 7a5ae859a6a..e58b01b58ed 100644 --- a/src/io/ipc/read/read_basic.rs +++ b/src/io/ipc/read/read_basic.rs @@ -181,10 +181,13 @@ fn read_uncompressed_bitmap( number_of_bits: bytes * 8, })); } - // it is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read - // see also https://github.com/MaikKlein/ash/issues/354#issue-781730580 - let mut buffer = vec![0; bytes]; - reader.read_exact(buffer.as_mut_slice())?; + + let mut buffer = vec![]; + buffer.try_reserve(bytes)?; + reader + .by_ref() + .take(bytes as u64) + .read_to_end(&mut buffer)?; Ok(buffer) } @@ -194,13 +197,13 @@ fn read_compressed_bitmap( bytes: usize, compression: Compression, reader: &mut R, + scratch: &mut Vec, ) -> Result> { let mut buffer = vec![0; (length + 7) / 8]; - // read all first - // todo: move this allocation to an external buffer for re-use - let mut slice = vec![0u8; bytes]; - reader.read_exact(&mut slice)?; + scratch.clear(); + scratch.try_reserve(bytes)?; + reader.by_ref().take(bytes as u64).read_to_end(scratch)?; let compression = compression .codec() @@ -208,10 +211,10 @@ fn read_compressed_bitmap( match compression { arrow_format::ipc::CompressionType::Lz4Frame => { - compression::decompress_lz4(&slice[8..], &mut buffer)?; + compression::decompress_lz4(&scratch[8..], &mut buffer)?; } arrow_format::ipc::CompressionType::Zstd => { - 
compression::decompress_zstd(&slice[8..], &mut buffer)?; + compression::decompress_zstd(&scratch[8..], &mut buffer)?; } } Ok(buffer) @@ -224,6 +227,7 @@ pub fn read_bitmap( block_offset: u64, _: bool, compression: Option, + scratch: &mut Vec, ) -> Result { let buf = buf .pop_front() @@ -242,7 +246,7 @@ pub fn read_bitmap( reader.seek(SeekFrom::Start(block_offset + offset))?; let buffer = if let Some(compression) = compression { - read_compressed_bitmap(length, bytes, compression, reader) + read_compressed_bitmap(length, bytes, compression, reader, scratch) } else { read_uncompressed_bitmap(length, bytes, reader) }?; @@ -257,6 +261,7 @@ pub fn read_validity( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut Vec, ) -> Result> { let length: usize = field_node .length() @@ -271,6 +276,7 @@ pub fn read_validity( block_offset, is_little_endian, compression, + scratch, )?) } else { let _ = buffers diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 776ee684e82..801f5041aed 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -230,9 +230,14 @@ pub fn read_file_metadata(reader: &mut R) -> Result
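
All three patches apply the same pattern: replace `vec![0; length]` + `read_exact` with `Vec::try_reserve` followed by `Read::take(length).read_to_end(buffer)`. This avoids zero-initializing a buffer that is about to be overwritten, lets a corrupt or hostile length prefix surface as a recoverable error (mapped to `Error::Overflow` by the new `From<TryReserveError>` impl in patch 1) instead of an allocation abort, and allows a single scratch `Vec<u8>` to be reused across reads. The following is an illustrative, self-contained sketch of that pattern, not code from the repository: the helper name `read_exact_into_scratch` is invented here, and it maps the allocation failure to `std::io::Error` rather than the crate's `Error::Overflow` so it compiles on its own.

```rust
use std::io::{Cursor, Error, ErrorKind, Read, Result};

/// Read exactly `length` bytes from `reader` into `scratch`,
/// reusing the vector's allocation across calls.
fn read_exact_into_scratch<R: Read>(
    reader: &mut R,
    length: usize,
    scratch: &mut Vec<u8>,
) -> Result<()> {
    scratch.clear();
    // Fail gracefully (instead of aborting the process) when `length`
    // comes from a corrupt or untrusted size prefix that cannot be allocated.
    scratch
        .try_reserve(length)
        .map_err(|e| Error::new(ErrorKind::OutOfMemory, e.to_string()))?;
    // `take` caps the read at `length` bytes; `read_to_end` appends into the
    // reserved capacity without zero-filling the buffer first.
    let read = reader.by_ref().take(length as u64).read_to_end(scratch)?;
    if read != length {
        return Err(Error::new(
            ErrorKind::UnexpectedEof,
            format!("expected {length} bytes, got {read}"),
        ));
    }
    Ok(())
}

fn main() -> Result<()> {
    let mut cursor = Cursor::new(vec![1u8, 2, 3, 4, 5]);
    let mut scratch = Vec::new();
    // First read: 3 bytes.
    read_exact_into_scratch(&mut cursor, 3, &mut scratch)?;
    assert_eq!(scratch, [1, 2, 3]);
    // Second read reuses `scratch`'s allocation for the remaining 2 bytes.
    read_exact_into_scratch(&mut cursor, 2, &mut scratch)?;
    assert_eq!(scratch, [4, 5]);
    Ok(())
}
```

The length check mirrors what `read_exact` guaranteed in the code being replaced; `take` alone only bounds the read, it does not require that all `length` bytes arrive.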