This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Simplified code to generate parquet files for tests (#883)
jorgecarleitao authored Mar 5, 2022
1 parent f71124b commit 3b55eeb
Showing 4 changed files with 283 additions and 263 deletions.
109 changes: 55 additions & 54 deletions parquet_integration/write_parquet.py
@@ -1,3 +1,5 @@
+from typing import Tuple
+
 import pyarrow as pa
 import pyarrow.parquet
 import os
@@ -6,7 +8,7 @@
 PYARROW_PATH = "fixtures/pyarrow3"
 
 
-def case_basic_nullable(size=1):
+def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
     int64 = [0, 1, None, 3, None, 5, 6, 7, None, 9]
     float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0]
     string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
@@ -38,27 +40,27 @@ def case_basic_nullable(size=1):
 
     return (
         {
-            "int64": int64 * size,
-            "float64": float64 * size,
-            "string": string * size,
-            "bool": boolean * size,
-            "date": int64 * size,
-            "uint32": int64 * size,
-            "string_large": string_large * size,
-            "decimal_9": decimal * size,
-            "decimal_18": decimal * size,
-            "decimal_26": decimal * size,
-            "timestamp_us": int64 * size,
-            "timestamp_s": int64 * size,
-            "emoji": emoji * size,
-            "timestamp_s_utc": int64 * size,
+            "int64": int64,
+            "float64": float64,
+            "string": string,
+            "bool": boolean,
+            "date": int64,
+            "uint32": int64,
+            "string_large": string_large,
+            "decimal_9": decimal,
+            "decimal_18": decimal,
+            "decimal_26": decimal,
+            "timestamp_us": int64,
+            "timestamp_s": int64,
+            "emoji": emoji,
+            "timestamp_s_utc": int64,
         },
         schema,
-        f"basic_nullable_{size*10}.parquet",
+        f"basic_nullable_10.parquet",
     )
 
 
-def case_basic_required(size=1):
+def case_basic_required() -> Tuple[dict, pa.Schema, str]:
     int64 = [-256, -1, 0, 1, 2, 3, 4, 5, 6, 7]
     uint32 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
     float64 = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
@@ -87,22 +89,22 @@ def case_basic_required(size=1):
 
     return (
         {
-            "int64": int64 * size,
-            "float64": float64 * size,
-            "string": string * size,
-            "bool": boolean * size,
-            "date": int64 * size,
-            "uint32": uint32 * size,
-            "decimal_9": decimal * size,
-            "decimal_18": decimal * size,
-            "decimal_26": decimal * size,
+            "int64": int64,
+            "float64": float64,
+            "string": string,
+            "bool": boolean,
+            "date": int64,
+            "uint32": uint32,
+            "decimal_9": decimal,
+            "decimal_18": decimal,
+            "decimal_26": decimal,
         },
         schema,
-        f"basic_required_{size*10}.parquet",
+        f"basic_required_10.parquet",
     )
 
 
-def case_nested(size):
+def case_nested() -> Tuple[dict, pa.Schema, str]:
     items_nullable = [[0, 1], None, [2, None, 3], [4, 5, 6], [], [7, 8, 9], None, [10]]
     items_required = [[0, 1], None, [2, 0, 3], [4, 5, 6], [], [7, 8, 9], None, [10]]
     all_required = [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]]
@@ -178,23 +180,23 @@ def case_nested(size):
     schema = pa.schema(fields)
     return (
         {
-            "list_int64": items_nullable * size,
-            "list_int64_required": items_required * size,
-            "list_int64_required_required": all_required * size,
-            "list_int16": i16 * size,
-            "list_bool": boolean * size,
-            "list_utf8": string * size,
-            "list_large_binary": string * size,
-            "list_nested_i64": items_nested * size,
-            "list_nested_inner_required_i64": items_required_nested * size,
-            "list_nested_inner_required_required_i64": items_required_nested_2 * size,
+            "list_int64": items_nullable,
+            "list_int64_required": items_required,
+            "list_int64_required_required": all_required,
+            "list_int16": i16,
+            "list_bool": boolean,
+            "list_utf8": string,
+            "list_large_binary": string,
+            "list_nested_i64": items_nested,
+            "list_nested_inner_required_i64": items_required_nested,
+            "list_nested_inner_required_required_i64": items_required_nested_2,
         },
         schema,
-        f"nested_nullable_{size*10}.parquet",
+        f"nested_nullable_10.parquet",
     )
 
 
-def case_struct(size):
+def case_struct() -> Tuple[dict, pa.Schema, str]:
     string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
     boolean = [True, None, False, False, None, True, None, None, True, True]
     struct_fields = [
@@ -220,31 +222,30 @@ def case_struct(size):
     )
 
     struct = pa.StructArray.from_arrays(
-        [pa.array(string * size), pa.array(boolean * size)],
+        [pa.array(string), pa.array(boolean)],
         fields=struct_fields,
     )
     return (
         {
             "struct": struct,
             "struct_struct": pa.StructArray.from_arrays(
-                [struct, pa.array(boolean * size)],
+                [struct, pa.array(boolean)],
                 names=["f1", "f2"],
             ),
         },
         schema,
-        f"struct_nullable_{size*10}.parquet",
+        f"struct_nullable_10.parquet",
     )
 
 
 def write_pyarrow(
     case,
-    size: int,
     page_version: int,
     use_dictionary: bool,
     multiple_pages: bool,
     compression: str,
 ):
-    data, schema, path = case(size)
+    data, schema, path = case
 
     base_path = f"{PYARROW_PATH}/v{page_version}"
     if use_dictionary:
@@ -279,20 +280,20 @@ def write_pyarrow(
     for version in [1, 2]:
         for use_dict in [True, False]:
             for compression in ["lz4", None, "snappy"]:
-                write_pyarrow(case, 1, version, use_dict, False, compression)
+                write_pyarrow(case(), version, use_dict, False, compression)
 
 
 def case_benches(size):
     assert size % 8 == 0
-    data, schema, _ = case_basic_nullable(1)
+    data, schema, _ = case_basic_nullable()
     for k in data:
         data[k] = data[k][:8] * (size // 8)
     return data, schema, f"benches_{size}.parquet"
 
 
 def case_benches_required(size):
     assert size % 8 == 0
-    data, schema, _ = case_basic_required(1)
+    data, schema, _ = case_basic_required()
     for k in data:
         data[k] = data[k][:8] * (size // 8)
     return data, schema, f"benches_required_{size}.parquet"
@@ -301,14 +302,14 @@ def case_benches_required(size):
 # for read benchmarks
 for i in range(10, 22, 2):
     # two pages (dict)
-    write_pyarrow(case_benches, 2 ** i, 1, True, False, None)
+    write_pyarrow(case_benches(2 ** i), 1, True, False, None)
     # single page
-    write_pyarrow(case_benches, 2 ** i, 1, False, False, None)
+    write_pyarrow(case_benches(2 ** i), 1, False, False, None)
     # single page required
-    write_pyarrow(case_benches_required, 2 ** i, 1, False, False, None)
+    write_pyarrow(case_benches_required(2 ** i), 1, False, False, None)
     # multiple pages
-    write_pyarrow(case_benches, 2 ** i, 1, False, True, None)
+    write_pyarrow(case_benches(2 ** i), 1, False, True, None)
     # multiple compressed pages
-    write_pyarrow(case_benches, 2 ** i, 1, False, True, "snappy")
+    write_pyarrow(case_benches(2 ** i), 1, False, True, "snappy")
     # single compressed page
-    write_pyarrow(case_benches, 2 ** i, 1, False, False, "snappy")
+    write_pyarrow(case_benches(2 ** i), 1, False, False, "snappy")
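
For context, a minimal usage sketch (not part of this commit) of how one of the refactored case tuples can be turned into a parquet file with pyarrow. The output path and writer options below are illustrative assumptions, not the exact body of write_pyarrow, which is elided in the diff above.

# Hypothetical sketch, assuming it runs in the context of write_parquet.py,
# where the case functions are defined. Path and writer options are
# assumptions for illustration only.
import pyarrow as pa
import pyarrow.parquet

data, schema, path = case_basic_nullable()  # dict of columns, pa.Schema, "basic_nullable_10.parquet"
table = pa.table(data, schema=schema)       # build a pa.Table from plain Python lists
pyarrow.parquet.write_table(
    table,
    f"fixtures/pyarrow3/v1/{path}",         # assumed output location
    use_dictionary=False,
    compression="snappy",
)

With the size parameter gone, each case function now describes a single fixed dataset; the larger benchmark inputs come from case_benches and case_benches_required, which take the first 8 rows and repeat them size // 8 times before the data is handed to write_pyarrow.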