Skip to content

Commit

Permalink
sync: Add disarm to mpsc::Sender (#2358)
Browse files Browse the repository at this point in the history
Fixes #898.
  • Loading branch information
jonhoo authored Apr 2, 2020
1 parent fa4fe9e commit 7fb1698
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 1 deletion.
81 changes: 80 additions & 1 deletion tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,90 @@ impl<T> Sender<T> {
Sender { chan }
}

#[doc(hidden)] // TODO: document
/// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item.
///
/// If the channel is full, then `Poll::Pending` is returned and the task is notified when a
/// slot becomes available.
///
/// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a call to `try_send` will succeed unless
/// the channel has since been closed. To provide this guarantee, the channel reserves one slot
/// in the channel for the coming send. This reserved slot is not available to other `Sender`
/// instances, so you need to be careful to not end up with deadlocks by blocking after calling
/// `poll_ready` but before sending an element.
///
/// If, after `poll_ready` succeeds, you decide you do not wish to send an item after all, you
/// can use [`disarm`](Sender::disarm) to release the reserved slot.
///
/// Until an item is sent or [`disarm`](Sender::disarm) is called, repeated calls to
/// `poll_ready` will return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))` if channel
/// is closed.
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
self.chan.poll_ready(cx).map_err(|_| ClosedError::new())
}

/// Undo a successful call to `poll_ready`.
///
/// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, it holds up one slot in the
/// channel to make room for the coming send. `disarm` allows you to give up that slot if you
/// decide you do not wish to send an item after all. After calling `disarm`, you must call
/// `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to send again.
///
/// Returns `false` if no slot is reserved for this sender (usually because `poll_ready` was
/// not previously called, or did not succeed).
///
/// # Motivation
///
/// Since `poll_ready` takes up one of the finite number of slots in a bounded channel, callers
/// need to send an item shortly after `poll_ready` succeeds. If they do not, idle senders may
/// take up all the slots of the channel, and prevent active senders from getting any requests
/// through. Consider this code that forwards from one channel to another:
///
/// ```rust,ignore
/// loop {
/// ready!(tx.poll_ready(cx))?;
/// if let Some(item) = ready!(rx.poll_recv(cx)) {
/// tx.try_send(item)?;
/// } else {
/// break;
/// }
/// }
/// ```
///
/// If many such forwarders exist, and they all forward into a single (cloned) `Sender`, then
/// any number of forwarders may be waiting for `rx.poll_recv` at the same time. While they do,
/// they are effectively each reducing the channel's capacity by 1. If enough of these
/// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot
/// for them through `poll_ready`, and the system will deadlock.
///
/// `disarm` solves this problem by allowing you to give up the reserved slot if you find that
/// you have to block. We can then fix the code above by writing:
///
/// ```rust,ignore
/// loop {
/// ready!(tx.poll_ready(cx))?;
/// let item = rx.poll_recv(cx);
/// if let Poll::Ready(Ok(_)) = item {
/// // we're going to send the item below, so don't disarm
/// } else {
/// // give up our send slot, we won't need it for a while
/// tx.disarm();
/// }
/// if let Some(item) = ready!(item) {
/// tx.try_send(item)?;
/// } else {
/// break;
/// }
/// }
/// ```
pub fn disarm(&mut self) -> bool {
if self.chan.is_ready() {
self.chan.disarm();
true
} else {
false
}
}

/// Attempts to immediately send a message on this `Sender`
///
/// This method differs from [`send`] by returning immediately if the channel's
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,23 @@ where
self.inner.semaphore.poll_acquire(cx, &mut self.permit)
}

pub(crate) fn disarm(&mut self) {
// TODO: should this error if not acquired?
self.inner.semaphore.drop_permit(&mut self.permit)
}

/// Send a message and notify the receiver.
pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
self.inner.try_send(value, &mut self.permit)
}
}

impl<T> Tx<T, (crate::sync::semaphore_ll::Semaphore, usize)> {
pub(crate) fn is_ready(&self) -> bool {
self.permit.is_acquired()
}
}

impl<T> Tx<T, AtomicUsize> {
pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> {
self.inner.try_send(value, &mut ())
Expand Down
38 changes: 38 additions & 0 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,44 @@ fn send_recv_with_buffer() {
assert!(val.is_none());
}

#[test]
fn disarm() {
let (tx, rx) = mpsc::channel::<i32>(2);
let mut tx1 = task::spawn(tx.clone());
let mut tx2 = task::spawn(tx.clone());
let mut tx3 = task::spawn(tx.clone());
let mut tx4 = task::spawn(tx);
let mut rx = task::spawn(rx);

// We should be able to `poll_ready` two handles without problem
assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx)));

// But a third should not be ready
assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));

// Using one of the reserved slots should allow a new handle to become ready
tx1.try_send(1).unwrap();
// We also need to receive for the slot to be free
let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap();
// Now there's a free slot!
assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));

// Dropping a ready handle should also open up a slot
drop(tx2);
assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));
assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));

// Explicitly disarming a handle should also open a slot
assert!(tx3.disarm());
assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));

// Disarming a non-armed sender does not free up a slot
assert!(!tx3.disarm());
assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
}

#[tokio::test]
async fn send_recv_stream_with_buffer() {
use tokio::stream::StreamExt;
Expand Down

0 comments on commit 7fb1698

Please sign in to comment.