Skip to content

Commit

Permalink
Improve memory Ordering
Browse files Browse the repository at this point in the history
## This Commit

Downgrades the memory ordering for the `odd_next` bool operations from
`SeqCst` to what is appropriate.

## Why?

For improved performance and clarity. This should continue working after
we remove the `Mutex` around the stream (once we figure out how). This
could very well be too relaxed, but it'd be hard to tell without Loom or
Miri.
  • Loading branch information
mlodato517 committed Jun 25, 2024
1 parent b27f422 commit b168b07
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// - tokio::test to skip time
// 4. Performance optimizations
// - Get rid of Mutex (AtomicBool governs mutual exclusion)
// - Improved Ordering variant
// - Improve use of pin_project

use std::pin::Pin;
Expand Down Expand Up @@ -52,7 +51,7 @@ impl<S: Stream> Stream for Even<S> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

let odd_next = this.shared_state.odd_next.load(Ordering::SeqCst);
let odd_next = this.shared_state.odd_next.load(Ordering::Acquire);
if odd_next {
match &mut this.shared_state.wakers.lock().unwrap().even_waker {
Some(waker) => waker.clone_from(cx.waker()),
Expand All @@ -61,15 +60,17 @@ impl<S: Stream> Stream for Even<S> {
return Poll::Pending;
}

let inner_stream = &mut *this.shared_state.stream.lock().unwrap();
let next_item = {
let inner_stream = &mut *this.shared_state.stream.lock().unwrap();

// SAFETY: This probably isn't actually safe, but I'm hoping the fact that we own this
// stream and we maybe promise not to move it is fine. We'll test with MIRI later and still
// probably be wrong...
let mut inner_stream = unsafe { Pin::new_unchecked(inner_stream) };
let next_item = ready!(inner_stream.as_mut().poll_next(cx));
// SAFETY: This probably isn't actually safe, but I'm hoping the fact that we own this
// stream and we maybe promise not to move it is fine. We'll test with MIRI later and still
// probably be wrong...
let mut inner_stream = unsafe { Pin::new_unchecked(inner_stream) };
ready!(inner_stream.as_mut().poll_next(cx))
};

this.shared_state.odd_next.store(true, Ordering::SeqCst);
this.shared_state.odd_next.store(true, Ordering::Release);
if let Some(waker) = this.shared_state.wakers.lock().unwrap().odd_waker.take() {
waker.wake();
}
Expand All @@ -82,7 +83,7 @@ impl<S: Stream> Stream for Odd<S> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

let odd_next = this.shared_state.odd_next.load(Ordering::SeqCst);
let odd_next = this.shared_state.odd_next.load(Ordering::Acquire);
if !odd_next {
match &mut this.shared_state.wakers.lock().unwrap().odd_waker {
Some(waker) => waker.clone_from(cx.waker()),
Expand All @@ -91,15 +92,17 @@ impl<S: Stream> Stream for Odd<S> {
return Poll::Pending;
}

let inner_stream = &mut *this.shared_state.stream.lock().unwrap();
let next_item = {
let inner_stream = &mut *this.shared_state.stream.lock().unwrap();

// SAFETY: This probably isn't actually safe, but I'm hoping the fact that we own this
// stream and we maybe promise not to move it is fine. We'll test with MIRI later and still
// probably be wrong...
let mut inner_stream = unsafe { Pin::new_unchecked(inner_stream) };
let next_item = ready!(inner_stream.as_mut().poll_next(cx));
// SAFETY: This probably isn't actually safe, but I'm hoping the fact that we own this
// stream and we maybe promise not to move it is fine. We'll test with MIRI later and still
// probably be wrong...
let mut inner_stream = unsafe { Pin::new_unchecked(inner_stream) };
ready!(inner_stream.as_mut().poll_next(cx))
};

this.shared_state.odd_next.store(false, Ordering::SeqCst);
this.shared_state.odd_next.store(false, Ordering::Release);
if let Some(waker) = this.shared_state.wakers.lock().unwrap().even_waker.take() {
waker.wake();
}
Expand Down

0 comments on commit b168b07

Please sign in to comment.