Skip to content

Commit

Permalink
RATIS-2186. Raft log should not purge index lower than the log start …
Browse files Browse the repository at this point in the history
…index (#1175)
  • Loading branch information
ivandika3 authored Nov 12, 2024
1 parent 26c1f04 commit 13b8cdd
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,20 +318,28 @@ public final CompletableFuture<Long> truncate(long index) {

@Override
public final CompletableFuture<Long> purge(long suggestedIndex) {
final long adjustedIndex;
if (purgePreservation > 0) {
final long currentIndex = getNextIndex() - 1;
suggestedIndex = Math.min(suggestedIndex, currentIndex - purgePreservation);
adjustedIndex = Math.min(suggestedIndex, currentIndex - purgePreservation);
} else {
adjustedIndex = suggestedIndex;
}
final long lastPurge = purgeIndex.get();
if (suggestedIndex - lastPurge < purgeGap) {
if (adjustedIndex - lastPurge < purgeGap) {
return CompletableFuture.completedFuture(lastPurge);
}
final long startIndex = getStartIndex();
if (adjustedIndex < startIndex) {
LOG.info("{}: purge({}) is skipped: adjustedIndex = {} < startIndex = {}, purgePreservation = {}",
getName(), suggestedIndex, adjustedIndex, startIndex, purgePreservation);
return CompletableFuture.completedFuture(lastPurge);
}
LOG.info("{}: purge {}", getName(), suggestedIndex);
final long finalSuggestedIndex = suggestedIndex;
return purgeImpl(suggestedIndex).whenComplete((purged, e) -> {
LOG.info("{}: purge {}", getName(), adjustedIndex );
return purgeImpl(adjustedIndex).whenComplete((purged, e) -> {
updatePurgeIndex(purged);
if (e != null) {
LOG.warn(getName() + ": Failed to purge " + finalSuggestedIndex, e);
LOG.warn(getName() + ": Failed to purge " + adjustedIndex, e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ TruncationSegments truncate(long index, LogSegment openSegment, Runnable clearOp
TruncationSegments purge(long index) {
try (AutoCloseableLock writeLock = writeLock()) {
int segmentIndex = binarySearch(index);
if (segmentIndex == -1) {
// nothing to purge
return null;
}
List<LogSegment> list = new LinkedList<>();

if (segmentIndex == -segments.size() - 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ public void testPurgeOnOpenSegment() throws Exception {
int segmentSize = 200;
long beginIndexOfOpenSegment = segmentSize * (endTerm - startTerm - 1);
long expectedIndex = segmentSize * (endTerm - startTerm - 1);
long purgePreservation = 0L;
purgeAndVerify(startTerm, endTerm, segmentSize, 1, beginIndexOfOpenSegment, expectedIndex);
}

Expand Down Expand Up @@ -602,15 +603,36 @@ public void testPurgeOnClosedSegmentsWithPurgeGap() throws Exception {
purgeAndVerify(startTerm, endTerm, segmentSize, 1000, endIndexOfClosedSegment, expectedIndex);
}

@Test
public void testPurgeWithLargePurgePreservationAndSmallPurgeGap() throws Exception {
int startTerm = 0;
int endTerm = 5;
int segmentSize = 200;
long endIndex = segmentSize * (endTerm - startTerm) - 1;
// start index is set so that the suggested index will not be negative, which will not trigger any purge
long startIndex = 200;
// purge preservation is larger than the total size of the log entries
// which causes suggested index to be lower than the start index
long purgePreservation = (segmentSize * (endTerm - startTerm )) + 100;
// if the suggested index is lower than the start index due to the purge preservation, we should not purge anything
purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndex, startIndex, startIndex, purgePreservation);
}

private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int purgeGap, long purgeIndex,
long expectedIndex) throws Exception {
List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize, 0);
long expectedIndex) throws Exception {
purgeAndVerify(startTerm, endTerm, segmentSize, purgeGap, purgeIndex, expectedIndex, 0, 0);
}

private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int purgeGap, long purgeIndex,
long expectedIndex, long startIndex, long purgePreservation) throws Exception {
List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize, startIndex);
List<LogEntryProto> entries = prepareLogEntries(ranges, null);

final RaftProperties p = new RaftProperties();
RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
try (SegmentedRaftLog raftLog = newSegmentedRaftLog(storage, p)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
RaftServerConfigKeys.Log.setPurgePreservationLogNum(p, purgePreservation);
try (SegmentedRaftLog raftLog = newSegmentedRaftLogWithSnapshotIndex(storage, p, () -> startIndex - 1)) {
raftLog.open(startIndex - 1, null);
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
final Long purged = f.get();
Expand Down

0 comments on commit 13b8cdd

Please sign in to comment.