From 53d246d1470c8d08cdda7e5eb1f0709537aa5e13 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Sat, 30 Apr 2022 11:41:50 +0100 Subject: [PATCH] Made reading dictionaries lazy (#971) --- src/io/flight/mod.rs | 4 +- src/io/ipc/read/file_async.rs | 136 +++++++++++++++++++--------------- src/io/ipc/read/reader.rs | 84 +++++++++++++++------ 3 files changed, 136 insertions(+), 88 deletions(-) diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index f0bf1d73cd1..5202dc89c13 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -120,8 +120,6 @@ pub fn deserialize_batch( let mut reader = std::io::Cursor::new(&data.data_body); - let version = message.version()?; - match message.header()?.ok_or_else(|| { ArrowError::oos("Unable to convert flight data header to a record batch".to_string()) })? { @@ -131,7 +129,7 @@ pub fn deserialize_batch( ipc_schema, None, dictionaries, - version, + message.version()?, &mut reader, 0, ), diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 13530dc2809..05a0a468779 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -3,10 +3,7 @@ use std::collections::HashMap; use std::io::SeekFrom; use std::sync::Arc; -use arrow_format::ipc::{ - planus::{ReadAsRoot, Vector}, - BlockRef, MessageHeaderRef, MessageRef, -}; +use arrow_format::ipc::{planus::ReadAsRoot, Block, MessageHeaderRef, MessageRef}; use futures::{ stream::BoxStream, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream, StreamExt, }; @@ -49,7 +46,7 @@ impl<'a> FileStream<'a> { (None, None) }; - let stream = Self::stream(reader, metadata.clone(), projection); + let stream = Self::stream(reader, None, metadata.clone(), projection); Self { stream, metadata, @@ -69,6 +66,7 @@ impl<'a> FileStream<'a> { fn stream( mut reader: R, + mut dictionaries: Option, metadata: FileMetadata, projection: Option<(Vec, HashMap)>, ) -> BoxStream<'a, Result>>> @@ -76,11 +74,15 @@ impl<'a> FileStream<'a> { R: AsyncRead + AsyncSeek + Unpin + Send + 'a, { async_stream::try_stream! { + // read dictionaries + cached_read_dictionaries(&mut reader, &metadata, &mut dictionaries).await?; + let mut meta_buffer = vec![]; let mut block_buffer = vec![]; for block in 0..metadata.blocks.len() { let chunk = read_batch( &mut reader, + dictionaries.as_mut().unwrap(), &metadata, projection.as_ref().map(|x| x.0.as_ref()), block, @@ -143,28 +145,58 @@ where reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?; reader.read_exact(&mut footer).await?; - let (mut metadata, dictionary_blocks) = deserialize_footer(&footer)?; + deserialize_footer(&footer) +} - metadata.dictionaries = if let Some(blocks) = dictionary_blocks { - read_dictionaries( - reader, - &metadata.schema.fields, - &metadata.ipc_schema, - blocks, - ) - .await? - } else { - Default::default() - }; +async fn read_batch( + mut reader: R, + dictionaries: &mut Dictionaries, + metadata: &FileMetadata, + projection: Option<&[usize]>, + block: usize, + meta_buffer: &mut Vec, + block_buffer: &mut Vec, +) -> Result>> +where + R: AsyncRead + AsyncSeek + Unpin, +{ + let block = metadata.blocks[block]; + reader.seek(SeekFrom::Start(block.offset as u64)).await?; + let mut meta_buf = [0; 4]; + reader.read_exact(&mut meta_buf).await?; + if meta_buf == CONTINUATION_MARKER { + reader.read_exact(&mut meta_buf).await?; + } + let meta_len = i32::from_le_bytes(meta_buf) as usize; + meta_buffer.clear(); + meta_buffer.resize(meta_len, 0); + reader.read_exact(meta_buffer).await?; + + let message = MessageRef::read_as_root(&meta_buffer[..]) + .map_err(|err| ArrowError::oos(format!("unable to parse message: {:?}", err)))?; + let batch = get_serialized_batch(&message)?; + block_buffer.clear(); + block_buffer.resize(message.body_length()? as usize, 0); + reader.read_exact(block_buffer).await?; + let mut cursor = std::io::Cursor::new(block_buffer); - Ok(metadata) + read_record_batch( + batch, + &metadata.schema.fields, + &metadata.ipc_schema, + projection, + dictionaries, + message.version()?, + &mut cursor, + 0, + ) } async fn read_dictionaries( mut reader: R, fields: &[Field], ipc_schema: &IpcSchema, - blocks: Vector<'_, BlockRef<'_>>, + blocks: &[Block], ) -> Result where R: AsyncRead + AsyncSeek + Unpin, @@ -174,7 +206,8 @@ where let mut buffer = vec![]; for block in blocks { - let offset = block.offset() as u64; + let offset = block.offset as u64; + let length = block.body_length as usize; read_dictionary_message(&mut reader, offset, &mut data).await?; let message = MessageRef::read_as_root(&data).map_err(|err| { @@ -186,7 +219,7 @@ where match header { MessageHeaderRef::DictionaryBatch(batch) => { buffer.clear(); - buffer.resize(block.body_length() as usize, 0); + buffer.resize(length, 0); reader.read_exact(&mut buffer).await?; let mut cursor = std::io::Cursor::new(&mut buffer); read_dictionary(batch, fields, ipc_schema, &mut dictionaries, &mut cursor, 0)?; @@ -220,45 +253,26 @@ where Ok(()) } -async fn read_batch( - mut reader: R, +async fn cached_read_dictionaries( + reader: &mut R, metadata: &FileMetadata, - projection: Option<&[usize]>, - block: usize, - meta_buffer: &mut Vec, - block_buffer: &mut Vec, -) -> Result>> -where - R: AsyncRead + AsyncSeek + Unpin, -{ - let block = metadata.blocks[block]; - reader.seek(SeekFrom::Start(block.offset as u64)).await?; - let mut meta_buf = [0; 4]; - reader.read_exact(&mut meta_buf).await?; - if meta_buf == CONTINUATION_MARKER { - reader.read_exact(&mut meta_buf).await?; - } - let meta_len = i32::from_le_bytes(meta_buf) as usize; - meta_buffer.clear(); - meta_buffer.resize(meta_len, 0); - reader.read_exact(meta_buffer).await?; - - let message = MessageRef::read_as_root(&meta_buffer[..]) - .map_err(|err| ArrowError::oos(format!("unable to parse message: {:?}", err)))?; - let batch = get_serialized_batch(&message)?; - block_buffer.clear(); - block_buffer.resize(message.body_length()? as usize, 0); - reader.read_exact(block_buffer).await?; - let mut cursor = std::io::Cursor::new(block_buffer); - let chunk = read_record_batch( - batch, - &metadata.schema.fields, - &metadata.ipc_schema, - projection, - &metadata.dictionaries, - message.version()?, - &mut cursor, - 0, - )?; - Ok(chunk) + dictionaries: &mut Option, +) -> Result<()> { + match (&dictionaries, metadata.dictionaries.as_deref()) { + (None, Some(blocks)) => { + let new_dictionaries = read_dictionaries( + reader, + &metadata.schema.fields, + &metadata.ipc_schema, + blocks, + ) + .await?; + *dictionaries = Some(new_dictionaries); + } + (None, None) => { + *dictionaries = Some(Default::default()); + } + _ => {} + }; + Ok(()) } diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 307bfb6357d..a951105bd35 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -13,7 +13,7 @@ use super::super::{ARROW_MAGIC, CONTINUATION_MARKER}; use super::common::*; use super::schema::fb_to_schema; use super::Dictionaries; -use arrow_format::ipc::planus::{ReadAsRoot, Vector}; +use arrow_format::ipc::planus::ReadAsRoot; /// Metadata of an Arrow IPC file, written in the footer of the file. #[derive(Debug, Clone)] @@ -30,13 +30,15 @@ pub struct FileMetadata { pub(super) blocks: Vec, /// Dictionaries associated to each dict_id - pub(super) dictionaries: Dictionaries, + pub(super) dictionaries: Option>, } /// Arrow File reader pub struct FileReader { reader: R, metadata: FileMetadata, + // the dictionaries are going to be read + dictionaries: Option, current_block: usize, projection: Option<(Vec, HashMap, Schema)>, buffer: Vec, @@ -53,11 +55,11 @@ fn read_dictionary_message( if message_size == CONTINUATION_MARKER { reader.read_exact(&mut message_size)?; }; - let footer_len = i32::from_le_bytes(message_size); + let message_length = i32::from_le_bytes(message_size); // prepare `data` to read the message data.clear(); - data.resize(footer_len as usize, 0); + data.resize(message_length as usize, 0); reader.read_exact(data)?; Ok(()) @@ -67,14 +69,14 @@ fn read_dictionaries( reader: &mut R, fields: &[Field], ipc_schema: &IpcSchema, - blocks: Vector, + blocks: &[arrow_format::ipc::Block], ) -> Result { let mut dictionaries = Default::default(); let mut data = vec![]; for block in blocks { - let offset = block.offset() as u64; - let length = block.meta_data_length() as u64; + let offset = block.offset as u64; + let length = block.meta_data_length as u64; read_dictionary_message(reader, offset, &mut data)?; let message = arrow_format::ipc::MessageRef::read_as_root(&data).map_err(|err| { @@ -127,9 +129,7 @@ fn read_footer_len(reader: &mut R) -> Result { .map_err(|_| ArrowError::oos("The footer's lenght must be a positive number")) } -pub(super) fn deserialize_footer( - footer_data: &[u8], -) -> Result<(FileMetadata, Option>)> { +pub(super) fn deserialize_footer(footer_data: &[u8]) -> Result { let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data) .map_err(|err| ArrowError::OutOfSpec(format!("Unable to get root as footer: {:?}", err)))?; @@ -147,15 +147,22 @@ pub(super) fn deserialize_footer( .ok_or_else(|| ArrowError::OutOfSpec("Unable to get the schema from footer".to_string()))?; let (schema, ipc_schema) = fb_to_schema(ipc_schema)?; - Ok(( - FileMetadata { - schema, - ipc_schema, - blocks, - dictionaries: Default::default(), - }, - footer.dictionaries()?, - )) + let dictionaries = footer + .dictionaries()? + .map(|dictionaries| { + dictionaries + .into_iter() + .map(|x| Ok(x.try_into()?)) + .collect::>>() + }) + .transpose()?; + + Ok(FileMetadata { + schema, + ipc_schema, + blocks, + dictionaries, + }) } /// Read the IPC file's metadata @@ -176,8 +183,9 @@ pub fn read_file_metadata(reader: &mut R) -> Result(reader: &mut R) -> Result( @@ -214,6 +220,7 @@ pub(super) fn get_serialized_batch<'a>( /// Read a batch from the reader. pub fn read_batch( reader: &mut R, + dictionaries: &Dictionaries, metadata: &FileMetadata, projection: Option<&[usize]>, block: usize, @@ -245,7 +252,7 @@ pub fn read_batch( &metadata.schema.fields, &metadata.ipc_schema, projection, - &metadata.dictionaries, + dictionaries, message.version()?, reader, block.offset as u64 + block.meta_data_length as u64, @@ -268,6 +275,7 @@ impl FileReader { Self { reader, metadata, + dictionaries: Default::default(), projection, current_block: 0, buffer: vec![], @@ -291,6 +299,28 @@ impl FileReader { pub fn into_inner(self) -> R { self.reader } + + fn read_dictionaries(&mut self) -> Result<()> { + match ( + &mut self.dictionaries, + self.metadata.dictionaries.as_deref(), + ) { + (None, Some(blocks)) => { + let dictionaries = read_dictionaries( + &mut self.reader, + &self.metadata.schema.fields, + &self.metadata.ipc_schema, + blocks, + )?; + self.dictionaries = Some(dictionaries); + } + (None, None) => { + self.dictionaries = Some(Default::default()); + } + _ => {} + }; + Ok(()) + } } impl Iterator for FileReader { @@ -302,11 +332,17 @@ impl Iterator for FileReader { return None; } + match self.read_dictionaries() { + Ok(_) => {} + Err(e) => return Some(Err(e)), + }; + let block = self.current_block; self.current_block += 1; let chunk = read_batch( &mut self.reader, + self.dictionaries.as_ref().unwrap(), &self.metadata, self.projection.as_ref().map(|x| x.0.as_ref()), block,