diff --git a/Cargo.toml b/Cargo.toml index 04c3d94cc1c..413a3023688 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,6 @@ members = [ # "tokio-tls", # "tokio-trace", # "tokio-trace/tokio-trace-core", - # "tokio-udp", + "tokio-udp", # "tokio-uds", ] diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 7214cc984b4..c39e8ca65f8 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -33,7 +33,7 @@ jobs: # - tokio-signal # - tokio-tcp # - tokio-tls -# - tokio-udp + - tokio-udp # - tokio-uds # Test crates that are NOT platform specific diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 80c838e6188..3cf5d76a093 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -79,7 +79,7 @@ pub fn test(_attr: TokenStream, item: TokenStream) -> TokenStream { #(#attrs)* fn #name() #ret { let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); - rt.block_on_async(async { #body }) + rt.block_on(async { #body }) } }; diff --git a/tokio-udp/Cargo.toml b/tokio-udp/Cargo.toml index 2414ca13e1d..d3dc66d8ac4 100644 --- a/tokio-udp/Cargo.toml +++ b/tokio-udp/Cargo.toml @@ -22,13 +22,13 @@ categories = ["asynchronous"] publish = false [dependencies] -tokio-codec = { version = "0.2.0", path = "../tokio-codec" } -tokio-io = { version = "0.2.0", path = "../tokio-io" } +# tokio-codec = { version = "0.2.0", path = "../tokio-codec" } +# tokio-io = { version = "0.2.0", path = "../tokio-io" } tokio-reactor = { version = "0.2.0", path = "../tokio-reactor" } -bytes = "0.4" +# bytes = "0.4" mio = "0.6.14" log = "0.4" -futures = "0.1.19" [dev-dependencies] env_logger = { version = "0.5", default-features = false } +tokio = { version = "0.2.0", path = "../tokio", default-features = false, features = ["rt-full"] } diff --git a/tokio-udp/src/lib.rs b/tokio-udp/src/lib.rs index 5202e6def7f..2d33ede14d9 100644 --- a/tokio-udp/src/lib.rs +++ b/tokio-udp/src/lib.rs @@ -10,23 +10,28 @@ //! //! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket. //! Reading and writing to it can be done using futures, which return the -//! [`RecvDgram`] and [`SendDgram`] structs respectively. -//! -//! For convenience it's also possible to convert raw datagrams into higher-level -//! frames. -//! -//! [`UdpSocket`]: struct.UdpSocket.html -//! [`RecvDgram`]: struct.RecvDgram.html -//! [`SendDgram`]: struct.SendDgram.html -//! [`UdpFramed`]: struct.UdpFramed.html -//! [`framed`]: struct.UdpSocket.html#method.framed +//! [`Recv`], [`Send`], [`RecvFrom`] and [`SendTo`] structs respectively. -mod frame; -mod recv_dgram; -mod send_dgram; +macro_rules! ready { + ($e:expr) => { + match $e { + ::std::task::Poll::Ready(t) => t, + ::std::task::Poll::Pending => return ::std::task::Poll::Pending, + } + }; +} + +// mod frame; +mod recv; +mod recv_from; +mod send; +mod send_to; mod socket; -pub use self::frame::UdpFramed; -pub use self::recv_dgram::RecvDgram; -pub use self::send_dgram::SendDgram; +// pub use self::frame::UdpFramed; +pub use self::recv::Recv; +pub use self::recv_from::RecvFrom; +pub use self::send::Send; +pub use self::send_to::SendTo; + pub use self::socket::UdpSocket; diff --git a/tokio-udp/src/recv.rs b/tokio-udp/src/recv.rs new file mode 100644 index 00000000000..7e425823ce7 --- /dev/null +++ b/tokio-udp/src/recv.rs @@ -0,0 +1,30 @@ +use super::UdpSocket; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A future that receives a datagram from the connected address. +/// +/// This `struct` is created by [`recv`](super::UdpSocket::recv). +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct Recv<'a, 'b> { + socket: &'a mut UdpSocket, + buf: &'b mut [u8], +} + +impl<'a, 'b> Recv<'a, 'b> { + pub(super) fn new(socket: &'a mut UdpSocket, buf: &'b mut [u8]) -> Self { + Self { socket, buf } + } +} + +impl<'a, 'b> Future for Recv<'a, 'b> { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Recv { socket, buf } = self.get_mut(); + Pin::new(&mut **socket).poll_recv(cx, buf) + } +} diff --git a/tokio-udp/src/recv_dgram.rs b/tokio-udp/src/recv_dgram.rs deleted file mode 100644 index 962f13d3fd1..00000000000 --- a/tokio-udp/src/recv_dgram.rs +++ /dev/null @@ -1,103 +0,0 @@ -use super::socket::UdpSocket; -use futures::{try_ready, Async, Future, Poll}; -use std::io; -use std::net::SocketAddr; - -/// A future used to receive a datagram from a UDP socket. -/// -/// This is created by the `UdpSocket::recv_dgram` method. -#[must_use = "futures do nothing unless polled"] -#[derive(Debug)] -pub struct RecvDgram { - /// None means future was completed - state: Option>, -} - -/// A struct is used to represent the full info of RecvDgram. -#[derive(Debug)] -struct RecvDgramInner { - /// Rx socket - socket: UdpSocket, - /// The received data will be put in the buffer - buffer: T, -} - -/// Components of a `RecvDgram` future, returned from `into_parts`. -#[derive(Debug)] -pub struct Parts { - /// The socket - pub socket: UdpSocket, - /// The buffer - pub buffer: T, - - _priv: (), -} - -impl RecvDgram { - /// Create a new future to receive UDP Datagram - pub(crate) fn new(socket: UdpSocket, buffer: T) -> RecvDgram { - let inner = RecvDgramInner { - socket: socket, - buffer: buffer, - }; - RecvDgram { state: Some(inner) } - } - - /// Consume the `RecvDgram`, returning the socket and buffer. - /// - /// # Panics - /// - /// If called after the future has completed. - /// - /// # Examples - /// - /// ``` - /// use tokio_udp::UdpSocket; - /// - /// let socket = UdpSocket::bind(&([127, 0, 0, 1], 0).into()).unwrap(); - /// let mut buffer = vec![0; 4096]; - /// - /// let future = socket.recv_dgram(buffer); - /// - /// // ... polling `future` ... giving up (e.g. after timeout) - /// - /// let parts = future.into_parts(); - /// - /// let socket = parts.socket; // extract the socket - /// let buffer = parts.buffer; // extract the buffer - /// ``` - pub fn into_parts(mut self) -> Parts { - let state = self - .state - .take() - .expect("into_parts called after completion"); - - Parts { - socket: state.socket, - buffer: state.buffer, - _priv: (), - } - } -} - -impl Future for RecvDgram -where - T: AsMut<[u8]>, -{ - type Item = (UdpSocket, T, usize, SocketAddr); - type Error = io::Error; - - fn poll(&mut self) -> Poll { - let (n, addr) = { - let ref mut inner = self - .state - .as_mut() - .expect("RecvDgram polled after completion"); - - try_ready!(inner.socket.poll_recv_from(inner.buffer.as_mut())) - }; - - let inner = self.state.take().unwrap(); - Ok(Async::Ready((inner.socket, inner.buffer, n, addr))) - } -} diff --git a/tokio-udp/src/recv_from.rs b/tokio-udp/src/recv_from.rs new file mode 100644 index 00000000000..4d42fd102aa --- /dev/null +++ b/tokio-udp/src/recv_from.rs @@ -0,0 +1,31 @@ +use super::UdpSocket; +use std::future::Future; +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A future that receives a datagram. +/// +/// This `struct` is created by [`recv_from`](super::UdpSocket::recv_from). +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct RecvFrom<'a, 'b> { + socket: &'a mut UdpSocket, + buf: &'b mut [u8], +} + +impl<'a, 'b> RecvFrom<'a, 'b> { + pub(super) fn new(socket: &'a mut UdpSocket, buf: &'b mut [u8]) -> Self { + Self { socket, buf } + } +} + +impl<'a, 'b> Future for RecvFrom<'a, 'b> { + type Output = io::Result<(usize, SocketAddr)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let RecvFrom { socket, buf } = self.get_mut(); + Pin::new(&mut **socket).poll_recv_from(cx, buf) + } +} diff --git a/tokio-udp/src/send.rs b/tokio-udp/src/send.rs new file mode 100644 index 00000000000..77557cd2cf3 --- /dev/null +++ b/tokio-udp/src/send.rs @@ -0,0 +1,30 @@ +use super::UdpSocket; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A future that sends a datagram to the connected address. +/// +/// This `struct` is created by [`send`](super::UdpSocket::send). +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct Send<'a, 'b> { + socket: &'a mut UdpSocket, + buf: &'b [u8], +} + +impl<'a, 'b> Send<'a, 'b> { + pub(super) fn new(socket: &'a mut UdpSocket, buf: &'b [u8]) -> Self { + Self { socket, buf } + } +} + +impl<'a, 'b> Future for Send<'a, 'b> { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Send { socket, buf } = self.get_mut(); + Pin::new(&mut **socket).poll_send(cx, buf) + } +} diff --git a/tokio-udp/src/send_dgram.rs b/tokio-udp/src/send_dgram.rs deleted file mode 100644 index eae8b896298..00000000000 --- a/tokio-udp/src/send_dgram.rs +++ /dev/null @@ -1,70 +0,0 @@ -use super::socket::UdpSocket; -use futures::{try_ready, Async, Future, Poll}; -use std::io; -use std::net::SocketAddr; - -/// A future used to write the entire contents of some data to a UDP socket. -/// -/// This is created by the `UdpSocket::send_dgram` method. -#[must_use = "futures do nothing unless polled"] -#[derive(Debug)] -pub struct SendDgram { - /// None means future was completed - state: Option>, -} - -/// A struct is used to represent the full info of SendDgram. -#[derive(Debug)] -struct SendDgramInner { - /// Tx socket - socket: UdpSocket, - /// The whole buffer will be sent - buffer: T, - /// Destination addr - addr: SocketAddr, -} - -impl SendDgram { - /// Create a new future to send UDP Datagram - pub(crate) fn new(socket: UdpSocket, buffer: T, addr: SocketAddr) -> SendDgram { - let inner = SendDgramInner { - socket: socket, - buffer: buffer, - addr: addr, - }; - SendDgram { state: Some(inner) } - } -} - -fn incomplete_write(reason: &str) -> io::Error { - io::Error::new(io::ErrorKind::Other, reason) -} - -impl Future for SendDgram -where - T: AsRef<[u8]>, -{ - type Item = (UdpSocket, T); - type Error = io::Error; - - fn poll(&mut self) -> Poll<(UdpSocket, T), io::Error> { - { - let ref mut inner = self - .state - .as_mut() - .expect("SendDgram polled after completion"); - let n = try_ready!(inner - .socket - .poll_send_to(inner.buffer.as_ref(), &inner.addr)); - if n != inner.buffer.as_ref().len() { - return Err(incomplete_write( - "failed to send entire message \ - in datagram", - )); - } - } - - let inner = self.state.take().unwrap(); - Ok(Async::Ready((inner.socket, inner.buffer))) - } -} diff --git a/tokio-udp/src/send_to.rs b/tokio-udp/src/send_to.rs new file mode 100644 index 00000000000..96d79f569b8 --- /dev/null +++ b/tokio-udp/src/send_to.rs @@ -0,0 +1,40 @@ +use super::UdpSocket; +use std::future::Future; +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A future that sends a datagram to a given address. +/// +/// This `struct` is created by [`send_to`](super::UdpSocket::send_to). +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct SendTo<'a, 'b> { + socket: &'a mut UdpSocket, + buf: &'b [u8], + target: &'b SocketAddr, +} + +impl<'a, 'b> SendTo<'a, 'b> { + pub(super) fn new(socket: &'a mut UdpSocket, buf: &'b [u8], target: &'b SocketAddr) -> Self { + Self { + socket, + buf, + target, + } + } +} + +impl<'a, 'b> Future for SendTo<'a, 'b> { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let SendTo { + socket, + buf, + target, + } = self.get_mut(); + Pin::new(&mut **socket).poll_send_to(cx, buf, target) + } +} diff --git a/tokio-udp/src/socket.rs b/tokio-udp/src/socket.rs index 765018da437..1bc5bbea3ec 100644 --- a/tokio-udp/src/socket.rs +++ b/tokio-udp/src/socket.rs @@ -1,9 +1,10 @@ -use super::{RecvDgram, SendDgram}; -use futures::{try_ready, Async, Poll}; +use super::{Recv, RecvFrom, Send, SendTo}; use mio; use std::fmt; use std::io; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::pin::Pin; +use std::task::{Context, Poll}; use tokio_reactor::{Handle, PollEvented}; /// An I/O object representing a UDP socket. @@ -52,13 +53,15 @@ impl UdpSocket { self.io.get_ref().connect(*addr) } - #[deprecated(since = "0.1.2", note = "use poll_send instead")] - #[doc(hidden)] - pub fn send(&mut self, buf: &[u8]) -> io::Result { - match self.poll_send(buf)? { - Async::Ready(n) => Ok(n), - Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), - } + /// Returns a future that sends data on the socket to the remote address to which it is connected. + /// On success, the future will resolve to the number of bytes written. + /// + /// The [`connect`] method will connect this socket to a remote address. The future + /// will resolve to an error if the socket is not connected. + /// + /// [`connect`]: #method.connect + pub fn send<'a, 'b>(&'a mut self, buf: &'b [u8]) -> Send<'a, 'b> { + Send::new(self, buf) } /// Sends data on the socket to the remote address to which it is connected. @@ -70,35 +73,41 @@ impl UdpSocket { /// /// # Return /// - /// On success, returns `Ok(Async::Ready(num_bytes_written))`. + /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. /// /// If the socket is not ready for writing, the method returns - /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// `Poll::Pending` and arranges for the current task to receive a /// notification when the socket becomes writable. - /// - /// # Panics - /// - /// This function will panic if called from outside of a task context. - pub fn poll_send(&mut self, buf: &[u8]) -> Poll { - try_ready!(self.io.poll_write_ready()); + pub fn poll_send( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + ready!(self.io.poll_write_ready(cx))?; match self.io.get_ref().send(buf) { - Ok(n) => Ok(n.into()), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready()?; - Ok(Async::NotReady) + self.io.clear_write_ready(cx)?; + Poll::Pending } - Err(e) => Err(e), + x => Poll::Ready(x), } } - #[deprecated(since = "0.1.2", note = "use poll_recv instead")] - #[doc(hidden)] - pub fn recv(&mut self, buf: &mut [u8]) -> io::Result { - match self.poll_recv(buf)? { - Async::Ready(n) => Ok(n), - Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), - } + /// Returns a future that receives a single datagram message on the socket from + /// the remote address to which it is connected. On success, the future will resolve + /// to the number of bytes read. + /// + /// The function must be called with valid byte array `buf` of sufficient size to + /// hold the message bytes. If a message is too long to fit in the supplied buffer, + /// excess bytes may be discarded. + /// + /// The [`connect`] method will connect this socket to a remote address. The future + /// will fail if the socket is not connected. + /// + /// [`connect`]: #method.connect + pub fn recv<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> Recv<'a, 'b> { + Recv::new(self, buf) } /// Receives a single datagram message on the socket from the remote address to @@ -115,35 +124,34 @@ impl UdpSocket { /// /// # Return /// - /// On success, returns `Ok(Async::Ready(num_bytes_read))`. + /// On success, returns `Poll::Ready(Ok(num_bytes_read))`. /// /// If no data is available for reading, the method returns - /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// `Poll::Pending` and arranges for the current task to receive a /// notification when the socket becomes receivable or is closed. - /// - /// # Panics - /// - /// This function will panic if called from outside of a task context. - pub fn poll_recv(&mut self, buf: &mut [u8]) -> Poll { - try_ready!(self.io.poll_read_ready(mio::Ready::readable())); + pub fn poll_recv( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; match self.io.get_ref().recv(buf) { - Ok(n) => Ok(n.into()), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(mio::Ready::readable())?; - Ok(Async::NotReady) + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending } - Err(e) => Err(e), + x => Poll::Ready(x), } } - #[deprecated(since = "0.1.2", note = "use poll_send_to instead")] - #[doc(hidden)] - pub fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result { - match self.poll_send_to(buf, target)? { - Async::Ready(n) => Ok(n), - Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), - } + /// Returns a future that sends data on the socket to the given address. + /// On success, the future will resolve to the number of bytes written. + /// + /// The future will resolve to an error if the IP version of the socket does + /// not match that of `target`. + pub fn send_to<'a, 'b>(&'a mut self, buf: &'b [u8], target: &'b SocketAddr) -> SendTo<'a, 'b> { + SendTo::new(self, buf, target) } /// Sends data on the socket to the given address. On success, returns the @@ -154,133 +162,90 @@ impl UdpSocket { /// /// # Return /// - /// On success, returns `Ok(Async::Ready(num_bytes_written))`. + /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. /// /// If the socket is not ready for writing, the method returns - /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// `Poll::Pending` and arranges for the current task to receive a /// notification when the socket becomes writable. - /// - /// # Panics - /// - /// This function will panic if called from outside of a task context. - pub fn poll_send_to(&mut self, buf: &[u8], target: &SocketAddr) -> Poll { - try_ready!(self.io.poll_write_ready()); + pub fn poll_send_to( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + target: &SocketAddr, + ) -> Poll> { + ready!(self.io.poll_write_ready(cx))?; match self.io.get_ref().send_to(buf, target) { - Ok(n) => Ok(n.into()), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready()?; - Ok(Async::NotReady) + self.io.clear_write_ready(cx)?; + Poll::Pending } - Err(e) => Err(e), + x => Poll::Ready(x), } } - /// Creates a future that will write the entire contents of the buffer - /// `buf` provided as a datagram to this socket. - /// - /// The returned future will return after data has been written to the - /// outbound socket. The future will resolve to the stream as well as the - /// buffer (for reuse if needed). - /// - /// Any error which happens during writing will cause both the stream and - /// the buffer to get destroyed. Note that failure to write the entire - /// buffer is considered an error for the purposes of sending a datagram. - /// - /// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which - /// should be broadly applicable to accepting data which can be converted - /// to a slice. - pub fn send_dgram(self, buf: T, addr: &SocketAddr) -> SendDgram - where - T: AsRef<[u8]>, - { - SendDgram::new(self, buf, *addr) - } - - #[deprecated(since = "0.1.2", note = "use poll_recv_from instead")] - #[doc(hidden)] - pub fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - match self.poll_recv_from(buf)? { - Async::Ready(ret) => Ok(ret), - Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), - } + /// Returns a future that receives a single datagram on the socket. On success, + /// the future resolves to the number of bytes read and the origin. + /// + /// The function must be called with valid byte array `buf` of sufficient size + /// to hold the message bytes. If a message is too long to fit in the supplied + /// buffer, excess bytes may be discarded. + pub fn recv_from<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> RecvFrom<'a, 'b> { + RecvFrom::new(self, buf) } /// Receives data from the socket. On success, returns the number of bytes /// read and the address from whence the data came. - /// - /// # Panics - /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn poll_recv_from(&mut self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> { - try_ready!(self.io.poll_read_ready(mio::Ready::readable())); + pub fn poll_recv_from( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; match self.io.get_ref().recv_from(buf) { - Ok(n) => Ok(n.into()), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(mio::Ready::readable())?; - Ok(Async::NotReady) + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending } - Err(e) => Err(e), + x => Poll::Ready(x), } } - /// Creates a future that receive a datagram to be written to the buffer - /// provided. - /// - /// The returned future will return after a datagram has been received on - /// this socket. The future will resolve to the socket, the buffer, the - /// amount of data read, and the address the data was received from. - /// - /// An error during reading will cause the socket and buffer to get - /// destroyed. - /// - /// The `buf` parameter here only requires the `AsMut<[u8]>` trait, which - /// should be broadly applicable to accepting data which can be converted - /// to a slice. - pub fn recv_dgram(self, buf: T) -> RecvDgram - where - T: AsMut<[u8]>, - { - RecvDgram::new(self, buf) - } - /// Check the UDP socket's read readiness state. /// /// The mask argument allows specifying what readiness to notify on. This /// can be any value, including platform specific readiness, **except** /// `writable`. /// - /// If the socket is not ready for receiving then `Async::NotReady` is + /// If the socket is not ready for receiving then `Poll::Pending` is /// returned and the current task is notified once a new event is received. /// /// The socket will remain in a read-ready state until calls to `poll_recv` - /// return `NotReady`. + /// return `Poll::Pending`. /// /// # Panics /// /// This function panics if: /// /// * `ready` includes writable. - /// * called from outside of a task context. - pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll { - self.io.poll_read_ready(mask) + pub fn poll_read_ready( + &self, + cx: &mut Context<'_>, + mask: mio::Ready, + ) -> Poll> { + self.io.poll_read_ready(cx, mask) } /// Check the UDP socket's write readiness state. /// - /// If the socket is not ready for sending then `Async::NotReady` is + /// If the socket is not ready for sending then `Poll::Pending` is /// returned and the current task is notified once a new event is received. /// /// The I/O resource will remain in a write-ready state until calls to - /// `poll_send` return `NotReady`. - /// - /// # Panics - /// - /// This function panics if called from outside of a task context. - pub fn poll_write_ready(&self) -> Poll { - self.io.poll_write_ready() + /// `poll_send` return `Poll::Pending`. + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.poll_write_ready(cx) } /// Gets the value of the `SO_BROADCAST` option for this socket. diff --git a/tokio-udp/tests/udp.rs b/tokio-udp/tests/udp.rs index ed25a4d1cf4..1739c488d30 100644 --- a/tokio-udp/tests/udp.rs +++ b/tokio-udp/tests/udp.rs @@ -1,288 +1,105 @@ +#![feature(async_await)] #![deny(warnings, rust_2018_idioms)] -use bytes::{BufMut, BytesMut}; -use env_logger; -use futures::{Future, Poll, Sink, Stream}; -use std::io; -use std::net::SocketAddr; -use tokio_codec::{Decoder, Encoder}; -use tokio_io::try_nb; -use tokio_udp::{UdpFramed, UdpSocket}; +use tokio_udp::UdpSocket; -macro_rules! t { - ($e:expr) => { - match $e { - Ok(e) => e, - Err(e) => panic!("{} failed with {:?}", stringify!($e), e), - } - }; -} - -fn send_messages(send: S, recv: R) { - let mut a = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into())); - let mut b = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into())); - let a_addr = t!(a.local_addr()); - let b_addr = t!(b.local_addr()); - - { - let send = SendMessage::new(a, send.clone(), b_addr, b"1234"); - let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234"); - let (sendt, received) = t!(send.join(recv).wait()); - a = sendt; - b = received; - } - - { - let send = SendMessage::new(a, send, b_addr, b""); - let recv = RecvMessage::new(b, recv, a_addr, b""); - t!(send.join(recv).wait()); - } -} - -#[test] -fn send_to_and_recv_from() { - send_messages(SendTo {}, RecvFrom {}); -} - -#[test] -fn send_and_recv() { - send_messages(Send {}, Recv {}); -} - -trait SendFn { - fn send(&self, _: &mut UdpSocket, _: &[u8], _: &SocketAddr) -> Result; -} - -#[derive(Debug, Clone)] -struct SendTo {} - -impl SendFn for SendTo { - fn send( - &self, - socket: &mut UdpSocket, - buf: &[u8], - addr: &SocketAddr, - ) -> Result { - #[allow(deprecated)] - socket.send_to(buf, addr) - } -} - -#[derive(Debug, Clone)] -struct Send {} - -impl SendFn for Send { - fn send( - &self, - socket: &mut UdpSocket, - buf: &[u8], - addr: &SocketAddr, - ) -> Result { - socket.connect(addr).expect("could not connect"); - #[allow(deprecated)] - socket.send(buf) - } -} - -struct SendMessage { - socket: Option, - send: S, - addr: SocketAddr, - data: &'static [u8], -} - -impl SendMessage { - fn new(socket: UdpSocket, send: S, addr: SocketAddr, data: &'static [u8]) -> SendMessage { - SendMessage { - socket: Some(socket), - send: send, - addr: addr, - data: data, - } - } -} - -impl Future for SendMessage { - type Item = UdpSocket; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - let n = try_nb!(self - .send - .send(self.socket.as_mut().unwrap(), &self.data[..], &self.addr)); - - assert_eq!(n, self.data.len()); - - Ok(self.socket.take().unwrap().into()) - } -} - -trait RecvFn { - fn recv(&self, _: &mut UdpSocket, _: &mut [u8], _: &SocketAddr) -> Result; -} - -#[derive(Debug, Clone)] -struct RecvFrom {} - -impl RecvFn for RecvFrom { - fn recv( - &self, - socket: &mut UdpSocket, - buf: &mut [u8], - expected_addr: &SocketAddr, - ) -> Result { - #[allow(deprecated)] - socket.recv_from(buf).map(|(s, addr)| { - assert_eq!(addr, *expected_addr); - s - }) - } -} +#[tokio::test] +async fn send_recv() -> std::io::Result<()> { + let mut sender = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; + let mut receiver = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; -#[derive(Debug, Clone)] -struct Recv {} + sender.connect(&receiver.local_addr()?)?; + receiver.connect(&sender.local_addr()?)?; -impl RecvFn for Recv { - fn recv( - &self, - socket: &mut UdpSocket, - buf: &mut [u8], - _: &SocketAddr, - ) -> Result { - #[allow(deprecated)] - socket.recv(buf) - } -} + let message = b"hello!"; + sender.send(message).await?; -struct RecvMessage { - socket: Option, - recv: R, - expected_addr: SocketAddr, - expected_data: &'static [u8], -} + let mut recv_buf = [0u8; 32]; + let len = receiver.recv(&mut recv_buf[..]).await?; -impl RecvMessage { - fn new( - socket: UdpSocket, - recv: R, - expected_addr: SocketAddr, - expected_data: &'static [u8], - ) -> RecvMessage { - RecvMessage { - socket: Some(socket), - recv: recv, - expected_addr: expected_addr, - expected_data: expected_data, - } - } + assert_eq!(&recv_buf[..len], message); + Ok(()) } -impl Future for RecvMessage { - type Item = UdpSocket; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - let mut buf = vec![0u8; 10 + self.expected_data.len() * 10]; - let n = try_nb!(self.recv.recv( - &mut self.socket.as_mut().unwrap(), - &mut buf[..], - &self.expected_addr - )); - - assert_eq!(n, self.expected_data.len()); - assert_eq!(&buf[..self.expected_data.len()], &self.expected_data[..]); +#[tokio::test] +async fn send_to_recv_from() -> std::io::Result<()> { + let mut sender = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; + let mut receiver = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; - Ok(self.socket.take().unwrap().into()) - } -} - -#[test] -fn send_dgrams() { - let mut a = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); - let mut b = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); - let mut buf = [0u8; 50]; - let b_addr = t!(b.local_addr()); + let message = b"hello!"; + let receiver_addr = receiver.local_addr()?; + sender.send_to(message, &receiver_addr).await?; - { - let send = a.send_dgram(&b"4321"[..], &b_addr); - let recv = b.recv_dgram(&mut buf[..]); - let (sendt, received) = t!(send.join(recv).wait()); - assert_eq!(received.2, 4); - assert_eq!(&received.1[..4], b"4321"); - a = sendt.0; - b = received.0; - } + let mut recv_buf = [0u8; 32]; + let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?; - { - let send = a.send_dgram(&b""[..], &b_addr); - let recv = b.recv_dgram(&mut buf[..]); - let received = t!(send.join(recv).wait()).1; - assert_eq!(received.2, 0); - } + assert_eq!(&recv_buf[..len], message); + assert_eq!(addr, sender.local_addr()?); + Ok(()) } -pub struct ByteCodec; +// pub struct ByteCodec; -impl Decoder for ByteCodec { - type Item = Vec; - type Error = io::Error; +// impl Decoder for ByteCodec { +// type Item = Vec; +// type Error = io::Error; - fn decode(&mut self, buf: &mut BytesMut) -> Result>, io::Error> { - let len = buf.len(); - Ok(Some(buf.split_to(len).to_vec())) - } -} +// fn decode(&mut self, buf: &mut BytesMut) -> Result>, io::Error> { +// let len = buf.len(); +// Ok(Some(buf.split_to(len).to_vec())) +// } +// } -impl Encoder for ByteCodec { - type Item = Vec; - type Error = io::Error; +// impl Encoder for ByteCodec { +// type Item = Vec; +// type Error = io::Error; - fn encode(&mut self, data: Vec, buf: &mut BytesMut) -> Result<(), io::Error> { - buf.reserve(data.len()); - buf.put(data); - Ok(()) - } -} +// fn encode(&mut self, data: Vec, buf: &mut BytesMut) -> Result<(), io::Error> { +// buf.reserve(data.len()); +// buf.put(data); +// Ok(()) +// } +// } -#[test] -fn send_framed() { - drop(env_logger::try_init()); +// #[test] +// fn send_framed() { +// drop(env_logger::try_init()); - let mut a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); - let mut b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); - let a_addr = t!(a_soc.local_addr()); - let b_addr = t!(b_soc.local_addr()); +// let mut a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); +// let mut b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); +// let a_addr = t!(a_soc.local_addr()); +// let b_addr = t!(b_soc.local_addr()); - { - let a = UdpFramed::new(a_soc, ByteCodec); - let b = UdpFramed::new(b_soc, ByteCodec); +// { +// let a = UdpFramed::new(a_soc, ByteCodec); +// let b = UdpFramed::new(b_soc, ByteCodec); - let msg = b"4567".to_vec(); +// let msg = b"4567".to_vec(); - let send = a.send((msg.clone(), b_addr)); - let recv = b.into_future().map_err(|e| e.0); - let (sendt, received) = t!(send.join(recv).wait()); +// let send = a.send((msg.clone(), b_addr)); +// let recv = b.into_future().map_err(|e| e.0); +// let (sendt, received) = t!(send.join(recv).wait()); - let (data, addr) = received.0.unwrap(); - assert_eq!(msg, data); - assert_eq!(a_addr, addr); +// let (data, addr) = received.0.unwrap(); +// assert_eq!(msg, data); +// assert_eq!(a_addr, addr); - a_soc = sendt.into_inner(); - b_soc = received.1.into_inner(); - } +// a_soc = sendt.into_inner(); +// b_soc = received.1.into_inner(); +// } - { - let a = UdpFramed::new(a_soc, ByteCodec); - let b = UdpFramed::new(b_soc, ByteCodec); +// { +// let a = UdpFramed::new(a_soc, ByteCodec); +// let b = UdpFramed::new(b_soc, ByteCodec); - let msg = b"".to_vec(); +// let msg = b"".to_vec(); - let send = a.send((msg.clone(), b_addr)); - let recv = b.into_future().map_err(|e| e.0); - let received = t!(send.join(recv).wait()).1; +// let send = a.send((msg.clone(), b_addr)); +// let recv = b.into_future().map_err(|e| e.0); +// let received = t!(send.join(recv).wait()).1; - let (data, addr) = received.0.unwrap(); - assert_eq!(msg, data); - assert_eq!(a_addr, addr); - } -} +// let (data, addr) = received.0.unwrap(); +// assert_eq!(msg, data); +// assert_eq!(a_addr, addr); +// } +// } diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 23ad5faf1ce..4f5a949fa86 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -34,7 +34,7 @@ default = [ "sync", "tcp", # "timer", -# "udp", + "udp", # "uds", ] @@ -55,7 +55,7 @@ rt-full = [ sync = ["tokio-sync"] tcp = ["tokio-tcp"] #timer = ["tokio-timer"] -#udp = ["tokio-udp"] +udp = ["tokio-udp"] #uds = ["tokio-uds"] [dependencies] @@ -75,7 +75,7 @@ tokio-reactor = { version = "0.2.0", optional = true, path = "../tokio-reactor" tokio-sync = { version = "0.2.0", optional = true, path = "../tokio-sync" } #tokio-threadpool = { version = "0.2.0", optional = true, path = "../tokio-threadpool" } tokio-tcp = { version = "0.2.0", optional = true, path = "../tokio-tcp" } -#tokio-udp = { version = "0.2.0", optional = true, path = "../tokio-udp" } +tokio-udp = { version = "0.2.0", optional = true, path = "../tokio-udp" } #tokio-timer = { version = "0.3.0", optional = true, path = "../tokio-timer" } #tokio-trace-core = { version = "0.2", optional = true } diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index b96c2580ad0..89f051334f1 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -102,4 +102,5 @@ if_runtime! { #[cfg(not(test))] // Work around for rust-lang/rust#62127 pub use tokio_macros::main; + pub use tokio_macros::test; } diff --git a/tokio/src/net.rs b/tokio/src/net.rs index 6e7efb37e43..872ec22ad9f 100644 --- a/tokio/src/net.rs +++ b/tokio/src/net.rs @@ -6,7 +6,7 @@ //! # Organization //! //! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP -//! * [`UdpSocket`] and [`UdpFramed`] provide functionality for communication over UDP +//! * [`UdpSocket`] provides functionality for communication over UDP //! * [`UnixListener`] and [`UnixStream`] provide functionality for communication over a //! Unix Domain Stream Socket **(available on Unix only)** //! * [`UnixDatagram`] and [`UnixDatagramFramed`] provide functionality for communication @@ -16,7 +16,6 @@ //! [`TcpListener`]: struct.TcpListener.html //! [`TcpStream`]: struct.TcpStream.html //! [`UdpSocket`]: struct.UdpSocket.html -//! [`UdpFramed`]: struct.UdpFramed.html //! [`UnixListener`]: struct.UnixListener.html //! [`UnixStream`]: struct.UnixStream.html //! [`UnixDatagram`]: struct.UnixDatagram.html @@ -52,20 +51,17 @@ pub mod udp { //! //! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket. //! Reading and writing to it can be done using futures, which return the - //! [`RecvDgram`] and [`SendDgram`] structs respectively. - //! - //! For convenience it's also possible to convert raw datagrams into higher-level - //! frames. + //! [`Recv`], [`Send`], [`RecvFrom`], [`SendTo`] structs respectively. //! //! [`UdpSocket`]: struct.UdpSocket.html - //! [`RecvDgram`]: struct.RecvDgram.html - //! [`SendDgram`]: struct.SendDgram.html - //! [`UdpFramed`]: struct.UdpFramed.html - //! [`framed`]: struct.UdpSocket.html#method.framed - pub use tokio_udp::{RecvDgram, SendDgram, UdpFramed, UdpSocket}; + //! [`Recv`]: struct.Recv.html + //! [`Send`]: struct.Send.html + //! [`RecvFrom`]: struct.RecvFrom.html + //! [`SendTo`]: struct.SendTo.html + pub use tokio_udp::{UdpSocket, Recv, Send, RecvFrom, SendTo}; } #[cfg(feature = "udp")] -pub use self::udp::{UdpFramed, UdpSocket}; +pub use self::udp::UdpSocket; #[cfg(all(unix, feature = "uds"))] pub mod unix {