Skip to content

Commit

Permalink
RATIS-2184. Improve TestRaftWithGrpc test stability (#1177)
Browse files (browse the repository at this point in the history)
  • Loading branch information
jianghuazhu authored Dec 23, 2024
1 parent 0514e09 commit b210965
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,14 @@ private long errorWaitTimeMs() {

/**
 * Override of stopAsync() as shown in a rendered diff hunk.
 *
 * NOTE(review): the +/- markers of the diff were lost in this listing. The hunk header
 * (-302,8 +302,14) implies that the two statements directly below the signature are the
 * pre-change body (removed by the commit) and that the try-with-resources block is the
 * replacement body (added by the commit). Read literally as one method, the try block
 * would sit after an unconditional return; confirm against the actual file before reuse.
 *
 * Replacement body: while holding the write lock, stops appendLogRequestObserver if it is
 * non-null and sets the field to null, unregisters grpcServerMetrics, and then delegates
 * to super.stopAsync().
 *
 * @return the future returned by super.stopAsync()
 */
@Override
public CompletableFuture<LifeCycle.State> stopAsync() {
// Pre-change body (removed): unregister the metrics and delegate, without taking any lock.
grpcServerMetrics.unregister();
return super.stopAsync();
// Post-change body (added): the same two steps, now performed inside the write lock.
// The lock is acquired via try-with-resources, so it is closed when the block exits
// (presumably releasing the lock - verify against AutoCloseableLock).
try (AutoCloseableLock ignored = lock.writeLock(caller, LOG::trace)) {
// Stop the request observer if one is currently set, then clear the field.
if (appendLogRequestObserver != null) {
appendLogRequestObserver.stop();
appendLogRequestObserver = null;
}
grpcServerMetrics.unregister();
// The return expression is evaluated while the lock is still held.
return super.stopAsync();
}
}

@Override
Expand Down Expand Up @@ -382,6 +388,9 @@ private void appendLog(boolean heartbeat) throws IOException {
final ReferenceCountedObject<AppendEntriesRequestProto> pending;
final AppendEntriesRequest request;
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
if (!isRunning()) {
return;
}
// Prepare and send the append request.
// Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock
pending = nextAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,19 @@ protected ReferenceCountedObject<AppendEntriesRequestProto> nextAppendEntriesReq
final long halfMs = heartbeatWaitTimeMs/2;
final Map<Long, ReferenceCountedObject<EntryWithData>> offered = new HashMap<>();
for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
final ReferenceCountedObject<EntryWithData> entryWithData = getRaftLog().retainEntryWithData(next);
if (!buffer.offer(entryWithData.get())) {
entryWithData.release();
break;
ReferenceCountedObject<EntryWithData> entryWithData = null;
try {
entryWithData = getRaftLog().retainEntryWithData(next);
if (!buffer.offer(entryWithData.get())) {
entryWithData.release();
break;
}
offered.put(next, entryWithData);
} catch (Exception e){
if (entryWithData != null) {
entryWithData.release();
}
}
offered.put(next, entryWithData);
}
if (buffer.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,15 @@ public ReferenceCountedObject<LogEntryProto> load(LogRecord key) throws IOExcept
final LogSegmentStartEnd startEnd = LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen);
readSegmentFile(file, startEnd, maxOpSize, getLogCorruptionPolicy(), raftLogMetrics, entryRef -> {
final LogEntryProto entry = entryRef.retain();
final TermIndex ti = TermIndex.valueOf(entry);
putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
if (ti.equals(key.getTermIndex())) {
toReturn.set(entryRef);
} else {
try {
final TermIndex ti = TermIndex.valueOf(entry);
putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
if (ti.equals(key.getTermIndex())) {
toReturn.set(entryRef);
} else {
entryRef.release();
}
} catch (Exception e) {
entryRef.release();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,12 +859,11 @@ public void shutdown() {
getServers().forEach(proxy -> executor.submit(() -> JavaUtils.runAsUnchecked(proxy::close)));
final int maxRetries = 30;
final TimeDuration retrySleep = TimeDuration.ONE_SECOND;
try {
executor.shutdown();
// just wait for a few seconds
boolean terminated = false;
executor.shutdown();
boolean terminated = false;

for(int i = 0; i < maxRetries && !terminated; ) {
for(int i = 0; i < maxRetries && !terminated; ) {
try {
terminated = executor.awaitTermination(retrySleep.getDuration(), retrySleep.getUnit());
if (!terminated) {
i++;
Expand All @@ -874,10 +873,9 @@ public void shutdown() {
LOG.error("Failed to shutdown executor, some servers may be still running:\n{}", printServers());
}
}
}
} catch (InterruptedException e) {
} catch (InterruptedException e) {
LOG.warn("shutdown interrupted", e);
Thread.currentThread().interrupt();
}
}

Optional.ofNullable(timer.get()).ifPresent(Timer::cancel);
Expand Down

0 comments on commit b210965

Please sign in to comment.