Skip to content

Commit

Permalink
fix(interactive): fix memory leak in pegasus (#3367)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes
#3366
  • Loading branch information
lnfjpt authored Nov 21, 2023
1 parent 9c227dd commit 2657681
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 5 deletions.
4 changes: 3 additions & 1 deletion interactive_engine/executor/engine/pegasus/common/src/rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ impl<T: std::fmt::Debug> std::fmt::Debug for RcPointer<T> {
impl<T: ?Sized> Drop for RcPointer<T> {
    /// Releases this handle's share of the pointee.
    ///
    /// The shared counter is decremented; when this handle was the last one,
    /// the heap allocation behind `ptr` is reclaimed, which both runs the
    /// value's destructor and frees its memory.
    fn drop(&mut self) {
        // `fetch_sub` returns the value held *before* the decrement, so `1`
        // means no other handle remains.
        if self.count.fetch_sub(1, Ordering::SeqCst) == 1 {
            // SAFETY: the counter just reached zero, so no other handle can
            // reach `ptr` any more and this is the only place that reclaims it.
            // NOTE(review): assumes `ptr` was produced by `Box::into_raw` (or an
            // equivalent Box allocation) — confirm against the constructor.
            //
            // Rebuilding the `Box` is the single destruction step. Calling
            // `drop_in_place` as well would destroy the value twice, and
            // calling it alone would leak the allocation.
            unsafe {
                drop(Box::<T>::from_raw(self.ptr.as_ptr()));
            }
        }
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ impl From<std::io::Error> for StartupError {
/// Errors reported when operating on a job's cancellation hook.
#[derive(Debug)]
pub enum CancelError {
/// No cancellation hook was found for the job; the payload is the job id.
JobNotFoundError(u64),
/// The lock guarding `JOB_CANCEL_MAP` is poisoned, so the map cannot be accessed.
CancelMapPoisonedError,
}

impl Display for CancelError {
Expand All @@ -351,6 +352,9 @@ impl Display for CancelError {
CancelError::JobNotFoundError(e) => {
write!(f, "fail to find job, job id: {};", e)
}
CancelError::CancelMapPoisonedError => {
write!(f, "JOB_CANCEL_MAP is poisoned!;")
}
}
}
}
Expand Down
20 changes: 16 additions & 4 deletions interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,23 @@ where
}

pub fn cancel_job(job_id: u64) -> Result<(), CancelError> {
let mut hook = JOB_CANCEL_MAP.write().expect("lock poisoned");
if let Some(cancel_hook) = hook.get_mut(&job_id) {
cancel_hook.store(true, Ordering::SeqCst);
if let Ok(mut hook) = JOB_CANCEL_MAP.write() {
if let Some(cancel_hook) = hook.get_mut(&job_id) {
cancel_hook.store(true, Ordering::SeqCst);
} else {
return Err(CancelError::JobNotFoundError(job_id));
}
} else {
return Err(CancelError::CancelMapPoisonedError);
}
Ok(())
}

pub fn remove_cancel_hook(job_id: u64) -> Result<(), CancelError> {
if let Ok(mut hook) = JOB_CANCEL_MAP.write() {
hook.remove(&job_id);
} else {
return Err(CancelError::JobNotFoundError(job_id));
return Err(CancelError::CancelMapPoisonedError);
}
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ impl<D: Data, T: Debug + Send + 'static> Worker<D, T> {

/// Decrements `peer_guard` and removes this job's cancel hook, logging an
/// error if the removal fails.
fn release(&mut self) {
    self.peer_guard.fetch_sub(1, Ordering::SeqCst);
    // The failure is only logged here; it is not propagated to the caller.
    let removal = crate::remove_cancel_hook(self.conf.job_id);
    if removal.is_err() {
        error!("JOB_CANCEL_MAP is poisoned!");
    }
}
}

Expand Down

0 comments on commit 2657681

Please sign in to comment.