From 5187d3d5275a019a5aae5fc72057aff37464d546 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Thu, 26 Aug 2021 21:36:22 +0800 Subject: [PATCH 1/2] Bumped tonic & prost Signed-off-by: Chojan Shang --- arrow-flight/Cargo.toml | 6 +- arrow-flight/src/arrow.flight.protocol.rs | 212 +++++++++++++--------- 2 files changed, 125 insertions(+), 93 deletions(-) diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml index a8b0057a9a4..ad423b6d0d4 100644 --- a/arrow-flight/Cargo.toml +++ b/arrow-flight/Cargo.toml @@ -27,10 +27,10 @@ license = "Apache-2.0" [dependencies] arrow2 = { path = "../", features = ["io_ipc"], default-features = false } -tonic = "0.4" +tonic = "0.5.2" bytes = "1" -prost = "0.7" -prost-derive = "0.7" +prost = "0.8.0" +prost-derive = "0.8.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } futures = { version = "0.3", default-features = false, features = ["alloc"]} diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs index 5fce526ff6e..5db746f6fda 100644 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ b/arrow-flight/src/arrow.flight.protocol.rs @@ -204,13 +204,14 @@ pub struct PutResult { } #[doc = r" Generated client implementations."] pub mod flight_service_client { - #![allow(unused_variables, dead_code, missing_docs)] + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; #[doc = ""] #[doc = " A flight service is an endpoint for retrieving or storing Arrow data. A"] #[doc = " flight service can expose one or more predefined endpoints that can be"] #[doc = " accessed using the Arrow Flight Protocol. Additionally, a flight service"] #[doc = " can expose a set of actions that are available."] + #[derive(Debug, Clone)] pub struct FlightServiceClient { inner: tonic::client::Grpc, } @@ -228,20 +229,43 @@ pub mod flight_service_client { impl FlightServiceClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, + T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, - ::Error: Into + Send, + ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } } - pub fn with_interceptor( + pub fn with_interceptor( inner: T, - interceptor: impl Into, - ) -> Self { - let inner = tonic::client::Grpc::with_interceptor(inner, interceptor); - Self { inner } + interceptor: F, + ) -> FlightServiceClient> + where + F: tonic::service::Interceptor, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + >>::Error: + Into + Send + Sync, + { + FlightServiceClient::new(InterceptedService::new(inner, interceptor)) + } + #[doc = r" Compress requests with `gzip`."] + #[doc = r""] + #[doc = r" This requires the server to support it otherwise it might respond with an"] + #[doc = r" error."] + pub fn send_gzip(mut self) -> Self { + self.inner = self.inner.send_gzip(); + self + } + #[doc = r" Enable decompressing responses with `gzip`."] + pub fn accept_gzip(mut self) -> Self { + self.inner = self.inner.accept_gzip(); + self } #[doc = ""] #[doc = " Handshake between client and server. Depending on the server, the"] @@ -478,22 +502,10 @@ pub mod flight_service_client { .await } } - impl Clone for FlightServiceClient { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } - } - impl std::fmt::Debug for FlightServiceClient { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "FlightServiceClient {{ ... }}") - } - } } #[doc = r" Generated server implementations."] pub mod flight_service_server { - #![allow(unused_variables, dead_code, missing_docs)] + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; #[doc = "Generated trait containing gRPC methods that should be implemented for use with FlightServiceServer."] #[async_trait] @@ -635,27 +647,34 @@ pub mod flight_service_server { #[derive(Debug)] pub struct FlightServiceServer { inner: _Inner, + accept_compression_encodings: (), + send_compression_encodings: (), } - struct _Inner(Arc, Option); + struct _Inner(Arc); impl FlightServiceServer { pub fn new(inner: T) -> Self { let inner = Arc::new(inner); - let inner = _Inner(inner, None); - Self { inner } + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + } } - pub fn with_interceptor( + pub fn with_interceptor( inner: T, - interceptor: impl Into, - ) -> Self { - let inner = Arc::new(inner); - let inner = _Inner(inner, Some(interceptor.into())); - Self { inner } + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) } } - impl Service> for FlightServiceServer + impl tonic::codegen::Service> for FlightServiceServer where T: FlightService, - B: HttpBody + Send + Sync + 'static, + B: Body + Send + Sync + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; @@ -691,17 +710,18 @@ pub mod flight_service_server { Box::pin(fut) } } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; let inner = self.inner.clone(); let fut = async move { - let interceptor = inner.1; let inner = inner.0; let method = HandshakeSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = if let Some(interceptor) = interceptor { - tonic::server::Grpc::with_interceptor(codec, interceptor) - } else { - tonic::server::Grpc::new(codec) - }; + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.streaming(method, req).await; Ok(res) }; @@ -729,17 +749,18 @@ pub mod flight_service_server { Box::pin(fut) } } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; let inner = self.inner.clone(); let fut = async move { - let interceptor = inner.1; let inner = inner.0; let method = ListFlightsSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = if let Some(interceptor) = interceptor { - tonic::server::Grpc::with_interceptor(codec, interceptor) - } else { - tonic::server::Grpc::new(codec) - }; + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.server_streaming(method, req).await; Ok(res) }; @@ -765,17 +786,18 @@ pub mod flight_service_server { Box::pin(fut) } } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; let inner = self.inner.clone(); let fut = async move { - let interceptor = inner.1.clone(); let inner = inner.0; let method = GetFlightInfoSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = if let Some(interceptor) = interceptor { - tonic::server::Grpc::with_interceptor(codec, interceptor) - } else { - tonic::server::Grpc::new(codec) - }; + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -800,17 +822,18 @@ pub mod flight_service_server { Box::pin(fut) } } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; let inner = self.inner.clone(); let fut = async move { - let interceptor = inner.1.clone(); let inner = inner.0; let method = GetSchemaSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = if let Some(interceptor) = interceptor { - tonic::server::Grpc::with_interceptor(codec, interceptor) - } else { - tonic::server::Grpc::new(codec) - }; + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -838,17 +861,18 @@ pub mod flight_service_server { Box::pin(fut) } } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; let inner = self.inner.clone(); let fut = async move { - let interceptor = inner.1; let inner = inner.0; let method = DoGetSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = if let Some(interceptor) = interceptor { - tonic::server::Grpc::with_interceptor(codec, interceptor) - } else { - tonic::server::Grpc::new(codec) - }; + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.server_streaming(method, req).await; Ok(res) }; @@ -876,17 +900,18 @@ pub mod flight_service_server { Box::pin(fut) } } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; let inner = self.inner.clone(); let fut = async move { - let interceptor = inner.1; let inner = inner.0; let method = DoPutSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = if let Some(interceptor) = interceptor { - tonic::server::Grpc::with_interceptor(codec, interceptor) - } else { - tonic::server::Grpc::new(codec) - }; + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.streaming(method, req).await; Ok(res) }; @@ -914,17 +939,18 @@ pub mod flight_service_server { Box::pin(fut) } } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; let inner = self.inner.clone(); let fut = async move { - let interceptor = inner.1; let inner = inner.0; let method = DoExchangeSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = if let Some(interceptor) = interceptor { - tonic::server::Grpc::with_interceptor(codec, interceptor) - } else { - tonic::server::Grpc::new(codec) - }; + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.streaming(method, req).await; Ok(res) }; @@ -952,17 +978,18 @@ pub mod flight_service_server { Box::pin(fut) } } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; let inner = self.inner.clone(); let fut = async move { - let interceptor = inner.1; let inner = inner.0; let method = DoActionSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = if let Some(interceptor) = interceptor { - tonic::server::Grpc::with_interceptor(codec, interceptor) - } else { - tonic::server::Grpc::new(codec) - }; + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.server_streaming(method, req).await; Ok(res) }; @@ -990,17 +1017,18 @@ pub mod flight_service_server { Box::pin(fut) } } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; let inner = self.inner.clone(); let fut = async move { - let interceptor = inner.1; let inner = inner.0; let method = ListActionsSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = if let Some(interceptor) = interceptor { - tonic::server::Grpc::with_interceptor(codec, interceptor) - } else { - tonic::server::Grpc::new(codec) - }; + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.server_streaming(method, req).await; Ok(res) }; @@ -1011,7 +1039,7 @@ pub mod flight_service_server { .status(200) .header("grpc-status", "12") .header("content-type", "application/grpc") - .body(tonic::body::BoxBody::empty()) + .body(empty_body()) .unwrap()) }), } @@ -1020,12 +1048,16 @@ pub mod flight_service_server { impl Clone for FlightServiceServer { fn clone(&self) -> Self { let inner = self.inner.clone(); - Self { inner } + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone(), self.1.clone()) + Self(self.0.clone()) } } impl std::fmt::Debug for _Inner { @@ -1036,4 +1068,4 @@ pub mod flight_service_server { impl tonic::transport::NamedService for FlightServiceServer { const NAME: &'static str = "arrow.flight.protocol.FlightService"; } -} +} \ No newline at end of file From 727c3b28658ffedff70269ce407060147cfe3eb4 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Thu, 26 Aug 2021 23:02:36 +0800 Subject: [PATCH 2/2] Bumped tonic & prost in integration-testing Signed-off-by: Chojan Shang --- integration-testing/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index deedd34193b..f2c6e31d229 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -36,10 +36,10 @@ async-trait = "0.1.41" clap = "2.33" futures = "0.3" hex = "0.4" -prost = "0.7" +prost = "0.8" serde = { version = "1.0", features = ["rc"] } serde_derive = "1.0" serde_json = { version = "1.0", features = ["preserve_order"] } tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } -tonic = "0.4" +tonic = "0.5.2" tracing-subscriber = { version = "0.2.15", optional = true }