Skip to content

Commit

Permalink
merge: #10210
Browse files Browse the repository at this point in the history
10210: [Backport stable/8.0] fix(raft): follower resets pending snapshot after rejecting install request r=oleschoenburg a=deepthidevaki

## Description

Backport #10183 

closes #10180 #10202 

Co-authored-by: Deepthi Devaki Akkoorath <[email protected]>
Co-authored-by: Deepthi Devaki Akkoorath <[email protected]>
  • Loading branch information
3 people authored Aug 29, 2022
2 parents 07c481e + 886ccb1 commit f687fae
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Scheduled;
import io.camunda.zeebe.journal.JournalException;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -106,11 +105,6 @@ public synchronized CompletableFuture<Void> stop() {
.thenRun(this::stepDown);
}

@Override
protected PersistedSnapshotListener createSnapshotListener() {
return null;
}

@Override
public RaftServer.Role role() {
return RaftServer.Role.LEADER;
Expand Down
95 changes: 17 additions & 78 deletions atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,21 @@
import io.atomix.raft.snapshot.impl.SnapshotChunkImpl;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.atomix.raft.storage.log.PersistedRaftRecord;
import io.atomix.raft.storage.log.RaftLog;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.utils.concurrent.ThreadContext;
import io.camunda.zeebe.journal.JournalException;
import io.camunda.zeebe.journal.JournalException.InvalidChecksum;
import io.camunda.zeebe.journal.JournalException.InvalidIndex;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
import io.camunda.zeebe.snapshots.ReceivedSnapshot;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/** Passive state. */
public class PassiveRole extends InactiveRole {

private final SnapshotReplicationMetrics snapshotReplicationMetrics;
private long pendingSnapshotStartTimestamp;
private ReceivedSnapshot pendingSnapshot;
private PersistedSnapshotListener snapshotListener;
private ByteBuffer nextPendingSnapshotChunkId;

public PassiveRole(final RaftContext context) {
Expand All @@ -66,20 +60,13 @@ public PassiveRole(final RaftContext context) {

@Override
public CompletableFuture<RaftRole> start() {
snapshotListener = createSnapshotListener();

return super.start()
.thenRun(this::truncateUncommittedEntries)
.thenRun(this::addSnapshotListener)
.thenApply(v -> this);
return super.start().thenRun(this::truncateUncommittedEntries).thenApply(v -> this);
}

@Override
public CompletableFuture<Void> stop() {
abortPendingSnapshots();
if (snapshotListener != null) {
raft.getPersistedSnapshotStore().removeSnapshotListener(snapshotListener);
}

// as a safe guard, we clean up any orphaned pending snapshots
try {
Expand All @@ -100,31 +87,6 @@ private void truncateUncommittedEntries() {
raft.getLog().flush();
raft.setLastWrittenIndex(raft.getCommitIndex());
}

// to fix the edge case where we might have been stopped
// between persisting snapshot and truncating log we need to call on restart snapshot listener
// again, such that we truncate the log when necessary
final var latestSnapshot = raft.getCurrentSnapshot();
if (latestSnapshot != null && snapshotListener != null) {
snapshotListener.onNewSnapshot(latestSnapshot);
}
}

/**
* Should be overwritten by sub classes to introduce different snapshot listener.
*
* <p>If null no snapshot listener will be installed
*
* @return the snapshot listener which will be installed
*/
protected PersistedSnapshotListener createSnapshotListener() {
return new ResetWriterSnapshotListener(log, raft.getThreadContext(), raft.getLog());
}

private void addSnapshotListener() {
if (snapshotListener != null) {
raft.getPersistedSnapshotStore().addSnapshotListener(snapshotListener);
}
}

@Override
Expand Down Expand Up @@ -182,6 +144,7 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
}

if (!request.complete() && request.nextChunkId() == null) {
abortPendingSnapshots();
return CompletableFuture.completedFuture(
logResponse(
InstallResponse.builder()
Expand All @@ -195,6 +158,7 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
final var snapshotChunk = new SnapshotChunkImpl();
final var snapshotChunkBuffer = new UnsafeBuffer(request.data());
if (!snapshotChunk.tryWrap(snapshotChunkBuffer)) {
abortPendingSnapshots();
return CompletableFuture.completedFuture(
logResponse(
InstallResponse.builder()
Expand Down Expand Up @@ -229,13 +193,14 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
} else {
// fail the request if this is not the expected next chunk
if (!isExpectedChunk(request.chunkId())) {
abortPendingSnapshots();
return CompletableFuture.completedFuture(
logResponse(
InstallResponse.builder()
.withStatus(RaftResponse.Status.ERROR)
.withError(
RaftError.Type.ILLEGAL_MEMBER_STATE,
"Request chunk is was received out of order")
"Snapshot chunk is received out of order")
.build()));
}
}
Expand Down Expand Up @@ -265,11 +230,12 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
final long elapsed = System.currentTimeMillis() - pendingSnapshotStartTimestamp;
log.debug("Committing snapshot {}", pendingSnapshot);
try {
// Reset before committing to prevent the edge case where the system crashes after
// committing the snapshot, and restart with a snapshot and invalid log.
resetLogOnReceivingSnapshot(pendingSnapshot.index());

final var snapshot = pendingSnapshot.persist().join();
log.info("Committed snapshot {}", snapshot);
// Must be executed immediately before any other operation on this threadcontext. Hence
// don't wait for the listener to be notified by the snapshot store.
snapshotListener.onNewSnapshot(snapshot);
} catch (final Exception e) {
log.error("Failed to commit pending snapshot {}, rolling back", pendingSnapshot, e);
abortPendingSnapshots();
Expand Down Expand Up @@ -742,41 +708,14 @@ protected boolean completeAppend(
return succeeded;
}

private static final class ResetWriterSnapshotListener implements PersistedSnapshotListener {

private final ThreadContext threadContext;
private final RaftLog raftLog;
private final Logger log;
private void resetLogOnReceivingSnapshot(final long snapshotIndex) {
final var raftLog = raft.getLog();

ResetWriterSnapshotListener(
final Logger log, final ThreadContext threadContext, final RaftLog raftLog) {
this.log = log;
this.threadContext = threadContext;
this.raftLog = raftLog;
}

@Override
public void onNewSnapshot(final PersistedSnapshot persistedSnapshot) {
if (threadContext.isCurrentContext()) {
// this is called after the snapshot is commited
// on install requests and on Zeebe snapshot replication

final var index = persistedSnapshot.getIndex();
// It might happen that the last index is far behind our current snapshot index.
// E. g. on slower followers, we need to throw away the existing log,
// otherwise we might end with an inconsistent log (gaps between last index and
// snapshot index)
final var lastIndex = raftLog.getLastIndex();
if (lastIndex < index) {
log.info(
"Delete existing log (lastIndex '{}') and replace with received snapshot (index '{}')",
lastIndex,
index);
raftLog.reset(index + 1);
}
} else {
threadContext.execute(() -> onNewSnapshot(persistedSnapshot));
}
}
log.info(
"Delete existing log (lastIndex '{}') and replace with received snapshot (index '{}'). First entry in the log will be at index {}",
raftLog.getLastIndex(),
snapshotIndex,
snapshotIndex + 1);
raftLog.reset(snapshotIndex + 1);
}
}
Loading

0 comments on commit f687fae

Please sign in to comment.