From 1933f85c35b7ee60835196405538f2b886111f24 Mon Sep 17 00:00:00 2001 From: Dexter Duckworth Date: Thu, 17 Feb 2022 16:34:39 -0500 Subject: [PATCH 01/10] Added lifetime parameter to async IPC reader. --- src/io/ipc/read/stream_async.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index 9e054cd07ce..bd10ada166d 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -153,12 +153,12 @@ async fn maybe_next( } /// A [`Stream`] over an Arrow IPC stream that asynchronously yields [`Chunk`]s. -pub struct AsyncStreamReader { +pub struct AsyncStreamReader<'a, R: AsyncRead + Unpin + Send + 'a> { metadata: StreamMetadata, - future: Option>>>>, + future: Option>>>>, } -impl AsyncStreamReader { +impl<'a, R: AsyncRead + Unpin + Send + 'a> AsyncStreamReader<'a, R> { /// Creates a new [`AsyncStreamReader`] pub fn new(reader: R, metadata: StreamMetadata) -> Self { let state = ReadState { @@ -178,7 +178,7 @@ impl AsyncStreamReader { } } -impl Stream for AsyncStreamReader { +impl<'a, R: AsyncRead + Unpin + Send> Stream for AsyncStreamReader<'a, R> { type Item = Result>>; fn poll_next( From d66f04249620234033940af369582b7bcafdfaf3 Mon Sep 17 00:00:00 2001 From: Dexter Duckworth Date: Thu, 17 Feb 2022 17:29:27 -0500 Subject: [PATCH 02/10] Added sink implementation for writing IPC streams. --- src/io/ipc/write/mod.rs | 3 + src/io/ipc/write/sink.rs | 130 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 src/io/ipc/write/sink.rs diff --git a/src/io/ipc/write/mod.rs b/src/io/ipc/write/mod.rs index 6331e9dc9a0..d64e96adbbd 100644 --- a/src/io/ipc/write/mod.rs +++ b/src/io/ipc/write/mod.rs @@ -18,6 +18,9 @@ mod common_async; #[cfg(feature = "io_ipc_write_async")] #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_write_async")))] pub mod stream_async; +#[cfg(feature = "io_ipc_write_async")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_write_async")))] +pub mod sink; use crate::datatypes::{DataType, Field}; diff --git a/src/io/ipc/write/sink.rs b/src/io/ipc/write/sink.rs new file mode 100644 index 00000000000..80075274152 --- /dev/null +++ b/src/io/ipc/write/sink.rs @@ -0,0 +1,130 @@ +//! Sink interface for writing arrow streams. + +use super::{stream_async::StreamWriter, WriteOptions}; +use crate::{array::Array, chunk::Chunk, datatypes::Schema, error::ArrowError, io::ipc::IpcField}; +use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink}; +use std::{pin::Pin, sync::Arc, task::Poll}; + +/// A sink that writes array [`chunks`](Chunk) to an async writer. +pub struct IpcSink<'a, W: AsyncWrite + Unpin + Send + 'a> { + sink: Option>, + task: Option>, ArrowError>>>, + schema: Arc, + ipc_fields: Arc>>, +} + +impl<'a, W> IpcSink<'a, W> +where + W: AsyncWrite + Unpin + Send + 'a, +{ + /// Create a new [`IpcSink`]. 
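+    ///
+    /// # Examples
+    ///
+    /// A minimal, buffer-backed sketch; the `Default::default()` write
+    /// options are assumed here for brevity:
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use futures::SinkExt;
+    /// use arrow2::array::{Array, Int32Array};
+    /// use arrow2::chunk::Chunk;
+    /// use arrow2::datatypes::{DataType, Field, Schema};
+    /// # use arrow2::io::ipc::write::sink::IpcSink;
+    /// # futures::executor::block_on(async move {
+    /// let schema = Schema::from(vec![Field::new("values", DataType::Int32, true)]);
+    ///
+    /// let mut buffer = vec![];
+    /// // The stream header is written on the first poll; each fed chunk is
+    /// // then encoded as one IPC message.
+    /// let mut sink = IpcSink::new(&mut buffer, schema, None, Default::default());
+    /// let values = Int32Array::from(&[Some(1), None]);
+    /// sink.feed(Chunk::new(vec![Arc::new(values) as Arc<dyn Array>])).await?;
+    /// sink.close().await?;
+    /// # arrow2::error::Result::Ok(())
+    /// # }).unwrap();
+    /// ```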
+ pub fn new( + writer: W, + schema: Schema, + ipc_fields: Option<&[IpcField]>, + write_options: WriteOptions, + ) -> Self { + let mut sink = StreamWriter::new(writer, write_options); + let schema = Arc::new(schema); + let s = schema.clone(); + let ipc_fields = Arc::new(ipc_fields.map(|f| f.to_vec())); + let f = ipc_fields.clone(); + let task = Some( + async move { + sink.start(&s, f.as_deref()).await?; + Ok(Some(sink)) + } + .boxed(), + ); + Self { + sink: None, + task, + schema, + ipc_fields, + } + } + + fn poll_complete(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + if let Some(task) = &mut self.task { + match futures::ready!(task.poll_unpin(cx)) { + Ok(sink) => { + self.sink = sink; + self.task = None; + Poll::Ready(Ok(())) + } + Err(error) => { + self.task = None; + Poll::Ready(Err(error)) + } + } + } else { + Poll::Ready(Ok(())) + } + } +} + +impl<'a, W> Sink>> for IpcSink<'a, W> +where + W: AsyncWrite + Unpin + Send, +{ + type Error = ArrowError; + + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.get_mut().poll_complete(cx) + } + + fn start_send(self: Pin<&mut Self>, item: Chunk>) -> Result<(), Self::Error> { + let this = self.get_mut(); + if let Some(mut sink) = this.sink.take() { + let schema = this.schema.clone(); + let fields = this.ipc_fields.clone(); + this.task = Some( + async move { + sink.write(&item, &schema, fields.as_deref()).await?; + Ok(Some(sink)) + } + .boxed(), + ); + Ok(()) + } else { + Err(ArrowError::Io(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "writer closed".to_string(), + ))) + } + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.get_mut().poll_complete(cx) + } + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + match this.poll_complete(cx) { + Poll::Ready(Ok(())) => { + if let Some(mut sink) = this.sink.take() { + this.task = Some( + async move { + sink.finish().await?; + Ok(None) + } + .boxed(), + ); + this.poll_complete(cx) + } else { + Poll::Ready(Ok(())) + } + } + res => res, + } + } +} From 33f1022f6b990414d71465cfd2ceb44b75c9f427 Mon Sep 17 00:00:00 2001 From: Dexter Duckworth Date: Thu, 17 Feb 2022 17:58:22 -0500 Subject: [PATCH 03/10] Added sink implementation for writing Parquet streams. 
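
ParquetSink wraps FileStreamer, mirroring the IPC sink from the previous
patch: the file header is written on the first poll, each fed chunk is
encoded as one row group, and the footer is written when the sink is
closed. A rough usage sketch (one Encoding per column; `writer`, `schema`,
and `chunk` are placeholders, and the options shown are illustrative, not
defaults):

    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };
    let mut sink = ParquetSink::try_new(writer, schema, vec![Encoding::Plain], options)?;
    sink.feed(chunk).await?;
    sink.close().await?;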
--- src/io/parquet/write/mod.rs | 2 + src/io/parquet/write/sink.rs | 134 +++++++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 src/io/parquet/write/sink.rs diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 5fec264cfd3..863777bee32 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -11,6 +11,7 @@ mod schema; mod stream; mod utf8; mod utils; +mod sink; use crate::array::*; use crate::bitmap::Bitmap; @@ -40,6 +41,7 @@ pub use file::FileWriter; pub use row_group::{row_group_iter, RowGroupIterator}; pub use schema::to_parquet_type; pub use stream::FileStreamer; +pub use sink::ParquetSink; pub(self) fn decimal_length_from_precision(precision: usize) -> usize { // digits = floor(log_10(2^(8*n - 1) - 1)) diff --git a/src/io/parquet/write/sink.rs b/src/io/parquet/write/sink.rs new file mode 100644 index 00000000000..77c96cf5071 --- /dev/null +++ b/src/io/parquet/write/sink.rs @@ -0,0 +1,134 @@ +use crate::{ + array::Array, + chunk::Chunk, + datatypes::Schema, + error::ArrowError, + io::parquet::write::{Encoding, FileStreamer, SchemaDescriptor, WriteOptions}, +}; +use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink, TryFutureExt}; +use std::{pin::Pin, sync::Arc, task::Poll}; + +/// Sink that writes array [`chunks`](Chunk) to an async writer. +pub struct ParquetSink<'a, W: AsyncWrite + Send + Unpin> { + writer: Option>, + task: Option>, ArrowError>>>, + options: WriteOptions, + encoding: Vec, + schema: SchemaDescriptor, +} + +impl<'a, W> ParquetSink<'a, W> +where + W: AsyncWrite + Send + Unpin + 'a, +{ + /// Create a new sink that writes arrays to the provided `writer`. + /// # Error + /// If the Arrow schema can't be converted to a valid Parquet schema. + pub fn try_new( + writer: W, + schema: Schema, + encoding: Vec, + options: WriteOptions, + ) -> Result { + let mut writer = FileStreamer::try_new(writer, schema.clone(), options)?; + let schema = crate::io::parquet::write::to_parquet_schema(&schema)?; + let task = Some( + async move { + writer.start().await?; + Ok(Some(writer)) + } + .boxed(), + ); + Ok(Self { + writer: None, + task, + options, + schema, + encoding, + }) + } + + fn poll_complete( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if let Some(task) = &mut self.task { + match futures::ready!(task.poll_unpin(cx)) { + Ok(writer) => { + self.task = None; + self.writer = writer; + Poll::Ready(Ok(())) + } + Err(error) => { + self.task = None; + Poll::Ready(Err(error)) + } + } + } else { + Poll::Ready(Ok(())) + } + } +} + +impl<'a, W> Sink>> for ParquetSink<'a, W> +where + W: AsyncWrite + Send + Unpin + 'a, +{ + type Error = ArrowError; + + fn start_send(self: Pin<&mut Self>, item: Chunk>) -> Result<(), Self::Error> { + let this = self.get_mut(); + if let Some(mut writer) = this.writer.take() { + let count = item.len(); + let rows = crate::io::parquet::write::row_group_iter( + item, + this.encoding.clone(), + this.schema.columns().to_vec(), + this.options, + ); + this.task = Some(Box::pin(async move { + writer.write(rows, count).await?; + Ok(Some(writer)) + })); + Ok(()) + } else { + Err(ArrowError::Io(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "writer closed".to_string(), + ))) + } + } + + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_mut().poll_complete(cx) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + 
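+        // Flushing only drives any in-flight write task to completion; no
+        // additional flush is issued on the underlying writer here.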
self.get_mut().poll_complete(cx) + } + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + match futures::ready!(this.poll_complete(cx)) { + Ok(()) => { + let writer = this.writer.take(); + if let Some(writer) = writer { + this.task = Some(writer.end(None).map_ok(|_| None).boxed()); + this.poll_complete(cx) + } else { + Poll::Ready(Ok(())) + } + } + Err(error) => Poll::Ready(Err(error)), + } + } +} From f7a6fd0ffb0a166fb2ca613741545ec3c45b5249 Mon Sep 17 00:00:00 2001 From: Dexter Duckworth Date: Fri, 25 Feb 2022 14:33:16 -0500 Subject: [PATCH 04/10] Added IPC file stream and sink implementations. --- Cargo.toml | 2 +- src/io/ipc/read/file_async.rs | 260 +++++++++++++++++++++++++++++++++ src/io/ipc/read/mod.rs | 4 + src/io/ipc/read/reader.rs | 6 +- src/io/ipc/write/file_async.rs | 180 +++++++++++++++++++++++ src/io/ipc/write/mod.rs | 4 + 6 files changed, 452 insertions(+), 4 deletions(-) create mode 100644 src/io/ipc/read/file_async.rs create mode 100644 src/io/ipc/write/file_async.rs diff --git a/Cargo.toml b/Cargo.toml index db0e2e89bc6..172a65cf836 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -133,7 +133,7 @@ io_csv_write = ["csv", "streaming-iterator", "lexical-core"] io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"] io_ipc = ["arrow-format"] io_ipc_write_async = ["io_ipc", "futures"] -io_ipc_read_async = ["io_ipc", "futures"] +io_ipc_read_async = ["io_ipc", "futures", "async-stream"] io_ipc_compression = ["lz4", "zstd"] io_flight = ["io_ipc", "arrow-format/flight-data"] # base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format. diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs new file mode 100644 index 00000000000..4ee8f45fb48 --- /dev/null +++ b/src/io/ipc/read/file_async.rs @@ -0,0 +1,260 @@ +//! Async reader for Arrow IPC files +use std::io::SeekFrom; +use std::sync::Arc; + +use arrow_format::ipc::BlockRef; +use arrow_format::ipc::FooterRef; +use arrow_format::ipc::MessageHeaderRef; +use arrow_format::ipc::MessageRef; +use arrow_format::ipc::planus::ReadAsRoot; +use arrow_format::ipc::planus::Vector; +use futures::AsyncSeek; +use futures::AsyncSeekExt; +use futures::AsyncRead; +use futures::AsyncReadExt; +use futures::Stream; +use futures::stream::BoxStream; +use futures::StreamExt; + +use crate::array::*; +use crate::chunk::Chunk; +use crate::error::{ArrowError, Result}; +use crate::io::ipc::ARROW_MAGIC; +use crate::datatypes::Schema; +use crate::io::ipc::IpcSchema; +use crate::datatypes::Field; + +use super::super::CONTINUATION_MARKER; +use super::FileMetadata; +use super::common::{read_dictionary, read_record_batch}; +use super::reader::get_serialized_batch; +use super::schema::deserialize_stream_metadata; +use super::Dictionaries; +use super::schema::fb_to_schema; + +/// Async reader for Arrow IPC files +pub struct FileStream<'a> { + stream: BoxStream<'a, Result>>>, + metadata: FileMetadata, + schema: Schema, +} + +impl<'a> FileStream<'a> { + /// Create a new IPC file reader. 
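+    ///
+    /// # Examples
+    ///
+    /// A hedged sketch of reading every chunk back from an in-memory file;
+    /// it assumes `buffer` already holds a complete Arrow IPC file:
+    ///
+    /// ```no_run
+    /// # use futures::{io::Cursor, TryStreamExt};
+    /// # use arrow2::io::ipc::read::file_async::{read_file_metadata_async, FileStream};
+    /// # futures::executor::block_on(async move {
+    /// # let buffer: Vec<u8> = vec![];
+    /// let mut reader = Cursor::new(buffer);
+    /// let metadata = read_file_metadata_async(&mut reader).await?;
+    /// // `None` selects all columns; `Some(vec![...])` projects a subset.
+    /// let stream = FileStream::new(reader, metadata, None);
+    /// let chunks = stream.try_collect::<Vec<_>>().await?;
+    /// # arrow2::error::Result::Ok(())
+    /// # }).unwrap();
+    /// ```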
+ pub fn new(reader: R, metadata: FileMetadata, projection: Option>) -> Self + where R: AsyncRead + AsyncSeek + Unpin + Send + 'a + { + let schema = if let Some(projection) = projection.as_ref() { + projection.windows(2).for_each(|x| assert!( + x[0] < x[1], + "IPC projection must be ordered and non-overlapping", + )); + let fields = projection + .iter() + .map(|&x| metadata.schema.fields[x].clone()) + .collect::>(); + Schema { + fields, + metadata: metadata.schema.metadata.clone(), + } + } else { + metadata.schema.clone() + }; + + let stream = Self::stream(reader, metadata.clone(), projection); + Self { + stream, + metadata, + schema, + } + } + + /// Get the metadata from the IPC file. + pub fn metadata(&self) -> &FileMetadata { + &self.metadata + } + + /// Get the projected schema from the IPC file. + pub fn schema(&self) -> &Schema { + &self.schema + } + + fn stream(mut reader: R, metadata: FileMetadata, projection: Option>) -> BoxStream<'a, Result>>> + where R: AsyncRead + AsyncSeek + Unpin + Send + 'a + { + async_stream::try_stream! { + let mut meta_buffer = vec![]; + let mut block_buffer = vec![]; + for block in 0..metadata.blocks.len() { + let chunk = read_batch( + &mut reader, + &metadata, + projection.as_deref(), + block, + &mut meta_buffer, + &mut block_buffer, + ).await?; + yield chunk; + } + }.boxed() + } +} + +impl<'a> Stream for FileStream<'a> { + type Item = Result>>; + + fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.get_mut().stream.poll_next_unpin(cx) + } +} + +/// Read the metadata from an IPC file. +pub async fn read_file_metadata_async(reader: &mut R) -> Result + where R: AsyncRead + AsyncSeek + Unpin +{ + // Check header + let mut magic = [0; 6]; + reader.read_exact(&mut magic).await?; + if magic != ARROW_MAGIC { + return Err(ArrowError::OutOfSpec("file does not contain correct Arrow header".to_string())); + } + // Check footer + reader.seek(SeekFrom::End(-6)).await?; + reader.read_exact(&mut magic).await?; + if magic != ARROW_MAGIC { + return Err(ArrowError::OutOfSpec("file does not contain correct Arrow footer".to_string())); + } + // Get footer size + let mut footer_size = [0; 4]; + reader.seek(SeekFrom::End(-10)).await?; + reader.read_exact(&mut footer_size).await?; + let footer_size = i32::from_le_bytes(footer_size); + // Read footer + let mut footer = vec![0; footer_size as usize]; + reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?; + reader.read_exact(&mut footer).await?; + let footer = FooterRef::read_as_root(&footer[..]) + .map_err(|err| ArrowError::OutOfSpec(format!("unable to get root as footer: {:?}", err)))?; + + let blocks = footer.record_batches()? + .ok_or_else(|| ArrowError::OutOfSpec("unable to get record batches from footer".to_string()))?; + let schema = footer.schema()? + .ok_or_else(|| ArrowError::OutOfSpec("unable to get schema from footer".to_string()))?; + let (schema, ipc_schema) = fb_to_schema(schema)?; + let dictionary_blocks = footer.dictionaries()?; + let dictionaries = if let Some(blocks) = dictionary_blocks { + read_dictionaries(reader, &schema.fields[..], &ipc_schema, blocks).await? 
+ } else { + Default::default() + }; + + Ok(FileMetadata { + schema, + ipc_schema, + blocks: blocks.iter().map(|block| Ok(block.try_into()?)).collect::>>()?, + dictionaries, + }) +} + +async fn read_dictionaries( + reader: &mut R, + fields: &[Field], + ipc_schema: &IpcSchema, + blocks: Vector<'_, BlockRef<'_>>, +) -> Result + where R: AsyncRead + AsyncSeek + Unpin, +{ + let mut dictionaries = Default::default(); + let mut data = vec![]; + let mut buffer = vec![]; + + for block in blocks { + let offset = block.offset() as u64; + read_dictionary_message(reader, offset, &mut data).await?; + + let message = MessageRef::read_as_root(&data) + .map_err(|err| ArrowError::OutOfSpec(format!("unable to get root as message: {:?}", err)))?; + let header = message.header()? + .ok_or_else(|| ArrowError::oos("message must have a header"))?; + match header { + MessageHeaderRef::DictionaryBatch(batch) => { + buffer.clear(); + buffer.resize(block.body_length() as usize, 0); + reader.read_exact(&mut buffer).await?; + let mut cursor = std::io::Cursor::new(&mut buffer); + read_dictionary( + batch, + fields, + ipc_schema, + &mut dictionaries, + &mut cursor, + 0, + )?; + }, + other => return Err(ArrowError::OutOfSpec(format!( + "expected DictionaryBatch in dictionary blocks, found {:?}", + other, + ))), + } + } + Ok(dictionaries) +} + +async fn read_dictionary_message(reader: &mut R, offset: u64, data: &mut Vec) -> Result<()> + where R: AsyncRead + AsyncSeek + Unpin, +{ + let mut message_size = [0; 4]; + reader.seek(SeekFrom::Start(offset)).await?; + reader.read_exact(&mut message_size).await?; + if message_size == CONTINUATION_MARKER { + reader.read_exact(&mut message_size).await?; + } + let footer_size = i32::from_le_bytes(message_size); + data.clear(); + data.resize(footer_size as usize, 0); + reader.read_exact(data).await?; + + Ok(()) +} + +async fn read_batch( + reader: &mut R, + metadata: &FileMetadata, + projection: Option<&[usize]>, + block: usize, + meta_buffer: &mut Vec, + block_buffer: &mut Vec, +) -> Result>> + where R: AsyncRead + AsyncSeek + Unpin, +{ + let block = metadata.blocks[block]; + reader.seek(SeekFrom::Start(block.offset as u64)).await?; + let mut meta_buf = [0; 4]; + reader.read_exact(&mut meta_buf).await?; + if meta_buf == CONTINUATION_MARKER { + reader.read_exact(&mut meta_buf).await?; + } + let meta_len = i32::from_le_bytes(meta_buf) as usize; + meta_buffer.clear(); + meta_buffer.resize(meta_len, 0); + reader.read_exact(meta_buffer).await?; + + let message = MessageRef::read_as_root(&meta_buffer[..]) + .map_err(|err| ArrowError::oos(format!("unable to parse message: {:?}", err)))?; + let batch = get_serialized_batch(&message)?; + block_buffer.clear(); + block_buffer.resize(message.body_length()? 
as usize, 0); + reader.read_exact(block_buffer).await?; + let mut cursor = std::io::Cursor::new(block_buffer); + let chunk = read_record_batch( + batch, + &metadata.schema.fields, + &metadata.ipc_schema, + projection, + &metadata.dictionaries, + message.version()?, + &mut cursor, + 0, + )?; + Ok(chunk) +} diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index 3a45d4ecac6..207c33329fc 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -20,6 +20,10 @@ mod stream; #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))] pub mod stream_async; +#[cfg(feature = "io_ipc_read_async")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))] +pub mod file_async; + pub use common::{read_dictionary, read_record_batch}; pub use reader::{read_file_metadata, FileMetadata, FileReader}; pub use schema::deserialize_schema; diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 3254eb4a7ef..d31e975d666 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -26,10 +26,10 @@ pub struct FileMetadata { /// The blocks in the file /// /// A block indicates the regions in the file to read to get data - blocks: Vec, + pub(super) blocks: Vec, /// Dictionaries associated to each dict_id - dictionaries: Dictionaries, + pub(super) dictionaries: Dictionaries, } /// Arrow File reader @@ -166,7 +166,7 @@ pub fn read_file_metadata(reader: &mut R) -> Result( +pub(super) fn get_serialized_batch<'a>( message: &'a arrow_format::ipc::MessageRef, ) -> Result> { let header = message.header()?.ok_or_else(|| { diff --git a/src/io/ipc/write/file_async.rs b/src/io/ipc/write/file_async.rs new file mode 100644 index 00000000000..ffe76108367 --- /dev/null +++ b/src/io/ipc/write/file_async.rs @@ -0,0 +1,180 @@ +//! Async writer for IPC files. + +use std::sync::Arc; +use std::task::Poll; +use arrow_format::ipc::planus::Builder; +use arrow_format::ipc::{Block, Footer, MetadataVersion}; +use futures::future::BoxFuture; +use futures::{AsyncWrite, Sink, AsyncWriteExt, FutureExt}; +use super::super::IpcField; +pub use super::common::WriteOptions; +use super::common::{encode_chunk, DictionaryTracker, EncodedData}; +use super::common_async::{write_continuation, write_message}; +use super::schema::serialize_schema; +use super::{default_ipc_fields, schema_to_bytes}; +use crate::array::Array; +use crate::chunk::Chunk; +use crate::datatypes::*; +use crate::error::{ArrowError, Result}; +use crate::io::ipc::ARROW_MAGIC; + +/// Async Arrow file writer +pub struct FileSink<'a, W: AsyncWrite + Unpin + Send + 'a> { + writer: Option, + task: Option, Vec, Option)>>>, + /// IPC write options + options: WriteOptions, + /// Keeps track of dictionaries that have been written + dictionary_tracker: DictionaryTracker, + offset: usize, + fields: Vec, + record_blocks: Vec, + dictionary_blocks: Vec, + schema: Schema, +} + +impl<'a, W> FileSink<'a, W> where W: AsyncWrite + Unpin + Send + 'a { + /// Create a new file writer. 
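+    ///
+    /// # Examples
+    ///
+    /// A minimal, buffer-backed sketch; `Default::default()` write options
+    /// are assumed, and passing `None` derives default IPC fields from the
+    /// schema:
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use futures::SinkExt;
+    /// use arrow2::array::{Array, Int32Array};
+    /// use arrow2::chunk::Chunk;
+    /// use arrow2::datatypes::{DataType, Field, Schema};
+    /// # use arrow2::io::ipc::write::file_async::FileSink;
+    /// # futures::executor::block_on(async move {
+    /// let schema = Schema::from(vec![Field::new("values", DataType::Int32, true)]);
+    ///
+    /// let mut buffer = vec![];
+    /// let mut sink = FileSink::new(&mut buffer, &schema, None, Default::default());
+    /// let values = Int32Array::from(&[Some(1), None]);
+    /// sink.feed(Chunk::new(vec![Arc::new(values) as Arc<dyn Array>])).await?;
+    /// // Closing writes the footer and the trailing magic bytes.
+    /// sink.close().await?;
+    /// # arrow2::error::Result::Ok(())
+    /// # }).unwrap();
+    /// ```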
+ pub fn new(writer: W, schema: &Schema, ipc_fields: Option>, options: WriteOptions) -> Self { + let fields = ipc_fields.unwrap_or_else(|| default_ipc_fields(&schema.fields)); + let encoded = EncodedData { + ipc_message: schema_to_bytes(schema, &fields), + arrow_data: vec![], + }; + let task = Some(Self::start(writer, encoded).boxed()); + Self { + writer: None, + task, + options, + fields, + offset: 0, + schema: schema.clone(), + dictionary_tracker: DictionaryTracker::new(true), + record_blocks: vec![], + dictionary_blocks: vec![], + } + } + + async fn start(mut writer: W, encoded: EncodedData) -> Result<(usize, Option, Vec, Option)> { + writer.write_all(&ARROW_MAGIC[..]).await?; + writer.write_all(&[0, 0]).await?; + let (meta, data) = write_message(&mut writer, encoded).await?; + + Ok((meta + data + 8, None, vec![], Some(writer))) + } + + async fn write(mut writer: W, mut offset: usize, record: EncodedData, dictionaries: Vec) -> Result<(usize, Option, Vec, Option)> { + let mut dict_blocks = vec![]; + for dict in dictionaries { + let (meta, data) = write_message(&mut writer, dict).await?; + let block = Block { + offset: offset as i64, + meta_data_length: meta as i32, + body_length: data as i64, + }; + dict_blocks.push(block); + offset += meta + data; + } + let (meta, data) = write_message(&mut writer, record).await?; + let block = Block { + offset: offset as i64, + meta_data_length: meta as i32, + body_length: data as i64, + }; + offset += meta + data; + Ok((offset, Some(block), dict_blocks, Some(writer))) + } + + async fn finish(mut writer: W, footer: Footer) -> Result<(usize, Option, Vec, Option)> { + write_continuation(&mut writer, 0).await?; + let footer = { + let mut builder = Builder::new(); + builder.finish(&footer, None).to_owned() + }; + writer.write_all(&footer[..]).await?; + writer.write_all(&(footer.len() as i32).to_le_bytes()).await?; + writer.write_all(&ARROW_MAGIC).await?; + writer.close().await?; + + Ok((0, None, vec![], None)) + } + + fn poll_write(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + if let Some(task) = &mut self.task { + match futures::ready!(task.poll_unpin(cx)) { + Ok((offset, record, mut dictionaries, writer)) => { + self.task = None; + self.writer = writer; + self.offset = offset; + if let Some(block) = record { + self.record_blocks.push(block); + } + self.dictionary_blocks.append(&mut dictionaries); + Poll::Ready(Ok(())) + }, + Err(error) => { + self.task = None; + Poll::Ready(Err(error)) + }, + } + } else { + Poll::Ready(Ok(())) + } + } +} + +impl<'a, W> Sink>> for FileSink<'a, W> where W: AsyncWrite + Unpin + Send + 'a { + type Error = ArrowError; + + fn poll_ready(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.get_mut().poll_write(cx) + } + + fn start_send(self: std::pin::Pin<&mut Self>, item: Chunk>) -> Result<()> { + let this = self.get_mut(); + + if let Some(writer) = this.writer.take() { + let (dictionaries, record) = encode_chunk( + &item, + &this.fields[..], + &mut this.dictionary_tracker, + &this.options, + )?; + + this.task = Some(Self::write(writer, this.offset, record, dictionaries).boxed()); + Ok(()) + } else { + Err(ArrowError::Io(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "writer is closed", + ))) + } + } + + fn poll_flush(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.get_mut().poll_write(cx) + } + + fn poll_close(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + let this = 
self.get_mut(); + match futures::ready!(this.poll_write(cx)) { + Ok(()) => { + if let Some(writer) = this.writer.take() { + let schema = serialize_schema(&this.schema, &this.fields); + let footer = Footer { + version: MetadataVersion::V5, + schema: Some(Box::new(schema)), + dictionaries: Some(std::mem::take(&mut this.dictionary_blocks)), + record_batches: Some(std::mem::take(&mut this.record_blocks)), + custom_metadata: None, + }; + this.task = Some(Self::finish(writer, footer).boxed()); + this.poll_write(cx) + } else { + Poll::Ready(Ok(())) + } + }, + Err(error) => Poll::Ready(Err(error)), + } + } +} diff --git a/src/io/ipc/write/mod.rs b/src/io/ipc/write/mod.rs index d64e96adbbd..829206eb433 100644 --- a/src/io/ipc/write/mod.rs +++ b/src/io/ipc/write/mod.rs @@ -22,6 +22,10 @@ pub mod stream_async; #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_write_async")))] pub mod sink; +#[cfg(feature = "io_ipc_write_async")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_write_async")))] +pub mod file_async; + use crate::datatypes::{DataType, Field}; use super::IpcField; From 7350481c6ea5dbd4827173e6ea25f2480e6c3aa1 Mon Sep 17 00:00:00 2001 From: Dexter Duckworth Date: Wed, 2 Mar 2022 12:29:19 -0500 Subject: [PATCH 05/10] Moved IPC stream sink into stream_async file. --- src/io/ipc/write/mod.rs | 3 - src/io/ipc/write/sink.rs | 130 ------------------------- src/io/ipc/write/stream_async.rs | 162 ++++++++++++++++++++++++++++++- 3 files changed, 160 insertions(+), 135 deletions(-) delete mode 100644 src/io/ipc/write/sink.rs diff --git a/src/io/ipc/write/mod.rs b/src/io/ipc/write/mod.rs index 829206eb433..41abe67ebf2 100644 --- a/src/io/ipc/write/mod.rs +++ b/src/io/ipc/write/mod.rs @@ -18,9 +18,6 @@ mod common_async; #[cfg(feature = "io_ipc_write_async")] #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_write_async")))] pub mod stream_async; -#[cfg(feature = "io_ipc_write_async")] -#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_write_async")))] -pub mod sink; #[cfg(feature = "io_ipc_write_async")] #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_write_async")))] diff --git a/src/io/ipc/write/sink.rs b/src/io/ipc/write/sink.rs deleted file mode 100644 index 80075274152..00000000000 --- a/src/io/ipc/write/sink.rs +++ /dev/null @@ -1,130 +0,0 @@ -//! Sink interface for writing arrow streams. - -use super::{stream_async::StreamWriter, WriteOptions}; -use crate::{array::Array, chunk::Chunk, datatypes::Schema, error::ArrowError, io::ipc::IpcField}; -use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink}; -use std::{pin::Pin, sync::Arc, task::Poll}; - -/// A sink that writes array [`chunks`](Chunk) to an async writer. -pub struct IpcSink<'a, W: AsyncWrite + Unpin + Send + 'a> { - sink: Option>, - task: Option>, ArrowError>>>, - schema: Arc, - ipc_fields: Arc>>, -} - -impl<'a, W> IpcSink<'a, W> -where - W: AsyncWrite + Unpin + Send + 'a, -{ - /// Create a new [`IpcSink`]. 
- pub fn new( - writer: W, - schema: Schema, - ipc_fields: Option<&[IpcField]>, - write_options: WriteOptions, - ) -> Self { - let mut sink = StreamWriter::new(writer, write_options); - let schema = Arc::new(schema); - let s = schema.clone(); - let ipc_fields = Arc::new(ipc_fields.map(|f| f.to_vec())); - let f = ipc_fields.clone(); - let task = Some( - async move { - sink.start(&s, f.as_deref()).await?; - Ok(Some(sink)) - } - .boxed(), - ); - Self { - sink: None, - task, - schema, - ipc_fields, - } - } - - fn poll_complete(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { - if let Some(task) = &mut self.task { - match futures::ready!(task.poll_unpin(cx)) { - Ok(sink) => { - self.sink = sink; - self.task = None; - Poll::Ready(Ok(())) - } - Err(error) => { - self.task = None; - Poll::Ready(Err(error)) - } - } - } else { - Poll::Ready(Ok(())) - } - } -} - -impl<'a, W> Sink>> for IpcSink<'a, W> -where - W: AsyncWrite + Unpin + Send, -{ - type Error = ArrowError; - - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.get_mut().poll_complete(cx) - } - - fn start_send(self: Pin<&mut Self>, item: Chunk>) -> Result<(), Self::Error> { - let this = self.get_mut(); - if let Some(mut sink) = this.sink.take() { - let schema = this.schema.clone(); - let fields = this.ipc_fields.clone(); - this.task = Some( - async move { - sink.write(&item, &schema, fields.as_deref()).await?; - Ok(Some(sink)) - } - .boxed(), - ); - Ok(()) - } else { - Err(ArrowError::Io(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "writer closed".to_string(), - ))) - } - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.get_mut().poll_complete(cx) - } - - fn poll_close( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - let this = self.get_mut(); - match this.poll_complete(cx) { - Poll::Ready(Ok(())) => { - if let Some(mut sink) = this.sink.take() { - this.task = Some( - async move { - sink.finish().await?; - Ok(None) - } - .boxed(), - ); - this.poll_complete(cx) - } else { - Poll::Ready(Ok(())) - } - } - res => res, - } - } -} diff --git a/src/io/ipc/write/stream_async.rs b/src/io/ipc/write/stream_async.rs index 7da157c1f4b..b580b89e52b 100644 --- a/src/io/ipc/write/stream_async.rs +++ b/src/io/ipc/write/stream_async.rs @@ -1,14 +1,15 @@ //! `async` writing of arrow streams use std::sync::Arc; -use futures::AsyncWrite; - use super::super::IpcField; pub use super::common::WriteOptions; use super::common::{encode_chunk, DictionaryTracker, EncodedData}; use super::common_async::{write_continuation, write_message}; use super::{default_ipc_fields, schema_to_bytes}; +use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink}; +use std::{pin::Pin, task::Poll}; + use crate::array::Array; use crate::chunk::Chunk; use crate::datatypes::*; @@ -106,3 +107,160 @@ impl StreamWriter { self.writer } } + + +/// A sink that writes array [`chunks`](Chunk) to an async writer. 
+/// +/// # Examples +/// +/// ``` +/// use std::sync::Arc; +/// use futures::SinkExt; +/// use arrow2::array::{Array, Int32Array}; +/// use arrow2::datatypes::{DataType, Field, Schema}; +/// use arrow2::chunk::Chunk; +/// # use arrow2::io::ipc::write::stream_async::StreamSink; +/// # futures::executor::block_on(async move { +/// let schema = Schema::from(vec![ +/// Field::new("values", DataType::Int32, true), +/// ]); +/// +/// let mut buffer = vec![]; +/// let mut sink = StreamSink::new( +/// &mut buffer, +/// schema, +/// None, +/// Default::default(), +/// ); +/// +/// for i in 0..3 { +/// let values = Int32Array::from(&[Some(i), None]); +/// let chunk = Chunk::new(vec![Arc::new(values) as Arc]); +/// sink.feed(chunk).await?; +/// } +/// sink.close().await?; +/// # arrow2::error::Result::Ok(()) +/// # }).unwrap(); +/// ``` +pub struct StreamSink<'a, W: AsyncWrite + Unpin + Send + 'a> { + sink: Option>, + task: Option>>>>, + schema: Arc, + ipc_fields: Arc>>, +} + +impl<'a, W> StreamSink<'a, W> +where + W: AsyncWrite + Unpin + Send + 'a, +{ + /// Create a new [`StreamSink`]. + pub fn new( + writer: W, + schema: Schema, + ipc_fields: Option<&[IpcField]>, + write_options: WriteOptions, + ) -> Self { + let mut sink = StreamWriter::new(writer, write_options); + let schema = Arc::new(schema); + let s = schema.clone(); + let ipc_fields = Arc::new(ipc_fields.map(|f| f.to_vec())); + let f = ipc_fields.clone(); + let task = Some( + async move { + sink.start(&s, f.as_deref()).await?; + Ok(Some(sink)) + } + .boxed(), + ); + Self { + sink: None, + task, + schema, + ipc_fields, + } + } + + fn poll_complete(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + if let Some(task) = &mut self.task { + match futures::ready!(task.poll_unpin(cx)) { + Ok(sink) => { + self.sink = sink; + self.task = None; + Poll::Ready(Ok(())) + } + Err(error) => { + self.task = None; + Poll::Ready(Err(error)) + } + } + } else { + Poll::Ready(Ok(())) + } + } +} + +impl<'a, W> Sink>> for StreamSink<'a, W> +where + W: AsyncWrite + Unpin + Send, +{ + type Error = ArrowError; + + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.get_mut().poll_complete(cx) + } + + fn start_send(self: Pin<&mut Self>, item: Chunk>) -> Result<()> { + let this = self.get_mut(); + if let Some(mut sink) = this.sink.take() { + let schema = this.schema.clone(); + let fields = this.ipc_fields.clone(); + this.task = Some( + async move { + sink.write(&item, &schema, fields.as_deref()).await?; + Ok(Some(sink)) + } + .boxed(), + ); + Ok(()) + } else { + Err(ArrowError::Io(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "writer closed".to_string(), + ))) + } + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.get_mut().poll_complete(cx) + } + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + match this.poll_complete(cx) { + Poll::Ready(Ok(())) => { + if let Some(mut sink) = this.sink.take() { + this.task = Some( + async move { + sink.finish().await?; + Ok(None) + } + .boxed(), + ); + this.poll_complete(cx) + } else { + Poll::Ready(Ok(())) + } + } + res => res, + } + } +} From 1f64b98e58772e9c8de3492d5dea789eb6f76647 Mon Sep 17 00:00:00 2001 From: Dexter Duckworth Date: Wed, 2 Mar 2022 12:47:45 -0500 Subject: [PATCH 06/10] Added test case to IPC stream sink. 
--- src/io/ipc/write/stream_async.rs | 69 ++++++++++++++++++++++++++------ 1 file changed, 56 insertions(+), 13 deletions(-) diff --git a/src/io/ipc/write/stream_async.rs b/src/io/ipc/write/stream_async.rs index b580b89e52b..ab3cfd58c35 100644 --- a/src/io/ipc/write/stream_async.rs +++ b/src/io/ipc/write/stream_async.rs @@ -108,7 +108,6 @@ impl StreamWriter { } } - /// A sink that writes array [`chunks`](Chunk) to an async writer. /// /// # Examples @@ -205,10 +204,7 @@ where { type Error = ArrowError; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { self.get_mut().poll_complete(cx) } @@ -233,17 +229,11 @@ where } } - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { self.get_mut().poll_complete(cx) } - fn poll_close( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { let this = self.get_mut(); match this.poll_complete(cx) { Poll::Ready(Ok(())) => { @@ -264,3 +254,56 @@ where } } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use futures::{SinkExt, TryStreamExt}; + + use crate::{ + array::{Array, Float32Array, Int32Array}, + chunk::Chunk, + datatypes::{DataType, Field, Schema}, + io::ipc::read::stream_async::{read_stream_metadata_async, AsyncStreamReader}, + }; + + use super::StreamSink; + + // Verify round trip data integrity when using async read + write. + #[test] + fn test_stream_sink_roundtrip() { + futures::executor::block_on(async move { + let mut data = vec![]; + for i in 0..5 { + let a1 = Int32Array::from(&[Some(i), None, Some(i + 1)]); + let a2 = Float32Array::from(&[None, Some(i as f32), None]); + let chunk = Chunk::new(vec![ + Arc::new(a1) as Arc, + Arc::new(a2) as Arc, + ]); + data.push(chunk); + } + let schema = Schema::from(vec![ + Field::new("a1", DataType::Int32, true), + Field::new("a2", DataType::Float32, true), + ]); + + let mut buffer = vec![]; + let mut sink = StreamSink::new(&mut buffer, schema, None, Default::default()); + for chunk in &data { + sink.feed(chunk.clone()).await.unwrap(); + } + sink.close().await.unwrap(); + drop(sink); + + let mut reader = &buffer[..]; + let metadata = read_stream_metadata_async(&mut reader).await.unwrap(); + let stream = AsyncStreamReader::new(reader, metadata); + let out = stream.try_collect::>().await.unwrap(); + for i in 0..5 { + assert_eq!(data[i], out[i]); + } + }) + } +} From f5b02ed2870427350af43d615d8d462ea78df6e2 Mon Sep 17 00:00:00 2001 From: Dexter Duckworth Date: Wed, 2 Mar 2022 13:11:17 -0500 Subject: [PATCH 07/10] Added test for async IPC file read/write. --- src/io/ipc/write/file_async.rs | 168 +++++++++++++++++++++++++------ src/io/ipc/write/stream_async.rs | 18 ++-- 2 files changed, 149 insertions(+), 37 deletions(-) diff --git a/src/io/ipc/write/file_async.rs b/src/io/ipc/write/file_async.rs index ffe76108367..4ea422f5847 100644 --- a/src/io/ipc/write/file_async.rs +++ b/src/io/ipc/write/file_async.rs @@ -1,14 +1,6 @@ //! Async writer for IPC files. 
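+//!
+//! See [`FileSink`] for a sink-based writer and a usage example.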
-use std::sync::Arc; -use std::task::Poll; -use arrow_format::ipc::planus::Builder; -use arrow_format::ipc::{Block, Footer, MetadataVersion}; -use futures::future::BoxFuture; -use futures::{AsyncWrite, Sink, AsyncWriteExt, FutureExt}; -use super::super::IpcField; -pub use super::common::WriteOptions; -use super::common::{encode_chunk, DictionaryTracker, EncodedData}; +use super::common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions}; use super::common_async::{write_continuation, write_message}; use super::schema::serialize_schema; use super::{default_ipc_fields, schema_to_bytes}; @@ -16,15 +8,56 @@ use crate::array::Array; use crate::chunk::Chunk; use crate::datatypes::*; use crate::error::{ArrowError, Result}; -use crate::io::ipc::ARROW_MAGIC; +use crate::io::ipc::{IpcField, ARROW_MAGIC}; +use arrow_format::ipc::{planus::Builder, Block, Footer, MetadataVersion}; +use futures::{future::BoxFuture, AsyncWrite, AsyncWriteExt, FutureExt, Sink}; +use std::sync::Arc; +use std::task::Poll; + +type WriteOutput = (usize, Option, Vec, Option); -/// Async Arrow file writer +/// Sink that writes array [`chunks`](Chunk) as an IPC file. +/// +/// The file header is automatically written before writing the first chunk, and the file footer is +/// automatically written when the sink is closed. +/// +/// The sink uses the same `ipc_fields` projection and `write_options` for each chunk. +/// +/// # Examples +/// +/// ``` +/// use std::sync::Arc; +/// use futures::SinkExt; +/// use arrow2::array::{Array, Int32Array}; +/// use arrow2::datatypes::{DataType, Field, Schema}; +/// use arrow2::chunk::Chunk; +/// # use arrow2::io::ipc::write::file_async::FileSink; +/// # futures::executor::block_on(async move { +/// let schema = Schema::from(vec![ +/// Field::new("values", DataType::Int32, true), +/// ]); +/// +/// let mut buffer = vec![]; +/// let mut sink = FileSink::new( +/// &mut buffer, +/// schema, +/// None, +/// Default::default(), +/// ); +/// +/// for i in 0..3 { +/// let values = Int32Array::from(&[Some(i), None]); +/// let chunk = Chunk::new(vec![Arc::new(values) as Arc]); +/// sink.feed(chunk).await?; +/// } +/// sink.close().await?; +/// # arrow2::error::Result::Ok(()) +/// # }).unwrap(); +/// ``` pub struct FileSink<'a, W: AsyncWrite + Unpin + Send + 'a> { writer: Option, - task: Option, Vec, Option)>>>, - /// IPC write options + task: Option>>>, options: WriteOptions, - /// Keeps track of dictionaries that have been written dictionary_tracker: DictionaryTracker, offset: usize, fields: Vec, @@ -33,12 +66,20 @@ pub struct FileSink<'a, W: AsyncWrite + Unpin + Send + 'a> { schema: Schema, } -impl<'a, W> FileSink<'a, W> where W: AsyncWrite + Unpin + Send + 'a { +impl<'a, W> FileSink<'a, W> +where + W: AsyncWrite + Unpin + Send + 'a, +{ /// Create a new file writer. 
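+    ///
+    /// If `ipc_fields` is `None`, a default set of IPC fields is derived from
+    /// the schema.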
- pub fn new(writer: W, schema: &Schema, ipc_fields: Option>, options: WriteOptions) -> Self { + pub fn new( + writer: W, + schema: Schema, + ipc_fields: Option>, + options: WriteOptions, + ) -> Self { let fields = ipc_fields.unwrap_or_else(|| default_ipc_fields(&schema.fields)); let encoded = EncodedData { - ipc_message: schema_to_bytes(schema, &fields), + ipc_message: schema_to_bytes(&schema, &fields), arrow_data: vec![], }; let task = Some(Self::start(writer, encoded).boxed()); @@ -48,14 +89,14 @@ impl<'a, W> FileSink<'a, W> where W: AsyncWrite + Unpin + Send + 'a { options, fields, offset: 0, - schema: schema.clone(), + schema, dictionary_tracker: DictionaryTracker::new(true), record_blocks: vec![], dictionary_blocks: vec![], } } - async fn start(mut writer: W, encoded: EncodedData) -> Result<(usize, Option, Vec, Option)> { + async fn start(mut writer: W, encoded: EncodedData) -> Result> { writer.write_all(&ARROW_MAGIC[..]).await?; writer.write_all(&[0, 0]).await?; let (meta, data) = write_message(&mut writer, encoded).await?; @@ -63,7 +104,12 @@ impl<'a, W> FileSink<'a, W> where W: AsyncWrite + Unpin + Send + 'a { Ok((meta + data + 8, None, vec![], Some(writer))) } - async fn write(mut writer: W, mut offset: usize, record: EncodedData, dictionaries: Vec) -> Result<(usize, Option, Vec, Option)> { + async fn write( + mut writer: W, + mut offset: usize, + record: EncodedData, + dictionaries: Vec, + ) -> Result> { let mut dict_blocks = vec![]; for dict in dictionaries { let (meta, data) = write_message(&mut writer, dict).await?; @@ -85,14 +131,16 @@ impl<'a, W> FileSink<'a, W> where W: AsyncWrite + Unpin + Send + 'a { Ok((offset, Some(block), dict_blocks, Some(writer))) } - async fn finish(mut writer: W, footer: Footer) -> Result<(usize, Option, Vec, Option)> { + async fn finish(mut writer: W, footer: Footer) -> Result> { write_continuation(&mut writer, 0).await?; let footer = { let mut builder = Builder::new(); builder.finish(&footer, None).to_owned() }; writer.write_all(&footer[..]).await?; - writer.write_all(&(footer.len() as i32).to_le_bytes()).await?; + writer + .write_all(&(footer.len() as i32).to_le_bytes()) + .await?; writer.write_all(&ARROW_MAGIC).await?; writer.close().await?; @@ -111,11 +159,11 @@ impl<'a, W> FileSink<'a, W> where W: AsyncWrite + Unpin + Send + 'a { } self.dictionary_blocks.append(&mut dictionaries); Poll::Ready(Ok(())) - }, + } Err(error) => { self.task = None; Poll::Ready(Err(error)) - }, + } } } else { Poll::Ready(Ok(())) @@ -123,10 +171,16 @@ impl<'a, W> FileSink<'a, W> where W: AsyncWrite + Unpin + Send + 'a { } } -impl<'a, W> Sink>> for FileSink<'a, W> where W: AsyncWrite + Unpin + Send + 'a { +impl<'a, W> Sink>> for FileSink<'a, W> +where + W: AsyncWrite + Unpin + Send + 'a, +{ type Error = ArrowError; - fn poll_ready(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_ready( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.get_mut().poll_write(cx) } @@ -151,11 +205,17 @@ impl<'a, W> Sink>> for FileSink<'a, W> where W: AsyncWrite } } - fn poll_flush(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.get_mut().poll_write(cx) } - fn poll_close(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_close( + self: std::pin::Pin<&mut Self>, + cx: &mut 
std::task::Context<'_>, + ) -> std::task::Poll> { let this = self.get_mut(); match futures::ready!(this.poll_write(cx)) { Ok(()) => { @@ -173,8 +233,58 @@ impl<'a, W> Sink>> for FileSink<'a, W> where W: AsyncWrite } else { Poll::Ready(Ok(())) } - }, + } Err(error) => Poll::Ready(Err(error)), } } } + +#[cfg(test)] +mod tests { + use super::FileSink; + use crate::{ + array::{Array, Float32Array, Int32Array}, + chunk::Chunk, + datatypes::{DataType, Field, Schema}, + io::ipc::read::file_async::{read_file_metadata_async, FileStream}, + }; + use futures::{io::Cursor, SinkExt, TryStreamExt}; + use std::sync::Arc; + + // Verify round trip data integrity when using async read + write. + #[test] + fn test_file_async_roundtrip() { + futures::executor::block_on(async move { + let mut data = vec![]; + for i in 0..5 { + let a1 = Int32Array::from(&[Some(i), None, Some(i + 1)]); + let a2 = Float32Array::from(&[None, Some(i as f32), None]); + let chunk = Chunk::new(vec![ + Arc::new(a1) as Arc, + Arc::new(a2) as Arc, + ]); + data.push(chunk); + } + let schema = Schema::from(vec![ + Field::new("a1", DataType::Int32, true), + Field::new("a2", DataType::Float32, true), + ]); + + let mut buffer = Cursor::new(Vec::new()); + let mut sink = FileSink::new(&mut buffer, schema, None, Default::default()); + for chunk in &data { + sink.feed(chunk.clone()).await.unwrap(); + } + sink.close().await.unwrap(); + drop(sink); + + buffer.set_position(0); + let metadata = read_file_metadata_async(&mut buffer).await.unwrap(); + let stream = FileStream::new(buffer, metadata, None); + let out = stream.try_collect::>().await.unwrap(); + for i in 0..5 { + assert_eq!(data[i], out[i]); + } + }) + } +} diff --git a/src/io/ipc/write/stream_async.rs b/src/io/ipc/write/stream_async.rs index ab3cfd58c35..8a2485b4dfa 100644 --- a/src/io/ipc/write/stream_async.rs +++ b/src/io/ipc/write/stream_async.rs @@ -108,7 +108,12 @@ impl StreamWriter { } } -/// A sink that writes array [`chunks`](Chunk) to an async writer. +/// A sink that writes array [`chunks`](Chunk) as an IPC stream. +/// +/// The stream header is automatically written before writing the first chunk. +/// +/// The sink uses the same `ipc_fields` projection and `write_options` for each chunk. +/// For more fine-grained control over those parameters, see [`StreamWriter`]. /// /// # Examples /// @@ -257,22 +262,19 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; - - use futures::{SinkExt, TryStreamExt}; - + use super::StreamSink; use crate::{ array::{Array, Float32Array, Int32Array}, chunk::Chunk, datatypes::{DataType, Field, Schema}, io::ipc::read::stream_async::{read_stream_metadata_async, AsyncStreamReader}, }; - - use super::StreamSink; + use futures::{SinkExt, TryStreamExt}; + use std::sync::Arc; // Verify round trip data integrity when using async read + write. #[test] - fn test_stream_sink_roundtrip() { + fn test_stream_async_roundtrip() { futures::executor::block_on(async move { let mut data = vec![]; for i in 0..5 { From 106bfd607b7e9863fe41832b48872dcb5b54a91c Mon Sep 17 00:00:00 2001 From: Dexter Duckworth Date: Wed, 2 Mar 2022 13:23:52 -0500 Subject: [PATCH 08/10] Added documentation for async IPC file reader. 
--- src/io/ipc/read/file_async.rs | 127 ++++++++++++++++++--------------- src/io/ipc/write/file_async.rs | 18 +++-- 2 files changed, 85 insertions(+), 60 deletions(-) diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 4ee8f45fb48..a67170e51a0 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -2,35 +2,25 @@ use std::io::SeekFrom; use std::sync::Arc; -use arrow_format::ipc::BlockRef; -use arrow_format::ipc::FooterRef; -use arrow_format::ipc::MessageHeaderRef; -use arrow_format::ipc::MessageRef; -use arrow_format::ipc::planus::ReadAsRoot; -use arrow_format::ipc::planus::Vector; -use futures::AsyncSeek; -use futures::AsyncSeekExt; -use futures::AsyncRead; -use futures::AsyncReadExt; -use futures::Stream; -use futures::stream::BoxStream; -use futures::StreamExt; +use arrow_format::ipc::{ + planus::{ReadAsRoot, Vector}, + BlockRef, FooterRef, MessageHeaderRef, MessageRef, +}; +use futures::{ + stream::BoxStream, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream, StreamExt, +}; use crate::array::*; use crate::chunk::Chunk; +use crate::datatypes::{Field, Schema}; use crate::error::{ArrowError, Result}; -use crate::io::ipc::ARROW_MAGIC; -use crate::datatypes::Schema; -use crate::io::ipc::IpcSchema; -use crate::datatypes::Field; +use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER}; -use super::super::CONTINUATION_MARKER; -use super::FileMetadata; use super::common::{read_dictionary, read_record_batch}; use super::reader::get_serialized_batch; -use super::schema::deserialize_stream_metadata; -use super::Dictionaries; use super::schema::fb_to_schema; +use super::Dictionaries; +use super::FileMetadata; /// Async reader for Arrow IPC files pub struct FileStream<'a> { @@ -41,14 +31,20 @@ pub struct FileStream<'a> { impl<'a> FileStream<'a> { /// Create a new IPC file reader. + /// + /// # Examples + /// See [`FileSink`](crate::io::ipc::write::file_async::FileSink). pub fn new(reader: R, metadata: FileMetadata, projection: Option>) -> Self - where R: AsyncRead + AsyncSeek + Unpin + Send + 'a + where + R: AsyncRead + AsyncSeek + Unpin + Send + 'a, { let schema = if let Some(projection) = projection.as_ref() { - projection.windows(2).for_each(|x| assert!( - x[0] < x[1], - "IPC projection must be ordered and non-overlapping", - )); + projection.windows(2).for_each(|x| { + assert!( + x[0] < x[1], + "IPC projection must be ordered and non-overlapping", + ) + }); let fields = projection .iter() .map(|&x| metadata.schema.fields[x].clone()) @@ -79,8 +75,13 @@ impl<'a> FileStream<'a> { &self.schema } - fn stream(mut reader: R, metadata: FileMetadata, projection: Option>) -> BoxStream<'a, Result>>> - where R: AsyncRead + AsyncSeek + Unpin + Send + 'a + fn stream( + mut reader: R, + metadata: FileMetadata, + projection: Option>, + ) -> BoxStream<'a, Result>>> + where + R: AsyncRead + AsyncSeek + Unpin + Send + 'a, { async_stream::try_stream! { let mut meta_buffer = vec![]; @@ -96,33 +97,42 @@ impl<'a> FileStream<'a> { ).await?; yield chunk; } - }.boxed() + } + .boxed() } } impl<'a> Stream for FileStream<'a> { type Item = Result>>; - fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.get_mut().stream.poll_next_unpin(cx) } } /// Read the metadata from an IPC file. 
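+///
+/// The returned [`FileMetadata`] records the schema, the location of each
+/// record batch in the file, and any dictionaries; pass it to
+/// [`FileStream::new`] to read the chunks back. See the example on
+/// [`FileSink`](crate::io::ipc::write::file_async::FileSink).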
pub async fn read_file_metadata_async(reader: &mut R) -> Result - where R: AsyncRead + AsyncSeek + Unpin +where + R: AsyncRead + AsyncSeek + Unpin, { // Check header let mut magic = [0; 6]; reader.read_exact(&mut magic).await?; if magic != ARROW_MAGIC { - return Err(ArrowError::OutOfSpec("file does not contain correct Arrow header".to_string())); + return Err(ArrowError::OutOfSpec( + "file does not contain correct Arrow header".to_string(), + )); } // Check footer reader.seek(SeekFrom::End(-6)).await?; reader.read_exact(&mut magic).await?; if magic != ARROW_MAGIC { - return Err(ArrowError::OutOfSpec("file does not contain correct Arrow footer".to_string())); + return Err(ArrowError::OutOfSpec( + "file does not contain correct Arrow footer".to_string(), + )); } // Get footer size let mut footer_size = [0; 4]; @@ -136,9 +146,11 @@ pub async fn read_file_metadata_async(reader: &mut R) -> Result let footer = FooterRef::read_as_root(&footer[..]) .map_err(|err| ArrowError::OutOfSpec(format!("unable to get root as footer: {:?}", err)))?; - let blocks = footer.record_batches()? - .ok_or_else(|| ArrowError::OutOfSpec("unable to get record batches from footer".to_string()))?; - let schema = footer.schema()? + let blocks = footer.record_batches()?.ok_or_else(|| { + ArrowError::OutOfSpec("unable to get record batches from footer".to_string()) + })?; + let schema = footer + .schema()? .ok_or_else(|| ArrowError::OutOfSpec("unable to get schema from footer".to_string()))?; let (schema, ipc_schema) = fb_to_schema(schema)?; let dictionary_blocks = footer.dictionaries()?; @@ -151,7 +163,10 @@ pub async fn read_file_metadata_async(reader: &mut R) -> Result Ok(FileMetadata { schema, ipc_schema, - blocks: blocks.iter().map(|block| Ok(block.try_into()?)).collect::>>()?, + blocks: blocks + .iter() + .map(|block| Ok(block.try_into()?)) + .collect::>>()?, dictionaries, }) } @@ -162,7 +177,8 @@ async fn read_dictionaries( ipc_schema: &IpcSchema, blocks: Vector<'_, BlockRef<'_>>, ) -> Result - where R: AsyncRead + AsyncSeek + Unpin, +where + R: AsyncRead + AsyncSeek + Unpin, { let mut dictionaries = Default::default(); let mut data = vec![]; @@ -172,9 +188,11 @@ async fn read_dictionaries( let offset = block.offset() as u64; read_dictionary_message(reader, offset, &mut data).await?; - let message = MessageRef::read_as_root(&data) - .map_err(|err| ArrowError::OutOfSpec(format!("unable to get root as message: {:?}", err)))?; - let header = message.header()? + let message = MessageRef::read_as_root(&data).map_err(|err| { + ArrowError::OutOfSpec(format!("unable to get root as message: {:?}", err)) + })?; + let header = message + .header()? 
.ok_or_else(|| ArrowError::oos("message must have a header"))?; match header { MessageHeaderRef::DictionaryBatch(batch) => { @@ -182,26 +200,22 @@ async fn read_dictionaries( buffer.resize(block.body_length() as usize, 0); reader.read_exact(&mut buffer).await?; let mut cursor = std::io::Cursor::new(&mut buffer); - read_dictionary( - batch, - fields, - ipc_schema, - &mut dictionaries, - &mut cursor, - 0, - )?; - }, - other => return Err(ArrowError::OutOfSpec(format!( - "expected DictionaryBatch in dictionary blocks, found {:?}", - other, - ))), + read_dictionary(batch, fields, ipc_schema, &mut dictionaries, &mut cursor, 0)?; + } + other => { + return Err(ArrowError::OutOfSpec(format!( + "expected DictionaryBatch in dictionary blocks, found {:?}", + other, + ))) + } } } Ok(dictionaries) } async fn read_dictionary_message(reader: &mut R, offset: u64, data: &mut Vec) -> Result<()> - where R: AsyncRead + AsyncSeek + Unpin, +where + R: AsyncRead + AsyncSeek + Unpin, { let mut message_size = [0; 4]; reader.seek(SeekFrom::Start(offset)).await?; @@ -225,7 +239,8 @@ async fn read_batch( meta_buffer: &mut Vec, block_buffer: &mut Vec, ) -> Result>> - where R: AsyncRead + AsyncSeek + Unpin, +where + R: AsyncRead + AsyncSeek + Unpin, { let block = metadata.blocks[block]; reader.seek(SeekFrom::Start(block.offset as u64)).await?; diff --git a/src/io/ipc/write/file_async.rs b/src/io/ipc/write/file_async.rs index 4ea422f5847..bc0f187ad79 100644 --- a/src/io/ipc/write/file_async.rs +++ b/src/io/ipc/write/file_async.rs @@ -27,17 +27,18 @@ type WriteOutput = (usize, Option, Vec, Option); /// /// ``` /// use std::sync::Arc; -/// use futures::SinkExt; +/// use futures::{SinkExt, TryStreamExt, io::Cursor}; /// use arrow2::array::{Array, Int32Array}; /// use arrow2::datatypes::{DataType, Field, Schema}; /// use arrow2::chunk::Chunk; -/// # use arrow2::io::ipc::write::file_async::FileSink; +/// use arrow2::io::ipc::write::file_async::FileSink; +/// use arrow2::io::ipc::read::file_async::{read_file_metadata_async, FileStream}; /// # futures::executor::block_on(async move { /// let schema = Schema::from(vec![ /// Field::new("values", DataType::Int32, true), /// ]); /// -/// let mut buffer = vec![]; +/// let mut buffer = Cursor::new(vec![]); /// let mut sink = FileSink::new( /// &mut buffer, /// schema, @@ -45,12 +46,20 @@ type WriteOutput = (usize, Option, Vec, Option); /// Default::default(), /// ); /// +/// // Write chunks to file /// for i in 0..3 { /// let values = Int32Array::from(&[Some(i), None]); /// let chunk = Chunk::new(vec![Arc::new(values) as Arc]); /// sink.feed(chunk).await?; /// } /// sink.close().await?; +/// drop(sink); +/// +/// // Read chunks from file +/// buffer.set_position(0); +/// let metadata = read_file_metadata_async(&mut buffer).await?; +/// let mut stream = FileStream::new(buffer, metadata, None); +/// let chunks = stream.try_collect::>().await?; /// # arrow2::error::Result::Ok(()) /// # }).unwrap(); /// ``` @@ -271,7 +280,7 @@ mod tests { ]); let mut buffer = Cursor::new(Vec::new()); - let mut sink = FileSink::new(&mut buffer, schema, None, Default::default()); + let mut sink = FileSink::new(&mut buffer, schema.clone(), None, Default::default()); for chunk in &data { sink.feed(chunk.clone()).await.unwrap(); } @@ -280,6 +289,7 @@ mod tests { buffer.set_position(0); let metadata = read_file_metadata_async(&mut buffer).await.unwrap(); + assert_eq!(schema, metadata.schema); let stream = FileStream::new(buffer, metadata, None); let out = stream.try_collect::>().await.unwrap(); for i in 0..5 
{ From 8e8b3f4d6af0b03b06d81fd0cc84bbc7f53ea834 Mon Sep 17 00:00:00 2001 From: Dexter Duckworth Date: Wed, 2 Mar 2022 14:04:02 -0500 Subject: [PATCH 09/10] Added test and documentation for parquet sink. --- src/io/ipc/write/stream_async.rs | 3 +- src/io/parquet/write/mod.rs | 2 +- src/io/parquet/write/sink.rs | 161 +++++++++++++++++++++++++++++-- 3 files changed, 158 insertions(+), 8 deletions(-) diff --git a/src/io/ipc/write/stream_async.rs b/src/io/ipc/write/stream_async.rs index 8a2485b4dfa..85feecf8ecc 100644 --- a/src/io/ipc/write/stream_async.rs +++ b/src/io/ipc/write/stream_async.rs @@ -292,7 +292,7 @@ mod tests { ]); let mut buffer = vec![]; - let mut sink = StreamSink::new(&mut buffer, schema, None, Default::default()); + let mut sink = StreamSink::new(&mut buffer, schema.clone(), None, Default::default()); for chunk in &data { sink.feed(chunk.clone()).await.unwrap(); } @@ -301,6 +301,7 @@ mod tests { let mut reader = &buffer[..]; let metadata = read_stream_metadata_async(&mut reader).await.unwrap(); + assert_eq!(schema, metadata.schema); let stream = AsyncStreamReader::new(reader, metadata); let out = stream.try_collect::>().await.unwrap(); for i in 0..5 { diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 863777bee32..58f7931d821 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -41,7 +41,7 @@ pub use file::FileWriter; pub use row_group::{row_group_iter, RowGroupIterator}; pub use schema::to_parquet_type; pub use stream::FileStreamer; -pub use sink::ParquetSink; +pub use sink::FileSink; pub(self) fn decimal_length_from_precision(precision: usize) -> usize { // digits = floor(log_10(2^(8*n - 1) - 1)) diff --git a/src/io/parquet/write/sink.rs b/src/io/parquet/write/sink.rs index 77c96cf5071..c5e251290ba 100644 --- a/src/io/parquet/write/sink.rs +++ b/src/io/parquet/write/sink.rs @@ -6,22 +6,66 @@ use crate::{ io::parquet::write::{Encoding, FileStreamer, SchemaDescriptor, WriteOptions}, }; use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink, TryFutureExt}; -use std::{pin::Pin, sync::Arc, task::Poll}; +use parquet2::metadata::KeyValue; +use std::{collections::HashMap, pin::Pin, sync::Arc, task::Poll}; -/// Sink that writes array [`chunks`](Chunk) to an async writer. -pub struct ParquetSink<'a, W: AsyncWrite + Send + Unpin> { +/// Sink that writes array [`chunks`](Chunk) as a Parquet file. 
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::Arc;
+/// use futures::SinkExt;
+/// use arrow2::array::{Array, Int32Array};
+/// use arrow2::datatypes::{DataType, Field, Schema};
+/// use arrow2::chunk::Chunk;
+/// use arrow2::io::parquet::write::{Encoding, WriteOptions, Compression, Version};
+/// # use arrow2::io::parquet::write::FileSink;
+/// # futures::executor::block_on(async move {
+/// let schema = Schema::from(vec![
+///     Field::new("values", DataType::Int32, true),
+/// ]);
+/// let encoding = vec![Encoding::Plain];
+/// let options = WriteOptions {
+///     write_statistics: true,
+///     compression: Compression::Uncompressed,
+///     version: Version::V2,
+/// };
+///
+/// let mut buffer = vec![];
+/// let mut sink = FileSink::try_new(
+///     &mut buffer,
+///     schema,
+///     encoding,
+///     options,
+/// )?;
+///
+/// for i in 0..3 {
+///     let values = Int32Array::from(&[Some(i), None]);
+///     let chunk = Chunk::new(vec![Arc::new(values) as Arc<dyn Array>]);
+///     sink.feed(chunk).await?;
+/// }
+/// sink.metadata.insert(String::from("key"), Some(String::from("value")));
+/// sink.close().await?;
+/// # arrow2::error::Result::Ok(())
+/// # }).unwrap();
+/// ```
+pub struct FileSink<'a, W: AsyncWrite + Send + Unpin> {
     writer: Option<FileStreamer<W>>,
     task: Option<BoxFuture<'a, Result<Option<FileStreamer<W>>, ArrowError>>>,
     options: WriteOptions,
     encoding: Vec<Encoding>,
     schema: SchemaDescriptor,
+    /// Key-value metadata that will be written to the file on close.
+    pub metadata: HashMap<String, Option<String>>,
 }
 
-impl<'a, W> ParquetSink<'a, W>
+impl<'a, W> FileSink<'a, W>
 where
     W: AsyncWrite + Send + Unpin + 'a,
 {
     /// Create a new sink that writes arrays to the provided `writer`.
+    ///
     /// # Error
     /// If the Arrow schema can't be converted to a valid Parquet schema.
     pub fn try_new(
@@ -45,6 +89,7 @@
             options,
             schema,
             encoding,
+            metadata: HashMap::default(),
         })
     }
 
@@ -70,7 +115,7 @@
     }
 }
 
-impl<'a, W> Sink<Chunk<Arc<dyn Array>>> for ParquetSink<'a, W>
+impl<'a, W> Sink<Chunk<Arc<dyn Array>>> for FileSink<'a, W>
 where
     W: AsyncWrite + Send + Unpin + 'a,
 {
@@ -122,7 +167,18 @@
             Ok(()) => {
                 let writer = this.writer.take();
                 if let Some(writer) = writer {
-                    this.task = Some(writer.end(None).map_ok(|_| None).boxed());
+                    let meta = std::mem::take(&mut this.metadata);
+                    let metadata = if meta.is_empty() {
+                        None
+                    } else {
+                        Some(
+                            meta.into_iter()
+                                .map(|(k, v)| KeyValue::new(k, v))
+                                .collect::<Vec<_>>(),
+                        )
+                    };
+
+                    this.task = Some(writer.end(metadata).map_ok(|_| None).boxed());
                     this.poll_complete(cx)
                 } else {
                     Poll::Ready(Ok(()))
@@ -132,3 +188,96 @@
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use futures::{future::BoxFuture, io::Cursor, SinkExt};
+    use parquet2::{
+        compression::Compression,
+        write::{Version, WriteOptions},
+    };
+    use std::{collections::HashMap, sync::Arc};
+
+    use crate::{
+        array::{Array, Float32Array, Int32Array},
+        chunk::Chunk,
+        datatypes::{DataType, Field, Schema},
+        error::Result,
+        io::parquet::{
+            read::{
+                infer_schema, read_columns_many_async, read_metadata_async, RowGroupDeserializer,
+            },
+            write::Encoding,
+        },
+    };
+
+    use super::FileSink;
+
+    #[test]
+    fn test_parquet_async_roundtrip() {
+        futures::executor::block_on(async move {
+            let mut data = vec![];
+            for i in 0..5 {
+                let a1 = Int32Array::from(&[Some(i), None, Some(i + 1)]);
+                let a2 = Float32Array::from(&[None, Some(i as f32), None]);
+                let chunk = Chunk::new(vec![
+                    Arc::new(a1) as Arc<dyn Array>,
+                    Arc::new(a2) as Arc<dyn Array>,
+                ]);
+                data.push(chunk);
+            }
+            let schema = Schema::from(vec![
+                Field::new("a1", DataType::Int32, true),
+                Field::new("a2", DataType::Float32, true),
+            ]);
+            let encoding = vec![Encoding::Plain, Encoding::Plain];
+            let options = WriteOptions {
+                write_statistics: true,
+                compression: Compression::Uncompressed,
+                version: Version::V2,
+            };
+
+            let mut buffer = Cursor::new(Vec::new());
+            let mut sink =
+                FileSink::try_new(&mut buffer, schema.clone(), encoding, options).unwrap();
+            sink.metadata
+                .insert(String::from("key"), Some("value".to_string()));
+            for chunk in &data {
+                sink.feed(chunk.clone()).await.unwrap();
+            }
+            sink.close().await.unwrap();
+            drop(sink);
+
+            buffer.set_position(0);
+            let metadata = read_metadata_async(&mut buffer).await.unwrap();
+            let kv = HashMap::<String, Option<String>>::from_iter(
+                metadata
+                    .key_value_metadata()
+                    .to_owned()
+                    .unwrap()
+                    .into_iter()
+                    .map(|kv| (kv.key, kv.value)),
+            );
+            assert_eq!(kv.get("key").unwrap(), &Some("value".to_string()));
+            let read_schema = infer_schema(&metadata).unwrap();
+            assert_eq!(read_schema, schema);
+            let factory = || Box::pin(futures::future::ready(Ok(buffer.clone()))) as BoxFuture<_>;
+
+            let mut out = vec![];
+            for group in &metadata.row_groups {
+                let column_chunks =
+                    read_columns_many_async(factory, group, schema.fields.clone(), None)
+                        .await
+                        .unwrap();
+                let chunks =
+                    RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None);
+                let mut chunks = chunks.collect::<Result<Vec<_>>>().unwrap();
+                out.append(&mut chunks);
+            }
+
+            for i in 0..5 {
+                assert_eq!(data[i], out[i]);
+            }
+        })
+    }
+}

From 9e545661916ce73a5dfb88e677d83aaa7e87da51 Mon Sep 17 00:00:00 2001
From: Dexter Duckworth
Date: Wed, 2 Mar 2022 14:29:46 -0500
Subject: [PATCH 10/10] Fixed formatting.

---
 src/io/parquet/write/mod.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs
index 58f7931d821..599657e6bd4 100644
--- a/src/io/parquet/write/mod.rs
+++ b/src/io/parquet/write/mod.rs
@@ -8,10 +8,10 @@ mod levels;
 mod primitive;
 mod row_group;
 mod schema;
+mod sink;
 mod stream;
 mod utf8;
 mod utils;
-mod sink;
 
 use crate::array::*;
 use crate::bitmap::Bitmap;
@@ -40,8 +40,8 @@ pub use parquet2::{
 pub use file::FileWriter;
 pub use row_group::{row_group_iter, RowGroupIterator};
 pub use schema::to_parquet_type;
-pub use stream::FileStreamer;
 pub use sink::FileSink;
+pub use stream::FileStreamer;
 
 pub(self) fn decimal_length_from_precision(precision: usize) -> usize {
     // digits = floor(log_10(2^(8*n - 1) - 1))
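
Note: because the sinks in this series implement `futures::Sink`, they compose with the standard `SinkExt` combinators, so a whole stream of chunks can be forwarded in one call instead of being fed one at a time as in the doc examples above. A minimal sketch of that pattern, using the `FileSink` re-exported from `arrow2::io::parquet::write` in patch 09/10; the `send_all` driver loop here is illustrative and not part of the patches:

```
use std::sync::Arc;

use futures::{stream, SinkExt, StreamExt};

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::ArrowError;
use arrow2::io::parquet::write::{Compression, Encoding, FileSink, Version, WriteOptions};

fn main() {
    futures::executor::block_on(async move {
        let schema = Schema::from(vec![Field::new("values", DataType::Int32, true)]);
        let options = WriteOptions {
            write_statistics: true,
            compression: Compression::Uncompressed,
            version: Version::V2,
        };
        let mut buffer = vec![];
        let mut sink = FileSink::try_new(&mut buffer, schema, vec![Encoding::Plain], options)?;

        // A `TryStream` of chunks; `send_all` drives the sink's
        // `poll_ready`/`start_send`/`poll_flush` state machine for every item.
        let mut chunks = stream::iter(0..3).map(|i| {
            let values = Int32Array::from(&[Some(i), None]);
            Ok::<_, ArrowError>(Chunk::new(vec![Arc::new(values) as Arc<dyn Array>]))
        });
        sink.send_all(&mut chunks).await?;

        // `close` resolves the pending write task and writes the Parquet footer,
        // including any key-value metadata inserted via `sink.metadata`.
        sink.close().await?;
        Ok::<_, ArrowError>(())
    })
    .unwrap();
}
```

This is also why `poll_ready` and `poll_flush` both delegate to `poll_complete`: the sink holds at most one in-flight write future, so readiness for the next item and flushing pending work are the same condition.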