diff --git a/arrow-parquet-integration-testing/Cargo.toml b/arrow-parquet-integration-testing/Cargo.toml index 3bc31198779..5e8e3454ce8 100644 --- a/arrow-parquet-integration-testing/Cargo.toml +++ b/arrow-parquet-integration-testing/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] clap = "^2.33" -arrow2 = { path = "../", default-features = false, features = ["io_parquet", "io_json_integration"] } +arrow2 = { path = "../", default-features = false, features = ["io_parquet", "io_json_integration", "io_parquet_compression"] } flate2 = "^1" serde = { version = "^1.0", features = ["rc"] } serde_derive = { version = "^1.0" } diff --git a/arrow-parquet-integration-testing/main.py b/arrow-parquet-integration-testing/main.py index 62a477d398a..545a6e08b51 100644 --- a/arrow-parquet-integration-testing/main.py +++ b/arrow-parquet-integration-testing/main.py @@ -9,7 +9,9 @@ def get_file_path(file: str): return f"../testing/arrow-testing/data/arrow-ipc-stream/integration/1.0.0-littleendian/{file}.arrow_file" -def _prepare(file: str, version: str, encoding_utf8: str, projection=None): +def _prepare( + file: str, version: str, compression: str, encoding_utf8: str, projection=None +): write = f"{file}.parquet" args = [ @@ -24,6 +26,8 @@ def _prepare(file: str, version: str, encoding_utf8: str, projection=None): version, "--encoding-utf8", encoding_utf8, + "--compression", + compression, ] if projection: @@ -76,13 +80,14 @@ def variations(): # pyarrow does not support decoding "delta"-encoded values. # for encoding in ["plain", "delta"]: for encoding in ["plain"]: - yield (version, file, encoding) + for compression in ["uncompressed", "zstd", "snappy"]: + yield (version, file, compression, encoding) if __name__ == "__main__": - for (version, file, encoding_utf8) in variations(): + for (version, file, compression, encoding_utf8) in variations(): expected = _expected(file) - path = _prepare(file, version, encoding_utf8) + path = _prepare(file, version, compression, encoding_utf8) table = pq.read_table(path) os.remove(path) diff --git a/arrow-parquet-integration-testing/main_spark.py b/arrow-parquet-integration-testing/main_spark.py index 876a1a808c4..87bb350399e 100644 --- a/arrow-parquet-integration-testing/main_spark.py +++ b/arrow-parquet-integration-testing/main_spark.py @@ -7,7 +7,7 @@ from main import _prepare, _expected -def test(file: str, version: str, column, encoding: str): +def test(file: str, version: str, column, compression: str, encoding: str): """ Tests that pyspark can read a parquet file written by arrow2. @@ -17,7 +17,7 @@ def test(file: str, version: str, column, encoding: str): assert that they are equal """ # write parquet - path = _prepare(file, version, encoding, [column[1]]) + path = _prepare(file, version, compression, encoding, [column[1]]) # read IPC to Python expected = _expected(file) @@ -39,13 +39,20 @@ def test(file: str, version: str, column, encoding: str): assert expected == result -test("generated_primitive", "2", ("utf8_nullable", 24), "delta") +test("generated_primitive", "2", ("utf8_nullable", 24), "uncompressed", "delta") +test("generated_primitive", "2", ("utf8_nullable", 24), "snappy", "delta") -test("generated_dictionary", "1", ("dict0", 0), "") -test("generated_dictionary", "2", ("dict0", 0), "") +test("generated_dictionary", "1", ("dict0", 0), "uncompressed", "") +test("generated_dictionary", "1", ("dict0", 0), "snappy", "") +test("generated_dictionary", "2", ("dict0", 0), "uncompressed", "") +test("generated_dictionary", "2", ("dict0", 0), "snappy", "") -test("generated_dictionary", "1", ("dict1", 1), "") -test("generated_dictionary", "2", ("dict1", 1), "") +test("generated_dictionary", "1", ("dict1", 1), "uncompressed", "") +test("generated_dictionary", "1", ("dict1", 1), "snappy", "") +test("generated_dictionary", "2", ("dict1", 1), "uncompressed", "") +test("generated_dictionary", "2", ("dict1", 1), "snappy", "") -test("generated_dictionary", "1", ("dict2", 2), "") -test("generated_dictionary", "2", ("dict2", 2), "") +test("generated_dictionary", "1", ("dict2", 2), "uncompressed", "") +test("generated_dictionary", "1", ("dict2", 2), "snappy", "") +test("generated_dictionary", "2", ("dict2", 2), "uncompressed", "") +test("generated_dictionary", "2", ("dict2", 2), "snappy", "") diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs index a8fa3e6d899..e3f438f9e2f 100644 --- a/arrow-parquet-integration-testing/src/main.rs +++ b/arrow-parquet-integration-testing/src/main.rs @@ -91,6 +91,12 @@ fn main() -> Result<()> { .required(true) .takes_value(true), ) + .arg( + Arg::with_name("compression") + .long("compression") + .required(true) + .takes_value(true), + ) .get_matches(); let json_file = matches .value_of("json") @@ -105,6 +111,9 @@ fn main() -> Result<()> { let utf8_encoding = matches .value_of("encoding-utf8") .expect("must provide utf8 type encoding"); + let compression = matches + .value_of("compression") + .expect("must provide compression"); let projection = projection.map(|x| { x.split(',') @@ -161,9 +170,16 @@ fn main() -> Result<()> { Version::V2 }; + let compression = match compression { + "uncompressed" => Compression::Uncompressed, + "zstd" => Compression::Zstd, + "snappy" => Compression::Snappy, + other => todo!("{}", other), + }; + let options = WriteOptions { write_statistics: true, - compression: Compression::Uncompressed, + compression, version, };