Skip to content

Commit

Permalink
perf: Pack the state and future of unfolds in the same memory (#2283)
Browse files Browse the repository at this point in the history
* Pack the state and future of unfolds in the same memory

* Use the same type for both sink and stream unfolds
  • Loading branch information
Marwes authored Dec 26, 2020
1 parent 5559680 commit 7ce5ce8
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 74 deletions.
90 changes: 51 additions & 39 deletions futures-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@
#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
#![cfg_attr(feature = "read-initializer", feature(read_initializer))]
#![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]

#![cfg_attr(not(feature = "std"), no_std)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
#![warn(
missing_docs,
missing_debug_implementations,
rust_2018_idioms,
unreachable_pub
)]
// It cannot be included in the published code because this lints have false positives in the minimum required version.
#![cfg_attr(test, warn(single_use_lifetimes))]
#![warn(clippy::all)]

// mem::take requires Rust 1.40, matches! requires Rust 1.42
// Can be removed if the minimum supported version increased or if https://github.com/rust-lang/rust-clippy/issues/3941
// get's implemented.
#![allow(clippy::mem_replace_with_default, clippy::match_like_matches_macro)]

#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]

#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
Expand Down Expand Up @@ -49,7 +50,7 @@ pub use self::async_await::*;
pub mod __private {
pub use crate::*;
pub use core::{
option::Option::{self, Some, None},
option::Option::{self, None, Some},
pin::Pin,
result::Result::{Err, Ok},
};
Expand All @@ -76,10 +77,7 @@ macro_rules! delegate_sink {
self.project().$field.poll_ready(cx)
}

fn start_send(
self: core::pin::Pin<&mut Self>,
item: $item,
) -> Result<(), Self::Error> {
fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> {
self.project().$field.start_send(item)
}

Expand All @@ -96,7 +94,7 @@ macro_rules! delegate_sink {
) -> core::task::Poll<Result<(), Self::Error>> {
self.project().$field.poll_close(cx)
}
}
};
}

macro_rules! delegate_future {
Expand All @@ -107,7 +105,7 @@ macro_rules! delegate_future {
) -> core::task::Poll<Self::Output> {
self.project().$field.poll(cx)
}
}
};
}

macro_rules! delegate_stream {
Expand All @@ -121,34 +119,40 @@ macro_rules! delegate_stream {
fn size_hint(&self) -> (usize, Option<usize>) {
self.$field.size_hint()
}
}
};
}

#[cfg(feature = "io")]
#[cfg(feature = "std")]
macro_rules! delegate_async_write {
($field:ident) => {
fn poll_write(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, buf: &[u8])
-> core::task::Poll<std::io::Result<usize>>
{
fn poll_write(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
buf: &[u8],
) -> core::task::Poll<std::io::Result<usize>> {
self.project().$field.poll_write(cx, buf)
}
fn poll_write_vectored(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, bufs: &[std::io::IoSlice<'_>])
-> core::task::Poll<std::io::Result<usize>>
{
fn poll_write_vectored(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> core::task::Poll<std::io::Result<usize>> {
self.project().$field.poll_write_vectored(cx, bufs)
}
fn poll_flush(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>)
-> core::task::Poll<std::io::Result<()>>
{
fn poll_flush(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<std::io::Result<()>> {
self.project().$field.poll_flush(cx)
}
fn poll_close(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>)
-> core::task::Poll<std::io::Result<()>>
{
fn poll_close(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<std::io::Result<()>> {
self.project().$field.poll_close(cx)
}
}
};
}

#[cfg(feature = "io")]
Expand All @@ -160,18 +164,22 @@ macro_rules! delegate_async_read {
self.$field.initializer()
}

fn poll_read(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, buf: &mut [u8])
-> core::task::Poll<std::io::Result<usize>>
{
fn poll_read(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
buf: &mut [u8],
) -> core::task::Poll<std::io::Result<usize>> {
self.project().$field.poll_read(cx, buf)
}

fn poll_read_vectored(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, bufs: &mut [std::io::IoSliceMut<'_>])
-> core::task::Poll<std::io::Result<usize>>
{
fn poll_read_vectored(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
bufs: &mut [std::io::IoSliceMut<'_>],
) -> core::task::Poll<std::io::Result<usize>> {
self.project().$field.poll_read_vectored(cx, bufs)
}
}
};
}

#[cfg(feature = "io")]
Expand All @@ -188,7 +196,7 @@ macro_rules! delegate_async_buf_read {
fn consume(self: core::pin::Pin<&mut Self>, amt: usize) {
self.project().$field.consume(amt)
}
}
};
}

macro_rules! delegate_access_inner {
Expand Down Expand Up @@ -304,16 +312,19 @@ macro_rules! delegate_all {
}

pub mod future;
#[doc(hidden)] pub use crate::future::{FutureExt, TryFutureExt};
#[doc(hidden)]
pub use crate::future::{FutureExt, TryFutureExt};

pub mod stream;
#[doc(hidden)] pub use crate::stream::{StreamExt, TryStreamExt};
#[doc(hidden)]
pub use crate::stream::{StreamExt, TryStreamExt};

#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub mod sink;
#[cfg(feature = "sink")]
#[doc(hidden)] pub use crate::sink::SinkExt;
#[doc(hidden)]
pub use crate::sink::SinkExt;

pub mod task;

Expand All @@ -329,10 +340,11 @@ pub mod compat;
pub mod io;
#[cfg(feature = "io")]
#[cfg(feature = "std")]
#[doc(hidden)] pub use crate::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt};
#[doc(hidden)]
pub use crate::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

mod fns;

mod unfold_state;

cfg_target_has_atomic! {
#[cfg(feature = "alloc")]
Expand Down
25 changes: 12 additions & 13 deletions futures-util/src/sink/unfold.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::unfold_state::UnfoldState;
use core::{future::Future, pin::Pin};
use futures_core::ready;
use futures_core::task::{Context, Poll};
Expand All @@ -9,10 +10,9 @@ pin_project! {
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Unfold<T, F, R> {
state: Option<T>,
function: F,
#[pin]
future: Option<R>,
state: UnfoldState<T, R>,
}
}

Expand All @@ -37,9 +37,8 @@ pin_project! {
/// ```
pub fn unfold<T, F, R>(init: T, function: F) -> Unfold<T, F, R> {
Unfold {
state: Some(init),
function,
future: None,
state: UnfoldState::Value(init),
}
}

Expand All @@ -56,24 +55,24 @@ where

fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
let mut this = self.project();
debug_assert!(this.future.is_none());
let future = (this.function)(this.state.take().unwrap(), item);
this.future.set(Some(future));
let future = match this.state.as_mut().take_value() {
Some(value) => (this.function)(value, item),
None => panic!("start_send called without poll_ready being called first"),
};
this.state.set(UnfoldState::Future(future));
Ok(())
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Poll::Ready(if let Some(future) = this.future.as_mut().as_pin_mut() {
let result = match ready!(future.poll(cx)) {
Poll::Ready(if let Some(future) = this.state.as_mut().project_future() {
match ready!(future.poll(cx)) {
Ok(state) => {
*this.state = Some(state);
this.state.set(UnfoldState::Value(state));
Ok(())
}
Err(err) => Err(err),
};
this.future.set(None);
result
}
} else {
Ok(())
})
Expand Down
48 changes: 26 additions & 22 deletions futures-util/src/stream/unfold.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::unfold_state::UnfoldState;
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
Expand Down Expand Up @@ -46,13 +47,13 @@ use pin_project_lite::pin_project;
/// # });
/// ```
pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
{
Unfold {
f,
state: Some(init),
fut: None,
state: UnfoldState::Value(init),
}
}

Expand All @@ -61,9 +62,8 @@ pin_project! {
#[must_use = "streams do nothing unless polled"]
pub struct Unfold<T, F, Fut> {
f: F,
state: Option<T>,
#[pin]
fut: Option<Fut>,
state: UnfoldState<T, Fut>,
}
}

Expand All @@ -75,44 +75,48 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Unfold")
.field("state", &self.state)
.field("fut", &self.fut)
.finish()
}
}

impl<T, F, Fut, Item> FusedStream for Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
{
fn is_terminated(&self) -> bool {
self.state.is_none() && self.fut.is_none()
if let UnfoldState::Empty = self.state {
true
} else {
false
}
}
}

impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
{
type Item = Item;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

if let Some(state) = this.state.take() {
this.fut.set(Some((this.f)(state)));
if let Some(state) = this.state.as_mut().take_value() {
this.state.set(UnfoldState::Future((this.f)(state)));
}

let step = ready!(this.fut.as_mut().as_pin_mut()
.expect("Unfold must not be polled after it returned `Poll::Ready(None)`").poll(cx));
this.fut.set(None);
let step = match this.state.as_mut().project_future() {
Some(fut) => ready!(fut.poll(cx)),
None => panic!("Unfold must not be polled after it returned `Poll::Ready(None)`"),
};

if let Some((item, next_state)) = step {
*this.state = Some(next_state);
this.state.set(UnfoldState::Value(next_state));
Poll::Ready(Some(item))
} else {
this.state.set(UnfoldState::Empty);
Poll::Ready(None)
}
}
Expand Down
34 changes: 34 additions & 0 deletions futures-util/src/unfold_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use core::pin::Pin;

/// UnfoldState used for stream and sink unfolds
#[derive(Debug)]
pub(crate) enum UnfoldState<T, R> {
Value(T),
Future(/* #[pin] */ R),
Empty,
}

impl<T, R> UnfoldState<T, R> {
pub(crate) fn project_future(self: Pin<&mut Self>) -> Option<Pin<&mut R>> {
// SAFETY Normal pin projection on the `Future` variant
unsafe {
match self.get_unchecked_mut() {
Self::Future(f) => Some(Pin::new_unchecked(f)),
_ => None,
}
}
}

pub(crate) fn take_value(self: Pin<&mut Self>) -> Option<T> {
// SAFETY We only move out of the `Value` variant which is not pinned
match *self {
Self::Value(_) => unsafe {
match core::mem::replace(self.get_unchecked_mut(), UnfoldState::Empty) {
UnfoldState::Value(v) => Some(v),
_ => core::hint::unreachable_unchecked(),
}
},
_ => None,
}
}
}

0 comments on commit 7ce5ce8

Please sign in to comment.