From cf99298bd1388e5de13e7bf5394920a8b5dc1355 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 13 Sep 2021 21:41:59 +0200 Subject: [PATCH 1/5] Implement read_dict_buffer_required --- src/io/parquet/read/primitive/basic.rs | 41 ++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index 968945a1771..f1182c1639a 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -72,6 +72,37 @@ fn read_dict_buffer_optional( } } +fn read_dict_buffer_required( + indices_buffer: &[u8], + additional: usize, + dict: &PrimitivePageDict, + values: &mut MutableBuffer, + validity: &mut MutableBitmap, + op: F, +) where + T: NativeType, + A: ArrowNativeType, + F: Fn(T) -> A, +{ + let length = additional + values.len(); + let dict_values = dict.values(); + + // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), + // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). + let bit_width = indices_buffer[0]; + let indices_buffer = &indices_buffer[1..]; + + let indices = + hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); + + for index in indices { + let value = op(dict_values[index as usize]); + values.push(value); + } + + validity.extend_constant(additional, true); +} + fn read_nullable( validity_buffer: &[u8], values_buffer: &[u8], @@ -170,6 +201,16 @@ where op, ) } + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { + read_dict_buffer_required( + values_buffer, + additional, + dict.as_any().downcast_ref().unwrap(), + values, + validity, + op, + ) + } // it can happen that there is a dictionary but the encoding is plain because // it falled back. (Encoding::Plain, _, true) => read_nullable( From e90830570329a3ac77f96774465442ec3c8d9621 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 13 Sep 2021 21:42:57 +0200 Subject: [PATCH 2/5] Fmt --- src/io/parquet/read/primitive/basic.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index f1182c1639a..374cc430532 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -92,8 +92,7 @@ fn read_dict_buffer_required( let bit_width = indices_buffer[0]; let indices_buffer = &indices_buffer[1..]; - let indices = - hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); + let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); for index in indices { let value = op(dict_values[index as usize]); From a11fb3f404e8de491ae0950a4c75d2d924141151 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 13 Sep 2021 22:10:18 +0200 Subject: [PATCH 3/5] Add tests for required primitive dict files --- tests/it/io/parquet/read.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 61d855edf79..35ddcf3b5ce 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -112,6 +112,16 @@ fn v1_int64_nullable_dict() -> Result<()> { test_pyarrow_integration(0, 1, "basic", true, false) } +#[test] +fn v2_int64_required_dict() -> Result<()> { + test_pyarrow_integration(0, 2, "basic", true, true) +} + +#[test] +fn v1_int64_required_dict() -> Result<()> { + test_pyarrow_integration(0, 1, "basic", true, true) +} + #[test] fn v2_utf8_nullable() -> Result<()> { test_pyarrow_integration(2, 2, "basic", false, false) From 6ced92d28bd802b5e919c6e5003b103b6e0f1b84 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 13 Sep 2021 22:12:58 +0200 Subject: [PATCH 4/5] Remove length value --- src/io/parquet/read/primitive/basic.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index 374cc430532..b524f7f2f69 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -84,7 +84,6 @@ fn read_dict_buffer_required( A: ArrowNativeType, F: Fn(T) -> A, { - let length = additional + values.len(); let dict_values = dict.values(); // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), From a9d980b5bb1735482a5f06f4ac4537691760532a Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 13 Sep 2021 22:27:56 +0200 Subject: [PATCH 5/5] Use extend --- src/io/parquet/read/primitive/basic.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index b524f7f2f69..a783f001a4a 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -93,10 +93,7 @@ fn read_dict_buffer_required( let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); - for index in indices { - let value = op(dict_values[index as usize]); - values.push(value); - } + values.extend(indices.map(|index| op(dict_values[index as usize]))); validity.extend_constant(additional, true); }