Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): fix memory leak in pegasus #3367

Merged
merged 3 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion interactive_engine/executor/engine/pegasus/common/src/rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ impl<T: std::fmt::Debug> std::fmt::Debug for RcPointer<T> {
/// Releases one reference; the holder that observes the count going from 1 to
/// 0 tears the pointee down.
// NOTE(review): this is a diff hunk rendered without +/- markers. The live
// `drop_in_place` statement below appears to be the removed pre-change line
// (its commented-out copy follows it) — confirm against rc.rs. If both it and
// the `Box::from_raw` block were live, the pointee's destructor would run
// twice.
impl<T: ?Sized> Drop for RcPointer<T> {
fn drop(&mut self) {
// `fetch_sub` returns the value held before the decrement, so `== 1` means
// this was the last outstanding reference.
if self.count.fetch_sub(1, Ordering::SeqCst) == 1 {
// `drop_in_place` only runs the destructor; it does not release the
// allocation that holds the pointee.
unsafe { std::ptr::drop_in_place(self.ptr.as_ptr()) }
//unsafe { std::ptr::drop_in_place(self.ptr.as_ptr()) }
// SAFETY (assumed — TODO confirm in the constructor): `ptr` must originate
// from a `Box` allocation for `Box::from_raw` to be sound. The rebuilt box
// is dropped immediately (`let _ =`), which runs the destructor and frees
// the memory.
unsafe {
let _ = Box::from_raw(self.ptr.as_ptr());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ impl From<std::io::Error> for StartupError {
/// Errors returned by `cancel_job` and `remove_cancel_hook`.
#[derive(Debug)]
pub enum CancelError {
/// `JOB_CANCEL_MAP` has no entry for the job; the payload is that job id.
JobNotFoundError(u64),
/// Acquiring the write lock on `JOB_CANCEL_MAP` returned an error.
CancelMapPoisonedError,
}

impl Display for CancelError {
Expand All @@ -351,6 +352,9 @@ impl Display for CancelError {
CancelError::JobNotFoundError(e) => {
write!(f, "fail to find job, job id: {};", e)
}
CancelError::CancelMapPoisonedError => {
write!(f, "JOB_CANCEL_MAP is poisoned!;")
}
}
}
}
Expand Down
20 changes: 16 additions & 4 deletions interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,23 @@ where
}

/// Requests cancellation of job `job_id` by storing `true` into its entry in
/// `JOB_CANCEL_MAP`.
///
/// # Errors
///
/// * `CancelError::JobNotFoundError(job_id)` when the map has no entry for
///   `job_id`.
/// * `CancelError::CancelMapPoisonedError` when `JOB_CANCEL_MAP.write()` does
///   not return `Ok`.
// NOTE(review): this is a diff hunk rendered without +/- markers. The three
// statements right after the signature (the `expect("lock poisoned")` lock
// acquisition and the first `if let Some` / `store`) appear to be the removed
// pre-change lines — confirm against lib.rs.
pub fn cancel_job(job_id: u64) -> Result<(), CancelError> {
let mut hook = JOB_CANCEL_MAP.write().expect("lock poisoned");
if let Some(cancel_hook) = hook.get_mut(&job_id) {
cancel_hook.store(true, Ordering::SeqCst);
// Post-change path: a failed `write()` is reported as an error value
// instead of panicking through `expect`.
if let Ok(mut hook) = JOB_CANCEL_MAP.write() {
if let Some(cancel_hook) = hook.get_mut(&job_id) {
cancel_hook.store(true, Ordering::SeqCst);
} else {
return Err(CancelError::JobNotFoundError(job_id));
}
} else {
return Err(CancelError::CancelMapPoisonedError);
}
Ok(())
}

/// Removes the entry for `job_id` from `JOB_CANCEL_MAP`.
///
/// The value returned by `remove` is discarded, so a job id with no entry is
/// not reported as an error.
///
/// # Errors
///
/// `CancelError::CancelMapPoisonedError` when `JOB_CANCEL_MAP.write()` does
/// not return `Ok`.
// NOTE(review): this is a diff hunk rendered without +/- markers. The
// `JobNotFoundError` return directly before the `CancelMapPoisonedError`
// return appears to be a removed pre-change line, not live code — confirm
// against lib.rs.
pub fn remove_cancel_hook(job_id: u64) -> Result<(), CancelError> {
if let Ok(mut hook) = JOB_CANCEL_MAP.write() {
hook.remove(&job_id);
} else {
return Err(CancelError::JobNotFoundError(job_id));
return Err(CancelError::CancelMapPoisonedError);
}
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,12 @@ impl<D: Data, T: Debug + Send + 'static> Worker<D, T> {
}

/// Decrements `peer_guard`; when the previous value was 1, calls
/// `pegasus_memory::alloc::remove_task` with the job id, then removes the
/// job's cancel hook through `crate::remove_cancel_hook`.
// NOTE(review): this is a diff hunk rendered without +/- markers. The bare
// `fetch_sub` statement right after the signature appears to be the removed
// pre-change line, and the prose between the `remove_task` call and the
// closing brace is an inline review comment captured from the page, not
// source code.
fn release(&mut self) {
self.peer_guard.fetch_sub(1, Ordering::SeqCst);
// `fetch_sub` returns the value held before the decrement, so `== 1` means
// this call brought the counter to zero.
if self.peer_guard.fetch_sub(1, Ordering::SeqCst) == 1 {
// NOTE(review): `as usize` would truncate a job id wider than the target's
// pointer width — confirm this is acceptable.
pegasus_memory::alloc::remove_task(self.conf.job_id as usize);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bmmcq This usage of alloc::remove_task is not correct, right?

}
// Any error from `remove_cancel_hook` is logged with a fixed message and
// otherwise ignored.
if !crate::remove_cancel_hook(self.conf.job_id).is_ok() {
error!("JOB_CANCEL_MAP is poisoned!");
}
}
}

Expand Down
Loading