Skip to content

Commit

Permalink
merge: #8239
Browse files Browse the repository at this point in the history
8239: [Backport stable/1.2] fix(snapshot): create snapshot exporter position is -1 r=romansmirnov a=romansmirnov

## Description

backports #8176 

## Related issues

<!-- Which issues are closed by this PR or are related -->

relates to #7978 



Co-authored-by: Roman <[email protected]>
  • Loading branch information
zeebe-bors-cloud[bot] and romansmirnov authored Nov 26, 2021
2 parents 5fd6471 + f10e864 commit 3e74390
Show file tree
Hide file tree
Showing 4 changed files with 323 additions and 49 deletions.
24 changes: 24 additions & 0 deletions atomix/cluster/src/test/java/io/atomix/raft/RaftFailOverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,30 @@ public void shouldTruncateLogOnNewerSnapshotEvenAfterRestart() throws Throwable
assertThat(entries.get(0).index()).isEqualTo(66);
}

@Test
public void shouldNotReplicateSnapshotWhenIndexIsZero() throws Exception {
// given
final var follower = raftRule.shutdownFollower();

raftRule.appendEntries(100);
raftRule.doSnapshot(0);

// expect
final var leaderSnapshot = raftRule.getSnapshotFromLeader();
assertThat(leaderSnapshot.getIndex()).isEqualTo(0);

// when
raftRule.joinCluster(follower);

// then
final var memberLogs = raftRule.getMemberLogs();
assertMemberLogs(memberLogs);

// snapshot is not replicated to follower
final var followerSnapshotStore = raftRule.getPersistedSnapshotStore(follower);
assertThat(followerSnapshotStore.getLatestSnapshot()).isNotPresent();
}

private void assertMemberLogs(final Map<String, List<IndexedRaftLogEntry>> memberLog) {
final var members = memberLog.keySet();
final var iterator = members.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.camunda.zeebe.db.ZeebeDbFactory;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.sched.ConcurrencyControl;
Expand Down Expand Up @@ -141,26 +142,46 @@ private void takeTransientSnapshotInternal(
return;
}

final long exportedPosition = exporterPositionSupplier.applyAsLong(db);
final long snapshotPosition =
determineSnapshotPosition(lowerBoundSnapshotPosition, exportedPosition);
final var optionalIndexed = entrySupplier.getPreviousIndexedEntry(snapshotPosition);
if (optionalIndexed.isEmpty()) {
future.completeExceptionally(
new IllegalStateException(
String.format(
"Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d), but found no matching indexed entry which contains this position.",
snapshotPosition, lowerBoundSnapshotPosition, exportedPosition)));
return;
long index = 0;
long term = 0;
long exportedPosition = exporterPositionSupplier.applyAsLong(db);

if (exportedPosition != -1) {

final long snapshotPosition =
determineSnapshotPosition(lowerBoundSnapshotPosition, exportedPosition);
final var optionalIndexed = entrySupplier.getPreviousIndexedEntry(snapshotPosition);

if (optionalIndexed.isEmpty()) {
future.completeExceptionally(
new IllegalStateException(
String.format(
"Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d), but found no matching indexed entry which contains this position.",
snapshotPosition, lowerBoundSnapshotPosition, exportedPosition)));
return;
}

final var snapshotIndexedEntry = optionalIndexed.get();
index = snapshotIndexedEntry.index();
term = snapshotIndexedEntry.term();
} else {
final Optional<PersistedSnapshot> latestSnapshot =
constructableSnapshotStore.getLatestSnapshot();
exportedPosition = 0;

if (latestSnapshot.isPresent()) {
// re-use index and term from the latest snapshot
// to ensure that the records from there are not
// compacted until they get exported
final PersistedSnapshot persistedSnapshot = latestSnapshot.get();
index = persistedSnapshot.getIndex();
term = persistedSnapshot.getTerm();
} // otherwise index/term remains 0
}

final var snapshotIndexedEntry = optionalIndexed.get();
final Optional<TransientSnapshot> transientSnapshot =
final var transientSnapshot =
constructableSnapshotStore.newTransientSnapshot(
snapshotIndexedEntry.index(),
snapshotIndexedEntry.term(),
lowerBoundSnapshotPosition,
exportedPosition);
index, term, lowerBoundSnapshotPosition, exportedPosition);

// Now takeSnapshot result can be either true, false or error.
transientSnapshot.ifPresentOrElse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,49 @@ public void shouldCloseDbOnlyAfterTakingSnapshot() {
assertThat(snapshotTaken.join()).isNotEmpty();
}

@Test
public void shouldSetExporterPositionToZero() {
// given
snapshotController.recover().join();

exporterPosition.set(-1L);
final long snapshotPosition = 5;

// when
final var transientSnapshot = snapshotController.takeTransientSnapshot(snapshotPosition).join();

// then
final var snapshot = transientSnapshot.get().snapshotId();
assertThat(snapshot.getIndex()).isEqualTo(0);
assertThat(snapshot.getTerm()).isEqualTo(0);
assertThat(snapshot.getProcessedPosition()).isEqualTo(snapshotPosition);
assertThat(snapshot.getExportedPosition()).isEqualTo(0);
}

@Test
public void shouldKeepIndexAndTerm() {
// given
snapshotController.recover().join();

final long snapshotPosition = 5;
exporterPosition.set(4L);
takeSnapshot(snapshotPosition);

final var latestSnapshot = store.getLatestSnapshot().get();

exporterPosition.set(-1L);

// when
final var transientSnapshot = snapshotController.takeTransientSnapshot(snapshotPosition).join();

// then
final var snapshot = transientSnapshot.get().snapshotId();
assertThat(snapshot.getIndex()).isEqualTo(latestSnapshot.getIndex());
assertThat(snapshot.getTerm()).isEqualTo(latestSnapshot.getTerm());
assertThat(snapshot.getProcessedPosition()).isEqualTo(snapshotPosition);
assertThat(snapshot.getExportedPosition()).isEqualTo(0);
}

private File takeSnapshot(final long position) {
final var snapshot = snapshotController.takeTransientSnapshot(position).join().orElseThrow();
return snapshot.persist().join().getPath().toFile();
Expand Down
Loading

0 comments on commit 3e74390

Please sign in to comment.