From 777f375d109d1957241c960ce438a88f37d87475 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Wed, 2 Feb 2022 21:00:30 +0100 Subject: [PATCH] Bump parquet and aligned API to fit into it (#795) --- Cargo.toml | 2 +- arrow-parquet-integration-testing/src/main.rs | 26 ++-- benches/write_parquet.rs | 19 +-- examples/parquet_write.rs | 60 +++------ examples/parquet_write_parallel/src/main.rs | 11 +- examples/parquet_write_record.rs | 57 -------- guide/src/io/parquet_write.md | 9 +- src/doc/lib.md | 19 ++- src/io/parquet/write/file.rs | 82 ++++++++++++ src/io/parquet/write/mod.rs | 46 ++----- .../write/{record_batch.rs => row_group.rs} | 67 ++++++---- src/io/parquet/write/stream.rs | 123 ++++++++---------- tests/it/io/parquet/mod.rs | 62 ++++----- tests/it/io/parquet/write.rs | 20 ++- 14 files changed, 287 insertions(+), 316 deletions(-) delete mode 100644 examples/parquet_write_record.rs create mode 100644 src/io/parquet/write/file.rs rename src/io/parquet/write/{record_batch.rs => row_group.rs} (54%) diff --git a/Cargo.toml b/Cargo.toml index 42171337c0f..a4daf1e7a71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,7 +69,7 @@ futures = { version = "0.3", optional = true } ahash = { version = "0.7", optional = true } # parquet support -parquet2 = { version = "0.9", optional = true, default_features = false, features = ["stream"] } +parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] } # avro support avro-schema = { version = "0.2", optional = true } diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs index e3f438f9e2f..b3eac1a6c44 100644 --- a/arrow-parquet-integration-testing/src/main.rs +++ b/arrow-parquet-integration-testing/src/main.rs @@ -12,7 +12,7 @@ use arrow2::{ json_integration::read, json_integration::ArrowJson, parquet::write::{ - write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions, + Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions, }, }, }; @@ -201,17 +201,17 @@ fn main() -> Result<()> { let row_groups = RowGroupIterator::try_new(batches.into_iter().map(Ok), &schema, options, encodings)?; - let parquet_schema = row_groups.parquet_schema().clone(); - - let mut writer = File::create(write_path)?; - - let _ = write_file( - &mut writer, - row_groups, - &schema, - parquet_schema, - options, - None, - )?; + + let writer = File::create(write_path)?; + + let mut writer = FileWriter::try_new(writer, schema, options)?; + + writer.start()?; + for group in row_groups { + let (group, len) = group?; + writer.write(group, len)?; + } + let _ = writer.end(None)?; + Ok(()) } diff --git a/benches/write_parquet.rs b/benches/write_parquet.rs index 815326c1eec..3709aedba65 100644 --- a/benches/write_parquet.rs +++ b/benches/write_parquet.rs @@ -29,15 +29,16 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> { vec![encoding], )?; - let mut writer = Cursor::new(vec![]); - write_file( - &mut writer, - row_groups, - &schema, - to_parquet_schema(&schema)?, - options, - None, - )?; + let mut writer = vec![]; + + let mut writer = FileWriter::try_new(writer, schema, options)?; + + writer.start()?; + for group in row_groups { + let (group, len) = group?; + writer.write(group, len)?; + } + let _ = writer.end(None)?; Ok(()) } diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs index ae898e11a92..df7939563bb 100644 --- a/examples/parquet_write.rs +++ b/examples/parquet_write.rs @@ -1,60 +1,39 @@ use std::fs::File; -use std::iter::once; 
+use std::sync::Arc; -use arrow2::error::ArrowError; -use arrow2::io::parquet::write::to_parquet_schema; use arrow2::{ array::{Array, Int32Array}, - datatypes::Field, + chunk::Chunk, + datatypes::{Field, Schema}, error::Result, io::parquet::write::{ - array_to_pages, write_file, Compression, Compressor, DynIter, DynStreamingIterator, - Encoding, FallibleStreamingIterator, Version, WriteOptions, + Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions, }, }; -fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()> { - let schema = vec![field].into(); - +fn write_batch(path: &str, schema: Schema, columns: Chunk>) -> Result<()> { let options = WriteOptions { write_statistics: true, compression: Compression::Uncompressed, version: Version::V2, }; - let encoding = Encoding::Plain; - - // map arrow fields to parquet fields - let parquet_schema = to_parquet_schema(&schema)?; - let descriptor = parquet_schema.columns()[0].clone(); + let iter = vec![Ok(columns)]; - // Declare the row group iterator. This must be an iterator of iterators of streaming iterators - // * first iterator over row groups - let row_groups = once(Result::Ok(DynIter::new( - // * second iterator over column chunks (we assume no struct arrays -> `once` column) - once( - // * third iterator over (compressed) pages; dictionary encoding may lead to multiple pages per array. - array_to_pages(array, descriptor, options, encoding).map(move |pages| { - let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); - let compressed_pages = Compressor::new(encoded_pages, options.compression, vec![]) - .map_err(ArrowError::from); - DynStreamingIterator::new(compressed_pages) - }), - ), - ))); + let row_groups = + RowGroupIterator::try_new(iter.into_iter(), &schema, options, vec![Encoding::Plain])?; // Create a new empty file - let mut file = File::create(path)?; + let file = File::create(path)?; - // Write the file. Note that, at present, any error results in a corrupted file. - let _ = write_file( - &mut file, - row_groups, - &schema, - parquet_schema, - options, - None, - )?; + let mut writer = FileWriter::try_new(file, schema, options)?; + + writer.start()?; + for group in row_groups { + let (group, len) = group?; + writer.write(group, len)?; + } + let _size = writer.end(None)?; Ok(()) } @@ -69,5 +48,8 @@ fn main() -> Result<()> { Some(6), ]); let field = Field::new("c1", array.data_type().clone(), true); - write_single_array("test.parquet", &array, field) + let schema = Schema::from(vec![field]); + let columns = Chunk::new(vec![Arc::new(array) as Arc]); + + write_batch("test.parquet", schema, columns) } diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs index 915bbe98233..e997f11ce4b 100644 --- a/examples/parquet_write_parallel/src/main.rs +++ b/examples/parquet_write_parallel/src/main.rs @@ -92,10 +92,17 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()> }); // Create a new empty file - let mut file = std::fs::File::create(path)?; + let file = std::fs::File::create(path)?; + + let mut writer = FileWriter::try_new(file, schema, options)?; // Write the file. 
- let _file_size = write_file(&mut file, row_groups, schema, parquet_schema, options, None)?; + writer.start()?; + for group in row_groups { + let (group, len) = group?; + writer.write(group, len)?; + } + let _size = writer.end(None)?; Ok(()) } diff --git a/examples/parquet_write_record.rs b/examples/parquet_write_record.rs deleted file mode 100644 index 0198c10324e..00000000000 --- a/examples/parquet_write_record.rs +++ /dev/null @@ -1,57 +0,0 @@ -use std::fs::File; -use std::sync::Arc; - -use arrow2::{ - array::{Array, Int32Array}, - chunk::Chunk, - datatypes::{Field, Schema}, - error::Result, - io::parquet::write::{ - write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions, - }, -}; - -fn write_batch(path: &str, schema: Schema, columns: Chunk>) -> Result<()> { - let options = WriteOptions { - write_statistics: true, - compression: Compression::Uncompressed, - version: Version::V2, - }; - - let iter = vec![Ok(columns)]; - - let row_groups = - RowGroupIterator::try_new(iter.into_iter(), &schema, options, vec![Encoding::Plain])?; - - // Create a new empty file - let mut file = File::create(path)?; - - // Write the file. Note that, at present, any error results in a corrupted file. - let parquet_schema = row_groups.parquet_schema().clone(); - let _ = write_file( - &mut file, - row_groups, - &schema, - parquet_schema, - options, - None, - )?; - Ok(()) -} - -fn main() -> Result<()> { - let array = Int32Array::from(&[ - Some(0), - Some(1), - Some(2), - Some(3), - Some(4), - Some(5), - Some(6), - ]); - let field = Field::new("c1", array.data_type().clone(), true); - let schema = Schema::from(vec![field]); - let columns = Chunk::new(vec![Arc::new(array) as Arc]); - - write_batch("test.parquet", schema, columns) -} diff --git a/guide/src/io/parquet_write.md b/guide/src/io/parquet_write.md index a0272bd9709..5e4f1f0dbc9 100644 --- a/guide/src/io/parquet_write.md +++ b/guide/src/io/parquet_write.md @@ -13,19 +13,12 @@ First, some notation: ## Single threaded -Here is an example of how to write a single column chunk into a single row group: +Here is an example of how to write a single chunk: ```rust {{#include ../../../examples/parquet_write.rs}} ``` -For single-threaded writing, this crate offers an API that encapsulates the above logic. It -assumes that a `Chunk` is mapped to a single row group with a single page per column. - -```rust -{{#include ../../../examples/parquet_write_record.rs}} -``` - ## Multi-threaded writing As user of this crate, you will need to decide how you would like to parallelize, diff --git a/src/doc/lib.md b/src/doc/lib.md index 614271e718a..3fb07c1e42f 100644 --- a/src/doc/lib.md +++ b/src/doc/lib.md @@ -57,16 +57,15 @@ fn main() -> Result<()> { // anything implementing `std::io::Write` works let mut file = vec![]; - let parquet_schema = row_groups.parquet_schema().clone(); - let _ = write_file( - &mut file, - row_groups, - &schema, - parquet_schema, - options, - None, - )?; - + let mut writer = FileWriter::try_new(file, schema, options)?; + + // Write the file. 
+ writer.start()?; + for group in row_groups { + let (group, len) = group?; + writer.write(group, len)?; + } + let _ = writer.end(None)?; Ok(()) } ``` diff --git a/src/io/parquet/write/file.rs b/src/io/parquet/write/file.rs new file mode 100644 index 00000000000..649d6805334 --- /dev/null +++ b/src/io/parquet/write/file.rs @@ -0,0 +1,82 @@ +use std::io::Write; + +use parquet2::metadata::SchemaDescriptor; +use parquet2::write::RowGroupIter; +use parquet2::{metadata::KeyValue, write::WriteOptions}; + +use crate::datatypes::Schema; +use crate::error::{ArrowError, Result}; + +use super::{schema::schema_to_metadata_key, to_parquet_schema}; + +/// Attaches [`Schema`] to `key_value_metadata` +pub fn add_arrow_schema( + schema: &Schema, + key_value_metadata: Option>, +) -> Option> { + key_value_metadata + .map(|mut x| { + x.push(schema_to_metadata_key(schema)); + x + }) + .or_else(|| Some(vec![schema_to_metadata_key(schema)])) +} + +pub struct FileWriter { + writer: parquet2::write::FileWriter, + schema: Schema, +} + +// Accessors +impl FileWriter { + /// The options assigned to the file + pub fn options(&self) -> &WriteOptions { + self.writer.options() + } + + /// The [`SchemaDescriptor`] assigned to this file + pub fn parquet_schema(&self) -> &SchemaDescriptor { + self.writer.schema() + } + + /// The [`Schema`] assigned to this file + pub fn schema(&self) -> &Schema { + &self.schema + } +} + +impl FileWriter { + /// Returns a new [`FileWriter`]. + /// # Error + /// If it is unable to derive a parquet schema from [`Schema`]. + pub fn try_new(writer: W, schema: Schema, options: WriteOptions) -> Result { + let parquet_schema = to_parquet_schema(&schema)?; + + let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string()); + + Ok(Self { + writer: parquet2::write::FileWriter::new(writer, parquet_schema, options, created_by), + schema, + }) + } + + /// Writes the header of the file + pub fn start(&mut self) -> Result<()> { + Ok(self.writer.start()?) + } + + /// Writes a row group to the file. + pub fn write( + &mut self, + row_group: RowGroupIter<'_, ArrowError>, + num_rows: usize, + ) -> Result<()> { + Ok(self.writer.write(row_group, num_rows)?) + } + + /// Writes the footer of the parquet file. Returns the total size of the file. + pub fn end(self, key_value_metadata: Option>) -> Result<(u64, W)> { + let key_value_metadata = add_arrow_schema(&self.schema, key_value_metadata); + Ok(self.writer.end(key_value_metadata)?) 
+ } +} diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 37d97004ea3..5fec264cfd3 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -2,16 +2,16 @@ mod binary; mod boolean; mod dictionary; +mod file; mod fixed_len_bytes; mod levels; mod primitive; -mod record_batch; +mod row_group; mod schema; +mod stream; mod utf8; mod utils; -pub mod stream; - use crate::array::*; use crate::bitmap::Bitmap; use crate::buffer::Buffer; @@ -31,14 +31,15 @@ pub use parquet2::{ page::{CompressedDataPage, CompressedPage, EncodedPage}, schema::types::ParquetType, write::{ - compress, write_file as parquet_write_file, Compressor, DynIter, DynStreamingIterator, - RowGroupIter, Version, WriteOptions, + compress, Compressor, DynIter, DynStreamingIterator, RowGroupIter, Version, WriteOptions, }, FallibleStreamingIterator, }; -pub use record_batch::RowGroupIterator; -use schema::schema_to_metadata_key; + +pub use file::FileWriter; +pub use row_group::{row_group_iter, RowGroupIterator}; pub use schema::to_parquet_type; +pub use stream::FileStreamer; pub(self) fn decimal_length_from_precision(precision: usize) -> usize { // digits = floor(log_10(2^(8*n - 1) - 1)) @@ -61,37 +62,6 @@ pub fn to_parquet_schema(schema: &Schema) -> Result { Ok(SchemaDescriptor::new("root".to_string(), parquet_types)) } -/// Writes -pub fn write_file<'a, W, I>( - writer: &mut W, - row_groups: I, - schema: &Schema, - parquet_schema: SchemaDescriptor, - options: WriteOptions, - key_value_metadata: Option>, -) -> Result -where - W: std::io::Write, - I: Iterator>>, -{ - let key_value_metadata = key_value_metadata - .map(|mut x| { - x.push(schema_to_metadata_key(schema)); - x - }) - .or_else(|| Some(vec![schema_to_metadata_key(schema)])); - - let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string()); - Ok(parquet_write_file( - writer, - row_groups, - parquet_schema, - options, - created_by, - key_value_metadata, - )?) -} - /// Checks whether the `data_type` can be encoded as `encoding`. /// Note that this is whether this implementation supports it, which is a subset of /// what the parquet spec allows. 
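
Note on migration: the `write_file` free function removed above is replaced by the `FileWriter` state machine now re-exported from this module. The sketch below shows the new end-to-end write loop against the post-patch API; the in-memory `Vec<u8>` sink, the single `Int32Array` column, and the `write_to_vec` name are illustrative choices, not part of this patch.

```rust
use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::error::Result;
use arrow2::io::parquet::write::{
    Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
};

fn write_to_vec() -> Result<Vec<u8>> {
    // Illustrative single-column data; any Arrow array is handled the same way.
    let array = Int32Array::from(&[Some(1), None, Some(3)]);
    let field = Field::new("c1", array.data_type().clone(), true);
    let schema = Schema::from(vec![field]);
    let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);

    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };

    // One `Chunk` per row group; each iterator item now yields (row group, number of rows).
    let row_groups = RowGroupIterator::try_new(
        vec![Ok(columns)].into_iter(),
        &schema,
        options,
        vec![Encoding::Plain],
    )?;

    // `FileWriter` replaces the old `write_file`: start, write each row group, end.
    let sink: Vec<u8> = vec![];
    let mut writer = FileWriter::try_new(sink, schema, options)?;
    writer.start()?;
    for group in row_groups {
        let (group, len) = group?;
        writer.write(group, len)?;
    }
    let (_size, data) = writer.end(None)?;
    Ok(data)
}
```

The `(row group, num_rows)` tuples come from `RowGroupIterator`, whose `Iterator::Item` is changed in this patch to carry the row count that `FileWriter::write` needs.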
diff --git a/src/io/parquet/write/record_batch.rs b/src/io/parquet/write/row_group.rs similarity index 54% rename from src/io/parquet/write/record_batch.rs rename to src/io/parquet/write/row_group.rs index 7574e04739f..f6076808ac5 100644 --- a/src/io/parquet/write/record_batch.rs +++ b/src/io/parquet/write/row_group.rs @@ -1,10 +1,6 @@ -use parquet2::write::Compressor; use parquet2::FallibleStreamingIterator; +use parquet2::{metadata::ColumnDescriptor, write::Compressor}; -use super::{ - array_to_pages, to_parquet_schema, DynIter, DynStreamingIterator, Encoding, RowGroupIter, - SchemaDescriptor, WriteOptions, -}; use crate::{ array::Array, chunk::Chunk, @@ -12,6 +8,37 @@ use crate::{ error::{ArrowError, Result}, }; +use super::{ + array_to_pages, to_parquet_schema, DynIter, DynStreamingIterator, Encoding, RowGroupIter, + SchemaDescriptor, WriteOptions, +}; + +/// Maps a [`Chunk`] and parquet-specific options to an [`RowGroupIter`] used to +/// write to parquet +pub fn row_group_iter + 'static + Send + Sync>( + chunk: Chunk, + encodings: Vec, + columns: Vec, + options: WriteOptions, +) -> RowGroupIter<'static, ArrowError> { + DynIter::new( + chunk + .into_arrays() + .into_iter() + .zip(columns.into_iter()) + .zip(encodings.into_iter()) + .map(move |((array, descriptor), encoding)| { + array_to_pages(array.as_ref(), descriptor, options, encoding).map(move |pages| { + let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); + let compressed_pages = + Compressor::new(encoded_pages, options.compression, vec![]) + .map_err(ArrowError::from); + DynStreamingIterator::new(compressed_pages) + }) + }), + ) +} + /// An iterator adapter that converts an iterator over [`Chunk`] into an iterator /// of row groups. /// Use it to create an iterator consumable by the parquet's API. @@ -51,31 +78,23 @@ impl + 'static, I: Iterator>>> RowGro impl + 'static + Send + Sync, I: Iterator>>> Iterator for RowGroupIterator { - type Item = Result>; + type Item = Result<(RowGroupIter<'static, ArrowError>, usize)>; fn next(&mut self) -> Option { let options = self.options; self.iter.next().map(|maybe_chunk| { - let columns = maybe_chunk?; + let chunk = maybe_chunk?; + let len = chunk.len(); let encodings = self.encodings.clone(); - Ok(DynIter::new( - columns - .into_arrays() - .into_iter() - .zip(self.parquet_schema.columns().to_vec().into_iter()) - .zip(encodings.into_iter()) - .map(move |((array, descriptor), encoding)| { - array_to_pages(array.as_ref(), descriptor, options, encoding).map( - move |pages| { - let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); - let compressed_pages = - Compressor::new(encoded_pages, options.compression, vec![]) - .map_err(ArrowError::from); - DynStreamingIterator::new(compressed_pages) - }, - ) - }), + Ok(( + row_group_iter( + chunk, + encodings, + self.parquet_schema.columns().to_vec(), + options, + ), + len, )) }) } diff --git a/src/io/parquet/write/stream.rs b/src/io/parquet/write/stream.rs index 2fc642bc330..259c5aaa347 100644 --- a/src/io/parquet/write/stream.rs +++ b/src/io/parquet/write/stream.rs @@ -1,82 +1,71 @@ //! Contains `async` APIs to write to parquet. 
-use futures::stream::Stream; -use futures::Future; +use futures::AsyncWrite; +use parquet2::metadata::{KeyValue, SchemaDescriptor}; use parquet2::write::RowGroupIter; -use parquet2::{ - metadata::{KeyValue, SchemaDescriptor}, - write::stream::write_stream as parquet_write_stream, - write::stream::write_stream_stream as parquet_write_stream_stream, -}; use crate::datatypes::*; use crate::error::{ArrowError, Result}; -use super::schema::schema_to_metadata_key; -use super::WriteOptions; +use super::file::add_arrow_schema; +use super::{to_parquet_schema, WriteOptions}; -/// Writes -pub async fn write_stream<'a, 'b, W, S, F>( - mut writer: W, - row_groups: S, +/// An interface to write a parquet to a [`AsyncWrite`] +pub struct FileStreamer { + writer: parquet2::write::FileStreamer, schema: Schema, - parquet_schema: SchemaDescriptor, - options: WriteOptions, - key_value_metadata: Option>, -) -> Result -where - W: std::io::Write, - F: Future, ArrowError>>, - S: Stream, -{ - let key_value_metadata = key_value_metadata - .map(|mut x| { - x.push(schema_to_metadata_key(&schema)); - x - }) - .or_else(|| Some(vec![schema_to_metadata_key(&schema)])); +} + +// Accessors +impl FileStreamer { + /// The options assigned to the file + pub fn options(&self) -> &WriteOptions { + self.writer.options() + } + + /// The [`SchemaDescriptor`] assigned to this file + pub fn parquet_schema(&self) -> &SchemaDescriptor { + self.writer.schema() + } - let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string()); - Ok(parquet_write_stream( - &mut writer, - row_groups, - parquet_schema, - options, - created_by, - key_value_metadata, - ) - .await?) + /// The [`Schema`] assigned to this file + pub fn schema(&self) -> &Schema { + &self.schema + } } -/// Async writes -pub async fn write_stream_stream<'a, W, S, F>( - writer: &mut W, - row_groups: S, - schema: &Schema, - parquet_schema: SchemaDescriptor, - options: WriteOptions, - key_value_metadata: Option>, -) -> Result -where - W: futures::io::AsyncWrite + Unpin + Send, - F: Future, ArrowError>>, - S: Stream, -{ - let key_value_metadata = key_value_metadata - .map(|mut x| { - x.push(schema_to_metadata_key(schema)); - x +impl FileStreamer { + /// Returns a new [`FileStreamer`]. + /// # Error + /// If it is unable to derive a parquet schema from [`Schema`]. + pub fn try_new(writer: W, schema: Schema, options: WriteOptions) -> Result { + let parquet_schema = to_parquet_schema(&schema)?; + + let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string()); + + Ok(Self { + writer: parquet2::write::FileStreamer::new(writer, parquet_schema, options, created_by), + schema, }) - .or_else(|| Some(vec![schema_to_metadata_key(schema)])); + } + + /// Writes the header of the file + pub async fn start(&mut self) -> Result<()> { + Ok(self.writer.start().await?) + } + + /// Writes a row group to the file. + pub async fn write( + &mut self, + row_group: RowGroupIter<'_, ArrowError>, + num_rows: usize, + ) -> Result<()> { + Ok(self.writer.write(row_group, num_rows).await?) + } - let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string()); - Ok(parquet_write_stream_stream( - writer, - row_groups, - parquet_schema, - options, - created_by, - key_value_metadata, - ) - .await?) + /// Writes the footer of the parquet file. Returns the total size of the file. 
+ pub async fn end(self, key_value_metadata: Option>) -> Result<(u64, W)> { + let key_value_metadata = add_arrow_schema(&self.schema, key_value_metadata); + Ok(self.writer.end(key_value_metadata).await?) + } } diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 5f5493e957d..49e58e6d495 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1,7 +1,6 @@ use std::io::{Cursor, Read, Seek}; use std::sync::Arc; -use arrow2::error::ArrowError; use arrow2::{ array::*, bitmap::Bitmap, buffer::Buffer, chunk::Chunk, datatypes::*, error::Result, io::parquet::read::statistics::*, io::parquet::read::*, io::parquet::write::*, @@ -627,42 +626,31 @@ fn integration_write(schema: &Schema, batches: &[Chunk>]) -> Resu version: Version::V1, }; - let parquet_schema = to_parquet_schema(schema)?; - let descritors = parquet_schema.columns().to_vec().into_iter(); - - let row_groups = batches.iter().map(|batch| { - let iterator = batch - .columns() - .iter() - .zip(descritors.clone()) - .map(|(array, descriptor)| { - let encoding = if let DataType::Dictionary(..) = array.data_type() { - Encoding::RleDictionary - } else { - Encoding::Plain - }; - array_to_pages(array.as_ref(), descriptor, options, encoding).map(|pages| { - let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); - let compressed_pages = - Compressor::new(encoded_pages, options.compression, vec![]) - .map_err(ArrowError::from); - DynStreamingIterator::new(compressed_pages) - }) - }); - let iterator = DynIter::new(iterator); - Ok(iterator) - }); - - let mut writer = Cursor::new(vec![]); - - write_file( - &mut writer, - row_groups, - schema, - parquet_schema, - options, - None, - )?; + let encodings = schema + .fields + .iter() + .map(|x| { + if let DataType::Dictionary(..) = x.data_type() { + Encoding::RleDictionary + } else { + Encoding::Plain + } + }) + .collect(); + + let row_groups = + RowGroupIterator::try_new(batches.iter().cloned().map(Ok), schema, options, encodings)?; + + let writer = Cursor::new(vec![]); + + let mut writer = FileWriter::try_new(writer, schema.clone(), options)?; + + writer.start()?; + for group in row_groups { + let (group, len) = group?; + writer.write(group, len)?; + } + let (_size, writer) = writer.end(None)?; Ok(writer.into_inner()) } diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index 76d4cc858ec..98d50841634 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -40,21 +40,19 @@ fn round_trip( version, }; - let parquet_schema = to_parquet_schema(&schema)?; - let iter = vec![Chunk::try_new(vec![array.clone()])]; let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, vec![encoding])?; - let mut writer = Cursor::new(vec![]); - write_file( - &mut writer, - row_groups, - &schema, - parquet_schema, - options, - None, - )?; + let writer = Cursor::new(vec![]); + let mut writer = FileWriter::try_new(writer, schema, options)?; + + writer.start()?; + for group in row_groups { + let (group, len) = group?; + writer.write(group, len)?; + } + let (_size, writer) = writer.end(None)?; let data = writer.into_inner();
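
A companion note for the async path: `FileStreamer`, also introduced in this patch, follows the same start/write/end sequence with `.await` at each step. Below is a minimal sketch, generic over the output sink; the `write_async` name, the single-row-group setup, and the exact `AsyncWrite + Unpin + Send` bound on the sink are assumptions for illustration.

```rust
use std::sync::Arc;

use futures::AsyncWrite;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::datatypes::Schema;
use arrow2::error::Result;
use arrow2::io::parquet::write::{
    Compression, Encoding, FileStreamer, RowGroupIterator, Version, WriteOptions,
};

/// Writes `columns` as a single row group to `sink` and returns (file size, sink).
async fn write_async<W>(
    sink: W,
    schema: Schema,
    columns: Chunk<Arc<dyn Array>>,
) -> Result<(u64, W)>
where
    W: AsyncWrite + Unpin + Send, // assumed bound; mirrors the sync writer's `Write`
{
    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };

    // The same `RowGroupIterator` feeds both the sync and the async writers.
    let row_groups = RowGroupIterator::try_new(
        vec![Ok(columns)].into_iter(),
        &schema,
        options,
        vec![Encoding::Plain; schema.fields.len()],
    )?;

    let mut streamer = FileStreamer::try_new(sink, schema, options)?;
    streamer.start().await?;
    for group in row_groups {
        let (group, len) = group?;
        streamer.write(group, len).await?;
    }
    // Like the sync `FileWriter`, `end` consumes the streamer and hands the sink back.
    streamer.end(None).await
}
```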