This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added documentation for async IPC file reader.
Dexter Duckworth committed Mar 2, 2022
1 parent f5b02ed commit 106bfd6
Showing 2 changed files with 85 additions and 60 deletions.
127 changes: 71 additions & 56 deletions src/io/ipc/read/file_async.rs
@@ -2,35 +2,25 @@
use std::io::SeekFrom;
use std::sync::Arc;

use arrow_format::ipc::BlockRef;
use arrow_format::ipc::FooterRef;
use arrow_format::ipc::MessageHeaderRef;
use arrow_format::ipc::MessageRef;
use arrow_format::ipc::planus::ReadAsRoot;
use arrow_format::ipc::planus::Vector;
use futures::AsyncSeek;
use futures::AsyncSeekExt;
use futures::AsyncRead;
use futures::AsyncReadExt;
use futures::Stream;
use futures::stream::BoxStream;
use futures::StreamExt;
use arrow_format::ipc::{
planus::{ReadAsRoot, Vector},
BlockRef, FooterRef, MessageHeaderRef, MessageRef,
};
use futures::{
stream::BoxStream, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream, StreamExt,
};

use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::{Field, Schema};
use crate::error::{ArrowError, Result};
use crate::io::ipc::ARROW_MAGIC;
use crate::datatypes::Schema;
use crate::io::ipc::IpcSchema;
use crate::datatypes::Field;
use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};

use super::super::CONTINUATION_MARKER;
use super::FileMetadata;
use super::common::{read_dictionary, read_record_batch};
use super::reader::get_serialized_batch;
use super::schema::deserialize_stream_metadata;
use super::Dictionaries;
use super::schema::fb_to_schema;
use super::Dictionaries;
use super::FileMetadata;

/// Async reader for Arrow IPC files
pub struct FileStream<'a> {
@@ -41,14 +31,20 @@ pub struct FileStream<'a> {

impl<'a> FileStream<'a> {
/// Create a new IPC file reader.
///
/// # Examples
/// See [`FileSink`](crate::io::ipc::write::file_async::FileSink).
pub fn new<R>(reader: R, metadata: FileMetadata, projection: Option<Vec<usize>>) -> Self
where R: AsyncRead + AsyncSeek + Unpin + Send + 'a
where
R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
{
let schema = if let Some(projection) = projection.as_ref() {
projection.windows(2).for_each(|x| assert!(
x[0] < x[1],
"IPC projection must be ordered and non-overlapping",
));
projection.windows(2).for_each(|x| {
assert!(
x[0] < x[1],
"IPC projection must be ordered and non-overlapping",
)
});
let fields = projection
.iter()
.map(|&x| metadata.schema.fields[x].clone())
@@ -79,8 +75,13 @@ impl<'a> FileStream<'a> {
&self.schema
}

fn stream<R>(mut reader: R, metadata: FileMetadata, projection: Option<Vec<usize>>) -> BoxStream<'a, Result<Chunk<Arc<dyn Array>>>>
where R: AsyncRead + AsyncSeek + Unpin + Send + 'a
fn stream<R>(
mut reader: R,
metadata: FileMetadata,
projection: Option<Vec<usize>>,
) -> BoxStream<'a, Result<Chunk<Arc<dyn Array>>>>
where
R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
{
async_stream::try_stream! {
let mut meta_buffer = vec![];
@@ -96,33 +97,42 @@ impl<'a> FileStream<'a> {
).await?;
yield chunk;
}
}.boxed()
}
.boxed()
}
}

impl<'a> Stream for FileStream<'a> {
type Item = Result<Chunk<Arc<dyn Array>>>;

fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.get_mut().stream.poll_next_unpin(cx)
}
}

/// Read the metadata from an IPC file.
pub async fn read_file_metadata_async<R>(reader: &mut R) -> Result<FileMetadata>
where R: AsyncRead + AsyncSeek + Unpin
where
R: AsyncRead + AsyncSeek + Unpin,
{
// Check header
let mut magic = [0; 6];
reader.read_exact(&mut magic).await?;
if magic != ARROW_MAGIC {
return Err(ArrowError::OutOfSpec("file does not contain correct Arrow header".to_string()));
return Err(ArrowError::OutOfSpec(
"file does not contain correct Arrow header".to_string(),
));
}
// Check footer
reader.seek(SeekFrom::End(-6)).await?;
reader.read_exact(&mut magic).await?;
if magic != ARROW_MAGIC {
return Err(ArrowError::OutOfSpec("file does not contain correct Arrow footer".to_string()));
return Err(ArrowError::OutOfSpec(
"file does not contain correct Arrow footer".to_string(),
));
}
// Get footer size
let mut footer_size = [0; 4];
@@ -136,9 +146,11 @@ pub async fn read_file_metadata_async<R>(reader: &mut R) -> Result<FileMetadata>
let footer = FooterRef::read_as_root(&footer[..])
.map_err(|err| ArrowError::OutOfSpec(format!("unable to get root as footer: {:?}", err)))?;

let blocks = footer.record_batches()?
.ok_or_else(|| ArrowError::OutOfSpec("unable to get record batches from footer".to_string()))?;
let schema = footer.schema()?
let blocks = footer.record_batches()?.ok_or_else(|| {
ArrowError::OutOfSpec("unable to get record batches from footer".to_string())
})?;
let schema = footer
.schema()?
.ok_or_else(|| ArrowError::OutOfSpec("unable to get schema from footer".to_string()))?;
let (schema, ipc_schema) = fb_to_schema(schema)?;
let dictionary_blocks = footer.dictionaries()?;
@@ -151,7 +163,10 @@ pub async fn read_file_metadata_async<R>(reader: &mut R) -> Result<FileMetadata>
Ok(FileMetadata {
schema,
ipc_schema,
blocks: blocks.iter().map(|block| Ok(block.try_into()?)).collect::<Result<Vec<_>>>()?,
blocks: blocks
.iter()
.map(|block| Ok(block.try_into()?))
.collect::<Result<Vec<_>>>()?,
dictionaries,
})
}
@@ -162,7 +177,8 @@ async fn read_dictionaries<R>(
ipc_schema: &IpcSchema,
blocks: Vector<'_, BlockRef<'_>>,
) -> Result<Dictionaries>
where R: AsyncRead + AsyncSeek + Unpin,
where
R: AsyncRead + AsyncSeek + Unpin,
{
let mut dictionaries = Default::default();
let mut data = vec![];
@@ -172,36 +188,34 @@ async fn read_dictionaries<R>(
let offset = block.offset() as u64;
read_dictionary_message(reader, offset, &mut data).await?;

let message = MessageRef::read_as_root(&data)
.map_err(|err| ArrowError::OutOfSpec(format!("unable to get root as message: {:?}", err)))?;
let header = message.header()?
let message = MessageRef::read_as_root(&data).map_err(|err| {
ArrowError::OutOfSpec(format!("unable to get root as message: {:?}", err))
})?;
let header = message
.header()?
.ok_or_else(|| ArrowError::oos("message must have a header"))?;
match header {
MessageHeaderRef::DictionaryBatch(batch) => {
buffer.clear();
buffer.resize(block.body_length() as usize, 0);
reader.read_exact(&mut buffer).await?;
let mut cursor = std::io::Cursor::new(&mut buffer);
read_dictionary(
batch,
fields,
ipc_schema,
&mut dictionaries,
&mut cursor,
0,
)?;
},
other => return Err(ArrowError::OutOfSpec(format!(
"expected DictionaryBatch in dictionary blocks, found {:?}",
other,
))),
read_dictionary(batch, fields, ipc_schema, &mut dictionaries, &mut cursor, 0)?;
}
other => {
return Err(ArrowError::OutOfSpec(format!(
"expected DictionaryBatch in dictionary blocks, found {:?}",
other,
)))
}
}
}
Ok(dictionaries)
}

async fn read_dictionary_message<R>(reader: &mut R, offset: u64, data: &mut Vec<u8>) -> Result<()>
where R: AsyncRead + AsyncSeek + Unpin,
where
R: AsyncRead + AsyncSeek + Unpin,
{
let mut message_size = [0; 4];
reader.seek(SeekFrom::Start(offset)).await?;
@@ -225,7 +239,8 @@ async fn read_batch<R>(
meta_buffer: &mut Vec<u8>,
block_buffer: &mut Vec<u8>,
) -> Result<Chunk<Arc<dyn Array>>>
where R: AsyncRead + AsyncSeek + Unpin,
where
R: AsyncRead + AsyncSeek + Unpin,
{
let block = metadata.blocks[block];
reader.seek(SeekFrom::Start(block.offset as u64)).await?;
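
Taken together, the read path in this file is used in two steps: read_file_metadata_async parses the footer (schema, IPC schema, record-batch blocks, and dictionaries), and FileStream then yields each record batch as a Chunk. The sketch below shows that flow, assuming the crate's async IPC read feature is enabled; the read_ipc helper name is illustrative, while the reader functions it calls are the ones defined in this file.

    use std::sync::Arc;

    use arrow2::array::Array;
    use arrow2::chunk::Chunk;
    use arrow2::error::Result;
    use arrow2::io::ipc::read::file_async::{read_file_metadata_async, FileStream};
    use futures::{AsyncRead, AsyncSeek, TryStreamExt};

    /// Collect every record batch from an Arrow IPC file (helper name is illustrative).
    async fn read_ipc<R>(mut reader: R) -> Result<Vec<Chunk<Arc<dyn Array>>>>
    where
        R: AsyncRead + AsyncSeek + Unpin + Send + 'static,
    {
        // Parse the footer: schema, IPC schema, block offsets, dictionaries.
        let metadata = read_file_metadata_async(&mut reader).await?;
        // Stream the batches; `None` means no column projection.
        let stream = FileStream::new(reader, metadata, None);
        stream.try_collect().await
    }
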
18 changes: 14 additions & 4 deletions src/io/ipc/write/file_async.rs
@@ -27,30 +27,39 @@ type WriteOutput<W> = (usize, Option<Block>, Vec<Block>, Option<W>);
///
/// ```
/// use std::sync::Arc;
/// use futures::SinkExt;
/// use futures::{SinkExt, TryStreamExt, io::Cursor};
/// use arrow2::array::{Array, Int32Array};
/// use arrow2::datatypes::{DataType, Field, Schema};
/// use arrow2::chunk::Chunk;
/// # use arrow2::io::ipc::write::file_async::FileSink;
/// use arrow2::io::ipc::write::file_async::FileSink;
/// use arrow2::io::ipc::read::file_async::{read_file_metadata_async, FileStream};
/// # futures::executor::block_on(async move {
/// let schema = Schema::from(vec![
/// Field::new("values", DataType::Int32, true),
/// ]);
///
/// let mut buffer = vec![];
/// let mut buffer = Cursor::new(vec![]);
/// let mut sink = FileSink::new(
/// &mut buffer,
/// schema,
/// None,
/// Default::default(),
/// );
///
/// // Write chunks to file
/// for i in 0..3 {
/// let values = Int32Array::from(&[Some(i), None]);
/// let chunk = Chunk::new(vec![Arc::new(values) as Arc<dyn Array>]);
/// sink.feed(chunk).await?;
/// }
/// sink.close().await?;
/// drop(sink);
///
/// // Read chunks from file
/// buffer.set_position(0);
/// let metadata = read_file_metadata_async(&mut buffer).await?;
/// let mut stream = FileStream::new(buffer, metadata, None);
/// let chunks = stream.try_collect::<Vec<_>>().await?;
/// # arrow2::error::Result::Ok(())
/// # }).unwrap();
/// ```
@@ -271,7 +280,7 @@ mod tests {
]);

let mut buffer = Cursor::new(Vec::new());
let mut sink = FileSink::new(&mut buffer, schema, None, Default::default());
let mut sink = FileSink::new(&mut buffer, schema.clone(), None, Default::default());
for chunk in &data {
sink.feed(chunk.clone()).await.unwrap();
}
@@ -280,6 +289,7 @@

buffer.set_position(0);
let metadata = read_file_metadata_async(&mut buffer).await.unwrap();
assert_eq!(schema, metadata.schema);
let stream = FileStream::new(buffer, metadata, None);
let out = stream.try_collect::<Vec<_>>().await.unwrap();
for i in 0..5 {
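
The projection argument, which the test above passes as None, selects a subset of columns by index; per the assertion in FileStream::new, the indices must be strictly increasing ("ordered and non-overlapping"). A minimal sketch, with the read_columns helper name being illustrative:

    use std::sync::Arc;

    use arrow2::array::Array;
    use arrow2::chunk::Chunk;
    use arrow2::error::Result;
    use arrow2::io::ipc::read::file_async::{read_file_metadata_async, FileStream};
    use futures::{AsyncRead, AsyncSeek, TryStreamExt};

    /// Read only the given columns, e.g. `vec![0, 2]`.
    /// `FileStream::new` panics if the indices are not ordered and non-overlapping.
    async fn read_columns<R>(
        mut reader: R,
        columns: Vec<usize>,
    ) -> Result<Vec<Chunk<Arc<dyn Array>>>>
    where
        R: AsyncRead + AsyncSeek + Unpin + Send + 'static,
    {
        let metadata = read_file_metadata_async(&mut reader).await?;
        let stream = FileStream::new(reader, metadata, Some(columns));
        // `stream.schema()` now describes only the projected fields.
        stream.try_collect().await
    }
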
