diff --git a/.licenserc.yaml b/.licenserc.yaml index d5778c31..6bbef770 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -65,6 +65,7 @@ header: # `header` section is configurations for source codes license header. - '.github' - "**/*.yaml" - "**/generated/**" + - "**/fixtures/**" comment: on-failure # on what condition license-eye will comment on the pull request, `on-failure`, `always`, `never`. # license-location-threshold specifies the index threshold where the license header can be located, diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml index d0ab2a4d..b656aed1 100644 --- a/dubbo/Cargo.toml +++ b/dubbo/Cargo.toml @@ -19,7 +19,9 @@ futures-util = "0.3.23" futures-core ="0.3.23" argh = "0.1" rustls-pemfile = "1.0.0" -tokio-rustls="0.23.4" +rustls-webpki = "0.101.3" +rustls-native-certs = "0.6.3" +tokio-rustls="0.24.1" tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal", "full" ] } prost = "0.10.4" async-trait = "0.1.56" diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs index afe9657b..144f0111 100644 --- a/dubbo/src/cluster/directory.rs +++ b/dubbo/src/cluster/directory.rs @@ -62,7 +62,8 @@ impl StaticDirectory { impl Directory for StaticDirectory { fn list(&self, invocation: Arc) -> Vec { let url = Url::from_url(&format!( - "tri://{}:{}/{}", + "{}://{}:{}/{}", + self.uri.scheme_str().unwrap_or("tri"), self.uri.host().unwrap(), self.uri.port().unwrap(), invocation.get_target_service_unique_name(), diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index fb661f9e..42dfebe0 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -38,9 +38,13 @@ pub struct TripleInvoker { impl TripleInvoker { pub fn new(url: Url) -> TripleInvoker { let uri = http::Uri::from_str(&url.to_url()).unwrap(); + let mut conn = Connection::new().with_host(uri.clone()); + if let Some(scheme) = uri.scheme_str() { + conn = conn.with_connector(scheme.to_string()); + } Self { url, - conn: BoxCloneService::new(Connection::new().with_host(uri)), + conn: BoxCloneService::new(conn), } } } diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index 06ecd627..9dae0f93 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -59,7 +59,7 @@ impl ClientBuilder { connector: "", directory: Some(Box::new(StaticDirectory::new(&host))), direct: true, - host: host.clone().to_string(), + host: host.to_string(), } } diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs index 1b978750..10a879a5 100644 --- a/dubbo/src/triple/transport/connection.rs +++ b/dubbo/src/triple/transport/connection.rs @@ -26,7 +26,7 @@ use crate::{boxed, triple::transport::connector::get_connector}; #[derive(Debug, Clone)] pub struct Connection { host: hyper::Uri, - connector: &'static str, + connector: String, builder: Builder, } @@ -40,12 +40,12 @@ impl Connection { pub fn new() -> Self { Connection { host: hyper::Uri::default(), - connector: "http", + connector: "http".to_string(), builder: Builder::new(), } } - pub fn with_connector(mut self, connector: &'static str) -> Self { + pub fn with_connector(mut self, connector: String) -> Self { self.connector = connector; self } @@ -82,7 +82,7 @@ where fn call(&mut self, req: http::Request) -> Self::Future { let builder = self.builder.clone().http2_only(true).to_owned(); - let mut connector = Connect::new(get_connector(self.connector), builder); + let mut connector = Connect::new(get_connector(self.connector.as_str()), builder); let uri = self.host.clone(); let fut = async move { debug!("send base call to {}", uri); diff --git a/dubbo/src/triple/transport/connector/https_connector.rs b/dubbo/src/triple/transport/connector/https_connector.rs new file mode 100644 index 00000000..eb217cb6 --- /dev/null +++ b/dubbo/src/triple/transport/connector/https_connector.rs @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::{ + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + str::FromStr, + sync::Arc, +}; + +use dubbo_logger::tracing; +use http::Uri; +use hyper::client::connect::dns::Name; +use rustls_native_certs::load_native_certs; +use tokio::net::TcpStream; +use tokio_rustls::{ + client::TlsStream, + rustls::{self}, + TlsConnector as TlsConnectorTokio, +}; +use tower_service::Service; + +use crate::triple::transport::resolver::{dns::DnsResolver, Resolve}; + +#[derive(Clone, Default)] +pub struct HttpsConnector { + resolver: R, +} + +impl HttpsConnector { + pub fn new() -> Self { + Self { + resolver: DnsResolver::default(), + } + } +} + +impl HttpsConnector { + pub fn new_with_resolver(resolver: R) -> HttpsConnector { + Self { resolver } + } +} + +impl Service for HttpsConnector +where + R: Resolve + Clone + Send + Sync + 'static, + R::Future: Send, +{ + type Response = TlsStream; + + type Error = crate::Error; + + type Future = crate::BoxFuture; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.resolver.poll_ready(cx).map_err(|err| err.into()) + } + + fn call(&mut self, uri: Uri) -> Self::Future { + let mut inner = self.clone(); + + Box::pin(async move { inner.call_async(uri).await }) + } +} + +impl HttpsConnector +where + R: Resolve + Send + Sync + 'static, +{ + async fn call_async(&mut self, uri: Uri) -> Result, crate::Error> { + let host = uri.host().unwrap(); + let port = uri.port_u16().unwrap(); + + let addr = if let Ok(addr) = host.parse::() { + tracing::info!("host is ip address: {:?}", host); + SocketAddr::V4(SocketAddrV4::new(addr, port)) + } else { + tracing::info!("host is dns: {:?}", host); + let addrs = self + .resolver + .resolve(Name::from_str(host).unwrap()) + .await + .map_err(|err| err.into())?; + let addrs: Vec = addrs + .map(|mut addr| { + addr.set_port(port); + addr + }) + .collect(); + addrs[0] + }; + + let mut root_store = rustls::RootCertStore::empty(); + + for cert in load_native_certs()? { + root_store.add(&rustls::Certificate(cert.0))?; + } + + let config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_store) + .with_no_client_auth(); + + let connector = TlsConnectorTokio::from(Arc::new(config)); + + let stream = TcpStream::connect(&addr).await?; + let domain = rustls::ServerName::try_from(host).map_err(|err| { + crate::status::Status::new(crate::status::Code::Internal, err.to_string()) + })?; + let stream = connector.connect(domain, stream).await?; + + Ok(stream) + } +} diff --git a/dubbo/src/triple/transport/connector/mod.rs b/dubbo/src/triple/transport/connector/mod.rs index 703201ee..690b781a 100644 --- a/dubbo/src/triple/transport/connector/mod.rs +++ b/dubbo/src/triple/transport/connector/mod.rs @@ -16,6 +16,7 @@ */ pub mod http_connector; +pub mod https_connector; #[cfg(any(target_os = "macos", target_os = "unix"))] pub mod unix_connector; @@ -73,12 +74,16 @@ where } } -pub fn get_connector(connector: &'static str) -> BoxCloneService { +pub fn get_connector(connector: &str) -> BoxCloneService { match connector { "http" => { let c = http_connector::HttpConnector::new(); BoxCloneService::new(Connector::new(c)) } + "https" => { + let c = https_connector::HttpsConnector::new(); + BoxCloneService::new(Connector::new(c)) + } #[cfg(any(target_os = "macos", target_os = "unix"))] "unix" => { let c = unix_connector::UnixConnector::new(); diff --git a/dubbo/src/utils/tls.rs b/dubbo/src/utils/tls.rs index 0072bf24..aaf59c94 100644 --- a/dubbo/src/utils/tls.rs +++ b/dubbo/src/utils/tls.rs @@ -15,10 +15,10 @@ * limitations under the License. */ -use rustls_pemfile::{certs, rsa_private_keys}; +use rustls_pemfile::{certs, ec_private_keys, pkcs8_private_keys, rsa_private_keys}; use std::{ fs::File, - io::{self, BufReader}, + io::{self, BufReader, Cursor, Read}, path::Path, }; use tokio_rustls::rustls::{Certificate, PrivateKey}; @@ -30,7 +30,22 @@ pub fn load_certs(path: &Path) -> io::Result> { } pub fn load_keys(path: &Path) -> io::Result> { - rsa_private_keys(&mut BufReader::new(File::open(path)?)) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid key")) - .map(|mut keys| keys.drain(..).map(PrivateKey).collect()) + let file = &mut BufReader::new(File::open(path)?); + let mut data = Vec::new(); + file.read_to_end(&mut data)?; + + let mut cursor = Cursor::new(data); + + let parsers = [rsa_private_keys, pkcs8_private_keys, ec_private_keys]; + + for parser in &parsers { + if let Ok(mut key) = parser(&mut cursor) { + if !key.is_empty() { + return Ok(key.drain(..).map(PrivateKey).collect()); + } + } + cursor.set_position(0); + } + + Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid key")) } diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml index bc6638b8..6ea81bf7 100644 --- a/examples/echo/Cargo.toml +++ b/examples/echo/Cargo.toml @@ -19,6 +19,14 @@ path = "src/echo/server.rs" name = "echo-client" path = "src/echo/client.rs" +[[bin]] +name = "echo-tls-server" +path = "src/echo-tls/server.rs" + +[[bin]] +name = "echo-tls-client" +path = "src/echo-tls/client.rs" + [dependencies] http = "0.2" http-body = "0.4.4" diff --git a/examples/echo/README.md b/examples/echo/README.md index e2f73794..14868718 100644 --- a/examples/echo/README.md +++ b/examples/echo/README.md @@ -15,3 +15,21 @@ reply: EchoResponse { message: "msg1 from server" } reply: EchoResponse { message: "msg2 from server" } reply: EchoResponse { message: "msg3 from server" } ``` + +## build and run `echo-tls` + +**Please first install the `ca.crt` certificate file under the `fixtures` path to the platform's native certificate store.** + +```sh +$ cd github.com/apache/dubbo-rust/examples/echo-tls/ +$ cargo build + +$ # run sever +$ ../../target/debug/echo-tls-server + +$ # run client +$ ../../target/debug/echo-tls-client +reply: EchoResponse { message: "msg1 from tls-server" } +reply: EchoResponse { message: "msg2 from tls-server" } +reply: EchoResponse { message: "msg3 from tls-server" } +``` diff --git a/examples/echo/README_CN.md b/examples/echo/README_CN.md index b52cc1c4..a37862c6 100644 --- a/examples/echo/README_CN.md +++ b/examples/echo/README_CN.md @@ -15,3 +15,21 @@ reply: EchoResponse { message: "msg1 from server" } reply: EchoResponse { message: "msg2 from server" } reply: EchoResponse { message: "msg3 from server" } ``` + +## 构建并运行`echo-tls` + +**请先将`fixtures`路径下的`ca.crt`证书文件安装到系统信任根证书中.** + +```sh +$ cd github.com/apache/dubbo-rust/examples/echo-tls/ +$ cargo build + +$ # 运行服务端 +$ ../../target/debug/echo-tls-server + +$ # 运行客户端 +$ ../../target/debug/echo-tls-client +reply: EchoResponse { message: "msg1 from tls-server" } +reply: EchoResponse { message: "msg2 from tls-server" } +reply: EchoResponse { message: "msg3 from tls-server" } +``` \ No newline at end of file diff --git a/examples/echo/fixtures/ca.crt b/examples/echo/fixtures/ca.crt new file mode 100644 index 00000000..ea2b4858 --- /dev/null +++ b/examples/echo/fixtures/ca.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDiTCCAnGgAwIBAgIUGJWxrMGe9qRZzAfd5w4XIT3lkcEwDQYJKoZIhvcNAQEL +BQAwVDEVMBMGA1UEAwwMVGVzdCBSb290IENBMQswCQYDVQQGEwJVUzENMAsGA1UE +CAwEVGVzdDENMAsGA1UEBwwEVGVzdDEQMA4GA1UECgwHT3BlbmRhbDAeFw0yMzA4 +MTQxMTEzMzRaFw0yNDA4MTMxMTEzMzRaMFQxFTATBgNVBAMMDFRlc3QgUm9vdCBD +QTELMAkGA1UEBhMCVVMxDTALBgNVBAgMBFRlc3QxDTALBgNVBAcMBFRlc3QxEDAO +BgNVBAoMB09wZW5kYWwwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCt +UF6mcDgSyH5t78XnJusvQxsUfv2XydHtvLcIpwkCkIuIj7nF2WH064Gv12x+y42W +mb+5z6JTgHRMRqcyQM8q4PQFrKvxPX8R2Limd7VLBJzYjR7Ma7JIrDohLnfywxUP +19P5SzaGiro+ZK3t3xCnmtHcYoM+An0mQdKyVV7ytzAfg1PqkfDme19I28fH8cOP +tF+RU8/LEHnte519O1bawx7xNdPsyykMrFij02o1VUeum2K9Wya8xHDixokveYDW +swg5G4Tsy1QfgqFgxAXahIroPIwQvZOGkWVsmPXRXHtHNFG91ntJivv2HBFniUTq +A0UbVdj09T+h+JLc19G9AgMBAAGjUzBRMB0GA1UdDgQWBBQ2672x8uh6Lud0EkjO +wt2aEioeKjAfBgNVHSMEGDAWgBQ2672x8uh6Lud0EkjOwt2aEioeKjAPBgNVHRMB +Af8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQCQkLp3GzZOXXXOKiMF6Iev1OUW +w1jr7hVdJHOVGNCD6uZLuwSXJOWnEP8+hp8WvMl7SQAPpVYsTjdqhLATLaAZDucG +sDq6oUTh/v8QVIBm0qF8+iMU8XZfgoeKuY13RXs23hneMAPQ5rcPwQhQEQkkqUvi +Fq8qYFVd5mEr6Z62DT0s544WaBrpHr37mHOv0hIkHtX7Dy2Juc23MYw+W4PSD4fm +sr1kARwHtY1meX+H3iRsX+7juTa33v+7H4IivhcPobIxFp+Hs9R5mx5u80wKMjVv +t3STmB4nE7pABzucrjkSo43jIUwYN4rwydlSma9VkzvY6ry86HQuemycRb9H +-----END CERTIFICATE----- diff --git a/examples/echo/fixtures/server.crt b/examples/echo/fixtures/server.crt new file mode 100644 index 00000000..6788eb4c --- /dev/null +++ b/examples/echo/fixtures/server.crt @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIID1zCCAr+gAwIBAgIUJKTfvASV+RnwF7oLO84HZJDyvLwwDQYJKoZIhvcNAQEL +BQAwVDEVMBMGA1UEAwwMVGVzdCBSb290IENBMQswCQYDVQQGEwJVUzENMAsGA1UE +CAwEVGVzdDENMAsGA1UEBwwEVGVzdDEQMA4GA1UECgwHT3BlbmRhbDAeFw0yMzA4 +MTQxNDU5MzZaFw0yNDA2MDkxNDU5MzZaMFkxGjAYBgNVBAMMEVJlZGlzIGNlcnRp +ZmljYXRlMQswCQYDVQQGEwJVUzENMAsGA1UECAwEVGVzdDENMAsGA1UEBwwEVGVz +dDEQMA4GA1UECgwHT3BlbmRhbDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC +ggEBAIwREKDrRgZ2jlR3tpLHvMiW8JDu4JiLBxyrlJJE5ndhuH7MEgwz8HnXvxbD +eyuamzkAzQIvqfVFVTRuVEYyEtoGzIegDL76H9ybuMGhKBK1m0TmiH7bOsAVMqZN +vDtQJiw8qePtSq3G3H7Sw+/oudrJIc/f7kDox/lndKHTBmLbjSrvpkOJk2qnvhPJ +ih4SuLNiW+tHv4sUdYBXXxn2wLHXNLGrlpeW28jtWGfu2noRCzikOYL/jwg2xzXV +cBSuFwQ3swLDG/htqpePVA/sLxbXTt03A8fCajYcKiJdW88gqw4dW01ya8rCr5MU +1C7lPwNCB8qNn8pdkmrh/Oc0zDsCAwEAAaOBmzCBmDAfBgNVHSMEGDAWgBQ2672x +8uh6Lud0EkjOwt2aEioeKjAJBgNVHRMEAjAAMAsGA1UdDwQEAwIE8DA+BgNVHREE +NzA1gglsb2NhbGhvc3SHBH8AAAGHBKweAAKHBKweAAOHBKweAASHBKweAAWHBKwe +AAaHBKweAAcwHQYDVR0OBBYEFGvNF07RBwyi3tbpFIJtvWhXAGblMA0GCSqGSIb3 +DQEBCwUAA4IBAQAd57+0YXfg8eIe2UkqLshIEonoIpKhmsIpRJyXLOUWYaSHri4w +aDqPjogA39w34UcZsumfSReWBGrCyBroSCqQZOM166tw79+AVdjHHtgNm8pFRhO7 +0vnFdAU30TOQP+mRF3mXz3hcK68U/4cRhXC5jXq8YRLiAG74G3PmXmmk2phtluEL +SLLCvF5pCz3EaYsEKP+ZQpdY3BLp6Me7XDpGWPuNYVwVTJwwM9CLjQ8pxMlz1O1x +HVN7xGtLz4dw9nEqnmjYBvH8aum+iAQPiHVuGfQfqIea28XeuyV4c5TL2b+OUsLY +BRhX+z5OkGHXcMc1QDKo3PZcs8C1w8SC1x9D +-----END CERTIFICATE----- diff --git a/examples/echo/fixtures/server.key b/examples/echo/fixtures/server.key new file mode 100644 index 00000000..ad53eac5 --- /dev/null +++ b/examples/echo/fixtures/server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCMERCg60YGdo5U +d7aSx7zIlvCQ7uCYiwccq5SSROZ3Ybh+zBIMM/B5178Ww3srmps5AM0CL6n1RVU0 +blRGMhLaBsyHoAy++h/cm7jBoSgStZtE5oh+2zrAFTKmTbw7UCYsPKnj7Uqtxtx+ +0sPv6LnaySHP3+5A6Mf5Z3Sh0wZi240q76ZDiZNqp74TyYoeErizYlvrR7+LFHWA +V18Z9sCx1zSxq5aXltvI7Vhn7tp6EQs4pDmC/48INsc11XAUrhcEN7MCwxv4baqX +j1QP7C8W107dNwPHwmo2HCoiXVvPIKsOHVtNcmvKwq+TFNQu5T8DQgfKjZ/KXZJq +4fznNMw7AgMBAAECggEAA5l0ABv7s90mbDTwBqvxBdtHJFpXKuRhEui1NosPEctQ +6+/qQ3uu4YVcqO+YweFFEufFMkSvopjHse4R5q87vR7xRkej0Yvo914zFxrBRmB6 +CdZoyeXFXTv442gvqaXgzUCOgcfOeafDcSjmayBjwk5qkDEqKhXb/w9HDS+N7vVk +BU+b/lMzQbGWb/oc3pHmEYXqFR+sFkHM2nCWBvQ1hRX4TVaeZpUWH7RBE95z87ug +F21yqEjQfaTh2cidKXWtnozxIv2XgUncY40njdhRRzyJqWIW4CEdxIAX0IWT0z2+ +4L59DoNyYaimnaaSmNDj5WgDgL7tlTziXBJfBTpqDQKBgQDAe6Tf49eZINZ6bxHC +RDzFSKikBOvC9rkhGOzD6JBALPbdWH9HnnH4cT5F4b5y96rPEqzO1+RYQdIF4GDx +NYafMx8Nht0j0WWJLkygUCa8guqaaFczaquaIHQ2YpzzhpLlmAdEz1Jrsk2LfM2Q +58b7JHb+Aq/+UAIuBhL7FlRf7QKBgQC6SXR6opkjUfkrsWcQiB5CJDk7zuL8G+Ks +Jle2S1TzFdBL5rNnVttC7yYmZIP7qN6zDtsbDxqW5gEeDmyGDixLeQ0kQ/oPU2/y +lPr9rHx+BUWEGyDiCfG21Y6R/jdfrA4R5T8vPmJXOnnppXZ5Gqi1X0aHprbLYWhz +HpkvBaHHxwKBgAGx1PzHo8FMYbcIPU7JjQNrpVh0VqMLywt4jbUX2hVGkBHY0p4N +zhES5ip1V1jpx041auITUoZYZgH5PMFC6GGEcLSMyGulT1CK4M/UhNLKEEi1vHbO +bJ5ZxMwpyBn4yFhPI1k+vgoGstoUijbJY54YbxfDbEs/5xUCpq4hPzLtAoGAZVBr +3AKwnMgJZxz9u7z8D+bZhdCYHJsh5ZSY4ZkI44f6mD0pV0uixj2AlyLVsTn/nIy4 +13eYc3c2Jl2b4jC1IHr+jbm2tz0exmUGOI7lyjgdvaJveOAFqPVuq7IB9bOCl3MB +sTURkPVJtqv5yhWYqcPefQpLokMg5nM+xpcejKMCgYEAqv5gj3ez0HmLv/9k86Zs +8/780lNcYnB1dQYNJ7g3T6wu8WVGNtzOdPXGTMX9sbv9Smq0cZLZKNMtXDsc5aJT +5yzysPkDxqSK4vJmng74aHUI2HW+HvPqWLZnXC0IYGFvN8KyVkdp/FyhQMNMp6ip +5rgp5RpXJk5MhvvlYdZrz5Q= +-----END PRIVATE KEY----- diff --git a/examples/echo/src/echo-tls/client.rs b/examples/echo/src/echo-tls/client.rs new file mode 100644 index 00000000..70e4e8f0 --- /dev/null +++ b/examples/echo/src/echo-tls/client.rs @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use dubbo::codegen::*; +use example_echo::generated::generated::{echo_client::EchoClient, EchoRequest}; +use futures_util::StreamExt; + +pub struct FakeFilter {} + +impl Filter for FakeFilter { + fn call(&mut self, req: Request<()>) -> Result, dubbo::status::Status> { + println!("fake filter: {:?}", req.metadata); + Ok(req) + } +} + +#[tokio::main] +async fn main() { + dubbo_logger::init(); + + let builder = ClientBuilder::from_static(&"https://127.0.0.1:8889").with_timeout(1000000); + let mut cli = EchoClient::new(builder); + + let resp = cli + .unary_echo(Request::new(EchoRequest { + message: "message from tls-client".to_string(), + })) + .await; + let resp = match resp { + Ok(resp) => resp, + Err(err) => return println!("{:?}", err), + }; + let (_parts, body) = resp.into_parts(); + println!("Response: {:?}", body); + + let data = vec![ + EchoRequest { + message: "msg1 from tls-client streaming".to_string(), + }, + EchoRequest { + message: "msg2 from tls-client streaming".to_string(), + }, + EchoRequest { + message: "msg3 from tls-client streaming".to_string(), + }, + ]; + let req = futures_util::stream::iter(data); + let resp = cli.client_streaming_echo(req).await; + let client_streaming_resp = match resp { + Ok(resp) => resp, + Err(err) => return println!("{:?}", err), + }; + let (_parts, resp_body) = client_streaming_resp.into_parts(); + println!("tls-client streaming, Response: {:?}", resp_body); + + let data = vec![ + EchoRequest { + message: "msg1 from tls-client".to_string(), + }, + EchoRequest { + message: "msg2 from tls-client".to_string(), + }, + EchoRequest { + message: "msg3 from tls-client".to_string(), + }, + ]; + let req = futures_util::stream::iter(data); + + let bidi_resp = cli.bidirectional_streaming_echo(req).await.unwrap(); + + let (parts, mut body) = bidi_resp.into_parts(); + println!("parts: {:?}", parts); + while let Some(item) = body.next().await { + match item { + Ok(v) => { + println!("reply: {:?}", v); + } + Err(err) => { + println!("err: {:?}", err); + } + } + } + let trailer = body.trailer().await.unwrap(); + println!("trailer: {:?}", trailer); + + let resp = cli + .server_streaming_echo(Request::new(EchoRequest { + message: "server streaming req".to_string(), + })) + .await + .unwrap(); + + let (parts, mut body) = resp.into_parts(); + println!("parts: {:?}", parts); + while let Some(item) = body.next().await { + match item { + Ok(v) => { + println!("reply: {:?}", v); + } + Err(err) => { + println!("err: {:?}", err); + } + } + } + let trailer = body.trailer().await.unwrap(); + println!("trailer: {:?}", trailer); +} diff --git a/examples/echo/src/echo-tls/server.rs b/examples/echo/src/echo-tls/server.rs new file mode 100644 index 00000000..c7bcd153 --- /dev/null +++ b/examples/echo/src/echo-tls/server.rs @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::{io::ErrorKind, pin::Pin}; + +use async_trait::async_trait; +use dubbo::filter::{context::ContextFilter, timeout::TimeoutFilter}; +use futures_util::{Stream, StreamExt}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +use dubbo::codegen::*; +use example_echo::generated::generated::{ + echo_server::{register_server, Echo, EchoServer}, + EchoRequest, EchoResponse, +}; + +type ResponseStream = + Pin> + Send>>; + +#[derive(Clone)] +pub struct FakeFilter {} + +impl Filter for FakeFilter { + fn call(&mut self, req: Request<()>) -> Result, dubbo::status::Status> { + println!("server fake filter: {:?}", req.metadata); + Ok(req) + } +} + +#[tokio::main] +async fn main() { + dubbo_logger::init(); + register_server(EchoServerImpl { + name: "echo".to_string(), + }); + let server = EchoServerImpl::default(); + let s = EchoServer::::with_filter(server, FakeFilter {}); + let timeout_filter = FilterService::new(s, TimeoutFilter {}); + let context_filter = FilterService::new(timeout_filter, ContextFilter {}); + + dubbo::protocol::triple::TRIPLE_SERVICES + .write() + .unwrap() + .insert( + "grpc.examples.echo.Echo".to_string(), + dubbo::utils::boxed_clone::BoxCloneService::new(context_filter), + ); + + let builder = ServerBuilder::new() + .with_listener("tcp".to_string()) + .with_tls("fixtures/server.crt", "fixtures/server.key") + .with_service_names(vec!["grpc.examples.echo.Echo".to_string()]) + .with_addr("127.0.0.1:8889"); + builder.build().serve().await.unwrap(); +} + +#[allow(dead_code)] +#[derive(Default, Clone)] +struct EchoServerImpl { + name: String, +} + +// #[async_trait] +#[async_trait] +impl Echo for EchoServerImpl { + async fn unary_echo( + &self, + req: Request, + ) -> Result, dubbo::status::Status> { + println!("EchoServer::hello {:?}", req.metadata); + + Ok(Response::new(EchoResponse { + message: "hello, dubbo-rust".to_string(), + })) + } + + type ServerStreamingEchoStream = ResponseStream; + + async fn server_streaming_echo( + &self, + req: Request, + ) -> Result, dubbo::status::Status> { + println!("server_streaming_echo: {:?}", req.into_inner()); + + let data = vec![ + Result::<_, dubbo::status::Status>::Ok(EchoResponse { + message: "msg1 from tls-server".to_string(), + }), + Result::<_, dubbo::status::Status>::Ok(EchoResponse { + message: "msg2 from tls-server".to_string(), + }), + Result::<_, dubbo::status::Status>::Ok(EchoResponse { + message: "msg3 from tls-server".to_string(), + }), + ]; + let resp = futures_util::stream::iter(data); + + Ok(Response::new(Box::pin(resp))) + } + async fn client_streaming_echo( + &self, + req: Request>, + ) -> Result, dubbo::status::Status> { + let mut s = req.into_inner(); + loop { + let result = s.next().await; + match result { + Some(Ok(val)) => println!("result: {:?}", val), + Some(Err(val)) => println!("err: {:?}", val), + None => break, + } + } + Ok(Response::new(EchoResponse { + message: "hello tls-client streaming".to_string(), + })) + } + + type BidirectionalStreamingEchoStream = ResponseStream; + + async fn bidirectional_streaming_echo( + &self, + request: Request>, + ) -> Result, dubbo::status::Status> { + println!( + "EchoServer::bidirectional_streaming_echo, grpc header: {:?}", + request.metadata + ); + + let mut in_stream = request.into_inner(); + let (tx, rx) = mpsc::channel(128); + + // this spawn here is required if you want to handle connection error. + // If we just map `in_stream` and write it back as `out_stream` the `out_stream` + // will be drooped when connection error occurs and error will never be propagated + // to mapped version of `in_stream`. + tokio::spawn(async move { + while let Some(result) = in_stream.next().await { + match result { + Ok(v) => { + // if v.name.starts_with("msg2") { + // tx.send(Err(dubbo::status::Status::internal(format!("err: args is invalid, {:?}", v.name)) + // )).await.expect("working rx"); + // continue; + // } + tx.send(Ok(EchoResponse { + message: format!("server reply: {:?}", v.message), + })) + .await + .expect("working rx") + } + Err(err) => { + if let Some(io_err) = match_for_io_error(&err) { + if io_err.kind() == ErrorKind::BrokenPipe { + // here you can handle special case when client + // disconnected in unexpected way + eprintln!("\tclient disconnected: broken pipe"); + break; + } + } + + match tx.send(Err(err)).await { + Ok(_) => (), + Err(_err) => break, // response was droped + } + } + } + } + println!("\tstream ended"); + }); + + // echo just write the same data that was received + let out_stream = ReceiverStream::new(rx); + + Ok(Response::new( + Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream + )) + } +} + +fn match_for_io_error(err_status: &dubbo::status::Status) -> Option<&std::io::Error> { + let mut err: &(dyn std::error::Error + 'static) = err_status; + + loop { + if let Some(io_err) = err.downcast_ref::() { + return Some(io_err); + } + + err = match err.source() { + Some(err) => err, + None => return None, + }; + } +}