Skip to content

Commit

Permalink
feat(transport): Add Router::into_service (#419)
Browse files Browse the repository at this point in the history
Co-authored-by: Danny Hua <[email protected]>
  • Loading branch information
tjtelan and danny-osmos authored Aug 20, 2020
1 parent 9085892 commit 37f6733
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 1 deletion.
8 changes: 8 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ path = "src/autoreload/server.rs"
name = "optional-server"
path = "src/optional/server.rs"

[[bin]]
name = "hyper-warp-multiplex-client"
path = "src/hyper_warp_multiplex/client.rs"

[[bin]]
name = "hyper-warp-multiplex-server"
path = "src/hyper_warp_multiplex/server.rs"

[dependencies]
tonic = { path = "../tonic", features = ["tls"] }
prost = "0.6"
Expand Down
61 changes: 61 additions & 0 deletions examples/src/hyper_warp_multiplex/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//! To hit the gRPC endpoint you must run this client via:
//! `cargo run --bin hyper-warp-multiplex-client
//! To hit the warp server you can run this command:
//! `curl localhost:50051/hello`
pub mod hello_world {
tonic::include_proto!("helloworld");
}

pub mod echo {
tonic::include_proto!("grpc.examples.echo");
}

use echo::{echo_client::EchoClient, EchoRequest};
use hello_world::greeter_client::GreeterClient;
use hello_world::HelloRequest;
use hyper::{Client, Uri};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::builder().http2_only(true).build_http();

let uri = Uri::from_static("http://[::1]:50051");

// Hyper's client requires that requests contain full Uris include a scheme and
// an authority. Tonic's transport will handle this for you but when using the client
// manually you need ensure the uri's are set correctly.
let add_origin = tower::service_fn(|mut req: hyper::Request<tonic::body::BoxBody>| {
let uri = Uri::builder()
.scheme(uri.scheme().unwrap().clone())
.authority(uri.authority().unwrap().clone())
.path_and_query(req.uri().path_and_query().unwrap().clone())
.build()
.unwrap();

*req.uri_mut() = uri;

client.request(req)
});

let mut greeter_client = GreeterClient::new(add_origin);
let mut echo_client = EchoClient::new(add_origin);

let request = tonic::Request::new(HelloRequest {
name: "Tonic".into(),
});

let response = greeter_client.say_hello(request).await?;

println!("GREETER RESPONSE={:?}", response);

let request = tonic::Request::new(EchoRequest {
message: "hello".into(),
});

let response = echo_client.unary_echo(request).await?;

println!("ECHO RESPONSE={:?}", response);

Ok(())
}
181 changes: 181 additions & 0 deletions examples/src/hyper_warp_multiplex/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
//! To hit the gRPC endpoint you must run this client via:
//! `cargo run --bin hyper-warp-multiplex-client
//! To hit the warp server you can run this command:
//! `curl localhost:50051/hello`
use futures::future::{self, Either, TryFutureExt};
use futures::Stream;
use http::version::Version;
use hyper::{service::make_service_fn, Server};
use std::convert::Infallible;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tonic::{transport::Server as TonicServer, Request, Response, Status};
use tower::Service;
use warp::Filter;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

pub mod hello_world {
tonic::include_proto!("helloworld");
}

pub mod echo {
tonic::include_proto!("grpc.examples.echo");
}
use hello_world::{
greeter_server::{Greeter, GreeterServer},
HelloReply, HelloRequest,
};

use echo::{
echo_server::{Echo, EchoServer},
EchoRequest, EchoResponse,
};

type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send + Sync>>;

#[derive(Default)]
pub struct MyGreeter {}

#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
let reply = hello_world::HelloReply {
message: format!("Hello {}!", request.into_inner().name),
};
Ok(Response::new(reply))
}
}

#[derive(Default)]
pub struct MyEcho;

#[tonic::async_trait]
impl Echo for MyEcho {
async fn unary_echo(
&self,
request: Request<EchoRequest>,
) -> Result<Response<EchoResponse>, Status> {
let message = request.into_inner().message;
Ok(Response::new(EchoResponse { message }))
}

type ServerStreamingEchoStream = ResponseStream;

async fn server_streaming_echo(
&self,
_: Request<EchoRequest>,
) -> Result<Response<Self::ServerStreamingEchoStream>, Status> {
Err(Status::unimplemented("Not yet implemented"))
}

async fn client_streaming_echo(
&self,
_: Request<tonic::Streaming<EchoRequest>>,
) -> Result<Response<EchoResponse>, Status> {
Err(Status::unimplemented("Not yet implemented"))
}

type BidirectionalStreamingEchoStream = ResponseStream;

async fn bidirectional_streaming_echo(
&self,
_: Request<tonic::Streaming<EchoRequest>>,
) -> Result<Response<Self::BidirectionalStreamingEchoStream>, Status> {
Err(Status::unimplemented("Not yet implemented"))
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();

//println!("GreeterServer listening on {}", addr);

let warp = warp::service(warp::path("hello").map(|| "hello, world!"));

Server::bind(&addr)
.serve(make_service_fn(move |_| {
let greeter = GreeterServer::new(MyGreeter::default());
let echo = EchoServer::new(MyEcho::default());

let mut tonic = TonicServer::builder()
.add_service(greeter)
.add_service(echo)
.into_service();

let mut warp = warp.clone();
future::ok::<_, Infallible>(tower::service_fn(
move |req: hyper::Request<hyper::Body>| match req.version() {
Version::HTTP_11 | Version::HTTP_10 => Either::Left(
warp.call(req)
.map_ok(|res| res.map(EitherBody::Left))
.map_err(Error::from),
),
Version::HTTP_2 => Either::Right(
tonic
.call(req)
.map_ok(|res| res.map(EitherBody::Right))
.map_err(Error::from),
),
_ => unimplemented!(),
},
))
}))
.await?;

Ok(())
}

enum EitherBody<A, B> {
Left(A),
Right(B),
}

impl<A, B> http_body::Body for EitherBody<A, B>
where
A: http_body::Body + Send + Unpin,
B: http_body::Body<Data = A::Data> + Send + Unpin,
A::Error: Into<Error>,
B::Error: Into<Error>,
{
type Data = A::Data;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

fn is_end_stream(&self) -> bool {
match self {
EitherBody::Left(b) => b.is_end_stream(),
EitherBody::Right(b) => b.is_end_stream(),
}
}

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
match self.get_mut() {
EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
}
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
match self.get_mut() {
EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
}
}
}

fn map_option_err<T, U: Into<Error>>(err: Option<Result<T, U>>) -> Option<Result<T, Error>> {
err.map(|e| e.map_err(Into::into))
}
43 changes: 42 additions & 1 deletion tonic/src/transport/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::service::{Or, Routes, ServerIo, ServiceBuilderExt};
use crate::{body::BoxBody, request::ConnectionInfo};
use futures_core::Stream;
use futures_util::{
future::{self, MapErr},
future::{self, Either as FutureEither, MapErr},
TryFutureExt,
};
use http::{HeaderMap, Request, Response};
Expand Down Expand Up @@ -78,6 +78,42 @@ pub struct Router<A, B> {
routes: Routes<A, B, Request<Body>>,
}

/// A service that is produced from a Tonic `Router`.
///
/// This service implementation will route between multiple Tonic
/// gRPC endpoints and can be consumed with the rest of the `tower`
/// ecosystem.
#[derive(Debug)]
pub struct RouterService<A, B> {
router: Router<A, B>,
}

impl<A, B> Service<Request<Body>> for RouterService<A, B>
where
A: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
A::Future: Send + 'static,
A::Error: Into<crate::Error> + Send,
B: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
B::Future: Send + 'static,
B::Error: Into<crate::Error> + Send,
{
type Response = Response<BoxBody>;
type Future = FutureEither<
MapErr<A::Future, fn(A::Error) -> crate::Error>,
MapErr<B::Future, fn(B::Error) -> crate::Error>,
>;
type Error = crate::Error;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

#[inline]
fn call(&mut self, req: Request<Body>) -> Self::Future {
self.router.routes.call(req)
}
}

/// A trait to provide a static reference to the service's
/// name. This is used for routing service's within the router.
pub trait NamedService {
Expand Down Expand Up @@ -476,6 +512,11 @@ where
.serve_with_shutdown(self.routes, incoming, Some(signal))
.await
}

/// Create a tower service out of a router.
pub fn into_service(self) -> RouterService<A, B> {
RouterService { router: self }
}
}

impl fmt::Debug for Server {
Expand Down

0 comments on commit 37f6733

Please sign in to comment.