Added support to write to async writers (#35)
jorgecarleitao authored Aug 25, 2021
1 parent eca414b commit 450c99d
Showing 9 changed files with 267 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -12,7 +12,7 @@ name = "parquet2"
bench = false

[dependencies]
parquet-format-async-temp = "0.1.0"
parquet-format-async-temp = "0.1.1"
bitpacking = { version = "0.8.2", features = ["bitpacker1x"] }
streaming-iterator = "0.1.5"

41 changes: 38 additions & 3 deletions src/write/column_chunk.rs
@@ -2,8 +2,10 @@ use std::convert::TryInto;
use std::io::{Seek, Write};
use std::{collections::HashSet, error::Error};

use parquet_format_async_temp::thrift::protocol::TCompactOutputProtocol;
use parquet_format_async_temp::thrift::protocol::TOutputProtocol;
use futures::{AsyncSeek, AsyncWrite};
use parquet_format_async_temp::thrift::protocol::{
    TCompactOutputProtocol, TCompactOutputStreamProtocol, TOutputProtocol, TOutputStreamProtocol,
};
use parquet_format_async_temp::{ColumnChunk, ColumnMetaData};

use crate::statistics::serialize_statistics;
@@ -16,7 +18,7 @@ use crate::{
    schema::types::{physical_type_to_type, ParquetType},
};

use super::page::{write_page, PageWriteSpec};
use super::page::{write_page, write_page_async, PageWriteSpec};
use super::statistics::reduce;

pub fn write_column_chunk<
@@ -49,6 +51,39 @@
    Ok(column_chunk)
}

pub async fn write_column_chunk_async<
    W: AsyncWrite + AsyncSeek + Unpin + Send,
    I: Iterator<Item = std::result::Result<CompressedPage, E>>,
    E: Error + Send + Sync + 'static,
>(
    writer: &mut W,
    descriptor: &ColumnDescriptor,
    compression: Compression,
    compressed_pages: I,
) -> Result<ColumnChunk> {
    // write every page
    let mut specs = vec![];
    for compressed_page in compressed_pages {
        let spec = write_page_async(
            writer,
            compressed_page.map_err(ParquetError::from_external_error)?,
        )
        .await?;
        specs.push(spec);
    }

    let column_chunk = build_column_chunk(&specs, descriptor, compression)?;

    // write metadata
    let mut protocol = TCompactOutputStreamProtocol::new(writer);
    column_chunk
        .write_to_out_stream_protocol(&mut protocol)
        .await?;
    protocol.flush().await?;

    Ok(column_chunk)
}

fn build_column_chunk(
    specs: &[PageWriteSpec],
    descriptor: &ColumnDescriptor,
10 changes: 5 additions & 5 deletions src/write/dyn_iter.rs
@@ -1,9 +1,9 @@
/// [`DynIter`] is an implementation of a single-threaded, dynamically-typed iterator.
pub struct DynIter<'a, V> {
    iter: Box<dyn Iterator<Item = V> + 'a>,
pub struct DynIter<V> {
    iter: Box<dyn Iterator<Item = V>>,
}

impl<'iter, V> Iterator for DynIter<'iter, V> {
impl<V> Iterator for DynIter<V> {
    type Item = V;
    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next()
@@ -14,10 +14,10 @@ impl<'iter, V> Iterator for DynIter<'iter, V> {
    }
}

impl<'iter, V> DynIter<'iter, V> {
impl<V> DynIter<V> {
    pub fn new<I>(iter: I) -> Self
    where
        I: Iterator<Item = V> + 'iter,
        I: Iterator<Item = V> + 'static,
    {
        Self {
            iter: Box::new(iter),
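
Note: with the lifetime parameter gone, `DynIter::new` now only accepts `'static` iterators, i.e. iterators that own their data. A minimal standalone sketch of that effect (the `DynIter` below mirrors the struct in this diff, trimmed to `next` only; it is not the full parquet2 type):

// Boxed-iterator pattern after this change: the erased iterator must be `'static`.
pub struct DynIter<V> {
    iter: Box<dyn Iterator<Item = V>>,
}

impl<V> Iterator for DynIter<V> {
    type Item = V;
    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next()
    }
}

impl<V> DynIter<V> {
    pub fn new<I>(iter: I) -> Self
    where
        I: Iterator<Item = V> + 'static,
    {
        Self {
            iter: Box::new(iter),
        }
    }
}

fn main() {
    // an iterator that owns its data satisfies the `'static` bound
    let owned = DynIter::new(vec![1u32, 2, 3].into_iter());
    assert_eq!(owned.sum::<u32>(), 6);

    // a borrowing iterator no longer compiles:
    // let data = vec![1u32, 2, 3];
    // DynIter::new(data.iter().copied()); // error: `data` does not live long enough
}
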
4 changes: 2 additions & 2 deletions src/write/file.rs
@@ -44,7 +44,7 @@ pub(super) fn end_file<W: Write + Seek>(mut writer: &mut W, metadata: FileMetaDa
    Ok(())
}

pub fn write_file<'a, W, I, E>(
pub fn write_file<W, I, E>(
    writer: &mut W,
    row_groups: I,
    schema: SchemaDescriptor,
@@ -54,7 +54,7 @@ pub fn write_file<'a, W, I, E>(
) -> Result<()>
where
    W: Write + Seek,
    I: Iterator<Item = std::result::Result<RowGroupIter<'a, E>, E>>,
    I: Iterator<Item = std::result::Result<RowGroupIter<E>, E>>,
    E: Error + Send + Sync + 'static,
{
    start_file(writer)?;
6 changes: 4 additions & 2 deletions src/write/mod.rs
@@ -6,6 +6,8 @@ pub(self) mod statistics;

#[cfg(feature = "stream")]
pub mod stream;
#[cfg(feature = "stream")]
mod stream_stream;

mod dyn_iter;
pub use dyn_iter::DynIter;
@@ -15,8 +17,8 @@ pub use file::write_file;
use crate::compression::Compression;
use crate::page::CompressedPage;

pub type RowGroupIter<'a, E> =
    DynIter<'a, std::result::Result<DynIter<'a, std::result::Result<CompressedPage, E>>, E>>;
pub type RowGroupIter<E> =
    DynIter<std::result::Result<DynIter<std::result::Result<CompressedPage, E>>, E>>;

#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct WriteOptions {
55 changes: 53 additions & 2 deletions src/write/page.rs
@@ -1,8 +1,10 @@
use std::io::{Seek, SeekFrom, Write};
use std::sync::Arc;

use parquet_format_async_temp::thrift::protocol::TCompactOutputProtocol;
use parquet_format_async_temp::thrift::protocol::TOutputProtocol;
use futures::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use parquet_format_async_temp::thrift::protocol::{
    TCompactOutputProtocol, TCompactOutputStreamProtocol, TOutputProtocol, TOutputStreamProtocol,
};
use parquet_format_async_temp::{DictionaryPageHeader, Encoding, PageType};

use crate::error::Result;
@@ -54,6 +56,40 @@ pub fn write_page<W: Write + Seek>(
    })
}

pub async fn write_page_async<W: AsyncWrite + AsyncSeek + Unpin + Send>(
    writer: &mut W,
    compressed_page: CompressedPage,
) -> Result<PageWriteSpec> {
    let header = match &compressed_page {
        CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
        CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
    };

    let start_pos = writer.seek(SeekFrom::Current(0)).await?;

    let header_size = write_page_header_async(writer, &header).await?;

    match &compressed_page {
        CompressedPage::Data(compressed_page) => writer.write_all(&compressed_page.buffer).await?,
        CompressedPage::Dict(compressed_page) => writer.write_all(&compressed_page.buffer).await?,
    }

    let statistics = match &compressed_page {
        CompressedPage::Data(compressed_page) => compressed_page.statistics().transpose()?,
        CompressedPage::Dict(_) => None,
    };

    let end_pos = writer.seek(SeekFrom::Current(0)).await?;

    Ok(PageWriteSpec {
        header,
        header_size,
        offset: start_pos,
        bytes_written: end_pos - start_pos,
        statistics,
    })
}

fn assemble_data_page_header(compressed_page: &CompressedDataPage) -> ParquetPageHeader {
    let mut page_header = ParquetPageHeader {
        type_: match compressed_page.header() {
@@ -111,3 +147,18 @@ fn write_page_header<W: Write + Seek>(
    let end_pos = writer.seek(SeekFrom::Current(0))?;
    Ok((end_pos - start_pos) as usize)
}

/// writes the page header into `writer`, returning the number of bytes used in the process.
async fn write_page_header_async<W: AsyncWrite + AsyncSeek + Unpin + Send>(
    mut writer: &mut W,
    header: &ParquetPageHeader,
) -> Result<usize> {
    let start_pos = writer.seek(SeekFrom::Current(0)).await?;
    {
        let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
        header.write_to_out_stream_protocol(&mut protocol).await?;
        protocol.flush().await?;
    }
    let end_pos = writer.seek(SeekFrom::Current(0)).await?;
    Ok((end_pos - start_pos) as usize)
}
60 changes: 59 additions & 1 deletion src/write/row_group.rs
@@ -3,6 +3,7 @@ use std::{
    io::{Seek, Write},
};

use futures::{AsyncSeek, AsyncWrite};
use parquet_format_async_temp::RowGroup;

use crate::{
@@ -12,7 +13,10 @@ use crate::{
    page::CompressedPage,
};

use super::{column_chunk::write_column_chunk, DynIter};
use super::{
    column_chunk::{write_column_chunk, write_column_chunk_async},
    DynIter,
};

fn same_elements<T: PartialEq + Copy>(arr: &[T]) -> Option<Option<T>> {
    if arr.is_empty() {
@@ -78,3 +82,57 @@
        ordinal: None,
    })
}

pub async fn write_row_group_async<
    W,
    E, // external error any of the iterators may emit
>(
    writer: &mut W,
    descriptors: &[ColumnDescriptor],
    compression: Compression,
    columns: DynIter<std::result::Result<DynIter<std::result::Result<CompressedPage, E>>, E>>,
) -> Result<RowGroup>
where
    W: AsyncWrite + AsyncSeek + Unpin + Send,
    E: Error + Send + Sync + 'static,
{
    let column_iter = descriptors.iter().zip(columns);

    let mut columns = vec![];
    for (descriptor, page_iter) in column_iter {
        let spec = write_column_chunk_async(
            writer,
            descriptor,
            compression,
            page_iter.map_err(ParquetError::from_external_error)?,
        )
        .await?;
        columns.push(spec);
    }

    // compute row group stats
    let num_rows = columns
        .iter()
        .map(|c| c.meta_data.as_ref().unwrap().num_values)
        .collect::<Vec<_>>();
    let num_rows = match same_elements(&num_rows) {
        None => return Err(general_err!("Every column chunk in a row group MUST have the same number of rows. The columns have rows: {:?}", num_rows)),
        Some(None) => 0,
        Some(Some(v)) => v
    };

    let total_byte_size = columns
        .iter()
        .map(|c| c.meta_data.as_ref().unwrap().total_compressed_size)
        .sum();

    Ok(RowGroup {
        columns,
        total_byte_size,
        num_rows,
        sorting_columns: None,
        file_offset: None,
        total_compressed_size: None,
        ordinal: None,
    })
}
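
Note: the row-group stats above hinge on `same_elements`: every column chunk must report the same `num_values`, an empty row group defaults to 0 rows, and a mismatch is an error. A standalone sketch of that check (assumed equivalent to the crate's `same_elements`, whose body is not shown in this diff):

// Row-count consistency check: None => mismatch (error), Some(None) => no
// columns (0 rows), Some(Some(v)) => all chunks agree on v rows.
fn same_elements<T: PartialEq + Copy>(arr: &[T]) -> Option<Option<T>> {
    if arr.is_empty() {
        return Some(None);
    }
    let first = arr[0];
    if arr.iter().all(|x| *x == first) {
        Some(Some(first))
    } else {
        None
    }
}

fn main() {
    // three column chunks, each reporting 100 rows -> the row group has 100 rows
    assert_eq!(same_elements(&[100i64, 100, 100]), Some(Some(100)));
    // mismatching chunks -> the writer returns an error instead of a RowGroup
    assert_eq!(same_elements(&[100i64, 99, 100]), None);
    // no columns -> num_rows falls back to 0
    assert_eq!(same_elements::<i64>(&[]), Some(None));
}
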
4 changes: 3 additions & 1 deletion src/write/stream.rs
@@ -28,7 +28,7 @@ pub async fn write_stream<'a, W, S, E>(
) -> Result<()>
where
    W: Write + Seek,
    S: Stream<Item = std::result::Result<RowGroupIter<'a, E>, E>>,
    S: Stream<Item = std::result::Result<RowGroupIter<E>, E>>,
    E: Error + Send + Sync + 'static,
{
    start_file(writer)?;
@@ -63,3 +63,5 @@
    end_file(writer, metadata)?;
    Ok(())
}

pub use super::stream_stream::write_stream_stream;