Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added more IPC documentation (#534)
Browse files Browse the repository at this point in the history
  • Loading branch information
HagaiHargil authored Oct 17, 2021
1 parent ee68d18 commit 60ce40c
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 2 deletions.
77 changes: 77 additions & 0 deletions src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,81 @@
//! APIs to read from and write to Arrow's IPC format.
//!
//! Inter-process communication is a method through which different processes
//! share and pass data between them. Its use-cases include parallel
//! processing of chunks of data across different CPU cores, transferring
//! data between different Apache Arrow implementations in other languages and
//! more. Under the hood Apache Arrow uses [FlatBuffers](https://google.github.io/flatbuffers/)
//! as its binary protocol, so every Arrow-centered streaming or serialiation
//! problem that could be solved using FlatBuffers could probably be solved
//! using the more integrated approach that is exposed in this module.
//!
//! [Arrow's IPC protocol](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc)
//! allows only [`RecordBatch`](crate::record_batch::RecordBatch)es or
//! [`DictionaryBatch`](gen::Message::DictionaryBatch) to be passed
//! around due to its reliance on a pre-defined data scheme. This limitation
//! provides a large performance gain because serialized data will always have a
//! known structutre, i.e. the same fields and datatypes, with the only variance
//! being the number of rows and the actual data inside the Batch. This dramatically
//! increases the deserialization rate, as the bytes in the file or stream are already
//! structured "correctly".
//!
//! Reading and writing IPC messages is done using one of two variants - either
//! [`FileReader`](read::FileReader) <-> [`FileWriter`](struct@write::FileWriter) or
//! [`StreamReader`](read::StreamReader) <-> [`StreamWriter`](struct@write::StreamWriter).
//! These two variants wrap a type `T` that implements [`Read`](std::io::Read), and in
//! the case of the `File` variant it also implements [`Seek`](std::io::Seek). In
//! practice it means that `File`s can be arbitrarily accessed while `Stream`s are only
//! read in certain order - the one they were written in (first in, first out).
//!
//! # Examples
//! Read and write to a file:
//! ```
//! use arrow2::io::ipc::{{read::{FileReader, read_file_metadata}}, {write::FileWriter}};
//! # use std::fs::File;
//! # use std::sync::Arc;
//! # use arrow2::datatypes::{Field, Schema, DataType};
//! # use arrow2::array::Int32Array;
//! # use arrow2::record_batch::RecordBatch;
//! # use arrow2::error::ArrowError;
//! // Setup the writer
//! let path = "example.arrow".to_string();
//! let mut file = File::create(&path)?;
//! let x_coord = Field::new("x", DataType::Int32, false);
//! let y_coord = Field::new("y", DataType::Int32, false);
//! let schema = Schema::new(vec![x_coord, y_coord]);
//! let mut writer = FileWriter::try_new(file, &schema)?;
//!
//! // Setup the data
//! let x_data = Int32Array::from_slice([-1i32, 1]);
//! let y_data = Int32Array::from_slice([1i32, -1]);
//! let batch = RecordBatch::try_new(
//! Arc::new(schema),
//! vec![Arc::new(x_data), Arc::new(y_data)]
//! )?;
//!
//! // Write the messages and finalize the stream
//! for _ in 0..5 {
//! writer.write(&batch);
//! }
//! writer.finish();
//!
//! // Fetch some of the data and get the reader back
//! let mut reader = File::open(&path)?;
//! let metadata = read_file_metadata(&mut reader)?;
//! let mut filereader = FileReader::new(reader, metadata, None);
//! let row1 = filereader.next().unwrap(); // [[-1, 1], [1, -1]]
//! let row2 = filereader.next().unwrap(); // [[-1, 1], [1, -1]]
//! let mut reader = filereader.into_inner();
//! // Do more stuff with the reader, like seeking ahead.
//! # Ok::<(), ArrowError>(())
//! ```
//!
//! For further information and examples please consult the
//! [user guide](https://jorgecarleitao.github.io/arrow2/io/index.html).
//! For even more examples check the `examples` folder in the main repository
//! ([1](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_read.rs),
//! [2](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_write.rs),
//! [3](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)).
#![allow(missing_debug_implementations)]
#![allow(non_camel_case_types)]
Expand Down
6 changes: 6 additions & 0 deletions src/io/ipc/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
//! APIs to read Arrow's IPC format.
//!
//! The two important structs here are the [`FileReader`](reader::FileReader),
//! which provides arbitrary access to any of its messages, and the
//! [`StreamReader`](stream::StreamReader), which only supports reading
//! data in the order it was written in.
mod array;
mod common;
mod deserialize;
Expand Down
26 changes: 24 additions & 2 deletions src/io/ipc/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,28 @@ pub fn read_stream_metadata<R: Read>(reader: &mut R) -> Result<StreamMetadata> {
})
}

/// Encodes the stream's status after each read.
///
/// A stream is an iterator, and an iterator returns `Option<Item>`. The `Item`
/// type in the [`StreamReader`] case is `StreamState`, which means that an Arrow
/// stream may yield one of three values: (1) `None`, which signals that the stream
/// is done; (2) `Some(StreamState::Some(RecordBatch))`, which signals that there was
/// data waiting in the stream and we read it; and finally (3)
/// `Some(StreamState::Waiting)`, which means that the stream is still "live", it
/// just doesn't hold any data right now.
pub enum StreamState {
/// A live stream without data
Waiting,
/// Next item in the stream
Some(RecordBatch),
}

impl StreamState {
/// Return the data inside this wrapper.
///
/// # Panics
///
/// If the `StreamState` was `Waiting`.
pub fn unwrap(self) -> RecordBatch {
if let StreamState::Some(batch) = self {
batch
Expand All @@ -91,7 +107,8 @@ impl StreamState {
}
}

/// Reads the next item
/// Reads the next item, yielding `None` if the stream is done,
/// and a [`StreamState`] otherwise.
pub fn read_next<R: Read>(
reader: &mut R,
metadata: &StreamMetadata,
Expand Down Expand Up @@ -191,7 +208,12 @@ pub fn read_next<R: Read>(
}
}

/// Arrow Stream reader
/// Arrow Stream reader.
///
/// An [`Iterator`] over an Arrow stream that yields a result of [`StreamState`]s.
/// This is the recommended way to read an arrow stream (by iterating over its data).
///
/// For a more thorough walkthrough consult [this example](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow).
pub struct StreamReader<R: Read> {
reader: R,
metadata: StreamMetadata,
Expand Down
6 changes: 6 additions & 0 deletions src/io/ipc/write/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

/// Arrow stream writer
///
/// The data written by this writer must be read in order. To signal that no more
/// data is arriving through the stream call [`self.finish()`](StreamWriter::finish);
///
/// For a usage walkthrough consult [this example](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow).
pub struct StreamWriter<W: Write> {
/// The object to write to
writer: BufWriter<W>,
Expand Down
1 change: 1 addition & 0 deletions src/io/ipc/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

/// Arrow file writer
pub struct FileWriter<W: Write> {
/// The object to write to
writer: W,
Expand Down

0 comments on commit 60ce40c

Please sign in to comment.