From 46b582b29ef2e5a13e579cbe51e4839770556f23 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 16 Nov 2021 20:04:50 +0000 Subject: [PATCH] Removed bunch of allocations. --- src/io/ipc/read/mod.rs | 2 +- src/io/ipc/read/reader.rs | 135 +++++++++++++------------------------- src/io/ipc/read/stream.rs | 47 ++++++++----- src/record_batch.rs | 4 +- 4 files changed, 79 insertions(+), 109 deletions(-) diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index a374df53287..adc2b41e477 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -13,5 +13,5 @@ mod reader; mod stream; pub use common::{read_dictionary, read_record_batch}; -pub use reader::{read_file_metadata, FileMetadata, FileReader, LimitRows}; +pub use reader::{read_file_metadata, FileMetadata, FileReader}; pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState}; diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 9066542743e..26203aff7c7 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -41,9 +41,6 @@ pub struct FileMetadata { /// A block indicates the regions in the file to read to get data blocks: Vec, - /// The total number of blocks, which may contain record batches and other types - total_blocks: usize, - /// Dictionaries associated to each dict_id dictionaries: HashMap>, @@ -66,6 +63,28 @@ pub struct FileReader { metadata: FileMetadata, current_block: usize, projection: Option<(Vec, Arc)>, + buffer: Vec, +} + +fn read_dictionary_message( + reader: &mut R, + offset: u64, + data: &mut Vec, +) -> Result<()> { + let mut message_size: [u8; 4] = [0; 4]; + reader.seek(SeekFrom::Start(offset))?; + reader.read_exact(&mut message_size)?; + if message_size == CONTINUATION_MARKER { + reader.read_exact(&mut message_size)?; + }; + let footer_len = i32::from_le_bytes(message_size); + + // prepare `data` to read the message + data.clear(); + data.resize(footer_len as usize, 0); + + reader.read_exact(data)?; + Ok(()) } /// Read the IPC file's metadata @@ -109,38 +128,34 @@ pub fn read_file_metadata(reader: &mut R) -> Result { - let block_offset = block.offset() as u64 + block.metaDataLength() as u64; + let block_offset = offset + length; let batch = message.header_as_dictionary_batch().unwrap(); read_dictionary( batch, @@ -163,7 +178,6 @@ pub fn read_file_metadata(reader: &mut R) -> Result( metadata: &FileMetadata, projection: Option<(&[usize], Arc)>, block: usize, + block_data: &mut Vec, ) -> Result { let block = metadata.blocks[block]; @@ -205,10 +220,11 @@ pub fn read_batch( // continuation marker encountered, read message next reader.read_exact(&mut meta_buf)?; } - let meta_len = i32::from_le_bytes(meta_buf); + let meta_len = i32::from_le_bytes(meta_buf) as usize; - let mut block_data = vec![0; meta_len as usize]; - reader.read_exact(&mut block_data)?; + block_data.clear(); + block_data.resize(meta_len, 0); + reader.read_exact(block_data)?; let message = ipc::Message::root_as_message(&block_data[..]) .map_err(|err| ArrowError::Ipc(format!("Unable to get root as footer: {:?}", err)))?; @@ -262,6 +278,7 @@ impl FileReader { metadata, projection, current_block: 0, + buffer: vec![], } } @@ -284,7 +301,7 @@ impl Iterator for FileReader { fn next(&mut self) -> Option { // get current block - if self.current_block < self.metadata.total_blocks { + if self.current_block < self.metadata.blocks.len() { let block = self.current_block; self.current_block += 1; Some(read_batch( @@ -294,6 +311,7 @@ impl Iterator for FileReader { .as_ref() .map(|x| (x.0.as_ref(), x.1.clone())), block, + &mut self.buffer, )) } else { None @@ -306,64 +324,3 @@ impl RecordBatchReader for FileReader { self.schema().as_ref() } } - -fn limit_batch(batch: RecordBatch, limit: usize) -> RecordBatch { - if batch.num_rows() < limit { - let RecordBatch { schema, columns } = batch; - let columns = columns - .into_iter() - .map(|x| x.slice(0, limit).into()) - .collect(); - RecordBatch { schema, columns } - } else { - batch - } -} - -/// Iterator adapter that limits the number of rows read by a -/// fallible [`Iterator`] of [`RecordBatch`]es. -/// # Implementation -/// Tracks the number of remaining rows and slices the last [`RecordBatch`] to fit exactly. -pub struct LimitRows>> { - iterator: I, - limit: Option, -} - -impl>> LimitRows { - /// Creates a new [`LimitRows`]. If `limit` is [`None`], it does not limit the iterator. - pub fn new(iterator: I, limit: Option) -> Self { - Self { iterator, limit } - } -} - -impl>> Iterator for LimitRows { - type Item = Result; - - fn next(&mut self) -> Option { - if let Some(limit) = self.limit { - // no more rows required => finish - if limit == 0 { - return None; - } - }; - - let batch = self.iterator.next(); - - if let Some(Ok(batch)) = batch { - Some(Ok(if let Some(ref mut limit) = self.limit { - // slice the last batch if it is too large - let batch = if batch.num_rows() > *limit { - limit_batch(batch, *limit) - } else { - batch - }; - *limit -= batch.num_rows(); - batch - } else { - batch - })) - } else { - batch - } - } -} diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index d915d9fa79a..7e96e78c229 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -109,15 +109,17 @@ impl StreamState { /// Reads the next item, yielding `None` if the stream is done, /// and a [`StreamState`] otherwise. -pub fn read_next( +fn read_next( reader: &mut R, metadata: &StreamMetadata, dictionaries: &mut HashMap>, + message_buffer: &mut Vec, + data_buffer: &mut Vec, ) -> Result> { // determine metadata length - let mut meta_size: [u8; 4] = [0; 4]; + let mut meta_length: [u8; 4] = [0; 4]; - match reader.read_exact(&mut meta_size) { + match reader.read_exact(&mut meta_length) { Ok(()) => (), Err(e) => { return if e.kind() == std::io::ErrorKind::UnexpectedEof { @@ -131,25 +133,25 @@ pub fn read_next( } } - let meta_len = { + let meta_length = { // If a continuation marker is encountered, skip over it and read // the size from the next four bytes. - if meta_size == CONTINUATION_MARKER { - reader.read_exact(&mut meta_size)?; + if meta_length == CONTINUATION_MARKER { + reader.read_exact(&mut meta_length)?; } - i32::from_le_bytes(meta_size) + i32::from_le_bytes(meta_length) as usize }; - if meta_len == 0 { + if meta_length == 0 { // the stream has ended, mark the reader as finished return Ok(None); } - let mut meta_buffer = vec![0; meta_len as usize]; - reader.read_exact(&mut meta_buffer)?; + message_buffer.clear(); + message_buffer.resize(meta_length, 0); + reader.read_exact(message_buffer)?; - let vecs = &meta_buffer.to_vec(); - let message = ipc::Message::root_as_message(vecs) + let message = ipc::Message::root_as_message(message_buffer) .map_err(|err| ArrowError::Ipc(format!("Unable to get root as message: {:?}", err)))?; match message.header_type() { @@ -161,10 +163,11 @@ pub fn read_next( ArrowError::Ipc("Unable to read IPC message as record batch".to_string()) })?; // read the block that makes up the record batch into a buffer - let mut buf = vec![0; message.bodyLength() as usize]; - reader.read_exact(&mut buf)?; + data_buffer.clear(); + data_buffer.resize(message.bodyLength() as usize, 0); + reader.read_exact(data_buffer)?; - let mut reader = std::io::Cursor::new(buf); + let mut reader = std::io::Cursor::new(data_buffer); read_record_batch( batch, @@ -198,7 +201,7 @@ pub fn read_next( )?; // read the next message until we encounter a RecordBatch - read_next(reader, metadata, dictionaries) + read_next(reader, metadata, dictionaries, message_buffer, data_buffer) } ipc::Message::MessageHeader::NONE => Ok(Some(StreamState::Waiting)), t => Err(ArrowError::Ipc(format!( @@ -219,6 +222,8 @@ pub struct StreamReader { metadata: StreamMetadata, dictionaries: HashMap>, finished: bool, + data_buffer: Vec, + message_buffer: Vec, } impl StreamReader { @@ -233,6 +238,8 @@ impl StreamReader { metadata, dictionaries: Default::default(), finished: false, + data_buffer: vec![], + message_buffer: vec![], } } @@ -250,7 +257,13 @@ impl StreamReader { if self.finished { return Ok(None); } - let batch = read_next(&mut self.reader, &self.metadata, &mut self.dictionaries)?; + let batch = read_next( + &mut self.reader, + &self.metadata, + &mut self.dictionaries, + &mut self.message_buffer, + &mut self.data_buffer, + )?; if batch.is_none() { self.finished = true; } diff --git a/src/record_batch.rs b/src/record_batch.rs index 6ead0ac213c..02182af41df 100644 --- a/src/record_batch.rs +++ b/src/record_batch.rs @@ -11,8 +11,8 @@ use crate::error::{ArrowError, Result}; /// Cloning is `O(C)` where `C` is the number of columns. #[derive(Clone, Debug, PartialEq)] pub struct RecordBatch { - pub(crate) schema: Arc, - pub(crate) columns: Vec>, + schema: Arc, + columns: Vec>, } impl RecordBatch {