Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-2162. When closing leaderState, if the logAppender thread sends a snapshot, a deadlock may occur #1154

Merged
merged 10 commits into from
Sep 30, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private void addRaftConfigurationImpl(long logIndex, RaftConfigurationImpl conf)
}
}

RaftConfigurationImpl getCurrent() {
synchronized RaftConfigurationImpl getCurrent() {
return currentConf;
}

szetszwo marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public Iterator<LogAppender> iterator() {
return senders.iterator();
}

void addAll(Collection<LogAppender> newSenders) {
synchronized void addAll(Collection<LogAppender> newSenders) {
if (newSenders.isEmpty()) {
return;
}
Expand All @@ -219,11 +219,11 @@ void addAll(Collection<LogAppender> newSenders) {
Preconditions.assertTrue(changed);
}

boolean removeAll(Collection<LogAppender> c) {
synchronized boolean removeAll(Collection<LogAppender> c) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need synchronized for both addAll and removeAll since it uses a CopyOnWriteArrayList.

return senders.removeAll(c);
}

CompletableFuture<Void> stopAll() {
synchronized CompletableFuture<Void> stopAll() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is a CopyOnWriteArrayList, we should use iterator() or stream() instead of get(i), i.e.

    CompletableFuture<Void> stopAll() {
      return CompletableFuture.allOf(senders.stream().map(LogAppender::stopAsync).toArray(CompletableFuture[]::new));
    }

Then, we don't need synchronized for all the methods in SenderList.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, I will make changes

final CompletableFuture<?>[] futures = new CompletableFuture<?>[senders.size()];
for(int i = 0; i < futures.length; i++) {
futures[i] = senders.get(i).stopAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class RoleInfo {
public static final Logger LOG = LoggerFactory.getLogger(RoleInfo.class);

private final RaftPeerId id;
private volatile RaftPeerRole role;
private final AtomicReference<RaftPeerRole> role = new AtomicReference<>();
szetszwo marked this conversation as resolved.
Show resolved Hide resolved
/** Used when the peer is leader */
private final AtomicReference<LeaderStateImpl> leaderState = new AtomicReference<>();
/** Used when the peer is follower, to monitor election timeout */
Expand All @@ -64,7 +64,7 @@ class RoleInfo {
}

void transitionRole(RaftPeerRole newRole) {
this.role = newRole;
this.role.set(newRole);
this.transitionTime.set(Timestamp.currentTime());
}

Expand All @@ -73,7 +73,7 @@ long getRoleElapsedTimeMs() {
}

RaftPeerRole getCurrentRole() {
return role;
return role.get();
}

boolean isLeaderReady() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
* Common states of a raft peer. Protected by RaftServer's lock.
*/
class ServerState {
private final RaftGroupMemberId memberId;
private final AtomicReference<RaftGroupMemberId> memberId = new AtomicReference<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's don't add AtomicReference since memberId is never changed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, I will make changes

private final RaftServerImpl server;
/** Raft log */
private final MemoizedSupplier<RaftLog> log;
Expand Down Expand Up @@ -99,7 +99,7 @@ class ServerState {

ServerState(RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftServerImpl server,
RaftStorage.StartupOption option, RaftProperties prop) {
this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId());
this.memberId.set(RaftGroupMemberId.valueOf(id, group.getGroupId()));
this.server = server;
Collection<RaftPeer> followerPeers = group.getPeers().stream()
.filter(peer -> peer.getStartupRole() == RaftPeerRole.FOLLOWER)
Expand Down Expand Up @@ -151,7 +151,7 @@ void initialize(StateMachine stateMachine) throws IOException {
}

RaftGroupMemberId getMemberId() {
return memberId;
return memberId.get();
}

void writeRaftConfiguration(LogEntryProto conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,9 @@ public InstallSnapshotRequestProto next() {
}

private InstallSnapshotRequestProto newInstallSnapshotRequest(FileChunkProto chunk, boolean done) {
synchronized (server) {
final SnapshotChunkProto.Builder b = LeaderProtoUtils.toSnapshotChunkProtoBuilder(
requestId, requestIndex++, snapshot.getTermIndex(), chunk, totalSize, done);
return LeaderProtoUtils.toInstallSnapshotRequestProto(server, followerId, b);
}
final SnapshotChunkProto.Builder b = LeaderProtoUtils.toSnapshotChunkProtoBuilder(
requestId, requestIndex++, snapshot.getTermIndex(), chunk, totalSize, done);
return LeaderProtoUtils.toInstallSnapshotRequestProto(server, followerId, b);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep synchronized (server). Otherwise, it could create an inconsistent request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, I will make changes

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void start() {

@Override
public boolean isRunning() {
return daemon.isWorking();
return daemon.isWorking() && server.getInfo().isLeader();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good change!

}

@Override
Expand Down
Loading