
Commit

Improved integration testing with parquet
jorgecarleitao committed Jan 7, 2022
1 parent 299df30 commit 5445366
Showing 4 changed files with 43 additions and 15 deletions.
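In short: the commit threads a new required --compression flag through the Rust writer and both Python harnesses, so the integration tests now round-trip parquet files written with zstd and snappy compression in addition to uncompressed ones.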
2 changes: 1 addition & 1 deletion arrow-parquet-integration-testing/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2018"
 
 [dependencies]
 clap = "^2.33"
-arrow2 = { path = "../", default-features = false, features = ["io_parquet", "io_json_integration"] }
+arrow2 = { path = "../", default-features = false, features = ["io_parquet", "io_json_integration", "io_parquet_compression"] }
 flate2 = "^1"
 serde = { version = "^1.0", features = ["rc"] }
 serde_derive = { version = "^1.0" }
13 changes: 9 additions & 4 deletions arrow-parquet-integration-testing/main.py
@@ -9,7 +9,9 @@ def get_file_path(file: str):
     return f"../testing/arrow-testing/data/arrow-ipc-stream/integration/1.0.0-littleendian/{file}.arrow_file"
 
 
-def _prepare(file: str, version: str, encoding_utf8: str, projection=None):
+def _prepare(
+    file: str, version: str, compression: str, encoding_utf8: str, projection=None
+):
     write = f"{file}.parquet"
 
     args = [
@@ -24,6 +26,8 @@ def _prepare(file: str, version: str, encoding_utf8: str, projection=None):
         version,
         "--encoding-utf8",
         encoding_utf8,
+        "--compression",
+        compression,
     ]
 
     if projection:
@@ -76,13 +80,14 @@ def variations():
             # pyarrow does not support decoding "delta"-encoded values.
             # for encoding in ["plain", "delta"]:
             for encoding in ["plain"]:
-                yield (version, file, encoding)
+                for compression in ["uncompressed", "zstd", "snappy"]:
+                    yield (version, file, compression, encoding)
 
 
 if __name__ == "__main__":
-    for (version, file, encoding_utf8) in variations():
+    for (version, file, compression, encoding_utf8) in variations():
         expected = _expected(file)
-        path = _prepare(file, version, encoding_utf8)
+        path = _prepare(file, version, compression, encoding_utf8)
 
         table = pq.read_table(path)
         os.remove(path)
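For reference, a minimal sketch (editor's illustration, not part of the commit) of the matrix the updated variations() now produces per file, assuming the loop above the visible hunk iterates parquet versions "1" and "2" as the pyspark tests below suggest:

# Illustrative only: mirrors the nested loops in variations() above.
versions = ["1", "2"]  # assumed from the pyspark tests in main_spark.py
compressions = ["uncompressed", "zstd", "snappy"]
encodings = ["plain"]  # "delta" stays disabled: pyarrow cannot decode it

matrix = [
    (version, compression, encoding)
    for version in versions
    for compression in compressions
    for encoding in encodings
]
assert len(matrix) == 6  # each combination is written by the Rust binary and read back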
25 changes: 16 additions & 9 deletions arrow-parquet-integration-testing/main_spark.py
@@ -7,7 +7,7 @@
 from main import _prepare, _expected
 
 
-def test(file: str, version: str, column, encoding: str):
+def test(file: str, version: str, column, compression: str, encoding: str):
     """
     Tests that pyspark can read a parquet file written by arrow2.
@@ -17,7 +17,7 @@ def test(file: str, version: str, column, encoding: str):
     assert that they are equal
     """
     # write parquet
-    path = _prepare(file, version, encoding, [column[1]])
+    path = _prepare(file, version, compression, encoding, [column[1]])
 
     # read IPC to Python
     expected = _expected(file)
@@ -39,13 +39,20 @@ def test(file: str, version: str, column, encoding: str):
     assert expected == result
 
 
-test("generated_primitive", "2", ("utf8_nullable", 24), "delta")
+test("generated_primitive", "2", ("utf8_nullable", 24), "uncompressed", "delta")
+test("generated_primitive", "2", ("utf8_nullable", 24), "snappy", "delta")
 
-test("generated_dictionary", "1", ("dict0", 0), "")
-test("generated_dictionary", "2", ("dict0", 0), "")
+test("generated_dictionary", "1", ("dict0", 0), "uncompressed", "")
+test("generated_dictionary", "1", ("dict0", 0), "snappy", "")
+test("generated_dictionary", "2", ("dict0", 0), "uncompressed", "")
+test("generated_dictionary", "2", ("dict0", 0), "snappy", "")
 
-test("generated_dictionary", "1", ("dict1", 1), "")
-test("generated_dictionary", "2", ("dict1", 1), "")
+test("generated_dictionary", "1", ("dict1", 1), "uncompressed", "")
+test("generated_dictionary", "1", ("dict1", 1), "snappy", "")
+test("generated_dictionary", "2", ("dict1", 1), "uncompressed", "")
+test("generated_dictionary", "2", ("dict1", 1), "snappy", "")
 
-test("generated_dictionary", "1", ("dict2", 2), "")
-test("generated_dictionary", "2", ("dict2", 2), "")
+test("generated_dictionary", "1", ("dict2", 2), "uncompressed", "")
+test("generated_dictionary", "1", ("dict2", 2), "snappy", "")
+test("generated_dictionary", "2", ("dict2", 2), "uncompressed", "")
+test("generated_dictionary", "2", ("dict2", 2), "snappy", "")
18 changes: 17 additions & 1 deletion arrow-parquet-integration-testing/src/main.rs
@@ -91,6 +91,12 @@ fn main() -> Result<()> {
                 .required(true)
                 .takes_value(true),
         )
+        .arg(
+            Arg::with_name("compression")
+                .long("compression")
+                .required(true)
+                .takes_value(true),
+        )
         .get_matches();
     let json_file = matches
         .value_of("json")
@@ -105,6 +111,9 @@ fn main() -> Result<()> {
     let utf8_encoding = matches
         .value_of("encoding-utf8")
         .expect("must provide utf8 type encoding");
+    let compression = matches
+        .value_of("compression")
+        .expect("must provide compression");
 
     let projection = projection.map(|x| {
         x.split(',')
@@ -161,9 +170,16 @@ fn main() -> Result<()> {
         Version::V2
     };
 
+    let compression = match compression {
+        "uncompressed" => Compression::Uncompressed,
+        "zstd" => Compression::Zstd,
+        "snappy" => Compression::Snappy,
+        other => todo!("{}", other),
+    };
+
     let options = WriteOptions {
         write_statistics: true,
-        compression: Compression::Uncompressed,
+        compression,
         version,
     };
 
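One way to see the new flag take effect end to end (a hedged sketch, not part of the commit): pyarrow exposes the codec of each column chunk in the parquet footer, so a file written with --compression zstd can be checked after the fact. The file name below follows the f"{file}.parquet" convention used by _prepare() in main.py.

import pyarrow.parquet as pq

# Assumes the Rust writer was already run with --compression zstd.
meta = pq.ParquetFile("generated_primitive.parquet").metadata
for rg in range(meta.num_row_groups):
    for col in range(meta.num_columns):
        # pyarrow reports the codec per column chunk, e.g. "ZSTD" or "SNAPPY".
        assert meta.row_group(rg).column(col).compression == "ZSTD"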
