diff --git a/Cargo.toml b/Cargo.toml index 5db6ff772..93783586e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ name = "parquet2" bench = false [dependencies] -parquet-format-async-temp = "0.1.0" +parquet-format-async-temp = "0.1.1" bitpacking = { version = "0.8.2", features = ["bitpacker1x"] } streaming-iterator = "0.1.5" diff --git a/src/write/column_chunk.rs b/src/write/column_chunk.rs index 2e6f810dd..3489b1f1d 100644 --- a/src/write/column_chunk.rs +++ b/src/write/column_chunk.rs @@ -2,8 +2,10 @@ use std::convert::TryInto; use std::io::{Seek, Write}; use std::{collections::HashSet, error::Error}; -use parquet_format_async_temp::thrift::protocol::TCompactOutputProtocol; -use parquet_format_async_temp::thrift::protocol::TOutputProtocol; +use futures::{AsyncSeek, AsyncWrite}; +use parquet_format_async_temp::thrift::protocol::{ + TCompactOutputProtocol, TCompactOutputStreamProtocol, TOutputProtocol, TOutputStreamProtocol, +}; use parquet_format_async_temp::{ColumnChunk, ColumnMetaData}; use crate::statistics::serialize_statistics; @@ -16,7 +18,7 @@ use crate::{ schema::types::{physical_type_to_type, ParquetType}, }; -use super::page::{write_page, PageWriteSpec}; +use super::page::{write_page, write_page_async, PageWriteSpec}; use super::statistics::reduce; pub fn write_column_chunk< @@ -49,6 +51,39 @@ pub fn write_column_chunk< Ok(column_chunk) } +pub async fn write_column_chunk_async< + W: AsyncWrite + AsyncSeek + Unpin + Send, + I: Iterator>, + E: Error + Send + Sync + 'static, +>( + writer: &mut W, + descriptor: &ColumnDescriptor, + compression: Compression, + compressed_pages: I, +) -> Result { + // write every page + let mut specs = vec![]; + for compressed_page in compressed_pages { + let spec = write_page_async( + writer, + compressed_page.map_err(ParquetError::from_external_error)?, + ) + .await?; + specs.push(spec); + } + + let column_chunk = build_column_chunk(&specs, descriptor, compression)?; + + // write metadata + let mut protocol = 
TCompactOutputStreamProtocol::new(writer); + column_chunk + .write_to_out_stream_protocol(&mut protocol) + .await?; + protocol.flush().await?; + + Ok(column_chunk) +} + fn build_column_chunk( specs: &[PageWriteSpec], descriptor: &ColumnDescriptor, diff --git a/src/write/dyn_iter.rs b/src/write/dyn_iter.rs index 2e40ca3ee..039693dce 100644 --- a/src/write/dyn_iter.rs +++ b/src/write/dyn_iter.rs @@ -1,9 +1,9 @@ /// [`DynIter`] is an implementation of a single-threaded, dynamically-typed iterator. -pub struct DynIter<'a, V> { - iter: Box + 'a>, +pub struct DynIter { + iter: Box>, } -impl<'iter, V> Iterator for DynIter<'iter, V> { +impl Iterator for DynIter { type Item = V; fn next(&mut self) -> Option { self.iter.next() @@ -14,10 +14,10 @@ impl<'iter, V> Iterator for DynIter<'iter, V> { } } -impl<'iter, V> DynIter<'iter, V> { +impl DynIter { pub fn new(iter: I) -> Self where - I: Iterator + 'iter, + I: Iterator + 'static, { Self { iter: Box::new(iter), diff --git a/src/write/file.rs b/src/write/file.rs index 54a35b0d1..55afd70ac 100644 --- a/src/write/file.rs +++ b/src/write/file.rs @@ -44,7 +44,7 @@ pub(super) fn end_file(mut writer: &mut W, metadata: FileMetaDa Ok(()) } -pub fn write_file<'a, W, I, E>( +pub fn write_file( writer: &mut W, row_groups: I, schema: SchemaDescriptor, @@ -54,7 +54,7 @@ pub fn write_file<'a, W, I, E>( ) -> Result<()> where W: Write + Seek, - I: Iterator, E>>, + I: Iterator, E>>, E: Error + Send + Sync + 'static, { start_file(writer)?; diff --git a/src/write/mod.rs b/src/write/mod.rs index 3ac58b952..f03bf14cc 100644 --- a/src/write/mod.rs +++ b/src/write/mod.rs @@ -6,6 +6,8 @@ pub(self) mod statistics; #[cfg(feature = "stream")] pub mod stream; +#[cfg(feature = "stream")] +mod stream_stream; mod dyn_iter; pub use dyn_iter::DynIter; @@ -15,8 +17,8 @@ pub use file::write_file; use crate::compression::Compression; use crate::page::CompressedPage; -pub type RowGroupIter<'a, E> = - DynIter<'a, std::result::Result>, E>>; +pub type RowGroupIter = 
+ DynIter>, E>>; #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct WriteOptions { diff --git a/src/write/page.rs b/src/write/page.rs index 08a458b19..9edc59877 100644 --- a/src/write/page.rs +++ b/src/write/page.rs @@ -1,8 +1,10 @@ use std::io::{Seek, SeekFrom, Write}; use std::sync::Arc; -use parquet_format_async_temp::thrift::protocol::TCompactOutputProtocol; -use parquet_format_async_temp::thrift::protocol::TOutputProtocol; +use futures::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; +use parquet_format_async_temp::thrift::protocol::{ + TCompactOutputProtocol, TCompactOutputStreamProtocol, TOutputProtocol, TOutputStreamProtocol, +}; use parquet_format_async_temp::{DictionaryPageHeader, Encoding, PageType}; use crate::error::Result; @@ -54,6 +56,40 @@ pub fn write_page( }) } +pub async fn write_page_async( + writer: &mut W, + compressed_page: CompressedPage, +) -> Result { + let header = match &compressed_page { + CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page), + CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page), + }; + + let start_pos = writer.seek(SeekFrom::Current(0)).await?; + + let header_size = write_page_header_async(writer, &header).await?; + + match &compressed_page { + CompressedPage::Data(compressed_page) => writer.write_all(&compressed_page.buffer).await?, + CompressedPage::Dict(compressed_page) => writer.write_all(&compressed_page.buffer).await?, + } + + let statistics = match &compressed_page { + CompressedPage::Data(compressed_page) => compressed_page.statistics().transpose()?, + CompressedPage::Dict(_) => None, + }; + + let end_pos = writer.seek(SeekFrom::Current(0)).await?; + + Ok(PageWriteSpec { + header, + header_size, + offset: start_pos, + bytes_written: end_pos - start_pos, + statistics, + }) +} + fn assemble_data_page_header(compressed_page: &CompressedDataPage) -> ParquetPageHeader { let mut page_header = ParquetPageHeader { type_: match 
compressed_page.header() { @@ -111,3 +147,18 @@ fn write_page_header( let end_pos = writer.seek(SeekFrom::Current(0))?; Ok((end_pos - start_pos) as usize) } + +/// writes the page header into `writer`, returning the number of bytes used in the process. +async fn write_page_header_async( + mut writer: &mut W, + header: &ParquetPageHeader, +) -> Result { + let start_pos = writer.seek(SeekFrom::Current(0)).await?; + { + let mut protocol = TCompactOutputStreamProtocol::new(&mut writer); + header.write_to_out_stream_protocol(&mut protocol).await?; + protocol.flush().await?; + } + let end_pos = writer.seek(SeekFrom::Current(0)).await?; + Ok((end_pos - start_pos) as usize) +} diff --git a/src/write/row_group.rs b/src/write/row_group.rs index e82906fb2..1cd8d762f 100644 --- a/src/write/row_group.rs +++ b/src/write/row_group.rs @@ -3,6 +3,7 @@ use std::{ io::{Seek, Write}, }; +use futures::{AsyncSeek, AsyncWrite}; use parquet_format_async_temp::RowGroup; use crate::{ @@ -12,7 +13,10 @@ use crate::{ page::CompressedPage, }; -use super::{column_chunk::write_column_chunk, DynIter}; +use super::{ + column_chunk::{write_column_chunk, write_column_chunk_async}, + DynIter, +}; fn same_elements(arr: &[T]) -> Option> { if arr.is_empty() { @@ -78,3 +82,57 @@ where ordinal: None, }) } + +pub async fn write_row_group_async< + W, + E, // external error any of the iterators may emit +>( + writer: &mut W, + descriptors: &[ColumnDescriptor], + compression: Compression, + columns: DynIter>, E>>, +) -> Result +where + W: AsyncWrite + AsyncSeek + Unpin + Send, + E: Error + Send + Sync + 'static, +{ + let column_iter = descriptors.iter().zip(columns); + + let mut columns = vec![]; + for (descriptor, page_iter) in column_iter { + let spec = write_column_chunk_async( + writer, + descriptor, + compression, + page_iter.map_err(ParquetError::from_external_error)?, + ) + .await?; + columns.push(spec); + } + + // compute row group stats + let num_rows = columns + .iter() + .map(|c| 
c.meta_data.as_ref().unwrap().num_values) + .collect::>(); + let num_rows = match same_elements(&num_rows) { + None => return Err(general_err!("Every column chunk in a row group MUST have the same number of rows. The columns have rows: {:?}", num_rows)), + Some(None) => 0, + Some(Some(v)) => v + }; + + let total_byte_size = columns + .iter() + .map(|c| c.meta_data.as_ref().unwrap().total_compressed_size) + .sum(); + + Ok(RowGroup { + columns, + total_byte_size, + num_rows, + sorting_columns: None, + file_offset: None, + total_compressed_size: None, + ordinal: None, + }) +} diff --git a/src/write/stream.rs b/src/write/stream.rs index 1bd104f7c..bbbdc32b4 100644 --- a/src/write/stream.rs +++ b/src/write/stream.rs @@ -28,7 +28,7 @@ pub async fn write_stream<'a, W, S, E>( ) -> Result<()> where W: Write + Seek, - S: Stream, E>>, + S: Stream, E>>, E: Error + Send + Sync + 'static, { start_file(writer)?; @@ -63,3 +63,5 @@ where end_file(writer, metadata)?; Ok(()) } + +pub use super::stream_stream::write_stream_stream; diff --git a/src/write/stream_stream.rs b/src/write/stream_stream.rs new file mode 100644 index 000000000..c77a748d1 --- /dev/null +++ b/src/write/stream_stream.rs @@ -0,0 +1,102 @@ +use std::{ + error::Error, + io::{SeekFrom, Write}, +}; + +use futures::{ + pin_mut, stream::Stream, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, StreamExt, +}; + +use parquet_format_async_temp::{ + thrift::protocol::{TCompactOutputStreamProtocol, TOutputStreamProtocol}, + FileMetaData, +}; + +use crate::{ + error::{ParquetError, Result}, + metadata::{KeyValue, SchemaDescriptor}, + FOOTER_SIZE, PARQUET_MAGIC, +}; + +use super::{row_group::write_row_group_async, RowGroupIter, WriteOptions}; + +async fn start_file(writer: &mut W) -> Result<()> { + Ok(writer.write_all(&PARQUET_MAGIC).await?) 
+}
+
+async fn end_file<W: AsyncWrite + AsyncSeek + Unpin + Send>(
+    mut writer: &mut W,
+    metadata: FileMetaData,
+) -> Result<()> {
+    // Write file metadata
+    let start_pos = writer.seek(SeekFrom::Current(0)).await?;
+    {
+        let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
+        metadata.write_to_out_stream_protocol(&mut protocol).await?;
+        protocol.flush().await?
+    }
+    let end_pos = writer.seek(SeekFrom::Current(0)).await?;
+    let metadata_len = (end_pos - start_pos) as i32;
+
+    // Write footer
+    let metadata_len = metadata_len.to_le_bytes();
+    let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
+    (0..4).for_each(|i| {
+        footer_buffer[i] = metadata_len[i];
+    });
+
+    (&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
+    writer.write_all(&footer_buffer).await?;
+    Ok(())
+}
+
+/// Given a stream of [`RowGroupIter`] and an `async` writer, returns a future
+/// of writing a parquet file to the writer.
+pub async fn write_stream_stream<W, S, E>(
+    writer: &mut W,
+    row_groups: S,
+    schema: SchemaDescriptor,
+    options: WriteOptions,
+    created_by: Option<String>,
+    key_value_metadata: Option<Vec<KeyValue>>,
+) -> Result<()>
+where
+    W: AsyncWrite + AsyncSeek + Unpin + Send,
+    S: Stream<Item = std::result::Result<RowGroupIter<E>, E>>,
+    E: Error + Send + Sync + 'static,
+{
+    start_file(writer).await?;
+
+    let mut row_groups_c = vec![];
+
+    pin_mut!(row_groups);
+    while let Some(row_group) = row_groups.next().await {
+        //for row_group in row_groups {
+        let row_group = write_row_group_async(
+            writer,
+            schema.columns(),
+            options.compression,
+            row_group.map_err(ParquetError::from_external_error)?,
+        )
+        .await?;
+        row_groups_c.push(row_group);
+    }
+
+    // compute file stats
+    let num_rows = row_groups_c.iter().map(|group| group.num_rows).sum();
+
+    let metadata = FileMetaData::new(
+        options.version.into(),
+        schema.into_thrift()?,
+        num_rows,
+        row_groups_c,
+        key_value_metadata,
+        created_by,
+        None,
+        None,
+        None,
+    );
+
+    end_file(writer, metadata).await?;
+    Ok(())
+}