Fix reading of dictionary encoded pages with null values (#1111) (#1130)
* fix reading of dictionary encoded pages with null values

* fix linting issues
yordan-pavlov authored Jan 5, 2022
1 parent bc2d7aa commit 430bdd4
Showing 2 changed files with 355 additions and 12 deletions.
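Background for the fix: with dictionary encoding, a Parquet data page stores definition and repetition levels for every slot, but an RLE-encoded dictionary index only for the non-null values, so a page containing nulls carries fewer indices than its num_values. The following minimal sketch illustrates that invariant; the expected_index_count helper is hypothetical, for illustration only, and is not part of this patch:

// Levels are written per slot; dictionary indices only for slots that are
// fully defined (def_level == max_def_level). Hypothetical helper, not part
// of this commit.
fn expected_index_count(def_levels: &[i16], max_def_level: i16) -> usize {
    def_levels.iter().filter(|&&d| d == max_def_level).count()
}

fn main() {
    let max_def_level: i16 = 2;
    let def_levels: Vec<i16> = vec![2, 0, 2, 1, 2]; // 5 slots, 3 non-null values
    // such a page would carry 5 pairs of levels but only 3 dictionary indices
    assert_eq!(expected_index_count(&def_levels, max_def_level), 3);
}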
147 changes: 147 additions & 0 deletions parquet/src/arrow/array_reader.rs
@@ -2467,6 +2467,153 @@ mod tests {
    );
}

#[test]
fn test_complex_array_reader_dict_enc_string() {
    use crate::encodings::encoding::{DictEncoder, Encoder};
    use crate::memory::MemTracker;
    // Construct column schema
    let message_type = "
    message test_schema {
        REPEATED Group test_mid {
            OPTIONAL BYTE_ARRAY leaf (UTF8);
        }
    }
    ";
    let num_pages = 2;
    let values_per_page = 100;
    let str_base = "Hello World";

    let schema = parse_message_type(message_type)
        .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
        .unwrap();
    let column_desc = schema.column(0);
    let max_def_level = column_desc.max_def_level();
    let max_rep_level = column_desc.max_rep_level();

    assert_eq!(max_def_level, 2);
    assert_eq!(max_rep_level, 1);
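    // the repeated group contributes one def and one rep level, and the optional
    // leaf another def level, hence max_def_level == 2 and max_rep_level == 1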

    let mut rng = thread_rng();
    let mut pages: Vec<Vec<Page>> = Vec::new();

    let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
    let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
    let mut all_values = Vec::with_capacity(num_pages * values_per_page);

    for i in 0..num_pages {
        let mem_tracker = Arc::new(MemTracker::new());
        let mut dict_encoder =
            DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
        // add data page
        let mut values = Vec::with_capacity(values_per_page);

        for _ in 0..values_per_page {
            let def_level = rng.gen_range(0..max_def_level + 1);
            let rep_level = rng.gen_range(0..max_rep_level + 1);
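            // a value is written only when the slot is fully defined; null slots
            // contribute levels but no dictionary index, so the page ends up with
            // fewer encoded values than num_values -- the case this fix exercises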
            if def_level == max_def_level {
                let len = rng.gen_range(1..str_base.len());
                let slice = &str_base[..len];
                values.push(ByteArray::from(slice));
                all_values.push(Some(slice.to_string()));
            } else {
                all_values.push(None)
            }
            rep_levels.push(rep_level);
            def_levels.push(def_level)
        }

        let range = i * values_per_page..(i + 1) * values_per_page;
        let mut pb =
            DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
        pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
        pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
        let _ = dict_encoder.put(&values);
        let indices = dict_encoder
            .write_indices()
            .expect("write_indices() should be OK");
        pb.add_indices(indices);
        let data_page = pb.consume();
        // for each page log num_values vs actual values in page
        // println!("page num_values: {}, values.len(): {}", data_page.num_values(), values.len());
        // add dictionary page
        let dict = dict_encoder
            .write_dict()
            .expect("write_dict() should be OK");
        let dict_page = Page::DictionaryPage {
            buf: dict,
            num_values: dict_encoder.num_entries() as u32,
            encoding: Encoding::RLE_DICTIONARY,
            is_sorted: false,
        };
        pages.push(vec![dict_page, data_page]);
    }

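    // each inner Vec in `pages` is one column chunk: a dictionary page
    // followed by its data page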
    let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
    let converter = Utf8Converter::new(Utf8ArrayConverter {});
    let mut array_reader =
        ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
            Box::new(page_iterator),
            column_desc,
            converter,
            None,
        )
        .unwrap();

    let mut accu_len: usize = 0;
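    // tracks how many values have been read so far, used to window the
    // expected def/rep level slices for each batch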

// println!("---------- reading a batch of {} values ----------", values_per_page / 2);
let array = array_reader.next_batch(values_per_page / 2).unwrap();
assert_eq!(array.len(), values_per_page / 2);
assert_eq!(
Some(&def_levels[accu_len..(accu_len + array.len())]),
array_reader.get_def_levels()
);
assert_eq!(
Some(&rep_levels[accu_len..(accu_len + array.len())]),
array_reader.get_rep_levels()
);
accu_len += array.len();

// Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
// and the last values_per_page/2 ones are from the second column chunk
// println!("---------- reading a batch of {} values ----------", values_per_page);
let array = array_reader.next_batch(values_per_page).unwrap();
assert_eq!(array.len(), values_per_page);
assert_eq!(
Some(&def_levels[accu_len..(accu_len + array.len())]),
array_reader.get_def_levels()
);
assert_eq!(
Some(&rep_levels[accu_len..(accu_len + array.len())]),
array_reader.get_rep_levels()
);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
for i in 0..array.len() {
if array.is_valid(i) {
assert_eq!(
all_values[i + accu_len].as_ref().unwrap().as_str(),
strings.value(i)
)
} else {
assert_eq!(all_values[i + accu_len], None)
}
}
accu_len += array.len();

// Try to read values_per_page values, however there are only values_per_page/2 values
// println!("---------- reading a batch of {} values ----------", values_per_page);
let array = array_reader.next_batch(values_per_page).unwrap();
assert_eq!(array.len(), values_per_page / 2);
assert_eq!(
Some(&def_levels[accu_len..(accu_len + array.len())]),
array_reader.get_def_levels()
);
assert_eq!(
Some(&rep_levels[accu_len..(accu_len + array.len())]),
array_reader.get_rep_levels()
);
}

/// Array reader for test.
struct InMemoryArrayReader {
    data_type: ArrowType,