Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype: New epoch algorithm #963

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 29 additions & 166 deletions crossbeam-epoch/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,21 @@
/// ```
use core::fmt;

use alloc::rc::Rc;

use crate::guard::Guard;
use crate::internal::{Global, Local};
use crate::primitive::sync::Arc;

/// An epoch-based garbage collector.
#[derive(Clone, Default, Debug)]
pub struct Collector {
pub(crate) global: Arc<Global>,
}

unsafe impl Send for Collector {}
unsafe impl Sync for Collector {}

impl Default for Collector {
fn default() -> Self {
Self {
global: Arc::new(Global::new()),
}
}
}

impl Collector {
/// Creates a new collector.
pub fn new() -> Self {
Expand All @@ -42,22 +37,7 @@ impl Collector {

/// Registers a new handle for the collector.
pub fn register(&self) -> LocalHandle {
Local::register(self)
}
}

impl Clone for Collector {
/// Creates another reference to the same garbage collector.
fn clone(&self) -> Self {
Collector {
global: self.global.clone(),
}
}
}

impl fmt::Debug for Collector {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Collector { .. }")
Local::new(self)
}
}

Expand All @@ -70,36 +50,31 @@ impl PartialEq for Collector {
impl Eq for Collector {}

/// A handle to a garbage collector.

pub struct LocalHandle {
pub(crate) local: *const Local,
pub(crate) local: Rc<Local>,
}

impl LocalHandle {
/// Pins the handle.
#[inline]
pub fn pin(&self) -> Guard {
unsafe { (*self.local).pin() }
self.local.pin();
Guard {
local: Some(self.local.clone()),
}
}

/// Returns `true` if the handle is pinned.
#[inline]
pub fn is_pinned(&self) -> bool {
unsafe { (*self.local).is_pinned() }
self.local.is_pinned()
}

/// Returns the `Collector` associated with this handle.
#[inline]
pub fn collector(&self) -> &Collector {
unsafe { (*self.local).collector() }
}
}

impl Drop for LocalHandle {
#[inline]
fn drop(&mut self) {
unsafe {
Local::release_handle(&*self.local);
}
self.local.collector()
}
}

Expand Down Expand Up @@ -139,43 +114,6 @@ mod tests {
assert!(!handle.is_pinned());
}

#[test]
fn flush_local_bag() {
let collector = Collector::new();
let handle = collector.register();
drop(collector);

for _ in 0..100 {
let guard = &handle.pin();
unsafe {
let a = Owned::new(7).into_shared(guard);
guard.defer_destroy(a);

assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));

while !(*guard.local).bag.with(|b| (*b).is_empty()) {
guard.flush();
}
}
}
}

#[test]
fn garbage_buffering() {
let collector = Collector::new();
let handle = collector.register();
drop(collector);

let guard = &handle.pin();
unsafe {
for _ in 0..10 {
let a = Owned::new(7).into_shared(guard);
guard.defer_destroy(a);
}
assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
}
}

#[test]
fn pin_holds_advance() {
#[cfg(miri)]
Expand All @@ -193,9 +131,12 @@ mod tests {
let guard = &handle.pin();

let before = collector.global.epoch.load(Ordering::Relaxed);
collector.global.collect(guard);
let after = collector.global.epoch.load(Ordering::Relaxed);
guard.flush();
let mut after = collector.global.epoch.load(Ordering::Relaxed);

if after < before {
after += 3;
}
assert!(after.wrapping_sub(before) <= 2);
}
});
Expand All @@ -204,80 +145,6 @@ mod tests {
.unwrap();
}

#[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to `cfg(crossbeam_sanitize)` reduce `internal::MAX_OBJECTS`
#[test]
fn incremental() {
#[cfg(miri)]
const COUNT: usize = 500;
#[cfg(not(miri))]
const COUNT: usize = 100_000;
static DESTROYS: AtomicUsize = AtomicUsize::new(0);

let collector = Collector::new();
let handle = collector.register();

unsafe {
let guard = &handle.pin();
for _ in 0..COUNT {
let a = Owned::new(7i32).into_shared(guard);
guard.defer_unchecked(move || {
drop(a.into_owned());
DESTROYS.fetch_add(1, Ordering::Relaxed);
});
}
guard.flush();
}

let mut last = 0;

while last < COUNT {
let curr = DESTROYS.load(Ordering::Relaxed);
assert!(curr - last <= 1024);
last = curr;

let guard = &handle.pin();
collector.global.collect(guard);
}
assert!(DESTROYS.load(Ordering::Relaxed) == COUNT);
}

#[test]
fn buffering() {
const COUNT: usize = 10;
#[cfg(miri)]
const N: usize = 500;
#[cfg(not(miri))]
const N: usize = 100_000;
static DESTROYS: AtomicUsize = AtomicUsize::new(0);

let collector = Collector::new();
let handle = collector.register();

unsafe {
let guard = &handle.pin();
for _ in 0..COUNT {
let a = Owned::new(7i32).into_shared(guard);
guard.defer_unchecked(move || {
drop(a.into_owned());
DESTROYS.fetch_add(1, Ordering::Relaxed);
});
}
}

for _ in 0..N {
collector.global.collect(&handle.pin());
}
assert!(DESTROYS.load(Ordering::Relaxed) < COUNT);

handle.pin().flush();

while DESTROYS.load(Ordering::Relaxed) < COUNT {
let guard = &handle.pin();
collector.global.collect(guard);
}
assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
}

#[test]
fn count_drops() {
#[cfg(miri)]
Expand Down Expand Up @@ -307,9 +174,8 @@ mod tests {
guard.flush();
}

while DROPS.load(Ordering::Relaxed) < COUNT {
let guard = &handle.pin();
collector.global.collect(guard);
for _ in 0..6 {
handle.pin().flush();
}
assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
}
Expand Down Expand Up @@ -338,9 +204,8 @@ mod tests {
guard.flush();
}

while DESTROYS.load(Ordering::Relaxed) < COUNT {
let guard = &handle.pin();
collector.global.collect(guard);
for _ in 0..6 {
handle.pin().flush();
}
assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
}
Expand Down Expand Up @@ -376,9 +241,9 @@ mod tests {
guard.flush();
}

while DROPS.load(Ordering::Relaxed) < COUNT {
for _ in 0..6 {
guard.repin();
collector.global.collect(&guard);
guard.flush();
}
assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
}
Expand Down Expand Up @@ -411,16 +276,15 @@ mod tests {
guard.flush();
}

while DESTROYS.load(Ordering::Relaxed) < COUNT {
let guard = &handle.pin();
collector.global.collect(guard);
for _ in 0..6 {
handle.pin().flush();
}
assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
}

#[test]
fn stress() {
const THREADS: usize = 8;
const THREADS: usize = 3;
#[cfg(miri)]
const COUNT: usize = 500;
#[cfg(not(miri))]
Expand All @@ -435,7 +299,7 @@ mod tests {
}
}

let collector = Collector::new();
let collector = &Collector::new();

thread::scope(|scope| {
for _ in 0..THREADS {
Expand All @@ -454,9 +318,8 @@ mod tests {
.unwrap();

let handle = collector.register();
while DROPS.load(Ordering::Relaxed) < COUNT * THREADS {
let guard = &handle.pin();
collector.global.collect(guard);
for _ in 0..6 {
handle.pin().flush();
}
assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS);
}
Expand Down
1 change: 1 addition & 0 deletions crossbeam-epoch/src/deferred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl fmt::Debug for Deferred {
}

impl Deferred {
#[cfg(crossbeam_loom)]
pub(crate) const NO_OP: Self = {
fn no_op_call(_raw: *mut u8) {}
Self {
Expand Down
Loading