This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

implemented futures::Sink for parquet async writer #877

Merged
30 commits merged on Mar 3, 2022
Commits (30)
1933f85
Added lifetime parameter to async IPC reader.
Feb 17, 2022
6c4df8d
Merge branch 'main' of github.com:mindx/arrow2 into main
Feb 17, 2022
6b834c5
Merge branch 'jorgecarleitao:main' into main
Feb 17, 2022
b55447e
Merge branch 'jorgecarleitao:main' into main
Mar 2, 2022
7e7c52c
DRY parquet reading (#845)
jorgecarleitao Feb 16, 2022
46f8c98
include validities in comparisons (#846)
ritchie46 Feb 16, 2022
3d2b687
Fix wrong null_count when slicing a sliced Bitmap (#848)
satlank Feb 17, 2022
cd6098a
Added sink implementation for writing Parquet streams.
Feb 17, 2022
9501521
Added lifetime parameter to async IPC reader. (#851)
Feb 17, 2022
3f39db2
Fixed error in writing compressed arrow (#855)
jorgecarleitao Feb 18, 2022
5b22c5e
Simplified API for FFI (#854)
jorgecarleitao Feb 19, 2022
21b01fc
Bumped crc (#856)
jorgecarleitao Feb 20, 2022
29e563a
add support for datatypes serde (#858)
houqp Feb 22, 2022
7b822a0
Added support to the Arrow C stream interface (read and write) (#857)
jorgecarleitao Feb 22, 2022
db520fe
Docs and lint (#860)
jorgecarleitao Feb 23, 2022
740dfe9
Removed warnings when crate is compiled without flags (#847)
jorgecarleitao Feb 23, 2022
d6f9359
Fixed reading parquet with timezone (#862)
jorgecarleitao Feb 24, 2022
f1ffed1
Moved files internally for better organization (#863)
jorgecarleitao Feb 25, 2022
5a0928a
Simplified API for writing to JSON (#864)
jorgecarleitao Feb 25, 2022
9e545b1
change csv-writer (#866)
ritchie46 Feb 26, 2022
d9c3854
Fixed json writing of dates and datetimes (#867)
jorgecarleitao Feb 26, 2022
3c714aa
Refactored JSON IO (better support for JSON and NDJSON) (#870)
jorgecarleitao Feb 27, 2022
e1c8a04
Improved performance of filter performance via Simd selection [3x] (#…
sundy-li Feb 28, 2022
034cc94
Added `try_new` and `new` to all arrays (#873)
jorgecarleitao Mar 1, 2022
5fa0918
Added test and documentation for parquet sink.
Mar 2, 2022
f2b51ea
Fixed formatting.
Mar 2, 2022
8d0722b
Replaced Parquet FileStream with FileSink.
Mar 2, 2022
57b82db
Moved Parquet Sink test to correct directory.
Mar 2, 2022
bcf1b64
Updated Parquet sink documentation.
Mar 2, 2022
1f0fcbf
Merge branch 'main' into parquet_sink
Mar 2, 2022
4 changes: 2 additions & 2 deletions src/io/parquet/write/mod.rs
@@ -8,7 +8,7 @@ mod levels;
 mod primitive;
 mod row_group;
 mod schema;
-mod stream;
+mod sink;
 mod utf8;
 mod utils;

@@ -39,7 +39,7 @@ pub use parquet2::{
 pub use file::FileWriter;
 pub use row_group::{row_group_iter, RowGroupIterator};
 pub use schema::to_parquet_type;
-pub use stream::FileStreamer;
+pub use sink::FileSink;
 
 pub(self) fn decimal_length_from_precision(precision: usize) -> usize {
     // digits = floor(log_10(2^(8*n - 1) - 1))
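Since the public re-export changes from `FileStreamer` to `FileSink`, downstream code needs a one-line import update. A minimal sketch (the paths are inferred from the `pub use` change above, not spelled out elsewhere in this PR):

// Before this PR (re-export removed above):
// use arrow2::io::parquet::write::FileStreamer;

// After this PR:
use arrow2::io::parquet::write::FileSink;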
223 changes: 223 additions & 0 deletions src/io/parquet/write/sink.rs
@@ -0,0 +1,223 @@
use crate::{
    array::Array,
    chunk::Chunk,
    datatypes::Schema,
    error::ArrowError,
    io::parquet::write::{Encoding, SchemaDescriptor, WriteOptions},
};
use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink, TryFutureExt};
use parquet2::metadata::KeyValue;
use parquet2::write::FileStreamer;
use std::{collections::HashMap, pin::Pin, sync::Arc, task::Poll};

use super::file::add_arrow_schema;

/// Sink that writes array [`chunks`](Chunk) as a Parquet file.
///
/// Any values in the sink's `metadata` field will be written to the file's footer
/// when the sink is closed.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use futures::SinkExt;
/// use arrow2::array::{Array, Int32Array};
/// use arrow2::datatypes::{DataType, Field, Schema};
/// use arrow2::chunk::Chunk;
/// use arrow2::io::parquet::write::{Encoding, WriteOptions, Compression, Version};
/// # use arrow2::io::parquet::write::FileSink;
/// # futures::executor::block_on(async move {
///
/// let schema = Schema::from(vec![
///     Field::new("values", DataType::Int32, true),
/// ]);
/// let encoding = vec![Encoding::Plain];
/// let options = WriteOptions {
///     write_statistics: true,
///     compression: Compression::Uncompressed,
///     version: Version::V2,
/// };
///
/// let mut buffer = vec![];
/// let mut sink = FileSink::try_new(
///     &mut buffer,
///     schema,
///     encoding,
///     options,
/// )?;
///
/// for i in 0..3 {
///     let values = Int32Array::from(&[Some(i), None]);
///     let chunk = Chunk::new(vec![Arc::new(values) as Arc<dyn Array>]);
///     sink.feed(chunk).await?;
/// }
/// sink.metadata.insert(String::from("key"), Some(String::from("value")));
/// sink.close().await?;
/// # arrow2::error::Result::Ok(())
/// # }).unwrap();
/// ```
pub struct FileSink<'a, W: AsyncWrite + Send + Unpin> {
    writer: Option<FileStreamer<W>>,
    task: Option<BoxFuture<'a, Result<Option<FileStreamer<W>>, ArrowError>>>,
    options: WriteOptions,
    encoding: Vec<Encoding>,
    schema: Schema,
    parquet_schema: SchemaDescriptor,
    /// Key-value metadata that will be written to the file on close.
    pub metadata: HashMap<String, Option<String>>,
}

impl<'a, W> FileSink<'a, W>
where
    W: AsyncWrite + Send + Unpin + 'a,
{
    /// Creates a new sink that writes arrays to the provided `writer`.
    ///
    /// # Error
    /// If the Arrow schema can't be converted to a valid Parquet schema.
    pub fn try_new(
        writer: W,
        schema: Schema,
        encoding: Vec<Encoding>,
        options: WriteOptions,
    ) -> Result<Self, ArrowError> {
        let parquet_schema = crate::io::parquet::write::to_parquet_schema(&schema)?;
        let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
        let mut writer = FileStreamer::new(writer, parquet_schema.clone(), options, created_by);
        // Writing the file header is deferred: it runs as the sink's first pending task.
        let task = Some(
            async move {
                writer.start().await?;
                Ok(Some(writer))
            }
            .boxed(),
        );
        Ok(Self {
            writer: None,
            task,
            options,
            schema,
            encoding,
            parquet_schema,
            metadata: HashMap::default(),
        })
    }

    /// The Arrow [`Schema`] for the file.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// The Parquet [`SchemaDescriptor`] for the file.
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        &self.parquet_schema
    }

    /// The write options for the file.
    pub fn options(&self) -> &WriteOptions {
        &self.options
    }

    /// Drives the pending task, if any, to completion; on success the writer is
    /// restored so the sink can accept the next chunk.
    fn poll_complete(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), ArrowError>> {
        if let Some(task) = &mut self.task {
            match futures::ready!(task.poll_unpin(cx)) {
                Ok(writer) => {
                    self.task = None;
                    self.writer = writer;
                    Poll::Ready(Ok(()))
                }
                Err(error) => {
                    self.task = None;
                    Poll::Ready(Err(error))
                }
            }
        } else {
            Poll::Ready(Ok(()))
        }
    }
}

impl<'a, W> Sink<Chunk<Arc<dyn Array>>> for FileSink<'a, W>
where
    W: AsyncWrite + Send + Unpin + 'a,
{
    type Error = ArrowError;

    fn start_send(self: Pin<&mut Self>, item: Chunk<Arc<dyn Array>>) -> Result<(), Self::Error> {
        let this = self.get_mut();
        if let Some(mut writer) = this.writer.take() {
            let count = item.len();
            let rows = crate::io::parquet::write::row_group_iter(
                item,
                this.encoding.clone(),
                this.parquet_schema.columns().to_vec(),
                this.options,
            );
            // Queue the row-group write; `poll_ready`/`poll_flush` drive it to completion.
            this.task = Some(Box::pin(async move {
                writer.write(rows, count).await?;
                Ok(Some(writer))
            }));
            Ok(())
        } else {
            Err(ArrowError::Io(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "writer closed".to_string(),
            )))
        }
    }

    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.get_mut().poll_complete(cx)
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.get_mut().poll_complete(cx)
    }

    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        match futures::ready!(this.poll_complete(cx)) {
            Ok(()) => {
                let writer = this.writer.take();
                if let Some(writer) = writer {
                    // Convert user-supplied metadata into Parquet key-value pairs and
                    // append the serialized Arrow schema before finishing the file.
                    let meta = std::mem::take(&mut this.metadata);
                    let metadata = if meta.is_empty() {
                        None
                    } else {
                        Some(
                            meta.into_iter()
                                .map(|(k, v)| KeyValue::new(k, v))
                                .collect::<Vec<_>>(),
                        )
                    };
                    let kv_meta = add_arrow_schema(&this.schema, metadata);

                    this.task = Some(
                        writer
                            .end(kv_meta)
                            .map_ok(|_| None)
                            .map_err(ArrowError::from)
                            .boxed(),
                    );
                    this.poll_complete(cx)
                } else {
                    Poll::Ready(Ok(()))
                }
            }
            Err(error) => Poll::Ready(Err(error)),
        }
    }
}
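
Because `FileSink` implements `futures::Sink`, it composes with the standard stream combinators rather than only the `feed`/`close` calls from the doc example. A minimal sketch (illustrative, not part of this PR) that forwards an entire stream of chunks with `SinkExt::send_all`:

use std::sync::Arc;

use futures::{stream, SinkExt};

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::ArrowError;
use arrow2::io::parquet::write::{Compression, Encoding, FileSink, Version, WriteOptions};

futures::executor::block_on(async move {
    let schema = Schema::from(vec![Field::new("values", DataType::Int32, true)]);
    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };
    let mut buffer = vec![];
    let mut sink = FileSink::try_new(&mut buffer, schema, vec![Encoding::Plain], options)?;

    // `send_all` drives `poll_ready`/`start_send` for every chunk in the stream.
    let chunks = (0..3).map(|i| {
        Ok::<_, ArrowError>(Chunk::new(vec![
            Arc::new(Int32Array::from(&[Some(i), None])) as Arc<dyn Array>,
        ]))
    });
    sink.send_all(&mut stream::iter(chunks)).await?;
    sink.close().await?;
    arrow2::error::Result::Ok(())
})
.unwrap();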
71 changes: 0 additions & 71 deletions src/io/parquet/write/stream.rs

This file was deleted.

1 change: 1 addition & 0 deletions tests/it/io/parquet/mod.rs
@@ -10,6 +10,7 @@ use crate::io::ipc::read_gzip_json;

 mod read;
 mod write;
+mod write_async;
 
 type ArrayStats = (Arc<dyn Array>, Option<Box<dyn Statistics>>);

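
The contents of the new `write_async` test module are not shown in this diff. A minimal sketch of a smoke test it might contain, using only the `FileSink` API introduced above (the test name is illustrative); any valid Parquet file begins and ends with the 4-byte magic `PAR1`:

use std::sync::Arc;

use futures::SinkExt;

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::io::parquet::write::{Compression, Encoding, FileSink, Version, WriteOptions};

#[test]
fn parquet_sink_smoke() {
    futures::executor::block_on(async move {
        let schema = Schema::from(vec![Field::new("values", DataType::Int32, true)]);
        let options = WriteOptions {
            write_statistics: true,
            compression: Compression::Uncompressed,
            version: Version::V2,
        };
        let mut buffer = vec![];
        let mut sink =
            FileSink::try_new(&mut buffer, schema, vec![Encoding::Plain], options).unwrap();

        let values = Int32Array::from(&[Some(1), None, Some(3)]);
        sink.feed(Chunk::new(vec![Arc::new(values) as Arc<dyn Array>]))
            .await
            .unwrap();
        sink.close().await.unwrap();

        // Parquet files start and end with the magic bytes `PAR1`.
        assert_eq!(&buffer[..4], &b"PAR1"[..]);
        assert_eq!(&buffer[buffer.len() - 4..], &b"PAR1"[..]);
    });
}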