diff --git a/Cargo.toml b/Cargo.toml
index 7bb948d2..d18fab3b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,21 @@ members = [
"examples/echo",
"examples/greeter",
"dubbo-build",
- "common/logger"
+ "remoting/net"
]
+
+[workspace.dependencies]
+pin-project = "1"
+tokio = "1.0"
+tower = "0.4"
+tokio-stream = "0.1"
+tokio-util = "0.7"
+socket2 = "0.4"
+async-trait = "0.1"
+dashmap = "5"
+lazy_static = "1"
+futures = "0.3"
+tracing = "0.1"
+tracing-subscriber = "0.3.15"
+
diff --git a/config/src/protocol.rs b/config/src/protocol.rs
index cdc357ad..4a47ac98 100644
--- a/config/src/protocol.rs
+++ b/config/src/protocol.rs
@@ -73,10 +73,10 @@ impl ProtocolRetrieve for ProtocolConfig {
fn get_protocol_or_default(&self, protocol_key: &str) -> Protocol {
let result = self.get_protocol(protocol_key);
if let Some(..) = result {
- result.unwrap().clone()
+ result.unwrap()
} else {
let result = self.get_protocol(protocol_key);
- if result.is_none() {
+ if let Some(..) = result {
panic!("default triple protocol dose not defined.")
} else {
result.unwrap()
diff --git a/remoting/net/Cargo.toml b/remoting/net/Cargo.toml
new file mode 100644
index 00000000..c39a362e
--- /dev/null
+++ b/remoting/net/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "remoting"
+version = "0.1.0"
+edition = "2021"
+
+
+[dependencies]
+pin-project.workspace = true
+tokio = { workspace = true, features = ["net", "time", "sync", "io-util","test-util","macros"] }
+tokio-stream = { workspace = true, features = ["net"] }
+tower.workspace = true
+socket2.workspace = true
+async-trait.workspace = true
+dashmap.workspace = true
+lazy_static.workspace = true
+futures.workspace = true
+tracing.workspace = true
diff --git a/remoting/net/LICENSE b/remoting/net/LICENSE
new file mode 100644
index 00000000..d6456956
--- /dev/null
+++ b/remoting/net/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/remoting/net/src/conn.rs b/remoting/net/src/conn.rs
new file mode 100644
index 00000000..c3155017
--- /dev/null
+++ b/remoting/net/src/conn.rs
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{
+ io,
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+use pin_project::pin_project;
+#[cfg(target_family = "unix")]
+use tokio::net::{unix, UnixStream};
+use tokio::{
+ io::{AsyncRead, AsyncWrite, ReadBuf},
+ net::{tcp, TcpStream},
+};
+
+use super::Address;
+
+#[derive(Clone)]
+pub struct ConnInfo {
+ pub peer_addr: Option
,
+}
+
+pub trait DynStream: AsyncRead + AsyncWrite + Send + 'static {}
+
+impl DynStream for T where T: AsyncRead + AsyncWrite + Send + 'static {}
+
+#[pin_project(project = IoStreamProj)]
+pub enum ConnStream {
+ Tcp(#[pin] TcpStream),
+ #[cfg(target_family = "unix")]
+ Unix(#[pin] UnixStream),
+}
+
+#[pin_project(project = OwnedWriteHalfProj)]
+pub enum OwnedWriteHalf {
+ Tcp(#[pin] tcp::OwnedWriteHalf),
+ #[cfg(target_family = "unix")]
+ Unix(#[pin] unix::OwnedWriteHalf),
+}
+
+impl AsyncWrite for OwnedWriteHalf {
+ #[inline]
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll> {
+ match self.project() {
+ OwnedWriteHalfProj::Tcp(half) => half.poll_write(cx, buf),
+ #[cfg(target_family = "unix")]
+ OwnedWriteHalfProj::Unix(half) => half.poll_write(cx, buf),
+ }
+ }
+
+ #[inline]
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
+ match self.project() {
+ OwnedWriteHalfProj::Tcp(half) => half.poll_flush(cx),
+ #[cfg(target_family = "unix")]
+ OwnedWriteHalfProj::Unix(half) => half.poll_flush(cx),
+ }
+ }
+
+ #[inline]
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
+ match self.project() {
+ OwnedWriteHalfProj::Tcp(half) => half.poll_shutdown(cx),
+ #[cfg(target_family = "unix")]
+ OwnedWriteHalfProj::Unix(half) => half.poll_shutdown(cx),
+ }
+ }
+
+ #[inline]
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll> {
+ match self.project() {
+ OwnedWriteHalfProj::Tcp(half) => half.poll_write_vectored(cx, bufs),
+ #[cfg(target_family = "unix")]
+ OwnedWriteHalfProj::Unix(half) => half.poll_write_vectored(cx, bufs),
+ }
+ }
+
+ #[inline]
+ fn is_write_vectored(&self) -> bool {
+ match self {
+ Self::Tcp(half) => half.is_write_vectored(),
+ #[cfg(target_family = "unix")]
+ Self::Unix(half) => half.is_write_vectored(),
+ }
+ }
+}
+
+#[pin_project(project = OwnedReadHalfProj)]
+pub enum OwnedReadHalf {
+ Tcp(#[pin] tcp::OwnedReadHalf),
+ #[cfg(target_family = "unix")]
+ Unix(#[pin] unix::OwnedReadHalf),
+}
+
+impl AsyncRead for OwnedReadHalf {
+ #[inline]
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll> {
+ match self.project() {
+ OwnedReadHalfProj::Tcp(half) => half.poll_read(cx, buf),
+ #[cfg(target_family = "unix")]
+ OwnedReadHalfProj::Unix(half) => half.poll_read(cx, buf),
+ }
+ }
+}
+
+impl ConnStream {
+ #[allow(clippy::type_complexity)]
+ pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
+ match self {
+ Self::Tcp(stream) => {
+ let (rh, wh) = stream.into_split();
+ (OwnedReadHalf::Tcp(rh), OwnedWriteHalf::Tcp(wh))
+ }
+ #[cfg(target_family = "unix")]
+ Self::Unix(stream) => {
+ let (rh, wh) = stream.into_split();
+ (OwnedReadHalf::Unix(rh), OwnedWriteHalf::Unix(wh))
+ }
+ }
+ }
+}
+
+impl From for ConnStream {
+ #[inline]
+ fn from(s: TcpStream) -> Self {
+ let _ = s.set_nodelay(true);
+ Self::Tcp(s)
+ }
+}
+
+#[cfg(target_family = "unix")]
+impl From for ConnStream {
+ #[inline]
+ fn from(s: UnixStream) -> Self {
+ Self::Unix(s)
+ }
+}
+
+impl AsyncRead for ConnStream {
+ #[inline]
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll> {
+ match self.project() {
+ IoStreamProj::Tcp(s) => s.poll_read(cx, buf),
+ #[cfg(target_family = "unix")]
+ IoStreamProj::Unix(s) => s.poll_read(cx, buf),
+ }
+ }
+}
+
+impl AsyncWrite for ConnStream {
+ #[inline]
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll> {
+ match self.project() {
+ IoStreamProj::Tcp(s) => s.poll_write(cx, buf),
+ #[cfg(target_family = "unix")]
+ IoStreamProj::Unix(s) => s.poll_write(cx, buf),
+ }
+ }
+
+ #[inline]
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
+ match self.project() {
+ IoStreamProj::Tcp(s) => s.poll_flush(cx),
+ #[cfg(target_family = "unix")]
+ IoStreamProj::Unix(s) => s.poll_flush(cx),
+ }
+ }
+
+ #[inline]
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
+ match self.project() {
+ IoStreamProj::Tcp(s) => s.poll_shutdown(cx),
+ #[cfg(target_family = "unix")]
+ IoStreamProj::Unix(s) => s.poll_shutdown(cx),
+ }
+ }
+
+ #[inline]
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll> {
+ match self.project() {
+ IoStreamProj::Tcp(s) => s.poll_write_vectored(cx, bufs),
+ #[cfg(target_family = "unix")]
+ IoStreamProj::Unix(s) => s.poll_write_vectored(cx, bufs),
+ }
+ }
+
+ #[inline]
+ fn is_write_vectored(&self) -> bool {
+ match self {
+ Self::Tcp(s) => s.is_write_vectored(),
+ #[cfg(target_family = "unix")]
+ Self::Unix(s) => s.is_write_vectored(),
+ }
+ }
+}
+
+impl ConnStream {
+ #[inline]
+ pub fn peer_addr(&self) -> Option {
+ match self {
+ Self::Tcp(s) => s.peer_addr().map(Address::from).ok(),
+ #[cfg(target_family = "unix")]
+ Self::Unix(s) => s.peer_addr().ok().and_then(|s| Address::try_from(s).ok()),
+ }
+ }
+}
+pub struct Conn {
+ pub stream: ConnStream,
+ pub info: ConnInfo,
+}
+
+impl Conn {
+ #[inline]
+ pub fn new(stream: ConnStream, info: ConnInfo) -> Self {
+ Conn { stream, info }
+ }
+}
+
+impl From for Conn
+where
+ T: Into,
+{
+ #[inline]
+ fn from(i: T) -> Self {
+ let i = i.into();
+ let peer_addr = i.peer_addr();
+ Conn::new(i, ConnInfo { peer_addr })
+ }
+}
+
+impl AsyncRead for Conn {
+ #[inline]
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll> {
+ Pin::new(&mut self.stream).poll_read(cx, buf)
+ }
+}
+
+impl AsyncWrite for Conn {
+ #[inline]
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll> {
+ Pin::new(&mut self.stream).poll_write(cx, buf)
+ }
+
+ #[inline]
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
+ Pin::new(&mut self.stream).poll_flush(cx)
+ }
+
+ #[inline]
+ fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
+ Pin::new(&mut self.stream).poll_shutdown(cx)
+ }
+
+ #[inline]
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll> {
+ Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
+ }
+
+ #[inline]
+ fn is_write_vectored(&self) -> bool {
+ self.stream.is_write_vectored()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::{dial::DefaultMakeTransport, Address};
+ use tokio::io::AsyncWriteExt;
+
+ // listen by command: nc -l 8858 -v
+ #[tokio::test(flavor = "current_thread")]
+ async fn test_write_bytes() {
+ let transport = DefaultMakeTransport::new();
+ let mut conn = transport
+ .make_connection(Address::Ip("127.0.0.1:8858".parse().unwrap()))
+ .await
+ .unwrap();
+ conn.write_all("\n\rhello dubbo-rust\n\r".to_string().as_bytes())
+ .await
+ .unwrap();
+ }
+}
diff --git a/remoting/net/src/dial.rs b/remoting/net/src/dial.rs
new file mode 100644
index 00000000..704e7cd3
--- /dev/null
+++ b/remoting/net/src/dial.rs
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::io;
+
+use socket2::{Domain, Protocol, Socket, Type};
+#[cfg(target_family = "unix")]
+use tokio::net::UnixStream;
+use tokio::{
+ io::{AsyncRead, AsyncWrite},
+ net::TcpSocket,
+ time::{timeout, Duration},
+};
+
+use super::{
+ conn::{Conn, OwnedReadHalf, OwnedWriteHalf},
+ Address,
+};
+
+/// [`MakeTransport`] creates an [`AsyncRead`] and an [`AsyncWrite`] for the given [`Address`].
+#[async_trait::async_trait]
+pub trait MakeTransport: Clone + Send + Sync + 'static {
+ type ReadHalf: AsyncRead + Send + Sync + Unpin + 'static;
+ type WriteHalf: AsyncWrite + Send + Sync + Unpin + 'static;
+
+ async fn make_transport(&self, addr: Address) -> io::Result<(Self::ReadHalf, Self::WriteHalf)>;
+ fn set_connect_timeout(&mut self, timeout: Option);
+ fn set_read_timeout(&mut self, timeout: Option);
+ fn set_write_timeout(&mut self, timeout: Option);
+}
+
+#[derive(Default, Debug, Clone, Copy)]
+pub struct DefaultMakeTransport {
+ cfg: Config,
+}
+
+#[derive(Default, Debug, Clone, Copy)]
+pub struct Config {
+ pub connect_timeout: Option,
+ pub read_timeout: Option,
+ pub write_timeout: Option,
+}
+
+impl Config {
+ pub fn new(
+ connect_timeout: Option,
+ read_timeout: Option,
+ write_timeout: Option,
+ ) -> Self {
+ Self {
+ connect_timeout,
+ read_timeout,
+ write_timeout,
+ }
+ }
+
+ pub fn with_connect_timeout(mut self, timeout: Option) -> Self {
+ self.connect_timeout = timeout;
+ self
+ }
+
+ pub fn with_read_timeout(mut self, timeout: Option) -> Self {
+ self.read_timeout = timeout;
+ self
+ }
+
+ pub fn with_write_timeout(mut self, timeout: Option) -> Self {
+ self.write_timeout = timeout;
+ self
+ }
+}
+
+impl DefaultMakeTransport {
+ pub fn new() -> Self {
+ Self::default()
+ }
+}
+
+#[async_trait::async_trait]
+impl MakeTransport for DefaultMakeTransport {
+ type ReadHalf = OwnedReadHalf;
+
+ type WriteHalf = OwnedWriteHalf;
+
+ async fn make_transport(&self, addr: Address) -> io::Result<(Self::ReadHalf, Self::WriteHalf)> {
+ let conn = self.make_connection(addr).await?;
+ let (read, write) = conn.stream.into_split();
+ Ok((read, write))
+ }
+
+ fn set_connect_timeout(&mut self, timeout: Option) {
+ self.cfg = self.cfg.with_connect_timeout(timeout);
+ }
+
+ fn set_read_timeout(&mut self, timeout: Option) {
+ self.cfg = self.cfg.with_read_timeout(timeout);
+ }
+
+ fn set_write_timeout(&mut self, timeout: Option) {
+ self.cfg = self.cfg.with_write_timeout(timeout);
+ }
+}
+
+impl DefaultMakeTransport {
+ pub async fn make_connection(&self, addr: Address) -> Result {
+ match addr {
+ Address::Ip(addr) => {
+ let stream = {
+ let domain = Domain::for_address(addr);
+ let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
+ socket.set_nonblocking(true)?;
+ socket.set_read_timeout(self.cfg.read_timeout)?;
+ socket.set_write_timeout(self.cfg.write_timeout)?;
+
+ #[cfg(unix)]
+ let socket = unsafe {
+ use std::os::unix::io::{FromRawFd, IntoRawFd};
+ TcpSocket::from_raw_fd(socket.into_raw_fd())
+ };
+ #[cfg(windows)]
+ let socket = unsafe {
+ use std::os::windows::io::{FromRawSocket, IntoRawSocket};
+ TcpSocket::from_raw_socket(socket.into_raw_socket())
+ };
+
+ let connect = socket.connect(addr);
+
+ if let Some(conn_timeout) = self.cfg.connect_timeout {
+ timeout(conn_timeout, connect).await??
+ } else {
+ connect.await?
+ }
+ };
+ stream.set_nodelay(true)?;
+ Ok(Conn::from(stream))
+ }
+ #[cfg(target_family = "unix")]
+ Address::Unix(addr) => UnixStream::connect(addr).await.map(Conn::from),
+ }
+ }
+}
diff --git a/remoting/net/src/incoming.rs b/remoting/net/src/incoming.rs
new file mode 100644
index 00000000..82672605
--- /dev/null
+++ b/remoting/net/src/incoming.rs
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::{
+ fmt, io,
+ task::{Context, Poll},
+};
+
+use futures::Stream;
+use pin_project::pin_project;
+use tokio::net::TcpListener;
+#[cfg(target_family = "unix")]
+use tokio::net::UnixListener;
+#[cfg(target_family = "unix")]
+use tokio_stream::wrappers::UnixListenerStream;
+use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
+
+use super::{conn::Conn, Address};
+
+#[pin_project(project = IncomingProj)]
+#[derive(Debug)]
+pub enum DefaultIncoming {
+ Tcp(#[pin] TcpListenerStream),
+ #[cfg(target_family = "unix")]
+ Unix(#[pin] UnixListenerStream),
+}
+
+#[async_trait::async_trait]
+impl MakeIncoming for DefaultIncoming {
+ type Incoming = DefaultIncoming;
+
+ async fn make_incoming(self) -> io::Result {
+ Ok(self)
+ }
+}
+
+#[cfg(target_family = "unix")]
+impl From for DefaultIncoming {
+ fn from(l: UnixListener) -> Self {
+ DefaultIncoming::Unix(UnixListenerStream::new(l))
+ }
+}
+
+impl From for DefaultIncoming {
+ fn from(l: TcpListener) -> Self {
+ DefaultIncoming::Tcp(TcpListenerStream::new(l))
+ }
+}
+
+#[async_trait::async_trait]
+pub trait Incoming: fmt::Debug + Send + 'static {
+ async fn accept(&mut self) -> io::Result