This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Bumped parquet and aligned API to fit into it #795

Merged
merged 4 commits · Feb 2, 2022
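
In short: the free function `write_file` is removed in favor of a stateful `FileWriter` that writes the header, row groups, and footer as explicit steps. Below is a minimal sketch of the new write path, assembled from the example and call sites in this diff; the column values and output path are illustrative only:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow2::{
    array::{Array, Int32Array},
    chunk::Chunk,
    datatypes::{Field, Schema},
    error::Result,
    io::parquet::write::{
        Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
    },
};

fn main() -> Result<()> {
    // One nullable Int32 column in one chunk (values are illustrative).
    let array = Int32Array::from(&[Some(1), Some(2), None]);
    let field = Field::new("c1", array.data_type().clone(), true);
    let schema = Schema::from(vec![field]);
    let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);

    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };

    // Encode the chunk into an iterator of row groups.
    let row_groups = RowGroupIterator::try_new(
        vec![Ok(columns)].into_iter(),
        &schema,
        options,
        vec![Encoding::Plain],
    )?;

    // New API: a stateful writer instead of the old `write_file` free function.
    let file = File::create("example.parquet")?;
    let mut writer = FileWriter::try_new(file, schema, options)?;

    writer.start()?; // write the parquet header
    for group in row_groups {
        let (group, len) = group?;
        writer.write(group, len)?; // one row group at a time
    }
    let _size = writer.end(None)?; // write the footer
    Ok(())
}
```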
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -68,7 +68,7 @@ futures = { version = "0.3", optional = true }
 ahash = { version = "0.7", optional = true }
 
 # parquet support
-parquet2 = { version = "0.9", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] }
 
 # avro support
 avro-schema = { version = "0.2", optional = true }
26 changes: 13 additions & 13 deletions arrow-parquet-integration-testing/src/main.rs
@@ -12,7 +12,7 @@ use arrow2::{
         json_integration::read,
         json_integration::ArrowJson,
         parquet::write::{
-            write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
+            Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
         },
     },
 };
@@ -201,17 +201,17 @@ fn main() -> Result<()> {
 
     let row_groups =
         RowGroupIterator::try_new(batches.into_iter().map(Ok), &schema, options, encodings)?;
-    let parquet_schema = row_groups.parquet_schema().clone();
-
-    let mut writer = File::create(write_path)?;
-
-    let _ = write_file(
-        &mut writer,
-        row_groups,
-        &schema,
-        parquet_schema,
-        options,
-        None,
-    )?;
+
+    let writer = File::create(write_path)?;
+
+    let mut writer = FileWriter::try_new(writer, schema, options)?;
+
+    writer.start()?;
+    for group in row_groups {
+        let (group, len) = group?;
+        writer.write(group, len)?;
+    }
+    let _ = writer.end(None)?;
 
     Ok(())
 }
19 changes: 10 additions & 9 deletions benches/write_parquet.rs
@@ -29,15 +29,16 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
         vec![encoding],
     )?;
 
-    let mut writer = Cursor::new(vec![]);
-    write_file(
-        &mut writer,
-        row_groups,
-        &schema,
-        to_parquet_schema(&schema)?,
-        options,
-        None,
-    )?;
+    let mut writer = vec![];
+
+    let mut writer = FileWriter::try_new(writer, schema, options)?;
+
+    writer.start()?;
+    for group in row_groups {
+        let (group, len) = group?;
+        writer.write(group, len)?;
+    }
+    let _ = writer.end(None)?;
     Ok(())
 }
60 changes: 21 additions & 39 deletions examples/parquet_write.rs
@@ -1,60 +1,39 @@
 use std::fs::File;
-use std::iter::once;
+use std::sync::Arc;
 
-use arrow2::error::ArrowError;
-use arrow2::io::parquet::write::to_parquet_schema;
 use arrow2::{
     array::{Array, Int32Array},
-    datatypes::Field,
+    chunk::Chunk,
+    datatypes::{Field, Schema},
     error::Result,
     io::parquet::write::{
-        array_to_pages, write_file, Compression, Compressor, DynIter, DynStreamingIterator,
-        Encoding, FallibleStreamingIterator, Version, WriteOptions,
+        Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
     },
 };
 
-fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()> {
-    let schema = vec![field].into();
-
+fn write_batch(path: &str, schema: Schema, columns: Chunk<Arc<dyn Array>>) -> Result<()> {
     let options = WriteOptions {
         write_statistics: true,
         compression: Compression::Uncompressed,
         version: Version::V2,
     };
-    let encoding = Encoding::Plain;
 
-    // map arrow fields to parquet fields
-    let parquet_schema = to_parquet_schema(&schema)?;
-
-    let descriptor = parquet_schema.columns()[0].clone();
+    let iter = vec![Ok(columns)];
 
-    // Declare the row group iterator. This must be an iterator of iterators of streaming iterators
-    // * first iterator over row groups
-    let row_groups = once(Result::Ok(DynIter::new(
-        // * second iterator over column chunks (we assume no struct arrays -> `once` column)
-        once(
-            // * third iterator over (compressed) pages; dictionary encoding may lead to multiple pages per array.
-            array_to_pages(array, descriptor, options, encoding).map(move |pages| {
-                let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
-                let compressed_pages = Compressor::new(encoded_pages, options.compression, vec![])
-                    .map_err(ArrowError::from);
-                DynStreamingIterator::new(compressed_pages)
-            }),
-        ),
-    )));
+    let row_groups =
+        RowGroupIterator::try_new(iter.into_iter(), &schema, options, vec![Encoding::Plain])?;
 
     // Create a new empty file
-    let mut file = File::create(path)?;
+    let file = File::create(path)?;
 
-    // Write the file. Note that, at present, any error results in a corrupted file.
-    let _ = write_file(
-        &mut file,
-        row_groups,
-        &schema,
-        parquet_schema,
-        options,
-        None,
-    )?;
+    let mut writer = FileWriter::try_new(file, schema, options)?;
+
+    writer.start()?;
+    for group in row_groups {
+        let (group, len) = group?;
+        writer.write(group, len)?;
+    }
+    let _size = writer.end(None)?;
     Ok(())
 }
 
@@ -69,5 +48,8 @@ fn main() -> Result<()> {
         Some(6),
     ]);
     let field = Field::new("c1", array.data_type().clone(), true);
-    write_single_array("test.parquet", &array, field)
+    let schema = Schema::from(vec![field]);
+    let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);
+
+    write_batch("test.parquet", schema, columns)
 }
11 changes: 9 additions & 2 deletions examples/parquet_write_parallel/src/main.rs
@@ -92,10 +92,17 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()>
     });
 
     // Create a new empty file
-    let mut file = std::fs::File::create(path)?;
+    let file = std::fs::File::create(path)?;
+
+    let mut writer = FileWriter::try_new(file, schema, options)?;
 
     // Write the file.
-    let _file_size = write_file(&mut file, row_groups, schema, parquet_schema, options, None)?;
+    writer.start()?;
+    for group in row_groups {
+        let (group, len) = group?;
+        writer.write(group, len)?;
+    }
+    let _size = writer.end(None)?;
 
     Ok(())
 }
57 changes: 0 additions & 57 deletions examples/parquet_write_record.rs

This file was deleted.

9 changes: 1 addition & 8 deletions guide/src/io/parquet_write.md
@@ -13,19 +13,12 @@ First, some notation:
 
 ## Single threaded
 
-Here is an example of how to write a single column chunk into a single row group:
+Here is an example of how to write a single chunk:
 
 ```rust
 {{#include ../../../examples/parquet_write.rs}}
 ```
 
-For single-threaded writing, this crate offers an API that encapsulates the above logic. It
-assumes that a `Chunk` is mapped to a single row group with a single page per column.
-
-```rust
-{{#include ../../../examples/parquet_write_record.rs}}
-```
-
 ## Multi-threaded writing
 
 As user of this crate, you will need to decide how you would like to parallelize,
19 changes: 9 additions & 10 deletions src/doc/lib.md
@@ -57,16 +57,15 @@ fn main() -> Result<()> {
     // anything implementing `std::io::Write` works
     let mut file = vec![];
 
-    let parquet_schema = row_groups.parquet_schema().clone();
-    let _ = write_file(
-        &mut file,
-        row_groups,
-        &schema,
-        parquet_schema,
-        options,
-        None,
-    )?;
-
+    let mut writer = FileWriter::try_new(file, schema, options)?;
+
+    // Write the file.
+    writer.start()?;
+    for group in row_groups {
+        let (group, len) = group?;
+        writer.write(group, len)?;
+    }
+    let _ = writer.end(None)?;
     Ok(())
 }
 ```
82 changes: 82 additions & 0 deletions src/io/parquet/write/file.rs
@@ -0,0 +1,82 @@
+use std::io::Write;
+
+use parquet2::metadata::SchemaDescriptor;
+use parquet2::write::RowGroupIter;
+use parquet2::{metadata::KeyValue, write::WriteOptions};
+
+use crate::datatypes::Schema;
+use crate::error::{ArrowError, Result};
+
+use super::{schema::schema_to_metadata_key, to_parquet_schema};
+
+/// Attaches [`Schema`] to `key_value_metadata`
+pub fn add_arrow_schema(
+    schema: &Schema,
+    key_value_metadata: Option<Vec<KeyValue>>,
+) -> Option<Vec<KeyValue>> {
+    key_value_metadata
+        .map(|mut x| {
+            x.push(schema_to_metadata_key(schema));
+            x
+        })
+        .or_else(|| Some(vec![schema_to_metadata_key(schema)]))
+}
+
+pub struct FileWriter<W: Write> {
+    writer: parquet2::write::FileWriter<W>,
+    schema: Schema,
+}
+
+// Accessors
+impl<W: Write> FileWriter<W> {
+    /// The options assigned to the file
+    pub fn options(&self) -> &WriteOptions {
+        self.writer.options()
+    }
+
+    /// The [`SchemaDescriptor`] assigned to this file
+    pub fn parquet_schema(&self) -> &SchemaDescriptor {
+        self.writer.schema()
+    }
+
+    /// The [`Schema`] assigned to this file
+    pub fn schema(&self) -> &Schema {
+        &self.schema
+    }
+}
+
+impl<W: Write> FileWriter<W> {
+    /// Returns a new [`FileWriter`].
+    /// # Error
+    /// If it is unable to derive a parquet schema from [`Schema`].
+    pub fn try_new(writer: W, schema: Schema, options: WriteOptions) -> Result<Self> {
+        let parquet_schema = to_parquet_schema(&schema)?;
+
+        let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
+
+        Ok(Self {
+            writer: parquet2::write::FileWriter::new(writer, parquet_schema, options, created_by),
+            schema,
+        })
+    }
+
+    /// Writes the header of the file
+    pub fn start(&mut self) -> Result<()> {
+        Ok(self.writer.start()?)
+    }
+
+    /// Writes a row group to the file.
+    pub fn write(
+        &mut self,
+        row_group: RowGroupIter<'_, ArrowError>,
+        num_rows: usize,
+    ) -> Result<()> {
+        Ok(self.writer.write(row_group, num_rows)?)
+    }
+
+    /// Writes the footer of the parquet file. Returns the total size of the file.
+    pub fn end(self, key_value_metadata: Option<Vec<KeyValue>>) -> Result<(u64, W)> {
+        let key_value_metadata = add_arrow_schema(&self.schema, key_value_metadata);
+        Ok(self.writer.end(key_value_metadata)?)
+    }
+}
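
Because `end` consumes the wrapper and returns `(total size, writer)`, writing to an in-memory buffer and recovering the bytes afterwards is straightforward, as the benchmark and `src/doc/lib.md` changes above rely on. A small sketch against this new API (the helper name and the empty row-group stream are illustrative only):

```rust
use arrow2::datatypes::Schema;
use arrow2::error::Result;
use arrow2::io::parquet::write::{Compression, FileWriter, Version, WriteOptions};

/// Illustrative helper: write a parquet file into memory and recover the
/// underlying buffer from `end`.
fn write_to_memory(schema: Schema) -> Result<Vec<u8>> {
    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };

    // `Vec<u8>` implements `std::io::Write`, so it can back the writer.
    let mut writer = FileWriter::try_new(vec![], schema, options)?;

    writer.start()?; // parquet header
    // ... call `writer.write(row_group, num_rows)?` once per row group ...
    let (_size, buffer) = writer.end(None)?; // footer; hands the writer back

    Ok(buffer)
}
```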