This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to round-trip dictionary arrays on parquet #232

Merged (2 commits) on Aug 4, 2021
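For context: a dictionary array stores each distinct value once and represents rows as integer keys into that dictionary; "round-tripping" means such a column survives a parquet write and read without being densified. The sketch below uses pyarrow purely to illustrate that behaviour (it is not arrow2 code, and the file name `dict.parquet` is arbitrary).

```python
import pyarrow as pa
import pyarrow.parquet as pq

# a dictionary-encoded (categorical) column: distinct values stored once, rows store keys
table = pa.table({"c": pa.array(["a", "b", "a", None, "b"]).dictionary_encode()})
pq.write_table(table, "dict.parquet")

# ask the reader to keep the column dictionary-encoded instead of densifying it
back = pq.read_table("dict.parquet", read_dictionary=["c"])
assert pa.types.is_dictionary(back.schema.field("c").type)
assert back["c"].to_pylist() == ["a", "b", "a", None, "b"]
```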
2 changes: 1 addition & 1 deletion .github/workflows/integration-parquet.yml
@@ -48,5 +48,5 @@ jobs:
pip install --upgrade pip
pip install pyarrow pyspark
python main.py
# test delta encoding against spark (pyarrow does not support it)
# test against spark
python main_spark.py
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -40,7 +40,7 @@ hex = { version = "^0.4", optional = true }

# for IPC compression
lz4 = { version = "1.23.1", optional = true }
zstd = { version = "^0.6", optional = true }
zstd = { version = "0.9", optional = true }

rand = { version = "0.8", optional = true }

@@ -58,7 +58,7 @@ futures = { version = "0.3", optional = true }
# for faster hashing
ahash = { version = "0.7", optional = true }

parquet2 = { version = "0.1", optional = true, default_features = false, features = ["stream"] }
parquet2 = { version = "0.2", optional = true, default_features = false, features = ["stream"] }

[dev-dependencies]
rand = "0.8"
3 changes: 2 additions & 1 deletion arrow-parquet-integration-testing/main.py
@@ -65,7 +65,8 @@ def variations():
"generated_datetime",
"generated_decimal",
"generated_interval",
# requires writing Dictionary
# see https://issues.apache.org/jira/browse/ARROW-13486 and
# https://issues.apache.org/jira/browse/ARROW-13487
# "generated_dictionary",
# requires writing Struct
# "generated_duplicate_fieldnames",
49 changes: 31 additions & 18 deletions arrow-parquet-integration-testing/main_spark.py
@@ -6,27 +6,40 @@

from main import _prepare, _expected

-_file = "generated_primitive"
-_version = "2"
-_encoding = "delta"
-column = ("utf8_nullable", 24)
-
-expected = _expected(_file)
-expected = next(c for i, c in enumerate(expected) if i == column[1])
-expected = expected.combine_chunks().tolist()
-
-path = _prepare(_file, _version, _encoding, [column[1]])
-
-spark = pyspark.sql.SparkSession.builder.config(
-    # see https://stackoverflow.com/a/62024670/931303
-    "spark.sql.parquet.enableVectorizedReader",
-    "false",
-).getOrCreate()
-
-df = spark.read.parquet(path)
-
-r = df.select(column[0]).collect()
-os.remove(path)
-
-result = [r[column[0]] for r in r]
-assert expected == result
+def test(file: str, version: str, column, encoding: str):
+    """
+    Tests that pyspark can read a parquet file written by arrow2.
+
+    In arrow2: read IPC, write parquet
+    In pyarrow: read (same) IPC to Python
+    In pyspark: read (written) parquet to Python
+    assert that they are equal
+    """
+    # write parquet
+    path = _prepare(file, version, encoding, [column[1]])
+
+    # read IPC to Python
+    expected = _expected(file)
+    expected = next(c for i, c in enumerate(expected) if i == column[1])
+    expected = expected.combine_chunks().tolist()
+
+    # read parquet to Python
+    spark = pyspark.sql.SparkSession.builder.config(
+        # see https://stackoverflow.com/a/62024670/931303
+        "spark.sql.parquet.enableVectorizedReader",
+        "false",
+    ).getOrCreate()
+
+    result = spark.read.parquet(path).select(column[0]).collect()
+    result = [r[column[0]] for r in result]
+    os.remove(path)
+
+    # assert equality
+    assert expected == result
+
+
+test("generated_primitive", "2", ("utf8_nullable", 24), "delta")
+
+test("generated_dictionary", "1", ("dict0", 0), "")
+test("generated_dictionary", "2", ("dict0", 0), "")
1 change: 1 addition & 0 deletions arrow-parquet-integration-testing/src/main.rs
@@ -162,6 +162,7 @@ fn main() -> Result<()> {
.fields()
.iter()
.map(|x| match x.data_type() {
DataType::Dictionary(_, _) => Encoding::RleDictionary,
DataType::Utf8 | DataType::LargeUtf8 => {
if utf8_encoding == "delta" {
Encoding::DeltaLengthByteArray
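The integration writer now selects `Encoding::RleDictionary` for dictionary fields. To check which encodings actually ended up in a written file, the column-chunk metadata records them; a small pyarrow sketch of that inspection (pyarrow here is a stand-in writer/reader, and `enc.parquet` is an arbitrary name):

```python
import pyarrow as pa
import pyarrow.parquet as pq

# write a dictionary-encoded integer column, then look at the writer's choices
table = pa.table({"c": pa.array([10, 200, None, 200]).dictionary_encode()})
pq.write_table(table, "enc.parquet")

meta = pq.ParquetFile("enc.parquet").metadata.row_group(0).column(0)
# the exact tuple depends on the writer and format version,
# e.g. ('PLAIN_DICTIONARY', 'PLAIN', 'RLE') or ('RLE_DICTIONARY', ...)
print(meta.encodings)
```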
26 changes: 16 additions & 10 deletions src/io/parquet/mod.rs
@@ -250,14 +250,19 @@ mod tests {
.iter()
.map(|x| x.map(|x| x as u32))
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<u32>::from(values).to(DataType::UInt32))
Box::new(PrimitiveArray::<u32>::from(values))
}
6 => {
let keys = PrimitiveArray::<i32>::from([Some(0), Some(1), None, Some(1)]);
let values = Arc::new(PrimitiveArray::<i32>::from_slice([10, 200]));
Box::new(DictionaryArray::<i32>::from_data(keys, values))
}
_ => unreachable!(),
}
}

pub fn pyarrow_nullable_statistics(column: usize) -> Box<dyn Statistics> {
match column {
pub fn pyarrow_nullable_statistics(column: usize) -> Option<Box<dyn Statistics>> {
Some(match column {
0 => Box::new(PrimitiveStatistics::<i64> {
data_type: DataType::Int64,
distinct_count: None,
@@ -300,8 +305,9 @@
min_value: Some(0),
max_value: Some(9),
}),
6 => return None,
_ => unreachable!(),
}
})
}

// these values match the values in `integration`
@@ -331,8 +337,8 @@
}
}

pub fn pyarrow_required_statistics(column: usize) -> Box<dyn Statistics> {
match column {
pub fn pyarrow_required_statistics(column: usize) -> Option<Box<dyn Statistics>> {
Some(match column {
0 => Box::new(PrimitiveStatistics::<i64> {
data_type: DataType::Int64,
null_count: Some(0),
@@ -353,11 +359,11 @@
max_value: Some("def".to_string()),
}),
_ => unreachable!(),
}
})
}

pub fn pyarrow_nested_nullable_statistics(column: usize) -> Box<dyn Statistics> {
match column {
pub fn pyarrow_nested_nullable_statistics(column: usize) -> Option<Box<dyn Statistics>> {
Some(match column {
3 => Box::new(PrimitiveStatistics::<i16> {
data_type: DataType::Int16,
distinct_count: None,
@@ -390,7 +396,7 @@
min_value: Some(0),
max_value: Some(9),
}),
}
})
}
}

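The new test fixture in column 6 is a dictionary array with keys `[0, 1, null, 1]` over the values `[10, 200]`, and the statistics helpers now return `Option` so this column can report `None`. A pyarrow sketch of the same fixture, showing the logical values those keys decode to (assuming only that pyarrow's `DictionaryArray.from_arrays` mirrors the Rust `from_data` constructor):

```python
import pyarrow as pa

# keys [0, 1, null, 1] over dictionary values [10, 200], as in the Rust fixture
keys = pa.array([0, 1, None, 1], type=pa.int32())
values = pa.array([10, 200], type=pa.int32())
arr = pa.DictionaryArray.from_arrays(keys, values)
assert arr.to_pylist() == [10, 200, None, 200]  # a null key decodes to a null slot
```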
51 changes: 28 additions & 23 deletions src/io/parquet/read/binary/basic.rs
@@ -1,7 +1,8 @@
use parquet2::{
encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
read::{levels, BinaryPageDict, Page, PageHeader, StreamingIterator},
page::{BinaryPageDict, DataPage, DataPageHeader},
read::{levels, StreamingIterator},
};

use crate::{
@@ -202,7 +203,7 @@ pub(super) fn read_plain_required<O: Offset>(
}

fn extend_from_page<O: Offset>(
page: &Page,
page: &DataPage,
descriptor: &ColumnDescriptor,
offsets: &mut MutableBuffer<O>,
values: &mut MutableBuffer<u8>,
@@ -212,22 +213,24 @@ fn extend_from_page<O: Offset>(
assert!(descriptor.max_def_level() <= 1);
let is_optional = descriptor.max_def_level() == 1;
match page.header() {
PageHeader::V1(header) => {
DataPageHeader::V1(header) => {
assert_eq!(header.definition_level_encoding, Encoding::Rle);

let (_, validity_buffer, values_buffer) =
levels::split_buffer_v1(page.buffer(), false, is_optional);

match (&page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary, Some(dict), true) => read_dict_buffer::<O>(
validity_buffer,
values_buffer,
page.num_values() as u32,
dict.as_any().downcast_ref().unwrap(),
offsets,
values,
validity,
),
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
read_dict_buffer::<O>(
validity_buffer,
values_buffer,
page.num_values() as u32,
dict.as_any().downcast_ref().unwrap(),
offsets,
values,
validity,
)
}
(Encoding::DeltaLengthByteArray, None, true) => read_delta_optional::<O>(
validity_buffer,
values_buffer,
@@ -258,22 +261,24 @@ fn extend_from_page<O: Offset>(
}
}
}
PageHeader::V2(header) => {
DataPageHeader::V2(header) => {
let def_level_buffer_length = header.definition_levels_byte_length as usize;

let (_, validity_buffer, values_buffer) =
levels::split_buffer_v2(page.buffer(), 0, def_level_buffer_length);

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary, Some(dict), true) => read_dict_buffer::<O>(
validity_buffer,
values_buffer,
page.num_values() as u32,
dict.as_any().downcast_ref().unwrap(),
offsets,
values,
validity,
),
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
read_dict_buffer::<O>(
validity_buffer,
values_buffer,
page.num_values() as u32,
dict.as_any().downcast_ref().unwrap(),
offsets,
values,
validity,
)
}
(Encoding::Plain, None, true) => read_plain_optional::<O>(
validity_buffer,
values_buffer,
@@ -317,7 +322,7 @@ where
ArrowError: From<E>,
O: Offset,
E: Clone,
I: StreamingIterator<Item = std::result::Result<Page, E>>,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(0);
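The reader now accepts both `PlainDictionary` and `RleDictionary` data pages. In parquet, such a column chunk stores one dictionary page holding the distinct values, followed by data pages whose values are keys into it, which is what `extend_from_page` decodes via `read_dict_buffer`. A pyarrow sketch for observing that page layout on a written file (`pages.parquet` is an arbitrary name):

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"s": pa.array(["x", "y", "x", "y"]).dictionary_encode()})
pq.write_table(table, "pages.parquet")

col = pq.ParquetFile("pages.parquet").metadata.row_group(0).column(0)
# one dictionary page with the distinct values precedes the data pages of keys
print(col.has_dictionary_page, col.dictionary_page_offset, col.data_page_offset)
```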
11 changes: 6 additions & 5 deletions src/io/parquet/read/binary/nested.rs
@@ -3,9 +3,10 @@ use std::sync::Arc;
use parquet2::{
encoding::Encoding,
metadata::{ColumnChunkMetaData, ColumnDescriptor},
page::{DataPage, DataPageHeader},
read::{
levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
Page, PageHeader, StreamingIterator,
StreamingIterator,
},
};

@@ -108,7 +109,7 @@ fn read<O: Offset>(
}

fn extend_from_page<O: Offset>(
page: &Page,
page: &DataPage,
descriptor: &ColumnDescriptor,
is_nullable: bool,
nested: &mut Vec<Box<dyn Nested>>,
@@ -119,7 +120,7 @@ fn extend_from_page<O: Offset>(
let additional = page.num_values();

match page.header() {
PageHeader::V1(header) => {
DataPageHeader::V1(header) => {
assert_eq!(header.definition_level_encoding, Encoding::Rle);
assert_eq!(header.repetition_level_encoding, Encoding::Rle);

@@ -161,7 +162,7 @@
}
}
}
PageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) {
DataPageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) {
(Encoding::Plain, None) => {
let def_level_buffer_length = header.definition_levels_byte_length as usize;
let rep_level_buffer_length = header.repetition_levels_byte_length as usize;
@@ -207,7 +208,7 @@ where
O: Offset,
ArrowError: From<E>,
E: Clone,
I: StreamingIterator<Item = std::result::Result<Page, E>>,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(0);
11 changes: 6 additions & 5 deletions src/io/parquet/read/boolean/basic.rs
@@ -8,7 +8,8 @@ use super::super::utils;
use parquet2::{
encoding::{hybrid_rle, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
read::{levels, Page, PageHeader, StreamingIterator},
page::{DataPage, DataPageHeader},
read::{levels, StreamingIterator},
};

pub(super) fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) {
@@ -68,7 +69,7 @@ pub fn iter_to_array<I, E>(mut iter: I, metadata: &ColumnChunkMetaData) -> Resul
where
ArrowError: From<E>,
E: Clone,
I: StreamingIterator<Item = std::result::Result<Page, E>>,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBitmap::with_capacity(capacity);
@@ -86,7 +87,7 @@
}

fn extend_from_page(
page: &Page,
page: &DataPage,
descriptor: &ColumnDescriptor,
values: &mut MutableBitmap,
validity: &mut MutableBitmap,
@@ -95,7 +96,7 @@
assert!(descriptor.max_def_level() <= 1);
let is_optional = descriptor.max_def_level() == 1;
match page.header() {
PageHeader::V1(header) => {
DataPageHeader::V1(header) => {
assert_eq!(header.definition_level_encoding, Encoding::Rle);

match (&page.encoding(), page.dictionary_page(), is_optional) {
@@ -124,7 +125,7 @@
}
}
}
PageHeader::V2(header) => {
DataPageHeader::V2(header) => {
let def_level_buffer_length = header.definition_levels_byte_length as usize;
match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::Plain, None, true) => {