From b8d608719950a6d5f8940877ddffa3110d2a84b4 Mon Sep 17 00:00:00 2001 From: HagaiHargil Date: Sat, 16 Oct 2021 20:42:48 +0300 Subject: [PATCH 1/8] Main mod docs --- src/io/ipc/mod.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index f01ae010340..18ba3c16121 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -1,4 +1,38 @@ //! APIs to read from and write to Arrow's IPC format. +//! +//! Inter-process communication is a method through which different processes +//! share and pass data between them. Its use-cases include parallel +//! processing of chunks of data across different CPU cores, transferring +//! data between different Apache Arrow implementations in other languages and +//! more. Under the hood Apache Arrow uses [FlatBuffers](https://google.github.io/flatbuffers/) +//! as its binary protocol, so every Arrow-centered streaming or serialization +//! problem that could be solved using FlatBuffers could probably be solved +//! using the more integrated approach that is exposed in this module. +//! +//! [Arrow's IPC protocol](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc) +//! allows only [`RecordBatch`](crate::record_batch::RecordBatch)es to be passed +//! around due to its reliance on a pre-defined data scheme. This limitation +//! provides a large performance gain because serialized data will always have a +//! known structutre, i.e. the same fields and datatypes, with the only variance +//! being the number of rows and the actual data inside the RecordBatch. This dramatically +//! increases the deserialization rate, as the bytes in the file or stream are already +//! structured "correctly". +//! +//! Reading and writing IPC messages is done using one of two variants - either +//! [`FileReader`](read::FileReader) <-> [`FileWriter`](struct@write::FileWriter) or +//! 
[`StreamReader`](read::StreamReader) <-> [`StreamWriter`](struct@write::StreamWriter). +//! These two variants wrap a type `T` that implements [`Read`](std::io::Read), and in +//! the case of the `File` variant it also implements [`Seek`](std::io::Seek). In +//! practice it means that `File`s can be arbitrarily accessed while `Stream`s are only +//! read in certain order - the one they were written in (first in, first out). +//! +//! For further information and examples please consult the +//! [user guide](https://jorgecarleitao.github.io/arrow2/io/index.html). +//! For more examples check the [read](mod@read) and [write](mod@write) modules +//! or look at the `examples` folder in the main repository +//! ([1](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_read.rs), +//! [2](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_write.rs), +//! [3](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)). #![allow(missing_debug_implementations)] #![allow(non_camel_case_types)] From e5ee6e9db108bf98a1467a83d2f37855674062ce Mon Sep 17 00:00:00 2001 From: HagaiHargil Date: Sat, 16 Oct 2021 21:55:50 +0300 Subject: [PATCH 2/8] More detailed docs --- src/io/ipc/mod.rs | 50 +++++++++++++++++++++++++++++++++++--- src/io/ipc/read/mod.rs | 6 +++++ src/io/ipc/read/stream.rs | 29 ++++++++++++++++++++-- src/io/ipc/write/stream.rs | 1 + src/io/ipc/write/writer.rs | 1 + 5 files changed, 81 insertions(+), 6 deletions(-) diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index 18ba3c16121..dfcc5c4f57c 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -10,11 +10,12 @@ //! using the more integrated approach that is exposed in this module. //! //! [Arrow's IPC protocol](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc) -//! allows only [`RecordBatch`](crate::record_batch::RecordBatch)es to be passed +//! allows only [`RecordBatch`](crate::record_batch::RecordBatch)es or +//! 
[`DictionaryBatch`](gen::Message::DictionaryBatch) to be passed //! around due to its reliance on a pre-defined data scheme. This limitation //! provides a large performance gain because serialized data will always have a //! known structutre, i.e. the same fields and datatypes, with the only variance -//! being the number of rows and the actual data inside the RecordBatch. This dramatically +//! being the number of rows and the actual data inside the Batch. This dramatically //! increases the deserialization rate, as the bytes in the file or stream are already //! structured "correctly". //! @@ -26,10 +27,51 @@ //! practice it means that `File`s can be arbitrarily accessed while `Stream`s are only //! read in certain order - the one they were written in (first in, first out). //! +//! # Examples +//! Read and write to a file: +//! ``` +//! use arrow2::io::ipc::{{read::{FileReader, read_file_metadata}}, {write::FileWriter}}; +//! # use std::fs::File; +//! # use std::sync::Arc; +//! # use arrow2::datatypes::{Field, Schema, DataType}; +//! # use arrow2::array::Int32Array; +//! # use arrow2::record_batch::RecordBatch; +//! // Setup the writer +//! let path = "/tmp/example.dat".to_string(); +//! let mut file = File::create(&path).unwrap(); +//! let x_coord = Field::new("x", DataType::Int32, false); +//! let y_coord = Field::new("y", DataType::Int32, false); +//! let schema = Schema::new(vec![x_coord, y_coord]); +//! let mut writer = FileWriter::try_new(file, &schema).unwrap(); +//! +//! // Setup the data +//! let x_data = Int32Array::from_slice([-1i32, 1]); +//! let y_data = Int32Array::from_slice([1i32, -1]); +//! let batch = RecordBatch::try_new( +//! Arc::new(schema), +//! vec![Arc::new(x_data), Arc::new(y_data)] +//! ).unwrap(); +//! +//! // Write the messages and finalize the stream +//! for _ in 0..5 { +//! writer.write(&batch); +//! } +//! writer.finish(); +//! +//! // Fetch some of the data and get the reader back +//! 
let mut reader = File::open(&path).unwrap(); +//! let metadata = read_file_metadata(&mut reader).unwrap(); +//! let mut filereader = FileReader::new(reader, metadata, None); +//! let row1 = filereader.next().unwrap(); // [[-1, 1], [1, -1]] +//! let row2 = filereader.next().unwrap(); // [[-1, 1], [1, -1]] +//! let mut reader = filereader.into_inner(); +//! // Do more stuff with the reader, like seeking ahead. +//! +//! ``` +//! //! For further information and examples please consult the //! [user guide](https://jorgecarleitao.github.io/arrow2/io/index.html). -//! For more examples check the [read](mod@read) and [write](mod@write) modules -//! or look at the `examples` folder in the main repository +//! For even more examples check the `examples` folder in the main repository //! ([1](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_read.rs), //! [2](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_write.rs), //! [3](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)). diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index aec51e1a8fd..adc2b41e477 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -1,4 +1,10 @@ //! APIs to read Arrow's IPC format. +//! +//! The two important structs here are the [`FileReader`](reader::FileReader), +//! which provides arbitrary access to any of its messages, and the +//! [`StreamReader`](stream::StreamReader), which only supports reading +//! data in the order it was written in. + mod array; mod common; mod deserialize; diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index ed7f5955dfe..d54b61b9605 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -76,12 +76,28 @@ pub fn read_stream_metadata(reader: &mut R) -> Result { }) } +/// Encodes the stream's status after each read. +/// +/// A stream is an iterator, and an iterator returns `Option`. 
The `Item` +/// type in the [`StreamReader`] case is `StreamState`, which means that an Arrow +/// stream may yield one of three values: (1) `None`, which signals that the stream +/// is done; (2) `Some(StreamState::Some(RecordBatch))`, which signals that there was +/// data waiting in the stream and we read it; and finally (3) +/// `Some(StreamState::Waiting)`, which means that the stream is still "live", it +/// just doesn't hold any data right now. pub enum StreamState { + /// A live stream without data Waiting, + /// Next item in the stream Some(RecordBatch), } impl StreamState { + /// Return the data inside this wrapper. + /// + /// # Panics + /// + /// If the `StreamState` was `Waiting`. pub fn unwrap(self) -> RecordBatch { if let StreamState::Some(batch) = self { batch @@ -91,7 +107,8 @@ impl StreamState { } } -/// Reads the next item +/// Reads the next item, yielding `None` if the stream is done, +/// and a `StreamState` otherwise. pub fn read_next( reader: &mut R, metadata: &StreamMetadata, @@ -191,7 +208,15 @@ pub fn read_next( } } -/// Arrow Stream reader +/// Arrow Stream reader. +/// +/// Once a stream is created the recommended way to interact with it is by +/// iterating over its data. Each such iteration yields an [`Option`](StreamState), +/// signaling that the stream might not currently have any data, even though it's still a "live" +/// stream. +/// +/// For a full usage examples consult [this](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow) +/// example in the main repository. 
pub struct StreamReader { reader: R, metadata: StreamMetadata, diff --git a/src/io/ipc/write/stream.rs b/src/io/ipc/write/stream.rs index 5a52341b092..854a49946f0 100644 --- a/src/io/ipc/write/stream.rs +++ b/src/io/ipc/write/stream.rs @@ -32,6 +32,7 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; +/// Arrow stream writer pub struct StreamWriter { /// The object to write to writer: BufWriter, diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index 76811dc12a1..81a62491f2d 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -37,6 +37,7 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; +/// Arrow file writer pub struct FileWriter { /// The object to write to writer: W, From 3f579bd9d145dc17a741106784265fa8d6ac368b Mon Sep 17 00:00:00 2001 From: HagaiHargil Date: Sat, 16 Oct 2021 21:59:15 +0300 Subject: [PATCH 3/8] StreamWriter addition --- src/io/ipc/write/stream.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/io/ipc/write/stream.rs b/src/io/ipc/write/stream.rs index 854a49946f0..49265914e64 100644 --- a/src/io/ipc/write/stream.rs +++ b/src/io/ipc/write/stream.rs @@ -33,6 +33,9 @@ use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; /// Arrow stream writer +/// +/// The data written by this writer must be read in order. 
To signal that no more +/// data is arriving through the stream call [`self.finish()`](StreamWriter::finish); pub struct StreamWriter { /// The object to write to writer: BufWriter, From e6d825bda34950ea5327fca7c0d248141ccfaeca Mon Sep 17 00:00:00 2001 From: HagaiHargil Date: Sat, 16 Oct 2021 23:05:13 +0300 Subject: [PATCH 4/8] Fix windows path --- src/io/ipc/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index dfcc5c4f57c..fa43470969b 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -37,7 +37,7 @@ //! # use arrow2::array::Int32Array; //! # use arrow2::record_batch::RecordBatch; //! // Setup the writer -//! let path = "/tmp/example.dat".to_string(); +//! let path = "example.dat".to_string(); //! let mut file = File::create(&path).unwrap(); //! let x_coord = Field::new("x", DataType::Int32, false); //! let y_coord = Field::new("y", DataType::Int32, false); From b836f9cc0f5ea8ee754ae7c07e6045f262e6e898 Mon Sep 17 00:00:00 2001 From: Hagai Har-Gil Date: Sun, 17 Oct 2021 08:31:12 +0300 Subject: [PATCH 5/8] Update src/io/ipc/mod.rs Co-authored-by: Jorge Leitao --- src/io/ipc/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index fa43470969b..d0348c1354d 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -37,7 +37,7 @@ //! # use arrow2::array::Int32Array; //! # use arrow2::record_batch::RecordBatch; //! // Setup the writer -//! let path = "example.dat".to_string(); +//! let path = "example.arrow".to_string(); //! let mut file = File::create(&path).unwrap(); //! let x_coord = Field::new("x", DataType::Int32, false); //! 
let y_coord = Field::new("y", DataType::Int32, false); From 7a142a732d48cd2072dfe56e123597a3fc75ba7f Mon Sep 17 00:00:00 2001 From: Hagai Har-Gil Date: Sun, 17 Oct 2021 08:32:25 +0300 Subject: [PATCH 6/8] Update src/io/ipc/read/stream.rs Co-authored-by: Jorge Leitao --- src/io/ipc/read/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index d54b61b9605..75ecfc3d110 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -108,7 +108,7 @@ impl StreamState { } /// Reads the next item, yielding `None` if the stream is done, -/// and a `StreamState` otherwise. +/// and a [`StreamState`] otherwise. pub fn read_next( reader: &mut R, metadata: &StreamMetadata, From eb51cbc3028533204ebb538be607516b369f0a29 Mon Sep 17 00:00:00 2001 From: Hagai Har-Gil Date: Sun, 17 Oct 2021 08:33:47 +0300 Subject: [PATCH 7/8] Update src/io/ipc/read/stream.rs Co-authored-by: Jorge Leitao --- src/io/ipc/read/stream.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index 75ecfc3d110..c83e6c4b2f9 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -210,10 +210,8 @@ pub fn read_next( /// Arrow Stream reader. /// -/// Once a stream is created the recommended way to interact with it is by -/// iterating over its data. Each such iteration yields an [`Option`](StreamState), -/// signaling that the stream might not currently have any data, even though it's still a "live" -/// stream. +/// An [`Iterator`] over an Arrow stream that yields a result of [`StreamState`]s. +/// This is the recommended way to read an arrow stream (by iterating over its data). /// /// For a full usage examples consult [this](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow) /// example in the main repository. 
From f373afa6964d768dcadcd303ad6ae5c8a867d490 Mon Sep 17 00:00:00 2001 From: HagaiHargil Date: Sun, 17 Oct 2021 08:41:55 +0300 Subject: [PATCH 8/8] No unwraps and other fixes --- src/io/ipc/mod.rs | 13 +++++++------ src/io/ipc/read/stream.rs | 3 +-- src/io/ipc/write/stream.rs | 2 ++ 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index d0348c1354d..1e6e4d603fc 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -36,13 +36,14 @@ //! # use arrow2::datatypes::{Field, Schema, DataType}; //! # use arrow2::array::Int32Array; //! # use arrow2::record_batch::RecordBatch; +//! # use arrow2::error::ArrowError; //! // Setup the writer //! let path = "example.arrow".to_string(); -//! let mut file = File::create(&path).unwrap(); +//! let mut file = File::create(&path)?; //! let x_coord = Field::new("x", DataType::Int32, false); //! let y_coord = Field::new("y", DataType::Int32, false); //! let schema = Schema::new(vec![x_coord, y_coord]); -//! let mut writer = FileWriter::try_new(file, &schema).unwrap(); +//! let mut writer = FileWriter::try_new(file, &schema)?; //! //! // Setup the data //! let x_data = Int32Array::from_slice([-1i32, 1]); @@ -50,7 +51,7 @@ //! let batch = RecordBatch::try_new( //! Arc::new(schema), //! vec![Arc::new(x_data), Arc::new(y_data)] -//! ).unwrap(); +//! )?; //! //! // Write the messages and finalize the stream //! for _ in 0..5 { @@ -59,14 +60,14 @@ //! writer.finish(); //! //! // Fetch some of the data and get the reader back -//! let mut reader = File::open(&path).unwrap(); -//! let metadata = read_file_metadata(&mut reader).unwrap(); +//! let mut reader = File::open(&path)?; +//! let metadata = read_file_metadata(&mut reader)?; //! let mut filereader = FileReader::new(reader, metadata, None); //! let row1 = filereader.next().unwrap(); // [[-1, 1], [1, -1]] //! let row2 = filereader.next().unwrap(); // [[-1, 1], [1, -1]] //! let mut reader = filereader.into_inner(); //! 
// Do more stuff with the reader, like seeking ahead. -//! +//! # Ok::<(), ArrowError>(()) //! ``` //! //! For further information and examples please consult the diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index c83e6c4b2f9..e93d79d313a 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -213,8 +213,7 @@ pub fn read_next( /// An [`Iterator`] over an Arrow stream that yields a result of [`StreamState`]s. /// This is the recommended way to read an arrow stream (by iterating over its data). /// -/// For a full usage examples consult [this](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow) -/// example in the main repository. +/// For a more thorough walkthrough consult [this example](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow). pub struct StreamReader { reader: R, metadata: StreamMetadata, diff --git a/src/io/ipc/write/stream.rs b/src/io/ipc/write/stream.rs index 49265914e64..7c86d7e3979 100644 --- a/src/io/ipc/write/stream.rs +++ b/src/io/ipc/write/stream.rs @@ -36,6 +36,8 @@ use crate::record_batch::RecordBatch; /// /// The data written by this writer must be read in order. To signal that no more /// data is arriving through the stream call [`self.finish()`](StreamWriter::finish); +/// +/// For a usage walkthrough consult [this example](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow). pub struct StreamWriter { /// The object to write to writer: BufWriter,