From c740e9f7138837fd5216b003b3e3516bf74dc68a Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 11 Dec 2021 08:02:22 +0000 Subject: [PATCH] Migrated to latest parquet --- Cargo.toml | 2 +- src/io/parquet/write/record_batch.rs | 2 +- src/io/parquet/write/stream.rs | 25 ++++++++++++++----------- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 13fbf2e05cf..43657ed25a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,7 +67,7 @@ futures = { version = "0.3", optional = true } ahash = { version = "0.7", optional = true } # parquet support -parquet2 = { version = "0.8", optional = true, default_features = false, features = ["stream"] } +parquet2 = { version = "0.9", optional = true, default_features = false, features = ["stream"] } # avro support avro-schema = { version = "0.2", optional = true } diff --git a/src/io/parquet/write/record_batch.rs b/src/io/parquet/write/record_batch.rs index c298f82e1ed..7574e04739f 100644 --- a/src/io/parquet/write/record_batch.rs +++ b/src/io/parquet/write/record_batch.rs @@ -48,7 +48,7 @@ impl + 'static, I: Iterator>>> RowGro } } -impl + 'static, I: Iterator>>> Iterator +impl + 'static + Send + Sync, I: Iterator>>> Iterator for RowGroupIterator { type Item = Result>; diff --git a/src/io/parquet/write/stream.rs b/src/io/parquet/write/stream.rs index 925bc4fd7a7..2fc642bc330 100644 --- a/src/io/parquet/write/stream.rs +++ b/src/io/parquet/write/stream.rs @@ -1,5 +1,6 @@ //! Contains `async` APIs to write to parquet. use futures::stream::Stream; +use futures::Future; use parquet2::write::RowGroupIter; use parquet2::{ @@ -15,28 +16,29 @@ use super::schema::schema_to_metadata_key; use super::WriteOptions; /// Writes -pub async fn write_stream<'a, W, I>( - writer: &mut W, - row_groups: I, - schema: &Schema, +pub async fn write_stream<'a, 'b, W, S, F>( + mut writer: W, + row_groups: S, + schema: Schema, parquet_schema: SchemaDescriptor, options: WriteOptions, key_value_metadata: Option>, ) -> Result where W: std::io::Write, - I: Stream>>, + F: Future, ArrowError>>, + S: Stream, { let key_value_metadata = key_value_metadata .map(|mut x| { - x.push(schema_to_metadata_key(schema)); + x.push(schema_to_metadata_key(&schema)); x }) - .or_else(|| Some(vec![schema_to_metadata_key(schema)])); + .or_else(|| Some(vec![schema_to_metadata_key(&schema)])); let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string()); Ok(parquet_write_stream( - writer, + &mut writer, row_groups, parquet_schema, options, @@ -47,9 +49,9 @@ where } /// Async writes -pub async fn write_stream_stream<'a, W, I>( +pub async fn write_stream_stream<'a, W, S, F>( writer: &mut W, - row_groups: I, + row_groups: S, schema: &Schema, parquet_schema: SchemaDescriptor, options: WriteOptions, @@ -57,7 +59,8 @@ pub async fn write_stream_stream<'a, W, I>( ) -> Result where W: futures::io::AsyncWrite + Unpin + Send, - I: Stream>>, + F: Future, ArrowError>>, + S: Stream, { let key_value_metadata = key_value_metadata .map(|mut x| {