Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
IPC: don't reassign all bytes before overwriting them
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 26, 2022
1 parent b679b06 commit 653956e
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 34 deletions.
25 changes: 13 additions & 12 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::{Field, Schema};
use crate::error::{Error, Result};
use crate::io::ipc::read::reader::prepare_scratch;
use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};

use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch};
Expand Down Expand Up @@ -176,9 +177,9 @@ where
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?;

meta_buffer.clear();
meta_buffer.resize(meta_len, 0);
reader.read_exact(meta_buffer).await?;
reader
.read_exact(prepare_scratch(meta_buffer, meta_len))
.await?;

let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer)
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
Expand All @@ -191,9 +192,9 @@ where
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?;

block_buffer.clear();
block_buffer.resize(block_length, 0);
reader.read_exact(block_buffer).await?;
reader
.read_exact(prepare_scratch(block_buffer, block_length))
.await?;
let mut cursor = std::io::Cursor::new(block_buffer);

read_record_batch(
Expand Down Expand Up @@ -247,9 +248,9 @@ where

match header {
MessageHeaderRef::DictionaryBatch(batch) => {
buffer.clear();
buffer.resize(length, 0);
reader.read_exact(&mut buffer).await?;
reader
.read_exact(prepare_scratch(&mut buffer, length))
.await?;
let mut cursor = std::io::Cursor::new(&mut buffer);
read_dictionary(
batch,
Expand Down Expand Up @@ -283,9 +284,9 @@ where
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

data.clear();
data.resize(footer_size, 0);
reader.read_exact(data).await?;
reader
.read_exact(prepare_scratch(data, footer_size))
.await?;

Ok(())
}
Expand Down
24 changes: 14 additions & 10 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ pub struct FileMetadata {
pub(crate) size: u64,
}

/// Prepares `scratch` to receive a message of exactly `message_length` bytes.
///
/// After this call `scratch.len() == message_length`, and the returned slice
/// covers the whole vector. The caller is expected to overwrite that slice
/// (e.g. with `read_exact`).
///
/// Bytes already present in `scratch` are *not* re-zeroed: only the tail that
/// is newly added when the buffer grows is zero-filled. When the previous
/// content was longer than `message_length` the vector is truncated, so code
/// that later reads the whole `scratch` (its length, or a parser/cursor over
/// it) never observes stale bytes left over from an earlier, larger message.
pub(super) fn prepare_scratch(scratch: &mut Vec<u8>, message_length: usize) -> &mut [u8] {
    // `Vec::resize` truncates when shrinking and only writes the fill value
    // into the newly added region when growing, so the existing prefix is
    // never rewritten before the read overwrites it.
    scratch.resize(message_length, 0);
    // The whole vector is now exactly the region the read must fill.
    scratch.as_mut_slice()
}

/// Arrow File reader
pub struct FileReader<R: Read + Seek> {
reader: R,
Expand Down Expand Up @@ -64,11 +74,7 @@ fn read_dictionary_message<R: Read + Seek>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

// prepare `data` to read the message
data.clear();
data.resize(message_length, 0);

reader.read_exact(data)?;
reader.read_exact(prepare_scratch(data, message_length))?;
Ok(())
}

Expand Down Expand Up @@ -249,7 +255,7 @@ pub fn read_batch<R: Read + Seek>(
metadata: &FileMetadata,
projection: Option<&[usize]>,
index: usize,
stratch: &mut Vec<u8>,
scratch: &mut Vec<u8>,
) -> Result<Chunk<Box<dyn Array>>> {
let block = metadata.blocks[index];

Expand All @@ -270,11 +276,9 @@ pub fn read_batch<R: Read + Seek>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?;

stratch.clear();
stratch.resize(meta_len, 0);
reader.read_exact(stratch)?;
reader.read_exact(prepare_scratch(scratch, meta_len))?;

let message = arrow_format::ipc::MessageRef::read_as_root(stratch)
let message = arrow_format::ipc::MessageRef::read_as_root(scratch)
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

let batch = get_serialized_batch(&message)?;
Expand Down
9 changes: 3 additions & 6 deletions src/io/ipc/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::Schema;
use crate::error::{Error, Result};
use crate::io::ipc::read::reader::prepare_scratch;
use crate::io::ipc::IpcSchema;

use super::super::CONTINUATION_MARKER;
Expand Down Expand Up @@ -129,9 +130,7 @@ fn read_next<R: Read>(
return Ok(None);
}

message_buffer.clear();
message_buffer.resize(meta_length, 0);
reader.read_exact(message_buffer)?;
reader.read_exact(prepare_scratch(message_buffer, meta_length))?;

let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer)
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
Expand All @@ -149,9 +148,7 @@ fn read_next<R: Read>(

match header {
arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {
data_buffer.clear();
data_buffer.resize(block_length, 0);
reader.read_exact(data_buffer)?;
reader.read_exact(prepare_scratch(data_buffer, block_length))?;

let file_size = data_buffer.len() as u64;

Expand Down
15 changes: 9 additions & 6 deletions src/io/ipc/read/stream_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::Stream;
use crate::array::*;
use crate::chunk::Chunk;
use crate::error::{Error, Result};
use crate::io::ipc::read::reader::prepare_scratch;

use super::super::CONTINUATION_MARKER;
use super::common::{read_dictionary, read_record_batch};
Expand Down Expand Up @@ -103,9 +104,10 @@ async fn maybe_next<R: AsyncRead + Unpin + Send>(
return Ok(None);
}

state.message_buffer.clear();
state.message_buffer.resize(meta_length, 0);
state.reader.read_exact(&mut state.message_buffer).await?;
state
.reader
.read_exact(prepare_scratch(&mut state.message_buffer, meta_length))
.await?;

let message = arrow_format::ipc::MessageRef::read_as_root(&state.message_buffer)
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
Expand All @@ -123,9 +125,10 @@ async fn maybe_next<R: AsyncRead + Unpin + Send>(

match header {
arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {
state.data_buffer.clear();
state.data_buffer.resize(block_length, 0);
state.reader.read_exact(&mut state.data_buffer).await?;
state
.reader
.read_exact(prepare_scratch(&mut state.data_buffer, block_length))
.await?;

read_record_batch(
batch,
Expand Down

0 comments on commit 653956e

Please sign in to comment.