Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing the naive implementation of sleep #1523

Merged
merged 1 commit into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions sdk/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ rustc_version = "0.4"

[dev-dependencies]
env_logger = "0.10"
tokio = { version = "1.0", features = ["default"] }
tokio = { version = "1.0", features = ["default", "macros", "rt", "time"] }
thiserror = "1.0"

[features]
Expand All @@ -62,7 +62,8 @@ hmac_openssl = ["dep:openssl"]
test_e2e = []
azurite_workaround = []
xml = ["quick-xml"]
tokio-fs = ["tokio/fs", "tokio/io-util"]
tokio-fs = ["tokio/fs", "tokio/sync", "tokio/io-util"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What changed to the tokio-fs feature that requires tokio/sync?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It uses mutex which is behind sync feature flag. It just didn't compile. Should fix ci tests as well

tokio-sleep = ["tokio"]

[package.metadata.docs.rs]
features = ["xml", "tokio-fs", "enable_reqwest", "enable_reqwest_gzip", "enable_reqwest_rustls", "hmac_rust", "hmac_openssl", "xml"]
1 change: 0 additions & 1 deletion sdk/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use uuid::Uuid;
#[cfg(feature = "xml")]
pub mod xml;

#[cfg(feature = "tokio")]
pub mod tokio;

pub mod base64;
Expand Down
42 changes: 42 additions & 0 deletions sdk/core/src/sleep/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#[cfg(not(feature = "tokio-sleep"))]
mod thread;

#[cfg(not(feature = "tokio-sleep"))]
pub use self::thread::{sleep, Sleep};

#[cfg(feature = "tokio-sleep")]
pub use tokio::time::{sleep, Sleep};

// Unit tests
#[cfg(test)]
mod tests {

/// Basic test that launches 10k futures and waits for them to complete
/// Has a high chance of failing if there is a race condition in sleep method
/// Runs quickly otherwise
#[cfg(not(feature = "tokio-sleep"))]
#[tokio::test]
async fn test_timeout() {
use super::*;
use std::time::Duration;
use tokio::task::JoinSet;

let mut join_set = JoinSet::default();
let total = 10000;
for _i in 0..total {
join_set.spawn(async move {
sleep(Duration::from_millis(10)).await;
});
}

loop {
let res =
tokio::time::timeout(std::time::Duration::from_secs(10), join_set.join_next())
.await;
assert!(res.is_ok());
if let Ok(None) = res {
break;
}
}
}
}
20 changes: 14 additions & 6 deletions sdk/core/src/sleep.rs → sdk/core/src/sleep/thread.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,47 @@
use futures::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;

/// Creates a future that resolves after a specified duration of time.
/// Uses a simple thread based implementation for sleep. A more efficient
/// implementation is available by using the `tokio-sleep` crate feature.
pub fn sleep(duration: Duration) -> Sleep {
Sleep {
thread: None,
signal: None,
duration,
}
}

#[derive(Debug)]
pub struct Sleep {
thread: Option<thread::JoinHandle<()>>,
signal: Option<Arc<AtomicBool>>,
duration: Duration,
}

impl Future for Sleep {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(thread) = &self.thread {
if thread.is_finished() {
if let Some(signal) = &self.signal {
if signal.load(Ordering::Acquire) {
Poll::Ready(())
} else {
Poll::Pending
}
} else {
let signal = Arc::new(AtomicBool::new(false));
let waker = cx.waker().clone();
let duration = self.duration;
self.get_mut().thread = Some(thread::spawn(move || {
self.get_mut().signal = Some(signal.clone());
thread::spawn(move || {
thread::sleep(duration);
signal.store(true, Ordering::Release);
waker.wake();
}));
});
Poll::Pending
}
}
Expand Down
1 change: 1 addition & 0 deletions sdk/core/src/tokio/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
#[cfg(feature = "tokio-fs")]
pub mod fs;