Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed error
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed May 26, 2022
1 parent b60c454 commit 22a1c84
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 34 deletions.
1 change: 1 addition & 0 deletions arrow-parquet-integration-testing/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def variations():
"generated_datetime",
"generated_decimal",
"generated_interval",
"generated_nested",
# see https://issues.apache.org/jira/browse/ARROW-13486 and
# https://issues.apache.org/jira/browse/ARROW-13487
# "generated_dictionary",
Expand Down
8 changes: 4 additions & 4 deletions arrow-parquet-integration-testing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,15 @@ fn main() -> Result<()> {
.fields
.iter()
.map(|x| match x.data_type() {
DataType::Dictionary(..) => Encoding::RleDictionary,
DataType::Dictionary(..) => vec![Encoding::RleDictionary],
DataType::Utf8 | DataType::LargeUtf8 => {
if args.encoding_utf8 == EncodingScheme::Delta {
vec![if args.encoding_utf8 == EncodingScheme::Delta {
Encoding::DeltaLengthByteArray
} else {
Encoding::Plain
}
}]
}
_ => Encoding::Plain,
_ => vec![Encoding::Plain],
})
.collect();

Expand Down
2 changes: 1 addition & 1 deletion src/doc/lib.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn main() -> Result<()> {
vec![Ok(chunk)].into_iter(),
&schema,
options,
vec![Encoding::Plain, Encoding::Plain],
vec![vec![Encoding::Plain], vec![Encoding::Plain]],
)?;

// anything implementing `std::io::Write` works
Expand Down
7 changes: 2 additions & 5 deletions src/io/parquet/write/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@ where
let is_optional = is_nullable(&type_.field_info);

let mut buffer = vec![];
nested::write_rep_levels(&mut buffer, &nested, options.version)?;
let repetition_levels_byte_length = buffer.len();

nested::write_def_levels(&mut buffer, &nested, options.version)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, &nested, &mut buffer)?;

encode_plain(array, is_optional, &mut buffer);

Expand Down
7 changes: 2 additions & 5 deletions src/io/parquet/write/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ pub fn array_to_page(
let is_optional = is_nullable(&type_.field_info);

let mut buffer = vec![];
nested::write_rep_levels(&mut buffer, &nested, options.version)?;
let repetition_levels_byte_length = buffer.len();

nested::write_def_levels(&mut buffer, &nested, options.version)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, &nested, &mut buffer)?;

encode_plain(array, is_optional, &mut buffer)?;

Expand Down
40 changes: 34 additions & 6 deletions src/io/parquet/write/nested/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,26 @@ fn write_levels_v1<F: FnOnce(&mut Vec<u8>) -> Result<()>>(
}

/// writes the rep levels to a `Vec<u8>`.
pub fn write_rep_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -> Result<()> {
let num_bits = get_bit_width(max_rep_level(nested) as i16) as u8;
fn write_rep_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -> Result<()> {
let max_level = max_rep_level(nested) as i16;
if max_level == 0 {
return Ok(());
}
let num_bits = get_bit_width(max_level) as u8;

let levels = rep::RepLevelsIter::new(nested);

let mut buffer1 = vec![];
encode_u32(&mut buffer1, rep::RepLevelsIter::new(nested), num_bits).unwrap();

match version {
Version::V1 => {
write_levels_v1(buffer, |buffer: &mut Vec<u8>| {
let levels = rep::RepLevelsIter::new(nested);
encode_u32(buffer, levels, num_bits)?;
Ok(())
})?;
}
Version::V2 => {
let levels = rep::RepLevelsIter::new(nested);
encode_u32(buffer, levels, num_bits)?;
}
}
Expand All @@ -49,11 +56,18 @@ pub fn write_rep_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Versio
}

/// writes the def levels to a `Vec<u8>`.
pub fn write_def_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -> Result<()> {
let num_bits = get_bit_width(max_def_level(nested) as i16) as u8;
fn write_def_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -> Result<()> {
let max_level = max_def_level(nested) as i16;
if max_level == 0 {
return Ok(());
}
let num_bits = get_bit_width(max_level) as u8;

let levels = def::DefLevelsIter::new(nested);

let mut buffer1 = vec![];
encode_u32(&mut buffer1, def::DefLevelsIter::new(nested), num_bits).unwrap();

match version {
Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec<u8>| {
encode_u32(buffer, levels, num_bits)?;
Expand Down Expand Up @@ -92,3 +106,17 @@ fn to_length<O: Offset>(
.windows(2)
.map(|w| w[1].to_usize() - w[0].to_usize())
}

/// Writes the repetition levels and then the definition levels of `nested` to `buffer`.
///
/// Returns `(repetition_levels_byte_length, definition_levels_byte_length)`: the number of
/// bytes that each of the two sections added to `buffer`, in that order.
///
/// Both lengths are measured relative to the length `buffer` had on entry, so bytes that
/// were already present in `buffer` are never counted as repetition levels.
///
/// # Errors
/// Propagates any error returned while writing either set of levels.
pub fn write_rep_and_def(
    page_version: Version,
    nested: &[Nested],
    buffer: &mut Vec<u8>,
) -> Result<(usize, usize)> {
    // Remember where our output starts so pre-existing content is excluded from the lengths.
    let start = buffer.len();

    write_rep_levels(buffer, nested, page_version)?;
    let repetition_levels_byte_length = buffer.len() - start;

    write_def_levels(buffer, nested, page_version)?;
    let definition_levels_byte_length = buffer.len() - start - repetition_levels_byte_length;

    Ok((repetition_levels_byte_length, definition_levels_byte_length))
}
11 changes: 11 additions & 0 deletions src/io/parquet/write/nested/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ mod tests {

#[test]
fn struct_required() {
    // Ten rows of a struct wrapping a primitive; every row is expected to produce level 0.
    // NOTE(review): the `false` on `Nested::Struct` is presumably the nullability flag
    // (this is the "required" variant of the test) — confirm against `Nested`.
    let expected = vec![0; 10];
    let nested = vec![
        Nested::Struct(None, false, 10),
        Nested::Primitive(None, true, 10),
    ];

    test(nested, expected)
}

#[test]
fn struct_optional() {
let nested = vec![
Nested::Struct(None, true, 10),
Nested::Primitive(None, true, 10),
Expand Down
7 changes: 2 additions & 5 deletions src/io/parquet/write/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@ where
let is_optional = is_nullable(&type_.field_info);

let mut buffer = vec![];
nested::write_rep_levels(&mut buffer, &nested, options.version)?;
let repetition_levels_byte_length = buffer.len();

nested::write_def_levels(&mut buffer, &nested, options.version)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, &nested, &mut buffer)?;

encode_plain(array, is_optional, &mut buffer);

Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/write/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use super::{Encoding, SchemaDescriptor, WriteOptions};
/// let schema = Schema::from(vec![
/// Field::new("values", DataType::Int32, true),
/// ]);
/// let encoding = vec![Encoding::Plain];
/// let encoding = vec![vec![Encoding::Plain]];
/// let options = WriteOptions {
/// write_statistics: true,
/// compression: CompressionOptions::Uncompressed,
Expand Down
7 changes: 2 additions & 5 deletions src/io/parquet/write/utf8/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@ where
let is_optional = is_nullable(&type_.field_info);

let mut buffer = vec![];
nested::write_rep_levels(&mut buffer, &nested, options.version)?;
let repetition_levels_byte_length = buffer.len();

nested::write_def_levels(&mut buffer, &nested, options.version)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, &nested, &mut buffer)?;

encode_plain(array, is_optional, &mut buffer);

Expand Down
1 change: 0 additions & 1 deletion tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ fn test_pyarrow_integration(

let mut file = File::open(path).unwrap();
let (array, statistics) = read_column(&mut file, column)?;
println!("{:?}", array);

let expected = match (type_, required) {
("basic", true) => pyarrow_required(column),
Expand Down
13 changes: 12 additions & 1 deletion tests/it/io/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ fn decimal_26_required_v2() -> Result<()> {
}

#[test]
fn struct_() -> Result<()> {
fn struct_v1() -> Result<()> {
round_trip(
"struct",
"struct",
Expand All @@ -518,3 +518,14 @@ fn struct_() -> Result<()> {
vec![Encoding::Plain, Encoding::Plain],
)
}

/// Round-trips the "struct" column of the "struct" file through the writer using
/// `Version::V2` pages, no compression and plain encoding.
#[test]
fn struct_v2() -> Result<()> {
    let encodings = vec![Encoding::Plain, Encoding::Plain];
    let compression = CompressionOptions::Uncompressed;

    round_trip("struct", "struct", Version::V2, compression, encodings)
}

0 comments on commit 22a1c84

Please sign in to comment.