This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Bumped parquet2 dependency #1304

Merged: 3 commits, Dec 2, 2022
10 changes: 7 additions & 3 deletions Cargo.toml
@@ -77,9 +77,6 @@ futures = { version = "0.3", optional = true }
# to read IPC as a stream
async-stream = { version = "0.3.2", optional = true }

# parquet support
parquet2 = { version = "0.16", optional = true, default_features = false, features = ["async"] }

# avro support
avro-schema = { version = "0.3", optional = true }

@@ -108,6 +105,13 @@ getrandom = { version = "0.2", features = ["js"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
ahash = { version = "0.8", features=["runtime-rng"] }

# parquet support
[dependencies.parquet2]
version = "0.17"
optional = true
default_features = false
features = ["async"]

[dev-dependencies]
criterion = "0.3"
flate2 = "1"
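
Note: the manifest change above moves parquet2 into its own `[dependencies.parquet2]` table and bumps it from 0.16 to 0.17. The API change the rest of this diff adapts to is parquet2's rename of the page enum from `EncodedPage` to `Page`. A minimal sketch of what downstream code sees through arrow2's re-export (the `describe` helper is hypothetical; the two-arm match assumes `Data` and `Dict` are the only variants, which are the only ones this PR uses):

```rust
// Hypothetical downstream helper: with parquet2 0.17, the page enum re-exported by
// arrow2's write module is `Page` (previously `EncodedPage`).
use arrow2::io::parquet::write::Page;

fn describe(page: &Page) -> &'static str {
    // both variants appear in this PR: `Page::Data(..)` and `Page::Dict(..)`
    match page {
        Page::Data(_) => "data page",
        Page::Dict(_) => "dictionary page",
    }
}
```
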
3 changes: 1 addition & 2 deletions src/io/parquet/read/mod.rs
@@ -22,8 +22,7 @@ pub use parquet2::{
decompress, get_column_iterator, get_page_stream,
read_columns_indexes as _read_columns_indexes, read_metadata as _read_metadata,
read_metadata_async as _read_metadata_async, read_pages_locations, BasicDecompressor,
ColumnChunkIter, Decompressor, MutStreamingIterator, PageFilter, PageReader,
ReadColumnIterator, State,
Decompressor, MutStreamingIterator, PageFilter, PageReader, ReadColumnIterator, State,
},
schema::types::{
GroupLogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType,
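
Note: the only change in this file is the re-export list, which drops `ColumnChunkIter`; the metadata and page-reading entry points are unchanged. As a reminder of how they are typically used, a minimal sketch of reading parquet metadata and inferring an arrow schema via arrow2's wrappers (assumes the `io_parquet` feature, a hypothetical `data.parquet` file, and that `infer_schema` lives in the same `read` module; it is not shown in this diff):

```rust
use std::fs::File;

// `read_metadata` wraps the `_read_metadata` re-export shown above; `infer_schema`
// maps the parquet schema to an arrow Schema.
use arrow2::io::parquet::read::{infer_schema, read_metadata};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // hypothetical input file
    let mut reader = File::open("data.parquet")?;

    let metadata = read_metadata(&mut reader)?;
    let schema = infer_schema(&metadata)?;

    for field in &schema.fields {
        println!("{}: {:?}", field.name, field.data_type);
    }
    Ok(())
}
```
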
10 changes: 5 additions & 5 deletions src/io/parquet/write/dictionary.rs
@@ -1,6 +1,6 @@
use parquet2::{
encoding::{hybrid_rle::encode_u32, Encoding},
page::{DictPage, EncodedPage},
page::{DictPage, Page},
schema::types::PrimitiveType,
statistics::{serialize_statistics, ParquetStatistics},
write::DynIter,
@@ -106,7 +106,7 @@ fn serialize_keys<K: DictionaryKey>(
nested: &[Nested],
statistics: ParquetStatistics,
options: WriteOptions,
) -> Result<EncodedPage> {
) -> Result<Page> {
let mut buffer = vec![];

// parquet only accepts a single validity - we "&" the validities into a single one
@@ -142,7 +142,7 @@ fn serialize_keys<K: DictionaryKey>(
options,
Encoding::RleDictionary,
)
.map(EncodedPage::Data)
.map(Page::Data)
}

macro_rules! dyn_prim {
@@ -162,7 +162,7 @@ pub fn array_to_pages<K: DictionaryKey>(
nested: &[Nested],
options: WriteOptions,
encoding: Encoding,
) -> Result<DynIter<'static, Result<EncodedPage>>> {
) -> Result<DynIter<'static, Result<Page>>> {
match encoding {
Encoding::PlainDictionary | Encoding::RleDictionary => {
// write DictPage
@@ -230,7 +230,7 @@ pub fn array_to_pages<K: DictionaryKey>(
)))
}
};
let dict_page = EncodedPage::Dict(dict_page);
let dict_page = Page::Dict(dict_page);

// write DataPage pointing to DictPage
let data_page = serialize_keys(array, type_, nested, statistics, options)?;
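
Note: only the page type in these signatures changes; the dictionary-encoding logic (`Encoding::PlainDictionary | Encoding::RleDictionary`) stays as is. A small sketch of checking which encodings arrow2 will accept for a given arrow type, using `can_encode` from the write module (it is visible in the next file of this diff); the encodings chosen below are illustrative only:

```rust
use arrow2::datatypes::DataType;
use arrow2::io::parquet::write::{can_encode, Encoding};

fn main() {
    // `can_encode` reports whether arrow2 can write a DataType with a given parquet Encoding.
    for encoding in [
        Encoding::Plain,
        Encoding::RleDictionary,
        Encoding::DeltaLengthByteArray,
    ] {
        println!("{:?} for Utf8 -> {}", encoding, can_encode(&DataType::Utf8, encoding));
    }
}
```
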
18 changes: 9 additions & 9 deletions src/io/parquet/write/mod.rs
@@ -38,7 +38,7 @@ pub use parquet2::{
encoding::Encoding,
fallible_streaming_iterator,
metadata::{Descriptor, FileMetaData, KeyValue, SchemaDescriptor, ThriftFileMetaData},
page::{CompressedDataPage, CompressedPage, EncodedPage},
page::{CompressedDataPage, CompressedPage, Page},
schema::types::{FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType},
write::{
compress, write_metadata_sidecar, Compressor, DynIter, DynStreamingIterator, RowGroupIter,
@@ -130,15 +130,15 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
)
}

/// Returns an iterator of [`EncodedPage`].
/// Returns an iterator of [`Page`].
#[allow(clippy::needless_collect)]
pub fn array_to_pages(
array: &dyn Array,
type_: ParquetPrimitiveType,
nested: &[Nested],
options: WriteOptions,
encoding: Encoding,
) -> Result<DynIter<'static, Result<EncodedPage>>> {
) -> Result<DynIter<'static, Result<Page>>> {
// maximum page size is 2^31 e.g. i32::MAX
// we split at 2^31 - 2^25 to err on the safe side
// we also check for an array.len > 3 to prevent infinite recursion
@@ -175,7 +175,7 @@ pub fn array_to_pages(
((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize;
let rows_per_page = (page_size / (bytes_per_row + 1)).max(1);

let vs: Vec<Result<EncodedPage>> = (0..array.len())
let vs: Vec<Result<Page>> = (0..array.len())
.step_by(rows_per_page)
.map(|offset| {
let length = if offset + rows_per_page > array.len() {
@@ -202,7 +202,7 @@ pub fn array_to_page(
nested: &[Nested],
options: WriteOptions,
encoding: Encoding,
) -> Result<EncodedPage> {
) -> Result<Page> {
if nested.len() == 1 {
// special case where validity == def levels
return array_to_page_simple(array, type_, options, encoding);
@@ -216,7 +216,7 @@ pub fn array_to_page_simple(
type_: ParquetPrimitiveType,
options: WriteOptions,
encoding: Encoding,
) -> Result<EncodedPage> {
) -> Result<Page> {
let data_type = array.data_type();
if !can_encode(data_type, encoding) {
return Err(Error::InvalidArgumentError(format!(
@@ -439,7 +439,7 @@ pub fn array_to_page_simple(
other
))),
}
.map(EncodedPage::Data)
.map(Page::Data)
}

fn array_to_page_nested(
@@ -448,7 +448,7 @@ fn array_to_page_nested(
nested: &[Nested],
options: WriteOptions,
_encoding: Encoding,
) -> Result<EncodedPage> {
) -> Result<Page> {
use DataType::*;
match array.data_type().to_logical_type() {
Null => {
@@ -520,7 +520,7 @@ fn array_to_page_nested(
other
))),
}
.map(EncodedPage::Data)
.map(Page::Data)
}

fn transverse_recursive<T, F: Fn(&DataType) -> T + Clone>(
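
Note: every page-producing function above (`array_to_pages`, `array_to_page`, `array_to_page_simple`, `array_to_page_nested`) now yields `Page` rather than `EncodedPage`. Most users go through the higher-level writer instead; below is a minimal end-to-end sketch, assuming `FileWriter`, `RowGroupIterator`, `CompressionOptions` and `Version` are exported by `arrow2::io::parquet::write` in this release line (they are not part of this diff):

```rust
use std::fs::File;

use arrow2::array::{Array, PrimitiveArray};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::parquet::write::{
    CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
};

fn main() -> Result<()> {
    let schema = Schema::from(vec![Field::new("a", DataType::Int64, false)]);
    let array: Box<dyn Array> = Box::new(PrimitiveArray::<i64>::from_slice([1, 2, 3]));
    let chunk = Chunk::new(vec![array]);

    // `data_pagesize_limit` exists in this version, as the test file later in this diff shows.
    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
        data_pagesize_limit: None,
    };

    // one Vec<Encoding> per field; each entry has one Encoding per leaf column
    let encodings = vec![vec![Encoding::Plain]];
    let row_groups =
        RowGroupIterator::try_new(vec![Ok(chunk)].into_iter(), &schema, options, encodings)?;

    let file = File::create("example.parquet")?; // hypothetical output path
    let mut writer = FileWriter::try_new(file, schema, options)?;
    for group in row_groups {
        writer.write(group?)?;
    }
    let _file_size = writer.end(None)?;
    Ok(())
}
```

The row-group iterators consume exactly the `Page` values produced by the functions changed in this file.
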
6 changes: 3 additions & 3 deletions src/io/parquet/write/pages.rs
@@ -1,5 +1,5 @@
use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType};
use parquet2::{page::EncodedPage, write::DynIter};
use parquet2::{page::Page, write::DynIter};

use crate::array::{ListArray, Offset, StructArray};
use crate::bitmap::Bitmap;
@@ -193,13 +193,13 @@ fn to_parquet_leafs_recursive(type_: ParquetType, leafs: &mut Vec<ParquetPrimiti
}
}

/// Returns a vector of iterators of [`EncodedPage`], one per leaf column in the array
/// Returns a vector of iterators of [`Page`], one per leaf column in the array
pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(
array: A,
type_: ParquetType,
options: WriteOptions,
encoding: &[Encoding],
) -> Result<Vec<DynIter<'static, Result<EncodedPage>>>> {
) -> Result<Vec<DynIter<'static, Result<Page>>>> {
let array = array.as_ref();
let nested = to_nested(array, &type_)?;

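Note: `array_to_columns` is the entry point for encoding one array into its leaf columns (useful for parallel writing) and now returns iterators of `Page`. A sketch of calling it directly, assuming `to_parquet_schema` is available from the same write module to derive the `ParquetType`, and that `SchemaDescriptor::fields` exposes the mapped parquet types (neither is part of this diff):

```rust
use arrow2::array::{Array, PrimitiveArray};
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::parquet::write::{
    array_to_columns, to_parquet_schema, CompressionOptions, Encoding, Version, WriteOptions,
};

fn main() -> Result<()> {
    let schema = Schema::from(vec![Field::new("a", DataType::Int64, false)]);
    let array: Box<dyn Array> = Box::new(PrimitiveArray::<i64>::from_slice([1, 2, 3]));

    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
        data_pagesize_limit: None,
    };

    // assumed helper: maps the arrow Schema to a parquet SchemaDescriptor whose
    // fields line up with the arrow fields
    let parquet_schema = to_parquet_schema(&schema)?;
    let type_ = parquet_schema.fields()[0].clone();

    // one Encoding per leaf column of this field (a primitive field has a single leaf)
    let columns = array_to_columns(array, type_, options, &[Encoding::Plain])?;
    for (i, pages) in columns.into_iter().enumerate() {
        println!("leaf column {}: {} page(s)", i, pages.count());
    }
    Ok(())
}
```
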
9 changes: 3 additions & 6 deletions tests/it/io/parquet/read_indexes.rs
@@ -6,10 +6,7 @@ use arrow2::io::parquet::read::indexes;
use arrow2::{array::*, datatypes::*, error::Result, io::parquet::read::*, io::parquet::write::*};

/// Returns 2 sets of pages with different the same number of rows distributed un-evenly
fn pages(
arrays: &[&dyn Array],
encoding: Encoding,
) -> Result<(Vec<EncodedPage>, Vec<EncodedPage>, Schema)> {
fn pages(arrays: &[&dyn Array], encoding: Encoding) -> Result<(Vec<Page>, Vec<Page>, Schema)> {
// create pages with different number of rows
let array11 = PrimitiveArray::<i64>::from_slice([1, 2, 3, 4]);
let array12 = PrimitiveArray::<i64>::from_slice([5]);
@@ -73,7 +70,7 @@ fn pages(

/// Tests reading pages while skipping indexes
fn read_with_indexes(
(pages1, pages2, schema): (Vec<EncodedPage>, Vec<EncodedPage>, Schema),
(pages1, pages2, schema): (Vec<Page>, Vec<Page>, Schema),
expected: Box<dyn Array>,
) -> Result<()> {
let options = WriteOptions {
@@ -83,7 +80,7 @@
data_pagesize_limit: None,
};

let to_compressed = |pages: Vec<EncodedPage>| {
let to_compressed = |pages: Vec<Page>| {
let encoded_pages = DynIter::new(pages.into_iter().map(Ok));
let compressed_pages =
Compressor::new(encoded_pages, options.compression, vec![]).map_err(Error::from);