diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index acf6fb8cfc..9ebbe7c18e 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -33,7 +33,6 @@ public final class ReferenceCountedLeakDetector { private static final Logger LOG = LoggerFactory.getLogger(ReferenceCountedLeakDetector.class); // Leak detection is turned off by default. - private static final AtomicReference FACTORY = new AtomicReference<>(Mode.NONE); private static final Supplier SUPPLIER = MemoizedSupplier.valueOf(() -> new LeakDetector(FACTORY.get().name()).start()); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 0784eaf047..927bf8ce5c 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -302,8 +302,21 @@ private long errorWaitTimeMs() { @Override public CompletableFuture stopAsync() { - grpcServerMetrics.unregister(); - return super.stopAsync(); + try (AutoCloseableLock ignored = lock.writeLock(caller, LOG::trace)) { + if (appendLogRequestObserver != null) { + appendLogRequestObserver.stop(); + appendLogRequestObserver = null; + } + grpcServerMetrics.unregister(); + return super.stopAsync(); + } + /*grpcServerMetrics.unregister(); + CompletableFuture future = super.stopAsync(); + if (appendLogRequestObserver != null) { + appendLogRequestObserver.stop(); + appendLogRequestObserver = null; + } + return future;*/ } @Override @@ -382,6 +395,9 @@ private void appendLog(boolean heartbeat) throws IOException { final ReferenceCountedObject pending; final AppendEntriesRequest request; try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { + if (!isRunning()) { + return; + } // Prepare and send the append request. // Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock pending = nextAppendEntriesRequest(callId.getAndIncrement(), heartbeat); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index d1557588d6..e6c92ce046 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -261,12 +261,19 @@ protected ReferenceCountedObject nextAppendEntriesReq final long halfMs = heartbeatWaitTimeMs/2; final Map> offered = new HashMap<>(); for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; next++) { - final ReferenceCountedObject entryWithData = getRaftLog().retainEntryWithData(next); - if (!buffer.offer(entryWithData.get())) { - entryWithData.release(); - break; + ReferenceCountedObject entryWithData = null; + try { + entryWithData = getRaftLog().retainEntryWithData(next); + if (!buffer.offer(entryWithData.get())) { + entryWithData.release(); + break; + } + offered.put(next, entryWithData); + } catch (Exception e){ + if (entryWithData != null) { + entryWithData.release(); + } } - offered.put(next, entryWithData); } if (buffer.isEmpty()) { return null; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index 2eea0f90f1..5a5d3eb1f8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -281,12 +281,16 @@ public ReferenceCountedObject load(LogRecord key) throws IOExcept final AtomicReference> toReturn = new AtomicReference<>(); final LogSegmentStartEnd startEnd = LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen); readSegmentFile(file, startEnd, maxOpSize, getLogCorruptionPolicy(), raftLogMetrics, entryRef -> { - final LogEntryProto entry = entryRef.retain(); - final TermIndex ti = TermIndex.valueOf(entry); - putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE); - if (ti.equals(key.getTermIndex())) { - toReturn.set(entryRef); - } else { + try { + final LogEntryProto entry = entryRef.retain(); + final TermIndex ti = TermIndex.valueOf(entry); + putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE); + if (ti.equals(key.getTermIndex())) { + toReturn.set(entryRef); + } else { + entryRef.release(); + } + } catch (Exception e) { entryRef.release(); } }); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 01143c753f..beebeba350 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -282,14 +282,17 @@ private void loadLogSegments(long lastIndexInSnapshot, @Override public LogEntryProto get(long index) throws RaftLogIOException { - final ReferenceCountedObject ref = retainLog(index); - if (ref == null) { - return null; - } + ReferenceCountedObject ref = null; try { + ref = retainLog(index); + if (ref == null) { + return null; + } return LogProtoUtils.copy(ref.get()); } finally { - ref.release(); + if (ref != null) { + ref.release(); + } } } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 86ebfa52ca..e180f3ecbb 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -859,12 +859,11 @@ public void shutdown() { getServers().forEach(proxy -> executor.submit(() -> JavaUtils.runAsUnchecked(proxy::close))); final int maxRetries = 30; final TimeDuration retrySleep = TimeDuration.ONE_SECOND; - try { - executor.shutdown(); - // just wait for a few seconds - boolean terminated = false; + executor.shutdown(); + boolean terminated = false; - for(int i = 0; i < maxRetries && !terminated; ) { + for(int i = 0; i < maxRetries && !terminated; ) { + try { terminated = executor.awaitTermination(retrySleep.getDuration(), retrySleep.getUnit()); if (!terminated) { i++; @@ -874,10 +873,9 @@ public void shutdown() { LOG.error("Failed to shutdown executor, some servers may be still running:\n{}", printServers()); } } - } - } catch (InterruptedException e) { + } catch (InterruptedException e) { LOG.warn("shutdown interrupted", e); - Thread.currentThread().interrupt(); + } } Optional.ofNullable(timer.get()).ifPresent(Timer::cancel);