-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TcpSocket specialized split #1217
Conversation
I'd like to have some feedback. I will finish the docs and |
This seems like a great idea, and I think the approach looks reasonable. There are some docs missing, but you're aware of that :) I've left a couple of smaller inline comments. |
OK, I will not include the proxy example in this PR. Post it here: //! A proxy that forwards data to another server and forwards that server's
//! responses back to clients.
//!
//! Because the Tokio runtime uses a thread pool, each TCP connection is
//! processed concurrently with all other TCP connections across multiple
//! threads.
//!
//! You can showcase this by running this in one terminal:
//!
//! cargo run --example proxy
//!
//! This in another terminal
//!
//! cargo run --example echo
//!
//! And finally this in another terminal
//!
//! cargo run --example connect 127.0.0.1:8081
//!
//! This final terminal will connect to our proxy, which will in turn connect to
//! the echo server, and you'll be able to see data flowing between them.
#![feature(async_await)]
use futures::future::try_join;
use futures::prelude::StreamExt;
use std::env;
use std::net::SocketAddr;
use tokio;
use tokio::io::AsyncReadExt;
use tokio::net::tcp::split::{TcpStreamReadHalf, TcpStreamWriteHalf};
use tokio::net::tcp::{TcpListener, TcpStream};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
let listen_addr = listen_addr.parse::<SocketAddr>()?;
let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
let server_addr = server_addr.parse::<SocketAddr>()?;
// Create a TCP listener which will listen for incoming connections.
let socket = TcpListener::bind(&listen_addr)?;
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);
let mut incoming = socket.incoming();
loop {
let stream = incoming.next().await.unwrap()?;
tokio::spawn(async move {
match proxy_client(stream, server_addr).await {
Err(e) => {
eprintln!("Error: {}", e);
}
_ => (),
}
});
}
}
async fn proxy_client(
client_stream: TcpStream,
server_addr: SocketAddr,
) -> Result<(), std::io::Error> {
let server_stream = TcpStream::connect(&server_addr).await?;
// Create separate read/write handles for the TCP clients that we're
// proxying data between.
//
// Note that while you can use `AsyncRead::split` for this operation,
// `TcpStream::split` gives you handles that are faster, smaller and allow
// proper shutdown operations.
let (client_r, client_w) = client_stream.split();
let (server_r, server_w) = server_stream.split();
let client_to_server = copy_shutdown(client_r, server_w);
let server_to_client = copy_shutdown(server_r, client_w);
// Run the two futures in parallel.
let (l1, l2) = try_join(client_to_server, server_to_client).await?;
println!("client wrote {} bytes and received {} bytes", l1, l2);
Ok(())
}
// Copy data from a read half to a write half. After the copy is done we
// indicate to the remote side that we've finished by shutting down the
// connection.
async fn copy_shutdown(
mut r: TcpStreamReadHalf,
mut w: TcpStreamWriteHalf,
) -> Result<u64, std::io::Error> {
let l = r.copy(&mut w).await?;
// Use this instead after `shutdown` is implemented in `AsyncWriteExt`:
// w.shutdown().await?;
w.as_ref().shutdown(std::net::Shutdown::Write)?;
Ok(l)
} |
This PR is really useful, thanks! |
What's your use case? I think in rust async/await model individual IO resources do not need to have any special support for cancellation. If you no longer poll the future, it is "cancelled". That said, with the current design, it is possible to shutdown the read half from the write half (or vice versa). |
Ah, sorry, what I meant about the |
Yes, that's correct. I think it's a common pattern in |
@sopium Yeah, I think you're right, I'd just like to see that argument made in the code to make it clear why we think unwrapping/expecting there is okay :) Other than that I'm 👍 on this PR! |
Sure, I added a comment roughly based on your comment. You explained it very well. |
Thanks 👍 I have some minor bikeshed level stuff, but nothing worth blocking merging 👍 I added notes to #1209 to track polish TODOs. |
A specialized split is implemented for
TcpStream
(close #174, related: #1108).Compared to the generic
AsyncRead::split
, the specialized split gives handles (halves) that are faster and smaller, because they do not use locks.The write half handle properly implements the shutdown operation, by shutting down the underlying stream in the "write" direction (close #852).
The handles also implement
AsRef<TcpStream>
, allowing access to methods of theTcpStream
that takes&self
: to get local and peer addresses, to get/set various socket options, to shutdown the socket, etc. (Close #852 even if write half shutdown does not call stream shutdown.)Future Work
Other IO resources should have similar specialized split support as well.