Testing zero-copy bug fixes (not for merging) #1156

Open
wants to merge 31 commits into base: master
Changes from 9 commits
0d2a4dc
RATIS-2164. LeakDetector has a race condition.
szetszwo Sep 28, 2024
205c720
Fix a bug and checkstyle.
szetszwo Sep 28, 2024
58f296f
Enable advanced detection for debugging.
szetszwo Sep 28, 2024
67577ff
Fixed some bugs.
szetszwo Sep 28, 2024
90321af
Some minor changes.
szetszwo Sep 28, 2024
1c5c6eb
try-catch MiniRaftCluster shutdown.
szetszwo Sep 29, 2024
9159532
Report earlier leaks at shutdown.
szetszwo Sep 29, 2024
0f4b61e
Enable advanced leak detection.
szetszwo Sep 29, 2024
fe29cde
Move the enable method to ReferenceCountedLeakDetector.
szetszwo Sep 29, 2024
7a6fef9
Use HashMap.
szetszwo Oct 3, 2024
77db48e
Fix a bug in LogAppenderDefault.
szetszwo Oct 3, 2024
c8e3ac8
Rewrite AdvancedTracing.
szetszwo Oct 4, 2024
aea498f
Fix a bug in LogSegment cache.
szetszwo Oct 4, 2024
0104ece
Add synchronized to get()
szetszwo Oct 4, 2024
43980fb
Fix javac error.
szetszwo Oct 4, 2024
9150bdc
Restore RaftBasicTests.
szetszwo Oct 4, 2024
38f5c69
Move ReferenceCountedLeakDetector.enable(..) to MiniRaftCluster.
szetszwo Oct 4, 2024
23af8ed
Fix bugs in LogSegment.EntryCache.
szetszwo Oct 5, 2024
3512387
Fix a bug in SimpleStateMachine4Testing.
szetszwo Oct 5, 2024
b548373
Copy LogEntryProto in SimpleStateMachine4Testing.
szetszwo Oct 5, 2024
c4ac263
Use Throwable in MiniRaftCluster.
szetszwo Oct 5, 2024
b831226
New entries can be added after EntryCache is closed.
szetszwo Oct 6, 2024
55a3896
Bump test related plugin versions.
szetszwo Oct 7, 2024
1d49431
Reduce messages to 100
szetszwo Oct 7, 2024
b57a748
Fix checkstyle.
szetszwo Oct 7, 2024
483b6ae
Reset test Xmx to 2g
szetszwo Oct 7, 2024
6547e14
Retry assertNoLeaks multiple times.
szetszwo Oct 7, 2024
dc690e4
Copy log entries in MemoryRaftLog.
szetszwo Oct 7, 2024
632809e
SegmentedRaftLogWorker should clean up unfinished tasks in the queue.
szetszwo Oct 7, 2024
3b07ab9
Fix checkstyle
szetszwo Oct 7, 2024
6c15124
Revert pom.xml changes.
szetszwo Oct 7, 2024
97 changes: 71 additions & 26 deletions ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
Original file line number Diff line number Diff line change
@@ -22,10 +22,14 @@

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to
@@ -55,13 +59,56 @@
*/
public class LeakDetector {
private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class);

private static class LeakTrackerSet {
private final Set<LeakTracker> set = Collections.newSetFromMap(new ConcurrentHashMap<>());
Contributor:

All methods are synchronized; do we still need to use ConcurrentHashMap?

Contributor Author:

You are right! We don't need ConcurrentHashMap anymore.
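
The point of this exchange can be illustrated with a minimal sketch. It assumes that every access to the backing set goes through a synchronized method, as in LeakTrackerSet above; the names SynchronizedSetSketch and TrackerSet are hypothetical and not part of Ratis. Under that assumption the monitor on the wrapper already serializes all readers and writers, so a plain HashSet should be enough.

```java
import java.util.HashSet;
import java.util.Set;

final class SynchronizedSetSketch {
  /** Hypothetical stand-in for LeakTrackerSet; illustrative only. */
  static final class TrackerSet<T> {
    // A plain HashSet: all access is guarded by the monitor of this TrackerSet.
    private final Set<T> set = new HashSet<>();

    synchronized boolean add(T tracker) {
      return set.add(tracker);
    }

    synchronized boolean remove(T tracker) {
      return set.remove(tracker);
    }

    synchronized int size() {
      return set.size();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final TrackerSet<Integer> trackers = new TrackerSet<>();
    final Thread[] threads = new Thread[4];
    for (int t = 0; t < threads.length; t++) {
      final int base = t * 1000;
      // Each thread adds 1000 distinct values through the synchronized methods.
      threads[t] = new Thread(() -> {
        for (int i = 0; i < 1000; i++) {
          trackers.add(base + i);
        }
      });
      threads[t].start();
    }
    for (Thread thread : threads) {
      thread.join();
    }
    System.out.println("size = " + trackers.size());
  }
}
```

The guarantee holds only while the set never escapes the wrapper; handing out the raw set or an iterator over it would reintroduce the need for a concurrent collection. A later commit in the list above is titled "Use HashMap.", which may be the follow-up to this comment.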


synchronized boolean remove(LeakTracker tracker) {
return set.remove(tracker);
}

synchronized void removeExisting(LeakTracker tracker) {
final boolean removed = set.remove(tracker);
Preconditions.assertTrue(removed, () -> "Failed to remove existing " + tracker);
}

synchronized LeakTracker add(Object referent, ReferenceQueue<Object> queue, Supplier<String> leakReporter) {
final LeakTracker tracker = new LeakTracker(referent, queue, this::removeExisting, leakReporter);
final boolean added = set.add(tracker);
Preconditions.assertTrue(added, () -> "Failed to add " + tracker + " for " + referent);
return tracker;
}

synchronized void assertNoLeaks() {
if (set.isEmpty()) {
return;
}

int n = 0;
for (LeakTracker tracker : set) {
if (tracker.reportLeak() != null) {
n++;
}
}
assertNoLeaks(n);
}

synchronized void assertNoLeaks(int leaks) {
Preconditions.assertTrue(leaks == 0, () -> {
final int size = set.size();
return "#leaks = " + leaks + (leaks == size? "==" : "!=") + " set.size = " + size;
});
}
}

private static final AtomicLong COUNTER = new AtomicLong();

private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
private final Set<LeakTracker> allLeaks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final LeakTrackerSet allLeaks = new LeakTrackerSet();
private final List<String> leakMessages = Collections.synchronizedList(new ArrayList<>());
private final String name;

public LeakDetector(String name) {
LeakDetector(String name) {
this.name = name + COUNTER.getAndIncrement();
}

@@ -81,7 +128,10 @@ private void run() {
// The original resource has already been GCed; if the tracker has not been closed yet,
// report a leak.
if (allLeaks.remove(tracker)) {
tracker.reportLeak();
final String leak = tracker.reportLeak();
if (leak != null) {
leakMessages.add(leak);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -93,48 +143,43 @@ private void run() {
LOG.warn("Exiting leak detector {}.", name);
}

public UncheckedAutoCloseable track(Object leakable, Runnable reportLeak) {
Runnable track(Object leakable, Supplier<String> reportLeak) {
// A rate filter can be put here to track only a subset of all objects, e.g. 5% or 10%,
// if there is evidence that leak tracking impacts performance or that a single LeakDetector
// thread can't keep up with the pace of object allocation.
// For now, it looks effective enough, so let's keep it simple.
LeakTracker tracker = new LeakTracker(leakable, queue, allLeaks, reportLeak);
allLeaks.add(tracker);
return tracker;
return allLeaks.add(leakable, queue, reportLeak)::remove;
}

public void assertNoLeaks() {
Preconditions.assertTrue(allLeaks.isEmpty(), this::allLeaksString);
}

String allLeaksString() {
if (allLeaks.isEmpty()) {
return "allLeaks = <empty>";
synchronized (leakMessages) {
Preconditions.assertTrue(leakMessages.isEmpty(),
() -> "#leaks = " + leakMessages.size() + "\n" + leakMessages);
}
allLeaks.forEach(LeakTracker::reportLeak);
return "allLeaks.size = " + allLeaks.size();
allLeaks.assertNoLeaks();
}

private static final class LeakTracker extends WeakReference<Object> implements UncheckedAutoCloseable {
private final Set<LeakTracker> allLeaks;
private final Runnable leakReporter;
private static final class LeakTracker extends WeakReference<Object> {
private final Consumer<LeakTracker> removeMethod;
private final Supplier<String> leakReporter;

LeakTracker(Object referent, ReferenceQueue<Object> referenceQueue,
Set<LeakTracker> allLeaks, Runnable leakReporter) {
Consumer<LeakTracker> removeMethod, Supplier<String> leakReporter) {
super(referent, referenceQueue);
this.allLeaks = allLeaks;
this.removeMethod = removeMethod;
this.leakReporter = leakReporter;
}

/**
* Called by the tracked resource when closing.
* Called by the tracked resource when releasing the object.
*/
@Override
public void close() {
allLeaks.remove(this);
void remove() {
removeMethod.accept(this);
}

void reportLeak() {
leakReporter.run();
/** @return the leak message if there is a leak; return null if there is no leak. */
String reportLeak() {
return leakReporter.get();
}
}
}
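
The API shape this file moves to can be sketched without involving the garbage collector: track(..) returns a Runnable that unregisters the tracker, and each reporter is a Supplier<String> that returns null when there is no leak. The class LeakRegistrySketch and its method collectLeaks are hypothetical names for illustration; the sketch deliberately omits the WeakReference and ReferenceQueue parts of the real LeakDetector.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical, GC-free registry mirroring the shape of track(..) and assertNoLeaks(). */
final class LeakRegistrySketch {
  private final Map<Object, Supplier<String>> reporters = new HashMap<>();

  /** Registers a reporter and returns the action that unregisters it. */
  synchronized Runnable track(Object key, Supplier<String> reporter) {
    if (reporters.putIfAbsent(key, reporter) != null) {
      throw new IllegalStateException("Already tracked: " + key);
    }
    return () -> remove(key);
  }

  private synchronized void remove(Object key) {
    reporters.remove(key);
  }

  /** Collects the non-null messages; a null message means "no leak". */
  synchronized List<String> collectLeaks() {
    final List<String> leaks = new ArrayList<>();
    for (Supplier<String> reporter : reporters.values()) {
      final String message = reporter.get();
      if (message != null) {
        leaks.add(message);
      }
    }
    return leaks;
  }

  public static void main(String[] args) {
    final LeakRegistrySketch registry = new LeakRegistrySketch();
    final Runnable releaseA = registry.track("a", () -> "LEAK: a");
    registry.track("b", () -> null); // still tracked, but reports no leak
    System.out.println(registry.collectLeaks());
    releaseA.run(); // releasing "a" unregisters its reporter
    System.out.println(registry.collectLeaks());
  }
}
```

Returning the message instead of only logging it lets the caller aggregate leaks into a single assertion failure, which appears to be the purpose of leakMessages in the diff above.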
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ public static LeakDetector getLeakDetector() {
private ReferenceCountedLeakDetector() {
}

static synchronized void enable(boolean advanced) {
public static synchronized void enable(boolean advanced) {
FACTORY.set(advanced ? Mode.ADVANCED : Mode.SIMPLE);
}

@@ -108,6 +108,10 @@ public V get() {
return value;
}

final int getCount() {
return count.get();
}

@Override
public V retain() {
// n < 0: exception
@@ -138,85 +142,108 @@ public boolean release() {
}

private static class SimpleTracing<T> extends Impl<T> {
private final UncheckedAutoCloseable leakTracker;
private final LeakDetector leakDetector;
private final Class<?> valueClass;

private Runnable removeMethod = null;

SimpleTracing(T value, Runnable retainMethod, Consumer<Boolean> releaseMethod, LeakDetector leakDetector) {
super(value, retainMethod, releaseMethod);
final Class<?> clazz = value.getClass();
this.leakTracker = leakDetector.track(this,
() -> LOG.warn("LEAK: A {} is not released properly", clazz.getName()));
this.valueClass = value.getClass();
this.leakDetector = leakDetector;
}

String getInfo(int count) {
return "(" + valueClass + ", count=" + count + ")";
}

/** @return the leak message if there is a leak; return null if there is no leak. */
String logLeakMessage() {
final int count = getCount();
if (count == 0) {
return null;
}
final String message = "LEAK: " + getInfo(count);
LOG.warn(message);
return message;
}

@Override
public boolean release() {
boolean released = super.release();
public synchronized T retain() {
if (getCount() == 0) {
this.removeMethod = leakDetector.track(this, this::logLeakMessage);
}
try {
return super.retain();
} catch (Exception e) {
throw new IllegalStateException("Failed to retain: " + getInfo(getCount()), e);
}
}

@Override
public synchronized boolean release() {
final boolean released;
try {
released = super.release();
} catch (Exception e) {
throw new IllegalStateException("Failed to release: " + getInfo(getCount()), e);
}

if (released) {
leakTracker.close();
Preconditions.assertNotNull(removeMethod, () -> "Not yet retained (removeMethod == null): " + valueClass);
removeMethod.run();
}
return released;
}
}

private static class AdvancedTracing<T> extends Impl<T> {
private final UncheckedAutoCloseable leakTracker;
private final List<StackTraceElement[]> retainsTraces;
private final List<StackTraceElement[]> releaseTraces;
private static class AdvancedTracing<T> extends SimpleTracing<T> {
private final StackTraceElement[] createStrace = Thread.currentThread().getStackTrace();
private final List<StackTraceElement[]> retainsTraces = new LinkedList<>();
private final List<StackTraceElement[]> releaseTraces = new LinkedList<>();

AdvancedTracing(T value, Runnable retainMethod, Consumer<Boolean> releaseMethod, LeakDetector leakDetector) {
super(value, retainMethod, releaseMethod);

StackTraceElement[] createStrace = Thread.currentThread().getStackTrace();
final Class<?> clazz = value.getClass();
final List<StackTraceElement[]> localRetainsTraces = new LinkedList<>();
final List<StackTraceElement[]> localReleaseTraces = new LinkedList<>();

this.leakTracker = leakDetector.track(this, () ->
LOG.warn("LEAK: A {} is not released properly.\nCreation trace:\n{}\n" +
"Retain traces({}):\n{}\nRelease traces({}):\n{}",
clazz.getName(), formatStackTrace(createStrace, 3),
localRetainsTraces.size(), formatStackTraces(localRetainsTraces, 2),
localReleaseTraces.size(), formatStackTraces(localReleaseTraces, 2)));
super(value, retainMethod, releaseMethod, leakDetector);
}

this.retainsTraces = localRetainsTraces;
this.releaseTraces = localReleaseTraces;
@Override
synchronized String getInfo(int count) {
return super.getInfo(count)
+ "\n Creation trace: " + formatStackTrace(createStrace)
+ "\n Retain traces: " + formatStackTraces("retain", retainsTraces)
+ "\n Release traces: " + formatStackTraces("release", releaseTraces);
}

@Override
public T retain() {
T retain = super.retain();
public synchronized T retain() {
retainsTraces.add(Thread.currentThread().getStackTrace());
return retain;
return super.retain();
}

@Override
public boolean release() {
boolean released = super.release();
if (released) {
leakTracker.close();
}
releaseTraces.add(Thread.currentThread().getStackTrace());
return released;
return super.release();
}
}

private static String formatStackTrace(StackTraceElement[] stackTrace, int startIdx) {
final StringBuilder sb = new StringBuilder();
for (int line = startIdx; line < stackTrace.length; line++) {
sb.append(stackTrace[line]).append("\n");
private static String formatStackTrace(StackTraceElement[] stackTrace) {
return formatStackTrace(stackTrace, new StringBuilder()).toString();
}

private static StringBuilder formatStackTrace(StackTraceElement[] stackTrace, StringBuilder sb) {
for (int line = 2; line < stackTrace.length; line++) {
sb.append(" ").append(stackTrace[line]).append("\n");
}
return sb.toString();
return sb;
}

private static String formatStackTraces(List<StackTraceElement[]> stackTraces, int startIdx) {
final StringBuilder sb = new StringBuilder();
stackTraces.forEach(stackTrace -> {
if (sb.length() > 0) {
sb.append("\n");
}
for (int line = startIdx; line < stackTrace.length; line++) {
sb.append(stackTrace[line]).append("\n");
}
});
private static String formatStackTraces(String name, List<StackTraceElement[]> stackTraces) {
final StringBuilder sb = new StringBuilder().append(stackTraces.size()).append(" trace(s)");
for (int i = 0; i < stackTraces.size(); i++) {
sb.append("\n").append(name).append(" ").append(i).append(":\n");
formatStackTrace(stackTraces.get(i), sb);
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
@@ -182,13 +182,4 @@ static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Consum
static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Runnable releaseMethod) {
return wrap(value, retainMethod, ignored -> releaseMethod.run());
}

static void enableLeakDetection() {
ReferenceCountedLeakDetector.enable(false);
}

static void enableAdvancedLeakDetection() {
ReferenceCountedLeakDetector.enable(true);
}

}
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.ReferenceCountedLeakDetector;
import org.apache.ratis.util.ReferenceCountedObject;
import org.junit.Assert;

@@ -51,7 +52,7 @@ public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, Ra
};

static {
ReferenceCountedObject.enableLeakDetection();
ReferenceCountedLeakDetector.enable(false);
}

public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGrpc> {
Original file line number Diff line number Diff line change
@@ -81,10 +81,14 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestF
}

final AppendEntriesRequestProto proto = request.get();
final AppendEntriesReplyProto reply = sendAppendEntries(proto);
final long first = proto.getEntriesCount() > 0 ? proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX;
requestFirstIndex.set(first);
request.release();
final AppendEntriesReplyProto reply;
try {
reply = sendAppendEntries(proto);
final long first = proto.getEntriesCount() > 0 ? proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX;
requestFirstIndex.set(first);
} finally {
request.release();
}
return reply;
} catch (InterruptedIOException | RaftLogIOException e) {
throw e;
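
The effect of moving request.release() into a finally block can be shown with a minimal sketch: if the send throws, the reference is still released, so the count returns to zero. Counted, send and sendAndRelease are hypothetical names, and Counted is a deliberately simplified counter rather than the Ratis ReferenceCountedObject.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ReleaseInFinallySketch {
  /** Hypothetical minimal reference counter; illustrative only. */
  static final class Counted {
    private final AtomicInteger count = new AtomicInteger();

    void retain() {
      count.incrementAndGet();
    }

    void release() {
      count.decrementAndGet();
    }

    int getCount() {
      return count.get();
    }
  }

  /** Simulates a send that may fail. */
  static String send(boolean fail) {
    if (fail) {
      throw new IllegalStateException("simulated send failure");
    }
    return "reply";
  }

  /** Retains the request, sends, and releases in finally so that a failure cannot leak the reference. */
  static String sendAndRelease(Counted request, boolean fail) {
    request.retain();
    try {
      return send(fail);
    } finally {
      request.release();
    }
  }

  public static void main(String[] args) {
    final Counted request = new Counted();
    try {
      sendAndRelease(request, true);
    } catch (IllegalStateException e) {
      System.out.println("send failed: " + e.getMessage());
    }
    System.out.println("count after failure = " + request.getCount());
  }
}
```

Without the finally block, an exception thrown by the send would skip the release and leave the count above zero, which is the kind of leak the detector in this PR is meant to report.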
Original file line number Diff line number Diff line change
@@ -306,9 +306,13 @@ public ReferenceCountedObject<LogEntryProto> retainLog(long index) throws RaftLo
}
final ReferenceCountedObject<LogEntryProto> entry = segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
getRaftLogMetrics().onRaftLogCacheHit();
entry.retain();
return entry;
try {
entry.retain();
getRaftLogMetrics().onRaftLogCacheHit();
return entry;
} catch (IllegalStateException ignore) {
// the entry could be removed from the cache and released.
}
}

// the entry is not in the segment's cache. Load the cache without holding the lock.
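
The hunk above treats a failed retain() as a cache miss: the entry may be evicted and released between the cache lookup and the retain call. A minimal sketch of that fallback follows, assuming a counter whose retain() throws IllegalStateException once the count has reached zero. CountedEntry and retainOrReload are hypothetical names, and the counting rules are simplified compared with the Ratis implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetainOrReloadSketch {
  /** Hypothetical counted entry; retain() throws once the count has reached zero. */
  static final class CountedEntry {
    private final String value;
    private final AtomicInteger count = new AtomicInteger(1); // the cache holds one reference

    CountedEntry(String value) {
      this.value = value;
    }

    String retain() {
      for (;;) {
        final int n = count.get();
        if (n <= 0) {
          throw new IllegalStateException("Already released: " + value);
        }
        // Increment only if no other thread changed the count in between.
        if (count.compareAndSet(n, n + 1)) {
          return value;
        }
      }
    }

    /** @return true if this call released the last reference. */
    boolean release() {
      return count.decrementAndGet() == 0;
    }
  }

  /** Tries the cached entry first; falls back to the loader if the entry is absent or already released. */
  static String retainOrReload(CountedEntry cached, Supplier<String> loader) {
    if (cached != null) {
      try {
        return cached.retain();
      } catch (IllegalStateException ignore) {
        // The entry was evicted and released after the lookup; fall through to the loader.
      }
    }
    return loader.get();
  }

  public static void main(String[] args) {
    final CountedEntry stale = new CountedEntry("stale");
    stale.release(); // simulate eviction from the cache
    System.out.println(retainOrReload(stale, () -> "loaded from segment file"));
  }
}
```

Recording the cache hit only after retain() succeeds, as the diff does, keeps the metric from counting entries that were already released.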