Skip to content

Commit

Permalink
cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
Cassy343 committed Jul 14, 2022
1 parent 92383f5 commit 4985ca5
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 113 deletions.
235 changes: 130 additions & 105 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,17 @@ use core::{
};

#[cfg(not(loom))]
use core::{cell::UnsafeCell, hint, sync::atomic::{fence, AtomicU8, Ordering::*}};
use core::{
cell::UnsafeCell,
hint,
sync::atomic::{fence, AtomicU8, Ordering::*},
};
#[cfg(loom)]
use loom::{cell::UnsafeCell, hint, sync::atomic::{fence, AtomicU8, Ordering::*}};
use loom::{
cell::UnsafeCell,
hint,
sync::atomic::{fence, AtomicU8, Ordering::*},
};

#[cfg(feature = "async")]
use core::{
Expand Down Expand Up @@ -270,7 +278,7 @@ impl<T> Sender<T> {
}

// Set the state to signal there is a message on the channel.
// ORDERING: we use release ordering to ensure the write of the message is visible to the
// ORDERING: we use release ordering to ensure the write of the message is visible to the
// receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
// and thus we do not need acquire orderng. The RECEIVING branch manages synchronization
// independent of this operation.
Expand All @@ -295,7 +303,7 @@ impl<T> Sender<T> {
// does not access the waker while in this state, nor does it free the channel
// allocation in this state.
let waker = unsafe { channel.take_waker() };

// ORDERING: this ordering serves two-fold: it synchronizes with the acquire load
// the state in the receiving thread, ensuring that both our read of the waker and
// write of the message happen-before the taking of the message and freeing of the
Expand All @@ -309,7 +317,7 @@ impl<T> Sender<T> {
// waker is wrapped in MaybeUninit. Therefore this data is valid regardless of
// whether or not the receive has completed by this point.
waker.unpark();

Ok(())
}
// The receiver was already dropped. The error is responsible for freeing the channel.
Expand Down Expand Up @@ -349,15 +357,15 @@ impl<T> Drop for Sender<T> {
// See comments in Sender::send

fence(Acquire);

let waker = unsafe { channel.take_waker() };

// We still need release ordering here to make sure our read of the waker happens
// before this.
channel.state.store(DISCONNECTED, Release);

waker.unpark();
},
}
// The receiver was already dropped. We are responsible for freeing the channel.
DISCONNECTED => {
// SAFETY: when the receiver switches the state to DISCONNECTED they have received
Expand Down Expand Up @@ -405,7 +413,7 @@ impl<T> Receiver<T> {
DISCONNECTED => Err(TryRecvError::Disconnected),
#[cfg(feature = "async")]
RECEIVING => Err(TryRecvError::Empty),
_ => unreachable!()
_ => unreachable!(),
}
}

Expand Down Expand Up @@ -434,7 +442,7 @@ impl<T> Receiver<T> {
// of the channel are inactive and cleaned up.

let channel_ptr = self.channel_ptr;

// Don't run our Drop implementation if we are receiving consuming ourselves.
mem::forget(self);

Expand Down Expand Up @@ -577,30 +585,33 @@ impl<T> Receiver<T> {
/// Panics if called after this receiver has been polled asynchronously.
#[cfg(feature = "std")]
pub fn recv_ref(&self) -> Result<T, RecvError> {
self.start_recv_ref(|| RecvError, |channel| {
loop {
thread::park();

// ORDERING: we use acquire ordering to synchronize with the write of the message
match channel.state.load(Acquire) {
// The sender sent the message while we were parked.
// We take the message and mark the channel disconnected.
MESSAGE => {
// ORDERING: the sender is inactive at this point so we don't need to make
// and reads or writes visible to the sending thread
channel.state.store(DISCONNECTED, Relaxed);

// SAFETY: we were just in the message state so the message is valid
break Ok(unsafe { channel.take_message() })
},
// The sender was dropped while we were parked.
DISCONNECTED => break Err(RecvError),
// State did not change, spurious wakeup, park again.
RECEIVING | UNPARKING => (),
_ => unreachable!(),
self.start_recv_ref(
|| RecvError,
|channel| {
loop {
thread::park();

// ORDERING: we use acquire ordering to synchronize with the write of the message
match channel.state.load(Acquire) {
// The sender sent the message while we were parked.
// We take the message and mark the channel disconnected.
MESSAGE => {
// ORDERING: the sender is inactive at this point so we don't need to make
// and reads or writes visible to the sending thread
channel.state.store(DISCONNECTED, Relaxed);

// SAFETY: we were just in the message state so the message is valid
break Ok(unsafe { channel.take_message() });
}
// The sender was dropped while we were parked.
DISCONNECTED => break Err(RecvError),
// State did not change, spurious wakeup, park again.
RECEIVING | UNPARKING => (),
_ => unreachable!(),
}
}
}
})
},
)
}

/// Like [`Receiver::recv`], but will not block longer than `timeout`. Returns:
Expand Down Expand Up @@ -652,93 +663,96 @@ impl<T> Receiver<T> {

channel.state.store(DISCONNECTED, Relaxed);
break Ok(unsafe { channel.take_message() });
},
}
DISCONNECTED => {
break Err(RecvTimeoutError::Disconnected);
},
}
// We continue on the empty state here since the current implementation eagerly
// sets the state to EMPTY upon timeout.
EMPTY => (),
_ => unreachable!()
_ => unreachable!(),
}
}
}

self.start_recv_ref(|| RecvTimeoutError::Disconnected, |channel| {
loop {
match deadline.checked_duration_since(Instant::now()) {
Some(timeout) => {
thread::park_timeout(timeout);

// ORDERING: synchronize with the write of the message
match channel.state.load(Acquire) {
// The sender sent the message while we were parked.
MESSAGE => {
// ORDERING: the sender has been `mem::forget`-ed so this update
// only needs to be visible to us.
channel.state.store(DISCONNECTED, Relaxed);

// SAFETY: we either are in the message state or were just in the
// message state
break Ok(unsafe { channel.take_message() });
self.start_recv_ref(
|| RecvTimeoutError::Disconnected,
|channel| {
loop {
match deadline.checked_duration_since(Instant::now()) {
Some(timeout) => {
thread::park_timeout(timeout);

// ORDERING: synchronize with the write of the message
match channel.state.load(Acquire) {
// The sender sent the message while we were parked.
MESSAGE => {
// ORDERING: the sender has been `mem::forget`-ed so this update
// only needs to be visible to us.
channel.state.store(DISCONNECTED, Relaxed);

// SAFETY: we either are in the message state or were just in the
// message state
break Ok(unsafe { channel.take_message() });
}
// The sender was dropped while we were parked.
DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
// State did not change, spurious wakeup, park again.
RECEIVING | UNPARKING => (),
_ => unreachable!(),
}
// The sender was dropped while we were parked.
DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
// State did not change, spurious wakeup, park again.
RECEIVING | UNPARKING => (),
_ => unreachable!(),
}
},
None => {
// ORDERING: synchronize with the write of the message
match channel.state.swap(EMPTY, Acquire) {
// We reached the end of the timeout without receiving a message
RECEIVING => {
// SAFETY: we were in th receiving state and are now in the empty
// state, so the sender has not and will not try to read the waker,
// so we have exclusive access to drop it.
unsafe { channel.drop_waker() };

break Err(RecvTimeoutError::Timeout);
None => {
// ORDERING: synchronize with the write of the message
match channel.state.swap(EMPTY, Acquire) {
// We reached the end of the timeout without receiving a message
RECEIVING => {
// SAFETY: we were in th receiving state and are now in the empty
// state, so the sender has not and will not try to read the waker,
// so we have exclusive access to drop it.
unsafe { channel.drop_waker() };

break Err(RecvTimeoutError::Timeout);
}
// The sender sent the message while we were parked.
MESSAGE => {
// Same safety and ordering as the Some branch

channel.state.store(DISCONNECTED, Relaxed);
break Ok(unsafe { channel.take_message() });
}
// The sender was dropped while we were parked.
DISCONNECTED => {
// ORDERING: we were originally in the disconnected state meaning
// that the sender is inactive and no longer observing the state,
// so we only need to change it back to DISCONNECTED for if the
// receiver is dropped or a recv* method is called again
channel.state.store(DISCONNECTED, Relaxed);

break Err(RecvTimeoutError::Disconnected);
}
UNPARKING => {
// We were in the UNPARKING state and are now in the EMPTY state.
// We wait to be unparked since this is the only way to maintain
// correctness with intrusive memory.

break wait_for_unpark(channel);
}
_ => unreachable!(),
}
// The sender sent the message while we were parked.
MESSAGE => {
// Same safety and ordering as the Some branch

channel.state.store(DISCONNECTED, Relaxed);
break Ok(unsafe { channel.take_message() });
}
// The sender was dropped while we were parked.
DISCONNECTED => {
// ORDERING: we were originally in the disconnected state meaning
// that the sender is inactive and no longer observing the state,
// so we only need to change it back to DISCONNECTED for if the
// receiver is dropped or a recv* method is called again
channel.state.store(DISCONNECTED, Relaxed);

break Err(RecvTimeoutError::Disconnected);
},
UNPARKING => {
// We were in the UNPARKING state and are now in the EMPTY state.
// We wait to be unparked since this is the only way to maintain
// correctness with intrusive memory.

break wait_for_unpark(channel);
}
_ => unreachable!(),
}
}
}
}
})
},
)
}

#[cfg(feature = "std")]
#[inline(always)]
fn start_recv_ref<E>(
&self,
disconnected: impl FnOnce() -> E,
finish: impl FnOnce(&Channel<T>) -> Result<T, E>
finish: impl FnOnce(&Channel<T>) -> Result<T, E>,
) -> Result<T, E> {
// SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
// is still alive, meaning that even if the sender was dropped then it would have observed
Expand All @@ -757,14 +771,19 @@ impl<T> Receiver<T> {
std::thread::sleep(std::time::Duration::from_millis(10));

// Write our waker instance to the channel.
unsafe { channel.write_waker(ReceiverWaker::current_thread()); }
unsafe {
channel.write_waker(ReceiverWaker::current_thread());
}

// ORDERING: we use release ordering on success so the sender can synchronize with
// our write of the waker. We use relaxed ordering on failure since the sender does
// not need to syncrhonize with our write and the individual match arms handle any
// additional synchronization
match channel.state.compare_exchange(EMPTY, RECEIVING, Release, Relaxed) {
// We stored our waker, now we delegate to the callback to finish the receive
match channel
.state
.compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
{
// We stored our waker, now we delegate to the callback to finish the receive
// operation
Ok(EMPTY) => finish(channel),
// The sender sent the message while we prepared to finish
Expand All @@ -774,7 +793,7 @@ impl<T> Receiver<T> {
fence(Acquire);

unsafe { channel.drop_waker() };

// ORDERING: the sender has been `mem::forget`-ed so this update only
// needs to be visible to us
channel.state.store(DISCONNECTED, Relaxed);
Expand Down Expand Up @@ -824,7 +843,10 @@ impl<T> core::future::Future for Receiver<T> {
EMPTY => unsafe { channel.write_async_waker(cx) },
// We were polled again while waiting for the sender. Replace the waker with the new one.
RECEIVING => {
match channel.state.compare_exchange(RECEIVING, EMPTY, Relaxed, Relaxed) {
match channel
.state
.compare_exchange(RECEIVING, EMPTY, Relaxed, Relaxed)
{
// We successfully changed the state back to EMPTY. Replace the waker.
Ok(RECEIVING) => {
unsafe { channel.drop_waker() };
Expand All @@ -845,7 +867,7 @@ impl<T> core::future::Future for Receiver<T> {
cx.waker().wake_by_ref();
hint::spin_loop();
Poll::Pending
},
}
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -1035,7 +1057,10 @@ impl<T> Channel<T> {
// Write our thread instance to the channel.
self.write_waker(ReceiverWaker::task_waker(cx));

match self.state.compare_exchange(EMPTY, RECEIVING, AcqRel, Acquire) {
match self
.state
.compare_exchange(EMPTY, RECEIVING, AcqRel, Acquire)
{
// We stored our waker, now we return and let the sender wake us up
Ok(EMPTY) => Poll::Pending,
// The sender was dropped before sending anything while we prepared to park.
Expand Down
5 changes: 4 additions & 1 deletion tests/assert_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,8 @@ fn error_sizes() {
assert_eq!(mem::size_of::<oneshot::SendError<[u8; 1024]>>(), EXPECTED);

// This test would fail before switching to `NonNull`
assert_eq!(mem::size_of::<Result<(), oneshot::SendError<()>>>(), EXPECTED);
assert_eq!(
mem::size_of::<Result<(), oneshot::SendError<()>>>(),
EXPECTED
);
}
Loading

0 comments on commit 4985ca5

Please sign in to comment.