From 654dd3981661fdf9ebd04bcf6892a6da1b661c1f Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sun, 26 Jun 2022 15:20:21 +0200 Subject: [PATCH] use ReadBuffer abstraction --- examples/ipc_file_read.rs | 2 +- src/io/ipc/read/common.rs | 54 +++++++++++++++++++++++++++++++++ src/io/ipc/read/file_async.rs | 47 ++++++++++++++-------------- src/io/ipc/read/reader.rs | 32 ++++++++----------- src/io/ipc/read/stream.rs | 23 +++++++------- src/io/ipc/read/stream_async.rs | 20 ++++++------ 6 files changed, 110 insertions(+), 68 deletions(-) diff --git a/examples/ipc_file_read.rs b/examples/ipc_file_read.rs index 23d179adb47..d4ea15a3595 100644 --- a/examples/ipc_file_read.rs +++ b/examples/ipc_file_read.rs @@ -43,7 +43,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk>)> { &metadata, None, chunk_index, - &mut vec![], + &mut Default::default(), )?; Ok((schema, chunk)) diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 7e32cde5d0c..04504c0f6d7 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -286,6 +286,60 @@ pub fn read_dictionary( Ok(()) } +/// A small wrapper around [`Vec<u8>`] that allows us to reuse memory once it is initialized. +/// This may improve the performance of reads done through the [`Read`] trait. +#[derive(Clone, Default)] +pub struct ReadBuffer { + data: Vec, + // number of bytes that are about to be read, or that have been read + length: usize, +} + +impl ReadBuffer { + /// Creates a new [`ReadBuffer`] holding `length` zero-initialized bytes. + pub fn new(length: usize) -> Self { + let data = vec![0; length]; + Self { data, length } + } + + /// Sets the length of the [`ReadBuffer`]. Unlike the `set_len` + /// method on `Vec`, this is safe because this function guarantees that + /// the underlying data is always initialized. 
+ pub fn set_len(&mut self, length: usize) { + if length > self.data.capacity() { + self.data = vec![0; length]; + } else if length > self.data.len() { + self.data.resize(length, 0); + } + self.length = length; + } +} + +impl AsRef<[u8]> for ReadBuffer { + fn as_ref(&self) -> &[u8] { + &self.data[..self.length] + } +} + +impl AsMut<[u8]> for ReadBuffer { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.data[..self.length] + } +} + +impl From> for ReadBuffer { + fn from(data: Vec) -> Self { + let length = data.len(); + Self { data, length } + } +} + +impl From for Vec { + fn from(buf: ReadBuffer) -> Self { + buf.data + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 3ec50482c74..194a41a12e9 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -11,10 +11,11 @@ use crate::array::*; use crate::chunk::Chunk; use crate::datatypes::{Field, Schema}; use crate::error::{Error, Result}; -use crate::io::ipc::read::reader::prepare_scratch; use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER}; -use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch}; +use super::common::{ + apply_projection, prepare_projection, read_dictionary, read_record_batch, ReadBuffer, +}; use super::reader::{deserialize_footer, get_serialized_batch}; use super::Dictionaries; use super::FileMetadata; @@ -78,8 +79,8 @@ impl<'a> FileStream<'a> { // read dictionaries cached_read_dictionaries(&mut reader, &metadata, &mut dictionaries).await?; - let mut meta_buffer = vec![]; - let mut block_buffer = vec![]; + let mut meta_buffer = Default::default(); + let mut block_buffer = Default::default(); for block in 0..metadata.blocks.len() { let chunk = read_batch( &mut reader, @@ -153,8 +154,8 @@ async fn read_batch( metadata: &FileMetadata, projection: Option<&[usize]>, block: usize, - meta_buffer: &mut Vec, - block_buffer: &mut Vec, + meta_buffer: &mut 
ReadBuffer, + block_buffer: &mut ReadBuffer, ) -> Result>> where R: AsyncRead + AsyncSeek + Unpin, @@ -177,11 +178,10 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - reader - .read_exact(prepare_scratch(meta_buffer, meta_len)) - .await?; + meta_buffer.set_len(meta_len); + reader.read_exact(meta_buffer.as_mut()).await?; - let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer) + let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let batch = get_serialized_batch(&message)?; @@ -192,10 +192,9 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - reader - .read_exact(prepare_scratch(block_buffer, block_length)) - .await?; - let mut cursor = std::io::Cursor::new(block_buffer); + block_buffer.set_len(block_length); + reader.read_exact(block_buffer.as_mut()).await?; + let mut cursor = std::io::Cursor::new(block_buffer.as_ref()); read_record_batch( batch, @@ -222,8 +221,8 @@ where R: AsyncRead + AsyncSeek + Unpin, { let mut dictionaries = Default::default(); - let mut data = vec![]; - let mut buffer = vec![]; + let mut data = ReadBuffer::new(0); + let mut buffer = ReadBuffer::new(0); for block in blocks { let offset: u64 = block @@ -238,7 +237,7 @@ where read_dictionary_message(&mut reader, offset, &mut data).await?; - let message = arrow_format::ipc::MessageRef::read_as_root(&data) + let message = arrow_format::ipc::MessageRef::read_as_root(data.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -246,12 +245,11 @@ where .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))? 
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?; + buffer.set_len(length); match header { MessageHeaderRef::DictionaryBatch(batch) => { - reader - .read_exact(prepare_scratch(&mut buffer, length)) - .await?; - let mut cursor = std::io::Cursor::new(&mut buffer); + reader.read_exact(buffer.as_mut()).await?; + let mut cursor = std::io::Cursor::new(buffer.as_ref()); read_dictionary( batch, fields, @@ -268,7 +266,7 @@ where Ok(dictionaries) } -async fn read_dictionary_message(mut reader: R, offset: u64, data: &mut Vec) -> Result<()> +async fn read_dictionary_message(mut reader: R, offset: u64, data: &mut ReadBuffer) -> Result<()> where R: AsyncRead + AsyncSeek + Unpin, { @@ -284,9 +282,8 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - reader - .read_exact(prepare_scratch(data, footer_size)) - .await?; + data.set_len(footer_size); + reader.read_exact(data.as_mut()).await?; Ok(()) } diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index ff6bf752ab3..c40adbdd694 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -36,16 +36,6 @@ pub struct FileMetadata { pub(crate) size: u64, } -/// prepare `scratch` to read the message -pub(super) fn prepare_scratch(scratch: &mut Vec, message_length: usize) -> &mut [u8] { - // ensure that we have enough scratch space - if message_length > scratch.len() { - scratch.resize(message_length, 0); - } - // return the buffer that will be overwritten by read - &mut scratch[..message_length] -} - /// Arrow File reader pub struct FileReader { reader: R, @@ -54,13 +44,13 @@ pub struct FileReader { dictionaries: Option, current_block: usize, projection: Option<(Vec, HashMap, Schema)>, - buffer: Vec, + buffer: ReadBuffer, } fn read_dictionary_message( reader: &mut R, offset: u64, - data: &mut Vec, + data: &mut ReadBuffer, ) -> Result<()> { let mut message_size: [u8; 4] = [0; 4]; reader.seek(SeekFrom::Start(offset))?; @@ -74,7 +64,8 @@ fn 
read_dictionary_message( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - reader.read_exact(prepare_scratch(data, message_length))?; + data.set_len(message_length); + reader.read_exact(data.as_mut())?; Ok(()) } @@ -83,7 +74,7 @@ fn read_dictionary_block( metadata: &FileMetadata, block: &arrow_format::ipc::Block, dictionaries: &mut Dictionaries, - scratch: &mut Vec, + scratch: &mut ReadBuffer, ) -> Result<()> { let offset: u64 = block .offset @@ -95,7 +86,7 @@ fn read_dictionary_block( .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; read_dictionary_message(reader, offset, scratch)?; - let message = arrow_format::ipc::MessageRef::read_as_root(scratch) + let message = arrow_format::ipc::MessageRef::read_as_root(scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -127,7 +118,7 @@ pub fn read_file_dictionaries( metadata: &FileMetadata, ) -> Result { let mut dictionaries = Default::default(); - let mut data = vec![]; + let mut data = vec![].into(); let blocks = if let Some(blocks) = metadata.dictionaries.as_deref() { blocks @@ -255,7 +246,7 @@ pub fn read_batch( metadata: &FileMetadata, projection: Option<&[usize]>, index: usize, - scratch: &mut Vec, + scratch: &mut ReadBuffer, ) -> Result>> { let block = metadata.blocks[index]; @@ -276,9 +267,10 @@ pub fn read_batch( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - reader.read_exact(prepare_scratch(scratch, meta_len))?; + scratch.set_len(meta_len); + reader.read_exact(scratch.as_mut())?; - let message = arrow_format::ipc::MessageRef::read_as_root(scratch) + let message = arrow_format::ipc::MessageRef::read_as_root(scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let batch = get_serialized_batch(&message)?; @@ -327,7 +319,7 @@ impl FileReader { dictionaries: Default::default(), projection, current_block: 0, - buffer: vec![], + 
buffer: Default::default(), } } diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index 5048451e17e..480d9d9d619 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -7,7 +7,6 @@ use crate::array::Array; use crate::chunk::Chunk; use crate::datatypes::Schema; use crate::error::{Error, Result}; -use crate::io::ipc::read::reader::prepare_scratch; use crate::io::ipc::IpcSchema; use super::super::CONTINUATION_MARKER; @@ -90,8 +89,8 @@ fn read_next( reader: &mut R, metadata: &StreamMetadata, dictionaries: &mut Dictionaries, - message_buffer: &mut Vec, - data_buffer: &mut Vec, + message_buffer: &mut ReadBuffer, + data_buffer: &mut ReadBuffer, ) -> Result> { // determine metadata length let mut meta_length: [u8; 4] = [0; 4]; @@ -128,9 +127,10 @@ fn read_next( return Ok(None); } - reader.read_exact(prepare_scratch(message_buffer, meta_length))?; + message_buffer.set_len(meta_length); + reader.read_exact(message_buffer.as_mut())?; - let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer) + let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -144,11 +144,12 @@ fn read_next( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; + data_buffer.set_len(block_length); match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - reader.read_exact(prepare_scratch(data_buffer, block_length))?; + reader.read_exact(data_buffer.as_mut())?; - let file_size = data_buffer.len() as u64; + let file_size = data_buffer.as_ref().len() as u64; let mut reader = std::io::Cursor::new(data_buffer); @@ -199,8 +200,8 @@ pub struct StreamReader { metadata: StreamMetadata, dictionaries: Dictionaries, finished: bool, - data_buffer: Vec, - message_buffer: Vec, + data_buffer: ReadBuffer, + message_buffer: ReadBuffer, } impl StreamReader { @@ -215,8 +216,8 @@ impl StreamReader { 
metadata, dictionaries: Default::default(), finished: false, - data_buffer: vec![], - message_buffer: vec![], + data_buffer: Default::default(), + message_buffer: Default::default(), } } diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index 2e51860d052..6730999c5e4 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -10,10 +10,9 @@ use futures::Stream; use crate::array::*; use crate::chunk::Chunk; use crate::error::{Error, Result}; -use crate::io::ipc::read::reader::prepare_scratch; use super::super::CONTINUATION_MARKER; -use super::common::{read_dictionary, read_record_batch}; +use super::common::{read_dictionary, read_record_batch, ReadBuffer}; use super::schema::deserialize_stream_metadata; use super::Dictionaries; use super::OutOfSpecKind; @@ -25,9 +24,9 @@ struct ReadState { pub metadata: StreamMetadata, pub dictionaries: Dictionaries, /// The internal buffer to read data inside the messages (records and dictionaries) to - pub data_buffer: Vec, + pub data_buffer: ReadBuffer, /// The internal buffer to read messages to - pub message_buffer: Vec, + pub message_buffer: ReadBuffer, } /// The state of an Arrow stream @@ -104,12 +103,13 @@ async fn maybe_next( return Ok(None); } + state.message_buffer.set_len(meta_length); state .reader - .read_exact(prepare_scratch(&mut state.message_buffer, meta_length)) + .read_exact(state.message_buffer.as_mut()) .await?; - let message = arrow_format::ipc::MessageRef::read_as_root(&state.message_buffer) + let message = arrow_format::ipc::MessageRef::read_as_root(state.message_buffer.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -123,12 +123,10 @@ async fn maybe_next( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; + state.data_buffer.set_len(block_length); match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - state - .reader - 
.read_exact(prepare_scratch(&mut state.data_buffer, block_length)) - .await?; + state.reader.read_exact(state.data_buffer.as_mut()).await?; read_record_batch( batch, @@ -139,7 +137,7 @@ async fn maybe_next( state.metadata.version, &mut std::io::Cursor::new(&state.data_buffer), 0, - state.data_buffer.len() as u64, + state.data_buffer.as_ref().len() as u64, ) .map(|chunk| Some(StreamState::Some((state, chunk)))) }