This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added more IPC documentation #534

Merged
8 commits merged on Oct 17, 2021
76 changes: 76 additions & 0 deletions src/io/ipc/mod.rs
@@ -1,4 +1,80 @@
//! APIs to read from and write to Arrow's IPC format.
//!
//! Inter-process communication (IPC) is a mechanism through which different
//! processes share and pass data between them. Its use-cases include parallel
//! processing of chunks of data across different CPU cores, transferring
//! data between different Apache Arrow implementations in other languages, and
//! more. Under the hood, Apache Arrow uses [FlatBuffers](https://google.github.io/flatbuffers/)
//! as its binary protocol, so every Arrow-centered streaming or serialization
//! problem that can be solved using FlatBuffers can likely also be solved
//! using the more integrated approach exposed in this module.
//!
//! [Arrow's IPC protocol](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc)
//! allows only [`RecordBatch`](crate::record_batch::RecordBatch)es or
//! [`DictionaryBatch`](gen::Message::DictionaryBatch) messages to be passed
//! around due to its reliance on a pre-defined data schema. This limitation
//! yields a large performance gain because serialized data always has a
//! known structure, i.e. the same fields and datatypes, with the only variance
//! being the number of rows and the actual data inside the batch. This
//! dramatically increases the deserialization rate, as the bytes in the file
//! or stream are already structured "correctly".
//!
//! Reading and writing IPC messages is done using one of two pairs - either
//! [`FileReader`](read::FileReader) <-> [`FileWriter`](struct@write::FileWriter) or
//! [`StreamReader`](read::StreamReader) <-> [`StreamWriter`](struct@write::StreamWriter).
//! The readers wrap a type `T` that implements [`Read`](std::io::Read) and, in the
//! case of the `File` variant, also [`Seek`](std::io::Seek). In practice this means
//! that `File`s can be accessed arbitrarily while `Stream`s can only be read in the
//! order they were written in (first in, first out).
//!
//! # Examples
//! Read and write to a file:
//! ```
//! use arrow2::io::ipc::{read::{FileReader, read_file_metadata}, write::FileWriter};
//! # use std::fs::File;
//! # use std::sync::Arc;
//! # use arrow2::datatypes::{Field, Schema, DataType};
//! # use arrow2::array::Int32Array;
//! # use arrow2::record_batch::RecordBatch;
//! // Setup the writer
//! let path = "example.dat".to_string();
//! let mut file = File::create(&path).unwrap();
//! let x_coord = Field::new("x", DataType::Int32, false);
//! let y_coord = Field::new("y", DataType::Int32, false);
//! let schema = Schema::new(vec![x_coord, y_coord]);
//! let mut writer = FileWriter::try_new(file, &schema).unwrap();
//!
//! // Setup the data
//! let x_data = Int32Array::from_slice([-1i32, 1]);
//! let y_data = Int32Array::from_slice([1i32, -1]);
//! let batch = RecordBatch::try_new(
//! Arc::new(schema),
//! vec![Arc::new(x_data), Arc::new(y_data)]
//! ).unwrap();
//!
//! // Write the messages and finalize the stream
//! for _ in 0..5 {
//! writer.write(&batch).unwrap();
//! }
//! writer.finish().unwrap();
//!
//! // Fetch some of the data and get the reader back
//! let mut reader = File::open(&path).unwrap();
//! let metadata = read_file_metadata(&mut reader).unwrap();
//! let mut filereader = FileReader::new(reader, metadata, None);
//! let batch1 = filereader.next().unwrap(); // [[-1, 1], [1, -1]]
//! let batch2 = filereader.next().unwrap(); // [[-1, 1], [1, -1]]
//! let mut reader = filereader.into_inner();
//! // Do more stuff with the reader, like seeking ahead.
//!
//! ```
//!
//! For further information and examples please consult the
//! [user guide](https://jorgecarleitao.github.io/arrow2/io/index.html).
//! For even more examples check the `examples` folder in the main repository
//! ([1](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_read.rs),
//! [2](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_write.rs),
//! [3](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)).

#![allow(missing_debug_implementations)]
#![allow(non_camel_case_types)]
6 changes: 6 additions & 0 deletions src/io/ipc/read/mod.rs
@@ -1,4 +1,10 @@
//! APIs to read Arrow's IPC format.
//!
//! The two important structs here are the [`FileReader`](reader::FileReader),
//! which provides arbitrary access to any of its messages, and the
//! [`StreamReader`](stream::StreamReader), which only supports reading
//! data in the order it was written in.
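The practical difference between the two readers comes down to the `Seek` bound: a `Read + Seek` source can jump to any record, while a plain `Read` source must be consumed front to back. A minimal, self-contained sketch of that distinction (the fixed-size record layout and helper names here are illustrative, not arrow2's API):

```rust
use std::io::{Cursor, Read, Seek, SeekFrom};

// Read the next 4-byte record sequentially, as a stream reader must.
fn read_record<R: Read>(reader: &mut R) -> [u8; 4] {
    let mut buf = [0u8; 4];
    reader.read_exact(&mut buf).unwrap();
    buf
}

// With Seek available, compute an offset and jump straight to any record,
// as a file reader can.
fn read_record_at<R: Read + Seek>(reader: &mut R, index: u64) -> [u8; 4] {
    reader.seek(SeekFrom::Start(index * 4)).unwrap();
    read_record(reader)
}

fn main() {
    let data: Vec<u8> = (0u8..12).collect(); // three 4-byte records
    let mut cursor = Cursor::new(data);

    // Arbitrary access: read the last record first.
    assert_eq!(read_record_at(&mut cursor, 2), [8, 9, 10, 11]);

    // Sequential access: rewind, then first-in-first-out order only.
    cursor.seek(SeekFrom::Start(0)).unwrap();
    assert_eq!(read_record(&mut cursor), [0, 1, 2, 3]);
    assert_eq!(read_record(&mut cursor), [4, 5, 6, 7]);
}
```

A socket or pipe only implements `Read`, which is why streamed Arrow data has to be handled through the `StreamReader` rather than the `FileReader`.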

mod array;
mod common;
mod deserialize;
29 changes: 27 additions & 2 deletions src/io/ipc/read/stream.rs
@@ -76,12 +76,28 @@ pub fn read_stream_metadata<R: Read>(reader: &mut R) -> Result<StreamMetadata> {
})
}

/// Encodes the stream's status after each read.
///
/// A stream is an iterator, and an iterator returns `Option<Item>`. The `Item`
/// type in the [`StreamReader`] case is `StreamState`, which means that an Arrow
/// stream may yield one of three values: (1) `None`, which signals that the stream
/// is done; (2) `Some(StreamState::Some(RecordBatch))`, which signals that there was
/// data waiting in the stream and we read it; and finally (3)
/// `Some(StreamState::Waiting)`, which means that the stream is still "live", it
/// just doesn't hold any data right now.
pub enum StreamState {
/// A live stream without data
Waiting,
/// Next item in the stream
Some(RecordBatch),
}

impl StreamState {
/// Return the data inside this wrapper.
///
/// # Panics
///
/// If the `StreamState` was `Waiting`.
pub fn unwrap(self) -> RecordBatch {
if let StreamState::Some(batch) = self {
batch
@@ -91,7 +107,8 @@ impl StreamState {
}
}
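The three-state protocol described in the `StreamState` docs can be sketched with a self-contained toy (the generic enum and the `drain` helper below are illustrative stand-ins, not arrow2's actual types):

```rust
// A live stream yields Some(StreamState::Some(item)) when data is ready,
// Some(StreamState::Waiting) when it is live but currently empty, and the
// iterator returning None signals that the stream is finished.
#[derive(Debug, PartialEq)]
enum StreamState<T> {
    /// A live stream without data
    Waiting,
    /// Next item in the stream
    Some(T),
}

// Consume a stream to completion, collecting the data items and counting
// how often the stream was live but empty.
fn drain<T, I: Iterator<Item = StreamState<T>>>(stream: I) -> (Vec<T>, usize) {
    let mut items = Vec::new();
    let mut waits = 0;
    for state in stream {
        match state {
            StreamState::Some(item) => items.push(item),
            StreamState::Waiting => waits += 1, // live stream, no data yet
        }
    }
    (items, waits)
}

fn main() {
    let stream = vec![
        StreamState::Some(1),
        StreamState::Waiting, // still live, just empty right now
        StreamState::Some(2),
    ]
    .into_iter(); // the iterator running out plays the role of `None`
    let (items, waits) = drain(stream);
    assert_eq!(items, vec![1, 2]);
    assert_eq!(waits, 1);
}
```

In a real consumer the `Waiting` arm would typically back off or poll again rather than just count, since more data may arrive later.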

/// Reads the next item
/// Reads the next item, yielding `None` if the stream is done,
/// and a `StreamState` otherwise.
pub fn read_next<R: Read>(
reader: &mut R,
metadata: &StreamMetadata,
@@ -191,7 +208,15 @@ pub fn read_next<R: Read>(
}
}

/// Arrow Stream reader
/// Arrow Stream reader.
///
/// Once a stream is created the recommended way to interact with it is by
/// iterating over its data. Each such iteration yields an [`Option<StreamState>`](StreamState),
/// signaling that the stream might not currently have any data, even though it's still a "live"
/// stream.
///
/// For a full usage example, consult [this example](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)
/// in the main repository.
pub struct StreamReader<R: Read> {
reader: R,
metadata: StreamMetadata,
4 changes: 4 additions & 0 deletions src/io/ipc/write/stream.rs
@@ -32,6 +32,10 @@ use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

/// Arrow stream writer.
///
/// The data written by this writer must be read in order. To signal that no more
/// data is arriving through the stream, call [`self.finish()`](StreamWriter::finish).
pub struct StreamWriter<W: Write> {
/// The object to write to
writer: BufWriter<W>,
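The finish-to-terminate pattern can be sketched with a toy writer over any `Write` sink. The `ToyStreamWriter` type and its length-prefixed framing below are illustrative, not arrow2's implementation; the 8-byte terminator (0xFFFFFFFF continuation followed by a zero length) mirrors the Arrow IPC stream format's end-of-stream marker, but treat the exact bytes as an assumption here:

```rust
use std::io::{self, Write};

// A writer that streams length-prefixed payloads; `finish` emits an explicit
// end-of-stream marker so the reader knows no more data is coming.
struct ToyStreamWriter<W: Write> {
    writer: W,
}

impl<W: Write> ToyStreamWriter<W> {
    fn new(writer: W) -> Self {
        Self { writer }
    }

    // Write one message: a little-endian u32 length followed by the payload.
    fn write(&mut self, payload: &[u8]) -> io::Result<()> {
        self.writer.write_all(&(payload.len() as u32).to_le_bytes())?;
        self.writer.write_all(payload)
    }

    // Continuation marker + zero length: "no more messages".
    fn finish(&mut self) -> io::Result<()> {
        self.writer.write_all(&0xFFFF_FFFFu32.to_le_bytes())?;
        self.writer.write_all(&0u32.to_le_bytes())
    }
}

fn main() {
    let mut buf = Vec::new();
    let mut writer = ToyStreamWriter::new(&mut buf);
    writer.write(b"abc").unwrap();
    writer.finish().unwrap();
    // 4-byte length prefix, 3 payload bytes, 8-byte end-of-stream marker.
    assert_eq!(buf.len(), 4 + 3 + 8);
    assert_eq!(&buf[7..], &[0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0]);
}
```

Without the explicit terminator a stream reader cannot distinguish "finished" from "live but waiting", which is exactly the ambiguity `StreamState::Waiting` exists to express on the reading side.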
1 change: 1 addition & 0 deletions src/io/ipc/write/writer.rs
@@ -37,6 +37,7 @@ use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

/// Arrow file writer.
pub struct FileWriter<W: Write> {
/// The object to write to
writer: W,
Expand Down