Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to read parquet asynchronously.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 10, 2021
1 parent bd84193 commit 685849d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
25 changes: 22 additions & 3 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::io::{Read, Seek};

use futures::Stream;
use futures::{AsyncRead, AsyncSeek, Stream};
pub use parquet2::{
error::ParquetError,
metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData},
page::{CompressedDataPage, DataPage, DataPageHeader},
read::{
decompress, get_page_iterator as _get_page_iterator, read_metadata as _read_metadata,
decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream,
read_metadata as _read_metadata, read_metadata_async as _read_metadata_async,
streaming_iterator, Decompressor, PageIterator, StreamingIterator,
},
schema::types::{
Expand Down Expand Up @@ -48,11 +49,29 @@ pub fn get_page_iterator<'b, RR: Read + Seek>(
)?)
}

/// Reads parquets' metadata.
/// Creates a new iterator of compressed pages.
pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>(
metadata: &'a FileMetaData,
row_group: usize,
column: usize,
reader: &'a mut RR,
buffer: Vec<u8>,
) -> Result<impl Stream<Item = std::result::Result<CompressedDataPage, ParquetError>> + 'a> {
Ok(_get_page_stream(metadata, row_group, column, reader, buffer).await?)
}

/// Reads parquets' metadata syncronously.
pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
Ok(_read_metadata(reader)?)
}

/// Reads parquets' metadata asynchronously.
pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
reader: &mut R,
) -> Result<FileMetaData> {
Ok(_read_metadata_async(reader).await?)
}

pub fn page_iter_to_array<
I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
>(
Expand Down
5 changes: 3 additions & 2 deletions src/io/parquet/write/utf8/nested.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use parquet2::encoding::Encoding;
use parquet2::{metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions};
use parquet2::{
encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions,
};

use super::super::{levels, utils};
use super::basic::{build_statistics, encode_plain};
Expand Down

0 comments on commit 685849d

Please sign in to comment.