Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
use ReadBuffer abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 26, 2022
1 parent 3aeecbb commit 654dd39
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 68 deletions.
2 changes: 1 addition & 1 deletion examples/ipc_file_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk<Box<dyn Array>>)> {
&metadata,
None,
chunk_index,
&mut vec![],
&mut Default::default(),
)?;

Ok((schema, chunk))
Expand Down
54 changes: 54 additions & 0 deletions src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,60 @@ pub fn read_dictionary<R: Read + Seek>(
Ok(())
}

/// A small wrapper around `[Vec<u8>]` that allows us to reuse memory once it is initialized.
/// This may improve performance of the `[Read]` trait.
#[derive(Clone, Default)]
pub struct ReadBuffer {
data: Vec<u8>,
// length to be read or is read
length: usize,
}

impl ReadBuffer {
/// Create a new [`ReadBuf`] initialized to `length`
pub fn new(length: usize) -> Self {
let data = vec![0; length];
Self { data, length }
}

/// Set the length of the [`ReadBuf`]. Contrary to the
/// method on `Vec` this is `safe` because this function guarantees that
/// the underlying data always is initialized.
pub fn set_len(&mut self, length: usize) {
if length > self.data.capacity() {
self.data = vec![0; length];
} else if length > self.data.len() {
self.data.resize(length, 0);
}
self.length = length;
}
}

impl AsRef<[u8]> for ReadBuffer {
fn as_ref(&self) -> &[u8] {
&self.data[..self.length]
}
}

impl AsMut<[u8]> for ReadBuffer {
fn as_mut(&mut self) -> &mut [u8] {
&mut self.data[..self.length]
}
}

impl From<Vec<u8>> for ReadBuffer {
fn from(data: Vec<u8>) -> Self {
let length = data.len();
Self { data, length }
}
}

impl From<ReadBuffer> for Vec<u8> {
fn from(buf: ReadBuffer) -> Self {
buf.data
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
47 changes: 22 additions & 25 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::{Field, Schema};
use crate::error::{Error, Result};
use crate::io::ipc::read::reader::prepare_scratch;
use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};

use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch};
use super::common::{
apply_projection, prepare_projection, read_dictionary, read_record_batch, ReadBuffer,
};
use super::reader::{deserialize_footer, get_serialized_batch};
use super::Dictionaries;
use super::FileMetadata;
Expand Down Expand Up @@ -78,8 +79,8 @@ impl<'a> FileStream<'a> {
// read dictionaries
cached_read_dictionaries(&mut reader, &metadata, &mut dictionaries).await?;

let mut meta_buffer = vec![];
let mut block_buffer = vec![];
let mut meta_buffer = Default::default();
let mut block_buffer = Default::default();
for block in 0..metadata.blocks.len() {
let chunk = read_batch(
&mut reader,
Expand Down Expand Up @@ -153,8 +154,8 @@ async fn read_batch<R>(
metadata: &FileMetadata,
projection: Option<&[usize]>,
block: usize,
meta_buffer: &mut Vec<u8>,
block_buffer: &mut Vec<u8>,
meta_buffer: &mut ReadBuffer,
block_buffer: &mut ReadBuffer,
) -> Result<Chunk<Box<dyn Array>>>
where
R: AsyncRead + AsyncSeek + Unpin,
Expand All @@ -177,11 +178,10 @@ where
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?;

reader
.read_exact(prepare_scratch(meta_buffer, meta_len))
.await?;
meta_buffer.set_len(meta_len);
reader.read_exact(meta_buffer.as_mut()).await?;

let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer)
let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer.as_ref())
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

let batch = get_serialized_batch(&message)?;
Expand All @@ -192,10 +192,9 @@ where
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?;

reader
.read_exact(prepare_scratch(block_buffer, block_length))
.await?;
let mut cursor = std::io::Cursor::new(block_buffer);
block_buffer.set_len(block_length);
reader.read_exact(block_buffer.as_mut()).await?;
let mut cursor = std::io::Cursor::new(block_buffer.as_ref());

read_record_batch(
batch,
Expand All @@ -222,8 +221,8 @@ where
R: AsyncRead + AsyncSeek + Unpin,
{
let mut dictionaries = Default::default();
let mut data = vec![];
let mut buffer = vec![];
let mut data = ReadBuffer::new(0);
let mut buffer = ReadBuffer::new(0);

for block in blocks {
let offset: u64 = block
Expand All @@ -238,20 +237,19 @@ where

read_dictionary_message(&mut reader, offset, &mut data).await?;

let message = arrow_format::ipc::MessageRef::read_as_root(&data)
let message = arrow_format::ipc::MessageRef::read_as_root(data.as_ref())
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

let header = message
.header()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))?
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?;

buffer.set_len(length);
match header {
MessageHeaderRef::DictionaryBatch(batch) => {
reader
.read_exact(prepare_scratch(&mut buffer, length))
.await?;
let mut cursor = std::io::Cursor::new(&mut buffer);
reader.read_exact(buffer.as_mut()).await?;
let mut cursor = std::io::Cursor::new(buffer.as_ref());
read_dictionary(
batch,
fields,
Expand All @@ -268,7 +266,7 @@ where
Ok(dictionaries)
}

async fn read_dictionary_message<R>(mut reader: R, offset: u64, data: &mut Vec<u8>) -> Result<()>
async fn read_dictionary_message<R>(mut reader: R, offset: u64, data: &mut ReadBuffer) -> Result<()>
where
R: AsyncRead + AsyncSeek + Unpin,
{
Expand All @@ -284,9 +282,8 @@ where
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

reader
.read_exact(prepare_scratch(data, footer_size))
.await?;
data.set_len(footer_size);
reader.read_exact(data.as_mut()).await?;

Ok(())
}
Expand Down
32 changes: 12 additions & 20 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,6 @@ pub struct FileMetadata {
pub(crate) size: u64,
}

/// prepare `scratch` to read the message
pub(super) fn prepare_scratch(scratch: &mut Vec<u8>, message_length: usize) -> &mut [u8] {
// ensure that we have enough scratch space
if message_length > scratch.len() {
scratch.resize(message_length, 0);
}
// return the buffer that will be overwritten by read
&mut scratch[..message_length]
}

/// Arrow File reader
pub struct FileReader<R: Read + Seek> {
reader: R,
Expand All @@ -54,13 +44,13 @@ pub struct FileReader<R: Read + Seek> {
dictionaries: Option<Dictionaries>,
current_block: usize,
projection: Option<(Vec<usize>, HashMap<usize, usize>, Schema)>,
buffer: Vec<u8>,
buffer: ReadBuffer,
}

fn read_dictionary_message<R: Read + Seek>(
reader: &mut R,
offset: u64,
data: &mut Vec<u8>,
data: &mut ReadBuffer,
) -> Result<()> {
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(offset))?;
Expand All @@ -74,7 +64,8 @@ fn read_dictionary_message<R: Read + Seek>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

reader.read_exact(prepare_scratch(data, message_length))?;
data.set_len(message_length);
reader.read_exact(data.as_mut())?;
Ok(())
}

Expand All @@ -83,7 +74,7 @@ fn read_dictionary_block<R: Read + Seek>(
metadata: &FileMetadata,
block: &arrow_format::ipc::Block,
dictionaries: &mut Dictionaries,
scratch: &mut Vec<u8>,
scratch: &mut ReadBuffer,
) -> Result<()> {
let offset: u64 = block
.offset
Expand All @@ -95,7 +86,7 @@ fn read_dictionary_block<R: Read + Seek>(
.map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?;
read_dictionary_message(reader, offset, scratch)?;

let message = arrow_format::ipc::MessageRef::read_as_root(scratch)
let message = arrow_format::ipc::MessageRef::read_as_root(scratch.as_ref())
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

let header = message
Expand Down Expand Up @@ -127,7 +118,7 @@ pub fn read_file_dictionaries<R: Read + Seek>(
metadata: &FileMetadata,
) -> Result<Dictionaries> {
let mut dictionaries = Default::default();
let mut data = vec![];
let mut data = vec![].into();

let blocks = if let Some(blocks) = metadata.dictionaries.as_deref() {
blocks
Expand Down Expand Up @@ -255,7 +246,7 @@ pub fn read_batch<R: Read + Seek>(
metadata: &FileMetadata,
projection: Option<&[usize]>,
index: usize,
scratch: &mut Vec<u8>,
scratch: &mut ReadBuffer,
) -> Result<Chunk<Box<dyn Array>>> {
let block = metadata.blocks[index];

Expand All @@ -276,9 +267,10 @@ pub fn read_batch<R: Read + Seek>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?;

reader.read_exact(prepare_scratch(scratch, meta_len))?;
scratch.set_len(meta_len);
reader.read_exact(scratch.as_mut())?;

let message = arrow_format::ipc::MessageRef::read_as_root(scratch)
let message = arrow_format::ipc::MessageRef::read_as_root(scratch.as_ref())
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

let batch = get_serialized_batch(&message)?;
Expand Down Expand Up @@ -327,7 +319,7 @@ impl<R: Read + Seek> FileReader<R> {
dictionaries: Default::default(),
projection,
current_block: 0,
buffer: vec![],
buffer: Default::default(),
}
}

Expand Down
23 changes: 12 additions & 11 deletions src/io/ipc/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::Schema;
use crate::error::{Error, Result};
use crate::io::ipc::read::reader::prepare_scratch;
use crate::io::ipc::IpcSchema;

use super::super::CONTINUATION_MARKER;
Expand Down Expand Up @@ -90,8 +89,8 @@ fn read_next<R: Read>(
reader: &mut R,
metadata: &StreamMetadata,
dictionaries: &mut Dictionaries,
message_buffer: &mut Vec<u8>,
data_buffer: &mut Vec<u8>,
message_buffer: &mut ReadBuffer,
data_buffer: &mut ReadBuffer,
) -> Result<Option<StreamState>> {
// determine metadata length
let mut meta_length: [u8; 4] = [0; 4];
Expand Down Expand Up @@ -128,9 +127,10 @@ fn read_next<R: Read>(
return Ok(None);
}

reader.read_exact(prepare_scratch(message_buffer, meta_length))?;
message_buffer.set_len(meta_length);
reader.read_exact(message_buffer.as_mut())?;

let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer)
let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer.as_ref())
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

let header = message
Expand All @@ -144,11 +144,12 @@ fn read_next<R: Read>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?;

data_buffer.set_len(block_length);
match header {
arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {
reader.read_exact(prepare_scratch(data_buffer, block_length))?;
reader.read_exact(data_buffer.as_mut())?;

let file_size = data_buffer.len() as u64;
let file_size = data_buffer.as_ref().len() as u64;

let mut reader = std::io::Cursor::new(data_buffer);

Expand Down Expand Up @@ -199,8 +200,8 @@ pub struct StreamReader<R: Read> {
metadata: StreamMetadata,
dictionaries: Dictionaries,
finished: bool,
data_buffer: Vec<u8>,
message_buffer: Vec<u8>,
data_buffer: ReadBuffer,
message_buffer: ReadBuffer,
}

impl<R: Read> StreamReader<R> {
Expand All @@ -215,8 +216,8 @@ impl<R: Read> StreamReader<R> {
metadata,
dictionaries: Default::default(),
finished: false,
data_buffer: vec![],
message_buffer: vec![],
data_buffer: Default::default(),
message_buffer: Default::default(),
}
}

Expand Down
Loading

0 comments on commit 654dd39

Please sign in to comment.