
Commit

Added test for indexed read
jorgecarleitao committed Mar 22, 2022
1 parent 68ee80b commit 5e986bb
Showing 6 changed files with 160 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -69,7 +69,7 @@ ahash = { version = "0.7", optional = true }

# parquet support
#parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "write_indexes", optional = true, default_features = false, features = ["stream"] }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "40caedeebda6641443b7fc99f0adeb38e6bec518", optional = true, default_features = false, features = ["stream"] }

# avro support
avro-schema = { version = "0.2", optional = true }
12 changes: 8 additions & 4 deletions src/io/parquet/read/deserialize/binary/basic.rs
@@ -61,15 +61,19 @@ fn read_delta_optional<O: Offset>(

#[derive(Debug)]
pub(super) struct Required<'a> {
pub values: BinaryIter<'a>,
pub values: std::iter::Take<std::iter::Skip<BinaryIter<'a>>>,
// here because `BinaryIter` has no size_hint.
pub remaining: usize,
}

impl<'a> Required<'a> {
pub fn new(page: &'a DataPage) -> Self {
let values = BinaryIter::new(page.buffer());
let (offset, length) = page.rows.unwrap_or((0, page.num_values()));

Self {
values: BinaryIter::new(page.buffer()),
remaining: page.num_values(),
values: values.skip(offset).take(length),
remaining: length,
}
}
}
@@ -236,7 +240,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
page_values,
),
State::Required(page) => {
page.remaining -= additional;
page.remaining = page.remaining.saturating_sub(additional);
for x in page.values.by_ref().take(additional) {
values.push(x)
}
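
The change above relies on the `DataPage` now carrying an optional row interval (`page.rows`), so the required-values decoder only walks the selected slice. A minimal standalone sketch of that skip/take selection, assuming the interval is an `(offset, length)` pair as in the diff (the helper below is hypothetical, not part of the library):

// Hypothetical sketch: restrict an iterator of decoded values to a row
// interval, defaulting to the whole page when no interval was requested.
fn select_interval<I: Iterator>(
    values: I,
    rows: Option<(usize, usize)>,
    num_values: usize,
) -> std::iter::Take<std::iter::Skip<I>> {
    let (offset, length) = rows.unwrap_or((0, num_values));
    values.skip(offset).take(length)
}

`Required::new` applies this same pattern to `BinaryIter`, which is also why `remaining` is initialized to the interval length rather than to `page.num_values()`.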
27 changes: 25 additions & 2 deletions src/io/parquet/read/deserialize/primitive/basic.rs
@@ -17,6 +17,29 @@ use super::super::utils;
use super::super::utils::OptionalPageValidity;
use super::super::DataPages;

#[derive(Debug)]
pub(super) struct RequiredValues<'a> {
pub values: std::iter::Take<std::iter::Skip<std::slice::ChunksExact<'a, u8>>>,
}

impl<'a> RequiredValues<'a> {
pub fn new<P: ParquetNativeType>(page: &'a DataPage) -> Self {
let (_, _, values) = utils::split_buffer(page);
assert_eq!(values.len() % std::mem::size_of::<P>(), 0);
let values = values.chunks_exact(std::mem::size_of::<P>());

let (offset, length) = page.rows.unwrap_or((0, page.num_values()));
Self {
values: values.skip(offset).take(length),
}
}

#[inline]
pub fn len(&self) -> usize {
self.values.size_hint().0
}
}

#[derive(Debug)]
pub(super) struct Values<'a> {
pub values: std::slice::ChunksExact<'a, u8>,
@@ -73,7 +96,7 @@
P: ParquetNativeType,
{
Optional(OptionalPageValidity<'a>, Values<'a>),
Required(Values<'a>),
Required(RequiredValues<'a>),
RequiredDictionary(ValuesDictionary<'a, P>),
OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a, P>),
}
@@ -158,7 +181,7 @@

Ok(State::Optional(validity, values))
}
(Encoding::Plain, _, false) => Ok(State::Required(Values::new::<P>(page))),
(Encoding::Plain, _, false) => Ok(State::Required(RequiredValues::new::<P>(page))),
_ => Err(utils::not_implemented(
&page.encoding(),
is_optional,
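
For PLAIN-encoded required primitives the values buffer is a contiguous run of fixed-size values, so the interval maps directly onto `chunks_exact(size_of::<P>())` followed by the same skip/take, as `RequiredValues::new` does above. A minimal sketch for `i64` (hypothetical helper; it assumes little-endian PLAIN values, which is what Parquet uses):

use std::convert::TryInto;

// Hypothetical sketch: decode a row interval from a PLAIN-encoded,
// required i64 column (one fixed-size little-endian value per row).
fn plain_i64_interval(buffer: &[u8], rows: Option<(usize, usize)>) -> Vec<i64> {
    let size = std::mem::size_of::<i64>();
    assert_eq!(buffer.len() % size, 0);
    let num_values = buffer.len() / size;
    let (offset, length) = rows.unwrap_or((0, num_values));
    buffer
        .chunks_exact(size)
        .skip(offset)
        .take(length)
        .map(|chunk| i64::from_le_bytes(chunk.try_into().unwrap()))
        .collect()
}

Because `ChunksExact` reports an exact size, `size_hint().0` of the resulting `Take<Skip<...>>` is also exact, which is what the new `RequiredValues::len` returns.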
4 changes: 2 additions & 2 deletions src/io/parquet/read/mod.rs
@@ -25,8 +25,8 @@ pub use parquet2::{
decompress, get_column_iterator, get_page_iterator as _get_page_iterator,
get_page_stream as _get_page_stream, read_columns_indexes as _read_columns_indexes,
read_metadata as _read_metadata, read_metadata_async as _read_metadata_async,
BasicDecompressor, ColumnChunkIter, Decompressor, MutStreamingIterator, PageFilter,
PageReader, ReadColumnIterator, State,
read_pages_locations, BasicDecompressor, ColumnChunkIter, Decompressor,
MutStreamingIterator, PageFilter, PageReader, ReadColumnIterator, State,
},
schema::types::{
LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
4 changes: 1 addition & 3 deletions src/io/parquet/read/row_group.rs
@@ -183,9 +183,7 @@ pub fn to_deserializer<'a>(
.map(|(column_meta, chunk)| {
let pages = PageReader::new(
std::io::Cursor::new(chunk),
column_meta.num_values(),
column_meta.compression(),
column_meta.descriptor().descriptor.clone(),
column_meta,
Arc::new(|_, _| true),
vec![],
);
123 changes: 123 additions & 0 deletions tests/it/io/parquet/mod.rs
@@ -1,10 +1,13 @@
use std::io::{Cursor, Read, Seek};
use std::sync::Arc;

use arrow2::error::ArrowError;
use arrow2::{
array::*, bitmap::Bitmap, buffer::Buffer, chunk::Chunk, datatypes::*, error::Result,
io::parquet::read::statistics::*, io::parquet::read::*, io::parquet::write::*,
};
use parquet2::indexes::{compute_rows, select_pages};
use parquet2::read::IndexedPageReader;

use crate::io::ipc::read_gzip_json;

@@ -830,3 +833,123 @@ fn arrow_type() -> Result<()> {
assert_eq!(new_batches, vec![batch]);
Ok(())
}

/// Returns two sets of pages for two columns, with the same total number of rows distributed unevenly across pages.
fn pages() -> Result<(Vec<EncodedPage>, Vec<EncodedPage>, Schema)> {
// create pages with different numbers of rows
let array11 = PrimitiveArray::<i64>::from_slice([1, 2, 3, 4, 5]);
let array12 = PrimitiveArray::<i64>::from_slice([6]);
let array21 = Utf8Array::<i32>::from_slice(["a", "b", "c"]);
let array22 = Utf8Array::<i32>::from_slice(["d", "e", "f"]);

let schema = Schema::from(vec![
Field::new("a1", DataType::Int64, false),
Field::new("a2", DataType::Utf8, false),
]);

let parquet_schema = to_parquet_schema(&schema)?;

let options = WriteOptions {
write_statistics: true,
compression: Compression::Uncompressed,
version: Version::V1,
};

let pages1 = vec![
array_to_page(
&array11,
parquet_schema.columns()[0].descriptor.clone(),
options,
Encoding::Plain,
)?,
array_to_page(
&array12,
parquet_schema.columns()[0].descriptor.clone(),
options,
Encoding::Plain,
)?,
];
let pages2 = vec![
array_to_page(
&array21,
parquet_schema.columns()[1].descriptor.clone(),
options,
Encoding::Plain,
)?,
array_to_page(
&array22,
parquet_schema.columns()[1].descriptor.clone(),
options,
Encoding::Plain,
)?,
];

Ok((pages1, pages2, schema))
}

/// Tests that page indexes can be used to read only the pages that contain the selected rows.
#[test]
fn read_with_indexes() -> Result<()> {
let (pages1, pages2, schema) = pages()?;

let options = WriteOptions {
write_statistics: true,
compression: Compression::Uncompressed,
version: Version::V1,
};

let to_compressed = |pages: Vec<EncodedPage>| {
let encoded_pages = DynIter::new(pages.into_iter().map(Ok));
let compressed_pages =
Compressor::new(encoded_pages, options.compression, vec![]).map_err(ArrowError::from);
Result::Ok(DynStreamingIterator::new(compressed_pages))
};

let row_group = DynIter::new(vec![to_compressed(pages1), to_compressed(pages2)].into_iter());

let writer = vec![];
let mut writer = FileWriter::try_new(writer, schema, options)?;

writer.start()?;
writer.write(row_group)?;
let (_, data) = writer.end(None)?;

let mut reader = Cursor::new(data);

let metadata = read_metadata(&mut reader)?;

let schema = infer_schema(&metadata)?;

let row_group = &metadata.row_groups[0];

let pages = read_pages_locations(&mut reader, row_group.columns())?;

// say we concluded from the indexes that we only need the "6" from the first column, i.e. only its second page.
let _indexes = read_columns_indexes(&mut reader, row_group.columns(), &schema.fields)?;
let intervals = compute_rows(&[false, true], &pages[0], row_group.num_rows() as u64)?;

// based on the intervals from the first column, we compute which pages of the second column are required:
let pages = select_pages(&intervals, &pages[1], row_group.num_rows() as u64)?;
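// With the pages built above: the first column has pages of 5 and 1 rows, so
// selecting only its second page yields the row interval [5, 6). The second
// column has pages of 3 and 3 rows; row 5 falls in its second page, so only
// that page is read below and it contributes the single value "f".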

// and read them:
let c1 = &metadata.row_groups[0].columns()[1];

let pages = IndexedPageReader::new(reader, c1, pages, vec![], vec![]);
let pages = BasicDecompressor::new(pages, vec![]);

let arrays = column_iter_to_arrays(
vec![pages],
vec![&c1.descriptor().descriptor.primitive_type],
schema.fields[1].clone(),
row_group.num_rows() as usize,
)?;

let arrays = arrays.collect::<Result<Vec<_>>>()?;

assert_eq!(
arrays,
vec![Arc::new(Utf8Array::<i32>::from_slice(["f"])) as Arc<dyn Array>]
);
Ok(())
}
