diff --git a/commons/zenoh-task/src/lib.rs b/commons/zenoh-task/src/lib.rs index 5f7c3c26d2..a733a3de13 100644 --- a/commons/zenoh-task/src/lib.rs +++ b/commons/zenoh-task/src/lib.rs @@ -130,10 +130,16 @@ impl TaskController { } pub struct TerminatableTask { - handle: JoinHandle<()>, + handle: Option>, token: CancellationToken, } +impl Drop for TerminatableTask { + fn drop(&mut self) { + self.terminate(std::time::Duration::from_secs(10)); + } +} + impl TerminatableTask { pub fn create_cancellation_token() -> CancellationToken { CancellationToken::new() @@ -147,7 +153,7 @@ impl TerminatableTask { T: Send + 'static, { TerminatableTask { - handle: rt.spawn(future.map(|_f| ())), + handle: Some(rt.spawn(future.map(|_f| ()))), token, } } @@ -168,24 +174,26 @@ impl TerminatableTask { }; TerminatableTask { - handle: rt.spawn(task), + handle: Some(rt.spawn(task)), token, } } /// Attempts to terminate the task. /// Returns true if task completed / aborted within timeout duration, false otherwise. - pub fn terminate(self, timeout: Duration) -> bool { + pub fn terminate(&mut self, timeout: Duration) -> bool { ResolveFuture::new(async move { self.terminate_async(timeout).await }).res_sync() } /// Async version of [`TerminatableTask::terminate()`]. - pub async fn terminate_async(self, timeout: Duration) -> bool { + pub async fn terminate_async(&mut self, timeout: Duration) -> bool { self.token.cancel(); - if tokio::time::timeout(timeout, self.handle).await.is_err() { - tracing::error!("Failed to terminate the task"); - return false; - }; + if let Some(handle) = self.handle.take() { + if tokio::time::timeout(timeout, handle).await.is_err() { + tracing::error!("Failed to terminate the task"); + return false; + }; + } true } } diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index 431ccd2dde..821e621482 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -257,7 +257,7 @@ impl<'a> PublicationCache<'a> { let PublicationCache { _queryable, local_sub, - task, + mut task, } = self; _queryable.undeclare().res_async().await?; local_sub.undeclare().res_async().await?; diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index ad4e1667f0..b1eeca261f 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -47,7 +47,6 @@ use std::{ any::Any, collections::{HashMap, HashSet}, sync::Arc, - time::Duration, }; use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI, WhatAmIMatcher, ZenohId}; use zenoh_protocol::{ @@ -116,15 +115,6 @@ struct HatTables { peers_trees_task: Option, } -impl Drop for HatTables { - fn drop(&mut self) { - if self.peers_trees_task.is_some() { - let task = self.peers_trees_task.take().unwrap(); - task.terminate(Duration::from_secs(10)); - } - } -} - impl HatTables { fn new() -> Self { Self { diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 3be278aa02..2b988917c2 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -52,7 +52,6 @@ use std::{ collections::{hash_map::DefaultHasher, HashMap, HashSet}, hash::Hasher, sync::Arc, - time::Duration, }; use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI, WhatAmIMatcher, ZenohId}; use zenoh_protocol::{ @@ -127,19 +126,6 @@ struct HatTables { router_peers_failover_brokering: bool, } -impl Drop for HatTables { - fn drop(&mut self) { - if self.peers_trees_task.is_some() { - let task = self.peers_trees_task.take().unwrap(); - task.terminate(Duration::from_secs(10)); - } - if self.routers_trees_task.is_some() { - let task = self.routers_trees_task.take().unwrap(); - task.terminate(Duration::from_secs(10)); - } - } -} - impl HatTables { fn new(router_peers_failover_brokering: bool) -> Self { Self {