Skip to content

Commit

Permalink
RATIS-2184. Improve TestRaftWithGrpc test stability
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghuazhu committed Dec 18, 2024
1 parent 26c1f04 commit 415799f
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
public final class ReferenceCountedLeakDetector {
private static final Logger LOG = LoggerFactory.getLogger(ReferenceCountedLeakDetector.class);
// Leak detection is turned off by default.

private static final AtomicReference<Mode> FACTORY = new AtomicReference<>(Mode.NONE);
private static final Supplier<LeakDetector> SUPPLIER
= MemoizedSupplier.valueOf(() -> new LeakDetector(FACTORY.get().name()).start());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,21 @@ private long errorWaitTimeMs() {

@Override
public CompletableFuture<LifeCycle.State> stopAsync() {
grpcServerMetrics.unregister();
return super.stopAsync();
try (AutoCloseableLock ignored = lock.writeLock(caller, LOG::trace)) {
if (appendLogRequestObserver != null) {
appendLogRequestObserver.stop();
appendLogRequestObserver = null;
}
grpcServerMetrics.unregister();
return super.stopAsync();
}
/*grpcServerMetrics.unregister();
CompletableFuture<LifeCycle.State> future = super.stopAsync();
if (appendLogRequestObserver != null) {
appendLogRequestObserver.stop();
appendLogRequestObserver = null;
}
return future;*/
}

@Override
Expand Down Expand Up @@ -382,6 +395,9 @@ private void appendLog(boolean heartbeat) throws IOException {
final ReferenceCountedObject<AppendEntriesRequestProto> pending;
final AppendEntriesRequest request;
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
if (!isRunning()) {
return;
}
// Prepare and send the append request.
// Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock
pending = nextAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,19 @@ protected ReferenceCountedObject<AppendEntriesRequestProto> nextAppendEntriesReq
final long halfMs = heartbeatWaitTimeMs/2;
final Map<Long, ReferenceCountedObject<EntryWithData>> offered = new HashMap<>();
for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
final ReferenceCountedObject<EntryWithData> entryWithData = getRaftLog().retainEntryWithData(next);
if (!buffer.offer(entryWithData.get())) {
entryWithData.release();
break;
ReferenceCountedObject<EntryWithData> entryWithData = null;
try {
entryWithData = getRaftLog().retainEntryWithData(next);
if (!buffer.offer(entryWithData.get())) {
entryWithData.release();
break;
}
offered.put(next, entryWithData);
} catch (Exception e){
if (entryWithData != null) {
entryWithData.release();
}
}
offered.put(next, entryWithData);
}
if (buffer.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,16 @@ public ReferenceCountedObject<LogEntryProto> load(LogRecord key) throws IOExcept
final AtomicReference<ReferenceCountedObject<LogEntryProto>> toReturn = new AtomicReference<>();
final LogSegmentStartEnd startEnd = LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen);
readSegmentFile(file, startEnd, maxOpSize, getLogCorruptionPolicy(), raftLogMetrics, entryRef -> {
final LogEntryProto entry = entryRef.retain();
final TermIndex ti = TermIndex.valueOf(entry);
putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
if (ti.equals(key.getTermIndex())) {
toReturn.set(entryRef);
} else {
try {
final LogEntryProto entry = entryRef.retain();
final TermIndex ti = TermIndex.valueOf(entry);
putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
if (ti.equals(key.getTermIndex())) {
toReturn.set(entryRef);
} else {
entryRef.release();
}
} catch (Exception e) {
entryRef.release();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,17 @@ private void loadLogSegments(long lastIndexInSnapshot,

@Override
public LogEntryProto get(long index) throws RaftLogIOException {
final ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
if (ref == null) {
return null;
}
ReferenceCountedObject<LogEntryProto> ref = null;
try {
ref = retainLog(index);
if (ref == null) {
return null;
}
return LogProtoUtils.copy(ref.get());
} finally {
ref.release();
if (ref != null) {
ref.release();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,12 +859,11 @@ public void shutdown() {
getServers().forEach(proxy -> executor.submit(() -> JavaUtils.runAsUnchecked(proxy::close)));
final int maxRetries = 30;
final TimeDuration retrySleep = TimeDuration.ONE_SECOND;
try {
executor.shutdown();
// just wait for a few seconds
boolean terminated = false;
executor.shutdown();
boolean terminated = false;

for(int i = 0; i < maxRetries && !terminated; ) {
for(int i = 0; i < maxRetries && !terminated; ) {
try {
terminated = executor.awaitTermination(retrySleep.getDuration(), retrySleep.getUnit());
if (!terminated) {
i++;
Expand All @@ -874,10 +873,9 @@ public void shutdown() {
LOG.error("Failed to shutdown executor, some servers may be still running:\n{}", printServers());
}
}
}
} catch (InterruptedException e) {
} catch (InterruptedException e) {
LOG.warn("shutdown interrupted", e);
Thread.currentThread().interrupt();
}
}

Optional.ofNullable(timer.get()).ifPresent(Timer::cancel);
Expand Down

0 comments on commit 415799f

Please sign in to comment.