From 3aeecbb406b9bef17f6351edd48ac22ae0b0cbb4 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sat, 25 Jun 2022 14:51:09 +0200 Subject: [PATCH] IPC: don't reassign all bytes before overwriting them --- src/io/ipc/read/file_async.rs | 25 +++++++++++++------------ src/io/ipc/read/reader.rs | 24 ++++++++++++++---------- src/io/ipc/read/stream.rs | 9 +++------ src/io/ipc/read/stream_async.rs | 15 +++++++++------ 4 files changed, 39 insertions(+), 34 deletions(-) diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 8397eb4a016..3ec50482c74 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -11,6 +11,7 @@ use crate::array::*; use crate::chunk::Chunk; use crate::datatypes::{Field, Schema}; use crate::error::{Error, Result}; +use crate::io::ipc::read::reader::prepare_scratch; use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER}; use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch}; @@ -176,9 +177,9 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - meta_buffer.clear(); - meta_buffer.resize(meta_len, 0); - reader.read_exact(meta_buffer).await?; + reader + .read_exact(prepare_scratch(meta_buffer, meta_len)) + .await?; let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; @@ -191,9 +192,9 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - block_buffer.clear(); - block_buffer.resize(block_length, 0); - reader.read_exact(block_buffer).await?; + reader + .read_exact(prepare_scratch(block_buffer, block_length)) + .await?; let mut cursor = std::io::Cursor::new(block_buffer); read_record_batch( @@ -247,9 +248,9 @@ where match header { MessageHeaderRef::DictionaryBatch(batch) => { - buffer.clear(); - buffer.resize(length, 0); - reader.read_exact(&mut buffer).await?; + reader + 
.read_exact(prepare_scratch(&mut buffer, length)) + .await?; let mut cursor = std::io::Cursor::new(&mut buffer); read_dictionary( batch, @@ -283,9 +284,9 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - data.clear(); - data.resize(footer_size, 0); - reader.read_exact(data).await?; + reader + .read_exact(prepare_scratch(data, footer_size)) + .await?; Ok(()) } diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 9807c198496..ff6bf752ab3 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -36,6 +36,16 @@ pub struct FileMetadata { pub(crate) size: u64, } +/// prepare `scratch` to read the message +pub(super) fn prepare_scratch(scratch: &mut Vec<u8>, message_length: usize) -> &mut [u8] { + // ensure that we have enough scratch space + if message_length > scratch.len() { + scratch.resize(message_length, 0); + } + // return the buffer that will be overwritten by read + &mut scratch[..message_length] +} + /// Arrow File reader pub struct FileReader { reader: R, @@ -64,11 +74,7 @@ fn read_dictionary_message( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - // prepare `data` to read the message - data.clear(); - data.resize(message_length, 0); - - reader.read_exact(data)?; + reader.read_exact(prepare_scratch(data, message_length))?; Ok(()) } @@ -249,7 +255,7 @@ pub fn read_batch( metadata: &FileMetadata, projection: Option<&[usize]>, index: usize, - stratch: &mut Vec<u8>, + scratch: &mut Vec<u8>, ) -> Result>> { let block = metadata.blocks[index]; @@ -270,11 +276,9 @@ pub fn read_batch( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - stratch.clear(); - stratch.resize(meta_len, 0); - reader.read_exact(stratch)?; + reader.read_exact(prepare_scratch(scratch, meta_len))?; - let message = arrow_format::ipc::MessageRef::read_as_root(stratch) + let message = arrow_format::ipc::MessageRef::read_as_root(scratch) .map_err(|err| 
Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let batch = get_serialized_batch(&message)?; diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index c49512faaaa..5048451e17e 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -7,6 +7,7 @@ use crate::array::Array; use crate::chunk::Chunk; use crate::datatypes::Schema; use crate::error::{Error, Result}; +use crate::io::ipc::read::reader::prepare_scratch; use crate::io::ipc::IpcSchema; use super::super::CONTINUATION_MARKER; @@ -127,9 +128,7 @@ fn read_next( return Ok(None); } - message_buffer.clear(); - message_buffer.resize(meta_length, 0); - reader.read_exact(message_buffer)?; + reader.read_exact(prepare_scratch(message_buffer, meta_length))?; let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; @@ -147,9 +146,7 @@ fn read_next( match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - data_buffer.clear(); - data_buffer.resize(block_length, 0); - reader.read_exact(data_buffer)?; + reader.read_exact(prepare_scratch(data_buffer, block_length))?; let file_size = data_buffer.len() as u64; diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index 1e130b48496..2e51860d052 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -10,6 +10,7 @@ use futures::Stream; use crate::array::*; use crate::chunk::Chunk; use crate::error::{Error, Result}; +use crate::io::ipc::read::reader::prepare_scratch; use super::super::CONTINUATION_MARKER; use super::common::{read_dictionary, read_record_batch}; @@ -103,9 +104,10 @@ async fn maybe_next( return Ok(None); } - state.message_buffer.clear(); - state.message_buffer.resize(meta_length, 0); - state.reader.read_exact(&mut state.message_buffer).await?; + state + .reader + .read_exact(prepare_scratch(&mut state.message_buffer, meta_length)) + .await?; let message = 
arrow_format::ipc::MessageRef::read_as_root(&state.message_buffer) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; @@ -123,9 +125,10 @@ async fn maybe_next( match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - state.data_buffer.clear(); - state.data_buffer.resize(block_length, 0); - state.reader.read_exact(&mut state.data_buffer).await?; + state + .reader + .read_exact(prepare_scratch(&mut state.data_buffer, block_length)) + .await?; read_record_batch( batch,