diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs index 51d3d7d26b8..917f46b4017 100644 --- a/src/io/ipc/read/array/binary.rs +++ b/src/io/ipc/read/array/binary.rs @@ -7,7 +7,7 @@ use crate::datatypes::DataType; use crate::error::{Error, Result}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; #[allow(clippy::too_many_arguments)] pub fn read_binary( @@ -18,7 +18,7 @@ pub fn read_binary( block_offset: u64, is_little_endian: bool, compression: Option, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/array/dictionary.rs b/src/io/ipc/read/array/dictionary.rs index 0b212c45012..63a0cb0b80f 100644 --- a/src/io/ipc/read/array/dictionary.rs +++ b/src/io/ipc/read/array/dictionary.rs @@ -6,7 +6,7 @@ use crate::array::{DictionaryArray, DictionaryKey}; use crate::error::{Error, Result}; use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node, ReadBuffer}; +use super::super::{Compression, IpcBuffer, Node}; use super::{read_primitive, skip_primitive}; #[allow(clippy::too_many_arguments)] @@ -19,7 +19,7 @@ pub fn read_dictionary( block_offset: u64, compression: Option, is_little_endian: bool, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> where Vec: TryInto, diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs index 5dc1b2ab0bf..e4cb0480def 100644 --- a/src/io/ipc/read/array/fixed_size_binary.rs +++ b/src/io/ipc/read/array/fixed_size_binary.rs @@ -6,7 +6,7 @@ use crate::datatypes::DataType; use crate::error::{Error, Result}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; #[allow(clippy::too_many_arguments)] pub fn read_fixed_size_binary( @@ -17,7 +17,7 @@ pub fn read_fixed_size_binary( block_offset: u64, is_little_endian: bool, compression: Option, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs index ac169ed3f67..d8eb9725ed8 100644 --- a/src/io/ipc/read/array/fixed_size_list.rs +++ b/src/io/ipc/read/array/fixed_size_list.rs @@ -8,7 +8,7 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version}; #[allow(clippy::too_many_arguments)] pub fn read_fixed_size_list( @@ -22,7 +22,7 @@ pub fn read_fixed_size_list( is_little_endian: bool, compression: Option, version: Version, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs index bb85f233cf4..44fb17bd2a6 100644 --- a/src/io/ipc/read/array/list.rs +++ b/src/io/ipc/read/array/list.rs @@ -10,9 +10,7 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{ - Compression, 
Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version, -}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version}; #[allow(clippy::too_many_arguments)] pub fn read_list( @@ -26,7 +24,7 @@ pub fn read_list( is_little_endian: bool, compression: Option, version: Version, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> where Vec: TryInto, diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs index 40b4f5c0389..5020a157a0a 100644 --- a/src/io/ipc/read/array/map.rs +++ b/src/io/ipc/read/array/map.rs @@ -9,9 +9,7 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{ - Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version, -}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version}; #[allow(clippy::too_many_arguments)] pub fn read_map( @@ -25,7 +23,7 @@ pub fn read_map( is_little_endian: bool, compression: Option, version: Version, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs index 1e4000123b6..75a62e5a835 100644 --- a/src/io/ipc/read/array/primitive.rs +++ b/src/io/ipc/read/array/primitive.rs @@ -6,7 +6,7 @@ use crate::error::{Error, Result}; use crate::{array::PrimitiveArray, types::NativeType}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; #[allow(clippy::too_many_arguments)] pub fn read_primitive( @@ -17,7 +17,7 @@ pub fn read_primitive( block_offset: u64, is_little_endian: bool, compression: Option, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> where Vec: TryInto, diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs index ae23c0f8829..469689d247b 100644 --- a/src/io/ipc/read/array/struct_.rs +++ b/src/io/ipc/read/array/struct_.rs @@ -8,7 +8,7 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version}; #[allow(clippy::too_many_arguments)] pub fn read_struct( @@ -22,7 +22,7 @@ pub fn read_struct( is_little_endian: bool, compression: Option, version: Version, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/array/union.rs b/src/io/ipc/read/array/union.rs index fc907a32395..678d65ded9d 100644 --- a/src/io/ipc/read/array/union.rs +++ b/src/io/ipc/read/array/union.rs @@ -9,9 +9,7 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{ - Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version, -}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version}; #[allow(clippy::too_many_arguments)] pub fn read_union( @@ -25,7 +23,7 @@ pub fn read_union( is_little_endian: bool, compression: Option, version: Version, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result { let 
field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs index 10f1260f5ac..af25eaed896 100644 --- a/src/io/ipc/read/array/utf8.rs +++ b/src/io/ipc/read/array/utf8.rs @@ -7,7 +7,7 @@ use crate::datatypes::DataType; use crate::error::{Error, Result}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; #[allow(clippy::too_many_arguments)] pub fn read_utf8( @@ -18,7 +18,7 @@ pub fn read_utf8( block_offset: u64, is_little_endian: bool, compression: Option, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 18ddd8cbb8b..26252dd1831 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -9,7 +9,6 @@ use crate::datatypes::{DataType, Field}; use crate::error::{Error, Result}; use crate::io::ipc::read::OutOfSpecKind; use crate::io::ipc::{IpcField, IpcSchema}; -use crate::io::ReadBuffer; use super::deserialize::{read, skip}; use super::Dictionaries; @@ -86,7 +85,7 @@ pub fn read_record_batch( reader: &mut R, block_offset: u64, file_size: u64, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result>> { assert_eq!(fields.len(), ipc_schema.fields.len()); let buffers = batch @@ -234,7 +233,7 @@ pub fn read_dictionary( reader: &mut R, block_offset: u64, file_size: u64, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result<()> { if batch .is_delta() diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index 3378f99a894..3ed5c9349c7 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -10,7 +10,7 @@ use crate::error::Result; use crate::io::ipc::IpcField; use super::{array::*, Dictionaries}; -use super::{IpcBuffer, Node, ReadBuffer}; +use super::{IpcBuffer, Node}; #[allow(clippy::too_many_arguments)] pub fn read( @@ -24,7 +24,7 @@ pub fn read( is_little_endian: bool, compression: Option, version: MetadataVersion, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> { use PhysicalType::*; let data_type = field.data_type.clone(); diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index cabfdd43b70..100990a6509 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -18,7 +18,6 @@ use super::reader::{deserialize_footer, get_serialized_batch}; use super::Dictionaries; use super::FileMetadata; use super::OutOfSpecKind; -use super::ReadBuffer; /// Async reader for Arrow IPC files pub struct FileStream<'a> { @@ -142,9 +141,14 @@ where { let footer_size = read_footer_len(reader).await?; // Read footer - let mut footer = vec![0; footer_size]; reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?; - reader.read_exact(&mut footer).await?; + + let mut footer = vec![]; + footer.try_reserve(footer_size)?; + reader + .take(footer_size as u64) + .read_to_end(&mut footer) + .await?; deserialize_footer(&footer, u64::MAX) } @@ -156,9 +160,9 @@ async fn read_batch( metadata: &FileMetadata, projection: Option<&[usize]>, block: usize, - meta_buffer: &mut ReadBuffer, - block_buffer: &mut ReadBuffer, - scratch: &mut ReadBuffer, + meta_buffer: &mut Vec, + block_buffer: &mut Vec, + scratch: &mut Vec, ) -> Result>> where R: AsyncRead + AsyncSeek + Unpin, @@ -181,10 +185,14 @@ where .try_into() .map_err(|_| 
Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - meta_buffer.set_len(meta_len); - reader.read_exact(meta_buffer.as_mut()).await?; + meta_buffer.clear(); + meta_buffer.try_reserve(meta_len)?; + (&mut reader) + .take(meta_len as u64) + .read_to_end(meta_buffer) + .await?; - let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer.as_ref()) + let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let batch = get_serialized_batch(&message)?; @@ -195,9 +203,14 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - block_buffer.set_len(block_length); - reader.read_exact(block_buffer.as_mut()).await?; - let mut cursor = std::io::Cursor::new(block_buffer.as_ref()); + block_buffer.clear(); + block_buffer.try_reserve(block_length)?; + reader + .take(block_length as u64) + .read_to_end(block_buffer) + .await?; + + let mut cursor = std::io::Cursor::new(&block_buffer); read_record_batch( batch, @@ -220,14 +233,14 @@ async fn read_dictionaries( fields: &[Field], ipc_schema: &IpcSchema, blocks: &[Block], - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result where R: AsyncRead + AsyncSeek + Unpin, { let mut dictionaries = Default::default(); - let mut data: ReadBuffer = vec![].into(); - let mut buffer: ReadBuffer = vec![].into(); + let mut data: Vec = vec![]; + let mut buffer: Vec = vec![]; for block in blocks { let offset: u64 = block @@ -250,11 +263,15 @@ where .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))? .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?; - buffer.set_len(length); match header { MessageHeaderRef::DictionaryBatch(batch) => { - reader.read_exact(buffer.as_mut()).await?; - let mut cursor = std::io::Cursor::new(buffer.as_ref()); + buffer.clear(); + buffer.try_reserve(length)?; + (&mut reader) + .take(length as u64) + .read_to_end(&mut buffer) + .await?; + let mut cursor = std::io::Cursor::new(&buffer); read_dictionary( batch, fields, @@ -272,7 +289,7 @@ where Ok(dictionaries) } -async fn read_dictionary_message(mut reader: R, offset: u64, data: &mut ReadBuffer) -> Result<()> +async fn read_dictionary_message(mut reader: R, offset: u64, data: &mut Vec) -> Result<()> where R: AsyncRead + AsyncSeek + Unpin, { @@ -288,8 +305,12 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - data.set_len(footer_size); - reader.read_exact(data.as_mut()).await?; + data.clear(); + data.try_reserve(footer_size)?; + (&mut reader) + .take(footer_size as u64) + .read_to_end(data) + .await?; Ok(()) } diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index f25da7a4a1c..5ffe6426e20 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -27,8 +27,6 @@ pub mod stream_async; #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))] pub mod file_async; -use super::super::ReadBuffer; - pub use common::{read_dictionary, read_record_batch}; pub use reader::{ read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, FileReader, diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs index cc04035eec7..7a5ae859a6a 100644 --- a/src/io/ipc/read/read_basic.rs +++ b/src/io/ipc/read/read_basic.rs @@ -7,7 +7,7 @@ use crate::{bitmap::Bitmap, types::NativeType}; use super::super::compression; use super::super::endianess::is_native_little_endian; -use super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +use super::{Compression, 
IpcBuffer, Node, OutOfSpecKind}; fn read_swapped( reader: &mut R, @@ -93,7 +93,7 @@ fn read_compressed_buffer( length: usize, is_little_endian: bool, compression: Compression, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> { if is_little_endian != is_native_little_endian() { return Err(Error::NotYetImplemented( @@ -106,8 +106,12 @@ fn read_compressed_buffer( let mut buffer = vec![T::default(); length]; // decompress first - scratch.set_len(buffer_length); - reader.read_exact(scratch.as_mut())?; + scratch.clear(); + scratch.try_reserve(buffer_length)?; + reader + .by_ref() + .take(buffer_length as u64) + .read_to_end(scratch)?; let out_slice = bytemuck::cast_slice_mut(&mut buffer); @@ -117,10 +121,10 @@ fn read_compressed_buffer( match compression { arrow_format::ipc::CompressionType::Lz4Frame => { - compression::decompress_lz4(&scratch.as_ref()[8..], out_slice)?; + compression::decompress_lz4(&scratch[8..], out_slice)?; } arrow_format::ipc::CompressionType::Zstd => { - compression::decompress_zstd(&scratch.as_ref()[8..], out_slice)?; + compression::decompress_zstd(&scratch[8..], out_slice)?; } } Ok(buffer) @@ -133,7 +137,7 @@ pub fn read_buffer( block_offset: u64, is_little_endian: bool, compression: Option, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> { let buf = buf .pop_front() diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index b01d85cf499..776ee684e82 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -13,7 +13,6 @@ use super::common::*; use super::schema::fb_to_schema; use super::Dictionaries; use super::OutOfSpecKind; -use super::ReadBuffer; use arrow_format::ipc::planus::ReadAsRoot; /// Metadata of an Arrow IPC file, written in the footer of the file. @@ -45,14 +44,14 @@ pub struct FileReader { dictionaries: Option, current_block: usize, projection: Option<(Vec, HashMap, Schema)>, - data_scratch: ReadBuffer, - message_scratch: ReadBuffer, + data_scratch: Vec, + message_scratch: Vec, } fn read_dictionary_message( reader: &mut R, offset: u64, - data: &mut ReadBuffer, + data: &mut Vec, ) -> Result<()> { let mut message_size: [u8; 4] = [0; 4]; reader.seek(SeekFrom::Start(offset))?; @@ -66,8 +65,13 @@ fn read_dictionary_message( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - data.set_len(message_length); - reader.read_exact(data.as_mut())?; + data.clear(); + data.try_reserve(message_length)?; + reader + .by_ref() + .take(message_length as u64) + .read_to_end(data)?; + Ok(()) } @@ -76,8 +80,8 @@ fn read_dictionary_block( metadata: &FileMetadata, block: &arrow_format::ipc::Block, dictionaries: &mut Dictionaries, - message_scratch: &mut ReadBuffer, - dictionary_scratch: &mut ReadBuffer, + message_scratch: &mut Vec, + dictionary_scratch: &mut Vec, ) -> Result<()> { let offset: u64 = block .offset @@ -120,7 +124,7 @@ fn read_dictionary_block( pub fn read_file_dictionaries( reader: &mut R, metadata: &FileMetadata, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result { let mut dictionaries = Default::default(); @@ -259,8 +263,8 @@ pub fn read_batch( metadata: &FileMetadata, projection: Option<&[usize]>, index: usize, - message_scratch: &mut ReadBuffer, - data_scratch: &mut ReadBuffer, + message_scratch: &mut Vec, + data_scratch: &mut Vec, ) -> Result>> { let block = metadata.blocks[index]; @@ -281,8 +285,12 @@ pub fn read_batch( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - message_scratch.set_len(meta_len); - 
reader.read_exact(message_scratch.as_mut())?; + message_scratch.clear(); + message_scratch.try_reserve(meta_len)?; + reader + .by_ref() + .take(meta_len as u64) + .read_to_end(message_scratch)?; let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index 5dd7b5cddda..e7d3d590aa4 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -15,7 +15,6 @@ use super::common::*; use super::schema::deserialize_stream_metadata; use super::Dictionaries; use super::OutOfSpecKind; -use super::ReadBuffer; /// Metadata of an Arrow IPC stream, written at the start of the stream #[derive(Debug, Clone)] @@ -44,14 +43,18 @@ pub fn read_stream_metadata(reader: &mut R) -> Result { i32::from_le_bytes(meta_size) }; - let meta_length: usize = meta_length + let length: usize = meta_length .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let mut meta_buffer = vec![0; meta_length]; - reader.read_exact(&mut meta_buffer)?; + let mut buffer = vec![]; + buffer.try_reserve(length as usize)?; + reader + .by_ref() + .take(length as u64) + .read_to_end(&mut buffer)?; - deserialize_stream_metadata(&meta_buffer) + deserialize_stream_metadata(&buffer) } /// Encodes the stream's status after each read. @@ -91,10 +94,10 @@ fn read_next( reader: &mut R, metadata: &StreamMetadata, dictionaries: &mut Dictionaries, - message_buffer: &mut ReadBuffer, - data_buffer: &mut ReadBuffer, + message_buffer: &mut Vec, + data_buffer: &mut Vec, projection: &Option<(Vec, HashMap, Schema)>, - scratch: &mut ReadBuffer, + scratch: &mut Vec, ) -> Result> { // determine metadata length let mut meta_length: [u8; 4] = [0; 4]; @@ -131,8 +134,12 @@ fn read_next( return Ok(None); } - message_buffer.set_len(meta_length); - reader.read_exact(message_buffer.as_mut())?; + message_buffer.clear(); + message_buffer.try_reserve(meta_length as usize)?; + reader + .by_ref() + .take(meta_length as u64) + .read_to_end(message_buffer)?; let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; @@ -148,12 +155,16 @@ fn read_next( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - data_buffer.set_len(block_length); match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - reader.read_exact(data_buffer.as_mut())?; + data_buffer.clear(); + data_buffer.try_reserve(block_length as usize)?; + reader + .by_ref() + .take(block_length as u64) + .read_to_end(data_buffer)?; - let file_size = data_buffer.as_ref().len() as u64; + let file_size = data_buffer.len() as u64; let mut reader = std::io::Cursor::new(data_buffer); @@ -180,10 +191,15 @@ fn read_next( } } arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => { - let mut buf = vec![0; block_length]; - reader.read_exact(&mut buf)?; + data_buffer.clear(); + data_buffer.try_reserve(block_length as usize)?; + reader + .by_ref() + .take(block_length as u64) + .read_to_end(data_buffer)?; - let mut dict_reader = std::io::Cursor::new(&buf); + let file_size = data_buffer.len() as u64; + let mut dict_reader = std::io::Cursor::new(&data_buffer); read_dictionary( batch, @@ -192,7 +208,7 @@ fn read_next( dictionaries, &mut dict_reader, 0, - buf.len() as u64, + file_size, scratch, )?; @@ -222,10 +238,10 @@ pub struct StreamReader { metadata: StreamMetadata, 
dictionaries: Dictionaries, finished: bool, - data_buffer: ReadBuffer, - message_buffer: ReadBuffer, + data_buffer: Vec, + message_buffer: Vec, projection: Option<(Vec, HashMap, Schema)>, - scratch: ReadBuffer, + scratch: Vec, } impl StreamReader { diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index cfbfe5ad3e0..b637dc0a2d8 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -16,7 +16,6 @@ use super::common::{read_dictionary, read_record_batch}; use super::schema::deserialize_stream_metadata; use super::Dictionaries; use super::OutOfSpecKind; -use super::ReadBuffer; use super::StreamMetadata; /// A (private) state of stream messages @@ -25,9 +24,9 @@ struct ReadState { pub metadata: StreamMetadata, pub dictionaries: Dictionaries, /// The internal buffer to read data inside the messages (records and dictionaries) to - pub data_buffer: ReadBuffer, + pub data_buffer: Vec, /// The internal buffer to read messages to - pub message_buffer: ReadBuffer, + pub message_buffer: Vec, } /// The state of an Arrow stream @@ -58,8 +57,12 @@ pub async fn read_stream_metadata_async( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let mut meta_buffer = vec![0; meta_len]; - reader.read_exact(&mut meta_buffer).await?; + let mut meta_buffer = vec![]; + meta_buffer.try_reserve(meta_len as usize)?; + reader + .take(meta_len as u64) + .read_to_end(&mut meta_buffer) + .await?; deserialize_stream_metadata(&meta_buffer) } @@ -105,10 +108,11 @@ async fn maybe_next( return Ok(None); } - state.message_buffer.set_len(meta_length); - state - .reader - .read_exact(state.message_buffer.as_mut()) + state.message_buffer.clear(); + state.message_buffer.try_reserve(meta_length as usize)?; + (&mut state.reader) + .take(meta_length as u64) + .read_to_end(&mut state.message_buffer) .await?; let message = arrow_format::ipc::MessageRef::read_as_root(state.message_buffer.as_ref()) @@ -125,11 +129,14 @@ async fn maybe_next( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - state.data_buffer.set_len(block_length); - match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - state.reader.read_exact(state.data_buffer.as_mut()).await?; + state.data_buffer.clear(); + state.data_buffer.try_reserve(block_length as usize)?; + (&mut state.reader) + .take(block_length as u64) + .read_to_end(&mut state.data_buffer) + .await?; read_record_batch( batch, @@ -140,18 +147,22 @@ async fn maybe_next( state.metadata.version, &mut std::io::Cursor::new(&state.data_buffer), 0, - state.data_buffer.as_ref().len() as u64, + state.data_buffer.len() as u64, &mut scratch, ) .map(|chunk| Some(StreamState::Some((state, chunk)))) } arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => { - let mut body = vec![0; block_length]; - state.reader.read_exact(&mut body).await?; + state.data_buffer.clear(); + state.data_buffer.try_reserve(block_length as usize)?; + (&mut state.reader) + .take(block_length as u64) + .read_to_end(&mut state.data_buffer) + .await?; - let file_size = body.len() as u64; + let file_size = state.data_buffer.len() as u64; - let mut dict_reader = std::io::Cursor::new(body); + let mut dict_reader = std::io::Cursor::new(&state.data_buffer); read_dictionary( batch, diff --git a/src/io/mod.rs b/src/io/mod.rs index 95a17ee8bf3..9343d4281ce 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -53,9 +53,3 @@ pub mod print; #[cfg(any(feature = "io_csv_write", feature = "io_avro", feature = "io_json"))] 
 mod iterator;
-
-#[cfg(feature = "io_ipc")]
-mod readbuf;
-
-#[cfg(feature = "io_ipc")]
-pub use readbuf::ReadBuffer;
diff --git a/src/io/readbuf.rs b/src/io/readbuf.rs
deleted file mode 100644
index d30396a8289..00000000000
--- a/src/io/readbuf.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-/// A small wrapper around [`Vec<u8>`] that allows us to reuse memory once it is initialized.
-/// This may improve performance of the [`Read`] trait.
-#[derive(Clone, Default)]
-pub struct ReadBuffer {
-    data: Vec<u8>,
-    // length to be read or is read
-    length: usize,
-}
-
-impl ReadBuffer {
-    /// Set the minimal length of the [`ReadBuf`]. Contrary to the
-    /// method on `Vec` this is `safe` because this function guarantees that
-    /// the underlying data always is initialized.
-    pub fn set_len(&mut self, length: usize) {
-        if length > self.data.capacity() {
-            // exponential growing strategy
-            // benchmark showed it was ~5% faster
-            // in reading lz4 yellow-trip dataset
-            self.data = vec![0; length * 2];
-        } else if length > self.data.len() {
-            self.data.resize(length, 0);
-        }
-        self.length = length;
-    }
-}
-
-impl AsRef<[u8]> for ReadBuffer {
-    fn as_ref(&self) -> &[u8] {
-        &self.data[..self.length]
-    }
-}
-
-impl AsMut<[u8]> for ReadBuffer {
-    fn as_mut(&mut self) -> &mut [u8] {
-        &mut self.data[..self.length]
-    }
-}
-
-impl From<Vec<u8>> for ReadBuffer {
-    fn from(data: Vec<u8>) -> Self {
-        let length = data.len();
-        Self { data, length }
-    }
-}
-
-impl From<ReadBuffer> for Vec<u8> {
-    fn from(buf: ReadBuffer) -> Self {
-        buf.data
-    }
-}
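
Editor's note on the pattern this diff adopts: every reader swaps the `ReadBuffer` scratch type above for a plain `Vec<u8>`, and each `set_len` + `read_exact` pair becomes `clear` + `try_reserve` + `take(..).read_to_end(..)`. The sketch below is a minimal, standalone illustration of that pattern; the helper name `read_exactly_into` and the explicit short-read check are illustrative additions, not part of the patch.

use std::io::{Error, ErrorKind, Read, Result};

// Minimal sketch (not part of the patch) of the read pattern used throughout the diff.
// `scratch` is reused across calls, like the `data_scratch`/`message_scratch` fields above.
fn read_exactly_into<R: Read>(reader: &mut R, length: usize, scratch: &mut Vec<u8>) -> Result<()> {
    scratch.clear();
    // Fallible allocation: a huge or corrupt length becomes an error
    // instead of aborting the process on allocation failure.
    scratch
        .try_reserve(length)
        .map_err(|e| Error::new(ErrorKind::Other, e))?;
    // `take` caps the read at `length` bytes and `read_to_end` only appends
    // bytes actually read, so the buffer never exposes uninitialized or stale memory.
    let read = reader.by_ref().take(length as u64).read_to_end(scratch)?;
    if read != length {
        return Err(Error::new(
            ErrorKind::UnexpectedEof,
            "stream ended before the announced length",
        ));
    }
    Ok(())
}

fn main() -> Result<()> {
    let mut source = std::io::Cursor::new(vec![1u8, 2, 3, 4, 5]);
    let mut scratch = Vec::new();
    read_exactly_into(&mut source, 5, &mut scratch)?;
    assert_eq!(scratch, [1, 2, 3, 4, 5]);
    Ok(())
}

Compared with `ReadBuffer::set_len` + `read_exact`, this keeps the buffer's contents limited to bytes actually read and turns an untrusted length from IPC metadata into a recoverable error rather than an unchecked allocation.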
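
One detail the diff relies on but does not show: `Vec::try_reserve` returns `Result<(), TryReserveError>`, so propagating it with `?` inside functions returning the crate's `Result` presupposes a conversion on the crate's error type, roughly like the hypothetical sketch below (the variant name and message are illustrative, not the crate's actual definition).

use std::collections::TryReserveError;

// Hypothetical sketch only: the real conversion lives elsewhere in the crate.
#[derive(Debug)]
pub enum Error {
    OutOfMemory(String),
    // ... other variants elided
}

impl From<TryReserveError> for Error {
    fn from(e: TryReserveError) -> Self {
        Error::OutOfMemory(format!("failed to reserve read buffer: {e}"))
    }
}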