Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-2162. When closing leaderState, if the logAppender thread sends a snapshot, a deadlock may occur #1154

Merged
merged 10 commits into from
Sep 30, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public class ConfigurationManager {
* the last entry of the map. Otherwise is initialConf.
*/
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile RaftConfigurationImpl currentConf;
private RaftConfigurationImpl currentConf;
/** Cache the peer corresponding to {@link #id}. */
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile RaftPeer currentPeer;
private RaftPeer currentPeer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

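Please also remove the @SuppressWarnings({"squid:S3077"}) annotation from both fields. It was only there to suppress the false-positive warning on volatile, which no longer applies now that the fields are not volatile.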

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok!


ConfigurationManager(RaftPeerId id, RaftConfigurationImpl initialConf) {
this.id = id;
Expand Down Expand Up @@ -78,11 +78,11 @@ private void addRaftConfigurationImpl(long logIndex, RaftConfigurationImpl conf)
}
}

RaftConfigurationImpl getCurrent() {
synchronized RaftConfigurationImpl getCurrent() {
return currentConf;
}

szetszwo marked this conversation as resolved.
Show resolved Hide resolved
RaftPeer getCurrentPeer() {
synchronized RaftPeer getCurrentPeer() {
return currentPeer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;
Expand Down Expand Up @@ -62,6 +63,7 @@ int update(AtomicInteger outstanding) {
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile Timestamp lastRpcTime = creationTime;
private volatile boolean isRunning = true;
private final CompletableFuture<Void> stopped = new CompletableFuture<>();
private final AtomicInteger outstandingOp = new AtomicInteger();

FollowerState(RaftServerImpl server, Object reason) {
Expand Down Expand Up @@ -93,8 +95,10 @@ boolean isCurrentLeaderValid() {
return lastRpcTime.elapsedTime().compareTo(server.properties().minRpcTimeout()) < 0;
}

void stopRunning() {
CompletableFuture<Void> stopRunning() {
this.isRunning = false;
interrupt();
return stopped;
}

boolean lostMajorityHeartbeatsRecently() {
Expand Down Expand Up @@ -122,6 +126,14 @@ private boolean shouldRun() {

@Override
public void run() {
try {
runImpl();
} finally {
stopped.complete(null);
}
}

private void runImpl() {
final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold();
while (shouldRun()) {
final TimeDuration electionTimeout = server.getRandomElectionTimeout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -183,6 +184,7 @@ public String toString() {
private final String name;
private final LifeCycle lifeCycle;
private final Daemon daemon;
private final CompletableFuture<Void> stopped = new CompletableFuture<>();

private final RaftServerImpl server;
private final boolean skipPreVote;
Expand Down Expand Up @@ -223,8 +225,9 @@ private void startIfNew(Runnable starter) {
}
}

void shutdown() {
CompletableFuture<Void> shutdown() {
lifeCycle.checkStateAndClose();
return stopped;
}

@VisibleForTesting
Expand All @@ -234,6 +237,14 @@ LifeCycle.State getCurrentState() {

@Override
public void run() {
try {
runImpl();
} finally {
stopped.complete(null);
}
}

private void runImpl() {
if (!lifeCycle.compareAndTransition(STARTING, RUNNING)) {
final LifeCycle.State state = lifeCycle.getCurrentState();
LOG.info("{}: skip running since this is already {}", this, state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public Iterator<LogAppender> iterator() {
return senders.iterator();
}

void addAll(Collection<LogAppender> newSenders) {
synchronized void addAll(Collection<LogAppender> newSenders) {
if (newSenders.isEmpty()) {
return;
}
Expand All @@ -219,16 +219,13 @@ void addAll(Collection<LogAppender> newSenders) {
Preconditions.assertTrue(changed);
}

boolean removeAll(Collection<LogAppender> c) {
synchronized boolean removeAll(Collection<LogAppender> c) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need synchronized on either addAll or removeAll, since senders is a CopyOnWriteArrayList.

return senders.removeAll(c);
}

CompletableFuture<Void> stopAll() {
final CompletableFuture<?>[] futures = new CompletableFuture<?>[senders.size()];
for(int i = 0; i < futures.length; i++) {
futures[i] = senders.get(i).stopAsync();
}
return CompletableFuture.allOf(futures);
return CompletableFuture.allOf(senders.stream().
map(LogAppender::stopAsync).toArray(CompletableFuture[]::new));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -566,20 +567,23 @@ void setFirstElection(Object reason) {
* @param force Force to start a new {@link FollowerState} even if this server is already a follower.
* @return if the term/votedFor should be updated to the new term
*/
private synchronized boolean changeToFollower(
long newTerm,
boolean force,
boolean allowListener,
Object reason) {
private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) {
final AtomicReference<Boolean> metadataUpdated = new AtomicReference<>();
changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join();
return metadataUpdated.get();
}

private synchronized CompletableFuture<Void> changeToFollowerAsync(
long newTerm, boolean force, boolean allowListener, Object reason, AtomicReference<Boolean> metadataUpdated) {
final RaftPeerRole old = role.getCurrentRole();
if (old == RaftPeerRole.LISTENER && !allowListener) {
throw new IllegalStateException("Unexpected role " + old);
}
boolean metadataUpdated;
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) {
setRole(RaftPeerRole.FOLLOWER, reason);
if (old == RaftPeerRole.LEADER) {
role.shutdownLeaderState(false)
future = role.shutdownLeaderState(false)
.exceptionally(e -> {
if (e != null) {
if (!getInfo().isAlive()) {
Expand All @@ -588,21 +592,21 @@ private synchronized boolean changeToFollower(
}
}
throw new CompletionException("Failed to shutdownLeaderState: " + this, e);
})
.join();
});
state.setLeader(null, reason);
} else if (old == RaftPeerRole.CANDIDATE) {
role.shutdownLeaderElection();
future = role.shutdownLeaderElection();
} else if (old == RaftPeerRole.FOLLOWER) {
role.shutdownFollowerState();
future = role.shutdownFollowerState();
}
metadataUpdated = state.updateCurrentTerm(newTerm);

metadataUpdated.set(state.updateCurrentTerm(newTerm));
role.startFollowerState(this, reason);
setFirstElection(reason);
} else {
metadataUpdated = state.updateCurrentTerm(newTerm);
metadataUpdated.set(state.updateCurrentTerm(newTerm));
}
return metadataUpdated;
return future;
}

synchronized void changeToFollowerAndPersistMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class RoleInfo {
public static final Logger LOG = LoggerFactory.getLogger(RoleInfo.class);

private final RaftPeerId id;
private volatile RaftPeerRole role;
private final AtomicReference<RaftPeerRole> role = new AtomicReference<>();
szetszwo marked this conversation as resolved.
Show resolved Hide resolved
/** Used when the peer is leader */
private final AtomicReference<LeaderStateImpl> leaderState = new AtomicReference<>();
/** Used when the peer is follower, to monitor election timeout */
Expand All @@ -64,7 +64,7 @@ class RoleInfo {
}

void transitionRole(RaftPeerRole newRole) {
this.role = newRole;
this.role.set(newRole);
this.transitionTime.set(Timestamp.currentTime());
}

Expand All @@ -73,7 +73,7 @@ long getRoleElapsedTimeMs() {
}

RaftPeerRole getCurrentRole() {
return role;
return role.get();
}

boolean isLeaderReady() {
Expand Down Expand Up @@ -113,13 +113,13 @@ void startFollowerState(RaftServerImpl server, Object reason) {
updateAndGet(followerState, new FollowerState(server, reason)).start();
}

void shutdownFollowerState() {
CompletableFuture<Void> shutdownFollowerState() {
final FollowerState follower = followerState.getAndSet(null);
if (follower != null) {
LOG.info("{}: shutdown {}", id, follower);
follower.stopRunning();
follower.interrupt();
if (follower == null) {
return CompletableFuture.completedFuture(null);
}
LOG.info("{}: shutdown {}", id, follower);
return follower.stopRunning();
}

void startLeaderElection(RaftServerImpl server, boolean force) {
Expand All @@ -133,13 +133,13 @@ void setLeaderElectionPause(boolean pause) {
pauseLeaderElection.set(pause);
}

void shutdownLeaderElection() {
CompletableFuture<Void> shutdownLeaderElection() {
final LeaderElection election = leaderElection.getAndSet(null);
if (election != null) {
LOG.info("{}: shutdown {}", id, election);
election.shutdown();
// no need to interrupt the election thread
if (election == null) {
return CompletableFuture.completedFuture(null);
}
LOG.info("{}: shutdown {}", id, election);
return election.shutdown();
}

private <T> T updateAndGet(AtomicReference<T> ref, T current) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void start() {

@Override
public boolean isRunning() {
return daemon.isWorking();
return daemon.isWorking() && server.getInfo().isLeader();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good change!

}

@Override
Expand Down
Loading