From eee5cd591ba08e2bcd05c5c865fa0b4982f3d4f4 Mon Sep 17 00:00:00 2001 From: "nengli.ln" Date: Mon, 20 Nov 2023 20:47:27 +0800 Subject: [PATCH 1/3] fix memory leak in pegasus --- .../executor/engine/pegasus/common/src/rc.rs | 5 ++++- .../engine/pegasus/pegasus/src/errors/mod.rs | 4 ++++ .../engine/pegasus/pegasus/src/lib.rs | 20 +++++++++++++++---- .../engine/pegasus/pegasus/src/worker.rs | 7 ++++++- 4 files changed, 30 insertions(+), 6 deletions(-) diff --git a/interactive_engine/executor/engine/pegasus/common/src/rc.rs b/interactive_engine/executor/engine/pegasus/common/src/rc.rs index df46a789393c..5c5b0e4e4510 100644 --- a/interactive_engine/executor/engine/pegasus/common/src/rc.rs +++ b/interactive_engine/executor/engine/pegasus/common/src/rc.rs @@ -97,7 +97,10 @@ impl std::fmt::Debug for RcPointer { impl Drop for RcPointer { fn drop(&mut self) { if self.count.fetch_sub(1, Ordering::SeqCst) == 1 { - unsafe { std::ptr::drop_in_place(self.ptr.as_ptr()) } + //unsafe { std::ptr::drop_in_place(self.ptr.as_ptr()) } + unsafe { + let _ = Box::from_raw(self.ptr.as_ptr()); + } } } } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/errors/mod.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/errors/mod.rs index a23b59448c44..439c155dd6a0 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/errors/mod.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/errors/mod.rs @@ -343,6 +343,7 @@ impl From for StartupError { #[derive(Debug)] pub enum CancelError { JobNotFoundError(u64), + CancelMapPoisonedError, } impl Display for CancelError { @@ -351,6 +352,9 @@ impl Display for CancelError { CancelError::JobNotFoundError(e) => { write!(f, "fail to find job, job id: {};", e) } + CancelError::CancelMapPoisonedError => { + write!(f, "JOB_CANCEL_MAP is poisoned!;") + } } } } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs index 931fbec060c6..bc3a7dc1465d 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs @@ -299,11 +299,23 @@ where } pub fn cancel_job(job_id: u64) -> Result<(), CancelError> { - let mut hook = JOB_CANCEL_MAP.write().expect("lock poisoned"); - if let Some(cancel_hook) = hook.get_mut(&job_id) { - cancel_hook.store(true, Ordering::SeqCst); + if let Ok(mut hook) = JOB_CANCEL_MAP.write() { + if let Some(cancel_hook) = hook.get_mut(&job_id) { + cancel_hook.store(true, Ordering::SeqCst); + } else { + return Err(CancelError::JobNotFoundError(job_id)); + } + } else { + return Err(CancelError::CancelMapPoisonedError); + } + Ok(()) +} + +pub fn remove_cancel_hook(job_id: u64) -> Result<(), CancelError> { + if let Ok(mut hook) = JOB_CANCEL_MAP.write() { + hook.remove(&job_id); } else { - return Err(CancelError::JobNotFoundError(job_id)); + return Err(CancelError::CancelMapPoisonedError); } Ok(()) } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs index 31eb772e3218..3c75293243dc 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs @@ -131,7 +131,12 @@ impl Worker { } fn release(&mut self) { - self.peer_guard.fetch_sub(1, Ordering::SeqCst); + if self.peer_guard.fetch_sub(1, Ordering::SeqCst) == 1 { + pegasus_memory::alloc::remove_task(self.conf.job_id as usize); + } + if !crate::remove_cancel_hook(self.conf.job_id).is_ok() { + error!("JOB_CANCEL_MAP is poisoned!"); + } } } From ccb028f9d0279dfeb02ccc4e91058ee84aef7670 Mon Sep 17 00:00:00 2001 From: "nengli.ln" Date: Tue, 21 Nov 2023 10:27:39 +0800 Subject: [PATCH 2/3] specify type for from_raw --- interactive_engine/executor/engine/pegasus/common/src/rc.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/interactive_engine/executor/engine/pegasus/common/src/rc.rs b/interactive_engine/executor/engine/pegasus/common/src/rc.rs index 5c5b0e4e4510..4e37d193874b 100644 --- a/interactive_engine/executor/engine/pegasus/common/src/rc.rs +++ b/interactive_engine/executor/engine/pegasus/common/src/rc.rs @@ -97,9 +97,8 @@ impl std::fmt::Debug for RcPointer { impl Drop for RcPointer { fn drop(&mut self) { if self.count.fetch_sub(1, Ordering::SeqCst) == 1 { - //unsafe { std::ptr::drop_in_place(self.ptr.as_ptr()) } unsafe { - let _ = Box::from_raw(self.ptr.as_ptr()); + let _ = Box::::from_raw(self.ptr.as_ptr()); } } } From 5728325285d2d2aa0b5efc44a492d1ef2c52cf35 Mon Sep 17 00:00:00 2001 From: "nengli.ln" Date: Tue, 21 Nov 2023 10:48:49 +0800 Subject: [PATCH 3/3] update worker --- .../executor/engine/pegasus/pegasus/src/worker.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs index 3c75293243dc..f5f4f9cb9cba 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs @@ -131,9 +131,7 @@ impl Worker { } fn release(&mut self) { - if self.peer_guard.fetch_sub(1, Ordering::SeqCst) == 1 { - pegasus_memory::alloc::remove_task(self.conf.job_id as usize); - } + self.peer_guard.fetch_sub(1, Ordering::SeqCst); if !crate::remove_cancel_hook(self.conf.job_id).is_ok() { error!("JOB_CANCEL_MAP is poisoned!"); }