diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 4badd09cd1..39b401dda1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -227,7 +227,6 @@ private void startIfNew(Runnable starter) { CompletableFuture shutdown() { lifeCycle.checkStateAndClose(); - stopped.complete(null); return stopped; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 9ce7c4404e..5c472c826c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -707,7 +707,7 @@ void submitStepDownEvent(long term, StepDownReason reason) { private void stepDown(long term, StepDownReason reason) { try { lease.getAndSetEnabled(false); - server.changeToFollowerAndPersistMetadata(term, false, reason); + server.changeToFollowerAndPersistMetadata(term, false, reason).join(); pendingStepDown.complete(server::newSuccessReply); } catch(IOException e) { final String s = this + ": Failed to persist metadata for term " + term; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 2e748f4967..cfcdd1519d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -133,7 +133,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -571,14 +570,8 @@ void setFirstElection(Object reason) { * @param force Force to start a new {@link FollowerState} even if this server is already a follower. * @return if the term/votedFor should be updated to the new term */ - private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) { - final AtomicReference metadataUpdated = new AtomicReference<>(); - changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join(); - return metadataUpdated.get(); - } - - private synchronized CompletableFuture changeToFollowerAsync( - long newTerm, boolean force, boolean allowListener, Object reason, AtomicReference metadataUpdated) { + private synchronized CompletableFuture changeToFollower( + long newTerm, boolean force, boolean allowListener, Object reason, AtomicBoolean metadataUpdated) { final RaftPeerRole old = role.getCurrentRole(); if (old == RaftPeerRole.LISTENER && !allowListener) { throw new IllegalStateException("Unexpected role " + old); @@ -613,13 +606,16 @@ private synchronized CompletableFuture changeToFollowerAsync( return future; } - synchronized void changeToFollowerAndPersistMetadata( + synchronized CompletableFuture changeToFollowerAndPersistMetadata( long newTerm, boolean allowListener, Object reason) throws IOException { - if (changeToFollower(newTerm, false, allowListener, reason)) { + final AtomicBoolean metadataUpdated = new AtomicBoolean(); + final CompletableFuture future = changeToFollower(newTerm, false, allowListener, reason, metadataUpdated); + if (metadataUpdated.get()) { state.persistMetadata(); } + return future; } synchronized void changeToLeader() { @@ -1451,6 +1447,7 @@ private RequestVoteReplyProto requestVote(Phase phase, boolean shouldShutdown = false; final RequestVoteReplyProto reply; + CompletableFuture future = null; synchronized (this) { // Check life cycle state again to avoid the PAUSING/PAUSED state. assertLifeCycleState(LifeCycle.States.RUNNING); @@ -1460,12 +1457,12 @@ private RequestVoteReplyProto requestVote(Phase phase, final boolean voteGranted = context.decideVote(candidate, candidateLastEntry); if (candidate != null && phase == Phase.ELECTION) { // change server state in the ELECTION phase - final boolean termUpdated = - changeToFollower(candidateTerm, true, false, "candidate:" + candidateId); + final AtomicBoolean termUpdated = new AtomicBoolean(); + future = changeToFollower(candidateTerm, true, false, "candidate:" + candidateId, termUpdated); if (voteGranted) { state.grantVote(candidate.getId()); } - if (termUpdated || voteGranted) { + if (termUpdated.get() || voteGranted) { state.persistMetadata(); // sync metafile } } @@ -1481,6 +1478,9 @@ private RequestVoteReplyProto requestVote(Phase phase, getMemberId(), phase, toRequestVoteReplyString(reply), state); } } + if (future != null) { + future.join(); + } return reply; } @@ -1582,6 +1582,7 @@ private CompletableFuture appendEntriesAsync(RaftPeerId final long followerCommit = state.getLog().getLastCommittedIndex(); final Optional followerState; final Timekeeper.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time(); + final CompletableFuture future; synchronized (this) { // Check life cycle state again to avoid the PAUSING/PAUSED state. assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); @@ -1593,7 +1594,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat)); } try { - changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries"); + future = changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries"); } catch (IOException e) { return JavaUtils.completeExceptionally(e); } @@ -1618,12 +1619,12 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat); LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply)); followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); - return CompletableFuture.completedFuture(reply); + return future.thenApply(dummy -> reply); } state.updateConfiguration(entries); } - + future.join(); final List> futures = entries.isEmpty() ? Collections.emptyList() : state.getLog().append(requestRef.delegate(entries)); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 537b384c64..8de9a37569 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -46,6 +46,7 @@ import java.io.IOException; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -122,12 +123,12 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt if (installSnapshotEnabled) { // Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot. if (request.hasSnapshotChunk()) { - reply = checkAndInstallSnapshot(request, leaderId); + reply = checkAndInstallSnapshot(request, leaderId).join(); } } else { // Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot. if (request.hasNotification()) { - reply = notifyStateMachineToInstallSnapshot(request, leaderId); + reply = notifyStateMachineToInstallSnapshot(request, leaderId).join(); } } @@ -156,21 +157,22 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt return failedReply; } - private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequestProto request, + private CompletableFuture checkAndInstallSnapshot(InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException { final long currentTerm; final long leaderTerm = request.getLeaderTerm(); final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex()); final long lastIncludedIndex = lastIncluded.getIndex(); + final CompletableFuture future; synchronized (server) { final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); if (!recognized) { - return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); + return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER)); } - server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); + future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); state.setLeader(leaderId, "installSnapshot"); server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START); @@ -186,8 +188,9 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest // have a lot of requests if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) { nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED); + return future.thenApply(dummy -> reply); } //TODO: We should only update State with installed snapshot once the request is done. @@ -210,25 +213,27 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest if (snapshotChunkRequest.getDone()) { LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex); } - return toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS); + return future.thenApply(dummy -> reply); } - private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( + private CompletableFuture notifyStateMachineToInstallSnapshot( InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException { final long currentTerm; final long leaderTerm = request.getLeaderTerm(); final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf( request.getNotification().getFirstAvailableTermIndex()); final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex(); + final CompletableFuture future; synchronized (server) { final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); if (!recognized) { - return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.NOT_LEADER); + return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.NOT_LEADER)); } - server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); + future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); state.setLeader(leaderId, "installSnapshot"); server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION); @@ -245,8 +250,9 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); + return future.thenApply(dummy -> reply); } final RaftPeerProto leaderProto; @@ -323,8 +329,9 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer()); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); + return future.thenApply(dummy -> reply); } // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset @@ -341,8 +348,9 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer()); installedIndex.set(latestInstalledIndex); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex()); + return future.thenApply(dummy -> reply); } // Otherwise, Snapshot installation is in progress. @@ -350,8 +358,9 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), InstallSnapshotResult.IN_PROGRESS); } - return toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.IN_PROGRESS); + return future.thenApply(dummy -> reply); } }