This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit 46b582b

Removed bunch of allocations.
jorgecarleitao committed Nov 16, 2021
1 parent 70db412 commit 46b582b
Showing 4 changed files with 79 additions and 109 deletions.
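
The changes below all follow one pattern: buffers that used to be allocated for every record batch or IPC message are now owned by the reader (the new buffer field on FileReader, and message_buffer/data_buffer on StreamReader) and threaded through the read functions as &mut Vec<u8>, so each read reuses the previous allocation. A minimal sketch of the pattern, for illustration only (these function names are not part of arrow2):

use std::io::Read;

// Before: a fresh Vec is allocated for every message that is read.
fn read_message_alloc<R: Read>(reader: &mut R, len: usize) -> std::io::Result<Vec<u8>> {
    let mut data = vec![0; len]; // one heap allocation per call
    reader.read_exact(&mut data)?;
    Ok(data)
}

// After: the caller owns a single buffer and lends it out; `resize` only
// reallocates when a message is larger than anything seen before.
fn read_message_reuse<R: Read>(
    reader: &mut R,
    len: usize,
    data: &mut Vec<u8>,
) -> std::io::Result<()> {
    data.clear();
    data.resize(len, 0);
    reader.read_exact(data)?;
    Ok(())
}

Since clear keeps the existing capacity, steady-state reads perform no allocation at all; this is what the new read_dictionary_message, the reworked read_batch, and the stream's read_next do below.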
src/io/ipc/read/mod.rs: 2 changes (1 addition, 1 deletion)
@@ -13,5 +13,5 @@ mod reader;
mod stream;

pub use common::{read_dictionary, read_record_batch};
pub use reader::{read_file_metadata, FileMetadata, FileReader, LimitRows};
pub use reader::{read_file_metadata, FileMetadata, FileReader};
pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState};
src/io/ipc/read/reader.rs: 135 changes (46 additions, 89 deletions)
@@ -41,9 +41,6 @@ pub struct FileMetadata {
/// A block indicates the regions in the file to read to get data
blocks: Vec<ipc::File::Block>,

/// The total number of blocks, which may contain record batches and other types
total_blocks: usize,

/// Dictionaries associated to each dict_id
dictionaries: HashMap<usize, Arc<dyn Array>>,

@@ -66,6 +63,28 @@ pub struct FileReader<R: Read + Seek> {
metadata: FileMetadata,
current_block: usize,
projection: Option<(Vec<usize>, Arc<Schema>)>,
buffer: Vec<u8>,
}

fn read_dictionary_message<R: Read + Seek>(
reader: &mut R,
offset: u64,
data: &mut Vec<u8>,
) -> Result<()> {
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(offset))?;
reader.read_exact(&mut message_size)?;
if message_size == CONTINUATION_MARKER {
reader.read_exact(&mut message_size)?;
};
let footer_len = i32::from_le_bytes(message_size);

// prepare `data` to read the message
data.clear();
data.resize(footer_len as usize, 0);

reader.read_exact(data)?;
Ok(())
}

/// Read the IPC file's metadata
@@ -109,38 +128,34 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
let footer = ipc::File::root_as_footer_with_opts(&verifier_options, &footer_data[..])
.map_err(|err| ArrowError::Ipc(format!("Unable to get root as footer: {:?}", err)))?;

let blocks = footer.recordBatches().ok_or_else(|| {
ArrowError::Ipc("Unable to get record batches from IPC Footer".to_string())
})?;
let blocks = footer
.recordBatches()
.ok_or_else(|| ArrowError::Ipc("Unable to get record batches from footer".to_string()))?;

let total_blocks = blocks.len();

let ipc_schema = footer.schema().unwrap();
let ipc_schema = footer
.schema()
.ok_or_else(|| ArrowError::Ipc("Unable to get the schema from footer".to_string()))?;
let (schema, is_little_endian) = convert::fb_to_schema(ipc_schema);
let schema = Arc::new(schema);

let mut dictionaries = Default::default();

for block in footer.dictionaries().unwrap() {
// read length from end of offset
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(block.offset() as u64))?;
reader.read_exact(&mut message_size)?;
if message_size == CONTINUATION_MARKER {
reader.read_exact(&mut message_size)?;
};
let footer_len = i32::from_le_bytes(message_size);

let mut block_data = vec![0; footer_len as usize];
let dictionary_blocks = footer
.dictionaries()
.ok_or_else(|| ArrowError::Ipc("Unable to get dictionaries from footer".to_string()))?;

reader.read_exact(&mut block_data)?;
let mut data = vec![];
for block in dictionary_blocks {
let offset = block.offset() as u64;
let length = block.metaDataLength() as u64;
read_dictionary_message(reader, offset, &mut data)?;

let message = ipc::Message::root_as_message(&block_data[..])
let message = ipc::Message::root_as_message(&data)
.map_err(|err| ArrowError::Ipc(format!("Unable to get root as message: {:?}", err)))?;

match message.header_type() {
ipc::Message::MessageHeader::DictionaryBatch => {
let block_offset = block.offset() as u64 + block.metaDataLength() as u64;
let block_offset = offset + length;
let batch = message.header_as_dictionary_batch().unwrap();
read_dictionary(
batch,
@@ -163,7 +178,6 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
schema,
is_little_endian,
blocks: blocks.to_vec(),
total_blocks,
dictionaries,
version: footer.version(),
})
@@ -194,6 +208,7 @@ pub fn read_batch<R: Read + Seek>(
metadata: &FileMetadata,
projection: Option<(&[usize], Arc<Schema>)>,
block: usize,
block_data: &mut Vec<u8>,
) -> Result<RecordBatch> {
let block = metadata.blocks[block];

@@ -205,10 +220,11 @@
// continuation marker encountered, read message next
reader.read_exact(&mut meta_buf)?;
}
let meta_len = i32::from_le_bytes(meta_buf);
let meta_len = i32::from_le_bytes(meta_buf) as usize;

let mut block_data = vec![0; meta_len as usize];
reader.read_exact(&mut block_data)?;
block_data.clear();
block_data.resize(meta_len, 0);
reader.read_exact(block_data)?;

let message = ipc::Message::root_as_message(&block_data[..])
.map_err(|err| ArrowError::Ipc(format!("Unable to get root as footer: {:?}", err)))?;
@@ -262,6 +278,7 @@ impl<R: Read + Seek> FileReader<R> {
metadata,
projection,
current_block: 0,
buffer: vec![],
}
}

@@ -284,7 +301,7 @@ impl<R: Read + Seek> Iterator for FileReader<R> {

fn next(&mut self) -> Option<Self::Item> {
// get current block
if self.current_block < self.metadata.total_blocks {
if self.current_block < self.metadata.blocks.len() {
let block = self.current_block;
self.current_block += 1;
Some(read_batch(
@@ -294,6 +311,7 @@ impl<R: Read + Seek> Iterator for FileReader<R> {
.as_ref()
.map(|x| (x.0.as_ref(), x.1.clone())),
block,
&mut self.buffer,
))
} else {
None
@@ -306,64 +324,3 @@ impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
self.schema().as_ref()
}
}

fn limit_batch(batch: RecordBatch, limit: usize) -> RecordBatch {
if batch.num_rows() < limit {
let RecordBatch { schema, columns } = batch;
let columns = columns
.into_iter()
.map(|x| x.slice(0, limit).into())
.collect();
RecordBatch { schema, columns }
} else {
batch
}
}

/// Iterator adapter that limits the number of rows read by a
/// fallible [`Iterator`] of [`RecordBatch`]es.
/// # Implementation
/// Tracks the number of remaining rows and slices the last [`RecordBatch`] to fit exactly.
pub struct LimitRows<I: Iterator<Item = Result<RecordBatch>>> {
iterator: I,
limit: Option<usize>,
}

impl<I: Iterator<Item = Result<RecordBatch>>> LimitRows<I> {
/// Creates a new [`LimitRows`]. If `limit` is [`None`], it does not limit the iterator.
pub fn new(iterator: I, limit: Option<usize>) -> Self {
Self { iterator, limit }
}
}

impl<I: Iterator<Item = Result<RecordBatch>>> Iterator for LimitRows<I> {
type Item = Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(limit) = self.limit {
// no more rows required => finish
if limit == 0 {
return None;
}
};

let batch = self.iterator.next();

if let Some(Ok(batch)) = batch {
Some(Ok(if let Some(ref mut limit) = self.limit {
// slice the last batch if it is too large
let batch = if batch.num_rows() > *limit {
limit_batch(batch, *limit)
} else {
batch
};
*limit -= batch.num_rows();
batch
} else {
batch
}))
} else {
batch
}
}
}
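
With LimitRows and limit_batch removed here (and their re-export dropped from mod.rs above), limiting the number of rows is left to callers. Below is a rough, crate-independent sketch of an equivalent adapter that uses caller-supplied closures instead of the RecordBatch fields this commit makes private; limit_rows, len and truncate are illustrative names, not arrow2 APIs:

// Caps the total number of rows yielded by a fallible iterator of batches.
// `len` measures a batch; `truncate` slices it down to the requested length.
fn limit_rows<B, E>(
    iter: impl Iterator<Item = Result<B, E>>,
    limit: usize,
    len: impl Fn(&B) -> usize,
    truncate: impl Fn(B, usize) -> B,
) -> impl Iterator<Item = Result<B, E>> {
    iter.scan(limit, move |remaining, batch| {
        if *remaining == 0 {
            return None; // the limit was reached: stop iterating
        }
        Some(batch.map(|batch| {
            // slice the last batch so the total never exceeds the limit
            let batch = if len(&batch) > *remaining {
                truncate(batch, *remaining)
            } else {
                batch
            };
            *remaining = remaining.saturating_sub(len(&batch));
            batch
        }))
    })
}

The bookkeeping mirrors the removed adapter; only the slicing is delegated to the caller, since this commit makes the schema and columns fields of RecordBatch private.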
src/io/ipc/read/stream.rs: 47 changes (30 additions, 17 deletions)
@@ -109,15 +109,17 @@ impl StreamState {

/// Reads the next item, yielding `None` if the stream is done,
/// and a [`StreamState`] otherwise.
pub fn read_next<R: Read>(
fn read_next<R: Read>(
reader: &mut R,
metadata: &StreamMetadata,
dictionaries: &mut HashMap<usize, Arc<dyn Array>>,
message_buffer: &mut Vec<u8>,
data_buffer: &mut Vec<u8>,
) -> Result<Option<StreamState>> {
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];
let mut meta_length: [u8; 4] = [0; 4];

match reader.read_exact(&mut meta_size) {
match reader.read_exact(&mut meta_length) {
Ok(()) => (),
Err(e) => {
return if e.kind() == std::io::ErrorKind::UnexpectedEof {
@@ -131,25 +133,25 @@ pub fn read_next<R: Read>(
}
}

let meta_len = {
let meta_length = {
// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if meta_size == CONTINUATION_MARKER {
reader.read_exact(&mut meta_size)?;
if meta_length == CONTINUATION_MARKER {
reader.read_exact(&mut meta_length)?;
}
i32::from_le_bytes(meta_size)
i32::from_le_bytes(meta_length) as usize
};

if meta_len == 0 {
if meta_length == 0 {
// the stream has ended, mark the reader as finished
return Ok(None);
}

let mut meta_buffer = vec![0; meta_len as usize];
reader.read_exact(&mut meta_buffer)?;
message_buffer.clear();
message_buffer.resize(meta_length, 0);
reader.read_exact(message_buffer)?;

let vecs = &meta_buffer.to_vec();
let message = ipc::Message::root_as_message(vecs)
let message = ipc::Message::root_as_message(message_buffer)
.map_err(|err| ArrowError::Ipc(format!("Unable to get root as message: {:?}", err)))?;

match message.header_type() {
@@ -161,10 +163,11 @@ pub fn read_next<R: Read>(
ArrowError::Ipc("Unable to read IPC message as record batch".to_string())
})?;
// read the block that makes up the record batch into a buffer
let mut buf = vec![0; message.bodyLength() as usize];
reader.read_exact(&mut buf)?;
data_buffer.clear();
data_buffer.resize(message.bodyLength() as usize, 0);
reader.read_exact(data_buffer)?;

let mut reader = std::io::Cursor::new(buf);
let mut reader = std::io::Cursor::new(data_buffer);

read_record_batch(
batch,
@@ -198,7 +201,7 @@ pub fn read_next<R: Read>(
)?;

// read the next message until we encounter a RecordBatch
read_next(reader, metadata, dictionaries)
read_next(reader, metadata, dictionaries, message_buffer, data_buffer)
}
ipc::Message::MessageHeader::NONE => Ok(Some(StreamState::Waiting)),
t => Err(ArrowError::Ipc(format!(
@@ -219,6 +222,8 @@ pub struct StreamReader<R: Read> {
metadata: StreamMetadata,
dictionaries: HashMap<usize, Arc<dyn Array>>,
finished: bool,
data_buffer: Vec<u8>,
message_buffer: Vec<u8>,
}

impl<R: Read> StreamReader<R> {
@@ -233,6 +238,8 @@ impl<R: Read> StreamReader<R> {
metadata,
dictionaries: Default::default(),
finished: false,
data_buffer: vec![],
message_buffer: vec![],
}
}

@@ -250,7 +257,13 @@ impl<R: Read> StreamReader<R> {
if self.finished {
return Ok(None);
}
let batch = read_next(&mut self.reader, &self.metadata, &mut self.dictionaries)?;
let batch = read_next(
&mut self.reader,
&self.metadata,
&mut self.dictionaries,
&mut self.message_buffer,
&mut self.data_buffer,
)?;
if batch.is_none() {
self.finished = true;
}
src/record_batch.rs: 4 changes (2 additions, 2 deletions)
@@ -11,8 +11,8 @@ use crate::error::{ArrowError, Result};
/// Cloning is `O(C)` where `C` is the number of columns.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
pub(crate) schema: Arc<Schema>,
pub(crate) columns: Vec<Arc<dyn Array>>,
schema: Arc<Schema>,
columns: Vec<Arc<dyn Array>>,
}

impl RecordBatch {
