Skip to content

Commit

Permalink
refactor(raft): remove unused snapshot listener
Browse files Browse the repository at this point in the history
(cherry picked from commit 43c96cd)
  • Loading branch information
deepthidevaki committed Aug 29, 2022
1 parent 20d5aca commit 0a72e69
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Scheduled;
import io.camunda.zeebe.journal.JournalException;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -106,11 +105,6 @@ public synchronized CompletableFuture<Void> stop() {
.thenRun(this::stepDown);
}

@Override
protected PersistedSnapshotListener createSnapshotListener() {
return null;
}

@Override
public RaftServer.Role role() {
return RaftServer.Role.LEADER;
Expand Down
65 changes: 0 additions & 65 deletions atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,21 @@
import io.atomix.raft.snapshot.impl.SnapshotChunkImpl;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.atomix.raft.storage.log.PersistedRaftRecord;
import io.atomix.raft.storage.log.RaftLog;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.utils.concurrent.ThreadContext;
import io.camunda.zeebe.journal.JournalException;
import io.camunda.zeebe.journal.JournalException.InvalidChecksum;
import io.camunda.zeebe.journal.JournalException.InvalidIndex;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
import io.camunda.zeebe.snapshots.ReceivedSnapshot;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/** Passive state. */
public class PassiveRole extends InactiveRole {

private final SnapshotReplicationMetrics snapshotReplicationMetrics;
private long pendingSnapshotStartTimestamp;
private ReceivedSnapshot pendingSnapshot;
private PersistedSnapshotListener snapshotListener;
private ByteBuffer nextPendingSnapshotChunkId;

public PassiveRole(final RaftContext context) {
Expand All @@ -66,17 +60,13 @@ public PassiveRole(final RaftContext context) {

@Override
public CompletableFuture<RaftRole> start() {
snapshotListener = createSnapshotListener();

return super.start().thenRun(this::truncateUncommittedEntries).thenApply(v -> this);
}

@Override
public CompletableFuture<Void> stop() {
abortPendingSnapshots();
if (snapshotListener != null) {
raft.getPersistedSnapshotStore().removeSnapshotListener(snapshotListener);
}

// as a safe guard, we clean up any orphaned pending snapshots
try {
Expand All @@ -99,23 +89,6 @@ private void truncateUncommittedEntries() {
}
}

/**
* Should be overwritten by sub classes to introduce different snapshot listener.
*
* <p>If null no snapshot listener will be installed
*
* @return the snapshot listener which will be installed
*/
protected PersistedSnapshotListener createSnapshotListener() {
return new ResetWriterSnapshotListener(log, raft.getThreadContext(), raft.getLog());
}

private void addSnapshotListener() {
if (snapshotListener != null) {
raft.getPersistedSnapshotStore().addSnapshotListener(snapshotListener);
}
}

@Override
public RaftServer.Role role() {
return RaftServer.Role.PASSIVE;
Expand Down Expand Up @@ -745,42 +718,4 @@ private void resetLogOnReceivingSnapshot(final long snapshotIndex) {
snapshotIndex + 1);
raftLog.reset(snapshotIndex + 1);
}

private static final class ResetWriterSnapshotListener implements PersistedSnapshotListener {

private final ThreadContext threadContext;
private final RaftLog raftLog;
private final Logger log;

ResetWriterSnapshotListener(
final Logger log, final ThreadContext threadContext, final RaftLog raftLog) {
this.log = log;
this.threadContext = threadContext;
this.raftLog = raftLog;
}

@Override
public void onNewSnapshot(final PersistedSnapshot persistedSnapshot) {
if (threadContext.isCurrentContext()) {
// this is called after the snapshot is commited
// on install requests and on Zeebe snapshot replication

final var index = persistedSnapshot.getIndex();
// It might happen that the last index is far behind our current snapshot index.
// E. g. on slower followers, we need to throw away the existing log,
// otherwise we might end with an inconsistent log (gaps between last index and
// snapshot index)
final var lastIndex = raftLog.getLastIndex();
if (lastIndex < index) {
log.info(
"Delete existing log (lastIndex '{}') and replace with received snapshot (index '{}')",
lastIndex,
index);
raftLog.reset(index + 1);
}
} else {
threadContext.execute(() -> onNewSnapshot(persistedSnapshot));
}
}
}
}

0 comments on commit 0a72e69

Please sign in to comment.