Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed writing required list to parquet #968

Merged
merged 1 commit into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 49 additions & 20 deletions src/io/parquet/write/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,14 @@ impl<O: Offset> Iterator for RepLevelsIter<'_, O> {
}
}

/// Iterator over consecutive offset pairs (`windows(2)`), with or without an
/// accompanying validity bitmap.
enum OffsetsIter<'a, O> {
/// Offset windows zipped with one validity bit per window; built when a
/// validity bitmap is supplied.
Optional(std::iter::Zip<std::slice::Windows<'a, O>, BitmapIter<'a>>),
/// Offset windows only; built when no validity bitmap is supplied.
Required(std::slice::Windows<'a, O>),
}

/// Iterator adapter of parquet / dremel definition levels
pub struct DefLevelsIter<'a, O: Offset> {
iter: std::iter::Zip<std::slice::Windows<'a, O>, Box<dyn Iterator<Item = bool> + 'a>>,
iter: OffsetsIter<'a, O>,
primitive_validity: Option<BitmapIter<'a>>,
remaining: usize,
is_valid: bool,
Expand All @@ -92,15 +97,12 @@ impl<'a, O: Offset> DefLevelsIter<'a, O> {

let primitive_validity = primitive_validity.map(|x| x.iter());

let validity = validity
.map(|x| Box::new(x.iter()) as Box<dyn Iterator<Item = bool>>)
.unwrap_or_else(|| {
Box::new(std::iter::repeat(true).take(offsets.len() - 1))
as Box<dyn Iterator<Item = bool>>
});
let iter = validity
.map(|x| OffsetsIter::Optional(offsets.windows(2).zip(x.iter())))
.unwrap_or_else(|| OffsetsIter::Required(offsets.windows(2)));

Self {
iter: offsets.windows(2).zip(validity),
iter,
primitive_validity,
remaining: 0,
length: 0,
Expand All @@ -115,18 +117,31 @@ impl<O: Offset> Iterator for DefLevelsIter<'_, O> {

fn next(&mut self) -> Option<Self::Item> {
if self.remaining == self.length {
if let Some((w, is_valid)) = self.iter.next() {
let start = w[0].to_usize();
let end = w[1].to_usize();
self.length = end - start;
self.remaining = 0;
self.is_valid = is_valid;
if self.length == 0 {
self.total_size -= 1;
return Some(is_valid as u32);
match &mut self.iter {
OffsetsIter::Optional(iter) => {
let (w, is_valid) = iter.next()?;
let start = w[0].to_usize();
let end = w[1].to_usize();
self.length = end - start;
self.remaining = 0;
self.is_valid = is_valid;
if self.length == 0 {
self.total_size -= 1;
return Some(self.is_valid as u32);
}
}
OffsetsIter::Required(iter) => {
let w = iter.next()?;
let start = w[0].to_usize();
let end = w[1].to_usize();
self.length = end - start;
self.remaining = 0;
self.is_valid = true;
if self.length == 0 {
self.total_size -= 1;
return Some(0);
}
}
} else {
return None;
}
}
self.remaining += 1;
Expand Down Expand Up @@ -218,7 +233,7 @@ pub fn write_def_levels<O: Offset>(
validity: Option<&Bitmap>,
version: Version,
) -> Result<()> {
let num_bits = 2;
let num_bits = 1 + validity.is_some() as u8;

match version {
Version::V1 => {
Expand Down Expand Up @@ -252,6 +267,7 @@ mod tests {

#[test]
fn test_def_levels() {
// [[0, 1], None, [2, None, 3], [4, 5, 6], [], [7, 8, 9], None, [10]]
let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref();
let validity = Some(Bitmap::from([
true, false, true, true, true, true, false, true,
Expand All @@ -269,4 +285,17 @@ mod tests {
.collect::<Vec<_>>();
assert_eq!(result, expected)
}

// Checks the definition levels produced when neither a list validity nor a
// value validity bitmap is passed to `DefLevelsIter::new`.
#[test]
fn test_def_levels1() {
// [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]]
let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref();
// No validity bitmaps: both arguments are passed as `None`.
let validity = None;
let primitive_validity = None;
// One entry per list element (1) and one entry per empty list (0):
// 2 + 1 + 3 + 3 + 1 + 3 + 1 + 1 = 15 entries.
let expected = vec![1u32, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1];

let result = DefLevelsIter::new(offsets, validity.as_ref(), primitive_validity.as_ref())
.collect::<Vec<_>>();
assert_eq!(result, expected)
}
}
2 changes: 1 addition & 1 deletion tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box<dyn Array> {

match column {
"list_int64_required_required" => {
// [[0, 1], [], [2, None, 3], [4, 5, 6], [], [7, 8, 9], [], [10]]
// [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]]
let data_type = DataType::List(Box::new(Field::new("item", DataType::Int64, false)));
Box::new(ListArray::<i32>::from_data(
data_type, offsets, values, None,
Expand Down
2 changes: 1 addition & 1 deletion tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ fn v1_nested_i16_dict() -> Result<()> {
}

#[test]
fn v2_nested_i16_required_dict() -> Result<()> {
fn v1_nested_i16_required_dict() -> Result<()> {
test_pyarrow_integration(
"list_int64_required_required",
1,
Expand Down
24 changes: 24 additions & 0 deletions tests/it/io/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,30 @@ fn list_int64_optional_v1() -> Result<()> {
)
}

// Calls `round_trip` for the "list_int64_required_required" column with
// `Version::V1`, no compression and plain encoding.
// NOTE(review): the meaning of the two boolean arguments is defined by
// `round_trip`, which is not visible here — confirm against its signature.
#[test]
fn list_int64_required_required_v1() -> Result<()> {
round_trip(
"list_int64_required_required",
false,
true,
Version::V1,
CompressionOptions::Uncompressed,
Encoding::Plain,
)
}

// Calls `round_trip` for the "list_int64_required_required" column with
// `Version::V2`, no compression and plain encoding.
// NOTE(review): the meaning of the two boolean arguments is defined by
// `round_trip`, which is not visible here — confirm against its signature.
#[test]
fn list_int64_required_required_v2() -> Result<()> {
round_trip(
"list_int64_required_required",
false,
true,
Version::V2,
CompressionOptions::Uncompressed,
Encoding::Plain,
)
}

#[test]
fn list_bool_optional_v2() -> Result<()> {
round_trip(
Expand Down