Skip to content

Commit

Permalink
Simplified API to write files (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Feb 2, 2022
1 parent cf419c9 commit dde40a6
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 257 deletions.
21 changes: 21 additions & 0 deletions integration-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,27 @@ pub enum Array {
Struct(Vec<Array>, Vec<bool>),
}

impl Array {
pub fn len(&self) -> usize {
match self {
Array::UInt32(a) => a.len(),
Array::Int32(a) => a.len(),
Array::Int64(a) => a.len(),
Array::Int96(a) => a.len(),
Array::Float32(a) => a.len(),
Array::Float64(a) => a.len(),
Array::Boolean(a) => a.len(),
Array::Binary(a) => a.len(),
Array::List(a) => a.len(),
Array::Struct(a, _) => a[0].len(),
}
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}
}

// The dynamic representation of values in native Rust. This is not exhaustive.
// todo: maybe refactor this into serde/json?
#[derive(Debug, PartialEq)]
Expand Down
57 changes: 32 additions & 25 deletions integration-tests/src/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ mod tests {
use parquet::error::Result;
use parquet::metadata::SchemaDescriptor;
use parquet::statistics::Statistics;
use parquet::write::{write_file, Compressor, DynIter, DynStreamingIterator, Version};
use parquet::write::{Compressor, DynIter, DynStreamingIterator, FileWriter, Version};

use super::*;

Expand Down Expand Up @@ -65,16 +65,20 @@ mod tests {

let a = schema.columns();

let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok(
DynStreamingIterator::new(Compressor::new_from_vec(
DynIter::new(std::iter::once(array_to_page(&array, &options, &a[0]))),
options.compression,
vec![],
)),
)))));
let num_rows = array.len();
let pages = DynStreamingIterator::new(Compressor::new_from_vec(
DynIter::new(std::iter::once(array_to_page(&array, &options, &a[0]))),
options.compression,
vec![],
));
let columns = std::iter::once(Ok(pages));

let mut writer = Cursor::new(vec![]);
write_file(&mut writer, row_groups, schema, options, None, None)?;
let writer = Cursor::new(vec![]);
let mut writer = FileWriter::new(writer, schema, options, None);

writer.start()?;
writer.write(DynIter::new(columns), num_rows)?;
let writer = writer.end(None)?.1;

let data = writer.into_inner();

Expand Down Expand Up @@ -142,7 +146,7 @@ mod tests2 {
error::Result,
metadata::SchemaDescriptor,
read::read_metadata,
write::{write_file, Compressor, DynIter, DynStreamingIterator, Version},
write::{Compressor, DynIter, DynStreamingIterator, FileWriter, Version},
};

#[test]
Expand All @@ -165,20 +169,23 @@ mod tests2 {

let schema = SchemaDescriptor::try_from_message("message schema { OPTIONAL INT32 col; }")?;

let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok(
DynStreamingIterator::new(Compressor::new_from_vec(
DynIter::new(std::iter::once(array_to_page_v1(
&array,
&options,
&schema.columns()[0],
))),
options.compression,
vec![],
)),
)))));

let mut writer = Cursor::new(vec![]);
write_file(&mut writer, row_groups, schema, options, None, None)?;
let pages = DynStreamingIterator::new(Compressor::new_from_vec(
DynIter::new(std::iter::once(array_to_page_v1(
&array,
&options,
&schema.columns()[0],
))),
options.compression,
vec![],
));
let columns = std::iter::once(Ok(pages));

let writer = Cursor::new(vec![]);
let mut writer = FileWriter::new(writer, schema, options, None);

writer.start()?;
writer.write(DynIter::new(columns), 7)?;
let writer = writer.end(None)?.1;

let data = writer.into_inner();
let mut reader = Cursor::new(data);
Expand Down
3 changes: 1 addition & 2 deletions src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,7 @@ mod tests {
let column = 0;
let column_metadata = metadata.row_groups[row_group].column(column);
let buffer = vec![];
let mut iter: Vec<_> =
get_page_iterator(column_metadata, &mut file, None, buffer)?.collect();
let iter: Vec<_> = get_page_iterator(column_metadata, &mut file, None, buffer)?.collect();

let field = metadata.schema().fields()[0].clone();
let mut iter = ReadColumnIterator::new(field, vec![(iter, column_metadata.clone())]);
Expand Down
139 changes: 95 additions & 44 deletions src/write/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use parquet_format_async_temp::FileMetaData;

use parquet_format_async_temp::thrift::protocol::TCompactOutputProtocol;
use parquet_format_async_temp::thrift::protocol::TOutputProtocol;
use parquet_format_async_temp::RowGroup;

pub use crate::metadata::KeyValue;
use crate::{
Expand Down Expand Up @@ -37,53 +38,103 @@ pub(super) fn end_file<W: Write>(mut writer: &mut W, metadata: FileMetaData) ->
Ok(metadata_len as u64 + FOOTER_SIZE)
}

pub fn write_file<'a, W, I, E>(
writer: &mut W,
row_groups: I,
/// An interface to write a parquet file.
/// Use `start` to write the header, `write` to write a row group,
/// and `end` to write the footer.
pub struct FileWriter<W: Write> {
writer: W,
schema: SchemaDescriptor,
options: WriteOptions,
created_by: Option<String>,
key_value_metadata: Option<Vec<KeyValue>>,
) -> Result<u64>
where
W: Write,
I: Iterator<Item = std::result::Result<RowGroupIter<'a, E>, E>>,
ParquetError: From<E>,
E: std::error::Error,
{
let mut offset = start_file(writer)? as u64;

let row_groups = row_groups
.map(|row_group| {
let (group, size) = write_row_group(
writer,
offset,
schema.columns(),
options.compression,
row_group?,
)?;
offset += size;
Ok(group)
})
.collect::<Result<Vec<_>>>()?;

// compute file stats
let num_rows = row_groups.iter().map(|group| group.num_rows).sum();

let metadata = FileMetaData::new(
options.version.into(),
schema.into_thrift()?,
num_rows,
row_groups,
key_value_metadata,
created_by,
None,
None,
None,
);

let len = end_file(writer, metadata)?;
Ok(offset + len)

offset: u64,
row_groups: Vec<RowGroup>,
}

// Accessors
impl<W: Write> FileWriter<W> {
    /// The [`SchemaDescriptor`] describing the columns of this file
    pub fn schema(&self) -> &SchemaDescriptor {
        &self.schema
    }

    /// The [`WriteOptions`] this file is being written with
    pub fn options(&self) -> &WriteOptions {
        &self.options
    }
}

impl<W: Write> FileWriter<W> {
    /// Returns a new [`FileWriter`].
    ///
    /// Nothing is written until [`FileWriter::start`] is called.
    pub fn new(
        writer: W,
        schema: SchemaDescriptor,
        options: WriteOptions,
        created_by: Option<String>,
    ) -> Self {
        Self {
            writer,
            schema,
            options,
            created_by,
            // `offset == 0` doubles as "header not yet written".
            offset: 0,
            row_groups: vec![],
        }
    }

    /// Writes the header of the file.
    ///
    /// Must be called once, before any call to [`FileWriter::write`]
    /// or [`FileWriter::end`].
    pub fn start(&mut self) -> Result<()> {
        self.offset = start_file(&mut self.writer)? as u64;
        Ok(())
    }

    /// Writes a row group to the file.
    ///
    /// This call is IO-bounded
    ///
    /// # Errors
    /// Errors if `start` was not called first, or on any IO/encoding error.
    pub fn write<E>(&mut self, row_group: RowGroupIter<'_, E>, num_rows: usize) -> Result<()>
    where
        ParquetError: From<E>,
        E: std::error::Error,
    {
        if self.offset == 0 {
            return Err(ParquetError::General(
                "You must call `start` before writing the first row group".to_string(),
            ));
        }
        let (group, size) = write_row_group(
            &mut self.writer,
            self.offset,
            self.schema.columns(),
            self.options.compression,
            row_group,
            num_rows,
        )?;
        self.offset += size;
        self.row_groups.push(group);
        Ok(())
    }

    /// Writes the footer of the parquet file. Returns the total size of the file and the
    /// underlying writer.
    ///
    /// # Errors
    /// Errors if `start` was never called (the result would not be a valid
    /// parquet file), or on any IO/serialization error.
    pub fn end(mut self, key_value_metadata: Option<Vec<KeyValue>>) -> Result<(u64, W)> {
        // A footer without the magic header is not a valid parquet file;
        // fail early instead of emitting garbage. Mirrors the guard in `write`.
        if self.offset == 0 {
            return Err(ParquetError::General(
                "You must call `start` before ending the file".to_string(),
            ));
        }

        // compute file stats
        let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();

        let metadata = FileMetaData::new(
            self.options.version.into(),
            self.schema.into_thrift()?,
            num_rows,
            self.row_groups,
            key_value_metadata,
            self.created_by,
            None,
            None,
            None,
        );

        let len = end_file(&mut self.writer, metadata)?;
        Ok((self.offset + len, self.writer))
    }
}

#[cfg(test)]
Expand Down
11 changes: 8 additions & 3 deletions src/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,35 @@ mod row_group;
pub(self) mod statistics;

#[cfg(feature = "stream")]
pub mod stream;
mod stream;
#[cfg(feature = "stream")]
mod stream_stream;
pub use stream::FileStreamer;

mod dyn_iter;
pub use dyn_iter::{DynIter, DynStreamingIterator};

pub use compression::{compress, Compressor};

pub use file::write_file;
pub use file::FileWriter;

use crate::compression::Compression;
use crate::page::CompressedPage;

/// An iterator over a row group's columns, where each item is itself a fallible,
/// streaming iterator over that column's [`CompressedPage`]s.
pub type RowGroupIter<'a, E> =
    DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>;

/// Write options of different interfaces on this crate
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct WriteOptions {
    /// Whether to write statistics
    pub write_statistics: bool,
    /// Whether to use compression
    pub compression: Compression,
    /// Which Parquet version to use
    pub version: Version,
}

/// The parquet version to use
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Version {
V1,
Expand Down
Loading

0 comments on commit dde40a6

Please sign in to comment.