diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index e9a8328cfde..26203aff7c7 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -41,9 +41,6 @@ pub struct FileMetadata { /// A block indicates the regions in the file to read to get data blocks: Vec, - /// The total number of blocks, which may contain record batches and other types - total_blocks: usize, - /// Dictionaries associated to each dict_id dictionaries: HashMap>, @@ -66,6 +63,28 @@ pub struct FileReader { metadata: FileMetadata, current_block: usize, projection: Option<(Vec, Arc)>, + buffer: Vec, +} + +fn read_dictionary_message( + reader: &mut R, + offset: u64, + data: &mut Vec, +) -> Result<()> { + let mut message_size: [u8; 4] = [0; 4]; + reader.seek(SeekFrom::Start(offset))?; + reader.read_exact(&mut message_size)?; + if message_size == CONTINUATION_MARKER { + reader.read_exact(&mut message_size)?; + }; + let footer_len = i32::from_le_bytes(message_size); + + // prepare `data` to read the message + data.clear(); + data.resize(footer_len as usize, 0); + + reader.read_exact(data)?; + Ok(()) } /// Read the IPC file's metadata @@ -109,38 +128,34 @@ pub fn read_file_metadata(reader: &mut R) -> Result { - let block_offset = block.offset() as u64 + block.metaDataLength() as u64; + let block_offset = offset + length; let batch = message.header_as_dictionary_batch().unwrap(); read_dictionary( batch, @@ -163,19 +178,38 @@ pub fn read_file_metadata(reader: &mut R) -> Result( + message: &'a ipc::Message::Message, +) -> Result> { + match message.header_type() { + ipc::Message::MessageHeader::Schema => Err(ArrowError::Ipc( + "Not expecting a schema when messages are read".to_string(), + )), + ipc::Message::MessageHeader::RecordBatch => { + message.header_as_record_batch().ok_or_else(|| { + ArrowError::Ipc("Unable to read IPC message as record batch".to_string()) + }) + } + t => Err(ArrowError::Ipc(format!( + "Reading types other than record batches not yet supported, unable to read {:?}", + t + ))), + } +} + +/// Read a batch from the reader. pub fn read_batch( reader: &mut R, metadata: &FileMetadata, projection: Option<(&[usize], Arc)>, block: usize, -) -> Result> { + block_data: &mut Vec, +) -> Result { let block = metadata.blocks[block]; // read length @@ -186,10 +220,11 @@ pub fn read_batch( // continuation marker encountered, read message next reader.read_exact(&mut meta_buf)?; } - let meta_len = i32::from_le_bytes(meta_buf); + let meta_len = i32::from_le_bytes(meta_buf) as usize; - let mut block_data = vec![0; meta_len as usize]; - reader.read_exact(&mut block_data)?; + block_data.clear(); + block_data.resize(meta_len, 0); + reader.read_exact(block_data)?; let message = ipc::Message::root_as_message(&block_data[..]) .map_err(|err| ArrowError::Ipc(format!("Unable to get root as footer: {:?}", err)))?; @@ -202,32 +237,18 @@ pub fn read_batch( )); } - match message.header_type() { - ipc::Message::MessageHeader::Schema => Err(ArrowError::Ipc( - "Not expecting a schema when messages are read".to_string(), - )), - ipc::Message::MessageHeader::RecordBatch => { - let batch = message.header_as_record_batch().ok_or_else(|| { - ArrowError::Ipc("Unable to read IPC message as record batch".to_string()) - })?; - read_record_batch( - batch, - metadata.schema.clone(), - projection, - metadata.is_little_endian, - &metadata.dictionaries, - metadata.version, - reader, - block.offset() as u64 + block.metaDataLength() as u64, - ) - .map(Some) - } - ipc::Message::MessageHeader::NONE => Ok(None), - t => Err(ArrowError::Ipc(format!( - "Reading types other than record batches not yet supported, unable to read {:?}", - t - ))), - } + let batch = get_serialized_batch(&message)?; + + read_record_batch( + batch, + metadata.schema.clone(), + projection, + metadata.is_little_endian, + &metadata.dictionaries, + metadata.version, + reader, + block.offset() as u64 + block.metaDataLength() as u64, + ) } impl FileReader { @@ -257,6 +278,7 @@ impl FileReader { metadata, projection, current_block: 0, + buffer: vec![], } } @@ -279,18 +301,18 @@ impl Iterator for FileReader { fn next(&mut self) -> Option { // get current block - if self.current_block < self.metadata.total_blocks { + if self.current_block < self.metadata.blocks.len() { let block = self.current_block; self.current_block += 1; - read_batch( + Some(read_batch( &mut self.reader, &self.metadata, self.projection .as_ref() .map(|x| (x.0.as_ref(), x.1.clone())), block, - ) - .transpose() + &mut self.buffer, + )) } else { None } diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index d915d9fa79a..7e96e78c229 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -109,15 +109,17 @@ impl StreamState { /// Reads the next item, yielding `None` if the stream is done, /// and a [`StreamState`] otherwise. -pub fn read_next( +fn read_next( reader: &mut R, metadata: &StreamMetadata, dictionaries: &mut HashMap>, + message_buffer: &mut Vec, + data_buffer: &mut Vec, ) -> Result> { // determine metadata length - let mut meta_size: [u8; 4] = [0; 4]; + let mut meta_length: [u8; 4] = [0; 4]; - match reader.read_exact(&mut meta_size) { + match reader.read_exact(&mut meta_length) { Ok(()) => (), Err(e) => { return if e.kind() == std::io::ErrorKind::UnexpectedEof { @@ -131,25 +133,25 @@ pub fn read_next( } } - let meta_len = { + let meta_length = { // If a continuation marker is encountered, skip over it and read // the size from the next four bytes. - if meta_size == CONTINUATION_MARKER { - reader.read_exact(&mut meta_size)?; + if meta_length == CONTINUATION_MARKER { + reader.read_exact(&mut meta_length)?; } - i32::from_le_bytes(meta_size) + i32::from_le_bytes(meta_length) as usize }; - if meta_len == 0 { + if meta_length == 0 { // the stream has ended, mark the reader as finished return Ok(None); } - let mut meta_buffer = vec![0; meta_len as usize]; - reader.read_exact(&mut meta_buffer)?; + message_buffer.clear(); + message_buffer.resize(meta_length, 0); + reader.read_exact(message_buffer)?; - let vecs = &meta_buffer.to_vec(); - let message = ipc::Message::root_as_message(vecs) + let message = ipc::Message::root_as_message(message_buffer) .map_err(|err| ArrowError::Ipc(format!("Unable to get root as message: {:?}", err)))?; match message.header_type() { @@ -161,10 +163,11 @@ pub fn read_next( ArrowError::Ipc("Unable to read IPC message as record batch".to_string()) })?; // read the block that makes up the record batch into a buffer - let mut buf = vec![0; message.bodyLength() as usize]; - reader.read_exact(&mut buf)?; + data_buffer.clear(); + data_buffer.resize(message.bodyLength() as usize, 0); + reader.read_exact(data_buffer)?; - let mut reader = std::io::Cursor::new(buf); + let mut reader = std::io::Cursor::new(data_buffer); read_record_batch( batch, @@ -198,7 +201,7 @@ pub fn read_next( )?; // read the next message until we encounter a RecordBatch - read_next(reader, metadata, dictionaries) + read_next(reader, metadata, dictionaries, message_buffer, data_buffer) } ipc::Message::MessageHeader::NONE => Ok(Some(StreamState::Waiting)), t => Err(ArrowError::Ipc(format!( @@ -219,6 +222,8 @@ pub struct StreamReader { metadata: StreamMetadata, dictionaries: HashMap>, finished: bool, + data_buffer: Vec, + message_buffer: Vec, } impl StreamReader { @@ -233,6 +238,8 @@ impl StreamReader { metadata, dictionaries: Default::default(), finished: false, + data_buffer: vec![], + message_buffer: vec![], } } @@ -250,7 +257,13 @@ impl StreamReader { if self.finished { return Ok(None); } - let batch = read_next(&mut self.reader, &self.metadata, &mut self.dictionaries)?; + let batch = read_next( + &mut self.reader, + &self.metadata, + &mut self.dictionaries, + &mut self.message_buffer, + &mut self.data_buffer, + )?; if batch.is_none() { self.finished = true; }