From 4db04451f566d85c988e75d4589737a27c411c9d Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Thu, 3 Feb 2022 12:36:50 +0800
Subject: [PATCH] upgrade to clap 3.0

---
 arrow-parquet-integration-testing/Cargo.toml  |   2 +-
 arrow-parquet-integration-testing/src/main.rs | 152 +++++++--------
 integration-testing/Cargo.toml                |   2 +-
 .../src/bin/arrow-file-to-stream.rs           |  33 ++--
 .../src/bin/arrow-json-integration-test.rs    | 180 ++++++++++--------
 .../src/bin/arrow-stream-to-file.rs           |  23 +--
 .../src/bin/flight-test-integration-client.rs |  62 +++---
 .../src/bin/flight-test-integration-server.rs |  41 ++--
 8 files changed, 257 insertions(+), 238 deletions(-)

diff --git a/arrow-parquet-integration-testing/Cargo.toml b/arrow-parquet-integration-testing/Cargo.toml
index 347818280a1..570bd8fa6f9 100644
--- a/arrow-parquet-integration-testing/Cargo.toml
+++ b/arrow-parquet-integration-testing/Cargo.toml
@@ -5,7 +5,7 @@ authors = ["Jorge C. Leitao "]
 edition = "2021"
 
 [dependencies]
-clap = "^2.33"
+clap = { version = "^3", features = ["derive"] }
 arrow2 = { path = "../", default-features = false, features = ["io_parquet", "io_json_integration", "io_parquet_compression"] }
 flate2 = "^1"
 serde = { version = "^1.0", features = ["rc"] }
diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs
index b3eac1a6c44..77886e03fb6 100644
--- a/arrow-parquet-integration-testing/src/main.rs
+++ b/arrow-parquet-integration-testing/src/main.rs
@@ -1,7 +1,3 @@
-use std::fs::File;
-use std::sync::Arc;
-use std::{collections::HashMap, io::Read};
-
 use arrow2::array::Array;
 use arrow2::io::ipc::IpcField;
 use arrow2::{
@@ -12,14 +8,16 @@ use arrow2::{
         json_integration::read,
         json_integration::ArrowJson,
         parquet::write::{
-            Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
+            Compression as ParquetCompression, Encoding, FileWriter, RowGroupIterator,
+            Version as ParquetVersion, WriteOptions,
         },
     },
 };
-
-use clap::{App, Arg};
-
+use clap::Parser;
 use flate2::read::GzDecoder;
+use std::fs::File;
+use std::sync::Arc;
+use std::{collections::HashMap, io::Read};
 
 /// Read gzipped JSON file
 pub fn read_gzip_json(
@@ -59,69 +57,72 @@ pub fn read_gzip_json(
     Ok((schema, ipc_fields, batches))
 }
 
+#[derive(clap::ArgEnum, Debug, Clone)]
+enum Version {
+    #[clap(name = "1")]
+    V1,
+    #[clap(name = "2")]
+    V2,
+}
+
+impl Into<ParquetVersion> for Version {
+    fn into(self) -> ParquetVersion {
+        match self {
+            Version::V1 => ParquetVersion::V1,
+            Version::V2 => ParquetVersion::V2,
+        }
+    }
+}
+
+#[derive(clap::ArgEnum, Debug, Clone)]
+enum Compression {
+    Zstd,
+    Snappy,
+    Uncompressed,
+}
+
+impl Into<ParquetCompression> for Compression {
+    fn into(self) -> ParquetCompression {
+        match self {
+            Compression::Zstd => ParquetCompression::Zstd,
+            Compression::Snappy => ParquetCompression::Snappy,
+            Compression::Uncompressed => ParquetCompression::Uncompressed,
+        }
+    }
+}
+
+#[derive(clap::ArgEnum, PartialEq, Debug, Clone)]
+enum EncodingScheme {
+    Plain,
+    Delta,
+}
+
+#[derive(Debug, Parser)]
+struct Args {
+    #[clap(short, long, help = "Path to JSON file")]
+    json: String,
+    #[clap(short('o'), long("output"), help = "Path to write parquet file")]
+    write_path: String,
+    #[clap(short, long, arg_enum, help = "Parquet version", default_value_t = Version::V2)]
+    version: Version,
+    #[clap(short, long, help = "comma-separated projection")]
+    projection: Option<String>,
+    #[clap(short, long, arg_enum, help = "encoding scheme for utf8")]
+    encoding_utf8: EncodingScheme,
+    #[clap(short, long, arg_enum)]
+    compression: Compression,
+}
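+
+// The `Parser` derive replaces the clap 2 `App::new("json-parquet-integration")`
+// builder removed below: each field becomes a long flag (named after the field
+// unless overridden, as with `--output`), `arg_enum` fields accept only their
+// variants, and `Option` fields are optional. Example invocation (illustrative
+// file names):
+//   arrow-parquet-integration-testing --json f.json --output f.parquet \
+//       --version 2 --encoding-utf8 delta --compression zstd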
+
 fn main() -> Result<()> {
-    let matches = App::new("json-parquet-integration")
-        .arg(
-            Arg::with_name("json")
-                .long("json")
-                .required(true)
-                .takes_value(true),
-        )
-        .arg(
-            Arg::with_name("write_path")
-                .long("output")
-                .required(true)
-                .takes_value(true),
-        )
-        .arg(
-            Arg::with_name("version")
-                .long("version")
-                .required(true)
-                .takes_value(true),
-        )
-        .arg(
-            Arg::with_name("projection")
-                .long("projection")
-                .required(false)
-                .takes_value(true),
-        )
-        .arg(
-            Arg::with_name("encoding-utf8")
-                .long("encoding-utf8")
-                .required(true)
-                .takes_value(true),
-        )
-        .arg(
-            Arg::with_name("compression")
-                .long("compression")
-                .required(true)
-                .takes_value(true),
-        )
-        .get_matches();
-    let json_file = matches
-        .value_of("json")
-        .expect("must provide path to json file");
-    let write_path = matches
-        .value_of("write_path")
-        .expect("must provide path to write parquet");
-    let version = matches
-        .value_of("version")
-        .expect("must provide version of parquet");
-    let projection = matches.value_of("projection");
-    let utf8_encoding = matches
-        .value_of("encoding-utf8")
-        .expect("must provide utf8 type encoding");
-    let compression = matches
-        .value_of("compression")
-        .expect("must provide compression");
-
-    let projection = projection.map(|x| {
+    let args = Args::parse();
+
+    let projection = args.projection.map(|x| {
         x.split(',')
             .map(|x| x.parse::<usize>().unwrap())
             .collect::<Vec<_>>()
     });
 
-    let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", json_file)?;
+    let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", &args.json)?;
 
     let schema = if let Some(projection) = &projection {
         let fields = schema
@@ -164,23 +165,10 @@ fn main() -> Result<()> {
         batches
     };
 
-    let version = if version == "1" {
-        Version::V1
-    } else {
-        Version::V2
-    };
-
-    let compression = match compression {
-        "uncompressed" => Compression::Uncompressed,
-        "zstd" => Compression::Zstd,
-        "snappy" => Compression::Snappy,
-        other => todo!("{}", other),
-    };
-
     let options = WriteOptions {
         write_statistics: true,
-        compression,
-        version,
+        compression: args.compression.into(),
+        version: args.version.into(),
     };
 
     let encodings = schema
@@ -189,7 +177,7 @@ fn main() -> Result<()> {
         .map(|x| match x.data_type() {
             DataType::Dictionary(..) 
=> Encoding::RleDictionary,
             DataType::Utf8 | DataType::LargeUtf8 => {
-                if utf8_encoding == "delta" {
+                if args.encoding_utf8 == EncodingScheme::Delta {
                     Encoding::DeltaLengthByteArray
                 } else {
                     Encoding::Plain
@@ -202,7 +190,7 @@ fn main() -> Result<()> {
     let row_groups =
         RowGroupIterator::try_new(batches.into_iter().map(Ok), &schema, options, encodings)?;
 
-    let writer = File::create(write_path)?;
+    let writer = File::create(args.write_path)?;
 
     let mut writer = FileWriter::try_new(writer, schema, options)?;
 
diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml
index b2343dd15ff..3501c3ddea6 100644
--- a/integration-testing/Cargo.toml
+++ b/integration-testing/Cargo.toml
@@ -31,7 +31,7 @@ logging = ["tracing-subscriber"]
 arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
 arrow-format = { version = "0.4", features = ["full"] }
 async-trait = "0.1.41"
-clap = "2.33"
+clap = { version = "^3", features = ["derive"] }
 futures = "0.3"
 hex = "0.4"
 prost = "0.9"
diff --git a/integration-testing/src/bin/arrow-file-to-stream.rs b/integration-testing/src/bin/arrow-file-to-stream.rs
index f0f18009dc5..fbc217aab85 100644
--- a/integration-testing/src/bin/arrow-file-to-stream.rs
+++ b/integration-testing/src/bin/arrow-file-to-stream.rs
@@ -15,30 +15,31 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::env;
+use arrow::error::Result;
+use arrow::ipc::reader::FileReader;
+use arrow::ipc::writer::StreamWriter;
+use clap::Parser;
 use std::fs::File;
+use std::io::{self, BufReader};
 
-use arrow2::error::Result;
-use arrow2::io::ipc::read;
-use arrow2::io::ipc::write;
+#[derive(Debug, Parser)]
+#[clap(author, version, about("Read an arrow file and stream to stdout"), long_about = None)]
+struct Args {
+    file_name: String,
+}
 
 fn main() -> Result<()> {
-    let args: Vec<String> = env::args().collect();
-    let filename = &args[1];
-    let mut f = File::open(filename)?;
-    let metadata = read::read_file_metadata(&mut f)?;
-    let mut reader = read::FileReader::new(f, metadata.clone(), None);
-
-    let options = write::WriteOptions { compression: None };
-    let mut writer = write::StreamWriter::new(std::io::stdout(), options);
-
-    let fields = metadata.ipc_schema.fields.clone();
+    let args = Args::parse();
+    let f = File::open(&args.file_name)?;
+    let reader = BufReader::new(f);
+    let mut reader = FileReader::try_new(reader)?;
+    let schema = reader.schema();
 
-    writer.start(&metadata.schema, Some(fields))?;
+    let mut writer = StreamWriter::try_new(io::stdout(), &schema)?;
 
     reader.try_for_each(|batch| {
         let batch = batch?;
-        writer.write(&batch, None)
+        writer.write(&batch)
     })?;
     writer.finish()?;
 
diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs b/integration-testing/src/bin/arrow-json-integration-test.rs
index dfd61913af3..81200b99f98 100644
--- a/integration-testing/src/bin/arrow-json-integration-test.rs
+++ b/integration-testing/src/bin/arrow-json-integration-test.rs
@@ -1,59 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::error::{ArrowError, Result}; +use arrow::ipc::reader::FileReader; +use arrow::ipc::writer::FileWriter; +use arrow::util::integration_util::*; +use arrow_integration_testing::read_json_file; +use clap::Parser; use std::fs::File; -use arrow2::io::json_integration::ArrowJson; -use clap::{App, Arg}; +#[derive(clap::ArgEnum, Debug, Clone)] +#[clap(rename_all = "SCREAMING_SNAKE_CASE")] +enum Mode { + ArrowToJson, + JsonToArrow, + Validate, +} -use arrow2::io::ipc::read; -use arrow2::io::ipc::write; -use arrow2::{ - error::{ArrowError, Result}, - io::json_integration::write as json_write, -}; -use arrow_integration_testing::read_json_file; +#[derive(Debug, Parser)] +#[clap(author, version, about("rust arrow-json-integration-test"), long_about = None)] +struct Args { + #[clap(short, long)] + integration: bool, + #[clap(short, long, help("Path to ARROW file"))] + arrow: String, + #[clap(short, long, help("Path to JSON file"))] + json: String, + #[clap(arg_enum, short, long, default_value_t = Mode::Validate, help="Mode of integration testing tool")] + mode: Mode, + #[clap(short, long)] + verbose: bool, +} fn main() -> Result<()> { - let matches = App::new("rust arrow-json-integration-test") - .arg(Arg::with_name("integration").long("integration")) - .arg( - Arg::with_name("arrow") - .long("arrow") - .help("path to ARROW file") - .takes_value(true), - ) - .arg( - Arg::with_name("json") - .long("json") - .help("path to JSON file") - .takes_value(true), - ) - .arg( - Arg::with_name("mode") - .long("mode") - .help("mode of integration testing tool (ARROW_TO_JSON, JSON_TO_ARROW, VALIDATE)") - .takes_value(true) - .default_value("VALIDATE"), - ) - .arg( - Arg::with_name("verbose") - .long("verbose") - .help("enable/disable verbose mode"), - ) - .get_matches(); - - let arrow_file = matches - .value_of("arrow") - .expect("must provide path to arrow file"); - let json_file = matches - .value_of("json") - .expect("must provide path to json file"); - let mode = matches.value_of("mode").unwrap(); - let verbose = true; //matches.value_of("verbose").is_some(); - - match mode { - "JSON_TO_ARROW" => json_to_arrow(json_file, arrow_file, verbose), - "ARROW_TO_JSON" => arrow_to_json(arrow_file, json_file, verbose), - "VALIDATE" => validate(arrow_file, json_file, verbose), - _ => panic!("mode {} not supported", mode), + let args = Args::parse(); + let arrow_file = args.arrow; + let json_file = args.json; + let verbose = args.verbose; + match args.mode { + Mode::JsonToArrow => json_to_arrow(&json_file, &arrow_file, verbose), + Mode::ArrowToJson => arrow_to_json(&arrow_file, &json_file, verbose), + Mode::Validate => validate(&arrow_file, &json_file, verbose), } } @@ -65,16 +66,10 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()> let json_file = read_json_file(json_name)?; let arrow_file = File::create(arrow_name)?; - let options = write::WriteOptions { compression: None }; - let mut writer = write::FileWriter::try_new( - arrow_file, - &json_file.schema, - Some(json_file.fields), - options, - )?; + let mut writer = FileWriter::try_new(arrow_file, 
&json_file.schema)?;
 
     for b in json_file.batches {
-        writer.write(&b, None)?;
+        writer.write(&b)?;
     }
 
     writer.finish()?;
 
@@ -87,21 +82,17 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
         eprintln!("Converting {} to {}", arrow_name, json_name);
     }
 
-    let mut arrow_file = File::open(arrow_name)?;
-    let metadata = read::read_file_metadata(&mut arrow_file)?;
-    let reader = read::FileReader::new(arrow_file, metadata.clone(), None);
-
-    let names = metadata
-        .schema
-        .fields
-        .iter()
-        .map(|f| &f.name)
-        .collect::<Vec<_>>();
+    let arrow_file = File::open(arrow_name)?;
+    let reader = FileReader::try_new(arrow_file)?;
 
-    let schema = json_write::serialize_schema(&metadata.schema, &metadata.ipc_schema.fields);
+    let mut fields: Vec<ArrowJsonField> = vec![];
+    for f in reader.schema().fields() {
+        fields.push(ArrowJsonField::from(f));
+    }
+    let schema = ArrowJsonSchema { fields };
 
     let batches = reader
-        .map(|batch| Ok(json_write::serialize_chunk(&batch?, &names)))
+        .map(|batch| Ok(ArrowJsonBatch::from_batch(&batch?)))
         .collect::<Result<Vec<_>>>()?;
 
     let arrow_json = ArrowJson {
@@ -125,20 +116,25 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
     let json_file = read_json_file(json_name)?;
 
     // open Arrow file
-    let mut arrow_file = File::open(arrow_name)?;
-    let metadata = read::read_file_metadata(&mut arrow_file)?;
-    let reader = read::FileReader::new(arrow_file, metadata, None);
-    let arrow_schema = reader.schema();
+    let arrow_file = File::open(arrow_name)?;
+    let mut arrow_reader = FileReader::try_new(arrow_file)?;
+    let arrow_schema = arrow_reader.schema().as_ref().to_owned();
 
     // compare schemas
-    if &json_file.schema != arrow_schema {
-        return Err(ArrowError::InvalidArgumentError(format!(
+    if json_file.schema != arrow_schema {
+        return Err(ArrowError::ComputeError(format!(
            "Schemas do not match. JSON: {:?}. 
Arrow: {:?}", json_file.schema, arrow_schema ))); } - let json_batches = json_file.batches; + let json_batches = &json_file.batches; + + // compare number of batches + assert!( + json_batches.len() == arrow_reader.num_batches(), + "JSON batches and Arrow batches are unequal" + ); if verbose { eprintln!( @@ -147,8 +143,32 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { ); } - let batches = reader.collect::>>().unwrap(); + for json_batch in json_batches { + if let Some(Ok(arrow_batch)) = arrow_reader.next() { + // compare batches + let num_columns = arrow_batch.num_columns(); + assert!(num_columns == json_batch.num_columns()); + assert!(arrow_batch.num_rows() == json_batch.num_rows()); + + for i in 0..num_columns { + assert_eq!( + arrow_batch.column(i).data(), + json_batch.column(i).data(), + "Arrow and JSON batch columns not the same" + ); + } + } else { + return Err(ArrowError::ComputeError( + "no more arrow batches left".to_owned(), + )); + } + } + + if arrow_reader.next().is_some() { + return Err(ArrowError::ComputeError( + "no more json batches left".to_owned(), + )); + } - assert_eq!(json_batches, batches); Ok(()) } diff --git a/integration-testing/src/bin/arrow-stream-to-file.rs b/integration-testing/src/bin/arrow-stream-to-file.rs index ab0855bf677..f81d42e6eda 100644 --- a/integration-testing/src/bin/arrow-stream-to-file.rs +++ b/integration-testing/src/bin/arrow-stream-to-file.rs @@ -17,26 +17,17 @@ use std::io; -use arrow2::error::Result; -use arrow2::io::ipc::read; -use arrow2::io::ipc::write; +use arrow::error::Result; +use arrow::ipc::reader::StreamReader; +use arrow::ipc::writer::FileWriter; fn main() -> Result<()> { - let mut reader = io::stdin(); - let metadata = read::read_stream_metadata(&mut reader)?; - let mut arrow_stream_reader = read::StreamReader::new(reader, metadata.clone()); + let mut arrow_stream_reader = StreamReader::try_new(io::stdin())?; + let schema = arrow_stream_reader.schema(); - let writer = io::stdout(); + let mut writer = FileWriter::try_new(io::stdout(), &schema)?; - let options = write::WriteOptions { compression: None }; - let mut writer = write::FileWriter::try_new( - writer, - &metadata.schema, - Some(metadata.ipc_schema.fields), - options, - )?; - - arrow_stream_reader.try_for_each(|batch| writer.write(&batch?.unwrap(), None))?; + arrow_stream_reader.try_for_each(|batch| writer.write(&batch?))?; writer.finish()?; Ok(()) diff --git a/integration-testing/src/bin/flight-test-integration-client.rs b/integration-testing/src/bin/flight-test-integration-client.rs index b7bb5d1d79f..f380627b04c 100644 --- a/integration-testing/src/bin/flight-test-integration-client.rs +++ b/integration-testing/src/bin/flight-test-integration-client.rs @@ -16,42 +16,54 @@ // under the License. 
use arrow_integration_testing::flight_client_scenarios;
-
-use clap::{App, Arg};
-
+use clap::Parser;
 type Error = Box<dyn std::error::Error>;
 type Result<T = (), E = Error> = std::result::Result<T, E>;
 
+#[derive(clap::ArgEnum, Debug, Clone)]
+enum Scenario {
+    Middleware,
+    #[clap(name = "auth:basic_proto")]
+    AuthBasicProto,
+}
+
+#[derive(Debug, Parser)]
+#[clap(author, version, about("rust flight-test-integration-client"), long_about = None)]
+struct Args {
+    #[clap(long, help = "host of flight server")]
+    host: String,
+    #[clap(long, help = "port of flight server")]
+    port: u16,
+    #[clap(
+        short,
+        long,
+        help = "path to the descriptor file, only used when scenario is not provided"
+    )]
+    path: Option<String>,
+    #[clap(long, arg_enum)]
+    scenario: Option<Scenario>,
+}
+
 #[tokio::main]
 async fn main() -> Result {
     #[cfg(feature = "logging")]
     tracing_subscriber::fmt::init();
 
-    let matches = App::new("rust flight-test-integration-client")
-        .arg(Arg::with_name("host").long("host").takes_value(true))
-        .arg(Arg::with_name("port").long("port").takes_value(true))
-        .arg(Arg::with_name("path").long("path").takes_value(true))
-        .arg(
-            Arg::with_name("scenario")
-                .long("scenario")
-                .takes_value(true),
-        )
-        .get_matches();
+    let args = Args::parse();
+    let host = args.host;
+    let port = args.port;
 
-    let host = matches.value_of("host").expect("Host is required");
-    let port = matches.value_of("port").expect("Port is required");
-
-    match matches.value_of("scenario") {
-        Some("middleware") => flight_client_scenarios::middleware::run_scenario(host, port).await?,
-        Some("auth:basic_proto") => {
-            flight_client_scenarios::auth_basic_proto::run_scenario(host, port).await?
+    match args.scenario {
+        Some(Scenario::Middleware) => {
+            flight_client_scenarios::middleware::run_scenario(&host, port).await?
+        }
+        Some(Scenario::AuthBasicProto) => {
+            flight_client_scenarios::auth_basic_proto::run_scenario(&host, port).await?
         }
-        Some(scenario_name) => unimplemented!("Scenario not found: {}", scenario_name),
         None => {
-            let path = matches
-                .value_of("path")
-                .expect("Path is required if scenario is not specified");
-            flight_client_scenarios::integration_test::run_scenario(host, port, path).await?;
+            let path = args.path.expect("No path is given");
+            flight_client_scenarios::integration_test::run_scenario(&host, port, &path)
+                .await?;
         }
     }
 
diff --git a/integration-testing/src/bin/flight-test-integration-server.rs b/integration-testing/src/bin/flight-test-integration-server.rs
index 45a08080499..6ed22ad81d9 100644
--- a/integration-testing/src/bin/flight-test-integration-server.rs
+++ b/integration-testing/src/bin/flight-test-integration-server.rs
@@ -15,36 +15,43 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use clap::{App, Arg};
-
 use arrow_integration_testing::flight_server_scenarios;
+use clap::Parser;
 
 type Error = Box<dyn std::error::Error>;
 type Result<T = (), E = Error> = std::result::Result<T, E>;
 
+#[derive(clap::ArgEnum, Debug, Clone)]
+enum Scenario {
+    Middleware,
+    #[clap(name = "auth:basic_proto")]
+    AuthBasicProto,
+}
+
+#[derive(Debug, Parser)]
+#[clap(author, version, about("rust flight-test-integration-server"), long_about = None)]
+struct Args {
+    #[clap(long)]
+    port: u16,
+    #[clap(long, arg_enum)]
+    scenario: Option<Scenario>,
+}
+
 #[tokio::main]
 async fn main() -> Result {
     #[cfg(feature = "logging")]
     tracing_subscriber::fmt::init();
 
-    let matches = App::new("rust flight-test-integration-server")
-        .about("Integration testing server for Flight.")
-        .arg(Arg::with_name("port").long("port").takes_value(true))
-        .arg(
-            Arg::with_name("scenario")
-                .long("scenario")
-                .takes_value(true),
-        )
-        .get_matches();
-
-    let port = matches.value_of("port").unwrap_or("0");
+    let args = Args::parse();
+    let port = args.port;
 
-    match matches.value_of("scenario") {
-        Some("middleware") => flight_server_scenarios::middleware::scenario_setup(port).await?,
-        Some("auth:basic_proto") => {
+    match args.scenario {
+        Some(Scenario::Middleware) => {
+            flight_server_scenarios::middleware::scenario_setup(port).await?
+        }
+        Some(Scenario::AuthBasicProto) => {
             flight_server_scenarios::auth_basic_proto::scenario_setup(port).await?
         }
-        Some(scenario_name) => unimplemented!("Scenario not found: {}", scenario_name),
         None => {
             flight_server_scenarios::integration_test::scenario_setup(port).await?;
         }
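+        // With the derived parser, a non-numeric --port or an unknown
+        // --scenario value is rejected by clap before main() runs, so the
+        // clap 2 `unimplemented!("Scenario not found: ...")` fallback arm
+        // above is no longer needed.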