From 2b0ce8f5f9f6a641db15615296831a7643e5f324 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Tue, 15 Oct 2024 21:44:15 +0800 Subject: [PATCH 01/28] format checkStyle use synchronized(server) instead of synchronized method format exception handle --- .../ratis/server/impl/FollowerState.java | 7 +- .../ratis/server/impl/LeaderElection.java | 22 +- .../ratis/server/impl/LeaderStateImpl.java | 2 +- .../ratis/server/impl/RaftServerImpl.java | 194 +++++++------ .../impl/SnapshotInstallationHandler.java | 261 +++++++++--------- 5 files changed, 273 insertions(+), 213 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index 1be160f182..301a854d60 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -98,6 +99,7 @@ boolean isCurrentLeaderValid() { CompletableFuture stopRunning() { this.isRunning = false; interrupt(); + stopped.complete(null); return stopped; } @@ -143,6 +145,7 @@ private boolean roleChangeChecking(TimeDuration electionTimeout) { private void runImpl() { final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold(); + Optional> future = Optional.empty(); while (shouldRun()) { final TimeDuration electionTimeout = server.getRandomElectionTimeout(); try { @@ -162,7 +165,7 @@ private void runImpl() { this, lastRpcTime.elapsedTime(), electionTimeout); server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters. // election timeout, should become a candidate - server.changeToCandidate(false); + future = Optional.ofNullable(server.changeToCandidate(false)); break; } } @@ -173,6 +176,8 @@ private void runImpl() { return; } catch (Exception e) { LOG.warn("{} caught an exception", this, e); + } finally { + future.ifPresent(CompletableFuture::join); } } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 4badd09cd1..b8ee2623b3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -256,7 +256,7 @@ private void runImpl() { for (int round = 0; shouldRun(); round++) { if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) { if (askForVotes(Phase.ELECTION, round)) { - server.changeToLeader(); + server.changeToLeader().join(); } } } @@ -334,28 +334,34 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException, final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm); LOG.info("{} {} round {}: result {}", this, phase, round, r); + RaftServerImpl.Pair> pair = askForVotesImpl(electionTerm, r); + pair.second.join(); + return pair.first; + } + + private RaftServerImpl.Pair> askForVotesImpl(long electionTerm, ResultAndTerm r) throws IOException { synchronized (server) { if (!shouldRun(electionTerm)) { - return false; // term already passed or this should not run anymore. + return RaftServerImpl.Pair.makePair(false, CompletableFuture.completedFuture(null)); // term already passed or this should not run anymore. } switch (r.getResult()) { case PASSED: case SINGLE_MODE_PASSED: - return true; + return RaftServerImpl.Pair.makePair(true, CompletableFuture.completedFuture(null)); case NOT_IN_CONF: case SHUTDOWN: server.close(); server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto(), false); - return false; + return RaftServerImpl.Pair.makePair(false, CompletableFuture.completedFuture(null)); case TIMEOUT: - return false; // should retry + return RaftServerImpl.Pair.makePair(false, CompletableFuture.completedFuture(null)); // should retry case REJECTED: case DISCOVERED_A_NEW_TERM: final long term = r.maxTerm(server.getState().getCurrentTerm()); - server.changeToFollowerAndPersistMetadata(term, false, r); - return false; - default: throw new IllegalArgumentException("Unable to process result " + r.result); + return RaftServerImpl.Pair.makePair(false, server.changeToFollowerAndPersistMetadata(term, false, r)); + default: + throw new IllegalArgumentException("Unable to process result " + r.result); } } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 9ce7c4404e..5c472c826c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -707,7 +707,7 @@ void submitStepDownEvent(long term, StepDownReason reason) { private void stepDown(long term, StepDownReason reason) { try { lease.getAndSetEnabled(false); - server.changeToFollowerAndPersistMetadata(term, false, reason); + server.changeToFollowerAndPersistMetadata(term, false, reason).join(); pendingStepDown.complete(server::newSuccessReply); } catch(IOException e) { final String s = this + ": Failed to persist metadata for term " + term; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 2e748f4967..94dbd3c328 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -565,16 +565,30 @@ void setFirstElection(Object reason) { } } + static class Pair { + public final U first; + public final V second; + + Pair(U metadataUpdated, V future) { + this.first = metadataUpdated; + this.second = future; + } + + public static Pair makePair(U first, V second) { + return new Pair<>(first, second); + } + } + /** * Change the server state to Follower if this server is in a different role or force is true. * @param newTerm The new term. * @param force Force to start a new {@link FollowerState} even if this server is already a follower. * @return if the term/votedFor should be updated to the new term */ - private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) { + private Pair> changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) { final AtomicReference metadataUpdated = new AtomicReference<>(); - changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join(); - return metadataUpdated.get(); + CompletableFuture future = changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated); + return new Pair<>(metadataUpdated.get(), future); } private synchronized CompletableFuture changeToFollowerAsync( @@ -613,24 +627,32 @@ private synchronized CompletableFuture changeToFollowerAsync( return future; } - synchronized void changeToFollowerAndPersistMetadata( + synchronized CompletableFuture changeToFollowerAndPersistMetadata( long newTerm, boolean allowListener, Object reason) throws IOException { - if (changeToFollower(newTerm, false, allowListener, reason)) { - state.persistMetadata(); + final Pair> pair = changeToFollower(newTerm, false, allowListener, reason); + try { + if (pair.first) { + state.persistMetadata(); + } + return pair.second; + } catch (IOException e) { + pair.second.join(); + throw e; } } - synchronized void changeToLeader() { + synchronized CompletableFuture changeToLeader() { Preconditions.assertTrue(getInfo().isCandidate()); - role.shutdownLeaderElection(); + CompletableFuture future = role.shutdownLeaderElection(); setRole(RaftPeerRole.LEADER, "changeToLeader"); final LeaderStateImpl leader = role.updateLeaderState(this); state.becomeLeader(); // start sending AppendEntries RPC to followers leader.start(); + return future; } @Override @@ -684,15 +706,16 @@ RoleInfoProto getRoleInfoProto() { return role.buildRoleInfoProto(this); } - synchronized void changeToCandidate(boolean forceStartLeaderElection) { + synchronized CompletableFuture changeToCandidate(boolean forceStartLeaderElection) { Preconditions.assertTrue(getInfo().isFollower()); - role.shutdownFollowerState(); + CompletableFuture future = role.shutdownFollowerState(); setRole(RaftPeerRole.CANDIDATE, "changeToCandidate"); if (state.shouldNotifyExtendedNoLeader()) { stateMachine.followerEvent().notifyExtendedNoLeader(getRoleInfoProto()); } // start election role.startLeaderElection(this, forceStartLeaderElection); + return future; } @Override @@ -1448,42 +1471,47 @@ private RequestVoteReplyProto requestVote(Phase phase, getMemberId(), phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry); assertLifeCycleState(LifeCycle.States.RUNNING); assertGroup(getMemberId(), candidateId, candidateGroupId); + Pair> replyAndFuture = requestVoteImpl(phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry); + replyAndFuture.second.join(); + return replyAndFuture.first; + } + synchronized Pair> requestVoteImpl(Phase phase, + RaftPeerId candidateId, RaftGroupId candidateGroupId, + long candidateTerm, TermIndex candidateLastEntry) throws IOException { + // Check life cycle state again to avoid the PAUSING/PAUSED state. + assertLifeCycleState(LifeCycle.States.RUNNING); boolean shouldShutdown = false; - final RequestVoteReplyProto reply; - synchronized (this) { - // Check life cycle state again to avoid the PAUSING/PAUSED state. - assertLifeCycleState(LifeCycle.States.RUNNING); - - final VoteContext context = new VoteContext(this, phase, candidateId); - final RaftPeer candidate = context.recognizeCandidate(candidateTerm); - final boolean voteGranted = context.decideVote(candidate, candidateLastEntry); - if (candidate != null && phase == Phase.ELECTION) { - // change server state in the ELECTION phase - final boolean termUpdated = - changeToFollower(candidateTerm, true, false, "candidate:" + candidateId); - if (voteGranted) { - state.grantVote(candidate.getId()); - } - if (termUpdated || voteGranted) { - state.persistMetadata(); // sync metafile - } - } + final VoteContext context = new VoteContext(this, phase, candidateId); + final RaftPeer candidate = context.recognizeCandidate(candidateTerm); + final boolean voteGranted = context.decideVote(candidate, candidateLastEntry); + Pair> termUpdatedAndFuturePair = null; + if (candidate != null && phase == Phase.ELECTION) { + // change server state in the ELECTION phase + termUpdatedAndFuturePair = + changeToFollower(candidateTerm, true, false, "candidate:" + candidateId); if (voteGranted) { - role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE)); - } else if(shouldSendShutdown(candidateId, candidateLastEntry)) { - shouldShutdown = true; + state.grantVote(candidate.getId()); + } + if (termUpdatedAndFuturePair.first || voteGranted) { + state.persistMetadata(); // sync metafile } - reply = toRequestVoteReplyProto(candidateId, getMemberId(), + } + if (voteGranted) { + role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE)); + } else if(shouldSendShutdown(candidateId, candidateLastEntry)) { + shouldShutdown = true; + } + RequestVoteReplyProto reply = toRequestVoteReplyProto(candidateId, getMemberId(), voteGranted, state.getCurrentTerm(), shouldShutdown); - if (LOG.isInfoEnabled()) { - LOG.info("{} replies to {} vote request: {}. Peer's state: {}", + if (LOG.isInfoEnabled()) { + LOG.info("{} replies to {} vote request: {}. Peer's state: {}", getMemberId(), phase, toRequestVoteReplyString(reply), state); - } } - return reply; + return Pair.makePair(reply, termUpdatedAndFuturePair != null + ? termUpdatedAndFuturePair.second + : CompletableFuture.completedFuture(null)); } - @Override public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) throws IOException { @@ -1582,46 +1610,51 @@ private CompletableFuture appendEntriesAsync(RaftPeerId final long followerCommit = state.getLog().getLastCommittedIndex(); final Optional followerState; final Timekeeper.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time(); - synchronized (this) { - // Check life cycle state again to avoid the PAUSING/PAUSED state. - assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); - currentTerm = state.getCurrentTerm(); - final boolean recognized = state.recognizeLeader(Op.APPEND_ENTRIES, leaderId, leaderTerm); - if (!recognized) { - return CompletableFuture.completedFuture(toAppendEntriesReplyProto( - leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), - AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat)); - } - try { - changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries"); - } catch (IOException e) { - return JavaUtils.completeExceptionally(e); - } - state.setLeader(leaderId, "appendEntries"); + Optional> future = Optional.empty(); + try { + synchronized (this) { + // Check life cycle state again to avoid the PAUSING/PAUSED state. + assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); + currentTerm = state.getCurrentTerm(); + final boolean recognized = state.recognizeLeader(Op.APPEND_ENTRIES, leaderId, leaderTerm); + if (!recognized) { + return CompletableFuture.completedFuture(toAppendEntriesReplyProto( + leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), + AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat)); + } + try { + future = Optional.ofNullable(changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries")); + } catch (IOException e) { + return JavaUtils.completeExceptionally(e); + } + state.setLeader(leaderId, "appendEntries"); - if (!proto.getInitializing() && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) { - role.startFollowerState(this, Op.APPEND_ENTRIES); - } - followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START); - - // Check that the append entries are not inconsistent. There are 3 - // scenarios which can result in inconsistency: - // 1. There is a snapshot installation in progress - // 2. There is an overlap between the snapshot index and the entries - // 3. There is a gap between the local log and the entries - // In any of these scenarios, we should return an INCONSISTENCY reply - // back to leader so that the leader can update this follower's next index. - final long inconsistencyReplyNextIndex = checkInconsistentAppendEntries(previous, entries); - if (inconsistencyReplyNextIndex > RaftLog.INVALID_LOG_INDEX) { - final AppendEntriesReplyProto reply = toAppendEntriesReplyProto( - leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextIndex, - AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat); - LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply)); - followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); - return CompletableFuture.completedFuture(reply); - } + if (!proto.getInitializing() && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) { + role.startFollowerState(this, Op.APPEND_ENTRIES); + } + followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START); + + // Check that the append entries are not inconsistent. There are 3 + // scenarios which can result in inconsistency: + // 1. There is a snapshot installation in progress + // 2. There is an overlap between the snapshot index and the entries + // 3. There is a gap between the local log and the entries + // In any of these scenarios, we should return an INCONSISTENCY reply + // back to leader so that the leader can update this follower's next index. + final long inconsistencyReplyNextIndex = checkInconsistentAppendEntries(previous, entries); + if (inconsistencyReplyNextIndex > RaftLog.INVALID_LOG_INDEX) { + final AppendEntriesReplyProto reply = toAppendEntriesReplyProto( + leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextIndex, + AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat); + LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply)); + followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); + return CompletableFuture.completedFuture(reply); + } - state.updateConfiguration(entries); + state.updateConfiguration(entries); + } + } finally { + future.ifPresent(CompletableFuture::join); } @@ -1749,7 +1782,8 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ assertLifeCycleState(LifeCycle.States.RUNNING); assertGroup(getMemberId(), leaderId, leaderGroupId); - + Optional> future = Optional.empty(); + StartLeaderElectionReplyProto reply; synchronized (this) { // Check life cycle state again to avoid the PAUSING/PAUSED state. assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); @@ -1770,9 +1804,11 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false); } - changeToCandidate(true); - return toStartLeaderElectionReplyProto(leaderId, getMemberId(), true); + future = Optional.of(changeToCandidate(true)); + reply = toStartLeaderElectionReplyProto(leaderId, getMemberId(), true); } + future.ifPresent(CompletableFuture::join); + return reply; } void submitUpdateCommitEvent() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 537b384c64..06c10c8632 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -46,6 +46,7 @@ import java.io.IOException; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -163,49 +164,54 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex()); final long lastIncludedIndex = lastIncluded.getIndex(); - synchronized (server) { - final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm); - currentTerm = state.getCurrentTerm(); - if (!recognized) { - return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); - } - server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); - state.setLeader(leaderId, "installSnapshot"); - - server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START); - if (snapshotChunkRequest.getRequestIndex() == 0) { - nextChunkIndex.set(0); - } else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) { - throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get() - + "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex()); - } - try { - // Check and append the snapshot chunk. We simply put this in lock - // considering a follower peer requiring a snapshot installation does not - // have a lot of requests - if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) { - nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1); + Optional> future = Optional.empty(); + try { + synchronized (server) { + final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm); + currentTerm = state.getCurrentTerm(); + if (!recognized) { return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED); + currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); } - - //TODO: We should only update State with installed snapshot once the request is done. - state.installSnapshot(request); - - final int expectedChunkIndex = nextChunkIndex.getAndIncrement(); - if (expectedChunkIndex != snapshotChunkRequest.getRequestIndex()) { - throw new IOException("Unexpected request chunk index: " + snapshotChunkRequest.getRequestIndex() - + " (the expected index is " + expectedChunkIndex + ")"); + future = Optional.ofNullable(server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot")); + state.setLeader(leaderId, "installSnapshot"); + + server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START); + if (snapshotChunkRequest.getRequestIndex() == 0) { + nextChunkIndex.set(0); + } else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) { + throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get() + + "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex()); } - // update the committed index - // re-load the state machine if this is the last chunk - if (snapshotChunkRequest.getDone()) { - state.reloadStateMachine(lastIncluded); + try { + // Check and append the snapshot chunk. We simply put this in lock + // considering a follower peer requiring a snapshot installation does not + // have a lot of requests + if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) { + nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1); + return toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED); + } + + //TODO: We should only update State with installed snapshot once the request is done. + state.installSnapshot(request); + + final int expectedChunkIndex = nextChunkIndex.getAndIncrement(); + if (expectedChunkIndex != snapshotChunkRequest.getRequestIndex()) { + throw new IOException("Unexpected request chunk index: " + snapshotChunkRequest.getRequestIndex() + + " (the expected index is " + expectedChunkIndex + ")"); + } + // update the committed index + // re-load the state machine if this is the last chunk + if (snapshotChunkRequest.getDone()) { + state.reloadStateMachine(lastIncluded); + } + } finally { + server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE); } - } finally { - server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE); } + } finally { + future.ifPresent(CompletableFuture::join); } if (snapshotChunkRequest.getDone()) { LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex); @@ -216,71 +222,79 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException { - final long currentTerm; final long leaderTerm = request.getLeaderTerm(); final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf( request.getNotification().getFirstAvailableTermIndex()); final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex(); - synchronized (server) { - final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm); - currentTerm = state.getCurrentTerm(); - if (!recognized) { - return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.NOT_LEADER); - } - server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); - state.setLeader(leaderId, "installSnapshot"); - server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION); - - if (inProgressInstallSnapshotIndex.compareAndSet(INVALID_LOG_INDEX, firstAvailableLogIndex)) { - LOG.info("{}: Received notification to install snapshot at index {}", getMemberId(), firstAvailableLogIndex); - // Check if snapshot index is already at par or ahead of the first - // available log index of the Leader. - final long snapshotIndex = state.getLog().getSnapshotIndex(); - if (snapshotIndex != INVALID_LOG_INDEX && snapshotIndex + 1 >= firstAvailableLogIndex && + RaftServerImpl.Pair> pair = + notifyStateMachineToInstallSnapshotImpl(request, leaderId, + leaderTerm, firstAvailableLogTermIndex, firstAvailableLogIndex); + pair.second.join(); + return pair.first; + } + + synchronized private RaftServerImpl.Pair> + notifyStateMachineToInstallSnapshotImpl(InstallSnapshotRequestProto request, RaftPeerId leaderId, + long leaderTerm, TermIndex firstAvailableLogTermIndex, long firstAvailableLogIndex) throws IOException { + final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm); + final long currentTerm = state.getCurrentTerm(); + if (!recognized) { + return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.NOT_LEADER), CompletableFuture.completedFuture(null)); + } + CompletableFuture future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); + state.setLeader(leaderId, "installSnapshot"); + server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION); + + if (inProgressInstallSnapshotIndex.compareAndSet(INVALID_LOG_INDEX, firstAvailableLogIndex)) { + LOG.info("{}: Received notification to install snapshot at index {}", getMemberId(), firstAvailableLogIndex); + // Check if snapshot index is already at par or ahead of the first + // available log index of the Leader. + final long snapshotIndex = state.getLog().getSnapshotIndex(); + if (snapshotIndex != INVALID_LOG_INDEX && snapshotIndex + 1 >= firstAvailableLogIndex && firstAvailableLogIndex > INVALID_LOG_INDEX) { - // State Machine has already installed the snapshot. Return the - // latest snapshot index to the Leader. + // State Machine has already installed the snapshot. Return the + // latest snapshot index to the Leader. - inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); - LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), + inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); + LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, - InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); - } + return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, + InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex), future); + } - final RaftPeerProto leaderProto; - if (!request.hasLastRaftConfigurationLogEntryProto()) { - leaderProto = null; - } else { - leaderProto = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry().getPeersList() + final RaftPeerProto leaderProto; + if (!request.hasLastRaftConfigurationLogEntryProto()) { + leaderProto = null; + } else { + leaderProto = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry().getPeersList() .stream() .filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)) .findFirst() .orElseThrow(() -> new IllegalArgumentException("Leader " + leaderId + " not found from the last configuration LogEntryProto, request = " + request)); - } + } - // For the cases where RaftConf is empty on newly started peer with empty peer list, - // we retrieve leader info from installSnapShotRequestProto. - final RoleInfoProto proto = leaderProto == null || server.getRaftConf().getPeer(state.getLeaderId()) != null? + // For the cases where RaftConf is empty on newly started peer with empty peer list, + // we retrieve leader info from installSnapShotRequestProto. + final RoleInfoProto proto = leaderProto == null || server.getRaftConf().getPeer(state.getLeaderId()) != null? server.getRoleInfoProto(): getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto)); - // This is the first installSnapshot notify request for this term and - // index. Notify the state machine to install the snapshot. - LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.", + // This is the first installSnapshot notify request for this term and + // index. Notify the state machine to install the snapshot. + LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.", getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex); - // If notifyInstallSnapshotFromLeader future is done asynchronously, the main thread will go through the - // downside part. As the time consumed by user-defined statemachine is uncontrollable(e.g. the RocksDB - // checkpoint could be constantly increasing, the transmission will always exceed one boundary), we expect that - // once snapshot installed, follower could work ASAP. For the rest of time, server can respond snapshot - // installation in progress. - - // There is another appendLog thread appending raft entries, which returns inconsistency entries with - // nextIndex and commitIndex to the leader when install snapshot in progress. The nextIndex on follower side - // is updated when state.reloadStateMachine. We shall keep this index upgraded synchronously with main thread, - // otherwise leader could get this follower's latest nextIndex from appendEntries instead of after - // acknowledging the SNAPSHOT_INSTALLED. - server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, firstAvailableLogTermIndex) + // If notifyInstallSnapshotFromLeader future is done asynchronously, the main thread will go through the + // downside part. As the time consumed by user-defined statemachine is uncontrollable(e.g. the RocksDB + // checkpoint could be constantly increasing, the transmission will always exceed one boundary), we expect that + // once snapshot installed, follower could work ASAP. For the rest of time, server can respond snapshot + // installation in progress. + + // There is another appendLog thread appending raft entries, which returns inconsistency entries with + // nextIndex and commitIndex to the leader when install snapshot in progress. The nextIndex on follower side + // is updated when state.reloadStateMachine. We shall keep this index upgraded synchronously with main thread, + // otherwise leader could get this follower's latest nextIndex from appendEntries instead of after + // acknowledging the SNAPSHOT_INSTALLED. + server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, firstAvailableLogTermIndex) .whenComplete((reply, exception) -> { if (exception != null) { LOG.error("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}", @@ -301,58 +315,57 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( } }); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: StateMachine is processing Snapshot Installation Request.", getMemberId()); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("{}: StateMachine is already installing a snapshot.", getMemberId()); - } + if (LOG.isDebugEnabled()) { + LOG.debug("{}: StateMachine is processing Snapshot Installation Request.", getMemberId()); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("{}: StateMachine is already installing a snapshot.", getMemberId()); } + } - final long inProgressInstallSnapshotIndexValue = getInProgressInstallSnapshotIndex(); - Preconditions.assertTrue(inProgressInstallSnapshotIndexValue <= firstAvailableLogIndex + final long inProgressInstallSnapshotIndexValue = getInProgressInstallSnapshotIndex(); + Preconditions.assertTrue(inProgressInstallSnapshotIndexValue <= firstAvailableLogIndex && inProgressInstallSnapshotIndexValue > INVALID_LOG_INDEX, "inProgressInstallSnapshotRequest: %s is not eligible, firstAvailableLogIndex: %s", getInProgressInstallSnapshotIndex(), firstAvailableLogIndex); - // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE. - if (isSnapshotNull.compareAndSet(true, false)) { - LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(), + // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE. + if (isSnapshotNull.compareAndSet(true, false)) { + LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(), InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); - inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); - server.getStateMachine().event().notifySnapshotInstalled( + inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); + server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer()); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); - } + return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE), future); + } - // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset - // installedSnapshotIndex to (0,-1). - final TermIndex latestInstalledSnapshotTermIndex = this.installedSnapshotTermIndex + // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset + // installedSnapshotIndex to (0,-1). + final TermIndex latestInstalledSnapshotTermIndex = this.installedSnapshotTermIndex .getAndSet(INVALID_TERM_INDEX); - if (latestInstalledSnapshotTermIndex.getIndex() > INVALID_LOG_INDEX) { - server.getStateMachine().pause(); - state.reloadStateMachine(latestInstalledSnapshotTermIndex); - LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(), + if (latestInstalledSnapshotTermIndex.getIndex() > INVALID_LOG_INDEX) { + server.getStateMachine().pause(); + state.reloadStateMachine(latestInstalledSnapshotTermIndex); + LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(), InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex); - inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); - final long latestInstalledIndex = latestInstalledSnapshotTermIndex.getIndex(); - server.getStateMachine().event().notifySnapshotInstalled( + inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); + final long latestInstalledIndex = latestInstalledSnapshotTermIndex.getIndex(); + server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer()); - installedIndex.set(latestInstalledIndex); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex()); - } + installedIndex.set(latestInstalledIndex); + return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex()), future); + } - // Otherwise, Snapshot installation is in progress. - if (LOG.isDebugEnabled()) { - LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), + // Otherwise, Snapshot installation is in progress. + if (LOG.isDebugEnabled()) { + LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), InstallSnapshotResult.IN_PROGRESS); - } - return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.IN_PROGRESS); } + return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.IN_PROGRESS), future); } private RoleInfoProto getRoleInfoProto(RaftPeer leader) { From d4e072c3bca77689b87d8aea59a1a71b6a85e616 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Wed, 16 Oct 2024 12:04:33 +0800 Subject: [PATCH 02/28] checkstyle --- .../apache/ratis/server/impl/LeaderElection.java | 10 ++++++---- .../apache/ratis/server/impl/RaftServerImpl.java | 14 ++++++++++++-- .../server/impl/SnapshotInstallationHandler.java | 4 ++-- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index b8ee2623b3..ffd9ac8b07 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -335,14 +335,16 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException, LOG.info("{} {} round {}: result {}", this, phase, round, r); RaftServerImpl.Pair> pair = askForVotesImpl(electionTerm, r); - pair.second.join(); - return pair.first; + pair.second().join(); + return pair.first(); } - private RaftServerImpl.Pair> askForVotesImpl(long electionTerm, ResultAndTerm r) throws IOException { + private RaftServerImpl.Pair> + askForVotesImpl(long electionTerm, ResultAndTerm r) throws IOException { synchronized (server) { if (!shouldRun(electionTerm)) { - return RaftServerImpl.Pair.makePair(false, CompletableFuture.completedFuture(null)); // term already passed or this should not run anymore. + return RaftServerImpl.Pair.makePair(false, + CompletableFuture.completedFuture(null)); // term already passed or this should not run anymore. } switch (r.getResult()) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 94dbd3c328..ec54da20ad 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -566,8 +566,8 @@ void setFirstElection(Object reason) { } static class Pair { - public final U first; - public final V second; + private final U first; + private final V second; Pair(U metadataUpdated, V future) { this.first = metadataUpdated; @@ -577,6 +577,16 @@ static class Pair { public static Pair makePair(U first, V second) { return new Pair<>(first, second); } + + U first() { + return first; + } + + V second() { + return second; + } + + } /** diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 06c10c8632..c6cc1c20f4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -229,8 +229,8 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( RaftServerImpl.Pair> pair = notifyStateMachineToInstallSnapshotImpl(request, leaderId, leaderTerm, firstAvailableLogTermIndex, firstAvailableLogIndex); - pair.second.join(); - return pair.first; + pair.second().join(); + return pair.first(); } synchronized private RaftServerImpl.Pair> From 0b5df033416083a3acf5294c1093d09090f3d14c Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Wed, 16 Oct 2024 21:20:57 +0800 Subject: [PATCH 03/28] checkstyle --- .../main/java/org/apache/ratis/server/impl/FollowerState.java | 1 - .../main/java/org/apache/ratis/server/impl/RaftServerImpl.java | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index 301a854d60..0858caaa58 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -99,7 +99,6 @@ boolean isCurrentLeaderValid() { CompletableFuture stopRunning() { this.isRunning = false; interrupt(); - stopped.complete(null); return stopped; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index ec54da20ad..8229437744 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -586,7 +586,6 @@ V second() { return second; } - } /** @@ -1486,7 +1485,7 @@ private RequestVoteReplyProto requestVote(Phase phase, return replyAndFuture.first; } - synchronized Pair> requestVoteImpl(Phase phase, + private synchronized Pair> requestVoteImpl(Phase phase, RaftPeerId candidateId, RaftGroupId candidateGroupId, long candidateTerm, TermIndex candidateLastEntry) throws IOException { // Check life cycle state again to avoid the PAUSING/PAUSED state. From 4c6719f5c6574f93b3feb96149b0413044cf6286 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Thu, 17 Oct 2024 10:26:12 +0800 Subject: [PATCH 04/28] checkstyle --- .../ratis/server/impl/RaftServerImpl.java | 6 +- .../impl/SnapshotInstallationHandler.java | 273 +++++++++--------- 2 files changed, 142 insertions(+), 137 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 8229437744..e64c76b09d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -594,7 +594,8 @@ V second() { * @param force Force to start a new {@link FollowerState} even if this server is already a follower. * @return if the term/votedFor should be updated to the new term */ - private Pair> changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) { + private Pair> changeToFollower( + long newTerm, boolean force, boolean allowListener, Object reason) { final AtomicReference metadataUpdated = new AtomicReference<>(); CompletableFuture future = changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated); return new Pair<>(metadataUpdated.get(), future); @@ -1480,7 +1481,8 @@ private RequestVoteReplyProto requestVote(Phase phase, getMemberId(), phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry); assertLifeCycleState(LifeCycle.States.RUNNING); assertGroup(getMemberId(), candidateId, candidateGroupId); - Pair> replyAndFuture = requestVoteImpl(phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry); + Pair> replyAndFuture = + requestVoteImpl(phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry); replyAndFuture.second.join(); return replyAndFuture.first; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index c6cc1c20f4..1e0fa9a767 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -233,154 +233,157 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( return pair.first(); } - synchronized private RaftServerImpl.Pair> + private RaftServerImpl.Pair> notifyStateMachineToInstallSnapshotImpl(InstallSnapshotRequestProto request, RaftPeerId leaderId, long leaderTerm, TermIndex firstAvailableLogTermIndex, long firstAvailableLogIndex) throws IOException { - final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm); - final long currentTerm = state.getCurrentTerm(); - if (!recognized) { - return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.NOT_LEADER), CompletableFuture.completedFuture(null)); - } - CompletableFuture future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); - state.setLeader(leaderId, "installSnapshot"); - server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION); - - if (inProgressInstallSnapshotIndex.compareAndSet(INVALID_LOG_INDEX, firstAvailableLogIndex)) { - LOG.info("{}: Received notification to install snapshot at index {}", getMemberId(), firstAvailableLogIndex); - // Check if snapshot index is already at par or ahead of the first - // available log index of the Leader. - final long snapshotIndex = state.getLog().getSnapshotIndex(); - if (snapshotIndex != INVALID_LOG_INDEX && snapshotIndex + 1 >= firstAvailableLogIndex && - firstAvailableLogIndex > INVALID_LOG_INDEX) { - // State Machine has already installed the snapshot. Return the - // latest snapshot index to the Leader. - - inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); - LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), - InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); - return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, - InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex), future); + synchronized (server) { + final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm); + final long currentTerm = state.getCurrentTerm(); + if (!recognized) { + return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.NOT_LEADER), CompletableFuture.completedFuture(null)); } + CompletableFuture future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); + state.setLeader(leaderId, "installSnapshot"); + server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION); + + if (inProgressInstallSnapshotIndex.compareAndSet(INVALID_LOG_INDEX, firstAvailableLogIndex)) { + LOG.info("{}: Received notification to install snapshot at index {}", getMemberId(), firstAvailableLogIndex); + // Check if snapshot index is already at par or ahead of the first + // available log index of the Leader. + final long snapshotIndex = state.getLog().getSnapshotIndex(); + if (snapshotIndex != INVALID_LOG_INDEX && snapshotIndex + 1 >= firstAvailableLogIndex && + firstAvailableLogIndex > INVALID_LOG_INDEX) { + // State Machine has already installed the snapshot. Return the + // latest snapshot index to the Leader. + + inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); + LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), + InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); + return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, + InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex), future); + } - final RaftPeerProto leaderProto; - if (!request.hasLastRaftConfigurationLogEntryProto()) { - leaderProto = null; - } else { - leaderProto = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry().getPeersList() - .stream() - .filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("Leader " + leaderId - + " not found from the last configuration LogEntryProto, request = " + request)); - } + final RaftPeerProto leaderProto; + if (!request.hasLastRaftConfigurationLogEntryProto()) { + leaderProto = null; + } else { + leaderProto = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry().getPeersList() + .stream() + .filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Leader " + leaderId + + " not found from the last configuration LogEntryProto, request = " + request)); + } - // For the cases where RaftConf is empty on newly started peer with empty peer list, - // we retrieve leader info from installSnapShotRequestProto. - final RoleInfoProto proto = leaderProto == null || server.getRaftConf().getPeer(state.getLeaderId()) != null? - server.getRoleInfoProto(): getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto)); - // This is the first installSnapshot notify request for this term and - // index. Notify the state machine to install the snapshot. - LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.", - getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex); - // If notifyInstallSnapshotFromLeader future is done asynchronously, the main thread will go through the - // downside part. As the time consumed by user-defined statemachine is uncontrollable(e.g. the RocksDB - // checkpoint could be constantly increasing, the transmission will always exceed one boundary), we expect that - // once snapshot installed, follower could work ASAP. For the rest of time, server can respond snapshot - // installation in progress. - - // There is another appendLog thread appending raft entries, which returns inconsistency entries with - // nextIndex and commitIndex to the leader when install snapshot in progress. The nextIndex on follower side - // is updated when state.reloadStateMachine. We shall keep this index upgraded synchronously with main thread, - // otherwise leader could get this follower's latest nextIndex from appendEntries instead of after - // acknowledging the SNAPSHOT_INSTALLED. - server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, firstAvailableLogTermIndex) - .whenComplete((reply, exception) -> { - if (exception != null) { - LOG.error("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}", - getMemberId(), exception.getMessage()); - inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); - return; - } - - if (reply != null) { - LOG.info("{}: StateMachine successfully installed snapshot index {}. Reloading the StateMachine.", - getMemberId(), reply.getIndex()); - installedSnapshotTermIndex.set(reply); - } else { - isSnapshotNull.set(true); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: StateMachine could not install snapshot as it is not available", this); + // For the cases where RaftConf is empty on newly started peer with empty peer list, + // we retrieve leader info from installSnapShotRequestProto. + final RoleInfoProto proto = leaderProto == null || server.getRaftConf().getPeer(state.getLeaderId()) != null ? + server.getRoleInfoProto() : getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto)); + // This is the first installSnapshot notify request for this term and + // index. Notify the state machine to install the snapshot. + LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.", + getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex); + // If notifyInstallSnapshotFromLeader future is done asynchronously, the main thread will go through the + // downside part. As the time consumed by user-defined statemachine is uncontrollable(e.g. the RocksDB + // checkpoint could be constantly increasing, the transmission will always exceed one boundary), we expect that + // once snapshot installed, follower could work ASAP. For the rest of time, server can respond snapshot + // installation in progress. + + // There is another appendLog thread appending raft entries, which returns inconsistency entries with + // nextIndex and commitIndex to the leader when install snapshot in progress. The nextIndex on follower side + // is updated when state.reloadStateMachine. We shall keep this index upgraded synchronously with main thread, + // otherwise leader could get this follower's latest nextIndex from appendEntries instead of after + // acknowledging the SNAPSHOT_INSTALLED. + server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, firstAvailableLogTermIndex) + .whenComplete((reply, exception) -> { + if (exception != null) { + LOG.error("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}", + getMemberId(), exception.getMessage()); + inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); + return; } - } - }); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: StateMachine is processing Snapshot Installation Request.", getMemberId()); + if (reply != null) { + LOG.info("{}: StateMachine successfully installed snapshot index {}. Reloading the StateMachine.", + getMemberId(), reply.getIndex()); + installedSnapshotTermIndex.set(reply); + } else { + isSnapshotNull.set(true); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: StateMachine could not install snapshot as it is not available", this); + } + } + }); + + if (LOG.isDebugEnabled()) { + LOG.debug("{}: StateMachine is processing Snapshot Installation Request.", getMemberId()); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("{}: StateMachine is already installing a snapshot.", getMemberId()); + } } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("{}: StateMachine is already installing a snapshot.", getMemberId()); + + final long inProgressInstallSnapshotIndexValue = getInProgressInstallSnapshotIndex(); + Preconditions.assertTrue(inProgressInstallSnapshotIndexValue <= firstAvailableLogIndex + && inProgressInstallSnapshotIndexValue > INVALID_LOG_INDEX, + "inProgressInstallSnapshotRequest: %s is not eligible, firstAvailableLogIndex: %s", + getInProgressInstallSnapshotIndex(), firstAvailableLogIndex); + + // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE. + if (isSnapshotNull.compareAndSet(true, false)) { + LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(), + InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); + inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); + server.getStateMachine().event().notifySnapshotInstalled( + InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer()); + return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE), future); } - } - final long inProgressInstallSnapshotIndexValue = getInProgressInstallSnapshotIndex(); - Preconditions.assertTrue(inProgressInstallSnapshotIndexValue <= firstAvailableLogIndex - && inProgressInstallSnapshotIndexValue > INVALID_LOG_INDEX, - "inProgressInstallSnapshotRequest: %s is not eligible, firstAvailableLogIndex: %s", - getInProgressInstallSnapshotIndex(), firstAvailableLogIndex); - - // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE. - if (isSnapshotNull.compareAndSet(true, false)) { - LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(), - InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); - inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); - server.getStateMachine().event().notifySnapshotInstalled( - InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer()); - return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE), future); - } + // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset + // installedSnapshotIndex to (0,-1). + final TermIndex latestInstalledSnapshotTermIndex = this.installedSnapshotTermIndex + .getAndSet(INVALID_TERM_INDEX); + if (latestInstalledSnapshotTermIndex.getIndex() > INVALID_LOG_INDEX) { + server.getStateMachine().pause(); + state.reloadStateMachine(latestInstalledSnapshotTermIndex); + LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(), + InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex); + inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); + final long latestInstalledIndex = latestInstalledSnapshotTermIndex.getIndex(); + server.getStateMachine().event().notifySnapshotInstalled( + InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer()); + installedIndex.set(latestInstalledIndex); + return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, + latestInstalledSnapshotTermIndex.getIndex()), future); + } - // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset - // installedSnapshotIndex to (0,-1). - final TermIndex latestInstalledSnapshotTermIndex = this.installedSnapshotTermIndex - .getAndSet(INVALID_TERM_INDEX); - if (latestInstalledSnapshotTermIndex.getIndex() > INVALID_LOG_INDEX) { - server.getStateMachine().pause(); - state.reloadStateMachine(latestInstalledSnapshotTermIndex); - LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(), - InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex); - inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); - final long latestInstalledIndex = latestInstalledSnapshotTermIndex.getIndex(); - server.getStateMachine().event().notifySnapshotInstalled( - InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer()); - installedIndex.set(latestInstalledIndex); + // Otherwise, Snapshot installation is in progress. + if (LOG.isDebugEnabled()) { + LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), + InstallSnapshotResult.IN_PROGRESS); + } return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex()), future); - } - - // Otherwise, Snapshot installation is in progress. - if (LOG.isDebugEnabled()) { - LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), - InstallSnapshotResult.IN_PROGRESS); + currentTerm, InstallSnapshotResult.IN_PROGRESS), future); } - return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.IN_PROGRESS), future); } - private RoleInfoProto getRoleInfoProto(RaftPeer leader) { - final RoleInfo role = server.getRole(); - final Optional fs = role.getFollowerState(); - final ServerRpcProto leaderInfo = toServerRpcProto(leader, - fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L)); - final FollowerInfoProto.Builder followerInfo = FollowerInfoProto.newBuilder() - .setLeaderInfo(leaderInfo) - .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)); - return RoleInfoProto.newBuilder() - .setSelf(server.getPeer().getRaftPeerProto()) - .setRole(role.getCurrentRole()) - .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs()) - .setFollowerInfo(followerInfo) - .build(); - } + private RoleInfoProto getRoleInfoProto (RaftPeer leader){ + final RoleInfo role = server.getRole(); + final Optional fs = role.getFollowerState(); + final ServerRpcProto leaderInfo = toServerRpcProto(leader, + fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L)); + final FollowerInfoProto.Builder followerInfo = FollowerInfoProto.newBuilder() + .setLeaderInfo(leaderInfo) + .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)); + return RoleInfoProto.newBuilder() + .setSelf(server.getPeer().getRaftPeerProto()) + .setRole(role.getCurrentRole()) + .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs()) + .setFollowerInfo(followerInfo) + .build(); + } } \ No newline at end of file From 60a0c3de27534a6d720beb5b22b2823ada741d66 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Thu, 17 Oct 2024 10:29:48 +0800 Subject: [PATCH 05/28] exception handle --- .../main/java/org/apache/ratis/server/impl/RaftServerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index e64c76b09d..8fa899a7ef 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -648,7 +648,7 @@ synchronized CompletableFuture changeToFollowerAndPersistMetadata( } return pair.second; } catch (IOException e) { - pair.second.join(); + CompletableFuture.runAsync(pair.second::join); throw e; } } From 139be1e5e6c2e57be3bcfdd09418e7da51b8a181 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Thu, 17 Oct 2024 10:34:59 +0800 Subject: [PATCH 06/28] format --- .../impl/SnapshotInstallationHandler.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 1e0fa9a767..f4779c70a2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -253,13 +253,13 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( // available log index of the Leader. final long snapshotIndex = state.getLog().getSnapshotIndex(); if (snapshotIndex != INVALID_LOG_INDEX && snapshotIndex + 1 >= firstAvailableLogIndex && - firstAvailableLogIndex > INVALID_LOG_INDEX) { + firstAvailableLogIndex > INVALID_LOG_INDEX) { // State Machine has already installed the snapshot. Return the // latest snapshot index to the Leader. inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), - InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); + InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex), future); } @@ -269,21 +269,21 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( leaderProto = null; } else { leaderProto = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry().getPeersList() - .stream() - .filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("Leader " + leaderId - + " not found from the last configuration LogEntryProto, request = " + request)); + .stream() + .filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Leader " + leaderId + + " not found from the last configuration LogEntryProto, request = " + request)); } // For the cases where RaftConf is empty on newly started peer with empty peer list, // we retrieve leader info from installSnapShotRequestProto. - final RoleInfoProto proto = leaderProto == null || server.getRaftConf().getPeer(state.getLeaderId()) != null ? - server.getRoleInfoProto() : getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto)); + final RoleInfoProto proto = leaderProto == null || server.getRaftConf().getPeer(state.getLeaderId()) != null? + server.getRoleInfoProto() : getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto)); // This is the first installSnapshot notify request for this term and // index. Notify the state machine to install the snapshot. LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.", - getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex); + getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex); // If notifyInstallSnapshotFromLeader future is done asynchronously, the main thread will go through the // downside part. As the time consumed by user-defined statemachine is uncontrollable(e.g. the RocksDB // checkpoint could be constantly increasing, the transmission will always exceed one boundary), we expect that @@ -327,34 +327,34 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( final long inProgressInstallSnapshotIndexValue = getInProgressInstallSnapshotIndex(); Preconditions.assertTrue(inProgressInstallSnapshotIndexValue <= firstAvailableLogIndex - && inProgressInstallSnapshotIndexValue > INVALID_LOG_INDEX, - "inProgressInstallSnapshotRequest: %s is not eligible, firstAvailableLogIndex: %s", - getInProgressInstallSnapshotIndex(), firstAvailableLogIndex); + && inProgressInstallSnapshotIndexValue > INVALID_LOG_INDEX, + "inProgressInstallSnapshotRequest: %s is not eligible, firstAvailableLogIndex: %s", + getInProgressInstallSnapshotIndex(), firstAvailableLogIndex); // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE. if (isSnapshotNull.compareAndSet(true, false)) { LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(), - InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); + InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); server.getStateMachine().event().notifySnapshotInstalled( - InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer()); + InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer()); return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE), future); + currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE), future); } // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset // installedSnapshotIndex to (0,-1). final TermIndex latestInstalledSnapshotTermIndex = this.installedSnapshotTermIndex - .getAndSet(INVALID_TERM_INDEX); + .getAndSet(INVALID_TERM_INDEX); if (latestInstalledSnapshotTermIndex.getIndex() > INVALID_LOG_INDEX) { server.getStateMachine().pause(); state.reloadStateMachine(latestInstalledSnapshotTermIndex); LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(), - InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex); + InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex); inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); final long latestInstalledIndex = latestInstalledSnapshotTermIndex.getIndex(); server.getStateMachine().event().notifySnapshotInstalled( - InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer()); + InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer()); installedIndex.set(latestInstalledIndex); return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, @@ -367,7 +367,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( InstallSnapshotResult.IN_PROGRESS); } return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.IN_PROGRESS), future); + currentTerm, InstallSnapshotResult.IN_PROGRESS), future); } } From a7d8a9c87927081ba09a9336e36f1c5835dafc76 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Thu, 17 Oct 2024 10:54:58 +0800 Subject: [PATCH 07/28] format --- .../impl/SnapshotInstallationHandler.java | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index f4779c70a2..39ce752d59 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -364,26 +364,26 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( // Otherwise, Snapshot installation is in progress. if (LOG.isDebugEnabled()) { LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), - InstallSnapshotResult.IN_PROGRESS); + InstallSnapshotResult.IN_PROGRESS); } return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.IN_PROGRESS), future); } } - private RoleInfoProto getRoleInfoProto (RaftPeer leader){ - final RoleInfo role = server.getRole(); - final Optional fs = role.getFollowerState(); - final ServerRpcProto leaderInfo = toServerRpcProto(leader, - fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L)); - final FollowerInfoProto.Builder followerInfo = FollowerInfoProto.newBuilder() - .setLeaderInfo(leaderInfo) - .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)); - return RoleInfoProto.newBuilder() - .setSelf(server.getPeer().getRaftPeerProto()) - .setRole(role.getCurrentRole()) - .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs()) - .setFollowerInfo(followerInfo) - .build(); - } + private RoleInfoProto getRoleInfoProto (RaftPeer leader){ + final RoleInfo role = server.getRole(); + final Optional fs = role.getFollowerState(); + final ServerRpcProto leaderInfo = toServerRpcProto(leader, + fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L)); + final FollowerInfoProto.Builder followerInfo = FollowerInfoProto.newBuilder() + .setLeaderInfo(leaderInfo) + .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)); + return RoleInfoProto.newBuilder() + .setSelf(server.getPeer().getRaftPeerProto()) + .setRole(role.getCurrentRole()) + .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs()) + .setFollowerInfo(followerInfo) + .build(); + } } \ No newline at end of file From 5a228b404cbfde08b55e6e50562a529e357472fe Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Thu, 17 Oct 2024 12:38:16 +0800 Subject: [PATCH 08/28] remove pair --- .../ratis/server/impl/FollowerState.java | 10 +- .../ratis/server/impl/LeaderElection.java | 26 +-- .../ratis/server/impl/RaftServerImpl.java | 205 +++++++----------- .../impl/SnapshotInstallationHandler.java | 169 +++++++-------- 4 files changed, 181 insertions(+), 229 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index 0858caaa58..4f1096cfaf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -144,7 +144,6 @@ private boolean roleChangeChecking(TimeDuration electionTimeout) { private void runImpl() { final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold(); - Optional> future = Optional.empty(); while (shouldRun()) { final TimeDuration electionTimeout = server.getRandomElectionTimeout(); try { @@ -158,16 +157,19 @@ private void runImpl() { if (!shouldRun()) { break; } + CompletableFuture future = null; synchronized (server) { if (roleChangeChecking(electionTimeout)) { LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}", this, lastRpcTime.elapsedTime(), electionTimeout); server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters. // election timeout, should become a candidate - future = Optional.ofNullable(server.changeToCandidate(false)); - break; + future = server.changeToCandidate(false); } } + if (future != null) { + future.join(); + } } catch (InterruptedException e) { LOG.info("{} was interrupted", this); LOG.trace("TRACE", e); @@ -175,8 +177,6 @@ private void runImpl() { return; } catch (Exception e) { LOG.warn("{} caught an exception", this, e); - } finally { - future.ifPresent(CompletableFuture::join); } } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index ffd9ac8b07..174a2b81f5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -333,39 +333,33 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException, LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf); final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm); LOG.info("{} {} round {}: result {}", this, phase, round, r); - - RaftServerImpl.Pair> pair = askForVotesImpl(electionTerm, r); - pair.second().join(); - return pair.first(); - } - - private RaftServerImpl.Pair> - askForVotesImpl(long electionTerm, ResultAndTerm r) throws IOException { + CompletableFuture future = null; synchronized (server) { if (!shouldRun(electionTerm)) { - return RaftServerImpl.Pair.makePair(false, - CompletableFuture.completedFuture(null)); // term already passed or this should not run anymore. + return false; // term already passed or this should not run anymore. } switch (r.getResult()) { case PASSED: case SINGLE_MODE_PASSED: - return RaftServerImpl.Pair.makePair(true, CompletableFuture.completedFuture(null)); + return true; case NOT_IN_CONF: case SHUTDOWN: server.close(); server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto(), false); - return RaftServerImpl.Pair.makePair(false, CompletableFuture.completedFuture(null)); + return false; case TIMEOUT: - return RaftServerImpl.Pair.makePair(false, CompletableFuture.completedFuture(null)); // should retry + return false; // should retry case REJECTED: case DISCOVERED_A_NEW_TERM: final long term = r.maxTerm(server.getState().getCurrentTerm()); - return RaftServerImpl.Pair.makePair(false, server.changeToFollowerAndPersistMetadata(term, false, r)); - default: - throw new IllegalArgumentException("Unable to process result " + r.result); + future = server.changeToFollowerAndPersistMetadata(term, false, r); + break; + default: throw new IllegalArgumentException("Unable to process result " + r.result); } } + future.join(); + return false; } private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry, diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 8fa899a7ef..ace54ebc70 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -565,44 +565,14 @@ void setFirstElection(Object reason) { } } - static class Pair { - private final U first; - private final V second; - - Pair(U metadataUpdated, V future) { - this.first = metadataUpdated; - this.second = future; - } - - public static Pair makePair(U first, V second) { - return new Pair<>(first, second); - } - - U first() { - return first; - } - - V second() { - return second; - } - - } - /** * Change the server state to Follower if this server is in a different role or force is true. * @param newTerm The new term. * @param force Force to start a new {@link FollowerState} even if this server is already a follower. * @return if the term/votedFor should be updated to the new term */ - private Pair> changeToFollower( - long newTerm, boolean force, boolean allowListener, Object reason) { - final AtomicReference metadataUpdated = new AtomicReference<>(); - CompletableFuture future = changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated); - return new Pair<>(metadataUpdated.get(), future); - } - - private synchronized CompletableFuture changeToFollowerAsync( - long newTerm, boolean force, boolean allowListener, Object reason, AtomicReference metadataUpdated) { + private synchronized CompletableFuture changeToFollower( + long newTerm, boolean force, boolean allowListener, Object reason, AtomicBoolean metadataUpdated) { final RaftPeerRole old = role.getCurrentRole(); if (old == RaftPeerRole.LISTENER && !allowListener) { throw new IllegalStateException("Unexpected role " + old); @@ -641,16 +611,12 @@ synchronized CompletableFuture changeToFollowerAndPersistMetadata( long newTerm, boolean allowListener, Object reason) throws IOException { - final Pair> pair = changeToFollower(newTerm, false, allowListener, reason); - try { - if (pair.first) { - state.persistMetadata(); - } - return pair.second; - } catch (IOException e) { - CompletableFuture.runAsync(pair.second::join); - throw e; + final AtomicBoolean metadataUpdated = new AtomicBoolean(); + final CompletableFuture future = changeToFollower(newTerm, false, allowListener, reason, metadataUpdated); + if (metadataUpdated.get()) { + state.persistMetadata(); } + return future; } synchronized CompletableFuture changeToLeader() { @@ -1481,48 +1447,46 @@ private RequestVoteReplyProto requestVote(Phase phase, getMemberId(), phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry); assertLifeCycleState(LifeCycle.States.RUNNING); assertGroup(getMemberId(), candidateId, candidateGroupId); - Pair> replyAndFuture = - requestVoteImpl(phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry); - replyAndFuture.second.join(); - return replyAndFuture.first; - } - private synchronized Pair> requestVoteImpl(Phase phase, - RaftPeerId candidateId, RaftGroupId candidateGroupId, - long candidateTerm, TermIndex candidateLastEntry) throws IOException { - // Check life cycle state again to avoid the PAUSING/PAUSED state. - assertLifeCycleState(LifeCycle.States.RUNNING); boolean shouldShutdown = false; - final VoteContext context = new VoteContext(this, phase, candidateId); - final RaftPeer candidate = context.recognizeCandidate(candidateTerm); - final boolean voteGranted = context.decideVote(candidate, candidateLastEntry); - Pair> termUpdatedAndFuturePair = null; - if (candidate != null && phase == Phase.ELECTION) { - // change server state in the ELECTION phase - termUpdatedAndFuturePair = - changeToFollower(candidateTerm, true, false, "candidate:" + candidateId); - if (voteGranted) { - state.grantVote(candidate.getId()); + final RequestVoteReplyProto reply; + CompletableFuture future = null; + synchronized (this) { + // Check life cycle state again to avoid the PAUSING/PAUSED state. + assertLifeCycleState(LifeCycle.States.RUNNING); + + final VoteContext context = new VoteContext(this, phase, candidateId); + final RaftPeer candidate = context.recognizeCandidate(candidateTerm); + final boolean voteGranted = context.decideVote(candidate, candidateLastEntry); + if (candidate != null && phase == Phase.ELECTION) { + // change server state in the ELECTION phase + final AtomicBoolean termUpdated = new AtomicBoolean(); + future = changeToFollower(candidateTerm, true, false, "candidate:" + candidateId, termUpdated); + if (voteGranted) { + state.grantVote(candidate.getId()); + } + if (termUpdated.get() || voteGranted) { + state.persistMetadata(); // sync metafile + } } - if (termUpdatedAndFuturePair.first || voteGranted) { - state.persistMetadata(); // sync metafile + if (voteGranted) { + role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE)); + } else if(shouldSendShutdown(candidateId, candidateLastEntry)) { + shouldShutdown = true; } - } - if (voteGranted) { - role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE)); - } else if(shouldSendShutdown(candidateId, candidateLastEntry)) { - shouldShutdown = true; - } - RequestVoteReplyProto reply = toRequestVoteReplyProto(candidateId, getMemberId(), + reply = toRequestVoteReplyProto(candidateId, getMemberId(), voteGranted, state.getCurrentTerm(), shouldShutdown); - if (LOG.isInfoEnabled()) { - LOG.info("{} replies to {} vote request: {}. Peer's state: {}", + if (LOG.isInfoEnabled()) { + LOG.info("{} replies to {} vote request: {}. Peer's state: {}", getMemberId(), phase, toRequestVoteReplyString(reply), state); + } + } + if (future != null) { + future.join(); } - return Pair.makePair(reply, termUpdatedAndFuturePair != null - ? termUpdatedAndFuturePair.second - : CompletableFuture.completedFuture(null)); + return reply; } + @Override public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) throws IOException { @@ -1621,53 +1585,50 @@ private CompletableFuture appendEntriesAsync(RaftPeerId final long followerCommit = state.getLog().getLastCommittedIndex(); final Optional followerState; final Timekeeper.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time(); - Optional> future = Optional.empty(); - try { - synchronized (this) { - // Check life cycle state again to avoid the PAUSING/PAUSED state. - assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); - currentTerm = state.getCurrentTerm(); - final boolean recognized = state.recognizeLeader(Op.APPEND_ENTRIES, leaderId, leaderTerm); - if (!recognized) { - return CompletableFuture.completedFuture(toAppendEntriesReplyProto( - leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), - AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat)); - } - try { - future = Optional.ofNullable(changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries")); - } catch (IOException e) { - return JavaUtils.completeExceptionally(e); - } - state.setLeader(leaderId, "appendEntries"); - - if (!proto.getInitializing() && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) { - role.startFollowerState(this, Op.APPEND_ENTRIES); - } - followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START); - - // Check that the append entries are not inconsistent. There are 3 - // scenarios which can result in inconsistency: - // 1. There is a snapshot installation in progress - // 2. There is an overlap between the snapshot index and the entries - // 3. There is a gap between the local log and the entries - // In any of these scenarios, we should return an INCONSISTENCY reply - // back to leader so that the leader can update this follower's next index. - final long inconsistencyReplyNextIndex = checkInconsistentAppendEntries(previous, entries); - if (inconsistencyReplyNextIndex > RaftLog.INVALID_LOG_INDEX) { - final AppendEntriesReplyProto reply = toAppendEntriesReplyProto( - leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextIndex, - AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat); - LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply)); - followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); - return CompletableFuture.completedFuture(reply); - } + final CompletableFuture future; + synchronized (this) { + // Check life cycle state again to avoid the PAUSING/PAUSED state. + assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); + currentTerm = state.getCurrentTerm(); + final boolean recognized = state.recognizeLeader(Op.APPEND_ENTRIES, leaderId, leaderTerm); + if (!recognized) { + return CompletableFuture.completedFuture(toAppendEntriesReplyProto( + leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), + AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat)); + } + try { + future = changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries"); + } catch (IOException e) { + return JavaUtils.completeExceptionally(e); + } + state.setLeader(leaderId, "appendEntries"); - state.updateConfiguration(entries); + if (!proto.getInitializing() && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) { + role.startFollowerState(this, Op.APPEND_ENTRIES); + } + followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START); + + // Check that the append entries are not inconsistent. There are 3 + // scenarios which can result in inconsistency: + // 1. There is a snapshot installation in progress + // 2. There is an overlap between the snapshot index and the entries + // 3. There is a gap between the local log and the entries + // In any of these scenarios, we should return an INCONSISTENCY reply + // back to leader so that the leader can update this follower's next index. + final long inconsistencyReplyNextIndex = checkInconsistentAppendEntries(previous, entries); + if (inconsistencyReplyNextIndex > RaftLog.INVALID_LOG_INDEX) { + final AppendEntriesReplyProto reply = toAppendEntriesReplyProto( + leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextIndex, + AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat); + LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply)); + followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); + CompletableFuture.runAsync(future::join); + return CompletableFuture.completedFuture(reply); } - } finally { - future.ifPresent(CompletableFuture::join); - } + state.updateConfiguration(entries); + } + future.join(); final List> futures = entries.isEmpty() ? Collections.emptyList() : state.getLog().append(requestRef.delegate(entries)); @@ -1793,7 +1754,7 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ assertLifeCycleState(LifeCycle.States.RUNNING); assertGroup(getMemberId(), leaderId, leaderGroupId); - Optional> future = Optional.empty(); + CompletableFuture future; StartLeaderElectionReplyProto reply; synchronized (this) { // Check life cycle state again to avoid the PAUSING/PAUSED state. @@ -1815,10 +1776,10 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false); } - future = Optional.of(changeToCandidate(true)); + future = changeToCandidate(true); reply = toStartLeaderElectionReplyProto(leaderId, getMemberId(), true); } - future.ifPresent(CompletableFuture::join); + future.join(); return reply; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 39ce752d59..71bb43cee3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -164,55 +164,55 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex()); final long lastIncludedIndex = lastIncluded.getIndex(); - Optional> future = Optional.empty(); - try { - synchronized (server) { - final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm); - currentTerm = state.getCurrentTerm(); - if (!recognized) { + CompletableFuture future; + synchronized (server) { + final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm); + currentTerm = state.getCurrentTerm(); + if (!recognized) { + return toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); + } + future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); + state.setLeader(leaderId, "installSnapshot"); + + server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START); + if (snapshotChunkRequest.getRequestIndex() == 0) { + nextChunkIndex.set(0); + } else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) { + CompletableFuture.runAsync(future::join); + throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get() + + "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex()); + } + try { + // Check and append the snapshot chunk. We simply put this in lock + // considering a follower peer requiring a snapshot installation does not + // have a lot of requests + if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) { + nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1); + CompletableFuture.runAsync(future::join); return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); + currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED); } - future = Optional.ofNullable(server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot")); - state.setLeader(leaderId, "installSnapshot"); - - server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START); - if (snapshotChunkRequest.getRequestIndex() == 0) { - nextChunkIndex.set(0); - } else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) { - throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get() - + "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex()); + + //TODO: We should only update State with installed snapshot once the request is done. + state.installSnapshot(request); + + final int expectedChunkIndex = nextChunkIndex.getAndIncrement(); + if (expectedChunkIndex != snapshotChunkRequest.getRequestIndex()) { + CompletableFuture.runAsync(future::join); + throw new IOException("Unexpected request chunk index: " + snapshotChunkRequest.getRequestIndex() + + " (the expected index is " + expectedChunkIndex + ")"); } - try { - // Check and append the snapshot chunk. We simply put this in lock - // considering a follower peer requiring a snapshot installation does not - // have a lot of requests - if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) { - nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED); - } - - //TODO: We should only update State with installed snapshot once the request is done. - state.installSnapshot(request); - - final int expectedChunkIndex = nextChunkIndex.getAndIncrement(); - if (expectedChunkIndex != snapshotChunkRequest.getRequestIndex()) { - throw new IOException("Unexpected request chunk index: " + snapshotChunkRequest.getRequestIndex() - + " (the expected index is " + expectedChunkIndex + ")"); - } - // update the committed index - // re-load the state machine if this is the last chunk - if (snapshotChunkRequest.getDone()) { - state.reloadStateMachine(lastIncluded); - } - } finally { - server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE); + // update the committed index + // re-load the state machine if this is the last chunk + if (snapshotChunkRequest.getDone()) { + state.reloadStateMachine(lastIncluded); } + } finally { + server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE); } - } finally { - future.ifPresent(CompletableFuture::join); } + future.join(); if (snapshotChunkRequest.getDone()) { LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex); } @@ -220,30 +220,23 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS); } - private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( + private CompletableFuture notifyStateMachineToInstallSnapshot( InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException { + final long currentTerm; final long leaderTerm = request.getLeaderTerm(); final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf( request.getNotification().getFirstAvailableTermIndex()); final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex(); - RaftServerImpl.Pair> pair = - notifyStateMachineToInstallSnapshotImpl(request, leaderId, - leaderTerm, firstAvailableLogTermIndex, firstAvailableLogIndex); - pair.second().join(); - return pair.first(); - } - - private RaftServerImpl.Pair> - notifyStateMachineToInstallSnapshotImpl(InstallSnapshotRequestProto request, RaftPeerId leaderId, - long leaderTerm, TermIndex firstAvailableLogTermIndex, long firstAvailableLogIndex) throws IOException { + final CompletableFuture future; + InstallSnapshotReplyProto rep; synchronized (server) { final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm); - final long currentTerm = state.getCurrentTerm(); + currentTerm = state.getCurrentTerm(); if (!recognized) { - return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.NOT_LEADER), CompletableFuture.completedFuture(null)); + return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.NOT_LEADER)); } - CompletableFuture future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); + future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); state.setLeader(leaderId, "installSnapshot"); server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION); @@ -260,8 +253,9 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); - return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, - InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex), future); + CompletableFuture.runAsync(future::join); + return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, + InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex)); } final RaftPeerProto leaderProto; @@ -279,7 +273,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( // For the cases where RaftConf is empty on newly started peer with empty peer list, // we retrieve leader info from installSnapShotRequestProto. final RoleInfoProto proto = leaderProto == null || server.getRaftConf().getPeer(state.getLeaderId()) != null? - server.getRoleInfoProto() : getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto)); + server.getRoleInfoProto(): getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto)); // This is the first installSnapshot notify request for this term and // index. Notify the state machine to install the snapshot. LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.", @@ -296,25 +290,26 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( // otherwise leader could get this follower's latest nextIndex from appendEntries instead of after // acknowledging the SNAPSHOT_INSTALLED. server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, firstAvailableLogTermIndex) - .whenComplete((reply, exception) -> { - if (exception != null) { - LOG.error("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}", - getMemberId(), exception.getMessage()); - inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); - return; - } - - if (reply != null) { - LOG.info("{}: StateMachine successfully installed snapshot index {}. Reloading the StateMachine.", - getMemberId(), reply.getIndex()); - installedSnapshotTermIndex.set(reply); - } else { - isSnapshotNull.set(true); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: StateMachine could not install snapshot as it is not available", this); - } + .whenComplete((reply, exception) -> { + if (exception != null) { + LOG.error("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}", + getMemberId(), exception.getMessage()); + inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); + CompletableFuture.runAsync(future::join); + return; + } + + if (reply != null) { + LOG.info("{}: StateMachine successfully installed snapshot index {}. Reloading the StateMachine.", + getMemberId(), reply.getIndex()); + installedSnapshotTermIndex.set(reply); + } else { + isSnapshotNull.set(true); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: StateMachine could not install snapshot as it is not available", this); } - }); + } + }); if (LOG.isDebugEnabled()) { LOG.debug("{}: StateMachine is processing Snapshot Installation Request.", getMemberId()); @@ -338,8 +333,9 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer()); - return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE), future); + CompletableFuture.runAsync(future::join); + return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE)); } // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset @@ -356,9 +352,9 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer()); installedIndex.set(latestInstalledIndex); - return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, - latestInstalledSnapshotTermIndex.getIndex()), future); + CompletableFuture.runAsync(future::join); + return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex())); } // Otherwise, Snapshot installation is in progress. @@ -366,9 +362,10 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), InstallSnapshotResult.IN_PROGRESS); } - return RaftServerImpl.Pair.makePair(toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.IN_PROGRESS), future); + rep = toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.IN_PROGRESS); } + return future.thenApply(dummy -> rep); } private RoleInfoProto getRoleInfoProto (RaftPeer leader){ From bb4520d75ce265cb88df4072500f44bb4aa2d492 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Thu, 17 Oct 2024 12:38:35 +0800 Subject: [PATCH 09/28] fix --- .../apache/ratis/server/impl/SnapshotInstallationHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 71bb43cee3..2b69d2d68c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -128,7 +128,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt } else { // Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot. if (request.hasNotification()) { - reply = notifyStateMachineToInstallSnapshot(request, leaderId); + reply = notifyStateMachineToInstallSnapshot(request, leaderId).join(); } } From 9123b1ee500f3b14ec8688b085e8c602796c2614 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Thu, 17 Oct 2024 14:25:25 +0800 Subject: [PATCH 10/28] checkstyle --- .../main/java/org/apache/ratis/server/impl/FollowerState.java | 1 - .../main/java/org/apache/ratis/server/impl/RaftServerImpl.java | 1 - 2 files changed, 2 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index 4f1096cfaf..d63acaba77 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -26,7 +26,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index ace54ebc70..eb5314d536 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -133,7 +133,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; From 309dd5c8b0c6a4b68c3e40400684a70b240c7703 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Thu, 17 Oct 2024 22:37:09 +0800 Subject: [PATCH 11/28] catch IOException then async execute future.join --- .../org/apache/ratis/server/impl/RaftServerImpl.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index eb5314d536..3065249e0e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -612,8 +612,13 @@ synchronized CompletableFuture changeToFollowerAndPersistMetadata( Object reason) throws IOException { final AtomicBoolean metadataUpdated = new AtomicBoolean(); final CompletableFuture future = changeToFollower(newTerm, false, allowListener, reason, metadataUpdated); - if (metadataUpdated.get()) { - state.persistMetadata(); + try { + if (metadataUpdated.get()) { + state.persistMetadata(); + } + } catch (IOException e) { + CompletableFuture.runAsync(future::join); + throw e; } return future; } From f28665a092a2b88c07071b51ba39d8e59e2bf919 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Fri, 18 Oct 2024 11:56:03 +0800 Subject: [PATCH 12/28] cancel runAsync(future::join) and exception catching in changeToFollowerAndPersist --- .../org/apache/ratis/server/impl/LeaderElection.java | 2 +- .../org/apache/ratis/server/impl/RaftServerImpl.java | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 174a2b81f5..2122147e68 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -333,7 +333,7 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException, LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf); final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm); LOG.info("{} {} round {}: result {}", this, phase, round, r); - CompletableFuture future = null; + final CompletableFuture future; synchronized (server) { if (!shouldRun(electionTerm)) { return false; // term already passed or this should not run anymore. diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 3065249e0e..eb5314d536 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -612,13 +612,8 @@ synchronized CompletableFuture changeToFollowerAndPersistMetadata( Object reason) throws IOException { final AtomicBoolean metadataUpdated = new AtomicBoolean(); final CompletableFuture future = changeToFollower(newTerm, false, allowListener, reason, metadataUpdated); - try { - if (metadataUpdated.get()) { - state.persistMetadata(); - } - } catch (IOException e) { - CompletableFuture.runAsync(future::join); - throw e; + if (metadataUpdated.get()) { + state.persistMetadata(); } return future; } From 62e74b6acde702f5f1a4deb06892fd51b761c0d1 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Fri, 18 Oct 2024 13:15:31 +0800 Subject: [PATCH 13/28] use future.thenApply --- .../ratis/server/impl/RaftServerImpl.java | 3 +- .../impl/SnapshotInstallationHandler.java | 42 +++++++++---------- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index eb5314d536..dcd14a8bad 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1621,8 +1621,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat); LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply)); followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); - CompletableFuture.runAsync(future::join); - return CompletableFuture.completedFuture(reply); + return future.thenApply(dummy -> reply); } state.updateConfiguration(entries); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 2b69d2d68c..00a9e37013 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -123,7 +123,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt if (installSnapshotEnabled) { // Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot. if (request.hasSnapshotChunk()) { - reply = checkAndInstallSnapshot(request, leaderId); + reply = checkAndInstallSnapshot(request, leaderId).join(); } } else { // Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot. @@ -157,7 +157,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt return failedReply; } - private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequestProto request, + private CompletableFuture checkAndInstallSnapshot(InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException { final long currentTerm; final long leaderTerm = request.getLeaderTerm(); @@ -165,12 +165,13 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex()); final long lastIncludedIndex = lastIncluded.getIndex(); CompletableFuture future; + InstallSnapshotReplyProto reply; synchronized (server) { final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); if (!recognized) { - return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); + return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER)); } future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); state.setLeader(leaderId, "installSnapshot"); @@ -179,6 +180,7 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest if (snapshotChunkRequest.getRequestIndex() == 0) { nextChunkIndex.set(0); } else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) { + // exception new throw, we need join future in another thread CompletableFuture.runAsync(future::join); throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get() + "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex()); @@ -189,9 +191,9 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest // have a lot of requests if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) { nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1); - CompletableFuture.runAsync(future::join); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), + reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED); + return future.thenApply(dummy -> reply); } //TODO: We should only update State with installed snapshot once the request is done. @@ -212,12 +214,11 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE); } } - future.join(); if (snapshotChunkRequest.getDone()) { LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex); } - return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS); + return future.thenApply(dummy -> toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS)); } private CompletableFuture notifyStateMachineToInstallSnapshot( @@ -253,9 +254,9 @@ private CompletableFuture notifyStateMachineToInstall inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); - CompletableFuture.runAsync(future::join); - return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, - InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex)); + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, + InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); + return future.thenApply(dummy -> reply); } final RaftPeerProto leaderProto; @@ -295,7 +296,6 @@ private CompletableFuture notifyStateMachineToInstall LOG.error("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}", getMemberId(), exception.getMessage()); inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); - CompletableFuture.runAsync(future::join); return; } @@ -333,9 +333,9 @@ private CompletableFuture notifyStateMachineToInstall inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer()); - CompletableFuture.runAsync(future::join); - return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE)); + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); + return future.thenApply(dummy -> reply); } // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset @@ -352,9 +352,9 @@ private CompletableFuture notifyStateMachineToInstall server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer()); installedIndex.set(latestInstalledIndex); - CompletableFuture.runAsync(future::join); - return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex())); + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex()); + return future.thenApply(dummy -> reply); } // Otherwise, Snapshot installation is in progress. @@ -362,10 +362,10 @@ private CompletableFuture notifyStateMachineToInstall LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), InstallSnapshotResult.IN_PROGRESS); } - rep = toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.IN_PROGRESS); + return future.thenApply(dummy -> reply); } - return future.thenApply(dummy -> rep); } private RoleInfoProto getRoleInfoProto (RaftPeer leader){ From ea7ff59936bd56a5e1c365d22bdd76590e2864b9 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Fri, 18 Oct 2024 14:18:13 +0800 Subject: [PATCH 14/28] Add the final keyword to some temporary variables --- .../impl/SnapshotInstallationHandler.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 00a9e37013..113a9cffb3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -164,8 +164,8 @@ private CompletableFuture checkAndInstallSnapshot(Ins final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex()); final long lastIncludedIndex = lastIncluded.getIndex(); - CompletableFuture future; - InstallSnapshotReplyProto reply; + final CompletableFuture future; + final InstallSnapshotReplyProto reply; synchronized (server) { final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); @@ -180,7 +180,7 @@ private CompletableFuture checkAndInstallSnapshot(Ins if (snapshotChunkRequest.getRequestIndex() == 0) { nextChunkIndex.set(0); } else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) { - // exception new throw, we need join future in another thread + // throw a exception, we need join future in another thread or ignore future CompletableFuture.runAsync(future::join); throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get() + "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex()); @@ -229,7 +229,7 @@ private CompletableFuture notifyStateMachineToInstall request.getNotification().getFirstAvailableTermIndex()); final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex(); final CompletableFuture future; - InstallSnapshotReplyProto rep; + final InstallSnapshotReplyProto replyProto; synchronized (server) { final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); @@ -254,9 +254,9 @@ private CompletableFuture notifyStateMachineToInstall inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); - final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, + replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); - return future.thenApply(dummy -> reply); + return future.thenApply(dummy -> replyProto); } final RaftPeerProto leaderProto; @@ -333,9 +333,9 @@ private CompletableFuture notifyStateMachineToInstall inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer()); - final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), + replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); - return future.thenApply(dummy -> reply); + return future.thenApply(dummy -> replyProto); } // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset @@ -352,9 +352,9 @@ private CompletableFuture notifyStateMachineToInstall server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer()); installedIndex.set(latestInstalledIndex); - final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), + replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex()); - return future.thenApply(dummy -> reply); + return future.thenApply(dummy -> replyProto); } // Otherwise, Snapshot installation is in progress. @@ -362,9 +362,9 @@ private CompletableFuture notifyStateMachineToInstall LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), InstallSnapshotResult.IN_PROGRESS); } - final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), + replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.IN_PROGRESS); - return future.thenApply(dummy -> reply); + return future.thenApply(dummy -> replyProto); } } From 3f65d6fee52b5324edfe422eb7b5311d2a55b797 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Fri, 18 Oct 2024 14:31:50 +0800 Subject: [PATCH 15/28] remove all runAsync --- .../apache/ratis/server/impl/SnapshotInstallationHandler.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 113a9cffb3..8623b05fe2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -180,8 +180,6 @@ private CompletableFuture checkAndInstallSnapshot(Ins if (snapshotChunkRequest.getRequestIndex() == 0) { nextChunkIndex.set(0); } else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) { - // throw a exception, we need join future in another thread or ignore future - CompletableFuture.runAsync(future::join); throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get() + "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex()); } @@ -201,7 +199,6 @@ private CompletableFuture checkAndInstallSnapshot(Ins final int expectedChunkIndex = nextChunkIndex.getAndIncrement(); if (expectedChunkIndex != snapshotChunkRequest.getRequestIndex()) { - CompletableFuture.runAsync(future::join); throw new IOException("Unexpected request chunk index: " + snapshotChunkRequest.getRequestIndex() + " (the expected index is " + expectedChunkIndex + ")"); } From 091bce9f8ba64148dff4cf9e2dd2c46806efac3d Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Fri, 18 Oct 2024 14:54:47 +0800 Subject: [PATCH 16/28] fix break is ignored --- .../main/java/org/apache/ratis/server/impl/FollowerState.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index d63acaba77..0cffc6c161 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -169,6 +169,7 @@ private void runImpl() { if (future != null) { future.join(); } + break; } catch (InterruptedException e) { LOG.info("{} was interrupted", this); LOG.trace("TRACE", e); From 33639691f16779df604fe123c7d01ccc4b386138 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Fri, 18 Oct 2024 14:56:34 +0800 Subject: [PATCH 17/28] fix break is ignored --- .../main/java/org/apache/ratis/server/impl/FollowerState.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index 0cffc6c161..4e3520a5ba 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -168,8 +168,8 @@ private void runImpl() { } if (future != null) { future.join(); + break; } - break; } catch (InterruptedException e) { LOG.info("{} was interrupted", this); LOG.trace("TRACE", e); From 8eb3dbab4db8eb97bfccc301e22d6076a6168c5f Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Sat, 19 Oct 2024 00:18:21 +0800 Subject: [PATCH 18/28] join at close --- .../java/org/apache/ratis/server/impl/RaftServerImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index dcd14a8bad..38ae789727 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -518,12 +518,12 @@ public void close() { LOG.warn("{}: Failed to un-register RaftServer JMX bean", getMemberId(), e); } try { - role.shutdownFollowerState(); + role.shutdownFollowerState().join(); } catch (Exception e) { LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), e); } try{ - role.shutdownLeaderElection(); + role.shutdownLeaderElection().join(); } catch (Exception e) { LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), e); } From 756a3053005e63d7c34a9c08aa8ae233fca390bd Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Sat, 19 Oct 2024 00:39:31 +0800 Subject: [PATCH 19/28] Change the way variables are defined --- .../ratis/server/impl/LeaderElection.java | 1 - .../impl/SnapshotInstallationHandler.java | 22 +++++++++---------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 2122147e68..aed7f4447c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -227,7 +227,6 @@ private void startIfNew(Runnable starter) { CompletableFuture shutdown() { lifeCycle.checkStateAndClose(); - stopped.complete(null); return stopped; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 8623b05fe2..2b72501646 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -165,7 +165,6 @@ private CompletableFuture checkAndInstallSnapshot(Ins final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex()); final long lastIncludedIndex = lastIncluded.getIndex(); final CompletableFuture future; - final InstallSnapshotReplyProto reply; synchronized (server) { final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); @@ -189,7 +188,7 @@ private CompletableFuture checkAndInstallSnapshot(Ins // have a lot of requests if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) { nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1); - reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED); return future.thenApply(dummy -> reply); } @@ -226,7 +225,6 @@ private CompletableFuture notifyStateMachineToInstall request.getNotification().getFirstAvailableTermIndex()); final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex(); final CompletableFuture future; - final InstallSnapshotReplyProto replyProto; synchronized (server) { final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); @@ -251,9 +249,9 @@ private CompletableFuture notifyStateMachineToInstall inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); - replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); - return future.thenApply(dummy -> replyProto); + return future.thenApply(dummy -> reply); } final RaftPeerProto leaderProto; @@ -330,9 +328,9 @@ private CompletableFuture notifyStateMachineToInstall inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer()); - replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); - return future.thenApply(dummy -> replyProto); + return future.thenApply(dummy -> reply); } // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset @@ -349,9 +347,9 @@ private CompletableFuture notifyStateMachineToInstall server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer()); installedIndex.set(latestInstalledIndex); - replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex()); - return future.thenApply(dummy -> replyProto); + return future.thenApply(dummy -> reply); } // Otherwise, Snapshot installation is in progress. @@ -359,13 +357,13 @@ private CompletableFuture notifyStateMachineToInstall LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), InstallSnapshotResult.IN_PROGRESS); } - replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.IN_PROGRESS); - return future.thenApply(dummy -> replyProto); + return future.thenApply(dummy -> reply); } } - private RoleInfoProto getRoleInfoProto (RaftPeer leader){ + private RoleInfoProto getRoleInfoProto (RaftPeer leader) { final RoleInfo role = server.getRole(); final Optional fs = role.getFollowerState(); final ServerRpcProto leaderInfo = toServerRpcProto(leader, From d0f00ae0d28b4590ee76c7a833d39431c51708e3 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Sat, 19 Oct 2024 00:41:34 +0800 Subject: [PATCH 20/28] format --- .../ratis/server/impl/SnapshotInstallationHandler.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 2b72501646..e8e6c2e6a2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -213,8 +213,9 @@ private CompletableFuture checkAndInstallSnapshot(Ins if (snapshotChunkRequest.getDone()) { LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex); } - return future.thenApply(dummy -> toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS)); + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS); + return future.thenApply(dummy -> reply); } private CompletableFuture notifyStateMachineToInstallSnapshot( From 421583d7c3d87e96db3d4329b4381b987539de72 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 18 Oct 2024 13:34:50 -0700 Subject: [PATCH 21/28] Revert whitespace change. --- .../apache/ratis/server/impl/SnapshotInstallationHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index e8e6c2e6a2..7a95b29bbb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -364,7 +364,7 @@ private CompletableFuture notifyStateMachineToInstall } } - private RoleInfoProto getRoleInfoProto (RaftPeer leader) { + private RoleInfoProto getRoleInfoProto(RaftPeer leader) { final RoleInfo role = server.getRole(); final Optional fs = role.getFollowerState(); final ServerRpcProto leaderInfo = toServerRpcProto(leader, @@ -379,4 +379,4 @@ private RoleInfoProto getRoleInfoProto (RaftPeer leader) { .setFollowerInfo(followerInfo) .build(); } -} \ No newline at end of file +} From f8a72fc4c5f706b2cbfd31b687cca64a41878997 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 18 Oct 2024 13:40:49 -0700 Subject: [PATCH 22/28] Revert whitespace changes. --- .../main/java/org/apache/ratis/server/impl/LeaderElection.java | 1 + .../main/java/org/apache/ratis/server/impl/RaftServerImpl.java | 1 + .../apache/ratis/server/impl/SnapshotInstallationHandler.java | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index aed7f4447c..3819fc7dec 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -332,6 +332,7 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException, LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf); final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm); LOG.info("{} {} round {}: result {}", this, phase, round, r); + final CompletableFuture future; synchronized (server) { if (!shouldRun(electionTerm)) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 38ae789727..8de1a9053f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1752,6 +1752,7 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ assertLifeCycleState(LifeCycle.States.RUNNING); assertGroup(getMemberId(), leaderId, leaderGroupId); + CompletableFuture future; StartLeaderElectionReplyProto reply; synchronized (this) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 7a95b29bbb..8de9a37569 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -379,4 +379,4 @@ private RoleInfoProto getRoleInfoProto(RaftPeer leader) { .setFollowerInfo(followerInfo) .build(); } -} +} \ No newline at end of file From 9a50ce89649391d73f844d3bd92a0f4aebc677e5 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Sat, 19 Oct 2024 12:34:04 +0800 Subject: [PATCH 23/28] revert join at close, followerState.runImpl, LeaderElection.askForVotes/runImpl --- .../java/org/apache/ratis/server/impl/FollowerState.java | 7 +------ .../java/org/apache/ratis/server/impl/LeaderElection.java | 5 ++--- .../java/org/apache/ratis/server/impl/RaftServerImpl.java | 6 +++--- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index 4e3520a5ba..a343643982 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -156,20 +156,15 @@ private void runImpl() { if (!shouldRun()) { break; } - CompletableFuture future = null; synchronized (server) { if (roleChangeChecking(electionTimeout)) { LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}", this, lastRpcTime.elapsedTime(), electionTimeout); server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters. // election timeout, should become a candidate - future = server.changeToCandidate(false); + server.changeToCandidate(false); } } - if (future != null) { - future.join(); - break; - } } catch (InterruptedException e) { LOG.info("{} was interrupted", this); LOG.trace("TRACE", e); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 3819fc7dec..e0f0a5be8d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -255,7 +255,7 @@ private void runImpl() { for (int round = 0; shouldRun(); round++) { if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) { if (askForVotes(Phase.ELECTION, round)) { - server.changeToLeader().join(); + server.changeToLeader(); } } } @@ -353,12 +353,11 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException, case REJECTED: case DISCOVERED_A_NEW_TERM: final long term = r.maxTerm(server.getState().getCurrentTerm()); - future = server.changeToFollowerAndPersistMetadata(term, false, r); + server.changeToFollowerAndPersistMetadata(term, false, r); break; default: throw new IllegalArgumentException("Unable to process result " + r.result); } } - future.join(); return false; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 8de1a9053f..66cb6f2624 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -518,17 +518,17 @@ public void close() { LOG.warn("{}: Failed to un-register RaftServer JMX bean", getMemberId(), e); } try { - role.shutdownFollowerState().join(); + role.shutdownFollowerState(); } catch (Exception e) { LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), e); } try{ - role.shutdownLeaderElection().join(); + role.shutdownLeaderElection(); } catch (Exception e) { LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), e); } try{ - role.shutdownLeaderState(true).join(); + role.shutdownLeaderState(true); } catch (Exception e) { LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), e); } From 3d051e006af86bffa1756a240ca72ace8ad663ae Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Sun, 20 Oct 2024 17:44:18 +0800 Subject: [PATCH 24/28] fix --- .../main/java/org/apache/ratis/server/impl/FollowerState.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index a343643982..1be160f182 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -163,6 +163,7 @@ private void runImpl() { server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters. // election timeout, should become a candidate server.changeToCandidate(false); + break; } } } catch (InterruptedException e) { From d0cd3de231afbd9f56fdc204fb4a0bc9bf07b873 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Mon, 21 Oct 2024 17:36:11 +0800 Subject: [PATCH 25/28] add sync for LeaderElection.shouldRun and revert return value in changeToLeader, changeToCandidate --- .../apache/ratis/server/impl/LeaderElection.java | 7 ++++--- .../apache/ratis/server/impl/RaftServerImpl.java | 13 +++++-------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index e0f0a5be8d..5b42d53b2d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -282,8 +282,10 @@ private void runImpl() { } private boolean shouldRun() { - final DivisionInfo info = server.getInfo(); - return lifeCycle.getCurrentState().isRunning() && info.isCandidate() && info.isAlive(); + synchronized (server) { + final DivisionInfo info = server.getInfo(); + return lifeCycle.getCurrentState().isRunning() && info.isCandidate() && info.isAlive(); + } } private boolean shouldRun(long electionTerm) { @@ -333,7 +335,6 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException, final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm); LOG.info("{} {} round {}: result {}", this, phase, round, r); - final CompletableFuture future; synchronized (server) { if (!shouldRun(electionTerm)) { return false; // term already passed or this should not run anymore. diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 66cb6f2624..daf1416f10 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -618,7 +618,7 @@ synchronized CompletableFuture changeToFollowerAndPersistMetadata( return future; } - synchronized CompletableFuture changeToLeader() { + synchronized void changeToLeader() { Preconditions.assertTrue(getInfo().isCandidate()); CompletableFuture future = role.shutdownLeaderElection(); setRole(RaftPeerRole.LEADER, "changeToLeader"); @@ -627,7 +627,6 @@ synchronized CompletableFuture changeToLeader() { // start sending AppendEntries RPC to followers leader.start(); - return future; } @Override @@ -681,16 +680,15 @@ RoleInfoProto getRoleInfoProto() { return role.buildRoleInfoProto(this); } - synchronized CompletableFuture changeToCandidate(boolean forceStartLeaderElection) { + synchronized void changeToCandidate(boolean forceStartLeaderElection) { Preconditions.assertTrue(getInfo().isFollower()); - CompletableFuture future = role.shutdownFollowerState(); + role.shutdownFollowerState(); setRole(RaftPeerRole.CANDIDATE, "changeToCandidate"); if (state.shouldNotifyExtendedNoLeader()) { stateMachine.followerEvent().notifyExtendedNoLeader(getRoleInfoProto()); } // start election role.startLeaderElection(this, forceStartLeaderElection); - return future; } @Override @@ -1775,10 +1773,9 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false); } - future = changeToCandidate(true); - reply = toStartLeaderElectionReplyProto(leaderId, getMemberId(), true); + changeToCandidate(true); + reply = toStartLeaderElectionReplyProto(leaderId, getMemberId(), true); } - future.join(); return reply; } From da1cc164a84c4f36dd0d3ad9d48ca12c6da91f13 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Mon, 21 Oct 2024 17:51:04 +0800 Subject: [PATCH 26/28] format --- .../main/java/org/apache/ratis/server/impl/LeaderElection.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 5b42d53b2d..2a9ad724c2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -355,11 +355,10 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException, case DISCOVERED_A_NEW_TERM: final long term = r.maxTerm(server.getState().getCurrentTerm()); server.changeToFollowerAndPersistMetadata(term, false, r); - break; + return false; default: throw new IllegalArgumentException("Unable to process result " + r.result); } } - return false; } private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry, From f9b74f64ad6721fae500bec20cb6b87b39a0c479 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Mon, 21 Oct 2024 18:17:25 +0800 Subject: [PATCH 27/28] remove useless line --- .../java/org/apache/ratis/server/impl/RaftServerImpl.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index daf1416f10..0acbdc0dec 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1751,8 +1751,6 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ assertLifeCycleState(LifeCycle.States.RUNNING); assertGroup(getMemberId(), leaderId, leaderGroupId); - CompletableFuture future; - StartLeaderElectionReplyProto reply; synchronized (this) { // Check life cycle state again to avoid the PAUSING/PAUSED state. assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); @@ -1774,9 +1772,8 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ } changeToCandidate(true); - reply = toStartLeaderElectionReplyProto(leaderId, getMemberId(), true); + return toStartLeaderElectionReplyProto(leaderId, getMemberId(), true); } - return reply; } void submitUpdateCommitEvent() { From a7264ef67559a41f597c624207cb879658317d61 Mon Sep 17 00:00:00 2001 From: dengchao <2326884052@qq.com> Date: Tue, 22 Oct 2024 00:52:08 +0800 Subject: [PATCH 28/28] revert and fix spotbugs --- .../java/org/apache/ratis/server/impl/LeaderElection.java | 6 ++---- .../java/org/apache/ratis/server/impl/RaftServerImpl.java | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 2a9ad724c2..39b401dda1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -282,10 +282,8 @@ private void runImpl() { } private boolean shouldRun() { - synchronized (server) { - final DivisionInfo info = server.getInfo(); - return lifeCycle.getCurrentState().isRunning() && info.isCandidate() && info.isAlive(); - } + final DivisionInfo info = server.getInfo(); + return lifeCycle.getCurrentState().isRunning() && info.isCandidate() && info.isAlive(); } private boolean shouldRun(long electionTerm) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 0acbdc0dec..cfcdd1519d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -528,7 +528,7 @@ public void close() { LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), e); } try{ - role.shutdownLeaderState(true); + role.shutdownLeaderState(true).join(); } catch (Exception e) { LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), e); } @@ -620,7 +620,7 @@ synchronized CompletableFuture changeToFollowerAndPersistMetadata( synchronized void changeToLeader() { Preconditions.assertTrue(getInfo().isCandidate()); - CompletableFuture future = role.shutdownLeaderElection(); + role.shutdownLeaderElection(); setRole(RaftPeerRole.LEADER, "changeToLeader"); final LeaderStateImpl leader = role.updateLeaderState(this); state.becomeLeader();