Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Improved example.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 23, 2021
1 parent cfe4329 commit c838e40
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
18 changes: 13 additions & 5 deletions examples/ipc_pyarrow/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import pyarrow as pa
from time import sleep
import socket

# Set up the data exchange socket
sk = socket.socket()
sk.bind(("127.0.0.1", 12989))
sk.listen()

data = [
pa.array([1, 2, 3, 4]),
Expand All @@ -9,8 +14,11 @@
]

batch = pa.record_batch(data, names=["f0", "f1", "f2"])
writer = pa.ipc.new_stream("data.arrows", batch.schema)
while True:
for _ in range(10):
writer.write(batch)
sleep(1)

# Accept incoming connection and stream the data away
connection, address = sk.accept()
dummy_socket_file = connection.makefile("wb")
with pa.RecordBatchStreamWriter(dummy_socket_file, batch.schema) as writer:
for i in range(50):
writer.write_batch(batch)
sleep(1)
9 changes: 5 additions & 4 deletions examples/ipc_pyarrow/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fs::File;
use std::io::BufReader;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

Expand All @@ -9,7 +8,9 @@ use arrow2::error::Result;
use arrow2::io::ipc::read;

fn main() -> Result<()> {
let mut reader = File::open("data.arrows")?;
const ADDRESS: &str = "127.0.0.1:12989";

let mut reader = TcpStream::connect(ADDRESS)?;
let metadata = read::read_stream_metadata(&mut reader)?;
let mut stream = read::StreamReader::new(&mut reader, metadata);

Expand All @@ -21,7 +22,7 @@ fn main() -> Result<()> {
idx += 1;
println!("batch: {:?}", idx)
}
Ok(read::StreamState::Waiting) => thread::sleep(Duration::from_millis(4000)),
Ok(read::StreamState::Waiting) => thread::sleep(Duration::from_millis(2000)),
Err(l) => println!("{:?} ({})", l, idx),
},
None => break,
Expand Down

0 comments on commit c838e40

Please sign in to comment.