Skip to content

Commit

Permalink
Merge #6331
Browse files Browse the repository at this point in the history
6331: [Backport stable/0.26] fix(snapshot): make FilebasedSnapshotStore an actor r=MiguelPires a=deepthidevaki

Backport of #6283

closes #6255 

Co-authored-by: Deepthi Devaki Akkoorath <[email protected]>
  • Loading branch information
zeebe-bors[bot] and deepthidevaki authored Feb 12, 2021
2 parents b80ddcf + c44bfd1 commit 3a2d486
Show file tree
Hide file tree
Showing 37 changed files with 999 additions and 610 deletions.
20 changes: 20 additions & 0 deletions atomix/cluster/src/main/java/io/atomix/raft/impl/RaftContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import io.zeebe.snapshots.raft.PersistedSnapshot;
import io.zeebe.snapshots.raft.ReceivableSnapshotStore;
import java.time.Duration;
import java.util.Objects;
Expand Down Expand Up @@ -113,6 +114,7 @@ public class RaftContext implements AutoCloseable {
private final int maxAppendsPerFollower;
// Used for randomizing election timeout
private final Random random;
private PersistedSnapshot currentSnapshot;

public RaftContext(
final String name,
Expand Down Expand Up @@ -158,6 +160,12 @@ public RaftContext(

// Open the snapshot store.
persistedSnapshotStore = storage.getPersistedSnapshotStore();
persistedSnapshotStore.addSnapshotListener(this::setSnapshot);
// Update the current snapshot because the listener only notifies when a new snapshot is
// created.
persistedSnapshotStore
.getLatestSnapshot()
.ifPresent(persistedSnapshot -> currentSnapshot = persistedSnapshot);

logCompactor = new LogCompactor(this);

Expand All @@ -174,6 +182,10 @@ public RaftContext(
started = true;
}

private void setSnapshot(final PersistedSnapshot persistedSnapshot) {
threadContext.execute(() -> currentSnapshot = persistedSnapshot);
}

private void onUncaughtException(final Throwable error) {
log.error("An uncaught exception occurred, transition to inactive role", error);
try {
Expand Down Expand Up @@ -953,6 +965,14 @@ public void setLeader(final MemberId leader) {
}
}

public PersistedSnapshot getCurrentSnapshot() {
return currentSnapshot;
}

public long getCurrentSnapshotIndex() {
return currentSnapshot != null ? currentSnapshot.getIndex() : 0L;
}

public boolean isRunning() {
return started;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.esotericsoftware.kryo.serializers.FieldSerializer.Optional;
import io.atomix.storage.StorageLevel;
import io.atomix.utils.memory.MemorySize;
import io.zeebe.snapshots.broker.impl.FileBasedSnapshotStoreFactory;
import io.zeebe.snapshots.raft.ReceivableSnapshotStoreFactory;

/** Raft storage configuration. */
Expand All @@ -33,8 +32,6 @@ public class RaftStorageConfig {
private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
private static final boolean DEFAULT_FLUSH_EXPLICITLY = true;
private static final long DEFAULT_FREE_DISK_SPACE = 1024L * 1024 * 1024 * 1; // 1GB
private static final ReceivableSnapshotStoreFactory DEFAULT_SNAPSHOT_STORE_FACTORY =
new FileBasedSnapshotStoreFactory();

private String directory;
private StorageLevel level = DEFAULT_STORAGE_LEVEL;
Expand All @@ -44,8 +41,7 @@ public class RaftStorageConfig {
private long freeDiskSpace = DEFAULT_FREE_DISK_SPACE;

@Optional("SnapshotStoreFactory")
private ReceivableSnapshotStoreFactory persistedSnapshotStoreFactory =
DEFAULT_SNAPSHOT_STORE_FACTORY;
private ReceivableSnapshotStoreFactory persistedSnapshotStoreFactory;

/**
* Returns the partition data directory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,8 @@ private AppendRequest.Builder builderWithPreviousEntry(final Indexed<RaftLogEntr
prevIndex = prevEntry.index();
prevTerm = prevEntry.entry().term();
} else {
final var optCurrentSnapshot = raft.getPersistedSnapshotStore().getLatestSnapshot();
if (optCurrentSnapshot.isPresent()) {
final var currentSnapshot = optCurrentSnapshot.get();
final var currentSnapshot = raft.getCurrentSnapshot();
if (currentSnapshot != null) {
prevIndex = currentSnapshot.getIndex();
prevTerm = currentSnapshot.getTerm();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,16 +343,15 @@ public void close() {
}

private void tryToReplicateSnapshot(final RaftMemberContext member) {
final var optSnapshot = raft.getPersistedSnapshotStore().getLatestSnapshot();
final var persistedSnapshot = raft.getCurrentSnapshot();

if (optSnapshot.isPresent()
&& member.getSnapshotIndex() < optSnapshot.get().getIndex()
&& optSnapshot.get().getIndex() >= member.getLogReader().getCurrentIndex()) {
if (persistedSnapshot != null
&& member.getSnapshotIndex() < persistedSnapshot.getIndex()
&& persistedSnapshot.getIndex() >= member.getLogReader().getCurrentIndex()) {
if (!member.canInstall()) {
return;
}

final var persistedSnapshot = optSnapshot.get();
log.debug(
"Replicating snapshot {} to {}",
persistedSnapshot.getIndex(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ public CompletableFuture<AppendResponse> onAppend(final AppendRequest request) {
.withTerm(raft.getTerm())
.withSucceeded(false)
.withLastLogIndex(raft.getLogWriter().getLastIndex())
.withLastSnapshotIndex(raft.getPersistedSnapshotStore().getCurrentSnapshotIndex())
.withLastSnapshotIndex(raft.getCurrentSnapshotIndex())
.build()));
} else {
raft.setLeader(request.leader());
Expand Down
76 changes: 35 additions & 41 deletions atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import io.zeebe.snapshots.raft.PersistedSnapshot;
import io.zeebe.snapshots.raft.PersistedSnapshotListener;
import io.zeebe.snapshots.raft.ReceivedSnapshot;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;
Expand All @@ -57,6 +57,7 @@ public class PassiveRole extends InactiveRole {
private long pendingSnapshotStartTimestamp;
private ReceivedSnapshot pendingSnapshot;
private PersistedSnapshotListener snapshotListener;
private ByteBuffer nextPendingSnapshotChunkId;

public PassiveRole(final RaftContext context) {
super(context);
Expand All @@ -81,6 +82,15 @@ public CompletableFuture<Void> stop() {
if (snapshotListener != null) {
raft.getPersistedSnapshotStore().removeSnapshotListener(snapshotListener);
}

// as a safe guard, we clean up any orphaned pending snapshots
try {
raft.getPersistedSnapshotStore().purgePendingSnapshots().join();
} catch (final Exception e) {
log.warn(
"Failed to purge pending snapshots, which may result in unnecessary disk usage and should be monitored",
e);
}
return super.stop();
}

Expand All @@ -94,12 +104,9 @@ private void truncateUncommittedEntries() {
// to fix the edge case where we might have been stopped
// between persisting snapshot and truncating log we need to call on restart snapshot listener
// again, such that we truncate the log when necessary
final var latestSnapshot = raft.getPersistedSnapshotStore().getLatestSnapshot();
if (latestSnapshot.isPresent()) {
final var persistedSnapshot = latestSnapshot.get();
if (snapshotListener != null) {
snapshotListener.onNewSnapshot(persistedSnapshot);
}
final var latestSnapshot = raft.getCurrentSnapshot();
if (latestSnapshot != null && snapshotListener != null) {
snapshotListener.onNewSnapshot(latestSnapshot);
}
}

Expand Down Expand Up @@ -165,13 +172,8 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
}

// If the snapshot already exists locally, do not overwrite it with a replicated snapshot.
// Simply reply to the
// request successfully.
final var latestIndex =
raft.getPersistedSnapshotStore()
.getLatestSnapshot()
.map(PersistedSnapshot::getIndex)
.orElse(Long.MIN_VALUE);
// Simply reply to the request successfully.
final var latestIndex = raft.getCurrentSnapshotIndex();
if (latestIndex >= request.index()) {
abortPendingSnapshots();

Expand Down Expand Up @@ -221,14 +223,8 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
pendingSnapshotStartTimestamp = System.currentTimeMillis();
snapshotReplicationMetrics.incrementCount();
} else {
// skip if we already have this chunk
if (pendingSnapshot.containsChunk(request.chunkId())) {
return CompletableFuture.completedFuture(
logResponse(InstallResponse.builder().withStatus(RaftResponse.Status.OK).build()));
}

// fail the request if this is not the expected next chunk
if (!pendingSnapshot.isExpectedChunk(request.chunkId())) {
if (!isExpectedChunk(request.chunkId())) {
return CompletableFuture.completedFuture(
logResponse(
InstallResponse.builder()
Expand All @@ -242,7 +238,7 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request

boolean snapshotChunkConsumptionFailed;
try {
snapshotChunkConsumptionFailed = !pendingSnapshot.apply(snapshotChunk);
snapshotChunkConsumptionFailed = !pendingSnapshot.apply(snapshotChunk).join();
} catch (final Exception e) {
log.error("Failed to write pending snapshot chunk {}, rolling back", pendingSnapshot, e);
snapshotChunkConsumptionFailed = true;
Expand All @@ -265,8 +261,11 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
final long elapsed = System.currentTimeMillis() - pendingSnapshotStartTimestamp;
log.debug("Committing snapshot {}", pendingSnapshot);
try {
final var snapshot = pendingSnapshot.persist();
final var snapshot = pendingSnapshot.persist().join();
log.info("Committed snapshot {}", snapshot);
// Must be executed immediately before any other operation on this threadcontext. Hence
// don't wait for the listener to be notified by the snapshot store.
snapshotListener.onNewSnapshot(snapshot);
} catch (final Exception e) {
log.error("Failed to commit pending snapshot {}, rolling back", pendingSnapshot, e);
abortPendingSnapshots();
Expand All @@ -284,7 +283,7 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
snapshotReplicationMetrics.decrementCount();
snapshotReplicationMetrics.observeDuration(elapsed);
} else {
pendingSnapshot.setNextExpected(request.nextChunkId());
setNextExpected(request.nextChunkId());
}

return CompletableFuture.completedFuture(
Expand Down Expand Up @@ -399,7 +398,16 @@ public CompletableFuture<VoteResponse> onVote(final VoteRequest request) {
.build()));
}

private void setNextExpected(final ByteBuffer nextChunkId) {
nextPendingSnapshotChunkId = nextChunkId;
}

private boolean isExpectedChunk(final ByteBuffer chunkId) {
return nextPendingSnapshotChunkId == null || nextPendingSnapshotChunkId.equals(chunkId);
}

private void abortPendingSnapshots() {
setNextExpected(null);
if (pendingSnapshot != null) {
log.info("Rolling back snapshot {}", pendingSnapshot);
try {
Expand All @@ -412,15 +420,6 @@ private void abortPendingSnapshots() {

snapshotReplicationMetrics.decrementCount();
}

// as a safe guard, we clean up any orphaned pending snapshots
try {
raft.getPersistedSnapshotStore().purgePendingSnapshots();
} catch (final IOException e) {
log.error(
"Failed to purge pending snapshots, which may result in unnecessary disk usage and should be monitored",
e);
}
}

/** Handles an AppendRequest. */
Expand Down Expand Up @@ -477,10 +476,9 @@ protected boolean checkPreviousEntry(
if (lastEntry != null) {
return checkPreviousEntry(request, lastEntry.index(), lastEntry.entry().term(), future);
} else {
final var optCurrentSnapshot = raft.getPersistedSnapshotStore().getLatestSnapshot();
final var currentSnapshot = raft.getCurrentSnapshot();

if (optCurrentSnapshot.isPresent()) {
final var currentSnapshot = optCurrentSnapshot.get();
if (currentSnapshot != null) {
return checkPreviousEntry(
request, currentSnapshot.getIndex(), currentSnapshot.getTerm(), future);
} else {
Expand Down Expand Up @@ -768,11 +766,7 @@ protected boolean completeAppend(
.withTerm(raft.getTerm())
.withSucceeded(succeeded)
.withLastLogIndex(lastLogIndex)
.withLastSnapshotIndex(
raft.getPersistedSnapshotStore()
.getLatestSnapshot()
.map(PersistedSnapshot::getIndex)
.orElse(0L))
.withLastSnapshotIndex(raft.getCurrentSnapshotIndex())
.build()));
return succeeded;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
package io.atomix.raft.snapshot;

import io.atomix.utils.time.WallClockTimestamp;
import io.zeebe.snapshots.broker.SnapshotId;
import io.zeebe.snapshots.raft.PersistedSnapshot;
import io.zeebe.snapshots.raft.ReceivedSnapshot;
import io.zeebe.snapshots.raft.SnapshotChunk;
import io.zeebe.snapshots.raft.SnapshotChunkReader;
import io.zeebe.util.StringUtil;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
Expand All @@ -38,7 +41,6 @@ public class InMemorySnapshot implements PersistedSnapshot, ReceivedSnapshot {
private final WallClockTimestamp timestamp;
private final String id;
private final NavigableMap<String, String> chunks = new TreeMap<>();
private ByteBuffer nextExpected;

InMemorySnapshot(final TestSnapshotStore testSnapshotStore, final String snapshotId) {
this.testSnapshotStore = testSnapshotStore;
Expand All @@ -58,7 +60,7 @@ public class InMemorySnapshot implements PersistedSnapshot, ReceivedSnapshot {
this.index = index;
this.term = term;
this.timestamp = timestamp;
this.id = String.format("%d-%d-%d", index, term, timestamp.unixTimestamp());
id = String.format("%d-%d-%d", index, term, timestamp.unixTimestamp());
}

public static InMemorySnapshot newPersistedSnapshot(
Expand Down Expand Up @@ -161,33 +163,55 @@ public long index() {
}

@Override
public boolean containsChunk(final ByteBuffer chunkId) {
return chunks.containsKey(BufferUtil.bufferAsString(new UnsafeBuffer(chunkId)));
public ActorFuture<Boolean> apply(final SnapshotChunk chunk) throws IOException {
chunks.put(chunk.getChunkName(), StringUtil.fromBytes(chunk.getContent()));
return CompletableActorFuture.completed(true);
}

@Override
public boolean isExpectedChunk(final ByteBuffer chunkId) {
return chunkId.equals(nextExpected);
public ActorFuture<Void> abort() {
return CompletableActorFuture.completed(null);
}

@Override
public void setNextExpected(final ByteBuffer nextChunkId) {
nextExpected = nextChunkId;
public ActorFuture<PersistedSnapshot> persist() {
testSnapshotStore.newSnapshot(this);
return CompletableActorFuture.completed(this);
}

@Override
public boolean apply(final SnapshotChunk chunk) throws IOException {
chunks.put(chunk.getChunkName(), StringUtil.fromBytes(chunk.getContent()));
return true;
}
public SnapshotId snapshotId() {
return new SnapshotId() {
@Override
public long getIndex() {
return index;
}

@Override
public void abort() {}
@Override
public long getTerm() {
return term;
}

@Override
public PersistedSnapshot persist() {
testSnapshotStore.newSnapshot(this);
return this;
@Override
public long getProcessedPosition() {
return 0;
}

@Override
public long getExportedPosition() {
return 0;
}

@Override
public WallClockTimestamp getTimestamp() {
return timestamp;
}

@Override
public String getSnapshotIdAsString() {
return id;
}
};
}

@Override
Expand Down
Loading

0 comments on commit 3a2d486

Please sign in to comment.