From f5ac25b68e4be90d6136fc93315b8928aac4078b Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Sat, 12 Feb 2022 06:53:10 +0000
Subject: [PATCH] Added Async Arrow stream reading

---
 Cargo.toml                           |   2 +
 src/io/ipc/read/mod.rs               |   3 +
 src/io/ipc/read/schema.rs            |  30 +++-
 src/io/ipc/read/stream_async.rs      | 224 +++++++++++++++++++++++++++
 tests/it/io/ipc/mod.rs               |   3 +
 tests/it/io/ipc/read_stream_async.rs |  46 ++++++
 6 files changed, 307 insertions(+), 1 deletion(-)
 create mode 100644 src/io/ipc/read/stream_async.rs
 create mode 100644 tests/it/io/ipc/read_stream_async.rs

diff --git a/Cargo.toml b/Cargo.toml
index 9aa2b01eae0..895467ed86a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -111,6 +111,7 @@ full = [
     "io_ipc",
     "io_flight",
     "io_ipc_write_async",
+    "io_ipc_read_async",
     "io_ipc_compression",
     "io_json_integration",
     "io_print",
@@ -132,6 +133,7 @@ io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
 io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
 io_ipc = ["arrow-format"]
 io_ipc_write_async = ["io_ipc", "futures"]
+io_ipc_read_async = ["io_ipc", "futures"]
 io_ipc_compression = ["lz4", "zstd"]
 io_flight = ["io_ipc", "arrow-format/flight-data"]
 # base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs
index 22b3a2fe448..3a45d4ecac6 100644
--- a/src/io/ipc/read/mod.rs
+++ b/src/io/ipc/read/mod.rs
@@ -16,6 +16,9 @@ mod read_basic;
 mod reader;
 mod schema;
 mod stream;
+#[cfg(feature = "io_ipc_read_async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))]
+pub mod stream_async;
 
 pub use common::{read_dictionary, read_record_batch};
 pub use reader::{read_file_metadata, FileMetadata, FileReader};
diff --git a/src/io/ipc/read/schema.rs b/src/io/ipc/read/schema.rs
index 7dbeeedfd73..bde3deba1e2 100644
--- a/src/io/ipc/read/schema.rs
+++ b/src/io/ipc/read/schema.rs
@@ -8,7 +8,10 @@ use crate::{
     error::{ArrowError, Result},
 };
 
-use super::super::{IpcField, IpcSchema};
+use super::{
+    super::{IpcField, IpcSchema},
+    StreamMetadata,
+};
 
 fn try_unzip_vec<A, B, I: Iterator<Item = Result<(A, B)>>>(iter: I) -> Result<(Vec<A>, Vec<B>)> {
     let mut a = vec![];
@@ -370,3 +373,28 @@ pub(super) fn fb_to_schema(schema: arrow_format::ipc::SchemaRef) -> Result<(Sche
         },
     ))
 }
+
+pub(super) fn deserialize_stream_metadata(meta: &[u8]) -> Result<StreamMetadata> {
+    let message = arrow_format::ipc::MessageRef::read_as_root(meta).map_err(|err| {
+        ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
+    })?;
+    let version = message.version()?;
+    // message header is a Schema, so read it
+    let header = message
+        .header()?
+        .ok_or_else(|| ArrowError::oos("Unable to read the first IPC message"))?;
+    let schema = if let arrow_format::ipc::MessageHeaderRef::Schema(schema) = header {
+        schema
+    } else {
+        return Err(ArrowError::oos(
+            "The first IPC message of the stream must be a schema",
+        ));
+    };
+    let (schema, ipc_schema) = fb_to_schema(schema)?;
+
+    Ok(StreamMetadata {
+        schema,
+        version,
+        ipc_schema,
+    })
+}
diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs
new file mode 100644
index 00000000000..a3ccffec9ae
--- /dev/null
+++ b/src/io/ipc/read/stream_async.rs
@@ -0,0 +1,224 @@
+//! APIs to read Arrow streams asynchronously
+use std::sync::Arc;
+
+use arrow_format::ipc::planus::ReadAsRoot;
+use futures::future::BoxFuture;
+use futures::AsyncRead;
+use futures::AsyncReadExt;
+use futures::Stream;
+
+use crate::array::*;
+use crate::chunk::Chunk;
+use crate::error::{ArrowError, Result};
+
+use super::super::CONTINUATION_MARKER;
+use super::common::{read_dictionary, read_record_batch};
+use super::schema::deserialize_stream_metadata;
+use super::Dictionaries;
+use super::StreamMetadata;
+
+/// The state of an Arrow stream
+enum StreamState<R> {
+    /// The stream does not contain new chunks (and it has not been closed)
+    Waiting((R, StreamMetadata, Dictionaries)),
+    /// The stream contains a new chunk
+    Some((R, StreamMetadata, Dictionaries, Chunk<Arc<dyn Array>>)),
+}
+
+/// Reads the [`StreamMetadata`] of the Arrow stream asynchronously
+pub async fn read_stream_metadata_async<R: AsyncRead + Unpin + Send>(
+    reader: &mut R,
+) -> Result<StreamMetadata> {
+    // determine metadata length
+    let mut meta_size: [u8; 4] = [0; 4];
+    reader.read_exact(&mut meta_size).await?;
+    let meta_len = {
+        // If a continuation marker is encountered, skip over it and read
+        // the size from the next four bytes.
+        if meta_size == CONTINUATION_MARKER {
+            reader.read_exact(&mut meta_size).await?;
+        }
+        i32::from_le_bytes(meta_size)
+    };
+
+    let mut meta_buffer = vec![0; meta_len as usize];
+    reader.read_exact(&mut meta_buffer).await?;
+
+    deserialize_stream_metadata(&meta_buffer)
+}
+
+/// Reads the next item, yielding `None` if the stream has been closed,
+/// or a [`StreamState`] otherwise.
+async fn _read_next<R: AsyncRead + Unpin + Send>(
+    mut reader: R,
+    metadata: StreamMetadata,
+    mut dictionaries: Dictionaries,
+    message_buffer: &mut Vec<u8>,
+    data_buffer: &mut Vec<u8>,
+) -> Result<Option<StreamState<R>>> {
+    // determine metadata length
+    let mut meta_length: [u8; 4] = [0; 4];
+
+    match reader.read_exact(&mut meta_length).await {
+        Ok(()) => (),
+        Err(e) => {
+            return if e.kind() == std::io::ErrorKind::UnexpectedEof {
+                // Handle a stream that ends with EOF instead of the
+                // "0xFFFFFFFF 0x00000000" termination; this is valid according to:
+                // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
+                Ok(Some(StreamState::Waiting((reader, metadata, dictionaries))))
+            } else {
+                Err(ArrowError::from(e))
+            };
+        }
+    }
+
+    let meta_length = {
+        // If a continuation marker is encountered, skip over it and read
+        // the size from the next four bytes.
+        if meta_length == CONTINUATION_MARKER {
+            reader.read_exact(&mut meta_length).await?;
+        }
+        i32::from_le_bytes(meta_length) as usize
+    };
+
+    if meta_length == 0 {
+        // the stream has ended, mark the reader as finished
+        return Ok(None);
+    }
+
+    message_buffer.clear();
+    message_buffer.resize(meta_length, 0);
+    reader.read_exact(message_buffer).await?;
+
+    let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer).map_err(|err| {
+        ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
+    })?;
+    let header = message.header()?.ok_or_else(|| {
+        ArrowError::oos("IPC: unable to fetch the message header. The file or stream is corrupted.")
+    })?;
+
+    match header {
+        arrow_format::ipc::MessageHeaderRef::Schema(_) => Err(ArrowError::oos(
+            "A stream must not contain a schema message after the first message",
+        )),
+        arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {
+            // read the block that makes up the record batch into a buffer
+            data_buffer.clear();
+            data_buffer.resize(message.body_length()? as usize, 0);
+            reader.read_exact(data_buffer).await?;
+
+            read_record_batch(
+                batch,
+                &metadata.schema.fields,
+                &metadata.ipc_schema,
+                None,
+                &dictionaries,
+                metadata.version,
+                &mut std::io::Cursor::new(data_buffer),
+                0,
+            )
+            .map(|x| Some(StreamState::Some((reader, metadata, dictionaries, x))))
+        }
+        arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
+            // read the block that makes up the dictionary batch into a buffer
+            let mut buf = vec![0; message.body_length()? as usize];
+            reader.read_exact(&mut buf).await?;
+
+            let mut dict_reader = std::io::Cursor::new(buf);
+
+            read_dictionary(
+                batch,
+                &metadata.schema.fields,
+                &metadata.ipc_schema,
+                &mut dictionaries,
+                &mut dict_reader,
+                0,
+            )?;
+
+            // read the next message until we encounter a Chunk<Arc<dyn Array>> message
+            Ok(Some(StreamState::Waiting((reader, metadata, dictionaries))))
+        }
+        t => Err(ArrowError::OutOfSpec(format!(
+            "Reading types other than record batches not yet supported, unable to read {:?} ",
+            t
+        ))),
+    }
+}
+
+/// Reads the next item, yielding `None` if the stream is done,
+/// and a [`StreamState`] otherwise.
+async fn maybe_next<R: AsyncRead + Unpin + Send>(
+    reader: R,
+    metadata: StreamMetadata,
+    dictionaries: Dictionaries,
+) -> Result<Option<StreamState<R>>> {
+    _read_next(reader, metadata, dictionaries, &mut vec![], &mut vec![]).await
+}
+
+/// Arrow Stream reader.
+///
+/// A [`Stream`] over an Arrow stream that yields a result of [`Chunk`]s.
+/// This is the recommended way to read an Arrow stream (by iterating over its data).
+///
+/// For a more thorough walkthrough consult [this example](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow).
+pub struct AsyncStreamReader<R: AsyncRead + Unpin + Send + 'static> {
+    metadata: StreamMetadata,
+    future: Option<BoxFuture<'static, Result<Option<StreamState<R>>>>>,
+}
+
+impl<R: AsyncRead + Unpin + Send + 'static> AsyncStreamReader<R> {
+    /// Creates a new [`AsyncStreamReader`]
+    pub fn new(reader: R, metadata: StreamMetadata) -> Self {
+        let future = Some(Box::pin(maybe_next(reader, metadata.clone(), Default::default())) as _);
+        Self { metadata, future }
+    }
+
+    /// Returns the [`StreamMetadata`] of the stream
+    pub fn metadata(&self) -> &StreamMetadata {
+        &self.metadata
+    }
+}
+
+impl<R: AsyncRead + Unpin + Send + 'static> Stream for AsyncStreamReader<R> {
+    type Item = Result<Chunk<Arc<dyn Array>>>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        use std::pin::Pin;
+        use std::task::Poll;
+        let me = Pin::into_inner(self);
+
+        match &mut me.future {
+            Some(fut) => match fut.as_mut().poll(cx) {
+                Poll::Ready(Ok(None)) => {
+                    me.future = None;
+                    Poll::Ready(None)
+                }
+                Poll::Ready(Ok(Some(StreamState::Some((
+                    reader,
+                    metadata,
+                    dictionaries,
+                    batch,
+                ))))) => {
+                    me.future = Some(Box::pin(maybe_next(reader, metadata, dictionaries)));
+                    Poll::Ready(Some(Ok(batch)))
+                }
+                Poll::Ready(Ok(Some(StreamState::Waiting((reader, metadata, dictionaries))))) => {
+                    // re-arm the future to read the next message and ask to be polled again
+                    me.future = Some(Box::pin(maybe_next(reader, metadata, dictionaries)));
+                    cx.waker().wake_by_ref();
+                    Poll::Pending
+                }
+                Poll::Ready(Err(err)) => {
+                    me.future = None;
+                    Poll::Ready(Some(Err(err)))
+                }
+                Poll::Pending => Poll::Pending,
+            },
+            None => Poll::Ready(None),
+        }
+    }
+}
diff --git a/tests/it/io/ipc/mod.rs b/tests/it/io/ipc/mod.rs
index 6dafdaed47c..6464685561b 100644
--- a/tests/it/io/ipc/mod.rs
+++ b/tests/it/io/ipc/mod.rs
@@ -6,3 +6,6 @@ pub use common::read_gzip_json;
 
 #[cfg(feature = "io_ipc_write_async")]
 mod write_async;
+
+#[cfg(feature = "io_ipc_read_async")]
+mod read_stream_async;
diff --git a/tests/it/io/ipc/read_stream_async.rs b/tests/it/io/ipc/read_stream_async.rs
new file mode 100644
index 00000000000..427f9dd2570
--- /dev/null
+++ b/tests/it/io/ipc/read_stream_async.rs
@@ -0,0 +1,46 @@
+use futures::io::Cursor as AsyncCursor;
+use futures::StreamExt;
+use tokio::fs::File;
+use tokio_util::compat::*;
+
+use arrow2::error::Result;
+use arrow2::io::ipc::read::stream_async::*;
+
+use crate::io::ipc::common::read_gzip_json;
+
+async fn test_file(version: &str, file_name: &str) -> Result<()> {
+    let testdata = crate::test_util::arrow_test_data();
+    let mut file = File::open(format!(
+        "{}/arrow-ipc-stream/integration/{}/{}.stream",
+        testdata, version, file_name
+    ))
+    .await?
+    .compat();
+
+    let metadata = read_stream_metadata_async(&mut file).await?;
+    let mut reader = AsyncStreamReader::new(file, metadata);
+
+    // read expected JSON output
+    let (schema, ipc_fields, batches) = read_gzip_json(version, file_name)?;
+
+    assert_eq!(&schema, &reader.metadata().schema);
+    assert_eq!(&ipc_fields, &reader.metadata().ipc_schema.fields);
+
+    let mut items = vec![];
+    while let Some(item) = reader.next().await {
+        items.push(item?)
+    }
+
+    batches
+        .iter()
+        .zip(items.into_iter())
+        .for_each(|(lhs, rhs)| {
+            assert_eq!(lhs, &rhs);
+        });
+    Ok(())
+}
+
+#[tokio::test]
+async fn read_async() -> Result<()> {
+    test_file("1.0.0-littleendian", "generated_primitive").await
+}
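
---
Usage notes (these sit outside the patch proper). The new reader is gated
behind the `io_ipc_read_async` feature, which the patch also adds to the
`full` feature set, so a downstream crate has to opt in explicitly; the
version number below is illustrative:

    [dependencies]
    arrow2 = { version = "0.9", features = ["io_ipc_read_async"] }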
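
And a minimal sketch of consuming the new API end to end, assuming the
feature above is enabled; `collect_chunks` and `ipc_bytes` are hypothetical
names, and `futures::io::Cursor` stands in for any `AsyncRead + Unpin + Send`
source (the test above uses `tokio::fs::File` through `tokio_util::compat`
instead):

    use std::sync::Arc;

    use futures::io::Cursor;
    use futures::StreamExt;

    use arrow2::array::Array;
    use arrow2::chunk::Chunk;
    use arrow2::error::Result;
    use arrow2::io::ipc::read::stream_async::{read_stream_metadata_async, AsyncStreamReader};

    // Collect every chunk of an Arrow IPC stream held in memory.
    async fn collect_chunks(ipc_bytes: Vec<u8>) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
        let mut reader = Cursor::new(ipc_bytes);
        // The first IPC message must be a schema; it is parsed into `StreamMetadata`.
        let metadata = read_stream_metadata_async(&mut reader).await?;
        // The stream takes ownership of the reader; dictionary batches are
        // read and applied internally, so only record batches are yielded.
        let mut stream = AsyncStreamReader::new(reader, metadata);
        let mut chunks = vec![];
        while let Some(maybe_chunk) = stream.next().await {
            chunks.push(maybe_chunk?);
        }
        Ok(chunks)
    }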