Skip to content

Commit

Permalink
Fixing the naive implementation of sleep and adding a tokio based
Browse files Browse the repository at this point in the history
alternative
  • Loading branch information
msabansal committed Dec 21, 2023
1 parent aa6f234 commit 7648d5d
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 9 deletions.
5 changes: 3 additions & 2 deletions sdk/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ rustc_version = "0.4"

[dev-dependencies]
env_logger = "0.10"
tokio = { version = "1.0", features = ["default"] }
tokio = { version = "1.0", features = ["default", "macros", "rt", "time"] }
thiserror = "1.0"

[features]
Expand All @@ -62,7 +62,8 @@ hmac_openssl = ["dep:openssl"]
test_e2e = []
azurite_workaround = []
xml = ["quick-xml"]
tokio-fs = ["tokio/fs", "tokio/io-util"]
tokio-fs = ["tokio/fs", "tokio/sync", "tokio/io-util"]
tokio-sleep = ["tokio"]

[package.metadata.docs.rs]
features = ["xml", "tokio-fs", "enable_reqwest", "enable_reqwest_gzip", "enable_reqwest_rustls", "hmac_rust", "hmac_openssl", "xml"]
1 change: 0 additions & 1 deletion sdk/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use uuid::Uuid;
#[cfg(feature = "xml")]
pub mod xml;

#[cfg(feature = "tokio")]
pub mod tokio;

pub mod base64;
Expand Down
42 changes: 42 additions & 0 deletions sdk/core/src/sleep/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#[cfg(not(feature = "tokio-sleep"))]
mod thread;

#[cfg(not(feature = "tokio-sleep"))]
pub use self::thread::{sleep, Sleep};

#[cfg(feature = "tokio-sleep")]
pub use tokio::time::{sleep, Sleep};

// Unit tests
#[cfg(test)]
mod tests {

/// Basic test that launches 10k futures and waits for them to complete
/// Has a high chance of failing if there is a race condition in sleep method
/// Runs quickly otherwise
#[cfg(not(feature = "tokio-sleep"))]
#[tokio::test]
async fn test_timeout() {
use super::*;
use std::time::Duration;
use tokio::task::JoinSet;

let mut join_set = JoinSet::default();
let total = 10000;
for _i in 0..total {
join_set.spawn(async move {
sleep(Duration::from_millis(10)).await;
});
}

loop {
let res =
tokio::time::timeout(std::time::Duration::from_secs(10), join_set.join_next())
.await;
assert!(res.is_ok());
if let Ok(None) = res {
break;
}
}
}
}
20 changes: 14 additions & 6 deletions sdk/core/src/sleep.rs → sdk/core/src/sleep/thread.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,47 @@
use futures::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;

/// Creates a future that resolves after a specified duration of time.
/// Uses a simple thread based implementation for sleep. A more efficient
/// implementation is available by using the `tokio-sleep` crate feature.
pub fn sleep(duration: Duration) -> Sleep {
Sleep {
thread: None,
signal: None,
duration,
}
}

#[derive(Debug)]
pub struct Sleep {
thread: Option<thread::JoinHandle<()>>,
signal: Option<Arc<AtomicBool>>,
duration: Duration,
}

impl Future for Sleep {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(thread) = &self.thread {
if thread.is_finished() {
if let Some(signal) = &self.signal {
if signal.load(Ordering::Acquire) {
Poll::Ready(())
} else {
Poll::Pending
}
} else {
let signal = Arc::new(AtomicBool::new(false));
let waker = cx.waker().clone();
let duration = self.duration;
self.get_mut().thread = Some(thread::spawn(move || {
self.get_mut().signal = Some(signal.clone());
thread::spawn(move || {
thread::sleep(duration);
signal.store(true, Ordering::Release);
waker.wake();
}));
});
Poll::Pending
}
}
Expand Down
1 change: 1 addition & 0 deletions sdk/core/src/tokio/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
#[cfg(feature = "tokio-fs")]
pub mod fs;

0 comments on commit 7648d5d

Please sign in to comment.