This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for dictionary-encoding.
jorgecarleitao committed Aug 4, 2021
1 parent 07942f1 commit 063ede8
Showing 33 changed files with 685 additions and 164 deletions.
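Dictionary encoding stores each distinct value of a column once, in a dictionary page, and lets the data pages carry only small integer keys into that dictionary, which can shrink columns with many repeated values considerably. On the Arrow side this corresponds to DictionaryArray, built from a keys array and a values array; the test fixture added in src/io/parquet/mod.rs below uses exactly this constructor. A minimal standalone sketch (the arrow2 import paths and the Array trait import are assumptions; from_data is the constructor used in the diff):

use std::sync::Arc;

use arrow2::array::{Array, DictionaryArray, PrimitiveArray};

fn main() {
    // keys are indices into the dictionary; a None key is a null slot
    let keys = PrimitiveArray::<i32>::from([Some(0), Some(1), None, Some(1)]);
    // the dictionary holds each distinct value exactly once
    let values = Arc::new(PrimitiveArray::<i32>::from_slice([10, 200]));

    // logically this array reads as [Some(10), Some(200), None, Some(200)]
    let array = DictionaryArray::<i32>::from_data(keys, values);
    assert_eq!(array.len(), 4); // Array::len counts slots, not dictionary entries
}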
2 changes: 1 addition & 1 deletion .github/workflows/integration-parquet.yml
@@ -48,5 +48,5 @@ jobs:
pip install --upgrade pip
pip install pyarrow pyspark
python main.py
-# test delta encoding against spark (pyarrow does not support it)
+# test against spark
python main_spark.py
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -40,7 +40,7 @@ hex = { version = "^0.4", optional = true }

# for IPC compression
lz4 = { version = "1.23.1", optional = true }
zstd = { version = "^0.6", optional = true }
zstd = { version = "0.9", optional = true }

rand = { version = "0.8", optional = true }

@@ -58,7 +58,7 @@ futures = { version = "0.3", optional = true }
# for faster hashing
ahash = { version = "0.7", optional = true }

parquet2 = { version = "0.1", optional = true, default_features = false, features = ["stream"] }
parquet2 = { version = "0.2", optional = true, default_features = false, features = ["stream"] }

[dev-dependencies]
rand = "0.8"
3 changes: 2 additions & 1 deletion arrow-parquet-integration-testing/main.py
@@ -65,7 +65,8 @@ def variations():
"generated_datetime",
"generated_decimal",
"generated_interval",
-# requires writing Dictionary
+# see https://issues.apache.org/jira/browse/ARROW-13486 and
+# https://issues.apache.org/jira/browse/ARROW-13487
# "generated_dictionary",
# requires writing Struct
# "generated_duplicate_fieldnames",
49 changes: 31 additions & 18 deletions arrow-parquet-integration-testing/main_spark.py
@@ -6,27 +6,40 @@

from main import _prepare, _expected

_file = "generated_primitive"
_version = "2"
_encoding = "delta"
column = ("utf8_nullable", 24)

expected = _expected(_file)
expected = next(c for i, c in enumerate(expected) if i == column[1])
expected = expected.combine_chunks().tolist()
def test(file: str, version: str, column, encoding: str):
"""
Tests that pyspark can read a parquet file written by arrow2.
path = _prepare(_file, _version, _encoding, [column[1]])
In arrow2: read IPC, write parquet
In pyarrow: read (same) IPC to Python
In pyspark: read (written) parquet to Python
assert that they are equal
"""
# write parquet
path = _prepare(file, version, encoding, [column[1]])

spark = pyspark.sql.SparkSession.builder.config(
# see https://stackoverflow.com/a/62024670/931303
"spark.sql.parquet.enableVectorizedReader",
"false",
).getOrCreate()
# read IPC to Python
expected = _expected(file)
expected = next(c for i, c in enumerate(expected) if i == column[1])
expected = expected.combine_chunks().tolist()

df = spark.read.parquet(path)
# read parquet to Python
spark = pyspark.sql.SparkSession.builder.config(
# see https://stackoverflow.com/a/62024670/931303
"spark.sql.parquet.enableVectorizedReader",
"false",
).getOrCreate()

r = df.select(column[0]).collect()
os.remove(path)
result = spark.read.parquet(path).select(column[0]).collect()
result = [r[column[0]] for r in result]
os.remove(path)

result = [r[column[0]] for r in r]
assert expected == result
# assert equality
assert expected == result


test("generated_primitive", "2", ("utf8_nullable", 24), "delta")

test("generated_dictionary", "1", ("dict0", 0), "")
test("generated_dictionary", "2", ("dict0", 0), "")
1 change: 1 addition & 0 deletions arrow-parquet-integration-testing/src/main.rs
@@ -162,6 +162,7 @@ fn main() -> Result<()> {
.fields()
.iter()
.map(|x| match x.data_type() {
+DataType::Dictionary(_, _) => Encoding::RleDictionary,
DataType::Utf8 | DataType::LargeUtf8 => {
if utf8_encoding == "delta" {
Encoding::DeltaLengthByteArray
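With this change the integration runner asks parquet2 for RLE_DICTIONARY whenever a column's Arrow type is Dictionary. Roughly, the per-field encoding selection looks like the sketch below; only the dictionary and utf8 arms appear in the diff, so the plain fallback and the function wrapper are assumptions:

// Hypothetical helper mirroring the match in main.rs; `utf8_encoding` comes
// from the CLI arguments in the real program.
use arrow2::datatypes::{DataType, Schema};
use parquet2::encoding::Encoding;

fn choose_encodings(schema: &Schema, utf8_encoding: &str) -> Vec<Encoding> {
    schema
        .fields()
        .iter()
        .map(|field| match field.data_type() {
            // new in this commit: dictionary columns are written with RLE_DICTIONARY
            DataType::Dictionary(_, _) => Encoding::RleDictionary,
            DataType::Utf8 | DataType::LargeUtf8 if utf8_encoding == "delta" => {
                Encoding::DeltaLengthByteArray
            }
            // assumption: everything else falls back to plain encoding
            _ => Encoding::Plain,
        })
        .collect()
}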
26 changes: 16 additions & 10 deletions src/io/parquet/mod.rs
@@ -250,14 +250,19 @@ mod tests {
.iter()
.map(|x| x.map(|x| x as u32))
.collect::<Vec<_>>();
-Box::new(PrimitiveArray::<u32>::from(values).to(DataType::UInt32))
+Box::new(PrimitiveArray::<u32>::from(values))
}
+6 => {
+let keys = PrimitiveArray::<i32>::from([Some(0), Some(1), None, Some(1)]);
+let values = Arc::new(PrimitiveArray::<i32>::from_slice([10, 200]));
+Box::new(DictionaryArray::<i32>::from_data(keys, values))
+}
_ => unreachable!(),
}
}

-pub fn pyarrow_nullable_statistics(column: usize) -> Box<dyn Statistics> {
-match column {
+pub fn pyarrow_nullable_statistics(column: usize) -> Option<Box<dyn Statistics>> {
+Some(match column {
0 => Box::new(PrimitiveStatistics::<i64> {
data_type: DataType::Int64,
distinct_count: None,
@@ -300,8 +305,9 @@
min_value: Some(0),
max_value: Some(9),
}),
+6 => return None,
_ => unreachable!(),
-}
+})
}

// these values match the values in `integration`
@@ -331,8 +337,8 @@
}
}

-pub fn pyarrow_required_statistics(column: usize) -> Box<dyn Statistics> {
-match column {
+pub fn pyarrow_required_statistics(column: usize) -> Option<Box<dyn Statistics>> {
+Some(match column {
0 => Box::new(PrimitiveStatistics::<i64> {
data_type: DataType::Int64,
null_count: Some(0),
@@ -353,11 +359,11 @@
max_value: Some("def".to_string()),
}),
_ => unreachable!(),
-}
+})
}

-pub fn pyarrow_nested_nullable_statistics(column: usize) -> Box<dyn Statistics> {
-match column {
+pub fn pyarrow_nested_nullable_statistics(column: usize) -> Option<Box<dyn Statistics>> {
+Some(match column {
3 => Box::new(PrimitiveStatistics::<i16> {
data_type: DataType::Int16,
distinct_count: None,
@@ -390,7 +396,7 @@
min_value: Some(0),
max_value: Some(9),
}),
-}
+})
}
}

51 changes: 28 additions & 23 deletions src/io/parquet/read/binary/basic.rs
@@ -1,7 +1,8 @@
use parquet2::{
encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
-read::{levels, BinaryPageDict, Page, PageHeader, StreamingIterator},
+page::{BinaryPageDict, DataPage, DataPageHeader},
+read::{levels, StreamingIterator},
};

use crate::{
@@ -202,7 +203,7 @@ pub(super) fn read_plain_required<O: Offset>(
}

fn extend_from_page<O: Offset>(
-page: &Page,
+page: &DataPage,
descriptor: &ColumnDescriptor,
offsets: &mut MutableBuffer<O>,
values: &mut MutableBuffer<u8>,
@@ -212,22 +213,24 @@ fn extend_from_page<O: Offset>(
assert!(descriptor.max_def_level() <= 1);
let is_optional = descriptor.max_def_level() == 1;
match page.header() {
-PageHeader::V1(header) => {
+DataPageHeader::V1(header) => {
assert_eq!(header.definition_level_encoding, Encoding::Rle);

let (_, validity_buffer, values_buffer) =
levels::split_buffer_v1(page.buffer(), false, is_optional);

match (&page.encoding(), page.dictionary_page(), is_optional) {
-(Encoding::PlainDictionary, Some(dict), true) => read_dict_buffer::<O>(
-validity_buffer,
-values_buffer,
-page.num_values() as u32,
-dict.as_any().downcast_ref().unwrap(),
-offsets,
-values,
-validity,
-),
+(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
+read_dict_buffer::<O>(
+validity_buffer,
+values_buffer,
+page.num_values() as u32,
+dict.as_any().downcast_ref().unwrap(),
+offsets,
+values,
+validity,
+)
+}
(Encoding::DeltaLengthByteArray, None, true) => read_delta_optional::<O>(
validity_buffer,
values_buffer,
@@ -258,22 +261,24 @@ fn extend_from_page<O: Offset>(
}
}
}
-PageHeader::V2(header) => {
+DataPageHeader::V2(header) => {
let def_level_buffer_length = header.definition_levels_byte_length as usize;

let (_, validity_buffer, values_buffer) =
levels::split_buffer_v2(page.buffer(), 0, def_level_buffer_length);

match (page.encoding(), page.dictionary_page(), is_optional) {
-(Encoding::PlainDictionary, Some(dict), true) => read_dict_buffer::<O>(
-validity_buffer,
-values_buffer,
-page.num_values() as u32,
-dict.as_any().downcast_ref().unwrap(),
-offsets,
-values,
-validity,
-),
+(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
+read_dict_buffer::<O>(
+validity_buffer,
+values_buffer,
+page.num_values() as u32,
+dict.as_any().downcast_ref().unwrap(),
+offsets,
+values,
+validity,
+)
+}
(Encoding::Plain, None, true) => read_plain_optional::<O>(
validity_buffer,
values_buffer,
@@ -317,7 +322,7 @@ where
ArrowError: From<E>,
O: Offset,
E: Clone,
-I: StreamingIterator<Item = std::result::Result<Page, E>>,
+I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(0);
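Both PLAIN_DICTIONARY (the v1 spelling) and RLE_DICTIONARY (the v2 spelling) use the same data-page layout, a bit-width byte followed by hybrid RLE/bit-packed keys into the dictionary page, which is why the two match arms above can route to the same read_dict_buffer. Once the keys are decoded, materializing an Arrow binary array amounts to appending the referenced dictionary entry and extending the offsets. A simplified, self-contained sketch of that last step (plain Vecs stand in for arrow2's MutableBuffer, and the key/validity decoding that the real function performs is skipped):

/// Build the offsets/values/validity triple backing a binary array from
/// already-decoded dictionary keys (None = null slot).
fn materialize_dict(
    dict: &[&[u8]],
    keys: &[Option<u32>],
    offsets: &mut Vec<i32>,
    values: &mut Vec<u8>,
    validity: &mut Vec<bool>,
) {
    if offsets.is_empty() {
        offsets.push(0); // binary offsets always start at zero
    }
    for key in keys {
        match key {
            Some(k) => {
                values.extend_from_slice(dict[*k as usize]);
                validity.push(true);
            }
            None => validity.push(false), // null slot: no bytes appended
        }
        // one offset per slot, pointing past the bytes written so far
        offsets.push(values.len() as i32);
    }
}

fn main() {
    let dict: Vec<&[u8]> = vec![&b"hello"[..], &b"parquet"[..]];
    let keys = [Some(0), Some(1), None, Some(1)];
    let (mut offsets, mut values, mut validity) = (vec![], vec![], vec![]);
    materialize_dict(&dict, &keys, &mut offsets, &mut values, &mut validity);
    assert_eq!(offsets, vec![0, 5, 12, 12, 19]);
    assert_eq!(validity, vec![true, true, false, true]);
    assert_eq!(values, b"helloparquetparquet".to_vec());
}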
11 changes: 6 additions & 5 deletions src/io/parquet/read/binary/nested.rs
@@ -3,9 +3,10 @@ use std::sync::Arc;
use parquet2::{
encoding::Encoding,
metadata::{ColumnChunkMetaData, ColumnDescriptor},
+page::{DataPage, DataPageHeader},
read::{
levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
-Page, PageHeader, StreamingIterator,
+StreamingIterator,
},
};

@@ -108,7 +109,7 @@ fn read<O: Offset>(
}

fn extend_from_page<O: Offset>(
-page: &Page,
+page: &DataPage,
descriptor: &ColumnDescriptor,
is_nullable: bool,
nested: &mut Vec<Box<dyn Nested>>,
@@ -119,7 +120,7 @@ fn extend_from_page<O: Offset>(
let additional = page.num_values();

match page.header() {
-PageHeader::V1(header) => {
+DataPageHeader::V1(header) => {
assert_eq!(header.definition_level_encoding, Encoding::Rle);
assert_eq!(header.repetition_level_encoding, Encoding::Rle);

@@ -161,7 +162,7 @@ fn extend_from_page<O: Offset>(
}
}
}
-PageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) {
+DataPageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) {
(Encoding::Plain, None) => {
let def_level_buffer_length = header.definition_levels_byte_length as usize;
let rep_level_buffer_length = header.repetition_levels_byte_length as usize;
@@ -207,7 +208,7 @@ where
O: Offset,
ArrowError: From<E>,
E: Clone,
-I: StreamingIterator<Item = std::result::Result<Page, E>>,
+I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(0);
11 changes: 6 additions & 5 deletions src/io/parquet/read/boolean/basic.rs
@@ -8,7 +8,8 @@ use super::super::utils;
use parquet2::{
encoding::{hybrid_rle, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
-read::{levels, Page, PageHeader, StreamingIterator},
+page::{DataPage, DataPageHeader},
+read::{levels, StreamingIterator},
};

pub(super) fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) {
@@ -68,7 +69,7 @@ pub fn iter_to_array<I, E>(mut iter: I, metadata: &ColumnChunkMetaData) -> Resul
where
ArrowError: From<E>,
E: Clone,
-I: StreamingIterator<Item = std::result::Result<Page, E>>,
+I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBitmap::with_capacity(capacity);
@@ -86,7 +87,7 @@ where
}

fn extend_from_page(
-page: &Page,
+page: &DataPage,
descriptor: &ColumnDescriptor,
values: &mut MutableBitmap,
validity: &mut MutableBitmap,
Expand All @@ -95,7 +96,7 @@ fn extend_from_page(
assert!(descriptor.max_def_level() <= 1);
let is_optional = descriptor.max_def_level() == 1;
match page.header() {
-PageHeader::V1(header) => {
+DataPageHeader::V1(header) => {
assert_eq!(header.definition_level_encoding, Encoding::Rle);

match (&page.encoding(), page.dictionary_page(), is_optional) {
@@ -124,7 +125,7 @@ fn extend_from_page(
}
}
}
-PageHeader::V2(header) => {
+DataPageHeader::V2(header) => {
let def_level_buffer_length = header.definition_levels_byte_length as usize;
match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::Plain, None, true) => {
(diffs for the remaining changed files are not shown here)
