diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index f01ae010340..1e6e4d603fc 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -1,4 +1,81 @@ //! APIs to read from and write to Arrow's IPC format. +//! +//! Inter-process communication is a method through which different processes +//! share and pass data between them. Its use-cases include parallel +//! processing of chunks of data across different CPU cores, transferring +//! data between different Apache Arrow implementations in other languages and +//! more. Under the hood Apache Arrow uses [FlatBuffers](https://google.github.io/flatbuffers/) +//! as its binary protocol, so every Arrow-centered streaming or serialization +//! problem that could be solved using FlatBuffers could probably be solved +//! using the more integrated approach that is exposed in this module. +//! +//! [Arrow's IPC protocol](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc) +//! allows only [`RecordBatch`](crate::record_batch::RecordBatch)es or +//! [`DictionaryBatch`](gen::Message::DictionaryBatch) to be passed +//! around due to its reliance on a pre-defined data scheme. This limitation +//! provides a large performance gain because serialized data will always have a +//! known structure, i.e. the same fields and datatypes, with the only variance +//! being the number of rows and the actual data inside the Batch. This dramatically +//! increases the deserialization rate, as the bytes in the file or stream are already +//! structured "correctly". +//! +//! Reading and writing IPC messages is done using one of two variants - either +//! [`FileReader`](read::FileReader) <-> [`FileWriter`](struct@write::FileWriter) or +//! [`StreamReader`](read::StreamReader) <-> [`StreamWriter`](struct@write::StreamWriter). +//! These two variants wrap a type `T` that implements [`Read`](std::io::Read), and in +//! the case of the `File` variant it also implements [`Seek`](std::io::Seek).
In +//! practice it means that `File`s can be arbitrarily accessed while `Stream`s are only +//! read in a certain order - the one they were written in (first in, first out). +//! +//! # Examples +//! Read and write to a file: +//! ``` +//! use arrow2::io::ipc::{read::{FileReader, read_file_metadata}, write::FileWriter}; +//! # use std::fs::File; +//! # use std::sync::Arc; +//! # use arrow2::datatypes::{Field, Schema, DataType}; +//! # use arrow2::array::Int32Array; +//! # use arrow2::record_batch::RecordBatch; +//! # use arrow2::error::ArrowError; +//! // Setup the writer +//! let path = "example.arrow".to_string(); +//! let file = File::create(&path)?; +//! let x_coord = Field::new("x", DataType::Int32, false); +//! let y_coord = Field::new("y", DataType::Int32, false); +//! let schema = Schema::new(vec![x_coord, y_coord]); +//! let mut writer = FileWriter::try_new(file, &schema)?; +//! +//! // Setup the data +//! let x_data = Int32Array::from_slice([-1i32, 1]); +//! let y_data = Int32Array::from_slice([1i32, -1]); +//! let batch = RecordBatch::try_new( +//! Arc::new(schema), +//! vec![Arc::new(x_data), Arc::new(y_data)] +//! )?; +//! +//! // Write the messages and finalize the stream +//! for _ in 0..5 { +//! writer.write(&batch)?; +//! } +//! writer.finish()?; +//! +//! // Fetch some of the data and get the reader back +//! let mut reader = File::open(&path)?; +//! let metadata = read_file_metadata(&mut reader)?; +//! let mut filereader = FileReader::new(reader, metadata, None); +//! let row1 = filereader.next().unwrap(); // [[-1, 1], [1, -1]] +//! let row2 = filereader.next().unwrap(); // [[-1, 1], [1, -1]] +//! let mut reader = filereader.into_inner(); +//! // Do more stuff with the reader, like seeking ahead. +//! # Ok::<(), ArrowError>(()) +//! ``` +//! +//! For further information and examples please consult the +//! [user guide](https://jorgecarleitao.github.io/arrow2/io/index.html). +//!
For even more examples, check the `examples` folder in the main repository +//! ([1](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_read.rs), +//! [2](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_write.rs), +//! [3](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)). #![allow(missing_debug_implementations)] #![allow(non_camel_case_types)] diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index aec51e1a8fd..adc2b41e477 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -1,4 +1,10 @@ //! APIs to read Arrow's IPC format. +//! +//! The two important structs here are the [`FileReader`](reader::FileReader), +//! which provides arbitrary access to any of its messages, and the +//! [`StreamReader`](stream::StreamReader), which only supports reading +//! data in the order it was written in. + mod array; mod common; mod deserialize; diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index ed7f5955dfe..e93d79d313a 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -76,12 +76,28 @@ pub fn read_stream_metadata(reader: &mut R) -> Result { }) } +/// Encodes the stream's status after each read. +/// +/// A stream is an iterator, and an iterator returns `Option`. The `Item` +/// type in the [`StreamReader`] case is `StreamState`, which means that an Arrow +/// stream may yield one of three values: (1) `None`, which signals that the stream +/// is done; (2) `Some(StreamState::Some(RecordBatch))`, which signals that there was +/// data waiting in the stream and we read it; and finally (3) +/// `Some(StreamState::Waiting)`, which means that the stream is still "live", it +/// just doesn't hold any data right now. pub enum StreamState { + /// A live stream without data Waiting, + /// Next item in the stream Some(RecordBatch), } impl StreamState { + /// Returns the data inside this wrapper.
+ /// + /// # Panics + /// + /// If the `StreamState` is `Waiting`. pub fn unwrap(self) -> RecordBatch { if let StreamState::Some(batch) = self { batch @@ -91,7 +107,8 @@ impl StreamState { } } -/// Reads the next item +/// Reads the next item, yielding `None` if the stream is done, +/// and a [`StreamState`] otherwise. pub fn read_next( reader: &mut R, metadata: &StreamMetadata, @@ -191,7 +208,12 @@ pub fn read_next( } } -/// Arrow Stream reader +/// Arrow Stream reader. +/// +/// An [`Iterator`] over an Arrow stream that yields a result of [`StreamState`]s. +/// This is the recommended way to read an Arrow stream (by iterating over its data). +/// +/// For a more thorough walkthrough, consult [this example](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow). pub struct StreamReader { reader: R, metadata: StreamMetadata, diff --git a/src/io/ipc/write/stream.rs b/src/io/ipc/write/stream.rs index 5a52341b092..7c86d7e3979 100644 --- a/src/io/ipc/write/stream.rs +++ b/src/io/ipc/write/stream.rs @@ -32,6 +32,12 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; +/// Arrow stream writer. +/// +/// The data written by this writer must be read in order. To signal that no more +/// data is arriving through the stream, call [`self.finish()`](StreamWriter::finish). +/// +/// For a usage walkthrough, consult [this example](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow). pub struct StreamWriter { /// The object to write to writer: BufWriter, diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index 76811dc12a1..81a62491f2d 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -37,6 +37,7 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; +/// Arrow file writer. pub struct FileWriter { /// The object to write to writer: W,