Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Made reading dictionaries lazy
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 30, 2022
1 parent 3a3e41b commit 48902a5
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 90 deletions.
6 changes: 2 additions & 4 deletions src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub fn deserialize_batch(
data: &FlightData,
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &read::Dictionaries,
dictionaries: &mut read::Dictionaries,
) -> Result<Chunk<Arc<dyn Array>>> {
// check that the data_header is a record batch message
let message =
Expand All @@ -120,8 +120,6 @@ pub fn deserialize_batch(

let mut reader = std::io::Cursor::new(&data.data_body);

let version = message.version()?;

match message.header()?.ok_or_else(|| {
ArrowError::oos("Unable to convert flight data header to a record batch".to_string())
})? {
Expand All @@ -131,7 +129,7 @@ pub fn deserialize_batch(
ipc_schema,
None,
dictionaries,
version,
message.version()?,
&mut reader,
0,
),
Expand Down
12 changes: 12 additions & 0 deletions src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ use super::Dictionaries;

type ArrayRef = Arc<dyn Array>;

/// Auxiliary metadata to deserialize Arrow IPC data.
///
/// Groups the borrowed field list, the borrowed [`IpcSchema`] and the IPC
/// metadata version into a single value. The struct only holds references and
/// a version, and derives `Copy`, so it can be passed by value.
#[derive(Clone, Copy, PartialEq)]
pub struct Metadata<'a> {
/// The fields, borrowed for the lifetime `'a`.
pub fields: &'a [Field],
/// The [`IpcSchema`], borrowed for the lifetime `'a`.
pub ipc_schema: &'a IpcSchema,
/// The IPC metadata version (an `arrow_format::ipc::MetadataVersion`).
pub version: arrow_format::ipc::MetadataVersion,
// NOTE(review): the field below is commented out in the original commit;
// restore it or delete it once it is clear whether dictionary blocks belong here.
//pub dictionary_blocks: &'a [arrow_format::ipc::Block],
}

#[derive(Debug, Eq, PartialEq, Hash)]
enum ProjectionResult<A> {
Selected(A),
Expand Down
136 changes: 75 additions & 61 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use std::collections::HashMap;
use std::io::SeekFrom;
use std::sync::Arc;

use arrow_format::ipc::{
planus::{ReadAsRoot, Vector},
BlockRef, MessageHeaderRef, MessageRef,
};
use arrow_format::ipc::{planus::ReadAsRoot, Block, MessageHeaderRef, MessageRef};
use futures::{
stream::BoxStream, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream, StreamExt,
};
Expand Down Expand Up @@ -49,7 +46,7 @@ impl<'a> FileStream<'a> {
(None, None)
};

let stream = Self::stream(reader, metadata.clone(), projection);
let stream = Self::stream(reader, None, metadata.clone(), projection);
Self {
stream,
metadata,
Expand All @@ -69,18 +66,23 @@ impl<'a> FileStream<'a> {

fn stream<R>(
mut reader: R,
mut dictionaries: Option<Dictionaries>,
metadata: FileMetadata,
projection: Option<(Vec<usize>, HashMap<usize, usize>)>,
) -> BoxStream<'a, Result<Chunk<Arc<dyn Array>>>>
where
R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
{
async_stream::try_stream! {
// read dictionaries
cached_read_dictionaries(&mut reader, &metadata, &mut dictionaries).await?;

let mut meta_buffer = vec![];
let mut block_buffer = vec![];
for block in 0..metadata.blocks.len() {
let chunk = read_batch(
&mut reader,
dictionaries.as_mut().unwrap(),
&metadata,
projection.as_ref().map(|x| x.0.as_ref()),
block,
Expand Down Expand Up @@ -143,28 +145,58 @@ where
reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?;
reader.read_exact(&mut footer).await?;

let (mut metadata, dictionary_blocks) = deserialize_footer(&footer)?;
deserialize_footer(&footer)
}

metadata.dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(
reader,
&metadata.schema.fields,
&metadata.ipc_schema,
blocks,
)
.await?
} else {
Default::default()
};
/// Reads and decodes the record batch stored at index `block` of
/// `metadata.blocks` from `reader`.
///
/// The function seeks to the block's offset, reads the length-prefixed message
/// metadata into `meta_buffer`, reads the message body into `block_buffer`, and
/// decodes the body with [`read_record_batch`], passing along `projection` and
/// `dictionaries`.
///
/// `meta_buffer` and `block_buffer` are scratch buffers: both are cleared and
/// resized here, so a caller may hand in the same vectors on every call.
///
/// # Errors
/// Propagates I/O errors from seeking and reading, returns an
/// `ArrowError::oos` error when the message metadata cannot be parsed, and
/// forwards any error from `get_serialized_batch`, `message.body_length()`,
/// `message.version()` and `read_record_batch`.
///
/// # Panics
/// `metadata.blocks[block]` is a plain index expression — presumably it panics
/// when `block` is out of range (verify against the type of `blocks`).
async fn read_batch<R>(
mut reader: R,
dictionaries: &mut Dictionaries,
metadata: &FileMetadata,
projection: Option<&[usize]>,
block: usize,
meta_buffer: &mut Vec<u8>,
block_buffer: &mut Vec<u8>,
) -> Result<Chunk<Arc<dyn Array>>>
where
R: AsyncRead + AsyncSeek + Unpin,
{
// Shadow the index with the block descriptor it refers to.
let block = metadata.blocks[block];
reader.seek(SeekFrom::Start(block.offset as u64)).await?;
// The first 4 bytes are either the continuation marker or the metadata length.
let mut meta_buf = [0; 4];
reader.read_exact(&mut meta_buf).await?;
if meta_buf == CONTINUATION_MARKER {
// The marker was present: the length is in the next 4 bytes.
reader.read_exact(&mut meta_buf).await?;
}
// NOTE(review): the length is a little-endian i32 cast with `as`; a negative
// value would turn into a very large usize — confirm inputs are validated upstream.
let meta_len = i32::from_le_bytes(meta_buf) as usize;
meta_buffer.clear();
meta_buffer.resize(meta_len, 0);
reader.read_exact(meta_buffer).await?;

let message = MessageRef::read_as_root(&meta_buffer[..])
.map_err(|err| ArrowError::oos(format!("unable to parse message: {:?}", err)))?;
let batch = get_serialized_batch(&message)?;
// The body follows the metadata; its size is taken from the message itself.
block_buffer.clear();
block_buffer.resize(message.body_length()? as usize, 0);
reader.read_exact(block_buffer).await?;
let mut cursor = std::io::Cursor::new(block_buffer);

// NOTE(review): the next line looks like a removed-line remnant of the diff
// view (it belongs to the old `read_file_metadata` body) — confirm before relying on it.
Ok(metadata)
// Decode the batch from the in-memory body; the trailing `0` is passed through
// to `read_record_batch` unchanged.
read_record_batch(
batch,
&metadata.schema.fields,
&metadata.ipc_schema,
projection,
dictionaries,
message.version()?,
&mut cursor,
0,
)
}

async fn read_dictionaries<R>(
mut reader: R,
fields: &[Field],
ipc_schema: &IpcSchema,
blocks: Vector<'_, BlockRef<'_>>,
blocks: &[Block],
) -> Result<Dictionaries>
where
R: AsyncRead + AsyncSeek + Unpin,
Expand All @@ -174,7 +206,8 @@ where
let mut buffer = vec![];

for block in blocks {
let offset = block.offset() as u64;
let offset = block.offset as u64;
let length = block.body_length as usize;
read_dictionary_message(&mut reader, offset, &mut data).await?;

let message = MessageRef::read_as_root(&data).map_err(|err| {
Expand All @@ -186,7 +219,7 @@ where
match header {
MessageHeaderRef::DictionaryBatch(batch) => {
buffer.clear();
buffer.resize(block.body_length() as usize, 0);
buffer.resize(length, 0);
reader.read_exact(&mut buffer).await?;
let mut cursor = std::io::Cursor::new(&mut buffer);
read_dictionary(batch, fields, ipc_schema, &mut dictionaries, &mut cursor, 0)?;
Expand Down Expand Up @@ -220,45 +253,26 @@ where
Ok(())
}

async fn read_batch<R>(
mut reader: R,
async fn cached_read_dictionaries<R: AsyncRead + AsyncSeek + Unpin>(
reader: &mut R,
metadata: &FileMetadata,
projection: Option<&[usize]>,
block: usize,
meta_buffer: &mut Vec<u8>,
block_buffer: &mut Vec<u8>,
) -> Result<Chunk<Arc<dyn Array>>>
where
R: AsyncRead + AsyncSeek + Unpin,
{
let block = metadata.blocks[block];
reader.seek(SeekFrom::Start(block.offset as u64)).await?;
let mut meta_buf = [0; 4];
reader.read_exact(&mut meta_buf).await?;
if meta_buf == CONTINUATION_MARKER {
reader.read_exact(&mut meta_buf).await?;
}
let meta_len = i32::from_le_bytes(meta_buf) as usize;
meta_buffer.clear();
meta_buffer.resize(meta_len, 0);
reader.read_exact(meta_buffer).await?;

let message = MessageRef::read_as_root(&meta_buffer[..])
.map_err(|err| ArrowError::oos(format!("unable to parse message: {:?}", err)))?;
let batch = get_serialized_batch(&message)?;
block_buffer.clear();
block_buffer.resize(message.body_length()? as usize, 0);
reader.read_exact(block_buffer).await?;
let mut cursor = std::io::Cursor::new(block_buffer);
let chunk = read_record_batch(
batch,
&metadata.schema.fields,
&metadata.ipc_schema,
projection,
&metadata.dictionaries,
message.version()?,
&mut cursor,
0,
)?;
Ok(chunk)
dictionaries: &mut Option<Dictionaries>,
) -> Result<()> {
match (&dictionaries, metadata.dictionaries.as_deref()) {
(None, Some(blocks)) => {
let new_dictionaries = read_dictionaries(
reader,
&metadata.schema.fields,
&metadata.ipc_schema,
blocks,
)
.await?;
*dictionaries = Some(new_dictionaries);
}
(None, None) => {
*dictionaries = Some(Default::default());
}
_ => {}
};
Ok(())
}
2 changes: 1 addition & 1 deletion src/io/ipc/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub mod stream_async;
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))]
pub mod file_async;

pub use common::{read_dictionary, read_record_batch};
pub use common::{read_dictionary, read_record_batch, Metadata};
pub use reader::{read_file_metadata, FileMetadata, FileReader};
pub use schema::deserialize_schema;
pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState};
Expand Down
Loading

0 comments on commit 48902a5

Please sign in to comment.