diff --git a/arrow-parquet-integration-testing/Cargo.toml b/arrow-parquet-integration-testing/Cargo.toml index 347818280a1..570bd8fa6f9 100644 --- a/arrow-parquet-integration-testing/Cargo.toml +++ b/arrow-parquet-integration-testing/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Jorge C. Leitao "] edition = "2021" [dependencies] -clap = "^2.33" +clap = { version = "^3", features = ["derive"] } arrow2 = { path = "../", default-features = false, features = ["io_parquet", "io_json_integration", "io_parquet_compression"] } flate2 = "^1" serde = { version = "^1.0", features = ["rc"] } diff --git a/arrow-parquet-integration-testing/main_spark.py b/arrow-parquet-integration-testing/main_spark.py index 87bb350399e..c33a1aa63ad 100644 --- a/arrow-parquet-integration-testing/main_spark.py +++ b/arrow-parquet-integration-testing/main_spark.py @@ -42,17 +42,17 @@ def test(file: str, version: str, column, compression: str, encoding: str): test("generated_primitive", "2", ("utf8_nullable", 24), "uncompressed", "delta") test("generated_primitive", "2", ("utf8_nullable", 24), "snappy", "delta") -test("generated_dictionary", "1", ("dict0", 0), "uncompressed", "") -test("generated_dictionary", "1", ("dict0", 0), "snappy", "") -test("generated_dictionary", "2", ("dict0", 0), "uncompressed", "") -test("generated_dictionary", "2", ("dict0", 0), "snappy", "") - -test("generated_dictionary", "1", ("dict1", 1), "uncompressed", "") -test("generated_dictionary", "1", ("dict1", 1), "snappy", "") -test("generated_dictionary", "2", ("dict1", 1), "uncompressed", "") -test("generated_dictionary", "2", ("dict1", 1), "snappy", "") - -test("generated_dictionary", "1", ("dict2", 2), "uncompressed", "") -test("generated_dictionary", "1", ("dict2", 2), "snappy", "") -test("generated_dictionary", "2", ("dict2", 2), "uncompressed", "") -test("generated_dictionary", "2", ("dict2", 2), "snappy", "") +test("generated_dictionary", "1", ("dict0", 0), "uncompressed", "plain") +test("generated_dictionary", "1", ("dict0", 0), "snappy", "plain") +test("generated_dictionary", "2", ("dict0", 0), "uncompressed", "plain") +test("generated_dictionary", "2", ("dict0", 0), "snappy", "plain") + +test("generated_dictionary", "1", ("dict1", 1), "uncompressed", "plain") +test("generated_dictionary", "1", ("dict1", 1), "snappy", "plain") +test("generated_dictionary", "2", ("dict1", 1), "uncompressed", "plain") +test("generated_dictionary", "2", ("dict1", 1), "snappy", "plain") + +test("generated_dictionary", "1", ("dict2", 2), "uncompressed", "plain") +test("generated_dictionary", "1", ("dict2", 2), "snappy", "plain") +test("generated_dictionary", "2", ("dict2", 2), "uncompressed", "plain") +test("generated_dictionary", "2", ("dict2", 2), "snappy", "plain") diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs index b3eac1a6c44..787474cce7d 100644 --- a/arrow-parquet-integration-testing/src/main.rs +++ b/arrow-parquet-integration-testing/src/main.rs @@ -1,7 +1,3 @@ -use std::fs::File; -use std::sync::Arc; -use std::{collections::HashMap, io::Read}; - use arrow2::array::Array; use arrow2::io::ipc::IpcField; use arrow2::{ @@ -12,14 +8,16 @@ use arrow2::{ json_integration::read, json_integration::ArrowJson, parquet::write::{ - Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions, + Compression as ParquetCompression, Encoding, FileWriter, RowGroupIterator, + Version as ParquetVersion, WriteOptions, }, }, }; - -use clap::{App, Arg}; - +use clap::Parser; use flate2::read::GzDecoder; +use 
std::fs::File;
+use std::sync::Arc;
+use std::{collections::HashMap, io::Read};
 
 /// Read gzipped JSON file
 pub fn read_gzip_json(
@@ -59,69 +57,72 @@ pub fn read_gzip_json(
     Ok((schema, ipc_fields, batches))
 }
 
+#[derive(clap::ArgEnum, Debug, Clone)]
+enum Version {
+    #[clap(name = "1")]
+    V1,
+    #[clap(name = "2")]
+    V2,
+}
+
+impl Into<ParquetVersion> for Version {
+    fn into(self) -> ParquetVersion {
+        match self {
+            Version::V1 => ParquetVersion::V1,
+            Version::V2 => ParquetVersion::V2,
+        }
+    }
+}
+
+#[derive(clap::ArgEnum, Debug, Clone)]
+enum Compression {
+    Zstd,
+    Snappy,
+    Uncompressed,
+}
+
+impl Into<ParquetCompression> for Compression {
+    fn into(self) -> ParquetCompression {
+        match self {
+            Compression::Zstd => ParquetCompression::Zstd,
+            Compression::Snappy => ParquetCompression::Snappy,
+            Compression::Uncompressed => ParquetCompression::Uncompressed,
+        }
+    }
+}
+
+#[derive(clap::ArgEnum, PartialEq, Debug, Clone)]
+enum EncodingScheme {
+    Plain,
+    Delta,
+}
+
+#[derive(Debug, Parser)]
+struct Args {
+    #[clap(short, long, help = "Path to JSON file")]
+    json: String,
+    #[clap(short('o'), long("output"), help = "Path to write parquet file")]
+    write_path: String,
+    #[clap(short, long, arg_enum, help = "Parquet version", default_value_t = Version::V2)]
+    version: Version,
+    #[clap(short, long, help = "commas separated projection")]
+    projection: Option<String>,
+    #[clap(short, long, arg_enum, help = "encoding scheme for utf8", default_value_t = EncodingScheme::Plain)]
+    encoding_utf8: EncodingScheme,
+    #[clap(short, long, arg_enum)]
+    compression: Compression,
+}
+
 fn main() -> Result<()> {
-    let matches = App::new("json-parquet-integration")
-        .arg(
-            Arg::with_name("json")
-                .long("json")
-                .required(true)
-                .takes_value(true),
-        )
-        .arg(
-            Arg::with_name("write_path")
-                .long("output")
-                .required(true)
-                .takes_value(true),
-        )
-        .arg(
-            Arg::with_name("version")
-                .long("version")
-                .required(true)
-                .takes_value(true),
-        )
-        .arg(
-            Arg::with_name("projection")
-                .long("projection")
-                .required(false)
-                .takes_value(true),
-        )
-        .arg(
-            Arg::with_name("encoding-utf8")
-                .long("encoding-utf8")
-                .required(true)
-                .takes_value(true),
-        )
-        .arg(
-            Arg::with_name("compression")
-                .long("compression")
-                .required(true)
-                .takes_value(true),
-        )
-        .get_matches();
-    let json_file = matches
-        .value_of("json")
-        .expect("must provide path to json file");
-    let write_path = matches
-        .value_of("write_path")
-        .expect("must provide path to write parquet");
-    let version = matches
-        .value_of("version")
-        .expect("must provide version of parquet");
-    let projection = matches.value_of("projection");
-    let utf8_encoding = matches
-        .value_of("encoding-utf8")
-        .expect("must provide utf8 type encoding");
-    let compression = matches
-        .value_of("compression")
-        .expect("must provide compression");
-
-    let projection = projection.map(|x| {
+    let args = Args::parse();
+
+    let projection = args.projection.map(|x| {
         x.split(',')
             .map(|x| x.parse::<usize>().unwrap())
             .collect::<Vec<_>>()
     });
 
-    let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", json_file)?;
+    let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", &args.json)?;
 
     let schema = if let Some(projection) = &projection {
         let fields = schema
@@ -164,23 +165,10 @@ fn main() -> Result<()> {
         batches
     };
 
-    let version = if version == "1" {
-        Version::V1
-    } else {
-        Version::V2
-    };
-
-    let compression = match compression {
-        "uncompressed" => Compression::Uncompressed,
-        "zstd" => Compression::Zstd,
-        "snappy" => Compression::Snappy,
-        other => todo!("{}", other),
-    };
-
     let 
options = WriteOptions {
         write_statistics: true,
-        compression,
-        version,
+        compression: args.compression.into(),
+        version: args.version.into(),
     };
 
     let encodings = schema
@@ -189,7 +177,7 @@ fn main() -> Result<()> {
         .map(|x| match x.data_type() {
             DataType::Dictionary(..) => Encoding::RleDictionary,
             DataType::Utf8 | DataType::LargeUtf8 => {
-                if utf8_encoding == "delta" {
+                if args.encoding_utf8 == EncodingScheme::Delta {
                     Encoding::DeltaLengthByteArray
                 } else {
                     Encoding::Plain
@@ -202,7 +190,7 @@ fn main() -> Result<()> {
     let row_groups =
         RowGroupIterator::try_new(batches.into_iter().map(Ok), &schema, options, encodings)?;
 
-    let writer = File::create(write_path)?;
+    let writer = File::create(args.write_path)?;
 
     let mut writer = FileWriter::try_new(writer, schema, options)?;
 
diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml
index b2343dd15ff..3501c3ddea6 100644
--- a/integration-testing/Cargo.toml
+++ b/integration-testing/Cargo.toml
@@ -31,7 +31,7 @@ logging = ["tracing-subscriber"]
 arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
 arrow-format = { version = "0.4", features = ["full"] }
 async-trait = "0.1.41"
-clap = "2.33"
+clap = { version = "^3", features = ["derive"] }
 futures = "0.3"
 hex = "0.4"
 prost = "0.9"
diff --git a/integration-testing/src/bin/arrow-file-to-stream.rs b/integration-testing/src/bin/arrow-file-to-stream.rs
index f0f18009dc5..ae8ed094fdc 100644
--- a/integration-testing/src/bin/arrow-file-to-stream.rs
+++ b/integration-testing/src/bin/arrow-file-to-stream.rs
@@ -15,16 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::env;
-use std::fs::File;
-
 use arrow2::error::Result;
 use arrow2::io::ipc::read;
 use arrow2::io::ipc::write;
+use clap::Parser;
+use std::fs::File;
+
+#[derive(Debug, Parser)]
+#[clap(author, version, about("Read an arrow file and stream to stdout"), long_about = None)]
+struct Args {
+    file_name: String,
+}
 
 fn main() -> Result<()> {
-    let args: Vec<String> = env::args().collect();
-    let filename = &args[1];
+    let args = Args::parse();
+    let filename = args.file_name;
     let mut f = File::open(filename)?;
     let metadata = read::read_file_metadata(&mut f)?;
     let mut reader = read::FileReader::new(f, metadata.clone(), None);
diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs b/integration-testing/src/bin/arrow-json-integration-test.rs
index dfd61913af3..b8bc0c8e8ac 100644
--- a/integration-testing/src/bin/arrow-json-integration-test.rs
+++ b/integration-testing/src/bin/arrow-json-integration-test.rs
@@ -1,7 +1,7 @@
 use std::fs::File;
 
 use arrow2::io::json_integration::ArrowJson;
-use clap::{App, Arg};
+use clap::Parser;
 
 use arrow2::io::ipc::read;
 use arrow2::io::ipc::write;
@@ -11,49 +11,40 @@ use arrow2::{
 };
 use arrow_integration_testing::read_json_file;
 
+#[derive(Debug, Clone, clap::ArgEnum)]
+#[clap(rename_all = "SCREAMING_SNAKE_CASE")]
+enum Mode {
+    ArrowToJson,
+    JsonToArrow,
+    Validate,
+}
+
+#[derive(Debug, Parser)]
+struct Args {
+    #[clap(short, long, help = "integration flag")]
+    integration: bool,
+    #[clap(short, long, help = "Path to arrow file")]
+    arrow: String,
+    #[clap(short, long, help = "Path to json file")]
+    json: String,
+    #[clap(arg_enum, short, long)]
+    mode: Mode,
+    #[clap(short, long, help = "enable/disable verbose mode")]
+    verbose: bool,
+}
+
 fn main() -> Result<()> {
-    let matches = App::new("rust arrow-json-integration-test")
-        .arg(Arg::with_name("integration").long("integration"))
-        
.arg(
-            Arg::with_name("arrow")
-                .long("arrow")
-                .help("path to ARROW file")
-                .takes_value(true),
-        )
-        .arg(
-            Arg::with_name("json")
-                .long("json")
-                .help("path to JSON file")
-                .takes_value(true),
-        )
-        .arg(
-            Arg::with_name("mode")
-                .long("mode")
-                .help("mode of integration testing tool (ARROW_TO_JSON, JSON_TO_ARROW, VALIDATE)")
-                .takes_value(true)
-                .default_value("VALIDATE"),
-        )
-        .arg(
-            Arg::with_name("verbose")
-                .long("verbose")
-                .help("enable/disable verbose mode"),
-        )
-        .get_matches();
-
-    let arrow_file = matches
-        .value_of("arrow")
-        .expect("must provide path to arrow file");
-    let json_file = matches
-        .value_of("json")
-        .expect("must provide path to json file");
-    let mode = matches.value_of("mode").unwrap();
-    let verbose = true; //matches.value_of("verbose").is_some();
+    let args = Args::parse();
+
+    let arrow_file = args.arrow;
+    let json_file = args.json;
+    let mode = args.mode;
+    let verbose = args.verbose;
 
     match mode {
-        "JSON_TO_ARROW" => json_to_arrow(json_file, arrow_file, verbose),
-        "ARROW_TO_JSON" => arrow_to_json(arrow_file, json_file, verbose),
-        "VALIDATE" => validate(arrow_file, json_file, verbose),
-        _ => panic!("mode {} not supported", mode),
+        Mode::JsonToArrow => json_to_arrow(&json_file, &arrow_file, verbose),
+        Mode::ArrowToJson => arrow_to_json(&arrow_file, &json_file, verbose),
+        Mode::Validate => validate(&arrow_file, &json_file, verbose),
     }
 }
 
diff --git a/integration-testing/src/bin/flight-test-integration-client.rs b/integration-testing/src/bin/flight-test-integration-client.rs
index b7bb5d1d79f..f380627b04c 100644
--- a/integration-testing/src/bin/flight-test-integration-client.rs
+++ b/integration-testing/src/bin/flight-test-integration-client.rs
@@ -16,42 +16,54 @@
 // under the License.
 
 use arrow_integration_testing::flight_client_scenarios;
-
-use clap::{App, Arg};
-
+use clap::Parser;
 type Error = Box<dyn std::error::Error>;
 type Result<T = (), E = Error> = std::result::Result<T, E>;
 
+#[derive(clap::ArgEnum, Debug, Clone)]
+enum Scenario {
+    Middleware,
+    #[clap(name = "auth:basic_proto")]
+    AuthBasicProto,
+}
+
+#[derive(Debug, Parser)]
+#[clap(author, version, about("rust flight-test-integration-client"), long_about = None)]
+struct Args {
+    #[clap(long, help = "host of flight server")]
+    host: String,
+    #[clap(long, help = "port of flight server")]
+    port: u16,
+    #[clap(
+        short,
+        long,
+        help = "path to the descriptor file, only used when scenario is not provided"
+    )]
+    path: Option<String>,
+    #[clap(long, arg_enum)]
+    scenario: Option<Scenario>,
+}
+
 #[tokio::main]
 async fn main() -> Result {
     #[cfg(feature = "logging")]
     tracing_subscriber::fmt::init();
 
-    let matches = App::new("rust flight-test-integration-client")
-        .arg(Arg::with_name("host").long("host").takes_value(true))
-        .arg(Arg::with_name("port").long("port").takes_value(true))
-        .arg(Arg::with_name("path").long("path").takes_value(true))
-        .arg(
-            Arg::with_name("scenario")
-                .long("scenario")
-                .takes_value(true),
-        )
-        .get_matches();
+    let args = Args::parse();
+    let host = args.host;
+    let port = args.port;
 
-    let host = matches.value_of("host").expect("Host is required");
-    let port = matches.value_of("port").expect("Port is required");
-
-    match matches.value_of("scenario") {
-        Some("middleware") => flight_client_scenarios::middleware::run_scenario(host, port).await?,
-        Some("auth:basic_proto") => {
-            flight_client_scenarios::auth_basic_proto::run_scenario(host, port).await?
+    match args.scenario {
+        Some(Scenario::Middleware) => {
+            flight_client_scenarios::middleware::run_scenario(&host, port).await?
+        }
+        Some(Scenario::AuthBasicProto) => {
+            flight_client_scenarios::auth_basic_proto::run_scenario(&host, port).await?
         }
-        Some(scenario_name) => unimplemented!("Scenario not found: {}", scenario_name),
         None => {
-            let path = matches
-                .value_of("path")
-                .expect("Path is required if scenario is not specified");
-            flight_client_scenarios::integration_test::run_scenario(host, port, path).await?;
+            let path = args.path.expect("No path is given");
+            flight_client_scenarios::integration_test::run_scenario(&host, port, &path)
+                .await?;
         }
     }
 
diff --git a/integration-testing/src/bin/flight-test-integration-server.rs b/integration-testing/src/bin/flight-test-integration-server.rs
index 45a08080499..6ed22ad81d9 100644
--- a/integration-testing/src/bin/flight-test-integration-server.rs
+++ b/integration-testing/src/bin/flight-test-integration-server.rs
@@ -15,36 +15,43 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use clap::{App, Arg};
-
 use arrow_integration_testing::flight_server_scenarios;
+use clap::Parser;
 
 type Error = Box<dyn std::error::Error>;
 type Result<T = (), E = Error> = std::result::Result<T, E>;
 
+#[derive(clap::ArgEnum, Debug, Clone)]
+enum Scenario {
+    Middleware,
+    #[clap(name = "auth:basic_proto")]
+    AuthBasicProto,
+}
+
+#[derive(Debug, Parser)]
+#[clap(author, version, about("rust flight-test-integration-server"), long_about = None)]
+struct Args {
+    #[clap(long)]
+    port: u16,
+    #[clap(long, arg_enum)]
+    scenario: Option<Scenario>,
+}
+
 #[tokio::main]
 async fn main() -> Result {
     #[cfg(feature = "logging")]
     tracing_subscriber::fmt::init();
 
-    let matches = App::new("rust flight-test-integration-server")
-        .about("Integration testing server for Flight.")
-        .arg(Arg::with_name("port").long("port").takes_value(true))
-        .arg(
-            Arg::with_name("scenario")
-                .long("scenario")
-                .takes_value(true),
-        )
-        .get_matches();
-
-    let port = matches.value_of("port").unwrap_or("0");
+    let args = Args::parse();
+    let port = args.port;
 
-    match matches.value_of("scenario") {
-        Some("middleware") => flight_server_scenarios::middleware::scenario_setup(port).await?,
-        Some("auth:basic_proto") => {
+    match args.scenario {
+        Some(Scenario::Middleware) => {
+            flight_server_scenarios::middleware::scenario_setup(port).await?
+        }
+        Some(Scenario::AuthBasicProto) => {
             flight_server_scenarios::auth_basic_proto::scenario_setup(port).await?
         }
-        Some(scenario_name) => unimplemented!("Scenario not found: {}", scenario_name),
         None => {
             flight_server_scenarios::integration_test::scenario_setup(port).await?;
         }
     }
diff --git a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs
index 58258e2164c..2c47705e45a 100644
--- a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs
+++ b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs
@@ -28,7 +28,7 @@ type Result<T = (), E = Error> = std::result::Result<T, E>;
 
 type Client = FlightServiceClient<tonic::transport::Channel>;
 
-pub async fn run_scenario(host: &str, port: &str) -> Result {
+pub async fn run_scenario(host: &str, port: u16) -> Result {
     let url = format!("http://{}:{}", host, port);
     let mut client = FlightServiceClient::connect(url).await?;
 
diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs
index fe56eff4d96..48d134d48a7 100644
--- a/integration-testing/src/flight_client_scenarios/integration_test.rs
+++ b/integration-testing/src/flight_client_scenarios/integration_test.rs
@@ -50,7 +50,7 @@ type Client = FlightServiceClient<tonic::transport::Channel>;
 
 type ChunkArc = Chunk<Arc<dyn Array>>;
 
-pub async fn run_scenario(host: &str, port: &str, path: &str) -> Result {
+pub async fn run_scenario(host: &str, port: u16, path: &str) -> Result {
     let url = format!("http://{}:{}", host, port);
     let client = FlightServiceClient::connect(url).await?;
 
diff --git a/integration-testing/src/flight_client_scenarios/middleware.rs b/integration-testing/src/flight_client_scenarios/middleware.rs
index f67580ada87..29c96ce67ea 100644
--- a/integration-testing/src/flight_client_scenarios/middleware.rs
+++ b/integration-testing/src/flight_client_scenarios/middleware.rs
@@ -22,7 +22,7 @@ use tonic::{Request, Status};
 type Error = Box<dyn std::error::Error>;
 type Result<T = (), E = Error> = std::result::Result<T, E>;
 
-pub async fn run_scenario(host: &str, port: &str) -> Result {
+pub async fn run_scenario(host: &str, port: u16) -> Result {
     let url = format!("http://{}:{}", host, port);
     let conn = tonic::transport::Endpoint::new(url)?.connect().await?;
     let mut client = FlightServiceClient::with_interceptor(conn, middleware_interceptor);
diff --git a/integration-testing/src/flight_server_scenarios.rs b/integration-testing/src/flight_server_scenarios.rs
index a8aab14712e..5cdb930d09d 100644
--- a/integration-testing/src/flight_server_scenarios.rs
+++ b/integration-testing/src/flight_server_scenarios.rs
@@ -27,7 +27,7 @@ pub mod middleware;
 type Error = Box<dyn std::error::Error>;
 type Result<T = (), E = Error> = std::result::Result<T, E>;
 
-pub async fn listen_on(port: &str) -> Result {
+pub async fn listen_on(port: u16) -> Result {
     let addr: SocketAddr = format!("0.0.0.0:{}", port).parse()?;
 
     let listener = TcpListener::bind(addr).await?;
 
diff --git a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs
index fc5c936a3a3..0938de27dbe 100644
--- a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs
+++ b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs
@@ -32,7 +32,7 @@ use prost::Message;
 
 use crate::{AUTH_PASSWORD, AUTH_USERNAME};
 
-pub async fn scenario_setup(port: &str) -> Result {
+pub async fn scenario_setup(port: u16) -> Result {
     let service = AuthBasicProtoScenarioImpl {
         username: AUTH_USERNAME.into(),
        password: AUTH_PASSWORD.into(),
diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs 
b/integration-testing/src/flight_server_scenarios/integration_test.rs
index f971036e0ef..5c3129dcc8c 100644
--- a/integration-testing/src/flight_server_scenarios/integration_test.rs
+++ b/integration-testing/src/flight_server_scenarios/integration_test.rs
@@ -41,7 +41,7 @@ type TonicStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>;
 type Error = Box<dyn std::error::Error>;
 type Result<T = (), E = Error> = std::result::Result<T, E>;
 
-pub async fn scenario_setup(port: &str) -> Result {
+pub async fn scenario_setup(port: u16) -> Result {
     let addr = super::listen_on(port).await?;
 
     let service = FlightServiceImpl {
@@ -175,11 +175,12 @@ impl FlightService for FlightServiceImpl {
 
         let total_records: usize = flight.chunks.iter().map(|chunk| chunk.len()).sum();
 
-        let schema = serialize_schema_to_info(&flight.schema, Some(&flight.ipc_schema.fields))
-            .expect(
-                "Could not generate schema bytes from schema stored by a DoPut; \
+        let schema =
+            serialize_schema_to_info(&flight.schema, Some(&flight.ipc_schema.fields))
+                .expect(
+                    "Could not generate schema bytes from schema stored by a DoPut; \
                      this should be impossible",
-            );
+                );
 
         let info = FlightInfo {
             schema,
diff --git a/integration-testing/src/flight_server_scenarios/middleware.rs b/integration-testing/src/flight_server_scenarios/middleware.rs
index ceffe5a8c9b..3873dca75ae 100644
--- a/integration-testing/src/flight_server_scenarios/middleware.rs
+++ b/integration-testing/src/flight_server_scenarios/middleware.rs
@@ -17,8 +17,8 @@
 
 use std::pin::Pin;
 
-use arrow_format::flight::data::*;
 use arrow_format::flight::data::flight_descriptor::DescriptorType;
+use arrow_format::flight::data::*;
 use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer};
 use futures::Stream;
 use tonic::{transport::Server, Request, Response, Status, Streaming};
@@ -28,7 +28,7 @@ type TonicStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>;
 type Error = Box<dyn std::error::Error>;
 type Result<T = (), E = Error> = std::result::Result<T, E>;
 
-pub async fn scenario_setup(port: &str) -> Result {
+pub async fn scenario_setup(port: u16) -> Result {
     let service = MiddlewareScenarioImpl {};
     let svc = FlightServiceServer::new(service);
 
     let addr = super::listen_on(port).await?;