diff --git a/examples/ipc_pyarrow/main.py b/examples/ipc_pyarrow/main.py index c59eb271f19..c3b1468dea8 100644 --- a/examples/ipc_pyarrow/main.py +++ b/examples/ipc_pyarrow/main.py @@ -1,6 +1,11 @@ import pyarrow as pa from time import sleep +import socket +# Set up the data exchange socket +sk = socket.socket() +sk.bind(("127.0.0.1", 12989)) +sk.listen() data = [ pa.array([1, 2, 3, 4]), @@ -9,8 +14,11 @@ ] batch = pa.record_batch(data, names=["f0", "f1", "f2"]) -writer = pa.ipc.new_stream("data.arrows", batch.schema) -while True: - for _ in range(10): - writer.write(batch) - sleep(1) + +# Accept incoming connection and stream the data away +connection, address = sk.accept() +dummy_socket_file = connection.makefile("wb") +with pa.RecordBatchStreamWriter(dummy_socket_file, batch.schema) as writer: + for i in range(50): + writer.write_batch(batch) + sleep(1) diff --git a/examples/ipc_pyarrow/src/main.rs b/examples/ipc_pyarrow/src/main.rs index 42e99d47442..ce92e4e1b21 100644 --- a/examples/ipc_pyarrow/src/main.rs +++ b/examples/ipc_pyarrow/src/main.rs @@ -1,5 +1,4 @@ -use std::fs::File; -use std::io::BufReader; +use std::net::TcpStream; use std::thread; use std::time::Duration; @@ -9,7 +8,9 @@ use arrow2::error::Result; use arrow2::io::ipc::read; fn main() -> Result<()> { - let mut reader = File::open("data.arrows")?; + const ADDRESS: &str = "127.0.0.1:12989"; + + let mut reader = TcpStream::connect(ADDRESS)?; let metadata = read::read_stream_metadata(&mut reader)?; let mut stream = read::StreamReader::new(&mut reader, metadata); @@ -21,7 +22,7 @@ fn main() -> Result<()> { idx += 1; println!("batch: {:?}", idx) } - Ok(read::StreamState::Waiting) => thread::sleep(Duration::from_millis(4000)), + Ok(read::StreamState::Waiting) => thread::sleep(Duration::from_millis(2000)), Err(l) => println!("{:?} ({})", l, idx), }, None => break,