From ce30fb3a4df205a805775f8627c072ab6cff8f55 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Fri, 29 Oct 2021 11:49:52 +0800 Subject: [PATCH] Bumped prost and tonic (#550) --- .github/workflows/test.yml | 6 ++-- Cargo.toml | 4 +-- integration-testing/Cargo.toml | 8 ++--- .../src/bin/flight-test-integration-client.rs | 7 ++--- .../src/bin/flight-test-integration-server.rs | 4 +-- .../auth_basic_proto.rs | 12 ++------ .../integration_test.rs | 26 +++++++++------- .../src/flight_client_scenarios/middleware.rs | 4 +-- .../auth_basic_proto.rs | 30 +++++++------------ .../integration_test.rs | 26 +++++----------- .../src/flight_server_scenarios/middleware.rs | 5 ++-- 11 files changed, 53 insertions(+), 79 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c588c236422..1e147fcc9f5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -74,7 +74,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-07-09 + toolchain: nightly-2021-10-24 override: true - uses: Swatinem/rust-cache@v1 - name: Install Miri @@ -93,7 +93,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-07-09 + toolchain: nightly-2021-10-24 override: true - uses: Swatinem/rust-cache@v1 - name: Install Miri @@ -112,6 +112,8 @@ jobs: - uses: actions/checkout@v2 with: submodules: true + - name: Install Rust + run: rustup update stable - name: Setup parquet files run: | apt update && apt install python3-pip python3-venv -y -q diff --git a/Cargo.toml b/Cargo.toml index 73642d0f39c..b33954ec06f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ bench = false [dependencies] num-traits = "0.2" chrono = { version = "0.4", default_features = false, features = ["std"] } -chrono-tz = { version = "0.5", optional = true } +chrono-tz = { version = "0.6", optional = true } # To efficiently cast numbers to strings lexical-core = { version = "0.8", optional = true } # We need to Hash values before sending them to an hasher. This @@ -39,7 +39,7 @@ indexmap = { version = "^1.6", optional = true } # used to print columns in a nice columnar format comfy-table = { version = "4.0", optional = true, default-features = false } -arrow-format = { version = "*", optional = true, features = ["ipc"] } +arrow-format = { version = "0.3.0", optional = true, features = ["ipc"] } hex = { version = "^0.4", optional = true } diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index 9c2782e760c..a9487cb37bb 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -29,15 +29,15 @@ logging = ["tracing-subscriber"] [dependencies] arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] } -arrow-format = { version = "*", features = ["ipc", "flight-service"] } +arrow-format = { version = "0.3.0", features = ["full"] } async-trait = "0.1.41" clap = "2.33" futures = "0.3" hex = "0.4" -prost = "0.8" +prost = "0.9" serde = { version = "1.0", features = ["rc"] } serde_derive = "1.0" serde_json = { version = "1.0", features = ["preserve_order"] } tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } -tonic = "0.5.2" -tracing-subscriber = { version = "0.2.15", optional = true } +tonic = "0.6.0" +tracing-subscriber = { version = "0.3.1", optional = true } diff --git a/integration-testing/src/bin/flight-test-integration-client.rs b/integration-testing/src/bin/flight-test-integration-client.rs index 1901553109f..b7bb5d1d79f 100644 --- a/integration-testing/src/bin/flight-test-integration-client.rs +++ b/integration-testing/src/bin/flight-test-integration-client.rs @@ -42,9 +42,7 @@ async fn main() -> Result { let port = matches.value_of("port").expect("Port is required"); match matches.value_of("scenario") { - Some("middleware") => { - flight_client_scenarios::middleware::run_scenario(host, port).await? - } + Some("middleware") => flight_client_scenarios::middleware::run_scenario(host, port).await?, Some("auth:basic_proto") => { flight_client_scenarios::auth_basic_proto::run_scenario(host, port).await? } @@ -53,8 +51,7 @@ async fn main() -> Result { let path = matches .value_of("path") .expect("Path is required if scenario is not specified"); - flight_client_scenarios::integration_test::run_scenario(host, port, path) - .await?; + flight_client_scenarios::integration_test::run_scenario(host, port, path).await?; } } diff --git a/integration-testing/src/bin/flight-test-integration-server.rs b/integration-testing/src/bin/flight-test-integration-server.rs index b1b280743c3..45a08080499 100644 --- a/integration-testing/src/bin/flight-test-integration-server.rs +++ b/integration-testing/src/bin/flight-test-integration-server.rs @@ -40,9 +40,7 @@ async fn main() -> Result { let port = matches.value_of("port").unwrap_or("0"); match matches.value_of("scenario") { - Some("middleware") => { - flight_server_scenarios::middleware::scenario_setup(port).await? - } + Some("middleware") => flight_server_scenarios::middleware::scenario_setup(port).await?, Some("auth:basic_proto") => { flight_server_scenarios::auth_basic_proto::scenario_setup(port).await? } diff --git a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs index 1b07e6358d5..58258e2164c 100644 --- a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs @@ -17,10 +17,8 @@ use crate::{AUTH_PASSWORD, AUTH_USERNAME}; -use arrow_format::flight::data::{Action, HandshakeRequest, BasicAuth}; -use arrow_format::flight::service::{ - flight_service_client::FlightServiceClient, -}; +use arrow_format::flight::data::{Action, BasicAuth, HandshakeRequest}; +use arrow_format::flight::service::flight_service_client::FlightServiceClient; use futures::{stream, StreamExt}; use prost::Message; use tonic::{metadata::MetadataValue, Request, Status}; @@ -81,11 +79,7 @@ pub async fn run_scenario(host: &str, port: &str) -> Result { Ok(()) } -async fn authenticate( - client: &mut Client, - username: &str, - password: &str, -) -> Result { +async fn authenticate(client: &mut Client, username: &str, password: &str) -> Result { let auth = BasicAuth { username: username.into(), password: password.into(), diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index 98fe0c2192b..7c67b34096e 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -17,14 +17,19 @@ use crate::{read_json_file, ArrowFile}; -use arrow2::{array::*, datatypes::*, io::flight::{self, deserialize_batch, serialize_batch}, io::ipc::{read, write}, record_batch::RecordBatch}; -use arrow_format::ipc; -use arrow_format::ipc::Message::MessageHeader; +use arrow2::{ + array::*, + datatypes::*, + io::flight::{self, deserialize_batch, serialize_batch}, + io::ipc::{read, write}, + record_batch::RecordBatch, +}; use arrow_format::flight::data::{ - flight_descriptor::DescriptorType, - FlightData, FlightDescriptor, Location, Ticket, + flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket, }; use arrow_format::flight::service::flight_service_client::FlightServiceClient; +use arrow_format::ipc; +use arrow_format::ipc::Message::MessageHeader; use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt}; use tonic::{Request, Streaming}; @@ -126,8 +131,7 @@ async fn send_batch( batch: &RecordBatch, options: &write::IpcWriteOptions, ) -> Result { - let (dictionary_flight_data, mut batch_flight_data) = - serialize_batch(batch, options); + let (dictionary_flight_data, mut batch_flight_data) = serialize_batch(batch, options); upload_tx .send_all(&mut stream::iter(dictionary_flight_data).map(Ok)) @@ -210,9 +214,8 @@ async fn consume_flight_location( let metadata = counter.to_string().into_bytes(); assert_eq!(metadata, data.app_metadata); - let actual_batch = - deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field) - .expect("Unable to convert flight data to Arrow batch"); + let actual_batch = deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field) + .expect("Unable to convert flight data to Arrow batch"); assert_eq!(expected_batch.schema(), actual_batch.schema()); assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); @@ -262,7 +265,8 @@ async fn receive_batch_flight_data( .expect("Error reading dictionary"); data = resp.next().await?.ok()?; - message = ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message"); + message = + ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message"); } Some(data) diff --git a/integration-testing/src/flight_client_scenarios/middleware.rs b/integration-testing/src/flight_client_scenarios/middleware.rs index 0694fef55c7..f67580ada87 100644 --- a/integration-testing/src/flight_client_scenarios/middleware.rs +++ b/integration-testing/src/flight_client_scenarios/middleware.rs @@ -15,10 +15,8 @@ // specific language governing permissions and limitations // under the License. -use arrow_format::flight::service::{ - flight_service_client::FlightServiceClient, -}; use arrow_format::flight::data::{flight_descriptor::DescriptorType, FlightDescriptor}; +use arrow_format::flight::service::flight_service_client::FlightServiceClient; use tonic::{Request, Status}; type Error = Box; diff --git a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs index 5223aaa297b..baa181e4efc 100644 --- a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs @@ -18,13 +18,11 @@ use std::pin::Pin; use std::sync::Arc; -use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; use arrow_format::flight::data::*; +use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; use tokio::sync::Mutex; -use tonic::{ - metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming, -}; +use tonic::{metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming}; type TonicStream = Pin + Send + Sync + 'static>>; @@ -60,10 +58,7 @@ pub struct AuthBasicProtoScenarioImpl { } impl AuthBasicProtoScenarioImpl { - async fn check_auth( - &self, - metadata: &MetadataMap, - ) -> Result { + async fn check_auth(&self, metadata: &MetadataMap) -> Result { let token = metadata .get_bin("auth-token-bin") .and_then(|v| v.to_bytes().ok()) @@ -71,10 +66,7 @@ impl AuthBasicProtoScenarioImpl { self.is_valid(token).await } - async fn is_valid( - &self, - token: Option, - ) -> Result { + async fn is_valid(&self, token: Option) -> Result { match token { Some(t) if t == *self.username => Ok(GrpcServerCallContext { peer_identity: self.username.to_string(), @@ -139,12 +131,10 @@ impl FlightService for AuthBasicProtoScenarioImpl { let req = req.expect("Error reading handshake request"); let HandshakeRequest { payload, .. } = req; - let auth = BasicAuth::decode(&*payload) - .expect("Error parsing handshake request"); + let auth = + BasicAuth::decode(&*payload).expect("Error parsing handshake request"); - let resp = if *auth.username == *username - && *auth.password == *password - { + let resp = if *auth.username == *username && *auth.password == *password { Ok(HandshakeResponse { payload: username.as_bytes().to_vec(), ..HandshakeResponse::default() @@ -189,7 +179,8 @@ impl FlightService for AuthBasicProtoScenarioImpl { &self, request: Request>, ) -> Result, Status> { - self.check_auth(request.metadata()).await?; + let metadata = request.metadata(); + self.check_auth(metadata).await?; Err(Status::unimplemented("Not yet implemented")) } @@ -217,7 +208,8 @@ impl FlightService for AuthBasicProtoScenarioImpl { &self, request: Request>, ) -> Result, Status> { - self.check_auth(request.metadata()).await?; + let metadata = request.metadata(); + self.check_auth(metadata).await?; Err(Status::unimplemented("Not yet implemented")) } } diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 28ded196fc0..4d3349bff44 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -16,23 +16,20 @@ // under the License. use std::collections::HashMap; +use std::convert::TryFrom; use std::pin::Pin; use std::sync::Arc; -use std::convert::TryFrom; use arrow2::io::flight::{serialize_batch, serialize_schema}; +use arrow_format::flight::data::*; use arrow_format::flight::data::flight_descriptor::*; use arrow_format::flight::service::flight_service_server::*; -use arrow_format::flight::data::*; +use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader}; use arrow_format::ipc::Schema as ArrowSchema; -use arrow_format::ipc::Message::{Message, MessageHeader, root_as_message}; use arrow2::{ - array::Array, - datatypes::*, + array::Array, datatypes::*, io::flight::serialize_schema_to_info, io::ipc, record_batch::RecordBatch, - io::ipc, - io::flight::serialize_schema_to_info }; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; @@ -113,12 +110,7 @@ impl FlightService for FlightServiceImpl { let options = ipc::write::IpcWriteOptions::default(); - let schema = std::iter::once({ - Ok(serialize_schema( - &flight.schema, - &options, - )) - }); + let schema = std::iter::once(Ok(serialize_schema(&flight.schema, &options))); let batches = flight .chunks @@ -180,12 +172,10 @@ impl FlightService for FlightServiceImpl { let total_records: usize = flight.chunks.iter().map(|chunk| chunk.num_rows()).sum(); let options = ipc::write::IpcWriteOptions::default(); - let schema = - serialize_schema_to_info(&flight.schema, &options) - .expect( - "Could not generate schema bytes from schema stored by a DoPut; \ + let schema = serialize_schema_to_info(&flight.schema, &options).expect( + "Could not generate schema bytes from schema stored by a DoPut; \ this should be impossible", - ); + ); let info = FlightInfo { schema, diff --git a/integration-testing/src/flight_server_scenarios/middleware.rs b/integration-testing/src/flight_server_scenarios/middleware.rs index ed686cfc7a4..ceffe5a8c9b 100644 --- a/integration-testing/src/flight_server_scenarios/middleware.rs +++ b/integration-testing/src/flight_server_scenarios/middleware.rs @@ -17,9 +17,9 @@ use std::pin::Pin; +use arrow_format::flight::data::*; use arrow_format::flight::data::flight_descriptor::DescriptorType; use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; -use arrow_format::flight::data::*; use futures::Stream; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -90,8 +90,7 @@ impl FlightService for MiddlewareScenarioImpl { let descriptor = request.into_inner(); - if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success" - { + if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success" { // Return a fake location - the test doesn't read it let endpoint = super::endpoint("foo", "grpc+tcp://localhost:10010");