Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-2174. Move future.join outside the lock #1168

Merged
merged 29 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2b0ce8f
format
133tosakarin Oct 15, 2024
d4e072c
checkstyle
133tosakarin Oct 16, 2024
0b5df03
checkstyle
133tosakarin Oct 16, 2024
4c6719f
checkstyle
133tosakarin Oct 17, 2024
60a0c3d
exception handle
133tosakarin Oct 17, 2024
139be1e
format
133tosakarin Oct 17, 2024
a7d8a9c
format
133tosakarin Oct 17, 2024
5a228b4
remove pair
133tosakarin Oct 17, 2024
bb4520d
fix
133tosakarin Oct 17, 2024
9123b1e
checkstyle
133tosakarin Oct 17, 2024
309dd5c
catch IOException then async execute future.join
133tosakarin Oct 17, 2024
f28665a
cancel runAsync(future::join) and exception catching in changeToFollo…
133tosakarin Oct 18, 2024
62e74b6
use future.thenApply
133tosakarin Oct 18, 2024
ea7ff59
Add the final keyword to some temporary variables
133tosakarin Oct 18, 2024
3f65d6f
remove all runAsync
133tosakarin Oct 18, 2024
091bce9
fix break is ignored
133tosakarin Oct 18, 2024
3363969
fix break is ignored
133tosakarin Oct 18, 2024
8eb3dba
join at close
133tosakarin Oct 18, 2024
756a305
Change the way variables are defined
133tosakarin Oct 18, 2024
d0f00ae
format
133tosakarin Oct 18, 2024
421583d
Revert whitespace change.
szetszwo Oct 18, 2024
f8a72fc
Revert whitespace changes.
szetszwo Oct 18, 2024
9a50ce8
revert join at close, followerState.runImpl, LeaderElection.askForVot…
133tosakarin Oct 19, 2024
3d051e0
fix
133tosakarin Oct 20, 2024
a445d49
Merge branch 'master' into move_future_out_of_the_lock
133tosakarin Oct 21, 2024
d0cd3de
add sync for LeaderElection.shouldRun and revert return value in chan…
133tosakarin Oct 21, 2024
da1cc16
format
133tosakarin Oct 21, 2024
f9b74f6
remove useless line
133tosakarin Oct 21, 2024
a7264ef
revert and fix spotbugs
133tosakarin Oct 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,20 @@ private void runImpl() {
if (!shouldRun()) {
break;
}
CompletableFuture<Void> future = null;
synchronized (server) {
if (roleChangeChecking(electionTimeout)) {
LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}",
this, lastRpcTime.elapsedTime(), electionTimeout);
server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
// election timeout, should become a candidate
server.changeToCandidate(false);
break;
future = server.changeToCandidate(false);
}
}
if (future != null) {
future.join();
break;
}
} catch (InterruptedException e) {
LOG.info("{} was interrupted", this);
LOG.trace("TRACE", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ private void runImpl() {
for (int round = 0; shouldRun(); round++) {
if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) {
if (askForVotes(Phase.ELECTION, round)) {
server.changeToLeader();
server.changeToLeader().join();
}
}
}
Expand Down Expand Up @@ -333,7 +333,7 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException,
LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf);
final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm);
LOG.info("{} {} round {}: result {}", this, phase, round, r);

final CompletableFuture<Void> future;
synchronized (server) {
if (!shouldRun(electionTerm)) {
return false; // term already passed or this should not run anymore.
Expand All @@ -353,11 +353,13 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException,
case REJECTED:
case DISCOVERED_A_NEW_TERM:
final long term = r.maxTerm(server.getState().getCurrentTerm());
server.changeToFollowerAndPersistMetadata(term, false, r);
return false;
future = server.changeToFollowerAndPersistMetadata(term, false, r);
break;
default: throw new IllegalArgumentException("Unable to process result " + r.result);
}
}
future.join();
return false;
}

private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ void submitStepDownEvent(long term, StepDownReason reason) {
private void stepDown(long term, StepDownReason reason) {
try {
lease.getAndSetEnabled(false);
server.changeToFollowerAndPersistMetadata(term, false, reason);
server.changeToFollowerAndPersistMetadata(term, false, reason).join();
pendingStepDown.complete(server::newSuccessReply);
} catch(IOException e) {
final String s = this + ": Failed to persist metadata for term " + term;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -519,12 +518,12 @@ public void close() {
LOG.warn("{}: Failed to un-register RaftServer JMX bean", getMemberId(), e);
}
try {
role.shutdownFollowerState();
role.shutdownFollowerState().join();
} catch (Exception e) {
LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), e);
}
try{
role.shutdownLeaderElection();
role.shutdownLeaderElection().join();
} catch (Exception e) {
LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), e);
}
Expand Down Expand Up @@ -571,14 +570,8 @@ void setFirstElection(Object reason) {
* @param force Force to start a new {@link FollowerState} even if this server is already a follower.
* @return if the term/votedFor should be updated to the new term
*/
private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) {
final AtomicReference<Boolean> metadataUpdated = new AtomicReference<>();
changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join();
return metadataUpdated.get();
}

private synchronized CompletableFuture<Void> changeToFollowerAsync(
long newTerm, boolean force, boolean allowListener, Object reason, AtomicReference<Boolean> metadataUpdated) {
private synchronized CompletableFuture<Void> changeToFollower(
long newTerm, boolean force, boolean allowListener, Object reason, AtomicBoolean metadataUpdated) {
final RaftPeerRole old = role.getCurrentRole();
if (old == RaftPeerRole.LISTENER && !allowListener) {
throw new IllegalStateException("Unexpected role " + old);
Expand Down Expand Up @@ -613,24 +606,28 @@ private synchronized CompletableFuture<Void> changeToFollowerAsync(
return future;
}

synchronized void changeToFollowerAndPersistMetadata(
synchronized CompletableFuture<Void> changeToFollowerAndPersistMetadata(
long newTerm,
boolean allowListener,
Object reason) throws IOException {
if (changeToFollower(newTerm, false, allowListener, reason)) {
final AtomicBoolean metadataUpdated = new AtomicBoolean();
final CompletableFuture<Void> future = changeToFollower(newTerm, false, allowListener, reason, metadataUpdated);
if (metadataUpdated.get()) {
state.persistMetadata();
}
return future;
}

synchronized void changeToLeader() {
synchronized CompletableFuture<Void> changeToLeader() {
Preconditions.assertTrue(getInfo().isCandidate());
role.shutdownLeaderElection();
CompletableFuture<Void> future = role.shutdownLeaderElection();
setRole(RaftPeerRole.LEADER, "changeToLeader");
final LeaderStateImpl leader = role.updateLeaderState(this);
state.becomeLeader();

// start sending AppendEntries RPC to followers
leader.start();
return future;
}

@Override
Expand Down Expand Up @@ -684,15 +681,16 @@ RoleInfoProto getRoleInfoProto() {
return role.buildRoleInfoProto(this);
}

synchronized void changeToCandidate(boolean forceStartLeaderElection) {
synchronized CompletableFuture<Void> changeToCandidate(boolean forceStartLeaderElection) {
Preconditions.assertTrue(getInfo().isFollower());
role.shutdownFollowerState();
CompletableFuture<Void> future = role.shutdownFollowerState();
setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
if (state.shouldNotifyExtendedNoLeader()) {
stateMachine.followerEvent().notifyExtendedNoLeader(getRoleInfoProto());
}
// start election
role.startLeaderElection(this, forceStartLeaderElection);
return future;
}

@Override
Expand Down Expand Up @@ -1451,6 +1449,7 @@ private RequestVoteReplyProto requestVote(Phase phase,

boolean shouldShutdown = false;
final RequestVoteReplyProto reply;
CompletableFuture<Void> future = null;
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.RUNNING);
Expand All @@ -1460,12 +1459,12 @@ private RequestVoteReplyProto requestVote(Phase phase,
final boolean voteGranted = context.decideVote(candidate, candidateLastEntry);
if (candidate != null && phase == Phase.ELECTION) {
// change server state in the ELECTION phase
final boolean termUpdated =
changeToFollower(candidateTerm, true, false, "candidate:" + candidateId);
final AtomicBoolean termUpdated = new AtomicBoolean();
future = changeToFollower(candidateTerm, true, false, "candidate:" + candidateId, termUpdated);
if (voteGranted) {
state.grantVote(candidate.getId());
}
if (termUpdated || voteGranted) {
if (termUpdated.get() || voteGranted) {
state.persistMetadata(); // sync metafile
}
}
Expand All @@ -1481,6 +1480,9 @@ private RequestVoteReplyProto requestVote(Phase phase,
getMemberId(), phase, toRequestVoteReplyString(reply), state);
}
}
if (future != null) {
future.join();
}
return reply;
}

Expand Down Expand Up @@ -1582,6 +1584,7 @@ private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(RaftPeerId
final long followerCommit = state.getLog().getLastCommittedIndex();
final Optional<FollowerState> followerState;
final Timekeeper.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time();
final CompletableFuture<Void> future;
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
Expand All @@ -1593,7 +1596,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(),
AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat));
}
try {
changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
future = changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
} catch (IOException e) {
return JavaUtils.completeExceptionally(e);
}
Expand All @@ -1618,12 +1621,12 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply));
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
return CompletableFuture.completedFuture(reply);
return future.thenApply(dummy -> reply);
}

state.updateConfiguration(entries);
}

future.join();

final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList()
: state.getLog().append(requestRef.delegate(entries));
Expand Down Expand Up @@ -1749,7 +1752,8 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ

assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(getMemberId(), leaderId, leaderGroupId);

CompletableFuture<Void> future;
StartLeaderElectionReplyProto reply;
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
Expand All @@ -1770,9 +1774,11 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ
return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}

changeToCandidate(true);
return toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
future = changeToCandidate(true);
reply = toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
}
future.join();
return reply;
}

void submitUpdateCommitEvent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -122,12 +123,12 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt
if (installSnapshotEnabled) {
// Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot.
if (request.hasSnapshotChunk()) {
reply = checkAndInstallSnapshot(request, leaderId);
reply = checkAndInstallSnapshot(request, leaderId).join();
}
} else {
// Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot.
if (request.hasNotification()) {
reply = notifyStateMachineToInstallSnapshot(request, leaderId);
reply = notifyStateMachineToInstallSnapshot(request, leaderId).join();
}
}

Expand Down Expand Up @@ -156,21 +157,23 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt
return failedReply;
}

private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequestProto request,
private CompletableFuture<InstallSnapshotReplyProto> checkAndInstallSnapshot(InstallSnapshotRequestProto request,
RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
final long lastIncludedIndex = lastIncluded.getIndex();
final CompletableFuture<Void> future;
final InstallSnapshotReplyProto reply;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove reply and declare it in the code below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, thanks for your comment

synchronized (server) {
final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER));
}
server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");

server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
Expand All @@ -186,8 +189,9 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest
// have a lot of requests
if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1);
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED);
return future.thenApply(dummy -> reply);
}

//TODO: We should only update State with installed snapshot once the request is done.
Expand All @@ -210,25 +214,27 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest
if (snapshotChunkRequest.getDone()) {
LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex);
}
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
return future.thenApply(dummy -> toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create the reply first since it does not have to wait for the shutdown.

    final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
        currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
    return future.thenApply(dummy -> reply);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

}

private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
private CompletableFuture<InstallSnapshotReplyProto> notifyStateMachineToInstallSnapshot(
InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
request.getNotification().getFirstAvailableTermIndex());
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
final CompletableFuture<Void> future;
final InstallSnapshotReplyProto replyProto;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to before, move down the declaration, create the reply and return future.thanApply(..).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

synchronized (server) {
final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.NOT_LEADER);
return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.NOT_LEADER));
}
server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);

Expand All @@ -245,8 +251,9 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
return future.thenApply(dummy -> replyProto);
}

final RaftPeerProto leaderProto;
Expand Down Expand Up @@ -323,8 +330,9 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
return future.thenApply(dummy -> replyProto);
}

// If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset
Expand All @@ -341,21 +349,23 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer());
installedIndex.set(latestInstalledIndex);
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex());
return future.thenApply(dummy -> replyProto);
}

// Otherwise, Snapshot installation is in progress.
if (LOG.isDebugEnabled()) {
LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.IN_PROGRESS);
}
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.IN_PROGRESS);
return future.thenApply(dummy -> replyProto);
}
}

private RoleInfoProto getRoleInfoProto(RaftPeer leader) {
private RoleInfoProto getRoleInfoProto (RaftPeer leader){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this whitespace change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

final RoleInfo role = server.getRole();
final Optional<FollowerState> fs = role.getFollowerState();
final ServerRpcProto leaderInfo = toServerRpcProto(leader,
Expand Down