Bump parquet and aligned API to fit into it (#795)
jorgecarleitao authored Feb 2, 2022
1 parent 54797de commit 777f375
Showing 14 changed files with 287 additions and 316 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -69,7 +69,7 @@ futures = { version = "0.3", optional = true }
 ahash = { version = "0.7", optional = true }
 
 # parquet support
-parquet2 = { version = "0.9", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] }
 
 # avro support
 avro-schema = { version = "0.2", optional = true }
26 changes: 13 additions & 13 deletions arrow-parquet-integration-testing/src/main.rs
@@ -12,7 +12,7 @@ use arrow2::{
         json_integration::read,
         json_integration::ArrowJson,
         parquet::write::{
-            write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
+            Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
         },
     },
 };
@@ -201,17 +201,17 @@ fn main() -> Result<()> {
 
     let row_groups =
         RowGroupIterator::try_new(batches.into_iter().map(Ok), &schema, options, encodings)?;
-    let parquet_schema = row_groups.parquet_schema().clone();
-
-    let mut writer = File::create(write_path)?;
-
-    let _ = write_file(
-        &mut writer,
-        row_groups,
-        &schema,
-        parquet_schema,
-        options,
-        None,
-    )?;
+    let writer = File::create(write_path)?;
+
+    let mut writer = FileWriter::try_new(writer, schema, options)?;
+
+    writer.start()?;
+    for group in row_groups {
+        let (group, len) = group?;
+        writer.write(group, len)?;
+    }
+    let _ = writer.end(None)?;
 
     Ok(())
 }
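
The same replacement pattern — build a `RowGroupIterator`, construct a `FileWriter`, then `start` / `write` / `end` — recurs in every file below. For reference, here is a self-contained sketch of the new single-threaded flow; the file name and the one-column schema are illustrative, not part of this commit:

```rust
// Minimal sketch of the write lifecycle introduced by this commit:
// header (`start`), one row group per `Chunk` (`write`), footer (`end`).
use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::error::Result;
use arrow2::io::parquet::write::{
    Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
};

fn main() -> Result<()> {
    // hypothetical data: a single non-nullable Int32 column named "c1"
    let array = Int32Array::from_slice(&[1, 2, 3]);
    let field = Field::new("c1", array.data_type().clone(), false);
    let schema = Schema::from(vec![field]);
    let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);

    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };

    // each `Chunk` becomes one row group
    let row_groups = RowGroupIterator::try_new(
        vec![Ok(columns)].into_iter(),
        &schema,
        options,
        vec![Encoding::Plain],
    )?;

    let file = std::fs::File::create("example.parquet")?;
    let mut writer = FileWriter::try_new(file, schema, options)?;

    writer.start()?; // parquet header
    for group in row_groups {
        let (group, len) = group?;
        writer.write(group, len)?; // one row group
    }
    let _size = writer.end(None)?; // footer; also embeds the Arrow schema
    Ok(())
}
```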
19 changes: 10 additions & 9 deletions benches/write_parquet.rs
@@ -29,15 +29,16 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
         vec![encoding],
     )?;
 
-    let mut writer = Cursor::new(vec![]);
-    write_file(
-        &mut writer,
-        row_groups,
-        &schema,
-        to_parquet_schema(&schema)?,
-        options,
-        None,
-    )?;
+    let mut writer = vec![];
+
+    let mut writer = FileWriter::try_new(writer, schema, options)?;
+
+    writer.start()?;
+    for group in row_groups {
+        let (group, len) = group?;
+        writer.write(group, len)?;
+    }
+    let _ = writer.end(None)?;
     Ok(())
 }
 
60 changes: 21 additions & 39 deletions examples/parquet_write.rs
@@ -1,60 +1,39 @@
 use std::fs::File;
-use std::iter::once;
+use std::sync::Arc;
 
-use arrow2::error::ArrowError;
-use arrow2::io::parquet::write::to_parquet_schema;
 use arrow2::{
     array::{Array, Int32Array},
-    datatypes::Field,
+    chunk::Chunk,
+    datatypes::{Field, Schema},
     error::Result,
     io::parquet::write::{
-        array_to_pages, write_file, Compression, Compressor, DynIter, DynStreamingIterator,
-        Encoding, FallibleStreamingIterator, Version, WriteOptions,
+        Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
     },
 };
 
-fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()> {
-    let schema = vec![field].into();
-
+fn write_batch(path: &str, schema: Schema, columns: Chunk<Arc<dyn Array>>) -> Result<()> {
     let options = WriteOptions {
         write_statistics: true,
         compression: Compression::Uncompressed,
         version: Version::V2,
     };
-    let encoding = Encoding::Plain;
 
-    // map arrow fields to parquet fields
-    let parquet_schema = to_parquet_schema(&schema)?;
-
-    let descriptor = parquet_schema.columns()[0].clone();
+    let iter = vec![Ok(columns)];
 
-    // Declare the row group iterator. This must be an iterator of iterators of streaming iterators
-    // * first iterator over row groups
-    let row_groups = once(Result::Ok(DynIter::new(
-        // * second iterator over column chunks (we assume no struct arrays -> `once` column)
-        once(
-            // * third iterator over (compressed) pages; dictionary encoding may lead to multiple pages per array.
-            array_to_pages(array, descriptor, options, encoding).map(move |pages| {
-                let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
-                let compressed_pages = Compressor::new(encoded_pages, options.compression, vec![])
-                    .map_err(ArrowError::from);
-                DynStreamingIterator::new(compressed_pages)
-            }),
-        ),
-    )));
+    let row_groups =
+        RowGroupIterator::try_new(iter.into_iter(), &schema, options, vec![Encoding::Plain])?;
 
     // Create a new empty file
-    let mut file = File::create(path)?;
+    let file = File::create(path)?;
 
-    // Write the file. Note that, at present, any error results in a corrupted file.
-    let _ = write_file(
-        &mut file,
-        row_groups,
-        &schema,
-        parquet_schema,
-        options,
-        None,
-    )?;
+    let mut writer = FileWriter::try_new(file, schema, options)?;
+
+    writer.start()?;
+    for group in row_groups {
+        let (group, len) = group?;
+        writer.write(group, len)?;
+    }
+    let _size = writer.end(None)?;
     Ok(())
 }
 
@@ -69,5 +48,8 @@ fn main() -> Result<()> {
         Some(6),
     ]);
     let field = Field::new("c1", array.data_type().clone(), true);
-    write_single_array("test.parquet", &array, field)
+    let schema = Schema::from(vec![field]);
+    let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);
+
+    write_batch("test.parquet", schema, columns)
 }
11 changes: 9 additions & 2 deletions examples/parquet_write_parallel/src/main.rs
@@ -92,10 +92,17 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()>
     });
 
     // Create a new empty file
-    let mut file = std::fs::File::create(path)?;
+    let file = std::fs::File::create(path)?;
+
+    let mut writer = FileWriter::try_new(file, schema, options)?;
 
     // Write the file.
-    let _file_size = write_file(&mut file, row_groups, schema, parquet_schema, options, None)?;
+    writer.start()?;
+    for group in row_groups {
+        let (group, len) = group?;
+        writer.write(group, len)?;
+    }
+    let _size = writer.end(None)?;
 
     Ok(())
 }
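
This hunk covers only the writing half of `parallel_write`; the construction of `row_groups` happens earlier in the example, outside the hunk. As a sketch of how per-column encoding can run in parallel while the `FileWriter` stays sequential — assuming rayon, and reusing the page-level API (`array_to_pages`, `Compressor`, `DynIter`, `DynStreamingIterator`) visible in the code removed from `examples/parquet_write.rs` above; the helper itself is hypothetical, not this commit's code:

```rust
// Hypothetical helper (not part of this commit): compress each column of a
// chunk on the rayon thread pool, then feed the finished row group to the
// sequential `FileWriter`.
use std::io::Write;
use std::sync::Arc;

use rayon::prelude::*;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::{ArrowError, Result};
use arrow2::io::parquet::write::{
    array_to_pages, Compressor, DynIter, DynStreamingIterator, Encoding,
    FallibleStreamingIterator, FileWriter, WriteOptions,
};

fn write_parallel_row_group<W: Write>(
    writer: &mut FileWriter<W>,
    chunk: &Chunk<Arc<dyn Array>>,
    options: WriteOptions,
) -> Result<()> {
    // clone the descriptors so the borrow of `writer` ends before `write`
    let parquet_schema = writer.parquet_schema().clone();

    // encode + compress every column in parallel
    let columns = chunk
        .arrays()
        .par_iter()
        .zip(parquet_schema.columns().par_iter())
        .map(|(array, descriptor)| {
            let pages =
                array_to_pages(array.as_ref(), descriptor.clone(), options, Encoding::Plain)?;
            let encoded = DynIter::new(pages.map(|x| Ok(x?)));
            let compressed =
                Compressor::new(encoded, options.compression, vec![]).map_err(ArrowError::from);
            Ok(DynStreamingIterator::new(compressed))
        })
        .collect::<Result<Vec<_>>>()?;

    // writing the row group itself remains single-threaded and ordered
    writer.write(DynIter::new(columns.into_iter().map(Ok)), chunk.len())
}
```

The design point is that only encoding and compression fan out across threads; the writer consumes row groups in order, so I/O stays sequential.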
57 changes: 0 additions & 57 deletions examples/parquet_write_record.rs

This file was deleted.

9 changes: 1 addition & 8 deletions guide/src/io/parquet_write.md
@@ -13,19 +13,12 @@ First, some notation:
 
 ## Single threaded
 
-Here is an example of how to write a single column chunk into a single row group:
+Here is an example of how to write a single chunk:
 
 ```rust
 {{#include ../../../examples/parquet_write.rs}}
 ```
 
-For single-threaded writing, this crate offers an API that encapsulates the above logic. It
-assumes that a `Chunk` is mapped to a single row group with a single page per column.
-
-```rust
-{{#include ../../../examples/parquet_write_record.rs}}
-```
-
 ## Multi-threaded writing
 
 As user of this crate, you will need to decide how you would like to parallelize,
19 changes: 9 additions & 10 deletions src/doc/lib.md
@@ -57,16 +57,15 @@ fn main() -> Result<()> {
 
     // anything implementing `std::io::Write` works
     let mut file = vec![];
 
-    let parquet_schema = row_groups.parquet_schema().clone();
-    let _ = write_file(
-        &mut file,
-        row_groups,
-        &schema,
-        parquet_schema,
-        options,
-        None,
-    )?;
-
+    let mut writer = FileWriter::try_new(file, schema, options)?;
+
+    // Write the file.
+    writer.start()?;
+    for group in row_groups {
+        let (group, len) = group?;
+        writer.write(group, len)?;
+    }
+    let _ = writer.end(None)?;
     Ok(())
 }
 ```
82 changes: 82 additions & 0 deletions src/io/parquet/write/file.rs
@@ -0,0 +1,82 @@
+use std::io::Write;
+
+use parquet2::metadata::SchemaDescriptor;
+use parquet2::write::RowGroupIter;
+use parquet2::{metadata::KeyValue, write::WriteOptions};
+
+use crate::datatypes::Schema;
+use crate::error::{ArrowError, Result};
+
+use super::{schema::schema_to_metadata_key, to_parquet_schema};
+
+/// Attaches [`Schema`] to `key_value_metadata`
+pub fn add_arrow_schema(
+    schema: &Schema,
+    key_value_metadata: Option<Vec<KeyValue>>,
+) -> Option<Vec<KeyValue>> {
+    key_value_metadata
+        .map(|mut x| {
+            x.push(schema_to_metadata_key(schema));
+            x
+        })
+        .or_else(|| Some(vec![schema_to_metadata_key(schema)]))
+}
+
+pub struct FileWriter<W: Write> {
+    writer: parquet2::write::FileWriter<W>,
+    schema: Schema,
+}
+
+// Accessors
+impl<W: Write> FileWriter<W> {
+    /// The options assigned to the file
+    pub fn options(&self) -> &WriteOptions {
+        self.writer.options()
+    }
+
+    /// The [`SchemaDescriptor`] assigned to this file
+    pub fn parquet_schema(&self) -> &SchemaDescriptor {
+        self.writer.schema()
+    }
+
+    /// The [`Schema`] assigned to this file
+    pub fn schema(&self) -> &Schema {
+        &self.schema
+    }
+}
+
+impl<W: Write> FileWriter<W> {
+    /// Returns a new [`FileWriter`].
+    /// # Error
+    /// If it is unable to derive a parquet schema from [`Schema`].
+    pub fn try_new(writer: W, schema: Schema, options: WriteOptions) -> Result<Self> {
+        let parquet_schema = to_parquet_schema(&schema)?;
+
+        let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
+
+        Ok(Self {
+            writer: parquet2::write::FileWriter::new(writer, parquet_schema, options, created_by),
+            schema,
+        })
+    }
+
+    /// Writes the header of the file
+    pub fn start(&mut self) -> Result<()> {
+        Ok(self.writer.start()?)
+    }
+
+    /// Writes a row group to the file.
+    pub fn write(
+        &mut self,
+        row_group: RowGroupIter<'_, ArrowError>,
+        num_rows: usize,
+    ) -> Result<()> {
+        Ok(self.writer.write(row_group, num_rows)?)
+    }
+
+    /// Writes the footer of the parquet file. Returns the total size of the file.
+    pub fn end(self, key_value_metadata: Option<Vec<KeyValue>>) -> Result<(u64, W)> {
+        let key_value_metadata = add_arrow_schema(&self.schema, key_value_metadata);
+        Ok(self.writer.end(key_value_metadata)?)
+    }
+}
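
A note on `end`'s metadata handling: `add_arrow_schema` appends the serialized Arrow schema to whatever key-value pairs the caller supplies, or creates the list when given `None`, so the Arrow schema always round-trips through the footer. A short sketch of the two paths — assuming `add_arrow_schema` is re-exported from `arrow2::io::parquet::write` and `parquet2` is a direct dependency for `KeyValue`, as in this crate's own code; the schema is illustrative:

```rust
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::io::parquet::write::add_arrow_schema;
use parquet2::metadata::KeyValue;

fn main() {
    let schema = Schema::from(vec![Field::new("c1", DataType::Int32, false)]);

    // `None` becomes a one-element list holding the serialized Arrow schema
    let kv = add_arrow_schema(&schema, None).unwrap();
    assert_eq!(kv.len(), 1);

    // user-supplied metadata is preserved and the schema key is appended
    let user = vec![KeyValue {
        key: "origin".to_string(),
        value: Some("example".to_string()),
    }];
    let kv = add_arrow_schema(&schema, Some(user)).unwrap();
    assert_eq!(kv.len(), 2);
}
```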