Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added example.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 19, 2021
1 parent f6775a6 commit 96432ac
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 10 deletions.
1 change: 1 addition & 0 deletions examples/ipc_pyarrow/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
data.arrows
7 changes: 7 additions & 0 deletions examples/ipc_pyarrow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[package]
name = "ipc_stream"
version = "0.1.0"
edition = "2018"

[dependencies]
arrow2 = { path = "../../", default-features = false, features = ["io_ipc"] }
16 changes: 16 additions & 0 deletions examples/ipc_pyarrow/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Producer side of the Arrow IPC stream example: builds one record batch and
# keeps appending it to the stream file "data.arrows" so that a reader can
# consume the stream while it is still being written.
import pyarrow as pa
from time import sleep

# One array per column: integers, strings (with a null) and booleans (with a null).
columns = [
    pa.array([1, 2, 3, 4]),
    pa.array(["foo", "bar", "baz", None]),
    pa.array([True, None, False, True]),
]
column_names = ["f0", "f1", "f2"]

batch = pa.record_batch(columns, names=column_names)

# The stream is opened with the schema of the batch that is written repeatedly.
writer = pa.ipc.new_stream("data.arrows", batch.schema)

# Write bursts of ten batches, pausing one second between bursts.
# This loop never ends on its own; the process has to be stopped externally.
batches_per_burst = 10
while True:
    written = 0
    while written < batches_per_burst:
        writer.write(batch)
        written += 1
    sleep(1)
7 changes: 7 additions & 0 deletions examples/ipc_pyarrow/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Start the pyarrow producer in the background and remember its PID.
python main.py &
PRODUCER_PID=$!

# Stop the producer whenever this script exits, not only when the last line is
# reached: otherwise an interrupted `cargo run` leaves the producer running.
# INT/TERM are turned into a normal exit so that the EXIT trap also runs then.
trap 'kill "$PRODUCER_PID" 2>/dev/null' EXIT
trap 'exit 130' INT
trap 'exit 143' TERM

sleep 1 # wait for metadata to be available.

# Run the Rust consumer; as the last command, its status becomes the script's
# exit status (previously the status of `kill` was reported instead).
cargo run
32 changes: 32 additions & 0 deletions examples/ipc_pyarrow/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::fs::File;
use std::io::BufReader;
use std::thread;
use std::time::Duration;

use arrow2::array::{Array, Int64Array};
use arrow2::datatypes::DataType;
use arrow2::error::Result;
use arrow2::io::ipc::read;

/// Reads the Arrow IPC stream stored in `data.arrows` and prints a running
/// count of the record batches received so far.
///
/// While the stream reports that no complete message is available yet, the
/// reader sleeps for four seconds before polling again.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, if the stream metadata
/// cannot be read, or as soon as reading a message fails. Previously a read
/// error was only printed and the stream was polled again immediately, so a
/// persistent error made the loop spin without ever terminating.
fn main() -> Result<()> {
    let mut reader = File::open("data.arrows")?;
    let metadata = read::read_stream_metadata(&mut reader)?;
    let stream = read::StreamReader::new(&mut reader, metadata);

    let mut idx = 0;
    // The reader is an iterator that ends once the stream is finished.
    for state in stream {
        match state? {
            // A complete record batch arrived; only its ordinal is reported.
            read::StreamState::Some(_) => {
                idx += 1;
                println!("batch: {:?}", idx)
            }
            // Nothing to read yet: give the producer time to write more data.
            read::StreamState::Waiting => thread::sleep(Duration::from_millis(4000)),
        }
    }

    Ok(())
}
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
- [Read Parquet](./io/parquet_read.md)
- [Write Parquet](./io/parquet_write.md)
- [Read Arrow](./io/ipc_read.md)
- [Read Arrow stream](./io/ipc_stream_read.md)
- [Write Arrow](./io/ipc_write.md)
21 changes: 21 additions & 0 deletions guide/src/io/ipc_stream_read.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Read Arrow streams

When compiled with feature `io_ipc`, this crate can be used to read Arrow streams.

The example below shows how to read from a stream:

```rust
{{#include ../../../examples/ipc_pyarrow/src/main.rs}}
```

The stream can be produced by another process while it is being read, e.g. written by pyarrow:

```python,ignore
{{#include ../../../examples/ipc_pyarrow/main.py}}
```

Both programs can be run together via:

```bash,ignore
{{#include ../../../examples/ipc_pyarrow/run.sh}}
```
2 changes: 1 addition & 1 deletion src/io/ipc/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ mod stream;

pub use common::{read_dictionary, read_record_batch};
pub use reader::{read_file_metadata, FileMetadata, FileReader};
pub use stream::{read_stream_metadata, StreamMetadata, StreamReader};
pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState};
18 changes: 9 additions & 9 deletions src/io/ipc/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ pub fn read_stream_metadata<R: Read>(reader: &mut R) -> Result<StreamMetadata> {
})
}

pub enum State {
pub enum StreamState {
Waiting,
Some(RecordBatch),
}

impl State {
impl StreamState {
pub fn unwrap(self) -> RecordBatch {
if let State::Some(batch) = self {
if let StreamState::Some(batch) = self {
batch
} else {
panic!("The batch is not available")
Expand All @@ -96,7 +96,7 @@ pub fn read_next<R: Read>(
reader: &mut R,
metadata: &StreamMetadata,
dictionaries_by_field: &mut Vec<Option<ArrayRef>>,
) -> Result<Option<State>> {
) -> Result<Option<StreamState>> {
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];

Expand All @@ -107,7 +107,7 @@ pub fn read_next<R: Read>(
// Handle EOF without the "0xFFFFFFFF 0x00000000"
// valid according to:
// https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
Ok(Some(State::Waiting))
Ok(Some(StreamState::Waiting))
} else {
Err(ArrowError::from(e))
};
Expand Down Expand Up @@ -159,7 +159,7 @@ pub fn read_next<R: Read>(
&mut reader,
0,
)
.map(|x| Some(State::Some(x)))
.map(|x| Some(StreamState::Some(x)))
}
gen::Message::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().ok_or_else(|| {
Expand All @@ -183,7 +183,7 @@ pub fn read_next<R: Read>(
// read the next message until we encounter a RecordBatch
read_next(reader, metadata, dictionaries_by_field)
}
gen::Message::MessageHeader::NONE => Ok(Some(State::Waiting)),
gen::Message::MessageHeader::NONE => Ok(Some(StreamState::Waiting)),
t => Err(ArrowError::Ipc(format!(
"Reading types other than record batches not yet supported, unable to read {:?} ",
t
Expand Down Expand Up @@ -225,7 +225,7 @@ impl<R: Read> StreamReader<R> {
self.finished
}

fn maybe_next(&mut self) -> Result<Option<State>> {
fn maybe_next(&mut self) -> Result<Option<StreamState>> {
if self.finished {
return Ok(None);
}
Expand All @@ -242,7 +242,7 @@ impl<R: Read> StreamReader<R> {
}

impl<R: Read> Iterator for StreamReader<R> {
type Item = Result<State>;
type Item = Result<StreamState>;

fn next(&mut self) -> Option<Self::Item> {
self.maybe_next().transpose()
Expand Down

0 comments on commit 96432ac

Please sign in to comment.