This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

IPC sink types and IPC file stream (#878)
Dexter Duckworth authored Mar 5, 2022
1 parent 39ff99c commit dda052f
Showing 13 changed files with 862 additions and 87 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -134,7 +134,7 @@ io_csv_write = ["csv", "csv-core", "streaming-iterator", "lexical-core"]
io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
-io_ipc_read_async = ["io_ipc", "futures"]
+io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight-data"]
# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
275 changes: 275 additions & 0 deletions src/io/ipc/read/file_async.rs
@@ -0,0 +1,275 @@
//! Async reader for Arrow IPC files
use std::io::SeekFrom;
use std::sync::Arc;

use arrow_format::ipc::{
    planus::{ReadAsRoot, Vector},
    BlockRef, FooterRef, MessageHeaderRef, MessageRef,
};
use futures::{
    stream::BoxStream, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream, StreamExt,
};

use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::{Field, Schema};
use crate::error::{ArrowError, Result};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};

use super::common::{read_dictionary, read_record_batch};
use super::reader::get_serialized_batch;
use super::schema::fb_to_schema;
use super::Dictionaries;
use super::FileMetadata;

/// Async reader for Arrow IPC files
pub struct FileStream<'a> {
    stream: BoxStream<'a, Result<Chunk<Arc<dyn Array>>>>,
    metadata: FileMetadata,
    schema: Schema,
}

impl<'a> FileStream<'a> {
    /// Create a new IPC file reader.
    ///
    /// # Examples
    /// See [`FileSink`](crate::io::ipc::write::file_async::FileSink).
    pub fn new<R>(reader: R, metadata: FileMetadata, projection: Option<Vec<usize>>) -> Self
    where
        R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
    {
        let schema = if let Some(projection) = projection.as_ref() {
            projection.windows(2).for_each(|x| {
                assert!(
                    x[0] < x[1],
                    "IPC projection must be ordered and non-overlapping",
                )
            });
            let fields = projection
                .iter()
                .map(|&x| metadata.schema.fields[x].clone())
                .collect::<Vec<_>>();
            Schema {
                fields,
                metadata: metadata.schema.metadata.clone(),
            }
        } else {
            metadata.schema.clone()
        };

        let stream = Self::stream(reader, metadata.clone(), projection);
        Self {
            stream,
            metadata,
            schema,
        }
    }

    /// Get the metadata from the IPC file.
    pub fn metadata(&self) -> &FileMetadata {
        &self.metadata
    }

    /// Get the projected schema from the IPC file.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    fn stream<R>(
        mut reader: R,
        metadata: FileMetadata,
        projection: Option<Vec<usize>>,
    ) -> BoxStream<'a, Result<Chunk<Arc<dyn Array>>>>
    where
        R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
    {
        async_stream::try_stream! {
            let mut meta_buffer = vec![];
            let mut block_buffer = vec![];
            for block in 0..metadata.blocks.len() {
                let chunk = read_batch(
                    &mut reader,
                    &metadata,
                    projection.as_deref(),
                    block,
                    &mut meta_buffer,
                    &mut block_buffer,
                ).await?;
                yield chunk;
            }
        }
        .boxed()
    }
}

impl<'a> Stream for FileStream<'a> {
    type Item = Result<Chunk<Arc<dyn Array>>>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.get_mut().stream.poll_next_unpin(cx)
    }
}

/// Read the metadata from an IPC file.
pub async fn read_file_metadata_async<R>(mut reader: R) -> Result<FileMetadata>
where
    R: AsyncRead + AsyncSeek + Unpin,
{
    // Check header
    let mut magic = [0; 6];
    reader.read_exact(&mut magic).await?;
    if magic != ARROW_MAGIC {
        return Err(ArrowError::OutOfSpec(
            "file does not contain correct Arrow header".to_string(),
        ));
    }
    // Check footer
    reader.seek(SeekFrom::End(-6)).await?;
    reader.read_exact(&mut magic).await?;
    if magic != ARROW_MAGIC {
        return Err(ArrowError::OutOfSpec(
            "file does not contain correct Arrow footer".to_string(),
        ));
    }
    // Get footer size
    let mut footer_size = [0; 4];
    reader.seek(SeekFrom::End(-10)).await?;
    reader.read_exact(&mut footer_size).await?;
    let footer_size = i32::from_le_bytes(footer_size);
    // Read footer
    let mut footer = vec![0; footer_size as usize];
    reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?;
    reader.read_exact(&mut footer).await?;
    let footer = FooterRef::read_as_root(&footer[..])
        .map_err(|err| ArrowError::OutOfSpec(format!("unable to get root as footer: {:?}", err)))?;

    let blocks = footer.record_batches()?.ok_or_else(|| {
        ArrowError::OutOfSpec("unable to get record batches from footer".to_string())
    })?;
    let schema = footer
        .schema()?
        .ok_or_else(|| ArrowError::OutOfSpec("unable to get schema from footer".to_string()))?;
    let (schema, ipc_schema) = fb_to_schema(schema)?;
    let dictionary_blocks = footer.dictionaries()?;
    let dictionaries = if let Some(blocks) = dictionary_blocks {
        read_dictionaries(reader, &schema.fields[..], &ipc_schema, blocks).await?
    } else {
        Default::default()
    };

    Ok(FileMetadata {
        schema,
        ipc_schema,
        blocks: blocks
            .iter()
            .map(|block| Ok(block.try_into()?))
            .collect::<Result<Vec<_>>>()?,
        dictionaries,
    })
}
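
// Tail layout of an Arrow IPC file, as parsed by `read_file_metadata_async`
// above (per the Arrow file format specification):
//
//     ... | footer flatbuffer | footer length: i32 LE (4 bytes) | "ARROW1" (6 bytes)
//
// The footer flatbuffer therefore starts at EOF - 10 - footer_size.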

async fn read_dictionaries<R>(
    mut reader: R,
    fields: &[Field],
    ipc_schema: &IpcSchema,
    blocks: Vector<'_, BlockRef<'_>>,
) -> Result<Dictionaries>
where
    R: AsyncRead + AsyncSeek + Unpin,
{
    let mut dictionaries = Default::default();
    let mut data = vec![];
    let mut buffer = vec![];

    for block in blocks {
        let offset = block.offset() as u64;
        read_dictionary_message(&mut reader, offset, &mut data).await?;

        let message = MessageRef::read_as_root(&data).map_err(|err| {
            ArrowError::OutOfSpec(format!("unable to get root as message: {:?}", err))
        })?;
        let header = message
            .header()?
            .ok_or_else(|| ArrowError::oos("message must have a header"))?;
        match header {
            MessageHeaderRef::DictionaryBatch(batch) => {
                buffer.clear();
                buffer.resize(block.body_length() as usize, 0);
                reader.read_exact(&mut buffer).await?;
                let mut cursor = std::io::Cursor::new(&mut buffer);
                read_dictionary(batch, fields, ipc_schema, &mut dictionaries, &mut cursor, 0)?;
            }
            other => {
                return Err(ArrowError::OutOfSpec(format!(
                    "expected DictionaryBatch in dictionary blocks, found {:?}",
                    other,
                )))
            }
        }
    }
    Ok(dictionaries)
}

async fn read_dictionary_message<R>(mut reader: R, offset: u64, data: &mut Vec<u8>) -> Result<()>
where
    R: AsyncRead + AsyncSeek + Unpin,
{
    let mut message_size = [0; 4];
    reader.seek(SeekFrom::Start(offset)).await?;
    reader.read_exact(&mut message_size).await?;
    // Encapsulated messages may be prefixed by a 4-byte continuation marker;
    // if present, the actual metadata length follows it.
    if message_size == CONTINUATION_MARKER {
        reader.read_exact(&mut message_size).await?;
    }
    let message_length = i32::from_le_bytes(message_size);
    data.clear();
    data.resize(message_length as usize, 0);
    reader.read_exact(data).await?;

    Ok(())
}

async fn read_batch<R>(
    mut reader: R,
    metadata: &FileMetadata,
    projection: Option<&[usize]>,
    block: usize,
    meta_buffer: &mut Vec<u8>,
    block_buffer: &mut Vec<u8>,
) -> Result<Chunk<Arc<dyn Array>>>
where
    R: AsyncRead + AsyncSeek + Unpin,
{
    let block = metadata.blocks[block];
    reader.seek(SeekFrom::Start(block.offset as u64)).await?;
    let mut meta_buf = [0; 4];
    reader.read_exact(&mut meta_buf).await?;
    if meta_buf == CONTINUATION_MARKER {
        reader.read_exact(&mut meta_buf).await?;
    }
    let meta_len = i32::from_le_bytes(meta_buf) as usize;
    meta_buffer.clear();
    meta_buffer.resize(meta_len, 0);
    reader.read_exact(meta_buffer).await?;

    let message = MessageRef::read_as_root(&meta_buffer[..])
        .map_err(|err| ArrowError::oos(format!("unable to parse message: {:?}", err)))?;
    let batch = get_serialized_batch(&message)?;
    block_buffer.clear();
    block_buffer.resize(message.body_length()? as usize, 0);
    reader.read_exact(block_buffer).await?;
    let mut cursor = std::io::Cursor::new(block_buffer);
    let chunk = read_record_batch(
        batch,
        &metadata.schema.fields,
        &metadata.ipc_schema,
        projection,
        &metadata.dictionaries,
        message.version()?,
        &mut cursor,
        0,
    )?;
    Ok(chunk)
}
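
For illustration, a minimal sketch of driving the new reader end to end. This assumes the `io_ipc_read_async` feature is enabled and uses `async-std` for file I/O; the runtime choice, file path, and printout are illustrative and not part of this commit:

use async_std::fs::File;
use arrow2::error::Result;
use arrow2::io::ipc::read::file_async::{read_file_metadata_async, FileStream};
use futures::TryStreamExt;

async fn collect_chunks() -> Result<()> {
    let mut file = File::open("data.arrow").await?;
    // Parse the footer (schema, blocks, dictionaries) once up front.
    let metadata = read_file_metadata_async(&mut file).await?;
    // Stream the record batches without a projection.
    let mut stream = FileStream::new(file, metadata, None);
    while let Some(chunk) = stream.try_next().await? {
        println!("read a chunk with {} rows", chunk.len());
    }
    Ok(())
}
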
4 changes: 4 additions & 0 deletions src/io/ipc/read/mod.rs
@@ -20,6 +20,10 @@ mod stream
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))]
pub mod stream_async;

#[cfg(feature = "io_ipc_read_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))]
pub mod file_async;

pub use common::{read_dictionary, read_record_batch};
pub use reader::{read_file_metadata, FileMetadata, FileReader};
pub use schema::deserialize_schema;
6 changes: 3 additions & 3 deletions src/io/ipc/read/reader.rs
@@ -26,10 +26,10 @@ pub struct FileMetadata {
    /// The blocks in the file
    ///
    /// A block indicates the regions in the file to read to get data
-    blocks: Vec<arrow_format::ipc::Block>,
+    pub(super) blocks: Vec<arrow_format::ipc::Block>,

    /// Dictionaries associated to each dict_id
-    dictionaries: Dictionaries,
+    pub(super) dictionaries: Dictionaries,
}

/// Arrow File reader
@@ -166,7 +166,7 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
    })
}

-fn get_serialized_batch<'a>(
+pub(super) fn get_serialized_batch<'a>(
    message: &'a arrow_format::ipc::MessageRef,
) -> Result<arrow_format::ipc::RecordBatchRef<'a>> {
    let header = message.header()?.ok_or_else(|| {
53 changes: 53 additions & 0 deletions src/io/ipc/write/common.rs
@@ -1,3 +1,4 @@
+use std::borrow::{Borrow, Cow};
use std::sync::Arc;

use arrow_format::ipc::planus::Builder;
@@ -379,3 +380,55 @@ pub struct EncodedData {
pub(crate) fn pad_to_8(len: usize) -> usize {
    (((len + 7) & !7) - len) as usize
}
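
// Illustrative note (not part of this diff): pad_to_8 returns the padding
// needed to reach the next 8-byte boundary, e.g. pad_to_8(13) == 3
// (13 rounds up to 16) and pad_to_8(16) == 0.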

/// An array [`Chunk`] with optional accompanying IPC fields.
#[derive(Debug, Clone, PartialEq)]
pub struct Record<'a> {
    columns: Cow<'a, Chunk<Arc<dyn Array>>>,
    fields: Option<Cow<'a, [IpcField]>>,
}

impl<'a> Record<'a> {
    /// Get the IPC fields for this record.
    pub fn fields(&self) -> Option<&[IpcField]> {
        self.fields.as_deref()
    }

    /// Get the Arrow columns in this record.
    pub fn columns(&self) -> &Chunk<Arc<dyn Array>> {
        self.columns.borrow()
    }
}

impl From<Chunk<Arc<dyn Array>>> for Record<'static> {
    fn from(columns: Chunk<Arc<dyn Array>>) -> Self {
        Self {
            columns: Cow::Owned(columns),
            fields: None,
        }
    }
}

impl<'a, F> From<(Chunk<Arc<dyn Array>>, Option<F>)> for Record<'a>
where
    F: Into<Cow<'a, [IpcField]>>,
{
    fn from((columns, fields): (Chunk<Arc<dyn Array>>, Option<F>)) -> Self {
        Self {
            columns: Cow::Owned(columns),
            fields: fields.map(|f| f.into()),
        }
    }
}

impl<'a, F> From<(&'a Chunk<Arc<dyn Array>>, Option<F>)> for Record<'a>
where
    F: Into<Cow<'a, [IpcField]>>,
{
    fn from((columns, fields): (&'a Chunk<Arc<dyn Array>>, Option<F>)) -> Self {
        Self {
            columns: Cow::Borrowed(columns),
            fields: fields.map(|f| f.into()),
        }
    }
}
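
A minimal sketch of the new `Record` conversions. The array construction and the `Record` import path are illustrative assumptions, not confirmed by this diff:

use std::sync::Arc;
use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::io::ipc::IpcField;
use arrow2::io::ipc::write::common::Record; // hypothetical import path

fn records() {
    let column: Arc<dyn Array> = Arc::new(Int32Array::from_slice([1, 2, 3]));
    let chunk = Chunk::new(vec![column]);

    // Owned record with no explicit IPC fields.
    let owned: Record<'static> = chunk.clone().into();
    assert!(owned.fields().is_none());

    // Borrowed record; `None::<Vec<IpcField>>` pins down the generic `F`.
    let borrowed: Record = (&chunk, None::<Vec<IpcField>>).into();
    assert_eq!(borrowed.columns().len(), 3);
}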