From 430bdd4df258561e77f49b5864956170a222818a Mon Sep 17 00:00:00 2001
From: Yordan Pavlov <64363766+yordan-pavlov@users.noreply.github.com>
Date: Wed, 5 Jan 2022 19:57:19 +0000
Subject: [PATCH] Fix reading of dictionary encoded pages with null values
 (#1111) (#1130)

* fix reading of dictionary encoded pages with null values
* fix linting issues
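
A dictionary encoded data page stores dictionary indices only for the
non-null slots, but the value decoder used to be created with the page's
total num_values, which also counts nulls. The fix decodes the definition
levels first to count how many values are actually present, and creates
the value decoder with that count instead (see count_def_level_values
below).

The counting step in essence (an illustrative sketch with made-up
definition levels, not code from this patch; the patch does the same
thing with arrow::compute::eq_scalar and Buffer::count_set_bits_offset):

    use arrow::array::{Array, Int16Array};
    use arrow::compute::eq_scalar;

    // a page with 5 slots, of which only 3 hold concrete values
    let def_levels = Int16Array::from(vec![2i16, 0, 1, 2, 2]);
    let max_def_level = 2i16;
    // true wherever the definition level marks a present (non-null) value
    let valid = eq_scalar(&def_levels, max_def_level).unwrap();
    // count the set bits in the packed boolean buffer: 3 values to decode
    let value_count = valid.values().count_set_bits_offset(0, valid.len());
    assert_eq!(value_count, 3);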
---
 parquet/src/arrow/array_reader.rs       | 147 ++++++++++++++++
 parquet/src/arrow/arrow_array_reader.rs | 220 ++++++++++++++++++++++--
 2 files changed, 355 insertions(+), 12 deletions(-)

diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index ef8cf701090b..21900765bc74 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -2467,6 +2467,153 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_complex_array_reader_dict_enc_string() {
+        use crate::encoding::{DictEncoder, Encoder};
+        use crate::memory::MemTracker;
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL BYTE_ARRAY leaf (UTF8);
+            }
+        }
+        ";
+        let num_pages = 2;
+        let values_per_page = 100;
+        let str_base = "Hello World";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+        let column_desc = schema.column(0);
+        let max_def_level = column_desc.max_def_level();
+        let max_rep_level = column_desc.max_rep_level();
+
+        assert_eq!(max_def_level, 2);
+        assert_eq!(max_rep_level, 1);
+
+        let mut rng = thread_rng();
+        let mut pages: Vec<Vec<Page>> = Vec::new();
+
+        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
+
+        for i in 0..num_pages {
+            let mem_tracker = Arc::new(MemTracker::new());
+            let mut dict_encoder =
+                DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
+            // add data page
+            let mut values = Vec::with_capacity(values_per_page);
+
+            for _ in 0..values_per_page {
+                let def_level = rng.gen_range(0..max_def_level + 1);
+                let rep_level = rng.gen_range(0..max_rep_level + 1);
+                if def_level == max_def_level {
+                    let len = rng.gen_range(1..str_base.len());
+                    let slice = &str_base[..len];
+                    values.push(ByteArray::from(slice));
+                    all_values.push(Some(slice.to_string()));
+                } else {
+                    all_values.push(None)
+                }
+                rep_levels.push(rep_level);
+                def_levels.push(def_level)
+            }
+
+            let range = i * values_per_page..(i + 1) * values_per_page;
+            let mut pb =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
+            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            pb.add_indices(indices);
+            let data_page = pb.consume();
+            // for each page log num_values vs actual values in page
+            // println!("page num_values: {}, values.len(): {}", data_page.num_values(), values.len());
+            // add dictionary page
+            let dict = dict_encoder
+                .write_dict()
+                .expect("write_dict() should be OK");
+            let dict_page = Page::DictionaryPage {
+                buf: dict,
+                num_values: dict_encoder.num_entries() as u32,
+                encoding: Encoding::RLE_DICTIONARY,
+                is_sorted: false,
+            };
+            pages.push(vec![dict_page, data_page]);
+        }
+
+        let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
+        let converter = Utf8Converter::new(Utf8ArrayConverter {});
+        let mut array_reader =
+            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
+                Box::new(page_iterator),
+                column_desc,
+                converter,
+                None,
+            )
+            .unwrap();
+
+        let mut accu_len: usize = 0;
+
+        // println!("---------- reading a batch of {} values ----------", values_per_page / 2);
+        let array = array_reader.next_batch(values_per_page / 2).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        accu_len += array.len();
+
+        // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
+        // and the last values_per_page/2 ones are from the second column chunk
+        // println!("---------- reading a batch of {} values ----------", values_per_page);
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        for i in 0..array.len() {
+            if array.is_valid(i) {
+                assert_eq!(
+                    all_values[i + accu_len].as_ref().unwrap().as_str(),
+                    strings.value(i)
+                )
+            } else {
+                assert_eq!(all_values[i + accu_len], None)
+            }
+        }
+        accu_len += array.len();
+
+        // Try to read values_per_page values, however there are only values_per_page/2 values
+        // println!("---------- reading a batch of {} values ----------", values_per_page);
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+    }
+
     /// Array reader for test.
     struct InMemoryArrayReader {
         data_type: ArrowType,
diff --git a/parquet/src/arrow/arrow_array_reader.rs b/parquet/src/arrow/arrow_array_reader.rs
index 3f2acf4568d7..cd1e92b1c083 100644
--- a/parquet/src/arrow/arrow_array_reader.rs
+++ b/parquet/src/arrow/arrow_array_reader.rs
@@ -329,26 +329,43 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
                     ))))
                 };
             // create def level decoder iterator
-            let def_level_iter: Box<dyn ValueDecoder> =
+            let (def_level_iter, value_count): (Box<dyn ValueDecoder>, usize) =
                 if Self::def_levels_available(column_desc) {
+                    // calculate actual value count
                     let mut def_decoder = LevelDecoder::v1(
                         def_level_encoding,
                         column_desc.max_def_level(),
                     );
+                    def_decoder.set_data(num_values as usize, buffer_ptr.all());
+                    let value_count = Self::count_def_level_values(
+                        column_desc,
+                        def_decoder,
+                        num_values as usize,
+                    )?;
+                    // create def level decoder
+                    def_decoder = LevelDecoder::v1(
+                        def_level_encoding,
+                        column_desc.max_def_level(),
+                    );
                     let def_levels_byte_len =
                         def_decoder.set_data(num_values as usize, buffer_ptr.all());
                     // advance buffer pointer
                     buffer_ptr = buffer_ptr.start_from(def_levels_byte_len);
-                    Box::new(LevelValueDecoder::new(def_decoder))
+                    (Box::new(LevelValueDecoder::new(def_decoder)), value_count)
                 } else {
-                    Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
-                        "def levels are not available".to_string(),
-                    ))))
+                    (
+                        Box::new(<dyn ValueDecoder>::once(Err(
+                            ParquetError::General(
+                                "def levels are not available".to_string(),
+                            ),
+                        ))),
+                        num_values as usize,
+                    )
                 };
             // create value decoder iterator
             let value_iter = Self::get_value_decoder(
                 buffer_ptr,
-                num_values as usize,
+                value_count,
                 encoding,
                 column_desc,
                 column_chunk_context,
@@ -387,9 +404,10 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
                     ))))
                 };
             // create def level decoder iterator
-            let def_level_iter: Box<dyn ValueDecoder> =
+            let (def_level_iter, value_count): (Box<dyn ValueDecoder>, usize) =
                 if Self::def_levels_available(column_desc) {
                     let def_levels_byte_len = def_levels_byte_len as usize;
+                    // calculate actual value count
                     let mut def_decoder =
                         LevelDecoder::v2(column_desc.max_def_level());
                     def_decoder.set_data_range(
@@ -398,19 +416,37 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
                         offset,
                         def_levels_byte_len,
                     );
+                    let value_count = Self::count_def_level_values(
+                        column_desc,
+                        def_decoder,
+                        num_values as usize,
+                    )?;
+                    // create def level decoder
+                    def_decoder = LevelDecoder::v2(column_desc.max_def_level());
+                    def_decoder.set_data_range(
+                        num_values as usize,
+                        &buf,
+                        offset,
+                        def_levels_byte_len,
+                    );
                     offset += def_levels_byte_len;
-                    Box::new(LevelValueDecoder::new(def_decoder))
+                    (Box::new(LevelValueDecoder::new(def_decoder)), value_count)
                 } else {
-                    Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
-                        "def levels are not available".to_string(),
-                    ))))
+                    (
+                        Box::new(<dyn ValueDecoder>::once(Err(
+                            ParquetError::General(
+                                "def levels are not available".to_string(),
+                            ),
+                        ))),
+                        num_values as usize,
+                    )
                 };
             // create value decoder iterator
             let values_buffer = buf.start_from(offset);
             let value_iter = Self::get_value_decoder(
                 values_buffer,
-                num_values as usize,
+                value_count,
                 encoding,
                 column_desc,
                 column_chunk_context,
             )?;
@@ -420,6 +456,24 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
         }
     }
 
+    fn count_def_level_values(
+        column_desc: &ColumnDescriptor,
+        level_decoder: crate::encodings::levels::LevelDecoder,
+        num_values: usize,
+    ) -> Result<usize> {
+        let mut def_level_decoder = LevelValueDecoder::new(level_decoder);
+        let def_level_array =
+            Self::build_level_array(&mut def_level_decoder, num_values)?;
+        let def_level_count = def_level_array.len();
+        // use eq_scalar to efficiently build null bitmap array from def levels
+        let null_bitmap_array =
+            arrow::compute::eq_scalar(&def_level_array, column_desc.max_def_level())?;
+        // efficiently calculate values to read
+        Ok(null_bitmap_array
+            .values()
+            .count_set_bits_offset(0, def_level_count))
+    }
+
     fn get_dictionary_page_decoder(
         values_buffer: ByteBufferPtr,
         num_values: usize,
@@ -1209,10 +1263,12 @@ mod tests {
     use crate::column::writer::ColumnWriter;
     use crate::data_type::ByteArray;
     use crate::data_type::ByteArrayType;
+    use crate::encoding::{DictEncoder, Encoder};
     use crate::file::properties::WriterProperties;
     use crate::file::reader::SerializedFileReader;
     use crate::file::serialized_reader::SliceableCursor;
     use crate::file::writer::{FileWriter, SerializedFileWriter, TryClone};
+    use crate::memory::MemTracker;
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::SchemaDescriptor;
     use crate::util::test_common::page_util::{
@@ -1608,6 +1664,146 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_arrow_array_reader_dict_enc_string() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL BYTE_ARRAY leaf (UTF8);
+            }
+        }
+        ";
+        let num_pages = 2;
+        let values_per_page = 100;
+        let str_base = "Hello World";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+        let column_desc = schema.column(0);
+        let max_def_level = column_desc.max_def_level();
+        let max_rep_level = column_desc.max_rep_level();
+
+        assert_eq!(max_def_level, 2);
+        assert_eq!(max_rep_level, 1);
+
+        let mut rng = thread_rng();
+        let mut pages: Vec<Vec<Page>> = Vec::new();
+
+        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
+
+        for i in 0..num_pages {
+            let mem_tracker = Arc::new(MemTracker::new());
+            let mut dict_encoder =
+                DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
+            // add data page
+            let mut values = Vec::with_capacity(values_per_page);
+
+            for _ in 0..values_per_page {
+                let def_level = rng.gen_range(0..max_def_level + 1);
+                let rep_level = rng.gen_range(0..max_rep_level + 1);
+                if def_level == max_def_level {
+                    let len = rng.gen_range(1..str_base.len());
+                    let slice = &str_base[..len];
+                    values.push(ByteArray::from(slice));
+                    all_values.push(Some(slice.to_string()));
+                } else {
+                    all_values.push(None)
+                }
+                rep_levels.push(rep_level);
+                def_levels.push(def_level)
+            }
+
+            let range = i * values_per_page..(i + 1) * values_per_page;
+            let mut pb =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
+            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            pb.add_indices(indices);
+            let data_page = pb.consume();
+            // for each page log num_values vs actual values in page
+            // println!("page num_values: {}, values.len(): {}", data_page.num_values(), values.len());
+            // add dictionary page
+            let dict = dict_encoder
+                .write_dict()
+                .expect("write_dict() should be OK");
+            let dict_page = Page::DictionaryPage {
+                buf: dict,
+                num_values: dict_encoder.num_entries() as u32,
+                encoding: Encoding::RLE_DICTIONARY,
+                is_sorted: false,
+            };
+            pages.push(vec![dict_page, data_page]);
+        }
+
+        let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
+        let converter = StringArrayConverter::new();
+        let mut array_reader =
+            ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+                .unwrap();
+
+        let mut accu_len: usize = 0;
+
+        // println!("---------- reading a batch of {} values ----------", values_per_page / 2);
+        let array = array_reader.next_batch(values_per_page / 2).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        accu_len += array.len();
+
+        // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
+        // and the last values_per_page/2 ones are from the second column chunk
+        // println!("---------- reading a batch of {} values ----------", values_per_page);
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        for i in 0..array.len() {
+            if array.is_valid(i) {
+                assert_eq!(
+                    all_values[i + accu_len].as_ref().unwrap().as_str(),
+                    strings.value(i)
+                )
+            } else {
+                assert_eq!(all_values[i + accu_len], None)
+            }
+        }
+        accu_len += array.len();
+
+        // Try to read values_per_page values, however there are only values_per_page/2 values
+        // println!("---------- reading a batch of {} values ----------", values_per_page);
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+    }
+
     /// Allows to write parquet into memory. Intended only for use in tests.
     #[derive(Clone)]
     struct VecWriter {
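
To exercise the new tests from an arrow-rs checkout with this patch
applied (a usage note; the substring filter below matches both new test
names):

    cargo test -p parquet dict_enc_string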