Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-2174. Move future.join outside the lock #1168

Merged
merged 29 commits into from
Oct 21, 2024

Conversation

133tosakarin
Copy link
Contributor

@133tosakarin 133tosakarin commented Oct 15, 2024

What changes were proposed in this pull request?

In recent observations, I found that some functions wait for a future while holding a lock, which is a very dangerous thing.
To avoid the deadlock problem caused by holding the lock and waiting for the future, I am considering moving the future waiting outside the lock.

Currently, I found that calling changeToFollower in the following functions causes the above situation:

  1. RaftServerImpl.appendEntries
  2. RaftServerImpl.RequestVote
  3. checkAndInstallSnapshot

see https://issues.apache.org/jira/browse/RATIS-2174

@133tosakarin 133tosakarin marked this pull request as draft October 15, 2024 12:47
@133tosakarin 133tosakarin force-pushed the move_future_out_of_the_lock branch 2 times, most recently from 2943809 to 2ec2d11 Compare October 15, 2024 13:44
@133tosakarin 133tosakarin marked this pull request as ready for review October 15, 2024 14:02
@133tosakarin 133tosakarin changed the title Demo for moving future.join outside the lock Move future.join outside the lock Oct 15, 2024
@133tosakarin
Copy link
Contributor Author

@szetszwo
please take a look. If you agree to write it this way, I will create an issue in jira.

@133tosakarin 133tosakarin changed the title Move future.join outside the lock RATIS-2174. Move future.join outside the lock Oct 16, 2024
checkStyle

use synchronized(server) instead of synchronized method

format

exception handle
@133tosakarin 133tosakarin force-pushed the move_future_out_of_the_lock branch from bcc128e to 2b0ce8f Compare October 16, 2024 03:08
@133tosakarin
Copy link
Contributor Author

@szetszwo @OneSizeFitsQuorum

Please check the comments below. could you help me refer to which one to use as the final solution?
There are currently two main options:

  1. Move future.join outsize the lock (current commit)
  2. Use wait/notify (Code Block below)
Subject: [PATCH] use wait/notify
---
Index: ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java	(revision 62ae6d9f068ce01dd467a6fa83fb74dad9c2c8d8)
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java	(date 1729085480985)
@@ -94,6 +94,9 @@
         logAppender.restart();
       }
       closeFuture.complete(finalState);
+      synchronized (logAppender.getServer()) {
+        logAppender.getServer().notifyAll();
+      }
     }
   }
 
Index: ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java	(revision 62ae6d9f068ce01dd467a6fa83fb74dad9c2c8d8)
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java	(date 1729086757702)
@@ -130,6 +130,9 @@
       runImpl();
     } finally {
       stopped.complete(null);
+      synchronized (server) {
+        server.notifyAll();
+      }
     }
   }
 
Index: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java	(revision 62ae6d9f068ce01dd467a6fa83fb74dad9c2c8d8)
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java	(date 1729087936568)
@@ -565,16 +565,38 @@
     }
   }
 
+   static class Pair<U, V> {
+    private final U first;
+    private final V second;
+
+    private Pair(U first, V second) {
+      this.first = first;
+      this.second = second;
+    }
+
+    static <U,V> Pair<U, V> makePair(U u,  V v) {
+      return new Pair<>(u, v);
+    }
+
+    U first() {
+      return first;
+    }
+
+    V second() {
+      return second;
+    }
+  }
+
   /**
    * Change the server state to Follower if this server is in a different role or force is true.
    * @param newTerm The new term.
    * @param force Force to start a new {@link FollowerState} even if this server is already a follower.
    * @return if the term/votedFor should be updated to the new term
    */
-  private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) {
+  private Pair<Boolean, CompletableFuture<Void>> changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) {
     final AtomicReference<Boolean> metadataUpdated = new AtomicReference<>();
-    changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join();
-    return metadataUpdated.get();
+    CompletableFuture<Void> future = changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated);
+    return Pair.makePair(metadataUpdated.get(), future);
   }
 
   private synchronized CompletableFuture<Void> changeToFollowerAsync(
@@ -617,20 +639,37 @@
       long newTerm,
       boolean allowListener,
       Object reason) throws IOException {
-    if (changeToFollower(newTerm, false, allowListener, reason)) {
-      state.persistMetadata();
+    Pair<Boolean, CompletableFuture<Void>> pair = changeToFollower(newTerm, false, allowListener, reason);
+    try {
+      if (pair.first()) {
+        state.persistMetadata();
+      }
+    } finally {
+      waitFutureAndJoin(pair.second());
     }
   }
 
-  synchronized void changeToLeader() {
+  synchronized void waitFutureAndJoin(CompletableFuture<?> future) {
+    while (!future.isDone()) {
+      try {
+        this.wait();
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+    future.join();
+  }
+  
+  synchronized CompletableFuture<Void> changeToLeader() {
     Preconditions.assertTrue(getInfo().isCandidate());
-    role.shutdownLeaderElection();
+    CompletableFuture<Void> future = role.shutdownLeaderElection();
     setRole(RaftPeerRole.LEADER, "changeToLeader");
     final LeaderStateImpl leader = role.updateLeaderState(this);
     state.becomeLeader();
 
     // start sending AppendEntries RPC to followers
     leader.start();
+    return future;
   }
 
   @Override
@@ -1460,8 +1499,9 @@
       final boolean voteGranted = context.decideVote(candidate, candidateLastEntry);
       if (candidate != null && phase == Phase.ELECTION) {
         // change server state in the ELECTION phase
-        final boolean termUpdated =
-            changeToFollower(candidateTerm, true, false, "candidate:" + candidateId);
+        Pair<Boolean, CompletableFuture<Void>> pair = changeToFollower(candidateTerm, true, false, "candidate:" + candidateId);
+        final boolean termUpdated = pair.first;
+        waitFutureAndJoin(pair.second);
         if (voteGranted) {
           state.grantVote(candidate.getId());
         }
Index: ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java	(revision 62ae6d9f068ce01dd467a6fa83fb74dad9c2c8d8)
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java	(date 1729087936578)
@@ -242,6 +242,9 @@
       runImpl();
     } finally {
       stopped.complete(null);
+      synchronized (server) {
+        server.notifyAll();
+      }
     }
   }
 
@@ -256,7 +259,7 @@
       for (int round = 0; shouldRun(); round++) {
         if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) {
           if (askForVotes(Phase.ELECTION, round)) {
-            server.changeToLeader();
+            server.changeToLeader().join();
           }
         }
       }

@szetszwo
Copy link
Contributor

@133tosakarin , sorry for the delay. Will review this soon.

Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@133tosakarin , thanks a lot for working on this! The idea is great.

  • We should avoid using Optional. Using final is better since the compiler will enforce that the variable must be initialized before use.
  • Let's pass an AtomicBoolean instead of adding the new Pair class. It will have much less code change.

See https://issues.apache.org/jira/secure/attachment/13072253/1168_review.patch

@133tosakarin
Copy link
Contributor Author

@133tosakarin , thanks a lot for working on this! The idea is great.

  • We should avoid using Optional. Using final is better since the compiler will enforce that the variable must be initialized before use.
  • Let's pass an AtomicBoolean instead of adding the new Pair class. It will have much less code change.

See https://issues.apache.org/jira/secure/attachment/13072253/1168_review.patch

Sze, thank you for your advice!

Our current commit is to move future.join out of the lock, but OneSizeFitsQuorum thinks there may be concurrency issues.

What do you think?

@szetszwo
Copy link
Contributor

Suppose we have the following code:

synchronized(server) {
  ...
  final CompletableFuture<Void> future = state.shudown()
  f.join()
}
  • I agree with you that calling join() at the end of synchronized(server) block seems useless since it just waits for the state thread to terminate. If anything needs to be synchronized, it should be done inside the shutdown() method. Of course, the state thread could be synchronized(server) in its run() method. The same synchronization will happen also in the non-shutdown cases. It there is a bug, the non-shutdown cases should also have the same bug.

  • I could see that calling join() at the middle of synchronized(server) may be useful in some cases -- the caller wants to wait of the state thread to complete for some reasons such as getting a final result. However, we are not doing that in our shutdown code.

@133tosakarin
Copy link
Contributor Author

Suppose we have the following code:

synchronized(server) {
  ...
  final CompletableFuture<Void> future = state.shudown()
  f.join()
}
  • I agree with you that calling join() at the end of synchronized(server) block seems useless since it just waits for the state thread to terminate. If anything needs to be synchronized, it should be done inside the shutdown() method. Of course, the state thread could be synchronized(server) in its run() method. The same synchronization will happen also in the non-shutdown cases. It there is a bug, the non-shutdown cases should also have the same bug.
  • I could see that calling join() at the middle of synchronized(server) may be useful in some cases -- the caller wants to wait of the state thread to complete for some reasons such as getting a final result. However, we are not doing that in our shutdown code.

Thank you for your valuable advice! We have reached a consensus

Regarding the question raised by OneSizeFitsQuorum, the concern might be that when thread A waits for the state to shut down outside the synchronized (server) block, it is possible that the state thread could modify some data of the server. When thread A is woken up, since some server data may have been changed by the state thread before its shutdown, the question is whether this could have any subsequent impact on thread A.

OneSizeFitsQuorum recommends writing it as follows:

For Thread A:

synchronized (server) {
    future = state.shutdown();
    while (!future.isDone()) {
        server.wait();
    }
}

For Thread State:

Future<Void> stopped;
...
try {
    run();
} finally {
    stopped.complete();
    synchronized (server) {
        server.notify();
    }
}

In our code, the State thread may be LogAppender, FollowerState, LeaderElection

Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@133tosakarin , thanks for the update!

Please don't use CompletableFuture.runAsync(future::join);. We either want to

  • sync: call future.join(), or
  • async: do nothing (the code is already async).

See the details inlined.

@@ -333,7 +333,7 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException,
LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf);
final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm);
LOG.info("{} {} round {}: result {}", this, phase, round, r);

CompletableFuture<Void> future = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to use final CompletableFuture<Void> future;. Then future.join() will never have NPE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your valuable advice

state.persistMetadata();
}
} catch (IOException e) {
CompletableFuture.runAsync(future::join);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CompletableFuture.runAsync means use another thread to call join(). It is not useful. More details:

  • Without this line, the shutdown thread will run and complete the future.
  • With this line, the shutdown thread will run and complete the future. In addition, we use another thread to wait for the shutdown thread. Why use one more thread?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming we encounter an exception here, we either do not handle the future or need another thread to handle it. If we do not use another thread to handle the future, there is still a deadlock problem of holding the lock and waiting for the future.

The same consideration applies to using runAsync elsewhere

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we remove line 620, how could we have a deadlock? Could you give more details?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replacing CompleteFuture.runAsync(future::join) with future.join still presents the deadlock issue where thread A holds the lock while waiting for the future, and thread B needs to acquire the lock.

If we simply ignore the future and throw an exception without handling it, then no deadlock will occur.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replacing CompleteFuture.runAsync(future::join) with future.join ...

I mean "remove the line", not "replace it with future.join", i.e. case 3 below.

Three cases:

  1. With future.join(), the current thread will wait for the shutdown thread to complete
  2. With CompleteFuture.runAsync(future::join), the current thread and the shutdown thread will run in parallel. In addition, it uses one more thread to wait for the shutdown thread.
  3. With an empty line, the current thread and the shutdown thread will run in parallel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

case 1. We should avoid this situation, which seems to contradict our issue
case 2. This writing style does not seem to be recommended
case 3. It seems that we can only use this

Then we can use the third case

@szetszwo
Copy link
Contributor

  • Version 1
synchronized (server) {
    future = state.shutdown();
    while (!future.isDone()) {
        server.wait();
    }
}
try {
    run();
} finally {
    stopped.complete();
    synchronized (server) {
        server.notify();
    }
}
  • Version 2
synchronized (server) {
    future = state.shutdown();
}
future.join()
try {
    run();
} finally {
    stopped.complete();
}

Version 1 and Version 2 seem to be the same. Could you point out when they will be different?

@133tosakarin
Copy link
Contributor Author

  • Version 1
synchronized (server) {
    future = state.shutdown();
    while (!future.isDone()) {
        server.wait();
    }
}
try {
    run();
} finally {
    stopped.complete();
    synchronized (server) {
        server.notify();
    }
}
  • Version 2
synchronized (server) {
    future = state.shutdown();
}
future.join()
try {
    run();
} finally {
    stopped.complete();
}

Version 1 and Version 2 seem to be the same. Could you point out when they will be different?

For version 1, we need to wait for another thread to wake up before continuing execution.

For version 2, the current thread and the thread that needs to be shut down may execute in parallel.

@szetszwo
Copy link
Contributor

For version 2, the current thread and the thread that needs to be shut down may execute in parallel.

No. future.join() means that it is waiting for stopped to complete.

@133tosakarin
Copy link
Contributor Author

BTW, future.thenApply(..) is not the same as join(). It still returns a un-joined future.

Thank you very much for your explanation.

state.setLeader(leaderId, "installSnapshot");

server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
if (snapshotChunkRequest.getRequestIndex() == 0) {
nextChunkIndex.set(0);
} else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) {
// exception new throw, we need join future in another thread
CompletableFuture.runAsync(future::join);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@133tosakarin , we need to remove all CompletableFuture.runAsync(future::join). They are not useful. See #1168 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will modify this part of the code.

Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@133tosakarin , thanks for the update! Please see the comment below and the comments inlined.

  • In LeaderElection, it should NOT complete stopped in shutdown() since the thread is still running at that time.
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -227,7 +227,6 @@ class LeaderElection implements Runnable {
 
   CompletableFuture<Void> shutdown() {
     lifeCycle.checkStateAndClose();
-    stopped.complete(null);
     return stopped;
   }

RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
final long lastIncludedIndex = lastIncluded.getIndex();
final CompletableFuture<Void> future;
final InstallSnapshotReplyProto reply;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove reply and declare it in the code below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, thanks for your comment

@@ -186,8 +189,9 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest
// have a lot of requests
if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1);
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

Comment on lines 217 to 218
return future.thenApply(dummy -> toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create the reply first since it does not have to wait for the shutdown.

    final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
        currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
    return future.thenApply(dummy -> reply);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
request.getNotification().getFirstAvailableTermIndex());
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
final CompletableFuture<Void> future;
final InstallSnapshotReplyProto replyProto;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to before, move down the declaration, create the reply and return future.thanApply(..).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

}
}

private RoleInfoProto getRoleInfoProto(RaftPeer leader) {
private RoleInfoProto getRoleInfoProto (RaftPeer leader){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this whitespace change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 the change looks good

(Used the web editor to revert a whitespace change but it did not go well. Then, edited it locally and pushed another whitespace reverts.)

@133tosakarin
Copy link
Contributor Author

Remove join in FollowerState and LeaderElection.

For FollowerState, changing to Candidate will close FollowerState and then return to FollowerState.stopped, which is equivalent to waiting for oneself to close.

LeaderElection is similar to FollowerState. When changed to Leader, LeaderElection will be closed and then returned to LeaderElection.stop.

@133tosakarin
Copy link
Contributor Author

There seems to be a strange error: the address has already been used

@szetszwo
Copy link
Contributor

There seems to be a strange error: the address has already been used

It happens occasionally. It may be a test problem (or library, OS bugs.) Not sure if there is a way to prevent it.

Let's simply rerun the tests for now.

@133tosakarin
Copy link
Contributor Author

#[d0cd3de]

@133tosakarin
Copy link
Contributor Author

133tosakarin commented Oct 21, 2024

In this commit [d0cd3de ] , the following modifications are mainly made

  1. Since Follower and Candidate close themselves, calling future.join at this time will deadlock, so cancel the corresponding return or ignore it.
  2. There are still some tests with LeakSize > 0, I'm not sure if this is related to the current modification.

Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@133tosakarin , good job! The latest change can pass everything at the first run.

@szetszwo szetszwo merged commit d08ba81 into apache:master Oct 21, 2024
12 checks passed
OneSizeFitsQuorum pushed a commit to OneSizeFitsQuorum/ratis that referenced this pull request Nov 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants