From c7570bfcceb4a072096e6d88bd83fa96e593a186 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Tue, 11 Jan 2022 08:22:05 +0100 Subject: [PATCH] Migrated to latest parquet2 (#752) --- Cargo.toml | 2 +- src/io/parquet/write/record_batch.rs | 2 +- src/io/parquet/write/stream.rs | 25 ++++++++++++++----------- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2d7578f4e5c..f3a3f4096f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,7 @@ futures = { version = "0.3", optional = true } ahash = { version = "0.7", optional = true } # parquet support -parquet2 = { version = "0.8", optional = true, default_features = false, features = ["stream"] } +parquet2 = { version = "0.9", optional = true, default_features = false, features = ["stream"] } # avro support avro-schema = { version = "0.2", optional = true } diff --git a/src/io/parquet/write/record_batch.rs b/src/io/parquet/write/record_batch.rs index c298f82e1ed..7574e04739f 100644 --- a/src/io/parquet/write/record_batch.rs +++ b/src/io/parquet/write/record_batch.rs @@ -48,7 +48,7 @@ impl + 'static, I: Iterator>>> RowGro } } -impl + 'static, I: Iterator>>> Iterator +impl + 'static + Send + Sync, I: Iterator>>> Iterator for RowGroupIterator { type Item = Result>; diff --git a/src/io/parquet/write/stream.rs b/src/io/parquet/write/stream.rs index 925bc4fd7a7..2fc642bc330 100644 --- a/src/io/parquet/write/stream.rs +++ b/src/io/parquet/write/stream.rs @@ -1,5 +1,6 @@ //! Contains `async` APIs to write to parquet. use futures::stream::Stream; +use futures::Future; use parquet2::write::RowGroupIter; use parquet2::{ @@ -15,28 +16,29 @@ use super::schema::schema_to_metadata_key; use super::WriteOptions; /// Writes -pub async fn write_stream<'a, W, I>( - writer: &mut W, - row_groups: I, - schema: &Schema, +pub async fn write_stream<'a, 'b, W, S, F>( + mut writer: W, + row_groups: S, + schema: Schema, parquet_schema: SchemaDescriptor, options: WriteOptions, key_value_metadata: Option>, ) -> Result where W: std::io::Write, - I: Stream>>, + F: Future, ArrowError>>, + S: Stream, { let key_value_metadata = key_value_metadata .map(|mut x| { - x.push(schema_to_metadata_key(schema)); + x.push(schema_to_metadata_key(&schema)); x }) - .or_else(|| Some(vec![schema_to_metadata_key(schema)])); + .or_else(|| Some(vec![schema_to_metadata_key(&schema)])); let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string()); Ok(parquet_write_stream( - writer, + &mut writer, row_groups, parquet_schema, options, @@ -47,9 +49,9 @@ where } /// Async writes -pub async fn write_stream_stream<'a, W, I>( +pub async fn write_stream_stream<'a, W, S, F>( writer: &mut W, - row_groups: I, + row_groups: S, schema: &Schema, parquet_schema: SchemaDescriptor, options: WriteOptions, @@ -57,7 +59,8 @@ pub async fn write_stream_stream<'a, W, I>( ) -> Result where W: futures::io::AsyncWrite + Unpin + Send, - I: Stream>>, + F: Future, ArrowError>>, + S: Stream, { let key_value_metadata = key_value_metadata .map(|mut x| {