From 37f6733f85a42e828c124026c3a0f21919549b12 Mon Sep 17 00:00:00 2001 From: "T.J. Telan" Date: Thu, 20 Aug 2020 12:11:31 -0700 Subject: [PATCH] feat(transport): Add `Router::into_service` (#419) Co-authored-by: Danny Hua --- examples/Cargo.toml | 8 + examples/src/hyper_warp_multiplex/client.rs | 61 +++++++ examples/src/hyper_warp_multiplex/server.rs | 181 ++++++++++++++++++++ tonic/src/transport/server/mod.rs | 43 ++++- 4 files changed, 292 insertions(+), 1 deletion(-) create mode 100644 examples/src/hyper_warp_multiplex/client.rs create mode 100644 examples/src/hyper_warp_multiplex/server.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 24dcce2cb..483cf2545 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -126,6 +126,14 @@ path = "src/autoreload/server.rs" name = "optional-server" path = "src/optional/server.rs" +[[bin]] +name = "hyper-warp-multiplex-client" +path = "src/hyper_warp_multiplex/client.rs" + +[[bin]] +name = "hyper-warp-multiplex-server" +path = "src/hyper_warp_multiplex/server.rs" + [dependencies] tonic = { path = "../tonic", features = ["tls"] } prost = "0.6" diff --git a/examples/src/hyper_warp_multiplex/client.rs b/examples/src/hyper_warp_multiplex/client.rs new file mode 100644 index 000000000..62faa213c --- /dev/null +++ b/examples/src/hyper_warp_multiplex/client.rs @@ -0,0 +1,61 @@ +//! To hit the gRPC endpoint you must run this client via: +//! `cargo run --bin hyper-warp-multiplex-client +//! To hit the warp server you can run this command: +//! `curl localhost:50051/hello` + +pub mod hello_world { + tonic::include_proto!("helloworld"); +} + +pub mod echo { + tonic::include_proto!("grpc.examples.echo"); +} + +use echo::{echo_client::EchoClient, EchoRequest}; +use hello_world::greeter_client::GreeterClient; +use hello_world::HelloRequest; +use hyper::{Client, Uri}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let client = Client::builder().http2_only(true).build_http(); + + let uri = Uri::from_static("http://[::1]:50051"); + + // Hyper's client requires that requests contain full Uris include a scheme and + // an authority. Tonic's transport will handle this for you but when using the client + // manually you need ensure the uri's are set correctly. + let add_origin = tower::service_fn(|mut req: hyper::Request| { + let uri = Uri::builder() + .scheme(uri.scheme().unwrap().clone()) + .authority(uri.authority().unwrap().clone()) + .path_and_query(req.uri().path_and_query().unwrap().clone()) + .build() + .unwrap(); + + *req.uri_mut() = uri; + + client.request(req) + }); + + let mut greeter_client = GreeterClient::new(add_origin); + let mut echo_client = EchoClient::new(add_origin); + + let request = tonic::Request::new(HelloRequest { + name: "Tonic".into(), + }); + + let response = greeter_client.say_hello(request).await?; + + println!("GREETER RESPONSE={:?}", response); + + let request = tonic::Request::new(EchoRequest { + message: "hello".into(), + }); + + let response = echo_client.unary_echo(request).await?; + + println!("ECHO RESPONSE={:?}", response); + + Ok(()) +} diff --git a/examples/src/hyper_warp_multiplex/server.rs b/examples/src/hyper_warp_multiplex/server.rs new file mode 100644 index 000000000..cbcccc91c --- /dev/null +++ b/examples/src/hyper_warp_multiplex/server.rs @@ -0,0 +1,181 @@ +//! To hit the gRPC endpoint you must run this client via: +//! `cargo run --bin hyper-warp-multiplex-client +//! To hit the warp server you can run this command: +//! `curl localhost:50051/hello` + +use futures::future::{self, Either, TryFutureExt}; +use futures::Stream; +use http::version::Version; +use hyper::{service::make_service_fn, Server}; +use std::convert::Infallible; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use tonic::{transport::Server as TonicServer, Request, Response, Status}; +use tower::Service; +use warp::Filter; + +type Error = Box; + +pub mod hello_world { + tonic::include_proto!("helloworld"); +} + +pub mod echo { + tonic::include_proto!("grpc.examples.echo"); +} +use hello_world::{ + greeter_server::{Greeter, GreeterServer}, + HelloReply, HelloRequest, +}; + +use echo::{ + echo_server::{Echo, EchoServer}, + EchoRequest, EchoResponse, +}; + +type ResponseStream = Pin> + Send + Sync>>; + +#[derive(Default)] +pub struct MyGreeter {} + +#[tonic::async_trait] +impl Greeter for MyGreeter { + async fn say_hello( + &self, + request: Request, + ) -> Result, Status> { + let reply = hello_world::HelloReply { + message: format!("Hello {}!", request.into_inner().name), + }; + Ok(Response::new(reply)) + } +} + +#[derive(Default)] +pub struct MyEcho; + +#[tonic::async_trait] +impl Echo for MyEcho { + async fn unary_echo( + &self, + request: Request, + ) -> Result, Status> { + let message = request.into_inner().message; + Ok(Response::new(EchoResponse { message })) + } + + type ServerStreamingEchoStream = ResponseStream; + + async fn server_streaming_echo( + &self, + _: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn client_streaming_echo( + &self, + _: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + type BidirectionalStreamingEchoStream = ResponseStream; + + async fn bidirectional_streaming_echo( + &self, + _: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = "[::1]:50051".parse().unwrap(); + + //println!("GreeterServer listening on {}", addr); + + let warp = warp::service(warp::path("hello").map(|| "hello, world!")); + + Server::bind(&addr) + .serve(make_service_fn(move |_| { + let greeter = GreeterServer::new(MyGreeter::default()); + let echo = EchoServer::new(MyEcho::default()); + + let mut tonic = TonicServer::builder() + .add_service(greeter) + .add_service(echo) + .into_service(); + + let mut warp = warp.clone(); + future::ok::<_, Infallible>(tower::service_fn( + move |req: hyper::Request| match req.version() { + Version::HTTP_11 | Version::HTTP_10 => Either::Left( + warp.call(req) + .map_ok(|res| res.map(EitherBody::Left)) + .map_err(Error::from), + ), + Version::HTTP_2 => Either::Right( + tonic + .call(req) + .map_ok(|res| res.map(EitherBody::Right)) + .map_err(Error::from), + ), + _ => unimplemented!(), + }, + )) + })) + .await?; + + Ok(()) +} + +enum EitherBody { + Left(A), + Right(B), +} + +impl http_body::Body for EitherBody +where + A: http_body::Body + Send + Unpin, + B: http_body::Body + Send + Unpin, + A::Error: Into, + B::Error: Into, +{ + type Data = A::Data; + type Error = Box; + + fn is_end_stream(&self) -> bool { + match self { + EitherBody::Left(b) => b.is_end_stream(), + EitherBody::Right(b) => b.is_end_stream(), + } + } + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + match self.get_mut() { + EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err), + EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err), + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + match self.get_mut() { + EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into), + EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into), + } + } +} + +fn map_option_err>(err: Option>) -> Option> { + err.map(|e| e.map_err(Into::into)) +} diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index b5b292fae..ba107e491 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -25,7 +25,7 @@ use super::service::{Or, Routes, ServerIo, ServiceBuilderExt}; use crate::{body::BoxBody, request::ConnectionInfo}; use futures_core::Stream; use futures_util::{ - future::{self, MapErr}, + future::{self, Either as FutureEither, MapErr}, TryFutureExt, }; use http::{HeaderMap, Request, Response}; @@ -78,6 +78,42 @@ pub struct Router { routes: Routes>, } +/// A service that is produced from a Tonic `Router`. +/// +/// This service implementation will route between multiple Tonic +/// gRPC endpoints and can be consumed with the rest of the `tower` +/// ecosystem. +#[derive(Debug)] +pub struct RouterService { + router: Router, +} + +impl Service> for RouterService +where + A: Service, Response = Response> + Clone + Send + 'static, + A::Future: Send + 'static, + A::Error: Into + Send, + B: Service, Response = Response> + Clone + Send + 'static, + B::Future: Send + 'static, + B::Error: Into + Send, +{ + type Response = Response; + type Future = FutureEither< + MapErr crate::Error>, + MapErr crate::Error>, + >; + type Error = crate::Error; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + #[inline] + fn call(&mut self, req: Request) -> Self::Future { + self.router.routes.call(req) + } +} + /// A trait to provide a static reference to the service's /// name. This is used for routing service's within the router. pub trait NamedService { @@ -476,6 +512,11 @@ where .serve_with_shutdown(self.routes, incoming, Some(signal)) .await } + + /// Create a tower service out of a router. + pub fn into_service(self) -> RouterService { + RouterService { router: self } + } } impl fmt::Debug for Server {