Skip to content

Commit

Permalink
Fixed writing nested parquet (jorgecarleitao#1390)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored and ritchie46 committed Mar 29, 2023
1 parent 75133eb commit d03bdca
Show file tree
Hide file tree
Showing 12 changed files with 1,040 additions and 190 deletions.
96 changes: 86 additions & 10 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,34 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
None,
[""],
]

list_struct_nullable = [
[{"a": "a"}, {"a": "b"}],
None,
[{"a": "b"}, None, {"a": "b"}],
[{"a": None}, {"a": None}, {"a": None}],
[],
[{"a": "d"}, {"a": "d"}, {"a": "d"}],
None,
[{"a": "e"}],
]

struct_list_nullable = pa.StructArray.from_arrays(
[pa.array(string)],
fields=[("a", pa.list_(pa.utf8()))],
)

list_struct_list_nullable = [
[{"a": ["a"]}, {"a": ["b"]}],
None,
[{"a": ["b"]}, None, {"a": ["b"]}],
[{"a": None}, {"a": None}, {"a": None}],
[],
[{"a": ["d"]}, {"a": [None]}, {"a": ["c", "d"]}],
None,
[{"a": []}],
]

fields = [
pa.field("list_int64", pa.list_(pa.int64())),
pa.field("list_int64_required", pa.list_(pa.field("item", pa.int64(), False))),
Expand All @@ -180,6 +208,18 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
pa.field(
"list_nested_inner_required_required_i64", pa.list_(pa.list_(pa.int64()))
),
pa.field(
"list_struct_nullable",
pa.list_(pa.struct([("a", pa.utf8())])),
),
pa.field(
"struct_list_nullable",
pa.struct([("a", pa.list_(pa.utf8()))]),
),
pa.field(
"list_struct_list_nullable",
pa.list_(pa.struct([("a", pa.list_(pa.utf8()))])),
),
]
schema = pa.schema(fields)
return (
Expand All @@ -195,6 +235,9 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
"list_nested_i64": items_nested,
"list_nested_inner_required_i64": items_required_nested,
"list_nested_inner_required_required_i64": items_required_nested_2,
"list_struct_nullable": list_struct_nullable,
"struct_list_nullable": struct_list_nullable,
"list_struct_list_nullable": list_struct_list_nullable,
},
schema,
f"nested_nullable_10.parquet",
Expand Down Expand Up @@ -246,7 +289,9 @@ def case_struct() -> Tuple[dict, pa.Schema, str]:
struct_nullable = pa.StructArray.from_arrays(
[pa.array(string), pa.array(boolean)],
fields=struct_fields,
mask=pa.array([False, False, True, False, False, False, False, False, False, False]),
mask=pa.array(
[False, False, True, False, False, False, False, False, False, False]
),
)

return (
Expand All @@ -260,7 +305,20 @@ def case_struct() -> Tuple[dict, pa.Schema, str]:
"struct_struct_nullable": pa.StructArray.from_arrays(
[struct, pa.array(boolean)],
names=["f1", "f2"],
mask=pa.array([False, False, True, False, False, False, False, False, False, False]),
mask=pa.array(
[
False,
False,
True,
False,
False,
False,
False,
False,
False,
False,
]
),
),
},
schema,
Expand All @@ -271,30 +329,48 @@ def case_struct() -> Tuple[dict, pa.Schema, str]:
def case_nested_edge():
simple = [[0, 1]]
null = [None]
empty = [[]]

struct_list_nullable = pa.StructArray.from_arrays(
[pa.array([["a", "b", None, "c"]])],
fields=[
("f1", pa.list_(pa.utf8())),
],
fields=[("f1", pa.list_(pa.utf8()))],
)

list_struct_list_nullable = pa.ListArray.from_arrays([0, 1], struct_list_nullable)

fields = [
pa.field("simple", pa.list_(pa.int64())),
pa.field("null", pa.list_(pa.field("item", pa.int64(), True))),
pa.field("empty", pa.list_(pa.field("item", pa.int64(), True))),
pa.field(
"struct_list_nullable",
pa.struct([
("f1", pa.list_(pa.utf8())),
]),
)
"struct_list_nullable",
pa.struct(
[("f1", pa.list_(pa.utf8()))],
),
),
pa.field(
"list_struct_list_nullable",
pa.list_(
pa.field(
"item",
pa.struct(
[
("f1", pa.list_(pa.utf8())),
]
),
True,
)
),
),
]
schema = pa.schema(fields)
return (
{
"simple": simple,
"null": null,
"empty": empty,
"struct_list_nullable": struct_list_nullable,
"list_struct_list_nullable": list_struct_list_nullable,
},
schema,
f"nested_edge_nullable_10.parquet",
Expand Down
12 changes: 10 additions & 2 deletions src/io/parquet/write/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,17 @@ where
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut nested = nested.to_vec();
let array = array.slice(start, len);
if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() {
*c = len;
} else {
unreachable!("")
}

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer, start)?;
nested::write_rep_and_def(options.version, &nested, &mut buffer)?;

let array = array.slice(start, len);
encode_plain(&array, is_optional, &mut buffer);
Expand All @@ -42,7 +50,7 @@ where

utils::build_plain_page(
buffer,
nested::num_values(nested),
nested::num_values(&nested),
nested[0].len(),
array.null_count(),
repetition_levels_byte_length,
Expand Down
14 changes: 11 additions & 3 deletions src/io/parquet/write/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,18 @@ pub fn array_to_page(
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);

let mut nested = nested.to_vec();
let array = array.slice(start, len);
if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() {
*c = len;
} else {
unreachable!("")
}

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer, start)?;
let array = array.slice(start, len);
nested::write_rep_and_def(options.version, &nested, &mut buffer)?;

encode_plain(&array, is_optional, &mut buffer)?;

let statistics = if options.write_statistics {
Expand All @@ -37,7 +45,7 @@ pub fn array_to_page(

utils::build_plain_page(
buffer,
nested::num_values(nested),
nested::num_values(&nested),
nested[0].len(),
array.null_count(),
repetition_levels_byte_length,
Expand Down
17 changes: 11 additions & 6 deletions src/io/parquet/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,14 @@ fn serialize_levels(
nested: &[Nested],
options: WriteOptions,
buffer: &mut Vec<u8>,
offset: usize,
) -> Result<(usize, usize)> {
if nested.len() == 1 {
let is_optional = is_nullable(&type_.field_info);
serialize_def_levels_simple(validity, length, is_optional, options, buffer)?;
let definition_levels_byte_length = buffer.len();
Ok((0, definition_levels_byte_length))
} else {
nested::write_rep_and_def(options.version, nested, buffer, offset)
nested::write_rep_and_def(options.version, nested, buffer)
}
}

Expand Down Expand Up @@ -115,23 +114,29 @@ fn serialize_keys<K: DictionaryKey>(
let validity = normalized_validity(array);
let (start, len) = slice_nested_leaf(nested);

let mut nested = nested.to_vec();
let array = array.slice(start, len);
if let Some(Nested::Primitive(_, _, c)) = nested.last_mut() {
*c = len;
} else {
unreachable!("")
}

let (repetition_levels_byte_length, definition_levels_byte_length) = serialize_levels(
validity.as_ref(),
array.len(),
&type_,
nested,
&nested,
options,
&mut buffer,
start,
)?;
let array = array.slice(start, len);

serialize_keys_values(&array, validity.as_ref(), &mut buffer)?;

let (num_values, num_rows) = if nested.len() == 1 {
(array.len(), array.len())
} else {
(nested::num_values(nested), nested[0].len())
(nested::num_values(&nested), nested[0].len())
};

utils::build_plain_page(
Expand Down
Loading

0 comments on commit d03bdca

Please sign in to comment.