diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java index 1e22e07bd0..9b0367213a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java @@ -314,20 +314,28 @@ public final CompletableFuture truncate(long index) { @Override public final CompletableFuture purge(long suggestedIndex) { + final long adjustedIndex; if (purgePreservation > 0) { final long currentIndex = getNextIndex() - 1; - suggestedIndex = Math.min(suggestedIndex, currentIndex - purgePreservation); + adjustedIndex = Math.min(suggestedIndex, currentIndex - purgePreservation); + } else { + adjustedIndex = suggestedIndex; } final long lastPurge = purgeIndex.get(); - if (suggestedIndex - lastPurge < purgeGap) { + if (adjustedIndex - lastPurge < purgeGap) { + return CompletableFuture.completedFuture(lastPurge); + } + final long startIndex = getStartIndex(); + if (adjustedIndex < startIndex) { + LOG.info("{}: purge({}) is skipped: adjustedIndex = {} < startIndex = {}, purgePreservation = {}", + getName(), suggestedIndex, adjustedIndex, startIndex, purgePreservation); return CompletableFuture.completedFuture(lastPurge); } - LOG.info("{}: purge {}", getName(), suggestedIndex); - final long finalSuggestedIndex = suggestedIndex; - return purgeImpl(suggestedIndex).whenComplete((purged, e) -> { + LOG.info("{}: purge {}", getName(), adjustedIndex ); + return purgeImpl(adjustedIndex).whenComplete((purged, e) -> { updatePurgeIndex(purged); if (e != null) { - LOG.warn(getName() + ": Failed to purge " + finalSuggestedIndex, e); + LOG.warn(getName() + ": Failed to purge " + adjustedIndex, e); } }); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index 20b7a5c372..9509436c35 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -356,7 +356,11 @@ TruncationSegments truncate(long index, LogSegment openSegment, Runnable clearOp TruncationSegments purge(long index) { try (AutoCloseableLock writeLock = writeLock()) { int segmentIndex = binarySearch(index); - List list = new ArrayList<>(); + if (segmentIndex == -1) { + // nothing to purge + return null; + } + List list = new LinkedList<>(); if (segmentIndex == -segments.size() - 1) { for (LogSegment ls : segments) { diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java index 93eb7db0e6..a772b00029 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java @@ -563,6 +563,7 @@ public void testPurgeOnOpenSegment() throws Exception { int segmentSize = 200; long beginIndexOfOpenSegment = segmentSize * (endTerm - startTerm - 1); long expectedIndex = segmentSize * (endTerm - startTerm - 1); + long purgePreservation = 0L; purgeAndVerify(startTerm, endTerm, segmentSize, 1, beginIndexOfOpenSegment, expectedIndex); } @@ -599,15 +600,36 @@ public void testPurgeOnClosedSegmentsWithPurgeGap() throws Exception { purgeAndVerify(startTerm, endTerm, segmentSize, 1000, endIndexOfClosedSegment, expectedIndex); } + @Test + public void testPurgeWithLargePurgePreservationAndSmallPurgeGap() throws Exception { + int startTerm = 0; + int endTerm = 5; + int segmentSize = 200; + long endIndex = segmentSize * (endTerm - startTerm) - 1; + // start index is set so that the suggested index will not be negative, which will not trigger any purge + long startIndex = 200; + // purge preservation is larger than the total size of the log entries + // which causes suggested index to be lower than the start index + long purgePreservation = (segmentSize * (endTerm - startTerm )) + 100; + // if the suggested index is lower than the start index due to the purge preservation, we should not purge anything + purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndex, startIndex, startIndex, purgePreservation); + } + private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int purgeGap, long purgeIndex, - long expectedIndex) throws Exception { - List ranges = prepareRanges(startTerm, endTerm, segmentSize, 0); + long expectedIndex) throws Exception { + purgeAndVerify(startTerm, endTerm, segmentSize, purgeGap, purgeIndex, expectedIndex, 0, 0); + } + + private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int purgeGap, long purgeIndex, + long expectedIndex, long startIndex, long purgePreservation) throws Exception { + List ranges = prepareRanges(startTerm, endTerm, segmentSize, startIndex); List entries = prepareLogEntries(ranges, null); final RaftProperties p = new RaftProperties(); RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap); - try (SegmentedRaftLog raftLog = newSegmentedRaftLog(storage, p)) { - raftLog.open(RaftLog.INVALID_LOG_INDEX, null); + RaftServerConfigKeys.Log.setPurgePreservationLogNum(p, purgePreservation); + try (SegmentedRaftLog raftLog = newSegmentedRaftLogWithSnapshotIndex(storage, p, () -> startIndex - 1)) { + raftLog.open(startIndex - 1, null); entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); final CompletableFuture f = raftLog.purge(purgeIndex); final Long purged = f.get();