Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed writing required list (#968)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Apr 29, 2022
1 parent b93333c commit e850cf9
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 22 deletions.
69 changes: 49 additions & 20 deletions src/io/parquet/write/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,14 @@ impl<O: Offset> Iterator for RepLevelsIter<'_, O> {
}
}

/// Source of per-list offset windows used when computing definition levels,
/// with or without an accompanying list validity bitmap.
enum OffsetsIter<'a, O> {
/// The list has a validity bitmap: every 2-element offsets window is zipped
/// with the validity bit of the corresponding list slot.
Optional(std::iter::Zip<std::slice::Windows<'a, O>, BitmapIter<'a>>),
/// The list has no validity bitmap: only the offsets windows are iterated.
Required(std::slice::Windows<'a, O>),
}

/// Iterator adapter of parquet / dremel definition levels
pub struct DefLevelsIter<'a, O: Offset> {
iter: std::iter::Zip<std::slice::Windows<'a, O>, Box<dyn Iterator<Item = bool> + 'a>>,
iter: OffsetsIter<'a, O>,
primitive_validity: Option<BitmapIter<'a>>,
remaining: usize,
is_valid: bool,
Expand All @@ -92,15 +97,12 @@ impl<'a, O: Offset> DefLevelsIter<'a, O> {

let primitive_validity = primitive_validity.map(|x| x.iter());

let validity = validity
.map(|x| Box::new(x.iter()) as Box<dyn Iterator<Item = bool>>)
.unwrap_or_else(|| {
Box::new(std::iter::repeat(true).take(offsets.len() - 1))
as Box<dyn Iterator<Item = bool>>
});
let iter = validity
.map(|x| OffsetsIter::Optional(offsets.windows(2).zip(x.iter())))
.unwrap_or_else(|| OffsetsIter::Required(offsets.windows(2)));

Self {
iter: offsets.windows(2).zip(validity),
iter,
primitive_validity,
remaining: 0,
length: 0,
Expand All @@ -115,18 +117,31 @@ impl<O: Offset> Iterator for DefLevelsIter<'_, O> {

fn next(&mut self) -> Option<Self::Item> {
if self.remaining == self.length {
if let Some((w, is_valid)) = self.iter.next() {
let start = w[0].to_usize();
let end = w[1].to_usize();
self.length = end - start;
self.remaining = 0;
self.is_valid = is_valid;
if self.length == 0 {
self.total_size -= 1;
return Some(is_valid as u32);
match &mut self.iter {
OffsetsIter::Optional(iter) => {
let (w, is_valid) = iter.next()?;
let start = w[0].to_usize();
let end = w[1].to_usize();
self.length = end - start;
self.remaining = 0;
self.is_valid = is_valid;
if self.length == 0 {
self.total_size -= 1;
return Some(self.is_valid as u32);
}
}
OffsetsIter::Required(iter) => {
let w = iter.next()?;
let start = w[0].to_usize();
let end = w[1].to_usize();
self.length = end - start;
self.remaining = 0;
self.is_valid = true;
if self.length == 0 {
self.total_size -= 1;
return Some(0);
}
}
} else {
return None;
}
}
self.remaining += 1;
Expand Down Expand Up @@ -218,7 +233,7 @@ pub fn write_def_levels<O: Offset>(
validity: Option<&Bitmap>,
version: Version,
) -> Result<()> {
let num_bits = 2;
let num_bits = 1 + validity.is_some() as u8;

match version {
Version::V1 => {
Expand Down Expand Up @@ -252,6 +267,7 @@ mod tests {

#[test]
fn test_def_levels() {
// [[0, 1], None, [2, None, 3], [4, 5, 6], [], [7, 8, 9], None, [10]]
let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref();
let validity = Some(Bitmap::from([
true, false, true, true, true, true, false, true,
Expand All @@ -269,4 +285,17 @@ mod tests {
.collect::<Vec<_>>();
assert_eq!(result, expected)
}

#[test]
fn test_def_levels1() {
    // Neither the list nor its items carry a validity bitmap:
    // [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]]
    let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref();
    let list_validity = None;
    let item_validity = None;

    let levels: Vec<_> =
        DefLevelsIter::new(offsets, list_validity.as_ref(), item_validity.as_ref()).collect();

    // One `1` per item, plus a single `0` for each empty sub-list.
    let expected = vec![1u32, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1];
    assert_eq!(levels, expected)
}
}
2 changes: 1 addition & 1 deletion tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box<dyn Array> {

match column {
"list_int64_required_required" => {
// [[0, 1], [], [2, None, 3], [4, 5, 6], [], [7, 8, 9], [], [10]]
// [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]]
let data_type = DataType::List(Box::new(Field::new("item", DataType::Int64, false)));
Box::new(ListArray::<i32>::from_data(
data_type, offsets, values, None,
Expand Down
2 changes: 1 addition & 1 deletion tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ fn v1_nested_i16_dict() -> Result<()> {
}

#[test]
fn v2_nested_i16_required_dict() -> Result<()> {
fn v1_nested_i16_required_dict() -> Result<()> {
test_pyarrow_integration(
"list_int64_required_required",
1,
Expand Down
24 changes: 24 additions & 0 deletions tests/it/io/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,30 @@ fn list_int64_optional_v1() -> Result<()> {
)
}

// Runs `round_trip` on the "list_int64_required_required" column with
// `Version::V1`, no compression and `Encoding::Plain`.
// NOTE(review): the meaning of the two boolean arguments (`false`, `true`) is
// defined by `round_trip`, which is not visible here — confirm against its signature.
#[test]
fn list_int64_required_required_v1() -> Result<()> {
round_trip(
"list_int64_required_required",
false,
true,
Version::V1,
CompressionOptions::Uncompressed,
Encoding::Plain,
)
}

// Runs `round_trip` on the "list_int64_required_required" column with
// `Version::V2`, no compression and `Encoding::Plain`.
// NOTE(review): the meaning of the two boolean arguments (`false`, `true`) is
// defined by `round_trip`, which is not visible here — confirm against its signature.
#[test]
fn list_int64_required_required_v2() -> Result<()> {
round_trip(
"list_int64_required_required",
false,
true,
Version::V2,
CompressionOptions::Uncompressed,
Encoding::Plain,
)
}

#[test]
fn list_bool_optional_v2() -> Result<()> {
round_trip(
Expand Down

0 comments on commit e850cf9

Please sign in to comment.