Skip to content

Commit

Permalink
Allow for work stealing when only holding read locks (#4012)
Browse files Browse the repository at this point in the history
This allows tasks with only read locks to be stolen by threads that are
currently holding only read locks.

---------

Co-authored-by: Leonard Brünings <[email protected]>
  • Loading branch information
marcphilipp and leonard84 committed Sep 21, 2024
1 parent 0d25a5a commit 6529d8d
Show file tree
Hide file tree
Showing 12 changed files with 560 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,36 @@

package org.junit.platform.engine.support.hierarchical;

import static java.util.Collections.unmodifiableList;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.Lock;

import org.junit.platform.commons.util.Preconditions;
import org.junit.platform.commons.util.ToStringBuilder;

/**
* @since 1.3
*/
class CompositeLock implements ResourceLock {

private final List<ExclusiveResource> resources;
private final List<Lock> locks;
private final boolean exclusive;

CompositeLock(List<Lock> locks) {
CompositeLock(List<ExclusiveResource> resources, List<Lock> locks) {
Preconditions.condition(resources.size() == locks.size(), "Resources and locks must have the same size");
this.resources = unmodifiableList(resources);
this.locks = Preconditions.notEmpty(locks, "Locks must not be empty");
this.exclusive = resources.stream().anyMatch(
resource -> resource.getLockMode() == ExclusiveResource.LockMode.READ_WRITE);
}

@Override
public List<ExclusiveResource> getResources() {
return resources;
}

// for tests only
Expand Down Expand Up @@ -64,6 +78,18 @@ private void release(List<Lock> acquiredLocks) {
}
}

@Override
public boolean isExclusive() {
return exclusive;
}

@Override
public String toString() {
return new ToStringBuilder(this) //
.append("resources", resources) //
.toString();
}

private class CompositeLockManagedBlocker implements ForkJoinPool.ManagedBlocker {

private volatile boolean acquired;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@

package org.junit.platform.engine.support.hierarchical;

import static java.util.Comparator.comparing;
import static java.util.Comparator.naturalOrder;
import static org.apiguardian.api.API.Status.STABLE;

import java.util.Comparator;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;

Expand Down Expand Up @@ -50,6 +53,14 @@ public class ExclusiveResource {
static final ExclusiveResource GLOBAL_READ = new ExclusiveResource(GLOBAL_KEY, LockMode.READ);
static final ExclusiveResource GLOBAL_READ_WRITE = new ExclusiveResource(GLOBAL_KEY, LockMode.READ_WRITE);

static final Comparator<ExclusiveResource> COMPARATOR //
= comparing(ExclusiveResource::getKey, globalKeyFirst().thenComparing(naturalOrder())) //
.thenComparing(ExclusiveResource::getLockMode);

private static Comparator<String> globalKeyFirst() {
return comparing(key -> !GLOBAL_KEY.equals(key));
}

private final String key;
private final LockMode lockMode;
private int hash;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apiguardian.api.API.Status.STABLE;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_READ_WRITE;
import static org.junit.platform.engine.support.hierarchical.Node.ExecutionMode.CONCURRENT;
import static org.junit.platform.engine.support.hierarchical.Node.ExecutionMode.SAME_THREAD;

Expand Down Expand Up @@ -39,7 +40,6 @@
import org.junit.platform.commons.logging.LoggerFactory;
import org.junit.platform.commons.util.ExceptionUtils;
import org.junit.platform.engine.ConfigurationParameters;
import org.junit.platform.engine.support.hierarchical.SingleLock.GlobalReadWriteLock;

/**
* A {@link ForkJoinPool}-based
Expand All @@ -53,7 +53,9 @@
@API(status = STABLE, since = "1.10")
public class ForkJoinPoolHierarchicalTestExecutorService implements HierarchicalTestExecutorService {

private final ForkJoinPool forkJoinPool;
// package-private for testing
final ForkJoinPool forkJoinPool;

private final TaskEventListener taskEventListener;
private final int parallelism;
private final ThreadLocal<ThreadLock> threadLocks = ThreadLocal.withInitial(ThreadLock::new);
Expand Down Expand Up @@ -170,7 +172,7 @@ private void forkConcurrentTasks(List<? extends TestTask> tasks, Deque<Exclusive
Deque<ExclusiveTask> sameThreadTasks, Deque<ExclusiveTask> concurrentTasksInReverseOrder) {
for (TestTask testTask : tasks) {
ExclusiveTask exclusiveTask = new ExclusiveTask(testTask);
if (testTask.getResourceLock() instanceof GlobalReadWriteLock) {
if (requiresGlobalReadWriteLock(testTask)) {
isolatedTasks.add(exclusiveTask);
}
else if (testTask.getExecutionMode() == SAME_THREAD) {
Expand All @@ -183,6 +185,10 @@ else if (testTask.getExecutionMode() == SAME_THREAD) {
}
}

private static boolean requiresGlobalReadWriteLock(TestTask testTask) {
return testTask.getResourceLock().getResources().contains(GLOBAL_READ_WRITE);
}

private void executeSync(Deque<ExclusiveTask> tasks) {
for (ExclusiveTask task : tasks) {
task.execSync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,15 @@

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Comparator.comparing;
import static java.util.Comparator.naturalOrder;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static org.junit.platform.commons.util.CollectionUtils.getOnlyElement;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_KEY;
import static org.junit.platform.commons.util.CollectionUtils.toUnmodifiableList;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_READ;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_READ_WRITE;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.LockMode.READ;

import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -32,83 +29,75 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.junit.platform.engine.support.hierarchical.SingleLock.GlobalReadLock;
import org.junit.platform.engine.support.hierarchical.SingleLock.GlobalReadWriteLock;

/**
* @since 1.3
*/
class LockManager {

private static final Comparator<ExclusiveResource> COMPARATOR //
= comparing(ExclusiveResource::getKey, globalKeyFirst().thenComparing(naturalOrder())) //
.thenComparing(ExclusiveResource::getLockMode);

private static Comparator<String> globalKeyFirst() {
return comparing(key -> !GLOBAL_KEY.equals(key));
}

private final Map<String, ReadWriteLock> locksByKey = new ConcurrentHashMap<>();
private final GlobalReadLock globalReadLock;
private final GlobalReadWriteLock globalReadWriteLock;
private final SingleLock globalReadLock;
private final SingleLock globalReadWriteLock;

public LockManager() {
globalReadLock = new GlobalReadLock(toLock(GLOBAL_READ));
globalReadWriteLock = new GlobalReadWriteLock(toLock(GLOBAL_READ_WRITE));
globalReadLock = new SingleLock(GLOBAL_READ, toLock(GLOBAL_READ));
globalReadWriteLock = new SingleLock(GLOBAL_READ_WRITE, toLock(GLOBAL_READ_WRITE));
}

ResourceLock getLockForResources(Collection<ExclusiveResource> resources) {
return toResourceLock(toDistinctSortedLocks(resources));
return toResourceLock(toDistinctSortedResources(resources));
}

ResourceLock getLockForResource(ExclusiveResource resource) {
return toResourceLock(toLock(resource));
return toResourceLock(singletonList(resource));
}

private List<Lock> toDistinctSortedLocks(Collection<ExclusiveResource> resources) {
private List<ExclusiveResource> toDistinctSortedResources(Collection<ExclusiveResource> resources) {
if (resources.isEmpty()) {
return emptyList();
}
if (resources.size() == 1) {
return singletonList(toLock(getOnlyElement(resources)));
return singletonList(getOnlyElement(resources));
}
// @formatter:off
Map<String, List<ExclusiveResource>> resourcesByKey = resources.stream()
.sorted(COMPARATOR)
.sorted(ExclusiveResource.COMPARATOR)
.distinct()
.collect(groupingBy(ExclusiveResource::getKey, LinkedHashMap::new, toList()));

return resourcesByKey.values().stream()
.map(resourcesWithSameKey -> resourcesWithSameKey.get(0))
.map(this::toLock)
.collect(toList());
.collect(toUnmodifiableList());
// @formatter:on
}

private Lock toLock(ExclusiveResource resource) {
ReadWriteLock lock = this.locksByKey.computeIfAbsent(resource.getKey(), key -> new ReentrantReadWriteLock());
return resource.getLockMode() == READ ? lock.readLock() : lock.writeLock();
}

private ResourceLock toResourceLock(List<Lock> locks) {
switch (locks.size()) {
private ResourceLock toResourceLock(List<ExclusiveResource> resources) {
switch (resources.size()) {
case 0:
return NopLock.INSTANCE;
case 1:
return toResourceLock(locks.get(0));
return toSingleLock(getOnlyElement(resources));
default:
return new CompositeLock(locks);
return new CompositeLock(resources, toLocks(resources));
}
}

private ResourceLock toResourceLock(Lock lock) {
if (lock == toLock(GLOBAL_READ)) {
private SingleLock toSingleLock(ExclusiveResource resource) {
if (GLOBAL_READ.equals(resource)) {
return globalReadLock;
}
if (lock == toLock(GLOBAL_READ_WRITE)) {
if (GLOBAL_READ_WRITE.equals(resource)) {
return globalReadWriteLock;
}
return new SingleLock(lock);
return new SingleLock(resource, toLock(resource));
}

private List<Lock> toLocks(List<ExclusiveResource> resources) {
return resources.stream().map(this::toLock).collect(toUnmodifiableList());
}

private Lock toLock(ExclusiveResource resource) {
ReadWriteLock lock = this.locksByKey.computeIfAbsent(resource.getKey(), key -> new ReentrantReadWriteLock());
return resource.getLockMode() == READ ? lock.readLock() : lock.writeLock();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@

package org.junit.platform.engine.support.hierarchical;

import static java.util.Collections.emptyList;

import java.util.List;

import org.junit.platform.commons.util.ToStringBuilder;

/**
* No-op {@link ResourceLock} implementation.
*
Expand All @@ -22,6 +28,11 @@ class NopLock implements ResourceLock {
private NopLock() {
}

@Override
public List<ExclusiveResource> getResources() {
return emptyList();
}

@Override
public ResourceLock acquire() {
return this;
Expand All @@ -32,4 +43,13 @@ public void release() {
// nothing to do
}

@Override
public boolean isExclusive() {
return false;
}

@Override
public String toString() {
return new ToStringBuilder(this).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

import static org.apiguardian.api.API.Status.STABLE;

import java.util.List;
import java.util.Optional;

import org.apiguardian.api.API;

/**
Expand Down Expand Up @@ -43,12 +46,50 @@ default void close() {
release();
}

/**
* {@return the exclusive resources this lock represents}
*/
List<ExclusiveResource> getResources();

/**
* {@return whether this lock requires exclusiveness}
*/
boolean isExclusive();

/**
* {@return whether the given lock is compatible with this lock}
* @param other the other lock to check for compatibility
*/
default boolean isCompatible(ResourceLock other) {
return this instanceof NopLock || other instanceof NopLock;
}

List<ExclusiveResource> ownResources = this.getResources();
List<ExclusiveResource> otherResources = other.getResources();

if (ownResources.isEmpty() || otherResources.isEmpty()) {
return true;
}

// Whenever there's a READ_WRITE lock, it's incompatible with any other lock
// because we guarantee that all children will have exclusive access to the
// resource in question. In practice, whenever a READ_WRITE lock is present,
// NodeTreeWalker will force all children to run in the same thread so that
// it should never attempt to steal work from another thread, and we shouldn't
// actually reach this point.
// The global read lock (which is always on direct children of the engine node)
// needs special treatment so that it is compatible with the first write lock
// (which may be on a test method).
boolean isGlobalReadLock = ownResources.size() == 1
&& ExclusiveResource.GLOBAL_READ.equals(ownResources.get(0));
if ((!isGlobalReadLock && other.isExclusive()) || this.isExclusive()) {
return false;
}

Optional<ExclusiveResource> potentiallyDeadlockCausingAdditionalResource = otherResources.stream() //
.filter(resource -> !ownResources.contains(resource)) //
.findFirst() //
.filter(resource -> ExclusiveResource.COMPARATOR.compare(resource,
ownResources.get(ownResources.size() - 1)) < 0);

return !(potentiallyDeadlockCausingAdditionalResource.isPresent());
}
}
Loading

0 comments on commit 6529d8d

Please sign in to comment.